YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/redis_operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/redis_operation.h"
15
16
#include "yb/docdb/doc_key.h"
17
#include "yb/docdb/doc_path.h"
18
#include "yb/docdb/doc_reader_redis.h"
19
#include "yb/docdb/doc_ttl_util.h"
20
#include "yb/docdb/doc_write_batch.h"
21
#include "yb/docdb/doc_write_batch_cache.h"
22
#include "yb/docdb/docdb_rocksdb_util.h"
23
#include "yb/docdb/subdocument.h"
24
25
#include "yb/server/hybrid_clock.h"
26
27
#include "yb/util/redis_util.h"
28
#include "yb/util/status_format.h"
29
#include "yb/util/stol_utils.h"
30
31
// Boolean flag, true by default. The help text below describes the trade-off between
// returning plain OK and reading extra records to build the reply.
DEFINE_bool(
    emulate_redis_responses, true,
    "If emulate_redis_responses is false, we hope to get slightly better performance by just "
    "returning OK for commands that might require us to read additional records viz. SADD, HSET, "
    "and HDEL. If emulate_redis_responses is true, we read the required records to compute the "
    "response as specified by the official Redis API documentation. https://redis.io/commands");
37
38
namespace yb {
39
namespace docdb {
40
41
// A simple conversion from RedisDataTypes to ValueTypes
42
// Note: May run into issues if we want to support ttl on individual set elements,
43
// as they are represented by ValueType::kNullLow.
44
18
ValueType ValueTypeFromRedisType(RedisDataType dt) {
45
18
  switch(dt) {
46
15
  case RedisDataType::REDIS_TYPE_STRING:
47
15
    return ValueType::kString;
48
0
  case RedisDataType::REDIS_TYPE_SET:
49
0
    return ValueType::kRedisSet;
50
0
  case RedisDataType::REDIS_TYPE_HASH:
51
0
    return ValueType::kObject;
52
3
  case RedisDataType::REDIS_TYPE_SORTEDSET:
53
3
    return ValueType::kRedisSortedSet;
54
0
  case RedisDataType::REDIS_TYPE_TIMESERIES:
55
0
    return ValueType::kRedisTS;
56
0
  case RedisDataType::REDIS_TYPE_LIST:
57
0
    return ValueType::kRedisList;
58
0
  default:
59
0
    return ValueType::kInvalid;
60
18
  }
61
18
}
62
63
// Appends the encoded DocKey of the request's Redis key to `paths` and sets `*level` to
// snapshot isolation. The `mode` argument is not consulted.
Status RedisWriteOperation::GetDocPaths(
    GetDocPathsMode mode, DocPathsToLock* paths, IsolationLevel *level) const {
  const auto& kv = request_.key_value();
  paths->push_back(DocKey::FromRedisKey(kv.hash_code(), kv.key()).EncodeAsRefCntPrefix());
  *level = IsolationLevel::SNAPSHOT_ISOLATION;
  return Status::OK();
}
71
72
namespace {
73
74
51
bool EmulateRedisResponse(const RedisDataType& data_type) {
75
51
  return FLAGS_emulate_redis_responses && data_type != REDIS_TYPE_TIMESERIES;
76
51
}
77
78
// Error text attached to responses whose code is WRONG_TYPE.
static const string wrong_type_message(
    "WRONGTYPE Operation against a key holding the wrong kind of value");
80
81
// Converts a subkey protobuf into a PrimitiveValue written to `*primitive_value`.
// String, timestamp and double subkeys are supported; any other subkey case results in an
// IllegalState status and leaves `*primitive_value` untouched.
CHECKED_STATUS PrimitiveValueFromSubKey(const RedisKeyValueSubKeyPB &subkey_pb,
                                        PrimitiveValue *primitive_value) {
  const auto subkey_kind = subkey_pb.subkey_case();
  switch (subkey_kind) {
    case RedisKeyValueSubKeyPB::kStringSubkey:
      *primitive_value = PrimitiveValue(subkey_pb.string_subkey());
      return Status::OK();
    case RedisKeyValueSubKeyPB::kTimestampSubkey:
      // We use descending order for the timestamp in the timeseries type so that the latest
      // value sorts on top.
      *primitive_value = PrimitiveValue(subkey_pb.timestamp_subkey(), SortOrder::kDescending);
      return Status::OK();
    case RedisKeyValueSubKeyPB::kDoubleSubkey:
      *primitive_value = PrimitiveValue::Double(subkey_pb.double_subkey());
      return Status::OK();
    default:
      break;
  }
  return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", subkey_kind);
}
101
102
// Stricter version of the above when we know the exact datatype to expect.
103
CHECKED_STATUS PrimitiveValueFromSubKeyStrict(const RedisKeyValueSubKeyPB &subkey_pb,
104
                                              const RedisDataType &data_type,
105
44.8k
                                              PrimitiveValue *primitive_value) {
106
44.8k
  switch (data_type) {
107
0
    case REDIS_TYPE_LIST: FALLTHROUGH_INTENDED;
108
0
    case REDIS_TYPE_SET: FALLTHROUGH_INTENDED;
109
29.7k
    case REDIS_TYPE_HASH:
110
29.7k
      if (!subkey_pb.has_string_subkey()) {
111
0
        return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of string type",
112
0
                                 subkey_pb.ShortDebugString());
113
0
      }
114
29.7k
      break;
115
15.1k
    case REDIS_TYPE_TIMESERIES:
116
15.1k
      if (!subkey_pb.has_timestamp_subkey()) {
117
0
        return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of int64 type",
118
0
                                 subkey_pb.ShortDebugString());
119
0
      }
120
15.1k
      break;
121
0
    case REDIS_TYPE_SORTEDSET:
122
0
      if (!subkey_pb.has_double_subkey()) {
123
0
        return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of double type",
124
0
                             subkey_pb.ShortDebugString());
125
0
      }
126
0
      break;
127
0
    default:
128
0
      return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", data_type);
129
44.8k
  }
130
44.8k
  return PrimitiveValueFromSubKey(subkey_pb, primitive_value);
131
44.8k
}
132
133
// Determines the Redis data type stored under the request key, or under one of its subkeys
// when `subkey_index` is not kNilSubkeyIndex.
//
// If a write batch is supplied and its cache has an entry for the encoded key, the cached
// value type is used; otherwise the type is read through `iterator`. Returns REDIS_TYPE_NONE
// when nothing is found (or the value is invalid / a tombstone), Corruption when the request
// has no key or the stored value type is unknown, and InvalidArgument when `subkey_index` is
// not smaller than the number of subkeys.
Result<RedisDataType> GetRedisValueType(
    IntentAwareIterator* iterator,
    const RedisKeyValuePB &key_value_pb,
    DocWriteBatch* doc_write_batch = nullptr,
    int subkey_index = kNilSubkeyIndex,
    bool always_override = false) {
  if (!key_value_pb.has_key()) {
    return STATUS(Corruption, "Expected KeyValuePB");
  }

  const bool use_subkey = subkey_index != kNilSubkeyIndex;
  if (use_subkey && subkey_index >= key_value_pb.subkey_size()) {
    return STATUS_SUBSTITUTE(InvalidArgument,
                             "Size of subkeys ($0) must be larger than subkey_index ($1)",
                             key_value_pb.subkey_size(), subkey_index);
  }

  // The key prefix is the same in both cases; a subkey, if requested, is appended to it.
  KeyBytes encoded_subdoc_key =
      DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key());
  if (use_subkey) {
    PrimitiveValue subkey_primitive;
    RETURN_NOT_OK(PrimitiveValueFromSubKey(key_value_pb.subkey(subkey_index), &subkey_primitive));
    subkey_primitive.AppendToKey(&encoded_subdoc_key);
  }

  SubDocument doc;
  bool doc_found = false;

  // Use the cached entry if possible to determine the value type.
  boost::optional<DocWriteBatchCache::Entry> cached_entry;
  if (doc_write_batch != nullptr) {
    cached_entry = doc_write_batch->LookupCache(encoded_subdoc_key);
  }

  if (!cached_entry) {
    // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions
    // support for Redis.
    GetRedisSubDocumentData data = { encoded_subdoc_key, &doc, &doc_found };
    data.return_type_only = true;
    data.exp.always_override = always_override;
    RETURN_NOT_OK(GetRedisSubDocument(iterator, data, /* projection */ nullptr,
                                      SeekFwdSuffices::kFalse));
  } else {
    doc_found = true;
    doc = SubDocument(cached_entry->value_type);
  }

  if (!doc_found) {
    return REDIS_TYPE_NONE;
  }

  const auto stored_type = doc.value_type();
  switch (stored_type) {
    case ValueType::kInvalid: FALLTHROUGH_INTENDED;
    case ValueType::kTombstone:
      return REDIS_TYPE_NONE;
    case ValueType::kObject:
      return REDIS_TYPE_HASH;
    case ValueType::kRedisSet:
      return REDIS_TYPE_SET;
    case ValueType::kRedisTS:
      return REDIS_TYPE_TIMESERIES;
    case ValueType::kRedisSortedSet:
      return REDIS_TYPE_SORTEDSET;
    case ValueType::kRedisList:
      return REDIS_TYPE_LIST;
    case ValueType::kNullLow: FALLTHROUGH_INTENDED; // This value is a set member.
    case ValueType::kString:
      return REDIS_TYPE_STRING;
    default:
      return STATUS_FORMAT(Corruption,
                           "Unknown value type for redis record: $0",
                           static_cast<char>(stored_type));
  }
}
205
206
// Reads the value stored under the request key, or under one of its subkeys.
//
// Parameters:
//   iterator        - iterator used for the read; its read time is used for the TTL check.
//   key_value_pb    - request key plus optional subkeys.
//   subkey_index    - index of the subkey to read; kNilSubkeyIndex means "the only subkey",
//                     in which case the request must carry at most one subkey.
//   always_override - forwarded to the expiration data of the read.
//   exp             - optional out-parameter receiving the expiration data of a found,
//                     non-expired value.
//
// Returns a RedisValue of type REDIS_TYPE_NONE when nothing is found or the TTL has expired,
// a type-only RedisValue for non-primitive documents, and a string value otherwise.
// Errors: Corruption (missing key, or several subkeys without an index), InvalidArgument
// (subkey_index out of range), IllegalState (unexpected non-primitive value type).
Result<RedisValue> GetRedisValue(
    IntentAwareIterator* iterator,
    const RedisKeyValuePB &key_value_pb,
    int subkey_index = kNilSubkeyIndex,
    bool always_override = false,
    Expiration* exp = nullptr) {
  if (!key_value_pb.has_key()) {
    return STATUS(Corruption, "Expected KeyValuePB");
  }
  auto encoded_doc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key());

  if (!key_value_pb.subkey().empty()) {
    if (key_value_pb.subkey().size() != 1 && subkey_index == kNilSubkeyIndex) {
      return STATUS_SUBSTITUTE(Corruption,
                               "Expected at most one subkey, got $0", key_value_pb.subkey().size());
    }
    const int effective_index = subkey_index == kNilSubkeyIndex ? 0 : subkey_index;
    // Reject an out-of-range index instead of indexing the repeated field with it; this
    // mirrors the check done by GetRedisValueType.
    if (effective_index < 0 || effective_index >= key_value_pb.subkey_size()) {
      return STATUS_SUBSTITUTE(InvalidArgument,
                               "Size of subkeys ($0) must be larger than subkey_index ($1)",
                               key_value_pb.subkey_size(), subkey_index);
    }
    PrimitiveValue subkey_primitive;
    RETURN_NOT_OK(PrimitiveValueFromSubKey(
        key_value_pb.subkey(effective_index), &subkey_primitive));
    subkey_primitive.AppendToKey(&encoded_doc_key);
  }

  SubDocument doc;
  bool doc_found = false;

  // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions
  // support for Redis.
  GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found };
  data.exp.always_override = always_override;
  RETURN_NOT_OK(GetRedisSubDocument(
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
  if (!doc_found) {
    return RedisValue{REDIS_TYPE_NONE};
  }

  // A value whose TTL has elapsed at the read time is reported as missing.
  if (HasExpiredTTL(data.exp.write_ht, data.exp.ttl, iterator->read_time().read)) {
    return RedisValue{REDIS_TYPE_NONE};
  }

  if (exp) {
    *exp = data.exp;
  }

  if (!doc.IsPrimitive()) {
    // Non-primitive documents are reported by type only.
    switch (doc.value_type()) {
      case ValueType::kObject:
        return RedisValue{REDIS_TYPE_HASH};
      case ValueType::kRedisTS:
        return RedisValue{REDIS_TYPE_TIMESERIES};
      case ValueType::kRedisSortedSet:
        return RedisValue{REDIS_TYPE_SORTEDSET};
      case ValueType::kRedisSet:
        return RedisValue{REDIS_TYPE_SET};
      case ValueType::kRedisList:
        return RedisValue{REDIS_TYPE_LIST};
      default:
        return STATUS_SUBSTITUTE(IllegalState, "Invalid value type: $0",
                                 static_cast<int>(doc.value_type()));
    }
  }

  auto val = RedisValue{REDIS_TYPE_STRING, doc.GetString(), data.exp};
  return val;
}
270
271
// Flag type taken by VerifyTypeAndSetCode: selects whether a missing key is reported as
// NIL (and treated as a successful verification) or as NOT_FOUND.
YB_STRONGLY_TYPED_BOOL(VerifySuccessIfMissing);
272
273
// Set response based on the type match. Return whether the type matches what's expected.
274
bool VerifyTypeAndSetCode(
275
    const RedisDataType expected_type,
276
    const RedisDataType actual_type,
277
    RedisResponsePB *response,
278
74.0k
    VerifySuccessIfMissing verify_success_if_missing = VerifySuccessIfMissing::kFalse) {
279
74.0k
  if (actual_type == RedisDataType::REDIS_TYPE_NONE) {
280
4.28k
    if (verify_success_if_missing) {
281
4.28k
      response->set_code(RedisResponsePB::NIL);
282
0
    } else {
283
0
      response->set_code(RedisResponsePB::NOT_FOUND);
284
0
    }
285
4.28k
    return verify_success_if_missing;
286
4.28k
  }
287
69.7k
  if (actual_type != expected_type) {
288
6
    response->set_code(RedisResponsePB::WRONG_TYPE);
289
6
    response->set_error_message(wrong_type_message);
290
6
    return false;
291
6
  }
292
69.7k
  response->set_code(RedisResponsePB::OK);
293
69.7k
  return true;
294
69.7k
}
295
296
bool VerifyTypeAndSetCode(
297
    const docdb::ValueType expected_type,
298
    const docdb::ValueType actual_type,
299
5.47k
    RedisResponsePB *response) {
300
5.47k
  if (actual_type != expected_type) {
301
3
    response->set_code(RedisResponsePB::WRONG_TYPE);
302
3
    response->set_error_message(wrong_type_message);
303
3
    return false;
304
3
  }
305
5.47k
  response->set_code(RedisResponsePB::OK);
306
5.47k
  return true;
307
5.47k
}
308
309
// Appends `value` to `redis_array` as a string element. Strings are added as-is; int64 and
// double values (ascending or descending) are rendered with std::to_string. Any other value
// type yields InvalidArgument and nothing is appended.
CHECKED_STATUS AddPrimitiveValueToResponseArray(const PrimitiveValue& value,
                                                RedisArrayPB* redis_array) {
  switch (value.value_type()) {
    case ValueType::kString: FALLTHROUGH_INTENDED;
    case ValueType::kStringDescending:
      redis_array->add_elements(value.GetString());
      break;
    case ValueType::kInt64: FALLTHROUGH_INTENDED;
    case ValueType::kInt64Descending:
      redis_array->add_elements(std::to_string(value.GetInt64()));
      break;
    case ValueType::kDouble: FALLTHROUGH_INTENDED;
    case ValueType::kDoubleDescending:
      redis_array->add_elements(std::to_string(value.GetDouble()));
      break;
    default:
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid value type: $0",
                               static_cast<int>(value.value_type()));
  }
  return Status::OK();
}
332
333
// Appends one response row to the response's array: `first` when `add_keys` is set, followed
// by `second` when `add_values` is set. `reverse` is accepted but not used here.
CHECKED_STATUS AddResponseValuesGeneric(const PrimitiveValue& first,
                                        const PrimitiveValue& second,
                                        RedisResponsePB* response,
                                        bool add_keys,
                                        bool add_values,
                                        bool reverse = false) {
  // The array is only touched when something is actually appended.
  const auto append = [response](const PrimitiveValue& element) {
    return AddPrimitiveValueToResponseArray(element, response->mutable_array_response());
  };
  if (add_keys) {
    RETURN_NOT_OK(append(first));
  }
  if (add_values) {
    RETURN_NOT_OK(append(second));
  }
  return Status::OK();
}
347
348
// Populate the response array for sorted sets range queries.
349
// first refers to the score for the given values.
350
// second refers to a subdocument where each key is a value with the given score.
351
CHECKED_STATUS AddResponseValuesSortedSets(const PrimitiveValue& first,
352
                                           const SubDocument& second,
353
                                           RedisResponsePB* response,
354
                                           bool add_keys,
355
                                           bool add_values,
356
93
                                           bool reverse = false) {
357
93
  if (reverse) {
358
0
    for (auto it = second.object_container().rbegin();
359
0
         it != second.object_container().rend();
360
0
         it++) {
361
0
      const PrimitiveValue& value = it->first;
362
0
      RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys));
363
0
    }
