YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/redis_operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/redis_operation.h"
15
16
#include "yb/common/value.pb.h"
17
#include "yb/common/ql_value.h"
18
19
#include "yb/docdb/doc_key.h"
20
#include "yb/docdb/doc_path.h"
21
#include "yb/docdb/doc_reader_redis.h"
22
#include "yb/docdb/doc_ttl_util.h"
23
#include "yb/docdb/doc_write_batch.h"
24
#include "yb/docdb/doc_write_batch_cache.h"
25
#include "yb/docdb/docdb_rocksdb_util.h"
26
#include "yb/docdb/subdocument.h"
27
28
#include "yb/server/hybrid_clock.h"
29
30
#include "yb/util/redis_util.h"
31
#include "yb/util/status_format.h"
32
#include "yb/util/stol_utils.h"
33
34
DEFINE_bool(emulate_redis_responses,
35
    true,
36
    "If emulate_redis_responses is false, we hope to get slightly better performance by just "
37
    "returning OK for commands that might require us to read additional records viz. SADD, HSET, "
38
    "and HDEL. If emulate_redis_responses is true, we read the required records to compute the "
39
    "response as specified by the official Redis API documentation. https://redis.io/commands");
40
41
namespace yb {
42
namespace docdb {
43
44
// A simple conversion from RedisDataTypes to ValueTypes
45
// Note: May run into issues if we want to support ttl on individual set elements,
46
// as they are represented by ValueType::kNullLow.
47
48
ValueType ValueTypeFromRedisType(RedisDataType dt) {
48
48
  switch(dt) {
49
42
  case RedisDataType::REDIS_TYPE_STRING:
50
42
    return ValueType::kString;
51
0
  case RedisDataType::REDIS_TYPE_SET:
52
0
    return ValueType::kRedisSet;
53
0
  case RedisDataType::REDIS_TYPE_HASH:
54
0
    return ValueType::kObject;
55
6
  case RedisDataType::REDIS_TYPE_SORTEDSET:
56
6
    return ValueType::kRedisSortedSet;
57
0
  case RedisDataType::REDIS_TYPE_TIMESERIES:
58
0
    return ValueType::kRedisTS;
59
0
  case RedisDataType::REDIS_TYPE_LIST:
60
0
    return ValueType::kRedisList;
61
0
  default:
62
0
    return ValueType::kInvalid;
63
48
  }
64
48
}
65
66
Status RedisWriteOperation::GetDocPaths(
67
123k
    GetDocPathsMode mode, DocPathsToLock* paths, IsolationLevel *level) const {
68
123k
  paths->push_back(DocKey::FromRedisKey(
69
123k
      request_.key_value().hash_code(), request_.key_value().key()).EncodeAsRefCntPrefix());
70
123k
  *level = IsolationLevel::SNAPSHOT_ISOLATION;
71
72
123k
  return Status::OK();
73
123k
}
74
75
namespace {
76
77
161
bool EmulateRedisResponse(const RedisDataType& data_type) {
78
161
  return FLAGS_emulate_redis_responses && data_type != REDIS_TYPE_TIMESERIES;
79
161
}
80
81
static const string wrong_type_message =
82
    "WRONGTYPE Operation against a key holding the wrong kind of value";
83
84
91.0k
CHECKED_STATUS QLValueFromSubKey(const RedisKeyValueSubKeyPB &subkey_pb, QLValuePB *out) {
85
91.0k
  switch (subkey_pb.subkey_case()) {
86
59.5k
    case RedisKeyValueSubKeyPB::kStringSubkey:
87
59.5k
      out->set_string_value(subkey_pb.string_subkey());
88
59.5k
      break;
89
31.4k
    case RedisKeyValueSubKeyPB::kTimestampSubkey:
90
      // We use descending order for the timestamp in the timeseries type so that the latest
91
      // value sorts on top.
92
31.4k
      out->set_int64_value(subkey_pb.timestamp_subkey());
93
31.4k
      break;
94
0
    case RedisKeyValueSubKeyPB::kDoubleSubkey: {
95
0
      out->set_double_value(subkey_pb.double_subkey());
96
0
      break;
97
0
    }
98
0
    default:
99
0
      return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", subkey_pb.subkey_case());
100
91.0k
  }
101
91.0k
  return Status::OK();
102
91.0k
}
103
104
CHECKED_STATUS PrimitiveValueFromSubKey(
105
1.31k
    const RedisKeyValueSubKeyPB &subkey_pb, PrimitiveValue *out) {
106
1.31k
  QLValuePB value;
107
1.31k
  RETURN_NOT_OK(QLValueFromSubKey(subkey_pb, &value));
108
1.31k
  *out = PrimitiveValue::FromQLValuePB(
109
1.31k
      value,
110
1.31k
      subkey_pb.subkey_case() == RedisKeyValueSubKeyPB::kTimestampSubkey ? 
SortingType::kDescending1.21k
111
1.31k
                                                                         : 
SortingType::kAscending100
);
112
1.31k
  return Status::OK();
113
1.31k
}
114
115
// Stricter version of the above when we know the exact datatype to expect.
116
CHECKED_STATUS QLValueFromSubKeyStrict(const RedisKeyValueSubKeyPB& subkey_pb,
117
                                       RedisDataType data_type,
118
89.7k
                                       QLValuePB* out) {
119
89.7k
  switch (data_type) {
120
0
    case REDIS_TYPE_LIST: FALLTHROUGH_INTENDED;
121
0
    case REDIS_TYPE_SET: FALLTHROUGH_INTENDED;
122
59.4k
    case REDIS_TYPE_HASH:
123
59.4k
      if (!subkey_pb.has_string_subkey()) {
124
0
        return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of string type",
125
0
                                 subkey_pb.ShortDebugString());
126
0
      }
127
59.4k
      break;
128
59.4k
    case REDIS_TYPE_TIMESERIES:
129
30.2k
      if (!subkey_pb.has_timestamp_subkey()) {
130
0
        return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of int64 type",
131
0
                                 subkey_pb.ShortDebugString());
132
0
      }
133
30.2k
      break;
134
30.2k
    case REDIS_TYPE_SORTEDSET:
135
0
      if (!subkey_pb.has_double_subkey()) {
136
0
        return STATUS_SUBSTITUTE(InvalidArgument, "subkey: $0 should be of double type",
137
0
                             subkey_pb.ShortDebugString());
138
0
      }
139
0
      break;
140
0
    default:
141
0
      return STATUS_SUBSTITUTE(IllegalState, "Invalid enum value $0", data_type);
142
89.7k
  }
143
89.7k
  return QLValueFromSubKey(subkey_pb, out);
144
89.7k
}
145
146
Result<RedisDataType> GetRedisValueType(
147
    IntentAwareIterator* iterator,
148
    const RedisKeyValuePB &key_value_pb,
149
    DocWriteBatch* doc_write_batch = nullptr,
150
    int subkey_index = kNilSubkeyIndex,
151
114k
    bool always_override = false) {
152
114k
  if (!key_value_pb.has_key()) {
153
0
    return STATUS(Corruption, "Expected KeyValuePB");
154
0
  }
155
114k
  KeyBytes encoded_subdoc_key;
156
114k
  if (subkey_index == kNilSubkeyIndex) {
157
114k
    encoded_subdoc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key());
158
114k
  } else {
159
72
    if (subkey_index >= key_value_pb.subkey_size()) {
160
0
      return STATUS_SUBSTITUTE(InvalidArgument,
161
0
                               "Size of subkeys ($0) must be larger than subkey_index ($1)",
162
0
                               key_value_pb.subkey_size(), subkey_index);
163
0
    }
164
165
72
    PrimitiveValue subkey_primitive;
166
72
    RETURN_NOT_OK(PrimitiveValueFromSubKey(key_value_pb.subkey(subkey_index), &subkey_primitive));
167
72
    encoded_subdoc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key());
168
72
    subkey_primitive.AppendToKey(&encoded_subdoc_key);
169
72
  }
170
114k
  SubDocument doc;
171
114k
  bool doc_found = false;
172
  // Use the cached entry if possible to determine the value type.
173
114k
  boost::optional<DocWriteBatchCache::Entry> cached_entry;
174
114k
  if (doc_write_batch) {
175
41.3k
    cached_entry = doc_write_batch->LookupCache(encoded_subdoc_key);
176
41.3k
  }
177
178
114k
  if (cached_entry) {
179
0
    doc_found = true;
180
0
    doc = SubDocument(cached_entry->value_type);
181
114k
  } else {
182
    // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions
183
    // support for Redis.
184
114k
    GetRedisSubDocumentData data = { encoded_subdoc_key, &doc, &doc_found };
185
114k
    data.return_type_only = true;
186
114k
    data.exp.always_override = always_override;
187
114k
    RETURN_NOT_OK(GetRedisSubDocument(iterator, data, /* projection */ nullptr,
188
114k
                                 SeekFwdSuffices::kFalse));
189
114k
  }
190
191
114k
  if (!doc_found) {
192
43.5k
    return REDIS_TYPE_NONE;
193
43.5k
  }
194
195
70.5k
  switch (doc.value_type()) {
196
0
    case ValueType::kInvalid: FALLTHROUGH_INTENDED;
197
0
    case ValueType::kTombstone:
198
0
      return REDIS_TYPE_NONE;
199
34
    case ValueType::kObject:
200
34
      return REDIS_TYPE_HASH;
201
22
    case ValueType::kRedisSet:
202
22
      return REDIS_TYPE_SET;
203
1.25k
    case ValueType::kRedisTS:
204
1.25k
      return REDIS_TYPE_TIMESERIES;
205
222
    case ValueType::kRedisSortedSet:
206
222
      return REDIS_TYPE_SORTEDSET;
207
10
    case ValueType::kRedisList:
208
10
      return REDIS_TYPE_LIST;
209
12
    case ValueType::kNullLow: FALLTHROUGH_INTENDED; // This value is a set member.
210
69.0k
    case ValueType::kString:
211
69.0k
      return REDIS_TYPE_STRING;
212
0
    default:
213
0
      return STATUS_FORMAT(Corruption,
214
70.5k
                           "Unknown value type for redis record: $0",
215
70.5k
                           static_cast<char>(doc.value_type()));
216
70.5k
  }
217
70.5k
}
218
219
Result<RedisValue> GetRedisValue(
220
    IntentAwareIterator* iterator,
221
    const RedisKeyValuePB &key_value_pb,
222
    int subkey_index = kNilSubkeyIndex,
223
    bool always_override = false,
224
72.7k
    Expiration* exp = nullptr) {
225
72.7k
  if (!key_value_pb.has_key()) {
226
0
    return STATUS(Corruption, "Expected KeyValuePB");
227
0
  }
228
72.7k
  auto encoded_doc_key = DocKey::EncodedFromRedisKey(key_value_pb.hash_code(), key_value_pb.key());
229
230
72.7k
  if (!key_value_pb.subkey().empty()) {
231
1.24k
    if (key_value_pb.subkey().size() != 1 && 
subkey_index == kNilSubkeyIndex8
) {
232
0
      return STATUS_SUBSTITUTE(Corruption,
233
0
                               "Expected at most one subkey, got $0", key_value_pb.subkey().size());
234
0
    }
235
1.24k
    PrimitiveValue subkey_primitive;
236
1.24k
    RETURN_NOT_OK(PrimitiveValueFromSubKey(
237
1.24k
        key_value_pb.subkey(subkey_index == kNilSubkeyIndex ? 0 : subkey_index),
238
1.24k
        &subkey_primitive));
239
1.24k
    subkey_primitive.AppendToKey(&encoded_doc_key);
240
1.24k
  }
241
242
72.7k
  SubDocument doc;
243
72.7k
  bool doc_found = false;
244
245
  // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions
246
  // support for Redis.
247
72.7k
  GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found };
248
72.7k
  data.exp.always_override = always_override;
