YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/yb_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/yb_rpc.h"
17
18
#include <google/protobuf/io/coded_stream.h>
19
20
#include "yb/gutil/casts.h"
21
#include "yb/gutil/endian.h"
22
23
#include "yb/rpc/connection.h"
24
#include "yb/rpc/messenger.h"
25
#include "yb/rpc/reactor.h"
26
#include "yb/rpc/rpc_context.h"
27
#include "yb/rpc/rpc_introspection.pb.h"
28
#include "yb/rpc/serialization.h"
29
30
#include "yb/util/debug/trace_event.h"
31
#include "yb/util/flag_tags.h"
32
#include "yb/util/format.h"
33
#include "yb/util/memory/memory.h"
34
#include "yb/util/result.h"
35
#include "yb/util/size_literals.h"
36
#include "yb/util/status_format.h"
37
38
using google::protobuf::io::CodedInputStream;
39
using namespace yb::size_literals;
40
using namespace std::literals;
41
42
DECLARE_bool(rpc_dump_all_traces);
43
DECLARE_uint64(rpc_max_message_size);
44
45
DEFINE_bool(enable_rpc_keepalive, true, "Whether to enable RPC keepalive mechanism");
46
47
DEFINE_uint64(min_sidecar_buffer_size, 16_KB, "Minimal buffer to allocate for sidecar");
48
49
DEFINE_test_flag(uint64, yb_inbound_big_calls_parse_delay_ms, false,
50
                 "Test flag for simulating slow parsing of inbound calls larger than "
51
                 "rpc_throttle_threshold_bytes");
