YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/inbound_call.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/inbound_call.h"
34
35
#include "yb/gutil/strings/substitute.h"
36
37
#include "yb/rpc/connection.h"
38
#include "yb/rpc/connection_context.h"
39
#include "yb/rpc/rpc_introspection.pb.h"
40
#include "yb/rpc/rpc_metrics.h"
41
#include "yb/rpc/service_if.h"
42
43
#include "yb/util/debug/trace_event.h"
44
#include "yb/util/flag_tags.h"
45
#include "yb/util/logging.h"
46
#include "yb/util/metrics.h"
47
#include "yb/util/trace.h"
48
49
using std::shared_ptr;
50
using std::vector;
51
using strings::Substitute;
52
53
// ---------------------------------------------------------------------------------------------
// Tracing-related flags. Every flag below is tagged both `advanced` and `runtime`.
// ---------------------------------------------------------------------------------------------

// Dump every RPC trace at INFO level.
DEFINE_bool(rpc_dump_all_traces, false,
            "If true, dump all RPC traces at INFO level");
TAG_FLAG(rpc_dump_all_traces, advanced);
TAG_FLAG(rpc_dump_all_traces, runtime);

// Include information from sub-components (possibly on other servers) in collected traces.
DEFINE_bool(collect_end_to_end_traces, false,
            "If true, collected traces includes information for sub-components "
            "potentially running on a different server. ");
TAG_FLAG(collect_end_to_end_traces, advanced);
TAG_FLAG(collect_end_to_end_traces, runtime);

// Rate limiter for trace printing; 0 turns printing off.
DEFINE_int32(print_trace_every, 0,
             "Controls the rate at which traces are printed. Setting this to 0 "
             "disables printing the collected traces.");
TAG_FLAG(print_trace_every, advanced);
TAG_FLAG(print_trace_every, runtime);

// Latency threshold (milliseconds) above which a call's trace is logged.
DEFINE_int32(rpc_slow_query_threshold_ms, 10000,
             "Traces for calls that take longer than this threshold (in ms) are logged");
