YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/outbound_call.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/outbound_call.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <string>
38
#include <vector>
39
40
#include <boost/functional/hash.hpp>
41
#include <gflags/gflags.h>
42
43
#include "yb/gutil/strings/substitute.h"
44
#include "yb/gutil/walltime.h"
45
46
#include "yb/rpc/connection.h"
47
#include "yb/rpc/constants.h"
48
#include "yb/rpc/proxy_base.h"
49
#include "yb/rpc/rpc_controller.h"
50
#include "yb/rpc/rpc_introspection.pb.h"
51
#include "yb/rpc/rpc_metrics.h"
52
#include "yb/rpc/serialization.h"
53
54
#include "yb/util/flag_tags.h"
55
#include "yb/util/format.h"
56
#include "yb/util/logging.h"
57
#include "yb/util/memory/memory.h"
58
#include "yb/util/metrics.h"
59
#include "yb/util/pb_util.h"
60
#include "yb/util/result.h"
61
#include "yb/util/scope_exit.h"
62
#include "yb/util/status_format.h"
63
#include "yb/util/thread_restrictions.h"
64
#include "yb/util/trace.h"
65
#include "yb/util/tsan_util.h"
66
67
// Latency histograms for outbound calls. All three are recorded in microseconds and are
// instantiated per metric entity by the OutboundCallMetrics constructor.

// Recorded by OutboundCall::SetQueued: time from call construction until it is marked queued.
METRIC_DEFINE_coarse_histogram(
68
    server, handler_latency_outbound_call_queue_time, "Time taken to queue the request ",
69
    yb::MetricUnit::kMicroseconds, "Microseconds spent to queue the request to the reactor");
70
// Recorded by OutboundCall::SetSent: time from call construction until it is marked sent.
METRIC_DEFINE_coarse_histogram(
71
    server, handler_latency_outbound_call_send_time, "Time taken to send the request ",
72
    yb::MetricUnit::kMicroseconds, "Microseconds spent to queue and write the request to the wire");
73
// Recorded by OutboundCall::SetResponse and OutboundCall::SetFinished: time from call
// construction until the call is completed.
METRIC_DEFINE_coarse_histogram(
74
    server, handler_latency_outbound_call_time_to_response, "Time taken to get the response ",
75
    yb::MetricUnit::kMicroseconds,
76
    "Microseconds spent to send the request and get a response on the wire");
77
78
// Threshold, in CPU cycles, above which OutboundCall::InvokeCallbackSync logs a warning about a
// slow RPC callback. The default is scaled by yb::kTimeMultiplier.
//
// 100M cycles should be about 50ms on a 2 GHz box. This should be high
79
// enough that involuntary context switches don't trigger it, but low enough
80
// that any serious blocking behavior on the reactor would.
81
DEFINE_int64(
82
    rpc_callback_max_cycles, 100 * 1000 * 1000 * yb::kTimeMultiplier,
83
    "The maximum number of cycles for which an RPC callback "
84
    "should be allowed to run without emitting a warning."
85
    " (Advanced debugging option)");