52
53
using std::placeholders::_1;
54
DECLARE_uint64(rpc_connection_timeout_ms);
55
DECLARE_int32(rpc_slow_query_threshold_ms);
56
DECLARE_int64(rpc_throttle_threshold_bytes);
57
58
namespace yb {
59
namespace rpc {
60
61
constexpr const auto kHeartbeatsPerTimeoutPeriod = 3;
62
63
namespace {
64
65
// One byte after YugaByte is reserved for future use. It could control type of connection.
66
const char kConnectionHeaderBytes[] = "YB\1";
67
const size_t kConnectionHeaderSize = sizeof(kConnectionHeaderBytes) - 1;
68
69
622k
OutboundDataPtr ConnectionHeaderInstance() {
70
622k
  static OutboundDataPtr result = std::make_shared<StringOutboundData>(
71
622k
      kConnectionHeaderBytes, kConnectionHeaderSize, "ConnectionHeader");
72
622k
  return result;
73
622k
}
74
75
const char kEmptyMsgLengthPrefix[kMsgLengthPrefixLength] = {0};
76
77
class HeartbeatOutboundData : public StringOutboundData {
78
 public:
79
445k
  bool IsHeartbeat() const override { return true; }
80
81
446k
  static std::shared_ptr<HeartbeatOutboundData> Instance() {
82
446k
    static std::shared_ptr<HeartbeatOutboundData> instance(new HeartbeatOutboundData());
83
446k
    return instance;
84
446k
  }
85
86
 private:
87
  HeartbeatOutboundData() :
88
6.86k
      StringOutboundData(kEmptyMsgLengthPrefix, kMsgLengthPrefixLength, "Heartbeat") {}
89
};
90
91
} // namespace
92
93
using google::protobuf::FieldDescriptor;
94
using google::protobuf::Message;
95
using google::protobuf::MessageLite;
96
using google::protobuf::io::CodedOutputStream;
97
98
YBConnectionContext::YBConnectionContext(
99
    size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker,
100
    const MemTrackerPtr& call_tracker)
101
    : parser_(buffer_tracker, kMsgLengthPrefixLength, 0 /* size_offset */,
102
              FLAGS_rpc_max_message_size, IncludeHeader::kFalse, rpc::SkipEmptyMessages::kTrue,
103
              this),
104
      read_buffer_(receive_buffer_size, buffer_tracker),
105
934k
      call_tracker_(call_tracker) {}
106
107
934k
void YBConnectionContext::SetEventLoop(ev::loop_ref* loop) {
108
934k
  loop_ = loop;
109
934k
}
110
111
1.19M
void YBConnectionContext::Shutdown(const Status& status) {
112
1.19M
  timer_.Shutdown();
113
1.19M
  loop_ = nullptr;
114
1.19M
}
115
116
615k
YBConnectionContext::~YBConnectionContext() {}
117
118
namespace {
119
120
99.1M
CoarseMonoClock::Duration Timeout() {
121
99.1M
  return FLAGS_rpc_connection_timeout_ms * 1ms;
122
99.1M
}
123
124
98.4M
CoarseMonoClock::Duration HeartbeatPeriod() {
125
98.4M
  return Timeout() / kHeartbeatsPerTimeoutPeriod;
126
98.4M
}
127
128
} // namespace
129
130
39.1M
uint64_t YBConnectionContext::ExtractCallId(InboundCall* call) {
131
39.1M
  return down_cast<YBInboundCall*>(call)->call_id();
132
39.1M
}
133
134
Result<ProcessCallsResult> YBInboundConnectionContext::ProcessCalls(
135
16.3M
    const ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full) {
136
16.3M
  if (state_ == RpcConnectionPB::NEGOTIATING) {
137
    // We assume that header is fully contained in the first block.
138
254k
    if (data[0].iov_len < kConnectionHeaderSize) {
139
0
      return ProcessCallsResult{ 0, Slice() };
140
0
    }
141
142
254k
    Slice slice(static_cast<const char*>(data[0].iov_base), data[0].iov_len);
143
254k
    if (!slice.starts_with(kConnectionHeaderBytes, kConnectionHeaderSize)) {
144
0
      return STATUS_FORMAT(NetworkError,
145
0
                           "Invalid connection header: $0",
146
0
                           slice.ToDebugHexString());
147
0
    }
148
254k
    state_ = RpcConnectionPB::OPEN;
149
254k
    IoVecs data_copy(data);
150
254k
    data_copy[0].iov_len -= kConnectionHeaderSize;
151
254k
    data_copy[0].iov_base = const_cast<uint8_t*>(slice.data() + kConnectionHeaderSize);
152
254k
    auto result = VERIFY_RESULT(
153
254k
        parser().Parse(connection, data_copy, ReadBufferFull::kFalse, &call_tracker()));
154
254k
    result.consumed += kConnectionHeaderSize;
155
254k
    return result;
156
16.0M
  }
157
158
16.0M
  return parser().Parse(connection, data, read_buffer_full, &call_tracker());
159
16.0M
}
160
161
namespace {
162
163
38.9M
CHECKED_STATUS ThrottleRpcStatus(const MemTrackerPtr& throttle_tracker, const YBInboundCall& call) {
164
38.9M
  if (ShouldThrottleRpc(throttle_tracker, call.request_data().size(), "Rejecting RPC call: ")) {
165
1
    return STATUS_FORMAT(ServiceUnavailable, "Call rejected due to memory pressure: $0", call);
166
38.9M
  } else {
167
38.9M
    return Status::OK();
168
38.9M
  }
169
38.9M
}
170
171
} // namespace
172
173
Status YBInboundConnectionContext::HandleCall(
174
19.6M
    const ConnectionPtr& connection, CallData* call_data) {
175
19.6M
  auto reactor = connection->reactor();
176
19.6M
  DCHECK(reactor->IsCurrentThread());
177
178
19.6M
  auto call = InboundCall::Create<YBInboundCall>(connection, call_processed_listener());
179
180
19.6M
  Status s = call->ParseFrom(call_tracker(), call_data);
181
19.6M
  if (!s.ok()) {
182
0
    return s;
183
0
  }
184
185
19.6M
  s = Store(call.get());
186
19.6M
  if (!s.ok()) {
187
0
    return s;
188
0
  }
189
190
19.6M
  auto throttle_status = ThrottleRpcStatus(call_tracker(), *call);
191
19.6M
  if (!throttle_status.ok()) {
192
1
    call->RespondFailure(ErrorStatusPB::ERROR_APPLICATION, throttle_status);
193
1
    return Status::OK();
194
1
  }
195
196
19.6M
  reactor->messenger()->Handle(call, Queue::kTrue);
197
198
19.6M
  return Status::OK();
199
19.6M
}
200
201
304k
void YBInboundConnectionContext::Connected(const ConnectionPtr& connection) {
202
304k
  DCHECK_EQ(connection->direction(), Connection::Direction::SERVER);
203
204
304k
  state_ = RpcConnectionPB::NEGOTIATING;
205
206
304k
  connection_ = connection;
207
304k
  last_write_time_ = connection->reactor()->cur_time();
208
304k
  if (FLAGS_enable_rpc_keepalive) {
209
304k
    timer_.Init(*loop_);
210
304k
    timer_.SetCallback<
211
304k
        YBInboundConnectionContext, &YBInboundConnectionContext::HandleTimeout>(this);
212
304k
    timer_.Start(HeartbeatPeriod());
213
304k
  }
214
304k
}
215
216
19.5M
void YBInboundConnectionContext::UpdateLastWrite(const ConnectionPtr& connection) {
217
19.5M
  last_write_time_ = connection->reactor()->cur_time();
218
7.55k
  VLOG(4) << connection->ToString() << ": " << "Updated last_write_time_="
219
7.55k
          << AsString(last_write_time_);
220
19.5M
}
221
222
97.7M
void YBInboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents) {  // NOLINT
223
97.7M
  const auto connection = connection_.lock();
224
97.7M
  if (connection) {
225
97.7M
    if (EV_ERROR & revents) {
226
0
      LOG(WARNING) << connection->ToString() << ": " << "Got an error in handle timeout";
227
0
      return;
228
0
    }
229
230
97.7M
    const auto now = connection->reactor()->cur_time();
231
232
97.7M
    const auto deadline =
233
97.7M
        std::max(last_heartbeat_sending_time_, last_write_time_) + HeartbeatPeriod();
234
97.7M
    if (now >= deadline) {
235
446k
      if (last_write_time_ >= last_heartbeat_sending_time_) {
236
        // last_write_time_ < last_heartbeat_sending_time_ means that last heartbeat we've queued
237
        // for sending is still in queue due to RPC/networking issues, so no need to queue
238
        // another one.
239
56
        VLOG(4) << connection->ToString() << ": " << "Sending heartbeat, now: " << AsString(now)
240
56
                << ", deadline: " << AsString(deadline)
241
56
                << ", last_write_time_: " << AsString(last_write_time_)
242
56
                << ", last_heartbeat_sending_time_: " << AsString(last_heartbeat_sending_time_);
243
446k
        connection->QueueOutboundData(HeartbeatOutboundData::Instance());
244
446k
        last_heartbeat_sending_time_ = now;
245
446k
      }
246
446k
      timer_.Start(HeartbeatPeriod());
247
97.2M
    } else {
248
97.2M
      timer_.Start(deadline - now);
249
97.2M
    }
250
97.7M
  }
251
97.7M
}
252
253
YBInboundCall::YBInboundCall(ConnectionPtr conn, CallProcessedListener call_processed_listener)
254
19.6M
    : InboundCall(std::move(conn), nullptr /* rpc_metrics */, std::move(call_processed_listener)) {}
255
256
YBInboundCall::YBInboundCall(RpcMetrics* rpc_metrics, const RemoteMethod& remote_method)
257
5.58M
    : InboundCall(nullptr /* conn */, rpc_metrics, nullptr /* call_processed_listener */) {
258
5.58M
  header_.remote_method = remote_method.serialized_body();
259
5.58M
}
260
261
25.1M
YBInboundCall::~YBInboundCall() {}
262
263
53.7M
CoarseTimePoint YBInboundCall::GetClientDeadline() const {
264
53.7M
  if (header_.timeout_ms == 0) {
265
18.7k
    return CoarseTimePoint::max();
266
18.7k
  }
267
53.7M
  return ToCoarse(timing_.time_received) + header_.timeout_ms * 1ms;
268
53.7M
}
269
270
19.5M
Status YBInboundCall::ParseFrom(const MemTrackerPtr& mem_tracker, CallData* call_data) {
271
19.5M
  TRACE_EVENT_FLOW_BEGIN0("rpc", "YBInboundCall", this);
272
19.5M
  TRACE_EVENT0("rpc", "YBInboundCall::ParseFrom");
273
274
19.5M
  Slice source(call_data->data(), call_data->size());
275
19.5M
  RETURN_NOT_OK(ParseYBMessage(source, &header_, &serialized_request_));
276
18.4E
  DVLOG(4) << "Parsed YBInboundCall header: " << header_.call_id;
277
278
19.5M
  consumption_ = ScopedTrackedConsumption(mem_tracker, call_data->size());
279
19.5M
  request_data_memory_usage_.store(call_data->size(), std::memory_order_release);
280
19.5M
  request_data_ = std::move(*call_data);
281
282
  // Adopt the service/method info from the header as soon as it's available.
283
19.5M
  if (PREDICT_FALSE(header_.remote_method.empty())) {
284
0
    return STATUS(Corruption, "Non-connection context request header must specify remote_method");
285
0
  }
286
287
19.5M
  return Status::OK();
288
19.5M
}
289
290
5.85M
size_t YBInboundCall::CopyToLastSidecarBuffer(const Slice& car) {
291
5.85M
  if (sidecar_buffers_.empty()) {
292
1.62M
    return 0;
293
1.62M
  }
294
4.23M
  auto& last_buffer =  sidecar_buffers_.back();
295
4.23M
  auto len = std::min(last_buffer.size() - filled_bytes_in_last_sidecar_buffer_, car.size());
296
4.23M
  memcpy(last_buffer.data() + filled_bytes_in_last_sidecar_buffer_, car.data(), len);
297
4.23M
  filled_bytes_in_last_sidecar_buffer_ += len;
298
299
4.23M
  return len;
300
4.23M
}
301
302
5.85M
size_t YBInboundCall::AddRpcSidecar(Slice car) {
303
5.85M
  sidecar_offsets_.Add(narrow_cast<uint32_t>(total_sidecars_size_));
304
5.85M
  total_sidecars_size_ += car.size();
305
  // Copy start of sidecar to existing buffer if present.
306
5.85M
  car.remove_prefix(CopyToLastSidecarBuffer(car));
307
308
  // If sidecar did not fit into last buffer, then we should allocate a new one.
309
5.85M
  if (!car.empty()) {
310
1.61M
    DCHECK(sidecar_buffers_.empty() ||
311
1.61M
           filled_bytes_in_last_sidecar_buffer_ == sidecar_buffers_.back().size());
312
313
    // Allocate new sidecar buffer and copy remaining part of sidecar to it.
314
1.61M
    AllocateSidecarBuffer(std::max<size_t>(car.size(), FLAGS_min_sidecar_buffer_size));
315
1.61M
    memcpy(sidecar_buffers_.back().data(), car.data(), car.size());
316
1.61M
    filled_bytes_in_last_sidecar_buffer_ = car.size();
317
1.61M
  }
318
319
5.85M
  return num_sidecars_++;
320
5.85M
}
321
322
4.64M
void YBInboundCall::ResetRpcSidecars() {
323
4.64M
  if (consumption_) {
324
0
    for (const auto& buffer : sidecar_buffers_) {
325
0
      consumption_.Add(-buffer.size());
326
0
    }
327
902k
  }
328
4.64M
  num_sidecars_ = 0;
329
4.64M
  filled_bytes_in_last_sidecar_buffer_ = 0;
330
4.64M
  total_sidecars_size_ = 0;
331
4.64M
  sidecar_buffers_.clear();
332
4.64M
  sidecar_offsets_.Clear();
333
4.64M
}
334
335
236k
void YBInboundCall::ReserveSidecarSpace(size_t space) {
336
236k
  if (num_sidecars_ != 0) {
337
0
    LOG(DFATAL) << "Attempt to ReserveSidecarSpace when there are already sidecars present";
338
0
    return;
339
0
  }
340
341
236k
  AllocateSidecarBuffer(space);
342
236k
}
343
344
1.85M
void YBInboundCall::AllocateSidecarBuffer(size_t size) {
345
1.85M
  sidecar_buffers_.push_back(RefCntBuffer(size));
346
1.85M
  if (consumption_) {
347
1.79M
    consumption_.Add(size);
348
1.79M
  }
349
1.85M
}
350
351
19.6M
Status YBInboundCall::SerializeResponseBuffer(AnyMessageConstPtr response, bool is_success) {
352
19.6M
  auto body_size = response.SerializedSize();
353
354
19.6M
  ResponseHeader resp_hdr;
355
19.6M
  resp_hdr.set_call_id(header_.call_id);
356
19.6M
  resp_hdr.set_is_error(!is_success);
357
5.84M
  for (auto& offset : sidecar_offsets_) {
358
5.84M
    offset += body_size;
359
5.84M
  }
360
19.6M
  *resp_hdr.mutable_sidecar_offsets() = std::move(sidecar_offsets_);
361
362
19.6M
  response_buf_ = VERIFY_RESULT(SerializeRequest(
363
19.6M
      body_size, total_sidecars_size_, resp_hdr, response));
364
19.6M
  return Status::OK();
365
19.6M
}
366
367
2.45k
string YBInboundCall::ToString() const {
368
2.45k
  return Format("Call $0 $1 => $2 (request call id $3)",
369
2.45k
                header_.RemoteMethodAsString(), remote_address(), local_address(), header_.call_id);
370
2.45k
}
371
372
bool YBInboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
373
1
                           RpcCallInProgressPB* resp) {
374
1
  header_.ToPB(resp->mutable_header());
375
1
  if (req.include_traces() && trace_) {
376
1
    resp->set_trace_buffer(trace_->DumpToString(true));
377
1
  }
378
1
  resp->set_elapsed_millis(MonoTime::Now().GetDeltaSince(timing_.time_received)
379
1
      .ToMilliseconds());
380
1
  return true;
381
1
}
382
383
19.5M
void YBInboundCall::LogTrace() const {
384
19.5M
  MonoTime now = MonoTime::Now();
385
19.5M
  auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
386
387
19.5M
  if (header_.timeout_ms > 0) {
388
19.5M
    double log_threshold = header_.timeout_ms * 0.75f;
389
19.5M
    if (total_time > log_threshold) {
390
      // TODO: consider pushing this onto another thread since it may be slow.
391
      // The traces may also be too large to fit in a log message.
392
2.40k
      LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout "
393
2.40k
                   << header_.timeout_ms << "ms).";
394
2.40k
      std::string s = trace_->DumpToString(1, true);
395
2.40k
      if (!s.empty()) {
396
2.17k
        LOG(WARNING) << "Trace:\n" << s;
397
2.17k
      }
398
2.40k
      return;
399
2.40k
    }
400
19.5M
  }
401
402
19.5M
  if (PREDICT_FALSE(
403
19.5M
          trace_->must_print() ||
404
19.5M
          FLAGS_rpc_dump_all_traces ||
405
12
          total_time > FLAGS_rpc_slow_query_threshold_ms)) {
406
12
    LOG(INFO) << ToString() << " took " << total_time << "ms. Trace:";
407
12
    trace_->Dump(&LOG(INFO), true);
408
12
  }
409
19.5M
}
410
411
16.5M
void YBInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) {
412
16.5M
  TRACE_EVENT0("rpc", "YBInboundCall::Serialize");
413
16.5M
  CHECK_GT(response_buf_.size(), 0);
414
16.5M
  output->push_back(std::move(response_buf_));
415
16.5M
  if (!sidecar_buffers_.empty()) {
416
1.78M
    sidecar_buffers_.back().Shrink(filled_bytes_in_last_sidecar_buffer_);
417
1.79M
    for (auto& car : sidecar_buffers_) {
418
1.79M
      output->push_back(std::move(car));
419
1.79M
    }
420
1.78M
    sidecar_buffers_.clear();
421
1.78M
  }
422
16.5M
}
423
424
19.3M
Status YBInboundCall::ParseParam(RpcCallParams* params) {
425
19.3M
  RETURN_NOT_OK(ThrottleRpcStatus(consumption_.mem_tracker(), *this));
426
427
19.3M
  auto consumption = params->ParseRequest(serialized_request());
428
19.3M
  if (!consumption.ok()) {
429
1
    auto status = consumption.status().CloneAndPrepend(
430
1
        Format("Invalid parameter for call $0", header_.RemoteMethodAsString()));
431
1
    LOG(WARNING) << status;
432
1
    return status;
433
1
  }
434
19.3M
  consumption_.Add(*consumption);
435
436
19.3M
  if (PREDICT_FALSE(FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms > 0 &&
437
5
          implicit_cast<ssize_t>(request_data_.size()) > FLAGS_rpc_throttle_threshold_bytes)) {
438
5
    std::this_thread::sleep_for(FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms * 1ms);
439
5
  }
440
441
19.3M
  return Status::OK();
442
19.3M
}
443
444
25.1M
void YBInboundCall::RespondSuccess(AnyMessageConstPtr response) {
445
25.1M
  TRACE_EVENT0("rpc", "InboundCall::RespondSuccess");
446
25.1M
  Respond(response, true);
447
25.1M
}
448
449
void YBInboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
450
1.32k
                                   const Status& status) {
451
1.32k
  TRACE_EVENT0("rpc", "InboundCall::RespondFailure");
452
1.32k
  ErrorStatusPB err;
453
1.32k
  err.set_message(status.ToString());
454
1.32k
  err.set_code(error_code);
455
456
1.32k
  Respond(AnyMessageConstPtr(&err), false);
457
1.32k
}
458
459
void YBInboundCall::RespondApplicationError(int error_ext_id, const std::string& message,
460
1.02k
                                            const MessageLite& app_error_pb) {
461
1.02k
  ErrorStatusPB err;
462
1.02k
  ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
463
1.02k
  Respond(AnyMessageConstPtr(&err), false);
464
1.02k
}
465
466
void YBInboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message,
467
                                         const google::protobuf::MessageLite& app_error_pb,
