YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_parser.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/redis/redisserver/redis_parser.h"
15
16
#include <memory>
17
#include <string>
18
19
#include <boost/algorithm/string.hpp>
20
21
#include "yb/client/callbacks.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/redis_protocol.pb.h"
25
26
#include "yb/gutil/casts.h"
27
28
#include "yb/util/split.h"
29
#include "yb/util/status.h"
30
#include "yb/util/status_format.h"
31
#include "yb/util/stol_utils.h"
32
#include "yb/util/string_case.h"
33
34
#include "yb/yql/redis/redisserver/redis_constants.h"
35
36
namespace yb {
37
namespace redisserver {
38
39
using yb::client::YBTable;
40
using yb::client::YBRedisWriteOp;
41
using yb::client::YBRedisReadOp;
42
using std::vector;
43
using std::shared_ptr;
44
using std::string;
45
46
namespace {
47
48
constexpr size_t kMaxNumberOfArgs = 1 << 20;
49
constexpr size_t kLineEndLength = 2;
50
constexpr size_t kMaxNumberLength = 25;
51
constexpr char kPositiveInfinity[] = "+inf";
52
constexpr char kNegativeInfinity[] = "-inf";
53
54
20.4k
string to_lower_case(Slice slice) {
55
20.4k
  return boost::to_lower_copy(slice.ToBuffer());
56
20.4k
}
57
58
29.7k
CHECKED_STATUS add_string_subkey(string subkey, RedisKeyValuePB* kv_pb) {
59
29.7k
  kv_pb->add_subkey()->set_string_subkey(std::move(subkey));
60
29.7k
  return Status::OK();
61
29.7k
}
62
63
15.1k
CHECKED_STATUS add_timestamp_subkey(const string &subkey, RedisKeyValuePB *kv_pb) {
64
15.1k
  auto timestamp = CheckedStoll(subkey);
65
15.1k
  RETURN_NOT_OK(timestamp);
66
15.1k
  kv_pb->add_subkey()->set_timestamp_subkey(*timestamp);
67
15.1k
  return Status::OK();
68
15.1k
}
69
70
10.5k
CHECKED_STATUS add_double_subkey(const string &subkey, RedisKeyValuePB *kv_pb) {
71
10.5k
  auto double_key = CheckedStold(subkey);
72
10.5k
  RETURN_NOT_OK(double_key);
73
10.5k
  kv_pb->add_subkey()->set_double_subkey(*double_key);
74
10.5k
  return Status::OK();
75
10.5k
}
76
77
2.83k
Result<int64_t> ParseInt64(const Slice& slice, const char* field) {
78
2.83k
  auto result = CheckedStoll(slice);
79
2.83k
  if (!result.ok()) {
80
0
    return STATUS_SUBSTITUTE(InvalidArgument,
81
0
        "$0 field $1 is not a valid number", field, slice.ToDebugString());
82
0
  }
83
2.83k
  return *result;
84
2.83k
}
85
86
2.73k
Result<int32_t> ParseInt32(const Slice& slice, const char* field) {
87
2.73k
  auto val = ParseInt64(slice, field);
88
2.73k
  if (!val.ok()) {
89
0
    return std::move(val.status());
90
0
  }
91
2.73k
  if (*val < std::numeric_limits<int32_t>::min() ||
92
2.73k
      *val > std::numeric_limits<int32_t>::max()) {
93
0
    return STATUS_SUBSTITUTE(InvalidArgument,
94
0
        "$0 field $1 is not within valid bounds", field, slice.ToDebugString());
95
0
  }
96
2.73k
  return static_cast<int32_t>(*val);
97
2.73k
}
98
99
} // namespace
100
101
40.9k
CHECKED_STATUS ParseSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
102
40.9k
  if (args[1].empty()) {
103
0
    return STATUS_SUBSTITUTE(InvalidCommand,
104
0
        "A SET request must have a non empty key field");
105
0
  }
106
40.9k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
107
40.9k
  const auto& key = args[1];
108
40.9k
  const auto& value = args[2];
109
40.9k
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
110
40.9k
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
111
40.9k
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING);
112
40.9k
  size_t idx = 3;
113
41.0k
  while (idx < args.size()) {
114
57
    string upper_arg;
115
57
    if (args[idx].size() == 2) {
116
57
      ToUpperCase(args[idx].ToBuffer(), &upper_arg);
117
57
    }
118
119
57
    if (upper_arg == "EX" || upper_arg == "PX") {
120
24
      if (args.size() < idx + 2) {
121
0
        return STATUS_SUBSTITUTE(InvalidCommand,
122
0
            "Expected TTL field after the EX flag, no value found");
123
0
      }
124
24
      auto ttl_val = ParseInt64(args[idx + 1], "TTL");
125
24
      RETURN_NOT_OK(ttl_val);
126
24
      if (*ttl_val < kRedisMinTtlSetExSeconds || *ttl_val > kRedisMaxTtlSeconds) {
127
0
        return STATUS_FORMAT(InvalidCommand,
128
0
            "TTL field $0 is not within valid bounds", args[idx + 1]);
129
0
      }
130
24
      const int64_t milliseconds_per_unit =
131
18
          upper_arg == "EX" ? MonoTime::kMillisecondsPerSecond : 1;
132
24
      op->mutable_request()->mutable_set_request()->set_ttl(*ttl_val * milliseconds_per_unit);
133
24
      idx += 2;
134
33
    } else if (upper_arg == "XX") {
135
12
      op->mutable_request()->mutable_set_request()->set_mode(REDIS_WRITEMODE_UPDATE);
136
12
      idx += 1;
137
21
    } else if (upper_arg == "NX") {
138
21
      op->mutable_request()->mutable_set_request()->set_mode(REDIS_WRITEMODE_INSERT);
139
21
      idx += 1;
140
0
    } else {
141
0
      return STATUS_FORMAT(InvalidCommand,
142
0
          "Unidentified argument $0 found while parsing set command", args[idx]);
143
0
    }
144
57
  }
145
40.9k
  return Status::OK();
146
40.9k
}
147
148
0
CHECKED_STATUS ParseSetNX(YBRedisWriteOp *op, const RedisClientCommand& args) {
149
0
  if (args[1].empty()) {
150
0
    return STATUS_SUBSTITUTE(InvalidCommand,
151
0
        "A SETNX request must have a non empty key field");
152
0
  }
153
154
0
  const auto& key = args[1];
155
0
  const auto& value = args[2];
156
157
0
  auto key_value = op->mutable_request()->mutable_key_value();
158
0
  key_value->set_key(key.cdata(), key.size());
159
0
  key_value->add_value(value.cdata(), value.size());
160
0
  key_value->set_type(REDIS_TYPE_STRING);
161
162
0
  auto set_request = op->mutable_request()->mutable_set_request();
163
0
  set_request->set_mode(REDIS_WRITEMODE_INSERT);
164
  // SETNX returns 1 / 0 (instead of OK / (nil) in 'SET k v NX' command).
165
0
  set_request->set_expect_ok_response(false);
166
0
  return Status::OK();
167
0
}
168
169
// TODO: support MSET
170
0
CHECKED_STATUS ParseMSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
171
0
  if (args.size() < 3 || args.size() % 2 == 0) {
172
0
    return STATUS_SUBSTITUTE(InvalidCommand,
173
0
        "An MSET request must have at least 3, odd number of arguments, found $0", args.size());
174
0
  }
175
0
  return STATUS(InvalidCommand, "MSET command not yet supported");
176
0
}
177
178
0
CHECKED_STATUS ParseHSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
179
0
  const auto& key = args[1];
180
0
  const auto& subkey = args[2];
181
0
  const auto& value = args[3];
182
183
0
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
184
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
185
0
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_HASH);
186
0
  op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(subkey.cdata(),
187
0
                                                                              subkey.size());
188
0
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
189
0
  return Status::OK();
190
0
}
191
192
193
0
CHECKED_STATUS ParseHIncrBy(YBRedisWriteOp *op, const RedisClientCommand& args) {
194
0
  const auto& key = args[1];
195
0
  const auto& subkey = args[2];
196
0
  const auto& incr_by = ParseInt64(args[3], "INCR_BY");
197
0
  RETURN_NOT_OK(incr_by);
198
0
  op->mutable_request()->mutable_incr_request()->set_increment_int(*incr_by);
199
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
200
0
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_HASH);
201
0
  op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(subkey.cdata(),
202
0
                                                                              subkey.size());
203
0
  return Status::OK();