86
TAG_FLAG(rpc_callback_max_cycles, advanced);
87
TAG_FLAG(rpc_callback_max_cycles, runtime);
88
// Defined elsewhere; when set, ~OutboundCall logs the duration and trace of every call.
DECLARE_bool(rpc_dump_all_traces);
89
90
namespace yb {
91
namespace rpc {
92
93
using strings::Substitute;
94
using google::protobuf::Message;
95
using google::protobuf::io::CodedOutputStream;
96
97
// Binds the three outbound-call latency histograms (queue time, send time and time to response)
// to the given metric entity.
OutboundCallMetrics::OutboundCallMetrics(const scoped_refptr<MetricEntity>& entity)
    : queue_time(
          METRIC_handler_latency_outbound_call_queue_time.Instantiate(entity)),
      send_time(
          METRIC_handler_latency_outbound_call_send_time.Instantiate(entity)),
      time_to_response(
          METRIC_handler_latency_outbound_call_time_to_response.Instantiate(entity)) {}
102
103
namespace {
104
105
// Process-wide counter from which call ids are handed out.
std::atomic<int32_t> call_id_ = {0};

// Returns the next call id. Ids are always strictly positive: once the counter runs past the
// largest int32_t value it is reset to zero and counting restarts from 1.
int32_t NextCallId() {
  for (;;) {
    const int32_t previous = call_id_.fetch_add(1, std::memory_order_acquire);
    // Compute previous + 1 in unsigned arithmetic. The atomic itself wraps on overflow, but
    // incrementing a plain int32_t that holds the maximum value would be undefined behaviour
    // and could let the compiler optimise the positivity check below away.
    int32_t candidate = static_cast<int32_t>(static_cast<uint32_t>(previous) + 1u);
    if (candidate > 0) {
      return candidate;
    }
    // When call id overflows, we reset it to zero. The exchange only succeeds while the counter
    // still holds the value this thread produced; either way the loop retries.
    call_id_.compare_exchange_weak(candidate, 0);
  }
}
118
119
// Empty string whose address is used as the initial value of OutboundCall::hostname_.
const std::string kEmptyString;
120
121
} // namespace
122
123
18.1M
void InvokeCallbackTask::Run() {
124
18.1M
  CHECK_NOTNULL(call_.get());
125
18.1M
  call_->InvokeCallbackSync();
126
18.1M
}
127
128
18.1M
void InvokeCallbackTask::Done(const Status& status) {
129
18.1M
  CHECK_NOTNULL(call_.get());
130
18.1M
  if (!status.ok()) {
131
210
    LOG(WARNING) << Format(
132
210
        "Failed to schedule invoking callback on response for request $0 to $1: $2",
133
210
        call_->remote_method(), call_->hostname(), status);
134
210
    call_->SetThreadPoolFailure(status);
135
    // We are in the shutdown path, with the threadpool closing, so allow IO and wait.
136
210
    ThreadRestrictions::SetWaitAllowed(true);
137
210
    ThreadRestrictions::SetIOAllowed(true);
138
210
    call_->InvokeCallbackSync();
139
210
  }
140
  // Clear the call, since it holds OutboundCall object.
141
18.1M
  call_ = nullptr;
142
18.1M
}
143
144
///
145
/// OutboundCall
146
///
147
148
// Creates a call for `remote_method`. The call gets a fresh call id, a new trace (attached as a
// child of the current thread's trace when there is one) and the current time as its start time.
// `controller` and `response_storage` must be non-null (checked in debug builds).
// `callback` is invoked when the call completes, on `callback_thread_pool` if one is given.
OutboundCall::OutboundCall(
    const RemoteMethod* remote_method,
    const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
    std::shared_ptr<const OutboundMethodMetrics> method_metrics,
    AnyMessagePtr response_storage,
    RpcController* controller,
    std::shared_ptr<RpcMetrics> rpc_metrics,
    ResponseCallback callback,
    ThreadPool* callback_thread_pool)
    : hostname_(&kEmptyString),
      start_(CoarseMonoClock::Now()),
      controller_(DCHECK_NOTNULL(controller)),
      response_(DCHECK_NOTNULL(response_storage)),
      trace_(new Trace),
      call_id_(NextCallId()),
      remote_method_(remote_method),
      callback_(std::move(callback)),
      callback_thread_pool_(callback_thread_pool),
      outbound_call_metrics_(outbound_call_metrics),
      rpc_metrics_(std::move(rpc_metrics)),
      method_metrics_(std::move(method_metrics)) {
  TRACE_TO_WITH_TIME(trace_, start_, "$0.", remote_method_->ToString());

  // Link this call's trace under the trace active on the constructing thread, if any.
  if (auto* parent_trace = Trace::CurrentTrace()) {
    parent_trace->AddChildTrace(trace_.get());
  }

  DVLOG(4)
      << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
      << " and RPC timeout: "
      << (controller_->timeout().Initialized() ? controller_->timeout().ToString() : "none");

  // Balanced by the gauge decrement in the destructor.
  IncrementCounter(rpc_metrics_->outbound_calls_created);
  IncrementGauge(rpc_metrics_->outbound_calls_alive);
}
181
182
25.4M
// Destroys the call. The call is expected to have reached a finished state (debug check).
// With --rpc_dump_all_traces set, logs how long the call lived together with its trace.
OutboundCall::~OutboundCall() {
  DCHECK(IsFinished());
  DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);

