YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/inbound_call.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/inbound_call.h"
34
35
#include "yb/gutil/strings/substitute.h"
36
37
#include "yb/rpc/connection.h"
38
#include "yb/rpc/connection_context.h"
39
#include "yb/rpc/rpc_introspection.pb.h"
40
#include "yb/rpc/rpc_metrics.h"
41
#include "yb/rpc/service_if.h"
42
43
#include "yb/util/debug/trace_event.h"
44
#include "yb/util/flag_tags.h"
45
#include "yb/util/logging.h"
46
#include "yb/util/metrics.h"
47
#include "yb/util/trace.h"
48
49
using std::shared_ptr;
50
using std::vector;
51
using strings::Substitute;
52
53
DEFINE_bool(rpc_dump_all_traces, false,
54
            "If true, dump all RPC traces at INFO level");
55
TAG_FLAG(rpc_dump_all_traces, advanced);
56
TAG_FLAG(rpc_dump_all_traces, runtime);
57
58
DEFINE_bool(collect_end_to_end_traces, false,
59
            "If true, collected traces includes information for sub-components "
60
            "potentially running on a different server. ");
61
TAG_FLAG(collect_end_to_end_traces, advanced);
62
TAG_FLAG(collect_end_to_end_traces, runtime);
63
64
DEFINE_int32(print_trace_every, 0,
65
             "Controls the rate at which traces are printed. Setting this to 0 "
66
             "disables printing the collected traces.");
67
TAG_FLAG(print_trace_every, advanced);
68
TAG_FLAG(print_trace_every, runtime);
69
70
DEFINE_int32(rpc_slow_query_threshold_ms, 10000,
71
             "Traces for calls that take longer than this threshold (in ms) are logged");
72
TAG_FLAG(rpc_slow_query_threshold_ms, advanced);
73
TAG_FLAG(rpc_slow_query_threshold_ms, runtime);
74
75
namespace yb {
76
namespace rpc {
77
78
InboundCall::InboundCall(ConnectionPtr conn, RpcMetrics* rpc_metrics,
79
                         CallProcessedListener call_processed_listener)
80
    : trace_(new Trace),
81
      conn_(std::move(conn)),
82
      rpc_metrics_(rpc_metrics ? rpc_metrics : &conn_->rpc_metrics()),
83
29.8M
      call_processed_listener_(std::move(call_processed_listener)) {
84
29.8M
  TRACE_TO(trace_, "Created InboundCall");
85
29.8M
  IncrementCounter(rpc_metrics_->inbound_calls_created);
86
29.8M
  IncrementGauge(rpc_metrics_->inbound_calls_alive);
87
29.8M
}
88
89
29.8M
InboundCall::~InboundCall() {
90
29.8M
  TRACE_TO(trace_, "Destroying InboundCall");
91
0
  YB_LOG_IF_EVERY_N(INFO, FLAGS_print_trace_every > 0, FLAGS_print_trace_every)
92
0
      << "Tracing op: \n " << trace_->DumpToString(true);
93
29.8M
  DecrementGauge(rpc_metrics_->inbound_calls_alive);
94
29.8M
}
95
96
24.2M
void InboundCall::NotifyTransferred(const Status& status, Connection* conn) {
97
24.2M
  if (status.ok()) {
98
24.2M
    TRACE_TO(trace_, "Transfer finished");
99
16.1k
  } else {
100
16.1k
    YB_LOG_EVERY_N_SECS(WARNING, 10) << LogPrefix() << "Connection torn down before " << ToString()
101
11
                                     << " could send its response: " << status.ToString();
102
16.1k
  }
103
24.2M
  if (call_processed_listener_) {
104
24.2M
    call_processed_listener_(this);
105
24.2M
  }
106
24.2M
}
107
108
11.7M
const Endpoint& InboundCall::remote_address() const {
109
11.7M
  CHECK_NOTNULL(conn_.get());
110
11.7M
  return conn_->remote();
111
11.7M
}
112
113
2.45k
const Endpoint& InboundCall::local_address() const {
114
2.45k
  CHECK_NOTNULL(conn_.get());
115
2.45k
  return conn_->local();
116
2.45k
}
117
118
91.1M
ConnectionPtr InboundCall::connection() const {
119
91.1M
  return conn_;
120
91.1M
}
121
122
0
ConnectionContext& InboundCall::connection_context() const {
123
0
  return conn_->context();
124
0
}
125
126
42.7M
Trace* InboundCall::trace() {
127
42.7M
  return trace_.get();
128
42.7M
}
129
130
29.8M
void InboundCall::RecordCallReceived() {
131
29.8M
  TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);
132
  // Protect against multiple calls.
133
29.5k
  LOG_IF_WITH_PREFIX(DFATAL, timing_.time_received.Initialized()) << "Already marked as received";
134
28.6k
  VLOG_WITH_PREFIX(4) << "Received";
135
29.8M
  timing_.time_received = MonoTime::Now();
136
29.8M
}
137
138
29.8M
void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time) {
139
29.8M
  DCHECK(incoming_queue_time != nullptr);
140
  // Protect against multiple calls.
141
3.68k
  LOG_IF_WITH_PREFIX(DFATAL, timing_.time_handled.Initialized()) << "Already marked as started";
142
29.8M
  timing_.time_handled = MonoTime::Now();
143
1.47k
  VLOG_WITH_PREFIX(4) << "Handling";
144
29.8M
  incoming_queue_time->Increment(
145
29.8M
      timing_.time_handled.GetDeltaSince(timing_.time_received).ToMicroseconds());
146
29.8M
}
147
148
1.18M
MonoDelta InboundCall::GetTimeInQueue() const {
149
1.18M
  return timing_.time_handled.GetDeltaSince(timing_.time_received);
150
1.18M
}
151
152
29.8M
ThreadPoolTask* InboundCall::BindTask(InboundCallHandler* handler) {
153
29.8M
  auto shared_this = shared_from(this);
154
29.8M
  if (!handler->CallQueued()) {
155
1.10k
    return nullptr;
156
1.10k
  }
157
29.8M
  tracker_ = handler;
158
29.8M
  task_.Bind(handler, shared_this);
159
29.8M
  return &task_;
160
29.8M
}
161
162
29.6M
void InboundCall::RecordHandlingCompleted() {
163
  // Protect against multiple calls.
164
8.36k
  LOG_IF_WITH_PREFIX(DFATAL, timing_.time_completed.Initialized()) << "Already marked as completed";
165
29.6M
  timing_.time_completed = MonoTime::Now();
166
18.4E
  VLOG_WITH_PREFIX(4) << "Completed handling";
167
29.6M
  if (rpc_method_handler_latency_) {
168
29.5M
    rpc_method_handler_latency_->Increment(
169
29.5M
        (timing_.time_completed - timing_.time_handled).ToMicroseconds());
170
29.5M
  }
171
29.6M
}
172
173
29.8M
bool InboundCall::ClientTimedOut() const {
174
29.8M
  auto deadline = GetClientDeadline();
175
29.8M
  if (deadline == CoarseTimePoint::max()) {
176
114k
    return false;
177
114k
  }
178
179
29.7M
  return deadline < CoarseMonoClock::now();
180
29.7M
}
181
182
24.2M
void InboundCall::QueueResponse(bool is_success) {
183
24.2M
  TRACE_TO(trace_, is_success ? "Queueing success response" : "Queueing failure response");
184
24.2M
  LogTrace();
185
24.2M
  bool expected = false;
186
24.2M
  if (responded_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
187
24.2M
    connection()->context().QueueResponse(connection(), shared_from(this));
188
18.4E
  } else {
189
18.4E
    LOG_WITH_PREFIX(DFATAL) << "Response already queued";
190
18.4E
  }
191
24.2M
}
192
193
27
std::string InboundCall::LogPrefix() const {
194
27
  return Format("$0: ", this);
195
27
}
196
197
11.9k
bool InboundCall::RespondTimedOutIfPending(const char* message) {
198
11.9k
  if (!TryStartProcessing()) {
199
11.8k
    return false;
200
11.8k
  }
201
202
55
  RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, STATUS(TimedOut, message));