364
93
  } else {
365
81
    for (const auto& kv : second.object_container()) {
366
81
      const PrimitiveValue& value = kv.first;
367
81
      RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys));
368
81
    }
369
93
  }
370
93
  return Status::OK();
371
93
}
372
373
template <typename T, typename AddResponseRow>
374
CHECKED_STATUS PopulateRedisResponseFromInternal(T iter,
375
                                                 AddResponseRow add_response_row,
376
                                                 const T& iter_end,
377
                                                 RedisResponsePB *response,
378
                                                 bool add_keys,
379
                                                 bool add_values,
380
5.47k
                                                 bool reverse = false) {
381
5.47k
  response->set_allocated_array_response(new RedisArrayPB());
382
948k
  for (; iter != iter_end; iter++) {
383
942k
    RETURN_NOT_OK(add_response_row(
384
942k
        iter->first, iter->second, response, add_keys, add_values, reverse));
385
942k
  }
386
5.47k
  return Status::OK();
387
5.47k
}
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__116reverse_iteratorINS3_20__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeISA_PvEElEEEEEEPFNS_6StatusERKS8_SK_PNS_15RedisResponsePBEbbbEEESI_T_T0_RKSP_SM_bbb
Line
Count
Source
380
30
                                                 bool reverse = false) {
381
30
  response->set_allocated_array_response(new RedisArrayPB());
382
732
  for (; iter != iter_end; iter++) {
383
702
    RETURN_NOT_OK(add_response_row(
384
702
        iter->first, iter->second, response, add_keys, add_values, reverse));
385
702
  }
386
30
  return Status::OK();
387
30
}
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__120__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeIS9_PvEElEEEEPFNS_6StatusERKS7_SI_PNS_15RedisResponsePBEbbbEEESG_T_T0_RKSN_SK_bbb
Line
Count
Source
380
5.41k
                                                 bool reverse = false) {
381
5.41k
  response->set_allocated_array_response(new RedisArrayPB());
382
947k
  for (; iter != iter_end; iter++) {
383
941k
    RETURN_NOT_OK(add_response_row(
384
941k
        iter->first, iter->second, response, add_keys, add_values, reverse));
385
941k
  }
386
5.41k
  return Status::OK();
387
5.41k
}
Unexecuted instantiation: redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__116reverse_iteratorINS3_20__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeISA_PvEElEEEEEEPFNS_6StatusERKS8_RKS9_PNS_15RedisResponsePBEbbbEEESI_T_T0_RKSR_SO_bbb
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_133PopulateRedisResponseFromInternalINSt3__120__map_const_iteratorINS3_21__tree_const_iteratorINS3_12__value_typeINS0_14PrimitiveValueENS0_11SubDocumentEEEPNS3_11__tree_nodeIS9_PvEElEEEEPFNS_6StatusERKS7_RKS8_PNS_15RedisResponsePBEbbbEEESG_T_T0_RKSP_SM_bbb
Line
Count
Source
380
27
                                                 bool reverse = false) {
381
27
  response->set_allocated_array_response(new RedisArrayPB());
382
120
  for (; iter != iter_end; iter++) {
383
93
    RETURN_NOT_OK(add_response_row(
384
93
        iter->first, iter->second, response, add_keys, add_values, reverse));
385
93
  }
386
27
  return Status::OK();
387
27
}
388
389
template <typename AddResponseRow>
390
CHECKED_STATUS PopulateResponseFrom(const SubDocument::ObjectContainer &key_values,
391
                                    AddResponseRow add_response_row,
392
                                    RedisResponsePB *response,
393
                                    bool add_keys,
394
                                    bool add_values,
395
5.47k
                                    bool reverse = false) {
396
5.47k
  if (reverse) {
397
30
    return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row,
398
30
                                             key_values.rend(), response, add_keys,
399
30
                                             add_values, reverse);
400
5.44k
  } else {
401
5.44k
    return PopulateRedisResponseFromInternal(key_values.begin(),  add_response_row,
402
5.44k
                                             key_values.end(), response, add_keys,
403
5.44k
                                             add_values, reverse);
404
5.44k
  }
405
5.47k
}
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_120PopulateResponseFromIPFNS_6StatusERKNS0_14PrimitiveValueES6_PNS_15RedisResponsePBEbbbEEES3_RKNSt3__13mapIS4_NS0_11SubDocumentENSB_4lessIS4_EENSB_9allocatorINSB_4pairIS5_SD_EEEEEET_S8_bbb
Line
Count
Source
395
5.44k
                                    bool reverse = false) {
396
5.44k
  if (reverse) {
397
30
    return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row,
398
30
                                             key_values.rend(), response, add_keys,
399
30
                                             add_values, reverse);
400
5.41k
  } else {
401
5.41k
    return PopulateRedisResponseFromInternal(key_values.begin(),  add_response_row,
402
5.41k
                                             key_values.end(), response, add_keys,
403
5.41k
                                             add_values, reverse);
404
5.41k
  }
405
5.44k
}
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_120PopulateResponseFromIPFNS_6StatusERKNS0_14PrimitiveValueERKNS0_11SubDocumentEPNS_15RedisResponsePBEbbbEEES3_RKNSt3__13mapIS4_S7_NSE_4lessIS4_EENSE_9allocatorINSE_4pairIS5_S7_EEEEEET_SB_bbb
Line
Count
Source
395
27
                                    bool reverse = false) {
396
27
  if (reverse) {
397
0
    return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row,
398
0
                                             key_values.rend(), response, add_keys,
399
0
                                             add_values, reverse);
400
27
  } else {
401
27
    return PopulateRedisResponseFromInternal(key_values.begin(),  add_response_row,
402
27
                                             key_values.end(), response, add_keys,
403
27
                                             add_values, reverse);
404
27
  }
405
27
}
406
407
void SetOptionalInt(RedisDataType type, int64_t value, int64_t none_value,
408
0
                    RedisResponsePB* response) {
409
0
  if (type == RedisDataType::REDIS_TYPE_NONE) {
410
0
    response->set_code(RedisResponsePB::NIL);
411
0
    response->set_int_response(none_value);
412
0
  } else {
413
0
    response->set_code(RedisResponsePB::OK);
414
0
    response->set_int_response(value);
415
0
  }
416
0
}
417
418
0
// Convenience overload that reports 0 as the integer for a missing key.
void SetOptionalInt(RedisDataType type, int64_t value, RedisResponsePB* response) {
  const int64_t none_value = 0;
  SetOptionalInt(type, value, none_value, response);
}
421
422
10.5k
// Reads the counter stored under the key's ValueType::kCounter subkey and returns it, or 0
// when no such subdocument is found. Read errors are propagated.
Result<int64_t> GetCardinality(IntentAwareIterator* iterator, const RedisKeyValuePB& kv) {
  // Counter key = encoded Redis key followed by the kCounter marker.
  auto counter_key = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key());
  PrimitiveValue(ValueType::kCounter).AppendToKey(&counter_key);

  SubDocument counter_doc;
  bool counter_found = false;
  GetRedisSubDocumentData data = { counter_key, &counter_doc, &counter_found };
  RETURN_NOT_OK(GetRedisSubDocument(
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));

  int64_t cardinality = counter_found ? counter_doc.GetInt64() : 0;
  return cardinality;
}
435
436
template <typename AddResponseValues>
437
CHECKED_STATUS GetAndPopulateResponseValues(
438
    IntentAwareIterator* iterator,
439
    AddResponseValues add_response_values,
440
    const GetRedisSubDocumentData& data,
441
    ValueType expected_type,
442
    const RedisReadRequestPB& request,
443
    RedisResponsePB* response,
444
5.46k
    bool add_keys, bool add_values, bool reverse) {
445
446
5.46k
  RETURN_NOT_OK(GetRedisSubDocument(
447
5.46k
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
448
449
  // Validate and populate response.
450
5.46k
  response->set_allocated_array_response(new RedisArrayPB());
451
5.46k
  if (!(*data.doc_found)) {
452
3
    response->set_code(RedisResponsePB::NIL);
453
3
    return Status::OK();
454
3
  }
455
456
5.46k
  if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) {
457
5.46k
      RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(),
458
5.46k
                                         add_response_values,
459
5.46k
                                         response, add_keys, add_values, reverse));
460
5.46k
  }
461
5.46k
  return Status::OK();
