YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/yql/redis/redisserver/redis_rpc.h"
17
18
#include "yb/client/client_fwd.h"
19
20
#include "yb/common/redis_protocol.pb.h"
21
22
#include "yb/rpc/connection.h"
23
#include "yb/rpc/reactor.h"
24
#include "yb/rpc/rpc_introspection.pb.h"
25
26
#include "yb/util/debug/trace_event.h"
27
#include "yb/util/format.h"
28
#include "yb/util/memory/memory.h"
29
#include "yb/util/metrics.h"
30
#include "yb/util/result.h"
31
#include "yb/util/size_literals.h"
32
#include "yb/util/status_format.h"
33
34
#include "yb/yql/redis/redisserver/redis_encoding.h"
35
#include "yb/yql/redis/redisserver/redis_parser.h"
36
37
using namespace std::literals;
38
using namespace std::placeholders;
39
using namespace yb::size_literals;
40
41
// Flags declared here and read by RedisInboundCall::LogTrace().
DECLARE_bool(rpc_dump_all_traces);
DECLARE_int32(rpc_slow_query_threshold_ms);

// Per-connection limits passed to the connection context / used while batching commands.
DEFINE_uint64(redis_max_concurrent_commands, 1,
              "Max number of redis commands received from single connection, "
              "that could be processed concurrently");
DEFINE_uint64(redis_max_batch, 500, "Max number of redis commands that forms batch");

// Upper bound on the debug string produced for each command argument in GetCallDetails().
DEFINE_int32(rpcz_max_redis_query_dump_size, 4_KB,
             "The maximum size of the Redis query string in the RPCZ dump.");

DEFINE_uint64(redis_max_read_buffer_size, 128_MB,
              "Max read buffer size for Redis connections.");

DEFINE_uint64(redis_max_queued_bytes, 128_MB,
              "Max number of bytes in queued redis commands.");

// Grace period used by RedisConnectionContext::ReportPendingWriteBytes() for the soft limit.
DEFINE_int32(
    redis_connection_soft_limit_grace_period_sec, 60,
    "The duration for which the outbound data needs to exceed the softlimit "
    "before the connection gets closed down.");