204
0
}
205
206
CHECKED_STATUS ParseZAddOptions(
207
10.5k
    SortedSetOptionsPB *options, const RedisClientCommand& args, size_t *idx) {
208
  // While we keep seeing flags, set the appropriate field in options and increment idx. When
209
  // we finally stop seeing flags, the idx will be set to that token for later parsing.
210
  // Note that we can see duplicate flags, and it should have the same behavior as seeing the
211
  // flag once.
212
10.5k
  while (*idx < args.size()) {
213
10.5k
    if (boost::iequals(args[*idx].ToBuffer(), kCH)) {
214
0
      options->set_ch(true);
215
10.5k
    } else if (boost::iequals(args[*idx].ToBuffer(), kINCR)) {
216
0
      options->set_incr(true);
217
10.5k
    } else if (boost::iequals(args[*idx].ToBuffer(), kNX)) {
218
0
      if (options->update_options() == SortedSetOptionsPB_UpdateOptions_XX) {
219
0
        return STATUS_SUBSTITUTE(InvalidArgument,
220
0
                                 "XX and NX options at the same time are not compatible");
221
0
      }
222
0
      options->set_update_options(SortedSetOptionsPB_UpdateOptions_NX);
223
10.5k
    } else if (boost::iequals(args[*idx].ToBuffer(), kXX)) {
224
0
      if (options->update_options() == SortedSetOptionsPB_UpdateOptions_NX) {
225
0
        return STATUS_SUBSTITUTE(InvalidArgument,
226
0
                                 "XX and NX options at the same time are not compatible");
227
0
      }
228
0
      options->set_update_options(SortedSetOptionsPB_UpdateOptions_XX);
229
10.5k
    } else {
230
      // We have encountered a non-option token, return.
231
10.5k
      return Status::OK();
232
10.5k
    }
233
0
    *idx = *idx + 1;
234
0
  }
235
0
  return Status::OK();
236
10.5k
}
237
238
template <typename AddSubKey>
239
CHECKED_STATUS ParseHMSetLikeCommands(YBRedisWriteOp *op, const RedisClientCommand& args,
240
                                      const RedisDataType& type,
241
20.4k
                                      AddSubKey add_sub_key) {
242
20.4k
  if (args.size() < 4 || (args.size() % 2 == 1 && type == REDIS_TYPE_HASH)) {
243
0
    return STATUS_SUBSTITUTE(InvalidArgument,
244
0
                             "wrong number of arguments: $0 for command: $1", args.size(),
245
0
                             string(args[0].cdata(), args[0].size()));
246
0
  }
247
248
20.4k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
249
20.4k
  op->mutable_request()->mutable_key_value()->set_type(type);
250
20.4k
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
251
252
20.4k
  if (type == REDIS_TYPE_HASH) {
253
9.90k
    op->mutable_request()->mutable_set_request()->set_expect_ok_response(true);
254
9.90k
  }
255
256
20.4k
  size_t start_idx = 2;
257
20.4k
  if (type == REDIS_TYPE_SORTEDSET) {
258
10.5k
    RETURN_NOT_OK(ParseZAddOptions(
259
10.5k
        op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(),
260
10.5k
        args, &start_idx));
261
262
    // If the INCR flag is set, can only have one [score member] pair.
263
10.5k
    if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) {
264
0
      return STATUS_SUBSTITUTE(InvalidArgument,
265
0
                               "wrong number of tokens after INCR flag specified: Need 2 but found "
266
0
                                   "$0 for command: $1", args.size() - start_idx,
267
0
                               string(args[0].cdata(), args[0].size()));
268
0
    }
269
20.4k
  }
270
271
  // Need [score member] to come in pairs.
272
20.4k
  if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) {
273
0
    return STATUS_SUBSTITUTE(InvalidArgument,
274
0
                             "Expect even and non-zero number of arguments "
275
0
                             "for command: $0, found $1",
276
0
                             string(args[0].cdata(), args[0].size()), args.size() - start_idx);
277
0
  }
278
279
20.4k
  std::unordered_map<string, string> kv_map;
280
75.5k
  for (size_t i = start_idx; i < args.size(); i += 2) {
281
    // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently.
282
55.0k
    if (type == REDIS_TYPE_TIMESERIES) {
283
14.8k
      string upper_arg;
284
14.8k
      ToUpperCase(args[i].ToBuffer(), &upper_arg);
285
14.8k
      if (upper_arg == kExpireAt || upper_arg == kExpireIn) {
286
6
        if (i + 2 != args.size()) {
287
0
          return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command",
288
0
                                   string(args[i].cdata(), args[i].size()));
289
0
        }
290
6
        auto temp = CheckedStoll(args[i + 1]);
291
6
        RETURN_NOT_OK(temp);
292
6
        int64_t ttl = 0;
293
6
        if (upper_arg == kExpireIn) {
294
6
          ttl = *temp;
295
6
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
296
0
            return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl,
297
0
                                     kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds);
298
0
          }
299
0
        } else {
300
0
          auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond;
301
0
          ttl = *temp - current_time;
302
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
303
0
            return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]",
304
0
                                     *temp,
305
0
                                     kRedisMinTtlSetExSeconds + current_time,
306
0
                                     kRedisMaxTtlSeconds + current_time);
307
0
          }
308
6
        }
309
310
        // Need to pass ttl in milliseconds, user supplied values are in seconds.
311
6
        op->mutable_request()->mutable_set_request()->set_ttl(
312
6
            ttl * MonoTime::kMillisecondsPerSecond);
313
14.8k
      } else {
314
14.8k
        kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
315
14.8k
      }
316
40.2k
    } else if (type == REDIS_TYPE_SORTEDSET) {
317
      // For sorted sets, we store the mapping from values to scores, since values are distinct
318
      // but scores aren't.
319
10.5k
      kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer();
320
29.7k
    } else {
321
29.7k
      kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
322
29.7k
    }
323
55.0k
  }
324
325
55.0k
  for (const auto& kv : kv_map) {
326
55.0k
    auto req_kv = op->mutable_request()->mutable_key_value();
327
55.0k
    if (type == REDIS_TYPE_SORTEDSET) {
328
      // Since the mapping is values to scores, need to reverse when creating the request.
329
10.5k
      RETURN_NOT_OK(add_sub_key(kv.second, req_kv));
330
10.5k
      req_kv->add_value(kv.first);
331
44.5k
    } else {
332
44.5k
      RETURN_NOT_OK(add_sub_key(kv.first, req_kv));
333
44.5k
      req_kv->add_value(kv.second);
334
44.5k
    }
335
55.0k
  }
336
20.4k
  return Status::OK();
337
20.4k
}
_ZN2yb11redisserver22ParseHMSetLikeCommandsIPFNS_6StatusENSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES2_PNS_6client14YBRedisWriteOpERKN5boost9container12small_vectorINS_5SliceELm8EvvEERKNS_13RedisDataTypeET_
Line
Count
Source
241
9.90k
                                      AddSubKey add_sub_key) {
242
9.90k
  if (args.size() < 4 || (args.size() % 2 == 1 && type == REDIS_TYPE_HASH)) {
243
0
    return STATUS_SUBSTITUTE(InvalidArgument,
244
0
                             "wrong number of arguments: $0 for command: $1", args.size(),
245
0
                             string(args[0].cdata(), args[0].size()));
246
0
  }
247
248
9.90k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
249
9.90k
  op->mutable_request()->mutable_key_value()->set_type(type);
250
9.90k
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
251
252
9.90k
  if (type == REDIS_TYPE_HASH) {
253
9.90k
    op->mutable_request()->mutable_set_request()->set_expect_ok_response(true);
254
9.90k
  }
255
256
9.90k
  size_t start_idx = 2;
257
9.90k
  if (type == REDIS_TYPE_SORTEDSET) {
258
0
    RETURN_NOT_OK(ParseZAddOptions(
259
0
        op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(),
260
0
        args, &start_idx));
261
262
    // If the INCR flag is set, can only have one [score member] pair.
263
0
    if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) {
264
0
      return STATUS_SUBSTITUTE(InvalidArgument,
265
0
                               "wrong number of tokens after INCR flag specified: Need 2 but found "
266
0
                                   "$0 for command: $1", args.size() - start_idx,
267
0
                               string(args[0].cdata(), args[0].size()));
268
0
    }
269
9.90k
  }
270
271
  // Need [score member] to come in pairs.
272
9.90k
  if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) {
273
0
    return STATUS_SUBSTITUTE(InvalidArgument,
274
0
                             "Expect even and non-zero number of arguments "
275
0
                             "for command: $0, found $1",
276
0
                             string(args[0].cdata(), args[0].size()), args.size() - start_idx);
277
0
  }
278
279
9.90k
  std::unordered_map<string, string> kv_map;
