YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_parser.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/redis/redisserver/redis_parser.h"
15
16
#include <memory>
17
#include <string>
18
19
#include <boost/algorithm/string.hpp>
20
21
#include "yb/client/callbacks.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/redis_protocol.pb.h"
25
26
#include "yb/gutil/casts.h"
27
28
#include "yb/util/split.h"
29
#include "yb/util/status.h"
30
#include "yb/util/status_format.h"
31
#include "yb/util/stol_utils.h"
32
#include "yb/util/string_case.h"
33
34
#include "yb/yql/redis/redisserver/redis_constants.h"
35
36
namespace yb {
37
namespace redisserver {
38
39
using yb::client::YBTable;
40
using yb::client::YBRedisWriteOp;
41
using yb::client::YBRedisReadOp;
42
using std::vector;
43
using std::shared_ptr;
44
using std::string;
45
46
namespace {
47
48
constexpr size_t kMaxNumberOfArgs = 1 << 20;
49
constexpr size_t kLineEndLength = 2;
50
constexpr size_t kMaxNumberLength = 25;
51
constexpr char kPositiveInfinity[] = "+inf";
52
constexpr char kNegativeInfinity[] = "-inf";
53
54
41.1k
string to_lower_case(Slice slice) {
55
41.1k
  return boost::to_lower_copy(slice.ToBuffer());
56
41.1k
}
57
58
59.5k
CHECKED_STATUS add_string_subkey(string subkey, RedisKeyValuePB* kv_pb) {
59
59.5k
  kv_pb->add_subkey()->set_string_subkey(std::move(subkey));
60
59.5k
  return Status::OK();
61
59.5k
}
62
63
30.2k
CHECKED_STATUS add_timestamp_subkey(const string &subkey, RedisKeyValuePB *kv_pb) {
64
30.2k
  auto timestamp = CheckedStoll(subkey);
65
30.2k
  RETURN_NOT_OK(timestamp);
66
30.2k
  kv_pb->add_subkey()->set_timestamp_subkey(*timestamp);
67
30.2k
  return Status::OK();
68
30.2k
}
69
70
21.1k
CHECKED_STATUS add_double_subkey(const string &subkey, RedisKeyValuePB *kv_pb) {
71
21.1k
  auto double_key = CheckedStold(subkey);
72
21.1k
  RETURN_NOT_OK(double_key);
73
21.1k
  kv_pb->add_subkey()->set_double_subkey(*double_key);
74
21.1k
  return Status::OK();
75
21.1k
}
76
77
5.73k
Result<int64_t> ParseInt64(const Slice& slice, const char* field) {
78
5.73k
  auto result = CheckedStoll(slice);
79
5.73k
  if (!result.ok()) {
80
0
    return STATUS_SUBSTITUTE(InvalidArgument,
81
0
        "$0 field $1 is not a valid number", field, slice.ToDebugString());
82
0
  }
83
5.73k
  return *result;
84
5.73k
}
85
86
5.49k
Result<int32_t> ParseInt32(const Slice& slice, const char* field) {
87
5.49k
  auto val = ParseInt64(slice, field);
88
5.49k
  if (!val.ok()) {
89
0
    return std::move(val.status());
90
0
  }
91
5.49k
  if (*val < std::numeric_limits<int32_t>::min() ||
92
5.49k
      *val > std::numeric_limits<int32_t>::max()) {
93
0
    return STATUS_SUBSTITUTE(InvalidArgument,
94
0
        "$0 field $1 is not within valid bounds", field, slice.ToDebugString());
95
0
  }
96
5.49k
  return static_cast<int32_t>(*val);
97
5.49k
}
98
99
} // namespace
100
101
81.9k
CHECKED_STATUS ParseSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
102
81.9k
  if (args[1].empty()) {
103
0
    return STATUS_SUBSTITUTE(InvalidCommand,
104
0
        "A SET request must have a non empty key field");
105
0
  }
106
81.9k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
107
81.9k
  const auto& key = args[1];
108
81.9k
  const auto& value = args[2];
109
81.9k
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
110
81.9k
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
111
81.9k
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING);
112
81.9k
  size_t idx = 3;
113
82.1k
  while (idx < args.size()) {
114
134
    string upper_arg;
115
134
    if (args[idx].size() == 2) {
116
134
      ToUpperCase(args[idx].ToBuffer(), &upper_arg);
117
134
    }
118
119
134
    if (upper_arg == "EX" || 
upper_arg == "PX"89
) {
120
58
      if (args.size() < idx + 2) {
121
0
        return STATUS_SUBSTITUTE(InvalidCommand,
122
0
            "Expected TTL field after the EX flag, no value found");
123
0
      }
124
58
      auto ttl_val = ParseInt64(args[idx + 1], "TTL");
125
58
      RETURN_NOT_OK(ttl_val);
126
58
      if (*ttl_val < kRedisMinTtlSetExSeconds || *ttl_val > kRedisMaxTtlSeconds) {
127
0
        return STATUS_FORMAT(InvalidCommand,
128
0
            "TTL field $0 is not within valid bounds", args[idx + 1]);
129
0
      }
130
58
      const int64_t milliseconds_per_unit =
131
58
          upper_arg == "EX" ? 
MonoTime::kMillisecondsPerSecond45
:
113
;
132
58
      op->mutable_request()->mutable_set_request()->set_ttl(*ttl_val * milliseconds_per_unit);
133
58
      idx += 2;
134
76
    } else if (upper_arg == "XX") {
135
26
      op->mutable_request()->mutable_set_request()->set_mode(REDIS_WRITEMODE_UPDATE);
136
26
      idx += 1;
137
50
    } else if (upper_arg == "NX") {
138
50
      op->mutable_request()->mutable_set_request()->set_mode(REDIS_WRITEMODE_INSERT);
139
50
      idx += 1;
140
50
    } else {
141
0
      return STATUS_FORMAT(InvalidCommand,
142
0
          "Unidentified argument $0 found while parsing set command", args[idx]);
143
0
    }
144
134
  }
145
81.9k
  return Status::OK();
146
81.9k
}
147
148
4
CHECKED_STATUS ParseSetNX(YBRedisWriteOp *op, const RedisClientCommand& args) {
149
4
  if (args[1].empty()) {
150
0
    return STATUS_SUBSTITUTE(InvalidCommand,
151
0
        "A SETNX request must have a non empty key field");
152
0
  }
153
154
4
  const auto& key = args[1];
155
4
  const auto& value = args[2];
156
157
4
  auto key_value = op->mutable_request()->mutable_key_value();
158
4
  key_value->set_key(key.cdata(), key.size());
159
4
  key_value->add_value(value.cdata(), value.size());
160
4
  key_value->set_type(REDIS_TYPE_STRING);
161
162
4
  auto set_request = op->mutable_request()->mutable_set_request();
163
4
  set_request->set_mode(REDIS_WRITEMODE_INSERT);
164
  // SETNX returns 1 / 0 (instead of OK / (nil) in 'SET k v NX' command).
165
4
  set_request->set_expect_ok_response(false);
166
4
  return Status::OK();
167
4
}
168
169
// TODO: support MSET
170
0
CHECKED_STATUS ParseMSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
171
0
  if (args.size() < 3 || args.size() % 2 == 0) {
172
0
    return STATUS_SUBSTITUTE(InvalidCommand,
173
0
        "An MSET request must have at least 3, odd number of arguments, found $0", args.size());
174
0
  }
175
0
  return STATUS(InvalidCommand, "MSET command not yet supported");
176
0
}
177
178
8
CHECKED_STATUS ParseHSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
179
8
  const auto& key = args[1];
180
8
  const auto& subkey = args[2];
181
8
  const auto& value = args[3];
182
183
8
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
184
8
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
185
8
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_HASH);
186
8
  op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(subkey.cdata(),
187
8
                                                                              subkey.size());
188
8
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
189
8
  return Status::OK();
190
8
}
191
192
193
6
CHECKED_STATUS ParseHIncrBy(YBRedisWriteOp *op, const RedisClientCommand& args) {
194
6
  const auto& key = args[1];
195
6
  const auto& subkey = args[2];
196
6
  const auto& incr_by = ParseInt64(args[3], "INCR_BY");
197
6
  RETURN_NOT_OK(incr_by);
198
6
  op->mutable_request()->mutable_incr_request()->set_increment_int(*incr_by);
199
6
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
200
6
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_HASH);
201
6
  op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(subkey.cdata(),
202
6
                                                                              subkey.size());
203
6
  return Status::OK();
