YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/outbound_call.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/outbound_call.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <string>
38
#include <vector>
39
40
#include <boost/functional/hash.hpp>
41
#include <gflags/gflags.h>
42
43
#include "yb/gutil/strings/substitute.h"
44
#include "yb/gutil/walltime.h"
45
46
#include "yb/rpc/connection.h"
47
#include "yb/rpc/constants.h"
48
#include "yb/rpc/proxy_base.h"
49
#include "yb/rpc/rpc_controller.h"
50
#include "yb/rpc/rpc_introspection.pb.h"
51
#include "yb/rpc/rpc_metrics.h"
52
#include "yb/rpc/serialization.h"
53
54
#include "yb/util/flag_tags.h"
55
#include "yb/util/format.h"
56
#include "yb/util/logging.h"
57
#include "yb/util/memory/memory.h"
58
#include "yb/util/metrics.h"
59
#include "yb/util/pb_util.h"
60
#include "yb/util/result.h"
61
#include "yb/util/scope_exit.h"
62
#include "yb/util/status_format.h"
63
#include "yb/util/thread_restrictions.h"
64
#include "yb/util/trace.h"
65
#include "yb/util/tsan_util.h"
66
67
// Latency histograms for outbound calls. Each one is fed with the time elapsed since the
// OutboundCall's start_ timestamp, at a different stage of the call.

// Recorded when the call is put on the outbound queue.
METRIC_DEFINE_coarse_histogram(
    server,
    handler_latency_outbound_call_queue_time,
    "Time taken to queue the request ",
    yb::MetricUnit::kMicroseconds,
    "Microseconds spent to queue the request to the reactor");

// Recorded when the call is marked as sent.
METRIC_DEFINE_coarse_histogram(
    server,
    handler_latency_outbound_call_send_time,
    "Time taken to send the request ",
    yb::MetricUnit::kMicroseconds,
    "Microseconds spent to queue and write the request to the wire");

// Recorded when a response arrives or the call is marked finished.
METRIC_DEFINE_coarse_histogram(
    server,
    handler_latency_outbound_call_time_to_response,
    "Time taken to get the response ",
    yb::MetricUnit::kMicroseconds,
    "Microseconds spent to send the request and get a response on the wire");
77
78
// 100M cycles should be about 50ms on a 2Ghz box. This should be high
79
// enough that involuntary context switches don't trigger it, but low enough
80
// that any serious blocking behavior on the reactor would.
81
DEFINE_int64(
82
    rpc_callback_max_cycles, 100 * 1000 * 1000 * yb::kTimeMultiplier,
83
    "The maximum number of cycles for which an RPC callback "
84
    "should be allowed to run without emitting a warning."
85
    " (Advanced debugging option)");
