/Users/deen/code/yugabyte-db/src/yb/docdb/redis_operation.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/redis_operation.h" |
15 | | |
16 | | #include "yb/docdb/doc_key.h" |
17 | | #include "yb/docdb/doc_path.h" |
18 | | #include "yb/docdb/doc_reader_redis.h" |
19 | | #include "yb/docdb/doc_ttl_util.h" |
20 | | #include "yb/docdb/doc_write_batch.h" |
21 | | #include "yb/docdb/doc_write_batch_cache.h" |
22 | | #include "yb/docdb/docdb_rocksdb_util.h" |
23 | | #include "yb/docdb/subdocument.h" |
24 | | |
25 | | #include "yb/server/hybrid_clock.h" |
26 | | |
27 | | #include "yb/util/redis_util.h" |
28 | | #include "yb/util/status_format.h" |
29 | | #include "yb/util/stol_utils.h" |
30 | | |
// Boolean flag, default true. EmulateRedisResponse() in this file reads it (via
// FLAGS_emulate_redis_responses) when deciding whether a Redis-compatible reply should be computed;
// the help text below describes the trade-off.
31 | | DEFINE_bool(emulate_redis_responses, |
32 | | true, |
33 | | "If emulate_redis_responses is false, we hope to get slightly better performance by just " |
34 | | "returning OK for commands that might require us to read additional records viz. SADD, HSET, " |
35 | | "and HDEL. If emulate_redis_responses is true, we read the required records to compute the " |
36 | | "response as specified by the official Redis API documentation. https://redis.io/commands"); |
37 | | |
38 | | namespace yb { |
39 | | namespace docdb { |
40 | | |
41 | | // A simple conversion from RedisDataTypes to ValueTypes |
42 | | // Note: May run into issues if we want to support ttl on individual set elements, |
43 | | // as they are represented by ValueType::kNullLow. |
44 | 18 | ValueType ValueTypeFromRedisType(RedisDataType dt) { |
45 | 18 | switch(dt) { |
46 | 15 | case RedisDataType::REDIS_TYPE_STRING: |
47 | 15 | return ValueType::kString; |
48 | 0 | case RedisDataType::REDIS_TYPE_SET: |
49 | 0 | return ValueType::kRedisSet; |
50 | 0 | case RedisDataType::REDIS_TYPE_HASH: |
51 | 0 | return ValueType::kObject; |
52 | 3 | case RedisDataType::REDIS_TYPE_SORTEDSET: |
53 | 3 | return ValueType::kRedisSortedSet; |
54 | 0 | case RedisDataType::REDIS_TYPE_TIMESERIES: |
55 | 0 | return ValueType::kRedisTS; |
56 | 0 | case RedisDataType::REDIS_TYPE_LIST: |
57 | 0 | return ValueType::kRedisList; |
58 | 0 | default: |
59 | 0 | return ValueType::kInvalid; |
60 | 18 | } |
61 | 18 | } |
62 | | |
63 | | Status RedisWriteOperation::GetDocPaths( |
64 | 61.5k | GetDocPathsMode mode, DocPathsToLock* paths, IsolationLevel *level) const { |
65 | 61.5k | paths->push_back(DocKey::FromRedisKey( |
66 | 61.5k | request_.key_value().hash_code(), request_.key_value().key()).EncodeAsRefCntPrefix()); |
67 | 61.5k | *level = IsolationLevel::SNAPSHOT_ISOLATION; |
68 | | |
69 | 61.5k | return Status::OK(); |
70 | 61.5k | } |
71 | | |
72 | | namespace { |
73 | | |
74 | 51 | bool EmulateRedisResponse(const RedisDataType& data_type) { |
75 | 51 | return FLAGS_emulate_redis_responses && data_type != REDIS_TYPE_TIMESERIES; |
76 | 51 | } |
77 | | |
78 | | static const string wrong_type_message = |
79 | | "WRONGTYPE Operation against a key holding the wrong kind of value"; |
80 | | |
81 | | CHECKED_STATUS PrimitiveValueFromSubKey(const RedisKeyValueSubKeyPB &subkey_pb, |
82 | 45.4k | PrimitiveValue *primitive_value) { |
83 | 45.4k | switch (subkey_pb.subkey_case()) { |
84 | 29.7k | case RedisKeyValueSubKeyPB::kStringSubkey: |
85 | 29.7k | *primitive_value = PrimitiveValue(subkey_pb.string_subkey()); |
86 | 29.7k | break; |
87 | 15.7k | case RedisKeyValueSubKeyPB::kTimestampSubkey: |
88 | | // We use descending order for the timestamp in the timeseries type so that the latest |
89 | | // value sorts on top. |
90 | 15.7k | *primitive_value = PrimitiveValue(subkey_pb.timestamp_subkey(), SortOrder::kDescending); |
91 | 15.7k | break; |
92 | 0 | case RedisKeyValueSubKeyPB::kDoubleSubkey: { |
93 | 0 | *primitive_value = PrimitiveValue::Double(subkey_pb.double_subkey()); |
94 | 0 | break; |
95 | 0 | } |
96 | 0 | default: |
97 | 0 | return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", subkey_pb.subkey_case()); |
98 | 45.4k | } |
99 | 45.4k | return Status::OK(); |
100 | 45.4k | } |
101 | | |
102 | | // Stricter version of the above when we know the exact datatype to expect. |
103 | | CHECKED_STATUS PrimitiveValueFromSubKeyStrict(const RedisKeyValueSubKeyPB &subkey_pb, |
104 | | const RedisDataType &data_type, |
105 | 44.8k | PrimitiveValue *primitive_value) { |
106 | 44.8k | switch (data_type) { |
107 | 0 | case REDIS_TYPE_LIST: FALLTHROUGH_INTENDED; |
108 | 0 | case REDIS_TYPE_SET: FALLTHROUGH_INTENDED; |
109 | 29.7k | case REDIS_TYPE_HASH: |
110 | 29.7k | if (!subkey_pb.has_string_subkey()) { |
111 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of string type", |
112 | 0 | subkey_pb.ShortDebugString()); |
113 | 0 | } |
114 | 29.7k | break; |
115 | 15.1k | case REDIS_TYPE_TIMESERIES: |
116 | 15.1k | if (!subkey_pb.has_timestamp_subkey()) { |
117 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of int64 type", |
118 | 0 | subkey_pb.ShortDebugString()); |
119 | 0 | } |
120 | 15.1k | break; |
121 | 0 | case REDIS_TYPE_SORTEDSET: |
122 | 0 | if (!subkey_pb.has_double_subkey()) { |
123 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of double type", |
124 | 0 | subkey_pb.ShortDebugString()); |
125 | 0 | } |
126 | 0 | break; |
127 | 0 | default: |
128 | 0 | return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", data_type); |
129 | 44.8k | } |
130 | 44.8k | return PrimitiveValueFromSubKey(subkey_pb, primitive_value); |
131 | 44.8k | } |
132 | | |
133 | | Result<RedisDataType> GetRedisValueType( |
134 | | IntentAwareIterator* iterator, |
135 | | const RedisKeyValuePB &key_value_pb, |
136 | | DocWriteBatch* doc_write_batch = nullptr, |
137 | | int subkey_index = kNilSubkeyIndex, |
138 | 57.6k | bool always_override = false) { |
139 | 57.6k | if (!key_value_pb.has_key()) { |
140 | 0 | return STATUS(Corruption, "Expected KeyValuePB"); |
141 | 0 | } |
142 | 57.6k | KeyBytes encoded_subdoc_key; |
143 | 57.6k | if (subkey_index == kNilSubkeyIndex) { |
144 | 57.6k | encoded_subdoc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key()); |
145 | 0 | } else { |
146 | 0 | if (subkey_index >= key_value_pb.subkey_size()) { |
147 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
148 | 0 | "Size of subkeys ($0) must be larger than subkey_index ($1)", |
149 | 0 | key_value_pb.subkey_size(), subkey_index); |
150 | 0 | } |
151 | | |
152 | 0 | PrimitiveValue subkey_primitive; |
153 | 0 | RETURN_NOT_OK(PrimitiveValueFromSubKey(key_value_pb.subkey(subkey_index), &subkey_primitive)); |
154 | 0 | encoded_subdoc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key()); |
155 | 0 | subkey_primitive.AppendToKey(&encoded_subdoc_key); |
156 | 0 | } |
157 | 57.6k | SubDocument doc; |
158 | 57.6k | bool doc_found = false; |
159 | | // Use the cached entry if possible to determine the value type. |
160 | 57.6k | boost::optional<DocWriteBatchCache::Entry> cached_entry; |
161 | 57.6k | if (doc_write_batch) { |
162 | 20.5k | cached_entry = doc_write_batch->LookupCache(encoded_subdoc_key); |
163 | 20.5k | } |
164 | | |
165 | 57.6k | if (cached_entry) { |
166 | 0 | doc_found = true; |
167 | 0 | doc = SubDocument(cached_entry->value_type); |
168 | 57.6k | } else { |
169 | | // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions |
170 | | // support for Redis. |
171 | 57.6k | GetRedisSubDocumentData data = { encoded_subdoc_key, &doc, &doc_found }; |
172 | 57.6k | data.return_type_only = true; |
173 | 57.6k | data.exp.always_override = always_override; |
174 | 57.6k | RETURN_NOT_OK(GetRedisSubDocument(iterator, data, /* projection */ nullptr, |
175 | 57.6k | SeekFwdSuffices::kFalse)); |
176 | 57.6k | } |
177 | | |
178 | 57.6k | if (!doc_found) { |
179 | 22.5k | return REDIS_TYPE_NONE; |
180 | 22.5k | } |
181 | | |
182 | 35.1k | switch (doc.value_type()) { |
183 | 0 | case ValueType::kInvalid: FALLTHROUGH_INTENDED; |
184 | 0 | case ValueType::kTombstone: |
185 | 0 | return REDIS_TYPE_NONE; |
186 | 0 | case ValueType::kObject: |
187 | 0 | return REDIS_TYPE_HASH; |
188 | 0 | case ValueType::kRedisSet: |
189 | 0 | return REDIS_TYPE_SET; |
190 | 627 | case ValueType::kRedisTS: |
191 | 627 | return REDIS_TYPE_TIMESERIES; |
192 | 51 | case ValueType::kRedisSortedSet: |
193 | 51 | return REDIS_TYPE_SORTEDSET; |
194 | 0 | case ValueType::kRedisList: |
195 | 0 | return REDIS_TYPE_LIST; |
196 | 0 | case ValueType::kNullLow: FALLTHROUGH_INTENDED; // This value is a set member. |
197 | 34.4k | case ValueType::kString: |
198 | 34.4k | return REDIS_TYPE_STRING; |
199 | 0 | default: |
200 | 0 | return STATUS_FORMAT(Corruption, |
201 | 35.1k | "Unknown value type for redis record: $0", |
202 | 35.1k | static_cast<char>(doc.value_type())); |
203 | 35.1k | } |
204 | 35.1k | } |
205 | | |
206 | | Result<RedisValue> GetRedisValue( |
207 | | IntentAwareIterator* iterator, |
208 | | const RedisKeyValuePB &key_value_pb, |
209 | | int subkey_index = kNilSubkeyIndex, |
210 | | bool always_override = false, |
211 | 37.0k | Expiration* exp = nullptr) { |
212 | 37.0k | if (!key_value_pb.has_key()) { |
213 | 0 | return STATUS(Corruption, "Expected KeyValuePB"); |
214 | 0 | } |
215 | 37.0k | auto encoded_doc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key()); |
216 | | |
217 | 37.0k | if (!key_value_pb.subkey().empty()) { |
218 | 609 | if (key_value_pb.subkey().size() != 1 && subkey_index == kNilSubkeyIndex) { |
219 | 0 | return STATUS_SUBSTITUTE(Corruption, |
220 | 0 | "Expected at most one subkey, got $0", key_value_pb.subkey().size()); |
221 | 0 | } |
222 | 609 | PrimitiveValue subkey_primitive; |
223 | 609 | RETURN_NOT_OK(PrimitiveValueFromSubKey( |
224 | 609 | key_value_pb.subkey(subkey_index == kNilSubkeyIndex ? 0 : subkey_index), |
225 | 609 | &subkey_primitive)); |
226 | 609 | subkey_primitive.AppendToKey(&encoded_doc_key); |
227 | 609 | } |
228 | | |
229 | 37.0k | SubDocument doc; |
230 | 37.0k | bool doc_found = false; |
231 | | |
232 | | // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions |
233 | | // support for Redis. |
234 | 37.0k | GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found }; |
235 | 37.0k | data.exp.always_override = always_override; |
236 | 37.0k | RETURN_NOT_OK(GetRedisSubDocument( |
237 | 37.0k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); |
238 | 37.0k | if (!doc_found) { |
239 | 2.31k | return RedisValue{REDIS_TYPE_NONE}; |
240 | 2.31k | } |
241 | | |
242 | 34.7k | if (HasExpiredTTL(data.exp.write_ht, data.exp.ttl, iterator->read_time().read)) { |
243 | 0 | return RedisValue{REDIS_TYPE_NONE}; |
244 | 0 | } |
245 | | |
246 | 34.7k | if (exp) |
247 | 24 | *exp = data.exp; |
248 | | |
249 | 34.7k | if (!doc.IsPrimitive()) { |
250 | 3 | switch (doc.value_type()) { |
251 | 0 | case ValueType::kObject: |
252 | 0 | return RedisValue{REDIS_TYPE_HASH}; |
253 | 0 | case ValueType::kRedisTS: |
254 | 0 | return RedisValue{REDIS_TYPE_TIMESERIES}; |
255 | 3 | case ValueType::kRedisSortedSet: |
256 | 3 | return RedisValue{REDIS_TYPE_SORTEDSET}; |
257 | 0 | case ValueType::kRedisSet: |
258 | 0 | return RedisValue{REDIS_TYPE_SET}; |
259 | 0 | case ValueType::kRedisList: |
260 | 0 | return RedisValue{REDIS_TYPE_LIST}; |
261 | 0 | default: |
262 | 0 | return STATUS_SUBSTITUTE(IllegalState, "Invalid value type: $0", |
263 | 34.7k | static_cast<int>(doc.value_type())); |
264 | 34.7k | } |
265 | 34.7k | } |
266 | | |
267 | 34.7k | auto val = RedisValue{REDIS_TYPE_STRING, doc.GetString(), data.exp}; |
268 | 34.7k | return val; |
269 | 34.7k | } |
270 | | |
271 | | YB_STRONGLY_TYPED_BOOL(VerifySuccessIfMissing); |
272 | | |
273 | | // Set response based on the type match. Return whether the type matches what's expected. |
274 | | bool VerifyTypeAndSetCode( |
275 | | const RedisDataType expected_type, |
276 | | const RedisDataType actual_type, |
277 | | RedisResponsePB *response, |
278 | 74.0k | VerifySuccessIfMissing verify_success_if_missing = VerifySuccessIfMissing::kFalse) { |
279 | 74.0k | if (actual_type == RedisDataType::REDIS_TYPE_NONE) { |
280 | 4.28k | if (verify_success_if_missing) { |
281 | 4.28k | response->set_code(RedisResponsePB::NIL); |
282 | 0 | } else { |
283 | 0 | response->set_code(RedisResponsePB::NOT_FOUND); |
284 | 0 | } |
285 | 4.28k | return verify_success_if_missing; |
286 | 4.28k | } |
287 | 69.7k | if (actual_type != expected_type) { |
288 | 6 | response->set_code(RedisResponsePB::WRONG_TYPE); |
289 | 6 | response->set_error_message(wrong_type_message); |
290 | 6 | return false; |
291 | 6 | } |
292 | 69.7k | response->set_code(RedisResponsePB::OK); |
293 | 69.7k | return true; |
294 | 69.7k | } |
295 | | |
296 | | bool VerifyTypeAndSetCode( |
297 | | const docdb::ValueType expected_type, |
298 | | const docdb::ValueType actual_type, |
299 | 5.47k | RedisResponsePB *response) { |
300 | 5.47k | if (actual_type != expected_type) { |
301 | 3 | response->set_code(RedisResponsePB::WRONG_TYPE); |
302 | 3 | response->set_error_message(wrong_type_message); |
303 | 3 | return false; |
304 | 3 | } |
305 | 5.47k | response->set_code(RedisResponsePB::OK); |
306 | 5.47k | return true; |
307 | 5.47k | } |
308 | | |
309 | | CHECKED_STATUS AddPrimitiveValueToResponseArray(const PrimitiveValue& value, |
310 | 1.97M | RedisArrayPB* redis_array) { |
311 | 1.97M | switch (value.value_type()) { |
312 | 1.03M | case ValueType::kString: FALLTHROUGH_INTENDED; |
313 | 1.03M | case ValueType::kStringDescending: { |
314 | 1.03M | redis_array->add_elements(value.GetString()); |
315 | 1.03M | return Status::OK(); |
316 | 1.03M | } |
317 | 0 | case ValueType::kInt64: FALLTHROUGH_INTENDED; |
318 | 942k | case ValueType::kInt64Descending: { |
319 | 942k | redis_array->add_elements(std::to_string(value.GetInt64())); |
320 | 942k | return Status::OK(); |
321 | 0 | } |
322 | 15 | case ValueType::kDouble: FALLTHROUGH_INTENDED; |
323 | 15 | case ValueType::kDoubleDescending: { |
324 | 15 | redis_array->add_elements(std::to_string(value.GetDouble())); |
325 | 15 | return Status::OK(); |
326 | 15 | } |
327 | 0 | default: |
328 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid value type: $0", |
329 | 1.97M | static_cast<int>(value.value_type())); |
330 | 1.97M | } |
331 | 1.97M | } |
332 | | |
333 | | CHECKED_STATUS AddResponseValuesGeneric(const PrimitiveValue& first, |
334 | | const PrimitiveValue& second, |
335 | | RedisResponsePB* response, |
336 | | bool add_keys, |
337 | | bool add_values, |
338 | 942k | bool reverse = false) { |
339 | 942k | if (add_keys) { |
340 | 942k | RETURN_NOT_OK(AddPrimitiveValueToResponseArray(first, response->mutable_array_response())); |
341 | 942k | } |
342 | 942k | if (add_values) { |
343 | 942k | RETURN_NOT_OK(AddPrimitiveValueToResponseArray(second, response->mutable_array_response())); |
344 | 942k | } |
345 | 942k | return Status::OK(); |
346 | 942k | } |
347 | | |
348 | | // Populate the response array for sorted sets range queries. |
349 | | // first refers to the score for the given values. |
350 | | // second refers to a subdocument where each key is a value with the given score. |
351 | | CHECKED_STATUS AddResponseValuesSortedSets(const PrimitiveValue& first, |
352 | | const SubDocument& second, |
353 | | RedisResponsePB* response, |
354 | | bool add_keys, |
355 | | bool add_values, |
356 | 93 | bool reverse = false) { |
357 | 93 | if (reverse) { |
358 | 0 | for (auto it = second.object_container().rbegin(); |
359 | 0 | it != second.object_container().rend(); |
360 | 0 | it++) { |
361 | 0 | const PrimitiveValue& value = it->first; |
362 | 0 | RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys)); |
363 | 0 | } |
364 | 93 | } else { |
365 | 81 | for (const auto& kv : second.object_container()) { |
366 | 81 | const PrimitiveValue& value = kv.first; |
367 | 81 | RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys)); |
368 | 81 | } |
369 | 93 | } |
370 | 93 | return Status::OK(); |
371 | 93 | } |
372 | | |
373 | | template <typename T, typename AddResponseRow> |
374 | | CHECKED_STATUS PopulateRedisResponseFromInternal(T iter, |
375 | | AddResponseRow add_response_row, |
376 | | const T& iter_end, |
377 | | RedisResponsePB *response, |
378 | | bool add_keys, |
379 | | bool add_values, |
380 | 5.47k | bool reverse = false) { |
381 | 5.47k | response->set_allocated_array_response(new RedisArrayPB()); |
382 | 948k | for (; iter != iter_end; iter++) { |
383 | 942k | RETURN_NOT_OK(add_response_row( |
384 | 942k | iter->first, iter->second, response, add_keys, add_values, reverse)); |
385 | 942k | } |
386 | 5.47k | return Status::OK(); |
387 | 5.47k | } redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__116reverse_iteratorINS3_20__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeISA_PvEElEEEEEEPFNS_6StatusERKS8_SK_PNS_15RedisResponsePBEbbbEEESI_T_T0_RKSP_SM_bbb Line | Count | Source | 380 | 30 | bool reverse = false) { | 381 | 30 | response->set_allocated_array_response(new RedisArrayPB()); | 382 | 732 | for (; iter != iter_end; iter++) { | 383 | 702 | RETURN_NOT_OK(add_response_row( | 384 | 702 | iter->first, iter->second, response, add_keys, add_values, reverse)); | 385 | 702 | } | 386 | 30 | return Status::OK(); | 387 | 30 | } |
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__120__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeIS9_PvEElEEEEPFNS_6StatusERKS7_SI_PNS_15RedisResponsePBEbbbEEESG_T_T0_RKSN_SK_bbb Line | Count | Source | 380 | 5.41k | bool reverse = false) { | 381 | 5.41k | response->set_allocated_array_response(new RedisArrayPB()); | 382 | 947k | for (; iter != iter_end; iter++) { | 383 | 941k | RETURN_NOT_OK(add_response_row( | 384 | 941k | iter->first, iter->second, response, add_keys, add_values, reverse)); | 385 | 941k | } | 386 | 5.41k | return Status::OK(); | 387 | 5.41k | } |
Unexecuted instantiation: redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__116reverse_iteratorINS3_20__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeISA_PvEElEEEEEEPFNS_6StatusERKS8_RKS9_PNS_15RedisResponsePBEbbbEEESI_T_T0_RKSR_SO_bbb redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__120__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeIS9_PvEElEEEEPFNS_6StatusERKS7_RKS8_PNS_15RedisResponsePBEbbbEEESG_T_T0_RKSP_SM_bbb Line | Count | Source | 380 | 27 | bool reverse = false) { | 381 | 27 | response->set_allocated_array_response(new RedisArrayPB()); | 382 | 120 | for (; iter != iter_end; iter++) { | 383 | 93 | RETURN_NOT_OK(add_response_row( | 384 | 93 | iter->first, iter->second, response, add_keys, add_values, reverse)); | 385 | 93 | } | 386 | 27 | return Status::OK(); | 387 | 27 | } |
|
388 | | |
389 | | template <typename AddResponseRow> |
390 | | CHECKED_STATUS PopulateResponseFrom(const SubDocument::ObjectContainer &key_values, |
391 | | AddResponseRow add_response_row, |
392 | | RedisResponsePB *response, |
393 | | bool add_keys, |
394 | | bool add_values, |
395 | 5.47k | bool reverse = false) { |
396 | 5.47k | if (reverse) { |
397 | 30 | return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row, |
398 | 30 | key_values.rend(), response, add_keys, |
399 | 30 | add_values, reverse); |
400 | 5.44k | } else { |
401 | 5.44k | return PopulateRedisResponseFromInternal(key_values.begin(), add_response_row, |
402 | 5.44k | key_values.end(), response, add_keys, |
403 | 5.44k | add_values, reverse); |
404 | 5.44k | } |
405 | 5.47k | } redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_120PopulateResponseFromIPFNS_6StatusERKNS0_14PrimitiveValueES6_PNS_15RedisResponsePBEbbbEEES3_RKNSt3__13mapIS4_NS0_11SubDocumentENSB_4lessIS4_EENSB_9allocatorINSB_4pairIS5_SD_EEEEEET_S8_bbb Line | Count | Source | 395 | 5.44k | bool reverse = false) { | 396 | 5.44k | if (reverse) { | 397 | 30 | return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row, | 398 | 30 | key_values.rend(), response, add_keys, | 399 | 30 | add_values, reverse); | 400 | 5.41k | } else { | 401 | 5.41k | return PopulateRedisResponseFromInternal(key_values.begin(), add_response_row, | 402 | 5.41k | key_values.end(), response, add_keys, | 403 | 5.41k | add_values, reverse); | 404 | 5.41k | } | 405 | 5.44k | } |
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_120PopulateResponseFromIPFNS_6StatusERKNS0_14PrimitiveValueERKNS0_11SubDocumentEPNS_15RedisResponsePBEbbbEEES3_RKNSt3__13mapIS4_S7_NSE_4lessIS4_EENSE_9allocatorINSE_4pairIS5_S7_EEEEEET_SB_bbb Line | Count | Source | 395 | 27 | bool reverse = false) { | 396 | 27 | if (reverse) { | 397 | 0 | return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row, | 398 | 0 | key_values.rend(), response, add_keys, | 399 | 0 | add_values, reverse); | 400 | 27 | } else { | 401 | 27 | return PopulateRedisResponseFromInternal(key_values.begin(), add_response_row, | 402 | 27 | key_values.end(), response, add_keys, | 403 | 27 | add_values, reverse); | 404 | 27 | } | 405 | 27 | } |
|
406 | | |
407 | | void SetOptionalInt(RedisDataType type, int64_t value, int64_t none_value, |
408 | 0 | RedisResponsePB* response) { |
409 | 0 | if (type == RedisDataType::REDIS_TYPE_NONE) { |
410 | 0 | response->set_code(RedisResponsePB::NIL); |
411 | 0 | response->set_int_response(none_value); |
412 | 0 | } else { |
413 | 0 | response->set_code(RedisResponsePB::OK); |
414 | 0 | response->set_int_response(value); |
415 | 0 | } |
416 | 0 | } |
417 | | |
418 | 0 | void SetOptionalInt(RedisDataType type, int64_t value, RedisResponsePB* response) { |
419 | 0 | SetOptionalInt(type, value, 0, response); |
420 | 0 | } |
421 | | |
422 | 10.5k | Result<int64_t> GetCardinality(IntentAwareIterator* iterator, const RedisKeyValuePB& kv) { |
423 | 10.5k | auto encoded_key_card = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key()); |
424 | 10.5k | PrimitiveValue(ValueType::kCounter).AppendToKey(&encoded_key_card); |
425 | 10.5k | SubDocument subdoc_card; |
426 | | |
427 | 10.5k | bool subdoc_card_found = false; |
428 | 10.5k | GetRedisSubDocumentData data = { encoded_key_card, &subdoc_card, &subdoc_card_found }; |
429 | | |
430 | 10.5k | RETURN_NOT_OK(GetRedisSubDocument( |
431 | 10.5k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); |
432 | | |
433 | 10.5k | return subdoc_card_found ? subdoc_card.GetInt64() : 0; |
434 | 10.5k | } |
435 | | |
436 | | template <typename AddResponseValues> |
437 | | CHECKED_STATUS GetAndPopulateResponseValues( |
438 | | IntentAwareIterator* iterator, |
439 | | AddResponseValues add_response_values, |
440 | | const GetRedisSubDocumentData& data, |
441 | | ValueType expected_type, |
442 | | const RedisReadRequestPB& request, |
443 | | RedisResponsePB* response, |
444 | 5.46k | bool add_keys, bool add_values, bool reverse) { |
445 | | |
446 | 5.46k | RETURN_NOT_OK(GetRedisSubDocument( |
447 | 5.46k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); |
448 | | |
449 | | // Validate and populate response. |
450 | 5.46k | response->set_allocated_array_response(new RedisArrayPB()); |
451 | 5.46k | if (!(*data.doc_found)) { |
452 | 3 | response->set_code(RedisResponsePB::NIL); |
453 | 3 | return Status::OK(); |
454 | 3 | } |
455 | | |
456 | 5.46k | if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) { |
457 | 5.46k | RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(), |
458 | 5.46k | add_response_values, |
459 | 5.46k | response, add_keys, add_values, reverse)); |
460 | 5.46k | } |
461 | 5.46k | return Status::OK(); |
462 | 5.46k | } redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_128GetAndPopulateResponseValuesIPFNS_6StatusERKNS0_14PrimitiveValueERKNS0_11SubDocumentEPNS_15RedisResponsePBEbbbEEES3_PNS0_19IntentAwareIteratorET_RKNS0_23GetRedisSubDocumentDataENS0_9ValueTypeERKNS_18RedisReadRequestPBESB_bbb Line | Count | Source | 444 | 30 | bool add_keys, bool add_values, bool reverse) { | 445 | | | 446 | 30 | RETURN_NOT_OK(GetRedisSubDocument( | 447 | 30 | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); | 448 | | | 449 | | // Validate and populate response. | 450 | 30 | response->set_allocated_array_response(new RedisArrayPB()); | 451 | 30 | if (!(*data.doc_found)) { | 452 | 3 | response->set_code(RedisResponsePB::NIL); | 453 | 3 | return Status::OK(); | 454 | 3 | } | 455 | | | 456 | 27 | if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) { | 457 | 27 | RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(), | 458 | 27 | add_response_values, | 459 | 27 | response, add_keys, add_values, reverse)); | 460 | 27 | } | 461 | 27 | return Status::OK(); | 462 | 27 | } |
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_128GetAndPopulateResponseValuesIPFNS_6StatusERKNS0_14PrimitiveValueES6_PNS_15RedisResponsePBEbbbEEES3_PNS0_19IntentAwareIteratorET_RKNS0_23GetRedisSubDocumentDataENS0_9ValueTypeERKNS_18RedisReadRequestPBES8_bbb Line | Count | Source | 444 | 5.43k | bool add_keys, bool add_values, bool reverse) { | 445 | | | 446 | 5.43k | RETURN_NOT_OK(GetRedisSubDocument( | 447 | 5.43k | iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse)); | 448 | | | 449 | | // Validate and populate response. | 450 | 5.43k | response->set_allocated_array_response(new RedisArrayPB()); | 451 | 5.43k | if (!(*data.doc_found)) { | 452 | 0 | response->set_code(RedisResponsePB::NIL); | 453 | 0 | return Status::OK(); | 454 | 0 | } | 455 | | | 456 | 5.43k | if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) { | 457 | 5.43k | RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(), | 458 | 5.43k | add_response_values, | 459 | 5.43k | response, add_keys, add_values, reverse)); | 460 | 5.43k | } | 461 | 5.43k | return Status::OK(); | 462 | 5.43k | } |
|
463 | | |
464 | | // Get normalized (with respect to card) upper and lower index bounds for reverse range scans. |
465 | | void GetNormalizedBounds(int64 low_idx, int64 high_idx, int64 card, bool reverse, |
466 | 0 | int64* low_idx_normalized, int64* high_idx_normalized) { |
467 | | // Turn negative bounds positive. |
468 | 0 | if (low_idx < 0) { |
469 | 0 | low_idx = card + low_idx; |
470 | 0 | } |
471 | 0 | if (high_idx < 0) { |
472 | 0 | high_idx = card + high_idx; |
473 | 0 | } |
474 | | |
475 | | // Index from lower to upper instead of upper to lower. |
476 | 0 | if (reverse) { |
477 | 0 | *low_idx_normalized = card - high_idx - 1; |
478 | 0 | *high_idx_normalized = card - low_idx - 1; |
479 | 0 | } else { |
480 | 0 | *low_idx_normalized = low_idx; |
481 | 0 | *high_idx_normalized = high_idx; |
482 | 0 | } |
483 | | |
484 | | // Fit bounds to range [0, card). |
485 | 0 | if (*low_idx_normalized < 0) { |
486 | 0 | *low_idx_normalized = 0; |
487 | 0 | } |
488 | 0 | if (*high_idx_normalized >= card) { |
489 | 0 | *high_idx_normalized = card - 1; |
490 | 0 | } |
491 | 0 | } |
492 | | |
493 | | } // anonymous namespace |
494 | | |
495 | 20.5k | void RedisWriteOperation::InitializeIterator(const DocOperationApplyData& data) { |
496 | 20.5k | auto subdoc_key = SubDocKey(DocKey::FromRedisKey( |
497 | 20.5k | request_.key_value().hash_code(), request_.key_value().key())); |
498 | | |
499 | 20.5k | auto iter = CreateIntentAwareIterator( |
500 | 20.5k | data.doc_write_batch->doc_db(), |
501 | 20.5k | BloomFilterMode::USE_BLOOM_FILTER, subdoc_key.Encode().AsSlice(), |
502 | 20.5k | redis_query_id(), TransactionOperationContext(), data.deadline, data.read_time); |
503 | | |
504 | 20.5k | iterator_ = std::move(iter); |
505 | 20.5k | } |
506 | | |
507 | 61.5k | Status RedisWriteOperation::Apply(const DocOperationApplyData& data) { |
508 | 61.5k | switch (request_.request_case()) { |
509 | 61.4k | case RedisWriteRequestPB::kSetRequest: |
510 | 61.4k | return ApplySet(data); |
511 | 48 | case RedisWriteRequestPB::kSetTtlRequest: |
512 | 48 | return ApplySetTtl(data); |
513 | 0 | case RedisWriteRequestPB::kGetsetRequest: |
514 | 0 | return ApplyGetSet(data); |
515 | 0 | case RedisWriteRequestPB::kAppendRequest: |
516 | 0 | return ApplyAppend(data); |
517 | 18 | case RedisWriteRequestPB::kDelRequest: |
518 | 18 | return ApplyDel(data); |
519 | 0 | case RedisWriteRequestPB::kSetRangeRequest: |
520 | 0 | return ApplySetRange(data); |
521 | 0 | case RedisWriteRequestPB::kIncrRequest: |
522 | 0 | return ApplyIncr(data); |
523 | 0 | case RedisWriteRequestPB::kPushRequest: |
524 | 0 | return ApplyPush(data); |
525 | 0 | case RedisWriteRequestPB::kInsertRequest: |
526 | 0 | return ApplyInsert(data); |
527 | 0 | case RedisWriteRequestPB::kPopRequest: |
528 | 0 | return ApplyPop(data); |
529 | 0 | case RedisWriteRequestPB::kAddRequest: |
530 | 0 | return ApplyAdd(data); |
531 | | // TODO: Cut this short in doc_operation. |
532 | 3 | case RedisWriteRequestPB::kNoOpRequest: |
533 | 3 | return Status::OK(); |
534 | 0 | case RedisWriteRequestPB::REQUEST_NOT_SET: break; |
535 | 0 | } |
536 | 0 | return STATUS_FORMAT(Corruption, "Unsupported redis read operation: $0", request_.request_case()); |
537 | 0 | } |
538 | | |
539 | | Result<RedisDataType> RedisWriteOperation::GetValueType( |
540 | 20.5k | const DocOperationApplyData& data, int subkey_index) { |
541 | 20.5k | if (!iterator_) { |
542 | 20.5k | InitializeIterator(data); |
543 | 20.5k | } |
544 | 20.5k | return GetRedisValueType( |
545 | 20.5k | iterator_.get(), request_.key_value(), data.doc_write_batch, subkey_index); |
546 | 20.5k | } |
547 | | |
548 | | Result<RedisValue> RedisWriteOperation::GetValue( |
549 | 48 | const DocOperationApplyData& data, int subkey_index, Expiration* ttl) { |
550 | 48 | if (!iterator_) { |
551 | 48 | InitializeIterator(data); |
552 | 48 | } |
553 | 48 | return GetRedisValue(iterator_.get(), request_.key_value(), |
554 | 48 | subkey_index, /* always_override */ false, ttl); |
555 | 48 | } |
556 | | |
// Applies the write described by request_.set_request() to data.doc_write_batch.
//
// The TTL is set_request().ttl() interpreted as milliseconds when present, otherwise
// Value::kMaxTtl.
//
// When the request carries subkeys, kv.type() selects the path:
//   * REDIS_TYPE_HASH / REDIS_TYPE_TIMESERIES: every (subkey, value) pair becomes a child of one
//     subdocument that is inserted or extended under the key.
//   * REDIS_TYPE_SORTEDSET: maintains a counter child plus a forward map (score -> member) and
//     a reverse map (member -> score); the int response is `return_value`.
//   * REDIS_TYPE_STRING and any other type: rejected with InvalidCommand.
// Without subkeys the type must be REDIS_TYPE_STRING with exactly one value; the write is
// skipped (NIL, or int 0 when expect_ok_response is explicitly false) when the write mode is
// INSERT and the key exists, or UPDATE and the key does not exist.
//
// A stored type that conflicts with kv.type() is reported through response_ as WRONG_TYPE
// while the returned Status is OK. Errors from helpers are propagated as-is.
557 | 61.4k | Status RedisWriteOperation::ApplySet(const DocOperationApplyData& data) {
558 | 61.4k | const RedisKeyValuePB& kv = request_.key_value();
559 | 61.4k | const MonoDelta ttl = request_.set_request().has_ttl() ?
560 | 61.4k | MonoDelta::FromMilliseconds(request_.set_request().ttl()) : Value::kMaxTtl;
561 | 61.4k | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
562 | 61.4k | if (kv.subkey_size() > 0) {
// Type reported for the key itself (no subkey index is passed here).
563 | 20.4k | RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
564 | 20.4k | switch (kv.type()) {
565 | 87 | case REDIS_TYPE_TIMESERIES: FALLTHROUGH_INTENDED;
566 | 9.98k | case REDIS_TYPE_HASH: {
567 | 9.98k | if (data_type != kv.type() && data_type != REDIS_TYPE_NONE) {
568 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
569 | 0 | response_.set_error_message(wrong_type_message);
570 | 0 | return Status::OK();
571 | 0 | }
572 | 9.98k | SubDocument kv_entries = SubDocument();
// Pairs subkey(i) with value(i); the loop is bounded by subkey_size() only.
573 | 54.5k | for (int i = 0; i < kv.subkey_size(); i++) {
574 | 44.5k | PrimitiveValue subkey_value;
575 | 44.5k | RETURN_NOT_OK(PrimitiveValueFromSubKeyStrict(kv.subkey(i), kv.type(), &subkey_value));
576 | 44.5k | kv_entries.SetChild(subkey_value,
577 | 44.5k | SubDocument(PrimitiveValue(kv.value(i))));
578 | 44.5k | }
579 | |
580 | 9.98k | if (kv.type() == REDIS_TYPE_TIMESERIES) {
581 | 87 | RETURN_NOT_OK(kv_entries.ConvertToRedisTS());
582 | 87 | }
583 | |
584 | | // For an HSET command (which has only one subkey), we need to read the subkey to find out
585 | | // if the key already existed, and return 0 or 1 accordingly. This read is unnecessary for
586 | | // HMSET and TSADD.
587 | 9.98k | if (kv.subkey_size() == 1 && EmulateRedisResponse(kv.type()) &&
588 | 0 | !request_.set_request().expect_ok_response()) {
589 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType(data, 0));
590 | | // For HSET/TSADD, we return 0 or 1 depending on if the key already existed.
591 | | // If flag is false, no int response is returned.
592 | 0 | SetOptionalInt(type, 0, 1, &response_);
593 | 0 | }
594 | 9.98k | if (data_type == REDIS_TYPE_NONE && kv.type() == REDIS_TYPE_TIMESERIES) {
595 | | // Need to insert the document instead of extending it.
596 | 81 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
597 | 81 | doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl,
598 | 81 | Value::kInvalidUserTimestamp, false /* init_marker_ttl */));
599 | 9.90k | } else {
600 | 9.90k | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
601 | 9.90k | doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl));
602 | 9.90k | }
603 | 9.98k | break;
604 | 9.98k | }
605 | 10.5k | case REDIS_TYPE_SORTEDSET: {
606 | 10.5k | if (data_type != kv.type() && data_type != REDIS_TYPE_NONE) {
607 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
608 | 0 | response_.set_error_message(wrong_type_message);
609 | 0 | return Status::OK();
610 | 0 | }
611 | |
612 | | // The SubDocuments to be inserted for card, the forward mapping, and reverse mapping.
613 | 10.5k | SubDocument kv_entries_card;
614 | 10.5k | SubDocument kv_entries_forward;
615 | 10.5k | SubDocument kv_entries_reverse;
616 | |
617 | | // The top level mapping.
618 | 10.5k | SubDocument kv_entries;
619 | |
// new_elements_added feeds the cardinality update; return_value is what the client sees and
// additionally counts changed scores when the CH option is set.
620 | 10.5k | int new_elements_added = 0;
621 | 10.5k | int return_value = 0;
// In this branch subkey(i) carries the score (double_subkey) and value(i) the member.
622 | 21.0k | for (int i = 0; i < kv.subkey_size(); i++) {
623 | | // Check whether the value is already in the document, if so delete it.
624 | 10.5k | SubDocKey key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()),
625 | 10.5k | PrimitiveValue(ValueType::kSSReverse),
626 | 10.5k | PrimitiveValue(kv.value(i)));
627 | 10.5k | SubDocument subdoc_reverse;
628 | 10.5k | bool subdoc_reverse_found = false;
629 | 10.5k | auto encoded_key_reverse = key_reverse.EncodeWithoutHt();
630 | 10.5k | GetRedisSubDocumentData get_data = { encoded_key_reverse, &subdoc_reverse,
631 | 10.5k | &subdoc_reverse_found };
632 | 10.5k | RETURN_NOT_OK(GetRedisSubDocument(
633 | 10.5k | data.doc_write_batch->doc_db(),
634 | 10.5k | get_data, redis_query_id(), TransactionOperationContext(), data.deadline,
635 | 10.5k | data.read_time));
636 | |
637 | | // Flag indicating whether we should add the given entry to the sorted set.
638 | 10.5k | bool should_add_entry = true;
639 | | // Flag indicating whether we should remove an entry from the sorted set.
640 | 10.5k | bool should_remove_existing_entry = false;
641 | |
642 | 10.5k | if (!subdoc_reverse_found) {
643 | | // The value is not already in the document.
644 | 10.5k | switch (request_.set_request().sorted_set_options().update_options()) {
645 | 0 | case SortedSetOptionsPB::NX: FALLTHROUGH_INTENDED;
646 | 10.5k | case SortedSetOptionsPB::NONE: {
647 | | // Both these options call for inserting new elements, increment return_value and
648 | | // keep should_add_entry as true.
649 | 10.5k | return_value++;
650 | 10.5k | new_elements_added++;
651 | 10.5k | break;
652 | 0 | }
653 | 0 | default: {
654 | | // XX option calls for no new elements, don't increment return_value and set
655 | | // should_add_entry to false.
656 | 0 | should_add_entry = false;
657 | 0 | break;
658 | 0 | }
659 | 0 | }
660 | 0 | } else {
661 | | // The value is already in the document.
662 | 0 | switch (request_.set_request().sorted_set_options().update_options()) {
663 | 0 | case SortedSetOptionsPB::XX:
664 | 0 | case SortedSetOptionsPB::NONE: {
665 | | // First make sure that the new score is different from the old score.
666 | | // Both these options call for updating existing elements, set
667 | | // should_remove_existing_entry to true, and if the CH flag is on (return both
668 | | // elements changed and elements added), increment return_value.
669 | 0 | double score_to_remove = subdoc_reverse.GetDouble();
670 | | // If incr option is set, we add the increment to the existing score.
671 | 0 | double score_to_add = request_.set_request().sorted_set_options().incr() ?
672 | 0 | score_to_remove + kv.subkey(i).double_subkey() : kv.subkey(i).double_subkey();
673 | 0 | if (score_to_remove != score_to_add) {
674 | 0 | should_remove_existing_entry = true;
675 | 0 | if (request_.set_request().sorted_set_options().ch()) {
676 | 0 | return_value++;
677 | 0 | }
678 | 0 | }
679 | 0 | break;
680 | 0 | }
681 | 0 | default: {
682 | | // NX option calls for only new elements, set should_add_entry to false.
683 | 0 | should_add_entry = false;
684 | 0 | break;
685 | 10.5k | }
686 | 0 | }
687 | 0 | }
688 | |
// Tombstone the member under its old score in the forward map.
689 | 10.5k | if (should_remove_existing_entry) {
690 | 0 | double score_to_remove = subdoc_reverse.GetDouble();
691 | 0 | SubDocument subdoc_forward_tombstone;
692 | 0 | subdoc_forward_tombstone.SetChild(PrimitiveValue(kv.value(i)),
693 | 0 | SubDocument(ValueType::kTombstone));
694 | 0 | kv_entries_forward.SetChild(PrimitiveValue::Double(score_to_remove),
695 | 0 | SubDocument(subdoc_forward_tombstone));
696 | 0 | }
697 | |
698 | 10.5k | if (should_add_entry) {
699 | | // If the incr option is specified, we need to insert the existing score + new score
700 | | // instead of just the new score.
// NOTE(review): with incr set, subdoc_reverse.GetDouble() is evaluated even when
// subdoc_reverse_found is false - confirm what GetDouble() yields for a not-found subdocument.
701 | 10.5k | double score_to_add = request_.set_request().sorted_set_options().incr() ?
702 | 0 | kv.subkey(i).double_subkey() + subdoc_reverse.GetDouble() :
703 | 10.5k | kv.subkey(i).double_subkey();
704 | |
705 | | // Add the forward mapping to the entries.
706 | 10.5k | SubDocument *forward_entry =
707 | 10.5k | kv_entries_forward.GetOrAddChild(PrimitiveValue::Double(score_to_add)).first;
708 | 10.5k | forward_entry->SetChild(PrimitiveValue(kv.value(i)),
709 | 10.5k | SubDocument(PrimitiveValue()));
710 | |
711 | | // Add the reverse mapping to the entries.
712 | 10.5k | kv_entries_reverse.SetChild(PrimitiveValue(kv.value(i)),
713 | 10.5k | SubDocument(PrimitiveValue::Double(score_to_add)));
714 | 10.5k | }
715 | 10.5k | }
716 | |
// The counter child is only rewritten when at least one new member was added.
717 | 10.5k | if (new_elements_added > 0) {
718 | 10.5k | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
719 | | // Insert card + new_elements_added back into the document for the updated card.
720 | 10.5k | kv_entries_card = SubDocument(PrimitiveValue(card + new_elements_added));
721 | 10.5k | kv_entries.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(kv_entries_card));
722 | 10.5k | }
723 | |
724 | 10.5k | if (kv_entries_forward.object_num_keys() > 0) {
725 | 10.5k | kv_entries.SetChild(PrimitiveValue(ValueType::kSSForward),
726 | 10.5k | SubDocument(kv_entries_forward));
727 | 10.5k | }
728 | |
729 | 10.5k | if (kv_entries_reverse.object_num_keys() > 0) {
730 | 10.5k | kv_entries.SetChild(PrimitiveValue(ValueType::kSSReverse),
731 | 10.5k | SubDocument(kv_entries_reverse));
732 | 10.5k | }
733 | |
// Nothing is written when no counter, forward or reverse child was produced above.
734 | 10.5k | if (kv_entries.object_num_keys() > 0) {
735 | 10.5k | RETURN_NOT_OK(kv_entries.ConvertToRedisSortedSet());
736 | 10.5k | if (data_type == REDIS_TYPE_NONE) {
737 | 10.5k | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
738 | 10.5k | doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl));
739 | 0 | } else {
740 | 0 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
741 | 0 | doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl));
742 | 0 | }
743 | 10.5k | }
744 | 10.5k | response_.set_code(RedisResponsePB::OK);
745 | 10.5k | response_.set_int_response(return_value);
746 | 10.5k | break;
747 | 10.5k | }
748 | 0 | case REDIS_TYPE_STRING: {
749 | 0 | return STATUS_SUBSTITUTE(InvalidCommand,
750 | 10.5k | "Redis data type $0 in SET command should not have subkeys", kv.type());
751 | 10.5k | }
752 | 0 | default:
753 | 0 | return STATUS_SUBSTITUTE(InvalidCommand,
754 | 40.9k | "Redis data type $0 not supported in SET command", kv.type());
755 | 40.9k | }
756 | 40.9k | } else {
// No subkeys: plain string SET of a single value.
757 | 40.9k | if (kv.type() != REDIS_TYPE_STRING) {
758 | 0 | return STATUS_SUBSTITUTE(InvalidCommand,
759 | 0 | "Redis data type for SET must be string if subkey not present, found $0", kv.type());
760 | 0 | }
761 | 40.9k | if (kv.value_size() != 1) {
762 | 0 | return STATUS_SUBSTITUTE(InvalidCommand,
763 | 0 | "There must be only one value in SET if there is only one key, found $0",
764 | 0 | kv.value_size());
765 | 0 | }
// The existing type is only read for non-UPSERT modes, which depend on whether the key exists.
766 | 40.9k | const RedisWriteMode mode = request_.set_request().mode();
767 | 40.9k | if (mode != RedisWriteMode::REDIS_WRITEMODE_UPSERT) {
768 | 33 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
769 | 33 | if ((mode == RedisWriteMode::REDIS_WRITEMODE_INSERT && data_type != REDIS_TYPE_NONE)
770 | 24 | || (mode == RedisWriteMode::REDIS_WRITEMODE_UPDATE && data_type == REDIS_TYPE_NONE)) {
771 | |
772 | 15 | if (request_.set_request().has_expect_ok_response() &&
773 | 0 | !request_.set_request().expect_ok_response()) {
774 | | // For SETNX we return 0 or 1 depending on if the key already existed.
775 | 0 | response_.set_int_response(0);
776 | 0 | response_.set_code(RedisResponsePB::OK);
777 | 15 | } else {
778 | 15 | response_.set_code(RedisResponsePB::NIL);
779 | 15 | }
780 | 15 | return Status::OK();
781 | 15 | }
782 | 40.9k | }
783 | 40.9k | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
784 | 40.9k | doc_path, Value(PrimitiveValue(kv.value(0)), ttl),
785 | 40.9k | data.read_time, data.deadline, redis_query_id()));
786 | 40.9k | }
787 | |
// Reached by every path that did not return early, including the sorted-set branch.
788 | 61.4k | if (request_.set_request().has_expect_ok_response() &&
789 | 9.90k | !request_.set_request().expect_ok_response()) {
790 | | // For SETNX we return 0 or 1 depending on if the key already existed.
791 | 0 | response_.set_int_response(1);
792 | 0 | }
793 | |
794 | 61.4k | response_.set_code(RedisResponsePB::OK);
795 | 61.4k | return Status::OK();
796 | 61.4k | }
797 | | |
// Sets, replaces or clears the TTL of a top-level key as described by
// request_.set_ttl_request().
//
//   * Subkeys are rejected with a Corruption status.
//   * With has_absolute_time(), the TTL is absolute_time() minus the physical part of
//     data.read_time.read divided by MonoTime::kNanosecondsPerMillisecond; a result <= 0 is
//     handled by calling ApplyDel(data).
//   * Otherwise ttl() is taken as milliseconds, with -1 meaning Value::kMaxTtl (PERSIST).
//   * REDIS_TYPE_TIMESERIES keys yield InvalidCommand.
// The int response is 0 when the key does not exist, or when PERSIST finds a remaining TTL
// that is negative or already Value::kMaxTtl; it is 1 after the TTL record has been written.
798 | 48 | Status RedisWriteOperation::ApplySetTtl(const DocOperationApplyData& data) {
799 | 48 | const RedisKeyValuePB& kv = request_.key_value();
800 | |
801 | | // We only support setting TTLs on top-level keys.
802 | 48 | if (!kv.subkey().empty()) {
803 | 0 | return STATUS_SUBSTITUTE(Corruption,
804 | 0 | "Expected no subkeys, got $0", kv.subkey().size());
805 | 0 | }
806 | |
807 | 48 | MonoDelta ttl;
808 | 48 | bool absolute_expiration = request_.set_ttl_request().has_absolute_time();
809 | |
810 | | // Handle ExpireAt
811 | 48 | if (absolute_expiration) {
// Division binds tighter than subtraction: the clock reading is scaled first, then subtracted
// from absolute_time().
812 | 0 | int64_t calc_ttl = request_.set_ttl_request().absolute_time() -
813 | 0 | server::HybridClock::GetPhysicalValueNanos(data.read_time.read) /
814 | 0 | MonoTime::kNanosecondsPerMillisecond;
815 | 0 | if (calc_ttl <= 0) {
816 | 0 | return ApplyDel(data);
817 | 0 | }
818 | 0 | ttl = MonoDelta::FromMilliseconds(calc_ttl);
819 | 0 | }
820 | |
// Read the key's current value; `exp` is passed as the out-parameter and is used by the
// PERSIST check below.
821 | 48 | Expiration exp;
822 | 48 | auto value = VERIFY_RESULT(GetValue(data, kNilSubkeyIndex, &exp));
823 | |
824 | 48 | if (value.type == REDIS_TYPE_TIMESERIES) { // This command is not supported.
825 | 0 | return STATUS_SUBSTITUTE(InvalidCommand,
826 | 0 | "Redis data type $0 not supported in EXPIRE and PERSIST commands", value.type);
827 | 0 | }
828 | |
829 | 48 | if (value.type == REDIS_TYPE_NONE) { // Key does not exist.
830 | 0 | VLOG(1) << "TTL cannot be set because the key does not exist";
831 | 24 | response_.set_int_response(0);
832 | 24 | return Status::OK();
833 | 24 | }
834 | |
835 | 24 | if (!absolute_expiration && request_.set_ttl_request().ttl() == -1) { // Handle PERSIST.
836 | 9 | MonoDelta new_ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read));
837 | 9 | if (new_ttl.IsNegative() || new_ttl == Value::kMaxTtl) {
838 | 6 | response_.set_int_response(0);
839 | 6 | return Status::OK();
840 | 6 | }
841 | 18 | }
842 | |
843 | 18 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
844 | 18 | if (!absolute_expiration) {
845 | 3 | ttl = request_.set_ttl_request().ttl() == -1 ? Value::kMaxTtl :
846 | 15 | MonoDelta::FromMilliseconds(request_.set_ttl_request().ttl());
847 | 18 | }
848 | |
849 | 18 | ValueType v_type = ValueTypeFromRedisType(value.type);
850 | 18 | if (v_type == ValueType::kInvalid)
851 | 0 | return STATUS(Corruption, "Invalid value type.");
852 | |
// The record written carries only the value type plus the TTL and is tagged with
// Value::kTtlFlag; the stored payload itself is not rewritten here.
853 | 18 | RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
854 | 18 | doc_path, Value(PrimitiveValue(v_type), ttl, Value::kInvalidUserTimestamp, Value::kTtlFlag),
855 | 18 | data.read_time, data.deadline, redis_query_id()));
856 | 0 | VLOG(2) << "Set TTL successfully to " << ttl << " for key " << kv.key();
857 | 18 | response_.set_int_response(1);
858 | 18 | return Status::OK();
859 | 18 | }
860 | | |
// Puts the key's current string value into response_ and then overwrites the key with
// kv.value(0).
// Returns Corruption unless exactly one value is supplied. If VerifyTypeAndSetCode rejects the
// stored type (checked against REDIS_TYPE_STRING with VerifySuccessIfMissing::kTrue), the
// function returns OK without writing; the response code was set by that helper.
// The new value is written without an explicit TTL argument.
861 | 0 | Status RedisWriteOperation::ApplyGetSet(const DocOperationApplyData& data) {
862 | 0 | const RedisKeyValuePB& kv = request_.key_value();
863 |

864 | 0 | if (kv.value_size() != 1) {
865 | 0 | return STATUS_SUBSTITUTE(Corruption,
866 | 0 | "Getset kv should have 1 value, found $0", kv.value_size());
867 | 0 | }
868 | |
// `value` is a Result; RETURN_NOT_OK propagates a failed read before it is dereferenced.
869 | 0 | auto value = GetValue(data);
870 | 0 | RETURN_NOT_OK(value);
871 |

872 | 0 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
873 | 0 | VerifySuccessIfMissing::kTrue)) {
874 | | // We've already set the error code in the response.
875 | 0 | return Status::OK();
876 | 0 | }
877 | 0 | response_.set_string_response(value->value);
878 |