TAG_FLAG(rpc_slow_query_threshold_ms, advanced);
TAG_FLAG(rpc_slow_query_threshold_ms, runtime);
74
75
namespace yb {
76
namespace rpc {
77
78
// Constructs an inbound call bound to `conn`.
//
// conn                    - connection the call arrived on; ownership of the pointer is moved in.
// rpc_metrics             - metrics sink to use; when null, the metrics of the connection are
//                           used instead.
// call_processed_listener - callback stored for later invocation from NotifyTransferred().
//
// NOTE(review): the rpc_metrics_ initializer dereferences conn_, so it relies on conn_ being
// declared before rpc_metrics_ in the class - confirm against the header.
InboundCall::InboundCall(
    ConnectionPtr conn, RpcMetrics* rpc_metrics, CallProcessedListener call_processed_listener)
    : trace_(new Trace),
      conn_(std::move(conn)),
      rpc_metrics_(rpc_metrics != nullptr ? rpc_metrics : &conn_->rpc_metrics()),
      call_processed_listener_(std::move(call_processed_listener)) {
  TRACE_TO(trace_, "Created InboundCall");

  // Account for the new call: one more created, one more currently alive.
  IncrementCounter(rpc_metrics_->inbound_calls_created);
  IncrementGauge(rpc_metrics_->inbound_calls_alive);
}
88
89
90.8M
// Destroys the call: optionally prints its trace (rate-controlled by --print_trace_every) and
// decrements the gauge of alive inbound calls.
InboundCall::~InboundCall() {
  TRACE_TO(trace_, "Destroying InboundCall");

  // Printing is enabled only for a positive --print_trace_every.
  YB_LOG_IF_EVERY_N(INFO, FLAGS_print_trace_every > 0, FLAGS_print_trace_every)
      << "Tracing op: \n " << trace_->DumpToString(true);

  DecrementGauge(rpc_metrics_->inbound_calls_alive);
}
95
96
79.9M
void InboundCall::NotifyTransferred(const Status& status, Connection* conn) {
97
79.9M
  if (status.ok()) {
98
79.9M
    TRACE_TO(trace_, "Transfer finished");
99
79.9M
  } else {
100
18.5k
    YB_LOG_EVERY_N_SECS(WARNING, 10) << LogPrefix() << "Connection torn down before " << ToString()
101
256
                                     << " could send its response: " << status.ToString();
102
18.5k
  }
103
79.9M
  if (call_processed_listener_) {
104
79.9M
    call_processed_listener_(this);
105
79.9M
  }
106
79.9M
}
107
108
29.6M
// Returns the remote endpoint of the connection this call belongs to.
// Fails a CHECK if the call has no connection.
const Endpoint& InboundCall::remote_address() const {
  // CHECK_NOTNULL yields the checked pointer, so the check and the access share one expression.
  return CHECK_NOTNULL(conn_.get())->remote();
}
112
113
9.23k
// Returns the local endpoint of the connection this call belongs to.
// Fails a CHECK if the call has no connection.
const Endpoint& InboundCall::local_address() const {
  // CHECK_NOTNULL yields the checked pointer, so the check and the access share one expression.
  return CHECK_NOTNULL(conn_.get())->local();
}
117
118
276M
// Returns a copy of the pointer to the connection this call is associated with.
ConnectionPtr InboundCall::connection() const {
  return conn_;
}
121
122
0
// Returns the context of the underlying connection.
// Unlike remote_address()/local_address(), no null check is performed on the connection here.
ConnectionContext& InboundCall::connection_context() const {
  return conn_->context();
}
125
126
245M
// Returns a raw pointer to the trace object held by this call.
Trace* InboundCall::trace() {
  return trace_.get();
}
129
130
90.8M
void InboundCall::RecordCallReceived() {
131
90.8M
  TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);
132
  // Protect against multiple calls.
133
90.8M
  
LOG_IF_WITH_PREFIX84.9k
(DFATAL, timing_.time_received.Initialized()) << "Already marked as received"84.9k
;
134
90.8M
  
VLOG_WITH_PREFIX71.6k
(4) << "Received"71.6k
;
135
90.8M
  timing_.time_received = MonoTime::Now();
136
90.8M
}
137
138
90.5M
// Stamps the moment handling of the call started and reports how long the call waited
// between being received and being handled.
//
// incoming_queue_time - histogram that receives the wait time in microseconds; must not be null.
//
// Expected to run once per call: a second invocation is reported at DFATAL level.
void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time) {
  DCHECK(incoming_queue_time != nullptr);

  // Guard against being invoked more than once.
  LOG_IF_WITH_PREFIX(DFATAL, timing_.time_handled.Initialized()) << "Already marked as started";

  timing_.time_handled = MonoTime::Now();
  VLOG_WITH_PREFIX(4) << "Handling";

  const auto waited = timing_.time_handled.GetDeltaSince(timing_.time_received);
  incoming_queue_time->Increment(waited.ToMicroseconds());
}
147
148
1.26M
// Returns the time elapsed between the call being received and its handling being started,
// as recorded by RecordCallReceived() and RecordHandlingStarted().
MonoDelta InboundCall::GetTimeInQueue() const {
  const auto& handled_at = timing_.time_handled;
  return handled_at.GetDeltaSince(timing_.time_received);
}
151
152
90.4M
// Prepares the embedded task so that it will run this call on `handler`.
//
// handler - handler that will process the call; it is first asked to account for the call
//           via CallQueued().
//
// Returns a pointer to the bound task, or nullptr when the handler refused to queue the call
// (in which case neither the tracker nor the task is touched).
ThreadPoolTask* InboundCall::BindTask(InboundCallHandler* handler) {
  // Obtain the owning pointer up front, before consulting the handler.
  auto self = shared_from(this);

  if (handler->CallQueued()) {
    tracker_ = handler;
    task_.Bind(handler, self);
    return &task_;
  }

  return nullptr;
}
161
162
90.2M
void InboundCall::RecordHandlingCompleted() {
163
  // Protect against multiple calls.
164
90.2M
  
LOG_IF_WITH_PREFIX15.3k
(DFATAL, timing_.time_completed.Initialized()) << "Already marked as completed"15.3k
;
165
90.2M
  timing_.time_completed = MonoTime::Now();
166
18.4E
  VLOG_WITH_PREFIX(4) << "Completed handling";
167
90.2M
  if (rpc_method_handler_latency_) {
168
90.0M
    rpc_method_handler_latency_->Increment(
169
90.0M
        (timing_.time_completed - timing_.time_handled).ToMicroseconds());
170
90.0M
  }
171
90.2M
}
172
173
90.5M
bool InboundCall::ClientTimedOut() const {
174
90.5M
  auto deadline = GetClientDeadline();
175
90.5M
  if (deadline == CoarseTimePoint::max()) {
176
219k
    return false;
177
219k
  }
178
179
90.3M
  return deadline < CoarseMonoClock::now();
180
90.5M
}
181
182
79.9M
void InboundCall::QueueResponse(bool is_success) {
183
79.9M
  TRACE_TO(trace_, is_success ? "Queueing success response" : "Queueing failure response");
184
79.9M
  LogTrace();
185
79.9M
  bool expected = false;
186
79.9M
  if (
responded_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)79.9M
) {
187
79.9M
    connection()->context().QueueResponse(connection(), shared_from(this));
188
18.4E
  } else {
189
18.4E
    LOG_WITH_PREFIX(DFATAL) << "Response already queued";
190
18.4E
  }