86
TAG_FLAG(rpc_callback_max_cycles, advanced);
87
TAG_FLAG(rpc_callback_max_cycles, runtime);
88
DECLARE_bool(rpc_dump_all_traces);
89
90
namespace yb {
91
namespace rpc {
92
93
using strings::Substitute;
94
using google::protobuf::Message;
95
using google::protobuf::io::CodedOutputStream;
96
97
// Instantiates the three outbound-call latency histograms on the given metric entity.
OutboundCallMetrics::OutboundCallMetrics(const scoped_refptr<MetricEntity>& entity)
    : queue_time(
          METRIC_handler_latency_outbound_call_queue_time.Instantiate(entity)),
      send_time(
          METRIC_handler_latency_outbound_call_send_time.Instantiate(entity)),
      time_to_response(
          METRIC_handler_latency_outbound_call_time_to_response.Instantiate(entity)) {}
102
103
namespace {

// Source of call ids for every OutboundCall constructed in this translation unit.
std::atomic<int32_t> call_id_ = {0};

// Returns the next call id. The returned value is always strictly positive: once the counter
// runs past the maximum int32_t value it is reset to zero and the sequence restarts at 1.
int32_t NextCallId() {
  for (;;) {
    const int32_t previous = call_id_.fetch_add(1, std::memory_order_acquire);
    // Mirror the atomic's wrap-around increment using unsigned arithmetic. `previous` can be
    // INT32_MAX, and incrementing a plain signed integer past its maximum is undefined
    // behavior; the unsigned addition wraps and the conversion back is not undefined.
    auto result = static_cast<int32_t>(static_cast<uint32_t>(previous) + 1u);
    if (result > 0) {
      return result;
    }
    // When call id overflows, we reset it to zero. A failed exchange (another thread changed
    // the counter, or a spurious failure) is fine: we simply retry with a fresh fetch_add.
    call_id_.compare_exchange_weak(result, 0);
  }
}

// Initial target of OutboundCall::hostname_.
const std::string kEmptyString;

} // namespace
122
123
66.8M
void InvokeCallbackTask::Run() {
124
66.8M
  CHECK_NOTNULL(call_.get());
125
66.8M
  call_->InvokeCallbackSync();
126
66.8M
}
127
128
66.8M
void InvokeCallbackTask::Done(const Status& status) {
129
66.8M
  CHECK_NOTNULL(call_.get());
130
66.8M
  if (!status.ok()) {
131
283
    LOG(WARNING) << Format(
132
283
        "Failed to schedule invoking callback on response for request $0 to $1: $2",
133
283
        call_->remote_method(), call_->hostname(), status);
134
283
    call_->SetThreadPoolFailure(status);
135
    // We are in the shutdown path, with the threadpool closing, so allow IO and wait.
136
283
    ThreadRestrictions::SetWaitAllowed(true);
137
283
    ThreadRestrictions::SetIOAllowed(true);
138
283
    call_->InvokeCallbackSync();
139
283
  }
140
  // Clear the call, since it holds OutboundCall object.
141
66.8M
  call_ = nullptr;
142
66.8M
}
143
144
///
145
/// OutboundCall
146
///
147
148
// Creates a call for `remote_method`. The call gets a fresh call id and its own trace, which
// is attached as a child of the current thread's trace when there is one. Also bumps the
// "created" counter and the "alive" gauge in rpc_metrics.
OutboundCall::OutboundCall(const RemoteMethod* remote_method,
                           const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics,
                           std::shared_ptr<const OutboundMethodMetrics> method_metrics,
                           AnyMessagePtr response_storage,
                           RpcController* controller,
                           std::shared_ptr<RpcMetrics> rpc_metrics,
                           ResponseCallback callback,
                           ThreadPool* callback_thread_pool)
    : hostname_(&kEmptyString),
      start_(CoarseMonoClock::Now()),
      controller_(DCHECK_NOTNULL(controller)),
      response_(DCHECK_NOTNULL(response_storage)),
      trace_(new Trace),
      call_id_(NextCallId()),
      remote_method_(remote_method),
      callback_(std::move(callback)),
      callback_thread_pool_(callback_thread_pool),
      outbound_call_metrics_(outbound_call_metrics),
      rpc_metrics_(std::move(rpc_metrics)),
      method_metrics_(std::move(method_metrics)) {
  TRACE_TO_WITH_TIME(trace_, start_, "$0.", remote_method_->ToString());

  if (auto* parent_trace = Trace::CurrentTrace()) {
    parent_trace->AddChildTrace(trace_.get());
  }

  DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
           << " and RPC timeout: "
           << (controller_->timeout().Initialized() ? controller_->timeout().ToString()
                                                    : "none");

  IncrementCounter(rpc_metrics_->outbound_calls_created);
  IncrementGauge(rpc_metrics_->outbound_calls_alive);
}
181
182
86.3M
// Expects the call to be in a finished state. Optionally dumps the trace (when
// --rpc_dump_all_traces is set) and decrements the "alive" gauge.
OutboundCall::~OutboundCall() {
  DCHECK(IsFinished());
  DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);

  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
    const auto elapsed_us = MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds();
    LOG(INFO) << ToString() << " took " << elapsed_us << "us. Trace:";
    trace_->Dump(&LOG(INFO), true);
  }