462
5.46k
}
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_128GetAndPopulateResponseValuesIPFNS_6StatusERKNS0_14PrimitiveValueERKNS0_11SubDocumentEPNS_15RedisResponsePBEbbbEEES3_PNS0_19IntentAwareIteratorET_RKNS0_23GetRedisSubDocumentDataENS0_9ValueTypeERKNS_18RedisReadRequestPBESB_bbb
Line
Count
Source
444
30
    bool add_keys, bool add_values, bool reverse) {
445
446
30
  RETURN_NOT_OK(GetRedisSubDocument(
447
30
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
448
449
  // Validate and populate response.
450
30
  response->set_allocated_array_response(new RedisArrayPB());
451
30
  if (!(*data.doc_found)) {
452
3
    response->set_code(RedisResponsePB::NIL);
453
3
    return Status::OK();
454
3
  }
455
456
27
  if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) {
457
27
      RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(),
458
27
                                         add_response_values,
459
27
                                         response, add_keys, add_values, reverse));
460
27
  }
461
27
  return Status::OK();
462
27
}
redis_operation.cc:_ZN2yb5docdb12_GLOBAL__N_128GetAndPopulateResponseValuesIPFNS_6StatusERKNS0_14PrimitiveValueES6_PNS_15RedisResponsePBEbbbEEES3_PNS0_19IntentAwareIteratorET_RKNS0_23GetRedisSubDocumentDataENS0_9ValueTypeERKNS_18RedisReadRequestPBES8_bbb
Line
Count
Source
444
5.43k
    bool add_keys, bool add_values, bool reverse) {
445
446
5.43k
  RETURN_NOT_OK(GetRedisSubDocument(
447
5.43k
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
448
449
  // Validate and populate response.
450
5.43k
  response->set_allocated_array_response(new RedisArrayPB());
451
5.43k
  if (!(*data.doc_found)) {
452
0
    response->set_code(RedisResponsePB::NIL);
453
0
    return Status::OK();
454
0
  }
455
456
5.43k
  if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) {
457
5.43k
      RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(),
458
5.43k
                                         add_response_values,
459
5.43k
                                         response, add_keys, add_values, reverse));
460
5.43k
  }
461
5.43k
  return Status::OK();