879 | 0 | return data.doc_write_batch->SetPrimitive(
880 | 0 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()),
881 | 0 | Value(PrimitiveValue(kv.value(0))), data.read_time, data.deadline, redis_query_id());
882 | 0 | }
883 | | |
// Appends kv.value(0) to the key's current string value, reports the resulting length as the
// int response, and writes the concatenated string back.
// Returns Corruption unless exactly one value is supplied. If VerifyTypeAndSetCode rejects the
// stored type (checked against REDIS_TYPE_STRING with VerifySuccessIfMissing::kTrue), the
// function returns OK without writing.
// The rewritten value is given the TTL computed by value->exp.ComputeRelativeTtl() at the
// iterator's read time.
884 | 0 | Status RedisWriteOperation::ApplyAppend(const DocOperationApplyData& data) {
885 | 0 | const RedisKeyValuePB& kv = request_.key_value();
886 |

887 | 0 | if (kv.value_size() != 1) {
888 | 0 | return STATUS_SUBSTITUTE(Corruption,
889 | 0 | "Append kv should have 1 value, found $0", kv.value_size());
890 | 0 | }
891 | |
// `value` is a Result; RETURN_NOT_OK propagates a failed read before it is dereferenced.
892 | 0 | auto value = GetValue(data);
893 | 0 | RETURN_NOT_OK(value);
894 |

895 | 0 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
896 | 0 | VerifySuccessIfMissing::kTrue)) {
897 | | // We've already set the error code in the response.
898 | 0 | return Status::OK();
899 | 0 | }
900 | |
// Concatenate in place; the response below reports the length after the append.
901 | 0 | value->value += kv.value(0);
902 |