280
39.6k
  for (size_t i = start_idx; i < args.size(); i += 2) {
281
    // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently.
282
29.7k
    if (type == REDIS_TYPE_TIMESERIES) {
283
0
      string upper_arg;
284
0
      ToUpperCase(args[i].ToBuffer(), &upper_arg);
285
0
      if (upper_arg == kExpireAt || upper_arg == kExpireIn) {
286
0
        if (i + 2 != args.size()) {
287
0
          return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command",
288
0
                                   string(args[i].cdata(), args[i].size()));
289
0
        }
290
0
        auto temp = CheckedStoll(args[i + 1]);
291
0
        RETURN_NOT_OK(temp);
292
0
        int64_t ttl = 0;
293
0
        if (upper_arg == kExpireIn) {
294
0
          ttl = *temp;
295
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
296
0
            return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl,
297
0
                                     kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds);
298
0
          }
299
0
        } else {
300
0
          auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond;
301
0
          ttl = *temp - current_time;
302
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
303
0
            return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]",
304
0
                                     *temp,
305
0
                                     kRedisMinTtlSetExSeconds + current_time,
306
0
                                     kRedisMaxTtlSeconds + current_time);
307
0
          }
308
0
        }
309
310
        // Need to pass ttl in milliseconds, user supplied values are in seconds.
311
0
        op->mutable_request()->mutable_set_request()->set_ttl(
312
0
            ttl * MonoTime::kMillisecondsPerSecond);
313
0
      } else {
314
0
        kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
315
0
      }
316
29.7k
    } else if (type == REDIS_TYPE_SORTEDSET) {
317
      // For sorted sets, we store the mapping from values to scores, since values are distinct
318
      // but scores aren't.
319
0
      kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer();
320
29.7k
    } else {
321
29.7k
      kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
322
29.7k
    }
323
29.7k
  }
324
325
29.7k
  for (const auto& kv : kv_map) {
326
29.7k
    auto req_kv = op->mutable_request()->mutable_key_value();
327
29.7k
    if (type == REDIS_TYPE_SORTEDSET) {
328
      // Since the mapping is values to scores, need to reverse when creating the request.
329
0
      RETURN_NOT_OK(add_sub_key(kv.second, req_kv));
330
0
      req_kv->add_value(kv.first);
331
29.7k
    } else {
332
29.7k
      RETURN_NOT_OK(add_sub_key(kv.first, req_kv));
333
29.7k
      req_kv->add_value(kv.second);
334
29.7k
    }
335
29.7k
  }
336
9.90k
  return Status::OK();
337
9.90k
}
_ZN2yb11redisserver22ParseHMSetLikeCommandsIPFNS_6StatusERKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES2_PNS_6client14YBRedisWriteOpERKN5boost9container12small_vectorINS_5SliceELm8EvvEERKNS_13RedisDataTypeET_
Line
Count
Source
241
10.5k
                                      AddSubKey add_sub_key) {
242
10.5k
  if (args.size() < 4 || (args.size() % 2 == 1 && type == REDIS_TYPE_HASH)) {
243
0
    return STATUS_SUBSTITUTE(InvalidArgument,
244
0
                             "wrong number of arguments: $0 for command: $1", args.size(),
245
0
                             string(args[0].cdata(), args[0].size()));
246
0
  }
247
248
10.5k
  op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB().
249
10.5k
  op->mutable_request()->mutable_key_value()->set_type(type);
250
10.5k
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
251
252
10.5k
  if (type == REDIS_TYPE_HASH) {
253
0
    op->mutable_request()->mutable_set_request()->set_expect_ok_response(true);
254
0
  }
255
256
10.5k
  size_t start_idx = 2;
257
10.5k
  if (type == REDIS_TYPE_SORTEDSET) {
258
10.5k
    RETURN_NOT_OK(ParseZAddOptions(
259
10.5k
        op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(),
260
10.5k
        args, &start_idx));
261
262
    // If the INCR flag is set, can only have one [score member] pair.
263
10.5k
    if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) {
264
0
      return STATUS_SUBSTITUTE(InvalidArgument,
265
0
                               "wrong number of tokens after INCR flag specified: Need 2 but found "
266
0
                                   "$0 for command: $1", args.size() - start_idx,
267
0
                               string(args[0].cdata(), args[0].size()));
268
0
    }
269
10.5k
  }
270
271
  // Need [score member] to come in pairs.
272
10.5k
  if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) {
273
0
    return STATUS_SUBSTITUTE(InvalidArgument,
274
0
                             "Expect even and non-zero number of arguments "
275
0
                             "for command: $0, found $1",
276
0
                             string(args[0].cdata(), args[0].size()), args.size() - start_idx);
277
0
  }
278
279
10.5k
  std::unordered_map<string, string> kv_map;
280
35.9k
  for (size_t i = start_idx; i < args.size(); i += 2) {
281
    // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently.
282
25.3k
    if (type == REDIS_TYPE_TIMESERIES) {
283
14.8k
      string upper_arg;
284
14.8k
      ToUpperCase(args[i].ToBuffer(), &upper_arg);
285
14.8k
      if (upper_arg == kExpireAt || upper_arg == kExpireIn) {
286
6
        if (i + 2 != args.size()) {
287
0
          return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command",
288
0
                                   string(args[i].cdata(), args[i].size()));
289
0
        }
290
6
        auto temp = CheckedStoll(args[i + 1]);
291
6
        RETURN_NOT_OK(temp);
292
6
        int64_t ttl = 0;
293
6
        if (upper_arg == kExpireIn) {
294
6
          ttl = *temp;
295
6
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
296
0
            return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl,
297
0
                                     kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds);
298
0
          }
299
0
        } else {
300
0
          auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond;
301
0
          ttl = *temp - current_time;
302
0
          if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) {
303
0
            return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]",
304
0
                                     *temp,
305
0
                                     kRedisMinTtlSetExSeconds + current_time,
306
0
                                     kRedisMaxTtlSeconds + current_time);
307
0
          }
308
6
        }
309
310
        // Need to pass ttl in milliseconds, user supplied values are in seconds.
311
6
        op->mutable_request()->mutable_set_request()->set_ttl(
312
6
            ttl * MonoTime::kMillisecondsPerSecond);
313
14.8k
      } else {
314
14.8k
        kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
315
14.8k
      }
316
10.5k
    } else if (type == REDIS_TYPE_SORTEDSET) {
317
      // For sorted sets, we store the mapping from values to scores, since values are distinct
318
      // but scores aren't.
319
10.5k
      kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer();
320
0
    } else {
321
0
      kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer();
322
0
    }
323
25.3k
  }
324
325
25.3k
  for (const auto& kv : kv_map) {
326
25.3k
    auto req_kv = op->mutable_request()->mutable_key_value();
327
25.3k
    if (type == REDIS_TYPE_SORTEDSET) {
328
      // Since the mapping is values to scores, need to reverse when creating the request.
329
10.5k
      RETURN_NOT_OK(add_sub_key(kv.second, req_kv));
330
10.5k
      req_kv->add_value(kv.first);
331
14.8k
    } else {
332
14.8k
      RETURN_NOT_OK(add_sub_key(kv.first, req_kv));
333
14.8k
      req_kv->add_value(kv.second);
334
14.8k
    }
335
25.3k
  }
336
10.5k
  return Status::OK();
337
10.5k
}
338
339
9.90k
CHECKED_STATUS ParseHMSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
340
0
  DCHECK_EQ("hmset", to_lower_case(args[0]))
341
0
      << "Parsing hmset request where first arg is not hmset.";
342
9.90k
  return ParseHMSetLikeCommands(op, args, REDIS_TYPE_HASH, add_string_subkey);
343
9.90k
}
344
345
87
CHECKED_STATUS ParseTsAdd(YBRedisWriteOp *op, const RedisClientCommand& args) {
346
0
  DCHECK_EQ("tsadd", to_lower_case(args[0]))
347
0
    << "Parsing hmset request where first arg is not hmset.";
348
87
  return ParseHMSetLikeCommands(op, args, REDIS_TYPE_TIMESERIES,
349
87
                                add_timestamp_subkey);
350
87
}
351
352
10.5k
Status ParseZAdd(YBRedisWriteOp *op, const RedisClientCommand& args) {
353
0
  DCHECK_EQ("zadd", to_lower_case(args[0]))
354
0
    << "Parsing zadd request where first arg is not zadd.";
355
10.5k
  return ParseHMSetLikeCommands(op, args, REDIS_TYPE_SORTEDSET, add_double_subkey);
356
10.5k
}
357
358
0
Status ParsePush(YBRedisWriteOp *op, const RedisClientCommand& args, RedisSide side) {
359
0
  op->mutable_request()->mutable_push_request()->set_side(side);
360
0
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
361
0
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_LIST);
362
363
0
  auto mutable_key = op->mutable_request()->mutable_key_value();
364
0
  for (size_t i = 2; i < args.size(); ++i) {
365
0
    mutable_key->add_value(args[i].cdata(), args[i].size());
366
0
  }
