/Users/deen/code/yugabyte-db/src/yb/docdb/redis_operation.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/redis_operation.h" |
15 | | |
16 | | #include "yb/common/value.pb.h" |
17 | | #include "yb/common/ql_value.h" |
18 | | |
19 | | #include "yb/docdb/doc_key.h" |
20 | | #include "yb/docdb/doc_path.h" |
21 | | #include "yb/docdb/doc_reader_redis.h" |
22 | | #include "yb/docdb/doc_ttl_util.h" |
23 | | #include "yb/docdb/doc_write_batch.h" |
24 | | #include "yb/docdb/doc_write_batch_cache.h" |
25 | | #include "yb/docdb/docdb_rocksdb_util.h" |
26 | | #include "yb/docdb/subdocument.h" |
27 | | |
28 | | #include "yb/server/hybrid_clock.h" |
29 | | |
30 | | #include "yb/util/redis_util.h" |
31 | | #include "yb/util/status_format.h" |
32 | | #include "yb/util/stol_utils.h" |
33 | | |
34 | | DEFINE_bool(emulate_redis_responses, |
35 | | true, |
36 | | "If emulate_redis_responses is false, we hope to get slightly better performance by just " |
37 | | "returning OK for commands that might require us to read additional records viz. SADD, HSET, " |
38 | | "and HDEL. If emulate_redis_responses is true, we read the required records to compute the " |
39 | | "response as specified by the official Redis API documentation. https://redis.io/commands"); |
40 | | |
41 | | namespace yb { |
42 | | namespace docdb { |
43 | | |
44 | | // A simple conversion from RedisDataTypes to ValueTypes |
45 | | // Note: May run into issues if we want to support ttl on individual set elements, |
46 | | // as they are represented by ValueType::kNullLow. |
47 | 48 | ValueType ValueTypeFromRedisType(RedisDataType dt) { |
48 | 48 | switch(dt) { |
49 | 42 | case RedisDataType::REDIS_TYPE_STRING: |
50 | 42 | return ValueType::kString; |
51 | 0 | case RedisDataType::REDIS_TYPE_SET: |
52 | 0 | return ValueType::kRedisSet; |
53 | 0 | case RedisDataType::REDIS_TYPE_HASH: |
54 | 0 | return ValueType::kObject; |
55 | 6 | case RedisDataType::REDIS_TYPE_SORTEDSET: |
56 | 6 | return ValueType::kRedisSortedSet; |
57 | 0 | case RedisDataType::REDIS_TYPE_TIMESERIES: |
58 | 0 | return ValueType::kRedisTS; |
59 | 0 | case RedisDataType::REDIS_TYPE_LIST: |
60 | 0 | return ValueType::kRedisList; |
61 | 0 | default: |
62 | 0 | return ValueType::kInvalid; |
63 | 48 | } |
64 | 48 | } |
65 | | |
66 | | Status RedisWriteOperation::GetDocPaths( |
67 | 123k | GetDocPathsMode mode, DocPathsToLock* paths, IsolationLevel *level) const { |
68 | 123k | paths->push_back(DocKey::FromRedisKey( |
69 | 123k | request_.key_value().hash_code(), request_.key_value().key()).EncodeAsRefCntPrefix()); |
70 | 123k | *level = IsolationLevel::SNAPSHOT_ISOLATION; |
71 | | |
72 | 123k | return Status::OK(); |
73 | 123k | } |
74 | | |
75 | | namespace { |
76 | | |
77 | 161 | bool EmulateRedisResponse(const RedisDataType& data_type) { |
78 | 161 | return FLAGS_emulate_redis_responses && data_type != REDIS_TYPE_TIMESERIES; |
79 | 161 | } |
80 | | |
81 | | static const string wrong_type_message = |
82 | | "WRONGTYPE Operation against a key holding the wrong kind of value"; |
83 | | |
84 | 91.0k | CHECKED_STATUS QLValueFromSubKey(const RedisKeyValueSubKeyPB &subkey_pb, QLValuePB *out) { |
85 | 91.0k | switch (subkey_pb.subkey_case()) { |
86 | 59.5k | case RedisKeyValueSubKeyPB::kStringSubkey: |
87 | 59.5k | out->set_string_value(subkey_pb.string_subkey()); |
88 | 59.5k | break; |
89 | 31.4k | case RedisKeyValueSubKeyPB::kTimestampSubkey: |
90 | | // We use descending order for the timestamp in the timeseries type so that the latest |
91 | | // value sorts on top. |
92 | 31.4k | out->set_int64_value(subkey_pb.timestamp_subkey()); |
93 | 31.4k | break; |
94 | 0 | case RedisKeyValueSubKeyPB::kDoubleSubkey: { |
95 | 0 | out->set_double_value(subkey_pb.double_subkey()); |
96 | 0 | break; |
97 | 0 | } |
98 | 0 | default: |
99 | 0 | return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", subkey_pb.subkey_case()); |
100 | 91.0k | } |
101 | 91.0k | return Status::OK(); |
102 | 91.0k | } |
103 | | |
104 | | CHECKED_STATUS PrimitiveValueFromSubKey( |
105 | 1.31k | const RedisKeyValueSubKeyPB &subkey_pb, PrimitiveValue *out) { |
106 | 1.31k | QLValuePB value; |
107 | 1.31k | RETURN_NOT_OK(QLValueFromSubKey(subkey_pb, &value)); |
108 | 1.31k | *out = PrimitiveValue::FromQLValuePB( |
109 | 1.31k | value, |
110 | 1.31k | subkey_pb.subkey_case() == RedisKeyValueSubKeyPB::kTimestampSubkey ? SortingType::kDescending1.21k |
111 | 1.31k | : SortingType::kAscending100 ); |
112 | 1.31k | return Status::OK(); |
113 | 1.31k | } |
114 | | |
115 | | // Stricter version of the above when we know the exact datatype to expect. |
116 | | CHECKED_STATUS QLValueFromSubKeyStrict(const RedisKeyValueSubKeyPB& subkey_pb, |
117 | | RedisDataType data_type, |
118 | 89.7k | QLValuePB* out) { |
119 | 89.7k | switch (data_type) { |
120 | 0 | case REDIS_TYPE_LIST: FALLTHROUGH_INTENDED; |
121 | 0 | case REDIS_TYPE_SET: FALLTHROUGH_INTENDED; |
122 | 59.4k | case REDIS_TYPE_HASH: |
123 | 59.4k | if (!subkey_pb.has_string_subkey()) { |
124 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of string type", |
125 | 0 | subkey_pb.ShortDebugString()); |
126 | 0 | } |
127 | 59.4k | break; |
128 | 59.4k | case REDIS_TYPE_TIMESERIES: |
129 | 30.2k | if (!subkey_pb.has_timestamp_subkey()) { |
130 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of int64 type", |
131 | 0 | subkey_pb.ShortDebugString()); |
132 | 0 | } |
133 | 30.2k | break; |
134 | 30.2k | case REDIS_TYPE_SORTEDSET: |
135 | 0 | if (!subkey_pb.has_double_subkey()) { |
136 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of double type", |
137 | 0 | subkey_pb.ShortDebugString()); |
138 | 0 | } |
139 | 0 | break; |
140 | 0 | default: |
141 | 0 | return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", data_type); |
142 | 89.7k | } |
143 | 89.7k | return QLValueFromSubKey(subkey_pb, out); |
144 | 89.7k | } |
145 | | |
146 | | Result<RedisDataType> GetRedisValueType( |
147 | | IntentAwareIterator* iterator, |
148 | | const RedisKeyValuePB &key_value_pb, |
149 | | DocWriteBatch* doc_write_batch = nullptr, |
150 | | int subkey_index = kNilSubkeyIndex, |
151 | 114k | bool always_override = false) { |
152 | 114k | if (!key_value_pb.has_key()) { |
153 | 0 | return STATUS(Corruption, "Expected KeyValuePB"); |
154 | 0 | } |
155 | 114k | KeyBytes encoded_subdoc_key; |
156 | 114k | if (subkey_index == kNilSubkeyIndex) { |
157 | 114k | encoded_subdoc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key()); |
158 | 114k | } else { |
159 | 72 | if (subkey_index >= key_value_pb.subkey_size()) { |
160 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
161 | 0 | "Size of subkeys ($0) must be larger than subkey_index ($1)", |
162 | 0 | key_value_pb.subkey_size(), subkey_index); |
163 | 0 | } |
164 | | |
165 | 72 | PrimitiveValue subkey_primitive; |
166 | 72 | RETURN_NOT_OK(PrimitiveValueFromSubKey(key_value_pb.subkey(subkey_index), &subkey_primitive)); |
167 | 72 | encoded_subdoc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key()); |
168 | 72 | subkey_primitive.AppendToKey(&encoded_subdoc_key); |
169 | 72 | } |
170 | 114k | SubDocument doc; |
171 | 114k | bool doc_found = false; |
172 | | // Use the cached entry if possible to determine the value type. |
173 | 114k | boost::optional<DocWriteBatchCache::Entry> cached_entry; |
174 | 114k | if (doc_write_batch) { |
175 | 41.3k | cached_entry = doc_write_batch->LookupCache(encoded_subdoc_key); |
176 | 41.3k | } |
177 | | |
178 | 114k | if (cached_entry) { |
179 | 0 | doc_found = true; |
180 | 0 | doc = SubDocument(cached_entry->value_type); |
181 | 114k | } else { |
182 | | // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions |
183 | | // support for Redis. |
184 | 114k | GetRedisSubDocumentData data = { encoded_subdoc_key, &doc, &doc_found }; |
185 | 114k | data.return_type_only = true; |
186 | 114k | data.exp.always_override = always_override; |
187 | 114k | RETURN_NOT_OK(GetRedisSubDocument(iterator, data, /* projection */ nullptr, |
188 | 114k | SeekFwdSuffices::kFalse)); |
189 | 114k | } |
190 | | |
191 | 114k | if (!doc_found) { |
192 | 43.5k | return REDIS_TYPE_NONE; |
193 | 43.5k | } |
194 | | |
195 | 70.5k | switch (doc.value_type()) { |
196 | 0 | case ValueType::kInvalid: FALLTHROUGH_INTENDED; |
197 | 0 | case ValueType::kTombstone: |
198 | 0 | return REDIS_TYPE_NONE; |
199 | 34 | case ValueType::kObject: |
200 | 34 | return REDIS_TYPE_HASH; |
201 | 22 | case ValueType::kRedisSet: |
202 | 22 | return REDIS_TYPE_SET; |
203 | 1.25k | case ValueType::kRedisTS: |
204 | 1.25k | return REDIS_TYPE_TIMESERIES; |
205 | 222 | case ValueType::kRedisSortedSet: |
206 | 222 | return REDIS_TYPE_SORTEDSET; |
207 | 10 | case ValueType::kRedisList: |
208 | 10 | return REDIS_TYPE_LIST; |
209 | 12 | case ValueType::kNullLow: FALLTHROUGH_INTENDED; // This value is a set member. |
210 | 69.0k | case ValueType::kString: |
211 | 69.0k | return REDIS_TYPE_STRING; |
212 | 0 | default: |
213 | 0 | return STATUS_FORMAT(Corruption, |
214 | 70.5k | "Unknown value type for redis record: $0", |
215 | 70.5k | static_cast<char>(doc.value_type())); |
216 | 70.5k | } |
217 | 70.5k | } |
218 | | |
219 | | Result<RedisValue> GetRedisValue( |
220 | | IntentAwareIterator* iterator, |
221 | | const RedisKeyValuePB &key_value_pb, |
222 | | int subkey_index = kNilSubkeyIndex, |
223 | | bool always_override = false, |
224 | 72.7k | Expiration* exp = nullptr) { |
225 | 72.7k | if (!key_value_pb.has_key()) { |
226 | 0 | return STATUS(Corruption, "Expected KeyValuePB"); |
227 | 0 | } |
228 | 72.7k | auto encoded_doc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key()); |
229 | | |
230 | 72.7k | if (!key_value_pb.subkey().empty()) { |
231 | 1.24k | if (key_value_pb.subkey().size() != 1 && subkey_index == kNilSubkeyIndex8 ) { |
232 | 0 | return STATUS_SUBSTITUTE(Corruption, |
233 | 0 | "Expected at most one subkey, got $0", key_value_pb.subkey().size()); |
234 | 0 | } |
235 | 1.24k | PrimitiveValue subkey_primitive; |
236 | 1.24k | RETURN_NOT_OK(PrimitiveValueFromSubKey( |
237 | 1.24k | key_value_pb.subkey(subkey_index == kNilSubkeyIndex ? 0 : subkey_index), |
238 | 1.24k | &subkey_primitive)); |
239 | 1.24k | subkey_primitive.AppendToKey(&encoded_doc_key); |
240 | 1.24k | } |
241 | | |
242 | 72.7k | SubDocument doc; |
243 | 72.7k | bool doc_found = false; |
244 | | |
245 | | // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions |
246 | | // support for Redis. |
247 | 72.7k | GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found }; |
248 | 72.7k | data.exp.always_override = always_override; |
249 | 72.7k | RETURN_NOT_OK(GetRedisSubDocument( |
250 | 72.7k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); |
251 | 72.7k | if (!doc_found) { |
252 | 3.08k | return RedisValue{REDIS_TYPE_NONE}; |
253 | 3.08k | } |
254 | | |
255 | 69.6k | if (HasExpiredTTL(data.exp.write_ht, data.exp.ttl, iterator->read_time().read)) { |
256 | 0 | return RedisValue{REDIS_TYPE_NONE}; |
257 | 0 | } |
258 | | |
259 | 69.6k | if (exp) |
260 | 60 | *exp = data.exp; |
261 | | |
262 | 69.6k | if (!doc.IsPrimitive()) { |
263 | 6 | switch (doc.value_type()) { |
264 | 0 | case ValueType::kObject: |
265 | 0 | return RedisValue{REDIS_TYPE_HASH}; |
266 | 0 | case ValueType::kRedisTS: |
267 | 0 | return RedisValue{REDIS_TYPE_TIMESERIES}; |
268 | 6 | case ValueType::kRedisSortedSet: |
269 | 6 | return RedisValue{REDIS_TYPE_SORTEDSET}; |
270 | 0 | case ValueType::kRedisSet: |
271 | 0 | return RedisValue{REDIS_TYPE_SET}; |
272 | 0 | case ValueType::kRedisList: |
273 | 0 | return RedisValue{REDIS_TYPE_LIST}; |
274 | 0 | default: |
275 | 0 | return STATUS_SUBSTITUTE(IllegalState, "Invalid value type: $0", |
276 | 6 | static_cast<int>(doc.value_type())); |
277 | 6 | } |
278 | 6 | } |
279 | | |
280 | 69.6k | auto val = RedisValue{REDIS_TYPE_STRING, doc.GetString(), data.exp}; |
281 | 69.6k | return val; |
282 | 69.6k | } |
283 | | |
284 | | YB_STRONGLY_TYPED_BOOL(VerifySuccessIfMissing); |
285 | | |
286 | | // Set response based on the type match. Return whether the type matches what's expected. |
287 | | bool VerifyTypeAndSetCode( |
288 | | const RedisDataType expected_type, |
289 | | const RedisDataType actual_type, |
290 | | RedisResponsePB *response, |
291 | 145k | VerifySuccessIfMissing verify_success_if_missing = VerifySuccessIfMissing::kFalse) { |
292 | 145k | if (actual_type == RedisDataType::REDIS_TYPE_NONE) { |
293 | 5.44k | if (verify_success_if_missing) { |
294 | 5.44k | response->set_code(RedisResponsePB::NIL); |
295 | 5.44k | } else { |
296 | 0 | response->set_code(RedisResponsePB::NOT_FOUND); |
297 | 0 | } |
298 | 5.44k | return verify_success_if_missing; |
299 | 5.44k | } |
300 | 139k | if (actual_type != expected_type) { |
301 | 12 | response->set_code(RedisResponsePB::WRONG_TYPE); |
302 | 12 | response->set_error_message(wrong_type_message); |
303 | 12 | return false; |
304 | 12 | } |
305 | 139k | response->set_code(RedisResponsePB::OK); |
306 | 139k | return true; |
307 | 139k | } |
308 | | |
309 | | bool VerifyTypeAndSetCode( |
310 | | const docdb::ValueType expected_type, |
311 | | const docdb::ValueType actual_type, |
312 | 11.0k | RedisResponsePB *response) { |
313 | 11.0k | if (actual_type != expected_type) { |
314 | 9 | response->set_code(RedisResponsePB::WRONG_TYPE); |
315 | 9 | response->set_error_message(wrong_type_message); |
316 | 9 | return false; |
317 | 9 | } |
318 | 11.0k | response->set_code(RedisResponsePB::OK); |
319 | 11.0k | return true; |
320 | 11.0k | } |
321 | | |
322 | | CHECKED_STATUS AddPrimitiveValueToResponseArray(const PrimitiveValue& value, |
323 | 3.85M | RedisArrayPB* redis_array) { |
324 | 3.85M | switch (value.value_type()) { |
325 | 2.02M | case ValueType::kString: FALLTHROUGH_INTENDED; |
326 | 2.02M | case ValueType::kStringDescending: { |
327 | 2.02M | redis_array->add_elements(value.GetString()); |
328 | 2.02M | return Status::OK(); |
329 | 2.02M | } |
330 | 0 | case ValueType::kInt64: FALLTHROUGH_INTENDED; |
331 | 1.83M | case ValueType::kInt64Descending: { |
332 | 1.83M | redis_array->add_elements(std::to_string(value.GetInt64())); |
333 | 1.83M | return Status::OK(); |
334 | 0 | } |
335 | 58 | case ValueType::kDouble: FALLTHROUGH_INTENDED; |
336 | 58 | case ValueType::kDoubleDescending: { |
337 | 58 | redis_array->add_elements(std::to_string(value.GetDouble())); |
338 | 58 | return Status::OK(); |
339 | 58 | } |
340 | 0 | default: |
341 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid value type: $0", |
342 | 3.85M | static_cast<int>(value.value_type())); |
343 | 3.85M | } |
344 | 3.85M | } |
345 | | |
346 | | CHECKED_STATUS AddResponseValuesGeneric(const PrimitiveValue& first, |
347 | | const PrimitiveValue& second, |
348 | | RedisResponsePB* response, |
349 | | bool add_keys, |
350 | | bool add_values, |
351 | 1.83M | bool reverse = false) { |
352 | 1.83M | if (add_keys) { |
353 | 1.83M | RETURN_NOT_OK(AddPrimitiveValueToResponseArray(first, response->mutable_array_response())); |
354 | 1.83M | } |
355 | 1.83M | if (add_values) { |
356 | 1.83M | RETURN_NOT_OK(AddPrimitiveValueToResponseArray(second, response->mutable_array_response())); |
357 | 1.83M | } |
358 | 1.83M | return Status::OK(); |
359 | 1.83M | } |
360 | | |
361 | | // Populate the response array for sorted sets range queries. |
362 | | // first refers to the score for the given values. |
363 | | // second refers to a subdocument where each key is a value with the given score. |
364 | | CHECKED_STATUS AddResponseValuesSortedSets(const PrimitiveValue& first, |
365 | | const SubDocument& second, |
366 | | RedisResponsePB* response, |
367 | | bool add_keys, |
368 | | bool add_values, |
369 | 288 | bool reverse = false) { |
370 | 288 | if (reverse) { |
371 | 32 | for (auto it = second.object_container().rbegin(); |
372 | 52 | it != second.object_container().rend(); |
373 | 32 | it++20 ) { |
374 | 20 | const PrimitiveValue& value = it->first; |
375 | 20 | RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys)); |
376 | 20 | } |
377 | 256 | } else { |
378 | 256 | for (const auto& kv : second.object_container()) { |
379 | 204 | const PrimitiveValue& value = kv.first; |
380 | 204 | RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys)); |
381 | 204 | } |
382 | 256 | } |
383 | 288 | return Status::OK(); |
384 | 288 | } |
385 | | |
386 | | template <typename T, typename AddResponseRow> |
387 | | CHECKED_STATUS PopulateRedisResponseFromInternal(T iter, |
388 | | AddResponseRow add_response_row, |
389 | | const T& iter_end, |
390 | | RedisResponsePB *response, |
391 | | bool add_keys, |
392 | | bool add_values, |
393 | 10.9k | bool reverse = false) { |
394 | 10.9k | response->set_allocated_array_response(new RedisArrayPB()); |
395 | 1.84M | for (; iter != iter_end; iter++1.83M ) { |
396 | 1.83M | RETURN_NOT_OK(add_response_row( |
397 | 1.83M | iter->first, iter->second, response, add_keys, add_values, reverse)); |
398 | 1.83M | } |
399 | 10.9k | return Status::OK(); |
400 | 10.9k | } redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > > const&, yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 393 | 60 | bool reverse = false) { | 394 | 60 | response->set_allocated_array_response(new RedisArrayPB()); | 395 | 1.46k | for (; iter != iter_end; iter++1.40k ) { | 396 | 1.40k | RETURN_NOT_OK(add_response_row( | 397 | 1.40k | iter->first, iter->second, response, add_keys, add_values, reverse)); | 398 | 1.40k | } | 399 | 60 | return Status::OK(); | 400 | 60 | } |
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > const&, yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 393 | 10.8k | bool reverse = false) { | 394 | 10.8k | response->set_allocated_array_response(new RedisArrayPB()); | 395 | 1.84M | for (; iter != iter_end; iter++1.83M ) { | 396 | 1.83M | RETURN_NOT_OK(add_response_row( | 397 | 1.83M | iter->first, iter->second, response, add_keys, add_values, reverse)); | 398 | 1.83M | } | 399 | 10.8k | return Status::OK(); | 400 | 10.8k | } |
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > > const&, yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 393 | 8 | bool reverse = false) { | 394 | 8 | response->set_allocated_array_response(new RedisArrayPB()); | 395 | 40 | for (; iter != iter_end; iter++32 ) { | 396 | 32 | RETURN_NOT_OK(add_response_row( | 397 | 32 | iter->first, iter->second, response, add_keys, add_values, reverse)); | 398 | 32 | } | 399 | 8 | return Status::OK(); | 400 | 8 | } |
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > const&, yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 393 | 80 | bool reverse = false) { | 394 | 80 | response->set_allocated_array_response(new RedisArrayPB()); | 395 | 336 | for (; iter != iter_end; iter++256 ) { | 396 | 256 | RETURN_NOT_OK(add_response_row( | 397 | 256 | iter->first, iter->second, response, add_keys, add_values, reverse)); | 398 | 256 | } | 399 | 80 | return Status::OK(); | 400 | 80 | } |
|
401 | | |
402 | | template <typename AddResponseRow> |
403 | | CHECKED_STATUS PopulateResponseFrom(const SubDocument::ObjectContainer &key_values, |
404 | | AddResponseRow add_response_row, |
405 | | RedisResponsePB *response, |
406 | | bool add_keys, |
407 | | bool add_values, |
408 | 10.9k | bool reverse = false) { |
409 | 10.9k | if (reverse) { |
410 | 68 | return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row, |
411 | 68 | key_values.rend(), response, add_keys, |
412 | 68 | add_values, reverse); |
413 | 10.9k | } else { |
414 | 10.9k | return PopulateRedisResponseFromInternal(key_values.begin(), add_response_row, |
415 | 10.9k | key_values.end(), response, add_keys, |
416 | 10.9k | add_values, reverse); |
417 | 10.9k | } |
418 | 10.9k | } redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateResponseFrom<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::map<yb::docdb::PrimitiveValue, yb::docdb::SubDocument, std::__1::less<yb::docdb::PrimitiveValue>, std::__1::allocator<std::__1::pair<yb::docdb::PrimitiveValue const, yb::docdb::SubDocument> > > const&, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 408 | 10.9k | bool reverse = false) { | 409 | 10.9k | if (reverse) { | 410 | 60 | return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row, | 411 | 60 | key_values.rend(), response, add_keys, | 412 | 60 | add_values, reverse); | 413 | 10.8k | } else { | 414 | 10.8k | return PopulateRedisResponseFromInternal(key_values.begin(), add_response_row, | 415 | 10.8k | key_values.end(), response, add_keys, | 416 | 10.8k | add_values, reverse); | 417 | 10.8k | } | 418 | 10.9k | } |
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateResponseFrom<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::map<yb::docdb::PrimitiveValue, yb::docdb::SubDocument, std::__1::less<yb::docdb::PrimitiveValue>, std::__1::allocator<std::__1::pair<yb::docdb::PrimitiveValue const, yb::docdb::SubDocument> > > const&, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 408 | 88 | bool reverse = false) { | 409 | 88 | if (reverse) { | 410 | 8 | return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row, | 411 | 8 | key_values.rend(), response, add_keys, | 412 | 8 | add_values, reverse); | 413 | 80 | } else { | 414 | 80 | return PopulateRedisResponseFromInternal(key_values.begin(), add_response_row, | 415 | 80 | key_values.end(), response, add_keys, | 416 | 80 | add_values, reverse); | 417 | 80 | } | 418 | 88 | } |
|
419 | | |
420 | | void SetOptionalInt(RedisDataType type, int64_t value, int64_t none_value, |
421 | 24 | RedisResponsePB* response) { |
422 | 24 | if (type == RedisDataType::REDIS_TYPE_NONE) { |
423 | 14 | response->set_code(RedisResponsePB::NIL); |
424 | 14 | response->set_int_response(none_value); |
425 | 14 | } else { |
426 | 10 | response->set_code(RedisResponsePB::OK); |
427 | 10 | response->set_int_response(value); |
428 | 10 | } |
429 | 24 | } |
430 | | |
431 | 16 | void SetOptionalInt(RedisDataType type, int64_t value, RedisResponsePB* response) { |
432 | 16 | SetOptionalInt(type, value, 0, response); |
433 | 16 | } |
434 | | |
435 | 21.1k | Result<int64_t> GetCardinality(IntentAwareIterator* iterator, const RedisKeyValuePB& kv) { |
436 | 21.1k | auto encoded_key_card = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key()); |
437 | 21.1k | PrimitiveValue(ValueType::kCounter).AppendToKey(&encoded_key_card); |
438 | 21.1k | SubDocument subdoc_card; |
439 | | |
440 | 21.1k | bool subdoc_card_found = false; |
441 | 21.1k | GetRedisSubDocumentData data = { encoded_key_card, &subdoc_card, &subdoc_card_found }; |
442 | | |
443 | 21.1k | RETURN_NOT_OK(GetRedisSubDocument( |
444 | 21.1k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); |
445 | | |
446 | 21.1k | return subdoc_card_found ? subdoc_card.GetInt64()90 : 021.0k ; |
447 | 21.1k | } |
448 | | |
449 | | template <typename AddResponseValues> |
450 | | CHECKED_STATUS GetAndPopulateResponseValues( |
451 | | IntentAwareIterator* iterator, |
452 | | AddResponseValues add_response_values, |
453 | | const GetRedisSubDocumentData& data, |
454 | | ValueType expected_type, |
455 | | const RedisReadRequestPB& request, |
456 | | RedisResponsePB* response, |
457 | 10.9k | bool add_keys, bool add_values, bool reverse) { |
458 | | |
459 | 10.9k | RETURN_NOT_OK(GetRedisSubDocument( |
460 | 10.9k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); |
461 | | |
462 | | // Validate and populate response. |
463 | 10.9k | response->set_allocated_array_response(new RedisArrayPB()); |
464 | 10.9k | if (!(*data.doc_found)) { |
465 | 6 | response->set_code(RedisResponsePB::NIL); |
466 | 6 | return Status::OK(); |
467 | 6 | } |
468 | | |
469 | 10.9k | if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) { |
470 | 10.9k | RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(), |
471 | 10.9k | add_response_values, |
472 | 10.9k | response, add_keys, add_values, reverse)); |
473 | 10.9k | } |
474 | 10.9k | return Status::OK(); |
475 | 10.9k | } redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::GetAndPopulateResponseValues<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(yb::docdb::IntentAwareIterator*, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), yb::docdb::GetRedisSubDocumentData const&, yb::docdb::ValueType, yb::RedisReadRequestPB const&, yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 457 | 94 | bool add_keys, bool add_values, bool reverse) { | 458 | | | 459 | 94 | RETURN_NOT_OK(GetRedisSubDocument( | 460 | 94 | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); | 461 | | | 462 | | // Validate and populate response. | 463 | 94 | response->set_allocated_array_response(new RedisArrayPB()); | 464 | 94 | if (!(*data.doc_found)) { | 465 | 6 | response->set_code(RedisResponsePB::NIL); | 466 | 6 | return Status::OK(); | 467 | 6 | } | 468 | | | 469 | 88 | if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) { | 470 | 88 | RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(), | 471 | 88 | add_response_values, | 472 | 88 | response, add_keys, add_values, reverse)); | 473 | 88 | } | 474 | 88 | return Status::OK(); | 475 | 88 | } |
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::GetAndPopulateResponseValues<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(yb::docdb::IntentAwareIterator*, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), yb::docdb::GetRedisSubDocumentData const&, yb::docdb::ValueType, yb::RedisReadRequestPB const&, yb::RedisResponsePB*, bool, bool, bool) Line | Count | Source | 457 | 10.8k | bool add_keys, bool add_values, bool reverse) { | 458 | | | 459 | 10.8k | RETURN_NOT_OK(GetRedisSubDocument( | 460 | 10.8k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); | 461 | | | 462 | | // Validate and populate response. | 463 | 10.8k | response->set_allocated_array_response(new RedisArrayPB()); | 464 | 10.8k | if (!(*data.doc_found)) { | 465 | 0 | response->set_code(RedisResponsePB::NIL); | 466 | 0 | return Status::OK(); | 467 | 0 | } | 468 | | | 469 | 10.8k | if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) { | 470 | 10.8k | RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(), | 471 | 10.8k | add_response_values, | 472 | 10.8k | response, add_keys, add_values, reverse)); | 473 | 10.8k | } | 474 | 10.8k | return Status::OK(); | 475 | 10.8k | } |
|
476 | | |
// Get normalized (with respect to card) upper and lower index bounds for range scans.
//
// low_idx / high_idx:  requested bounds; a negative value is first offset by card
//                      (idx = card + idx).
// card:                cardinality of the collection being scanned.
//                      NOTE(review): assumed to be non-negative -- confirm against callers.
// reverse:             when true the bounds are mirrored so that they index from the lower end
//                      instead of the upper end.
// low_idx_normalized / high_idx_normalized: output parameters, must be non-null.
//
// On return the low bound is clamped to be >= 0 and the high bound to be <= card - 1. The low
// bound is not clamped from above and the high bound is not clamped from below, so
// *low_idx_normalized > *high_idx_normalized is a possible outcome.
void GetNormalizedBounds(int64 low_idx, int64 high_idx, int64 card, bool reverse,
                         int64* low_idx_normalized, int64* high_idx_normalized) {
  // Turn negative bounds positive.
  if (low_idx < 0) {
    low_idx = card + low_idx;
  }
  if (high_idx < 0) {
    high_idx = card + high_idx;
  }

  // Index from lower to upper instead of upper to lower.
  if (reverse) {
    // Subtract from (card - 1) instead of evaluating (card - idx) - 1: for a very negative
    // index (e.g. the minimum int64 supplied by a client) the intermediate (card - idx)
    // overflows a signed 64-bit integer, which is undefined behavior. With card >= 0 the
    // expression (card - 1) - idx always stays within range and yields the same value.
    const int64 last_idx = card - 1;
    *low_idx_normalized = last_idx - high_idx;
    *high_idx_normalized = last_idx - low_idx;
  } else {
    *low_idx_normalized = low_idx;
    *high_idx_normalized = high_idx;
  }

  // Fit bounds to range [0, card).
  if (*low_idx_normalized < 0) {
    *low_idx_normalized = 0;
  }
  if (*high_idx_normalized >= card) {
    *high_idx_normalized = card - 1;
  }
}
505 | | |
506 | | } // anonymous namespace |
507 | | |
508 | 41.4k | void RedisWriteOperation::InitializeIterator(const DocOperationApplyData& data) { |
509 | 41.4k | auto subdoc_key = SubDocKey(DocKey::FromRedisKey( |
510 | 41.4k | request_.key_value().hash_code(), request_.key_value().key())); |
511 | | |
512 | 41.4k | auto iter = CreateIntentAwareIterator( |
513 | 41.4k | data.doc_write_batch->doc_db(), |
514 | 41.4k | BloomFilterMode::USE_BLOOM_FILTER, subdoc_key.Encode().AsSlice(), |
515 | 41.4k | redis_query_id(), TransactionOperationContext(), data.deadline, data.read_time); |
516 | | |
517 | 41.4k | iterator_ = std::move(iter); |
518 | 41.4k | } |
519 | | |
520 | 123k | Status RedisWriteOperation::Apply(const DocOperationApplyData& data) { |
521 | 123k | switch (request_.request_case()) { |
522 | 123k | case RedisWriteRequestPB::kSetRequest: |
523 | 123k | return ApplySet(data); |
524 | 114 | case RedisWriteRequestPB::kSetTtlRequest: |
525 | 114 | return ApplySetTtl(data); |
526 | 2 | case RedisWriteRequestPB::kGetsetRequest: |
527 | 2 | return ApplyGetSet(data); |
528 | 4 | case RedisWriteRequestPB::kAppendRequest: |
529 | 4 | return ApplyAppend(data); |
530 | 70 | case RedisWriteRequestPB::kDelRequest: |
531 | 70 | return ApplyDel(data); |
532 | 1 | case RedisWriteRequestPB::kSetRangeRequest: |
533 | 1 | return ApplySetRange(data); |
534 | 23 | case RedisWriteRequestPB::kIncrRequest: |
535 | 23 | return ApplyIncr(data); |
536 | 16 | case RedisWriteRequestPB::kPushRequest: |
537 | 16 | return ApplyPush(data); |
538 | 0 | case RedisWriteRequestPB::kInsertRequest: |
539 | 0 | return ApplyInsert(data); |
540 | 0 | case RedisWriteRequestPB::kPopRequest: |
541 | 0 | return ApplyPop(data); |
542 | 26 | case RedisWriteRequestPB::kAddRequest: |
543 | 26 | return ApplyAdd(data); |
544 | | // TODO: Cut this short in doc_operation. |
545 | 6 | case RedisWriteRequestPB::kNoOpRequest: |
546 | 6 | return Status::OK(); |
547 | 0 | case RedisWriteRequestPB::REQUEST_NOT_SET: break; |
548 | 123k | } |
549 | 0 | return STATUS_FORMAT(Corruption, "Unsupported redis read operation: $0", request_.request_case()); |
550 | 123k | } |
551 | | |
552 | | Result<RedisDataType> RedisWriteOperation::GetValueType( |
553 | 41.3k | const DocOperationApplyData& data, int subkey_index) { |
554 | 41.3k | if (!iterator_) { |
555 | 41.3k | InitializeIterator(data); |
556 | 41.3k | } |
557 | 41.3k | return GetRedisValueType( |
558 | 41.3k | iterator_.get(), request_.key_value(), data.doc_write_batch, subkey_index); |
559 | 41.3k | } |
560 | | |
561 | | Result<RedisValue> RedisWriteOperation::GetValue( |
562 | 144 | const DocOperationApplyData& data, int subkey_index, Expiration* ttl) { |
563 | 144 | if (!iterator_) { |
564 | 121 | InitializeIterator(data); |
565 | 121 | } |
566 | 144 | return GetRedisValue(iterator_.get(), request_.key_value(), |
567 | 144 | subkey_index, /* always_override */ false, ttl); |
568 | 144 | } |
569 | | |
570 | 42.3k | void EnsureMapEntry(QLVirtualValuePB type, QLValuePB* value, QLMapValuePB** cache) { |
571 | 42.3k | if (*cache) { |
572 | 142 | return; |
573 | 142 | } |
574 | 42.2k | value->mutable_map_value()->mutable_keys()->Add()->set_virtual_value(type); |
575 | 42.2k | *cache = value->mutable_map_value()->mutable_values()->Add()->mutable_map_value(); |
576 | 42.2k | } |
577 | | |
// Applies a set request to the document write batch.
//
// With subkeys present:
//   - REDIS_TYPE_HASH / REDIS_TYPE_TIMESERIES: every (subkey, value) pair is written as a map
//     entry; the document is inserted when a timeseries key does not exist yet and extended
//     otherwise.
//   - REDIS_TYPE_SORTEDSET: the forward (score -> member) and reverse (member -> score) mappings
//     and the cardinality counter are updated according to the request's sorted set options;
//     this branch sets the response itself and leaves the switch.
//   - any other type: an InvalidCommand status is returned.
// Without subkeys the key must be REDIS_TYPE_STRING with exactly one value; for non-upsert
// write modes the existing type is read first and the write may be skipped.
//
// A type mismatch with the stored value is reported through response_ (WRONG_TYPE) while the
// returned status stays OK.
578 | 123k | Status RedisWriteOperation::ApplySet(const DocOperationApplyData& data) { |
579 | 123k | const RedisKeyValuePB& kv = request_.key_value(); |
580 | 123k | const MonoDelta ttl = request_.set_request().has_ttl() ? |
581 | 123k | MonoDelta::FromMilliseconds(request_.set_request().ttl())89 : ValueControlFields::kMaxTtl; |
582 | 123k | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
583 | 123k | if (kv.subkey_size() > 0) { |
// GetValueType() also creates iterator_ on first use; GetCardinality() below relies on it.
584 | 41.1k | RedisDataType data_type = VERIFY_RESULT(GetValueType(data)); |
585 | 0 | switch (kv.type()) { |
586 | 174 | case REDIS_TYPE_TIMESERIES: FALLTHROUGH_INTENDED; |
587 | 20.0k | case REDIS_TYPE_HASH: { |
588 | 20.0k | if (data_type != kv.type() && data_type != REDIS_TYPE_NONE19.9k ) { |
589 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
590 | 0 | response_.set_error_message(wrong_type_message); |
591 | 0 | return Status::OK(); |
592 | 0 | } |
593 | 20.0k | QLValuePB kv_entries; |
594 | 20.0k | auto& map = *kv_entries.mutable_map_value(); |
595 | 109k | for (int i = 0; i < kv.subkey_size(); i++89.1k ) { |
596 | 89.1k | RETURN_NOT_OK(QLValueFromSubKeyStrict( |
597 | 89.1k | kv.subkey(i), kv.type(), map.mutable_keys()->Add())); |
598 | 89.1k | map.mutable_values()->Add()->set_string_value(kv.value(i)); |
599 | 89.1k | } |
600 | | |
601 | | // For an HSET command (which has only one subkey), we need to read the subkey to find out |
602 | | // if the key already existed, and return 0 or 1 accordingly. This read is unnecessary for |
603 | | // HMSET and TSADD. |
604 | 20.0k | if (kv.subkey_size() == 1 && EmulateRedisResponse(kv.type())81 && |
605 | 20.0k | !request_.set_request().expect_ok_response()9 ) { |
606 | 8 | RedisDataType type = VERIFY_RESULT(GetValueType(data, 0)); |
607 | | // For HSET/TSADD, we return 0 or 1 depending on if the key already existed. |
608 | | // If flag is false, no int response is returned. |
609 | 0 | SetOptionalInt(type, 0, 1, &response_); |
610 | 8 | } |
611 | | |
612 | 20.0k | ValueRef value_ref(kv_entries); |
613 | 20.0k | if (kv.type() == REDIS_TYPE_TIMESERIES) { |
614 | 174 | value_ref.set_custom_value_type(ValueType::kRedisTS); |
615 | 174 | value_ref.set_list_extend_order(ListExtendOrder::PREPEND); |
616 | 174 | } |
617 | | |
618 | 20.0k | if (data_type == REDIS_TYPE_NONE && kv.type() == REDIS_TYPE_TIMESERIES19.9k ) { |
619 | | // Need to insert the document instead of extending it. |
620 | 162 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
621 | 162 | doc_path, value_ref, data.read_time, |
622 | 162 | data.deadline, redis_query_id(), ttl, ValueControlFields::kInvalidUserTimestamp, |
623 | 162 | false /* init_marker_ttl */)); |
624 | 19.8k | } else { |
625 | 19.8k | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
626 | 19.8k | doc_path, value_ref, data.read_time, |
627 | 19.8k | data.deadline, redis_query_id(), ttl)); |
628 | 19.8k | } |
629 | 20.0k | break; |
630 | 20.0k | } |
631 | 21.1k | case REDIS_TYPE_SORTEDSET: { |
632 | 21.1k | if (data_type != kv.type() && data_type != REDIS_TYPE_NONE21.0k ) { |
633 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
634 | 0 | response_.set_error_message(wrong_type_message); |
635 | 0 | return Status::OK(); |
636 | 0 | } |
637 | | |
638 | | // The SubDocuments to be inserted for card, the forward mapping, and reverse mapping. |
// forward_entries maps a score to the member map already created for it in kv_entries_forward.
639 | 21.1k | std::unordered_map<double, QLMapValuePB*> forward_entries; |
640 | 21.1k | QLMapValuePB* kv_entries_forward = nullptr; |
641 | 21.1k | QLMapValuePB* kv_entries_reverse = nullptr; |
642 | | |
643 | | // The top level mapping. |
644 | 21.1k | QLValuePB kv_entries; |
645 | | |
// new_elements_added feeds the cardinality update; return_value is the int sent to the client.
646 | 21.1k | int new_elements_added = 0; |
647 | 21.1k | int return_value = 0; |
648 | 42.3k | for (int i = 0; i < kv.subkey_size(); i++21.1k ) { |
649 | | // Check whether the value is already in the document, if so delete it. |
650 | 21.1k | SubDocKey key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()), |
651 | 21.1k | PrimitiveValue(ValueType::kSSReverse), |
652 | 21.1k | PrimitiveValue(kv.value(i))); |
653 | 21.1k | SubDocument subdoc_reverse; |
654 | 21.1k | bool subdoc_reverse_found = false; |
655 | 21.1k | auto encoded_key_reverse = key_reverse.EncodeWithoutHt(); |
656 | 21.1k | GetRedisSubDocumentData get_data = { encoded_key_reverse, &subdoc_reverse, |
657 | 21.1k | &subdoc_reverse_found }; |
658 | 21.1k | RETURN_NOT_OK(GetRedisSubDocument( |
659 | 21.1k | data.doc_write_batch->doc_db(), |
660 | 21.1k | get_data, redis_query_id(), TransactionOperationContext(), data.deadline, |
661 | 21.1k | data.read_time)); |
662 | | |
663 | | // Flag indicating whether we should add the given entry to the sorted set. |
664 | 21.1k | bool should_add_entry = true; |
665 | | // Flag indicating whether we should remove an entry from the sorted set. |
666 | 21.1k | bool should_remove_existing_entry = false; |
667 | | |
668 | 21.1k | if (!subdoc_reverse_found) { |
669 | | // The value is not already in the document. |
670 | 21.1k | switch (request_.set_request().sorted_set_options().update_options()) { |
671 | 0 | case SortedSetOptionsPB::NX: FALLTHROUGH_INTENDED; |
672 | 21.1k | case SortedSetOptionsPB::NONE: { |
673 | | // Both these options call for inserting new elements, increment return_value and |
674 | | // keep should_add_entry as true. |
675 | 21.1k | return_value++; |
676 | 21.1k | new_elements_added++; |
677 | 21.1k | break; |
678 | 0 | } |
679 | 2 | default: { |
680 | | // XX option calls for no new elements, don't increment return_value and set |
681 | | // should_add_entry to false. |
682 | 2 | should_add_entry = false; |
683 | 2 | break; |
684 | 0 | } |
685 | 21.1k | } |
686 | 21.1k | } else { |
687 | | // The value is already in the document. |
688 | 24 | switch (request_.set_request().sorted_set_options().update_options()) { |
689 | 0 | case SortedSetOptionsPB::XX: |
690 | 22 | case SortedSetOptionsPB::NONE: { |
691 | | // First make sure that the new score is different from the old score. |
692 | | // Both these options call for updating existing elements, set |
693 | | // should_remove_existing_entry to true, and if the CH flag is on (return both |
694 | | // elements changed and elements added), increment return_value. |
695 | 22 | double score_to_remove = subdoc_reverse.GetDouble(); |
696 | | // If incr option is set, we add the increment to the existing score. |
697 | 22 | double score_to_add = request_.set_request().sorted_set_options().incr() ? |
698 | 22 | score_to_remove + kv.subkey(i).double_subkey()0 : kv.subkey(i).double_subkey(); |
699 | 22 | if (score_to_remove != score_to_add) { |
700 | 22 | should_remove_existing_entry = true; |
701 | 22 | if (request_.set_request().sorted_set_options().ch()) { |
702 | 2 | return_value++; |
703 | 2 | } |
704 | 22 | } |
705 | 22 | break; |
706 | 0 | } |
707 | 2 | default: { |
708 | | // NX option calls for only new elements, set should_add_entry to false. |
709 | 2 | should_add_entry = false; |
710 | 2 | break; |
711 | 0 | } |
712 | 24 | } |
713 | 24 | } |
714 | | |
// Tombstone the member under its old score in the forward mapping.
715 | 21.1k | if (should_remove_existing_entry) { |
716 | 22 | double score_to_remove = subdoc_reverse.GetDouble(); |
717 | 22 | EnsureMapEntry(QLVirtualValuePB::SS_FORWARD, &kv_entries, &kv_entries_forward); |
718 | 22 | kv_entries_forward->mutable_keys()->Add()->set_double_value(score_to_remove); |
719 | 22 | auto* value = kv_entries_forward->mutable_values()->Add()->mutable_map_value(); |
720 | 22 | forward_entries[score_to_remove] = value; |
721 | 22 | value->mutable_keys()->Add()->set_string_value(kv.value(i)); |
722 | 22 | value->mutable_values()->Add()->set_virtual_value( |
723 | 22 | QLVirtualValuePB::TOMBSTONE); |
724 | 22 | } |
725 | | |
726 | 21.1k | if (should_add_entry) { |
727 | | // If the incr option is specified, we need insert the existing score + new score |
728 | | // instead of just the new score. |
// NOTE(review): with incr set this reads subdoc_reverse.GetDouble() even when the member was
// not found above (subdoc_reverse_found == false) -- confirm GetDouble() is safe in that case.
729 | 21.1k | double score_to_add = request_.set_request().sorted_set_options().incr() ? |
730 | 0 | kv.subkey(i).double_subkey() + subdoc_reverse.GetDouble() : |
731 | 21.1k | kv.subkey(i).double_subkey(); |
732 | | |
733 | | // Add the forward mapping to the entries. |
734 | 21.1k | EnsureMapEntry(QLVirtualValuePB::SS_FORWARD, &kv_entries, &kv_entries_forward); |
735 | 21.1k | auto it = forward_entries.find(score_to_add); |
736 | 21.1k | if (it == forward_entries.end()) { |
737 | 21.1k | kv_entries_forward->mutable_keys()->Add()->set_double_value(score_to_add); |
738 | 21.1k | auto* value = kv_entries_forward->mutable_values()->Add()->mutable_map_value(); |
739 | 21.1k | it = forward_entries.emplace(score_to_add, value).first; |
740 | 21.1k | } |
741 | 21.1k | it->second->mutable_keys()->Add()->set_string_value(kv.value(i)); |
742 | 21.1k | it->second->mutable_values()->Add()->set_virtual_value(QLVirtualValuePB::NULL_LOW); |
743 | | |
744 | 21.1k | EnsureMapEntry(QLVirtualValuePB::SS_REVERSE, &kv_entries, &kv_entries_reverse); |
745 | | // Add the reverse mapping to the entries. |
746 | 21.1k | kv_entries_reverse->mutable_keys()->Add()->set_string_value(kv.value(i)); |
747 | 21.1k | kv_entries_reverse->mutable_values()->Add()->set_double_value(score_to_add); |
748 | 21.1k | } |
749 | 21.1k | } |
750 | | |
751 | 21.1k | if (new_elements_added > 0) { |
752 | 21.0k | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)); |
753 | | // Insert card + new_elements_added back into the document for the updated card. |
754 | 0 | auto& map = *kv_entries.mutable_map_value(); |
755 | 21.0k | map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER); |
756 | 21.0k | map.mutable_values()->Add()->set_int64_value(card + new_elements_added); |
757 | 21.0k | } |
758 | | |
// Skip the write entirely when nothing was added, removed or counted.
759 | 21.1k | if (!kv_entries.map_value().keys().empty()) { |
760 | 21.1k | ValueRef value(kv_entries); |
761 | 21.1k | value.set_custom_value_type(ValueType::kRedisSortedSet); |
762 | 21.1k | if (data_type == REDIS_TYPE_NONE) { |
763 | 21.0k | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
764 | 21.0k | doc_path, value, data.read_time, data.deadline, redis_query_id(), ttl)); |
765 | 21.0k | } else { |
766 | 64 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
767 | 64 | doc_path, value, data.read_time, data.deadline, redis_query_id(), ttl)); |
768 | 64 | } |
769 | 21.1k | } |
770 | 21.1k | response_.set_code(RedisResponsePB::OK); |
771 | 21.1k | response_.set_int_response(return_value); |
772 | 21.1k | break; |
773 | 21.1k | } |
774 | 0 | case REDIS_TYPE_STRING: { |
775 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
776 | 21.1k | "Redis data type $0 in SET command should not have subkeys", kv.type()); |
777 | 21.1k | } |
778 | 0 | default: |
779 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
780 | 41.1k | "Redis data type $0 not supported in SET command", kv.type()); |
781 | 41.1k | } |
782 | 81.9k | } else { |
783 | 81.9k | if (kv.type() != REDIS_TYPE_STRING) { |
784 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
785 | 0 | "Redis data type for SET must be string if subkey not present, found $0", kv.type()); |
786 | 0 | } |
787 | 81.9k | if (kv.value_size() != 1) { |
788 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
789 | 0 | "There must be only one value in SET if there is only one key, found $0", |
790 | 0 | kv.value_size()); |
791 | 0 | } |
792 | 81.9k | const RedisWriteMode mode = request_.set_request().mode(); |
// Only non-upsert modes need to read the existing type before writing.
793 | 81.9k | if (mode != RedisWriteMode::REDIS_WRITEMODE_UPSERT) { |
794 | 80 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data)); |
795 | 80 | if ((mode == RedisWriteMode::REDIS_WRITEMODE_INSERT && data_type != REDIS_TYPE_NONE54 ) |
796 | 80 | || (57 mode == RedisWriteMode::REDIS_WRITEMODE_UPDATE57 && data_type == REDIS_TYPE_NONE26 )) { |
797 | | |
798 | 36 | if (request_.set_request().has_expect_ok_response() && |
799 | 36 | !request_.set_request().expect_ok_response()2 ) { |
800 | | // For SETNX we return 0 or 1 depending on if the key already existed. |
801 | 2 | response_.set_int_response(0); |
802 | 2 | response_.set_code(RedisResponsePB::OK); |
803 | 34 | } else { |
804 | 34 | response_.set_code(RedisResponsePB::NIL); |
805 | 34 | } |
806 | 36 | return Status::OK(); |
807 | 36 | } |
808 | 80 | } |
809 | 81.9k | QLValuePB value; |
810 | 81.9k | value.set_string_value(kv.value(0)); |
811 | | |
812 | 81.9k | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive( |
813 | 81.9k | doc_path, ValueControlFields { .ttl = ttl }, ValueRef(value, SortingType::kNotSpecified), |
814 | 81.9k | data.read_time, data.deadline, redis_query_id())); |
815 | 81.9k | } |
816 | | |
817 | 123k | if (request_.set_request().has_expect_ok_response() && |
818 | 123k | !request_.set_request().expect_ok_response()19.8k ) { |
819 | | // For SETNX we return 0 or 1 depending on if the key already existed. |
820 | 2 | response_.set_int_response(1); |
821 | 2 | } |
822 | | |
823 | 123k | response_.set_code(RedisResponsePB::OK); |
824 | 123k | return Status::OK(); |
825 | 123k | } |
826 | | |
827 | 114 | Status RedisWriteOperation::ApplySetTtl(const DocOperationApplyData& data) { |
828 | 114 | const RedisKeyValuePB& kv = request_.key_value(); |
829 | | |
830 | | // We only support setting TTLs on top-level keys. |
831 | 114 | if (!kv.subkey().empty()) { |
832 | 0 | return STATUS_SUBSTITUTE(Corruption, |
833 | 0 | "Expected no subkeys, got $0", kv.subkey().size()); |
834 | 0 | } |
835 | | |
836 | 114 | MonoDelta ttl; |
837 | 114 | bool absolute_expiration = request_.set_ttl_request().has_absolute_time(); |
838 | | |
839 | | // Handle ExpireAt |
840 | 114 | if (absolute_expiration) { |
841 | 6 | int64_t calc_ttl = request_.set_ttl_request().absolute_time() - |
842 | 6 | server::HybridClock::GetPhysicalValueNanos(data.read_time.read) / |
843 | 6 | MonoTime::kNanosecondsPerMillisecond; |
844 | 6 | if (calc_ttl <= 0) { |
845 | 0 | return ApplyDel(data); |
846 | 0 | } |
847 | 6 | ttl = MonoDelta::FromMilliseconds(calc_ttl); |
848 | 6 | } |
849 | | |
850 | 114 | Expiration exp; |
851 | 114 | auto value = VERIFY_RESULT(GetValue(data, kNilSubkeyIndex, &exp)); |
852 | | |
853 | 114 | if (value.type == REDIS_TYPE_TIMESERIES) { // This command is not supported. |
854 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
855 | 0 | "Redis data type $0 not supported in EXPIRE and PERSIST commands", value.type); |
856 | 0 | } |
857 | | |
858 | 114 | if (value.type == REDIS_TYPE_NONE) { // Key does not exist. |
859 | 54 | VLOG(1) << "TTL cannot be set because the key does not exist"0 ; |
860 | 54 | response_.set_int_response(0); |
861 | 54 | return Status::OK(); |
862 | 54 | } |
863 | | |
864 | 60 | if (!absolute_expiration && request_.set_ttl_request().ttl() == -157 ) { // Handle PERSIST. |
865 | 20 | MonoDelta new_ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read)); |
866 | 20 | if (new_ttl.IsNegative() || new_ttl == ValueControlFields::kMaxTtl) { |
867 | 12 | response_.set_int_response(0); |
868 | 12 | return Status::OK(); |
869 | 12 | } |
870 | 20 | } |
871 | | |
872 | 48 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
873 | 48 | if (!absolute_expiration) { |
874 | 45 | ttl = request_.set_ttl_request().ttl() == -1 ? ValueControlFields::kMaxTtl8 : |
875 | 45 | MonoDelta::FromMilliseconds(request_.set_ttl_request().ttl())37 ; |
876 | 45 | } |
877 | | |
878 | 48 | ValueType v_type = ValueTypeFromRedisType(value.type); |
879 | 48 | if (v_type == ValueType::kInvalid) { |
880 | 0 | return STATUS(Corruption, "Invalid value type."); |
881 | 0 | } |
882 | | |
883 | 48 | auto control_flags = ValueControlFields { |
884 | 48 | .merge_flags = ValueControlFields::kTtlFlag, |
885 | 48 | .ttl = ttl, |
886 | 48 | }; |
887 | 48 | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive( |
888 | 48 | doc_path, control_flags, ValueRef(v_type), data.read_time, |
889 | 48 | data.deadline, redis_query_id())); |
890 | 48 | VLOG(2) << "Set TTL successfully to " << ttl << " for key " << kv.key()0 ; |
891 | 48 | response_.set_int_response(1); |
892 | 48 | return Status::OK(); |
893 | 48 | } |
894 | | |
895 | 2 | Status RedisWriteOperation::ApplyGetSet(const DocOperationApplyData& data) { |
896 | 2 | const RedisKeyValuePB& kv = request_.key_value(); |
897 | | |
898 | 2 | if (kv.value_size() != 1) { |
899 | 0 | return STATUS_SUBSTITUTE(Corruption, |
900 | 0 | "Getset kv should have 1 value, found $0", kv.value_size()); |
901 | 0 | } |
902 | | |
903 | 2 | auto value = GetValue(data); |
904 | 2 | RETURN_NOT_OK(value); |
905 | | |
906 | 2 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
907 | 2 | VerifySuccessIfMissing::kTrue)) { |
908 | | // We've already set the error code in the response. |
909 | 0 | return Status::OK(); |
910 | 0 | } |
911 | 2 | response_.set_string_response(value->value); |
912 | | |
913 | 2 | QLValuePB value_pb; |
914 | 2 | value_pb.set_string_value(kv.value(0)); |
915 | | |
916 | 2 | return data.doc_write_batch->SetPrimitive( |
917 | 2 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), ValueControlFields(), |
918 | 2 | ValueRef(value_pb, SortingType::kNotSpecified), data.read_time, data.deadline, |
919 | 2 | redis_query_id()); |
920 | 2 | } |
921 | | |
922 | 4 | Status RedisWriteOperation::ApplyAppend(const DocOperationApplyData& data) { |
923 | 4 | const RedisKeyValuePB& kv = request_.key_value(); |
924 | | |
925 | 4 | if (kv.value_size() != 1) { |
926 | 0 | return STATUS_SUBSTITUTE(Corruption, |
927 | 0 | "Append kv should have 1 value, found $0", kv.value_size()); |
928 | 0 | } |
929 | | |
930 | 4 | auto value = GetValue(data); |
931 | 4 | RETURN_NOT_OK(value); |
932 | | |
933 | 4 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
934 | 4 | VerifySuccessIfMissing::kTrue)) { |
935 | | // We've already set the error code in the response. |
936 | 0 | return Status::OK(); |
937 | 0 | } |
938 | | |
939 | 4 | value->value += kv.value(0); |
940 | | |
941 | 4 | response_.set_code(RedisResponsePB::OK); |
942 | 4 | response_.set_int_response(value->value.length()); |
943 | | |
944 | | // TODO: update the TTL with the write time rather than read time, |
945 | | // or store the expiration. |
946 | 4 | auto control_fields = ValueControlFields { |
947 | 4 | .ttl = VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read)) |
948 | 0 | }; |
949 | | |
950 | 0 | QLValuePB value_pb; |
951 | 4 | value_pb.set_string_value(value->value); |
952 | | |
953 | 4 | return data.doc_write_batch->SetPrimitive( |
954 | 4 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), control_fields, |
955 | 4 | ValueRef(value_pb, SortingType::kNotSpecified), data.read_time, data.deadline, |
956 | 4 | redis_query_id()); |
957 | 4 | } |
958 | | |
959 | | // TODO (akashnil): Actually check if the value existed, return 0 if not. handle multidel in future. |
960 | | // See ENG-807 |
//
// Applies a delete request. Depending on the request's key type it either removes the whole key
// (REDIS_TYPE_NONE), or writes tombstones for the listed subkeys (timeseries, sorted set, and
// the default branch). num_keys tracks how many keys/subkeys are actually removed; the write is
// skipped when it is zero, and it is returned as the int response when
// EmulateRedisResponse(kv.type()) is true.
//
// A stored type that differs from a non-NONE request type is reported through response_
// (WRONG_TYPE) while the returned status stays OK.
961 | 70 | Status RedisWriteOperation::ApplyDel(const DocOperationApplyData& data) { |
962 | 70 | const RedisKeyValuePB& kv = request_.key_value(); |
// GetValueType() also creates iterator_ on first use; GetCardinality() below relies on it.
963 | 70 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data)); |
964 | 70 | if (data_type != REDIS_TYPE_NONE && data_type != kv.type()56 && kv.type() != REDIS_TYPE_NONE22 ) { |
965 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
966 | 0 | response_.set_error_message(wrong_type_message); |
967 | 0 | return Status::OK(); |
968 | 0 | } |
969 | | |
// value_ref refers to `value`, which the branches below fill in before the final write.
970 | 70 | QLValuePB value; |
971 | 70 | ValueRef value_ref(value, SortingType::kNotSpecified); |
972 | | // Number of distinct keys being removed. |
973 | 70 | int num_keys = 0; |
974 | 70 | switch (kv.type()) { |
975 | 26 | case REDIS_TYPE_NONE: { |
976 | 26 | num_keys = data_type == REDIS_TYPE_NONE ? 04 : 122 ; |
977 | 26 | break; |
978 | 0 | } |
979 | 18 | case REDIS_TYPE_TIMESERIES: { |
// NOTE(review): this early return happens before response_.set_code(OK) below -- confirm the
// response code is meant to stay unset when the timeseries key does not exist.
980 | 18 | if (data_type == REDIS_TYPE_NONE) { |
981 | 6 | return Status::OK(); |
982 | 6 | } |
983 | 12 | value_ref.set_list_extend_order(ListExtendOrder::PREPEND); |
984 | 12 | value_ref.set_write_instruction(bfql::TSOpcode::kSetRemove); |
985 | 618 | for (int i = 0; i < kv.subkey_size(); i++606 ) { |
986 | 606 | RETURN_NOT_OK(QLValueFromSubKeyStrict( |
987 | 606 | kv.subkey(i), data_type, value.mutable_map_value()->mutable_keys()->Add())); |
988 | 606 | value.mutable_map_value()->mutable_values()->Add()->set_virtual_value( |
989 | 606 | QLVirtualValuePB::TOMBSTONE); |
990 | 606 | } |
991 | 12 | num_keys = kv.subkey_size(); |
992 | 12 | break; |
993 | 12 | } |
994 | 10 | case REDIS_TYPE_SORTEDSET: { |
995 | 10 | num_keys = kv.subkey_size(); |
996 | | |
997 | 10 | auto& map = *value.mutable_map_value(); |
998 | 10 | map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::SS_FORWARD); |
999 | 10 | auto& values_forward = *map.mutable_values()->Add()->mutable_map_value(); |
1000 | | |
1001 | 10 | map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::SS_REVERSE); |
1002 | 10 | auto& values_reverse = *map.mutable_values()->Add()->mutable_map_value(); |
1003 | | |
1004 | 26 | for (int i = 0; i < kv.subkey_size(); i++16 ) { |
1005 | | // Check whether the value is already in the document. |
1006 | 16 | SubDocument doc_reverse; |
1007 | 16 | bool doc_reverse_found = false; |
1008 | 16 | SubDocKey subdoc_key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()), |
1009 | 16 | PrimitiveValue(ValueType::kSSReverse), |
1010 | 16 | PrimitiveValue(kv.subkey(i).string_subkey())); |
1011 | | // Todo(Rahul): Add values to the write batch cache and then do an additional check. |
1012 | | // As of now, we only check to see if a value is in rocksdb, and we should also check |
1013 | | // the write batch. |
1014 | 16 | auto encoded_subdoc_key_reverse = subdoc_key_reverse.EncodeWithoutHt(); |
1015 | 16 | GetRedisSubDocumentData get_data = { encoded_subdoc_key_reverse, &doc_reverse, |
1016 | 16 | &doc_reverse_found }; |
1017 | 16 | RETURN_NOT_OK(GetRedisSubDocument( |
1018 | 16 | data.doc_write_batch->doc_db(), |
1019 | 16 | get_data, redis_query_id(), TransactionOperationContext(), data.deadline, |
1020 | 16 | data.read_time)); |
1021 | 16 | if (doc_reverse_found && doc_reverse.value_type() != ValueType::kTombstone8 ) { |
1022 | | // The value is already in the doc, needs to be removed. |
1023 | 8 | values_reverse.mutable_keys()->Add()->set_string_value(kv.subkey(i).string_subkey()); |
1024 | 8 | values_reverse.mutable_values()->Add()->set_virtual_value(QLVirtualValuePB::TOMBSTONE); |
1025 | | // For sorted sets, the forward mapping also needs to be deleted. |
1026 | 8 | values_forward.mutable_keys()->Add()->set_double_value(doc_reverse.GetDouble()); |
1027 | 8 | auto& fwd_map = *values_forward.mutable_values()->Add()->mutable_map_value(); |
1028 | 8 | fwd_map.mutable_keys()->Add()->set_string_value(kv.subkey(i).string_subkey()); |
1029 | 8 | fwd_map.mutable_values()->Add()->set_virtual_value(QLVirtualValuePB::TOMBSTONE); |
1030 | 8 | } else { |
1031 | | // If the key is absent, it doesn't contribute to the count of keys being deleted. |
1032 | 8 | num_keys--; |
1033 | 8 | } |
1034 | 16 | } |
1035 | 10 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)); |
1036 | | // The new cardinality is card - num_keys. |
1037 | 0 | map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER); |
1038 | 10 | map.mutable_values()->Add()->set_int64_value(card - num_keys); |
1039 | | |
1040 | 10 | break; |
1041 | 10 | } |
1042 | 16 | default: { |
1043 | 16 | num_keys = kv.subkey_size(); // We know the subkeys are distinct. |
1044 | | // Avoid reads for redis timeseries type. |
// NOTE(review): tombstones are only added inside this EmulateRedisResponse branch. When it
// returns false, `value` stays empty while num_keys == kv.subkey_size() -- confirm the write
// below still deletes the subkeys in that configuration.
1045 | 16 | if (EmulateRedisResponse(kv.type())) { |
1046 | 38 | for (int i = 0; i < kv.subkey_size(); i++22 ) { |
1047 | 22 | RedisDataType type = VERIFY_RESULT(GetValueType(data, i)); |
1048 | 22 | if (type == REDIS_TYPE_STRING) { |
1049 | 8 | value.mutable_map_value()->mutable_keys()->Add()->set_string_value( |
1050 | 8 | kv.subkey(i).string_subkey()); |
1051 | 8 | value.mutable_map_value()->mutable_values()->Add()->set_virtual_value( |
1052 | 8 | QLVirtualValuePB::TOMBSTONE); |
1053 | 14 | } else { |
1054 | | // If the key is absent, it doesn't contribute to the count of keys being deleted. |
1055 | 14 | num_keys--; |
1056 | 14 | } |
1057 | 22 | } |
1058 | 16 | } |
1059 | 16 | break; |
1060 | 16 | } |
1061 | 70 | } |
1062 | | |
// Nothing to write when no key or subkey is actually being removed.
1063 | 64 | if (num_keys != 0) { |
1064 | 46 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
1065 | 46 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
1066 | 46 | doc_path, value_ref, data.read_time, data.deadline, redis_query_id())); |
1067 | 46 | } |
1068 | | |
1069 | 64 | response_.set_code(RedisResponsePB::OK); |
1070 | 64 | if (EmulateRedisResponse(kv.type())) { |
1071 | | // If the flag is true, we respond with the number of keys actually being deleted. We don't |
1072 | | // report this number for the redis timeseries type to avoid reads. |
1073 | 52 | response_.set_int_response(num_keys); |
1074 | 52 | } |
1075 | 64 | return Status::OK(); |
1076 | 64 | } |
1077 | | |
1078 | 1 | Status RedisWriteOperation::ApplySetRange(const DocOperationApplyData& data) { |
1079 | 1 | const RedisKeyValuePB& kv = request_.key_value(); |
1080 | 1 | if (kv.value_size() != 1) { |
1081 | 0 | return STATUS_SUBSTITUTE(Corruption, |
1082 | 0 | "SetRange kv should have 1 value, found $0", kv.value_size()); |
1083 | 0 | } |
1084 | | |
1085 | 1 | auto value = GetValue(data); |
1086 | 1 | RETURN_NOT_OK(value); |
1087 | | |
1088 | 1 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1089 | 1 | VerifySuccessIfMissing::kTrue)) { |
1090 | | // We've already set the error code in the response. |
1091 | 0 | return Status::OK(); |
1092 | 0 | } |
1093 | | |
1094 | 1 | size_t set_range_offset = request_.set_range_request().offset(); |
1095 | 1 | if (set_range_offset > value->value.length()) { |
1096 | 0 | value->value.resize(set_range_offset, 0); |
1097 | 0 | } |
1098 | 1 | value->value.replace(set_range_offset, kv.value(0).length(), kv.value(0)); |
1099 | 1 | response_.set_code(RedisResponsePB::OK); |
1100 | 1 | response_.set_int_response(value->value.length()); |
1101 | | |
1102 | | // TODO: update the TTL with the write time rather than read time, |
1103 | | // or store the expiration. |
1104 | 1 | auto control_fields = ValueControlFields { |
1105 | 1 | .ttl = VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read)) |
1106 | 0 | }; |
1107 | | |
1108 | 0 | QLValuePB value_pb; |
1109 | 1 | value_pb.set_string_value(value->value); |
1110 | 1 | return data.doc_write_batch->SetPrimitive( |
1111 | 1 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), control_fields, |
1112 | 1 | ValueRef(value_pb, SortingType::kNotSpecified), std::move(iterator_)); |
1113 | 1 | } |
1114 | | |
1115 | 23 | Status RedisWriteOperation::ApplyIncr(const DocOperationApplyData& data) { |
1116 | 23 | const RedisKeyValuePB& kv = request_.key_value(); |
1117 | 23 | const int64_t incr = request_.incr_request().increment_int(); |
1118 | | |
1119 | 23 | if (kv.type() != REDIS_TYPE_HASH && kv.type() != REDIS_TYPE_STRING17 ) { |
1120 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
1121 | 0 | "Redis data type $0 not supported in Incr command", kv.type()); |
1122 | 0 | } |
1123 | | |
1124 | 23 | RedisDataType container_type = VERIFY_RESULT(GetValueType(data)); |
1125 | 23 | if (!VerifyTypeAndSetCode(kv.type(), container_type, &response_, |
1126 | 23 | VerifySuccessIfMissing::kTrue)) { |
1127 | | // We've already set the error code in the response. |
1128 | 0 | return Status::OK(); |
1129 | 0 | } |
1130 | | |
1131 | 23 | int subkey = (kv.type() == REDIS_TYPE_HASH ? 06 : -117 ); |
1132 | 23 | auto value = GetValue(data, subkey); |
1133 | 23 | RETURN_NOT_OK(value); |
1134 | | |
1135 | 23 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1136 | 23 | VerifySuccessIfMissing::kTrue)) { |
1137 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
1138 | 0 | response_.set_error_message(wrong_type_message); |
1139 | 0 | return Status::OK(); |
1140 | 0 | } |
1141 | | |
1142 | | // If no value is present, 0 is the default. |
1143 | 23 | int64_t old_value = 0, new_value; |
1144 | 23 | if (value->type != REDIS_TYPE_NONE) { |
1145 | 15 | auto old = CheckedStoll(value->value); |
1146 | 15 | if (!old.ok()) { |
1147 | | // This can happen if there are leading or trailing spaces, or the value |
1148 | | // is out of range. |
1149 | 4 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
1150 | 4 | response_.set_error_message("ERR value is not an integer or out of range"); |
1151 | 4 | return Status::OK(); |
1152 | 4 | } |
1153 | 11 | old_value = *old; |
1154 | 11 | } |
1155 | | |
1156 | 19 | if ((incr < 0 && old_value < 04 && incr < numeric_limits<int64_t>::min() - old_value0 ) || |
1157 | 19 | (incr > 0 && old_value > 015 && incr > numeric_limits<int64_t>::max() - old_value6 )) { |
1158 | 1 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
1159 | 1 | response_.set_error_message("Increment would overflow"); |
1160 | 1 | return Status::OK(); |
1161 | 1 | } |
1162 | 18 | new_value = old_value + incr; |
1163 | 18 | response_.set_code(RedisResponsePB::OK); |
1164 | 18 | response_.set_int_response(new_value); |
1165 | | |
1166 | 18 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
1167 | 18 | if (kv.type() == REDIS_TYPE_HASH) { |
1168 | 6 | QLValuePB kv_entries; |
1169 | 6 | RETURN_NOT_OK(QLValueFromSubKeyStrict( |
1170 | 6 | kv.subkey(0), kv.type(), kv_entries.mutable_map_value()->mutable_keys()->Add())); |
1171 | 6 | kv_entries.mutable_map_value()->mutable_values()->Add()->set_string_value( |
1172 | 6 | std::to_string(new_value)); |
1173 | 6 | return data.doc_write_batch->ExtendSubDocument( |
1174 | 6 | doc_path, ValueRef(kv_entries, SortingType::kNotSpecified), data.read_time, data.deadline, |
1175 | 6 | redis_query_id()); |
1176 | 0 | return Status::OK(); |
1177 | 12 | } else { // kv.type() == REDIS_TYPE_STRING |
1178 | | // TODO: update the TTL with the write time rather than read time, |
1179 | | // or store the expiration. |
1180 | 12 | auto control_fields = ValueControlFields { |
1181 | 12 | .ttl = VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read)) |
1182 | 0 | }; |
1183 | 0 | QLValuePB value_pb; |
1184 | 12 | value_pb.set_string_value(std::to_string(new_value)); |
1185 | 12 | return data.doc_write_batch->SetPrimitive( |
1186 | 12 | doc_path, control_fields, ValueRef(value_pb, SortingType::kNotSpecified), |
1187 | 12 | std::move(iterator_)); |
1188 | 12 | } |
1189 | 18 | } |
1190 | | |
1191 | 16 | Status RedisWriteOperation::ApplyPush(const DocOperationApplyData& data) { |
1192 | 16 | const RedisKeyValuePB& kv = request_.key_value(); |
1193 | 16 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
1194 | 16 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data)); |
1195 | 16 | if (data_type != REDIS_TYPE_LIST && data_type != REDIS_TYPE_NONE6 ) { |
1196 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
1197 | 0 | response_.set_error_message(wrong_type_message); |
1198 | 0 | return Status::OK(); |
1199 | 0 | } |
1200 | | |
1201 | 16 | QLValuePB list; |
1202 | 16 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)) + kv.value_size(); |
1203 | 0 | list.mutable_map_value()->mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER); |
1204 | 16 | list.mutable_map_value()->mutable_values()->Add()->set_int64_value(card); |
1205 | | |
1206 | 16 | list.mutable_map_value()->mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::ARRAY); |
1207 | 16 | auto& elements = *list.mutable_map_value()->mutable_values()->Add(); |
1208 | | |
1209 | 20 | for (const auto& val : kv.value()) { |
1210 | 20 | elements.mutable_list_value()->mutable_elems()->Add()->set_string_value(val); |
1211 | 20 | } |
1212 | | |
1213 | 16 | ValueRef value_ref(list); |
1214 | 16 | if (request_.push_request().side() == REDIS_SIDE_LEFT) { |
1215 | 10 | value_ref.set_list_extend_order(ListExtendOrder::PREPEND); |
1216 | 10 | } |
1217 | 16 | value_ref.set_custom_value_type(ValueType::kRedisList); |
1218 | 16 | if (data_type == REDIS_TYPE_NONE) { |
1219 | 6 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
1220 | 6 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), value_ref, |
1221 | 6 | data.read_time, data.deadline, redis_query_id())); |
1222 | 10 | } else { |
1223 | 10 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
1224 | 10 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), value_ref, |
1225 | 10 | data.read_time, data.deadline, redis_query_id())); |
1226 | 10 | } |
1227 | | |
1228 | 16 | response_.set_int_response(card); |
1229 | 16 | response_.set_code(RedisResponsePB::OK); |
1230 | 16 | return Status::OK(); |
1231 | 16 | } |
1232 | | |
1233 | 0 | Status RedisWriteOperation::ApplyInsert(const DocOperationApplyData& data) { |
1234 | 0 | return STATUS(NotSupported, "Redis operation has not been implemented"); |
1235 | 0 | } |
1236 | | |
1237 | 0 | Status RedisWriteOperation::ApplyPop(const DocOperationApplyData& data) { |
1238 | 0 | const RedisKeyValuePB& kv = request_.key_value(); |
1239 | 0 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
1240 | 0 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data)); |
1241 | | |
1242 | 0 | if (!VerifyTypeAndSetCode(kv.type(), data_type, &response_, VerifySuccessIfMissing::kTrue)) { |
1243 | | // We already set the error code in the function. |
1244 | 0 | return Status::OK(); |
1245 | 0 | } |
1246 | | |
1247 | 0 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)); |
1248 | | |
1249 | 0 | if (!card) { |
1250 | 0 | response_.set_code(RedisResponsePB::NIL); |
1251 | 0 | return Status::OK(); |
1252 | 0 | } |
1253 | | |
1254 | 0 | std::vector<std::string> value; |
1255 | 0 | { |
1256 | 0 | ValueRef value_ref(ValueType::kTombstone); |
1257 | |
|
1258 | 0 | if (request_.pop_request().side() == REDIS_SIDE_LEFT) { |
1259 | 0 | RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(doc_path, 1, value_ref, |
1260 | 0 | data.read_time, data.deadline, redis_query_id(), Direction::kForward, 0, &value)); |
1261 | 0 | } else { |
1262 | 0 | RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(doc_path, card, value_ref, |
1263 | 0 | data.read_time, data.deadline, redis_query_id(), Direction::kBackward, card + 1, &value)); |
1264 | 0 | } |
1265 | 0 | } |
1266 | | |
1267 | 0 | QLValuePB list; |
1268 | 0 | list.mutable_map_value()->mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER); |
1269 | 0 | list.mutable_map_value()->mutable_values()->Add()->set_int64_value(--card); |
1270 | 0 | ValueRef value_ref(list); |
1271 | 0 | value_ref.set_custom_value_type(ValueType::kRedisList); |
1272 | 0 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
1273 | 0 | doc_path, value_ref, data.read_time, data.deadline, redis_query_id())); |
1274 | | |
1275 | 0 | if (value.size() != 1) { |
1276 | 0 | return STATUS_SUBSTITUTE(Corruption, "Expected one popped value, got $0", value.size()); |
1277 | 0 | } |
1278 | | |
1279 | 0 | response_.set_string_response(value[0]); |
1280 | 0 | response_.set_code(RedisResponsePB::OK); |
1281 | 0 | return Status::OK(); |
1282 | 0 | } |
1283 | | |
1284 | 26 | Status RedisWriteOperation::ApplyAdd(const DocOperationApplyData& data) { |
1285 | 26 | const RedisKeyValuePB& kv = request_.key_value(); |
1286 | 26 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data)); |
1287 | | |
1288 | 26 | if (data_type != REDIS_TYPE_SET && data_type != REDIS_TYPE_NONE12 ) { |
1289 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE); |
1290 | 0 | response_.set_error_message(wrong_type_message); |
1291 | 0 | return Status::OK(); |
1292 | 0 | } |
1293 | | |
1294 | 26 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()); |
1295 | | |
1296 | 26 | if (kv.subkey_size() == 0) { |
1297 | 0 | return STATUS(InvalidCommand, "SADD request has no subkeys set"); |
1298 | 0 | } |
1299 | | |
1300 | 26 | int num_keys_found = 0; |
1301 | | |
1302 | 26 | QLValuePB set_entries; |
1303 | | |
1304 | 58 | for (int i = 0 ; i < kv.subkey_size(); i++32 ) { // We know that each subkey is distinct. |
1305 | 32 | if (FLAGS_emulate_redis_responses) { |
1306 | 32 | RedisDataType type = VERIFY_RESULT(GetValueType(data, i)); |
1307 | 32 | if (type != REDIS_TYPE_NONE) { |
1308 | 8 | num_keys_found++; |
1309 | 8 | } |
1310 | 32 | } |
1311 | | |
1312 | 32 | set_entries.mutable_map_value()->mutable_keys()->Add()->set_string_value( |
1313 | 32 | kv.subkey(i).string_subkey()); |
1314 | 32 | set_entries.mutable_map_value()->mutable_values()->Add()->set_virtual_value( |
1315 | 32 | QLVirtualValuePB::NULL_LOW); |
1316 | 32 | } |
1317 | | |
1318 | 26 | ValueRef value_ref(set_entries); |
1319 | 26 | value_ref.set_custom_value_type(ValueType::kRedisSet); |
1320 | | |
1321 | 26 | Status s; |
1322 | | |
1323 | 26 | if (data_type == REDIS_TYPE_NONE) { |
1324 | 12 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( |
1325 | 12 | doc_path, value_ref, data.read_time, data.deadline, redis_query_id())); |
1326 | 14 | } else { |
1327 | 14 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument( |
1328 | 14 | doc_path, value_ref, data.read_time, data.deadline, redis_query_id())); |
1329 | 14 | } |
1330 | | |
1331 | 26 | response_.set_code(RedisResponsePB::OK); |
1332 | 26 | if (FLAGS_emulate_redis_responses) { |
1333 | | // If flag is set, the actual number of new keys added is sent as response. |
1334 | 26 | response_.set_int_response(kv.subkey_size() - num_keys_found); |
1335 | 26 | } |
1336 | 26 | return Status::OK(); |
1337 | 26 | } |
1338 | | |
1339 | 0 | Status RedisWriteOperation::ApplyRemove(const DocOperationApplyData& data) { |
1340 | 0 | return STATUS(NotSupported, "Redis operation has not been implemented"); |
1341 | 0 | } |
1342 | | |
1343 | 84.6k | Status RedisReadOperation::Execute() { |
1344 | 84.6k | SimulateTimeoutIfTesting(&deadline_); |
1345 | | // If we have a KEYS command, we don't specify any key for the iterator. Therefore, don't use |
1346 | | // bloom filters for this command. |
1347 | 84.6k | SubDocKey doc_key( |
1348 | 84.6k | DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key())); |
1349 | 84.6k | auto bloom_filter_mode = request_.has_keys_request() ? |
1350 | 83.9k | BloomFilterMode::DONT_USE_BLOOM_FILTER630 : BloomFilterMode::USE_BLOOM_FILTER; |
1351 | 84.6k | auto iter = yb::docdb::CreateIntentAwareIterator( |
1352 | 84.6k | doc_db_, bloom_filter_mode, |
1353 | 84.6k | doc_key.Encode().AsSlice(), |
1354 | 84.6k | redis_query_id(), TransactionOperationContext(), deadline_, read_time_); |
1355 | 84.6k | iterator_ = std::move(iter); |
1356 | 84.6k | deadline_info_.emplace(deadline_); |
1357 | | |
1358 | 84.6k | switch (request_.request_case()) { |
1359 | 4 | case RedisReadRequestPB::kGetForRenameRequest: |
1360 | 4 | return ExecuteGetForRename(); |
1361 | 72.6k | case RedisReadRequestPB::kGetRequest: |
1362 | 72.6k | return ExecuteGet(); |
1363 | 291 | case RedisReadRequestPB::kGetTtlRequest: |
1364 | 291 | return ExecuteGetTtl(); |
1365 | 2 | case RedisReadRequestPB::kStrlenRequest: |
1366 | 2 | return ExecuteStrLen(); |
1367 | 4 | case RedisReadRequestPB::kExistsRequest: |
1368 | 4 | return ExecuteExists(); |
1369 | 2 | case RedisReadRequestPB::kGetRangeRequest: |
1370 | 2 | return ExecuteGetRange(); |
1371 | 10.9k | case RedisReadRequestPB::kGetCollectionRangeRequest: |
1372 | 10.9k | return ExecuteCollectionGetRange(); |
1373 | 630 | case RedisReadRequestPB::kKeysRequest: |
1374 | 630 | return ExecuteKeys(); |
1375 | 0 | default: |
1376 | 0 | return STATUS_FORMAT( |
1377 | 84.6k | Corruption, "Unsupported redis read operation: $0", request_.request_case()); |
1378 | 84.6k | } |
1379 | 84.6k | } |
1380 | | |
namespace {

// Maps a possibly negative index onto the range [0, len].
// A negative index counts back from len and is floored at 0; a non-negative index is capped
// at len.
ssize_t ApplyIndex(ssize_t index, ssize_t len) {
  if (index >= 0) {
    return std::min<ssize_t>(index, len);
  }
  const ssize_t from_end = index + len;
  return std::max<ssize_t>(from_end, 0);
}

} // namespace
1392 | | |
1393 | | Status RedisReadOperation::ExecuteHGetAllLikeCommands(ValueType value_type, |
1394 | | bool add_keys, |
1395 | 68 | bool add_values) { |
1396 | 68 | SubDocKey doc_key( |
1397 | 68 | DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key())); |
1398 | 68 | SubDocument doc; |
1399 | 68 | bool doc_found = false; |
1400 | 68 | auto encoded_doc_key = doc_key.EncodeWithoutHt(); |
1401 | | |
1402 | | // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions |
1403 | | // support for Redis. |
1404 | 68 | GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found }; |
1405 | 68 | data.deadline_info = deadline_info_.get_ptr(); |
1406 | | |
1407 | 68 | bool has_cardinality_subkey = value_type == ValueType::kRedisSortedSet || |
1408 | 68 | value_type == ValueType::kRedisList66 ; |
1409 | 68 | bool return_array_response = add_keys || add_values55 ; |
1410 | | |
1411 | 68 | if (has_cardinality_subkey) { |
1412 | 8 | data.return_type_only = !return_array_response; |
1413 | 60 | } else { |
1414 | 60 | data.count_only = !return_array_response; |
1415 | 60 | } |
1416 | | |
1417 | 68 | RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr, |
1418 | 68 | SeekFwdSuffices::kFalse)); |
1419 | 68 | if (return_array_response) |
1420 | 16 | response_.set_allocated_array_response(new RedisArrayPB()); |
1421 | | |
1422 | 68 | if (!doc_found) { |
1423 | 12 | response_.set_code(RedisResponsePB::OK); |
1424 | 12 | if (!return_array_response) |
1425 | 12 | response_.set_int_response(0); |
1426 | 12 | return Status::OK(); |
1427 | 12 | } |
1428 | | |
1429 | 56 | if (VerifyTypeAndSetCode(value_type, doc.value_type(), &response_)) { |
1430 | 47 | if (return_array_response) { |
1431 | 15 | RETURN_NOT_OK(PopulateResponseFrom(doc.object_container(), AddResponseValuesGeneric, |
1432 | 15 | &response_, add_keys, add_values)); |
1433 | 32 | } else { |
1434 | 32 | int64_t card = has_cardinality_subkey ? |
1435 | 32 | VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value())) : |
1436 | 32 | data.record_count28 ; |
1437 | 0 | response_.set_int_response(card); |
1438 | 32 | response_.set_code(RedisResponsePB::OK); |
1439 | 32 | } |
1440 | 47 | } |
1441 | 56 | return Status::OK(); |
1442 | 56 | } |
1443 | | |
1444 | | Status RedisReadOperation::ExecuteCollectionGetRangeByBounds( |
1445 | 0 | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, bool add_keys) { |
1446 | 0 | RedisSubKeyBoundPB lower_bound; |
1447 | 0 | lower_bound.set_infinity_type(RedisSubKeyBoundPB::NEGATIVE); |
1448 | 0 | RedisSubKeyBoundPB upper_bound; |
1449 | 0 | upper_bound.set_infinity_type(RedisSubKeyBoundPB::POSITIVE); |
1450 | 0 | return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys); |
1451 | 0 | } |
1452 | | |
1453 | | Status RedisReadOperation::ExecuteCollectionGetRangeByBounds( |
1454 | | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, |
1455 | 10.9k | const RedisSubKeyBoundPB& lower_bound, const RedisSubKeyBoundPB& upper_bound, bool add_keys) { |
1456 | 10.9k | if ((lower_bound.has_infinity_type() && |
1457 | 10.9k | lower_bound.infinity_type() == RedisSubKeyBoundPB::POSITIVE116 ) || |
1458 | 10.9k | (upper_bound.has_infinity_type() && |
1459 | 10.9k | upper_bound.infinity_type() == RedisSubKeyBoundPB::NEGATIVE114 )) { |
1460 | | // Return empty response. |
1461 | 0 | response_.set_code(RedisResponsePB::OK); |
1462 | 0 | RETURN_NOT_OK(PopulateResponseFrom( |
1463 | 0 | SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_, /* add_keys */ true, |
1464 | 0 | /* add_values */ true)); |
1465 | 0 | return Status::OK(); |
1466 | 0 | } |
1467 | | |
1468 | 10.9k | if (request_type == RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE) { |
1469 | 98 | auto type = VERIFY_RESULT(GetValueType()); |
1470 | 0 | auto expected_type = REDIS_TYPE_SORTEDSET; |
1471 | 98 | if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1472 | 6 | return Status::OK(); |
1473 | 6 | } |
1474 | 92 | auto encoded_doc_key = |
1475 | 92 | DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key()); |
1476 | 92 | PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key); |
1477 | 92 | double low_double = lower_bound.subkey_bound().double_subkey(); |
1478 | 92 | double high_double = upper_bound.subkey_bound().double_subkey(); |
1479 | | |
1480 | 92 | KeyBytes low_sub_key_bound; |
1481 | 92 | KeyBytes high_sub_key_bound; |
1482 | | |
1483 | 92 | SliceKeyBound low_subkey; |
1484 | 92 | if (!lower_bound.has_infinity_type()) { |
1485 | 24 | low_sub_key_bound = encoded_doc_key; |
1486 | 24 | PrimitiveValue::Double(low_double).AppendToKey(&low_sub_key_bound); |
1487 | 24 | low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(lower_bound.is_exclusive())); |
1488 | 24 | } |
1489 | 92 | SliceKeyBound high_subkey; |
1490 | 92 | if (!upper_bound.has_infinity_type()) { |
1491 | 26 | high_sub_key_bound = encoded_doc_key; |
1492 | 26 | PrimitiveValue::Double(high_double).AppendToKey(&high_sub_key_bound); |
1493 | 26 | high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(upper_bound.is_exclusive())); |
1494 | 26 | } |
1495 | | |
1496 | 92 | SubDocument doc; |
1497 | 92 | bool doc_found = false; |
1498 | 92 | GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found}; |
1499 | 92 | data.deadline_info = deadline_info_.get_ptr(); |
1500 | 92 | data.low_subkey = &low_subkey; |
1501 | 92 | data.high_subkey = &high_subkey; |
1502 | | |
1503 | 92 | IndexBound low_index; |
1504 | 92 | IndexBound high_index; |
1505 | 92 | if (request_.has_range_request_limit()) { |
1506 | 50 | auto offset = request_.index_range().lower_bound().index(); |
1507 | 50 | int32_t limit = request_.range_request_limit(); |
1508 | | |
1509 | 50 | if (offset < 0 || limit == 038 ) { |
1510 | | // Return an empty response. |
1511 | 18 | response_.set_code(RedisResponsePB::OK); |
1512 | 18 | RETURN_NOT_OK(PopulateResponseFrom( |
1513 | 18 | SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_, |
1514 | 18 | /* add_keys */ true, /* add_values */ true)); |
1515 | 18 | return Status::OK(); |
1516 | 18 | } |
1517 | | |
1518 | 32 | low_index = IndexBound(offset, true /* is_lower */); |
1519 | 32 | data.low_index = &low_index; |
1520 | 32 | if (limit > 0) { |
1521 | | // Only define upper bound if limit is positive. |
1522 | 26 | high_index = IndexBound(offset + limit - 1, false /* is_lower */); |
1523 | 26 | data.high_index = &high_index; |
1524 | 26 | } |
1525 | 32 | } |
1526 | 74 | RETURN_NOT_OK(GetAndPopulateResponseValues( |
1527 | 74 | iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_, |
1528 | 74 | &response_, |
1529 | 74 | /* add_keys */ add_keys, /* add_values */ true, /* reverse */ false)); |
1530 | 10.8k | } else { |
1531 | 10.8k | auto encoded_doc_key = |
1532 | 10.8k | DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key()); |
1533 | 10.8k | int64_t low_timestamp = lower_bound.subkey_bound().timestamp_subkey(); |
1534 | 10.8k | int64_t high_timestamp = upper_bound.subkey_bound().timestamp_subkey(); |
1535 | | |
1536 | 10.8k | KeyBytes low_sub_key_bound; |
1537 | 10.8k | KeyBytes high_sub_key_bound; |
1538 | | |
1539 | 10.8k | SliceKeyBound low_subkey; |
1540 | | // Need to switch the order since we store the timestamps in descending order. |
1541 | 10.8k | if (!upper_bound.has_infinity_type()) { |
1542 | 10.8k | low_sub_key_bound = encoded_doc_key; |
1543 | 10.8k | PrimitiveValue(high_timestamp, SortOrder::kDescending).AppendToKey(&low_sub_key_bound); |
1544 | 10.8k | low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(upper_bound.is_exclusive())); |
1545 | 10.8k | } |
1546 | 10.8k | SliceKeyBound high_subkey; |
1547 | 10.8k | if (!lower_bound.has_infinity_type()) { |
1548 | 10.8k | high_sub_key_bound = encoded_doc_key; |
1549 | 10.8k | PrimitiveValue(low_timestamp, SortOrder::kDescending).AppendToKey(&high_sub_key_bound); |
1550 | 10.8k | high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(lower_bound.is_exclusive())); |
1551 | 10.8k | } |
1552 | | |
1553 | 10.8k | SubDocument doc; |
1554 | 10.8k | bool doc_found = false; |
1555 | 10.8k | GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found}; |
1556 | 10.8k | data.deadline_info = deadline_info_.get_ptr(); |
1557 | 10.8k | data.low_subkey = &low_subkey; |
1558 | 10.8k | data.high_subkey = &high_subkey; |
1559 | 10.8k | data.limit = request_.range_request_limit(); |
1560 | 10.8k | bool is_reverse = true; |
1561 | 10.8k | if (request_type == RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME) { |
1562 | | // If reverse is false, newest element is the first element returned. |
1563 | 10.8k | is_reverse = false; |
1564 | 10.8k | } |
1565 | 10.8k | RETURN_NOT_OK(GetAndPopulateResponseValues( |
1566 | 10.8k | iterator_.get(), AddResponseValuesGeneric, data, ValueType::kRedisTS, request_, &response_, |
1567 | 10.8k | /* add_keys */ true, /* add_values */ true, is_reverse)); |
1568 | 10.8k | } |
1569 | 10.9k | return Status::OK(); |
1570 | 10.9k | } |
1571 | | |
1572 | 10.9k | Status RedisReadOperation::ExecuteCollectionGetRange() { |
1573 | 10.9k | const RedisKeyValuePB& key_value = request_.key_value(); |
1574 | 10.9k | if (!request_.has_key_value() || !key_value.has_key()) { |
1575 | 0 | return STATUS(InvalidArgument, "Need to specify the key"); |
1576 | 0 | } |
1577 | | |
1578 | 10.9k | const auto request_type = request_.get_collection_range_request().request_type(); |
1579 | 10.9k | switch (request_type) { |
1580 | 10.8k | case RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME: |
1581 | 10.8k | FALLTHROUGH_INTENDED; |
1582 | 10.9k | case RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE: FALLTHROUGH_INTENDED; |
1583 | 10.9k | case RedisCollectionGetRangeRequestPB::TSRANGEBYTIME: { |
1584 | 10.9k | if(!request_.has_subkey_range() || !request_.subkey_range().has_lower_bound() || |
1585 | 10.9k | !request_.subkey_range().has_upper_bound()) { |
1586 | 0 | return STATUS(InvalidArgument, "Need to specify the subkey range"); |
1587 | 0 | } |
1588 | 10.9k | const RedisSubKeyBoundPB& lower_bound = request_.subkey_range().lower_bound(); |
1589 | 10.9k | const RedisSubKeyBoundPB& upper_bound = request_.subkey_range().upper_bound(); |
1590 | 10.9k | const bool add_keys = request_.get_collection_range_request().with_scores(); |
1591 | 10.9k | return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys); |
1592 | 10.9k | } |
1593 | 12 | case RedisCollectionGetRangeRequestPB::ZRANGE: FALLTHROUGH_INTENDED; |
1594 | 20 | case RedisCollectionGetRangeRequestPB::ZREVRANGE: { |
1595 | 20 | if(!request_.has_index_range() || !request_.index_range().has_lower_bound() || |
1596 | 20 | !request_.index_range().has_upper_bound()) { |
1597 | 0 | return STATUS(InvalidArgument, "Need to specify the index range"); |
1598 | 0 | } |
1599 | | |
1600 | | // First make sure is of type sorted set or none. |
1601 | 20 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1602 | 0 | auto expected_type = RedisDataType::REDIS_TYPE_SORTEDSET; |
1603 | 20 | if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1604 | 0 | return Status::OK(); |
1605 | 0 | } |
1606 | | |
1607 | 20 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value())); |
1608 | | |
1609 | 0 | const RedisIndexBoundPB& low_index_bound = request_.index_range().lower_bound(); |
1610 | 20 | const RedisIndexBoundPB& high_index_bound = request_.index_range().upper_bound(); |
1611 | | |
1612 | 20 | int64 low_idx_normalized, high_idx_normalized; |
1613 | | |
1614 | 20 | int64 low_idx = low_index_bound.index(); |
1615 | 20 | int64 high_idx = high_index_bound.index(); |
1616 | | // Normalize the bounds to be positive and go from low to high index. |
1617 | 20 | bool reverse = false; |
1618 | 20 | if (request_type == RedisCollectionGetRangeRequestPB::ZREVRANGE) { |
1619 | 8 | reverse = true; |
1620 | 8 | } |
1621 | 20 | GetNormalizedBounds( |
1622 | 20 | low_idx, high_idx, card, reverse, &low_idx_normalized, &high_idx_normalized); |
1623 | | |
1624 | 20 | if (high_idx_normalized < low_idx_normalized) { |
1625 | | // Return empty response. |
1626 | 0 | response_.set_code(RedisResponsePB::OK); |
1627 | 0 | RETURN_NOT_OK(PopulateResponseFrom(SubDocument::ObjectContainer(), |
1628 | 0 | AddResponseValuesGeneric, |
1629 | 0 | &response_, /* add_keys */ |
1630 | 0 | true, /* add_values */ |
1631 | 0 | true)); |
1632 | 0 | return Status::OK(); |
1633 | 0 | } |
1634 | 20 | auto encoded_doc_key = DocKey::EncodedFromRedisKey( |
1635 | 20 | request_.key_value().hash_code(), request_.key_value().key()); |
1636 | 20 | PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key); |
1637 | | |
1638 | 20 | bool add_keys = request_.get_collection_range_request().with_scores(); |
1639 | | |
1640 | 20 | IndexBound low_bound = IndexBound(low_idx_normalized, true /* is_lower */); |
1641 | 20 | IndexBound high_bound = IndexBound(high_idx_normalized, false /* is_lower */); |
1642 | | |
1643 | 20 | SubDocument doc; |
1644 | 20 | bool doc_found = false; |
1645 | 20 | GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found}; |
1646 | 20 | data.deadline_info = deadline_info_.get_ptr(); |
1647 | 20 | data.low_index = &low_bound; |
1648 | 20 | data.high_index = &high_bound; |
1649 | | |
1650 | 20 | RETURN_NOT_OK(GetAndPopulateResponseValues( |
1651 | 20 | iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_, |
1652 | 20 | &response_, add_keys, /* add_values */ true, reverse)); |
1653 | 20 | break; |
1654 | 20 | } |
1655 | 20 | case RedisCollectionGetRangeRequestPB::UNKNOWN: |
1656 | 0 | return STATUS(InvalidCommand, "Unknown Collection Get Range Request not supported"); |
1657 | 10.9k | } |
1658 | | |
1659 | 20 | return Status::OK(); |
1660 | 10.9k | } |
1661 | | |
1662 | 72.7k | Result<RedisDataType> RedisReadOperation::GetValueType(int subkey_index) { |
1663 | 72.7k | return GetRedisValueType(iterator_.get(), request_.key_value(), |
1664 | 72.7k | nullptr /* doc_write_batch */, subkey_index); |
1665 | 72.7k | } |
1666 | | |
1667 | 1.21k | Result<RedisValue> RedisReadOperation::GetOverrideValue(int subkey_index) { |
1668 | 1.21k | return GetRedisValue(iterator_.get(), request_.key_value(), |
1669 | 1.21k | subkey_index, /* always_override */ true); |
1670 | 1.21k | } |
1671 | | |
1672 | 71.3k | Result<RedisValue> RedisReadOperation::GetValue(int subkey_index) { |
1673 | 71.3k | return GetRedisValue(iterator_.get(), request_.key_value(), subkey_index); |
1674 | 71.3k | } |
1675 | | |
1676 | | namespace { |
1677 | | |
1678 | | // Note: Do not use if also retrieving other value, as some work will be repeated. |
1679 | | // Assumes every value has a TTL, and the TTL is stored in the row with this key. |
1680 | | // Also observe that tombstone checking only works because we assume the key has |
1681 | | // no ancestors. |
1682 | | Result<boost::optional<Expiration>> GetTtl( |
1683 | 291 | const Slice& encoded_subdoc_key, IntentAwareIterator* iter) { |
1684 | 291 | auto dockey_size = |
1685 | 291 | VERIFY_RESULT(DocKey::EncodedSize(encoded_subdoc_key, DocKeyPart::kWholeDocKey)); |
1686 | 0 | Slice key_slice(encoded_subdoc_key.data(), dockey_size); |
1687 | 291 | iter->Seek(key_slice); |
1688 | 291 | if (!iter->valid()) |
1689 | 16 | return boost::none; |
1690 | 275 | auto key_data = VERIFY_RESULT(iter->FetchKey()); |
1691 | 275 | if (!key_data.key.compare(key_slice)) { |
1692 | 275 | Value doc_value = Value(PrimitiveValue(ValueType::kInvalid)); |
1693 | 275 | RETURN_NOT_OK(doc_value.Decode(iter->value())); |
1694 | 275 | if (doc_value.value_type() != ValueType::kTombstone) { |
1695 | 239 | return Expiration(key_data.write_time.hybrid_time(), doc_value.ttl()); |
1696 | 239 | } |
1697 | 275 | } |
1698 | 36 | return boost::none; |
1699 | 275 | } |
1700 | | |
1701 | | } // namespace |
1702 | | |
1703 | 291 | Status RedisReadOperation::ExecuteGetTtl() { |
1704 | 291 | const RedisKeyValuePB& kv = request_.key_value(); |
1705 | 291 | if (!kv.has_key()) { |
1706 | 0 | return STATUS(Corruption, "Expected KeyValuePB"); |
1707 | 0 | } |
1708 | | // We currently only support getting and setting TTL on top level keys. |
1709 | 291 | if (!kv.subkey().empty()) { |
1710 | 0 | return STATUS_SUBSTITUTE(Corruption, |
1711 | 0 | "Expected no subkeys, got $0", kv.subkey().size()); |
1712 | 0 | } |
1713 | | |
1714 | 291 | auto encoded_doc_key = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key()); |
1715 | 291 | auto maybe_ttl_exp = VERIFY_RESULT(GetTtl(encoded_doc_key.AsSlice(), iterator_.get())); |
1716 | | |
1717 | 291 | if (!maybe_ttl_exp.has_value()) { |
1718 | 52 | response_.set_int_response(-2); |
1719 | 52 | return Status::OK(); |
1720 | 52 | } |
1721 | | |
1722 | 239 | auto exp = maybe_ttl_exp.get(); |
1723 | 239 | if (exp.ttl.Equals(ValueControlFields::kMaxTtl)) { |
1724 | 57 | response_.set_int_response(-1); |
1725 | 57 | return Status::OK(); |
1726 | 57 | } |
1727 | | |
1728 | 182 | MonoDelta ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read)); |
1729 | 182 | if (ttl.IsNegative()) { |
1730 | | // The value has expired. |
1731 | 48 | response_.set_int_response(-2); |
1732 | 48 | return Status::OK(); |
1733 | 48 | } |
1734 | | |
1735 | 134 | response_.set_int_response(request_.get_ttl_request().return_seconds() ? |
1736 | 71 | (int64_t) std::round(ttl.ToSeconds()) : |
1737 | 134 | ttl.ToMilliseconds()63 ); |
1738 | 134 | return Status::OK(); |
1739 | 182 | } |
1740 | | |
1741 | 4 | Status RedisReadOperation::ExecuteGetForRename() { |
1742 | 4 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1743 | 0 | response_.set_type(type); |
1744 | 4 | switch (type) { |
1745 | 4 | case RedisDataType::REDIS_TYPE_STRING: { |
1746 | 4 | return ExecuteGet(RedisGetRequestPB::GET); |
1747 | 0 | } |
1748 | | |
1749 | 0 | case RedisDataType::REDIS_TYPE_HASH: { |
1750 | 0 | return ExecuteGet(RedisGetRequestPB::HGETALL); |
1751 | 0 | } |
1752 | | |
1753 | 0 | case RedisDataType::REDIS_TYPE_SET: { |
1754 | 0 | return ExecuteGet(RedisGetRequestPB::SMEMBERS); |
1755 | 0 | } |
1756 | | |
1757 | 0 | case RedisDataType::REDIS_TYPE_SORTEDSET: { |
1758 | 0 | return ExecuteCollectionGetRangeByBounds( |
1759 | 0 | RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE, true); |
1760 | 0 | } |
1761 | | |
1762 | 0 | case RedisDataType::REDIS_TYPE_TIMESERIES: { |
1763 | 0 | return ExecuteCollectionGetRangeByBounds( |
1764 | 0 | RedisCollectionGetRangeRequestPB::TSRANGEBYTIME, true); |
1765 | 0 | } |
1766 | | |
1767 | 0 | case RedisDataType::REDIS_TYPE_NONE: { |
1768 | 0 | response_.set_code(RedisResponsePB::NOT_FOUND); |
1769 | 0 | return Status::OK(); |
1770 | 0 | } |
1771 | | |
1772 | 0 | case RedisDataType::REDIS_TYPE_LIST: |
1773 | 0 | default: { |
1774 | 0 | LOG(DFATAL) << "Unhandled Redis Data Type " << type; |
1775 | 0 | } |
1776 | 4 | } |
1777 | 0 | return Status::OK(); |
1778 | 4 | } |
1779 | | |
1780 | 4 | Status RedisReadOperation::ExecuteGet(RedisGetRequestPB::GetRequestType type) { |
1781 | 4 | RedisGetRequestPB request; |
1782 | 4 | request.set_request_type(type); |
1783 | 4 | return ExecuteGet(request); |
1784 | 4 | } |
1785 | | |
1786 | 72.6k | Status RedisReadOperation::ExecuteGet() { return ExecuteGet(request_.get_request()); } |
1787 | | |
1788 | 72.7k | Status RedisReadOperation::ExecuteGet(const RedisGetRequestPB& get_request) { |
1789 | 72.7k | auto request_type = get_request.request_type(); |
1790 | 72.7k | RedisDataType expected_type = REDIS_TYPE_NONE; |
1791 | 72.7k | switch (request_type) { |
1792 | 71.3k | case RedisGetRequestPB::GET: |
1793 | 71.3k | expected_type = REDIS_TYPE_STRING; break; |
1794 | 1.21k | case RedisGetRequestPB::TSGET: |
1795 | 1.21k | expected_type = REDIS_TYPE_TIMESERIES; break; |
1796 | 14 | case RedisGetRequestPB::HGET: FALLTHROUGH_INTENDED; |
1797 | 20 | case RedisGetRequestPB::HEXISTS: |
1798 | 20 | expected_type = REDIS_TYPE_HASH; break; |
1799 | 4 | case RedisGetRequestPB::SISMEMBER: |
1800 | 4 | expected_type = REDIS_TYPE_SET; break; |
1801 | 52 | case RedisGetRequestPB::ZSCORE: |
1802 | 52 | expected_type = REDIS_TYPE_SORTEDSET; break; |
1803 | 71 | default: |
1804 | 71 | expected_type = REDIS_TYPE_NONE; |
1805 | 72.7k | } |
1806 | 72.7k | switch (request_type) { |
1807 | 71.3k | case RedisGetRequestPB::GET: FALLTHROUGH_INTENDED; |
1808 | 72.5k | case RedisGetRequestPB::TSGET: FALLTHROUGH_INTENDED; |
1809 | 72.5k | case RedisGetRequestPB::HGET: { |
1810 | 72.5k | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1811 | | // TODO: this is primarily glue for the Timeseries bug where the parent |
1812 | | // may get compacted due to an outdated TTL even though the children |
1813 | | // have longer TTL's and thus still exist. When fixing, take note that |
1814 | | // GetValueType finds the value type of the parent, so if the parent |
1815 | | // does not have the maximum TTL, it will return REDIS_TYPE_NONE when it |
1816 | | // should not. |
1817 | 72.5k | if (expected_type == REDIS_TYPE_TIMESERIES && type == REDIS_TYPE_NONE1.21k ) { |
1818 | 0 | type = expected_type; |
1819 | 0 | } |
1820 | | // If wrong type, we set the error code in the response. |
1821 | 72.5k | if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1822 | 72.5k | auto value = request_type == RedisGetRequestPB::TSGET ? GetOverrideValue()1.21k : GetValue()71.3k ; |
1823 | 72.5k | RETURN_NOT_OK(value); |
1824 | 72.5k | if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1825 | 72.5k | VerifySuccessIfMissing::kTrue)) { |
1826 | 72.5k | response_.set_string_response(value->value); |
1827 | 72.5k | } |
1828 | 72.5k | } |
1829 | 72.5k | return Status::OK(); |
1830 | 72.5k | } |
1831 | 52 | case RedisGetRequestPB::ZSCORE: { |
1832 | 52 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1833 | | // If wrong type, we set the error code in the response. |
1834 | 52 | if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1835 | 6 | return Status::OK(); |
1836 | 6 | } |
1837 | 46 | SubDocKey key_reverse = SubDocKey( |
1838 | 46 | DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key()), |
1839 | 46 | PrimitiveValue(ValueType::kSSReverse), |
1840 | 46 | PrimitiveValue(request_.key_value().subkey(0).string_subkey())); |
1841 | 46 | SubDocument subdoc_reverse; |
1842 | 46 | bool subdoc_reverse_found = false; |
1843 | 46 | auto encoded_key_reverse = key_reverse.EncodeWithoutHt(); |
1844 | 46 | GetRedisSubDocumentData get_data = { |
1845 | 46 | encoded_key_reverse, &subdoc_reverse, &subdoc_reverse_found }; |
1846 | 46 | RETURN_NOT_OK(GetRedisSubDocument(doc_db_, get_data, redis_query_id(), |
1847 | 46 | TransactionOperationContext(), deadline_, read_time_)); |
1848 | 46 | if (subdoc_reverse_found) { |
1849 | 31 | double score = subdoc_reverse.GetDouble(); |
1850 | 31 | response_.set_string_response(std::to_string(score)); |
1851 | 31 | } else { |
1852 | 15 | response_.set_code(RedisResponsePB::NIL); |
1853 | 15 | } |
1854 | 46 | return Status::OK(); |
1855 | 46 | } |
1856 | 6 | case RedisGetRequestPB::HEXISTS: FALLTHROUGH_INTENDED; |
1857 | 10 | case RedisGetRequestPB::SISMEMBER: { |
1858 | 10 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1859 | 10 | if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1860 | 10 | RedisDataType subtype = VERIFY_RESULT(GetValueType(0)); |
1861 | 0 | SetOptionalInt(subtype, 1, &response_); |
1862 | 10 | response_.set_code(RedisResponsePB::OK); |
1863 | 10 | } |
1864 | 10 | return Status::OK(); |
1865 | 10 | } |
1866 | 0 | case RedisGetRequestPB::HSTRLEN: { |
1867 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1868 | 0 | if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_, |
1869 | 0 | VerifySuccessIfMissing::kTrue)) { |
1870 | 0 | auto value = GetValue(); |
1871 | 0 | RETURN_NOT_OK(value); |
1872 | 0 | SetOptionalInt(value->type, value->value.length(), &response_); |
1873 | 0 | response_.set_code(RedisResponsePB::OK); |
1874 | 0 | } |
1875 | 0 | return Status::OK(); |
1876 | 0 | } |
1877 | 0 | case RedisGetRequestPB::MGET: { |
1878 | 0 | return STATUS(NotSupported, "MGET not yet supported"); |
1879 | 0 | } |
1880 | 3 | case RedisGetRequestPB::HMGET: { |
1881 | 3 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1882 | 3 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_, |
1883 | 3 | VerifySuccessIfMissing::kTrue)) { |
1884 | 0 | return Status::OK(); |
1885 | 0 | } |
1886 | | |
1887 | 3 | response_.set_allocated_array_response(new RedisArrayPB()); |
1888 | 3 | const auto& req_kv = request_.key_value(); |
1889 | 3 | auto num_subkeys = req_kv.subkey_size(); |
1890 | 3 | vector<int> indices(num_subkeys); |
1891 | 11 | for (int i = 0; i < num_subkeys; ++i8 ) { |
1892 | 8 | indices[i] = i; |
1893 | 8 | } |
1894 | 6 | std::sort(indices.begin(), indices.end(), [&req_kv](int i, int j) { |
1895 | 6 | return req_kv.subkey(i).string_subkey() < req_kv.subkey(j).string_subkey(); |
1896 | 6 | }); |
1897 | | |
1898 | 3 | string current_value = ""; |
1899 | 3 | response_.mutable_array_response()->mutable_elements()->Reserve(num_subkeys); |
1900 | 11 | for (int i = 0; i < num_subkeys; ++i8 ) { |
1901 | 8 | response_.mutable_array_response()->add_elements(); |
1902 | 8 | } |
1903 | 11 | for (int i = 0; i < num_subkeys; ++i8 ) { |
1904 | 8 | if (i == 0 || |
1905 | 8 | req_kv.subkey(indices[i]).string_subkey() != |
1906 | 8 | req_kv.subkey(indices[i - 1]).string_subkey()) { |
1907 | | // If the condition above is false, we encountered the same key again, no need to call |
1908 | | // GetValue() once more, current_value is already correct. |
1909 | 8 | auto value = GetValue(indices[i]); |
1910 | 8 | RETURN_NOT_OK(value); |
1911 | 8 | if (value->type == REDIS_TYPE_STRING) { |
1912 | 6 | current_value = std::move(value->value); |
1913 | 6 | } else { |
1914 | 2 | current_value = ""; // Empty string is nil response. |
1915 | 2 | } |
1916 | 8 | } |
1917 | 8 | *response_.mutable_array_response()->mutable_elements(indices[i]) = current_value; |
1918 | 8 | } |
1919 | | |
1920 | 3 | response_.set_code(RedisResponsePB::OK); |
1921 | 3 | return Status::OK(); |
1922 | 3 | } |
1923 | 5 | case RedisGetRequestPB::HGETALL: |
1924 | 5 | return ExecuteHGetAllLikeCommands(ValueType::kObject, true, true); |
1925 | 3 | case RedisGetRequestPB::HKEYS: |
1926 | 3 | return ExecuteHGetAllLikeCommands(ValueType::kObject, true, false); |
1927 | 3 | case RedisGetRequestPB::HVALS: |
1928 | 3 | return ExecuteHGetAllLikeCommands(ValueType::kObject, false, true); |
1929 | 4 | case RedisGetRequestPB::HLEN: |
1930 | 4 | return ExecuteHGetAllLikeCommands(ValueType::kObject, false, false); |
1931 | 5 | case RedisGetRequestPB::SMEMBERS: |
1932 | 5 | return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, true, false); |
1933 | 4 | case RedisGetRequestPB::SCARD: |
1934 | 4 | return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, false, false); |
1935 | 36 | case RedisGetRequestPB::TSCARD: |
1936 | 36 | return ExecuteHGetAllLikeCommands(ValueType::kRedisTS, false, false); |
1937 | 2 | case RedisGetRequestPB::ZCARD: |
1938 | 2 | return ExecuteHGetAllLikeCommands(ValueType::kRedisSortedSet, false, false); |
1939 | 6 | case RedisGetRequestPB::LLEN: |
1940 | 6 | return ExecuteHGetAllLikeCommands(ValueType::kRedisList, false, false); |
1941 | 0 | case RedisGetRequestPB::UNKNOWN: { |
1942 | 0 | return STATUS(InvalidCommand, "Unknown Get Request not supported"); |
1943 | 3 | } |
1944 | 72.7k | } |
1945 | 0 | return Status::OK(); |
1946 | 72.7k | } |
1947 | | |
1948 | 2 | Status RedisReadOperation::ExecuteStrLen() { |
1949 | 2 | auto value = GetValue(); |
1950 | 2 | response_.set_code(RedisResponsePB::OK); |
1951 | 2 | RETURN_NOT_OK(value); |
1952 | | |
1953 | 2 | if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1954 | 2 | VerifySuccessIfMissing::kTrue)) { |
1955 | 2 | SetOptionalInt(value->type, value->value.length(), &response_); |
1956 | 2 | } |
1957 | 2 | response_.set_code(RedisResponsePB::OK); |
1958 | | |
1959 | 2 | return Status::OK(); |
1960 | 2 | } |
1961 | | |
1962 | 4 | Status RedisReadOperation::ExecuteExists() { |
1963 | 4 | auto value = GetValue(); |
1964 | 4 | response_.set_code(RedisResponsePB::OK); |
1965 | 4 | RETURN_NOT_OK(value); |
1966 | | |
1967 | | // We only support exist command with one argument currently. |
1968 | 4 | SetOptionalInt(value->type, 1, &response_); |
1969 | 4 | response_.set_code(RedisResponsePB::OK); |
1970 | | |
1971 | 4 | return Status::OK(); |
1972 | 4 | } |
1973 | | |
1974 | 2 | Status RedisReadOperation::ExecuteGetRange() { |
1975 | 2 | auto value = GetValue(); |
1976 | 2 | RETURN_NOT_OK(value); |
1977 | | |
1978 | 2 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1979 | 2 | VerifySuccessIfMissing::kTrue)) { |
1980 | | // We've already set the error code in the response. |
1981 | 0 | return Status::OK(); |
1982 | 0 | } |
1983 | | |
1984 | 2 | const ssize_t len = value->value.length(); |
1985 | 2 | ssize_t exclusive_end = request_.get_range_request().end() + 1; |
1986 | 2 | if (exclusive_end == 0) { |
1987 | 0 | exclusive_end = len; |
1988 | 0 | } |
1989 | | |
1990 | | // We treat negative indices to refer backwards from the end of the string. |
1991 | 2 | const auto start = ApplyIndex(request_.get_range_request().start(), len); |
1992 | 2 | auto end = ApplyIndex(exclusive_end, len); |
1993 | 2 | if (end < start) { |
1994 | 0 | end = start; |
1995 | 0 | } |
1996 | | |
1997 | 2 | response_.set_code(RedisResponsePB::OK); |
1998 | 2 | response_.set_string_response(value->value.c_str() + start, end - start); |
1999 | 2 | return Status::OK(); |
2000 | 2 | } |
2001 | | |
2002 | 630 | Status RedisReadOperation::ExecuteKeys() { |
2003 | 630 | iterator_->Seek(DocKey()); |
2004 | 630 | int threshold = request_.keys_request().threshold(); |
2005 | | |
2006 | 630 | bool doc_found; |
2007 | 630 | SubDocument result; |
2008 | | |
2009 | 480k | while (iterator_->valid()) { |
2010 | 480k | if (deadline_info_.get_ptr() && deadline_info_->CheckAndSetDeadlinePassed()) { |
2011 | 0 | return STATUS(Expired, "Deadline for query passed."); |
2012 | 0 | } |
2013 | 480k | auto key = VERIFY_RESULT(iterator_->FetchKey()).key; |
2014 | | |
2015 | | // Key could be invalidated because we could move iterator, so back it up. |
2016 | 0 | KeyBytes key_copy(key); |
2017 | 480k | key = key_copy.AsSlice(); |
2018 | | |
2019 | 480k | DocKey doc_key; |
2020 | 480k | RETURN_NOT_OK(doc_key.FullyDecodeFrom(key)); |
2021 | 480k | const PrimitiveValue& key_primitive = doc_key.hashed_group().front(); |
2022 | 480k | if (!key_primitive.IsString() || |
2023 | 480k | !RedisPatternMatch(request_.keys_request().pattern(), |
2024 | 480k | key_primitive.GetString(), |
2025 | 480k | false /* ignore_case */)) { |
2026 | 292k | iterator_->SeekOutOfSubDoc(key); |
2027 | 292k | continue; |
2028 | 292k | } |
2029 | | |
2030 | 187k | GetRedisSubDocumentData data = {key, &result, &doc_found}; |
2031 | 187k | data.deadline_info = deadline_info_.get_ptr(); |
2032 | 187k | data.return_type_only = true; |
2033 | 187k | RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr, |
2034 | 187k | SeekFwdSuffices::kFalse)); |
2035 | | |
2036 | 187k | if (doc_found) { |
2037 | 187k | if (--threshold < 0) { |
2038 | 6 | response_.clear_array_response(); |
2039 | 6 | response_.set_code(RedisResponsePB::SERVER_ERROR); |
2040 | 6 | response_.set_error_message("Too many keys in the database."); |
2041 | 6 | return Status::OK(); |
2042 | 6 | } |
2043 | 187k | RETURN_NOT_OK(AddPrimitiveValueToResponseArray(key_primitive, |
2044 | 187k | response_.mutable_array_response())); |
2045 | 187k | } |
2046 | 187k | iterator_->SeekOutOfSubDoc(key); |
2047 | 187k | } |
2048 | | |
2049 | 624 | response_.set_code(RedisResponsePB::OK); |
2050 | 624 | return Status::OK(); |
2051 | 630 | } |
2052 | | |
2053 | 84.6k | const RedisResponsePB& RedisReadOperation::response() { |
2054 | 84.6k | return response_; |
2055 | 84.6k | } |
2056 | | |
2057 | | } // namespace docdb |
2058 | | } // namespace yb |