903 | 0 | response_.set_code(RedisResponsePB::OK);
904 | 0 | response_.set_int_response(value->value.length());
905 | |
906 | | // TODO: update the TTL with the write time rather than read time,
907 | | // or store the expiration.
908 | 0 | return data.doc_write_batch->SetPrimitive(
909 | 0 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()),
910 | 0 | Value(PrimitiveValue(value->value),
911 | 0 | VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read))),
912 | 0 | data.read_time,
913 | 0 | data.deadline,
914 | 0 | redis_query_id());
915 | 0 | }
916 | | |
// Deletes a whole key or selected subkeys by writing tombstones through ExtendSubDocument.
//
// Responds WRONG_TYPE (with an OK Status) when the key exists, its stored type differs from
// kv.type(), and kv.type() is not REDIS_TYPE_NONE. Otherwise kv.type() selects the behaviour:
//   * REDIS_TYPE_NONE: tombstones the entire key; counts 1 if the key existed, else 0.
//   * REDIS_TYPE_TIMESERIES: tombstones every listed subkey without reading it first.
//   * REDIS_TYPE_SORTEDSET: reads the reverse map for each member, tombstones the matching
//     reverse and forward entries, and rewrites the counter as card - num_keys.
//   * any other type: tombstones the subkeys whose current type is REDIS_TYPE_STRING.
// The batch is only touched when num_keys != 0. The int response (num_keys) is set only when
// EmulateRedisResponse(kv.type()) is true.
917 | | // TODO (akashnil): Actually check if the value existed, return 0 if not. handle multidel in future.
918 | | // See ENG-807
919 | 18 | Status RedisWriteOperation::ApplyDel(const DocOperationApplyData& data) {
920 | 18 | const RedisKeyValuePB& kv = request_.key_value();
921 | 18 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
922 | 18 | if (data_type != REDIS_TYPE_NONE && data_type != kv.type() && kv.type() != REDIS_TYPE_NONE) {
923 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
924 | 0 | response_.set_error_message(wrong_type_message);
925 | 0 | return Status::OK();
926 | 0 | }
927 | |
928 | 18 | SubDocument values;
929 | | // Number of distinct keys being removed.
930 | 18 | int num_keys = 0;
931 | 18 | switch (kv.type()) {
932 | 9 | case REDIS_TYPE_NONE: {
933 | 9 | values = SubDocument(ValueType::kTombstone);
934 | 9 | num_keys = data_type == REDIS_TYPE_NONE ? 0 : 1;
935 | 9 | break;
936 | 0 | }
937 | 9 | case REDIS_TYPE_TIMESERIES: {
// NOTE(review): this early return leaves response_ untouched (no code is set) - confirm the
// caller expects that for a missing timeseries key.
938 | 9 | if (data_type == REDIS_TYPE_NONE) {
939 | 3 | return Status::OK();
940 | 3 | }
941 | 309 | for (int i = 0; i < kv.subkey_size(); i++) {
942 | 303 | PrimitiveValue primitive_value;
943 | 303 | RETURN_NOT_OK(PrimitiveValueFromSubKeyStrict(kv.subkey(i), data_type, &primitive_value));
944 | 303 | values.SetChild(primitive_value, SubDocument(ValueType::kTombstone));
945 | 303 | }
946 | 6 | num_keys = kv.subkey_size();
947 | 6 | break;
948 | 6 | }
949 | 0 | case REDIS_TYPE_SORTEDSET: {
950 | 0 | SubDocument values_card;
951 | 0 | SubDocument values_forward;
952 | 0 | SubDocument values_reverse;
// Start from the requested count and subtract one for every member that is not present.
953 | 0 | num_keys = kv.subkey_size();
954 | 0 | for (int i = 0; i < kv.subkey_size(); i++) {
955 | | // Check whether the value is already in the document.
956 | 0 | SubDocument doc_reverse;
957 | 0 | bool doc_reverse_found = false;
958 | 0 | SubDocKey subdoc_key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()),
959 | 0 | PrimitiveValue(ValueType::kSSReverse),
960 | 0 | PrimitiveValue(kv.subkey(i).string_subkey()));
961 | | // Todo(Rahul): Add values to the write batch cache and then do an additional check.
962 | | // As of now, we only check to see if a value is in rocksdb, and we should also check
963 | | // the write batch.
964 | 0 | auto encoded_subdoc_key_reverse = subdoc_key_reverse.EncodeWithoutHt();
965 | 0 | GetRedisSubDocumentData get_data = { encoded_subdoc_key_reverse, &doc_reverse,
966 | 0 | &doc_reverse_found };
967 | 0 | RETURN_NOT_OK(GetRedisSubDocument(
968 | 0 | data.doc_write_batch->doc_db(),
969 | 0 | get_data, redis_query_id(), TransactionOperationContext(), data.deadline,
970 | 0 | data.read_time));
971 | 0 | if (doc_reverse_found && doc_reverse.value_type() != ValueType::kTombstone) {
972 | | // The value is already in the doc, needs to be removed.
973 | 0 | values_reverse.SetChild(PrimitiveValue(kv.subkey(i).string_subkey()),
974 | 0 | SubDocument(ValueType::kTombstone));
975 | | // For sorted sets, the forward mapping also needs to be deleted.
976 | 0 | SubDocument doc_forward;
977 | 0 | doc_forward.SetChild(PrimitiveValue(kv.subkey(i).string_subkey()),
978 | 0 | SubDocument(ValueType::kTombstone));
979 | 0 | values_forward.SetChild(PrimitiveValue::Double(doc_reverse.GetDouble()),
980 | 0 | SubDocument(doc_forward));
981 | 0 | } else {
982 | | // If the key is absent, it doesn't contribute to the count of keys being deleted.
983 | 0 | num_keys--;
984 | 0 | }
985 | 0 | }
986 | 0 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
987 | | // The new cardinality is card - num_keys.
988 | 0 | values_card = SubDocument(PrimitiveValue(card - num_keys));
989 |

990 | 0 | values.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(values_card));
991 | 0 | values.SetChild(PrimitiveValue(ValueType::kSSForward), SubDocument(values_forward));
992 | 0 | values.SetChild(PrimitiveValue(ValueType::kSSReverse), SubDocument(values_reverse));
993 |