59
60
namespace yb {
61
namespace redisserver {
62
63
// Builds the per-connection context.
//
// The base ConnectionContextWithQueue receives the concurrency and queued-bytes limits from the
// corresponding gflags; the read buffer is created from `allocator` with the
// redis_max_read_buffer_size limit; `call_tracker` is kept so that it can be handed to
// RedisInboundCall::ParseFrom() for every inbound call.
RedisConnectionContext::RedisConnectionContext(rpc::GrowableBufferAllocator* allocator,
                                               const MemTrackerPtr& call_tracker)
    : ConnectionContextWithQueue(FLAGS_redis_max_concurrent_commands,
                                 FLAGS_redis_max_queued_bytes),
      read_buffer_(allocator, FLAGS_redis_max_read_buffer_size),
      call_mem_tracker_(call_tracker) {
}
70
71
471
// Nothing to release explicitly; members clean up after themselves.
RedisConnectionContext::~RedisConnectionContext() = default;
72
73
// Parses as many complete Redis commands as possible out of `data` and turns them into
// RedisInboundCall batches.
//
// Returns how many bytes of `data` were turned into calls (`consumed`); the returned `buffer`
// is always an empty Slice. Parser and HandleInboundCall() failures are propagated to the caller.
Result<rpc::ProcessCallsResult> RedisConnectionContext::ProcessCalls(
    const rpc::ConnectionPtr& connection, const IoVecs& data,
    rpc::ReadBufferFull read_buffer_full) {
  // Queue is full: consume nothing for now.
  if (!can_enqueue()) {
    return rpc::ProcessCallsResult{
      .consumed = 0,
      .buffer = Slice(),
    };
  }

  // The parser is created lazily and afterwards only pointed at the latest view of the data.
  if (parser_) {
    parser_->Update(data);
  } else {
    parser_.reset(new RedisParser(data));
  }
  RedisParser& parser = *parser_;

  size_t batch_start = 0;

  // Copies bytes [batch_start, end_of_batch_) into a new call, hands it over and starts a new
  // (empty) batch right after it.
  auto dispatch_batch = [&]() -> Status {
    rpc::CallData call_data(end_of_batch_ - batch_start);
    IoVecsToBuffer(data, batch_start, end_of_batch_, call_data.data());
    RETURN_NOT_OK(HandleInboundCall(connection, commands_in_batch_, &call_data));
    batch_start = end_of_batch_;
    commands_in_batch_ = 0;
    return Status::OK();
  };

  // Try to parse all received commands to a single RedisInboundCall, splitting only when the
  // batch reaches FLAGS_redis_max_batch commands.
  while (true) {
    const auto command_end = VERIFY_RESULT(parser.NextCommand());
    if (command_end == 0) {
      // No further complete command is available.
      break;
    }
    end_of_batch_ = command_end;
    ++commands_in_batch_;
    if (commands_in_batch_ >= FLAGS_redis_max_batch) {
      RETURN_NOT_OK(dispatch_batch());
    }
  }

  // Create call for rest of commands.
  // Do not form new call if we are in a middle of command.
  // It means that soon we should receive remaining data for this command and could wait.
  if (commands_in_batch_ > 0 &&
      (end_of_batch_ == IoVecsFullSize(data) || read_buffer_full)) {
    RETURN_NOT_OK(dispatch_batch());
  }

  // Everything before batch_start has been handed over; rebase the parser and the batch end.
  parser.Consume(batch_start);
  end_of_batch_ -= batch_start;
  return rpc::ProcessCallsResult{
    .consumed = batch_start,
    .buffer = Slice(),
  };
}
122
123
// Wraps `data` (holding `commands_in_batch` commands) into a RedisInboundCall and enqueues it.
//
// Returns the ParseFrom() error, in which case nothing is enqueued; OK otherwise.
Status RedisConnectionContext::HandleInboundCall(const rpc::ConnectionPtr& connection,
                                                 size_t commands_in_batch,
                                                 rpc::CallData* data) {
  DCHECK(connection->reactor()->IsCurrentThread());

  // Read the size before ParseFrom() moves the contents out of `data`.
  const size_t weight_in_bytes = data->size();
  auto inbound_call = rpc::InboundCall::Create<RedisInboundCall>(
      connection, weight_in_bytes, call_processed_listener());

  RETURN_NOT_OK(inbound_call->ParseFrom(call_mem_tracker_, commands_in_batch, data));

  Enqueue(std::move(inbound_call));
  return Status::OK();
}
141
142
419k
// Decides whether a connection with `pending_bytes` of queued outbound data may stay open.
//
// Connections in RedisClientMode::kNormal are never limited here. For every other mode
// (we use the same buffering logic for subscribers and monitoring clients) a NetworkError is
// returned, i.e. the client is considered too slow, if:
//   1) it exceeds the hard limit of 32MB, or
//   2) it has been over the soft limit of 8MB for longer than the grace period
//      (FLAGS_redis_connection_soft_limit_grace_period_sec).
Status RedisConnectionContext::ReportPendingWriteBytes(size_t pending_bytes) {
  static constexpr size_t kHardLimit = 32_MB;
  static constexpr size_t kSoftLimit = 8_MB;

  auto mode = ClientMode();
  DVLOG(3) << "Connection in mode " << ToString(mode) << " has " << pending_bytes
           << " bytes in the queue.";
  if (mode == RedisClientMode::kNormal) {
    return Status::OK();
  }

  if (pending_bytes > kHardLimit) {
    LOG(INFO) << "Connection in mode " << ToString(mode) << " has reached the HardLimit. "
              << pending_bytes << " bytes in the queue.";
    return STATUS(NetworkError, "Slow Redis Client: HardLimit exceeded.");
  }

  if (pending_bytes <= kSoftLimit) {
    // Back under the soft limit: forget when it was first exceeded.
    if (soft_limit_exceeded_since_ != CoarseTimePoint::max()) {
      DVLOG(1) << "Connection in mode " << ToString(mode) << " has dropped below the Softlimit. "
               << pending_bytes << " bytes in the queue.";
      soft_limit_exceeded_since_ = CoarseTimePoint::max();
    }
    return Status::OK();
  }

  // Between the soft and the hard limit.
  auto now = CoarseMonoClock::Now();
  // Function-local static: the flag value is captured the first time this point is reached.
  static const CoarseDuration kGracePeriod =
      std::chrono::seconds(FLAGS_redis_connection_soft_limit_grace_period_sec);

  if (soft_limit_exceeded_since_ == CoarseTimePoint::max()) {
    // First report above the soft limit: start the grace period.
    DVLOG(1) << "Connection in mode " << ToString(mode) << " has reached the Softlimit now. "
             << pending_bytes << " bytes in the queue.";
    soft_limit_exceeded_since_ = now;
    return Status::OK();
  }

  if (now > soft_limit_exceeded_since_ + kGracePeriod) {
    LOG(INFO) << "Connection in mode " << ToString(mode) << " has reached the Softlimit > "
              << yb::ToString(kGracePeriod) << " ago. " << pending_bytes
              << " bytes in the queue.";
    return STATUS(NetworkError, "Slow Redis Client: Softlimit exceeded.");
  }

  DVLOG(1) << "Connection in mode " << ToString(mode)
           << " has reached the Softlimit less than  " << yb::ToString(kGracePeriod) << " ago. "
           << pending_bytes;
  return Status::OK();
}
187
188
945
void RedisConnectionContext::Shutdown(const Status& status) {
189
945
  if (cleanup_hook_) {
190
83
    cleanup_hook_();
191
83
  }
192
945
  rpc::ConnectionContextWithQueue::Shutdown(status);
193
945
}
194
195
// Forwards the connection, the weight of the call in bytes and the processed-listener to
// QueueableInboundCall; the request itself is attached later by ParseFrom().
RedisInboundCall::RedisInboundCall(rpc::ConnectionPtr conn,
                                   size_t weight_in_bytes,
                                   CallProcessedListener call_processed_listener)
    : QueueableInboundCall(
          std::move(conn), weight_in_bytes, std::move(call_processed_listener)) {
}
199
200
209k
// When the quit_ flag is set, destroying the call schedules destruction of its connection on
// the connection's reactor (with an OK status). Otherwise there is nothing to do.
RedisInboundCall::~RedisInboundCall() {
  if (!quit_.load(std::memory_order_acquire)) {
    return;
  }

  Status ok_status;
  rpc::ConnectionPtr conn = connection();
  rpc::Reactor* reactor = conn->reactor();
  // `conn` is also passed to the task alongside the raw pointer bound into the functor.
  auto destroy_task = MakeFunctorReactorTask(
      std::bind(&rpc::Reactor::DestroyConnection, reactor, conn.get(), ok_status),
      conn, SOURCE_LOCATION());
  auto scheduled = reactor->ScheduleReactorTask(destroy_task);
  LOG_IF(WARNING, !scheduled) << "Failed to schedule destroy";
}
214
215
// Takes ownership of the raw request bytes in `data` and splits them into `commands` parsed
// commands (client_batch_), preparing one response slot and one ready-counter per command.
//
// Returns Corruption if a command parses to zero arguments or if the parsed length does not
// match the request size; parser errors are propagated. On success parsed_ is set.
Status RedisInboundCall::ParseFrom(
    const MemTrackerPtr& mem_tracker, size_t commands, rpc::CallData* data) {
  TRACE_EVENT_FLOW_BEGIN0("rpc", "RedisInboundCall", this);
  TRACE_EVENT0("rpc", "RedisInboundCall::ParseFrom");

  // Account for the request memory, then move the bytes into this call.
  consumption_ = ScopedTrackedConsumption(mem_tracker, data->size());
  request_data_memory_usage_.store(data->size(), std::memory_order_release);
  request_data_ = std::move(*data);
  serialized_request_ = Slice(request_data_.data(), request_data_.size());

  client_batch_.resize(commands);
  responses_.resize(commands);
  ready_.reserve(commands);
  for (size_t remaining = commands; remaining != 0; --remaining) {
    ready_.emplace_back(0);
  }

  // Re-parse the owned copy so that the command arguments refer to request_data_.
  RedisParser parser(IoVecs(1, iovec{request_data_.data(), request_data_.size()}));
  size_t parsed_bytes = 0;
  for (auto& command_args : client_batch_) {
    parser.SetArgs(&command_args);
    parsed_bytes = VERIFY_RESULT(parser.NextCommand());
    DCHECK_NE(0, command_args.size());
    if (command_args.empty()) {  // Should not be there.
      return STATUS(Corruption, "Empty command");
    }
    if (parsed_bytes == 0) {
      break;
    }
  }

  if (parsed_bytes != request_data_.size()) {
    return STATUS_FORMAT(Corruption,
                         "Parsed size $0 does not match source size $1",
                         parsed_bytes, request_data_.size());
  }

  parsed_.store(true, std::memory_order_release);
  return Status::OK();
}
254
255
namespace {
256
257
const rpc::RemoteMethod remote_method("yb.redisserver.RedisServerService", "anyMethod");
258
259
}
260
261
2.93k
// Serialized body of the file-level `remote_method`, available without a call instance.
Slice RedisInboundCall::static_serialized_remote_method() {
  const auto& method = remote_method;
  return method.serialized_body();
}
264
265
209k
// Serialized body of the file-level `remote_method`; identical for every Redis call.
Slice RedisInboundCall::serialized_remote_method() const {
  const auto& method = remote_method;
  return method.serialized_body();
}
268
269
270
// Method name of the file-level `remote_method`; identical for every Redis call.
Slice RedisInboundCall::method_name() const {
  const auto& method = remote_method;
  return method.method_name();
}
272
273
419k
// No timeout specified in the protocol for Redis, so the deadline is the maximum time point.
CoarseTimePoint RedisInboundCall::GetClientDeadline() const {
  constexpr auto kNoDeadline = CoarseTimePoint::max();
  return kNoDeadline;
}
276
277
0
// Adds one call_details entry per command of the batch to `call_in_progress_pb`.
//
// Each entry is the concatenation of the command's arguments, every argument preceded by a
// single space and rendered with ToDebugString() limited by
// FLAGS_rpcz_max_redis_query_dump_size.
void RedisInboundCall::GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const {
  rpc::RedisCallDetailsPB* details = call_in_progress_pb->mutable_redis_details();
  for (const RedisClientCommand& command : client_batch_) {
    string rendered;
    for (const Slice& argument : command) {
      rendered += " ";
      rendered += argument.ToDebugString(FLAGS_rpcz_max_redis_query_dump_size);
    }
    details->add_call_details()->set_redis_string(rendered);
  }
}
287
288
209k
void RedisInboundCall::LogTrace() const {
289
209k
  MonoTime now = MonoTime::Now();
290
209k
  auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
291
292
209k
  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces
293
209k
          || (trace_ && trace_->must_print())
294
209k
          || total_time > FLAGS_rpc_slow_query_threshold_ms)) {
295
0
    LOG(WARNING) << ToString() << " took " << total_time << "ms. Details:";
296
0
    rpc::RpcCallInProgressPB call_in_progress_pb;
297
0
    GetCallDetails(&call_in_progress_pb);
298
0
    LOG(WARNING) << call_in_progress_pb.DebugString() << "Trace: ";
299
0
    trace_->Dump(&LOG(WARNING), /* include_time_deltas */ true);
300
0
  }
301
209k
}
302
303
3
// Human-readable description of the call: the fixed prefix followed by the remote of the
// connection the call arrived on.
string RedisInboundCall::ToString() const {
  const rpc::ConnectionPtr conn = connection();
  return Format("Redis Call from $0", conn->remote());
}
306
307
// Fills `resp` with the state of this in-flight call: the trace buffer (only when requested
// and a trace exists), the milliseconds elapsed since the call was received and, once the
// request has been parsed, the per-command details. Always returns true.
bool RedisInboundCall::DumpPB(const rpc::DumpRunningRpcsRequestPB& req,
                              rpc::RpcCallInProgressPB* resp) {
  if (req.include_traces() && trace_) {
    resp->set_trace_buffer(trace_->DumpToString(true));
  }

  const auto elapsed = MonoTime::Now().GetDeltaSince(timing_.time_received);
  resp->set_elapsed_millis(elapsed.ToMilliseconds());

  // parsed_ is set at the end of a successful ParseFrom(); skip the details before that.
  if (parsed_.load(std::memory_order_acquire)) {
    GetCallDetails(resp);
  }
  return true;
}
322
323
template <class Collection, class Out>
324
419k
Out DoSerializeResponses(const Collection& responses, Out out) {
325
  // TODO(Amit): As and when we implement get/set and its h* equivalents, we would have to
326
  // handle arrays, hashes etc. For now, we only support the string response.
327
328
419k
  for (const auto& redis_response : responses) {
329
419k
    string error_message = redis_response.error_message();
330
419k
    if (error_message == "") {
331
418k
      error_message = "Unknown error";
332
418k
    }
333
    // Several types of error cases:
334
    //    1) Parsing error: The command is malformed (eg. too few arguments "SET a")
335
    //    2) Server error: Request to server failed due to reasons not related to the command
336
    //    3) Execution error: The command ran into problem during execution (eg. WrongType errors,
337
    //       HSET on a key that isn't a hash).
338
339
419k
    if (redis_response.code() == RedisResponsePB_RedisStatusCode_PARSING_ERROR) {
340
18
      out = SerializeError(error_message, out);
341
419k
    } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) {
342
446
      out = SerializeError(error_message, out);
343
419k
    } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_NIL) {
344
6.13k
      out = SerializeEncoded(kNilResponse, out);
345
412k
    } else if (redis_response.code() != RedisResponsePB_RedisStatusCode_OK) {
346
52
      out = SerializeError(error_message, out);
347
412k
    } else if (redis_response.has_string_response()) {
348
139k
      out = SerializeBulkString(redis_response.string_response(), out);
349
273k
    } else if (redis_response.has_status_response()) {
350
961
      out = SerializeSimpleString(redis_response.status_response(), out);
351
272k
    } else if (redis_response.has_int_response()) {
352
43.9k
      out = SerializeInteger(redis_response.int_response(), out);
353
228k
    } else if (redis_response.has_array_response()) {
354
22.4k
      if (redis_response.array_response().has_encoded() &&
355
22.4k
          
redis_response.array_response().encoded()306
) {
356
304
        out = SerializeEncodedArray(redis_response.array_response().elements(), out);
357
22.1k
      } else {
358
22.1k
        out = SerializeArray(redis_response.array_response().elements(), out);
359
22.1k
      }
360
206k
    } else if (redis_response.has_encoded_response()) {
361
254
      out = SerializeEncoded(redis_response.encoded_response(), out);
362
206k
    } else {
363
206k
      out = SerializeEncoded(kOkResponse, out);
364
206k
    }
