/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_parser.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/redis/redisserver/redis_parser.h" |
15 | | |
16 | | #include <memory> |
17 | | #include <string> |
18 | | |
19 | | #include <boost/algorithm/string.hpp> |
20 | | |
21 | | #include "yb/client/callbacks.h" |
22 | | #include "yb/client/yb_op.h" |
23 | | |
24 | | #include "yb/common/redis_protocol.pb.h" |
25 | | |
26 | | #include "yb/gutil/casts.h" |
27 | | |
28 | | #include "yb/util/split.h" |
29 | | #include "yb/util/status.h" |
30 | | #include "yb/util/status_format.h" |
31 | | #include "yb/util/stol_utils.h" |
32 | | #include "yb/util/string_case.h" |
33 | | |
34 | | #include "yb/yql/redis/redisserver/redis_constants.h" |
35 | | |
36 | | namespace yb { |
37 | | namespace redisserver { |
38 | | |
39 | | using yb::client::YBTable; |
40 | | using yb::client::YBRedisWriteOp; |
41 | | using yb::client::YBRedisReadOp; |
42 | | using std::vector; |
43 | | using std::shared_ptr; |
44 | | using std::string; |
45 | | |
46 | | namespace { |
47 | | |
48 | | constexpr size_t kMaxNumberOfArgs = 1 << 20; |
49 | | constexpr size_t kLineEndLength = 2; |
50 | | constexpr size_t kMaxNumberLength = 25; |
51 | | constexpr char kPositiveInfinity[] = "+inf"; |
52 | | constexpr char kNegativeInfinity[] = "-inf"; |
53 | | |
54 | 20.4k | string to_lower_case(Slice slice) { |
55 | 20.4k | return boost::to_lower_copy(slice.ToBuffer()); |
56 | 20.4k | } |
57 | | |
58 | 29.7k | CHECKED_STATUS add_string_subkey(string subkey, RedisKeyValuePB* kv_pb) { |
59 | 29.7k | kv_pb->add_subkey()->set_string_subkey(std::move(subkey)); |
60 | 29.7k | return Status::OK(); |
61 | 29.7k | } |
62 | | |
63 | 15.1k | CHECKED_STATUS add_timestamp_subkey(const string &subkey, RedisKeyValuePB *kv_pb) { |
64 | 15.1k | auto timestamp = CheckedStoll(subkey); |
65 | 15.1k | RETURN_NOT_OK(timestamp); |
66 | 15.1k | kv_pb->add_subkey()->set_timestamp_subkey(*timestamp); |
67 | 15.1k | return Status::OK(); |
68 | 15.1k | } |
69 | | |
70 | 10.5k | CHECKED_STATUS add_double_subkey(const string &subkey, RedisKeyValuePB *kv_pb) { |
71 | 10.5k | auto double_key = CheckedStold(subkey); |
72 | 10.5k | RETURN_NOT_OK(double_key); |
73 | 10.5k | kv_pb->add_subkey()->set_double_subkey(*double_key); |
74 | 10.5k | return Status::OK(); |
75 | 10.5k | } |
76 | | |
77 | 2.83k | Result<int64_t> ParseInt64(const Slice& slice, const char* field) { |
78 | 2.83k | auto result = CheckedStoll(slice); |
79 | 2.83k | if (!result.ok()) { |
80 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
81 | 0 | "$0 field $1 is not a valid number", field, slice.ToDebugString()); |
82 | 0 | } |
83 | 2.83k | return *result; |
84 | 2.83k | } |
85 | | |
86 | 2.73k | Result<int32_t> ParseInt32(const Slice& slice, const char* field) { |
87 | 2.73k | auto val = ParseInt64(slice, field); |
88 | 2.73k | if (!val.ok()) { |
89 | 0 | return std::move(val.status()); |
90 | 0 | } |
91 | 2.73k | if (*val < std::numeric_limits<int32_t>::min() || |
92 | 2.73k | *val > std::numeric_limits<int32_t>::max()) { |
93 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
94 | 0 | "$0 field $1 is not within valid bounds", field, slice.ToDebugString()); |
95 | 0 | } |
96 | 2.73k | return static_cast<int32_t>(*val); |
97 | 2.73k | } |
98 | | |
99 | | } // namespace |
100 | | |
101 | 40.9k | CHECKED_STATUS ParseSet(YBRedisWriteOp *op, const RedisClientCommand& args) { |
102 | 40.9k | if (args[1].empty()) { |
103 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
104 | 0 | "A SET request must have a non empty key field"); |
105 | 0 | } |
106 | 40.9k | op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB(). |
107 | 40.9k | const auto& key = args[1]; |
108 | 40.9k | const auto& value = args[2]; |
109 | 40.9k | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
110 | 40.9k | op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size()); |
111 | 40.9k | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING); |
112 | 40.9k | size_t idx = 3; |
113 | 41.0k | while (idx < args.size()) { |
114 | 57 | string upper_arg; |
115 | 57 | if (args[idx].size() == 2) { |
116 | 57 | ToUpperCase(args[idx].ToBuffer(), &upper_arg); |
117 | 57 | } |
118 | | |
119 | 57 | if (upper_arg == "EX" || upper_arg == "PX") { |
120 | 24 | if (args.size() < idx + 2) { |
121 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
122 | 0 | "Expected TTL field after the EX flag, no value found"); |
123 | 0 | } |
124 | 24 | auto ttl_val = ParseInt64(args[idx + 1], "TTL"); |
125 | 24 | RETURN_NOT_OK(ttl_val); |
126 | 24 | if (*ttl_val < kRedisMinTtlSetExSeconds || *ttl_val > kRedisMaxTtlSeconds) { |
127 | 0 | return STATUS_FORMAT(InvalidCommand, |
128 | 0 | "TTL field $0 is not within valid bounds", args[idx + 1]); |
129 | 0 | } |
130 | 24 | const int64_t milliseconds_per_unit = |
131 | 18 | upper_arg == "EX" ? MonoTime::kMillisecondsPerSecond : 1; |
132 | 24 | op->mutable_request()->mutable_set_request()->set_ttl(*ttl_val * milliseconds_per_unit); |
133 | 24 | idx += 2; |
134 | 33 | } else if (upper_arg == "XX") { |
135 | 12 | op->mutable_request()->mutable_set_request()->set_mode(REDIS_WRITEMODE_UPDATE); |
136 | 12 | idx += 1; |
137 | 21 | } else if (upper_arg == "NX") { |
138 | 21 | op->mutable_request()->mutable_set_request()->set_mode(REDIS_WRITEMODE_INSERT); |
139 | 21 | idx += 1; |
140 | 0 | } else { |
141 | 0 | return STATUS_FORMAT(InvalidCommand, |
142 | 0 | "Unidentified argument $0 found while parsing set command", args[idx]); |
143 | 0 | } |
144 | 57 | } |
145 | 40.9k | return Status::OK(); |
146 | 40.9k | } |
147 | | |
148 | 0 | CHECKED_STATUS ParseSetNX(YBRedisWriteOp *op, const RedisClientCommand& args) { |
149 | 0 | if (args[1].empty()) { |
150 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
151 | 0 | "A SETNX request must have a non empty key field"); |
152 | 0 | } |
153 | | |
154 | 0 | const auto& key = args[1]; |
155 | 0 | const auto& value = args[2]; |
156 | |
|
157 | 0 | auto key_value = op->mutable_request()->mutable_key_value(); |
158 | 0 | key_value->set_key(key.cdata(), key.size()); |
159 | 0 | key_value->add_value(value.cdata(), value.size()); |
160 | 0 | key_value->set_type(REDIS_TYPE_STRING); |
161 | |
|
162 | 0 | auto set_request = op->mutable_request()->mutable_set_request(); |
163 | 0 | set_request->set_mode(REDIS_WRITEMODE_INSERT); |
164 | | // SETNX returns 1 / 0 (instead of OK / (nil) in 'SET k v NX' command). |
165 | 0 | set_request->set_expect_ok_response(false); |
166 | 0 | return Status::OK(); |
167 | 0 | } |
168 | | |
169 | | // TODO: support MSET |
170 | 0 | CHECKED_STATUS ParseMSet(YBRedisWriteOp *op, const RedisClientCommand& args) { |
171 | 0 | if (args.size() < 3 || args.size() % 2 == 0) { |
172 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
173 | 0 | "An MSET request must have at least 3, odd number of arguments, found $0", args.size()); |
174 | 0 | } |
175 | 0 | return STATUS(InvalidCommand, "MSET command not yet supported"); |
176 | 0 | } |
177 | | |
178 | 0 | CHECKED_STATUS ParseHSet(YBRedisWriteOp *op, const RedisClientCommand& args) { |
179 | 0 | const auto& key = args[1]; |
180 | 0 | const auto& subkey = args[2]; |
181 | 0 | const auto& value = args[3]; |
182 | |
|
183 | 0 | op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB(). |
184 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
185 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_HASH); |
186 | 0 | op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(subkey.cdata(), |
187 | 0 | subkey.size()); |
188 | 0 | op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size()); |
189 | 0 | return Status::OK(); |
190 | 0 | } |
191 | | |
192 | | |
193 | 0 | CHECKED_STATUS ParseHIncrBy(YBRedisWriteOp *op, const RedisClientCommand& args) { |
194 | 0 | const auto& key = args[1]; |
195 | 0 | const auto& subkey = args[2]; |
196 | 0 | const auto& incr_by = ParseInt64(args[3], "INCR_BY"); |
197 | 0 | RETURN_NOT_OK(incr_by); |
198 | 0 | op->mutable_request()->mutable_incr_request()->set_increment_int(*incr_by); |
199 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
200 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_HASH); |
201 | 0 | op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(subkey.cdata(), |
202 | 0 | subkey.size()); |
203 | 0 | return Status::OK(); |
204 | 0 | } |
205 | | |
206 | | CHECKED_STATUS ParseZAddOptions( |
207 | 10.5k | SortedSetOptionsPB *options, const RedisClientCommand& args, size_t *idx) { |
208 | | // While we keep seeing flags, set the appropriate field in options and increment idx. When |
209 | | // we finally stop seeing flags, the idx will be set to that token for later parsing. |
210 | | // Note that we can see duplicate flags, and it should have the same behavior as seeing the |
211 | | // flag once. |
212 | 10.5k | while (*idx < args.size()) { |
213 | 10.5k | if (boost::iequals(args[*idx].ToBuffer(), kCH)) { |
214 | 0 | options->set_ch(true); |
215 | 10.5k | } else if (boost::iequals(args[*idx].ToBuffer(), kINCR)) { |
216 | 0 | options->set_incr(true); |
217 | 10.5k | } else if (boost::iequals(args[*idx].ToBuffer(), kNX)) { |
218 | 0 | if (options->update_options() == SortedSetOptionsPB_UpdateOptions_XX) { |
219 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
220 | 0 | "XX and NX options at the same time are not compatible"); |
221 | 0 | } |
222 | 0 | options->set_update_options(SortedSetOptionsPB_UpdateOptions_NX); |
223 | 10.5k | } else if (boost::iequals(args[*idx].ToBuffer(), kXX)) { |
224 | 0 | if (options->update_options() == SortedSetOptionsPB_UpdateOptions_NX) { |
225 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
226 | 0 | "XX and NX options at the same time are not compatible"); |
227 | 0 | } |
228 | 0 | options->set_update_options(SortedSetOptionsPB_UpdateOptions_XX); |
229 | 10.5k | } else { |
230 | | // We have encountered a non-option token, return. |
231 | 10.5k | return Status::OK(); |
232 | 10.5k | } |
233 | 0 | *idx = *idx + 1; |
234 | 0 | } |
235 | 0 | return Status::OK(); |
236 | 10.5k | } |
237 | | |
238 | | template <typename AddSubKey> |
239 | | CHECKED_STATUS ParseHMSetLikeCommands(YBRedisWriteOp *op, const RedisClientCommand& args, |
240 | | const RedisDataType& type, |
241 | 20.4k | AddSubKey add_sub_key) { |
242 | 20.4k | if (args.size() < 4 || (args.size() % 2 == 1 && type == REDIS_TYPE_HASH)) { |
243 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
244 | 0 | "wrong number of arguments: $0 for command: $1", args.size(), |
245 | 0 | string(args[0].cdata(), args[0].size())); |
246 | 0 | } |
247 | | |
248 | 20.4k | op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB(). |
249 | 20.4k | op->mutable_request()->mutable_key_value()->set_type(type); |
250 | 20.4k | op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size()); |
251 | | |
252 | 20.4k | if (type == REDIS_TYPE_HASH) { |
253 | 9.90k | op->mutable_request()->mutable_set_request()->set_expect_ok_response(true); |
254 | 9.90k | } |
255 | | |
256 | 20.4k | size_t start_idx = 2; |
257 | 20.4k | if (type == REDIS_TYPE_SORTEDSET) { |
258 | 10.5k | RETURN_NOT_OK(ParseZAddOptions( |
259 | 10.5k | op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(), |
260 | 10.5k | args, &start_idx)); |
261 | | |
262 | | // If the INCR flag is set, can only have one [score member] pair. |
263 | 10.5k | if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) { |
264 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
265 | 0 | "wrong number of tokens after INCR flag specified: Need 2 but found " |
266 | 0 | "$0 for command: $1", args.size() - start_idx, |
267 | 0 | string(args[0].cdata(), args[0].size())); |
268 | 0 | } |
269 | 20.4k | } |
270 | | |
271 | | // Need [score member] to come in pairs. |
272 | 20.4k | if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) { |
273 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
274 | 0 | "Expect even and non-zero number of arguments " |
275 | 0 | "for command: $0, found $1", |
276 | 0 | string(args[0].cdata(), args[0].size()), args.size() - start_idx); |
277 | 0 | } |
278 | | |
279 | 20.4k | std::unordered_map<string, string> kv_map; |
280 | 75.5k | for (size_t i = start_idx; i < args.size(); i += 2) { |
281 | | // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently. |
282 | 55.0k | if (type == REDIS_TYPE_TIMESERIES) { |
283 | 14.8k | string upper_arg; |
284 | 14.8k | ToUpperCase(args[i].ToBuffer(), &upper_arg); |
285 | 14.8k | if (upper_arg == kExpireAt || upper_arg == kExpireIn) { |
286 | 6 | if (i + 2 != args.size()) { |
287 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command", |
288 | 0 | string(args[i].cdata(), args[i].size())); |
289 | 0 | } |
290 | 6 | auto temp = CheckedStoll(args[i + 1]); |
291 | 6 | RETURN_NOT_OK(temp); |
292 | 6 | int64_t ttl = 0; |
293 | 6 | if (upper_arg == kExpireIn) { |
294 | 6 | ttl = *temp; |
295 | 6 | if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) { |
296 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl, |
297 | 0 | kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds); |
298 | 0 | } |
299 | 0 | } else { |
300 | 0 | auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond; |
301 | 0 | ttl = *temp - current_time; |
302 | 0 | if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) { |
303 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]", |
304 | 0 | *temp, |
305 | 0 | kRedisMinTtlSetExSeconds + current_time, |
306 | 0 | kRedisMaxTtlSeconds + current_time); |
307 | 0 | } |
308 | 6 | } |
309 | | |
310 | | // Need to pass ttl in milliseconds, user supplied values are in seconds. |
311 | 6 | op->mutable_request()->mutable_set_request()->set_ttl( |
312 | 6 | ttl * MonoTime::kMillisecondsPerSecond); |
313 | 14.8k | } else { |
314 | 14.8k | kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer(); |
315 | 14.8k | } |
316 | 40.2k | } else if (type == REDIS_TYPE_SORTEDSET) { |
317 | | // For sorted sets, we store the mapping from values to scores, since values are distinct |
318 | | // but scores aren't. |
319 | 10.5k | kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer(); |
320 | 29.7k | } else { |
321 | 29.7k | kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer(); |
322 | 29.7k | } |
323 | 55.0k | } |
324 | | |
325 | 55.0k | for (const auto& kv : kv_map) { |
326 | 55.0k | auto req_kv = op->mutable_request()->mutable_key_value(); |
327 | 55.0k | if (type == REDIS_TYPE_SORTEDSET) { |
328 | | // Since the mapping is values to scores, need to reverse when creating the request. |
329 | 10.5k | RETURN_NOT_OK(add_sub_key(kv.second, req_kv)); |
330 | 10.5k | req_kv->add_value(kv.first); |
331 | 44.5k | } else { |
332 | 44.5k | RETURN_NOT_OK(add_sub_key(kv.first, req_kv)); |
333 | 44.5k | req_kv->add_value(kv.second); |
334 | 44.5k | } |
335 | 55.0k | } |
336 | 20.4k | return Status::OK(); |
337 | 20.4k | } _ZN2yb11redisserver22ParseHMSetLikeCommandsIPFNS_6StatusENSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES2_PNS_6client14YBRedisWriteOpERKN5boost9container12small_vectorINS_5SliceELm8EvvEERKNS_13RedisDataTypeET_ Line | Count | Source | 241 | 9.90k | AddSubKey add_sub_key) { | 242 | 9.90k | if (args.size() < 4 || (args.size() % 2 == 1 && type == REDIS_TYPE_HASH)) { | 243 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, | 244 | 0 | "wrong number of arguments: $0 for command: $1", args.size(), | 245 | 0 | string(args[0].cdata(), args[0].size())); | 246 | 0 | } | 247 | | | 248 | 9.90k | op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB(). | 249 | 9.90k | op->mutable_request()->mutable_key_value()->set_type(type); | 250 | 9.90k | op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size()); | 251 | | | 252 | 9.90k | if (type == REDIS_TYPE_HASH) { | 253 | 9.90k | op->mutable_request()->mutable_set_request()->set_expect_ok_response(true); | 254 | 9.90k | } | 255 | | | 256 | 9.90k | size_t start_idx = 2; | 257 | 9.90k | if (type == REDIS_TYPE_SORTEDSET) { | 258 | 0 | RETURN_NOT_OK(ParseZAddOptions( | 259 | 0 | op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(), | 260 | 0 | args, &start_idx)); | 261 | | | 262 | | // If the INCR flag is set, can only have one [score member] pair. | 263 | 0 | if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) { | 264 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, | 265 | 0 | "wrong number of tokens after INCR flag specified: Need 2 but found " | 266 | 0 | "$0 for command: $1", args.size() - start_idx, | 267 | 0 | string(args[0].cdata(), args[0].size())); | 268 | 0 | } | 269 | 9.90k | } | 270 | | | 271 | | // Need [score member] to come in pairs. | 272 | 9.90k | if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) { | 273 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, | 274 | 0 | "Expect even and non-zero number of arguments " | 275 | 0 | "for command: $0, found $1", | 276 | 0 | string(args[0].cdata(), args[0].size()), args.size() - start_idx); | 277 | 0 | } | 278 | | | 279 | 9.90k | std::unordered_map<string, string> kv_map; | 280 | 39.6k | for (size_t i = start_idx; i < args.size(); i += 2) { | 281 | | // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently. | 282 | 29.7k | if (type == REDIS_TYPE_TIMESERIES) { | 283 | 0 | string upper_arg; | 284 | 0 | ToUpperCase(args[i].ToBuffer(), &upper_arg); | 285 | 0 | if (upper_arg == kExpireAt || upper_arg == kExpireIn) { | 286 | 0 | if (i + 2 != args.size()) { | 287 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command", | 288 | 0 | string(args[i].cdata(), args[i].size())); | 289 | 0 | } | 290 | 0 | auto temp = CheckedStoll(args[i + 1]); | 291 | 0 | RETURN_NOT_OK(temp); | 292 | 0 | int64_t ttl = 0; | 293 | 0 | if (upper_arg == kExpireIn) { | 294 | 0 | ttl = *temp; | 295 | 0 | if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) { | 296 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl, | 297 | 0 | kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds); | 298 | 0 | } | 299 | 0 | } else { | 300 | 0 | auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond; | 301 | 0 | ttl = *temp - current_time; | 302 | 0 | if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) { | 303 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]", | 304 | 0 | *temp, | 305 | 0 | kRedisMinTtlSetExSeconds + current_time, | 306 | 0 | kRedisMaxTtlSeconds + current_time); | 307 | 0 | } | 308 | 0 | } | 309 | | | 310 | | // Need to pass ttl in milliseconds, user supplied values are in seconds. | 311 | 0 | op->mutable_request()->mutable_set_request()->set_ttl( | 312 | 0 | ttl * MonoTime::kMillisecondsPerSecond); | 313 | 0 | } else { | 314 | 0 | kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer(); | 315 | 0 | } | 316 | 29.7k | } else if (type == REDIS_TYPE_SORTEDSET) { | 317 | | // For sorted sets, we store the mapping from values to scores, since values are distinct | 318 | | // but scores aren't. | 319 | 0 | kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer(); | 320 | 29.7k | } else { | 321 | 29.7k | kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer(); | 322 | 29.7k | } | 323 | 29.7k | } | 324 | | | 325 | 29.7k | for (const auto& kv : kv_map) { | 326 | 29.7k | auto req_kv = op->mutable_request()->mutable_key_value(); | 327 | 29.7k | if (type == REDIS_TYPE_SORTEDSET) { | 328 | | // Since the mapping is values to scores, need to reverse when creating the request. | 329 | 0 | RETURN_NOT_OK(add_sub_key(kv.second, req_kv)); | 330 | 0 | req_kv->add_value(kv.first); | 331 | 29.7k | } else { | 332 | 29.7k | RETURN_NOT_OK(add_sub_key(kv.first, req_kv)); | 333 | 29.7k | req_kv->add_value(kv.second); | 334 | 29.7k | } | 335 | 29.7k | } | 336 | 9.90k | return Status::OK(); | 337 | 9.90k | } |
_ZN2yb11redisserver22ParseHMSetLikeCommandsIPFNS_6StatusERKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES2_PNS_6client14YBRedisWriteOpERKN5boost9container12small_vectorINS_5SliceELm8EvvEERKNS_13RedisDataTypeET_ Line | Count | Source | 241 | 10.5k | AddSubKey add_sub_key) { | 242 | 10.5k | if (args.size() < 4 || (args.size() % 2 == 1 && type == REDIS_TYPE_HASH)) { | 243 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, | 244 | 0 | "wrong number of arguments: $0 for command: $1", args.size(), | 245 | 0 | string(args[0].cdata(), args[0].size())); | 246 | 0 | } | 247 | | | 248 | 10.5k | op->mutable_request()->mutable_set_request(); // Allocates new RedisSetRequestPB(). | 249 | 10.5k | op->mutable_request()->mutable_key_value()->set_type(type); | 250 | 10.5k | op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size()); | 251 | | | 252 | 10.5k | if (type == REDIS_TYPE_HASH) { | 253 | 0 | op->mutable_request()->mutable_set_request()->set_expect_ok_response(true); | 254 | 0 | } | 255 | | | 256 | 10.5k | size_t start_idx = 2; | 257 | 10.5k | if (type == REDIS_TYPE_SORTEDSET) { | 258 | 10.5k | RETURN_NOT_OK(ParseZAddOptions( | 259 | 10.5k | op->mutable_request()->mutable_set_request()->mutable_sorted_set_options(), | 260 | 10.5k | args, &start_idx)); | 261 | | | 262 | | // If the INCR flag is set, can only have one [score member] pair. | 263 | 10.5k | if (op->request().set_request().sorted_set_options().incr() && (args.size() - start_idx) != 2) { | 264 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, | 265 | 0 | "wrong number of tokens after INCR flag specified: Need 2 but found " | 266 | 0 | "$0 for command: $1", args.size() - start_idx, | 267 | 0 | string(args[0].cdata(), args[0].size())); | 268 | 0 | } | 269 | 10.5k | } | 270 | | | 271 | | // Need [score member] to come in pairs. | 272 | 10.5k | if ((args.size() - start_idx) % 2 == 1 || args.size() - start_idx == 0) { | 273 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, | 274 | 0 | "Expect even and non-zero number of arguments " | 275 | 0 | "for command: $0, found $1", | 276 | 0 | string(args[0].cdata(), args[0].size()), args.size() - start_idx); | 277 | 0 | } | 278 | | | 279 | 10.5k | std::unordered_map<string, string> kv_map; | 280 | 35.9k | for (size_t i = start_idx; i < args.size(); i += 2) { | 281 | | // EXPIRE_AT/EXPIRE_IN only supported for redis timeseries currently. | 282 | 25.3k | if (type == REDIS_TYPE_TIMESERIES) { | 283 | 14.8k | string upper_arg; | 284 | 14.8k | ToUpperCase(args[i].ToBuffer(), &upper_arg); | 285 | 14.8k | if (upper_arg == kExpireAt || upper_arg == kExpireIn) { | 286 | 6 | if (i + 2 != args.size()) { | 287 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "$0 should be at the end of the command", | 288 | 0 | string(args[i].cdata(), args[i].size())); | 289 | 0 | } | 290 | 6 | auto temp = CheckedStoll(args[i + 1]); | 291 | 6 | RETURN_NOT_OK(temp); | 292 | 6 | int64_t ttl = 0; | 293 | 6 | if (upper_arg == kExpireIn) { | 294 | 6 | ttl = *temp; | 295 | 6 | if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) { | 296 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "TTL: $0 needs be in the range [$1, $2]", ttl, | 297 | 0 | kRedisMinTtlSetExSeconds, kRedisMaxTtlSeconds); | 298 | 0 | } | 299 | 0 | } else { | 300 | 0 | auto current_time = GetCurrentTimeMicros() / MonoTime::kMicrosecondsPerSecond; | 301 | 0 | ttl = *temp - current_time; | 302 | 0 | if (ttl > kRedisMaxTtlSeconds || ttl < kRedisMinTtlSetExSeconds) { | 303 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, "EXPIRE_AT: $0 needs be in the range [$1, $2]", | 304 | 0 | *temp, | 305 | 0 | kRedisMinTtlSetExSeconds + current_time, | 306 | 0 | kRedisMaxTtlSeconds + current_time); | 307 | 0 | } | 308 | 6 | } | 309 | | | 310 | | // Need to pass ttl in milliseconds, user supplied values are in seconds. | 311 | 6 | op->mutable_request()->mutable_set_request()->set_ttl( | 312 | 6 | ttl * MonoTime::kMillisecondsPerSecond); | 313 | 14.8k | } else { | 314 | 14.8k | kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer(); | 315 | 14.8k | } | 316 | 10.5k | } else if (type == REDIS_TYPE_SORTEDSET) { | 317 | | // For sorted sets, we store the mapping from values to scores, since values are distinct | 318 | | // but scores aren't. | 319 | 10.5k | kv_map[args[i + 1].ToBuffer()] = args[i].ToBuffer(); | 320 | 0 | } else { | 321 | 0 | kv_map[args[i].ToBuffer()] = args[i + 1].ToBuffer(); | 322 | 0 | } | 323 | 25.3k | } | 324 | | | 325 | 25.3k | for (const auto& kv : kv_map) { | 326 | 25.3k | auto req_kv = op->mutable_request()->mutable_key_value(); | 327 | 25.3k | if (type == REDIS_TYPE_SORTEDSET) { | 328 | | // Since the mapping is values to scores, need to reverse when creating the request. | 329 | 10.5k | RETURN_NOT_OK(add_sub_key(kv.second, req_kv)); | 330 | 10.5k | req_kv->add_value(kv.first); | 331 | 14.8k | } else { | 332 | 14.8k | RETURN_NOT_OK(add_sub_key(kv.first, req_kv)); | 333 | 14.8k | req_kv->add_value(kv.second); | 334 | 14.8k | } | 335 | 25.3k | } | 336 | 10.5k | return Status::OK(); | 337 | 10.5k | } |
|
338 | | |
339 | 9.90k | CHECKED_STATUS ParseHMSet(YBRedisWriteOp *op, const RedisClientCommand& args) { |
340 | 0 | DCHECK_EQ("hmset", to_lower_case(args[0])) |
341 | 0 | << "Parsing hmset request where first arg is not hmset."; |
342 | 9.90k | return ParseHMSetLikeCommands(op, args, REDIS_TYPE_HASH, add_string_subkey); |
343 | 9.90k | } |
344 | | |
345 | 87 | CHECKED_STATUS ParseTsAdd(YBRedisWriteOp *op, const RedisClientCommand& args) { |
346 | 0 | DCHECK_EQ("tsadd", to_lower_case(args[0])) |
347 | 0 | << "Parsing hmset request where first arg is not hmset."; |
348 | 87 | return ParseHMSetLikeCommands(op, args, REDIS_TYPE_TIMESERIES, |
349 | 87 | add_timestamp_subkey); |
350 | 87 | } |
351 | | |
352 | 10.5k | Status ParseZAdd(YBRedisWriteOp *op, const RedisClientCommand& args) { |
353 | 0 | DCHECK_EQ("zadd", to_lower_case(args[0])) |
354 | 0 | << "Parsing zadd request where first arg is not zadd."; |
355 | 10.5k | return ParseHMSetLikeCommands(op, args, REDIS_TYPE_SORTEDSET, add_double_subkey); |
356 | 10.5k | } |
357 | | |
358 | 0 | Status ParsePush(YBRedisWriteOp *op, const RedisClientCommand& args, RedisSide side) { |
359 | 0 | op->mutable_request()->mutable_push_request()->set_side(side); |
360 | 0 | op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size()); |
361 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_LIST); |
362 | |
|
363 | 0 | auto mutable_key = op->mutable_request()->mutable_key_value(); |
364 | 0 | for (size_t i = 2; i < args.size(); ++i) { |
365 | 0 | mutable_key->add_value(args[i].cdata(), args[i].size()); |
366 | 0 | } |
367 | 0 | return Status::OK(); |
368 | 0 | } |
369 | | |
370 | 0 | Status ParseLPush(YBRedisWriteOp *op, const RedisClientCommand& args) { |
371 | 0 | return ParsePush(op, args, REDIS_SIDE_LEFT); |
372 | 0 | } |
373 | | |
374 | 0 | Status ParseRPush(YBRedisWriteOp *op, const RedisClientCommand& args) { |
375 | 0 | return ParsePush(op, args, REDIS_SIDE_RIGHT); |
376 | 0 | } |
377 | | |
378 | | template <typename YBRedisOp, typename AddSubKey> |
379 | | CHECKED_STATUS ParseCollection(YBRedisOp *op, |
380 | | const RedisClientCommand& args, |
381 | | boost::optional<RedisDataType> type, |
382 | | AddSubKey add_sub_key, |
383 | 27 | bool remove_duplicates = true) { |
384 | 27 | const auto& key = args[1]; |
385 | 27 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
386 | 27 | if (type) { |
387 | 9 | op->mutable_request()->mutable_key_value()->set_type(*type); |
388 | 9 | } |
389 | 27 | if (remove_duplicates) { |
390 | | // We remove duplicates from the subkeys here. |
391 | 9 | std::set<string> subkey_set; |
392 | 315 | for (size_t i = 2; i < args.size(); i++) { |
393 | 306 | subkey_set.insert(string(args[i].cdata(), args[i].size())); |
394 | 306 | } |
395 | 9 | op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve( |
396 | 9 | narrow_cast<int>(subkey_set.size())); |
397 | 306 | for (const auto &val : subkey_set) { |
398 | 306 | RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value())); |
399 | 306 | } |
400 | 18 | } else { |
401 | 18 | op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve( |
402 | 18 | narrow_cast<int>(args.size() - 2)); |
403 | 18 | for (size_t i = 2; i < args.size(); i++) { |
404 | 0 | RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()), |
405 | 0 | op->mutable_request()->mutable_key_value())); |
406 | 0 | } |
407 | 18 | } |
408 | 27 | return Status::OK(); |
409 | 27 | } Unexecuted instantiation: _ZN2yb11redisserver15ParseCollectionINS_6client14YBRedisWriteOpEPFNS_6StatusENSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES4_PT_RKN5boost9container12small_vectorINS_5SliceELm8EvvEENSI_8optionalINS_13RedisDataTypeEEET0_b _ZN2yb11redisserver15ParseCollectionINS_6client14YBRedisWriteOpEPFNS_6StatusERKNSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES4_PT_RKN5boost9container12small_vectorINS_5SliceELm8EvvEENSK_8optionalINS_13RedisDataTypeEEET0_b Line | Count | Source | 383 | 9 | bool remove_duplicates = true) { | 384 | 9 | const auto& key = args[1]; | 385 | 9 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); | 386 | 9 | if (type) { | 387 | 9 | op->mutable_request()->mutable_key_value()->set_type(*type); | 388 | 9 | } | 389 | 9 | if (remove_duplicates) { | 390 | | // We remove duplicates from the subkeys here. | 391 | 9 | std::set<string> subkey_set; | 392 | 315 | for (size_t i = 2; i < args.size(); i++) { | 393 | 306 | subkey_set.insert(string(args[i].cdata(), args[i].size())); | 394 | 306 | } | 395 | 9 | op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve( | 396 | 9 | narrow_cast<int>(subkey_set.size())); | 397 | 306 | for (const auto &val : subkey_set) { | 398 | 306 | RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value())); | 399 | 306 | } | 400 | 0 | } else { | 401 | 0 | op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve( | 402 | 0 | narrow_cast<int>(args.size() - 2)); | 403 | 0 | for (size_t i = 2; i < args.size(); i++) { | 404 | 0 | RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()), | 405 | 0 | op->mutable_request()->mutable_key_value())); | 406 | 0 | } | 407 | 0 | } | 408 | 9 | return Status::OK(); | 409 | 9 | } |
_ZN2yb11redisserver15ParseCollectionINS_6client13YBRedisReadOpEPFNS_6StatusENSt3__112basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEEPNS_15RedisKeyValuePBEEEES4_PT_RKN5boost9container12small_vectorINS_5SliceELm8EvvEENSI_8optionalINS_13RedisDataTypeEEET0_b Line | Count | Source | 383 | 18 | bool remove_duplicates = true) { | 384 | 18 | const auto& key = args[1]; | 385 | 18 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); | 386 | 18 | if (type) { | 387 | 0 | op->mutable_request()->mutable_key_value()->set_type(*type); | 388 | 0 | } | 389 | 18 | if (remove_duplicates) { | 390 | | // We remove duplicates from the subkeys here. | 391 | 0 | std::set<string> subkey_set; | 392 | 0 | for (size_t i = 2; i < args.size(); i++) { | 393 | 0 | subkey_set.insert(string(args[i].cdata(), args[i].size())); | 394 | 0 | } | 395 | 0 | op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve( | 396 | 0 | narrow_cast<int>(subkey_set.size())); | 397 | 0 | for (const auto &val : subkey_set) { | 398 | 0 | RETURN_NOT_OK(add_sub_key(val, op->mutable_request()->mutable_key_value())); | 399 | 0 | } | 400 | 18 | } else { | 401 | 18 | op->mutable_request()->mutable_key_value()->mutable_subkey()->Reserve( | 402 | 18 | narrow_cast<int>(args.size() - 2)); | 403 | 18 | for (size_t i = 2; i < args.size(); i++) { | 404 | 0 | RETURN_NOT_OK(add_sub_key(string(args[i].cdata(), args[i].size()), | 405 | 0 | op->mutable_request()->mutable_key_value())); | 406 | 0 | } | 407 | 18 | } | 408 | 18 | return Status::OK(); | 409 | 18 | } |
|
410 | | |
411 | 0 | CHECKED_STATUS ParseHDel(YBRedisWriteOp *op, const RedisClientCommand& args) { |
412 | 0 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
413 | 0 | return ParseCollection(op, args, REDIS_TYPE_HASH, add_string_subkey); |
414 | 0 | } |
415 | | |
416 | 9 | CHECKED_STATUS ParseTsRem(YBRedisWriteOp *op, const RedisClientCommand& args) { |
417 | 9 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
418 | 9 | return ParseCollection(op, args, REDIS_TYPE_TIMESERIES, add_timestamp_subkey); |
419 | 9 | } |
420 | | |
421 | 0 | CHECKED_STATUS ParseZRem(YBRedisWriteOp *op, const RedisClientCommand& args) { |
422 | 0 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
423 | 0 | return ParseCollection(op, args, REDIS_TYPE_SORTEDSET, add_string_subkey); |
424 | 0 | } |
425 | | |
426 | 0 | CHECKED_STATUS ParseSAdd(YBRedisWriteOp *op, const RedisClientCommand& args) { |
427 | 0 | op->mutable_request()->mutable_add_request(); // Allocates new RedisAddRequestPB(). |
428 | 0 | return ParseCollection(op, args, REDIS_TYPE_SET, add_string_subkey); |
429 | 0 | } |
430 | | |
431 | 0 | CHECKED_STATUS ParseSRem(YBRedisWriteOp *op, const RedisClientCommand& args) { |
432 | 0 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
433 | 0 | return ParseCollection(op, args, REDIS_TYPE_SET, add_string_subkey); |
434 | 0 | } |
435 | | |
436 | 0 | CHECKED_STATUS ParsePop(YBRedisWriteOp *op, const RedisClientCommand& args, RedisSide side) { |
437 | 0 | op->mutable_request()->mutable_pop_request()->set_side(side); |
438 | 0 | op->mutable_request()->mutable_key_value()->set_key(args[1].cdata(), args[1].size()); |
439 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_LIST); |
440 | 0 | return Status::OK(); |
441 | 0 | } |
442 | | |
443 | 0 | CHECKED_STATUS ParseLPop(YBRedisWriteOp *op, const RedisClientCommand& args) { |
444 | 0 | return ParsePop(op, args, REDIS_SIDE_LEFT); |
445 | 0 | } |
446 | | |
447 | 0 | CHECKED_STATUS ParseRPop(YBRedisWriteOp *op, const RedisClientCommand& args) { |
448 | 0 | return ParsePop(op, args, REDIS_SIDE_RIGHT); |
449 | 0 | } |
450 | | |
451 | 0 | CHECKED_STATUS ParseGetSet(YBRedisWriteOp *op, const RedisClientCommand& args) { |
452 | 0 | const auto& key = args[1]; |
453 | 0 | const auto& value = args[2]; |
454 | 0 | op->mutable_request()->mutable_getset_request(); // Allocates new RedisGetSetRequestPB(). |
455 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
456 | 0 | op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size()); |
457 | 0 | return Status::OK(); |
458 | 0 | } |
459 | | |
460 | 0 | CHECKED_STATUS ParseAppend(YBRedisWriteOp* op, const RedisClientCommand& args) { |
461 | 0 | const auto& key = args[1]; |
462 | 0 | const auto& value = args[2]; |
463 | 0 | op->mutable_request()->mutable_append_request(); // Allocates new RedisAppendRequestPB(). |
464 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
465 | 0 | op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size()); |
466 | 0 | return Status::OK(); |
467 | 0 | } |
468 | | |
469 | | // Note: deleting only one key is supported using one command as of now. |
470 | 3 | CHECKED_STATUS ParseDel(YBRedisWriteOp* op, const RedisClientCommand& args) { |
471 | 3 | const auto& key = args[1]; |
472 | 3 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
473 | 3 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
474 | | // We should be able to delete all types of top level keys |
475 | 3 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE); |
476 | 3 | return Status::OK(); |
477 | 3 | } |
478 | | |
479 | 0 | CHECKED_STATUS ParseSetRange(YBRedisWriteOp* op, const RedisClientCommand& args) { |
480 | 0 | const auto& key = args[1]; |
481 | 0 | const auto& value = args[3]; |
482 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
483 | 0 | op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size()); |
484 | |
|
485 | 0 | auto offset = ParseInt32(args[2], "offset"); |
486 | 0 | RETURN_NOT_OK(offset); |
487 | | // TODO: Should we have an upper bound? |
488 | | // A very large offset would allocate a lot of memory and maybe crash |
489 | 0 | if (*offset < 0) { |
490 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
491 | 0 | "offset field of SETRANGE must be non-negative, found: $0", *offset); |
492 | 0 | } |
493 | 0 | op->mutable_request()->mutable_set_range_request()->set_offset(*offset); |
494 | |
|
495 | 0 | return Status::OK(); |
496 | 0 | } |
497 | | |
498 | 0 | CHECKED_STATUS ParseIncr(YBRedisWriteOp* op, const RedisClientCommand& args) { |
499 | 0 | const auto& key = args[1]; |
500 | 0 | op->mutable_request()->mutable_incr_request()->set_increment_int(1); |
501 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
502 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING); |
503 | 0 | return Status::OK(); |
504 | 0 | } |
505 | | |
506 | 0 | CHECKED_STATUS ParseIncrBy(YBRedisWriteOp* op, const RedisClientCommand& args) { |
507 | 0 | const auto& key = args[1]; |
508 | 0 | const auto& incr_by = ParseInt64(args[2], "INCR_BY"); |
509 | 0 | RETURN_NOT_OK(incr_by); |
510 | 0 | op->mutable_request()->mutable_incr_request()->set_increment_int(*incr_by); |
511 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
512 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING); |
513 | 0 | return Status::OK(); |
514 | 0 | } |
515 | | |
516 | 36.4k | CHECKED_STATUS ParseGet(YBRedisReadOp* op, const RedisClientCommand& args) { |
517 | 36.4k | const auto& key = args[1]; |
518 | 36.4k | if (key.empty()) { |
519 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
520 | 0 | "A GET request must have non empty key field"); |
521 | 0 | } |
522 | 36.4k | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
523 | 36.4k | op->mutable_request()->mutable_get_request()->set_request_type( |
524 | 36.4k | RedisGetRequestPB_GetRequestType_GET); |
525 | 36.4k | return Status::OK(); |
526 | 36.4k | } |
527 | | |
528 | | // Used for HGET/HSTRLEN/HEXISTS. Also for HMGet |
529 | | // CMD <KEY> [<SUB-KEY>]* |
530 | | CHECKED_STATUS ParseHGetLikeCommands(YBRedisReadOp* op, const RedisClientCommand& args, |
531 | | RedisGetRequestPB_GetRequestType request_type, |
532 | 18 | bool remove_duplicates = false) { |
533 | 18 | op->mutable_request()->mutable_get_request()->set_request_type(request_type); |
534 | 18 | return ParseCollection(op, args, boost::none, add_string_subkey, remove_duplicates); |
535 | 18 | } |
536 | | |
537 | | // TODO: Support MGET |
538 | 0 | CHECKED_STATUS ParseMGet(YBRedisReadOp* op, const RedisClientCommand& args) { |
539 | 0 | return STATUS(InvalidCommand, "MGET command not yet supported"); |
540 | 0 | } |
541 | | |
542 | 0 | CHECKED_STATUS ParseHGet(YBRedisReadOp* op, const RedisClientCommand& args) { |
543 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HGET); |
544 | 0 | } |
545 | | |
546 | | CHECKED_STATUS ParseTsBoundArg(const Slice& slice, RedisSubKeyBoundPB* bound_pb, |
547 | | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type, |
548 | 10.9k | bool exclusive) { |
549 | 10.9k | string bound(slice.cdata(), slice.size()); |
550 | 10.9k | if (bound == kPositiveInfinity) { |
551 | 39 | bound_pb->set_infinity_type(RedisSubKeyBoundPB_InfinityType_POSITIVE); |
552 | 10.8k | } else if (bound == kNegativeInfinity) { |
553 | 39 | bound_pb->set_infinity_type(RedisSubKeyBoundPB_InfinityType_NEGATIVE); |
554 | 10.8k | } else { |
555 | 10.8k | bound_pb->set_is_exclusive(exclusive); |
556 | 10.8k | switch (request_type) { |
557 | 10.8k | case RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME: |
558 | 10.8k | FALLTHROUGH_INTENDED; |
559 | 10.8k | case RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME: { |
560 | 10.8k | auto ts_bound = CheckedStoll(slice); |
561 | 10.8k | RETURN_NOT_OK(ts_bound); |
562 | 10.8k | bound_pb->mutable_subkey_bound()->set_timestamp_subkey(*ts_bound); |
563 | 10.8k | break; |
564 | 10.8k | } |
565 | 12 | case RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE: { |
566 | 12 | auto double_bound = CheckedStold(slice); |
567 | 12 | RETURN_NOT_OK(double_bound); |
568 | 12 | bound_pb->mutable_subkey_bound()->set_double_subkey(*double_bound); |
569 | 12 | break; |
570 | 12 | } |
571 | 0 | default: |
572 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid request type: $0", request_type); |
573 | 10.9k | } |
574 | | |
575 | 10.9k | } |
576 | 10.9k | return Status::OK(); |
577 | 10.9k | } |
578 | | |
579 | | CHECKED_STATUS |
580 | 0 | ParseIndexBoundArg(const Slice& slice, RedisIndexBoundPB* bound_pb) { |
581 | 0 | auto index_bound = CheckedStoll(slice); |
582 | 0 | RETURN_NOT_OK(index_bound); |
583 | 0 | bound_pb->set_index(*index_bound); |
584 | 0 | return Status::OK(); |
585 | 0 | } |
586 | | |
587 | | CHECKED_STATUS |
588 | | ParseTsSubKeyBound(const Slice& slice, RedisSubKeyBoundPB* bound_pb, |
589 | 10.9k | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type) { |
590 | 10.9k | if (slice.empty()) { |
591 | 0 | return STATUS(InvalidCommand, "range bound key cannot be empty"); |
592 | 0 | } |
593 | | |
594 | 10.9k | if (slice[0] == '(' && slice.size() > 1) { |
595 | 1.80k | auto slice_copy = slice; |
596 | 1.80k | slice_copy.remove_prefix(1); |
597 | 1.80k | RETURN_NOT_OK(ParseTsBoundArg(slice_copy, bound_pb, request_type, /* exclusive */ true)); |
598 | 9.12k | } else { |
599 | 9.12k | RETURN_NOT_OK(ParseTsBoundArg(slice, bound_pb, request_type, /* exclusive */ false)); |
600 | 9.12k | } |
601 | 10.9k | return Status::OK(); |
602 | 10.9k | } |
603 | | |
604 | 0 | CHECKED_STATUS ParseIndexBound(const Slice& slice, RedisIndexBoundPB* bound_pb) { |
605 | 0 | if (slice.empty()) { |
606 | 0 | return STATUS(InvalidArgument, "range bound index cannot be empty"); |
607 | 0 | } |
608 | 0 | RETURN_NOT_OK(ParseIndexBoundArg(slice, bound_pb)); |
609 | 0 | return Status::OK(); |
610 | 0 | } |
611 | | |
612 | 18 | CHECKED_STATUS ParseTsCard(YBRedisReadOp* op, const RedisClientCommand& args) { |
613 | 18 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_TSCARD); |
614 | 18 | } |
615 | | |
616 | 18 | CHECKED_STATUS ParseTsLastN(YBRedisReadOp* op, const RedisClientCommand& args) { |
617 | | // TSLastN is basically TSRangeByTime -INF, INF with a limit on number of entries. Note that |
618 | | // there is a subtle difference here since TSRangeByTime iterates on entries from highest to |
619 | | // lowest and hence we end up returning the highest N entries. This operation is more like |
620 | | // TSRevRangeByTime -INF, INF with a limit (Note that TSRevRangeByTime is not implemented). |
621 | 18 | op->mutable_request()->mutable_get_collection_range_request()->set_request_type( |
622 | 18 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME); |
623 | 18 | const auto& key = args[1]; |
624 | 18 | auto limit = ParseInt32(args[2], "limit"); |
625 | 18 | RETURN_NOT_OK(limit); |
626 | 18 | if ((*limit) <= 0) { |
627 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
628 | 0 | "$0 field $1 is not within valid bounds", "limit", args[2].ToDebugString()); |
629 | 0 | } |
630 | 18 | op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer()); |
631 | 18 | op->mutable_request()->set_range_request_limit(*limit); |
632 | 18 | op->mutable_request()->mutable_subkey_range()->mutable_lower_bound()->set_infinity_type |
633 | 18 | (RedisSubKeyBoundPB_InfinityType_NEGATIVE); |
634 | 18 | op->mutable_request()->mutable_subkey_range()->mutable_upper_bound()->set_infinity_type |
635 | 18 | (RedisSubKeyBoundPB_InfinityType_POSITIVE); |
636 | 18 | return Status::OK(); |
637 | 18 | } |
638 | | |
639 | 15 | CHECKED_STATUS ParseTsRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) { |
640 | 15 | op->mutable_request()->mutable_get_collection_range_request()->set_request_type( |
641 | 15 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME); |
642 | | |
643 | 15 | const auto& key = args[1]; |
644 | 15 | RETURN_NOT_OK(ParseTsSubKeyBound( |
645 | 15 | args[2], |
646 | 15 | op->mutable_request()->mutable_subkey_range()->mutable_lower_bound(), |
647 | 15 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME)); |
648 | 12 | RETURN_NOT_OK(ParseTsSubKeyBound( |
649 | 12 | args[3], |
650 | 12 | op->mutable_request()->mutable_subkey_range()->mutable_upper_bound(), |
651 | 12 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME)); |
652 | | |
653 | 12 | op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer()); |
654 | 12 | return Status::OK(); |
655 | 12 | } |
656 | | |
657 | 5.40k | CHECKED_STATUS ParseTsRevRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) { |
658 | 5.40k | op->mutable_request()->mutable_get_collection_range_request()->set_request_type( |
659 | 5.40k | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME); |
660 | | |
661 | 5.40k | const auto& key = args[1]; |
662 | 5.40k | RETURN_NOT_OK(ParseTsSubKeyBound( |
663 | 5.40k | args[2], |
664 | 5.40k | op->mutable_request()->mutable_subkey_range()->mutable_lower_bound(), |
665 | 5.40k | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME)); |
666 | 5.40k | RETURN_NOT_OK(ParseTsSubKeyBound( |
667 | 5.40k | args[3], |
668 | 5.40k | op->mutable_request()->mutable_subkey_range()->mutable_upper_bound(), |
669 | 5.40k | RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSREVRANGEBYTIME)); |
670 | | |
671 | 5.40k | op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer()); |
672 | | |
673 | 5.40k | if (args.size() > 4) { |
674 | 2.70k | if (args.size() != 6) { |
675 | 0 | return STATUS_SUBSTITUTE(InvalidCommand, |
676 | 0 | "Invalid number of arguments. Command should have 4 or 6 arguments"); |
677 | 0 | } |
678 | 2.70k | string upper_arg; |
679 | 2.70k | ToUpperCase(args[4].ToBuffer(), &upper_arg); |
680 | 2.70k | if (upper_arg != "LIMIT") { |
681 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
682 | 0 | "Invalid argument $0. Expecting $1", args[4].ToBuffer(), "limit"); |
683 | 0 | } |
684 | 2.70k | auto limit = ParseInt32(args[5], "limit"); |
685 | 2.70k | RETURN_NOT_OK(limit); |
686 | 2.70k | if ((*limit) <= 0) { |
687 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, |
688 | 0 | "$0 field $1 is not within valid bounds", "limit", |
689 | 0 | args[5].ToDebugString()); |
690 | 0 | } |
691 | 2.70k | op->mutable_request()->set_range_request_limit(*limit); |
692 | 2.70k | } |
693 | | |
694 | 5.40k | return Status::OK(); |
695 | 5.40k | } |
696 | | |
697 | 0 | CHECKED_STATUS ParseWithScores(const Slice& slice, RedisCollectionGetRangeRequestPB* request) { |
698 | 0 | if(!boost::iequals(slice.ToBuffer(), kWithScores)) { |
699 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "unexpected argument $0", slice.ToBuffer()); |
700 | 0 | } |
701 | 0 | request->set_with_scores(true); |
702 | 0 | return Status::OK(); |
703 | 0 | } |
704 | | |
705 | 42 | CHECKED_STATUS ParseRangeByScoreOptions(YBRedisReadOp* op, const RedisClientCommand& args) { |
706 | 42 | auto args_size = args.size(); |
707 | 72 | for (size_t i = 4; i < args_size; ++i) { |
708 | 30 | string upper_arg; |
709 | 30 | ToUpperCase(args[i].ToBuffer(), &upper_arg); |
710 | 30 | if (upper_arg == "LIMIT") { |
711 | 21 | if (i >= args_size - 2) { |
712 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Not enough args passed into LIMIT clause, " |
713 | 0 | "expected 2 but found $0", args_size - (i + 1)); |
714 | 0 | } |
715 | 21 | auto offset = VERIFY_RESULT(ParseInt64(args[++i], "offset")); |
716 | 21 | auto limit = VERIFY_RESULT(ParseInt32(args[++i], "count")); |
717 | 21 | op->mutable_request()->mutable_index_range()->mutable_lower_bound()->set_index(offset); |
718 | 21 | op->mutable_request()->set_range_request_limit(limit); |
719 | 9 | } else if (upper_arg == "WITHSCORES") { |
720 | 9 | op->mutable_request()->mutable_get_collection_range_request()->set_with_scores(true); |
721 | 0 | } else { |
722 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Invalid argument $0", args[i].ToBuffer()); |
723 | 0 | } |
724 | 30 | } |
725 | 42 | return Status::OK(); |
726 | 42 | } |
727 | | |
728 | 42 | CHECKED_STATUS ParseZRangeByScore(YBRedisReadOp* op, const RedisClientCommand& args) { |
729 | 42 | op->mutable_request()->mutable_get_collection_range_request()->set_request_type( |
730 | 42 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE); |
731 | | |
732 | 42 | const auto& key = args[1]; |
733 | 42 | RETURN_NOT_OK(ParseTsSubKeyBound( |
734 | 42 | args[2], |
735 | 42 | op->mutable_request()->mutable_subkey_range()->mutable_lower_bound(), |
736 | 42 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE)); |
737 | 42 | RETURN_NOT_OK(ParseTsSubKeyBound( |
738 | 42 | args[3], |
739 | 42 | op->mutable_request()->mutable_subkey_range()->mutable_upper_bound(), |
740 | 42 | RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGEBYSCORE)); |
741 | 42 | op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer()); |
742 | | |
743 | 42 | return ParseRangeByScoreOptions(op, args); |
744 | 42 | } |
745 | | |
746 | | CHECKED_STATUS ParseIndexBasedQuery( |
747 | | YBRedisReadOp* op, |
748 | | const RedisClientCommand& args, |
749 | 0 | RedisCollectionGetRangeRequestPB::GetRangeRequestType request_type) { |
750 | 0 | if (args.size() <= 5) { |
751 | 0 | op->mutable_request()->mutable_get_collection_range_request()->set_request_type(request_type); |
752 | |
|
753 | 0 | const auto& key = args[1]; |
754 | 0 | RETURN_NOT_OK(ParseIndexBound( |
755 | 0 | args[2], |
756 | 0 | op->mutable_request()->mutable_index_range()->mutable_lower_bound())); |
757 | 0 | RETURN_NOT_OK(ParseIndexBound( |
758 | 0 | args[3], |
759 | 0 | op->mutable_request()->mutable_index_range()->mutable_upper_bound())); |
760 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer()); |
761 | 0 | if (args.size() == 5) { |
762 | 0 | RETURN_NOT_OK(ParseWithScores( |
763 | 0 | args[4], |
764 | 0 | op->mutable_request()->mutable_get_collection_range_request())); |
765 | 0 | } |
766 | 0 | return Status::OK(); |
767 | 0 | } |
768 | 0 | return STATUS(InvalidArgument, "Expected at most 5 arguments, found $0", |
769 | 0 | std::to_string(args.size())); |
770 | 0 | } |
771 | | |
772 | 0 | CHECKED_STATUS ParseZRange(YBRedisReadOp* op, const RedisClientCommand& args) { |
773 | 0 | return ParseIndexBasedQuery( |
774 | 0 | op, args, RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZRANGE); |
775 | 0 | } |
776 | | |
777 | 0 | CHECKED_STATUS ParseZRevRange(YBRedisReadOp* op, const RedisClientCommand& args) { |
778 | 0 | return ParseIndexBasedQuery( |
779 | 0 | op, args, RedisCollectionGetRangeRequestPB_GetRangeRequestType_ZREVRANGE); |
780 | 0 | } |
781 | | |
782 | 609 | CHECKED_STATUS ParseTsGet(YBRedisReadOp* op, const RedisClientCommand& args) { |
783 | 609 | op->mutable_request()->mutable_get_request()->set_request_type( |
784 | 609 | RedisGetRequestPB_GetRequestType_TSGET); |
785 | | |
786 | 609 | const auto& key = args[1]; |
787 | 609 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
788 | 609 | auto timestamp = CheckedStoll(args[2]); |
789 | 609 | RETURN_NOT_OK(timestamp); |
790 | 609 | op->mutable_request()->mutable_key_value()->add_subkey()->set_timestamp_subkey(*timestamp); |
791 | | |
792 | 609 | return Status::OK(); |
793 | 609 | } |
794 | | |
795 | 21 | CHECKED_STATUS ParseZScore(YBRedisReadOp* op, const RedisClientCommand& args) { |
796 | 21 | op->mutable_request()->mutable_get_request()->set_request_type( |
797 | 21 | RedisGetRequestPB_GetRequestType_ZSCORE); |
798 | | |
799 | 21 | const auto& key = args[1]; |
800 | 21 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
801 | 21 | auto member = args[2].ToBuffer(); |
802 | 21 | op->mutable_request()->mutable_key_value()->add_subkey()->set_string_subkey(member); |
803 | | |
804 | 21 | return Status::OK(); |
805 | 21 | } |
806 | | |
807 | 0 | CHECKED_STATUS ParseHStrLen(YBRedisReadOp* op, const RedisClientCommand& args) { |
808 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HSTRLEN); |
809 | 0 | } |
810 | | |
811 | 0 | CHECKED_STATUS ParseHExists(YBRedisReadOp* op, const RedisClientCommand& args) { |
812 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HEXISTS); |
813 | 0 | } |
814 | | |
815 | 0 | CHECKED_STATUS ParseHMGet(YBRedisReadOp* op, const RedisClientCommand& args) { |
816 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HMGET); |
817 | 0 | } |
818 | | |
819 | 0 | CHECKED_STATUS ParseHGetAll(YBRedisReadOp* op, const RedisClientCommand& args) { |
820 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HGETALL); |
821 | 0 | } |
822 | | |
823 | 0 | CHECKED_STATUS ParseHKeys(YBRedisReadOp* op, const RedisClientCommand& args) { |
824 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HKEYS); |
825 | 0 | } |
826 | | |
827 | 0 | CHECKED_STATUS ParseHVals(YBRedisReadOp* op, const RedisClientCommand& args) { |
828 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HVALS); |
829 | 0 | } |
830 | | |
831 | 0 | CHECKED_STATUS ParseHLen(YBRedisReadOp* op, const RedisClientCommand& args) { |
832 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_HLEN); |
833 | 0 | } |
834 | | |
835 | 0 | CHECKED_STATUS ParseSMembers(YBRedisReadOp* op, const RedisClientCommand& args) { |
836 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_SMEMBERS); |
837 | 0 | } |
838 | | |
839 | 0 | CHECKED_STATUS ParseSIsMember(YBRedisReadOp* op, const RedisClientCommand& args) { |
840 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_SISMEMBER); |
841 | 0 | } |
842 | | |
843 | 0 | CHECKED_STATUS ParseSCard(YBRedisReadOp* op, const RedisClientCommand& args) { |
844 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_SCARD); |
845 | 0 | } |
846 | | |
847 | 0 | CHECKED_STATUS ParseLLen(YBRedisReadOp* op, const RedisClientCommand& args) { |
848 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_LLEN); |
849 | 0 | } |
850 | | |
851 | 0 | CHECKED_STATUS ParseZCard(YBRedisReadOp* op, const RedisClientCommand& args) { |
852 | 0 | return ParseHGetLikeCommands(op, args, RedisGetRequestPB_GetRequestType_ZCARD); |
853 | 0 | } |
854 | | |
855 | 0 | CHECKED_STATUS ParseStrLen(YBRedisReadOp* op, const RedisClientCommand& args) { |
856 | 0 | op->mutable_request()->mutable_strlen_request(); // Allocates new RedisStrLenRequestPB(). |
857 | 0 | const auto& key = args[1]; |
858 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
859 | 0 | return Status::OK(); |
860 | 0 | } |
861 | | |
862 | | // Note: Checking existence of only one key is supported as of now. |
863 | 0 | CHECKED_STATUS ParseExists(YBRedisReadOp* op, const RedisClientCommand& args) { |
864 | 0 | op->mutable_request()->mutable_exists_request(); // Allocates new RedisExistsRequestPB(). |
865 | 0 | const auto& key = args[1]; |
866 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
867 | 0 | return Status::OK(); |
868 | 0 | } |
869 | | |
870 | 0 | CHECKED_STATUS ParseGetRange(YBRedisReadOp* op, const RedisClientCommand& args) { |
871 | 0 | const auto& key = args[1]; |
872 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
873 | |
|
874 | 0 | auto start = ParseInt32(args[2], "Start"); |
875 | 0 | RETURN_NOT_OK(start); |
876 | 0 | op->mutable_request()->mutable_get_range_request()->set_start(*start); |
877 | |
|
878 | 0 | auto end = ParseInt32(args[3], "End"); |
879 | 0 | RETURN_NOT_OK(end); |
880 | 0 | op->mutable_request()->mutable_get_range_request()->set_end(*end); |
881 | |
|
882 | 0 | return Status::OK(); |
883 | 0 | } |
884 | | |
885 | | CHECKED_STATUS ParseExpire(YBRedisWriteOp* op, |
886 | | const RedisClientCommand& args, |
887 | 42 | const bool using_millis) { |
888 | 42 | const auto& key = args[1]; |
889 | 42 | auto ttl = ParseInt64(args[2], "TTL"); |
890 | 42 | RETURN_NOT_OK(ttl); |
891 | | // If the TTL is not positive, we immediately delete. |
892 | 42 | if (*ttl <= 0) { |
893 | 6 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
894 | 6 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
895 | 6 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE); |
896 | 6 | return Status::OK(); |
897 | 6 | } |
898 | 36 | *ttl *= using_millis ? 1 : MonoTime::kMillisecondsPerSecond; |
899 | 36 | if (*ttl < kRedisMinTtlMillis || *ttl > kRedisMaxTtlMillis) { |
900 | 0 | return STATUS_FORMAT(InvalidCommand, |
901 | 0 | "TTL field $0 is not within valid bounds", args[2]); |
902 | 0 | } |
903 | 36 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
904 | 36 | op->mutable_request()->mutable_set_ttl_request()->set_ttl(*ttl); |
905 | 36 | return Status::OK(); |
906 | 36 | } |
907 | | |
908 | 39 | CHECKED_STATUS ParseExpire(YBRedisWriteOp* op, const RedisClientCommand& args) { |
909 | 39 | return ParseExpire(op, args, false); |
910 | 39 | } |
911 | | |
912 | 3 | CHECKED_STATUS ParsePExpire(YBRedisWriteOp* op, const RedisClientCommand& args) { |
913 | 3 | return ParseExpire(op, args, true); |
914 | 3 | } |
915 | | |
916 | 12 | CHECKED_STATUS ParsePersist(YBRedisWriteOp* op, const RedisClientCommand& args) { |
917 | 12 | const auto& key = args[1]; |
918 | 12 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
919 | 12 | op->mutable_request()->mutable_set_ttl_request()->set_ttl(-1); |
920 | 12 | return Status::OK(); |
921 | 12 | } |
922 | | |
923 | | CHECKED_STATUS ParseExpireAt(YBRedisWriteOp* op, |
924 | | const RedisClientCommand& args, |
925 | 0 | const bool using_millis) { |
926 | 0 | const auto& key = args[1]; |
927 | 0 | auto expiration = VERIFY_RESULT(ParseInt64(args[2], "expiration")); |
928 | | // If the TTL is not positive, we immediately delete. |
929 | 0 | if (expiration <= 0) { |
930 | 0 | op->mutable_request()->mutable_del_request(); // Allocates new RedisDelRequestPB(). |
931 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
932 | 0 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_NONE); |
933 | 0 | return Status::OK(); |
934 | 0 | } |
935 | 0 | expiration *= using_millis ? 1 : MonoTime::kMillisecondsPerSecond; |
936 | 0 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
937 | 0 | op->mutable_request()->mutable_set_ttl_request()->set_absolute_time(expiration); |
938 | 0 | return Status::OK(); |
939 | 0 | } |
940 | | |
941 | 0 | CHECKED_STATUS ParseExpireAt(YBRedisWriteOp* op, const RedisClientCommand& args) { |
942 | 0 | return ParseExpireAt(op, args, /* using_millis */ false); |
943 | 0 | } |
944 | | |
945 | 0 | CHECKED_STATUS ParsePExpireAt(YBRedisWriteOp* op, const RedisClientCommand& args) { |
946 | 0 | return ParseExpireAt(op, args, /* using_millis */ true); |
947 | 0 | } |
948 | | |
949 | | CHECKED_STATUS ParseSetEx(YBRedisWriteOp* op, |
950 | | const RedisClientCommand& args, |
951 | 9 | const bool using_millis) { |
952 | 9 | const auto& key = args[1]; |
953 | 9 | const auto value = args[3]; |
954 | 9 | auto ttl = VERIFY_RESULT(ParseInt64(args[2], "TTL")); |
955 | 9 | if (ttl <= 0) { |
956 | 3 | op->mutable_request()->mutable_no_op_request(); // Allocates new RedisNoOpRequestPB(). |
957 | 3 | return Status::OK(); |
958 | 3 | } |
959 | 6 | ttl *= using_millis ? 1 : MonoTime::kMillisecondsPerSecond; |
960 | 6 | if (ttl < kRedisMinTtlMillis || ttl > kRedisMaxTtlMillis) { |
961 | 0 | return STATUS_FORMAT(InvalidCommand, |
962 | 0 | "TTL field $0 is not within valid bounds", args[3]); |
963 | 0 | } |
964 | 6 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
965 | 6 | op->mutable_request()->mutable_key_value()->add_value(value.cdata(), value.size()); |
966 | 6 | op->mutable_request()->mutable_key_value()->set_type(REDIS_TYPE_STRING); |
967 | 6 | op->mutable_request()->mutable_set_request()->set_ttl(ttl); |
968 | 6 | return Status::OK(); |
969 | 6 | } |
970 | | |
971 | 6 | CHECKED_STATUS ParseSetEx(YBRedisWriteOp* op, const RedisClientCommand& args) { |
972 | 6 | return ParseSetEx(op, args, false); |
973 | 6 | } |
974 | | |
975 | 3 | CHECKED_STATUS ParsePSetEx(YBRedisWriteOp* op, const RedisClientCommand& args) { |
976 | 3 | return ParseSetEx(op, args, true); |
977 | 3 | } |
978 | | |
979 | | CHECKED_STATUS ParseTtl(YBRedisReadOp* op, |
980 | | const RedisClientCommand& args, |
981 | 132 | const bool return_seconds) { |
982 | 132 | const auto& key = args[1]; |
983 | 132 | op->mutable_request()->mutable_key_value()->set_key(key.cdata(), key.size()); |
984 | 132 | op->mutable_request()->mutable_get_ttl_request()->set_return_seconds(return_seconds); |
985 | 132 | return Status::OK(); |
986 | 132 | } |
987 | | |
988 | 66 | CHECKED_STATUS ParseTtl(YBRedisReadOp* op, const RedisClientCommand& args) { |
989 | 66 | return ParseTtl(op, args, true); |
990 | 66 | } |
991 | | |
992 | 66 | CHECKED_STATUS ParsePTtl(YBRedisReadOp* op, const RedisClientCommand& args) { |
993 | 66 | return ParseTtl(op, args, false); |
994 | 66 | } |
995 | | |
996 | | // Begin of input is going to be consumed, so we should adjust our pointers. |
997 | | // Since the beginning of input is being consumed by shifting the remaining bytes to the |
998 | | // beginning of the buffer. |
999 | 104k | void RedisParser::Consume(size_t count) { |
1000 | 104k | pos_ -= count; |
1001 | | |
1002 | 104k | if (token_begin_ != kNoToken) { |
1003 | 104k | token_begin_ -= count; |
1004 | 104k | } |
1005 | 104k | } |
1006 | | |
1007 | | // New data arrived, so update the end of available bytes. |
1008 | 104k | void RedisParser::Update(const IoVecs& data) { |
1009 | 104k | source_ = data; |
1010 | 104k | full_size_ = IoVecsFullSize(data); |
1011 | 104k | DCHECK_LE(pos_, full_size_); |
1012 | 104k | } |
1013 | | |
1014 | | // Parse next command. |
1015 | 314k | Result<size_t> RedisParser::NextCommand() { |
1016 | 2.03M | while (pos_ != full_size_) { |
1017 | 1.92M | incomplete_ = false; |
1018 | 1.92M | Status status = AdvanceToNextToken(); |
1019 | 1.92M | if (!status.ok()) { |
1020 | 2 | return status; |
1021 | 2 | } |
1022 | 1.92M | if (incomplete_) { |
1023 | 55 | pos_ = full_size_; |
1024 | 55 | return 0; |
1025 | 55 | } |
1026 | 1.92M | if (state_ == State::FINISHED) { |
1027 | 209k | state_ = State::INITIAL; |
1028 | 209k | return pos_; |
1029 | 209k | } |
1030 | 1.92M | } |
1031 | 104k | return 0; |
1032 | 314k | } |
1033 | | |
1034 | 1.92M | CHECKED_STATUS RedisParser::AdvanceToNextToken() { |
1035 | 1.92M | switch (state_) { |
1036 | 209k | case State::INITIAL: |
1037 | 209k | return Initial(); |
1038 | 2 | case State::SINGLE_LINE: |
1039 | 2 | return SingleLine(); |
1040 | 209k | case State::BULK_HEADER: |
1041 | 209k | return BulkHeader(); |
1042 | 755k | case State::BULK_ARGUMENT_SIZE: |
1043 | 755k | return BulkArgumentSize(); |
1044 | 755k | case State::BULK_ARGUMENT_BODY: |
1045 | 755k | return BulkArgumentBody(); |
1046 | 0 | case State::FINISHED: |
1047 | 0 | return STATUS(IllegalState, "Should not be in FINISHED state during NextToken"); |
1048 | 0 | } |
1049 | 0 | LOG(FATAL) << "Unexpected parser state: " << to_underlying(state_); |
1050 | 0 | } |
1051 | | |
1052 | 209k | CHECKED_STATUS RedisParser::Initial() { |
1053 | 209k | token_begin_ = pos_; |
1054 | 209k | state_ = char_at_offset(pos_) == '*' ? State::BULK_HEADER : State::SINGLE_LINE; |
1055 | 209k | return Status::OK(); |
1056 | 209k | } |
1057 | | |
1058 | 2 | CHECKED_STATUS RedisParser::SingleLine() { |
1059 | 2 | auto status = FindEndOfLine(); |
1060 | 2 | if (!status.ok() || incomplete_) { |
1061 | 2 | return status; |
1062 | 2 | } |
1063 | 0 | auto start = token_begin_; |
1064 | 0 | auto finish = pos_ - 2; |
1065 | 0 | while (start < finish && isspace(char_at_offset(start))) { |
1066 | 0 | ++start; |
1067 | 0 | } |
1068 | 0 | if (start >= finish) { |
1069 | 0 | return STATUS(InvalidArgument, "Empty line"); |
1070 | 0 | } |
1071 | 0 | if (args_) { |
1072 | | // Args is supported only when parsing from single block of data. |
1073 | | // Because we parse prepared call data in this case, that is contained in a single buffer. |
1074 | 0 | DCHECK_EQ(source_.size(), 1); |
1075 | 0 | RETURN_NOT_OK(util::SplitArgs(Slice(offset_to_pointer(start), finish - start), args_)); |
1076 | 0 | } |
1077 | 0 | state_ = State::FINISHED; |
1078 | 0 | return Status::OK(); |
1079 | 0 | } |
1080 | | |
1081 | 209k | CHECKED_STATUS RedisParser::BulkHeader() { |
1082 | 209k | auto status = FindEndOfLine(); |
1083 | 209k | if (!status.ok() || incomplete_) { |
1084 | 0 | return status; |
1085 | 0 | } |
1086 | 209k | auto num_args = VERIFY_RESULT(ParseNumber( |
1087 | 209k | '*', 1, kMaxNumberOfArgs, "Number of lines in multiline")); |
1088 | 209k | if (args_) { |
1089 | 104k | args_->clear(); |
1090 | 104k | args_->reserve(num_args); |
1091 | 104k | } |
1092 | 209k | state_ = State::BULK_ARGUMENT_SIZE; |
1093 | 209k | token_begin_ = pos_; |
1094 | 209k | arguments_left_ = num_args; |
1095 | 209k | return Status::OK(); |
1096 | 209k | } |
1097 | | |
1098 | 755k | CHECKED_STATUS RedisParser::BulkArgumentSize() { |
1099 | 755k | auto status = FindEndOfLine(); |
1100 | 755k | if (!status.ok() || incomplete_) { |
1101 | 7 | return status; |
1102 | 7 | } |
1103 | 755k | auto current_size = VERIFY_RESULT(ParseNumber('$', 0, kMaxRedisValueSize, "Argument size")); |
1104 | 755k | state_ = State::BULK_ARGUMENT_BODY; |
1105 | 755k | token_begin_ = pos_; |
1106 | 755k | current_argument_size_ = current_size; |
1107 | 755k | return Status::OK(); |
1108 | 755k | } |
1109 | | |
1110 | 755k | CHECKED_STATUS RedisParser::BulkArgumentBody() { |
1111 | 755k | auto desired_position = token_begin_ + current_argument_size_ + kLineEndLength; |
1112 | 755k | if (desired_position > full_size_) { |
1113 | 48 | incomplete_ = true; |
1114 | 48 | pos_ = full_size_; |
1115 | 48 | return Status::OK(); |
1116 | 48 | } |
1117 | 755k | if (char_at_offset(desired_position - 1) != '\n' || |
1118 | 755k | char_at_offset(desired_position - 2) != '\r') { |
1119 | 0 | return STATUS(NetworkError, "No \\r\\n after bulk"); |
1120 | 0 | } |
1121 | 755k | if (args_) { |
1122 | | // Args is supported only when parsing from single block of data. |
1123 | | // Because we parse prepared call data in this case, that is contained in a single buffer. |
1124 | 377k | DCHECK_EQ(source_.size(), 1); |
1125 | 377k | args_->emplace_back(offset_to_pointer(token_begin_), current_argument_size_); |
1126 | 377k | } |
1127 | 755k | --arguments_left_; |
1128 | 755k | pos_ = desired_position; |
1129 | 755k | token_begin_ = pos_; |
1130 | 755k | if (arguments_left_ == 0) { |
1131 | 209k | state_ = State::FINISHED; |
1132 | 546k | } else { |
1133 | 546k | state_ = State::BULK_ARGUMENT_SIZE; |
1134 | 546k | } |
1135 | 755k | return Status::OK(); |
1136 | 755k | } |
1137 | | |
1138 | 964k | CHECKED_STATUS RedisParser::FindEndOfLine() { |
1139 | 964k | auto p = offset_to_idx_and_local_offset(pos_); |
1140 | | |
1141 | 964k | size_t new_line_offset = pos_; |
1142 | 964k | while (p.first != source_.size()) { |
1143 | 964k | auto begin = IoVecBegin(source_[p.first]) + p.second; |
1144 | 964k | auto new_line = static_cast<const char*>(memchr( |
1145 | 964k | begin, '\n', IoVecEnd(source_[p.first]) - begin)); |
1146 | 964k | if (new_line) { |
1147 | 964k | new_line_offset += new_line - begin; |
1148 | 964k | break; |
1149 | 964k | } |
1150 | 4 | new_line_offset += source_[p.first].iov_len - p.second; |
1151 | 4 | ++p.first; |
1152 | 4 | p.second = 0; |
1153 | 4 | } |
1154 | | |
1155 | 964k | incomplete_ = p.first == source_.size(); |
1156 | 964k | if (!incomplete_) { |
1157 | 964k | if (new_line_offset == token_begin_) { |
1158 | 0 | return STATUS(NetworkError, "End of line at the beginning of a Redis command"); |
1159 | 0 | } |
1160 | 964k | if (char_at_offset(new_line_offset - 1) != '\r') { |
1161 | 2 | return STATUS(NetworkError, "\\n is not prefixed with \\r"); |
1162 | 2 | } |
1163 | 964k | pos_ = ++new_line_offset; |
1164 | 964k | } |
1165 | 964k | return Status::OK(); |
1166 | 964k | } |
1167 | | |
1168 | 4.99M | std::pair<size_t, size_t> RedisParser::offset_to_idx_and_local_offset(size_t offset) const { |
1169 | | // We assume that there are at most 2 blocks of data. |
1170 | 4.99M | if (offset < source_[0].iov_len) { |
1171 | 4.99M | return std::pair<size_t, size_t>(0, offset); |
1172 | 4.99M | } |
1173 | | |
1174 | 2 | offset -= source_[0].iov_len; |
1175 | 2 | size_t idx = offset / source_[1].iov_len; |
1176 | 2 | offset -= idx * source_[1].iov_len; |
1177 | | |
1178 | 2 | return std::pair<size_t, size_t>(idx + 1, offset); |
1179 | 2 | } |
1180 | | |
1181 | 4.02M | const char* RedisParser::offset_to_pointer(size_t offset) const { |
1182 | 4.02M | auto p = offset_to_idx_and_local_offset(offset); |
1183 | 4.02M | return IoVecBegin(source_[p.first]) + p.second; |
1184 | 4.02M | } |
1185 | | |
1186 | | // Parses number with specified bounds. |
1187 | | // Number is located in separate line, and contain prefix before actual number. |
1188 | | // Line starts at token_begin_ and pos_ is a start of next line. |
1189 | | Result<ptrdiff_t> RedisParser::ParseNumber(char prefix, |
1190 | | ptrdiff_t min, |
1191 | | ptrdiff_t max, |
1192 | 964k | const char* name) { |
1193 | 964k | if (char_at_offset(token_begin_) != prefix) { |
1194 | 0 | return STATUS_FORMAT(Corruption, |
1195 | 0 | "Invalid character before number, expected: $0, but found: $1", |
1196 | 0 | prefix, char_at_offset(token_begin_)); |
1197 | 0 | } |
1198 | 964k | auto number_begin = token_begin_ + 1; |
1199 | 964k | auto expected_stop = pos_ - kLineEndLength; |
1200 | 964k | if (expected_stop - number_begin > kMaxNumberLength) { |
1201 | 0 | return STATUS_FORMAT( |
1202 | 0 | Corruption, "Too long $0 of length $1", name, expected_stop - number_begin); |
1203 | 0 | } |
1204 | 964k | number_buffer_.reserve(kMaxNumberLength); |
1205 | 964k | IoVecsToBuffer(source_, number_begin, expected_stop, &number_buffer_); |
1206 | 964k | number_buffer_.push_back(0); |
1207 | 964k | auto parsed_number = VERIFY_RESULT(CheckedStoll( |
1208 | 964k | Slice(number_buffer_.data(), number_buffer_.size() - 1))); |
1209 | 964k | static_assert(sizeof(parsed_number) == sizeof(ptrdiff_t), "Expected size"); |
1210 | 964k | SCHECK_BOUNDS(parsed_number, |
1211 | 964k | min, |
1212 | 964k | max, |
1213 | 964k | Corruption, |
1214 | 964k | yb::Format("$0 out of expected range [$1, $2] : $3", |
1215 | 964k | name, min, max, parsed_number)); |
1216 | 964k | return static_cast<ptrdiff_t>(parsed_number); |
1217 | 964k | } |
1218 | | |
1219 | 104k | void RedisParser::SetArgs(boost::container::small_vector_base<Slice>* args) { |
1220 | 104k | DCHECK_EQ(source_.size(), 1); |
1221 | 104k | args_ = args; |
1222 | 104k | } |
1223 | | |
1224 | | } // namespace redisserver |
1225 | | } // namespace yb |