204
6
}
205
206
CHECKED_STATUS ParseZAddOptions(
207
21.1k
    SortedSetOptionsPB *options, const RedisClientCommand& args, size_t *idx) {
208
  // While we keep seeing flags, set the appropriate field in options and increment idx. When
209
  // we finally stop seeing flags, the idx will be set to that token for later parsing.
210
  // Note that we can see duplicate flags, and it should have the same behavior as seeing the
211
  // flag once.
212
21.1k
  while (*idx < args.size()) {
213
21.1k
    if (boost::iequals(args[*idx].ToBuffer(), kCH)) {
214
2
      options->set_ch(true);
215
21.1k
    } else if (boost::iequals(args[*idx].ToBuffer(), kINCR)) {
216
0
      options->set_incr(true);
217
21.1k
    } else if (boost::iequals(args[*idx].ToBuffer(), kNX)) {
218
2
      if (options->update_options() == SortedSetOptionsPB_UpdateOptions_XX) {
219
0
        return STATUS_SUBSTITUTE(InvalidArgument,
220
0
                                 "XX and NX options at the same time are not compatible");
221
0
      }
222
2
      options->set_update_options(SortedSetOptionsPB_UpdateOptions_NX);
223
21.1k
    } else if (boost::iequals(args[*idx].ToBuffer(), kXX)) {
224
2
      if (options->update_options() == SortedSetOptionsPB_UpdateOptions_NX) {
225
0
        return STATUS_SUBSTITUTE(InvalidArgument,
226
0
                                 "XX and NX options at the same time are not compatible");
227
0
      }
228
2
      options->set_update_options(SortedSetOptionsPB_UpdateOptions_XX);
229
21.1k
    } else {
230
      // We have encountered a non-option token, return.
231
21.1k
      return Status::OK();
232
21.1k
    }
233
6
    *idx = *idx + 1;
234
6
  }
235
0
  return Status::OK();
236
21.1k
}
237
238
template <typename AddSubKey>
239
CHECKED_STATUS ParseHMSetLikeCommands(YBRedisWriteOp *op, const RedisClientCommand& args,
240
                                      const RedisDataType& type,
241
41.1k
                                      AddSubKey add_sub_key) {
242
41.1k
  if (args.size() < 4 || (args.size() % 2 == 1 && 
type == REDIS_TYPE_HASH6
)) {
243
0
    return STATUS_SUBSTITUTE(InvalidArgument,
244
0
                             "wrong number of arguments: $0 for command: $1", args.size(),
245
0
                             string(args[0].cdata(), args[0].size()));
246
0
  }
247
248
41.1k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
249
41.1k
  op->mutable_request()->mutable_key_value()->set_type(type);
250
41.1k
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
251
252
41.1k
  if (type == REDIS_TYPE_HASH) {
253
19.8k
    op->mutable_request()->mutable_set_request()->set_expect_ok_response(true);
254
19.8k
  }
255
256
41.1k
  size_t start_idx = 2;
257
41.1k
  if (type == REDIS_TYPE_SORTEDSET) {
258
21.1k
    RETURN_NOT_OK(ParseZAddOptions(
259
21.1k
        op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(),
260
21.1k
        args, &start_idx));
261
262
    // If the INCR flag is set, can only have one [score member] pair.
263
21.1k
    if (op->request().set_request().sorted_set_options().incr() && 
(args.size() - start_idx) != 20
) {
264
0
      return STATUS_SUBSTITUTE(InvalidArgument,
265
0
                               "wrong number of tokens after INCR flag specified: Need 2 but found "
266
0
                                   "$0 for command: $1", args.size() - start_idx,
267
0
                               string(args[0].cdata(), args[0].size()));
268
0
    }
269
21.1k
  }
270
271
  // Need [score member] to come in pairs.
272
41.1k
  if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) {
273
0
    return STATUS_SUBSTITUTE(InvalidArgument,
274
0
                             "Expect even and non-zero number of arguments "
275
0
                             "for command: $0, found $1",
276
0
                             string(args[0].cdata(), args[0].size()), args.size() - start_idx);
277
0
  }
278
279
41.1k
  std::unordered_map<string, string> kv_map;
280
151k
  for (size_t i = start_idx; i < args.size(); 
i += 2110k
) {
281
    // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently.
282
110k
    if (type == REDIS_TYPE_TIMESERIES) {
283
29.6k
      string upper_arg;
284
29.6k
      ToUpperCase(args[i].ToBuffer(), &upper_arg);
285
29.6k
      if (upper_arg == kExpireAt || upper_arg == kExpireIn) {
286
12
        if (i + 2 != args.size()) {
287
0
          return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command",
288
0
                                   string(args[i].cdata(), args[i].size()));
289
0
        }
290
12
        auto temp = CheckedStoll(args[i + 1]);
291
12
        RETURN_NOT_OK(temp);
292
12
        int64_t ttl = 0;
293
12
        if (upper_arg == kExpireIn) {
294
12
          ttl = *temp;
295
12
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
296
0
            return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl,
297
0
                                     kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds);
298
0
          }
299
12
        } else {
300
0
          auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond;
301
0
          ttl = *temp - current_time;
302
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
303
0
            return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]",
304
0
                                     *temp,
305
0
                                     kRedisMinTtlSetExSeconds + current_time,
306
0
                                     kRedisMaxTtlSeconds + current_time);
307
0
          }
308
0
        }
309
310
        // Need to pass ttl in milliseconds, user supplied values are in seconds.
311
12
        op->mutable_request()->mutable_set_request()->set_ttl(
312
12
            ttl * MonoTime::kMillisecondsPerSecond);
313
29.6k
      } else {
314
29.6k
        kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
315
29.6k
      }
316
80.6k
    } else if (type == REDIS_TYPE_SORTEDSET) {
317
      // For sorted sets, we store the mapping from values to scores, since values are distinct
318
      // but scores aren't.
319
21.1k
      kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer();
320
59.4k
    } else {
321
59.4k
      kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
322
59.4k
    }
323
110k
  }
324
325
110k
  
for (const auto& kv : kv_map)41.1k
{
326
110k
    auto req_kv = op->mutable_request()->mutable_key_value();
327
110k
    if (type == REDIS_TYPE_SORTEDSET) {
328
      // Since the mapping is values to scores, need to reverse when creating the request.
329
21.1k
      RETURN_NOT_OK(add_sub_key(kv.second, req_kv));
330
21.1k
      req_kv->add_value(kv.first);
331
89.0k
    } else {
332
89.0k
      RETURN_NOT_OK(add_sub_key(kv.first, req_kv));
333
89.0k
      req_kv->add_value(kv.second);
334
89.0k
    }
335
110k
  }
336
41.1k
  return Status::OK();
337
41.1k
}
yb::Status yb::redisserver::ParseHMSetLikeCommands<yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::RedisKeyValuePB*)>(yb::client::YBRedisWriteOp*, boost::container::small_vector<yb::Slice, 8ul, void, void> const&, yb::RedisDataType const&, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::RedisKeyValuePB*))
Line
Count
Source
241
19.8k
                                      AddSubKey add_sub_key) {
242
19.8k
  if (args.size() < 4 || (args.size() % 2 == 1 && 
type == REDIS_TYPE_HASH0
)) {
243
0
    return STATUS_SUBSTITUTE(InvalidArgument,
244
0
                             "wrong number of arguments: $0 for command: $1", args.size(),
245
0
                             string(args[0].cdata(), args[0].size()));
246
0
  }
247
248
19.8k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
249
19.8k
  op->mutable_request()->mutable_key_value()->set_type(type);
250
19.8k
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
251
252
19.8k
  if (type == REDIS_TYPE_HASH) {
253
19.8k
    op->mutable_request()->mutable_set_request()->set_expect_ok_response(true);
254
19.8k
  }
255
256
19.8k
  size_t start_idx = 2;
257
19.8k
  if (type == REDIS_TYPE_SORTEDSET) {
258
0
    RETURN_NOT_OK(ParseZAddOptions(
259
0
        op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(),
260
0
        args, &start_idx));
261
262
    // If the INCR flag is set, can only have one [score member] pair.
263
0
    if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) {
264
0
      return STATUS_SUBSTITUTE(InvalidArgument,
265
0
                               "wrong number of tokens after INCR flag specified: Need 2 but found "
266
0
                                   "$0 for command: $1", args.size() - start_idx,
267
0
                               string(args[0].cdata(), args[0].size()));
268
0
    }
269
0
  }
270
271
  // Need [score member] to come in pairs.
272
19.8k
  if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) {
273
0
    return STATUS_SUBSTITUTE(InvalidArgument,
274
0
                             "Expect even and non-zero number of arguments "
275
0
                             "for command: $0, found $1",
276
0
                             string(args[0].cdata(), args[0].size()), args.size() - start_idx);
277
0
  }
278
279
19.8k
  std::unordered_map<string, string> kv_map;
280
79.2k
  for (size_t i = start_idx; i < args.size(); 
i += 259.4k
) {
281
    // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently.
282
59.4k
    if (type == REDIS_TYPE_TIMESERIES) {
283
0
      string upper_arg;
284
0
      ToUpperCase(args[i].ToBuffer(), &upper_arg);
285
0
      if (upper_arg == kExpireAt || upper_arg == kExpireIn) {
286
0
        if (i + 2 != args.size()) {
287
0
          return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command",
288
0
                                   string(args[i].cdata(), args[i].size()));
289
0
        }
290
0
        auto temp = CheckedStoll(args[i + 1]);
291
0
        RETURN_NOT_OK(temp);
292
0
        int64_t ttl = 0;
293
0
        if (upper_arg == kExpireIn) {
294
0
          ttl = *temp;
295
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
296
0
            return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl,
297
0
                                     kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds);
298
0
          }
299
0
        } else {
300
0
          auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond;
301
0
          ttl = *temp - current_time;
302
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
303
0
            return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]",
304
0
                                     *temp,
305
0
                                     kRedisMinTtlSetExSeconds + current_time,
306
0
                                     kRedisMaxTtlSeconds + current_time);
307
0
          }
308
0
        }
309
310
        // Need to pass ttl in milliseconds, user supplied values are in seconds.
311
0
        op->mutable_request()->mutable_set_request()->set_ttl(
312
0
            ttl * MonoTime::kMillisecondsPerSecond);
313
0
      } else {
314
0
        kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
315
0
      }
316
59.4k
    } else if (type == REDIS_TYPE_SORTEDSET) {
317
      // For sorted sets, we store the mapping from values to scores, since values are distinct
318
      // but scores aren't.
319
0
      kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer();
320
59.4k
    } else {
321
59.4k
      kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
322
59.4k
    }
323
59.4k
  }
324
325
59.4k
  