365
419k
  }
366
419k
  return out;
367
419k
}
unsigned long yb::redisserver::DoSerializeResponses<boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void>, unsigned long>(boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void> const&, unsigned long)
Line
Count
Source
324
209k
Out DoSerializeResponses(const Collection& responses, Out out) {
325
  // TODO(Amit): As and when we implement get/set and its h* equivalents, we would have to
326
  // handle arrays, hashes etc. For now, we only support the string response.
327
328
209k
  for (const auto& redis_response : responses) {
329
209k
    string error_message = redis_response.error_message();
330
209k
    if (error_message == "") {
331
209k
      error_message = "Unknown error";
332
209k
    }
333
    // Several types of error cases:
334
    //    1) Parsing error: The command is malformed (eg. too few arguments "SET a")
335
    //    2) Server error: Request to server failed due to reasons not related to the command
336
    //    3) Execution error: The command ran into problem during execution (eg. WrongType errors,
337
    //       HSET on a key that isn't a hash).
338
339
209k
    if (redis_response.code() == RedisResponsePB_RedisStatusCode_PARSING_ERROR) {
340
9
      out = SerializeError(error_message, out);
341
209k
    } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) {
342
223
      out = SerializeError(error_message, out);
343
209k
    } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_NIL) {
344
3.06k
      out = SerializeEncoded(kNilResponse, out);
345
206k
    } else if (redis_response.code() != RedisResponsePB_RedisStatusCode_OK) {
346
26
      out = SerializeError(error_message, out);
347
206k
    } else if (redis_response.has_string_response()) {
348
69.5k
      out = SerializeBulkString(redis_response.string_response(), out);
349
136k
    } else if (redis_response.has_status_response()) {
350
480
      out = SerializeSimpleString(redis_response.status_response(), out);
351
136k
    } else if (redis_response.has_int_response()) {
352
21.9k
      out = SerializeInteger(redis_response.int_response(), out);
353
114k
    } else if (redis_response.has_array_response()) {
354
11.2k
      if (redis_response.array_response().has_encoded() &&
355
11.2k
          
redis_response.array_response().encoded()153
) {
356
152
        out = SerializeEncodedArray(redis_response.array_response().elements(), out);
357
11.0k
      } else {
358
11.0k
        out = SerializeArray(redis_response.array_response().elements(), out);
359
11.0k
      }
360
103k
    } else if (redis_response.has_encoded_response()) {
361
127
      out = SerializeEncoded(redis_response.encoded_response(), out);
362
103k
    } else {
363
103k
      out = SerializeEncoded(kOkResponse, out);
364
103k
    }