462
5.43k
}
463
464
// Get normalized (with respect to card) upper and lower index bounds for reverse range scans.
465
void GetNormalizedBounds(int64 low_idx, int64 high_idx, int64 card, bool reverse,
466
0
                         int64* low_idx_normalized, int64* high_idx_normalized) {
467
  // Turn negative bounds positive.
468
0
  if (low_idx < 0) {
469
0
    low_idx = card + low_idx;
470
0
  }
471
0
  if (high_idx < 0) {
472
0
    high_idx = card + high_idx;
473
0
  }
474
475
  // Index from lower to upper instead of upper to lower.
476
0
  if (reverse) {
477
0
    *low_idx_normalized = card - high_idx - 1;
478
0
    *high_idx_normalized = card - low_idx - 1;
479
0
  } else {
480
0
    *low_idx_normalized = low_idx;
481
0
    *high_idx_normalized = high_idx;
482
0
  }
483
484
  // Fit bounds to range [0, card).
485
0
  if (*low_idx_normalized < 0) {
486
0
    *low_idx_normalized = 0;
487
0
  }
488
0
  if (*high_idx_normalized >= card) {
489
0
    *high_idx_normalized = card - 1;
490
0
  }
491
0
}
492
493
} // anonymous namespace
494
495
20.5k
void RedisWriteOperation::InitializeIterator(const DocOperationApplyData& data) {
496
20.5k
  auto subdoc_key = SubDocKey(DocKey::FromRedisKey(
497
20.5k
      request_.key_value().hash_code(), request_.key_value().key()));
498
499
20.5k
  auto iter = CreateIntentAwareIterator(
500
20.5k
      data.doc_write_batch->doc_db(),
501
20.5k
      BloomFilterMode::USE_BLOOM_FILTER, subdoc_key.Encode().AsSlice(),
502
20.5k
      redis_query_id(), TransactionOperationContext(), data.deadline, data.read_time);
503
504
20.5k
  iterator_ = std::move(iter);
505
20.5k
}
506
507
61.5k
// Dispatches the write request to the handler that matches the request case set in request_.
// Returns the handler's status, OK for a no-op request, and a Corruption status when no request
// case is set.
Status RedisWriteOperation::Apply(const DocOperationApplyData& data) {
  switch (request_.request_case()) {
    case RedisWriteRequestPB::kSetRequest:
      return ApplySet(data);
    case RedisWriteRequestPB::kSetTtlRequest:
      return ApplySetTtl(data);
    case RedisWriteRequestPB::kGetsetRequest:
      return ApplyGetSet(data);
    case RedisWriteRequestPB::kAppendRequest:
      return ApplyAppend(data);
    case RedisWriteRequestPB::kDelRequest:
      return ApplyDel(data);
    case RedisWriteRequestPB::kSetRangeRequest:
      return ApplySetRange(data);
    case RedisWriteRequestPB::kIncrRequest:
      return ApplyIncr(data);
    case RedisWriteRequestPB::kPushRequest:
      return ApplyPush(data);
    case RedisWriteRequestPB::kInsertRequest:
      return ApplyInsert(data);
    case RedisWriteRequestPB::kPopRequest:
      return ApplyPop(data);
    case RedisWriteRequestPB::kAddRequest:
      return ApplyAdd(data);
    // TODO: Cut this short in doc_operation.
    case RedisWriteRequestPB::kNoOpRequest:
      return Status::OK();
    case RedisWriteRequestPB::REQUEST_NOT_SET: break;
  }
  // This is the write path, so report the request as an unsupported *write* operation.
  return STATUS_FORMAT(
      Corruption, "Unsupported redis write operation: $0", request_.request_case());
}
538
539
// Looks up the Redis data type stored for the request's key (or for the subkey at
// subkey_index), creating the shared iterator on first use.
Result<RedisDataType> RedisWriteOperation::GetValueType(
    const DocOperationApplyData& data, int subkey_index) {
  const bool iterator_ready = static_cast<bool>(iterator_);
  if (!iterator_ready) {
    InitializeIterator(data);
  }
  const RedisKeyValuePB& kv = request_.key_value();
  return GetRedisValueType(iterator_.get(), kv, data.doc_write_batch, subkey_index);
}
547
548
// Reads the value stored for the request's key (or for the subkey at subkey_index), creating
// the shared iterator on first use. `ttl` is forwarded to GetRedisValue unchanged.
Result<RedisValue> RedisWriteOperation::GetValue(
    const DocOperationApplyData& data, int subkey_index, Expiration* ttl) {
  const bool iterator_ready = static_cast<bool>(iterator_);
  if (!iterator_ready) {
    InitializeIterator(data);
  }
  const RedisKeyValuePB& kv = request_.key_value();
  return GetRedisValue(iterator_.get(), kv, subkey_index, /* always_override */ false, ttl);
}
556
557
61.4k
// Applies a set request to the write batch.
//
// With subkeys present the request targets a container:
//   - REDIS_TYPE_HASH / REDIS_TYPE_TIMESERIES: every (subkey, value) pair is written below the
//     key.
//   - REDIS_TYPE_SORTEDSET: subkeys carry the scores and values carry the members; the counter,
//     the forward (score -> member) and the reverse (member -> score) mappings are updated
//     according to the request's sorted set options.
// Without subkeys the request must be a plain string set, optionally restricted to
// insert-only or update-only by the request's write mode.
//
// Type conflicts with the stored value are reported through response_ (WRONG_TYPE) with an OK
// status; malformed requests are reported as InvalidCommand statuses.
Status RedisWriteOperation::ApplySet(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();
  const MonoDelta ttl = request_.set_request().has_ttl() ?
      MonoDelta::FromMilliseconds(request_.set_request().ttl()) : Value::kMaxTtl;
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
  if (kv.subkey_size() > 0) {
    RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
    switch (kv.type()) {
      case REDIS_TYPE_TIMESERIES: FALLTHROUGH_INTENDED;
      case REDIS_TYPE_HASH: {
        if (data_type != kv.type() && data_type != REDIS_TYPE_NONE) {
          response_.set_code(RedisResponsePB::WRONG_TYPE);
          response_.set_error_message(wrong_type_message);
          return Status::OK();
        }
        // kv.value(i) is indexed by subkey position below, so every subkey needs a value.
        if (kv.value_size() < kv.subkey_size()) {
          return STATUS_SUBSTITUTE(InvalidCommand,
              "Expected a value for every subkey in SET command, found $0 subkeys and $1 values",
              kv.subkey_size(), kv.value_size());
        }
        SubDocument kv_entries = SubDocument();
        for (int i = 0; i < kv.subkey_size(); i++) {
          PrimitiveValue subkey_value;
          RETURN_NOT_OK(PrimitiveValueFromSubKeyStrict(kv.subkey(i), kv.type(), &subkey_value));
          kv_entries.SetChild(subkey_value,
                              SubDocument(PrimitiveValue(kv.value(i))));
        }

        if (kv.type() == REDIS_TYPE_TIMESERIES) {
          RETURN_NOT_OK(kv_entries.ConvertToRedisTS());
        }

        // For an HSET command (which has only one subkey), we need to read the subkey to find out
        // if the key already existed, and return 0 or 1 accordingly. This read is unnecessary for
        // HMSET and TSADD.
        if (kv.subkey_size() == 1 && EmulateRedisResponse(kv.type()) &&
            !request_.set_request().expect_ok_response()) {
          RedisDataType type = VERIFY_RESULT(GetValueType(data, 0));
          // For HSET/TSADD, we return 0 or 1 depending on if the key already existed.
          // If flag is false, no int response is returned.
          SetOptionalInt(type, 0, 1, &response_);
        }
        if (data_type == REDIS_TYPE_NONE && kv.type() == REDIS_TYPE_TIMESERIES) {
          // Need to insert the document instead of extending it.
          RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
              doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl,
              Value::kInvalidUserTimestamp, false /* init_marker_ttl */));
        } else {
          RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
              doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl));
        }
        break;
      }
      case REDIS_TYPE_SORTEDSET: {
        if (data_type != kv.type() && data_type != REDIS_TYPE_NONE) {
          response_.set_code(RedisResponsePB::WRONG_TYPE);
          response_.set_error_message(wrong_type_message);
          return Status::OK();
        }
        // kv.value(i) is indexed by subkey position below, so every score needs a member.
        if (kv.value_size() < kv.subkey_size()) {
          return STATUS_SUBSTITUTE(InvalidCommand,
              "Expected a value for every subkey in SET command, found $0 subkeys and $1 values",
              kv.subkey_size(), kv.value_size());
        }

        const auto& sorted_set_options = request_.set_request().sorted_set_options();

        // The SubDocuments to be inserted for card, the forward mapping, and reverse mapping.
        SubDocument kv_entries_card;
        SubDocument kv_entries_forward;
        SubDocument kv_entries_reverse;

        // The top level mapping.
        SubDocument kv_entries;

        int new_elements_added = 0;
        int return_value = 0;
        for (int i = 0; i < kv.subkey_size(); i++) {
          // Check whether the value is already in the document, if so delete it.
          SubDocKey key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()),
                                            PrimitiveValue(ValueType::kSSReverse),
                                            PrimitiveValue(kv.value(i)));
          SubDocument subdoc_reverse;
          bool subdoc_reverse_found = false;
          auto encoded_key_reverse = key_reverse.EncodeWithoutHt();
          GetRedisSubDocumentData get_data = { encoded_key_reverse, &subdoc_reverse,
                                               &subdoc_reverse_found };
          RETURN_NOT_OK(GetRedisSubDocument(
              data.doc_write_batch->doc_db(),
              get_data, redis_query_id(), TransactionOperationContext(), data.deadline,
              data.read_time));

          // Flag indicating whether we should add the given entry to the sorted set.
          bool should_add_entry = true;
          // Flag indicating whether we should remove an entry from the sorted set.
          bool should_remove_existing_entry = false;

          if (!subdoc_reverse_found) {
            // The value is not already in the document.
            switch (sorted_set_options.update_options()) {
              case SortedSetOptionsPB::NX: FALLTHROUGH_INTENDED;
              case SortedSetOptionsPB::NONE: {
                // Both these options call for inserting new elements, increment return_value and
                // keep should_add_entry as true.
                return_value++;
                new_elements_added++;
                break;
              }
              default: {
                // XX option calls for no new elements, don't increment return_value and set
                // should_add_entry to false.
                should_add_entry = false;
                break;
              }
            }
          } else {
            // The value is already in the document.
            switch (sorted_set_options.update_options()) {
              case SortedSetOptionsPB::XX: FALLTHROUGH_INTENDED;
              case SortedSetOptionsPB::NONE: {
                // First make sure that the new score is different from the old score.
                // Both these options call for updating existing elements, set
                // should_remove_existing_entry to true, and if the CH flag is on (return both
                // elements changed and elements added), increment return_value.
                double score_to_remove = subdoc_reverse.GetDouble();
                // If incr option is set, we add the increment to the existing score.
                double score_to_add = sorted_set_options.incr() ?
                    score_to_remove + kv.subkey(i).double_subkey() : kv.subkey(i).double_subkey();
                if (score_to_remove != score_to_add) {
                  should_remove_existing_entry = true;
                  if (sorted_set_options.ch()) {
                    return_value++;
                  }
                }
                break;
              }
              default: {
                // NX option calls for only new elements, set should_add_entry to false.
                should_add_entry = false;
                break;
              }
            }
          }

          if (should_remove_existing_entry) {
            double score_to_remove = subdoc_reverse.GetDouble();
            SubDocument subdoc_forward_tombstone;
            subdoc_forward_tombstone.SetChild(PrimitiveValue(kv.value(i)),
                                              SubDocument(ValueType::kTombstone));
            kv_entries_forward.SetChild(PrimitiveValue::Double(score_to_remove),
                                        SubDocument(subdoc_forward_tombstone));
          }

          if (should_add_entry) {
            // If the incr option is specified, we need to insert the existing score + new score
            // instead of just the new score. The stored score is only read when the member was
            // actually found: a missing member has no stored score, so it starts from the
            // requested score alone.
            double score_to_add = kv.subkey(i).double_subkey();
            if (sorted_set_options.incr() && subdoc_reverse_found) {
              score_to_add += subdoc_reverse.GetDouble();
            }

            // Add the forward mapping to the entries.
            SubDocument *forward_entry =
                kv_entries_forward.GetOrAddChild(PrimitiveValue::Double(score_to_add)).first;
            forward_entry->SetChild(PrimitiveValue(kv.value(i)),
                                    SubDocument(PrimitiveValue()));

            // Add the reverse mapping to the entries.
            kv_entries_reverse.SetChild(PrimitiveValue(kv.value(i)),
                                        SubDocument(PrimitiveValue::Double(score_to_add)));
          }
        }

        if (new_elements_added > 0) {
          int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
          // Insert card + new_elements_added back into the document for the updated card.
          kv_entries_card = SubDocument(PrimitiveValue(card + new_elements_added));
          kv_entries.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(kv_entries_card));
        }

        if (kv_entries_forward.object_num_keys() > 0) {
          kv_entries.SetChild(PrimitiveValue(ValueType::kSSForward),
                              SubDocument(kv_entries_forward));
        }

        if (kv_entries_reverse.object_num_keys() > 0) {
          kv_entries.SetChild(PrimitiveValue(ValueType::kSSReverse),
                              SubDocument(kv_entries_reverse));
        }

        if (kv_entries.object_num_keys() > 0) {
          RETURN_NOT_OK(kv_entries.ConvertToRedisSortedSet());
          if (data_type == REDIS_TYPE_NONE) {
            RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
                doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl));
          } else {
            RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
                doc_path, kv_entries, data.read_time, data.deadline, redis_query_id(), ttl));
          }
        }
        response_.set_code(RedisResponsePB::OK);
        response_.set_int_response(return_value);
        break;
      }
      case REDIS_TYPE_STRING: {
        return STATUS_SUBSTITUTE(InvalidCommand,
            "Redis data type $0 in SET command should not have subkeys", kv.type());
      }
      default:
        return STATUS_SUBSTITUTE(InvalidCommand,
            "Redis data type $0 not supported in SET command", kv.type());
    }
  } else {
    if (kv.type() != REDIS_TYPE_STRING) {
      return STATUS_SUBSTITUTE(InvalidCommand,
          "Redis data type for SET must be string if subkey not present, found $0", kv.type());
    }
    if (kv.value_size() != 1) {
      return STATUS_SUBSTITUTE(InvalidCommand,
          "There must be only one value in SET if there is only one key, found $0",
          kv.value_size());
    }
    const RedisWriteMode mode = request_.set_request().mode();
    if (mode != RedisWriteMode::REDIS_WRITEMODE_UPSERT) {
      // Insert-only and update-only modes need to know whether the key currently exists.
      RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
      if ((mode == RedisWriteMode::REDIS_WRITEMODE_INSERT && data_type != REDIS_TYPE_NONE)
          || (mode == RedisWriteMode::REDIS_WRITEMODE_UPDATE && data_type == REDIS_TYPE_NONE)) {

        if (request_.set_request().has_expect_ok_response() &&
            !request_.set_request().expect_ok_response()) {
          // For SETNX we return 0 or 1 depending on if the key already existed.
          response_.set_int_response(0);
          response_.set_code(RedisResponsePB::OK);
        } else {
          response_.set_code(RedisResponsePB::NIL);
        }
        return Status::OK();
      }
    }
    RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
        doc_path, Value(PrimitiveValue(kv.value(0)), ttl),
        data.read_time, data.deadline, redis_query_id()));
  }

  if (request_.set_request().has_expect_ok_response() &&
      !request_.set_request().expect_ok_response()) {
    // For SETNX we return 0 or 1 depending on if the key already existed.
    response_.set_int_response(1);
  }

  response_.set_code(RedisResponsePB::OK);
  return Status::OK();
}
797
798
48
// Applies a TTL change to a top-level key.
//
// The request either carries an absolute expiration time (ExpireAt) or a relative ttl, where a
// ttl of -1 means PERSIST. The int response is 1 when the TTL was written and 0 when the key
// does not exist or when PERSIST finds nothing to remove. An absolute time that is not in the
// future, relative to the read time, deletes the key through ApplyDel.
Status RedisWriteOperation::ApplySetTtl(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();

  // We only support setting TTLs on top-level keys.
  if (!kv.subkey().empty()) {
    return STATUS_SUBSTITUTE(Corruption,
                             "Expected no subkeys, got $0", kv.subkey().size());
  }

  const auto& ttl_request = request_.set_ttl_request();
  const bool is_expire_at = ttl_request.has_absolute_time();
  MonoDelta ttl;

  // Handle ExpireAt
  if (is_expire_at) {
    const auto read_time_ms =
        server::HybridClock::GetPhysicalValueNanos(data.read_time.read) /
        MonoTime::kNanosecondsPerMillisecond;
    const int64_t remaining_ms = ttl_request.absolute_time() - read_time_ms;
    if (remaining_ms <= 0) {
      return ApplyDel(data);
    }
    ttl = MonoDelta::FromMilliseconds(remaining_ms);
  }

  Expiration expiration;
  auto current = VERIFY_RESULT(GetValue(data, kNilSubkeyIndex, &expiration));

  switch (current.type) {
    case REDIS_TYPE_TIMESERIES: {
      // This command is not supported.
      return STATUS_SUBSTITUTE(InvalidCommand,
          "Redis data type $0 not supported in EXPIRE and PERSIST commands", current.type);
    }
    case REDIS_TYPE_NONE: {
      // Key does not exist.
      VLOG(1) << "TTL cannot be set because the key does not exist";
      response_.set_int_response(0);
      return Status::OK();
    }
    default:
      break;
  }

  const bool is_persist = !is_expire_at && ttl_request.ttl() == -1;
  if (is_persist) {
    // Handle PERSIST: answer 0 when the computed relative TTL is negative or already the
    // maximum TTL.
    MonoDelta remaining = VERIFY_RESULT(
        expiration.ComputeRelativeTtl(iterator_->read_time().read));
    if (remaining.IsNegative() || remaining == Value::kMaxTtl) {
      response_.set_int_response(0);
      return Status::OK();
    }
  }

  if (!is_expire_at) {
    if (is_persist) {
      ttl = Value::kMaxTtl;
    } else {
      ttl = MonoDelta::FromMilliseconds(ttl_request.ttl());
    }
  }

  const ValueType v_type = ValueTypeFromRedisType(current.type);
  if (v_type == ValueType::kInvalid) {
    return STATUS(Corruption, "Invalid value type.");
  }

  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
  RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
      doc_path, Value(PrimitiveValue(v_type), ttl, Value::kInvalidUserTimestamp, Value::kTtlFlag),
      data.read_time, data.deadline, redis_query_id()));
  VLOG(2) << "Set TTL successfully to " << ttl << " for key " << kv.key();
  response_.set_int_response(1);
  return Status::OK();
}
860
861
0
// Puts the currently stored string into the string response and then overwrites the key with
// the single value of the request. A stored value of another type is reported through
// response_ by VerifyTypeAndSetCode and nothing is written.
Status RedisWriteOperation::ApplyGetSet(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();

  const int num_values = kv.value_size();
  if (num_values != 1) {
    return STATUS_SUBSTITUTE(Corruption,
        "Getset kv should have 1 value, found $0", num_values);
  }

  auto previous = VERIFY_RESULT(GetValue(data));

  const bool type_ok = VerifyTypeAndSetCode(
      RedisDataType::REDIS_TYPE_STRING, previous.type, &response_,
      VerifySuccessIfMissing::kTrue);
  if (!type_ok) {
    // We've already set the error code in the response.
    return Status::OK();
  }
  response_.set_string_response(previous.value);

  const auto replacement = Value(PrimitiveValue(kv.value(0)));
  return data.doc_write_batch->SetPrimitive(
      DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()),
      replacement, data.read_time, data.deadline, redis_query_id());
}
883
884
0
// Appends the request's single value to the stored string and writes the result back with the
// TTL computed from the stored expiration. The int response is the length of the resulting
// string. A stored value of another type is reported through response_ by VerifyTypeAndSetCode
// and nothing is written.
Status RedisWriteOperation::ApplyAppend(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();

  const int num_values = kv.value_size();
  if (num_values != 1) {
    return STATUS_SUBSTITUTE(Corruption,
        "Append kv should have 1 value, found $0", num_values);
  }

  auto current = VERIFY_RESULT(GetValue(data));

  const bool type_ok = VerifyTypeAndSetCode(
      RedisDataType::REDIS_TYPE_STRING, current.type, &response_,
      VerifySuccessIfMissing::kTrue);
  if (!type_ok) {
    // We've already set the error code in the response.
    return Status::OK();
  }

  std::string& contents = current.value;
  contents += kv.value(0);

  response_.set_code(RedisResponsePB::OK);
  response_.set_int_response(contents.length());

  // TODO: update the TTL with the write time rather than read time,
  // or store the expiration.
  MonoDelta remaining_ttl = VERIFY_RESULT(
      current.exp.ComputeRelativeTtl(iterator_->read_time().read));
  return data.doc_write_batch->SetPrimitive(
      DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()),
      Value(PrimitiveValue(contents), remaining_ttl),
      data.read_time,
      data.deadline,
      redis_query_id());
}
916
917
// TODO (akashnil): Actually check if the value existed, return 0 if not. handle multidel in future.
918
//                  See ENG-807
919
18
// Applies a delete request by writing tombstones to the write batch.
//
// The type in the request selects what is removed:
//   - REDIS_TYPE_NONE:       the whole key.
//   - REDIS_TYPE_TIMESERIES: the listed subkeys, without reading them first.
//   - REDIS_TYPE_SORTEDSET:  the listed members from both mappings, plus a counter update.
//   - anything else:         the listed subkeys of the container.
// A stored value whose type differs from the requested type is reported through response_
// (WRONG_TYPE) with an OK status. When EmulateRedisResponse() holds for the requested type, the
// int response carries the number of keys actually removed.
Status RedisWriteOperation::ApplyDel(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
  if (data_type != REDIS_TYPE_NONE && data_type != kv.type() && kv.type() != REDIS_TYPE_NONE) {
    response_.set_code(RedisResponsePB::WRONG_TYPE);
    response_.set_error_message(wrong_type_message);
    return Status::OK();
  }

  SubDocument values;
  // Number of distinct keys being removed.
  int num_keys = 0;
  switch (kv.type()) {
    case REDIS_TYPE_NONE: {
      values = SubDocument(ValueType::kTombstone);
      num_keys = data_type == REDIS_TYPE_NONE ? 0 : 1;
      break;
    }
    case REDIS_TYPE_TIMESERIES: {
      if (data_type == REDIS_TYPE_NONE) {
        // NOTE(review): unlike the other paths, this early return leaves the response code
        // untouched -- confirm that this is intended.
        return Status::OK();
      }
      for (int i = 0; i < kv.subkey_size(); i++) {
        PrimitiveValue primitive_value;
        RETURN_NOT_OK(PrimitiveValueFromSubKeyStrict(kv.subkey(i), data_type, &primitive_value));
        values.SetChild(primitive_value, SubDocument(ValueType::kTombstone));
      }
      num_keys = kv.subkey_size();
      break;
    }
    case REDIS_TYPE_SORTEDSET: {
      SubDocument values_card;
      SubDocument values_forward;
      SubDocument values_reverse;
      num_keys = kv.subkey_size();
      for (int i = 0; i < kv.subkey_size(); i++) {
        // Check whether the value is already in the document.
        SubDocument doc_reverse;
        bool doc_reverse_found = false;
        SubDocKey subdoc_key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()),
                                                 PrimitiveValue(ValueType::kSSReverse),
                                                 PrimitiveValue(kv.subkey(i).string_subkey()));
        // Todo(Rahul): Add values to the write batch cache and then do an additional check.
        // As of now, we only check to see if a value is in rocksdb, and we should also check
        // the write batch.
        auto encoded_subdoc_key_reverse = subdoc_key_reverse.EncodeWithoutHt();
        GetRedisSubDocumentData get_data = { encoded_subdoc_key_reverse, &doc_reverse,
                                             &doc_reverse_found };
        RETURN_NOT_OK(GetRedisSubDocument(
            data.doc_write_batch->doc_db(),
            get_data, redis_query_id(), TransactionOperationContext(), data.deadline,
            data.read_time));
        if (doc_reverse_found && doc_reverse.value_type() != ValueType::kTombstone) {
          // The value is already in the doc, needs to be removed.
          values_reverse.SetChild(PrimitiveValue(kv.subkey(i).string_subkey()),
                                  SubDocument(ValueType::kTombstone));
          // For sorted sets, the forward mapping also needs to be deleted.
          SubDocument doc_forward;
          doc_forward.SetChild(PrimitiveValue(kv.subkey(i).string_subkey()),
                               SubDocument(ValueType::kTombstone));
          values_forward.SetChild(PrimitiveValue::Double(doc_reverse.GetDouble()),
                                  SubDocument(doc_forward));
        } else {
          // If the key is absent, it doesn't contribute to the count of keys being deleted.
          num_keys--;
        }
      }
      int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
      // The new cardinality is card - num_keys.
      values_card = SubDocument(PrimitiveValue(card - num_keys));

      values.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(values_card));
      values.SetChild(PrimitiveValue(ValueType::kSSForward), SubDocument(values_forward));
      values.SetChild(PrimitiveValue(ValueType::kSSReverse), SubDocument(values_reverse));

      break;
    }
    default: {
      num_keys = kv.subkey_size(); // We know the subkeys are distinct.
      // Reading the subkeys is only needed to report how many of them actually existed. The
      // tombstones must be written either way; otherwise the delete silently becomes a no-op
      // whenever the response is not being emulated.
      const bool count_existing_keys = EmulateRedisResponse(kv.type());
      for (int i = 0; i < kv.subkey_size(); i++) {
        if (count_existing_keys) {
          RedisDataType type = VERIFY_RESULT(GetValueType(data, i));
          if (type != REDIS_TYPE_STRING) {
            // If the key is absent, it doesn't contribute to the count of keys being deleted.
            num_keys--;
            continue;
          }
        }
        values.SetChild(PrimitiveValue(kv.subkey(i).string_subkey()),
                        SubDocument(ValueType::kTombstone));
      }
      break;
    }
  }

  if (num_keys != 0) {
    DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
    RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(doc_path, values,
        data.read_time, data.deadline, redis_query_id()));
  }

  response_.set_code(RedisResponsePB::OK);
  if (EmulateRedisResponse(kv.type())) {
    // If the flag is true, we respond with the number of keys actually being deleted. We don't
    // report this number for the redis timeseries type to avoid reads.
    response_.set_int_response(num_keys);
  }
  return Status::OK();
}
1029
1030
0
// Overwrites part of the stored string, starting at the requested offset, with the request's
// single value. A string shorter than the offset is first padded with zero bytes. The int
// response is the length of the resulting string. A stored value of another type is reported
// through response_ by VerifyTypeAndSetCode and nothing is written.
Status RedisWriteOperation::ApplySetRange(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();

  const int num_values = kv.value_size();
  if (num_values != 1) {
    return STATUS_SUBSTITUTE(Corruption,
        "SetRange kv should have 1 value, found $0", num_values);
  }

  auto current = VERIFY_RESULT(GetValue(data));

  const bool type_ok = VerifyTypeAndSetCode(
      RedisDataType::REDIS_TYPE_STRING, current.type, &response_,
      VerifySuccessIfMissing::kTrue);
  if (!type_ok) {
    // We've already set the error code in the response.
    return Status::OK();
  }

  std::string& contents = current.value;
  const size_t offset = request_.set_range_request().offset();
  if (contents.length() < offset) {
    contents.resize(offset, 0);
  }
  contents.replace(offset, kv.value(0).length(), kv.value(0));

  response_.set_code(RedisResponsePB::OK);
  response_.set_int_response(contents.length());

  // TODO: update the TTL with the write time rather than read time,
  // or store the expiration.
  MonoDelta remaining_ttl = VERIFY_RESULT(
      current.exp.ComputeRelativeTtl(iterator_->read_time().read));
  Value updated = Value(PrimitiveValue(contents), remaining_ttl);
  return data.doc_write_batch->SetPrimitive(
      DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), updated, std::move(iterator_));
}
1061
1062
0
// Adds the request's integer increment to a string key, or to the first subkey of a hash, and
// writes the new value back as a decimal string. A missing value counts as 0. Type conflicts,
// values that do not parse as integers and increments that would overflow int64_t are reported
// through response_ with an OK status. On success the int response is the new value.
Status RedisWriteOperation::ApplyIncr(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();
  const int64_t delta = request_.incr_request().increment_int();
  const bool is_hash = kv.type() == REDIS_TYPE_HASH;

  if (!is_hash && kv.type() != REDIS_TYPE_STRING) {
    return STATUS_SUBSTITUTE(InvalidCommand,
                             "Redis data type $0 not supported in Incr command", kv.type());
  }

  RedisDataType container_type = VERIFY_RESULT(GetValueType(data));
  const bool container_ok = VerifyTypeAndSetCode(
      kv.type(), container_type, &response_, VerifySuccessIfMissing::kTrue);
  if (!container_ok) {
    // We've already set the error code in the response.
    return Status::OK();
  }

  // A hash increment targets the first subkey; a string increment targets the key itself.
  const int subkey_index = is_hash ? 0 : -1;
  auto current = VERIFY_RESULT(GetValue(data, subkey_index));

  const bool value_ok = VerifyTypeAndSetCode(
      RedisDataType::REDIS_TYPE_STRING, current.type, &response_,
      VerifySuccessIfMissing::kTrue);
  if (!value_ok) {
    response_.set_code(RedisResponsePB::WRONG_TYPE);
    response_.set_error_message(wrong_type_message);
    return Status::OK();
  }

  // If no value is present, 0 is the default.
  int64_t old_value = 0;
  if (current.type != REDIS_TYPE_NONE) {
    auto parsed = CheckedStoll(current.value);
    if (!parsed.ok()) {
      // This can happen if there are leading or trailing spaces, or the value
      // is out of range.
      response_.set_code(RedisResponsePB::WRONG_TYPE);
      response_.set_error_message("ERR value is not an integer or out of range");
      return Status::OK();
    }
    old_value = *parsed;
  }

  const bool below_min =
      delta < 0 && old_value < 0 && delta < numeric_limits<int64_t>::min() - old_value;
  const bool above_max =
      delta > 0 && old_value > 0 && delta > numeric_limits<int64_t>::max() - old_value;
  if (below_min || above_max) {
    response_.set_code(RedisResponsePB::WRONG_TYPE);
    response_.set_error_message("Increment would overflow");
    return Status::OK();
  }

  const int64_t new_value = old_value + delta;
  response_.set_code(RedisResponsePB::OK);
  response_.set_int_response(new_value);

  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
  PrimitiveValue new_pvalue = PrimitiveValue(std::to_string(new_value));

  if (!is_hash) {
    // kv.type() == REDIS_TYPE_STRING
    // TODO: update the TTL with the write time rather than read time,
    // or store the expiration.
    MonoDelta remaining_ttl = VERIFY_RESULT(
        current.exp.ComputeRelativeTtl(iterator_->read_time().read));
    Value new_val = Value(new_pvalue, remaining_ttl);
    return data.doc_write_batch->SetPrimitive(doc_path, new_val, std::move(iterator_));
  }

  SubDocument kv_entries = SubDocument();
  PrimitiveValue subkey_value;
  RETURN_NOT_OK(PrimitiveValueFromSubKeyStrict(kv.subkey(0), kv.type(), &subkey_value));
  kv_entries.SetChild(subkey_value, SubDocument(new_pvalue));
  return data.doc_write_batch->ExtendSubDocument(
      doc_path, kv_entries, data.read_time, data.deadline, redis_query_id());
}
1130
1131
0
// Pushes the request's values onto the list stored at the key, on the side named by the push
// request, and updates the list's counter. A missing key is created; a key holding another type
// is reported through response_ (WRONG_TYPE) with an OK status. On success the int response is
// the new element count.
Status RedisWriteOperation::ApplyPush(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
  if (data_type != REDIS_TYPE_LIST && data_type != REDIS_TYPE_NONE) {
    response_.set_code(RedisResponsePB::WRONG_TYPE);
    response_.set_error_message(wrong_type_message);
    return Status::OK();
  }

  SubDocument list;
  int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)) + kv.value_size();
  list.SetChild(PrimitiveValue(ValueType::kCounter), SubDocument(PrimitiveValue(card)));

  SubDocument elements(request_.push_request().side() == REDIS_SIDE_LEFT ?
                   ListExtendOrder::PREPEND : ListExtendOrder::APPEND);
  // Iterate by reference: the elements are strings and do not need an extra copy each.
  for (const auto& val : kv.value()) {
    elements.AddListElement(SubDocument(PrimitiveValue(val)));
  }
  list.SetChild(PrimitiveValue(ValueType::kArray), std::move(elements));
  RETURN_NOT_OK(list.ConvertToRedisList());

  // A missing key needs the whole list document inserted; an existing list is extended.
  if (data_type == REDIS_TYPE_NONE) {
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
        doc_path, list, data.read_time, data.deadline, redis_query_id()));
  } else {
    RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
        doc_path, list, data.read_time, data.deadline, redis_query_id()));
  }

  response_.set_int_response(card);
  response_.set_code(RedisResponsePB::OK);
  return Status::OK();
}
1167
1168
0
// The insert operation is not implemented: every call reports NotSupported and `data` is unused.
Status RedisWriteOperation::ApplyInsert(const DocOperationApplyData& data) {
  return STATUS(NotSupported, "Redis operation has not been implemented");
}
1171
1172
0
// Pops one element from the left or right end of the list stored under the request key.
//
// The popped slot is overwritten with a tombstone, the element counter is decremented, and the
// removed value is returned as the string response. An empty list yields a NIL response.
Status RedisWriteOperation::ApplyPop(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));

  if (!VerifyTypeAndSetCode(kv.type(), data_type, &response_, VerifySuccessIfMissing::kTrue)) {
    // VerifyTypeAndSetCode has already filled in the error code.
    return Status::OK();
  }

  SubDocument counter_update;
  int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));

  if (card == 0) {
    response_.set_code(RedisResponsePB::NIL);
    return Status::OK();
  }

  // A left pop targets index 1 scanning forward from 0; a right pop targets index `card`
  // scanning backward from card + 1.
  const bool pop_left = request_.pop_request().side() == REDIS_SIDE_LEFT;
  const Direction scan_direction = pop_left ? Direction::kForward : Direction::kBackward;
  const int64_t scan_start = pop_left ? 0 : card + 1;

  std::vector<int64_t> indices;
  std::vector<SubDocument> new_value = {SubDocument(PrimitiveValue(ValueType::kTombstone))};
  std::vector<std::string> value;
  indices.push_back(pop_left ? 1 : card);

  RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(
      doc_path, indices, new_value, data.read_time, data.deadline, redis_query_id(),
      scan_direction, scan_start, &value));

  // Persist the decremented cardinality.
  counter_update.SetChild(
      PrimitiveValue(ValueType::kCounter), SubDocument(PrimitiveValue(card - 1)));
  RETURN_NOT_OK(counter_update.ConvertToRedisList());
  RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
      doc_path, counter_update, data.read_time, data.deadline, redis_query_id()));

  if (value.size() != 1) {
    return STATUS_SUBSTITUTE(Corruption,
                             "Expected one popped value, got $0", value.size());
  }

  response_.set_string_response(value[0]);
  response_.set_code(RedisResponsePB::OK);
  return Status::OK();
}
1217
1218
0
// Adds the request subkeys as members of the set stored under the request key.
//
// Responds with WRONG_TYPE if the key holds something other than a set. Returns InvalidCommand
// if the request carries no subkeys. When FLAGS_emulate_redis_responses is set, each subkey is
// looked up first so that the response can report how many members were actually new.
Status RedisWriteOperation::ApplyAdd(const DocOperationApplyData& data) {
  const RedisKeyValuePB& kv = request_.key_value();
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));

  if (data_type != REDIS_TYPE_SET && data_type != REDIS_TYPE_NONE) {
    response_.set_code(RedisResponsePB::WRONG_TYPE);
    response_.set_error_message(wrong_type_message);
    return Status::OK();
  }

  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());

  if (kv.subkey_size() == 0) {
    return STATUS(InvalidCommand, "SADD request has no subkeys set");
  }

  // Number of requested members that already exist; only tracked when emulating Redis responses.
  int num_keys_found = 0;

  SubDocument set_entries = SubDocument();

  for (int i = 0 ; i < kv.subkey_size(); i++) { // We know that each subkey is distinct.
    if (FLAGS_emulate_redis_responses) {
      RedisDataType type = VERIFY_RESULT(GetValueType(data, i));
      if (type != REDIS_TYPE_NONE) {
        num_keys_found++;
      }
    }

    set_entries.SetChild(
        PrimitiveValue(kv.subkey(i).string_subkey()),
        SubDocument(PrimitiveValue(ValueType::kNullLow)));
  }

  RETURN_NOT_OK(set_entries.ConvertToRedisSet());

  // A missing key gets a fresh document; an existing set is extended in place.
  if (data_type == REDIS_TYPE_NONE) {
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
        doc_path, set_entries, data.read_time, data.deadline, redis_query_id()));
  } else {
    RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
        doc_path, set_entries, data.read_time, data.deadline, redis_query_id()));
  }

  response_.set_code(RedisResponsePB::OK);
  if (FLAGS_emulate_redis_responses) {
    // If flag is set, the actual number of new keys added is sent as response.
    response_.set_int_response(kv.subkey_size() - num_keys_found);
  }
  return Status::OK();
}
1270
1271
0
// The remove operation is not implemented: every call reports NotSupported and `data` is unused.
Status RedisWriteOperation::ApplyRemove(const DocOperationApplyData& data) {
  return STATUS(NotSupported, "Redis operation has not been implemented");
}
1274
1275
42.9k
// Entry point for a Redis read: creates the intent-aware iterator positioned for the request key,
// records the deadline, and dispatches to the handler matching the request case.
Status RedisReadOperation::Execute() {
  SimulateTimeoutIfTesting(&deadline_);

  const auto& key_value = request_.key_value();
  SubDocKey doc_key(DocKey::FromRedisKey(key_value.hash_code(), key_value.key()));

  // If we have a KEYS command, we don't specify any key for the iterator. Therefore, don't use
  // bloom filters for this command.
  auto bloom_filter_mode = BloomFilterMode::USE_BLOOM_FILTER;
  if (request_.has_keys_request()) {
    bloom_filter_mode = BloomFilterMode::DONT_USE_BLOOM_FILTER;
  }

  iterator_ = yb::docdb::CreateIntentAwareIterator(
      doc_db_, bloom_filter_mode, doc_key.Encode().AsSlice(),
      redis_query_id(), TransactionOperationContext(), deadline_, read_time_);
  deadline_info_.emplace(deadline_);

  switch (request_.request_case()) {
    case RedisReadRequestPB::kGetForRenameRequest: return ExecuteGetForRename();
    case RedisReadRequestPB::kGetRequest: return ExecuteGet();
    case RedisReadRequestPB::kGetTtlRequest: return ExecuteGetTtl();
    case RedisReadRequestPB::kStrlenRequest: return ExecuteStrLen();
    case RedisReadRequestPB::kExistsRequest: return ExecuteExists();
    case RedisReadRequestPB::kGetRangeRequest: return ExecuteGetRange();
    case RedisReadRequestPB::kGetCollectionRangeRequest: return ExecuteCollectionGetRange();
    case RedisReadRequestPB::kKeysRequest: return ExecuteKeys();
    default:
      return STATUS_FORMAT(
          Corruption, "Unsupported redis read operation: $0", request_.request_case());
  }
}
1312
1313
namespace {

// Normalizes `index` against a sequence of length `len`.
// A negative index counts from the end (index + len) and is clamped to be at least 0;
// a non-negative index is clamped to be at most `len`.
ssize_t ApplyIndex(ssize_t index, ssize_t len) {
  if (index >= 0) {
    return index < len ? index : len;
  }
  const ssize_t from_end = index + len;
  return from_end > 0 ? from_end : 0;
}

}  // namespace
1324
1325
// Shared implementation for commands that read a whole collection stored under the request key.
//
// When either add_keys or add_values is set, the response is an array built from the document's
// entries. When both are false, the response is an integer: the cardinality for sorted sets and
// lists, or the number of records read for other types. A missing document yields OK with an
// empty array (array mode) or 0 (count mode).
Status RedisReadOperation::ExecuteHGetAllLikeCommands(ValueType value_type,
                                                      bool add_keys,
                                                      bool add_values) {
  const auto& key_value = request_.key_value();
  SubDocKey doc_key(DocKey::FromRedisKey(key_value.hash_code(), key_value.key()));
  SubDocument doc;
  bool doc_found = false;
  auto encoded_doc_key = doc_key.EncodeWithoutHt();

  // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions
  // support for Redis.
  GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found };
  data.deadline_info = deadline_info_.get_ptr();

  const bool wants_array = add_keys || add_values;
  const bool uses_counter_subkey = value_type == ValueType::kRedisSortedSet ||
                                   value_type == ValueType::kRedisList;

  // In count mode, types with a counter subkey only need their type read here (the cardinality
  // is fetched separately below); other types are counted while reading.
  if (uses_counter_subkey) {
    data.return_type_only = !wants_array;
  } else {
    data.count_only = !wants_array;
  }

  RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr,
                               SeekFwdSuffices::kFalse));
  if (wants_array) {
    response_.set_allocated_array_response(new RedisArrayPB());
  }

  if (!doc_found) {
    response_.set_code(RedisResponsePB::OK);
    if (!wants_array) {
      response_.set_int_response(0);
    }
    return Status::OK();
  }

  if (!VerifyTypeAndSetCode(value_type, doc.value_type(), &response_)) {
    // The error code has been set by VerifyTypeAndSetCode.
    return Status::OK();
  }

  if (wants_array) {
    RETURN_NOT_OK(PopulateResponseFrom(doc.object_container(), AddResponseValuesGeneric,
                                       &response_, add_keys, add_values));
    return Status::OK();
  }

  int64_t card;
  if (uses_counter_subkey) {
    card = VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value()));
  } else {
    card = data.record_count;
  }
  response_.set_int_response(card);
  response_.set_code(RedisResponsePB::OK);
  return Status::OK();
}
1375
1376
// Convenience overload: runs the range request over the full range, i.e. with the lower bound
// set to negative infinity and the upper bound set to positive infinity.
Status RedisReadOperation::ExecuteCollectionGetRangeByBounds(
    RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, bool add_keys) {
  RedisSubKeyBoundPB unbounded_low;
  RedisSubKeyBoundPB unbounded_high;
  unbounded_low.set_infinity_type(RedisSubKeyBoundPB::NEGATIVE);
  unbounded_high.set_infinity_type(RedisSubKeyBoundPB::POSITIVE);
  return ExecuteCollectionGetRangeByBounds(
      request_type, unbounded_low, unbounded_high, add_keys);
}
1384
1385
// Executes a range read bounded by subkey values.
//
// ZRANGEBYSCORE reads a sorted set by score (optionally restricted by offset/limit index
// bounds); every other request type is handled as a time series read by timestamp. A range
// whose lower bound is +infinity or whose upper bound is -infinity produces an empty response.
Status RedisReadOperation::ExecuteCollectionGetRangeByBounds(
    RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type,
    const RedisSubKeyBoundPB& lower_bound, const RedisSubKeyBoundPB& upper_bound, bool add_keys) {
  const bool lower_is_positive_infinity =
      lower_bound.has_infinity_type() &&
      lower_bound.infinity_type() == RedisSubKeyBoundPB::POSITIVE;
  const bool upper_is_negative_infinity =
      upper_bound.has_infinity_type() &&
      upper_bound.infinity_type() == RedisSubKeyBoundPB::NEGATIVE;
  if (lower_is_positive_infinity || upper_is_negative_infinity) {
    // Return empty response.
    response_.set_code(RedisResponsePB::OK);
    RETURN_NOT_OK(PopulateResponseFrom(
        SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_,
        /* add_keys */ true, /* add_values */ true));
    return Status::OK();
  }

  if (request_type != RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE) {
    // Time series read, bounded by timestamps.
    auto encoded_doc_key =
        DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key());
    int64_t low_timestamp = lower_bound.subkey_bound().timestamp_subkey();
    int64_t high_timestamp = upper_bound.subkey_bound().timestamp_subkey();

    KeyBytes low_sub_key_bound;
    KeyBytes high_sub_key_bound;

    // Need to switch the order since we store the timestamps in descending order.
    SliceKeyBound low_subkey;
    if (!upper_bound.has_infinity_type()) {
      low_sub_key_bound = encoded_doc_key;
      PrimitiveValue(high_timestamp, SortOrder::kDescending).AppendToKey(&low_sub_key_bound);
      low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(upper_bound.is_exclusive()));
    }
    SliceKeyBound high_subkey;
    if (!lower_bound.has_infinity_type()) {
      high_sub_key_bound = encoded_doc_key;
      PrimitiveValue(low_timestamp, SortOrder::kDescending).AppendToKey(&high_sub_key_bound);
      high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(lower_bound.is_exclusive()));
    }

    SubDocument doc;
    bool doc_found = false;
    GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found};
    data.deadline_info = deadline_info_.get_ptr();
    data.low_subkey = &low_subkey;
    data.high_subkey = &high_subkey;
    data.limit = request_.range_request_limit();

    // If reverse is false, newest element is the first element returned.
    const bool is_reverse = request_type != RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME;
    RETURN_NOT_OK(GetAndPopulateResponseValues(
        iterator_.get(), AddResponseValuesGeneric, data, ValueType::kRedisTS, request_, &response_,
        /* add_keys */ true, /* add_values */ true, is_reverse));
    return Status::OK();
  }

  // ZRANGEBYSCORE: the key must hold a sorted set (or be missing).
  auto type = VERIFY_RESULT(GetValueType());
  auto expected_type = REDIS_TYPE_SORTEDSET;
  if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
    return Status::OK();
  }
  auto encoded_doc_key =
      DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key());
  PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key);
  double low_double = lower_bound.subkey_bound().double_subkey();
  double high_double = upper_bound.subkey_bound().double_subkey();

  KeyBytes low_sub_key_bound;
  KeyBytes high_sub_key_bound;

  // Finite score bounds are encoded as key bounds; infinite ones leave the bound default.
  SliceKeyBound low_subkey;
  if (!lower_bound.has_infinity_type()) {
    low_sub_key_bound = encoded_doc_key;
    PrimitiveValue::Double(low_double).AppendToKey(&low_sub_key_bound);
    low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(lower_bound.is_exclusive()));
  }
  SliceKeyBound high_subkey;
  if (!upper_bound.has_infinity_type()) {
    high_sub_key_bound = encoded_doc_key;
    PrimitiveValue::Double(high_double).AppendToKey(&high_sub_key_bound);
    high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(upper_bound.is_exclusive()));
  }

  SubDocument doc;
  bool doc_found = false;
  GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found};
  data.deadline_info = deadline_info_.get_ptr();
  data.low_subkey = &low_subkey;
  data.high_subkey = &high_subkey;

  IndexBound low_index;
  IndexBound high_index;
  if (request_.has_range_request_limit()) {
    auto offset = request_.index_range().lower_bound().index();
    int32_t limit = request_.range_request_limit();

    if (offset < 0 || limit == 0) {
      // Return an empty response.
      response_.set_code(RedisResponsePB::OK);
      RETURN_NOT_OK(PopulateResponseFrom(
          SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_,
          /* add_keys */ true, /* add_values */ true));
      return Status::OK();
    }

    low_index = IndexBound(offset, true /* is_lower */);
    data.low_index = &low_index;
    if (limit > 0) {
      // Only define upper bound if limit is positive.
      high_index = IndexBound(offset + limit - 1, false /* is_lower */);
      data.high_index = &high_index;
    }
  }
  RETURN_NOT_OK(GetAndPopulateResponseValues(
      iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_,
      &response_,
      /* add_keys */ add_keys, /* add_values */ true, /* reverse */ false));
  return Status::OK();
}
1503
1504
5.47k
// Dispatches a collection range request.
//
// TSREVRANGEBYTIME, ZRANGEBYSCORE and TSRANGEBYTIME require a subkey range and are forwarded to
// ExecuteCollectionGetRangeByBounds. ZRANGE and ZREVRANGE require an index range, which is
// normalized against the sorted set's cardinality before reading. Returns InvalidArgument when
// the key or the required range is missing, and InvalidCommand for the UNKNOWN request type.
Status RedisReadOperation::ExecuteCollectionGetRange() {
  const RedisKeyValuePB& key_value = request_.key_value();
  if (!request_.has_key_value() || !key_value.has_key()) {
    return STATUS(InvalidArgument, "Need to specify the key");
  }

  const auto request_type = request_.get_collection_range_request().request_type();
  switch (request_type) {
    case RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME:
    case RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE:
    case RedisCollectionGetRangeRequestPB::TSRANGEBYTIME: {
      const bool has_both_bounds = request_.has_subkey_range() &&
                                   request_.subkey_range().has_lower_bound() &&
                                   request_.subkey_range().has_upper_bound();
      if (!has_both_bounds) {
        return STATUS(InvalidArgument, "Need to specify the subkey range");
      }
      const RedisSubKeyBoundPB& lower_bound = request_.subkey_range().lower_bound();
      const RedisSubKeyBoundPB& upper_bound = request_.subkey_range().upper_bound();
      const bool add_keys = request_.get_collection_range_request().with_scores();
      return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys);
    }
    case RedisCollectionGetRangeRequestPB::ZRANGE:
    case RedisCollectionGetRangeRequestPB::ZREVRANGE: {
      const bool has_both_bounds = request_.has_index_range() &&
                                   request_.index_range().has_lower_bound() &&
                                   request_.index_range().has_upper_bound();
      if (!has_both_bounds) {
        return STATUS(InvalidArgument, "Need to specify the index range");
      }

      // First make sure is of type sorted set or none.
      RedisDataType type = VERIFY_RESULT(GetValueType());
      auto expected_type = RedisDataType::REDIS_TYPE_SORTEDSET;
      if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
        return Status::OK();
      }

      int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value()));

      const RedisIndexBoundPB& low_index_bound = request_.index_range().lower_bound();
      const RedisIndexBoundPB& high_index_bound = request_.index_range().upper_bound();

      int64 low_idx = low_index_bound.index();
      int64 high_idx = high_index_bound.index();

      // Normalize the bounds to be positive and go from low to high index.
      int64 low_idx_normalized;
      int64 high_idx_normalized;
      const bool reverse = request_type == RedisCollectionGetRangeRequestPB::ZREVRANGE;
      GetNormalizedBounds(
          low_idx, high_idx, card, reverse, &low_idx_normalized, &high_idx_normalized);

      if (high_idx_normalized < low_idx_normalized) {
        // Return empty response.
        response_.set_code(RedisResponsePB::OK);
        RETURN_NOT_OK(PopulateResponseFrom(
            SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_,
            /* add_keys */ true, /* add_values */ true));
        return Status::OK();
      }

      auto encoded_doc_key = DocKey::EncodedFromRedisKey(
          request_.key_value().hash_code(), request_.key_value().key());
      PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key);

      bool add_keys = request_.get_collection_range_request().with_scores();

      IndexBound low_bound = IndexBound(low_idx_normalized, true /* is_lower */);
      IndexBound high_bound = IndexBound(high_idx_normalized, false /* is_lower */);

      SubDocument doc;
      bool doc_found = false;
      GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found};
      data.deadline_info = deadline_info_.get_ptr();
      data.low_index = &low_bound;
      data.high_index = &high_bound;

      RETURN_NOT_OK(GetAndPopulateResponseValues(
          iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_,
          &response_, add_keys, /* add_values */ true, reverse));
      return Status::OK();
    }
    case RedisCollectionGetRangeRequestPB::UNKNOWN:
      return STATUS(InvalidCommand, "Unknown Collection Get Range Request not supported");
  }

  return Status::OK();
}
1593
1594
37.0k
// Looks up the Redis data type for the request key (or for the subkey at `subkey_index`) using
// this operation's iterator; no write batch is consulted.
Result<RedisDataType> RedisReadOperation::GetValueType(int subkey_index) {
  const RedisKeyValuePB& key_value = request_.key_value();
  return GetRedisValueType(
      iterator_.get(), key_value, nullptr /* doc_write_batch */, subkey_index);
}
1598
1599
609
// Same lookup as GetValue, but passes always_override = true to GetRedisValue.
Result<RedisValue> RedisReadOperation::GetOverrideValue(int subkey_index) {
  const RedisKeyValuePB& key_value = request_.key_value();
  return GetRedisValue(iterator_.get(), key_value, subkey_index, /* always_override */ true);
}
1603
1604
36.3k
// Reads the value for the request key (or for the subkey at `subkey_index`) via this
// operation's iterator.
Result<RedisValue> RedisReadOperation::GetValue(int subkey_index) {
  const RedisKeyValuePB& key_value = request_.key_value();
  return GetRedisValue(iterator_.get(), key_value, subkey_index);
}
1607
1608
namespace {
1609
1610
// Note: Do not use if also retrieving other value, as some work will be repeated.
1611
// Assumes every value has a TTL, and the TTL is stored in the row with this key.
1612
// Also observe that tombstone checking only works because we assume the key has
1613
// no ancestors.
1614
Result<boost::optional<Expiration>> GetTtl(
1615
132
    const Slice& encoded_subdoc_key, IntentAwareIterator* iter) {
1616
132
  auto dockey_size =
1617
132
    VERIFY_RESULT(DocKey::EncodedSize(encoded_subdoc_key, DocKeyPart::kWholeDocKey));
1618
132
  Slice key_slice(encoded_subdoc_key.data(), dockey_size);
1619
132
  iter->Seek(key_slice);
1620
132
  if (!iter->valid())
1621
6
    return boost::none;
1622
126
  auto key_data = VERIFY_RESULT(iter->FetchKey());
1623
126
  if (!key_data.key.compare(key_slice)) {
1624
126
    Value doc_value = Value(PrimitiveValue(ValueType::kInvalid));
1625
126
    RETURN_NOT_OK(doc_value.Decode(iter->value()));
1626
126
    if (doc_value.value_type() != ValueType::kTombstone) {
1627
108
      return Expiration(key_data.write_time.hybrid_time(), doc_value.ttl());
1628
108
    }
1629
18
  }
1630
18
  return boost::none;
1631
18
}
1632
1633
} // namespace
1634
1635
132
// Reports the remaining time to live of the request key as the integer response:
//   -2 when no live entry is found or the value has already expired,
//   -1 when the entry carries the maximum TTL (Value::kMaxTtl),
//   otherwise the remaining TTL, in seconds (rounded) or milliseconds depending on the request.
// Returns Corruption if the request has no key or carries subkeys.
Status RedisReadOperation::ExecuteGetTtl() {
  constexpr int64_t kMissingOrExpired = -2;
  constexpr int64_t kNoExpiration = -1;

  const RedisKeyValuePB& kv = request_.key_value();
  if (!kv.has_key()) {
    return STATUS(Corruption, "Expected KeyValuePB");
  }
  // We currently only support getting and setting TTL on top level keys.
  if (!kv.subkey().empty()) {
    return STATUS_SUBSTITUTE(Corruption,
                             "Expected no subkeys, got $0", kv.subkey().size());
  }

  auto encoded_doc_key = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key());
  auto maybe_ttl_exp = VERIFY_RESULT(GetTtl(encoded_doc_key.AsSlice(), iterator_.get()));

  if (!maybe_ttl_exp.has_value()) {
    response_.set_int_response(kMissingOrExpired);
    return Status::OK();
  }

  auto exp = maybe_ttl_exp.get();
  if (exp.ttl.Equals(Value::kMaxTtl)) {
    response_.set_int_response(kNoExpiration);
    return Status::OK();
  }

  MonoDelta ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read));
  if (ttl.IsNegative()) {
    // The value has expired.
    response_.set_int_response(kMissingOrExpired);
    return Status::OK();
  }

  if (request_.get_ttl_request().return_seconds()) {
    response_.set_int_response(static_cast<int64_t>(std::round(ttl.ToSeconds())));
  } else {
    response_.set_int_response(ttl.ToMilliseconds());
  }
  return Status::OK();
}
1672
1673
0
// Reads the full value stored under the request key, whatever its type.
//
// The detected type is recorded in the response, then the read is delegated to the handler that
// returns the whole value for that type. A missing key yields NOT_FOUND. Lists and any other
// type are not handled: they are logged with DFATAL and the call returns OK.
Status RedisReadOperation::ExecuteGetForRename() {
  const RedisDataType type = VERIFY_RESULT(GetValueType());
  response_.set_type(type);

  switch (type) {
    case RedisDataType::REDIS_TYPE_STRING:
      return ExecuteGet(RedisGetRequestPB::GET);

    case RedisDataType::REDIS_TYPE_HASH:
      return ExecuteGet(RedisGetRequestPB::HGETALL);

    case RedisDataType::REDIS_TYPE_SET:
      return ExecuteGet(RedisGetRequestPB::SMEMBERS);

    case RedisDataType::REDIS_TYPE_SORTEDSET:
      return ExecuteCollectionGetRangeByBounds(
          RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE, true);

    case RedisDataType::REDIS_TYPE_TIMESERIES:
      return ExecuteCollectionGetRangeByBounds(
          RedisCollectionGetRangeRequestPB::TSRANGEBYTIME, true);

    case RedisDataType::REDIS_TYPE_NONE:
      response_.set_code(RedisResponsePB::NOT_FOUND);
      return Status::OK();

    case RedisDataType::REDIS_TYPE_LIST:
    default:
      LOG(DFATAL) << "Unhandled Redis Data Type " << type;
      break;
  }
  return Status::OK();
}
1711
1712
0
// Convenience overload: builds a get request carrying only the given request type and runs it.
Status RedisReadOperation::ExecuteGet(RedisGetRequestPB::GetRequestType type) {
  RedisGetRequestPB typed_request;
  typed_request.set_request_type(type);
  return ExecuteGet(typed_request);
}
1717
1718
37.0k
// Runs the get request embedded in the read request.
Status RedisReadOperation::ExecuteGet() {
  return ExecuteGet(request_.get_request());
}
1719
1720
37.0k
Status RedisReadOperation::ExecuteGet(const RedisGetRequestPB& get_request) {
1721
37.0k
  auto request_type = get_request.request_type();
1722
37.0k
  RedisDataType expected_type = REDIS_TYPE_NONE;
1723
37.0k
  switch (request_type) {
1724
36.3k
    case RedisGetRequestPB::GET:
1725
36.3k
      expected_type = REDIS_TYPE_STRING; break;
1726
609
    case RedisGetRequestPB::TSGET:
1727
609
      expected_type = REDIS_TYPE_TIMESERIES; break;
1728
0
    case RedisGetRequestPB::HGET: FALLTHROUGH_INTENDED;
1729
0
    case RedisGetRequestPB::HEXISTS:
1730
0
      expected_type = REDIS_TYPE_HASH; break;
1731
0
    case RedisGetRequestPB::SISMEMBER:
1732
0
      expected_type = REDIS_TYPE_SET; break;
1733
21
    case RedisGetRequestPB::ZSCORE:
1734
21
      expected_type = REDIS_TYPE_SORTEDSET; break;
1735
18
    default:
1736
18
      expected_type = REDIS_TYPE_NONE;
1737
37.0k
  }
1738
37.0k
  switch (request_type) {
1739
36.3k
    case RedisGetRequestPB::GET: FALLTHROUGH_INTENDED;
1740
36.9k
    case RedisGetRequestPB::TSGET: FALLTHROUGH_INTENDED;
1741
36.9k
    case RedisGetRequestPB::HGET: {
1742
36.9k
      RedisDataType type = VERIFY_RESULT(GetValueType());
1743
      // TODO: this is primarily glue for the Timeseries bug where the parent
1744
      // may get compacted due to an outdated TTL even though the children
1745
      // have longer TTL's and thus still exist. When fixing, take note that
1746
      // GetValueType finds the value type of the parent, so if the parent
1747
      // does not have the maximum TTL, it will return REDIS_TYPE_NONE when it
1748
      // should not.
1749
36.9k
      if (expected_type == REDIS_TYPE_TIMESERIES && type == REDIS_TYPE_NONE) {
1750
0
        type = expected_type;
1751
0
      }
1752
      // If wrong type, we set the error code in the response.
1753
36.9k
      if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1754
36.3k
        auto value = request_type == RedisGetRequestPB::TSGET ? GetOverrideValue() : GetValue();
1755
36.9k
        RETURN_NOT_OK(value);
1756
36.9k
        if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1757
36.9k
            VerifySuccessIfMissing::kTrue)) {
1758
36.9k
          response_.set_string_response(value->value);
1759
36.9k
        }
1760
36.9k
      }