for (const auto& kv : kv_map)19.8k
{
326
59.4k
    auto req_kv = op->mutable_request()->mutable_key_value();
327
59.4k
    if (type == REDIS_TYPE_SORTEDSET) {
328
      // Since the mapping is values to scores, need to reverse when creating the request.
329
0
      RETURN_NOT_OK(add_sub_key(kv.second, req_kv));
330
0
      req_kv->add_value(kv.first);
331
59.4k
    } else {
332
59.4k
      RETURN_NOT_OK(add_sub_key(kv.first, req_kv));
333
59.4k
      req_kv->add_value(kv.second);
334
59.4k
    }
335
59.4k
  }
336
19.8k
  return Status::OK();
337
19.8k
}
yb::Status yb::redisserver::ParseHMSetLikeCommands<yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::RedisKeyValuePB*)>(yb::client::YBRedisWriteOp*, boost::container::small_vector<yb::Slice, 8ul, void, void> const&, yb::RedisDataType const&, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::RedisKeyValuePB*))
Line
Count
Source
241
21.2k
                                      AddSubKey add_sub_key) {
242
21.2k
  if (args.size() < 4 || (args.size() % 2 == 1 && 
type == REDIS_TYPE_HASH6
)) {
243
0
    return STATUS_SUBSTITUTE(InvalidArgument,
244
0
                             "wrong number of arguments: $0 for command: $1", args.size(),
245
0
                             string(args[0].cdata(), args[0].size()));
246
0
  }
247
248
21.2k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
249
21.2k
  op->mutable_request()->mutable_key_value()->set_type(type);
250
21.2k
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
251
252
21.2k
  if (type == REDIS_TYPE_HASH) {
253
0
    op->mutable_request()->mutable_set_request()->set_expect_ok_response(true);
254
0
  }
255
256
21.2k
  size_t start_idx = 2;
257
21.2k
  if (type == REDIS_TYPE_SORTEDSET) {
258
21.1k
    RETURN_NOT_OK(ParseZAddOptions(
259
21.1k
        op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(),
260
21.1k
        args, &start_idx));
261
262
    // If the INCR flag is set, can only have one [score member] pair.
263
21.1k
    if (op->request().set_request().sorted_set_options().incr() && 
(args.size() - start_idx) != 20
) {
264
0
      return STATUS_SUBSTITUTE(InvalidArgument,
265
0
                               "wrong number of tokens after INCR flag specified: Need 2 but found "
266
0
                                   "$0 for command: $1", args.size() - start_idx,
267
0
                               string(args[0].cdata(), args[0].size()));
268
0
    }
269
21.1k
  }
270
271
  // Need [score member] to come in pairs.
272
21.2k
  if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) {
273
0
    return STATUS_SUBSTITUTE(InvalidArgument,
274
0
                             "Expect even and non-zero number of arguments "
275
0
                             "for command: $0, found $1",
276
0
                             string(args[0].cdata(), args[0].size()), args.size() - start_idx);
277
0
  }
278
279
21.2k
  std::unordered_map<string, string> kv_map;
280
72.1k
  for (size_t i = start_idx; i < args.size(); 
i += 250.8k
) {
281
    // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently.
282
50.8k
    if (type == REDIS_TYPE_TIMESERIES) {
283
29.6k
      string upper_arg;
284
29.6k
      ToUpperCase(args[i].ToBuffer(), &upper_arg);
285
29.6k
      if (upper_arg == kExpireAt || upper_arg == kExpireIn) {
286
12
        if (i + 2 != args.size()) {
287
0
          return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command",
288
0
                                   string(args[i].cdata(), args[i].size()));
289
0
        }
290
12
        auto temp = CheckedStoll(args[i + 1]);
291
12
        RETURN_NOT_OK(temp);
292
12
        int64_t ttl = 0;
293
12
        if (upper_arg == kExpireIn) {
294
12
          ttl = *temp;
295
12
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
296
0
            return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl,
297
0
                                     kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds);
298
0
          }
299
12
        } else {
300
0
          auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond;
301
0
          ttl = *temp - current_time;
302
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
303
0
            return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]",
304
0
                                     *temp,
305
0
                                     kRedisMinTtlSetExSeconds + current_time,
306
0
                                     kRedisMaxTtlSeconds + current_time);
307
0
          }
308
0
        }
309
310
        // Need to pass ttl in milliseconds, user supplied values are in seconds.
311
12
        op->mutable_request()->mutable_set_request()->set_ttl(
312
12
            ttl * MonoTime::kMillisecondsPerSecond);
313
29.6k
      } else {
314
29.6k
        kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
315
29.6k
      }
316
29.6k
    } else 
if (21.1k
type == REDIS_TYPE_SORTEDSET21.1k
) {
317
      // For sorted sets, we store the mapping from values to scores, since values are distinct
318
      // but scores aren't.
319
21.1k
      kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer();
320
21.1k
    } else {
321
0
      kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
322
0
    }
323
50.8k
  }
324
325
50.8k
  
for (const auto& kv : kv_map)21.2k
{
326
50.8k
    auto req_kv = op->mutable_request()->mutable_key_value();
327
50.8k
    if (type == REDIS_TYPE_SORTEDSET) {
328
      // Since the mapping is values to scores, need to reverse when creating the request.
329
21.1k
      RETURN_NOT_OK(add_sub_key(kv.second, req_kv));
330
21.1k
      req_kv->add_value(kv.first);
331
29.6k
    } else {
332
29.6k
      RETURN_NOT_OK(add_sub_key(kv.first, req_kv));
333
29.6k
      req_kv->add_value(kv.second);
334
29.6k
    }
335
50.8k
  }
336
21.2k
  return Status::OK();
337
21.2k
}
338
339
19.8k
CHECKED_STATUS ParseHMSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
340
19.8k
  DCHECK_EQ("hmset", to_lower_case(args[0]))
341
0
      << "Parsing hmset request where first arg is not hmset.";
342
19.8k
  return ParseHMSetLikeCommands(op, args, REDIS_TYPE_HASH, add_string_subkey);
343
19.8k
}
344
345
174
CHECKED_STATUS ParseTsAdd(YBRedisWriteOp *op, const RedisClientCommand& args) {
346
174
  DCHECK_EQ("tsadd", to_lower_case(args[0]))
347
0
    << "Parsing hmset request where first arg is not hmset.";
348
174
  return ParseHMSetLikeCommands(op, args, REDIS_TYPE_TIMESERIES,
349
174
                                add_timestamp_subkey);
350
174
}
351
352
21.1k
Status ParseZAdd(YBRedisWriteOp *op, const RedisClientCommand& args) {
353
21.1k
  DCHECK_EQ("zadd", to_lower_case(args[0]))
354
0
    << "Parsing zadd request where first arg is not zadd.";
355
21.1k
  return ParseHMSetLikeCommands(op, args, REDIS_TYPE_SORTEDSET, add_double_subkey);
356
21.1k
}
357
358
16
Status ParsePush(YBRedisWriteOp *op, const RedisClientCommand& args, RedisSide side) {
359
16
  op->mutable_request()->mutable_push_request()->set_side(side);
360
16
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
361
16
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_LIST);
362
363
16
  auto mutable_key = op->mutable_request()->mutable_key_value();
364
36
  for (size_t i = 2; i < args.size(); 
++i20
) {
365
20
    mutable_key->add_value(args[i].cdata(), args[i].size());
366
20
  }
367
16
  return Status::OK();
368
16
}
369
370
10
Status ParseLPush(YBRedisWriteOp *op, const RedisClientCommand& args) {
371
10
  return ParsePush(op, args, REDIS_SIDE_LEFT);
372
10
}
373
374
6
Status ParseRPush(YBRedisWriteOp *op, const RedisClientCommand& args) {
375
6
  return ParsePush(op, args, REDIS_SIDE_RIGHT);
376
6
}
377
378
template <typename YBRedisOp, typename AddSubKey>
379
CHECKED_STATUS ParseCollection(YBRedisOp *op,
380
                               const RedisClientCommand& args,
381
                               boost::optional<RedisDataType> type,
382
                               AddSubKey add_sub_key,
383
165
                               bool remove_duplicates = true) {
384
165
  const auto& key = args[1];
385
165
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
165
  if (type) {
387
70
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
70
  }
389
165
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
70
    std::set<string> subkey_set;
392
752
    for (size_t i = 2; i < args.size(); 
i++682
) {
393
682
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
682
    }
395
70
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
70
        narrow_cast<int>(subkey_set.size()));
397
682
    for (const auto &val : subkey_set) {
398
682
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
682
    }
400
95
  } else {
401
95
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
95
        narrow_cast<int>(args.size() - 2));
403
127
    for (size_t i = 2; i < args.size(); 
i++32
) {
404
32
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
32
                                op->mutable_request()->mutable_key_value()));
406
32
    }
407
95
  }
408
165
  return Status::OK();
409
165
}
yb::Status yb::redisserver::ParseCollection<yb::client::YBRedisWriteOp, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::RedisKeyValuePB*)>(yb::client::YBRedisWriteOp*, boost::container::small_vector<yb::Slice, 8ul, void, void> const&, boost::optional<yb::RedisDataType>, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::RedisKeyValuePB*), bool)
Line
Count
Source
383
52
                               bool remove_duplicates = true) {
384
52
  const auto& key = args[1];
385
52
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
52
  if (type) {
387
52
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
52
  }
389
52
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
52
    std::set<string> subkey_set;
392
122
    for (size_t i = 2; i < args.size(); 
i++70
) {
393
70
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
70
    }
395
52
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
52
        narrow_cast<int>(subkey_set.size()));
397
70
    for (const auto &val : subkey_set) {
398
70
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
70
    }
400
52
  } else {
401
0
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
0
        narrow_cast<int>(args.size() - 2));