249
72.7k
  RETURN_NOT_OK(GetRedisSubDocument(
250
72.7k
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
251
72.7k
  if (!doc_found) {
252
3.08k
    return RedisValue{REDIS_TYPE_NONE};
253
3.08k
  }
254
255
69.6k
  if (HasExpiredTTL(data.exp.write_ht, data.exp.ttl, iterator->read_time().read)) {
256
0
    return RedisValue{REDIS_TYPE_NONE};
257
0
  }
258
259
69.6k
  if (exp)
260
60
    *exp = data.exp;
261
262
69.6k
  if (!doc.IsPrimitive()) {
263
6
    switch (doc.value_type()) {
264
0
      case ValueType::kObject:
265
0
        return RedisValue{REDIS_TYPE_HASH};
266
0
      case ValueType::kRedisTS:
267
0
        return RedisValue{REDIS_TYPE_TIMESERIES};
268
6
      case ValueType::kRedisSortedSet:
269
6
        return RedisValue{REDIS_TYPE_SORTEDSET};
270
0
      case ValueType::kRedisSet:
271
0
        return RedisValue{REDIS_TYPE_SET};
272
0
      case ValueType::kRedisList:
273
0
        return RedisValue{REDIS_TYPE_LIST};
274
0
      default:
275
0
        return STATUS_SUBSTITUTE(IllegalState, "Invalid value type: $0",
276
6
                                 static_cast<int>(doc.value_type()));
277
6
    }
278
6
  }
279
280
69.6k
  auto val = RedisValue{REDIS_TYPE_STRING, doc.GetString(), data.exp};
281
69.6k
  return val;
282
69.6k
}
283
284
YB_STRONGLY_TYPED_BOOL(VerifySuccessIfMissing);
285
286
// Set response based on the type match. Return whether the type matches what's expected.
287
bool VerifyTypeAndSetCode(
288
    const RedisDataType expected_type,
289
    const RedisDataType actual_type,
290
    RedisResponsePB *response,
291
145k
    VerifySuccessIfMissing verify_success_if_missing = VerifySuccessIfMissing::kFalse) {
292
145k
  if (actual_type == RedisDataType::REDIS_TYPE_NONE) {
293
5.44k
    if (verify_success_if_missing) {
294
5.44k
      response->set_code(RedisResponsePB::NIL);
295
5.44k
    } else {
296
0
      response->set_code(RedisResponsePB::NOT_FOUND);
297
0
    }
298
5.44k
    return verify_success_if_missing;
299
5.44k
  }
300
139k
  if (actual_type != expected_type) {
301
12
    response->set_code(RedisResponsePB::WRONG_TYPE);
302
12
    response->set_error_message(wrong_type_message);
303
12
    return false;
304
12
  }
305
139k
  response->set_code(RedisResponsePB::OK);
306
139k
  return true;
307
139k
}
308
309
bool VerifyTypeAndSetCode(
310
    const docdb::ValueType expected_type,
311
    const docdb::ValueType actual_type,
312
11.0k
    RedisResponsePB *response) {
313
11.0k
  if (actual_type != expected_type) {
314
9
    response->set_code(RedisResponsePB::WRONG_TYPE);
315
9
    response->set_error_message(wrong_type_message);
316
9
    return false;
317
9
  }
318
11.0k
  response->set_code(RedisResponsePB::OK);
319
11.0k
  return true;
320
11.0k
}
321
322
CHECKED_STATUS AddPrimitiveValueToResponseArray(const PrimitiveValue& value,
323
3.85M
                                                RedisArrayPB* redis_array) {
324
3.85M
  switch (value.value_type()) {
325
2.02M
    case ValueType::kString: FALLTHROUGH_INTENDED;
326
2.02M
    case ValueType::kStringDescending: {
327
2.02M
      redis_array->add_elements(value.GetString());
328
2.02M
      return Status::OK();
329
2.02M
    }
330
0
    case ValueType::kInt64: FALLTHROUGH_INTENDED;
331
1.83M
    case ValueType::kInt64Descending: {
332
1.83M
      redis_array->add_elements(std::to_string(value.GetInt64()));
333
1.83M
      return Status::OK();
334
0
    }
335
58
    case ValueType::kDouble: FALLTHROUGH_INTENDED;
336
58
    case ValueType::kDoubleDescending: {
337
58
      redis_array->add_elements(std::to_string(value.GetDouble()));
338
58
      return Status::OK();
339
58
    }
340
0
    default:
341
0
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid value type: $0",
342
3.85M
                             static_cast<int>(value.value_type()));
343
3.85M
  }
344
3.85M
}
345
346
CHECKED_STATUS AddResponseValuesGeneric(const PrimitiveValue& first,
347
                                        const PrimitiveValue& second,
348
                                        RedisResponsePB* response,
349
                                        bool add_keys,
350
                                        bool add_values,
351
1.83M
                                        bool reverse = false) {
352
1.83M
  if (add_keys) {
353
1.83M
    RETURN_NOT_OK(AddPrimitiveValueToResponseArray(first, response->mutable_array_response()));
354
1.83M
  }
355
1.83M
  if (add_values) {
356
1.83M
    RETURN_NOT_OK(AddPrimitiveValueToResponseArray(second, response->mutable_array_response()));
357
1.83M
  }
358
1.83M
  return Status::OK();
359
1.83M
}
360
361
// Populate the response array for sorted sets range queries.
362
// first refers to the score for the given values.
363
// second refers to a subdocument where each key is a value with the given score.
364
CHECKED_STATUS AddResponseValuesSortedSets(const PrimitiveValue& first,
365
                                           const SubDocument& second,
366
                                           RedisResponsePB* response,
367
                                           bool add_keys,
368
                                           bool add_values,
369
288
                                           bool reverse = false) {
370
288
  if (reverse) {
371
32
    for (auto it = second.object_container().rbegin();
372
52
         it != second.object_container().rend();
373
32
         
it++20
) {
374
20
      const PrimitiveValue& value = it->first;
375
20
      RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys));
376
20
    }
377
256
  } else {
378
256
    for (const auto& kv : second.object_container()) {
379
204
      const PrimitiveValue& value = kv.first;
380
204
      RETURN_NOT_OK(AddResponseValuesGeneric(value, first, response, add_values, add_keys));
381
204
    }
382
256
  }
383
288
  return Status::OK();
384
288
}
385
386
template <typename T, typename AddResponseRow>
387
CHECKED_STATUS PopulateRedisResponseFromInternal(T iter,
388
                                                 AddResponseRow add_response_row,
389
                                                 const T& iter_end,
390
                                                 RedisResponsePB *response,
391
                                                 bool add_keys,
392
                                                 bool add_values,
393
10.9k
                                                 bool reverse = false) {
394
10.9k
  response->set_allocated_array_response(new RedisArrayPB());
395
1.84M
  for (; iter != iter_end; 
iter++1.83M
) {
396
1.83M
    RETURN_NOT_OK(add_response_row(
397
1.83M
        iter->first, iter->second, response, add_keys, add_values, reverse));
398
1.83M
  }
399
10.9k
  return Status::OK();
400
10.9k
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > > const&, yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
393
60
                                                 bool reverse = false) {
394
60
  response->set_allocated_array_response(new RedisArrayPB());
395
1.46k
  for (; iter != iter_end; 
iter++1.40k
) {
396
1.40k
    RETURN_NOT_OK(add_response_row(
397
1.40k
        iter->first, iter->second, response, add_keys, add_values, reverse));
398
1.40k
  }
399
60
  return Status::OK();
400
60
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > const&, yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
393
10.8k
                                                 bool reverse = false) {
394
10.8k
  response->set_allocated_array_response(new RedisArrayPB());
395
1.84M
  for (; iter != iter_end; 
iter++1.83M
) {
396
1.83M
    RETURN_NOT_OK(add_response_row(
397
1.83M
        iter->first, iter->second, response, add_keys, add_values, reverse));
398
1.83M
  }
399
10.8k
  return Status::OK();
400
10.8k
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::reverse_iterator<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > > const&, yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
393
8
                                                 bool reverse = false) {
394
8
  response->set_allocated_array_response(new RedisArrayPB());
395
40
  for (; iter != iter_end; 
iter++32
) {
396
32
    RETURN_NOT_OK(add_response_row(
397
32
        iter->first, iter->second, response, add_keys, add_values, reverse));
398
32
  }
399
8
  return Status::OK();
400
8
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateRedisResponseFromInternal<std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> >, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), std::__1::__map_const_iterator<std::__1::__tree_const_iterator<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, std::__1::__tree_node<std::__1::__value_type<yb::docdb::PrimitiveValue, yb::docdb::SubDocument>, void*>*, long> > const&, yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
393
80
                                                 bool reverse = false) {
394
80
  response->set_allocated_array_response(new RedisArrayPB());
395
336
  for (; iter != iter_end; 
iter++256
) {
396
256
    RETURN_NOT_OK(add_response_row(
397
256
        iter->first, iter->second, response, add_keys, add_values, reverse));
398
256
  }
399
80
  return Status::OK();
400
80
}
401
402
template <typename AddResponseRow>
403
CHECKED_STATUS PopulateResponseFrom(const SubDocument::ObjectContainer &key_values,
404
                                    AddResponseRow add_response_row,
405
                                    RedisResponsePB *response,
406
                                    bool add_keys,
407
                                    bool add_values,
408
10.9k
                                    bool reverse = false) {
409
10.9k
  if (reverse) {
410
68
    return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row,
411
68
                                             key_values.rend(), response, add_keys,
412
68
                                             add_values, reverse);
413
10.9k
  } else {
414
10.9k
    return PopulateRedisResponseFromInternal(key_values.begin(),  add_response_row,
415
10.9k
                                             key_values.end(), response, add_keys,
416
10.9k
                                             add_values, reverse);
417
10.9k
  }
418
10.9k
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateResponseFrom<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::map<yb::docdb::PrimitiveValue, yb::docdb::SubDocument, std::__1::less<yb::docdb::PrimitiveValue>, std::__1::allocator<std::__1::pair<yb::docdb::PrimitiveValue const, yb::docdb::SubDocument> > > const&, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
408
10.9k
                                    bool reverse = false) {
409
10.9k
  if (reverse) {
410
60
    return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row,
411
60
                                             key_values.rend(), response, add_keys,
412
60
                                             add_values, reverse);
413
10.8k
  } else {
414
10.8k
    return PopulateRedisResponseFromInternal(key_values.begin(),  add_response_row,
415
10.8k
                                             key_values.end(), response, add_keys,
416
10.8k
                                             add_values, reverse);
417
10.8k
  }
418
10.9k
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::PopulateResponseFrom<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(std::__1::map<yb::docdb::PrimitiveValue, yb::docdb::SubDocument, std::__1::less<yb::docdb::PrimitiveValue>, std::__1::allocator<std::__1::pair<yb::docdb::PrimitiveValue const, yb::docdb::SubDocument> > > const&, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
408
88
                                    bool reverse = false) {
409
88
  if (reverse) {
410
8
    return PopulateRedisResponseFromInternal(key_values.rbegin(), add_response_row,
411
8
                                             key_values.rend(), response, add_keys,
412
8
                                             add_values, reverse);
413
80
  } else {
414
80
    return PopulateRedisResponseFromInternal(key_values.begin(),  add_response_row,
415
80
                                             key_values.end(), response, add_keys,
416
80
                                             add_values, reverse);
417
80
  }
418
88
}
419
420
void SetOptionalInt(RedisDataType type, int64_t value, int64_t none_value,
421
24
                    RedisResponsePB* response) {
422
24
  if (type == RedisDataType::REDIS_TYPE_NONE) {
423
14
    response->set_code(RedisResponsePB::NIL);
424
14
    response->set_int_response(none_value);
425
14
  } else {
426
10
    response->set_code(RedisResponsePB::OK);
427
10
    response->set_int_response(value);
428
10
  }
429
24
}
430
431
16
void SetOptionalInt(RedisDataType type, int64_t value, RedisResponsePB* response) {
432
16
  SetOptionalInt(type, value, 0, response);
433
16
}
434
435
21.1k
Result<int64_t> GetCardinality(IntentAwareIterator* iterator, const RedisKeyValuePB& kv) {
436
21.1k
  auto encoded_key_card = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key());
437
21.1k
  PrimitiveValue(ValueType::kCounter).AppendToKey(&encoded_key_card);
438
21.1k
  SubDocument subdoc_card;
439
440
21.1k
  bool subdoc_card_found = false;
441
21.1k
  GetRedisSubDocumentData data = { encoded_key_card, &subdoc_card, &subdoc_card_found };
442
443
21.1k
  RETURN_NOT_OK(GetRedisSubDocument(
444
21.1k
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
445
446
21.1k
  return subdoc_card_found ? 
subdoc_card.GetInt64()90
:
021.0k
;
447
21.1k
}
448
449
template <typename AddResponseValues>
450
CHECKED_STATUS GetAndPopulateResponseValues(
451
    IntentAwareIterator* iterator,
452
    AddResponseValues add_response_values,
453
    const GetRedisSubDocumentData& data,
454
    ValueType expected_type,
455
    const RedisReadRequestPB& request,
456
    RedisResponsePB* response,
457
10.9k
    bool add_keys, bool add_values, bool reverse) {
458
459
10.9k
  RETURN_NOT_OK(GetRedisSubDocument(
460
10.9k
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
461
462
  // Validate and populate response.
463
10.9k
  response->set_allocated_array_response(new RedisArrayPB());
464
10.9k
  if (!(*data.doc_found)) {
465
6
    response->set_code(RedisResponsePB::NIL);
466
6
    return Status::OK();
467
6
  }
468
469
10.9k
  if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) {
470
10.9k
      RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(),
471
10.9k
                                         add_response_values,
472
10.9k
                                         response, add_keys, add_values, reverse));
473
10.9k
  }
474
10.9k
  return Status::OK();
475
10.9k
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::GetAndPopulateResponseValues<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool)>(yb::docdb::IntentAwareIterator*, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::SubDocument const&, yb::RedisResponsePB*, bool, bool, bool), yb::docdb::GetRedisSubDocumentData const&, yb::docdb::ValueType, yb::RedisReadRequestPB const&, yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
457
94
    bool add_keys, bool add_values, bool reverse) {
458
459
94
  RETURN_NOT_OK(GetRedisSubDocument(
460
94
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
461
462
  // Validate and populate response.
463
94
  response->set_allocated_array_response(new RedisArrayPB());
464
94
  if (!(*data.doc_found)) {
465
6
    response->set_code(RedisResponsePB::NIL);
466
6
    return Status::OK();
467
6
  }
468
469
88
  if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) {
470
88
      RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(),
471
88
                                         add_response_values,
472
88
                                         response, add_keys, add_values, reverse));
473
88
  }
474
88
  return Status::OK();