1761
36.9k
      return Status::OK();
1762
36.9k
    }
1763
21
    case RedisGetRequestPB::ZSCORE: {
1764
21
      RedisDataType type = VERIFY_RESULT(GetValueType());
1765
      // If wrong type, we set the error code in the response.
1766
21
      if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1767
3
        return Status::OK();
1768
3
      }
1769
18
      SubDocKey key_reverse = SubDocKey(
1770
18
          DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key()),
1771
18
          PrimitiveValue(ValueType::kSSReverse),
1772
18
          PrimitiveValue(request_.key_value().subkey(0).string_subkey()));
1773
18
      SubDocument subdoc_reverse;
1774
18
      bool subdoc_reverse_found = false;
1775
18
      auto encoded_key_reverse = key_reverse.EncodeWithoutHt();
1776
18
      GetRedisSubDocumentData get_data = {
1777
18
          encoded_key_reverse, &subdoc_reverse, &subdoc_reverse_found };
1778
18
      RETURN_NOT_OK(GetRedisSubDocument(doc_db_, get_data, redis_query_id(),
1779
18
                                        TransactionOperationContext(), deadline_, read_time_));
1780
18
      if (subdoc_reverse_found) {
1781
12
        double score = subdoc_reverse.GetDouble();
1782
12
        response_.set_string_response(std::to_string(score));
1783
6
      } else {
1784
6
        response_.set_code(RedisResponsePB::NIL);
1785
6
      }