994 | 0 | break;
995 | 0 | }
996 | 0 | default: {
997 | 0 | num_keys = kv.subkey_size(); // We know the subkeys are distinct.
998 | | // Avoid reads for redis timeseries type.
// NOTE(review): when EmulateRedisResponse(kv.type()) is false, no tombstones are added to
// `values` in this branch while num_keys stays at subkey_size(), so ExtendSubDocument below is
// still called with an empty `values` - confirm this is intended.
999 | 0 | if (EmulateRedisResponse(kv.type())) {
1000 | 0 | for (int i = 0; i < kv.subkey_size(); i++) {
1001 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType(data, i));
1002 | 0 | if (type == REDIS_TYPE_STRING) {
1003 | 0 | values.SetChild(PrimitiveValue(kv.subkey(i).string_subkey()),
1004 | 0 | SubDocument(ValueType::kTombstone));
1005 | 0 | } else {
1006 | | // If the key is absent, it doesn't contribute to the count of keys being deleted.
1007 | 0 | num_keys--;
1008 | 0 | }
1009 | 0 | }
1010 | 0 | }
1011 | 0 | break;
1012 | 15 | }
1013 | 15 | }
1014 | |
// Skip the write entirely when nothing is being removed.
1015 | 15 | if (num_keys != 0) {
1016 | 15 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1017 | 15 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(doc_path, values,
1018 | 15 | data.read_time, data.deadline, redis_query_id()));
1019 | 15 | }
1020 | |
1021 | 15 | response_.set_code(RedisResponsePB::OK);
1022 | 15 | if (EmulateRedisResponse(kv.type())) {
1023 | | // If the flag is true, we respond with the number of keys actually being deleted. We don't
1024 | | // report this number for the redis timeseries type to avoid reads.
1025 | 9 | response_.set_int_response(num_keys);
1026 | 9 | }
1027 | 15 | return Status::OK();
1028 | 15 | }
1029 | | |
// Overwrites part of the key's string value with kv.value(0), starting at
// set_range_request().offset(), and reports the resulting length as the int response.
// Returns Corruption unless exactly one value is supplied. If VerifyTypeAndSetCode rejects the
// stored type (checked against REDIS_TYPE_STRING with VerifySuccessIfMissing::kTrue), the
// function returns OK without writing.
// The rewritten value is given the TTL computed by value->exp.ComputeRelativeTtl() at the
// iterator's read time, and iterator_ is moved into the SetPrimitive call.
1030 | 0 | Status RedisWriteOperation::ApplySetRange(const DocOperationApplyData& data) {
1031 | 0 | const RedisKeyValuePB& kv = request_.key_value();
1032 | 0 | if (kv.value_size() != 1) {
1033 | 0 | return STATUS_SUBSTITUTE(Corruption,
1034 | 0 | "SetRange kv should have 1 value, found $0", kv.value_size());
1035 | 0 | }
1036 | |
// `value` is a Result; RETURN_NOT_OK propagates a failed read before it is dereferenced.
1037 | 0 | auto value = GetValue(data);
1038 | 0 | RETURN_NOT_OK(value);
1039 |

1040 | 0 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1041 | 0 | VerifySuccessIfMissing::kTrue)) {
1042 | | // We've already set the error code in the response.
1043 | 0 | return Status::OK();
1044 | 0 | }
1045 | |
// When the offset lies beyond the current end, grow the string to the offset, filling the gap
// with zero bytes, so the replace() below starts inside the string.
1046 | 0 | size_t set_range_offset = request_.set_range_request().offset();
1047 | 0 | if (set_range_offset > value->value.length()) {
1048 | 0 | value->value.resize(set_range_offset, 0);
1049 | 0 | }
1050 | 0 | value->value.replace(set_range_offset, kv.value(0).length(), kv.value(0));
1051 | 0 | response_.set_code(RedisResponsePB::OK);
1052 | 0 | response_.set_int_response(value->value.length());
1053 | |
1054 | | // TODO: update the TTL with the write time rather than read time,
1055 | | // or store the expiration.
1056 | 0 | Value new_val = Value(PrimitiveValue(value->value),
1057 | 0 | VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read)));
1058 | 0 | return data.doc_write_batch->SetPrimitive(
1059 | 0 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), new_val, std::move(iterator_));
1060 | 0 | }
1061 | | |
// Adds incr_request().increment_int() to an integer stored as a string, either directly under
// the key (REDIS_TYPE_STRING) or under subkey 0 of a hash (REDIS_TYPE_HASH), and reports the
// new value as the int response.
//
// Any other kv.type() yields InvalidCommand. A missing value counts as 0. The following cases
// answer through response_ with an OK Status and write nothing:
//   * the container or the value fails VerifyTypeAndSetCode;
//   * the stored text is rejected by CheckedStoll;
//   * the addition would overflow int64_t.
// For strings the rewritten value is given the TTL computed by value->exp.ComputeRelativeTtl()
// at the iterator's read time, and iterator_ is moved into the SetPrimitive call; for hashes
// the subkey is written with ExtendSubDocument.
1062 | 0 | Status RedisWriteOperation::ApplyIncr(const DocOperationApplyData& data) {
1063 | 0 | const RedisKeyValuePB& kv = request_.key_value();
1064 | 0 | const int64_t incr = request_.incr_request().increment_int();
1065 |

1066 | 0 | if (kv.type() != REDIS_TYPE_HASH && kv.type() != REDIS_TYPE_STRING) {
1067 | 0 | return STATUS_SUBSTITUTE(InvalidCommand,
1068 | 0 | "Redis data type $0 not supported in Incr command", kv.type());
1069 | 0 | }
1070 | |
// First check: the type stored under the key must be compatible with the requested kv.type().
1071 | 0 | RedisDataType container_type = VERIFY_RESULT(GetValueType(data));
1072 | 0 | if (!VerifyTypeAndSetCode(kv.type(), container_type, &response_,
1073 | 0 | VerifySuccessIfMissing::kTrue)) {
1074 | | // We've already set the error code in the response.
1075 | 0 | return Status::OK();
1076 | 0 | }
1077 | |
// Second check: the value being incremented (hash subkey 0, or index -1 for a plain string)
// is verified against REDIS_TYPE_STRING.
1078 | 0 | int subkey = (kv.type() == REDIS_TYPE_HASH ? 0 : -1);
1079 | 0 | auto value = GetValue(data, subkey);
1080 | 0 | RETURN_NOT_OK(value);
1081 |

1082 | 0 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1083 | 0 | VerifySuccessIfMissing::kTrue)) {
// WRONG_TYPE and the message are set here explicitly, after the helper call above.
1084 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
1085 | 0 | response_.set_error_message(wrong_type_message);
1086 | 0 | return Status::OK();
1087 | 0 | }
1088 | |
1089 | | // If no value is present, 0 is the default.
1090 | 0 | int64_t old_value = 0, new_value;
1091 | 0 | if (value->type != REDIS_TYPE_NONE) {
1092 | 0 | auto old = CheckedStoll(value->value);
1093 | 0 | if (!old.ok()) {
1094 | | // This can happen if there are leading or trailing spaces, or the value
1095 | | // is out of range.
1096 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
1097 | 0 | response_.set_error_message("ERR value is not an integer or out of range");
1098 | 0 | return Status::OK();
1099 | 0 | }
1100 | 0 | old_value = *old;
1101 | 0 | }
1102 |