475
88
}
redis_operation.cc:yb::Status yb::docdb::(anonymous namespace)::GetAndPopulateResponseValues<yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool)>(yb::docdb::IntentAwareIterator*, yb::Status (*)(yb::docdb::PrimitiveValue const&, yb::docdb::PrimitiveValue const&, yb::RedisResponsePB*, bool, bool, bool), yb::docdb::GetRedisSubDocumentData const&, yb::docdb::ValueType, yb::RedisReadRequestPB const&, yb::RedisResponsePB*, bool, bool, bool)
Line
Count
Source
457
10.8k
    bool add_keys, bool add_values, bool reverse) {
458
459
10.8k
  RETURN_NOT_OK(GetRedisSubDocument(
460
10.8k
      iterator, data, /* projection */ nullptr, SeekFwdSuffices::kFalse));
461
462
  // Validate and populate response.
463
10.8k
  response->set_allocated_array_response(new RedisArrayPB());
464
10.8k
  if (!(*data.doc_found)) {
465
0
    response->set_code(RedisResponsePB::NIL);
466
0
    return Status::OK();
467
0
  }
468
469
10.8k
  if (VerifyTypeAndSetCode(expected_type, data.result->value_type(), response)) {
470
10.8k
      RETURN_NOT_OK(PopulateResponseFrom(data.result->object_container(),
471
10.8k
                                         add_response_values,
472
10.8k
                                         response, add_keys, add_values, reverse));
473
10.8k
  }
474
10.8k
  return Status::OK();
475
10.8k
}
476
477
// Get normalized (with respect to card) upper and lower index bounds for reverse range scans.
478
void GetNormalizedBounds(int64 low_idx, int64 high_idx, int64 card, bool reverse,
479
20
                         int64* low_idx_normalized, int64* high_idx_normalized) {
480
  // Turn negative bounds positive.
481
20
  if (low_idx < 0) {
482
0
    low_idx = card + low_idx;
483
0
  }
484
20
  if (high_idx < 0) {
485
0
    high_idx = card + high_idx;
486
0
  }
487
488
  // Index from lower to upper instead of upper to lower.
489
20
  if (reverse) {
490
8
    *low_idx_normalized = card - high_idx - 1;
491
8
    *high_idx_normalized = card - low_idx - 1;
492
12
  } else {
493
12
    *low_idx_normalized = low_idx;
494
12
    *high_idx_normalized = high_idx;
495
12
  }
496
497
  // Fit bounds to range [0, card).
498
20
  if (*low_idx_normalized < 0) {
499
4
    *low_idx_normalized = 0;
500
4
  }
501
20
  if (*high_idx_normalized >= card) {
502
8
    *high_idx_normalized = card - 1;
503
8
  }
504
20
}
505
506
} // anonymous namespace
507
508
41.4k
void RedisWriteOperation::InitializeIterator(const DocOperationApplyData& data) {
509
41.4k
  auto subdoc_key = SubDocKey(DocKey::FromRedisKey(
510
41.4k
      request_.key_value().hash_code(), request_.key_value().key()));
511
512
41.4k
  auto iter = CreateIntentAwareIterator(
513
41.4k
      data.doc_write_batch->doc_db(),
514
41.4k
      BloomFilterMode::USE_BLOOM_FILTER, subdoc_key.Encode().AsSlice(),
515
41.4k
      redis_query_id(), TransactionOperationContext(), data.deadline, data.read_time);
516
517
41.4k
  iterator_ = std::move(iter);
518
41.4k
}
519
520
123k
Status RedisWriteOperation::Apply(const DocOperationApplyData& data) {
521
123k
  switch (request_.request_case()) {
522
123k
    case RedisWriteRequestPB::kSetRequest:
523
123k
      return ApplySet(data);
524
114
    case RedisWriteRequestPB::kSetTtlRequest:
525
114
      return ApplySetTtl(data);
526
2
    case RedisWriteRequestPB::kGetsetRequest:
527
2
      return ApplyGetSet(data);
528
4
    case RedisWriteRequestPB::kAppendRequest:
529
4
      return ApplyAppend(data);
530
70
    case RedisWriteRequestPB::kDelRequest:
531
70
      return ApplyDel(data);
532
1
    case RedisWriteRequestPB::kSetRangeRequest:
533
1
      return ApplySetRange(data);
534
23
    case RedisWriteRequestPB::kIncrRequest:
535
23
      return ApplyIncr(data);
536
16
    case RedisWriteRequestPB::kPushRequest:
537
16
      return ApplyPush(data);
538
0
    case RedisWriteRequestPB::kInsertRequest:
539
0
      return ApplyInsert(data);
540
0
    case RedisWriteRequestPB::kPopRequest:
541
0
      return ApplyPop(data);
542
26
    case RedisWriteRequestPB::kAddRequest:
543
26
      return ApplyAdd(data);
544
    // TODO: Cut this short in doc_operation.
545
6
    case RedisWriteRequestPB::kNoOpRequest:
546
6
      return Status::OK();
547
0
    case RedisWriteRequestPB::REQUEST_NOT_SET: break;
548
123k
  }
549
0
  return STATUS_FORMAT(Corruption, "Unsupported redis read operation: $0", request_.request_case());
550
123k
}
551
552
Result<RedisDataType> RedisWriteOperation::GetValueType(
553
41.3k
    const DocOperationApplyData& data, int subkey_index) {
554
41.3k
  if (!iterator_) {
555
41.3k
    InitializeIterator(data);
556
41.3k
  }
557
41.3k
  return GetRedisValueType(
558
41.3k
      iterator_.get(), request_.key_value(), data.doc_write_batch, subkey_index);
559
41.3k
}
560
561
Result<RedisValue> RedisWriteOperation::GetValue(
562
144
    const DocOperationApplyData& data, int subkey_index, Expiration* ttl) {
563
144
  if (!iterator_) {
564
121
    InitializeIterator(data);
565
121
  }
566
144
  return GetRedisValue(iterator_.get(), request_.key_value(),
567
144
                       subkey_index, /* always_override */ false, ttl);
568
144
}
569
570
42.3k
void EnsureMapEntry(QLVirtualValuePB type, QLValuePB* value, QLMapValuePB** cache) {
571
42.3k
  if (*cache) {
572
142
    return;
573
142
  }
574
42.2k
  value->mutable_map_value()->mutable_keys()->Add()->set_virtual_value(type);
575
42.2k
  *cache = value->mutable_map_value()->mutable_values()->Add()->mutable_map_value();
576
42.2k
}
577
578
123k
Status RedisWriteOperation::ApplySet(const DocOperationApplyData& data) {
579
123k
  const RedisKeyValuePB& kv = request_.key_value();
580
123k
  const MonoDelta ttl = request_.set_request().has_ttl() ?
581
123k
      
MonoDelta::FromMilliseconds(request_.set_request().ttl())89
: ValueControlFields::kMaxTtl;
582
123k
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
583
123k
  if (kv.subkey_size() > 0) {
584
41.1k
    RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
585
0
    switch (kv.type()) {
586
174
      case REDIS_TYPE_TIMESERIES: FALLTHROUGH_INTENDED;
587
20.0k
      case REDIS_TYPE_HASH: {
588
20.0k
        if (data_type != kv.type() && 
data_type != REDIS_TYPE_NONE19.9k
) {
589
0
          response_.set_code(RedisResponsePB::WRONG_TYPE);
590
0
          response_.set_error_message(wrong_type_message);
591
0
          return Status::OK();
592
0
        }
593
20.0k
        QLValuePB kv_entries;
594
20.0k
        auto& map = *kv_entries.mutable_map_value();
595
109k
        for (int i = 0; i < kv.subkey_size(); 
i++89.1k
) {
596
89.1k
          RETURN_NOT_OK(QLValueFromSubKeyStrict(
597
89.1k
              kv.subkey(i), kv.type(), map.mutable_keys()->Add()));
598
89.1k
          map.mutable_values()->Add()->set_string_value(kv.value(i));
599
89.1k
        }
600
601
        // For an HSET command (which has only one subkey), we need to read the subkey to find out
602
        // if the key already existed, and return 0 or 1 accordingly. This read is unnecessary for
603
        // HMSET and TSADD.
604
20.0k
        if (kv.subkey_size() == 1 && 
EmulateRedisResponse(kv.type())81
&&
605
20.0k
            
!request_.set_request().expect_ok_response()9
) {
606
8
          RedisDataType type = VERIFY_RESULT(GetValueType(data, 0));
607
          // For HSET/TSADD, we return 0 or 1 depending on if the key already existed.
608
          // If flag is false, no int response is returned.
609
0
          SetOptionalInt(type, 0, 1, &response_);
610
8
        }
611
612
20.0k
        ValueRef value_ref(kv_entries);
613
20.0k
        if (kv.type() == REDIS_TYPE_TIMESERIES) {
614
174
          value_ref.set_custom_value_type(ValueType::kRedisTS);
615
174
          value_ref.set_list_extend_order(ListExtendOrder::PREPEND);
616
174
        }
617
618
20.0k
        if (data_type == REDIS_TYPE_NONE && 
kv.type() == REDIS_TYPE_TIMESERIES19.9k
) {
619
          // Need to insert the document instead of extending it.
620
162
          RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
621
162
              doc_path, value_ref, data.read_time,
622
162
              data.deadline, redis_query_id(), ttl, ValueControlFields::kInvalidUserTimestamp,
623
162
              false /* init_marker_ttl */));
624
19.8k
        } else {
625
19.8k
          RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
626
19.8k
              doc_path, value_ref, data.read_time,
627
19.8k
              data.deadline, redis_query_id(), ttl));
628
19.8k
        }
629
20.0k
        break;
630
20.0k
      }
631
21.1k
      case REDIS_TYPE_SORTEDSET: {
632
21.1k
        if (data_type != kv.type() && 
data_type != REDIS_TYPE_NONE21.0k
) {
633
0
          response_.set_code(RedisResponsePB::WRONG_TYPE);
634
0
          response_.set_error_message(wrong_type_message);
635
0
          return Status::OK();
636
0
        }
637
638
        // The SubDocuments to be inserted for card, the forward mapping, and reverse mapping.
639
21.1k
        std::unordered_map<double, QLMapValuePB*> forward_entries;
640
21.1k
        QLMapValuePB* kv_entries_forward = nullptr;
641
21.1k
        QLMapValuePB* kv_entries_reverse = nullptr;
642
643
        // The top level mapping.
644
21.1k
        QLValuePB kv_entries;
645
646
21.1k
        int new_elements_added = 0;
647
21.1k
        int return_value = 0;
648
42.3k
        for (int i = 0; i < kv.subkey_size(); 
i++21.1k
) {
649
          // Check whether the value is already in the document, if so delete it.
650
21.1k
          SubDocKey key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()),
651
21.1k
                                            PrimitiveValue(ValueType::kSSReverse),
652
21.1k
                                            PrimitiveValue(kv.value(i)));
653
21.1k
          SubDocument subdoc_reverse;
654
21.1k
          bool subdoc_reverse_found = false;
655
21.1k
          auto encoded_key_reverse = key_reverse.EncodeWithoutHt();
656
21.1k
          GetRedisSubDocumentData get_data = { encoded_key_reverse, &subdoc_reverse,
657
21.1k
                                               &subdoc_reverse_found };
658
21.1k
          RETURN_NOT_OK(GetRedisSubDocument(
659
21.1k
              data.doc_write_batch->doc_db(),
660
21.1k
              get_data, redis_query_id(), TransactionOperationContext(), data.deadline,
661
21.1k
              data.read_time));
662
663
          // Flag indicating whether we should add the given entry to the sorted set.
664
21.1k
          bool should_add_entry = true;
665
          // Flag indicating whether we shoould remove an entry from the sorted set.
666
21.1k
          bool should_remove_existing_entry = false;
667
668
21.1k
          if (!subdoc_reverse_found) {
669
            // The value is not already in the document.
670
21.1k
            switch (request_.set_request().sorted_set_options().update_options()) {
671
0
              case SortedSetOptionsPB::NX: FALLTHROUGH_INTENDED;
672
21.1k
              case SortedSetOptionsPB::NONE: {
673
                // Both these options call for inserting new elements, increment return_value and
674
                // keep should_add_entry as true.
675
21.1k
                return_value++;
676
21.1k
                new_elements_added++;
677
21.1k
                break;
678
0
              }
679
2
              default: {
680
                // XX option calls for no new elements, don't increment return_value and set
681
                // should_add_entry to false.
682
2
                should_add_entry = false;
683
2
                break;
684
0
              }
685
21.1k
            }
686
21.1k
          } else {
687
            // The value is already in the document.
688
24
            switch (request_.set_request().sorted_set_options().update_options()) {
689
0
              case SortedSetOptionsPB::XX:
690
22
              case SortedSetOptionsPB::NONE: {
691
                // First make sure that the new score is different from the old score.
692
                // Both these options call for updating existing elements, set
693
                // should_remove_existing_entry to true, and if the CH flag is on (return both
694
                // elements changed and elements added), increment return_value.
695
22
                double score_to_remove = subdoc_reverse.GetDouble();
696
                // If incr option is set, we add the increment to the existing score.
697
22
                double score_to_add = request_.set_request().sorted_set_options().incr() ?
698
22
                    
score_to_remove + kv.subkey(i).double_subkey()0
: kv.subkey(i).double_subkey();
699
22
                if (score_to_remove != score_to_add) {
700
22
                  should_remove_existing_entry = true;
701
22
                  if (request_.set_request().sorted_set_options().ch()) {
702
2
                    return_value++;
703
2
                  }
704
22
                }
705
22
                break;
706
0
              }
707
2
              default: {
708
                // NX option calls for only new elements, set should_add_entry to false.
709
2
                should_add_entry = false;
710
2
                break;
711
0
              }
712
24
            }
713
24
          }
714
715
21.1k
          if (should_remove_existing_entry) {
716
22
            double score_to_remove = subdoc_reverse.GetDouble();
717
22
            EnsureMapEntry(QLVirtualValuePB::SS_FORWARD, &kv_entries, &kv_entries_forward);
718
22
            kv_entries_forward->mutable_keys()->Add()->set_double_value(score_to_remove);
719
22
            auto* value = kv_entries_forward->mutable_values()->Add()->mutable_map_value();
720
22
            forward_entries[score_to_remove] = value;
721
22
            value->mutable_keys()->Add()->set_string_value(kv.value(i));
722
22
            value->mutable_values()->Add()->set_virtual_value(
723
22
                QLVirtualValuePB::TOMBSTONE);
724
22
          }