1786
18
      return Status::OK();
1787
18
    }
1788
0
    case RedisGetRequestPB::HEXISTS: FALLTHROUGH_INTENDED;
1789
0
    case RedisGetRequestPB::SISMEMBER: {
1790
0
      RedisDataType type = VERIFY_RESULT(GetValueType());
1791
0
      if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1792
0
        RedisDataType subtype = VERIFY_RESULT(GetValueType(0));
1793
0
        SetOptionalInt(subtype, 1, &response_);
1794
0
        response_.set_code(RedisResponsePB::OK);
1795
0
      }
1796
0
      return Status::OK();
1797
0
    }
1798
0
    case RedisGetRequestPB::HSTRLEN: {
1799
0
      RedisDataType type = VERIFY_RESULT(GetValueType());
1800
0
      if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_,
1801
0
                               VerifySuccessIfMissing::kTrue)) {
1802
0
        auto value = GetValue();
1803
0
        RETURN_NOT_OK(value);
1804
0
        SetOptionalInt(value->type, value->value.length(), &response_);
1805
0
        response_.set_code(RedisResponsePB::OK);
1806
0
      }
1807
0
      return Status::OK();
1808
0
    }
1809
0
    case RedisGetRequestPB::MGET: {
1810
0
      return STATUS(NotSupported, "MGET not yet supported");
1811
0
    }
