/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/yql/redis/redisserver/redis_rpc.h" |
17 | | |
18 | | #include "yb/client/client_fwd.h" |
19 | | |
20 | | #include "yb/common/redis_protocol.pb.h" |
21 | | |
22 | | #include "yb/rpc/connection.h" |
23 | | #include "yb/rpc/reactor.h" |
24 | | #include "yb/rpc/rpc_introspection.pb.h" |
25 | | |
26 | | #include "yb/util/debug/trace_event.h" |
27 | | #include "yb/util/format.h" |
28 | | #include "yb/util/memory/memory.h" |
29 | | #include "yb/util/metrics.h" |
30 | | #include "yb/util/result.h" |
31 | | #include "yb/util/size_literals.h" |
32 | | #include "yb/util/status_format.h" |
33 | | |
34 | | #include "yb/yql/redis/redisserver/redis_encoding.h" |
35 | | #include "yb/yql/redis/redisserver/redis_parser.h" |
36 | | |
37 | | using namespace std::literals; |
38 | | using namespace std::placeholders; |
39 | | using namespace yb::size_literals; |
40 | | |
41 | | DECLARE_bool(rpc_dump_all_traces); |
42 | | DECLARE_int32(rpc_slow_query_threshold_ms); |
43 | | DEFINE_uint64(redis_max_concurrent_commands, 1, |
44 | | "Max number of redis commands received from single connection, " |
45 | | "that could be processed concurrently"); |
46 | | DEFINE_uint64(redis_max_batch, 500, "Max number of redis commands that forms batch"); |
47 | | DEFINE_int32(rpcz_max_redis_query_dump_size, 4_KB, |
48 | | "The maximum size of the Redis query string in the RPCZ dump."); |
49 | | DEFINE_uint64(redis_max_read_buffer_size, 128_MB, |
50 | | "Max read buffer size for Redis connections."); |
51 | | |
52 | | DEFINE_uint64(redis_max_queued_bytes, 128_MB, |
53 | | "Max number of bytes in queued redis commands."); |
54 | | |
55 | | DEFINE_int32( |
56 | | redis_connection_soft_limit_grace_period_sec, 60, |
57 | | "The duration for which the outbound data needs to exceeed the softlimit " |
58 | | "before the connection gets closed down."); |
59 | | |
60 | | namespace yb { |
61 | | namespace redisserver { |
62 | | |
63 | | RedisConnectionContext::RedisConnectionContext( |
64 | | rpc::GrowableBufferAllocator* allocator, |
65 | | const MemTrackerPtr& call_tracker) |
66 | | : ConnectionContextWithQueue( |
67 | | FLAGS_redis_max_concurrent_commands, FLAGS_redis_max_queued_bytes), |
68 | | read_buffer_(allocator, FLAGS_redis_max_read_buffer_size), |
69 | 1.51k | call_mem_tracker_(call_tracker) {} |
70 | | |
71 | 471 | RedisConnectionContext::~RedisConnectionContext() {} |
72 | | |
73 | | Result<rpc::ProcessCallsResult> RedisConnectionContext::ProcessCalls( |
74 | | const rpc::ConnectionPtr& connection, const IoVecs& data, |
75 | 215k | rpc::ReadBufferFull read_buffer_full) { |
76 | 215k | if (!can_enqueue()) { |
77 | 0 | return rpc::ProcessCallsResult{ |
78 | 0 | .consumed = 0, |
79 | 0 | .buffer = Slice(), |
80 | 0 | }; |
81 | 0 | } |
82 | | |
83 | 215k | if (!parser_) { |
84 | 1.49k | parser_.reset(new RedisParser(data)); |
85 | 213k | } else { |
86 | 213k | parser_->Update(data); |
87 | 213k | } |
88 | 215k | RedisParser& parser = *parser_; |
89 | 215k | size_t begin_of_batch = 0; |
90 | | // Try to parse all received commands to a single RedisInboundCall. |
91 | 424k | for (;;) { |
92 | 424k | auto end_of_command = VERIFY_RESULT424k (parser.NextCommand());424k |
93 | 424k | if (end_of_command == 0) { |
94 | 215k | break; |
95 | 215k | } |
96 | 209k | end_of_batch_ = end_of_command; |
97 | 209k | if (++commands_in_batch_ >= FLAGS_redis_max_batch) { |
98 | 0 | rpc::CallData call_data(end_of_batch_ - begin_of_batch); |
99 | 0 | IoVecsToBuffer(data, begin_of_batch, end_of_batch_, call_data.data()); |
100 | 0 | RETURN_NOT_OK(HandleInboundCall(connection, commands_in_batch_, &call_data)); |
101 | 0 | begin_of_batch = end_of_batch_; |
102 | 0 | commands_in_batch_ = 0; |
103 | 0 | } |
104 | 209k | } |
105 | | // Create call for rest of commands. |
106 | | // Do not form new call if we are in a middle of command. |
107 | | // It means that soon we should receive remaining data for this command and could wait. |
108 | 215k | if (commands_in_batch_ > 0 && (209k end_of_batch_ == IoVecsFullSize(data)209k || read_buffer_full0 )) { |
109 | 209k | rpc::CallData call_data(end_of_batch_ - begin_of_batch); |
110 | 209k | IoVecsToBuffer(data, begin_of_batch, end_of_batch_, call_data.data()); |
111 | 209k | RETURN_NOT_OK(HandleInboundCall(connection, commands_in_batch_, &call_data)); |
112 | 209k | begin_of_batch = end_of_batch_; |
113 | 209k | commands_in_batch_ = 0; |
114 | 209k | } |
115 | 215k | parser.Consume(begin_of_batch); |
116 | 215k | end_of_batch_ -= begin_of_batch; |
117 | 215k | return rpc::ProcessCallsResult{ |
118 | 215k | .consumed = begin_of_batch, |
119 | 215k | .buffer = Slice(), |
120 | 215k | }; |
121 | 215k | } |
122 | | |
123 | | Status RedisConnectionContext::HandleInboundCall(const rpc::ConnectionPtr& connection, |
124 | | size_t commands_in_batch, |
125 | 209k | rpc::CallData* data) { |
126 | 209k | auto reactor = connection->reactor(); |
127 | 209k | DCHECK(reactor->IsCurrentThread()); |
128 | | |
129 | 209k | auto call = rpc::InboundCall::Create<RedisInboundCall>( |
130 | 209k | connection, data->size(), call_processed_listener()); |
131 | | |
132 | 209k | Status s = call->ParseFrom(call_mem_tracker_, commands_in_batch, data); |
133 | 209k | if (!s.ok()) { |
134 | 0 | return s; |
135 | 0 | } |
136 | | |
137 | 209k | Enqueue(std::move(call)); |
138 | | |
139 | 209k | return Status::OK(); |
140 | 209k | } |
141 | | |
142 | 419k | Status RedisConnectionContext::ReportPendingWriteBytes(size_t pending_bytes) { |
143 | 419k | static constexpr size_t kHardLimit = 32_MB; |
144 | 419k | static constexpr size_t kSoftLimit = 8_MB; |
145 | 419k | auto mode = ClientMode(); |
146 | 419k | DVLOG(3) << "Connection in mode " << ToString(mode) << " has " << pending_bytes |
147 | 1 | << " bytes in the queue."; |
148 | 419k | if (mode == RedisClientMode::kNormal) { |
149 | 419k | return Status::OK(); |
150 | 419k | } |
151 | | |
152 | | // We use the same buffering logic for subscribers and monitoring clients. |
153 | | // Close a client if: |
154 | | // 1) it exceeds the hard limit of 32MB. or |
155 | | // 2) it has been over the soft limit of 8MB for longer than 1min. |
156 | 588 | if (pending_bytes > kHardLimit) { |
157 | 2 | LOG(INFO) << "Connection in mode " << ToString(mode) << " has reached the HardLimit. " |
158 | 2 | << pending_bytes << " bytes in the queue."; |
159 | 2 | return STATUS(NetworkError, "Slow Redis Client: HardLimit exceeded."); |
160 | 586 | } else if (pending_bytes > kSoftLimit) { |
161 | 122 | auto now = CoarseMonoClock::Now(); |
162 | 122 | static const CoarseDuration kGracePeriod = |
163 | 122 | std::chrono::seconds(FLAGS_redis_connection_soft_limit_grace_period_sec); |
164 | 122 | if (soft_limit_exceeded_since_ == CoarseTimePoint::max()) { |
165 | 11 | DVLOG(1) << "Connection in mode " << ToString(mode) << " has reached the Softlimit now. " |
166 | 0 | << pending_bytes << " bytes in the queue."; |
167 | 11 | soft_limit_exceeded_since_ = now; |
168 | 111 | } else if (now > soft_limit_exceeded_since_ + kGracePeriod) { |
169 | 1 | LOG(INFO) << "Connection in mode " << ToString(mode) << " has reached the Softlimit > " |
170 | 1 | << yb::ToString(kGracePeriod) << " ago. " << pending_bytes |
171 | 1 | << " bytes in the queue."; |
172 | 1 | return STATUS(NetworkError, "Slow Redis Client: Softlimit exceeded."); |
173 | 110 | } else { |
174 | 110 | DVLOG(1) << "Connection in mode " << ToString(mode) |
175 | 0 | << " has reached the Softlimit less than " << yb::ToString(kGracePeriod) << " ago. " |
176 | 0 | << pending_bytes; |
177 | 110 | } |
178 | 464 | } else { |
179 | 464 | if (soft_limit_exceeded_since_ != CoarseTimePoint::max()) { |
180 | 7 | DVLOG(1) << "Connection in mode " << ToString(mode) << " has dropped below the Softlimit. " |
181 | 0 | << pending_bytes << " bytes in the queue."; |
182 | 7 | soft_limit_exceeded_since_ = CoarseTimePoint::max(); |
183 | 7 | } |
184 | 464 | } |
185 | 585 | return Status::OK(); |
186 | 588 | } |
187 | | |
188 | 945 | void RedisConnectionContext::Shutdown(const Status& status) { |
189 | 945 | if (cleanup_hook_) { |
190 | 83 | cleanup_hook_(); |
191 | 83 | } |
192 | 945 | rpc::ConnectionContextWithQueue::Shutdown(status); |
193 | 945 | } |
194 | | |
195 | | RedisInboundCall::RedisInboundCall(rpc::ConnectionPtr conn, |
196 | | size_t weight_in_bytes, |
197 | | CallProcessedListener call_processed_listener) |
198 | 209k | : QueueableInboundCall(std::move(conn), weight_in_bytes, std::move(call_processed_listener)) {} |
199 | | |
200 | 209k | RedisInboundCall::~RedisInboundCall() { |
201 | 209k | Status status; |
202 | 209k | if (quit_.load(std::memory_order_acquire)) { |
203 | 116 | rpc::ConnectionPtr conn = connection(); |
204 | 116 | rpc::Reactor* reactor = conn->reactor(); |
205 | 116 | auto scheduled = reactor->ScheduleReactorTask( |
206 | 116 | MakeFunctorReactorTask(std::bind(&rpc::Reactor::DestroyConnection, |
207 | 116 | reactor, |
208 | 116 | conn.get(), |
209 | 116 | status), |
210 | 116 | conn, SOURCE_LOCATION())); |
211 | 116 | LOG_IF(WARNING, !scheduled) << "Failed to schedule destroy"0 ; |
212 | 116 | } |
213 | 209k | } |
214 | | |
215 | | Status RedisInboundCall::ParseFrom( |
216 | 209k | const MemTrackerPtr& mem_tracker, size_t commands, rpc::CallData* data) { |
217 | 209k | TRACE_EVENT_FLOW_BEGIN0("rpc", "RedisInboundCall", this); |
218 | 209k | TRACE_EVENT0("rpc", "RedisInboundCall::ParseFrom"); |
219 | | |
220 | 209k | consumption_ = ScopedTrackedConsumption(mem_tracker, data->size()); |
221 | | |
222 | 209k | request_data_memory_usage_.store(data->size(), std::memory_order_release); |
223 | 209k | request_data_ = std::move(*data); |
224 | 209k | serialized_request_ = Slice(request_data_.data(), request_data_.size()); |
225 | | |
226 | 209k | client_batch_.resize(commands); |
227 | 209k | responses_.resize(commands); |
228 | 209k | ready_.reserve(commands); |
229 | 419k | for (size_t i = 0; i != commands; ++i209k ) { |
230 | 209k | ready_.emplace_back(0); |
231 | 209k | } |
232 | 209k | RedisParser parser(IoVecs(1, iovec{request_data_.data(), request_data_.size()})); |
233 | 209k | size_t end_of_command = 0; |
234 | 419k | for (size_t i = 0; i != commands; ++i209k ) { |
235 | 209k | parser.SetArgs(&client_batch_[i]); |
236 | 209k | end_of_command = VERIFY_RESULT(parser.NextCommand()); |
237 | 0 | DCHECK_NE(0, client_batch_[i].size()); |
238 | 209k | if (client_batch_[i].empty()) { // Should not be there. |
239 | 0 | return STATUS(Corruption, "Empty command"); |
240 | 0 | } |
241 | 209k | if (!end_of_command) { |
242 | 0 | break; |
243 | 0 | } |
244 | 209k | } |
245 | 209k | if (end_of_command != request_data_.size()) { |
246 | 0 | return STATUS_FORMAT(Corruption, |
247 | 0 | "Parsed size $0 does not match source size $1", |
248 | 0 | end_of_command, request_data_.size()); |
249 | 0 | } |
250 | | |
251 | 209k | parsed_.store(true, std::memory_order_release); |
252 | 209k | return Status::OK(); |
253 | 209k | } |
254 | | |
255 | | namespace { |
256 | | |
257 | | const rpc::RemoteMethod remote_method("yb.redisserver.RedisServerService", "anyMethod"); |
258 | | |
259 | | } |
260 | | |
261 | 2.93k | Slice RedisInboundCall::static_serialized_remote_method() { |
262 | 2.93k | return remote_method.serialized_body(); |
263 | 2.93k | } |
264 | | |
265 | 209k | Slice RedisInboundCall::serialized_remote_method() const { |
266 | 209k | return remote_method.serialized_body(); |
267 | 209k | } |
268 | | |
269 | 270 | Slice RedisInboundCall::method_name() const { |
270 | 270 | return remote_method.method_name(); |
271 | 270 | } |
272 | | |
273 | 419k | CoarseTimePoint RedisInboundCall::GetClientDeadline() const { |
274 | 419k | return CoarseTimePoint::max(); // No timeout specified in the protocol for Redis. |
275 | 419k | } |
276 | | |
277 | 0 | void RedisInboundCall::GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const { |
278 | 0 | rpc::RedisCallDetailsPB* redis_details = call_in_progress_pb->mutable_redis_details(); |
279 | 0 | for (const RedisClientCommand& command : client_batch_) { |
280 | 0 | string query; |
281 | 0 | for (const Slice& arg : command) { |
282 | 0 | query += " " + arg.ToDebugString(FLAGS_rpcz_max_redis_query_dump_size); |
283 | 0 | } |
284 | 0 | redis_details->add_call_details()->set_redis_string(query); |
285 | 0 | } |
286 | 0 | } |
287 | | |
288 | 209k | void RedisInboundCall::LogTrace() const { |
289 | 209k | MonoTime now = MonoTime::Now(); |
290 | 209k | auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds(); |
291 | | |
292 | 209k | if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces |
293 | 209k | || (trace_ && trace_->must_print()) |
294 | 209k | || total_time > FLAGS_rpc_slow_query_threshold_ms)) { |
295 | 0 | LOG(WARNING) << ToString() << " took " << total_time << "ms. Details:"; |
296 | 0 | rpc::RpcCallInProgressPB call_in_progress_pb; |
297 | 0 | GetCallDetails(&call_in_progress_pb); |
298 | 0 | LOG(WARNING) << call_in_progress_pb.DebugString() << "Trace: "; |
299 | 0 | trace_->Dump(&LOG(WARNING), /* include_time_deltas */ true); |
300 | 0 | } |
301 | 209k | } |
302 | | |
303 | 3 | string RedisInboundCall::ToString() const { |
304 | 3 | return Format("Redis Call from $0", connection()->remote()); |
305 | 3 | } |
306 | | |
307 | | bool RedisInboundCall::DumpPB(const rpc::DumpRunningRpcsRequestPB& req, |
308 | 0 | rpc::RpcCallInProgressPB* resp) { |
309 | 0 | if (req.include_traces() && trace_) { |
310 | 0 | resp->set_trace_buffer(trace_->DumpToString(true)); |
311 | 0 | } |
312 | 0 | resp->set_elapsed_millis(MonoTime::Now().GetDeltaSince(timing_.time_received) |
313 | 0 | .ToMilliseconds()); |
314 | |
|
315 | 0 | if (!parsed_.load(std::memory_order_acquire)) { |
316 | 0 | return true; |
317 | 0 | } |
318 | | |
319 | 0 | GetCallDetails(resp); |
320 | 0 | return true; |
321 | 0 | } |
322 | | |
323 | | template <class Collection, class Out> |
324 | 419k | Out DoSerializeResponses(const Collection& responses, Out out) { |
325 | | // TODO(Amit): As and when we implement get/set and its h* equivalents, we would have to |
326 | | // handle arrays, hashes etc. For now, we only support the string response. |
327 | | |
328 | 419k | for (const auto& redis_response : responses) { |
329 | 419k | string error_message = redis_response.error_message(); |
330 | 419k | if (error_message == "") { |
331 | 418k | error_message = "Unknown error"; |
332 | 418k | } |
333 | | // Several types of error cases: |
334 | | // 1) Parsing error: The command is malformed (eg. too few arguments "SET a") |
335 | | // 2) Server error: Request to server failed due to reasons not related to the command |
336 | | // 3) Execution error: The command ran into problem during execution (eg. WrongType errors, |
337 | | // HSET on a key that isn't a hash). |
338 | | |
339 | 419k | if (redis_response.code() == RedisResponsePB_RedisStatusCode_PARSING_ERROR) { |
340 | 18 | out = SerializeError(error_message, out); |
341 | 419k | } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) { |
342 | 446 | out = SerializeError(error_message, out); |
343 | 419k | } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_NIL) { |
344 | 6.13k | out = SerializeEncoded(kNilResponse, out); |
345 | 412k | } else if (redis_response.code() != RedisResponsePB_RedisStatusCode_OK) { |
346 | 52 | out = SerializeError(error_message, out); |
347 | 412k | } else if (redis_response.has_string_response()) { |
348 | 139k | out = SerializeBulkString(redis_response.string_response(), out); |
349 | 273k | } else if (redis_response.has_status_response()) { |
350 | 961 | out = SerializeSimpleString(redis_response.status_response(), out); |
351 | 272k | } else if (redis_response.has_int_response()) { |
352 | 43.9k | out = SerializeInteger(redis_response.int_response(), out); |
353 | 228k | } else if (redis_response.has_array_response()) { |
354 | 22.4k | if (redis_response.array_response().has_encoded() && |
355 | 22.4k | redis_response.array_response().encoded()306 ) { |
356 | 304 | out = SerializeEncodedArray(redis_response.array_response().elements(), out); |
357 | 22.1k | } else { |
358 | 22.1k | out = SerializeArray(redis_response.array_response().elements(), out); |
359 | 22.1k | } |
360 | 206k | } else if (redis_response.has_encoded_response()) { |
361 | 254 | out = SerializeEncoded(redis_response.encoded_response(), out); |
362 | 206k | } else { |
363 | 206k | out = SerializeEncoded(kOkResponse, out); |
364 | 206k | } |
365 | 419k | } |
366 | 419k | return out; |
367 | 419k | } unsigned long yb::redisserver::DoSerializeResponses<boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void>, unsigned long>(boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void> const&, unsigned long) Line | Count | Source | 324 | 209k | Out DoSerializeResponses(const Collection& responses, Out out) { | 325 | | // TODO(Amit): As and when we implement get/set and its h* equivalents, we would have to | 326 | | // handle arrays, hashes etc. For now, we only support the string response. | 327 | | | 328 | 209k | for (const auto& redis_response : responses) { | 329 | 209k | string error_message = redis_response.error_message(); | 330 | 209k | if (error_message == "") { | 331 | 209k | error_message = "Unknown error"; | 332 | 209k | } | 333 | | // Several types of error cases: | 334 | | // 1) Parsing error: The command is malformed (eg. too few arguments "SET a") | 335 | | // 2) Server error: Request to server failed due to reasons not related to the command | 336 | | // 3) Execution error: The command ran into problem during execution (eg. WrongType errors, | 337 | | // HSET on a key that isn't a hash). | 338 | | | 339 | 209k | if (redis_response.code() == RedisResponsePB_RedisStatusCode_PARSING_ERROR) { | 340 | 9 | out = SerializeError(error_message, out); | 341 | 209k | } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) { | 342 | 223 | out = SerializeError(error_message, out); | 343 | 209k | } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_NIL) { | 344 | 3.06k | out = SerializeEncoded(kNilResponse, out); | 345 | 206k | } else if (redis_response.code() != RedisResponsePB_RedisStatusCode_OK) { | 346 | 26 | out = SerializeError(error_message, out); | 347 | 206k | } else if (redis_response.has_string_response()) { | 348 | 69.5k | out = SerializeBulkString(redis_response.string_response(), out); | 349 | 136k | } else if (redis_response.has_status_response()) { | 350 | 480 | out = SerializeSimpleString(redis_response.status_response(), out); | 351 | 136k | } else if (redis_response.has_int_response()) { | 352 | 21.9k | out = SerializeInteger(redis_response.int_response(), out); | 353 | 114k | } else if (redis_response.has_array_response()) { | 354 | 11.2k | if (redis_response.array_response().has_encoded() && | 355 | 11.2k | redis_response.array_response().encoded()153 ) { | 356 | 152 | out = SerializeEncodedArray(redis_response.array_response().elements(), out); | 357 | 11.0k | } else { | 358 | 11.0k | out = SerializeArray(redis_response.array_response().elements(), out); | 359 | 11.0k | } | 360 | 103k | } else if (redis_response.has_encoded_response()) { | 361 | 127 | out = SerializeEncoded(redis_response.encoded_response(), out); | 362 | 103k | } else { | 363 | 103k | out = SerializeEncoded(kOkResponse, out); | 364 | 103k | } | 365 | 209k | } | 366 | 209k | return out; | 367 | 209k | } |
unsigned char* yb::redisserver::DoSerializeResponses<boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void>, unsigned char*>(boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void> const&, unsigned char*) Line | Count | Source | 324 | 209k | Out DoSerializeResponses(const Collection& responses, Out out) { | 325 | | // TODO(Amit): As and when we implement get/set and its h* equivalents, we would have to | 326 | | // handle arrays, hashes etc. For now, we only support the string response. | 327 | | | 328 | 209k | for (const auto& redis_response : responses) { | 329 | 209k | string error_message = redis_response.error_message(); | 330 | 209k | if (error_message == "") { | 331 | 209k | error_message = "Unknown error"; | 332 | 209k | } | 333 | | // Several types of error cases: | 334 | | // 1) Parsing error: The command is malformed (eg. too few arguments "SET a") | 335 | | // 2) Server error: Request to server failed due to reasons not related to the command | 336 | | // 3) Execution error: The command ran into problem during execution (eg. WrongType errors, | 337 | | // HSET on a key that isn't a hash). | 338 | | | 339 | 209k | if (redis_response.code() == RedisResponsePB_RedisStatusCode_PARSING_ERROR) { | 340 | 9 | out = SerializeError(error_message, out); | 341 | 209k | } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) { | 342 | 223 | out = SerializeError(error_message, out); | 343 | 209k | } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_NIL) { | 344 | 3.06k | out = SerializeEncoded(kNilResponse, out); | 345 | 206k | } else if (redis_response.code() != RedisResponsePB_RedisStatusCode_OK) { | 346 | 26 | out = SerializeError(error_message, out); | 347 | 206k | } else if (redis_response.has_string_response()) { | 348 | 69.5k | out = SerializeBulkString(redis_response.string_response(), out); | 349 | 136k | } else if (redis_response.has_status_response()) { | 350 | 481 | out = SerializeSimpleString(redis_response.status_response(), out); | 351 | 136k | } else if (redis_response.has_int_response()) { | 352 | 21.9k | out = SerializeInteger(redis_response.int_response(), out); | 353 | 114k | } else if (redis_response.has_array_response()) { | 354 | 11.2k | if (redis_response.array_response().has_encoded() && | 355 | 11.2k | redis_response.array_response().encoded()153 ) { | 356 | 152 | out = SerializeEncodedArray(redis_response.array_response().elements(), out); | 357 | 11.0k | } else { | 358 | 11.0k | out = SerializeArray(redis_response.array_response().elements(), out); | 359 | 11.0k | } | 360 | 103k | } else if (redis_response.has_encoded_response()) { | 361 | 127 | out = SerializeEncoded(redis_response.encoded_response(), out); | 362 | 103k | } else { | 363 | 103k | out = SerializeEncoded(kOkResponse, out); | 364 | 103k | } | 365 | 209k | } | 366 | 209k | return out; | 367 | 209k | } |
|
368 | | |
369 | | template <class Collection> |
370 | 209k | RefCntBuffer SerializeResponses(const Collection& responses) { |
371 | 209k | constexpr size_t kZero = 0; |
372 | 209k | size_t size = DoSerializeResponses(responses, kZero); |
373 | 209k | RefCntBuffer result(size); |
374 | 209k | uint8_t* end = DoSerializeResponses(responses, result.udata()); |
375 | 209k | DCHECK_EQ(result.uend(), end); |
376 | 209k | return result; |
377 | 209k | } |
378 | | |
379 | 209k | void RedisInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) { |
380 | 209k | output->push_back(SerializeResponses(responses_)); |
381 | 209k | } |
382 | | |
383 | 210k | RedisConnectionContext& RedisInboundCall::connection_context() const { |
384 | 210k | return static_cast<RedisConnectionContext&>(connection()->context()); |
385 | 210k | } |
386 | | |
387 | | void RedisInboundCall::RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code, |
388 | 0 | const Status& status) { |
389 | 0 | for (size_t i = 0; i != client_batch_.size(); ++i) { |
390 | 0 | RespondFailure(i, status); |
391 | 0 | } |
392 | 0 | } |
393 | | |
394 | | // We wait until all responses are ready for batch embedded in this call. |
395 | 209k | void RedisInboundCall::Respond(size_t idx, bool is_success, RedisResponsePB* resp) { |
396 | | // Did we set response for command at this index already? |
397 | 209k | VLOG(2) << "Responding to '" << client_batch_[idx][0] << "' with " << resp->ShortDebugString()0 ; |
398 | 209k | if (ready_[idx].fetch_add(1, std::memory_order_relaxed) == 0) { |
399 | 209k | if (!is_success) { |
400 | 225 | had_failures_.store(true, std::memory_order_release); |
401 | 225 | } |
402 | 209k | responses_[idx].Swap(resp); |
403 | | // Did we get all responses and ready to send data. |
404 | 209k | size_t responded = ready_count_.fetch_add(1, std::memory_order_release) + 1; |
405 | 209k | if (responded == client_batch_.size()) { |
406 | 209k | RecordHandlingCompleted(); |
407 | 209k | QueueResponse(!had_failures_.load(std::memory_order_acquire)); |
408 | 209k | } |
409 | 209k | } |
410 | 209k | } |
411 | | |
412 | | void RedisInboundCall::RespondSuccess(size_t idx, |
413 | | const rpc::RpcMethodMetrics& metrics, |
414 | 209k | RedisResponsePB* resp) { |
415 | 209k | Respond(idx, true, resp); |
416 | 209k | metrics.handler_latency->Increment((MonoTime::Now() - timing_.time_handled).ToMicroseconds()); |
417 | 209k | } |
418 | | |
419 | 10 | void RedisInboundCall::RespondFailure(size_t idx, const Status& status) { |
420 | 10 | RedisResponsePB resp; |
421 | 10 | Slice message = status.message(); |
422 | 10 | resp.set_code(RedisResponsePB_RedisStatusCode_PARSING_ERROR); |
423 | 10 | resp.set_error_message(message.data(), message.size()); |
424 | 10 | Respond(idx, false, &resp); |
425 | 10 | } |
426 | | |
427 | | } // namespace redisserver |
428 | | } // namespace yb |