367
0
  return Status::OK();
368
0
}
369
370
0
Status ParseLPush(YBRedisWriteOp *op, const RedisClientCommand& args) {
371
0
  return ParsePush(op, args, REDIS_SIDE_LEFT);
372
0
}
373
374
0
Status ParseRPush(YBRedisWriteOp *op, const RedisClientCommand& args) {
375
0
  return ParsePush(op, args, REDIS_SIDE_RIGHT);
376
0
}
377
378
template <typename YBRedisOp, typename AddSubKey>
379
CHECKED_STATUS ParseCollection(YBRedisOp *op,
380
                               const RedisClientCommand& args,
381
                               boost::optional<RedisDataType> type,
382
                               AddSubKey add_sub_key,
383
27
                               bool remove_duplicates = true) {
384
27
  const auto& key = args[1];
385
27
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
27
  if (type) {
387
9
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
9
  }
389
27
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
9
    std::set<string> subkey_set;
392
315
    for (size_t i = 2; i < args.size(); i++) {
393
306
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
306
    }
395
9
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
9
        narrow_cast<int>(subkey_set.size()));
397
306
    for (const auto &val : subkey_set) {
398
306
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
306
    }
400
18
  } else {
401
18
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
18
        narrow_cast<int>(args.size() - 2));
403
18
    for (size_t i = 2; i < args.size(); i++) {
404
0
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
0
                                op->mutable_request()->mutable_key_value()));
406
0
    }
407
18
  }
408
27
  return Status::OK();
409
27
}
Unexecuted instantiation: _ZN2yb11redisserver15ParseCollectionINS_6client14YBRedisWriteOpEPFNS_6StatusENSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES4_PT_RKN5boost9container12small_vectorINS_5SliceELm8EvvEENSI_8optionalINS_13RedisDataTypeEEET0_b
_ZN2yb11redisserver15ParseCollectionINS_6client14YBRedisWriteOpEPFNS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES4_PT_RKN5boost9container12small_vectorINS_5SliceELm8EvvEENSK_8optionalINS_13RedisDataTypeEEET0_b
Line
Count
Source
383
9
                               bool remove_duplicates = true) {
384
9
  const auto& key = args[1];
385
9
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
9
  if (type) {
387
9
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
9
  }
389
9
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
9
    std::set<string> subkey_set;
392
315
    for (size_t i = 2; i < args.size(); i++) {
393
306
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
306
    }
395
9
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
9
        narrow_cast<int>(subkey_set.size()));
397
306
    for (const auto &val : subkey_set) {
398
306
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
306
    }
400
0
  } else {
401
0
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
0
        narrow_cast<int>(args.size() - 2));
403
0
    for (size_t i = 2; i < args.size(); i++) {
404
0
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
0
                                op->mutable_request()->mutable_key_value()));
406
0
    }
407
0
  }
408
9
  return Status::OK();
409
9
}
_ZN2yb11redisserver15ParseCollectionINS_6client13YBRedisReadOpEPFNS_6StatusENSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES4_PT_RKN5boost9container12small_vectorINS_5SliceELm8EvvEENSI_8optionalINS_13RedisDataTypeEEET0_b
Line
Count
Source
383
18
                               bool remove_duplicates = true) {
384
18
  const auto& key = args[1];
385
18
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
386
18
  if (type) {
387
0
    op->mutable_request()->mutable_key_value()->set_type(*type);
388
0
  }
389
18
  if (remove_duplicates) {
390
    // We remove duplicates from the subkeys here.
391
0
    std::set<string> subkey_set;
392
0
    for (size_t i = 2; i < args.size(); i++) {
393
0
      subkey_set.insert(string(args[i].cdata(), args[i].size()));
394
0
    }
395
0
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
396
0
        narrow_cast<int>(subkey_set.size()));
397
0
    for (const auto &val : subkey_set) {
398
0
      RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value()));
399
0
    }
400
18
  } else {
401
18
    op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve(
402
18
        narrow_cast<int>(args.size() - 2));
403
18
    for (size_t i = 2; i < args.size(); i++) {
404
0
      RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()),
405
0
                                op->mutable_request()->mutable_key_value()));
406
0
    }
407
18
  }
408
18
  return Status::OK();
409
18
}
410
411
0
CHECKED_STATUS ParseHDel(YBRedisWriteOp *op, const RedisClientCommand& args) {
412
0
  op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
413
0
  return ParseCollection(op, args, REDIS_TYPE_HASH, add_string_subkey);
414
0
}
415
416
9
CHECKED_STATUS ParseTsRem(YBRedisWriteOp *op, const RedisClientCommand& args) {
417
9
  op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
418
9
  return ParseCollection(op, args, REDIS_TYPE_TIMESERIES, add_timestamp_subkey);
419
9
}
420
421
0
CHECKED_STATUS ParseZRem(YBRedisWriteOp *op, const RedisClientCommand& args) {
422
0
  op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
423
0
  return ParseCollection(op, args, REDIS_TYPE_SORTEDSET, add_string_subkey);
424
0
}
425
426
0
CHECKED_STATUS ParseSAdd(YBRedisWriteOp *op, const RedisClientCommand& args) {
427
0
  op->mutable_request()->mutable_add_request(); // Allocates new RedisAddRequestPB().
428
0
  return ParseCollection(op, args, REDIS_TYPE_SET, add_string_subkey);
429
0
}
430
431
0
CHECKED_STATUS ParseSRem(YBRedisWriteOp *op, const RedisClientCommand& args) {
432
0
  op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
433
0
  return ParseCollection(op, args, REDIS_TYPE_SET, add_string_subkey);
434
0
}
435
436
0
CHECKED_STATUS ParsePop(YBRedisWriteOp *op, const RedisClientCommand& args, RedisSide side) {
437
0
  op->mutable_request()->mutable_pop_request()->set_side(side);
438
0
  op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size());
439
0
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_LIST);
440
0
  return Status::OK();
441
0
}
442
443
0
CHECKED_STATUS ParseLPop(YBRedisWriteOp *op, const RedisClientCommand& args) {
444
0
  return ParsePop(op, args, REDIS_SIDE_LEFT);
445
0
}
446
447
0
CHECKED_STATUS ParseRPop(YBRedisWriteOp *op, const RedisClientCommand& args) {
448
0
  return ParsePop(op, args, REDIS_SIDE_RIGHT);
449
0
}
450
451
0
CHECKED_STATUS ParseGetSet(YBRedisWriteOp *op, const RedisClientCommand& args) {
452
0
  const auto& key = args[1];
453
0
  const auto& value = args[2];
454
0
  op->mutable_request()->mutable_getset_request(); // Allocates new RedisGetSetRequestPB().
455
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
456
0
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
457
0
  return Status::OK();
458
0
}
459
460
0
CHECKED_STATUS ParseAppend(YBRedisWriteOp* op, const RedisClientCommand& args) {
461
0
  const auto& key = args[1];
462
0
  const auto& value = args[2];
463
0
  op->mutable_request()->mutable_append_request(); // Allocates new RedisAppendRequestPB().
464
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
465
0
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
466
0
  return Status::OK();
467
0
}
468
469
// Note: deleting only one key is supported using one command as of now.
470
3
CHECKED_STATUS ParseDel(YBRedisWriteOp* op, const RedisClientCommand& args) {
471
3
  const auto& key = args[1];
472
3
  op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
473
3
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
474
  // We should be able to delete all types of top level keys
475
3
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE);
476
3
  return Status::OK();
477
3
}
478
479
0
CHECKED_STATUS ParseSetRange(YBRedisWriteOp* op, const RedisClientCommand& args) {
480
0
  const auto& key = args[1];
481
0
  const auto& value = args[3];
482
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
483
0
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
484
485
0
  auto offset = ParseInt32(args[2], "offset");
486
0
  RETURN_NOT_OK(offset);
487
  // TODO: Should we have an upper bound?
488
  // A very large offset would allocate a lot of memory and maybe crash
489
0
  if (*offset < 0) {
490
0
    return STATUS_SUBSTITUTE(InvalidArgument,
491
0
        "offset field of SETRANGE must be non-negative, found: $0", *offset);
492
0
  }
493
0
  op->mutable_request()->mutable_set_range_request()->set_offset(*offset);
494
495
0
  return Status::OK();
496
0
}
497
498
0
CHECKED_STATUS ParseIncr(YBRedisWriteOp* op, const RedisClientCommand& args) {
499
0
  const auto& key = args[1];
500
0
  op->mutable_request()->mutable_incr_request()->set_increment_int(1);
501
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
502
0
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING);
503
0
  return Status::OK();