468
1.02k
                                         ErrorStatusPB* err) {
469
1.02k
  err->set_message(message);
470
1.02k
  const FieldDescriptor* app_error_field =
471
1.02k
      err->GetReflection()->FindKnownExtensionByNumber(error_ext_id);
472
1.02k
  if (app_error_field != nullptr) {
473
1.02k
    err->GetReflection()->MutableMessage(err, app_error_field)->CheckTypeAndMergeFrom(app_error_pb);
474
18.4E
  } else {
475
18.4E
    LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id
476
18.4E
                << " (message=" << message << ")";
477
18.4E
  }
478
1.02k
}
479
480
19.6M
void YBInboundCall::Respond(AnyMessageConstPtr response, bool is_success) {
481
19.6M
  TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this);
482
19.6M
  Status s = SerializeResponseBuffer(response, is_success);
483
19.6M
  if (PREDICT_FALSE(!s.ok())) {
484
0
    RespondFailure(ErrorStatusPB::ERROR_APPLICATION, s);
485
0
    return;
486
0
  }
487
488
19.6M
  TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this, "method", method_name().ToBuffer());
489
490
19.6M
  QueueResponse(is_success);
491
19.6M
}
492
493
3.20M
Slice YBInboundCall::method_name() const {
494
3.20M
  auto parsed_remote_method = ParseRemoteMethod(header_.remote_method);
495
3.20M
  return parsed_remote_method.ok() ? parsed_remote_method->method : Slice();
496
3.20M
}
497
498
Status YBOutboundConnectionContext::HandleCall(
499
19.5M
    const ConnectionPtr& connection, CallData* call_data) {
500
19.5M
  return connection->HandleCallResponse(call_data);
501
19.5M
}
502
503
297k
void YBOutboundConnectionContext::Connected(const ConnectionPtr& connection) {
504
297k
  DCHECK_EQ(connection->direction(), Connection::Direction::CLIENT);
505
297k
  connection_ = connection;
506
297k
  last_read_time_ = connection->reactor()->cur_time();
507
297k
  if (FLAGS_enable_rpc_keepalive) {
508
296k
    timer_.Init(*loop_);
509
296k
    timer_.SetCallback<
510
296k
        YBOutboundConnectionContext, &YBOutboundConnectionContext::HandleTimeout>(this);
511
296k
    timer_.Start(Timeout());
512
296k
  }
513
297k
}
514
515
625k
void YBOutboundConnectionContext::AssignConnection(const ConnectionPtr& connection) {
516
625k
  connection->QueueOutboundData(ConnectionHeaderInstance());
517
625k
}
518
519
Result<ProcessCallsResult> YBOutboundConnectionContext::ProcessCalls(
520
16.6M
    const ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full) {
521
16.6M
  return parser().Parse(connection, data, read_buffer_full, nullptr /* tracker_for_throttle */);
522
16.6M
}
523
524
19.9M
void YBOutboundConnectionContext::UpdateLastRead(const ConnectionPtr& connection) {
525
19.9M
  last_read_time_ = connection->reactor()->cur_time();
526
11.8k
  VLOG(4) << Format("$0: Updated last_read_time_=$1", connection, last_read_time_);
527
19.9M
}
528
529
399k
void YBOutboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents) {  // NOLINT
530
399k
  const auto connection = connection_.lock();
531
399k
  if (connection) {
532
120
    VLOG(5) << Format("$0: YBOutboundConnectionContext::HandleTimeout", connection);
533
399k
    if (EV_ERROR & revents) {
534
0
      LOG(WARNING) << connection->ToString() << ": " << "Got an error in handle timeout";
535
0
      return;
536
0
    }
537
538
399k
    const auto now = connection->reactor()->cur_time();
539
399k
    const MonoDelta timeout = Timeout();
540
541
399k
    auto deadline = last_read_time_ + timeout;
542
346
    VLOG(5) << Format(
543
346
        "$0: YBOutboundConnectionContext::HandleTimeout last_read_time_: $1, timeout: $2",
544
346
        connection, last_read_time_, timeout);
545
399k
    if (now > deadline) {
546
1.18k
      auto passed = now - last_read_time_;
547
1.18k
      const auto status = STATUS_FORMAT(
548
1.18k
          NetworkError, "Rpc timeout, passed: $0, timeout: $1, now: $2, last_read_time_: $3",
549
1.18k
          passed, timeout, now, last_read_time_);
550
1.18k
      LOG(WARNING) << connection->ToString() << ": " << status;
551
1.18k
      connection->reactor()->DestroyConnection(connection.get(), status);
552
1.18k
      return;
553
1.18k
    }
554
555
398k
    timer_.Start(deadline - now);
556
398k
  }
557
399k
}
558
559
} // namespace rpc
560
} // namespace yb