365
209k
  }
366
209k
  return out;
367
209k
}
unsigned char* yb::redisserver::DoSerializeResponses<boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void>, unsigned char*>(boost::container::small_vector<yb::RedisResponsePB, 16ul, void, void> const&, unsigned char*)
Line
Count
Source
324
209k
Out DoSerializeResponses(const Collection& responses, Out out) {
325
  // TODO(Amit): As and when we implement get/set and its h* equivalents, we would have to
326
  // handle arrays, hashes etc. For now, we only support the string response.
327
328
209k
  for (const auto& redis_response : responses) {
329
209k
    string error_message = redis_response.error_message();
330
209k
    if (error_message == "") {
331
209k
      error_message = "Unknown error";
332
209k
    }
333
    // Several types of error cases:
334
    //    1) Parsing error: The command is malformed (eg. too few arguments "SET a")
335
    //    2) Server error: Request to server failed due to reasons not related to the command
336
    //    3) Execution error: The command ran into problem during execution (eg. WrongType errors,
337
    //       HSET on a key that isn't a hash).
338
339
209k
    if (redis_response.code() == RedisResponsePB_RedisStatusCode_PARSING_ERROR) {
340
9
      out = SerializeError(error_message, out);
341
209k
    } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) {
342
223
      out = SerializeError(error_message, out);
343
209k
    } else if (redis_response.code() == RedisResponsePB_RedisStatusCode_NIL) {
344
3.06k
      out = SerializeEncoded(kNilResponse, out);
345
206k
    } else if (redis_response.code() != RedisResponsePB_RedisStatusCode_OK) {
346
26
      out = SerializeError(error_message, out);
347
206k
    } else if (redis_response.has_string_response()) {
348
69.5k
      out = SerializeBulkString(redis_response.string_response(), out);
349
136k
    } else if (redis_response.has_status_response()) {
350
481
      out = SerializeSimpleString(redis_response.status_response(), out);
351
136k
    } else if (redis_response.has_int_response()) {
352
21.9k
      out = SerializeInteger(redis_response.int_response(), out);
353
114k
    } else if (redis_response.has_array_response()) {
354
11.2k
      if (redis_response.array_response().has_encoded() &&
355
11.2k
          
redis_response.array_response().encoded()153
) {
356
152
        out = SerializeEncodedArray(redis_response.array_response().elements(), out);
357
11.0k
      } else {
358
11.0k
        out = SerializeArray(redis_response.array_response().elements(), out);
359
11.0k
      }
360
103k
    } else if (redis_response.has_encoded_response()) {
361
127
      out = SerializeEncoded(redis_response.encoded_response(), out);
362
103k
    } else {
363
103k
      out = SerializeEncoded(kOkResponse, out);
364
103k
    }
365
209k
  }