504
0
}
505
506
0
CHECKED_STATUS ParseIncrBy(YBRedisWriteOp* op, const RedisClientCommand& args) {
507
0
  const auto& key = args[1];
508
0
  const auto& incr_by = ParseInt64(args[2], "INCR_BY");
509
0
  RETURN_NOT_OK(incr_by);
510
0
  op->mutable_request()->mutable_incr_request()->set_increment_int(*incr_by);
511
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
512
0
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING);
513
0
  return Status::OK();
514
0
}
515
516
36.4k
CHECKED_STATUS ParseGet(YBRedisReadOp* op, const RedisClientCommand& args) {
517
36.4k
  const auto& key = args[1];
518
36.4k
  if (key.empty()) {
519
0
    return STATUS_SUBSTITUTE(InvalidCommand,
520
0
        "A GET request must have non empty key field");
521
0
  }
522
36.4k
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
523
36.4k
  op->mutable_request()->mutable_get_request()->set_request_type(
524
36.4k
      RedisGetRequestPB_GetRequestType_GET);
525
36.4k
  return Status::OK();
526
36.4k
}
527
528
//  Used for HGET/HSTRLEN/HEXISTS. Also for HMGet
529
//  CMD <KEY> [<SUB-KEY>]*
530
CHECKED_STATUS ParseHGetLikeCommands(YBRedisReadOp* op, const RedisClientCommand& args,
531
                                     RedisGetRequestPB_GetRequestType request_type,
532
18
                                     bool remove_duplicates = false) {
533
18
  op->mutable_request()->mutable_get_request()->set_request_type(request_type);
534
18
  return ParseCollection(op, args, boost::none, add_string_subkey, remove_duplicates);
535
18
}
536
537
// TODO: Support MGET
538
0
CHECKED_STATUS ParseMGet(YBRedisReadOp* op, const RedisClientCommand& args) {
539
0
  return STATUS(InvalidCommand, "MGET command not yet supported");
540
0
}
541
542
0
CHECKED_STATUS ParseHGet(YBRedisReadOp* op, const RedisClientCommand& args) {
543
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HGET);
544
0
}
545
546
CHECKED_STATUS ParseTsBoundArg(const Slice& slice, RedisSubKeyBoundPB* bound_pb,
547
                               RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type,
548
10.9k
                               bool exclusive) {
549
10.9k
  string bound(slice.cdata(), slice.size());
550
10.9k
  if (bound == kPositiveInfinity) {
551
39
    bound_pb->set_infinity_type(RedisSubKeyBoundPB_InfinityType_POSITIVE);
552
10.8k
  } else if (bound == kNegativeInfinity) {
553
39
    bound_pb->set_infinity_type(RedisSubKeyBoundPB_InfinityType_NEGATIVE);
554
10.8k
  } else {
555
10.8k
    bound_pb->set_is_exclusive(exclusive);
556
10.8k
    switch (request_type) {
557
10.8k
      case RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME:
558
10.8k
        FALLTHROUGH_INTENDED;
559
10.8k
      case RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME: {
560
10.8k
        auto ts_bound = CheckedStoll(slice);
561
10.8k
        RETURN_NOT_OK(ts_bound);
562
10.8k
        bound_pb->mutable_subkey_bound()->set_timestamp_subkey(*ts_bound);
563
10.8k
        break;
564
10.8k
      }
565
12
      case RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE: {
566
12
        auto double_bound = CheckedStold(slice);
567
12
        RETURN_NOT_OK(double_bound);
568
12
        bound_pb->mutable_subkey_bound()->set_double_subkey(*double_bound);
569
12
        break;
570
12
      }
571
0
      default:
572
0
        return STATUS_SUBSTITUTE(InvalidArgument, "Invalid request type: $0", request_type);
573
10.9k
    }
574
575
10.9k
  }
576
10.9k
  return Status::OK();
577
10.9k
}
578
579
CHECKED_STATUS
580
0
ParseIndexBoundArg(const Slice& slice, RedisIndexBoundPB* bound_pb) {
581
0
  auto index_bound = CheckedStoll(slice);
582
0
  RETURN_NOT_OK(index_bound);
583
0
  bound_pb->set_index(*index_bound);
584
0
  return Status::OK();
585
0
}
586
587
CHECKED_STATUS
588
ParseTsSubKeyBound(const Slice& slice, RedisSubKeyBoundPB* bound_pb,
589
10.9k
                   RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type) {
590
10.9k
  if (slice.empty()) {
591
0
    return STATUS(InvalidCommand, "range bound key cannot be empty");
592
0
  }
593
594
10.9k
  if (slice[0] == '(' && slice.size() > 1) {
595
1.80k
    auto slice_copy = slice;
596
1.80k
    slice_copy.remove_prefix(1);
597
1.80k
    RETURN_NOT_OK(ParseTsBoundArg(slice_copy, bound_pb, request_type, /* exclusive */ true));
598
9.12k
  } else {
599
9.12k
    RETURN_NOT_OK(ParseTsBoundArg(slice, bound_pb, request_type, /* exclusive */ false));
600
9.12k
  }
601
10.9k
  return Status::OK();
602
10.9k
}
603
604
0
CHECKED_STATUS ParseIndexBound(const Slice& slice, RedisIndexBoundPB* bound_pb) {
605
0
  if (slice.empty()) {
606
0
    return STATUS(InvalidArgument, "range bound index cannot be empty");
607
0
  }
608
0
  RETURN_NOT_OK(ParseIndexBoundArg(slice, bound_pb));
609
0
  return Status::OK();
610
0
}
611
612
18
CHECKED_STATUS ParseTsCard(YBRedisReadOp* op, const RedisClientCommand& args) {
613
18
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_TSCARD);
614
18
}
615
616
18
CHECKED_STATUS ParseTsLastN(YBRedisReadOp* op, const RedisClientCommand& args) {
617
  // TSLastN is basically TSRangeByTime -INF, INF with a limit on number of entries. Note that
618
  // there is a subtle difference here since TSRangeByTime iterates on entries from highest to
619
  // lowest and hence we end up returning the highest N entries. This operation is more like
620
  // TSRevRangeByTime -INF, INF with a limit (Note that TSRevRangeByTime is not implemented).
621
18
  op->mutable_request()->mutable_get_collection_range_request()->set_request_type(
622
18
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME);
623
18
  const auto& key = args[1];
624
18
  auto limit = ParseInt32(args[2], "limit");
625
18
  RETURN_NOT_OK(limit);
626
18
  if ((*limit) <= 0) {
627
0
    return STATUS_SUBSTITUTE(InvalidArgument,
628
0
        "$0 field $1 is not within valid bounds", "limit", args[2].ToDebugString());
629
0
  }
630
18
  op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer());
631
18
  op->mutable_request()->set_range_request_limit(*limit);
632
18
  op->mutable_request()->mutable_subkey_range()->mutable_lower_bound()->set_infinity_type
633
18
      (RedisSubKeyBoundPB_InfinityType_NEGATIVE);
634
18
  op->mutable_request()->mutable_subkey_range()->mutable_upper_bound()->set_infinity_type
635
18
      (RedisSubKeyBoundPB_InfinityType_POSITIVE);
636
18
  return Status::OK();
637
18
}
638
639
15
CHECKED_STATUS ParseTsRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) {
640
15
  op->mutable_request()->mutable_get_collection_range_request()->set_request_type(
641
15
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME);
642
643
15
  const auto& key = args[1];
644
15
  RETURN_NOT_OK(ParseTsSubKeyBound(
645
15
      args[2],
646
15
      op->mutable_request()->mutable_subkey_range()->mutable_lower_bound(),
647
15
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME));
648
12
  RETURN_NOT_OK(ParseTsSubKeyBound(
649
12
      args[3],
650
12
      op->mutable_request()->mutable_subkey_range()->mutable_upper_bound(),
651
12
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME));
652
653
12
  op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer());
654
12
  return Status::OK();
655
12
}
656
657
5.40k
CHECKED_STATUS ParseTsRevRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) {
658
5.40k
  op->mutable_request()->mutable_get_collection_range_request()->set_request_type(
659
5.40k
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME);
660
661
5.40k
  const auto& key = args[1];
662
5.40k
      RETURN_NOT_OK(ParseTsSubKeyBound(
663
5.40k
      args[2],
664
5.40k
      op->mutable_request()->mutable_subkey_range()->mutable_lower_bound(),
665
5.40k
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME));
666
5.40k
      RETURN_NOT_OK(ParseTsSubKeyBound(
667
5.40k
      args[3],
668
5.40k
      op->mutable_request()->mutable_subkey_range()->mutable_upper_bound(),
669
5.40k
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME));
670
671
5.40k
  op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer());