  const bool dump_trace = PREDICT_FALSE(FLAGS_rpc_dump_all_traces);
  if (dump_trace) {
    LOG(INFO) << ToString() << " took "
              << MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds() << "us. Trace:";
    trace_->Dump(&LOG(INFO), true);
  }

  // Counterpart of the gauge increment done in the constructor.
  DecrementGauge(rpc_metrics_->outbound_calls_alive);
}
194
195
19.8M
void OutboundCall::NotifyTransferred(const Status& status, Connection* conn) {
196
19.8M
  if (status.ok()) {
197
    // Even when call is already finished (timed out) we should notify connection that it was sent
198
    // because it should expect response with appropriate id.
199
19.4M
    conn->CallSent(shared_from(this));
200
19.4M
  }
201
202
19.8M
  if (IsFinished()) {
203
0
    LOG_IF_WITH_PREFIX(DFATAL, !IsTimedOut())
204
0
        << "Transferred call is in wrong state: " << state_.load(std::memory_order_acquire);
205
19.8M
  } else if (status.ok()) {
206
19.5M
    SetSent();
207
380k
  } else {
208
18.4E
    VLOG_WITH_PREFIX(1) << "Connection torn down: " << status;
209
380k
    SetFailed(status);
210
380k
  }
211
19.8M
}
212
213
16.5M
// Hands the already serialized request buffer over to `output`.
// The buffer is moved out of this call, and the memory consumption tracked for it is reset.
void OutboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  output->push_back(std::move(buffer_));

