YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/yb_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/yb_rpc.h"
17
18
#include <google/protobuf/io/coded_stream.h>
19
20
#include "yb/gutil/casts.h"
21
#include "yb/gutil/endian.h"
22
23
#include "yb/rpc/connection.h"
24
#include "yb/rpc/messenger.h"
25
#include "yb/rpc/reactor.h"
26
#include "yb/rpc/rpc_context.h"
27
#include "yb/rpc/rpc_introspection.pb.h"
28
#include "yb/rpc/serialization.h"
29
30
#include "yb/util/debug/trace_event.h"
31
#include "yb/util/flag_tags.h"
32
#include "yb/util/format.h"
33
#include "yb/util/memory/memory.h"
34
#include "yb/util/result.h"
35
#include "yb/util/size_literals.h"
36
#include "yb/util/status_format.h"
37
38
using google::protobuf::io::CodedInputStream;
39
using namespace yb::size_literals;
40
using namespace std::literals;
41
42
DECLARE_bool(rpc_dump_all_traces);
43
DECLARE_uint64(rpc_max_message_size);
44
45
DEFINE_bool(enable_rpc_keepalive, true, "Whether to enable RPC keepalive mechanism");
46
47
DEFINE_uint64(min_sidecar_buffer_size, 16_KB, "Minimal buffer to allocate for sidecar");
48
49
DEFINE_test_flag(uint64, yb_inbound_big_calls_parse_delay_ms, false,
50
                 "Test flag for simulating slow parsing of inbound calls larger than "
51
                 "rpc_throttle_threshold_bytes");