725
726
21.1k
          if (should_add_entry) {
727
            // If the incr option is specified, we need insert the existing score + new score
728
            // instead of just the new score.
729
21.1k
            double score_to_add = request_.set_request().sorted_set_options().incr() ?
730
0
                kv.subkey(i).double_subkey() + subdoc_reverse.GetDouble() :
731
21.1k
                kv.subkey(i).double_subkey();
732
733
            // Add the forward mapping to the entries.
734
21.1k
            EnsureMapEntry(QLVirtualValuePB::SS_FORWARD, &kv_entries, &kv_entries_forward);
735
21.1k
            auto it = forward_entries.find(score_to_add);
736
21.1k
            if (it == forward_entries.end()) {
737
21.1k
              kv_entries_forward->mutable_keys()->Add()->set_double_value(score_to_add);
738
21.1k
              auto* value = kv_entries_forward->mutable_values()->Add()->mutable_map_value();
739
21.1k
              it = forward_entries.emplace(score_to_add, value).first;
740
21.1k
            }
741
21.1k
            it->second->mutable_keys()->Add()->set_string_value(kv.value(i));
742
21.1k
            it->second->mutable_values()->Add()->set_virtual_value(QLVirtualValuePB::NULL_LOW);
743
744
21.1k
            EnsureMapEntry(QLVirtualValuePB::SS_REVERSE, &kv_entries, &kv_entries_reverse);
745
            // Add the reverse mapping to the entries.
746
21.1k
            kv_entries_reverse->mutable_keys()->Add()->set_string_value(kv.value(i));
747
21.1k
            kv_entries_reverse->mutable_values()->Add()->set_double_value(score_to_add);
748
21.1k
          }
749
21.1k
        }
750
751
21.1k
        if (new_elements_added > 0) {
752
21.0k
          int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
753
          // Insert card + new_elements_added back into the document for the updated card.
754
0
          auto& map = *kv_entries.mutable_map_value();
755
21.0k
          map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER);
756
21.0k
          map.mutable_values()->Add()->set_int64_value(card + new_elements_added);
757
21.0k
        }
758
759
21.1k
        if (!kv_entries.map_value().keys().empty()) {
760
21.1k
          ValueRef value(kv_entries);
761
21.1k
          value.set_custom_value_type(ValueType::kRedisSortedSet);
762
21.1k
          if (data_type == REDIS_TYPE_NONE) {
763
21.0k
            RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
764
21.0k
                doc_path, value, data.read_time, data.deadline, redis_query_id(), ttl));
765
21.0k
          } else {
766
64
            RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
767
64
                doc_path, value, data.read_time, data.deadline, redis_query_id(), ttl));
768
64
          }
769
21.1k
        }
770
21.1k
        response_.set_code(RedisResponsePB::OK);
771
21.1k
        response_.set_int_response(return_value);
772
21.1k
        break;
773
21.1k
    }
774
0
    case REDIS_TYPE_STRING: {
775
0
        return STATUS_SUBSTITUTE(InvalidCommand,
776
21.1k
            "Redis data type $0 in SET command should not have subkeys", kv.type());
777
21.1k
      }
778
0
      default:
779
0
        return STATUS_SUBSTITUTE(InvalidCommand,
780
41.1k
            "Redis data type $0 not supported in SET command", kv.type());
781
41.1k
    }
782
81.9k
  } else {
783
81.9k
    if (kv.type() != REDIS_TYPE_STRING) {
784
0
      return STATUS_SUBSTITUTE(InvalidCommand,
785
0
          "Redis data type for SET must be string if subkey not present, found $0", kv.type());
786
0
    }
787
81.9k
    if (kv.value_size() != 1) {
788
0
      return STATUS_SUBSTITUTE(InvalidCommand,
789
0
          "There must be only one value in SET if there is only one key, found $0",
790
0
          kv.value_size());
791
0
    }
792
81.9k
    const RedisWriteMode mode = request_.set_request().mode();
793
81.9k
    if (mode != RedisWriteMode::REDIS_WRITEMODE_UPSERT) {
794
80
      RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
795
80
      if ((mode == RedisWriteMode::REDIS_WRITEMODE_INSERT && 
data_type != REDIS_TYPE_NONE54
)
796
80
          || 
(57
mode == RedisWriteMode::REDIS_WRITEMODE_UPDATE57
&&
data_type == REDIS_TYPE_NONE26
)) {
797
798
36
        if (request_.set_request().has_expect_ok_response() &&
799
36
            
!request_.set_request().expect_ok_response()2
) {
800
          // For SETNX we return 0 or 1 depending on if the key already existed.
801
2
          response_.set_int_response(0);
802
2
          response_.set_code(RedisResponsePB::OK);
803
34
        } else {
804
34
          response_.set_code(RedisResponsePB::NIL);
805
34
        }
806
36
        return Status::OK();
807
36
      }
808
80
    }
809
81.9k
    QLValuePB value;
810
81.9k
    value.set_string_value(kv.value(0));
811
812
81.9k
    RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
813
81.9k
        doc_path, ValueControlFields { .ttl = ttl }, ValueRef(value, SortingType::kNotSpecified),
814
81.9k
        data.read_time, data.deadline, redis_query_id()));
815
81.9k
  }
816
817
123k
  if (request_.set_request().has_expect_ok_response() &&
818
123k
      
!request_.set_request().expect_ok_response()19.8k
) {
819
    // For SETNX we return 0 or 1 depending on if the key already existed.
820
2
    response_.set_int_response(1);
821
2
  }
822
823
123k
  response_.set_code(RedisResponsePB::OK);
824
123k
  return Status::OK();
825
123k
}
826
827
114
Status RedisWriteOperation::ApplySetTtl(const DocOperationApplyData& data) {
828
114
  const RedisKeyValuePB& kv = request_.key_value();
829
830
  // We only support setting TTLs on top-level keys.
831
114
  if (!kv.subkey().empty()) {
832
0
    return STATUS_SUBSTITUTE(Corruption,
833
0
                             "Expected no subkeys, got $0", kv.subkey().size());
834
0
  }
835
836
114
  MonoDelta ttl;
837
114
  bool absolute_expiration = request_.set_ttl_request().has_absolute_time();
838
839
  // Handle ExpireAt
840
114
  if (absolute_expiration) {
841
6
    int64_t calc_ttl = request_.set_ttl_request().absolute_time() -
842
6
      server::HybridClock::GetPhysicalValueNanos(data.read_time.read) /
843
6
      MonoTime::kNanosecondsPerMillisecond;
844
6
    if (calc_ttl <= 0) {
845
0
      return ApplyDel(data);
846
0
    }
847
6
    ttl = MonoDelta::FromMilliseconds(calc_ttl);
848
6
  }
849
850
114
  Expiration exp;
851
114
  auto value = VERIFY_RESULT(GetValue(data, kNilSubkeyIndex, &exp));
852
853
114
  if (value.type == REDIS_TYPE_TIMESERIES) { // This command is not supported.
854
0
    return STATUS_SUBSTITUTE(InvalidCommand,
855
0
        "Redis data type $0 not supported in EXPIRE and PERSIST commands", value.type);
856
0
  }
857
858
114
  if (value.type == REDIS_TYPE_NONE) { // Key does not exist.
859
54
    VLOG
(1) << "TTL cannot be set because the key does not exist"0
;
860
54
    response_.set_int_response(0);
861
54
    return Status::OK();
862
54
  }
863
864
60
  if (!absolute_expiration && 
request_.set_ttl_request().ttl() == -157
) { // Handle PERSIST.
865
20
    MonoDelta new_ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read));
866
20
    if (new_ttl.IsNegative() || new_ttl == ValueControlFields::kMaxTtl) {
867
12
      response_.set_int_response(0);
868
12
      return Status::OK();
869
12
    }
870
20
  }
871
872
48
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
873
48
  if (!absolute_expiration) {
874
45
    ttl = request_.set_ttl_request().ttl() == -1 ? 
ValueControlFields::kMaxTtl8
:
875
45
      
MonoDelta::FromMilliseconds(request_.set_ttl_request().ttl())37
;
876
45
  }
877
878
48
  ValueType v_type = ValueTypeFromRedisType(value.type);
879
48
  if (v_type == ValueType::kInvalid) {
880
0
    return STATUS(Corruption, "Invalid value type.");
881
0
  }
882
883
48
  auto control_flags = ValueControlFields {
884
48
    .merge_flags = ValueControlFields::kTtlFlag,
885
48
    .ttl = ttl,
886
48
  };
887
48
  RETURN_NOT_OK(data.doc_write_batch->SetPrimitive(
888
48
      doc_path, control_flags, ValueRef(v_type), data.read_time,
889
48
      data.deadline, redis_query_id()));
890
48
  VLOG
(2) << "Set TTL successfully to " << ttl << " for key " << kv.key()0
;
891
48
  response_.set_int_response(1);
892
48
  return Status::OK();
893
48
}
894
895
2
Status RedisWriteOperation::ApplyGetSet(const DocOperationApplyData& data) {
896
2
  const RedisKeyValuePB& kv = request_.key_value();
897
898
2
  if (kv.value_size() != 1) {
899
0
    return STATUS_SUBSTITUTE(Corruption,
900
0
        "Getset kv should have 1 value, found $0", kv.value_size());
901
0
  }
902
903
2
  auto value = GetValue(data);
904
2
  RETURN_NOT_OK(value);
905
906
2
  if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
907
2
      VerifySuccessIfMissing::kTrue)) {
908
    // We've already set the error code in the response.
909
0
    return Status::OK();
910
0
  }
911
2
  response_.set_string_response(value->value);
912
913
2
  QLValuePB value_pb;
914
2
  value_pb.set_string_value(kv.value(0));
915
916
2
  return data.doc_write_batch->SetPrimitive(
917
2
      DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), ValueControlFields(),
918
2
      ValueRef(value_pb, SortingType::kNotSpecified), data.read_time, data.deadline,
919
2
      redis_query_id());
920
2
}
921
922
4
Status RedisWriteOperation::ApplyAppend(const DocOperationApplyData& data) {
923
4
  const RedisKeyValuePB& kv = request_.key_value();
924
925
4
  if (kv.value_size() != 1) {
926
0
    return STATUS_SUBSTITUTE(Corruption,
927
0
        "Append kv should have 1 value, found $0", kv.value_size());
928
0
  }
929
930
4
  auto value = GetValue(data);
931
4
  RETURN_NOT_OK(value);
932
933
4
  if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
934
4
                            VerifySuccessIfMissing::kTrue)) {
935
    // We've already set the error code in the response.
936
0
    return Status::OK();
937
0
  }
938
939
4
  value->value += kv.value(0);
940
941
4
  response_.set_code(RedisResponsePB::OK);
942
4
  response_.set_int_response(value->value.length());
943
944
  // TODO: update the TTL with the write time rather than read time,
945
  // or store the expiration.
946
4
  auto control_fields = ValueControlFields {
947
4
    .ttl = VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read))
948
0
  };
949
950
0
  QLValuePB value_pb;
951
4
  value_pb.set_string_value(value->value);
952
953
4
  return data.doc_write_batch->SetPrimitive(
954
4
      DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), control_fields,
955
4
      ValueRef(value_pb, SortingType::kNotSpecified), data.read_time, data.deadline,
956
4
      redis_query_id());
957
4
}
958
959
// TODO (akashnil): Actually check if the value existed, return 0 if not. handle multidel in future.
960
//                  See ENG-807
961
70
Status RedisWriteOperation::ApplyDel(const DocOperationApplyData& data) {
962
70
  const RedisKeyValuePB& kv = request_.key_value();
963
70
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
964
70
  if (data_type != REDIS_TYPE_NONE && 
data_type != kv.type()56
&&
kv.type() != REDIS_TYPE_NONE22
) {
965
0
    response_.set_code(RedisResponsePB::WRONG_TYPE);
966
0
    response_.set_error_message(wrong_type_message);
967
0
    return Status::OK();
968
0
  }
969
970
70
  QLValuePB value;
971
70
  ValueRef value_ref(value, SortingType::kNotSpecified);
972
  // Number of distinct keys being removed.
973
70
  int num_keys = 0;
974
70
  switch (kv.type()) {
975
26
    case REDIS_TYPE_NONE: {
976
26
      num_keys = data_type == REDIS_TYPE_NONE ? 
04
:
122
;
977
26
      break;
978
0
    }
979
18
    case REDIS_TYPE_TIMESERIES: {
980
18
      if (data_type == REDIS_TYPE_NONE) {
981
6
        return Status::OK();
982
6
      }
983
12
      value_ref.set_list_extend_order(ListExtendOrder::PREPEND);
984
12
      value_ref.set_write_instruction(bfql::TSOpcode::kSetRemove);
985
618
      for (int i = 0; i < kv.subkey_size(); 
i++606
) {
986
606
        RETURN_NOT_OK(QLValueFromSubKeyStrict(
987
606
            kv.subkey(i), data_type, value.mutable_map_value()->mutable_keys()->Add()));
988
606
        value.mutable_map_value()->mutable_values()->Add()->set_virtual_value(
989
606
            QLVirtualValuePB::TOMBSTONE);
990
606
      }
991
12
      num_keys = kv.subkey_size();
992
12
      break;
993
12
    }
994
10
    case REDIS_TYPE_SORTEDSET: {
995
10
      num_keys = kv.subkey_size();
996
997
10
      auto& map = *value.mutable_map_value();
998
10
      map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::SS_FORWARD);
999
10
      auto& values_forward = *map.mutable_values()->Add()->mutable_map_value();
1000
1001
10
      map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::SS_REVERSE);
1002
10
      auto& values_reverse = *map.mutable_values()->Add()->mutable_map_value();
1003
1004
26
      for (int i = 0; i < kv.subkey_size(); 
i++16
) {
1005
        // Check whether the value is already in the document.
1006
16
        SubDocument doc_reverse;
1007
16
        bool doc_reverse_found = false;
1008
16
        SubDocKey subdoc_key_reverse = SubDocKey(DocKey::FromRedisKey(kv.hash_code(), kv.key()),
1009
16
                                                 PrimitiveValue(ValueType::kSSReverse),
1010
16
                                                 PrimitiveValue(kv.subkey(i).string_subkey()));
1011
        // Todo(Rahul): Add values to the write batch cache and then do an additional check.
1012
        // As of now, we only check to see if a value is in rocksdb, and we should also check
1013
        // the write batch.
1014
16
        auto encoded_subdoc_key_reverse = subdoc_key_reverse.EncodeWithoutHt();
1015
16
        GetRedisSubDocumentData get_data = { encoded_subdoc_key_reverse, &doc_reverse,
1016
16
                                             &doc_reverse_found };
1017
16
        RETURN_NOT_OK(GetRedisSubDocument(
1018
16
            data.doc_write_batch->doc_db(),
1019
16
            get_data, redis_query_id(), TransactionOperationContext(), data.deadline,
1020
16
            data.read_time));
1021
16
        if (doc_reverse_found && 
doc_reverse.value_type() != ValueType::kTombstone8
) {
1022
          // The value is already in the doc, needs to be removed.
1023
8
          values_reverse.mutable_keys()->Add()->set_string_value(kv.subkey(i).string_subkey());
1024
8
          values_reverse.mutable_values()->Add()->set_virtual_value(QLVirtualValuePB::TOMBSTONE);
1025
          // For sorted sets, the forward mapping also needs to be deleted.
1026
8
          values_forward.mutable_keys()->Add()->set_double_value(doc_reverse.GetDouble());
1027
8
          auto& fwd_map = *values_forward.mutable_values()->Add()->mutable_map_value();
1028
8
          fwd_map.mutable_keys()->Add()->set_string_value(kv.subkey(i).string_subkey());
1029
8
          fwd_map.mutable_values()->Add()->set_virtual_value(QLVirtualValuePB::TOMBSTONE);
1030
8
        } else {
1031
          // If the key is absent, it doesn't contribute to the count of keys being deleted.
1032
8
          num_keys--;
1033
8
        }
1034
16
      }
1035
10
      int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
1036
      // The new cardinality is card - num_keys.
1037
0
      map.mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER);