672
673
5.40k
  if (args.size() > 4) {
674
2.70k
    if (args.size() != 6) {
675
0
      return STATUS_SUBSTITUTE(InvalidCommand,
676
0
                               "Invalid number of arguments. Command should have 4 or 6 arguments");
677
0
    }
678
2.70k
    string upper_arg;
679
2.70k
    ToUpperCase(args[4].ToBuffer(), &upper_arg);
680
2.70k
    if (upper_arg != "LIMIT") {
681
0
      return STATUS_SUBSTITUTE(InvalidArgument,
682
0
                               "Invalid argument $0. Expecting $1", args[4].ToBuffer(), "limit");
683
0
    }
684
2.70k
    auto limit = ParseInt32(args[5], "limit");
685
2.70k
    RETURN_NOT_OK(limit);
686
2.70k
    if ((*limit) <= 0) {
687
0
      return STATUS_SUBSTITUTE(InvalidArgument,
688
0
                               "$0 field $1 is not within valid bounds", "limit",
689
0
                               args[5].ToDebugString());
690
0
    }
691
2.70k
    op->mutable_request()->set_range_request_limit(*limit);
692
2.70k
  }
693
694
5.40k
  return Status::OK();
695
5.40k
}
696
697
0
CHECKED_STATUS ParseWithScores(const Slice& slice, RedisCollectionGetRangeRequestPB* request) {
698
0
  if(!boost::iequals(slice.ToBuffer(), kWithScores)) {
699
0
    return STATUS_SUBSTITUTE(InvalidArgument, "unexpected argument $0", slice.ToBuffer());
700
0
  }
701
0
  request->set_with_scores(true);
702
0
  return Status::OK();
703
0
}
704
705
42
CHECKED_STATUS ParseRangeByScoreOptions(YBRedisReadOp* op, const RedisClientCommand& args) {
706
42
  auto args_size = args.size();
707
72
  for (size_t i = 4; i < args_size; ++i) {
708
30
    string upper_arg;
709
30
    ToUpperCase(args[i].ToBuffer(), &upper_arg);
710
30
    if (upper_arg == "LIMIT") {
711
21
      if (i >= args_size - 2) {
712
0
        return STATUS_SUBSTITUTE(InvalidArgument, "Not enough args passed into LIMIT clause, "
713
0
            "expected 2 but found $0", args_size - (i + 1));
714
0
      }
715
21
      auto offset = VERIFY_RESULT(ParseInt64(args[++i], "offset"));
716
21
      auto limit = VERIFY_RESULT(ParseInt32(args[++i], "count"));
717
21
      op->mutable_request()->mutable_index_range()->mutable_lower_bound()->set_index(offset);
718
21
      op->mutable_request()->set_range_request_limit(limit);
719
9
    } else if (upper_arg == "WITHSCORES") {
720
9
      op->mutable_request()->mutable_get_collection_range_request()->set_with_scores(true);
721
0
    } else {
722
0
      return STATUS_SUBSTITUTE(InvalidArgument, "Invalid argument $0", args[i].ToBuffer());
723
0
    }
724
30
  }
725
42
  return Status::OK();
726
42
}
727
728
42
CHECKED_STATUS ParseZRangeByScore(YBRedisReadOp* op, const RedisClientCommand& args) {
729
42
  op->mutable_request()->mutable_get_collection_range_request()->set_request_type(
730
42
      RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE);
731
732
42
  const auto& key = args[1];
733
42
  RETURN_NOT_OK(ParseTsSubKeyBound(
734
42
  args[2],
735
42
  op->mutable_request()->mutable_subkey_range()->mutable_lower_bound(),
736
42
  RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE));
737
42
  RETURN_NOT_OK(ParseTsSubKeyBound(
738
42
  args[3],
739
42
  op->mutable_request()->mutable_subkey_range()->mutable_upper_bound(),
740
42
  RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE));
741
42
  op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer());
742
743
42
  return ParseRangeByScoreOptions(op, args);
744
42
}
745
746
CHECKED_STATUS ParseIndexBasedQuery(
747
    YBRedisReadOp* op,
748
    const RedisClientCommand& args,
749
0
    RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type) {
750
0
  if (args.size() <= 5) {
751
0
    op->mutable_request()->mutable_get_collection_range_request()->set_request_type(request_type);
752
753
0
    const auto& key = args[1];
754
0
    RETURN_NOT_OK(ParseIndexBound(
755
0
    args[2],
756
0
    op->mutable_request()->mutable_index_range()->mutable_lower_bound()));
757
0
    RETURN_NOT_OK(ParseIndexBound(
758
0
    args[3],
759
0
    op->mutable_request()->mutable_index_range()->mutable_upper_bound()));
760
0
    op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer());
761
0
    if (args.size() == 5) {
762
0
      RETURN_NOT_OK(ParseWithScores(
763
0
      args[4],
764
0
      op->mutable_request()->mutable_get_collection_range_request()));
765
0
    }
766
0
    return Status::OK();
767
0
  }
768
0
  return STATUS(InvalidArgument, "Expected at most 5 arguments, found $0",
769
0
                  std::to_string(args.size()));
770
0
}
771
772
0
CHECKED_STATUS ParseZRange(YBRedisReadOp* op, const RedisClientCommand& args) {
773
0
  return ParseIndexBasedQuery(
774
0
      op, args, RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGE);
775
0
}
776
777
0
CHECKED_STATUS ParseZRevRange(YBRedisReadOp* op, const RedisClientCommand& args) {
778
0
  return ParseIndexBasedQuery(
779
0
      op, args, RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZREVRANGE);
780
0
}
781
782
609
CHECKED_STATUS ParseTsGet(YBRedisReadOp* op, const RedisClientCommand& args) {
783
609
  op->mutable_request()->mutable_get_request()->set_request_type(
784
609
      RedisGetRequestPB_GetRequestType_TSGET);
785
786
609
  const auto& key = args[1];
787
609
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
788
609
  auto timestamp = CheckedStoll(args[2]);
789
609
  RETURN_NOT_OK(timestamp);
790
609
  op->mutable_request()->mutable_key_value()->add_subkey()->set_timestamp_subkey(*timestamp);
791
792
609
  return Status::OK();
793
609
}
794
795
21
CHECKED_STATUS ParseZScore(YBRedisReadOp* op, const RedisClientCommand& args) {
796
21
  op->mutable_request()->mutable_get_request()->set_request_type(
797
21
      RedisGetRequestPB_GetRequestType_ZSCORE);
798
799
21
  const auto& key = args[1];
800
21
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
801
21
  auto member = args[2].ToBuffer();
802
21
  op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(member);
803
804
21
  return Status::OK();
805
21
}
806
807
0
CHECKED_STATUS ParseHStrLen(YBRedisReadOp* op, const RedisClientCommand& args) {
808
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HSTRLEN);
809
0
}
810
811
0
CHECKED_STATUS ParseHExists(YBRedisReadOp* op, const RedisClientCommand& args) {
812
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HEXISTS);
813
0
}
814
815
0
CHECKED_STATUS ParseHMGet(YBRedisReadOp* op, const RedisClientCommand& args) {
816
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HMGET);
817
0
}
818
819
0
CHECKED_STATUS ParseHGetAll(YBRedisReadOp* op, const RedisClientCommand& args) {
820
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HGETALL);
821
0
}
822
823
0
CHECKED_STATUS ParseHKeys(YBRedisReadOp* op, const RedisClientCommand& args) {
824
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HKEYS);
825
0
}
826
827
0
CHECKED_STATUS ParseHVals(YBRedisReadOp* op, const RedisClientCommand& args) {
828
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HVALS);
829
0
}
830
831
0
CHECKED_STATUS ParseHLen(YBRedisReadOp* op, const RedisClientCommand& args) {
832
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HLEN);
833
0
}
834
835
0
CHECKED_STATUS ParseSMembers(YBRedisReadOp* op, const RedisClientCommand& args) {
836
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_SMEMBERS);
837
0
}
838
839
0
CHECKED_STATUS ParseSIsMember(YBRedisReadOp* op, const RedisClientCommand& args) {
840
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_SISMEMBER);
841
0
}
842
843
0
CHECKED_STATUS ParseSCard(YBRedisReadOp* op, const RedisClientCommand& args) {
844
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_SCARD);
845
0
}
846
847
0
CHECKED_STATUS ParseLLen(YBRedisReadOp* op, const RedisClientCommand& args) {
848
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_LLEN);
849
0
}
850
851
0
CHECKED_STATUS ParseZCard(YBRedisReadOp* op, const RedisClientCommand& args) {
852
0
  return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_ZCARD);