  // The buffer no longer belongs to this call, so stop accounting for it here.
  buffer_consumption_ = ScopedTrackedConsumption();
}
217
218
19.9M
// Serializes the request header and `req` into buffer_.
//
// Buffer layout, in order:
//   1. 4-byte big-endian length of everything that follows;
//   2. varint length of the header, then the hand-encoded header: call id field, the
//      pre-serialized remote method, timeout field;
//   3. the request message, written by SerializeMessage.
//
// When `mem_tracker` is set, the buffer size is accounted against it.
// Returns an error if the timeout cannot be computed (see TimeoutMs) or if serializing the
// message fails.
Status OutboundCall::SetRequestParam(AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker) {
  using Output = google::protobuf::io::CodedOutputStream;

  auto req_size = req.SerializedSize();
  const size_t message_size = SerializedMessageSize(req_size, 0);

  const auto timeout_ms = VERIFY_RESULT(TimeoutMs());
  auto serialized_remote_method = remote_method_->serialized();

  // Header body: 1 tag byte + call id varint, remote method bytes, 1 tag byte + timeout varint.
  const size_t call_id_size = Output::VarintSize32(call_id_);
  const size_t timeout_ms_size = Output::VarintSize32(timeout_ms);
  const size_t header_pb_len =
      1 + call_id_size + serialized_remote_method.size() + 1 + timeout_ms_size;
  const auto header_pb_len32 = narrow_cast<uint32_t>(header_pb_len);

  const size_t header_size =
      kMsgLengthPrefixLength                  // Int prefix for the total length.
      + Output::VarintSize32(header_pb_len32) // Varint delimiter for header PB.
      + header_pb_len;                        // Length for the header PB itself.
  const size_t total_size = header_size + message_size;

  buffer_ = RefCntBuffer(total_size);
  uint8_t* cursor = buffer_.udata();

  // 1. The length for the whole request, not including the 4-byte
  // length prefix.
  NetworkByteOrder::Store32(cursor, narrow_cast<uint32_t>(total_size - kMsgLengthPrefixLength));
  cursor += sizeof(uint32_t);

  // 2. The varint-prefixed RequestHeader PB
  cursor = Output::WriteVarint32ToArray(header_pb_len32, cursor);
  cursor = Output::WriteTagToArray(RequestHeader::kCallIdFieldNumber << 3, cursor);
  cursor = Output::WriteVarint32ToArray(call_id_, cursor);
  memcpy(cursor, serialized_remote_method.data(), serialized_remote_method.size());
  cursor += serialized_remote_method.size();
  cursor = Output::WriteTagToArray(RequestHeader::kTimeoutMillisFieldNumber << 3, cursor);
  cursor = Output::WriteVarint32ToArray(timeout_ms, cursor);

  // Everything written so far must add up to exactly the computed header size.
  DCHECK_EQ(cursor - buffer_.udata(), header_size);

  if (mem_tracker) {
    buffer_consumption_ = ScopedTrackedConsumption(mem_tracker, buffer_.size());
  }

  // 3. The request message, placed right after the header.
  RETURN_NOT_OK(SerializeMessage(req, req_size, buffer_, 0, header_size));
  if (method_metrics_) {
    IncrementCounterBy(method_metrics_->request_bytes, buffer_.size());
  }
  return Status::OK();
}
264
265
24.9M
// Returns a copy of the call status, taken while holding lock_.
Status OutboundCall::status() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  return status_;
}
269
270
2.19k
// Returns the error details stored for this call, or null when none have been stored.
// Only the read of the pointer happens under lock_; the pointee stays owned by this call.
const ErrorStatusPB* OutboundCall::error_pb() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  return error_pb_.get();
}
274
275
93
// Returns the protobuf-generated name of `state`.
string OutboundCall::StateName(State state) {
  return RpcCallState_Name(state);
}
278
279
2
// Returns the current state of the call (atomic load with acquire ordering).
OutboundCall::State OutboundCall::state() const {
  return state_.load(std::memory_order_acquire);
}
282
283
196M
// Returns true when `state` is terminal (timed out, finished with error, finished successfully)
// and false while the call is still in flight. Any other value is a fatal error.
bool FinishedState(RpcCallState state) {
  switch (state) {
    case TIMED_OUT:
    case FINISHED_ERROR:
    case FINISHED_SUCCESS:
      return true;
    case READY:
    case ON_OUTBOUND_QUEUE:
    case SENT:
      return false;
  }
  LOG(FATAL) << "Unknown call state: " << state;
  return false;
}
297
298
76.0M
// Returns whether a call may move from `old_state` to `new_state`:
//   ON_OUTBOUND_QUEUE  only from READY
//   SENT               only from ON_OUTBOUND_QUEUE
//   TIMED_OUT          from SENT or ON_OUTBOUND_QUEUE
//   FINISHED_SUCCESS   only from SENT
//   FINISHED_ERROR     from SENT, ON_OUTBOUND_QUEUE or READY
// Any other target state is accepted without checks.
bool ValidStateTransition(RpcCallState old_state, RpcCallState new_state) {
  const bool from_ready = old_state == READY;
  const bool from_queue = old_state == ON_OUTBOUND_QUEUE;
  const bool from_sent = old_state == SENT;

  switch (new_state) {
    case ON_OUTBOUND_QUEUE:
      return from_ready;
    case SENT:
      return from_queue;
    case TIMED_OUT:
      return from_sent || from_queue;
    case FINISHED_SUCCESS:
      return from_sent;
    case FINISHED_ERROR:
      return from_sent || from_queue || from_ready;
    default:
      // No sanity checks for others.
      return true;
  }
}
315
316
76.0M
// Atomically moves the call to `new_state`.
// Returns true if this invocation performed the transition. Returns false, leaving the state
// untouched, when the call is already in a finished state or when the transition is not allowed
// (the latter is also reported through LOG(DFATAL)).
bool OutboundCall::SetState(State new_state) {
  auto current = state_.load(std::memory_order_acquire);
  DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from "
           << StateName(current) << " to " << StateName(new_state);

  // Each failed exchange refreshes `current`, so the checks are re-run against the latest state.
  do {
    if (FinishedState(current)) {
      VLOG(1) << "Call already finished: " << RpcCallState_Name(current) << ", new state: "
              << RpcCallState_Name(new_state);
      return false;
    }
    // Sanity check state transitions.
    if (!ValidStateTransition(current, new_state)) {
      LOG(DFATAL)
          << "Invalid call state transition: " << RpcCallState_Name(current) << " => "
          << RpcCallState_Name(new_state);
      return false;
    }
  } while (!state_.compare_exchange_weak(current, new_state, std::memory_order_acq_rel));

  return true;
}
338
339
25.5M
void OutboundCall::InvokeCallback() {
340
25.5M
  if (callback_thread_pool_) {
341
18.1M
    callback_task_.SetOutboundCall(shared_from(this));
342
18.1M
    callback_thread_pool_->Enqueue(&callback_task_);
343
18.1M
    TRACE_TO(trace_, "Callback will be called asynchronously.");
344
7.42M
  } else {
345
7.42M
    InvokeCallbackSync();
346
7.42M
    TRACE_TO(trace_, "Callback called synchronously.");
347
7.42M
  }
348
25.5M
}
349
350
25.5M
void OutboundCall::InvokeCallbackSync() {
351
25.5M
  if (!callback_) {
352
0
    LOG(DFATAL) << "Callback has been already invoked.";
353
0
    return;
354
0
  }
355
356
25.5M
  int64_t start_cycles = CycleClock::Now();
357
25.5M
  callback_();
358
  // Clear the callback, since it may be holding onto reference counts
359
  // via bound parameters. We do this inside the timer because it's possible
360
  // the user has naughty destructors that block, and we want to account for that
361
  // time here if they happen to run on this thread.
362
25.5M
  callback_ = nullptr;
363
25.5M
  int64_t end_cycles = CycleClock::Now();
364
25.5M
  int64_t wait_cycles = end_cycles - start_cycles;
365
25.5M
  if (PREDICT_FALSE(wait_cycles > FLAGS_rpc_callback_max_cycles)) {
366
0
    auto time_spent = MonoDelta::FromSeconds(
367
0
        static_cast<double>(wait_cycles) / base::CyclesPerSecond());
368
369
0
    LOG(WARNING) << "RPC callback for " << ToString() << " took " << time_spent;
370
0
  }
371
372
  // Could be destroyed during callback. So reset it.
373
25.5M
  controller_ = nullptr;
374
25.5M
  response_ = nullptr;
375
25.5M
}
376
377
19.5M
// Completes the call with the response received from the remote side.
// Records the time-to-response and response-size metrics and takes ownership of `resp`. Then:
//  - an error response is parsed as ErrorStatusPB and the call fails with a RemoteError
//    (or with an IOError when the error payload itself cannot be parsed);
//  - a successful response is parsed into the response storage and the callback is invoked
//    (the call fails if parsing fails).
void OutboundCall::SetResponse(CallResponse&& resp) {
  DCHECK(!IsFinished());

  auto received_at = CoarseMonoClock::Now();
  TRACE_TO_WITH_TIME(trace_, received_at, "Response received.");
  // Avoid expensive conn_id.ToString() in production.
  VTRACE_TO(1, trace_, "from $0", conn_id_.ToString());

  // Track time taken to be responded.
  if (outbound_call_metrics_) {
    outbound_call_metrics_->time_to_response->Increment(
        MonoDelta(received_at - start_).ToMicroseconds());
  }

  call_response_ = std::move(resp);
  Slice payload(call_response_.serialized_response());

  if (method_metrics_) {
    IncrementCounterBy(method_metrics_->response_bytes, payload.size());
  }

  if (!call_response_.is_success()) {
    // The payload carries an ErrorStatusPB describing the remote failure.
    auto err = std::make_unique<ErrorStatusPB>();
    if (!pb_util::ParseFromArray(err.get(), payload.data(), payload.size()).IsOk()) {
      SetFailed(STATUS(IOError, "Was an RPC error but could not parse error response",
                                err->InitializationErrorString()));
      return;
    }
    auto remote_error = STATUS(RemoteError, err->message());
    SetFailed(remote_error, std::move(err));
    return;
  }

  // TODO: here we're deserializing the call response within the reactor thread,
  // which isn't great, since it would block processing of other RPCs in parallel.
  // Should look into a way to avoid this.
  auto parse_status = response_.ParseFromSlice(payload);
  if (!parse_status.ok()) {
    SetFailed(parse_status);
    return;
  }

  if (!SetState(FINISHED_SUCCESS)) {
    LOG(DFATAL) << "Success of already finished call: "
                << RpcCallState_Name(state_.load(std::memory_order_acquire));
    return;
  }
  InvokeCallback();
}
423
424
25.4M
void OutboundCall::SetQueued() {
425
25.4M
  auto end_time = CoarseMonoClock::Now();
426
  // Track time taken to be queued.
427
25.4M
  if (outbound_call_metrics_) {
428
23.9M
    outbound_call_metrics_->queue_time->Increment(MonoDelta(end_time - start_).ToMicroseconds());
429
23.9M
  }
430
25.4M
  SetState(ON_OUTBOUND_QUEUE);
431
25.4M
  TRACE_TO_WITH_TIME(trace_, end_time, "Queued.");
432
25.4M
}
433
434
25.0M
void OutboundCall::SetSent() {
435
25.0M
  auto end_time = CoarseMonoClock::Now();
436
  // Track time taken to be sent
437
25.0M
  if (outbound_call_metrics_) {
438
23.5M
    outbound_call_metrics_->send_time->Increment(MonoDelta(end_time - start_).ToMicroseconds());
439
23.5M
  }
440
25.0M
  SetState(SENT);
441
25.0M
  TRACE_TO_WITH_TIME(trace_, end_time, "Call Sent.");
442
25.0M
}
443
444
5.58M
void OutboundCall::SetFinished() {
445
5.58M
  DCHECK(!IsFinished());
446
447
  // Track time taken to be responded.
448
5.58M
  if (outbound_call_metrics_) {
449
5.57M
    outbound_call_metrics_->time_to_response->Increment(
450
5.57M
        MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds());
451
5.57M
  }
452
5.59M
  if (SetState(FINISHED_SUCCESS)) {
453
5.59M
    InvokeCallback();
454
5.59M
  }
455
5.58M
}
456
457
427k
void OutboundCall::SetFailed(const Status &status, std::unique_ptr<ErrorStatusPB> err_pb) {
458
427k
  DCHECK(!IsFinished());
459
460
427k
  TRACE_TO(trace_, "Call Failed.");
461
427k
  bool invoke_callback;
462
427k
  {
463
427k
    std::lock_guard<simple_spinlock> l(lock_);
464
427k
    status_ = status;
465
427k
    if (status_.IsRemoteError()) {
466
2.29k
      CHECK(err_pb);
467
2.29k
      error_pb_ = std::move(err_pb);
468
2.29k
      if (error_pb_->has_code()) {
469
1.27k
        status_ = status_.CloneAndAddErrorCode(RpcError(error_pb_->code()));
470
1.27k
      }
471
425k
    } else {
472
425k
      CHECK(!err_pb);
473
425k
    }
474
427k
    invoke_callback = SetState(FINISHED_ERROR);
475
427k
  }
476
429k
  if (invoke_callback) {
477
429k
    InvokeCallback();
478
429k
  }
479
427k
}
480
481
14.7k
void OutboundCall::SetTimedOut() {
482
14.7k
  DCHECK(!IsFinished());
483
484
14.7k
  TRACE_TO(trace_, "Call TimedOut.");
485
14.7k
  bool invoke_callback;
486
14.7k
  {
487
14.7k
    auto status = STATUS_FORMAT(
488
14.7k
        TimedOut,
489
14.7k
        "$0 RPC (request call id $3) to $1 timed out after $2",
490
14.7k
        remote_method_->method_name(),
491
14.7k
        conn_id_.remote(),
492
14.7k
        controller_->timeout(),
493
14.7k
        call_id_);
494
14.7k
    std::lock_guard<simple_spinlock> l(lock_);
495
14.7k
    status_ = std::move(status);
496
14.7k
    invoke_callback = SetState(TIMED_OUT);
497
14.7k
  }
498
14.7k
  if (invoke_callback) {
499
14.7k
    InvokeCallback();
500
14.7k
  }
501
14.7k
}
502
503
91
bool OutboundCall::IsTimedOut() const {
504
91
  return state_.load(std::memory_order_acquire) == TIMED_OUT;
505
91
}
506
507
120M
bool OutboundCall::IsFinished() const {
508
120M
  return FinishedState(state_.load(std::memory_order_acquire));
509
120M
}
510
511
3.91M
// Returns the sidecar with index `idx` as a slice, or the error produced by GetSidecarPtr.
Result<Slice> OutboundCall::GetSidecar(size_t idx) const {
  // bounds[0] is the first byte of the sidecar, bounds[1] is one past its last byte.
  const uint8_t* const* bounds = VERIFY_RESULT(GetSidecarPtr(idx));
  return Slice(bounds[0], bounds[1]);
}
515
516
3.78M
// Forwards to CallResponse::GetSidecarPtr on the stored response.
Result<const uint8_t*const*> OutboundCall::GetSidecarPtr(size_t idx) const {
  const auto& response = call_response_;
  return response.GetSidecarPtr(idx);
}
519
520
2.06M
// Forwards to CallResponse::GetSidecarHolder on the stored response.
Result<SidecarHolder> OutboundCall::GetSidecarHolder(size_t idx) const {
  const auto& response = call_response_;
  return response.GetSidecarHolder(idx);
}
523
524
33
// Returns a description of the call: remote method, connection id and current state name.
string OutboundCall::ToString() const {
  return Format(
      "RPC call $0 -> $1 , state=$2.",
      *remote_method_,
      conn_id_,
      StateName(state_));
}
527
528
bool OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
529
2
                          RpcCallInProgressPB* resp) {
530
2
  std::lock_guard<simple_spinlock> l(lock_);
531
2
  auto state_value = state();
532
2
  if (!req.dump_timed_out() && state_value == RpcCallState::TIMED_OUT) {
533
1
    return false;
534
1
  }
535
1
  if (!InitHeader(resp->mutable_header()).ok() && !req.dump_timed_out()) {
536
    // Note that if we proceed here due to req.dump_timed_out() being true, then the
537
    // header.timeout_millis() will be inaccurate/not-set. This is ok because DumpPB
538
    // is only used for dumping the PB and not to send the RPC over the wire.
539
0
    return false;
540
0
  }
541
1
  resp->set_elapsed_millis(MonoDelta(CoarseMonoClock::Now() - start_).ToMilliseconds());
542
1
  resp->set_state(state_value);
543
1
  if (req.include_traces() && trace_) {
544
1
    resp->set_trace_buffer(trace_->DumpToString(true));
545
1
  }
546
1
  return true;
547
1
}
548
549
0
std::string OutboundCall::LogPrefix() const {
550
0
  return Format("{ OutboundCall@$0 } ", this);
551
0
}
552
553
19.9M
// Returns the controller's timeout in milliseconds.
// Yields 0 when no timeout is set on the controller, and a TimedOut error when the timeout is
// set but not positive once expressed in milliseconds.
Result<uint32_t> OutboundCall::TimeoutMs() const {
  const MonoDelta timeout = controller_->timeout();
  if (!timeout.Initialized()) {
    return 0;
  }

  const auto timeout_millis = timeout.ToMilliseconds();
  if (timeout_millis <= 0) {
    return STATUS(TimedOut, "Call timed out before sending");
  }
  return narrow_cast<uint32_t>(timeout_millis);
}
565
566
1
// Populates `header` with the call id and remote method and, while the call is not finished,
// with the timeout in milliseconds. Returns the error from TimeoutMs if the timeout cannot be
// computed; the call id and remote method are set even in that case.
Status OutboundCall::InitHeader(RequestHeader* header) {
  header->set_call_id(call_id_);
  remote_method_->ToPB(header->mutable_remote_method());

  const bool still_running = !IsFinished();
  if (still_running) {
    header->set_timeout_millis(VERIFY_RESULT(TimeoutMs()));
  }
  return Status::OK();
}
575
576
///
577
/// ConnectionId
578
///
579
580
9.34k
// Returns a description of the form "{ remote: <remote> idx: <idx> protocol: <protocol> }".
string ConnectionId::ToString() const {
  return Format(
      "{ remote: $0 idx: $1 protocol: $2 }",
      remote_,
      idx_,
      protocol_);
}
583
584
24.8M
size_t ConnectionId::HashCode() const {
585
24.8M
  size_t seed = 0;
586
24.8M
  boost::hash_combine(seed, hash_value(remote_));
587
24.8M
  boost::hash_combine(seed, idx_);
588
24.8M
  boost::hash_combine(seed, protocol_);
589
24.8M
  return seed;
590
24.8M
}
591
592
24.8M
// Hash functor for ConnectionId; forwards to ConnectionId::HashCode.
size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
  const size_t hash = conn_id.HashCode();
  return hash;
}
595
596
///
597
/// CallResponse
598
///
599
600
// Creates an empty response; it stays marked as not parsed until ParseFrom succeeds.
CallResponse::CallResponse() : parsed_(false) {}
603
604
5.84M
// Returns a pointer to the bounds of sidecar `idx`: element [0] is the first byte of the sidecar
// and element [1] is one past its last byte.
// Returns InvalidArgument when `idx` does not reference a sidecar of this response.
// The response must have been parsed (debug check).
Result<const uint8_t*const*> CallResponse::GetSidecarPtr(size_t idx) const {
  DCHECK(parsed_);
  // ParseFrom stores one start pointer per sidecar followed by a single end pointer, so the
  // number of sidecars is one less than the number of stored bounds (zero when there are none).
  // Comparing idx against that count, rather than testing idx + 1, keeps the check correct for
  // idx == SIZE_MAX, where idx + 1 would wrap around to 0 and slip through.
  const size_t sidecar_count = sidecar_bounds_.empty() ? 0 : sidecar_bounds_.size() - 1;
  if (idx >= sidecar_count) {
    return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx);
  }
  return &sidecar_bounds_[idx];
}
611
612
2.06M
// Returns sidecar `idx` as a slice paired with the buffer of the response data it points into,
// or the error produced by GetSidecarPtr.
Result<SidecarHolder> CallResponse::GetSidecarHolder(size_t idx) const {
  const uint8_t* const* sidecar = VERIFY_RESULT(GetSidecarPtr(idx));
  return SidecarHolder(response_data_.buffer(), Slice(sidecar[0], sidecar[1]));
}
616
617
19.5M
// Takes ownership of `call_data` and parses it into the response header, the serialized
// response body and the sidecar bounds. Must not be called on an already parsed response
// (CHECK). Returns an error if the message cannot be parsed, or Corruption if the sidecar
// offsets in the header are out of range or decreasing; the response is marked as parsed only
// on success.
Status CallResponse::ParseFrom(CallData* call_data) {
  CHECK(!parsed_);
  Slice entire_message;

  response_data_ = std::move(*call_data);
  Slice raw(response_data_.data(), response_data_.size());
  RETURN_NOT_OK(ParseYBMessage(raw, &header_, &entire_message));

  // Use information from header to extract the payload slices.
  const size_t sidecar_count = header_.sidecar_offsets_size();

  if (sidecar_count == 0) {
    // No sidecars: the whole message is the response body.
    serialized_response_ = entire_message;
    parsed_ = true;
    return Status::OK();
  }

  // The response body ends where the first sidecar begins.
  serialized_response_ = Slice(entire_message.data(), header_.sidecar_offsets(0));

  // One start pointer per sidecar plus a trailing end pointer.
  sidecar_bounds_.reserve(sidecar_count + 1);

  uint32_t previous_offset = 0;
  for (auto current_offset : header_.sidecar_offsets()) {
    const bool past_end = current_offset > entire_message.size();
    const bool decreasing = current_offset < previous_offset;
    if (past_end || decreasing) {
      return STATUS_FORMAT(
          Corruption,
          "Invalid sidecar offsets; sidecar apparently starts at $0,"
          " ends at $1, but the entire message has length $2",
          previous_offset, current_offset, entire_message.size());
    }
    sidecar_bounds_.push_back(entire_message.data() + current_offset);
    previous_offset = current_offset;
  }
  sidecar_bounds_.emplace_back(entire_message.end());

  parsed_ = true;
  return Status::OK();
}
653
654
// Name of the status category registered for RPC errors.
const std::string kRpcErrorCategoryName = "rpc error";
655
656
// Registers the status category description for RpcErrorTag under the name above.
StatusCategoryRegisterer rpc_error_category_registerer(
657
    StatusCategoryDescription::Make<RpcErrorTag>(&kRpcErrorCategoryName));
658
659
}  // namespace rpc
660
}  // namespace yb