1038
10
      map.mutable_values()->Add()->set_int64_value(card - num_keys);
1039
1040
10
      break;
1041
10
    }
1042
16
    default: {
1043
16
      num_keys = kv.subkey_size(); // We know the subkeys are distinct.
1044
      // Avoid reads for redis timeseries type.
1045
16
      if (EmulateRedisResponse(kv.type())) {
1046
38
        for (int i = 0; i < kv.subkey_size(); 
i++22
) {
1047
22
          RedisDataType type = VERIFY_RESULT(GetValueType(data, i));
1048
22
          if (type == REDIS_TYPE_STRING) {
1049
8
            value.mutable_map_value()->mutable_keys()->Add()->set_string_value(
1050
8
                kv.subkey(i).string_subkey());
1051
8
            value.mutable_map_value()->mutable_values()->Add()->set_virtual_value(
1052
8
                QLVirtualValuePB::TOMBSTONE);
1053
14
          } else {
1054
            // If the key is absent, it doesn't contribute to the count of keys being deleted.
1055
14
            num_keys--;
1056
14
          }
1057
22
        }
1058
16
      }
1059
16
      break;
1060
16
    }
1061
70
  }
1062
1063
64
  if (num_keys != 0) {
1064
46
    DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1065
46
    RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1066
46
        doc_path, value_ref, data.read_time, data.deadline, redis_query_id()));
1067
46
  }
1068
1069
64
  response_.set_code(RedisResponsePB::OK);
1070
64
  if (EmulateRedisResponse(kv.type())) {
1071
    // If the flag is true, we respond with the number of keys actually being deleted. We don't
1072
    // report this number for the redis timeseries type to avoid reads.
1073
52
    response_.set_int_response(num_keys);
1074
52
  }
1075
64
  return Status::OK();
1076
64
}
1077
1078
1
Status RedisWriteOperation::ApplySetRange(const DocOperationApplyData& data) {
1079
1
  const RedisKeyValuePB& kv = request_.key_value();
1080
1
  if (kv.value_size() != 1) {
1081
0
    return STATUS_SUBSTITUTE(Corruption,
1082
0
        "SetRange kv should have 1 value, found $0", kv.value_size());
1083
0
  }
1084
1085
1
  auto value = GetValue(data);
1086
1
  RETURN_NOT_OK(value);
1087
1088
1
  if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1089
1
        VerifySuccessIfMissing::kTrue)) {
1090
    // We've already set the error code in the response.
1091
0
    return Status::OK();
1092
0
  }
1093
1094
1
  size_t set_range_offset = request_.set_range_request().offset();
1095
1
  if (set_range_offset > value->value.length()) {
1096
0
    value->value.resize(set_range_offset, 0);
1097
0
  }
1098
1
  value->value.replace(set_range_offset, kv.value(0).length(), kv.value(0));
1099
1
  response_.set_code(RedisResponsePB::OK);
1100
1
  response_.set_int_response(value->value.length());
1101
1102
  // TODO: update the TTL with the write time rather than read time,
1103
  // or store the expiration.
1104
1
  auto control_fields = ValueControlFields {
1105
1
    .ttl = VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read))
1106
0
  };
1107
1108
0
  QLValuePB value_pb;
1109
1
  value_pb.set_string_value(value->value);
1110
1
  return data.doc_write_batch->SetPrimitive(
1111
1
      DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), control_fields,
1112
1
      ValueRef(value_pb, SortingType::kNotSpecified), std::move(iterator_));
1113
1
}
1114
1115
23
Status RedisWriteOperation::ApplyIncr(const DocOperationApplyData& data) {
1116
23
  const RedisKeyValuePB& kv = request_.key_value();
1117
23
  const int64_t incr = request_.incr_request().increment_int();
1118
1119
23
  if (kv.type() != REDIS_TYPE_HASH && 
kv.type() != REDIS_TYPE_STRING17
) {
1120
0
    return STATUS_SUBSTITUTE(InvalidCommand,
1121
0
                             "Redis data type $0 not supported in Incr command", kv.type());
1122
0
  }
1123
1124
23
  RedisDataType container_type = VERIFY_RESULT(GetValueType(data));
1125
23
  if (!VerifyTypeAndSetCode(kv.type(), container_type, &response_,
1126
23
                            VerifySuccessIfMissing::kTrue)) {
1127
    // We've already set the error code in the response.
1128
0
    return Status::OK();
1129
0
  }
1130
1131
23
  int subkey = (kv.type() == REDIS_TYPE_HASH ? 
06
:
-117
);
1132
23
  auto value = GetValue(data, subkey);
1133
23
  RETURN_NOT_OK(value);
1134
1135
23
  if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1136
23
      VerifySuccessIfMissing::kTrue)) {
1137
0
    response_.set_code(RedisResponsePB::WRONG_TYPE);
1138
0
    response_.set_error_message(wrong_type_message);
1139
0
    return Status::OK();
1140
0
  }
1141
1142
  // If no value is present, 0 is the default.
1143
23
  int64_t old_value = 0, new_value;
1144
23
  if (value->type != REDIS_TYPE_NONE) {
1145
15
    auto old = CheckedStoll(value->value);
1146
15
    if (!old.ok()) {
1147
      // This can happen if there are leading or trailing spaces, or the value
1148
      // is out of range.
1149
4
      response_.set_code(RedisResponsePB::WRONG_TYPE);
1150
4
      response_.set_error_message("ERR value is not an integer or out of range");
1151
4
      return Status::OK();
1152
4
    }
1153
11
    old_value = *old;
1154
11
  }
1155
1156
19
  if ((incr < 0 && 
old_value < 04
&&
incr < numeric_limits<int64_t>::min() - old_value0
) ||
1157
19
      (incr > 0 && 
old_value > 015
&&
incr > numeric_limits<int64_t>::max() - old_value6
)) {
1158
1
    response_.set_code(RedisResponsePB::WRONG_TYPE);
1159
1
    response_.set_error_message("Increment would overflow");
1160
1
    return Status::OK();
1161
1
  }
1162
18
  new_value = old_value + incr;
1163
18
  response_.set_code(RedisResponsePB::OK);
1164
18
  response_.set_int_response(new_value);
1165
1166
18
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1167
18
  if (kv.type() == REDIS_TYPE_HASH) {
1168
6
    QLValuePB kv_entries;
1169
6
    RETURN_NOT_OK(QLValueFromSubKeyStrict(
1170
6
        kv.subkey(0), kv.type(), kv_entries.mutable_map_value()->mutable_keys()->Add()));
1171
6
    kv_entries.mutable_map_value()->mutable_values()->Add()->set_string_value(
1172
6
        std::to_string(new_value));
1173
6
    return data.doc_write_batch->ExtendSubDocument(
1174
6
        doc_path, ValueRef(kv_entries, SortingType::kNotSpecified), data.read_time, data.deadline,
1175
6
        redis_query_id());
1176
0
    return Status::OK();
1177
12
  } else {  // kv.type() == REDIS_TYPE_STRING
1178
    // TODO: update the TTL with the write time rather than read time,
1179
    // or store the expiration.
1180
12
    auto control_fields = ValueControlFields {
1181
12
      .ttl = VERIFY_RESULT(value->exp.ComputeRelativeTtl(iterator_->read_time().read))
1182
0
    };
1183
0
    QLValuePB value_pb;
1184
12
    value_pb.set_string_value(std::to_string(new_value));
1185
12
    return data.doc_write_batch->SetPrimitive(
1186
12
        doc_path, control_fields, ValueRef(value_pb, SortingType::kNotSpecified),
1187
12
        std::move(iterator_));
1188
12
  }
1189
18
}
1190
1191
16
Status RedisWriteOperation::ApplyPush(const DocOperationApplyData& data) {
1192
16
  const RedisKeyValuePB& kv = request_.key_value();
1193
16
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1194
16
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
1195
16
  if (data_type != REDIS_TYPE_LIST && 
data_type != REDIS_TYPE_NONE6
) {
1196
0
    response_.set_code(RedisResponsePB::WRONG_TYPE);
1197
0
    response_.set_error_message(wrong_type_message);
1198
0
    return Status::OK();
1199
0
  }
1200
1201
16
  QLValuePB list;
1202
16
  int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv)) + kv.value_size();
1203
0
  list.mutable_map_value()->mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER);
1204
16
  list.mutable_map_value()->mutable_values()->Add()->set_int64_value(card);
1205
1206
16
  list.mutable_map_value()->mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::ARRAY);
1207
16
  auto& elements = *list.mutable_map_value()->mutable_values()->Add();
1208
1209
20
  for (const auto& val : kv.value()) {
1210
20
    elements.mutable_list_value()->mutable_elems()->Add()->set_string_value(val);
1211
20
  }
1212
1213
16
  ValueRef value_ref(list);
1214
16
  if (request_.push_request().side() == REDIS_SIDE_LEFT) {
1215
10
    value_ref.set_list_extend_order(ListExtendOrder::PREPEND);
1216
10
  }
1217
16
  value_ref.set_custom_value_type(ValueType::kRedisList);
1218
16
  if (data_type == REDIS_TYPE_NONE) {
1219
6
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
1220
6
        DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), value_ref,
1221
6
        data.read_time, data.deadline, redis_query_id()));
1222
10
  } else {
1223
10
    RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1224
10
        DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key()), value_ref,
1225
10
        data.read_time, data.deadline, redis_query_id()));
1226
10
  }
1227
1228
16
  response_.set_int_response(card);
1229
16
  response_.set_code(RedisResponsePB::OK);
1230
16
  return Status::OK();
1231
16
}
1232
1233
0
Status RedisWriteOperation::ApplyInsert(const DocOperationApplyData& data) {
1234
0
  return STATUS(NotSupported, "Redis operation has not been implemented");
1235
0
}
1236
1237
0
Status RedisWriteOperation::ApplyPop(const DocOperationApplyData& data) {
1238
0
  const RedisKeyValuePB& kv = request_.key_value();
1239
0
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1240
0
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
1241
1242
0
  if (!VerifyTypeAndSetCode(kv.type(), data_type, &response_, VerifySuccessIfMissing::kTrue)) {
1243
    // We already set the error code in the function.
1244
0
    return Status::OK();
1245
0
  }
1246
1247
0
  int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), kv));
1248
1249
0
  if (!card) {
1250
0
    response_.set_code(RedisResponsePB::NIL);
1251
0
    return Status::OK();
1252
0
  }
1253
1254
0
  std::vector<std::string> value;
1255
0
  {
1256
0
    ValueRef value_ref(ValueType::kTombstone);
1257
1258
0
    if (request_.pop_request().side() == REDIS_SIDE_LEFT) {
1259
0
      RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(doc_path, 1, value_ref,
1260
0
          data.read_time, data.deadline, redis_query_id(), Direction::kForward, 0, &value));
1261
0
    } else {
1262
0
      RETURN_NOT_OK(data.doc_write_batch->ReplaceRedisInList(doc_path, card, value_ref,
1263
0
          data.read_time, data.deadline, redis_query_id(), Direction::kBackward, card + 1, &value));
1264
0
    }
1265
0
  }
1266
1267
0
  QLValuePB list;
1268
0
  list.mutable_map_value()->mutable_keys()->Add()->set_virtual_value(QLVirtualValuePB::COUNTER);
1269
0
  list.mutable_map_value()->mutable_values()->Add()->set_int64_value(--card);
1270
0
  ValueRef value_ref(list);
1271
0
  value_ref.set_custom_value_type(ValueType::kRedisList);
1272
0
  RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1273
0
       doc_path, value_ref, data.read_time, data.deadline, redis_query_id()));