853
0
}
854
855
0
CHECKED_STATUS ParseStrLen(YBRedisReadOp* op, const RedisClientCommand& args) {
856
0
  op->mutable_request()->mutable_strlen_request(); // Allocates new RedisStrLenRequestPB().
857
0
  const auto& key = args[1];
858
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
859
0
  return Status::OK();
860
0
}
861
862
// Note: Checking existence of only one key is supported as of now.
863
0
CHECKED_STATUS ParseExists(YBRedisReadOp* op, const RedisClientCommand& args) {
864
0
  op->mutable_request()->mutable_exists_request(); // Allocates new RedisExistsRequestPB().
865
0
  const auto& key = args[1];
866
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
867
0
  return Status::OK();
868
0
}
869
870
0
CHECKED_STATUS ParseGetRange(YBRedisReadOp* op, const RedisClientCommand& args) {
871
0
  const auto& key = args[1];
872
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
873
874
0
  auto start = ParseInt32(args[2], "Start");
875
0
  RETURN_NOT_OK(start);
876
0
  op->mutable_request()->mutable_get_range_request()->set_start(*start);
877
878
0
  auto end = ParseInt32(args[3], "End");
879
0
  RETURN_NOT_OK(end);
880
0
  op->mutable_request()->mutable_get_range_request()->set_end(*end);
881
882
0
  return Status::OK();
883
0
}
884
885
CHECKED_STATUS ParseExpire(YBRedisWriteOp* op,
886
                           const RedisClientCommand& args,
887
42
                           const bool using_millis) {
888
42
  const auto& key = args[1];
889
42
  auto ttl = ParseInt64(args[2], "TTL");
890
42
  RETURN_NOT_OK(ttl);
891
  // If the TTL is not positive, we immediately delete.
892
42
  if (*ttl <= 0) {
893
6
      op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
894
6
      op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
895
6
      op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE);
896
6
      return Status::OK();
897
6
  }
898
36
  *ttl *= using_millis ? 1 : MonoTime::kMillisecondsPerSecond;
899
36
  if (*ttl < kRedisMinTtlMillis || *ttl > kRedisMaxTtlMillis) {
900
0
    return STATUS_FORMAT(InvalidCommand,
901
0
        "TTL field $0 is not within valid bounds", args[2]);
902
0
  }
903
36
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
904
36
  op->mutable_request()->mutable_set_ttl_request()->set_ttl(*ttl);
905
36
  return Status::OK();
906
36
}
907
908
39
CHECKED_STATUS ParseExpire(YBRedisWriteOp* op, const RedisClientCommand& args) {
909
39
  return ParseExpire(op, args, false);
910
39
}
911
912
3
CHECKED_STATUS ParsePExpire(YBRedisWriteOp* op, const RedisClientCommand& args) {
913
3
  return ParseExpire(op, args, true);
914
3
}
915
916
12
CHECKED_STATUS ParsePersist(YBRedisWriteOp* op, const RedisClientCommand& args) {
917
12
  const auto& key = args[1];
918
12
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
919
12
  op->mutable_request()->mutable_set_ttl_request()->set_ttl(-1);
920
12
  return Status::OK();
921
12
}
922
923
CHECKED_STATUS ParseExpireAt(YBRedisWriteOp* op,
924
                           const RedisClientCommand& args,
925
0
                           const bool using_millis) {
926
0
  const auto& key = args[1];
927
0
  auto expiration = VERIFY_RESULT(ParseInt64(args[2], "expiration"));
928
  // If the TTL is not positive, we immediately delete.
929
0
  if (expiration <= 0) {
930
0
      op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB().
931
0
      op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
932
0
      op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE);
933
0
      return Status::OK();
934
0
  }
935
0
  expiration *= using_millis ? 1 : MonoTime::kMillisecondsPerSecond;
936
0
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
937
0
  op->mutable_request()->mutable_set_ttl_request()->set_absolute_time(expiration);
938
0
  return Status::OK();
939
0
}
940
941
0
CHECKED_STATUS ParseExpireAt(YBRedisWriteOp* op, const RedisClientCommand& args) {
942
0
  return ParseExpireAt(op, args, /* using_millis */ false);
943
0
}
944
945
0
CHECKED_STATUS ParsePExpireAt(YBRedisWriteOp* op, const RedisClientCommand& args) {
946
0
  return ParseExpireAt(op, args, /* using_millis */ true);
947
0
}
948
949
CHECKED_STATUS ParseSetEx(YBRedisWriteOp* op,
950
                          const RedisClientCommand& args,
951
9
                          const bool using_millis) {
952
9
  const auto& key = args[1];
953
9
  const auto value = args[3];
954
9
  auto ttl = VERIFY_RESULT(ParseInt64(args[2], "TTL"));
955
9
  if (ttl <= 0) {
956
3
    op->mutable_request()->mutable_no_op_request(); // Allocates new RedisNoOpRequestPB().
957
3
    return Status::OK();
958
3
  }
959
6
  ttl *= using_millis ? 1 : MonoTime::kMillisecondsPerSecond;
960
6
  if (ttl < kRedisMinTtlMillis || ttl > kRedisMaxTtlMillis) {
961
0
    return STATUS_FORMAT(InvalidCommand,
962
0
        "TTL field $0 is not within valid bounds", args[3]);
963
0
  }
964
6
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
965
6
  op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size());
966
6
  op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING);
967
6
  op->mutable_request()->mutable_set_request()->set_ttl(ttl);
968
6
  return Status::OK();
969
6
}
970
971
6
CHECKED_STATUS ParseSetEx(YBRedisWriteOp* op, const RedisClientCommand& args) {
972
6
  return ParseSetEx(op, args, false);
973
6
}
974
975
3
CHECKED_STATUS ParsePSetEx(YBRedisWriteOp* op, const RedisClientCommand& args) {
976
3
  return ParseSetEx(op, args, true);
977
3
}
978
979
CHECKED_STATUS ParseTtl(YBRedisReadOp* op,
980
                        const RedisClientCommand& args,
981
132
                        const bool return_seconds) {
982
132
  const auto& key = args[1];
983
132
  op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size());
984
132
  op->mutable_request()->mutable_get_ttl_request()->set_return_seconds(return_seconds);
985
132
  return Status::OK();