// Overflow is only possible when both operands share a sign; the bound is compared before the
// addition is performed.
1103 | 0 | if ((incr < 0 && old_value < 0 && incr < numeric_limits<int64_t>::min() - old_value) ||
1104 | 0 | (incr > 0 && old_value > 0 && incr > numeric_limits<int64_t>::max() - old_value)) {
1105 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
1106 | 0 | response_.set_error_message("Increment would overflow");
1107 | 0 | return Status::OK();
1108 | 0 | }
1109 | 0 | new_value = old_value + incr;
1110 | 0 | response_.set_code(RedisResponsePB::OK);
1111 | 0 | response_.set_int_response(new_value);
1112 |

// The result is stored back in decimal text form via std::to_string.
1113 | 0 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1114 | 0 | PrimitiveValue new_pvalue = PrimitiveValue(std::to_string(new_value));
1115 | 0 | if (kv.type() == REDIS_TYPE_HASH) {
1116 | 0 | SubDocument kv_entries = SubDocument();
1117 | 0 | PrimitiveValue subkey_value;
1118 | 0 | RETURN_NOT_OK(PrimitiveValueFromSubKeyStrict(kv.subkey(0), kv.type(), &subkey_value));
1119 | 0 | kv_entries.SetChild(subkey_value, SubDocument(new_pvalue));
1120 | 0 | return data.doc_write_batch->ExtendSubDocument(
1121 | 0 | doc_path, kv_entries, data.read_time, data.deadline, redis_query_id());
1122 | 0 | } else { // kv.type() == REDIS_TYPE_STRING
1123 | | // TODO: update the TTL with the write time rather than read time,
1124 | | // or store the expiration.
1125 | 0 | Value new_val = Value(new_pvalue,
1126 | 0 | VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read)));
1127 | 0 | return data.doc_write_batch->SetPrimitive(doc_path, new_val, std::move(iterator_));
1128 | 0 | }
1129 | 0 | }
1130 | | |
// Pushes every value in kv.value() onto the list stored under the key and reports the new
// cardinality as the int response.
// push_request().side() == REDIS_SIDE_LEFT selects ListExtendOrder::PREPEND, any other side
// ListExtendOrder::APPEND. A stored type other than REDIS_TYPE_LIST or REDIS_TYPE_NONE is
// answered with WRONG_TYPE and an OK Status. The list is inserted when the key does not exist
// and extended otherwise.
1131 | 0 | Status RedisWriteOperation::ApplyPush(const DocOperationApplyData& data) {
1132 | 0 | const RedisKeyValuePB& kv = request_.key_value();
// NOTE(review): this local is not referenced again in this function; the writes below rebuild
// the path with DocPathFromRedisKey.
1133 | 0 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1134 | 0 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
1135 | 0 | if (data_type != REDIS_TYPE_LIST && data_type != REDIS_TYPE_NONE) {
1136 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
1137 | 0 | response_.set_error_message(wrong_type_message);
1138 | 0 | return Status::OK();
1139 | 0 | }
1140 | |
// New cardinality = current cardinality + number of pushed values; stored as the counter child.
1141 | 0 | SubDocument list;
1142 | 0 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)) + kv.value_size();
1143 | 0 | list.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(PrimitiveValue(card)));
1144 |

1145 | 0 | SubDocument elements(request_.push_request().side() == REDIS_SIDE_LEFT ?
1146 | 0 | ListExtendOrder::PREPEND : ListExtendOrder::APPEND);
// Each element is taken by value here, i.e. copied once per iteration.
1147 | 0 | for (auto val : kv.value()) {
1148 | 0 | elements.AddListElement(SubDocument(PrimitiveValue(val)));
1149 | 0 | }
1150 | 0 | list.SetChild(PrimitiveValue(ValueType::kArray), std::move(elements));
1151 | 0 | RETURN_NOT_OK(list.ConvertToRedisList());
1152 |

1153 | 0 | if (data_type == REDIS_TYPE_NONE) {
1154 | 0 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
1155 | 0 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), list,
1156 | 0 | data.read_time, data.deadline, redis_query_id()));
1157 | 0 | } else {
1158 | 0 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1159 | 0 | DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), list,
1160 | 0 | data.read_time, data.deadline, redis_query_id()));
1161 | 0 | }
1162 |

1163 | 0 | response_.set_int_response(card);
1164 | 0 | response_.set_code(RedisResponsePB::OK);
1165 | 0 | return Status::OK();
1166 | 0 | }
1167 | | |
// Not implemented: always returns a NotSupported status and ignores `data`.
1168 | 0 | Status RedisWriteOperation::ApplyInsert(const DocOperationApplyData& data) {
1169 | 0 | return STATUS(NotSupported, "Redis operation has not been implemented");
1170 | 0 | }
1171 | | |
// Pops one element from the list stored under the key and returns it as the string response.
// pop_request().side() == REDIS_SIDE_LEFT removes index 1 walking Direction::kForward; any
// other side removes index `card` walking Direction::kBackward. The removed slot is replaced
// by a tombstone and the counter child is rewritten as card - 1.
// Responds NIL when the cardinality is 0. If VerifyTypeAndSetCode rejects the stored type, the
// function returns OK without writing. Returns Corruption when ReplaceRedisInList does not
// hand back exactly one value.
1172 | 0 | Status RedisWriteOperation::ApplyPop(const DocOperationApplyData& data) {
1173 | 0 | const RedisKeyValuePB& kv = request_.key_value();
1174 | 0 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1175 | 0 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
1176 |

1177 | 0 | if (!VerifyTypeAndSetCode(kv.type(), data_type, &response_, VerifySuccessIfMissing::kTrue)) {
1178 | | // We already set the error code in the function.
1179 | 0 | return Status::OK();
1180 | 0 | }
1181 | |
1182 | 0 | SubDocument list;
1183 | 0 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
1184 |

// Nothing to pop when the cardinality is 0.
1185 | 0 | if (!card) {
1186 | 0 | response_.set_code(RedisResponsePB::NIL);
1187 | 0 | return Status::OK();
1188 | 0 | }
1189 | |
// `new_value` holds the single tombstone that replaces the popped slot; `value` receives the
// popped element(s) from ReplaceRedisInList.
1190 | 0 | std::vector<int64_t> indices;
1191 | 0 | std::vector<SubDocument> new_value = {SubDocument(PrimitiveValue(ValueType::kTombstone))};
1192 | 0 | std::vector<std::string> value;
1193 |

// NOTE(review): the 0 and card + 1 arguments look like the start position for the forward and
// backward walk respectively - confirm against ReplaceRedisInList.
1194 | 0 | if (request_.pop_request().side() == REDIS_SIDE_LEFT) {
1195 | 0 | indices.push_back(1);
1196 | 0 | RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(doc_path, indices, new_value,
1197 | 0 | data.read_time, data.deadline, redis_query_id(), Direction::kForward, 0, &value));
1198 | 0 | } else {
1199 | 0 | indices.push_back(card);
1200 | 0 | RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(doc_path, indices, new_value,
1201 | 0 | data.read_time, data.deadline, redis_query_id(), Direction::kBackward, card + 1, &value));
1202 | 0 | }
1203 |