403
0
    for (size_t i = 2; i < args.size(); i++) {
404
0
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
0
                                op->mutable_request()->mutable_key_value()));
406
0
    }
407
0
  }
408
52
  return Status::OK();
409
52
}
yb::Status yb::redisserver::ParseCollection<yb::client::YBRedisWriteOp, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::RedisKeyValuePB*)>(yb::client::YBRedisWriteOp*, boost::container::small_vector<yb::Slice, 8ul, void, void> const&, boost::optional<yb::RedisDataType>, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::RedisKeyValuePB*), bool)
Line
Count
Source
383
18
                               bool remove_duplicates = true) {
384
18
  const auto& key = args[1];
385
18
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
18
  if (type) {
387
18
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
18
  }
389
18
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
18
    std::set<string> subkey_set;
392
630
    for (size_t i = 2; i < args.size(); 
i++612
) {
393
612
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
612
    }
395
18
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
18
        narrow_cast<int>(subkey_set.size()));
397
612
    for (const auto &val : subkey_set) {
398
612
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
612
    }
400
18
  } else {
401
0
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
0
        narrow_cast<int>(args.size() - 2));
403
0
    for (size_t i = 2; i < args.size(); i++) {
404
0
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
0
                                op->mutable_request()->mutable_key_value()));
406
0
    }
407
0
  }
408
18
  return Status::OK();
409
18
}
yb::Status yb::redisserver::ParseCollection<yb::client::YBRedisReadOp, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::RedisKeyValuePB*)>(yb::client::YBRedisReadOp*, boost::container::small_vector<yb::Slice, 8ul, void, void> const&, boost::optional<yb::RedisDataType>, yb::Status (*)(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, yb::RedisKeyValuePB*), bool)
Line
Count
Source
383
95
                               bool remove_duplicates = true) {
384
95
  const auto& key = args[1];
385
95
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
95
  if (type) {
387
0
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
0
  }
389
95
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
0
    std::set<string> subkey_set;
392
0
    for (size_t i = 2; i < args.size(); i++) {
393
0
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
0
    }
395
0
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
0
        narrow_cast<int>(subkey_set.size()));
397
0
    for (const auto &val : subkey_set) {
398
0
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
0
    }
400
95
  } else {
401
95
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
95
        narrow_cast<int>(args.size() - 2));
403
127
    for (size_t i = 2; i < args.size(); 
i++32
) {
404
32
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
32
                                op->mutable_request()->mutable_key_value()));
406
32
    }
407
95
  }
408
95
  return Status::OK();