  DecrementGauge(rpc_metrics_->outbound_calls_alive);
}
194
195
74.8M
void OutboundCall::NotifyTransferred(const Status& status, Connection* conn) {
196
74.8M
  if (status.ok()) {
197
    // Even when call is already finished (timed out) we should notify connection that it was sent
198
    // because it should expect response with appropriate id.
199
72.0M
    conn->CallSent(shared_from(this));
200
72.0M
  }
201
202
74.8M
  if (IsFinished()) {
203
192
    
LOG_IF_WITH_PREFIX0
(DFATAL, !IsTimedOut())
204
0
        << "Transferred call is in wrong state: " << state_.load(std::memory_order_acquire);
205
74.8M
  } else if (status.ok()) {
206
72.4M
    SetSent();
207
72.4M
  } else {
208
18.4E
    VLOG_WITH_PREFIX(1) << "Connection torn down: " << status;
209
2.40M
    SetFailed(status);
210
2.40M
  }
211
74.8M
}
212
213
72.9M
// Hands the already serialized request buffer over to `output` and releases the memory
// tracker consumption that was registered for it.
void OutboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  output->emplace_back(std::move(buffer_));
  buffer_consumption_ = ScopedTrackedConsumption();
}
217
218
75.6M
// Serializes the request header and `req` into buffer_.
//
// Buffer layout:
//   1. 4-byte big-endian length of everything that follows;
//   2. varint-prefixed RequestHeader PB (call id, pre-serialized remote method, timeout);
//   3. the request message, written by SerializeMessage.
//
// When `mem_tracker` is set, the buffer size is registered as consumption with it.
// Returns an error if the timeout cannot be computed or the message fails to serialize.
Status OutboundCall::SetRequestParam(AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker) {
  using Output = google::protobuf::io::CodedOutputStream;

  const auto req_size = req.SerializedSize();
  const size_t message_size = SerializedMessageSize(req_size, 0);
  const auto timeout_ms = VERIFY_RESULT(TimeoutMs());
  const auto serialized_remote_method = remote_method_->serialized();

  // The size computation accounts one byte for each of the two field tags written below.
  const size_t header_pb_len =
      1 + Output::VarintSize32(call_id_) +
      serialized_remote_method.size() +
      1 + Output::VarintSize32(timeout_ms);
  const size_t header_size =
      kMsgLengthPrefixLength                                         // Total length prefix.
      + Output::VarintSize32(narrow_cast<uint32_t>(header_pb_len))   // Header PB delimiter.
      + header_pb_len;                                               // Header PB itself.
  const size_t total_size = header_size + message_size;

  buffer_ = RefCntBuffer(total_size);
  uint8_t* out = buffer_.udata();

  // 1. The length for the whole request, not including the 4-byte
  // length prefix.
  NetworkByteOrder::Store32(out, narrow_cast<uint32_t>(total_size - kMsgLengthPrefixLength));
  out += sizeof(uint32_t);

  // 2. The varint-prefixed RequestHeader PB
  out = Output::WriteVarint32ToArray(narrow_cast<uint32_t>(header_pb_len), out);

  out = Output::WriteTagToArray(RequestHeader::kCallIdFieldNumber << 3, out);
  out = Output::WriteVarint32ToArray(call_id_, out);

  memcpy(out, serialized_remote_method.data(), serialized_remote_method.size());
  out += serialized_remote_method.size();

  out = Output::WriteTagToArray(RequestHeader::kTimeoutMillisFieldNumber << 3, out);
  out = Output::WriteVarint32ToArray(timeout_ms, out);

  DCHECK_EQ(out - buffer_.udata(), header_size);

  if (mem_tracker) {
    buffer_consumption_ = ScopedTrackedConsumption(mem_tracker, buffer_.size());
  }

  // 3. The request message itself, placed right after the header.
  RETURN_NOT_OK(SerializeMessage(req, req_size, buffer_, 0, header_size));

  if (method_metrics_) {
    IncrementCounterBy(method_metrics_->request_bytes, buffer_.size());
  }
  return Status::OK();
}
264
265
84.9M
// Returns a copy of the call status, read under lock_.
Status OutboundCall::status() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  return status_;
}
269
270
268k
// Returns the stored error PB, or nullptr when none has been stored. The pointer is read
// under lock_ and stays owned by this call.
const ErrorStatusPB* OutboundCall::error_pb() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  const ErrorStatusPB* error = error_pb_.get();
  return error;
}
274
275
195
// Returns the protobuf enum name of `state`.
string OutboundCall::StateName(State state) {
  string name = RpcCallState_Name(state);
  return name;
}
278
279
1
// Returns the current call state (acquire load).
OutboundCall::State OutboundCall::state() const {
  const State current = state_.load(std::memory_order_acquire);
  return current;
}
282
283
673M
// Returns true for the terminal states (TIMED_OUT, FINISHED_ERROR, FINISHED_SUCCESS) and
// false for the in-flight ones (READY, ON_OUTBOUND_QUEUE, SENT). Any other value is fatal.
bool FinishedState(RpcCallState state) {
  switch (state) {
    case TIMED_OUT:
    case FINISHED_ERROR:
    case FINISHED_SUCCESS:
      return true;
    case READY:
    case ON_OUTBOUND_QUEUE:
    case SENT:
      return false;
  }
  LOG(FATAL) << "Unknown call state: " << state;
  return false;
}
297
298
255M
// Tells whether a call may move from `old_state` to `new_state`:
//   ON_OUTBOUND_QUEUE <- READY
//   SENT              <- ON_OUTBOUND_QUEUE
//   TIMED_OUT         <- SENT, ON_OUTBOUND_QUEUE
//   FINISHED_SUCCESS  <- SENT
//   FINISHED_ERROR    <- SENT, ON_OUTBOUND_QUEUE, READY
// Any other target state is accepted without checks.
bool ValidStateTransition(RpcCallState old_state, RpcCallState new_state) {
  const bool from_ready = old_state == READY;
  const bool from_queue = old_state == ON_OUTBOUND_QUEUE;
  const bool from_sent = old_state == SENT;

  switch (new_state) {
    case ON_OUTBOUND_QUEUE:
      return from_ready;
    case SENT:
      return from_queue;
    case TIMED_OUT:
      return from_sent || from_queue;
    case FINISHED_SUCCESS:
      return from_sent;
    case FINISHED_ERROR:
      return from_sent || from_queue || from_ready;
    default:
      // No sanity checks for others.
      return true;
  }
}
315
316
255M
// Atomically moves the call to `new_state`.
// Returns false, leaving the state untouched, when the call is already finished or the
// transition is not allowed (the latter is also reported via DFATAL). Returns true once the
// compare-and-swap succeeds.
bool OutboundCall::SetState(State new_state) {
  auto current = state_.load(std::memory_order_acquire);
  // Sanity check state transitions.
  DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from " <<
    StateName(current) << " to " << StateName(new_state);

  // A failed exchange refreshes `current`, so both checks are repeated on every attempt.
  do {
    if (FinishedState(current)) {
      VLOG(1) << "Call already finished: " << RpcCallState_Name(current) << ", new state: "
              << RpcCallState_Name(new_state);
      return false;
    }
    if (!ValidStateTransition(current, new_state)) {
      LOG(DFATAL)
          << "Invalid call state transition: " << RpcCallState_Name(current) << " => "
          << RpcCallState_Name(new_state);
      return false;
    }
  } while (!state_.compare_exchange_weak(current, new_state, std::memory_order_acq_rel));

  return true;
}
338
339
86.1M
void OutboundCall::InvokeCallback() {
340
86.1M
  if (callback_thread_pool_) {
341
66.5M
    callback_task_.SetOutboundCall(shared_from(this));
342
66.5M
    callback_thread_pool_->Enqueue(&callback_task_);
343
66.5M
    TRACE_TO(trace_, "Callback will be called asynchronously.");
344
66.5M
  } else {
345
19.5M
    InvokeCallbackSync();
346
19.5M
    TRACE_TO(trace_, "Callback called synchronously.");
347
19.5M
  }
348
86.1M
}
349
350
86.4M
void OutboundCall::InvokeCallbackSync() {
351
86.4M
  if (!callback_) {
352
0
    LOG(DFATAL) << "Callback has been already invoked.";
353
0
    return;
354
0
  }
355
356
86.4M
  int64_t start_cycles = CycleClock::Now();
357
86.4M
  callback_();
358
  // Clear the callback, since it may be holding onto reference counts
359
  // via bound parameters. We do this inside the timer because it's possible
360
  // the user has naughty destructors that block, and we want to account for that
361
  // time here if they happen to run on this thread.
362
86.4M
  callback_ = nullptr;
363
86.4M
  int64_t end_cycles = CycleClock::Now();
364
86.4M
  int64_t wait_cycles = end_cycles - start_cycles;
365
86.4M
  if (PREDICT_FALSE(wait_cycles > FLAGS_rpc_callback_max_cycles)) {
366
146
    auto time_spent = MonoDelta::FromSeconds(
367
146
        static_cast<double>(wait_cycles) / base::CyclesPerSecond());
368
369
146
    LOG(WARNING) << "RPC callback for " << ToString() << " took " << time_spent;
370
146
  }
371
372
  // Could be destroyed during callback. So reset it.
373
86.4M
  controller_ = nullptr;
374
86.4M
  response_ = nullptr;
375
86.4M
}
376
377
72.7M
// Takes ownership of a received response and finishes the call.
// A successful response is parsed into the response message and moves the call to
// FINISHED_SUCCESS; an error response is parsed as ErrorStatusPB and fails the call with a
// RemoteError. Parse failures in either path fail the call as well.
void OutboundCall::SetResponse(CallResponse&& resp) {
  DCHECK(!IsFinished());

  const auto received_at = CoarseMonoClock::Now();
  TRACE_TO_WITH_TIME(trace_, received_at, "Response received.");
  // Avoid expensive conn_id.ToString() in production.
  VTRACE_TO(1, trace_, "from $0", conn_id_.ToString());

  // Track time taken to be responded.
  if (outbound_call_metrics_) {
    outbound_call_metrics_->time_to_response->Increment(
        MonoDelta(received_at - start_).ToMicroseconds());
  }

  call_response_ = std::move(resp);
  Slice payload(call_response_.serialized_response());

  if (method_metrics_) {
    IncrementCounterBy(method_metrics_->response_bytes, payload.size());
  }

  if (!call_response_.is_success()) {
    // Error
    auto err = std::make_unique<ErrorStatusPB>();
    if (!pb_util::ParseFromArray(err.get(), payload.data(), payload.size()).IsOk()) {
      SetFailed(STATUS(IOError, "Was an RPC error but could not parse error response",
                                err->InitializationErrorString()));
      return;
    }
    auto remote_status = STATUS(RemoteError, err->message());
    SetFailed(remote_status, std::move(err));
    return;
  }

  // TODO: here we're deserializing the call response within the reactor thread,
  // which isn't great, since it would block processing of other RPCs in parallel.
  // Should look into a way to avoid this.
  auto parse_status = response_.ParseFromSlice(payload);
  if (!parse_status.ok()) {
    SetFailed(parse_status);
    return;
  }

  if (SetState(FINISHED_SUCCESS)) {
    InvokeCallback();
  } else {
    LOG(DFATAL) << "Success of already finished call: "
                << RpcCallState_Name(state_.load(std::memory_order_acquire));
  }
}
423
424
86.0M
void OutboundCall::SetQueued() {
425
86.0M
  auto end_time = CoarseMonoClock::Now();
426
  // Track time taken to be queued.
427
86.0M
  if (outbound_call_metrics_) {
428
84.0M
    outbound_call_metrics_->queue_time->Increment(MonoDelta(end_time - start_).ToMicroseconds());
429
84.0M
  }
430
86.0M
  SetState(ON_OUTBOUND_QUEUE);
431
86.0M
  TRACE_TO_WITH_TIME(trace_, end_time, "Queued.");
432
86.0M
}
433
434
83.3M
void OutboundCall::SetSent() {
435
83.3M
  auto end_time = CoarseMonoClock::Now();
436
  // Track time taken to be sent
437
83.3M
  if (outbound_call_metrics_) {
438
81.4M
    outbound_call_metrics_->send_time->Increment(MonoDelta(end_time - start_).ToMicroseconds());
439
81.4M
  }
440
83.3M
  SetState(SENT);
441
83.3M
  TRACE_TO_WITH_TIME(trace_, end_time, "Call Sent.");
442
83.3M
}
443
444
10.7M
void OutboundCall::SetFinished() {
445
10.7M
  DCHECK(!IsFinished());
446
447
  // Track time taken to be responded.
448
10.7M
  if (outbound_call_metrics_) {
449
10.7M
    outbound_call_metrics_->time_to_response->Increment(
450
10.7M
        MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds());
451
10.7M
  }
452
10.8M
  if (
SetState(FINISHED_SUCCESS)10.7M
) {
453
10.8M
    InvokeCallback();
454
10.8M
  }
455
10.7M
}
456
457
3.00M
void OutboundCall::SetFailed(const Status &status, std::unique_ptr<ErrorStatusPB> err_pb) {
458
3.00M
  DCHECK(!IsFinished());
459
460
3.00M
  TRACE_TO(trace_, "Call Failed.");
461
3.00M
  bool invoke_callback;
462
3.00M
  {
463
3.00M
    std::lock_guard<simple_spinlock> l(lock_);
464
3.00M
    status_ = status;
465
3.00M
    if (status_.IsRemoteError()) {
466
268k
      CHECK(err_pb);
467
268k
      error_pb_ = std::move(err_pb);
468
268k
      if (error_pb_->has_code()) {
469
266k
        status_ = status_.CloneAndAddErrorCode(RpcError(error_pb_->code()));
470
266k
      }
471
2.74M
    } else {
472
2.74M
      CHECK(!err_pb);
473
2.74M
    }
474
3.00M
    invoke_callback = SetState(FINISHED_ERROR);
475
3.00M
  }
476
3.01M
  if (
invoke_callback3.00M
) {
477
3.01M
    InvokeCallback();
478
3.01M
  }
479
3.00M
}
480
481
38.4k
void OutboundCall::SetTimedOut() {
482
38.4k
  DCHECK(!IsFinished());
483
484
38.4k
  TRACE_TO(trace_, "Call TimedOut.");
485
38.4k
  bool invoke_callback;
486
38.4k
  {
487
38.4k
    auto status = STATUS_FORMAT(
488
38.4k
        TimedOut,
489
38.4k
        "$0 RPC (request call id $3) to $1 timed out after $2",
490
38.4k
        remote_method_->method_name(),
491
38.4k
        conn_id_.remote(),
492
38.4k
        controller_->timeout(),
493
38.4k
        call_id_);
494
38.4k
    std::lock_guard<simple_spinlock> l(lock_);
495
38.4k
    status_ = std::move(status);
496
38.4k
    invoke_callback = SetState(TIMED_OUT);
497
38.4k
  }
498
38.4k
  if (invoke_callback) {
499
38.4k
    InvokeCallback();
500
38.4k
  }
501
38.4k
}
502
503
192
bool OutboundCall::IsTimedOut() const {
504
192
  return state_.load(std::memory_order_acquire) == TIMED_OUT;
505
192
}
506
507
419M
bool OutboundCall::IsFinished() const {
508
419M
  return FinishedState(state_.load(std::memory_order_acquire));
509
419M
}
510
511
7.52M
// Returns the bytes of sidecar `idx` as a Slice spanning its begin and end pointers, or the
// error reported by GetSidecarPtr.
Result<Slice> OutboundCall::GetSidecar(size_t idx) const {
  const uint8_t* const* bounds = VERIFY_RESULT(GetSidecarPtr(idx));
  const uint8_t* begin = bounds[0];
  const uint8_t* end = bounds[1];
  return Slice(begin, end);
}
515
516
10.7M
// Forwards to the stored response: pointer to the begin bound of sidecar `idx` (the end bound
// is the next element), or an error for an invalid index.
Result<const uint8_t*const*> OutboundCall::GetSidecarPtr(size_t idx) const {
  const CallResponse& response = call_response_;
  return response.GetSidecarPtr(idx);
}
519
520
7.21M
// Forwards to the stored response: holder for sidecar `idx`, or an error for an invalid
// index.
Result<SidecarHolder> OutboundCall::GetSidecarHolder(size_t idx) const {
  const CallResponse& response = call_response_;
  return response.GetSidecarHolder(idx);
}
523
524
165
// Human-readable description of the call: remote method, connection id and current state.
string OutboundCall::ToString() const {
  const string state_name = StateName(state_.load());
  return Format("RPC call $0 -> $1 , state=$2.", *remote_method_, conn_id_, state_name);
}
527
528
bool OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
529
1
                          RpcCallInProgressPB* resp) {
530
1
  std::lock_guard<simple_spinlock> l(lock_);
531
1
  auto state_value = state();
532
1
  if (!req.dump_timed_out() && state_value == RpcCallState::TIMED_OUT) {
533
0
    return false;
534
0
  }
535
1
  if (!InitHeader(resp->mutable_header()).ok() && 
!req.dump_timed_out()0
) {
536
    // Note that if we proceed here due to req.dump_timed_out() being true, then the
537
    // header.timeout_millis() will be inaccurate/not-set. This is ok because DumpPB
538
    // is only used for dumping the PB and not to send the RPC over the wire.
539
0
    return false;
540
0
  }
541
1
  resp->set_elapsed_millis(MonoDelta(CoarseMonoClock::Now() - start_).ToMilliseconds());
542
1
  resp->set_state(state_value);
543
1
  if (req.include_traces() && trace_) {
544
1
    resp->set_trace_buffer(trace_->DumpToString(true));
545
1
  }
546
1
  return true;
547
1
}
548
549
0
std::string OutboundCall::LogPrefix() const {
550
0
  return Format("{ OutboundCall@$0 } ", this);
551
0
}
552
553
75.6M
// Returns the controller's timeout in milliseconds.
//   - 0 when the controller has no timeout set;
//   - TimedOut error when the timeout is set but is not a positive number of milliseconds.
Result<uint32_t> OutboundCall::TimeoutMs() const {
  const MonoDelta timeout = controller_->timeout();
  if (!timeout.Initialized()) {
    return 0;
  }

  const auto timeout_millis = timeout.ToMilliseconds();
  if (timeout_millis <= 0) {
    return STATUS(TimedOut, "Call timed out before sending");
  }
  return narrow_cast<uint32_t>(timeout_millis);
}
565
566
1
// Fills `header` with the call id and remote method. The timeout is only added while the
// call is not finished; an error from TimeoutMs() is returned to the caller.
Status OutboundCall::InitHeader(RequestHeader* header) {
  header->set_call_id(call_id_);
  remote_method_->ToPB(header->mutable_remote_method());

  if (IsFinished()) {
    return Status::OK();
  }

  header->set_timeout_millis(VERIFY_RESULT(TimeoutMs()));
  return Status::OK();
}
575
576
///
577
/// ConnectionId
578
///
579
580
10.2k
// Textual form of the connection id: remote endpoint, index and protocol.
string ConnectionId::ToString() const {
  string text = Format("{ remote: $0 idx: $1 protocol: $2 }", remote_, idx_, protocol_);
  return text;
}
583
584
104M
size_t ConnectionId::HashCode() const {
585
104M
  size_t seed = 0;
586
104M
  boost::hash_combine(seed, hash_value(remote_));
587
104M
  boost::hash_combine(seed, idx_);
588
104M
  boost::hash_combine(seed, protocol_);
589
104M
  return seed;
590
104M
}
591
592
104M
// Hash functor for ConnectionId; delegates to ConnectionId::HashCode().
size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
  const size_t hash = conn_id.HashCode();
  return hash;
}
595
596
///
597
/// CallResponse
598
///
599
600
// Creates an empty response that has not been parsed yet.
CallResponse::CallResponse() : parsed_(false) {}
603
604
17.9M
// Returns a pointer into sidecar_bounds_ for sidecar `idx`: element [0] is the begin bound of
// the sidecar and element [1] its end bound, so both sidecar_bounds_[idx] and
// sidecar_bounds_[idx + 1] must exist.
// Returns InvalidArgument when `idx` does not reference a valid sidecar.
Result<const uint8_t*const*> CallResponse::GetSidecarPtr(size_t idx) const {
  DCHECK(parsed_);
  // Do the range check without computing idx + 1: for idx == SIZE_MAX that sum wraps to 0,
  // which would pass an "idx + 1 >= size" test and hand out an out-of-range pointer.
  const size_t bounds_count = sidecar_bounds_.size();
  if (bounds_count < 2 || idx > bounds_count - 2) {
    return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx);
  }
  return &sidecar_bounds_[idx];
}
611
612
7.21M
// Returns sidecar `idx` as a SidecarHolder built from the response buffer and the slice
// between the sidecar's begin and end bounds, or the error reported by GetSidecarPtr.
Result<SidecarHolder> CallResponse::GetSidecarHolder(size_t idx) const {
  const uint8_t* const* bounds = VERIFY_RESULT(GetSidecarPtr(idx));
  const Slice sidecar(bounds[0], bounds[1]);
  return SidecarHolder(response_data_.buffer(), sidecar);
}
616
617
72.7M
// Takes over `call_data` and parses it into the response header, the serialized response
// body and the sidecar bounds. Must be called at most once per object (CHECK).
//
// Without sidecars the whole message is the serialized response. With sidecars the response
// ends at the first sidecar offset, and sidecar_bounds_ receives one pointer per offset plus
// a final pointer to the end of the message.
//
// Returns Corruption when an offset lies past the end of the message or the offsets are not
// non-decreasing, and forwards any error from ParseYBMessage.
Status CallResponse::ParseFrom(CallData* call_data) {
  CHECK(!parsed_);

  response_data_ = std::move(*call_data);
  Slice source(response_data_.data(), response_data_.size());
  Slice entire_message;
  RETURN_NOT_OK(ParseYBMessage(source, &header_, &entire_message));

  // Use information from header to extract the payload slices.
  const size_t sidecars = header_.sidecar_offsets_size();
  if (sidecars == 0) {
    serialized_response_ = entire_message;
    parsed_ = true;
    return Status::OK();
  }

  serialized_response_ = Slice(entire_message.data(), header_.sidecar_offsets(0));
  sidecar_bounds_.reserve(sidecars + 1);

  uint32_t previous = 0;
  for (auto offset : header_.sidecar_offsets()) {
    const bool inside_message = offset <= entire_message.size();
    const bool ordered = offset >= previous;
    if (!inside_message || !ordered) {
      return STATUS_FORMAT(
          Corruption,
          "Invalid sidecar offsets; sidecar apparently starts at $0,"
          " ends at $1, but the entire message has length $2",
          previous, offset, entire_message.size());
    }
    sidecar_bounds_.push_back(entire_message.data() + offset);
    previous = offset;
  }
  // Closing bound for the last sidecar.
  sidecar_bounds_.emplace_back(entire_message.end());

  parsed_ = true;
  return Status::OK();
}
653
654
const std::string kRpcErrorCategoryName = "rpc error";
655
656
StatusCategoryRegisterer rpc_error_category_registerer(
657
    StatusCategoryDescription::Make<RpcErrorTag>(&kRpcErrorCategoryName));
658
659
}  // namespace rpc
660
}  // namespace yb