// Persist the decremented cardinality. Both writes are issued before the size check on
// `value` below.
1204 | 0 | list.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(PrimitiveValue(--card)));
1205 | 0 | RETURN_NOT_OK(list.ConvertToRedisList());
1206 | 0 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1207 | 0 | doc_path, list, data.read_time, data.deadline, redis_query_id()));
1208 |

1209 | 0 | if (value.size() != 1)
1210 | 0 | return STATUS_SUBSTITUTE(Corruption,
1211 | 0 | "Expected one popped value, got $0", value.size());
1212 | |
1213 | 0 | response_.set_string_response(value[0]);
1214 | 0 | response_.set_code(RedisResponsePB::OK);
1215 | 0 | return Status::OK();
1216 | 0 | }
1217 | | |
// Adds every subkey of the request as a member of the set stored under the key.
// A stored type other than REDIS_TYPE_SET or REDIS_TYPE_NONE is answered with WRONG_TYPE and
// an OK Status; a request without subkeys yields InvalidCommand. The set is inserted when the
// key does not exist and extended otherwise.
// When FLAGS_emulate_redis_responses is set, each subkey is read first and the int response is
// subkey_size() minus the number of subkeys that already had a type; otherwise no reads are
// issued and only the OK code is returned.
1218 | 0 | Status RedisWriteOperation::ApplyAdd(const DocOperationApplyData& data) {
1219 | 0 | const RedisKeyValuePB& kv = request_.key_value();
1220 | 0 | RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
1221 |

1222 | 0 | if (data_type != REDIS_TYPE_SET && data_type != REDIS_TYPE_NONE) {
1223 | 0 | response_.set_code(RedisResponsePB::WRONG_TYPE);
1224 | 0 | response_.set_error_message(wrong_type_message);
1225 | 0 | return Status::OK();
1226 | 0 | }
1227 | |
1228 | 0 | DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1229 |

1230 | 0 | if (kv.subkey_size() == 0) {
1231 | 0 | return STATUS(InvalidCommand, "SADD request has no subkeys set");
1232 | 0 | }
1233 | |
// Counts subkeys that already have a type; only maintained when the emulate flag is on.
1234 | 0 | int num_keys_found = 0;
1235 |

1236 | 0 | SubDocument set_entries = SubDocument();
1237 |

1238 | 0 | for (int i = 0 ; i < kv.subkey_size(); i++) { // We know that each subkey is distinct.
1239 | 0 | if (FLAGS_emulate_redis_responses) {
1240 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType(data, i));
1241 | 0 | if (type != REDIS_TYPE_NONE) {
1242 | 0 | num_keys_found++;
1243 | 0 | }
1244 | 0 | }
1245 |

// Each member is stored as a child keyed by the subkey string with a kNullLow value.
1246 | 0 | set_entries.SetChild(
1247 | 0 | PrimitiveValue(kv.subkey(i).string_subkey()),
1248 | 0 | SubDocument(PrimitiveValue(ValueType::kNullLow)));
1249 | 0 | }
1250 |

1251 | 0 | RETURN_NOT_OK(set_entries.ConvertToRedisSet());
1252 |

// NOTE(review): `s` is declared but never used in this function.
1253 | 0 | Status s;
1254 |

1255 | 0 | if (data_type == REDIS_TYPE_NONE) {
1256 | 0 | RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
1257 | 0 | doc_path, set_entries, data.read_time, data.deadline, redis_query_id()));
1258 | 0 | } else {
1259 | 0 | RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1260 | 0 | doc_path, set_entries, data.read_time, data.deadline, redis_query_id()));
1261 | 0 | }
1262 |