52
53
using std::placeholders::_1;
54
DECLARE_uint64(rpc_connection_timeout_ms);
55
DECLARE_int32(rpc_slow_query_threshold_ms);
56
DECLARE_int64(rpc_throttle_threshold_bytes);
57
58
namespace yb {
59
namespace rpc {
60
61
constexpr const auto kHeartbeatsPerTimeoutPeriod = 3;
62
63
namespace {
64
65
// One byte after YugaByte is reserved for future use. It could control type of connection.
66
const char kConnectionHeaderBytes[] = "YB\1";
67
const size_t kConnectionHeaderSize = sizeof(kConnectionHeaderBytes) - 1;
68
69
3.11M
// Returns the shared outbound-data object that carries the connection header bytes
// (kConnectionHeaderBytes, kConnectionHeaderSize bytes long). The object is created on the
// first call and the same pointer is returned afterwards.
OutboundDataPtr ConnectionHeaderInstance() {
  static OutboundDataPtr header_data = std::make_shared<StringOutboundData>(
      kConnectionHeaderBytes, kConnectionHeaderSize, "ConnectionHeader");
  return header_data;
}
74
75
const char kEmptyMsgLengthPrefix[kMsgLengthPrefixLength] = {0};
76
77
class HeartbeatOutboundData : public StringOutboundData {
78
 public:
79
9.30M
  bool IsHeartbeat() const override { return true; }
80
81
9.31M
  static std::shared_ptr<HeartbeatOutboundData> Instance() {
82
9.31M
    static std::shared_ptr<HeartbeatOutboundData> instance(new HeartbeatOutboundData());
83
9.31M
    return instance;
84
9.31M
  }
85
86
 private:
87
  HeartbeatOutboundData() :
88
11.0k
      StringOutboundData(kEmptyMsgLengthPrefix, kMsgLengthPrefixLength, "Heartbeat") {}
89
};
90
91
} // namespace
92
93
using google::protobuf::FieldDescriptor;
94
using google::protobuf::Message;
95
using google::protobuf::MessageLite;
96
using google::protobuf::io::CodedOutputStream;
97
98
// Builds the context: configures the message parser with a kMsgLengthPrefixLength-byte length
// prefix at offset 0 and the FLAGS_rpc_max_message_size limit, creates the read buffer of
// receive_buffer_size bytes tracked by buffer_tracker, and remembers call_tracker.
YBConnectionContext::YBConnectionContext(
    size_t receive_buffer_size,
    const MemTrackerPtr& buffer_tracker,
    const MemTrackerPtr& call_tracker)
    : parser_(
          buffer_tracker,
          kMsgLengthPrefixLength,
          0 /* size_offset */,
          FLAGS_rpc_max_message_size,
          IncludeHeader::kFalse,
          rpc::SkipEmptyMessages::kTrue,
          this),
      read_buffer_(receive_buffer_size, buffer_tracker),
      call_tracker_(call_tracker) {
}
106
107
3.74M
// Remembers the event loop; it is later used to initialize the keepalive timer in Connected().
void YBConnectionContext::SetEventLoop(ev::loop_ref* loop) {
  this->loop_ = loop;
}
110
111
6.31M
void YBConnectionContext::Shutdown(const Status& status) {
112
6.31M
  timer_.Shutdown();
113
6.31M
  loop_ = nullptr;
114
6.31M
}
115
116
3.24M
// Nothing to release explicitly; members clean up after themselves.
YBConnectionContext::~YBConnectionContext() = default;
117
118
namespace {
119
120
1.10G
// Connection timeout: the value of FLAGS_rpc_connection_timeout_ms, in milliseconds.
CoarseMonoClock::Duration Timeout() {
  const auto timeout = FLAGS_rpc_connection_timeout_ms * 1ms;
  return timeout;
}
123
124
1.10G
// Interval between heartbeats: the connection timeout split into
// kHeartbeatsPerTimeoutPeriod equal parts.
CoarseMonoClock::Duration HeartbeatPeriod() {
  const auto full_timeout = Timeout();
  return full_timeout / kHeartbeatsPerTimeoutPeriod;
}
127
128
} // namespace
129
130
141M
// Returns the call id of `call`, which is down-cast to YBInboundCall.
uint64_t YBConnectionContext::ExtractCallId(InboundCall* call) {
  auto* yb_call = down_cast<YBInboundCall*>(call);
  return yb_call->call_id();
}
133
134
// Feeds received bytes to the message parser.
//
// While the connection is in the NEGOTIATING state the data must begin with the connection
// header (kConnectionHeaderBytes). The header is validated and stripped, the state switches to
// OPEN, and the remainder is parsed. The header size is added to the reported consumed count.
//
// Returns a NetworkError status when the header does not match. Returns a result with zero
// consumed bytes when there is not yet enough data to contain the header.
Result<ProcessCallsResult> YBInboundConnectionContext::ProcessCalls(
    const ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full) {
  if (state_ != RpcConnectionPB::NEGOTIATING) {
    return parser().Parse(connection, data, read_buffer_full, &call_tracker());
  }

  // We assume that header is fully contained in the first block.
  // An empty block list is treated like a too-short first block, instead of indexing into it.
  if (data.empty() || data[0].iov_len < kConnectionHeaderSize) {
    return ProcessCallsResult{ 0, Slice() };
  }

  Slice slice(static_cast<const char*>(data[0].iov_base), data[0].iov_len);
  if (!slice.starts_with(kConnectionHeaderBytes, kConnectionHeaderSize)) {
    return STATUS_FORMAT(NetworkError,
                         "Invalid connection header: $0",
                         slice.ToDebugHexString());
  }
  state_ = RpcConnectionPB::OPEN;

  // Parse a copy of the block list whose first block starts right after the header.
  IoVecs data_copy(data);
  data_copy[0].iov_len -= kConnectionHeaderSize;
  data_copy[0].iov_base = const_cast<uint8_t*>(slice.data() + kConnectionHeaderSize);
  auto result = VERIFY_RESULT(
      parser().Parse(connection, data_copy, ReadBufferFull::kFalse, &call_tracker()));
  result.consumed += kConnectionHeaderSize;
  return result;
}
160
161
namespace {
162
163
141M
// Returns OK unless ShouldThrottleRpc() decides, based on the size of the call's request data,
// that the call must be rejected; in that case a ServiceUnavailable status is returned.
CHECKED_STATUS ThrottleRpcStatus(const MemTrackerPtr& throttle_tracker, const YBInboundCall& call) {
  if (!ShouldThrottleRpc(throttle_tracker, call.request_data().size(), "Rejecting RPC call: ")) {
    return Status::OK();
  }
  return STATUS_FORMAT(ServiceUnavailable, "Call rejected due to memory pressure: $0", call);
}
170
171
} // namespace
172
173
// Turns one received message into a YBInboundCall and hands it to the messenger.
//
// Parse and Store failures are returned to the caller. A call rejected by throttling is
// answered with a failure response and OK is returned.
Status YBInboundConnectionContext::HandleCall(
    const ConnectionPtr& connection, CallData* call_data) {
  auto reactor = connection->reactor();
  DCHECK(reactor->IsCurrentThread());

  auto call = InboundCall::Create<YBInboundCall>(connection, call_processed_listener());

  RETURN_NOT_OK(call->ParseFrom(call_tracker(), call_data));
  RETURN_NOT_OK(Store(call.get()));

  const auto throttle_status = ThrottleRpcStatus(call_tracker(), *call);
  if (throttle_status.ok()) {
    reactor->messenger()->Handle(call, Queue::kTrue);
  } else {
    call->RespondFailure(ErrorStatusPB::ERROR_APPLICATION, throttle_status);
  }

  return Status::OK();
}
200
201
609k
void YBInboundConnectionContext::Connected(const ConnectionPtr& connection) {
202
609k
  DCHECK_EQ(connection->direction(), Connection::Direction::SERVER);
203
204
609k
  state_ = RpcConnectionPB::NEGOTIATING;
205
206
609k
  connection_ = connection;
207
609k
  last_write_time_ = connection->reactor()->cur_time();
208
609k
  if (FLAGS_enable_rpc_keepalive) {
209
609k
    timer_.Init(*loop_);
210
609k
    timer_.SetCallback<
211
609k
        YBInboundConnectionContext, &YBInboundConnectionContext::HandleTimeout>(this);
212
609k
    timer_.Start(HeartbeatPeriod());
213
609k
  }
214
609k
}
215
216
79.4M
void YBInboundConnectionContext::UpdateLastWrite(const ConnectionPtr& connection) {
217
79.4M
  last_write_time_ = connection->reactor()->cur_time();
218
79.4M
  VLOG(4) << connection->ToString() << ": " << "Updated last_write_time_="
219
27.9k
          << AsString(last_write_time_);
220
79.4M
}
221
222
1.09G
// Heartbeat timer callback. Does nothing if the connection is gone. On a timer error it logs a
// warning and returns without re-arming the timer. Otherwise it queues a heartbeat once
// HeartbeatPeriod() has elapsed since the later of the last write and the last heartbeat, and
// re-arms the timer.
void YBInboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents) {  // NOLINT
  const auto connection = connection_.lock();
  if (!connection) {
    return;
  }

  if (EV_ERROR & revents) {
    LOG(WARNING) << connection->ToString() << ": " << "Got an error in handle timeout";
    return;
  }

  const auto now = connection->reactor()->cur_time();

  const auto deadline =
      std::max(last_heartbeat_sending_time_, last_write_time_) + HeartbeatPeriod();
  if (now < deadline) {
    // Not due yet: wake up exactly at the deadline.
    timer_.Start(deadline - now);
    return;
  }

  if (last_write_time_ >= last_heartbeat_sending_time_) {
    // last_write_time_ < last_heartbeat_sending_time_ means that last heartbeat we've queued
    // for sending is still in queue due to RPC/networking issues, so no need to queue
    // another one.
    VLOG(4) << connection->ToString() << ": " << "Sending heartbeat, now: " << AsString(now)
            << ", deadline: " << AsString(deadline)
            << ", last_write_time_: " << AsString(last_write_time_)
            << ", last_heartbeat_sending_time_: " << AsString(last_heartbeat_sending_time_);
    connection->QueueOutboundData(HeartbeatOutboundData::Instance());
    last_heartbeat_sending_time_ = now;
  }
  timer_.Start(HeartbeatPeriod());
}
252
253
// Creates a call bound to a connection; no RpcMetrics object is passed to the base class.
YBInboundCall::YBInboundCall(ConnectionPtr conn, CallProcessedListener call_processed_listener)
    : InboundCall(
          std::move(conn),
          nullptr /* rpc_metrics */,
          std::move(call_processed_listener)) {
}
255
256
// Creates a call without a connection or processed-listener; the header's remote method is
// taken from the serialized form of `remote_method`.
YBInboundCall::YBInboundCall(RpcMetrics* rpc_metrics, const RemoteMethod& remote_method)
    : InboundCall(
          nullptr /* conn */,
          rpc_metrics,
          nullptr /* call_processed_listener */) {
  header_.remote_method = remote_method.serialized_body();
}
260
261
81.6M
// Nothing to release explicitly; members clean up after themselves.
YBInboundCall::~YBInboundCall() = default;
262
263
184M
// Returns the time the call was received plus the header's timeout (in milliseconds).
// A zero timeout yields CoarseTimePoint::max().
CoarseTimePoint YBInboundCall::GetClientDeadline() const {
  if (header_.timeout_ms != 0) {
    return ToCoarse(timing_.time_received) + header_.timeout_ms * 1ms;
  }
  return CoarseTimePoint::max();
}
269
270
70.8M
// Parses the call header and serialized request out of `call_data`, registers the data size
// with `mem_tracker`, and takes ownership of the data by moving it into request_data_.
//
// Returns the parse error if ParseYBMessage fails, or Corruption when the header carries no
// remote method.
Status YBInboundCall::ParseFrom(const MemTrackerPtr& mem_tracker, CallData* call_data) {
  TRACE_EVENT_FLOW_BEGIN0("rpc", "YBInboundCall", this);
  TRACE_EVENT0("rpc", "YBInboundCall::ParseFrom");

  Slice source(call_data->data(), call_data->size());
  RETURN_NOT_OK(ParseYBMessage(source, &header_, &serialized_request_));
  DVLOG(4) << "Parsed YBInboundCall header: " << header_.call_id;

  // Capture the size before the data is moved out of *call_data.
  const auto data_size = call_data->size();
  consumption_ = ScopedTrackedConsumption(mem_tracker, data_size);
  request_data_memory_usage_.store(data_size, std::memory_order_release);
  request_data_ = std::move(*call_data);

  // Adopt the service/method info from the header as soon as it's available.
  if (PREDICT_FALSE(header_.remote_method.empty())) {
    return STATUS(Corruption, "Non-connection context request header must specify remote_method");
  }

  return Status::OK();
}
289
290
17.9M
size_t YBInboundCall::CopyToLastSidecarBuffer(const Slice& car) {
291
17.9M
  if (sidecar_buffers_.empty()) {
292
4.09M
    return 0;
293
4.09M
  }
294
13.8M
  auto& last_buffer =  sidecar_buffers_.back();
295
13.8M
  auto len = std::min(last_buffer.size() - filled_bytes_in_last_sidecar_buffer_, car.size());
296
13.8M
  memcpy(last_buffer.data() + filled_bytes_in_last_sidecar_buffer_, car.data(), len);
297
13.8M
  filled_bytes_in_last_sidecar_buffer_ += len;
298
299
13.8M
  return len;
300
17.9M
}
301
302
17.9M
// Appends `car` to the sidecar data and returns the index of the new sidecar.
// The sidecar's offset is the total size of the sidecars added before it. Its bytes go first
// into the free tail of the last buffer, and any remainder goes into a newly allocated buffer.
size_t YBInboundCall::AddRpcSidecar(Slice car) {
  sidecar_offsets_.Add(narrow_cast<uint32_t>(total_sidecars_size_));
  total_sidecars_size_ += car.size();

  // Copy start of sidecar to existing buffer if present.
  const size_t copied_to_tail = CopyToLastSidecarBuffer(car);
  car.remove_prefix(copied_to_tail);

  // If sidecar did not fit into last buffer, then we should allocate a new one.
  if (!car.empty()) {
    DCHECK(sidecar_buffers_.empty() ||
           filled_bytes_in_last_sidecar_buffer_ == sidecar_buffers_.back().size());

    // Allocate new sidecar buffer and copy remaining part of sidecar to it.
    const size_t remaining = car.size();
    AllocateSidecarBuffer(std::max<size_t>(remaining, FLAGS_min_sidecar_buffer_size));
    memcpy(sidecar_buffers_.back().data(), car.data(), remaining);
    filled_bytes_in_last_sidecar_buffer_ = remaining;
  }

  const size_t sidecar_index = num_sidecars_;
  ++num_sidecars_;
  return sidecar_index;
}
321
322
9.43M
void YBInboundCall::ResetRpcSidecars() {
323
9.43M
  if (consumption_) {
324
2.05M
    for (const auto& buffer : sidecar_buffers_) {
325
0
      consumption_.Add(-buffer.size());
326
0
    }
327
2.05M
  }
328
9.43M
  num_sidecars_ = 0;
329
9.43M
  filled_bytes_in_last_sidecar_buffer_ = 0;
330
9.43M
  total_sidecars_size_ = 0;
331
9.43M
  sidecar_buffers_.clear();
332
9.43M
  sidecar_offsets_.Clear();
333
9.43M
}
334
335
633k
void YBInboundCall::ReserveSidecarSpace(size_t space) {
336
633k
  if (num_sidecars_ != 0) {
337
0
    LOG(DFATAL) << "Attempt to ReserveSidecarSpace when there are already sidecars present";
338
0
    return;
339
0
  }
340
341
633k
  AllocateSidecarBuffer(space);
342
633k
}
343
344
4.72M
void YBInboundCall::AllocateSidecarBuffer(size_t size) {
345
4.72M
  sidecar_buffers_.push_back(RefCntBuffer(size));
346
4.72M
  if (consumption_) {
347
4.52M
    consumption_.Add(size);
348
4.52M
  }
349
4.72M
}
350
351
70.8M
// Serializes the response header and body into response_buf_.
// Every sidecar offset is first shifted by the size of the serialized body, then the offsets
// are moved into the response header. Returns the serialization error, if any.
Status YBInboundCall::SerializeResponseBuffer(AnyMessageConstPtr response, bool is_success) {
  auto body_size = response.SerializedSize();

  // Sidecar offsets were recorded relative to the sidecar area; shift them past the body.
  for (auto& sidecar_offset : sidecar_offsets_) {
    sidecar_offset += body_size;
  }

  ResponseHeader resp_hdr;
  resp_hdr.set_call_id(header_.call_id);
  resp_hdr.set_is_error(!is_success);
  *resp_hdr.mutable_sidecar_offsets() = std::move(sidecar_offsets_);

  response_buf_ = VERIFY_RESULT(SerializeRequest(
      body_size, total_sidecars_size_, resp_hdr, response));
  return Status::OK();
}
366
367
9.26k
// Describes the call: remote method, remote and local addresses, and the request call id.
string YBInboundCall::ToString() const {
  return Format(
      "Call $0 $1 => $2 (request call id $3)",
      header_.RemoteMethodAsString(),
      remote_address(),
      local_address(),
      header_.call_id);
}
371
372
bool YBInboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
373
1
                           RpcCallInProgressPB* resp) {
374
1
  header_.ToPB(resp->mutable_header());
375
1
  if (req.include_traces() && trace_) {
376
1
    resp->set_trace_buffer(trace_->DumpToString(true));
377
1
  }
378
1
  resp->set_elapsed_millis(MonoTime::Now().GetDeltaSince(timing_.time_received)
379
1
      .ToMilliseconds());
380
1
  return true;
381
1
}
382
383
70.8M
void YBInboundCall::LogTrace() const {
384
70.8M
  MonoTime now = MonoTime::Now();
385
70.8M
  auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
386
387
70.8M
  if (header_.timeout_ms > 0) {
388
70.7M
    double log_threshold = header_.timeout_ms * 0.75f;
389
70.7M
    if (total_time > log_threshold) {
390
      // TODO: consider pushing this onto another thread since it may be slow.
391
      // The traces may also be too large to fit in a log message.
392
8.53k
      LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout "
393
8.53k
                   << header_.timeout_ms << "ms).";
394
8.53k
      std::string s = trace_->DumpToString(1, true);
395
8.53k
      if (!s.empty()) {
396
6.46k
        LOG(WARNING) << "Trace:\n" << s;
397
6.46k
      }
398
8.53k
      return;
399
8.53k
    }
400
70.7M
  }
401
402
70.8M
  if (PREDICT_FALSE(
403
70.8M
          trace_->must_print() ||
404
70.8M
          FLAGS_rpc_dump_all_traces ||
405
70.8M
          total_time > FLAGS_rpc_slow_query_threshold_ms)) {
406
177
    LOG(INFO) << ToString() << " took " << total_time << "ms. Trace:";
407
177
    trace_->Dump(&LOG(INFO), true);
408
177
  }
409
70.8M
}
410
411
70.8M
// Moves the serialized response, followed by all sidecar buffers, into `output`.
// The last sidecar buffer is shrunk to the number of bytes actually filled. The response buffer
// must be non-empty (checked).
void YBInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  TRACE_EVENT0("rpc", "YBInboundCall::Serialize");
  CHECK_GT(response_buf_.size(), 0);
  output->push_back(std::move(response_buf_));

  if (sidecar_buffers_.empty()) {
    return;
  }

  sidecar_buffers_.back().Shrink(filled_bytes_in_last_sidecar_buffer_);
  for (auto& sidecar_buffer : sidecar_buffers_) {
    output->push_back(std::move(sidecar_buffer));
  }
  sidecar_buffers_.clear();
}
423
424
70.3M
// Parses the serialized request into `params`.
//
// Returns the throttling status if the call is rejected, or the parse error (prefixed with the
// method name and also logged as a WARNING) if parsing fails. On success the value produced by
// ParseRequest is added to the tracked consumption.
Status YBInboundCall::ParseParam(RpcCallParams* params) {
  RETURN_NOT_OK(ThrottleRpcStatus(consumption_.mem_tracker(), *this));

  auto parse_result = params->ParseRequest(serialized_request());
  if (!parse_result.ok()) {
    auto status = parse_result.status().CloneAndPrepend(
        Format("Invalid parameter for call $0", header_.RemoteMethodAsString()));
    LOG(WARNING) << status;
    return status;
  }
  consumption_.Add(*parse_result);

  // Test-only hook: sleep after parsing a request bigger than the throttle threshold.
  if (PREDICT_FALSE(FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms > 0 &&
          implicit_cast<ssize_t>(request_data_.size()) > FLAGS_rpc_throttle_threshold_bytes)) {
    const auto delay = FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms * 1ms;
    std::this_thread::sleep_for(delay);
  }

  return Status::OK();
}
443
444
81.3M
// Sends `response` back as a successful reply.
void YBInboundCall::RespondSuccess(AnyMessageConstPtr response) {
  TRACE_EVENT0("rpc", "InboundCall::RespondSuccess");
  constexpr bool kIsSuccess = true;
  Respond(response, kIsSuccess);
}
448
449
void YBInboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
450
270k
                                   const Status& status) {
451
270k
  TRACE_EVENT0("rpc", "InboundCall::RespondFailure");
452
270k
  ErrorStatusPB err;
453
270k
  err.set_message(status.ToString());
454
270k
  err.set_code(error_code);
455
456
270k
  Respond(AnyMessageConstPtr(&err), false);
457
270k
}
458
459
void YBInboundCall::RespondApplicationError(int error_ext_id, const std::string& message,
460
2.03k
                                            const MessageLite& app_error_pb) {
461
2.03k
  ErrorStatusPB err;
462
2.03k
  ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
463
2.03k
  Respond(AnyMessageConstPtr(&err), false);
464
2.03k
}
465
466
void YBInboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message,
467
                                         const google::protobuf::MessageLite& app_error_pb,