366
209k
  return out;
367
209k
}
368
369
template <class Collection>
370
209k
RefCntBuffer SerializeResponses(const Collection& responses) {
371
209k
  constexpr size_t kZero = 0;
372
209k
  size_t size = DoSerializeResponses(responses, kZero);
373
209k
  RefCntBuffer result(size);
374
209k
  uint8_t* end = DoSerializeResponses(responses, result.udata());
375
209k
  DCHECK_EQ(result.uend(), end);
376
209k
  return result;
377
209k
}
378
379
209k
// Appends a single buffer holding all serialized responses of this call to `output`.
void RedisInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  RefCntBuffer serialized = SerializeResponses(responses_);
  output->push_back(std::move(serialized));
}
382
383
210k
// Returns the context of the call's connection, statically cast to RedisConnectionContext.
RedisConnectionContext& RedisInboundCall::connection_context() const {
  auto& generic_context = connection()->context();
  return static_cast<RedisConnectionContext&>(generic_context);
}
386
387
void RedisInboundCall::RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code,
388
0
                                      const Status& status) {
389
0
  for (size_t i = 0; i != client_batch_.size(); ++i) {
390
0
    RespondFailure(i, status);
391
0
  }
392
0
}
393
394
// We wait until all responses are ready for batch embedded in this call.
395
209k
void RedisInboundCall::Respond(size_t idx, bool is_success, RedisResponsePB* resp) {
396
  // Did we set response for command at this index already?
397
209k
  VLOG
(2) << "Responding to '" << client_batch_[idx][0] << "' with " << resp->ShortDebugString()0
;
398
209k
  if (ready_[idx].fetch_add(1, std::memory_order_relaxed) == 0) {
399
209k
    if (!is_success) {
400
225
      had_failures_.store(true, std::memory_order_release);
401
225
    }
402
209k
    responses_[idx].Swap(resp);
403
    // Did we get all responses and ready to send data.
404
209k
    size_t responded = ready_count_.fetch_add(1, std::memory_order_release) + 1;
405
209k
    if (responded == client_batch_.size()) {
406
209k
      RecordHandlingCompleted();
407
209k
      QueueResponse(!had_failures_.load(std::memory_order_acquire));
408
209k
    }
409
209k
  }
410
209k
}
411
412
void RedisInboundCall::RespondSuccess(size_t idx,
413
                                      const rpc::RpcMethodMetrics& metrics,
414
209k
                                      RedisResponsePB* resp) {
415
209k
  Respond(idx, true, resp);
416
209k
  metrics.handler_latency->Increment((MonoTime::Now() - timing_.time_handled).ToMicroseconds());
417
209k
}
418
419
10
void RedisInboundCall::RespondFailure(size_t idx, const Status& status) {
420
10
  RedisResponsePB resp;
421
10
  Slice message = status.message();
422
10
  resp.set_code(RedisResponsePB_RedisStatusCode_PARSING_ERROR);
423
10
  resp.set_error_message(message.data(), message.size());
424
10
  Respond(idx, false, &resp);
425
10
}
426
427
} // namespace redisserver
428
} // namespace yb