1263 | 0 | response_.set_code(RedisResponsePB::OK);
1264 | 0 | if (FLAGS_emulate_redis_responses) {
1265 | | // If flag is set, the actual number of new keys added is sent as response.
1266 | 0 | response_.set_int_response(kv.subkey_size() - num_keys_found);
1267 | 0 | }
1268 | 0 | return Status::OK();
1269 | 0 | }
1270 | | |
1271 | 0 | Status RedisWriteOperation::ApplyRemove(const DocOperationApplyData& data) { |
1272 | 0 | return STATUS(NotSupported, "Redis operation has not been implemented"); |
1273 | 0 | } |
1274 | | |
1275 | 42.9k | Status RedisReadOperation::Execute() { |
1276 | 42.9k | SimulateTimeoutIfTesting(&deadline_); |
1277 | | // If we have a KEYS command, we don't specify any key for the iterator. Therefore, don't use |
1278 | | // bloom filters for this command. |
1279 | 42.9k | SubDocKey doc_key( |
1280 | 42.9k | DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key())); |
1281 | 42.9k | auto bloom_filter_mode = request_.has_keys_request() ? |
1282 | 42.6k | BloomFilterMode::DONT_USE_BLOOM_FILTER : BloomFilterMode::USE_BLOOM_FILTER; |
1283 | 42.9k | auto iter = yb::docdb::CreateIntentAwareIterator( |
1284 | 42.9k | doc_db_, bloom_filter_mode, |
1285 | 42.9k | doc_key.Encode().AsSlice(), |
1286 | 42.9k | redis_query_id(), TransactionOperationContext(), deadline_, read_time_); |
1287 | 42.9k | iterator_ = std::move(iter); |
1288 | 42.9k | deadline_info_.emplace(deadline_); |
1289 | | |
1290 | 42.9k | switch (request_.request_case()) { |
1291 | 0 | case RedisReadRequestPB::kGetForRenameRequest: |
1292 | 0 | return ExecuteGetForRename(); |
1293 | 37.0k | case RedisReadRequestPB::kGetRequest: |
1294 | 37.0k | return ExecuteGet(); |
1295 | 132 | case RedisReadRequestPB::kGetTtlRequest: |
1296 | 132 | return ExecuteGetTtl(); |
1297 | 0 | case RedisReadRequestPB::kStrlenRequest: |
1298 | 0 | return ExecuteStrLen(); |
1299 | 0 | case RedisReadRequestPB::kExistsRequest: |
1300 | 0 | return ExecuteExists(); |
1301 | 0 | case RedisReadRequestPB::kGetRangeRequest: |
1302 | 0 | return ExecuteGetRange(); |
1303 | 5.47k | case RedisReadRequestPB::kGetCollectionRangeRequest: |
1304 | 5.47k | return ExecuteCollectionGetRange(); |
1305 | 297 | case RedisReadRequestPB::kKeysRequest: |
1306 | 297 | return ExecuteKeys(); |
1307 | 0 | default: |
1308 | 0 | return STATUS_FORMAT( |
1309 | 42.9k | Corruption, "Unsupported redis read operation: $0", request_.request_case()); |
1310 | 42.9k | } |
1311 | 42.9k | } |
1312 | | |
namespace {

// Maps a possibly-negative index onto the range [0, len].
// A negative index counts back from `len` and is floored at 0; a non-negative index is capped at
// `len`.
ssize_t ApplyIndex(ssize_t index, ssize_t len) {
  if (index >= 0) {
    return std::min<ssize_t>(index, len);
  }
  const ssize_t from_end = index + len;
  return std::max<ssize_t>(from_end, 0);
}

}  // namespace
1324 | | |
1325 | | Status RedisReadOperation::ExecuteHGetAllLikeCommands(ValueType value_type, |
1326 | | bool add_keys, |
1327 | 18 | bool add_values) { |
1328 | 18 | SubDocKey doc_key( |
1329 | 18 | DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key())); |
1330 | 18 | SubDocument doc; |
1331 | 18 | bool doc_found = false; |
1332 | 18 | auto encoded_doc_key = doc_key.EncodeWithoutHt(); |
1333 | | |
1334 | | // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions |
1335 | | // support for Redis. |
1336 | 18 | GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found }; |
1337 | 18 | data.deadline_info = deadline_info_.get_ptr(); |
1338 | | |
1339 | 18 | bool has_cardinality_subkey = value_type == ValueType::kRedisSortedSet || |
1340 | 18 | value_type == ValueType::kRedisList; |
1341 | 18 | bool return_array_response = add_keys || add_values; |
1342 | | |
1343 | 18 | if (has_cardinality_subkey) { |
1344 | 0 | data.return_type_only = !return_array_response; |
1345 | 18 | } else { |
1346 | 18 | data.count_only = !return_array_response; |
1347 | 18 | } |
1348 | | |
1349 | 18 | RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr, |
1350 | 18 | SeekFwdSuffices::kFalse)); |
1351 | 18 | if (return_array_response) |
1352 | 0 | response_.set_allocated_array_response(new RedisArrayPB()); |
1353 | | |
1354 | 18 | if (!doc_found) { |
1355 | 3 | response_.set_code(RedisResponsePB::OK); |
1356 | 3 | if (!return_array_response) |
1357 | 3 | response_.set_int_response(0); |
1358 | 3 | return Status::OK(); |
1359 | 3 | } |
1360 | | |
1361 | 15 | if (VerifyTypeAndSetCode(value_type, doc.value_type(), &response_)) { |
1362 | 12 | if (return_array_response) { |
1363 | 0 | RETURN_NOT_OK(PopulateResponseFrom(doc.object_container(), AddResponseValuesGeneric, |
1364 | 0 | &response_, add_keys, add_values)); |
1365 | 12 | } else { |
1366 | 12 | int64_t card = has_cardinality_subkey ? |
1367 | 12 | VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value())) : |
1368 | 12 | data.record_count; |
1369 | 12 | response_.set_int_response(card); |
1370 | 12 | response_.set_code(RedisResponsePB::OK); |
1371 | 12 | } |
1372 | 12 | } |
1373 | 15 | return Status::OK(); |
1374 | 15 | } |
1375 | | |
1376 | | Status RedisReadOperation::ExecuteCollectionGetRangeByBounds( |
1377 | 0 | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, bool add_keys) { |
1378 | 0 | RedisSubKeyBoundPB lower_bound; |
1379 | 0 | lower_bound.set_infinity_type(RedisSubKeyBoundPB::NEGATIVE); |
1380 | 0 | RedisSubKeyBoundPB upper_bound; |
1381 | 0 | upper_bound.set_infinity_type(RedisSubKeyBoundPB::POSITIVE); |
1382 | 0 | return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys); |
1383 | 0 | } |
1384 | | |
1385 | | Status RedisReadOperation::ExecuteCollectionGetRangeByBounds( |
1386 | | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, |
1387 | 5.47k | const RedisSubKeyBoundPB& lower_bound, const RedisSubKeyBoundPB& upper_bound, bool add_keys) { |
1388 | 5.47k | if ((lower_bound.has_infinity_type() && |
1389 | 57 | lower_bound.infinity_type() == RedisSubKeyBoundPB::POSITIVE) || |
1390 | 5.47k | (upper_bound.has_infinity_type() && |
1391 | 57 | upper_bound.infinity_type() == RedisSubKeyBoundPB::NEGATIVE)) { |
1392 | | // Return empty response. |
1393 | 0 | response_.set_code(RedisResponsePB::OK); |
1394 | 0 | RETURN_NOT_OK(PopulateResponseFrom( |
1395 | 0 | SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_, /* add_keys */ true, |
1396 | 0 | /* add_values */ true)); |
1397 | 0 | return Status::OK(); |
1398 | 5.47k | } |
1399 | | |
1400 | 5.47k | if (request_type == RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE) { |
1401 | 42 | auto type = VERIFY_RESULT(GetValueType()); |
1402 | 42 | auto expected_type = REDIS_TYPE_SORTEDSET; |
1403 | 42 | if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1404 | 3 | return Status::OK(); |
1405 | 3 | } |
1406 | 39 | auto encoded_doc_key = |
1407 | 39 | DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key()); |
1408 | 39 | PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key); |
1409 | 39 | double low_double = lower_bound.subkey_bound().double_subkey(); |
1410 | 39 | double high_double = upper_bound.subkey_bound().double_subkey(); |
1411 | | |
1412 | 39 | KeyBytes low_sub_key_bound; |
1413 | 39 | KeyBytes high_sub_key_bound; |
1414 | | |
1415 | 39 | SliceKeyBound low_subkey; |
1416 | 39 | if (!lower_bound.has_infinity_type()) { |
1417 | 6 | low_sub_key_bound = encoded_doc_key; |
1418 | 6 | PrimitiveValue::Double(low_double).AppendToKey(&low_sub_key_bound); |
1419 | 6 | low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(lower_bound.is_exclusive())); |
1420 | 6 | } |
1421 | 39 | SliceKeyBound high_subkey; |
1422 | 39 | if (!upper_bound.has_infinity_type()) { |
1423 | 6 | high_sub_key_bound = encoded_doc_key; |
1424 | 6 | PrimitiveValue::Double(high_double).AppendToKey(&high_sub_key_bound); |
1425 | 6 | high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(upper_bound.is_exclusive())); |
1426 | 6 | } |
1427 | | |
1428 | 39 | SubDocument doc; |
1429 | 39 | bool doc_found = false; |
1430 | 39 | GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found}; |
1431 | 39 | data.deadline_info = deadline_info_.get_ptr(); |
1432 | 39 | data.low_subkey = &low_subkey; |
1433 | 39 | data.high_subkey = &high_subkey; |
1434 | | |
1435 | 39 | IndexBound low_index; |
1436 | 39 | IndexBound high_index; |
1437 | 39 | if (request_.has_range_request_limit()) { |
1438 | 21 | auto offset = request_.index_range().lower_bound().index(); |
1439 | 21 | int32_t limit = request_.range_request_limit(); |
1440 | | |
1441 | 21 | if (offset < 0 || limit == 0) { |
1442 | | // Return an empty response. |
1443 | 9 | response_.set_code(RedisResponsePB::OK); |
1444 | 9 | RETURN_NOT_OK(PopulateResponseFrom( |
1445 | 9 | SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_, |
1446 | 9 | /* add_keys */ true, /* add_values */ true)); |
1447 | 9 | return Status::OK(); |
1448 | 12 | } |
1449 | | |
1450 | 12 | low_index = IndexBound(offset, true /* is_lower */); |
1451 | 12 | data.low_index = &low_index; |
1452 | 12 | if (limit > 0) { |
1453 | | // Only define upper bound if limit is positive. |
1454 | 9 | high_index = IndexBound(offset + limit - 1, false /* is_lower */); |
1455 | 9 | data.high_index = &high_index; |
1456 | 9 | } |
1457 | 12 | } |
1458 | 30 | RETURN_NOT_OK(GetAndPopulateResponseValues( |
1459 | 30 | iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_, |
1460 | 30 | &response_, |
1461 | 30 | /* add_keys */ add_keys, /* add_values */ true, /* reverse */ false)); |
1462 | 5.43k | } else { |
1463 | 5.43k | auto encoded_doc_key = |
1464 | 5.43k | DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key()); |
1465 | 5.43k | int64_t low_timestamp = lower_bound.subkey_bound().timestamp_subkey(); |
1466 | 5.43k | int64_t high_timestamp = upper_bound.subkey_bound().timestamp_subkey(); |
1467 | | |
1468 | 5.43k | KeyBytes low_sub_key_bound; |
1469 | 5.43k | KeyBytes high_sub_key_bound; |
1470 | | |
1471 | 5.43k | SliceKeyBound low_subkey; |
1472 | | // Need to switch the order since we store the timestamps in descending order. |
1473 | 5.43k | if (!upper_bound.has_infinity_type()) { |
1474 | 5.41k | low_sub_key_bound = encoded_doc_key; |
1475 | 5.41k | PrimitiveValue(high_timestamp, SortOrder::kDescending).AppendToKey(&low_sub_key_bound); |
1476 | 5.41k | low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(upper_bound.is_exclusive())); |
1477 | 5.41k | } |
1478 | 5.43k | SliceKeyBound high_subkey; |
1479 | 5.43k | if (!lower_bound.has_infinity_type()) { |
1480 | 5.41k | high_sub_key_bound = encoded_doc_key; |
1481 | 5.41k | PrimitiveValue(low_timestamp, SortOrder::kDescending).AppendToKey(&high_sub_key_bound); |
1482 | 5.41k | high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(lower_bound.is_exclusive())); |
1483 | 5.41k | } |
1484 | | |
1485 | 5.43k | SubDocument doc; |
1486 | 5.43k | bool doc_found = false; |
1487 | 5.43k | GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found}; |
1488 | 5.43k | data.deadline_info = deadline_info_.get_ptr(); |
1489 | 5.43k | data.low_subkey = &low_subkey; |
1490 | 5.43k | data.high_subkey = &high_subkey; |
1491 | 5.43k | data.limit = request_.range_request_limit(); |
1492 | 5.43k | bool is_reverse = true; |
1493 | 5.43k | if (request_type == RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME) { |
1494 | | // If reverse is false, newest element is the first element returned. |
1495 | 5.40k | is_reverse = false; |
1496 | 5.40k | } |
1497 | 5.43k | RETURN_NOT_OK(GetAndPopulateResponseValues( |
1498 | 5.43k | iterator_.get(), AddResponseValuesGeneric, data, ValueType::kRedisTS, request_, &response_, |
1499 | 5.43k | /* add_keys */ true, /* add_values */ true, is_reverse)); |
1500 | 5.43k | } |
1501 | 5.46k | return Status::OK(); |
1502 | 5.47k | } |
1503 | | |
1504 | 5.47k | Status RedisReadOperation::ExecuteCollectionGetRange() { |
1505 | 5.47k | const RedisKeyValuePB& key_value = request_.key_value(); |
1506 | 5.47k | if (!request_.has_key_value() || !key_value.has_key()) { |
1507 | 0 | return STATUS(InvalidArgument, "Need to specify the key"); |
1508 | 0 | } |
1509 | | |
1510 | 5.47k | const auto request_type = request_.get_collection_range_request().request_type(); |
1511 | 5.47k | switch (request_type) { |
1512 | 5.40k | case RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME: |
1513 | 5.40k | FALLTHROUGH_INTENDED; |
1514 | 5.44k | case RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE: FALLTHROUGH_INTENDED; |
1515 | 5.47k | case RedisCollectionGetRangeRequestPB::TSRANGEBYTIME: { |
1516 | 5.47k | if(!request_.has_subkey_range() || !request_.subkey_range().has_lower_bound() || |
1517 | 5.47k | !request_.subkey_range().has_upper_bound()) { |
1518 | 0 | return STATUS(InvalidArgument, "Need to specify the subkey range"); |
1519 | 0 | } |
1520 | 5.47k | const RedisSubKeyBoundPB& lower_bound = request_.subkey_range().lower_bound(); |
1521 | 5.47k | const RedisSubKeyBoundPB& upper_bound = request_.subkey_range().upper_bound(); |
1522 | 5.47k | const bool add_keys = request_.get_collection_range_request().with_scores(); |
1523 | 5.47k | return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys); |
1524 | 5.47k | } |
1525 | 0 | case RedisCollectionGetRangeRequestPB::ZRANGE: FALLTHROUGH_INTENDED; |
1526 | 0 | case RedisCollectionGetRangeRequestPB::ZREVRANGE: { |
1527 | 0 | if(!request_.has_index_range() || !request_.index_range().has_lower_bound() || |
1528 | 0 | !request_.index_range().has_upper_bound()) { |
1529 | 0 | return STATUS(InvalidArgument, "Need to specify the index range"); |
1530 | 0 | } |
1531 | | |
1532 | | // First make sure is of type sorted set or none. |
1533 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1534 | 0 | auto expected_type = RedisDataType::REDIS_TYPE_SORTEDSET; |
1535 | 0 | if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1536 | 0 | return Status::OK(); |
1537 | 0 | } |
1538 | | |
1539 | 0 | int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value())); |
1540 | |
|
1541 | 0 | const RedisIndexBoundPB& low_index_bound = request_.index_range().lower_bound(); |
1542 | 0 | const RedisIndexBoundPB& high_index_bound = request_.index_range().upper_bound(); |
1543 | |
|
1544 | 0 | int64 low_idx_normalized, high_idx_normalized; |
1545 | |
|
1546 | 0 | int64 low_idx = low_index_bound.index(); |
1547 | 0 | int64 high_idx = high_index_bound.index(); |
1548 | | // Normalize the bounds to be positive and go from low to high index. |
1549 | 0 | bool reverse = false; |
1550 | 0 | if (request_type == RedisCollectionGetRangeRequestPB::ZREVRANGE) { |
1551 | 0 | reverse = true; |
1552 | 0 | } |
1553 | 0 | GetNormalizedBounds( |
1554 | 0 | low_idx, high_idx, card, reverse, &low_idx_normalized, &high_idx_normalized); |
1555 | |
|
1556 | 0 | if (high_idx_normalized < low_idx_normalized) { |
1557 | | // Return empty response. |
1558 | 0 | response_.set_code(RedisResponsePB::OK); |
1559 | 0 | RETURN_NOT_OK(PopulateResponseFrom(SubDocument::ObjectContainer(), |
1560 | 0 | AddResponseValuesGeneric, |
1561 | 0 | &response_, /* add_keys */ |
1562 | 0 | true, /* add_values */ |
1563 | 0 | true)); |
1564 | 0 | return Status::OK(); |
1565 | 0 | } |
1566 | 0 | auto encoded_doc_key = DocKey::EncodedFromRedisKey( |
1567 | 0 | request_.key_value().hash_code(), request_.key_value().key()); |
1568 | 0 | PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key); |
1569 | |
|
1570 | 0 | bool add_keys = request_.get_collection_range_request().with_scores(); |
1571 | |
|
1572 | 0 | IndexBound low_bound = IndexBound(low_idx_normalized, true /* is_lower */); |
1573 | 0 | IndexBound high_bound = IndexBound(high_idx_normalized, false /* is_lower */); |
1574 | |
|
1575 | 0 | SubDocument doc; |
1576 | 0 | bool doc_found = false; |
1577 | 0 | GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found}; |
1578 | 0 | data.deadline_info = deadline_info_.get_ptr(); |
1579 | 0 | data.low_index = &low_bound; |
1580 | 0 | data.high_index = &high_bound; |
1581 | |
|
1582 | 0 | RETURN_NOT_OK(GetAndPopulateResponseValues( |
1583 | 0 | iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_, |
1584 | 0 | &response_, add_keys, /* add_values */ true, reverse)); |
1585 | 0 | break; |
1586 | 0 | } |
1587 | 0 | case RedisCollectionGetRangeRequestPB::UNKNOWN: |
1588 | 0 | return STATUS(InvalidCommand, "Unknown Collection Get Range Request not supported"); |
1589 | 0 | } |
1590 | | |
1591 | 0 | return Status::OK(); |
1592 | 0 | } |
1593 | | |
1594 | 37.0k | Result<RedisDataType> RedisReadOperation::GetValueType(int subkey_index) { |
1595 | 37.0k | return GetRedisValueType(iterator_.get(), request_.key_value(), |
1596 | 37.0k | nullptr /* doc_write_batch */, subkey_index); |
1597 | 37.0k | } |
1598 | | |
1599 | 609 | Result<RedisValue> RedisReadOperation::GetOverrideValue(int subkey_index) { |
1600 | 609 | return GetRedisValue(iterator_.get(), request_.key_value(), |
1601 | 609 | subkey_index, /* always_override */ true); |
1602 | 609 | } |
1603 | | |
1604 | 36.3k | Result<RedisValue> RedisReadOperation::GetValue(int subkey_index) { |
1605 | 36.3k | return GetRedisValue(iterator_.get(), request_.key_value(), subkey_index); |
1606 | 36.3k | } |
1607 | | |
1608 | | namespace { |
1609 | | |
1610 | | // Note: Do not use if also retrieving other value, as some work will be repeated. |
1611 | | // Assumes every value has a TTL, and the TTL is stored in the row with this key. |
1612 | | // Also observe that tombstone checking only works because we assume the key has |
1613 | | // no ancestors. |
1614 | | Result<boost::optional<Expiration>> GetTtl( |
1615 | 132 | const Slice& encoded_subdoc_key, IntentAwareIterator* iter) { |
1616 | 132 | auto dockey_size = |
1617 | 132 | VERIFY_RESULT(DocKey::EncodedSize(encoded_subdoc_key, DocKeyPart::kWholeDocKey)); |
1618 | 132 | Slice key_slice(encoded_subdoc_key.data(), dockey_size); |
1619 | 132 | iter->Seek(key_slice); |
1620 | 132 | if (!iter->valid()) |
1621 | 6 | return boost::none; |
1622 | 126 | auto key_data = VERIFY_RESULT(iter->FetchKey()); |
1623 | 126 | if (!key_data.key.compare(key_slice)) { |
1624 | 126 | Value doc_value = Value(PrimitiveValue(ValueType::kInvalid)); |
1625 | 126 | RETURN_NOT_OK(doc_value.Decode(iter->value())); |
1626 | 126 | if (doc_value.value_type() != ValueType::kTombstone) { |
1627 | 108 | return Expiration(key_data.write_time.hybrid_time(), doc_value.ttl()); |
1628 | 108 | } |
1629 | 18 | } |
1630 | 18 | return boost::none; |
1631 | 18 | } |
1632 | | |
1633 | | } // namespace |
1634 | | |
1635 | 132 | Status RedisReadOperation::ExecuteGetTtl() { |
1636 | 132 | const RedisKeyValuePB& kv = request_.key_value(); |
1637 | 132 | if (!kv.has_key()) { |
1638 | 0 | return STATUS(Corruption, "Expected KeyValuePB"); |
1639 | 0 | } |
1640 | | // We currently only support getting and setting TTL on top level keys. |
1641 | 132 | if (!kv.subkey().empty()) { |
1642 | 0 | return STATUS_SUBSTITUTE(Corruption, |
1643 | 0 | "Expected no subkeys, got $0", kv.subkey().size()); |
1644 | 0 | } |
1645 | | |
1646 | 132 | auto encoded_doc_key = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key()); |
1647 | 132 | auto maybe_ttl_exp = VERIFY_RESULT(GetTtl(encoded_doc_key.AsSlice(), iterator_.get())); |
1648 | | |
1649 | 132 | if (!maybe_ttl_exp.has_value()) { |
1650 | 24 | response_.set_int_response(-2); |
1651 | 24 | return Status::OK(); |
1652 | 24 | } |
1653 | | |
1654 | 108 | auto exp = maybe_ttl_exp.get(); |
1655 | 108 | if (exp.ttl.Equals(Value::kMaxTtl)) { |
1656 | 24 | response_.set_int_response(-1); |
1657 | 24 | return Status::OK(); |
1658 | 24 | } |
1659 | | |
1660 | 84 | MonoDelta ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read)); |
1661 | 84 | if (ttl.IsNegative()) { |
1662 | | // The value has expired. |
1663 | 24 | response_.set_int_response(-2); |
1664 | 24 | return Status::OK(); |
1665 | 24 | } |
1666 | | |
1667 | 60 | response_.set_int_response(request_.get_ttl_request().return_seconds() ? |
1668 | 30 | (int64_t) std::round(ttl.ToSeconds()) : |
1669 | 30 | ttl.ToMilliseconds()); |
1670 | 60 | return Status::OK(); |
1671 | 60 | } |
1672 | | |
1673 | 0 | Status RedisReadOperation::ExecuteGetForRename() { |
1674 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1675 | 0 | response_.set_type(type); |
1676 | 0 | switch (type) { |
1677 | 0 | case RedisDataType::REDIS_TYPE_STRING: { |
1678 | 0 | return ExecuteGet(RedisGetRequestPB::GET); |
1679 | 0 | } |
1680 | | |
1681 | 0 | case RedisDataType::REDIS_TYPE_HASH: { |
1682 | 0 | return ExecuteGet(RedisGetRequestPB::HGETALL); |
1683 | 0 | } |
1684 | | |
1685 | 0 | case RedisDataType::REDIS_TYPE_SET: { |
1686 | 0 | return ExecuteGet(RedisGetRequestPB::SMEMBERS); |
1687 | 0 | } |
1688 | | |
1689 | 0 | case RedisDataType::REDIS_TYPE_SORTEDSET: { |
1690 | 0 | return ExecuteCollectionGetRangeByBounds( |
1691 | 0 | RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE, true); |
1692 | 0 | } |
1693 | | |
1694 | 0 | case RedisDataType::REDIS_TYPE_TIMESERIES: { |
1695 | 0 | return ExecuteCollectionGetRangeByBounds( |
1696 | 0 | RedisCollectionGetRangeRequestPB::TSRANGEBYTIME, true); |
1697 | 0 | } |
1698 | | |
1699 | 0 | case RedisDataType::REDIS_TYPE_NONE: { |
1700 | 0 | response_.set_code(RedisResponsePB::NOT_FOUND); |
1701 | 0 | return Status::OK(); |
1702 | 0 | } |
1703 | | |
1704 | 0 | case RedisDataType::REDIS_TYPE_LIST: |
1705 | 0 | default: { |
1706 | 0 | LOG(DFATAL) << "Unhandled Redis Data Type " << type; |
1707 | 0 | } |
1708 | 0 | } |
1709 | 0 | return Status::OK(); |
1710 | 0 | } |
1711 | | |
1712 | 0 | Status RedisReadOperation::ExecuteGet(RedisGetRequestPB::GetRequestType type) { |
1713 | 0 | RedisGetRequestPB request; |
1714 | 0 | request.set_request_type(type); |
1715 | 0 | return ExecuteGet(request); |
1716 | 0 | } |
1717 | | |
1718 | 37.0k | Status RedisReadOperation::ExecuteGet() { return ExecuteGet(request_.get_request()); } |
1719 | | |
1720 | 37.0k | Status RedisReadOperation::ExecuteGet(const RedisGetRequestPB& get_request) { |
1721 | 37.0k | auto request_type = get_request.request_type(); |
1722 | 37.0k | RedisDataType expected_type = REDIS_TYPE_NONE; |
1723 | 37.0k | switch (request_type) { |
1724 | 36.3k | case RedisGetRequestPB::GET: |
1725 | 36.3k | expected_type = REDIS_TYPE_STRING; break; |
1726 | 609 | case RedisGetRequestPB::TSGET: |
1727 | 609 | expected_type = REDIS_TYPE_TIMESERIES; break; |
1728 | 0 | case RedisGetRequestPB::HGET: FALLTHROUGH_INTENDED; |
1729 | 0 | case RedisGetRequestPB::HEXISTS: |
1730 | 0 | expected_type = REDIS_TYPE_HASH; break; |
1731 | 0 | case RedisGetRequestPB::SISMEMBER: |
1732 | 0 | expected_type = REDIS_TYPE_SET; break; |
1733 | 21 | case RedisGetRequestPB::ZSCORE: |
1734 | 21 | expected_type = REDIS_TYPE_SORTEDSET; break; |
1735 | 18 | default: |
1736 | 18 | expected_type = REDIS_TYPE_NONE; |
1737 | 37.0k | } |
1738 | 37.0k | switch (request_type) { |
1739 | 36.3k | case RedisGetRequestPB::GET: FALLTHROUGH_INTENDED; |
1740 | 36.9k | case RedisGetRequestPB::TSGET: FALLTHROUGH_INTENDED; |
1741 | 36.9k | case RedisGetRequestPB::HGET: { |
1742 | 36.9k | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1743 | | // TODO: this is primarily glue for the Timeseries bug where the parent |
1744 | | // may get compacted due to an outdated TTL even though the children |
1745 | | // have longer TTL's and thus still exist. When fixing, take note that |
1746 | | // GetValueType finds the value type of the parent, so if the parent |
1747 | | // does not have the maximum TTL, it will return REDIS_TYPE_NONE when it |
1748 | | // should not. |
1749 | 36.9k | if (expected_type == REDIS_TYPE_TIMESERIES && type == REDIS_TYPE_NONE) { |
1750 | 0 | type = expected_type; |
1751 | 0 | } |
1752 | | // If wrong type, we set the error code in the response. |
1753 | 36.9k | if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1754 | 36.3k | auto value = request_type == RedisGetRequestPB::TSGET ? GetOverrideValue() : GetValue(); |
1755 | 36.9k | RETURN_NOT_OK(value); |
1756 | 36.9k | if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1757 | 36.9k | VerifySuccessIfMissing::kTrue)) { |
1758 | 36.9k | response_.set_string_response(value->value); |
1759 | 36.9k | } |
1760 | 36.9k | } |
1761 | 36.9k | return Status::OK(); |
1762 | 36.9k | } |
1763 | 21 | case RedisGetRequestPB::ZSCORE: { |
1764 | 21 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1765 | | // If wrong type, we set the error code in the response. |
1766 | 21 | if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1767 | 3 | return Status::OK(); |
1768 | 3 | } |
1769 | 18 | SubDocKey key_reverse = SubDocKey( |
1770 | 18 | DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key()), |
1771 | 18 | PrimitiveValue(ValueType::kSSReverse), |
1772 | 18 | PrimitiveValue(request_.key_value().subkey(0).string_subkey())); |
1773 | 18 | SubDocument subdoc_reverse; |
1774 | 18 | bool subdoc_reverse_found = false; |
1775 | 18 | auto encoded_key_reverse = key_reverse.EncodeWithoutHt(); |
1776 | 18 | GetRedisSubDocumentData get_data = { |
1777 | 18 | encoded_key_reverse, &subdoc_reverse, &subdoc_reverse_found }; |
1778 | 18 | RETURN_NOT_OK(GetRedisSubDocument(doc_db_, get_data, redis_query_id(), |
1779 | 18 | TransactionOperationContext(), deadline_, read_time_)); |
1780 | 18 | if (subdoc_reverse_found) { |
1781 | 12 | double score = subdoc_reverse.GetDouble(); |
1782 | 12 | response_.set_string_response(std::to_string(score)); |
1783 | 6 | } else { |
1784 | 6 | response_.set_code(RedisResponsePB::NIL); |
1785 | 6 | } |
1786 | 18 | return Status::OK(); |
1787 | 18 | } |
1788 | 0 | case RedisGetRequestPB::HEXISTS: FALLTHROUGH_INTENDED; |
1789 | 0 | case RedisGetRequestPB::SISMEMBER: { |
1790 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1791 | 0 | if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) { |
1792 | 0 | RedisDataType subtype = VERIFY_RESULT(GetValueType(0)); |
1793 | 0 | SetOptionalInt(subtype, 1, &response_); |
1794 | 0 | response_.set_code(RedisResponsePB::OK); |
1795 | 0 | } |
1796 | 0 | return Status::OK(); |
1797 | 0 | } |
1798 | 0 | case RedisGetRequestPB::HSTRLEN: { |
1799 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1800 | 0 | if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_, |
1801 | 0 | VerifySuccessIfMissing::kTrue)) { |
1802 | 0 | auto value = GetValue(); |
1803 | 0 | RETURN_NOT_OK(value); |
1804 | 0 | SetOptionalInt(value->type, value->value.length(), &response_); |
1805 | 0 | response_.set_code(RedisResponsePB::OK); |
1806 | 0 | } |
1807 | 0 | return Status::OK(); |
1808 | 0 | } |
1809 | 0 | case RedisGetRequestPB::MGET: { |
1810 | 0 | return STATUS(NotSupported, "MGET not yet supported"); |
1811 | 0 | } |
1812 | 0 | case RedisGetRequestPB::HMGET: { |
1813 | 0 | RedisDataType type = VERIFY_RESULT(GetValueType()); |
1814 | 0 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_, |
1815 | 0 | VerifySuccessIfMissing::kTrue)) { |
1816 | 0 | return Status::OK(); |
1817 | 0 | } |
1818 | | |
1819 | 0 | response_.set_allocated_array_response(new RedisArrayPB()); |
1820 | 0 | const auto& req_kv = request_.key_value(); |
1821 | 0 | auto num_subkeys = req_kv.subkey_size(); |
1822 | 0 | vector<int> indices(num_subkeys); |
1823 | 0 | for (int i = 0; i < num_subkeys; ++i) { |
1824 | 0 | indices[i] = i; |
1825 | 0 | } |
1826 | 0 | std::sort(indices.begin(), indices.end(), [&req_kv](int i, int j) { |
1827 | 0 | return req_kv.subkey(i).string_subkey() < req_kv.subkey(j).string_subkey(); |
1828 | 0 | }); |
1829 | |
|
1830 | 0 | string current_value = ""; |
1831 | 0 | response_.mutable_array_response()->mutable_elements()->Reserve(num_subkeys); |
1832 | 0 | for (int i = 0; i < num_subkeys; ++i) { |
1833 | 0 | response_.mutable_array_response()->add_elements(); |
1834 | 0 | } |
1835 | 0 | for (int i = 0; i < num_subkeys; ++i) { |
1836 | 0 | if (i == 0 || |
1837 | 0 | req_kv.subkey(indices[i]).string_subkey() != |
1838 | 0 | req_kv.subkey(indices[i - 1]).string_subkey()) { |
1839 | | // If the condition above is false, we encountered the same key again, no need to call |
1840 | | // GetValue() once more, current_value is already correct. |
1841 | 0 | auto value = GetValue(indices[i]); |
1842 | 0 | RETURN_NOT_OK(value); |
1843 | 0 | if (value->type == REDIS_TYPE_STRING) { |
1844 | 0 | current_value = std::move(value->value); |
1845 | 0 | } else { |
1846 | 0 | current_value = ""; // Empty string is nil response. |
1847 | 0 | } |
1848 | 0 | } |
1849 | 0 | *response_.mutable_array_response()->mutable_elements(indices[i]) = current_value; |
1850 | 0 | } |
1851 | |
|
1852 | 0 | response_.set_code(RedisResponsePB::OK); |
1853 | 0 | return Status::OK(); |
1854 | 0 | } |
1855 | 0 | case RedisGetRequestPB::HGETALL: |
1856 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kObject, true, true); |
1857 | 0 | case RedisGetRequestPB::HKEYS: |
1858 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kObject, true, false); |
1859 | 0 | case RedisGetRequestPB::HVALS: |
1860 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kObject, false, true); |
1861 | 0 | case RedisGetRequestPB::HLEN: |
1862 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kObject, false, false); |
1863 | 0 | case RedisGetRequestPB::SMEMBERS: |
1864 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, true, false); |
1865 | 0 | case RedisGetRequestPB::SCARD: |
1866 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, false, false); |
1867 | 18 | case RedisGetRequestPB::TSCARD: |
1868 | 18 | return ExecuteHGetAllLikeCommands(ValueType::kRedisTS, false, false); |
1869 | 0 | case RedisGetRequestPB::ZCARD: |
1870 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kRedisSortedSet, false, false); |
1871 | 0 | case RedisGetRequestPB::LLEN: |
1872 | 0 | return ExecuteHGetAllLikeCommands(ValueType::kRedisList, false, false); |
1873 | 0 | case RedisGetRequestPB::UNKNOWN: { |
1874 | 0 | return STATUS(InvalidCommand, "Unknown Get Request not supported"); |
1875 | 0 | } |
1876 | 0 | } |
1877 | 0 | return Status::OK(); |
1878 | 0 | } |
1879 | | |
1880 | 0 | Status RedisReadOperation::ExecuteStrLen() { |
1881 | 0 | auto value = GetValue(); |
1882 | 0 | response_.set_code(RedisResponsePB::OK); |
1883 | 0 | RETURN_NOT_OK(value); |
1884 | |
|
1885 | 0 | if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1886 | 0 | VerifySuccessIfMissing::kTrue)) { |
1887 | 0 | SetOptionalInt(value->type, value->value.length(), &response_); |
1888 | 0 | } |
1889 | 0 | response_.set_code(RedisResponsePB::OK); |
1890 | |
|
1891 | 0 | return Status::OK(); |
1892 | 0 | } |
1893 | | |
1894 | 0 | Status RedisReadOperation::ExecuteExists() { |
1895 | 0 | auto value = GetValue(); |
1896 | 0 | response_.set_code(RedisResponsePB::OK); |
1897 | 0 | RETURN_NOT_OK(value); |
1898 | | |
1899 | | // We only support exist command with one argument currently. |
1900 | 0 | SetOptionalInt(value->type, 1, &response_); |
1901 | 0 | response_.set_code(RedisResponsePB::OK); |
1902 | |
|
1903 | 0 | return Status::OK(); |
1904 | 0 | } |
1905 | | |
1906 | 0 | Status RedisReadOperation::ExecuteGetRange() { |
1907 | 0 | auto value = GetValue(); |
1908 | 0 | RETURN_NOT_OK(value); |
1909 | |
|
1910 | 0 | if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_, |
1911 | 0 | VerifySuccessIfMissing::kTrue)) { |
1912 | | // We've already set the error code in the response. |
1913 | 0 | return Status::OK(); |
1914 | 0 | } |
1915 | | |
1916 | 0 | const ssize_t len = value->value.length(); |
1917 | 0 | ssize_t exclusive_end = request_.get_range_request().end() + 1; |
1918 | 0 | if (exclusive_end == 0) { |
1919 | 0 | exclusive_end = len; |
1920 | 0 | } |
1921 | | |
1922 | | // We treat negative indices to refer backwards from the end of the string. |
1923 | 0 | const auto start = ApplyIndex(request_.get_range_request().start(), len); |
1924 | 0 | auto end = ApplyIndex(exclusive_end, len); |
1925 | 0 | if (end < start) { |
1926 | 0 | end = start; |
1927 | 0 | } |
1928 | |
|
1929 | 0 | response_.set_code(RedisResponsePB::OK); |
1930 | 0 | response_.set_string_response(value->value.c_str() + start, end - start); |
1931 | 0 | return Status::OK(); |
1932 | 0 | } |
1933 | | |
1934 | 297 | Status RedisReadOperation::ExecuteKeys() { |
1935 | 297 | iterator_->Seek(DocKey()); |
1936 | 297 | int threshold = request_.keys_request().threshold(); |
1937 | | |
1938 | 297 | bool doc_found; |
1939 | 297 | SubDocument result; |
1940 | | |
1941 | 240k | while (iterator_->valid()) { |
1942 | 240k | if (deadline_info_.get_ptr() && deadline_info_->CheckAndSetDeadlinePassed()) { |
1943 | 0 | return STATUS(Expired, "Deadline for query passed."); |
1944 | 0 | } |
1945 | 240k | auto key = VERIFY_RESULT(iterator_->FetchKey()).key; |
1946 | | |
1947 | | // Key could be invalidated because we could move iterator, so back it up. |
1948 | 240k | KeyBytes key_copy(key); |
1949 | 240k | key = key_copy.AsSlice(); |
1950 | | |
1951 | 240k | DocKey doc_key; |
1952 | 240k | RETURN_NOT_OK(doc_key.FullyDecodeFrom(key)); |
1953 | 240k | const PrimitiveValue& key_primitive = doc_key.hashed_group().front(); |
1954 | 240k | if (!key_primitive.IsString() || |
1955 | 240k | !RedisPatternMatch(request_.keys_request().pattern(), |
1956 | 240k | key_primitive.GetString(), |
1957 | 146k | false /* ignore_case */)) { |
1958 | 146k | iterator_->SeekOutOfSubDoc(key); |
1959 | 146k | continue; |
1960 | 146k | } |
1961 | | |
1962 | 93.7k | GetRedisSubDocumentData data = {key, &result, &doc_found}; |
1963 | 93.7k | data.deadline_info = deadline_info_.get_ptr(); |
1964 | 93.7k | data.return_type_only = true; |
1965 | 93.7k | RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr, |
1966 | 93.7k | SeekFwdSuffices::kFalse)); |
1967 | | |
1968 | 93.7k | if (doc_found) { |
1969 | 93.7k | if (--threshold < 0) { |
1970 | 3 | response_.clear_array_response(); |
1971 | 3 | response_.set_code(RedisResponsePB::SERVER_ERROR); |
1972 | 3 | response_.set_error_message("Too many keys in the database."); |
1973 | 3 | return Status::OK(); |
1974 | 3 | } |
1975 | 93.7k | RETURN_NOT_OK(AddPrimitiveValueToResponseArray(key_primitive, |
1976 | 93.7k | response_.mutable_array_response())); |
1977 | 93.7k | } |
1978 | 93.7k | iterator_->SeekOutOfSubDoc(key); |
1979 | 93.7k | } |
1980 | | |
1981 | 294 | response_.set_code(RedisResponsePB::OK); |
1982 | 294 | return Status::OK(); |
1983 | 297 | } |
1984 | | |
1985 | 42.9k | const RedisResponsePB& RedisReadOperation::response() { |
1986 | 42.9k | return response_; |
1987 | 42.9k | } |
1988 | | |
1989 | | } // namespace docdb |
1990 | | } // namespace yb |