1812
0
    case RedisGetRequestPB::HMGET: {
1813
0
      RedisDataType type = VERIFY_RESULT(GetValueType());
1814
0
      if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_,
1815
0
                                VerifySuccessIfMissing::kTrue)) {
1816
0
        return Status::OK();
1817
0
      }
1818
1819
0
      response_.set_allocated_array_response(new RedisArrayPB());
1820
0
      const auto& req_kv = request_.key_value();
1821
0
      auto num_subkeys = req_kv.subkey_size();
1822
0
      vector<int> indices(num_subkeys);
1823
0
      for (int i = 0; i < num_subkeys; ++i) {
1824
0
        indices[i] = i;
1825
0
      }
1826
0
      std::sort(indices.begin(), indices.end(), [&req_kv](int i, int j) {
1827
0
            return req_kv.subkey(i).string_subkey() < req_kv.subkey(j).string_subkey();
1828
0
          });
1829
1830
0
      string current_value = "";
1831
0
      response_.mutable_array_response()->mutable_elements()->Reserve(num_subkeys);
1832
0
      for (int i = 0; i < num_subkeys; ++i) {
1833
0
        response_.mutable_array_response()->add_elements();
1834
0
      }
1835
0
      for (int i = 0; i < num_subkeys; ++i) {
1836
0
        if (i == 0 ||
1837
0
            req_kv.subkey(indices[i]).string_subkey() !=
1838
0
            req_kv.subkey(indices[i - 1]).string_subkey()) {
1839
          // If the condition above is false, we encountered the same key again, no need to call
1840
          // GetValue() once more, current_value is already correct.
1841
0
          auto value = GetValue(indices[i]);
1842
0
          RETURN_NOT_OK(value);
1843
0
          if (value->type == REDIS_TYPE_STRING) {
1844
0
            current_value = std::move(value->value);
1845
0
          } else {
1846
0
            current_value = ""; // Empty string is nil response.
1847
0
          }
1848
0
        }
1849
0
        *response_.mutable_array_response()->mutable_elements(indices[i]) = current_value;
1850
0
      }