1274
1275
0
  if (value.size() != 1) {
1276
0
    return STATUS_SUBSTITUTE(Corruption, "Expected one popped value, got $0", value.size());
1277
0
  }
1278
1279
0
  response_.set_string_response(value[0]);
1280
0
  response_.set_code(RedisResponsePB::OK);
1281
0
  return Status::OK();
1282
0
}
1283
1284
26
Status RedisWriteOperation::ApplyAdd(const DocOperationApplyData& data) {
1285
26
  const RedisKeyValuePB& kv = request_.key_value();
1286
26
  RedisDataType data_type = VERIFY_RESULT(GetValueType(data));
1287
1288
26
  if (data_type != REDIS_TYPE_SET && 
data_type != REDIS_TYPE_NONE12
) {
1289
0
    response_.set_code(RedisResponsePB::WRONG_TYPE);
1290
0
    response_.set_error_message(wrong_type_message);
1291
0
    return Status::OK();
1292
0
  }
1293
1294
26
  DocPath doc_path = DocPath::DocPathFromRedisKey(kv.hash_code(), kv.key());
1295
1296
26
  if (kv.subkey_size() == 0) {
1297
0
    return STATUS(InvalidCommand, "SADD request has no subkeys set");
1298
0
  }
1299
1300
26
  int num_keys_found = 0;
1301
1302
26
  QLValuePB set_entries;
1303
1304
58
  for (int i = 0 ; i < kv.subkey_size(); 
i++32
) { // We know that each subkey is distinct.
1305
32
    if (FLAGS_emulate_redis_responses) {
1306
32
      RedisDataType type = VERIFY_RESULT(GetValueType(data, i));
1307
32
      if (type != REDIS_TYPE_NONE) {
1308
8
        num_keys_found++;
1309
8
      }
1310
32
    }
1311
1312
32
    set_entries.mutable_map_value()->mutable_keys()->Add()->set_string_value(
1313
32
        kv.subkey(i).string_subkey());
1314
32
    set_entries.mutable_map_value()->mutable_values()->Add()->set_virtual_value(
1315
32
        QLVirtualValuePB::NULL_LOW);
1316
32
  }
1317
1318
26
  ValueRef value_ref(set_entries);
1319
26
  value_ref.set_custom_value_type(ValueType::kRedisSet);
1320
1321
26
  Status s;
1322
1323
26
  if (data_type == REDIS_TYPE_NONE) {
1324
12
    RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument(
1325
12
        doc_path, value_ref, data.read_time, data.deadline, redis_query_id()));
1326
14
  } else {
1327
14
    RETURN_NOT_OK(data.doc_write_batch->ExtendSubDocument(
1328
14
        doc_path, value_ref, data.read_time, data.deadline, redis_query_id()));
1329
14
  }
1330
1331
26
  response_.set_code(RedisResponsePB::OK);
1332
26
  if (FLAGS_emulate_redis_responses) {
1333
    // If flag is set, the actual number of new keys added is sent as response.
1334
26
    response_.set_int_response(kv.subkey_size() - num_keys_found);
1335
26
  }
1336
26
  return Status::OK();
1337
26
}
1338
1339
0
Status RedisWriteOperation::ApplyRemove(const DocOperationApplyData& data) {
1340
0
  return STATUS(NotSupported, "Redis operation has not been implemented");
1341
0
}
1342
1343
84.6k
Status RedisReadOperation::Execute() {
1344
84.6k
  SimulateTimeoutIfTesting(&deadline_);
1345
  // If we have a KEYS command, we don't specify any key for the iterator. Therefore, don't use
1346
  // bloom filters for this command.
1347
84.6k
  SubDocKey doc_key(
1348
84.6k
      DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key()));
1349
84.6k
  auto bloom_filter_mode = request_.has_keys_request() ?
1350
83.9k
      
BloomFilterMode::DONT_USE_BLOOM_FILTER630
: BloomFilterMode::USE_BLOOM_FILTER;
1351
84.6k
  auto iter = yb::docdb::CreateIntentAwareIterator(
1352
84.6k
      doc_db_, bloom_filter_mode,
1353
84.6k
      doc_key.Encode().AsSlice(),
1354
84.6k
      redis_query_id(), TransactionOperationContext(), deadline_, read_time_);
1355
84.6k
  iterator_ = std::move(iter);
1356
84.6k
  deadline_info_.emplace(deadline_);
1357
1358
84.6k
  switch (request_.request_case()) {
1359
4
    case RedisReadRequestPB::kGetForRenameRequest:
1360
4
      return ExecuteGetForRename();
1361
72.6k
    case RedisReadRequestPB::kGetRequest:
1362
72.6k
      return ExecuteGet();
1363
291
    case RedisReadRequestPB::kGetTtlRequest:
1364
291
      return ExecuteGetTtl();
1365
2
    case RedisReadRequestPB::kStrlenRequest:
1366
2
      return ExecuteStrLen();
1367
4
    case RedisReadRequestPB::kExistsRequest:
1368
4
      return ExecuteExists();
1369
2
    case RedisReadRequestPB::kGetRangeRequest:
1370
2
      return ExecuteGetRange();
1371
10.9k
    case RedisReadRequestPB::kGetCollectionRangeRequest:
1372
10.9k
      return ExecuteCollectionGetRange();
1373
630
    case RedisReadRequestPB::kKeysRequest:
1374
630
      return ExecuteKeys();
1375
0
    default:
1376
0
      return STATUS_FORMAT(
1377
84.6k
          Corruption, "Unsupported redis read operation: $0", request_.request_case());
1378
84.6k
  }
1379
84.6k
}
1380
1381
namespace {
1382
1383
4
ssize_t ApplyIndex(ssize_t index, ssize_t len) {
1384
4
  if (index < 0) {
1385
0
    index += len;
1386
0
    return std::max<ssize_t>(index, 0);
1387
0
  }
1388
4
  return std::min<ssize_t>(index, len);
1389
4
}
1390
1391
} // namespace
1392
1393
Status RedisReadOperation::ExecuteHGetAllLikeCommands(ValueType value_type,
1394
                                                      bool add_keys,
1395
68
                                                      bool add_values) {
1396
68
  SubDocKey doc_key(
1397
68
      DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key()));
1398
68
  SubDocument doc;
1399
68
  bool doc_found = false;
1400
68
  auto encoded_doc_key = doc_key.EncodeWithoutHt();
1401
1402
  // TODO(dtxn) - pass correct transaction context when we implement cross-shard transactions
1403
  // support for Redis.
1404
68
  GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found };
1405
68
  data.deadline_info = deadline_info_.get_ptr();
1406
1407
68
  bool has_cardinality_subkey = value_type == ValueType::kRedisSortedSet ||
1408
68
                                
value_type == ValueType::kRedisList66
;
1409
68
  bool return_array_response = add_keys || 
add_values55
;
1410
1411
68
  if (has_cardinality_subkey) {
1412
8
    data.return_type_only = !return_array_response;
1413
60
  } else {
1414
60
    data.count_only = !return_array_response;
1415
60
  }
1416
1417
68
  RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr,
1418
68
                               SeekFwdSuffices::kFalse));
1419
68
  if (return_array_response)
1420
16
    response_.set_allocated_array_response(new RedisArrayPB());
1421
1422
68
  if (!doc_found) {
1423
12
    response_.set_code(RedisResponsePB::OK);
1424
12
    if (!return_array_response)
1425
12
      response_.set_int_response(0);
1426
12
    return Status::OK();
1427
12
  }
1428
1429
56
  if (VerifyTypeAndSetCode(value_type, doc.value_type(), &response_)) {
1430
47
    if (return_array_response) {
1431
15
      RETURN_NOT_OK(PopulateResponseFrom(doc.object_container(), AddResponseValuesGeneric,
1432
15
                                         &response_, add_keys, add_values));
1433
32
    } else {
1434
32
      int64_t card = has_cardinality_subkey ?
1435
32
        VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value())) :
1436
32
        
data.record_count28
;
1437
0
      response_.set_int_response(card);
1438
32
      response_.set_code(RedisResponsePB::OK);
1439
32
    }
1440
47
  }
1441
56
  return Status::OK();
1442
56
}
1443
1444
Status RedisReadOperation::ExecuteCollectionGetRangeByBounds(
1445
0
    RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, bool add_keys) {
1446
0
  RedisSubKeyBoundPB lower_bound;
1447
0
  lower_bound.set_infinity_type(RedisSubKeyBoundPB::NEGATIVE);
1448
0
  RedisSubKeyBoundPB upper_bound;
1449
0
  upper_bound.set_infinity_type(RedisSubKeyBoundPB::POSITIVE);
1450
0
  return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys);
1451
0
}
1452
1453
Status RedisReadOperation::ExecuteCollectionGetRangeByBounds(
1454
    RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type,
1455
10.9k
    const RedisSubKeyBoundPB& lower_bound, const RedisSubKeyBoundPB& upper_bound, bool add_keys) {
1456
10.9k
  if ((lower_bound.has_infinity_type() &&
1457
10.9k
       
lower_bound.infinity_type() == RedisSubKeyBoundPB::POSITIVE116
) ||
1458
10.9k
      (upper_bound.has_infinity_type() &&
1459
10.9k
       
upper_bound.infinity_type() == RedisSubKeyBoundPB::NEGATIVE114
)) {
1460
    // Return empty response.
1461
0
    response_.set_code(RedisResponsePB::OK);
1462
0
    RETURN_NOT_OK(PopulateResponseFrom(
1463
0
        SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_, /* add_keys */ true,
1464
0
        /* add_values */ true));
1465
0
    return Status::OK();
1466
0
  }
1467
1468
10.9k
  if (request_type == RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE) {
1469
98
    auto type = VERIFY_RESULT(GetValueType());
1470
0
    auto expected_type = REDIS_TYPE_SORTEDSET;
1471
98
    if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1472
6
      return Status::OK();
1473
6
    }
1474
92
    auto encoded_doc_key =
1475
92
        DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key());
1476
92
    PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key);
1477
92
    double low_double = lower_bound.subkey_bound().double_subkey();
1478
92
    double high_double = upper_bound.subkey_bound().double_subkey();
1479
1480
92
    KeyBytes low_sub_key_bound;
1481
92
    KeyBytes high_sub_key_bound;
1482
1483
92
    SliceKeyBound low_subkey;
1484
92
    if (!lower_bound.has_infinity_type()) {
1485
24
      low_sub_key_bound = encoded_doc_key;
1486
24
      PrimitiveValue::Double(low_double).AppendToKey(&low_sub_key_bound);
1487
24
      low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(lower_bound.is_exclusive()));
1488
24
    }
1489
92
    SliceKeyBound high_subkey;
1490
92
    if (!upper_bound.has_infinity_type()) {
1491
26
      high_sub_key_bound = encoded_doc_key;
1492
26
      PrimitiveValue::Double(high_double).AppendToKey(&high_sub_key_bound);
1493
26
      high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(upper_bound.is_exclusive()));
1494
26
    }
1495
1496
92
    SubDocument doc;
1497
92
    bool doc_found = false;
1498
92
    GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found};
1499
92
    data.deadline_info = deadline_info_.get_ptr();
1500
92
    data.low_subkey = &low_subkey;
1501
92
    data.high_subkey = &high_subkey;
1502
1503
92
    IndexBound low_index;
1504
92
    IndexBound high_index;
1505
92
    if (request_.has_range_request_limit()) {
1506
50
      auto offset = request_.index_range().lower_bound().index();
1507
50
      int32_t limit = request_.range_request_limit();
1508
1509
50
      if (offset < 0 || 
limit == 038
) {
1510
        // Return an empty response.
1511
18
        response_.set_code(RedisResponsePB::OK);
1512
18
        RETURN_NOT_OK(PopulateResponseFrom(
1513
18
            SubDocument::ObjectContainer(), AddResponseValuesGeneric, &response_,
1514
18
            /* add_keys */ true, /* add_values */ true));
1515
18
        return Status::OK();
1516
18
      }
1517
1518
32
      low_index = IndexBound(offset, true /* is_lower */);
1519
32
      data.low_index = &low_index;
1520
32
      if (limit > 0) {
1521
        // Only define upper bound if limit is positive.
1522
26
        high_index = IndexBound(offset + limit - 1, false /* is_lower */);
1523
26
        data.high_index = &high_index;
1524
26
      }
1525
32
    }
1526
74
    RETURN_NOT_OK(GetAndPopulateResponseValues(
1527
74
        iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_,
1528
74
        &response_,
1529
74
        /* add_keys */ add_keys, /* add_values */ true, /* reverse */ false));