191
79.9M
}
192
193
275
std::string InboundCall::LogPrefix() const {
194
275
  return Format("$0: ", this);
195
275
}
196
197
19.5k
bool InboundCall::RespondTimedOutIfPending(const char* message) {
198
19.5k
  if (!TryStartProcessing()) {
199
15.6k
    return false;
200
15.6k
  }
201
202
3.89k
  RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, STATUS(TimedOut, message));
203
3.89k
  Clear();
204
205
3.89k
  return true;
206
19.5k
}
207
208
3.89k
void InboundCall::Clear() {
209
3.89k
  serialized_request_.clear();
210
3.89k
  request_data_.Reset();
211
3.89k
  request_data_memory_usage_.store(0, std::memory_order_release);
212
3.89k
}
213
214
71.0M
size_t InboundCall::DynamicMemoryUsage() const {
215
71.0M
  return request_data_memory_usage_.load(std::memory_order_acquire) + DynamicMemoryUsageOf(trace_);
216
71.0M
}
217
218
90.5M
void InboundCall::InboundCallTask::Run() {
219
90.5M
  handler_->Handle(call_);
220
90.5M
}
221
222
90.4M
void InboundCall::InboundCallTask::Done(const Status& status) {
223
  // We should reset call_ after this function. So it is easiest way to do it.
224
90.4M
  auto call = std::move(call_);
225
90.4M
  if (!status.ok()) {
226
16
    handler_->Failure(call, status);
227
16
  }
228
90.4M
}
229
230
90.0M
// Attaches per-method metrics to this call.
//
// value - metrics of the RPC method being invoked. Its response-bytes and handler-latency
//         entries are remembered for later use; its request-bytes entry, when present, is
//         incremented right away by the size of the request data (if that size is non-zero).
void InboundCall::SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value) {
  const auto& method_metrics = value.get();

  rpc_method_response_bytes_ = method_metrics.response_bytes;
  rpc_method_handler_latency_ = method_metrics.handler_latency;

  if (!method_metrics.request_bytes) {
    return;
  }

  auto bytes_received = request_data_.size();
  if (bytes_received) {
    method_metrics.request_bytes->IncrementBy(bytes_received);
  }
}
241
242
79.9M
// Serializes the response of this call by delegating to DoSerialize() and, when a
// response-bytes metric is configured, reports the number of bytes that were appended.
//
// output - destination vector; buffers produced by DoSerialize() are expected to be appended
//          after the entries it already holds. Only the entries at indexes
//          [old size, new size) are counted towards the response size.
void InboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) {
  const size_t old_size = output->size();
  DoSerialize(output);

  if (!rpc_method_response_bytes_) {
    return;
  }

  // Accumulate in size_t: buffer sizes are unsigned and a plain `auto x = 0` would deduce int,
  // silently narrowing the running total on every addition.
  size_t response_size = 0;
  for (size_t i = old_size; i != output->size(); ++i) {
    response_size += (*output)[i].size();
  }

  if (response_size != 0) {
    rpc_method_response_bytes_->IncrementBy(response_size);
  }
}
255
256
}  // namespace rpc
257
}  // namespace yb