468
2.03k
                                         ErrorStatusPB* err) {
469
2.03k
  err->set_message(message);
470
2.03k
  const FieldDescriptor* app_error_field =
471
2.03k
      err->GetReflection()->FindKnownExtensionByNumber(error_ext_id);
472
2.03k
  if (
app_error_field != nullptr2.03k
) {
473
2.03k
    err->GetReflection()->MutableMessage(err, app_error_field)->CheckTypeAndMergeFrom(app_error_pb);
474
18.4E
  } else {
475
18.4E
    LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id
476
18.4E
                << " (message=" << message << ")";
477
18.4E
  }
478
2.03k
}
479
480
70.8M
// Serializes `response` and queues it for sending. If serialization fails, the failure is
// reported to the caller through RespondFailure instead.
void YBInboundCall::Respond(AnyMessageConstPtr response, bool is_success) {
  TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this);

  const Status serialize_status = SerializeResponseBuffer(response, is_success);
  if (PREDICT_FALSE(!serialize_status.ok())) {
    RespondFailure(ErrorStatusPB::ERROR_APPLICATION, serialize_status);
    return;
  }

  TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this, "method", method_name().ToBuffer());

  QueueResponse(is_success);
}
492
493
38.6M
// Returns the method part of the header's remote method, or an empty Slice when the remote
// method cannot be parsed.
Slice YBInboundCall::method_name() const {
  auto parsed_remote_method = ParseRemoteMethod(header_.remote_method);
  if (!parsed_remote_method.ok()) {
    return Slice();
  }
  return parsed_remote_method->method;
}
497
498
// On the client side every received message is a call response; pass it to the connection.
Status YBOutboundConnectionContext::HandleCall(
    const ConnectionPtr& connection, CallData* call_data) {
  Status response_status = connection->HandleCallResponse(call_data);
  return response_status;
}
502
503
605k
void YBOutboundConnectionContext::Connected(const ConnectionPtr& connection) {
504
605k
  DCHECK_EQ(connection->direction(), Connection::Direction::CLIENT);
505
605k
  connection_ = connection;
506
605k
  last_read_time_ = connection->reactor()->cur_time();
507
605k
  if (FLAGS_enable_rpc_keepalive) {
508
599k
    timer_.Init(*loop_);
509
599k
    timer_.SetCallback<
510
599k
        YBOutboundConnectionContext, &YBOutboundConnectionContext::HandleTimeout>(this);
511
599k
    timer_.Start(Timeout());
512
599k
  }
513
605k
}
514
515
3.12M
void YBOutboundConnectionContext::AssignConnection(const ConnectionPtr& connection) {
516
3.12M
  connection->QueueOutboundData(ConnectionHeaderInstance());
517
3.12M
}
518
519
// Feeds received bytes to the message parser; no throttling tracker is used on this side.
Result<ProcessCallsResult> YBOutboundConnectionContext::ProcessCalls(
    const ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full) {
  return parser().Parse(
      connection,
      data,
      read_buffer_full,
      nullptr /* tracker_for_throttle */);
}
523
524
84.2M
void YBOutboundConnectionContext::UpdateLastRead(const ConnectionPtr& connection) {
525
84.2M
  last_read_time_ = connection->reactor()->cur_time();
526
84.2M
  VLOG
(4) << Format("$0: Updated last_read_time_=$1", connection, last_read_time_)78.4k
;
527
84.2M
}
528
529
7.65M
// Timeout timer callback. Does nothing if the connection is gone. On a timer error it logs a
// warning and returns. If nothing has been read for longer than Timeout(), the connection is
// destroyed with a NetworkError status; otherwise the timer is re-armed for the remaining time.
void YBOutboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents) {  // NOLINT
  const auto connection = connection_.lock();
  if (!connection) {
    return;
  }

  VLOG(5) << Format("$0: YBOutboundConnectionContext::HandleTimeout", connection);
  if (EV_ERROR & revents) {
    LOG(WARNING) << connection->ToString() << ": " << "Got an error in handle timeout";
    return;
  }

  const auto now = connection->reactor()->cur_time();
  const MonoDelta timeout = Timeout();

  auto deadline = last_read_time_ + timeout;
  VLOG(5) << Format(
      "$0: YBOutboundConnectionContext::HandleTimeout last_read_time_: $1, timeout: $2",
      connection, last_read_time_, timeout);

  if (now <= deadline) {
    // Still within the allowed silence window: check again at the deadline.
    timer_.Start(deadline - now);
    return;
  }

  auto passed = now - last_read_time_;
  const auto status = STATUS_FORMAT(
      NetworkError, "Rpc timeout, passed: $0, timeout: $1, now: $2, last_read_time_: $3",
      passed, timeout, now, last_read_time_);
  LOG(WARNING) << connection->ToString() << ": " << status;
  connection->reactor()->DestroyConnection(connection.get(), status);
}
558
559
} // namespace rpc
560
} // namespace yb