409
95
}
410
411
12
// Parses HDEL: marks the write as a delete and collects the (de-duplicated) string subkeys of
// the hash at args[1] via ParseCollection.
CHECKED_STATUS ParseHDel(YBRedisWriteOp *op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  // Calling the mutable accessor allocates a new RedisDelRequestPB().
  request->mutable_del_request();
  return ParseCollection(op, args, REDIS_TYPE_HASH, add_string_subkey);
}
415
416
18
// Parses TSREM: marks the write as a delete and collects the (de-duplicated) timestamp subkeys
// of the timeseries at args[1] via ParseCollection.
CHECKED_STATUS ParseTsRem(YBRedisWriteOp *op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  // Calling the mutable accessor allocates a new RedisDelRequestPB().
  request->mutable_del_request();
  return ParseCollection(op, args, REDIS_TYPE_TIMESERIES, add_timestamp_subkey);
}
420
421
10
// Parses ZREM: marks the write as a delete and collects the (de-duplicated) string subkeys of
// the sorted set at args[1] via ParseCollection.
CHECKED_STATUS ParseZRem(YBRedisWriteOp *op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  // Calling the mutable accessor allocates a new RedisDelRequestPB().
  request->mutable_del_request();
  return ParseCollection(op, args, REDIS_TYPE_SORTEDSET, add_string_subkey);
}
425
426
26
// Parses SADD: marks the write as an add and collects the (de-duplicated) string subkeys of the
// set at args[1] via ParseCollection.
CHECKED_STATUS ParseSAdd(YBRedisWriteOp *op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  // Calling the mutable accessor allocates a new RedisAddRequestPB().
  request->mutable_add_request();
  return ParseCollection(op, args, REDIS_TYPE_SET, add_string_subkey);
}
430
431
4
// Parses SREM: marks the write as a delete and collects the (de-duplicated) string subkeys of
// the set at args[1] via ParseCollection.
CHECKED_STATUS ParseSRem(YBRedisWriteOp *op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  // Calling the mutable accessor allocates a new RedisDelRequestPB().
  request->mutable_del_request();
  return ParseCollection(op, args, REDIS_TYPE_SET, add_string_subkey);
}
435
436
0
// Shared implementation of LPOP / RPOP.
// Records the requested side on the pop request and stores args[1] as the list key with type
// REDIS_TYPE_LIST. Always returns OK.
CHECKED_STATUS ParsePop(YBRedisWriteOp *op, const RedisClientCommand& args, RedisSide side) {
  auto* request = op->mutable_request();
  request->mutable_pop_request()->set_side(side);

  auto* key_value = request->mutable_key_value();
  const auto& list_key = args[1];
  key_value->set_key(list_key.cdata(), list_key.size());
  key_value->set_type(REDIS_TYPE_LIST);
  return Status::OK();
}
442
443
0
// Parses LPOP: a pop from the left side of the list (see ParsePop).
CHECKED_STATUS ParseLPop(YBRedisWriteOp *op, const RedisClientCommand& args) {
  const auto side = REDIS_SIDE_LEFT;
  return ParsePop(op, args, side);
}
446
447
0
// Parses RPOP: a pop from the right side of the list (see ParsePop).
CHECKED_STATUS ParseRPop(YBRedisWriteOp *op, const RedisClientCommand& args) {
  const auto side = REDIS_SIDE_RIGHT;
  return ParsePop(op, args, side);
}
450
451
2
// Parses GETSET <key> <value>: marks the write as a getset request, stores args[1] as the key
// and appends args[2] as the single value. Always returns OK.
CHECKED_STATUS ParseGetSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_getset_request();  // Allocates new RedisGetSetRequestPB().

  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());
  key_value->add_value(args[2].cdata(), args[2].size());
  return Status::OK();
}
459
460
4
// Parses APPEND <key> <value>: marks the write as an append request, stores args[1] as the key
// and appends args[2] as the single value. Always returns OK.
CHECKED_STATUS ParseAppend(YBRedisWriteOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_append_request();  // Allocates new RedisAppendRequestPB().

  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());
  key_value->add_value(args[2].cdata(), args[2].size());
  return Status::OK();
}
468
469
// Note: deleting only one key is supported using one command as of now.
470
10
// Parses DEL <key>: marks the write as a delete of the top-level key args[1]. Only args[1] is
// read, so a single key is deleted per command. Always returns OK.
CHECKED_STATUS ParseDel(YBRedisWriteOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_del_request();  // Allocates new RedisDelRequestPB().

  auto* key_value = request->mutable_key_value();
  const auto& target_key = args[1];
  key_value->set_key(target_key.cdata(), target_key.size());
  // REDIS_TYPE_NONE: we should be able to delete all types of top level keys.
  key_value->set_type(REDIS_TYPE_NONE);
  return Status::OK();
}
478
479
1
// Parses SETRANGE <key> <offset> <value>.
// The key (args[1]) and value (args[3]) are stored first; the offset (args[2]) is then parsed as
// an int32. Returns the parse error if the offset is not a valid int32, InvalidArgument if it is
// negative, OK otherwise.
CHECKED_STATUS ParseSetRange(YBRedisWriteOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());
  key_value->add_value(args[3].cdata(), args[3].size());

  const auto offset = VERIFY_RESULT(ParseInt32(args[2], "offset"));
  // TODO: Should we have an upper bound?
  // A very large offset would allocate a lot of memory and maybe crash
  if (offset < 0) {
    return STATUS_SUBSTITUTE(InvalidArgument,
        "offset field of SETRANGE must be non-negative, found: $0", offset);
  }
  request->mutable_set_range_request()->set_offset(offset);
  return Status::OK();
}
497
498
10
// Parses INCR <key>: an increment request with a fixed integer delta of 1 on the string key
// args[1]. Always returns OK.
CHECKED_STATUS ParseIncr(YBRedisWriteOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_incr_request()->set_increment_int(1);

  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());
  key_value->set_type(REDIS_TYPE_STRING);
  return Status::OK();
}
505
506
7
// Parses INCRBY <key> <delta>: args[2] is parsed as an int64 delta for the string key args[1].
// Returns the parse error (leaving the request untouched) if the delta is not a valid int64.
CHECKED_STATUS ParseIncrBy(YBRedisWriteOp* op, const RedisClientCommand& args) {
  const auto delta = VERIFY_RESULT(ParseInt64(args[2], "INCR_BY"));

  auto* request = op->mutable_request();
  request->mutable_incr_request()->set_increment_int(delta);

  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());
  key_value->set_type(REDIS_TYPE_STRING);
  return Status::OK();
}
515
516
71.5k
// Parses GET <key>: stores args[1] as the key and tags the get request with type GET.
// Returns InvalidCommand if the key is empty.
CHECKED_STATUS ParseGet(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto& lookup_key = args[1];
  if (lookup_key.empty()) {
    return STATUS_SUBSTITUTE(InvalidCommand,
        "A GET request must have non empty key field");
  }

  auto* request = op->mutable_request();
  request->mutable_key_value()->set_key(lookup_key.cdata(), lookup_key.size());
  request->mutable_get_request()->set_request_type(RedisGetRequestPB_GetRequestType_GET);
  return Status::OK();
}
527
528
//  Used for HGET/HSTRLEN/HEXISTS. Also for HMGet
529
//  CMD <KEY> [<SUB-KEY>]*
530
// Common parser for read commands shaped like CMD <KEY> [<SUB-KEY>]*.
// Tags the get request with `request_type`, then lets ParseCollection fill in the key and the
// string subkeys without setting a data type. Duplicated subkeys are kept unless
// `remove_duplicates` is true.
CHECKED_STATUS ParseHGetLikeCommands(YBRedisReadOp* op, const RedisClientCommand& args,
                                     RedisGetRequestPB_GetRequestType request_type,
                                     bool remove_duplicates = false) {
  auto* get_request = op->mutable_request()->mutable_get_request();
  get_request->set_request_type(request_type);
  return ParseCollection(op, args, boost::none, add_string_subkey, remove_duplicates);
}
536
537
// TODO: Support MGET
538
0
// MGET placeholder: neither argument is inspected and the call always fails with InvalidCommand.
CHECKED_STATUS ParseMGet(YBRedisReadOp* op, const RedisClientCommand& args) {
  Status not_supported = STATUS(InvalidCommand, "MGET command not yet supported");
  return not_supported;
}
541
542
14
// Parses HGET as an HGET-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHGet(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HGET;
  return ParseHGetLikeCommands(op, args, request_type);
}
545
546
// Parses one range bound (already stripped of any '(' prefix) into `bound_pb`.
//
// "+inf" / "-inf" set the corresponding infinity type and nothing else. Any other text records
// `exclusive` on the bound and is then parsed according to `request_type`:
//   - TSRANGEBYTIME / TSREVRANGEBYTIME: as an integer timestamp subkey (CheckedStoll);
//   - ZRANGEBYSCORE: as a double subkey (CheckedStold);
//   - anything else: InvalidArgument.
// A failed numeric parse is returned as-is; is_exclusive has already been set at that point.
CHECKED_STATUS ParseTsBoundArg(const Slice& slice, RedisSubKeyBoundPB* bound_pb,
                               RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type,
                               bool exclusive) {
  const string text(slice.cdata(), slice.size());
  if (text == kPositiveInfinity) {
    bound_pb->set_infinity_type(RedisSubKeyBoundPB_InfinityType_POSITIVE);
    return Status::OK();
  }
  if (text == kNegativeInfinity) {
    bound_pb->set_infinity_type(RedisSubKeyBoundPB_InfinityType_NEGATIVE);
    return Status::OK();
  }

  bound_pb->set_is_exclusive(exclusive);
  switch (request_type) {
    case RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME:
      FALLTHROUGH_INTENDED;
    case RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME: {
      const auto timestamp = VERIFY_RESULT(CheckedStoll(slice));
      bound_pb->mutable_subkey_bound()->set_timestamp_subkey(timestamp);
      return Status::OK();
    }
    case RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE: {
      const auto score = VERIFY_RESULT(CheckedStold(slice));
      bound_pb->mutable_subkey_bound()->set_double_subkey(score);
      return Status::OK();
    }
    default:
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid request type: $0", request_type);
  }
}
578
579
// Parses `slice` as an integer (CheckedStoll) and stores it as the index of `bound_pb`.
// Returns the parse error, leaving `bound_pb` untouched, if the text is not a valid integer.
CHECKED_STATUS
ParseIndexBoundArg(const Slice& slice, RedisIndexBoundPB* bound_pb) {
  const auto parsed_index = VERIFY_RESULT(CheckedStoll(slice));
  bound_pb->set_index(parsed_index);
  return Status::OK();
}
586
587
// Parses a subkey range bound, honouring the "(" exclusive-bound prefix.
// An empty slice is rejected with InvalidCommand. A slice that starts with '(' and has more than
// one character is parsed, minus that prefix, as an exclusive bound; everything else (including
// a lone "(") is passed unchanged to ParseTsBoundArg as an inclusive bound.
CHECKED_STATUS
ParseTsSubKeyBound(const Slice& slice, RedisSubKeyBoundPB* bound_pb,
                   RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type) {
  if (slice.empty()) {
    return STATUS(InvalidCommand, "range bound key cannot be empty");
  }

  const bool is_exclusive = slice.size() > 1 && slice[0] == '(';
  auto bound_text = slice;
  if (is_exclusive) {
    bound_text.remove_prefix(1);
  }
  RETURN_NOT_OK(ParseTsBoundArg(bound_text, bound_pb, request_type, is_exclusive));
  return Status::OK();
}
603
604
40
// Parses an index range bound into `bound_pb`.
// Rejects an empty slice with InvalidArgument; otherwise defers to ParseIndexBoundArg.
CHECKED_STATUS ParseIndexBound(const Slice& slice, RedisIndexBoundPB* bound_pb) {
  if (!slice.empty()) {
    RETURN_NOT_OK(ParseIndexBoundArg(slice, bound_pb));
    return Status::OK();
  }
  return STATUS(InvalidArgument, "range bound index cannot be empty");
}
611
612
36
// Parses TSCARD as a TSCARD-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseTsCard(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_TSCARD;
  return ParseHGetLikeCommands(op, args, request_type);
}
615
616
36
// Parses TSLASTN <key> <limit>.
// args[2] must parse as an int32 greater than zero; otherwise the parse error or an
// InvalidArgument status is returned. On success the request covers the whole subkey range
// (-inf, +inf) of args[1], capped at `limit` entries.
CHECKED_STATUS ParseTsLastN(YBRedisReadOp* op, const RedisClientCommand& args) {
  // TSLastN is basically TSRangeByTime -INF, INF with a limit on number of entries. Note that
  // there is a subtle difference here since TSRangeByTime iterates on entries from highest to
  // lowest and hence we end up returning the highest N entries. This operation is more like
  // TSRevRangeByTime -INF, INF with a limit (Note that TSRevRangeByTime is not implemented).
  auto* request = op->mutable_request();
  request->mutable_get_collection_range_request()->set_request_type(
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME);

  const auto max_entries = VERIFY_RESULT(ParseInt32(args[2], "limit"));
  if (max_entries <= 0) {
    return STATUS_SUBSTITUTE(InvalidArgument,
        "$0 field $1 is not within valid bounds", "limit", args[2].ToDebugString());
  }

  request->mutable_key_value()->set_key(args[1].ToBuffer());
  request->set_range_request_limit(max_entries);

  auto* range = request->mutable_subkey_range();
  range->mutable_lower_bound()->set_infinity_type(RedisSubKeyBoundPB_InfinityType_NEGATIVE);
  range->mutable_upper_bound()->set_infinity_type(RedisSubKeyBoundPB_InfinityType_POSITIVE);
  return Status::OK();
}
638
639
30
// Parses TSRANGEBYTIME <key> <lower> <upper>.
// args[2] and args[3] are parsed, in that order, as the lower and upper subkey bounds; the first
// failure is returned. The key (args[1]) is stored only after both bounds parse successfully.
CHECKED_STATUS ParseTsRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto range_type = RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME;
  auto* request = op->mutable_request();
  request->mutable_get_collection_range_request()->set_request_type(range_type);

  auto* lower = request->mutable_subkey_range()->mutable_lower_bound();
  RETURN_NOT_OK(ParseTsSubKeyBound(args[2], lower, range_type));
  auto* upper = request->mutable_subkey_range()->mutable_upper_bound();
  RETURN_NOT_OK(ParseTsSubKeyBound(args[3], upper, range_type));

  request->mutable_key_value()->set_key(args[1].ToBuffer());
  return Status::OK();
}
656
657
10.8k
// Parses TSREVRANGEBYTIME <key> <lower> <upper> [LIMIT <n>].
// Bounds are parsed first (lower, then upper) and the key is stored afterwards. With more than
// four arguments the command must have exactly six: args[4] must be "LIMIT" (any case) and
// args[5] an int32 greater than zero, which becomes the range request limit.
CHECKED_STATUS ParseTsRevRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto range_type = RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME;
  auto* request = op->mutable_request();
  request->mutable_get_collection_range_request()->set_request_type(range_type);

  auto* lower = request->mutable_subkey_range()->mutable_lower_bound();
  RETURN_NOT_OK(ParseTsSubKeyBound(args[2], lower, range_type));
  auto* upper = request->mutable_subkey_range()->mutable_upper_bound();
  RETURN_NOT_OK(ParseTsSubKeyBound(args[3], upper, range_type));

  request->mutable_key_value()->set_key(args[1].ToBuffer());

  // No optional LIMIT clause present.
  if (args.size() <= 4) {
    return Status::OK();
  }

  if (args.size() != 6) {
    return STATUS_SUBSTITUTE(InvalidCommand,
                             "Invalid number of arguments. Command should have 4 or 6 arguments");
  }
  string keyword;
  ToUpperCase(args[4].ToBuffer(), &keyword);
  if (keyword != "LIMIT") {
    return STATUS_SUBSTITUTE(InvalidArgument,
                             "Invalid argument $0. Expecting $1", args[4].ToBuffer(), "limit");
  }
  const auto max_entries = VERIFY_RESULT(ParseInt32(args[5], "limit"));
  if (max_entries <= 0) {
    return STATUS_SUBSTITUTE(InvalidArgument,
                             "$0 field $1 is not within valid bounds", "limit",
                             args[5].ToDebugString());
  }
  request->set_range_request_limit(max_entries);
  return Status::OK();
}
696
697
8
// Accepts only the kWithScores keyword (compared case-insensitively) and turns on with_scores
// for `request`; any other text yields InvalidArgument.
CHECKED_STATUS ParseWithScores(const Slice& slice, RedisCollectionGetRangeRequestPB* request) {
  if (boost::iequals(slice.ToBuffer(), kWithScores)) {
    request->set_with_scores(true);
    return Status::OK();
  }
  return STATUS_SUBSTITUTE(InvalidArgument, "unexpected argument $0", slice.ToBuffer());
}
704
705
98
// Parses the optional trailing clauses (from args[4] onwards) of a range-by-score command.
// Recognised keywords, matched case-insensitively:
//   WITHSCORES           - turns on with_scores for the collection range request;
//   LIMIT <offset> <cnt> - offset (int64) becomes the lower index bound, cnt (int32) becomes the
//                          range request limit. Both numbers are parsed before either is stored.
// Any other token, or a LIMIT without two following arguments, yields InvalidArgument.
CHECKED_STATUS ParseRangeByScoreOptions(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto num_args = args.size();
  size_t pos = 4;
  while (pos < num_args) {
    string option;
    ToUpperCase(args[pos].ToBuffer(), &option);
    if (option == "WITHSCORES") {
      op->mutable_request()->mutable_get_collection_range_request()->set_with_scores(true);
    } else if (option == "LIMIT") {
      // LIMIT needs two more tokens after it.
      if (pos + 2 >= num_args) {
        return STATUS_SUBSTITUTE(InvalidArgument, "Not enough args passed into LIMIT clause, "
            "expected 2 but found $0", num_args - (pos + 1));
      }
      const auto offset = VERIFY_RESULT(ParseInt64(args[pos + 1], "offset"));
      const auto limit = VERIFY_RESULT(ParseInt32(args[pos + 2], "count"));
      pos += 2;
      op->mutable_request()->mutable_index_range()->mutable_lower_bound()->set_index(offset);
      op->mutable_request()->set_range_request_limit(limit);
    } else {
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid argument $0", args[pos].ToBuffer());
    }
    ++pos;
  }
  return Status::OK();
}
727
728
98
// Parses ZRANGEBYSCORE <key> <lower> <upper> [options...].
// args[2] and args[3] are parsed, in that order, as the lower and upper score bounds; the key is
// stored afterwards and the remaining arguments are handled by ParseRangeByScoreOptions.
CHECKED_STATUS ParseZRangeByScore(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto range_type = RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE;
  auto* request = op->mutable_request();
  request->mutable_get_collection_range_request()->set_request_type(range_type);

  auto* lower = request->mutable_subkey_range()->mutable_lower_bound();
  RETURN_NOT_OK(ParseTsSubKeyBound(args[2], lower, range_type));
  auto* upper = request->mutable_subkey_range()->mutable_upper_bound();
  RETURN_NOT_OK(ParseTsSubKeyBound(args[3], upper, range_type));

  request->mutable_key_value()->set_key(args[1].ToBuffer());
  return ParseRangeByScoreOptions(op, args);
}
745
746
// Common parser for index-range reads: CMD <key> <start> <stop> [WITHSCORES].
//
// Rejects commands with more than 5 arguments with InvalidArgument. Otherwise tags the request
// with `request_type`, parses args[2] / args[3] as the lower / upper index bounds (returning the
// first failure), stores args[1] as the key, and, when exactly 5 arguments are present, requires
// args[4] to be the WITHSCORES keyword.
CHECKED_STATUS ParseIndexBasedQuery(
    YBRedisReadOp* op,
    const RedisClientCommand& args,
    RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type) {
  if (args.size() > 5) {
    // STATUS_SUBSTITUTE (rather than STATUS) is needed for the $0 placeholder to be expanded.
    return STATUS_SUBSTITUTE(InvalidArgument, "Expected at most 5 arguments, found $0",
                             args.size());
  }

  auto* request = op->mutable_request();
  request->mutable_get_collection_range_request()->set_request_type(request_type);

  const auto& key = args[1];
  RETURN_NOT_OK(ParseIndexBound(
      args[2], request->mutable_index_range()->mutable_lower_bound()));
  RETURN_NOT_OK(ParseIndexBound(
      args[3], request->mutable_index_range()->mutable_upper_bound()));
  request->mutable_key_value()->set_key(key.ToBuffer());
  if (args.size() == 5) {
    RETURN_NOT_OK(ParseWithScores(
        args[4], request->mutable_get_collection_range_request()));
  }
  return Status::OK();
}
771
772
12
// Parses ZRANGE as a ZRANGE-typed index range query (see ParseIndexBasedQuery).
CHECKED_STATUS ParseZRange(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto range_type = RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGE;
  return ParseIndexBasedQuery(op, args, range_type);
}
776
777
8
// Parses ZREVRANGE as a ZREVRANGE-typed index range query (see ParseIndexBasedQuery).
CHECKED_STATUS ParseZRevRange(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto range_type = RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZREVRANGE;
  return ParseIndexBasedQuery(op, args, range_type);
}
781
782
1.21k
// Parses TSGET <key> <timestamp>.
// The request type and key are stored first; args[2] is then parsed as an integer
// (CheckedStoll) and added as the single timestamp subkey. Returns the parse error if the
// timestamp is not a valid integer.
CHECKED_STATUS ParseTsGet(YBRedisReadOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_get_request()->set_request_type(RedisGetRequestPB_GetRequestType_TSGET);

  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());

  const auto timestamp = VERIFY_RESULT(CheckedStoll(args[2]));
  key_value->add_subkey()->set_timestamp_subkey(timestamp);
  return Status::OK();
}
794
795
52
// Parses ZSCORE <key> <member>: a ZSCORE-typed get request on key args[1] with args[2] as the
// single string subkey. Always returns OK.
CHECKED_STATUS ParseZScore(YBRedisReadOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_get_request()->set_request_type(RedisGetRequestPB_GetRequestType_ZSCORE);

  auto* key_value = request->mutable_key_value();
  key_value->set_key(args[1].cdata(), args[1].size());
  key_value->add_subkey()->set_string_subkey(args[2].ToBuffer());
  return Status::OK();
}
806
807
0
// Parses HSTRLEN as an HSTRLEN-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHStrLen(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HSTRLEN;
  return ParseHGetLikeCommands(op, args, request_type);
}
810
811
6
// Parses HEXISTS as an HEXISTS-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHExists(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HEXISTS;
  return ParseHGetLikeCommands(op, args, request_type);
}
814
815
3
// Parses HMGET as an HMGET-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHMGet(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HMGET;
  return ParseHGetLikeCommands(op, args, request_type);
}
818
819
5
// Parses HGETALL as an HGETALL-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHGetAll(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HGETALL;
  return ParseHGetLikeCommands(op, args, request_type);
}
822
823
3
// Parses HKEYS as an HKEYS-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHKeys(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HKEYS;
  return ParseHGetLikeCommands(op, args, request_type);
}
826
827
3
// Parses HVALS as an HVALS-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHVals(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HVALS;
  return ParseHGetLikeCommands(op, args, request_type);
}
830
831
4
// Parses HLEN as an HLEN-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseHLen(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_HLEN;
  return ParseHGetLikeCommands(op, args, request_type);
}
834
835
5
// Parses SMEMBERS as an SMEMBERS-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseSMembers(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_SMEMBERS;
  return ParseHGetLikeCommands(op, args, request_type);
}
838
839
4
// Parses SISMEMBER as an SISMEMBER-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseSIsMember(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_SISMEMBER;
  return ParseHGetLikeCommands(op, args, request_type);
}
842
843
4
// Parses SCARD as an SCARD-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseSCard(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_SCARD;
  return ParseHGetLikeCommands(op, args, request_type);
}
846
847
6
// Parses LLEN as an LLEN-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseLLen(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_LLEN;
  return ParseHGetLikeCommands(op, args, request_type);
}
850
851
2
// Parses ZCARD as a ZCARD-typed get request (see ParseHGetLikeCommands).
CHECKED_STATUS ParseZCard(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto request_type = RedisGetRequestPB_GetRequestType_ZCARD;
  return ParseHGetLikeCommands(op, args, request_type);
}
854
855
2
// Parses STRLEN <key>: marks the read as a strlen request on key args[1]. Always returns OK.
CHECKED_STATUS ParseStrLen(YBRedisReadOp* op, const RedisClientCommand& args) {
  auto* request = op->mutable_request();
  request->mutable_strlen_request();  // Allocates new RedisStrLenRequestPB().
  request->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
  return Status::OK();
}
861
862
// Note: Checking existence of only one key is supported as of now.
863
4
CHECKED_STATUS ParseExists(YBRedisReadOp* op, const RedisClientCommand& args) {
864
4
  op->mutable_request()->mutable_exists_request(); // Allocates new RedisExistsRequestPB().
865
4
  const auto& key = args[1];
866
4
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
867
4
  return Status::OK();
868
4
}
869
870
2
// Fills `op` with a get-range request: args[1] is the key, args[2] the start index and args[3]
// the end index. Both indexes are parsed as 32-bit integers; a parse failure is returned as is.
CHECKED_STATUS ParseGetRange(YBRedisReadOp* op, const RedisClientCommand& args) {
  const auto& target_key = args[1];
  op->mutable_request()->mutable_key_value()->set_key(target_key.cdata(), target_key.size());

  const auto range_start = VERIFY_RESULT(ParseInt32(args[2], "Start"));
  op->mutable_request()->mutable_get_range_request()->set_start(range_start);

  const auto range_end = VERIFY_RESULT(ParseInt32(args[3], "End"));
  op->mutable_request()->mutable_get_range_request()->set_end(range_end);

  return Status::OK();
}
884
885
// Parses an expire-style command: args[1] is the key and args[2] is the TTL, given in
// milliseconds when using_millis is true and in seconds otherwise.
//
// A non-positive TTL is converted into a delete request for the key. Otherwise the TTL is
// converted to milliseconds and stored in a set_ttl request. A TTL that falls outside
// [kRedisMinTtlMillis, kRedisMaxTtlMillis] after conversion is rejected with InvalidCommand.
CHECKED_STATUS ParseExpire(YBRedisWriteOp* op,
                           const RedisClientCommand& args,
                           const bool using_millis) {
  const auto& key = args[1];
  auto ttl = VERIFY_RESULT(ParseInt64(args[2], "TTL"));
  // If the TTL is not positive, we immediately delete.
  if (ttl <= 0) {
    op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
    op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
    op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE);
    return Status::OK();
  }
  const auto multiplier = using_millis ? 1 : MonoTime::kMillisecondsPerSecond;
  // The upper bound is checked BEFORE multiplying: ttl is positive here, so comparing against
  // kRedisMaxTtlMillis / multiplier is equivalent to comparing the product against
  // kRedisMaxTtlMillis, but cannot overflow (signed overflow is undefined behavior and a wrapped
  // value could otherwise slip through the bounds check).
  if (ttl > kRedisMaxTtlMillis / multiplier || ttl * multiplier < kRedisMinTtlMillis) {
    return STATUS_FORMAT(InvalidCommand,
        "TTL field $0 is not within valid bounds", args[2]);
  }
  ttl *= multiplier;
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
  op->mutable_request()->mutable_set_ttl_request()->set_ttl(ttl);
  return Status::OK();
}
907
908
84
// Two-argument overload: calls the three-argument ParseExpire with using_millis = false.
CHECKED_STATUS ParseExpire(YBRedisWriteOp* op, const RedisClientCommand& args) {
  return ParseExpire(op, args, /* using_millis */ false);
}
911
912
10
// Calls the three-argument ParseExpire with using_millis = true.
CHECKED_STATUS ParsePExpire(YBRedisWriteOp* op, const RedisClientCommand& args) {
  return ParseExpire(op, args, /* using_millis */ true);
}
915
916
26
// Fills `op` with a set_ttl request for the key in args[1], using a TTL of -1.
CHECKED_STATUS ParsePersist(YBRedisWriteOp* op, const RedisClientCommand& args) {
  const auto& target_key = args[1];
  auto* key_value = op->mutable_request()->mutable_key_value();
  key_value->set_key(target_key.cdata(), target_key.size());

  auto* ttl_request = op->mutable_request()->mutable_set_ttl_request();
  ttl_request->set_ttl(-1);
  return Status::OK();
}
922
923
// Parses an expire-at style command: args[1] is the key and args[2] is the expiration, given in
// milliseconds when using_millis is true and in seconds otherwise.
//
// A non-positive expiration is converted into a delete request for the key. Otherwise the value
// is converted to milliseconds and stored as the absolute_time of a set_ttl request.
CHECKED_STATUS ParseExpireAt(YBRedisWriteOp* op,
                             const RedisClientCommand& args,
                             const bool using_millis) {
  const auto& target_key = args[1];
  auto expiration = VERIFY_RESULT(ParseInt64(args[2], "expiration"));

  if (expiration <= 0) {
    // Nothing to schedule: the key is deleted right away.
    op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
    op->mutable_request()->mutable_key_value()->set_key(target_key.cdata(), target_key.size());
    op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE);
    return Status::OK();
  }

  if (!using_millis) {
    expiration *= MonoTime::kMillisecondsPerSecond;
  }
  op->mutable_request()->mutable_key_value()->set_key(target_key.cdata(), target_key.size());
  op->mutable_request()->mutable_set_ttl_request()->set_absolute_time(expiration);
  return Status::OK();
}
940
941
4
// Two-argument overload: calls the three-argument ParseExpireAt with using_millis = false.
CHECKED_STATUS ParseExpireAt(YBRedisWriteOp* op, const RedisClientCommand& args) {
  constexpr bool kUsingMillis = false;
  return ParseExpireAt(op, args, kUsingMillis);
}
944
945
2
// Calls the three-argument ParseExpireAt with using_millis = true.
CHECKED_STATUS ParsePExpireAt(YBRedisWriteOp* op, const RedisClientCommand& args) {
  constexpr bool kUsingMillis = true;
  return ParseExpireAt(op, args, kUsingMillis);
}
948
949
// Parses a set-with-expiry command: args[1] is the key, args[2] is the TTL (milliseconds when
// using_millis is true, seconds otherwise) and args[3] is the value.
//
// A non-positive TTL produces a no-op request. Otherwise the TTL is converted to milliseconds and
// a string set request carrying that TTL is built. A TTL that falls outside
// [kRedisMinTtlMillis, kRedisMaxTtlMillis] after conversion is rejected with InvalidCommand.
CHECKED_STATUS ParseSetEx(YBRedisWriteOp* op,
                          const RedisClientCommand& args,
                          const bool using_millis) {
  const auto& key = args[1];
  const auto& value = args[3];
  auto ttl = VERIFY_RESULT(ParseInt64(args[2], "TTL"));
  if (ttl <= 0) {
    op->mutable_request()->mutable_no_op_request(); // Allocates new RedisNoOpRequestPB().
    return Status::OK();
  }
  const auto multiplier = using_millis ? 1 : MonoTime::kMillisecondsPerSecond;
  // The upper bound is checked BEFORE multiplying: ttl is positive here, so comparing against
  // kRedisMaxTtlMillis / multiplier is equivalent to comparing the product against
  // kRedisMaxTtlMillis, but cannot overflow (signed overflow is undefined behavior and a wrapped
  // value could otherwise slip through the bounds check).
  if (ttl > kRedisMaxTtlMillis / multiplier || ttl * multiplier < kRedisMinTtlMillis) {
    // Report the TTL argument (args[2]), not the value being stored (args[3]).
    return STATUS_FORMAT(InvalidCommand,
        "TTL field $0 is not within valid bounds", args[2]);
  }
  ttl *= multiplier;
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING);
  op->mutable_request()->mutable_set_request()->set_ttl(ttl);
  return Status::OK();
}
970
971
16
// Two-argument overload: calls the three-argument ParseSetEx with using_millis = false.
CHECKED_STATUS ParseSetEx(YBRedisWriteOp* op, const RedisClientCommand& args) {
  return ParseSetEx(op, args, /* using_millis */ false);
}
974
975
8
// Calls the three-argument ParseSetEx with using_millis = true.
CHECKED_STATUS ParsePSetEx(YBRedisWriteOp* op, const RedisClientCommand& args) {
  return ParseSetEx(op, args, /* using_millis */ true);
}
978
979
// Fills `op` with a get_ttl request for the key in args[1]. `return_seconds` is copied verbatim
// into the request's return_seconds field.
CHECKED_STATUS ParseTtl(YBRedisReadOp* op,
                        const RedisClientCommand& args,
                        const bool return_seconds) {
  const auto& target_key = args[1];
  auto* key_value = op->mutable_request()->mutable_key_value();
  key_value->set_key(target_key.cdata(), target_key.size());

  auto* ttl_request = op->mutable_request()->mutable_get_ttl_request();
  ttl_request->set_return_seconds(return_seconds);
  return Status::OK();
}
987
988
149
// Two-argument overload: calls the three-argument ParseTtl with return_seconds = true.
CHECKED_STATUS ParseTtl(YBRedisReadOp* op, const RedisClientCommand& args) {
  return ParseTtl(op, args, /* return_seconds */ true);
}
991
992
138
// Calls the three-argument ParseTtl with return_seconds = false.
CHECKED_STATUS ParsePTtl(YBRedisReadOp* op, const RedisClientCommand& args) {
  return ParseTtl(op, args, /* return_seconds */ false);
}
995
996
// Begin of input is going to be consumed, so we should adjust our pointers.
997
// Since the beginning of input is being consumed by shifting the remaining bytes to the
998
// beginning of the buffer.
999
215k
void RedisParser::Consume(size_t count) {
1000
215k
  pos_ -= count;
1001
1002
215k
  if (token_begin_ != kNoToken) {
1003
215k
    token_begin_ -= count;
1004
215k
  }
1005
215k
}
1006
1007
// New data arrived, so update the end of available bytes.
1008
213k
void RedisParser::Update(const IoVecs& data) {
1009
213k
  source_ = data;
1010
213k
  full_size_ = IoVecsFullSize(data);
1011
213k
  DCHECK_LE(pos_, full_size_);
1012
213k
}
1013
1014
// Parse next command.
1015
634k
Result<size_t> RedisParser::NextCommand() {
1016
4.08M
  while (pos_ != full_size_) {
1017
3.87M
    incomplete_ = false;
1018
3.87M
    Status status = AdvanceToNextToken();
1019
3.87M
    if (!status.ok()) {
1020
15
      return status;
1021
15
    }
1022
3.87M
    if (incomplete_) {
1023
5.37k
      pos_ = full_size_;
1024
5.37k
      return 0;
1025
5.37k
    }
1026
3.86M
    if (state_ == State::FINISHED) {
1027
419k
      state_ = State::INITIAL;
1028
419k
      return pos_;
1029
419k
    }
1030
3.86M
  }
1031
209k
  return 0;
1032
634k
}
1033
1034
3.87M
// Runs the handler that corresponds to the current parser state.
// Being called in the FINISHED state is reported as IllegalState; a state value outside of the
// enumeration is fatal.
CHECKED_STATUS RedisParser::AdvanceToNextToken() {
  switch (state_) {
    case State::INITIAL: {
      return Initial();
    }
    case State::SINGLE_LINE: {
      return SingleLine();
    }
    case State::BULK_HEADER: {
      return BulkHeader();
    }
    case State::BULK_ARGUMENT_SIZE: {
      return BulkArgumentSize();
    }
    case State::BULK_ARGUMENT_BODY: {
      return BulkArgumentBody();
    }
    case State::FINISHED: {
      return STATUS(IllegalState, "Should not be in FINISHED state during NextToken");
    }
  }
  LOG(FATAL) << "Unexpected parser state: " << to_underlying(state_);
}
1051
1052
419k
// Starts a new command at the current position: a leading '*' selects the bulk (multiline)
// format, any other character selects the single line format.
CHECKED_STATUS RedisParser::Initial() {
  token_begin_ = pos_;
  if (char_at_offset(pos_) == '*') {
    state_ = State::BULK_HEADER;
  } else {
    state_ = State::SINGLE_LINE;
  }
  return Status::OK();
}
1057
1058
15
// Parses a command given as a single line terminated by \r\n.
// Leading whitespace is skipped; a line with nothing else in it is rejected with InvalidArgument.
// When an args container is attached, the line is split into arguments with util::SplitArgs.
// On success the parser moves to the FINISHED state.
CHECKED_STATUS RedisParser::SingleLine() {
  auto status = FindEndOfLine();
  if (!status.ok() || incomplete_) {
    return status;
  }
  auto start = token_begin_;
  // pos_ points just past the line terminator, so step back over it.
  auto finish = pos_ - kLineEndLength;
  // The cast is required: passing a negative char (any byte >= 0x80 on platforms where char is
  // signed) to isspace is undefined behavior.
  while (start < finish && isspace(static_cast<unsigned char>(char_at_offset(start)))) {
    ++start;
  }
  if (start >= finish) {
    return STATUS(InvalidArgument, "Empty line");
  }
  if (args_) {
    // Args is supported only when parsing from single block of data.
    // Because we parse prepared call data in this case, that is contained in a single buffer.
    DCHECK_EQ(source_.size(), 1);
    RETURN_NOT_OK(util::SplitArgs(Slice(offset_to_pointer(start), finish - start), args_));
  }
  state_ = State::FINISHED;
  return Status::OK();
}
1080
1081
419k
// Parses the header line of a bulk command: '*' followed by the number of arguments, which must
// be within [1, kMaxNumberOfArgs]. Returns OK without changing state when the line is not yet
// complete. On success the parser expects the size line of the first argument next.
CHECKED_STATUS RedisParser::BulkHeader() {
  RETURN_NOT_OK(FindEndOfLine());
  if (incomplete_) {
    return Status::OK();
  }

  const auto argument_count = VERIFY_RESULT(ParseNumber(
      '*', 1, kMaxNumberOfArgs, "Number of lines in multiline"));
  if (args_) {
    // Drop arguments of the previous command and make room for the new ones.
    args_->clear();
    args_->reserve(argument_count);
  }

  arguments_left_ = argument_count;
  token_begin_ = pos_;
  state_ = State::BULK_ARGUMENT_SIZE;
  return Status::OK();
}
1097
1098
1.51M
// Parses the size line of a bulk argument: '$' followed by the argument size, which must be
// within [0, kMaxRedisValueSize]. Returns OK without changing state when the line is not yet
// complete. On success the parser expects the argument body next.
CHECKED_STATUS RedisParser::BulkArgumentSize() {
  RETURN_NOT_OK(FindEndOfLine());
  if (incomplete_) {
    return Status::OK();
  }

  const auto argument_size =
      VERIFY_RESULT(ParseNumber('$', 0, kMaxRedisValueSize, "Argument size"));

  current_argument_size_ = argument_size;
  token_begin_ = pos_;
  state_ = State::BULK_ARGUMENT_BODY;
  return Status::OK();
}
1109
1110
1.52M
// Consumes the body of a bulk argument: current_argument_size_ bytes starting at token_begin_,
// followed by \r\n. When the body is not fully available yet, marks the parse as incomplete.
// After the last argument of the command the parser moves to the FINISHED state, otherwise it
// expects the size line of the next argument.
CHECKED_STATUS RedisParser::BulkArgumentBody() {
  const auto body_end = token_begin_ + current_argument_size_ + kLineEndLength;
  if (body_end > full_size_) {
    incomplete_ = true;
    pos_ = full_size_;
    return Status::OK();
  }

  const bool ends_with_line_feed = char_at_offset(body_end - 1) == '\n';
  if (!ends_with_line_feed || char_at_offset(body_end - 2) != '\r') {
    return STATUS(NetworkError, "No \\r\\n after bulk");
  }

  if (args_) {
    // Args is supported only when parsing from single block of data.
    // Because we parse prepared call data in this case, that is contained in a single buffer.
    DCHECK_EQ(source_.size(), 1);
    args_->emplace_back(offset_to_pointer(token_begin_), current_argument_size_);
  }

  pos_ = body_end;
  token_begin_ = body_end;
  --arguments_left_;
  state_ = arguments_left_ == 0 ? State::FINISHED : State::BULK_ARGUMENT_SIZE;
  return Status::OK();
}
1137
1138
1.93M
// Looks for the next '\n' starting at pos_, scanning across the blocks of source_.
// When no '\n' is available, sets incomplete_ and leaves pos_ untouched. Otherwise checks that
// the line is not empty at token_begin_ and that '\n' is preceded by '\r', then moves pos_ just
// past the '\n'.
CHECKED_STATUS RedisParser::FindEndOfLine() {
  auto cursor = offset_to_idx_and_local_offset(pos_);

  size_t line_feed_offset = pos_;
  bool found = false;
  for (; cursor.first != source_.size(); ++cursor.first, cursor.second = 0) {
    const auto& block = source_[cursor.first];
    auto scan_begin = IoVecBegin(block) + cursor.second;
    auto line_feed = static_cast<const char*>(memchr(
        scan_begin, '\n', IoVecEnd(block) - scan_begin));
    if (line_feed) {
      line_feed_offset += line_feed - scan_begin;
      found = true;
      break;
    }
    // Nothing in the rest of this block: account for it and continue with the next one.
    line_feed_offset += block.iov_len - cursor.second;
  }

  incomplete_ = !found;
  if (incomplete_) {
    return Status::OK();
  }

  if (line_feed_offset == token_begin_) {
    return STATUS(NetworkError, "End of line at the beginning of a Redis command");
  }
  if (char_at_offset(line_feed_offset - 1) != '\r') {
    return STATUS(NetworkError, "\\n is not prefixed with \\r");
  }
  pos_ = line_feed_offset + 1;
  return Status::OK();
}
1167
1168
10.0M
std::pair<size_t, size_t> RedisParser::offset_to_idx_and_local_offset(size_t offset) const {
1169
  // We assume that there are at most 2 blocks of data.
1170
10.0M
  if (offset < source_[0].iov_len) {
1171
10.0M
    return std::pair<size_t, size_t>(0, offset);
1172
10.0M
  }
1173
1174
371
  offset -= source_[0].iov_len;
1175
371
  size_t idx = offset / source_[1].iov_len;
1176
371
  offset -= idx * source_[1].iov_len;
1177
1178
371
  return std::pair<size_t, size_t>(idx + 1, offset);
1179
10.0M
}
1180
1181
8.07M
const char* RedisParser::offset_to_pointer(size_t offset) const {
1182
8.07M
  auto p = offset_to_idx_and_local_offset(offset);
1183
8.07M
  return IoVecBegin(source_[p.first]) + p.second;
1184
8.07M
}
1185
1186
// Parses number with specified bounds.
1187
// Number is located in separate line, and contain prefix before actual number.
1188
// Line starts at token_begin_ and pos_ is a start of next line.
1189
Result<ptrdiff_t> RedisParser::ParseNumber(char prefix,
1190
                                           ptrdiff_t min,
1191
                                           ptrdiff_t max,
1192
1.93M
                                           const char* name) {
1193
1.93M
  if (char_at_offset(token_begin_) != prefix) {
1194
0
    return STATUS_FORMAT(Corruption,
1195
0
                         "Invalid character before number, expected: $0, but found: $1",
1196
0
                         prefix, char_at_offset(token_begin_));
1197
0
  }
1198
1.93M
  auto number_begin = token_begin_ + 1;
1199
1.93M
  auto expected_stop = pos_ - kLineEndLength;
1200
1.93M
  if (expected_stop - number_begin > kMaxNumberLength) {
1201
0
    return STATUS_FORMAT(
1202
0
        Corruption, "Too long $0 of length $1", name, expected_stop - number_begin);
1203
0
  }
1204
1.93M
  number_buffer_.reserve(kMaxNumberLength);
1205
1.93M
  IoVecsToBuffer(source_, number_begin, expected_stop, &number_buffer_);
1206
1.93M
  number_buffer_.push_back(0);
1207
1.93M
  auto parsed_number = VERIFY_RESULT(CheckedStoll(
1208
1.93M
      Slice(number_buffer_.data(), number_buffer_.size() - 1)));
1209
0
  static_assert(sizeof(parsed_number) == sizeof(ptrdiff_t), "Expected size");
1210
1.93M
  SCHECK_BOUNDS(parsed_number,
1211
1.93M
                min,
1212
1.93M
                max,
1213
1.93M
                Corruption,
1214
1.93M
                yb::Format("$0 out of expected range [$1, $2] : $3",
1215
1.93M
                           name, min, max, parsed_number));
1216
1.93M
  return static_cast<ptrdiff_t>(parsed_number);
1217
1.93M
}
1218
1219
209k
// Attaches the container that receives the arguments of parsed commands.
// Only expected to be used while the input consists of a single block.
void RedisParser::SetArgs(boost::container::small_vector_base<Slice>* args) {
  DCHECK_EQ(source_.size(), 1);

  args_ = args;
}
1223
1224
}  // namespace redisserver
1225
}  // namespace yb