203
55
  Clear();
204
205
55
  return true;
206
55
}
207
208
55
void InboundCall::Clear() {
209
55
  serialized_request_.clear();
210
55
  request_data_.Reset();
211
55
  request_data_memory_usage_.store(0, std::memory_order_release);
212
55
}
213
214
19.7M
size_t InboundCall::DynamicMemoryUsage() const {
215
19.7M
  return request_data_memory_usage_.load(std::memory_order_acquire) + DynamicMemoryUsageOf(trace_);
216
19.7M
}
217
218
29.8M
void InboundCall::InboundCallTask::Run() {
219
29.8M
  handler_->Handle(call_);
220
29.8M
}
221
222
29.8M
void InboundCall::InboundCallTask::Done(const Status& status) {
223
  // We should reset call_ after this function. So it is easiest way to do it.
224
29.8M
  auto call = std::move(call_);
225
29.8M
  if (!status.ok()) {
226
7
    handler_->Failure(call, status);
227
7
  }
228
29.8M
}
229
230
29.5M
void InboundCall::SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value) {
231
29.5M
  const auto& metrics = value.get();
232
29.5M
  rpc_method_response_bytes_ = metrics.response_bytes;
233
29.5M
  rpc_method_handler_latency_ = metrics.handler_latency;
234
29.5M
  if (metrics.request_bytes) {
235
24.9M
    auto request_size = request_data_.size();
236
24.9M
    if (request_size) {
237
19.3M
      metrics.request_bytes->IncrementBy(request_size);
238
19.3M
    }
239
24.9M
  }
240
29.5M
}
241
242
21.2M
void InboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
243
21.2M
  size_t old_size = output->size();
244
21.2M
  DoSerialize(output);
245
21.2M
  if (rpc_method_response_bytes_) {
246
16.5M
    auto response_size = 0;
247
34.9M
    for (size_t i = old_size; i != output->size(); ++i) {
248
18.3M
      response_size += (*output)[i].size();
249
18.3M
    }
250
16.5M
    if (response_size) {
251
16.5M
      rpc_method_response_bytes_->IncrementBy(response_size);
252
16.5M
    }
253
16.5M
  }
254
21.2M
}
255
256
}  // namespace rpc
257
}  // namespace yb