1530
10.8k
  } else {
1531
10.8k
    auto encoded_doc_key =
1532
10.8k
        DocKey::EncodedFromRedisKey(request_.key_value().hash_code(), request_.key_value().key());
1533
10.8k
    int64_t low_timestamp = lower_bound.subkey_bound().timestamp_subkey();
1534
10.8k
    int64_t high_timestamp = upper_bound.subkey_bound().timestamp_subkey();
1535
1536
10.8k
    KeyBytes low_sub_key_bound;
1537
10.8k
    KeyBytes high_sub_key_bound;
1538
1539
10.8k
    SliceKeyBound low_subkey;
1540
    // Need to switch the order since we store the timestamps in descending order.
1541
10.8k
    if (!upper_bound.has_infinity_type()) {
1542
10.8k
      low_sub_key_bound = encoded_doc_key;
1543
10.8k
      PrimitiveValue(high_timestamp, SortOrder::kDescending).AppendToKey(&low_sub_key_bound);
1544
10.8k
      low_subkey = SliceKeyBound(low_sub_key_bound, LowerBound(upper_bound.is_exclusive()));
1545
10.8k
    }
1546
10.8k
    SliceKeyBound high_subkey;
1547
10.8k
    if (!lower_bound.has_infinity_type()) {
1548
10.8k
      high_sub_key_bound = encoded_doc_key;
1549
10.8k
      PrimitiveValue(low_timestamp, SortOrder::kDescending).AppendToKey(&high_sub_key_bound);
1550
10.8k
      high_subkey = SliceKeyBound(high_sub_key_bound, UpperBound(lower_bound.is_exclusive()));
1551
10.8k
    }
1552
1553
10.8k
    SubDocument doc;
1554
10.8k
    bool doc_found = false;
1555
10.8k
    GetRedisSubDocumentData data = {encoded_doc_key, &doc, &doc_found};
1556
10.8k
    data.deadline_info = deadline_info_.get_ptr();
1557
10.8k
    data.low_subkey = &low_subkey;
1558
10.8k
    data.high_subkey = &high_subkey;
1559
10.8k
    data.limit = request_.range_request_limit();
1560
10.8k
    bool is_reverse = true;
1561
10.8k
    if (request_type == RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME) {
1562
      // If reverse is false, newest element is the first element returned.
1563
10.8k
      is_reverse = false;
1564
10.8k
    }
1565
10.8k
    RETURN_NOT_OK(GetAndPopulateResponseValues(
1566
10.8k
        iterator_.get(), AddResponseValuesGeneric, data, ValueType::kRedisTS, request_, &response_,
1567
10.8k
        /* add_keys */ true, /* add_values */ true, is_reverse));
1568
10.8k
  }
1569
10.9k
  return Status::OK();
1570
10.9k
}
1571
1572
10.9k
Status RedisReadOperation::ExecuteCollectionGetRange() {
1573
10.9k
  const RedisKeyValuePB& key_value = request_.key_value();
1574
10.9k
  if (!request_.has_key_value() || !key_value.has_key()) {
1575
0
    return STATUS(InvalidArgument, "Need to specify the key");
1576
0
  }
1577
1578
10.9k
  const auto request_type = request_.get_collection_range_request().request_type();
1579
10.9k
  switch (request_type) {
1580
10.8k
    case RedisCollectionGetRangeRequestPB::TSREVRANGEBYTIME:
1581
10.8k
      FALLTHROUGH_INTENDED;
1582
10.9k
    case RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE: FALLTHROUGH_INTENDED;
1583
10.9k
    case RedisCollectionGetRangeRequestPB::TSRANGEBYTIME: {
1584
10.9k
      if(!request_.has_subkey_range() || !request_.subkey_range().has_lower_bound() ||
1585
10.9k
          !request_.subkey_range().has_upper_bound()) {
1586
0
        return STATUS(InvalidArgument, "Need to specify the subkey range");
1587
0
      }
1588
10.9k
      const RedisSubKeyBoundPB& lower_bound = request_.subkey_range().lower_bound();
1589
10.9k
      const RedisSubKeyBoundPB& upper_bound = request_.subkey_range().upper_bound();
1590
10.9k
      const bool add_keys = request_.get_collection_range_request().with_scores();
1591
10.9k
      return ExecuteCollectionGetRangeByBounds(request_type, lower_bound, upper_bound, add_keys);
1592
10.9k
    }
1593
12
    case RedisCollectionGetRangeRequestPB::ZRANGE: FALLTHROUGH_INTENDED;
1594
20
    case RedisCollectionGetRangeRequestPB::ZREVRANGE: {
1595
20
      if(!request_.has_index_range() || !request_.index_range().has_lower_bound() ||
1596
20
          !request_.index_range().has_upper_bound()) {
1597
0
        return STATUS(InvalidArgument, "Need to specify the index range");
1598
0
      }
1599
1600
      // First make sure is of type sorted set or none.
1601
20
      RedisDataType type = VERIFY_RESULT(GetValueType());
1602
0
      auto expected_type = RedisDataType::REDIS_TYPE_SORTEDSET;
1603
20
      if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1604
0
        return Status::OK();
1605
0
      }
1606
1607
20
      int64_t card = VERIFY_RESULT(GetCardinality(iterator_.get(), request_.key_value()));
1608
1609
0
      const RedisIndexBoundPB& low_index_bound = request_.index_range().lower_bound();
1610
20
      const RedisIndexBoundPB& high_index_bound = request_.index_range().upper_bound();
1611
1612
20
      int64 low_idx_normalized, high_idx_normalized;
1613
1614
20
      int64 low_idx = low_index_bound.index();
1615
20
      int64 high_idx = high_index_bound.index();
1616
      // Normalize the bounds to be positive and go from low to high index.
1617
20
      bool reverse = false;
1618
20
      if (request_type == RedisCollectionGetRangeRequestPB::ZREVRANGE) {
1619
8
        reverse = true;
1620
8
      }
1621
20
      GetNormalizedBounds(
1622
20
          low_idx, high_idx, card, reverse, &low_idx_normalized, &high_idx_normalized);
1623
1624
20
      if (high_idx_normalized < low_idx_normalized) {
1625
        // Return empty response.
1626
0
        response_.set_code(RedisResponsePB::OK);
1627
0
        RETURN_NOT_OK(PopulateResponseFrom(SubDocument::ObjectContainer(),
1628
0
                                           AddResponseValuesGeneric,
1629
0
                                           &response_, /* add_keys */
1630
0
                                           true, /* add_values */
1631
0
                                           true));
1632
0
        return Status::OK();
1633
0
      }
1634
20
      auto encoded_doc_key = DocKey::EncodedFromRedisKey(
1635
20
          request_.key_value().hash_code(), request_.key_value().key());
1636
20
      PrimitiveValue(ValueType::kSSForward).AppendToKey(&encoded_doc_key);
1637
1638
20
      bool add_keys = request_.get_collection_range_request().with_scores();
1639
1640
20
      IndexBound low_bound = IndexBound(low_idx_normalized, true /* is_lower */);
1641
20
      IndexBound high_bound = IndexBound(high_idx_normalized, false /* is_lower */);
1642
1643
20
      SubDocument doc;
1644
20
      bool doc_found = false;
1645
20
      GetRedisSubDocumentData data = { encoded_doc_key, &doc, &doc_found};
1646
20
      data.deadline_info = deadline_info_.get_ptr();
1647
20
      data.low_index = &low_bound;
1648
20
      data.high_index = &high_bound;
1649
1650
20
      RETURN_NOT_OK(GetAndPopulateResponseValues(
1651
20
          iterator_.get(), AddResponseValuesSortedSets, data, ValueType::kObject, request_,
1652
20
          &response_, add_keys, /* add_values */ true, reverse));
1653
20
      break;
1654
20
    }
1655
20
    case RedisCollectionGetRangeRequestPB::UNKNOWN:
1656
0
      return STATUS(InvalidCommand, "Unknown Collection Get Range Request not supported");
1657
10.9k
  }
1658
1659
20
  return Status::OK();
1660
10.9k
}
1661
1662
72.7k
Result<RedisDataType> RedisReadOperation::GetValueType(int subkey_index) {
1663
72.7k
  return GetRedisValueType(iterator_.get(), request_.key_value(),
1664
72.7k
                           nullptr /* doc_write_batch */, subkey_index);
1665
72.7k
}
1666
1667
1.21k
Result<RedisValue> RedisReadOperation::GetOverrideValue(int subkey_index) {
1668
1.21k
  return GetRedisValue(iterator_.get(), request_.key_value(),
1669
1.21k
                       subkey_index, /* always_override */ true);
1670
1.21k
}
1671
1672
71.3k
Result<RedisValue> RedisReadOperation::GetValue(int subkey_index) {
1673
71.3k
    return GetRedisValue(iterator_.get(), request_.key_value(), subkey_index);
1674
71.3k
}
1675
1676
namespace {
1677
1678
// Note: Do not use if also retrieving other value, as some work will be repeated.
1679
// Assumes every value has a TTL, and the TTL is stored in the row with this key.
1680
// Also observe that tombstone checking only works because we assume the key has
1681
// no ancestors.
1682
Result<boost::optional<Expiration>> GetTtl(
1683
291
    const Slice& encoded_subdoc_key, IntentAwareIterator* iter) {
1684
291
  auto dockey_size =
1685
291
    VERIFY_RESULT(DocKey::EncodedSize(encoded_subdoc_key, DocKeyPart::kWholeDocKey));
1686
0
  Slice key_slice(encoded_subdoc_key.data(), dockey_size);
1687
291
  iter->Seek(key_slice);
1688
291
  if (!iter->valid())
1689
16
    return boost::none;
1690
275
  auto key_data = VERIFY_RESULT(iter->FetchKey());
1691
275
  if (!key_data.key.compare(key_slice)) {
1692
275
    Value doc_value = Value(PrimitiveValue(ValueType::kInvalid));
1693
275
    RETURN_NOT_OK(doc_value.Decode(iter->value()));
1694
275
    if (doc_value.value_type() != ValueType::kTombstone) {
1695
239
      return Expiration(key_data.write_time.hybrid_time(), doc_value.ttl());
1696
239
    }
1697
275
  }
1698
36
  return boost::none;
1699
275
}
1700
1701
} // namespace
1702
1703
291
Status RedisReadOperation::ExecuteGetTtl() {
1704
291
  const RedisKeyValuePB& kv = request_.key_value();
1705
291
  if (!kv.has_key()) {
1706
0
    return STATUS(Corruption, "Expected KeyValuePB");
1707
0
  }
1708
  // We currently only support getting and setting TTL on top level keys.
1709
291
  if (!kv.subkey().empty()) {
1710
0
    return STATUS_SUBSTITUTE(Corruption,
1711
0
                             "Expected no subkeys, got $0", kv.subkey().size());
1712
0
  }
1713
1714
291
  auto encoded_doc_key = DocKey::EncodedFromRedisKey(kv.hash_code(), kv.key());
1715
291
  auto maybe_ttl_exp = VERIFY_RESULT(GetTtl(encoded_doc_key.AsSlice(), iterator_.get()));
1716
1717
291
  if (!maybe_ttl_exp.has_value()) {
1718
52
    response_.set_int_response(-2);
1719
52
    return Status::OK();
1720
52
  }
1721
1722
239
  auto exp = maybe_ttl_exp.get();
1723
239
  if (exp.ttl.Equals(ValueControlFields::kMaxTtl)) {
1724
57
    response_.set_int_response(-1);
1725
57
    return Status::OK();
1726
57
  }
1727
1728
182
  MonoDelta ttl = VERIFY_RESULT(exp.ComputeRelativeTtl(iterator_->read_time().read));
1729
182
  if (ttl.IsNegative()) {
1730
    // The value has expired.
1731
48
    response_.set_int_response(-2);
1732
48
    return Status::OK();
1733
48
  }
1734
1735
134
  response_.set_int_response(request_.get_ttl_request().return_seconds() ?
1736
71
                             (int64_t) std::round(ttl.ToSeconds()) :
1737
134
                             
ttl.ToMilliseconds()63
);
1738
134
  return Status::OK();
1739
182
}
1740
1741
4
Status RedisReadOperation::ExecuteGetForRename() {
1742
4
  RedisDataType type = VERIFY_RESULT(GetValueType());
1743
0
  response_.set_type(type);
1744
4
  switch (type) {
1745
4
    case RedisDataType::REDIS_TYPE_STRING: {
1746
4
      return ExecuteGet(RedisGetRequestPB::GET);
1747
0
    }
1748
1749
0
    case RedisDataType::REDIS_TYPE_HASH: {
1750
0
      return ExecuteGet(RedisGetRequestPB::HGETALL);
1751
0
    }
1752
1753
0
    case RedisDataType::REDIS_TYPE_SET: {
1754
0
      return ExecuteGet(RedisGetRequestPB::SMEMBERS);
1755
0
    }
1756
1757
0
    case RedisDataType::REDIS_TYPE_SORTEDSET: {
1758
0
      return ExecuteCollectionGetRangeByBounds(
1759
0
          RedisCollectionGetRangeRequestPB::ZRANGEBYSCORE, true);
1760
0
    }
1761
1762
0
    case RedisDataType::REDIS_TYPE_TIMESERIES: {
1763
0
      return ExecuteCollectionGetRangeByBounds(
1764
0
          RedisCollectionGetRangeRequestPB::TSRANGEBYTIME, true);
1765
0
    }
1766
1767
0
    case RedisDataType::REDIS_TYPE_NONE: {
1768
0
      response_.set_code(RedisResponsePB::NOT_FOUND);
1769
0
      return Status::OK();
1770
0
    }
1771
1772
0
    case RedisDataType::REDIS_TYPE_LIST:
1773
0
    default: {
1774
0
      LOG(DFATAL) << "Unhandled Redis Data Type " << type;
1775
0
    }
1776
4
  }
1777
0
  return Status::OK();
1778
4
}
1779
1780
4
Status RedisReadOperation::ExecuteGet(RedisGetRequestPB::GetRequestType type) {
1781
4
  RedisGetRequestPB request;
1782
4
  request.set_request_type(type);
1783
4
  return ExecuteGet(request);
1784
4
}
1785
1786
72.6k
Status RedisReadOperation::ExecuteGet() { return ExecuteGet(request_.get_request()); }
1787
1788
72.7k
Status RedisReadOperation::ExecuteGet(const RedisGetRequestPB& get_request) {
1789
72.7k
  auto request_type = get_request.request_type();
1790
72.7k
  RedisDataType expected_type = REDIS_TYPE_NONE;
1791
72.7k
  switch (request_type) {
1792
71.3k
    case RedisGetRequestPB::GET:
1793
71.3k
      expected_type = REDIS_TYPE_STRING; break;
1794
1.21k
    case RedisGetRequestPB::TSGET:
1795
1.21k
      expected_type = REDIS_TYPE_TIMESERIES; break;
1796
14
    case RedisGetRequestPB::HGET: FALLTHROUGH_INTENDED;
1797
20
    case RedisGetRequestPB::HEXISTS:
1798
20
      expected_type = REDIS_TYPE_HASH; break;
1799
4
    case RedisGetRequestPB::SISMEMBER:
1800
4
      expected_type = REDIS_TYPE_SET; break;
1801
52
    case RedisGetRequestPB::ZSCORE:
1802
52
      expected_type = REDIS_TYPE_SORTEDSET; break;
1803
71
    default:
1804
71
      expected_type = REDIS_TYPE_NONE;
1805
72.7k
  }
1806
72.7k
  switch (request_type) {
1807
71.3k
    case RedisGetRequestPB::GET: FALLTHROUGH_INTENDED;
1808
72.5k
    case RedisGetRequestPB::TSGET: FALLTHROUGH_INTENDED;
1809
72.5k
    case RedisGetRequestPB::HGET: {
1810
72.5k
      RedisDataType type = VERIFY_RESULT(GetValueType());
1811
      // TODO: this is primarily glue for the Timeseries bug where the parent
1812
      // may get compacted due to an outdated TTL even though the children
1813
      // have longer TTL's and thus still exist. When fixing, take note that
1814
      // GetValueType finds the value type of the parent, so if the parent
1815
      // does not have the maximum TTL, it will return REDIS_TYPE_NONE when it
1816
      // should not.
1817
72.5k
      if (expected_type == REDIS_TYPE_TIMESERIES && 
type == REDIS_TYPE_NONE1.21k
) {
1818
0
        type = expected_type;
1819
0
      }
1820
      // If wrong type, we set the error code in the response.
1821
72.5k
      if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1822
72.5k
        auto value = request_type == RedisGetRequestPB::TSGET ? 
GetOverrideValue()1.21k
:
GetValue()71.3k
;
1823
72.5k
        RETURN_NOT_OK(value);
1824
72.5k
        if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1825
72.5k
            VerifySuccessIfMissing::kTrue)) {
1826
72.5k
          response_.set_string_response(value->value);
1827
72.5k
        }
1828
72.5k
      }