1851
1852
0
      response_.set_code(RedisResponsePB::OK);
1853
0
      return Status::OK();
1854
0
    }
1855
0
    case RedisGetRequestPB::HGETALL:
1856
0
      return ExecuteHGetAllLikeCommands(ValueType::kObject, true, true);
1857
0
    case RedisGetRequestPB::HKEYS:
1858
0
      return ExecuteHGetAllLikeCommands(ValueType::kObject, true, false);
1859
0
    case RedisGetRequestPB::HVALS:
1860
0
      return ExecuteHGetAllLikeCommands(ValueType::kObject, false, true);
1861
0
    case RedisGetRequestPB::HLEN:
1862
0
      return ExecuteHGetAllLikeCommands(ValueType::kObject, false, false);
1863
0
    case RedisGetRequestPB::SMEMBERS:
1864
0
      return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, true, false);
1865
0
    case RedisGetRequestPB::SCARD:
1866
0
      return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, false, false);
1867
18
    case RedisGetRequestPB::TSCARD:
1868
18
      return ExecuteHGetAllLikeCommands(ValueType::kRedisTS, false, false);
1869
0
    case RedisGetRequestPB::ZCARD:
1870
0
      return ExecuteHGetAllLikeCommands(ValueType::kRedisSortedSet, false, false);
1871
0
    case RedisGetRequestPB::LLEN:
1872
0
      return ExecuteHGetAllLikeCommands(ValueType::kRedisList, false, false);
1873
0
    case RedisGetRequestPB::UNKNOWN: {
1874
0
      return STATUS(InvalidCommand, "Unknown Get Request not supported");
1875
0
    }
1876
0
  }
1877
0
  return Status::OK();
1878
0
}
1879
1880
0
// Serves the string-length read: looks the value up with GetValue() and, when the type check
// against REDIS_TYPE_STRING passes, hands the value's byte length to SetOptionalInt().
//
// Returns the failed status from GetValue() (via RETURN_NOT_OK) if the lookup fails; otherwise
// returns Status::OK() with the outcome recorded in response_.
Status RedisReadOperation::ExecuteStrLen() {
1881
0
  auto value = GetValue();
1882
0
  // The code is set to OK before the lookup result is inspected, so response_ already carries OK
  // if RETURN_NOT_OK below returns early.
  response_.set_code(RedisResponsePB::OK);
1883
0
  RETURN_NOT_OK(value);
1884
1885
0
  // VerifySuccessIfMissing::kTrue: presumably a missing key passes the type check -- the helper
  // is defined outside this function, confirm there.
  if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1886
0
        VerifySuccessIfMissing::kTrue)) {
1887
0
    SetOptionalInt(value->type, value->value.length(), &response_);
1888
0
  }
1889
0
  // NOTE(review): this runs even when VerifyTypeAndSetCode() returned false, so any code that
  // helper stored for a type mismatch is overwritten with OK. The HSTRLEN branch of the request
  // switch sets OK only inside the successful branch -- confirm which behavior is intended.
  response_.set_code(RedisResponsePB::OK);
1890
1891
0
  return Status::OK();
1892
0
}
1893
1894
0
// Serves the existence check for the single key of this request: looks the value up with
// GetValue() and passes its type, together with the integer 1, to SetOptionalInt().
//
// Returns the failed status from GetValue() (via RETURN_NOT_OK) if the lookup fails; otherwise
// returns Status::OK() with response_ code set to OK.
Status RedisReadOperation::ExecuteExists() {
1895
0
  auto value = GetValue();
1896
0
  // Set before the lookup result is inspected, so response_ already carries OK if RETURN_NOT_OK
  // below returns early.
  response_.set_code(RedisResponsePB::OK);
1897
0
  RETURN_NOT_OK(value);
1898
1899
  // We only support exist command with one argument currently.
1900
0
  // 1 is the count reported for the one requested key; presumably SetOptionalInt() substitutes a
  // different value when value->type says the key is missing -- helper is outside this view.
  SetOptionalInt(value->type, 1, &response_);
1901
0
  // NOTE(review): repeats the set_code(OK) made above; it also overrides any code that
  // SetOptionalInt() may have stored -- confirm that is intended.
  response_.set_code(RedisResponsePB::OK);
1902
1903
0
  return Status::OK();
1904
0
}
1905
1906
0
// Serves the substring read: returns the bytes of the string value between the request's start
// and end indices, where the request's end index is inclusive.
//
// Returns the failed status from GetValue() (via RETURN_NOT_OK) if the lookup fails. A failed
// type check is reported through response_ only, and the function still returns Status::OK().
Status RedisReadOperation::ExecuteGetRange() {
1907
0
  auto value = GetValue();
1908
0
  RETURN_NOT_OK(value);
1909
1910
0
  if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1911
0
      VerifySuccessIfMissing::kTrue)) {
1912
    // We've already set the error code in the response.
1913
0
    return Status::OK();
1914
0
  }
1915
1916
0
  const ssize_t len = value->value.length();
1917
0
  // Convert the inclusive end index of the request into an exclusive one.
  ssize_t exclusive_end = request_.get_range_request().end() + 1;
1918
0
  // end == -1 (the last character) becomes 0 after the +1 above, which would otherwise read as
  // "before the first character"; map it to the full length instead.
  if (exclusive_end == 0) {
1919
0
    exclusive_end = len;
1920
0
  }
1921
1922
  // We treat negative indices to refer backwards from the end of the string.
1923
0
  // ApplyIndex() is defined outside this view; presumably it resolves negative indices and keeps
  // the result within [0, len] -- the pointer arithmetic below relies on that, confirm.
  const auto start = ApplyIndex(request_.get_range_request().start(), len);
1924
0
  auto end = ApplyIndex(exclusive_end, len);
1925
0
  // An inverted range collapses to an empty substring.
  if (end < start) {
1926
0
    end = start;
1927
0
  }
1928
1929
0
  response_.set_code(RedisResponsePB::OK);
1930
0
  // Pointer + length form: passes (end - start) bytes starting at offset start.
  response_.set_string_response(value->value.c_str() + start, end - start);
1931
0
  return Status::OK();
1932
0
}
1933
1934
297
// Serves the key-listing read: walks the documents reachable through iterator_, and appends to
// the response array every key whose first hashed component is a string matching the request
// pattern and whose document is reported as found.
//
// If more than keys_request().threshold() such keys are found, the array is cleared and the
// response carries SERVER_ERROR with "Too many keys in the database."; the function still returns
// Status::OK() in that case. Returns Expired when the deadline check fires, and propagates any
// failure from key fetching/decoding, the sub-document lookup, or the array append.
Status RedisReadOperation::ExecuteKeys() {
1935
297
  // Start from a default-constructed DocKey -- presumably the beginning of the key space; confirm.
  iterator_->Seek(DocKey());
1936
297
  // Remaining budget of keys that may be returned; decremented once per found document below.
  int threshold = request_.keys_request().threshold();
1937
1938
297
  // NOTE(review): left uninitialized here; it is read only after GetRedisSubDocument() was given
  // its address, so this relies on that call always writing it -- confirm.
  bool doc_found;
1939
297
  SubDocument result;
1940
1941
240k
  while (iterator_->valid()) {
1942
240k
    // Deadline is optional: only checked when deadline_info_ holds a value.
    if (deadline_info_.get_ptr() && deadline_info_->CheckAndSetDeadlinePassed()) {
1943
0
      return STATUS(Expired, "Deadline for query passed.");
1944
0
    }
1945
240k
    auto key = VERIFY_RESULT(iterator_->FetchKey()).key;
1946
1947
    // Key could be invalidated because we could move iterator, so back it up.
1948
240k
    KeyBytes key_copy(key);
1949
240k
    key = key_copy.AsSlice();
1950
1951
240k
    DocKey doc_key;
1952
240k
    RETURN_NOT_OK(doc_key.FullyDecodeFrom(key));
1953
240k
    // NOTE(review): front() is taken without checking that hashed_group() is non-empty --
    // presumably every key reaching this point has at least one hashed component; confirm.
    const PrimitiveValue& key_primitive = doc_key.hashed_group().front();
1954
240k
    // Keys that are not strings or do not match the (case-sensitive) pattern are skipped.
    if (!key_primitive.IsString() ||
1955
240k
        !RedisPatternMatch(request_.keys_request().pattern(),
1956
240k
                           key_primitive.GetString(),
1957
146k
                           false /* ignore_case */)) {
1958
146k
      // Presumably moves the iterator past everything stored under this key.
      iterator_->SeekOutOfSubDoc(key);
1959
146k
      continue;
1960
146k
    }
1961
1962
93.7k
    GetRedisSubDocumentData data = {key, &result, &doc_found};
1963
93.7k
    data.deadline_info = deadline_info_.get_ptr();
1964
93.7k
    // Only doc_found is consumed below, so the document contents are not requested.
    data.return_type_only = true;
1965
93.7k
    RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr,
1966
93.7k
                                      SeekFwdSuffices::kFalse));
1967
1968
93.7k
    if (doc_found) {
1969
93.7k
      // The budget goes negative on the (threshold + 1)-th found key; at that point the partial
      // result is dropped and the failure is reported through the response, not the Status.
      if (--threshold < 0) {
1970
3
        response_.clear_array_response();
1971
3
        response_.set_code(RedisResponsePB::SERVER_ERROR);
1972
3
        response_.set_error_message("Too many keys in the database.");
1973
3
        return Status::OK();
1974
3
      }
1975
93.7k
      RETURN_NOT_OK(AddPrimitiveValueToResponseArray(key_primitive,
1976
93.7k
                                                     response_.mutable_array_response()));
1977
93.7k
    }
1978
93.7k
    // Advance to the next document regardless of whether this one was reported.
    iterator_->SeekOutOfSubDoc(key);
1979
93.7k
  }
1980
1981
294
  response_.set_code(RedisResponsePB::OK);
1982
294
  return Status::OK();
1983
297
}
1984
1985
42.9k
// Returns a const reference to response_, the message that the Execute* methods of this class
// fill in. The reference points at the member itself, not at a copy.
const RedisResponsePB& RedisReadOperation::response() {
1986
42.9k
  return response_;
1987
42.9k
}
1988
1989
}  // namespace docdb
1990
}  // namespace yb