986
132
}
987
988
66
CHECKED_STATUS ParseTtl(YBRedisReadOp* op, const RedisClientCommand& args) {
989
66
  return ParseTtl(op, args, true);
990
66
}
991
992
66
CHECKED_STATUS ParsePTtl(YBRedisReadOp* op, const RedisClientCommand& args) {
993
66
  return ParseTtl(op, args, false);
994
66
}
995
996
// Begin of input is going to be consumed, so we should adjust our pointers.
997
// Since the beginning of input is being consumed by shifting the remaining bytes to the
998
// beginning of the buffer.
999
104k
void RedisParser::Consume(size_t count) {
1000
104k
  pos_ -= count;
1001
1002
104k
  if (token_begin_ != kNoToken) {
1003
104k
    token_begin_ -= count;
1004
104k
  }
1005
104k
}
1006
1007
// New data arrived, so update the end of available bytes.
1008
104k
void RedisParser::Update(const IoVecs& data) {
1009
104k
  source_ = data;
1010
104k
  full_size_ = IoVecsFullSize(data);
1011
104k
  DCHECK_LE(pos_, full_size_);
1012
104k
}
1013
1014
// Parse next command.
1015
314k
Result<size_t> RedisParser::NextCommand() {
1016
2.03M
  while (pos_ != full_size_) {
1017
1.92M
    incomplete_ = false;
1018
1.92M
    Status status = AdvanceToNextToken();
1019
1.92M
    if (!status.ok()) {
1020
2
      return status;
1021
2
    }
1022
1.92M
    if (incomplete_) {
1023
55
      pos_ = full_size_;
1024
55
      return 0;
1025
55
    }
1026
1.92M
    if (state_ == State::FINISHED) {
1027
209k
      state_ = State::INITIAL;
1028
209k
      return pos_;
1029
209k
    }
1030
1.92M
  }
1031
104k
  return 0;
1032
314k
}
1033
1034
1.92M
CHECKED_STATUS RedisParser::AdvanceToNextToken() {
1035
1.92M
  switch (state_) {
1036
209k
    case State::INITIAL:
1037
209k
      return Initial();
1038
2
    case State::SINGLE_LINE:
1039
2
      return SingleLine();
1040
209k
    case State::BULK_HEADER:
1041
209k
      return BulkHeader();
1042
755k
    case State::BULK_ARGUMENT_SIZE:
1043
755k
      return BulkArgumentSize();
1044
755k
    case State::BULK_ARGUMENT_BODY:
1045
755k
      return BulkArgumentBody();
1046
0
    case State::FINISHED:
1047
0
      return STATUS(IllegalState, "Should not be in FINISHED state during NextToken");
1048
0
  }
1049
0
  LOG(FATAL) << "Unexpected parser state: " << to_underlying(state_);
1050
0
}
1051
1052
209k
CHECKED_STATUS RedisParser::Initial() {
1053
209k
  token_begin_ = pos_;
1054
209k
  state_ = char_at_offset(pos_) == '*' ? State::BULK_HEADER : State::SINGLE_LINE;
1055
209k
  return Status::OK();
1056
209k
}
1057
1058
2
CHECKED_STATUS RedisParser::SingleLine() {
1059
2
  auto status = FindEndOfLine();
1060
2
  if (!status.ok() || incomplete_) {
1061
2
    return status;
1062
2
  }
1063
0
  auto start = token_begin_;
1064
0
  auto finish = pos_ - 2;
1065
0
  while (start < finish && isspace(char_at_offset(start))) {
1066
0
    ++start;
1067
0
  }
1068
0
  if (start >= finish) {
1069
0
    return STATUS(InvalidArgument, "Empty line");
1070
0
  }
1071
0
  if (args_) {
1072
    // Args is supported only when parsing from single block of data.
1073
    // Because we parse prepared call data in this case, that is contained in a single buffer.
1074
0
    DCHECK_EQ(source_.size(), 1);
1075
0
    RETURN_NOT_OK(util::SplitArgs(Slice(offset_to_pointer(start), finish - start), args_));
1076
0
  }
1077
0
  state_ = State::FINISHED;
1078
0
  return Status::OK();
1079
0
}
1080
1081
209k
CHECKED_STATUS RedisParser::BulkHeader() {
1082
209k
  auto status = FindEndOfLine();
1083
209k
  if (!status.ok() || incomplete_) {
1084
0
    return status;
1085
0
  }
1086
209k
  auto num_args = VERIFY_RESULT(ParseNumber(
1087
209k
      '*', 1, kMaxNumberOfArgs, "Number of lines in multiline"));
1088
209k
  if (args_) {
1089
104k
    args_->clear();
1090
104k
    args_->reserve(num_args);
1091
104k
  }
1092
209k
  state_ = State::BULK_ARGUMENT_SIZE;
1093
209k
  token_begin_ = pos_;
1094
209k
  arguments_left_ = num_args;
1095
209k
  return Status::OK();
1096
209k
}
1097
1098
755k
CHECKED_STATUS RedisParser::BulkArgumentSize() {
1099
755k
  auto status = FindEndOfLine();
1100
755k
  if (!status.ok() || incomplete_) {
1101
7
    return status;
1102
7
  }
1103
755k
  auto current_size = VERIFY_RESULT(ParseNumber('$', 0, kMaxRedisValueSize, "Argument size"));
1104
755k
  state_ = State::BULK_ARGUMENT_BODY;
1105
755k
  token_begin_ = pos_;
1106
755k
  current_argument_size_ = current_size;
1107
755k
  return Status::OK();
1108
755k
}
1109
1110
755k
CHECKED_STATUS RedisParser::BulkArgumentBody() {
1111
755k
  auto desired_position = token_begin_ + current_argument_size_ + kLineEndLength;
1112
755k
  if (desired_position > full_size_) {
1113
48
    incomplete_ = true;
1114
48
    pos_ = full_size_;
1115
48
    return Status::OK();
1116
48
  }
1117
755k
  if (char_at_offset(desired_position - 1) != '\n' ||
1118
755k
      char_at_offset(desired_position - 2) != '\r') {
1119
0
    return STATUS(NetworkError, "No \\r\\n after bulk");
1120
0
  }
1121
755k
  if (args_) {
1122
    // Args is supported only when parsing from single block of data.
1123
    // Because we parse prepared call data in this case, that is contained in a single buffer.
1124
377k
    DCHECK_EQ(source_.size(), 1);
1125
377k
    args_->emplace_back(offset_to_pointer(token_begin_), current_argument_size_);
1126
377k
  }
1127
755k
  --arguments_left_;
1128
755k
  pos_ = desired_position;
1129
755k
  token_begin_ = pos_;
1130
755k
  if (arguments_left_ == 0) {
1131
209k
    state_ = State::FINISHED;
1132
546k
  } else {
1133
546k
    state_ = State::BULK_ARGUMENT_SIZE;
1134
546k
  }
1135
755k
  return Status::OK();
1136
755k
}
1137
1138
964k
CHECKED_STATUS RedisParser::FindEndOfLine() {
1139
964k
  auto p = offset_to_idx_and_local_offset(pos_);
1140
1141
964k
  size_t new_line_offset = pos_;
1142
964k
  while (p.first != source_.size()) {
1143
964k
    auto begin = IoVecBegin(source_[p.first]) + p.second;
1144
964k
    auto new_line = static_cast<const char*>(memchr(
1145
964k
        begin, '\n', IoVecEnd(source_[p.first]) - begin));
1146
964k
    if (new_line) {
1147
964k
      new_line_offset += new_line - begin;
1148
964k
      break;
1149
964k
    }
1150
4
    new_line_offset += source_[p.first].iov_len - p.second;
1151
4
    ++p.first;
1152
4
    p.second = 0;
1153
4
  }
1154
1155
964k
  incomplete_ = p.first == source_.size();
1156
964k
  if (!incomplete_) {
1157
964k
    if (new_line_offset == token_begin_) {
1158
0
      return STATUS(NetworkError, "End of line at the beginning of a Redis command");
1159
0
    }
1160
964k
    if (char_at_offset(new_line_offset - 1) != '\r') {
1161
2
      return STATUS(NetworkError, "\\n is not prefixed with \\r");
1162
2
    }
1163
964k
    pos_ = ++new_line_offset;
1164
964k
  }
1165
964k
  return Status::OK();
1166
964k
}
1167
1168
4.99M
std::pair<size_t, size_t> RedisParser::offset_to_idx_and_local_offset(size_t offset) const {
1169
  // We assume that there are at most 2 blocks of data.
1170
4.99M
  if (offset < source_[0].iov_len) {
1171
4.99M
    return std::pair<size_t, size_t>(0, offset);
1172
4.99M
  }
1173
1174
2
  offset -= source_[0].iov_len;
1175
2
  size_t idx = offset / source_[1].iov_len;
1176
2
  offset -= idx * source_[1].iov_len;
1177
1178
2
  return std::pair<size_t, size_t>(idx + 1, offset);
1179
2
}
1180
1181
4.02M
const char* RedisParser::offset_to_pointer(size_t offset) const {
1182
4.02M
  auto p = offset_to_idx_and_local_offset(offset);
1183
4.02M
  return IoVecBegin(source_[p.first]) + p.second;
1184
4.02M
}
1185
1186
// Parses number with specified bounds.
1187
// Number is located in separate line, and contain prefix before actual number.
1188
// Line starts at token_begin_ and pos_ is a start of next line.
1189
Result<ptrdiff_t> RedisParser::ParseNumber(char prefix,
1190
                                           ptrdiff_t min,
1191
                                           ptrdiff_t max,
1192
964k
                                           const char* name) {
1193
964k
  if (char_at_offset(token_begin_) != prefix) {
1194
0
    return STATUS_FORMAT(Corruption,
1195
0
                         "Invalid character before number, expected: $0, but found: $1",
1196
0
                         prefix, char_at_offset(token_begin_));
1197
0
  }
1198
964k
  auto number_begin = token_begin_ + 1;
1199
964k
  auto expected_stop = pos_ - kLineEndLength;
1200
964k
  if (expected_stop - number_begin > kMaxNumberLength) {
1201
0
    return STATUS_FORMAT(
1202
0
        Corruption, "Too long $0 of length $1", name, expected_stop - number_begin);
1203
0
  }
1204
964k
  number_buffer_.reserve(kMaxNumberLength);
1205
964k
  IoVecsToBuffer(source_, number_begin, expected_stop, &number_buffer_);
1206
964k
  number_buffer_.push_back(0);
1207
964k
  auto parsed_number = VERIFY_RESULT(CheckedStoll(
1208
964k
      Slice(number_buffer_.data(), number_buffer_.size() - 1)));
1209
964k
  static_assert(sizeof(parsed_number) == sizeof(ptrdiff_t), "Expected size");
1210
964k
  SCHECK_BOUNDS(parsed_number,
1211
964k
                min,
1212
964k
                max,
1213
964k
                Corruption,
1214
964k
                yb::Format("$0 out of expected range [$1, $2] : $3",
1215
964k
                           name, min, max, parsed_number));
1216
964k
  return static_cast<ptrdiff_t>(parsed_number);
1217
964k
}
1218
1219
104k
void RedisParser::SetArgs(boost::container::small_vector_base<Slice>* args) {
1220
104k
  DCHECK_EQ(source_.size(), 1);
1221
104k
  args_ = args;
1222
104k
}
1223
1224
}  // namespace redisserver
1225
}  // namespace yb