1829
72.5k
      return Status::OK();
1830
72.5k
    }
1831
52
    case RedisGetRequestPB::ZSCORE: {
1832
52
      RedisDataType type = VERIFY_RESULT(GetValueType());
1833
      // If wrong type, we set the error code in the response.
1834
52
      if (!VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1835
6
        return Status::OK();
1836
6
      }
1837
46
      SubDocKey key_reverse = SubDocKey(
1838
46
          DocKey::FromRedisKey(request_.key_value().hash_code(), request_.key_value().key()),
1839
46
          PrimitiveValue(ValueType::kSSReverse),
1840
46
          PrimitiveValue(request_.key_value().subkey(0).string_subkey()));
1841
46
      SubDocument subdoc_reverse;
1842
46
      bool subdoc_reverse_found = false;
1843
46
      auto encoded_key_reverse = key_reverse.EncodeWithoutHt();
1844
46
      GetRedisSubDocumentData get_data = {
1845
46
          encoded_key_reverse, &subdoc_reverse, &subdoc_reverse_found };
1846
46
      RETURN_NOT_OK(GetRedisSubDocument(doc_db_, get_data, redis_query_id(),
1847
46
                                        TransactionOperationContext(), deadline_, read_time_));
1848
46
      if (subdoc_reverse_found) {
1849
31
        double score = subdoc_reverse.GetDouble();
1850
31
        response_.set_string_response(std::to_string(score));
1851
31
      } else {
1852
15
        response_.set_code(RedisResponsePB::NIL);
1853
15
      }
1854
46
      return Status::OK();
1855
46
    }
1856
6
    case RedisGetRequestPB::HEXISTS: FALLTHROUGH_INTENDED;
1857
10
    case RedisGetRequestPB::SISMEMBER: {
1858
10
      RedisDataType type = VERIFY_RESULT(GetValueType());
1859
10
      if (VerifyTypeAndSetCode(expected_type, type, &response_, VerifySuccessIfMissing::kTrue)) {
1860
10
        RedisDataType subtype = VERIFY_RESULT(GetValueType(0));
1861
0
        SetOptionalInt(subtype, 1, &response_);
1862
10
        response_.set_code(RedisResponsePB::OK);
1863
10
      }
1864
10
      return Status::OK();
1865
10
    }
1866
0
    case RedisGetRequestPB::HSTRLEN: {
1867
0
      RedisDataType type = VERIFY_RESULT(GetValueType());
1868
0
      if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_,
1869
0
                               VerifySuccessIfMissing::kTrue)) {
1870
0
        auto value = GetValue();
1871
0
        RETURN_NOT_OK(value);
1872
0
        SetOptionalInt(value->type, value->value.length(), &response_);
1873
0
        response_.set_code(RedisResponsePB::OK);
1874
0
      }
1875
0
      return Status::OK();
1876
0
    }
1877
0
    case RedisGetRequestPB::MGET: {
1878
0
      return STATUS(NotSupported, "MGET not yet supported");
1879
0
    }
1880
3
    case RedisGetRequestPB::HMGET: {
1881
3
      RedisDataType type = VERIFY_RESULT(GetValueType());
1882
3
      if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_HASH, type, &response_,
1883
3
                                VerifySuccessIfMissing::kTrue)) {
1884
0
        return Status::OK();
1885
0
      }
1886
1887
3
      response_.set_allocated_array_response(new RedisArrayPB());
1888
3
      const auto& req_kv = request_.key_value();
1889
3
      auto num_subkeys = req_kv.subkey_size();
1890
3
      vector<int> indices(num_subkeys);
1891
11
      for (int i = 0; i < num_subkeys; 
++i8
) {
1892
8
        indices[i] = i;
1893
8
      }
1894
6
      std::sort(indices.begin(), indices.end(), [&req_kv](int i, int j) {
1895
6
            return req_kv.subkey(i).string_subkey() < req_kv.subkey(j).string_subkey();
1896
6
          });
1897
1898
3
      string current_value = "";
1899
3
      response_.mutable_array_response()->mutable_elements()->Reserve(num_subkeys);
1900
11
      for (int i = 0; i < num_subkeys; 
++i8
) {
1901
8
        response_.mutable_array_response()->add_elements();
1902
8
      }
1903
11
      for (int i = 0; i < num_subkeys; 
++i8
) {
1904
8
        if (i == 0 ||
1905
8
            req_kv.subkey(indices[i]).string_subkey() !=
1906
8
            req_kv.subkey(indices[i - 1]).string_subkey()) {
1907
          // If the condition above is false, we encountered the same key again, no need to call
1908
          // GetValue() once more, current_value is already correct.
1909
8
          auto value = GetValue(indices[i]);
1910
8
          RETURN_NOT_OK(value);
1911
8
          if (value->type == REDIS_TYPE_STRING) {
1912
6
            current_value = std::move(value->value);
1913
6
          } else {
1914
2
            current_value = ""; // Empty string is nil response.
1915
2
          }
1916
8
        }
1917
8
        *response_.mutable_array_response()->mutable_elements(indices[i]) = current_value;
1918
8
      }
1919
1920
3
      response_.set_code(RedisResponsePB::OK);
1921
3
      return Status::OK();
1922
3
    }
1923
5
    case RedisGetRequestPB::HGETALL:
1924
5
      return ExecuteHGetAllLikeCommands(ValueType::kObject, true, true);
1925
3
    case RedisGetRequestPB::HKEYS:
1926
3
      return ExecuteHGetAllLikeCommands(ValueType::kObject, true, false);
1927
3
    case RedisGetRequestPB::HVALS:
1928
3
      return ExecuteHGetAllLikeCommands(ValueType::kObject, false, true);
1929
4
    case RedisGetRequestPB::HLEN:
1930
4
      return ExecuteHGetAllLikeCommands(ValueType::kObject, false, false);
1931
5
    case RedisGetRequestPB::SMEMBERS:
1932
5
      return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, true, false);
1933
4
    case RedisGetRequestPB::SCARD:
1934
4
      return ExecuteHGetAllLikeCommands(ValueType::kRedisSet, false, false);
1935
36
    case RedisGetRequestPB::TSCARD:
1936
36
      return ExecuteHGetAllLikeCommands(ValueType::kRedisTS, false, false);
1937
2
    case RedisGetRequestPB::ZCARD:
1938
2
      return ExecuteHGetAllLikeCommands(ValueType::kRedisSortedSet, false, false);
1939
6
    case RedisGetRequestPB::LLEN:
1940
6
      return ExecuteHGetAllLikeCommands(ValueType::kRedisList, false, false);
1941
0
    case RedisGetRequestPB::UNKNOWN: {
1942
0
      return STATUS(InvalidCommand, "Unknown Get Request not supported");
1943
3
    }
1944
72.7k
  }
1945
0
  return Status::OK();
1946
72.7k
}
1947
1948
2
Status RedisReadOperation::ExecuteStrLen() {
1949
2
  auto value = GetValue();
1950
2
  response_.set_code(RedisResponsePB::OK);
1951
2
  RETURN_NOT_OK(value);
1952
1953
2
  if (VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1954
2
        VerifySuccessIfMissing::kTrue)) {
1955
2
    SetOptionalInt(value->type, value->value.length(), &response_);
1956
2
  }
1957
2
  response_.set_code(RedisResponsePB::OK);
1958
1959
2
  return Status::OK();
1960
2
}
1961
1962
4
Status RedisReadOperation::ExecuteExists() {
1963
4
  auto value = GetValue();
1964
4
  response_.set_code(RedisResponsePB::OK);
1965
4
  RETURN_NOT_OK(value);
1966
1967
  // We only support exist command with one argument currently.
1968
4
  SetOptionalInt(value->type, 1, &response_);
1969
4
  response_.set_code(RedisResponsePB::OK);
1970
1971
4
  return Status::OK();
1972
4
}
1973
1974
2
Status RedisReadOperation::ExecuteGetRange() {
1975
2
  auto value = GetValue();
1976
2
  RETURN_NOT_OK(value);
1977
1978
2
  if (!VerifyTypeAndSetCode(RedisDataType::REDIS_TYPE_STRING, value->type, &response_,
1979
2
      VerifySuccessIfMissing::kTrue)) {
1980
    // We've already set the error code in the response.
1981
0
    return Status::OK();
1982
0
  }
1983
1984
2
  const ssize_t len = value->value.length();
1985
2
  ssize_t exclusive_end = request_.get_range_request().end() + 1;
1986
2
  if (exclusive_end == 0) {
1987
0
    exclusive_end = len;
1988
0
  }
1989
1990
  // We treat negative indices to refer backwards from the end of the string.
1991
2
  const auto start = ApplyIndex(request_.get_range_request().start(), len);
1992
2
  auto end = ApplyIndex(exclusive_end, len);
1993
2
  if (end < start) {
1994
0
    end = start;
1995
0
  }
1996
1997
2
  response_.set_code(RedisResponsePB::OK);
1998
2
  response_.set_string_response(value->value.c_str() + start, end - start);
1999
2
  return Status::OK();
2000
2
}
2001
2002
630
Status RedisReadOperation::ExecuteKeys() {
2003
630
  iterator_->Seek(DocKey());
2004
630
  int threshold = request_.keys_request().threshold();
2005
2006
630
  bool doc_found;
2007
630
  SubDocument result;
2008
2009
480k
  while (iterator_->valid()) {
2010
480k
    if (deadline_info_.get_ptr() && deadline_info_->CheckAndSetDeadlinePassed()) {
2011
0
      return STATUS(Expired, "Deadline for query passed.");
2012
0
    }
2013
480k
    auto key = VERIFY_RESULT(iterator_->FetchKey()).key;
2014
2015
    // Key could be invalidated because we could move iterator, so back it up.
2016
0
    KeyBytes key_copy(key);
2017
480k
    key = key_copy.AsSlice();
2018
2019
480k
    DocKey doc_key;
2020
480k
    RETURN_NOT_OK(doc_key.FullyDecodeFrom(key));
2021
480k
    const PrimitiveValue& key_primitive = doc_key.hashed_group().front();
2022
480k
    if (!key_primitive.IsString() ||
2023
480k
        !RedisPatternMatch(request_.keys_request().pattern(),
2024
480k
                           key_primitive.GetString(),
2025
480k
                           false /* ignore_case */)) {
2026
292k
      iterator_->SeekOutOfSubDoc(key);
2027
292k
      continue;
2028
292k
    }
2029
2030
187k
    GetRedisSubDocumentData data = {key, &result, &doc_found};
2031
187k
    data.deadline_info = deadline_info_.get_ptr();
2032
187k
    data.return_type_only = true;
2033
187k
    RETURN_NOT_OK(GetRedisSubDocument(iterator_.get(), data, /* projection */ nullptr,
2034
187k
                                      SeekFwdSuffices::kFalse));
2035
2036
187k
    if (doc_found) {
2037
187k
      if (--threshold < 0) {
2038
6
        response_.clear_array_response();
2039
6
        response_.set_code(RedisResponsePB::SERVER_ERROR);
2040
6
        response_.set_error_message("Too many keys in the database.");
2041
6
        return Status::OK();
2042
6
      }
2043
187k
      RETURN_NOT_OK(AddPrimitiveValueToResponseArray(key_primitive,
2044
187k
                                                     response_.mutable_array_response()));
2045
187k
    }
2046
187k
    iterator_->SeekOutOfSubDoc(key);
2047
187k
  }
2048
2049
624
  response_.set_code(RedisResponsePB::OK);
2050
624
  return Status::OK();
2051
630
}
2052
2053
84.6k
const RedisResponsePB& RedisReadOperation::response() {
2054
84.6k
  return response_;
2055
84.6k
}
2056
2057
}  // namespace docdb
2058
}  // namespace yb