/Users/deen/code/yugabyte-db/src/yb/rpc/inbound_call.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/inbound_call.h" |
34 | | |
35 | | #include "yb/gutil/strings/substitute.h" |
36 | | |
37 | | #include "yb/rpc/connection.h" |
38 | | #include "yb/rpc/connection_context.h" |
39 | | #include "yb/rpc/rpc_introspection.pb.h" |
40 | | #include "yb/rpc/rpc_metrics.h" |
41 | | #include "yb/rpc/service_if.h" |
42 | | |
43 | | #include "yb/util/debug/trace_event.h" |
44 | | #include "yb/util/flag_tags.h" |
45 | | #include "yb/util/logging.h" |
46 | | #include "yb/util/metrics.h" |
47 | | #include "yb/util/trace.h" |
48 | | |
49 | | using std::shared_ptr; |
50 | | using std::vector; |
51 | | using strings::Substitute; |
52 | | |
53 | | DEFINE_bool(rpc_dump_all_traces, false, |
54 | | "If true, dump all RPC traces at INFO level"); |
55 | | TAG_FLAG(rpc_dump_all_traces, advanced); |
56 | | TAG_FLAG(rpc_dump_all_traces, runtime); |
57 | | |
58 | | DEFINE_bool(collect_end_to_end_traces, false, |
59 | | "If true, collected traces includes information for sub-components " |
60 | | "potentially running on a different server. "); |
61 | | TAG_FLAG(collect_end_to_end_traces, advanced); |
62 | | TAG_FLAG(collect_end_to_end_traces, runtime); |
63 | | |
64 | | DEFINE_int32(print_trace_every, 0, |
65 | | "Controls the rate at which traces are printed. Setting this to 0 " |
66 | | "disables printing the collected traces."); |
67 | | TAG_FLAG(print_trace_every, advanced); |
68 | | TAG_FLAG(print_trace_every, runtime); |
69 | | |
70 | | DEFINE_int32(rpc_slow_query_threshold_ms, 10000, |
71 | | "Traces for calls that take longer than this threshold (in ms) are logged"); |
72 | | TAG_FLAG(rpc_slow_query_threshold_ms, advanced); |
73 | | TAG_FLAG(rpc_slow_query_threshold_ms, runtime); |
74 | | |
75 | | namespace yb { |
76 | | namespace rpc { |
77 | | |
78 | | InboundCall::InboundCall(ConnectionPtr conn, RpcMetrics* rpc_metrics, |
79 | | CallProcessedListener call_processed_listener) |
80 | | : trace_(new Trace), |
81 | | conn_(std::move(conn)), |
82 | | rpc_metrics_(rpc_metrics ? rpc_metrics : &conn_->rpc_metrics()), |
83 | 90.7M | call_processed_listener_(std::move(call_processed_listener)) { |
84 | 90.7M | TRACE_TO(trace_, "Created InboundCall"); |
85 | 90.7M | IncrementCounter(rpc_metrics_->inbound_calls_created); |
86 | 90.7M | IncrementGauge(rpc_metrics_->inbound_calls_alive); |
87 | 90.7M | } |
88 | | |
89 | 90.8M | InboundCall::~InboundCall() { |
90 | 90.8M | TRACE_TO(trace_, "Destroying InboundCall"); |
91 | 90.8M | YB_LOG_IF_EVERY_N0 (INFO, FLAGS_print_trace_every > 0, FLAGS_print_trace_every) |
92 | 0 | << "Tracing op: \n " << trace_->DumpToString(true); |
93 | 90.8M | DecrementGauge(rpc_metrics_->inbound_calls_alive); |
94 | 90.8M | } |
95 | | |
96 | 79.9M | void InboundCall::NotifyTransferred(const Status& status, Connection* conn) { |
97 | 79.9M | if (status.ok()) { |
98 | 79.9M | TRACE_TO(trace_, "Transfer finished"); |
99 | 79.9M | } else { |
100 | 18.5k | YB_LOG_EVERY_N_SECS(WARNING, 10) << LogPrefix() << "Connection torn down before " << ToString() |
101 | 256 | << " could send its response: " << status.ToString(); |
102 | 18.5k | } |
103 | 79.9M | if (call_processed_listener_) { |
104 | 79.9M | call_processed_listener_(this); |
105 | 79.9M | } |
106 | 79.9M | } |
107 | | |
108 | 29.6M | const Endpoint& InboundCall::remote_address() const { |
109 | 29.6M | CHECK_NOTNULL(conn_.get()); |
110 | 29.6M | return conn_->remote(); |
111 | 29.6M | } |
112 | | |
113 | 9.23k | const Endpoint& InboundCall::local_address() const { |
114 | 9.23k | CHECK_NOTNULL(conn_.get()); |
115 | 9.23k | return conn_->local(); |
116 | 9.23k | } |
117 | | |
118 | 276M | ConnectionPtr InboundCall::connection() const { |
119 | 276M | return conn_; |
120 | 276M | } |
121 | | |
122 | 0 | ConnectionContext& InboundCall::connection_context() const { |
123 | 0 | return conn_->context(); |
124 | 0 | } |
125 | | |
126 | 245M | Trace* InboundCall::trace() { |
127 | 245M | return trace_.get(); |
128 | 245M | } |
129 | | |
130 | 90.8M | void InboundCall::RecordCallReceived() { |
131 | 90.8M | TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this); |
132 | | // Protect against multiple calls. |
133 | 90.8M | LOG_IF_WITH_PREFIX84.9k (DFATAL, timing_.time_received.Initialized()) << "Already marked as received"84.9k ; |
134 | 90.8M | VLOG_WITH_PREFIX71.6k (4) << "Received"71.6k ; |
135 | 90.8M | timing_.time_received = MonoTime::Now(); |
136 | 90.8M | } |
137 | | |
138 | 90.5M | void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time) { |
139 | 90.5M | DCHECK(incoming_queue_time != nullptr); |
140 | | // Protect against multiple calls. |
141 | 90.5M | LOG_IF_WITH_PREFIX11.7k (DFATAL, timing_.time_handled.Initialized()) << "Already marked as started"11.7k ; |
142 | 90.5M | timing_.time_handled = MonoTime::Now(); |
143 | 90.5M | VLOG_WITH_PREFIX7.01k (4) << "Handling"7.01k ; |
144 | 90.5M | incoming_queue_time->Increment( |
145 | 90.5M | timing_.time_handled.GetDeltaSince(timing_.time_received).ToMicroseconds()); |
146 | 90.5M | } |
147 | | |
148 | 1.26M | MonoDelta InboundCall::GetTimeInQueue() const { |
149 | 1.26M | return timing_.time_handled.GetDeltaSince(timing_.time_received); |
150 | 1.26M | } |
151 | | |
152 | 90.4M | ThreadPoolTask* InboundCall::BindTask(InboundCallHandler* handler) { |
153 | 90.4M | auto shared_this = shared_from(this); |
154 | 90.4M | if (!handler->CallQueued()) { |
155 | 1.26k | return nullptr; |
156 | 1.26k | } |
157 | 90.4M | tracker_ = handler; |
158 | 90.4M | task_.Bind(handler, shared_this); |
159 | 90.4M | return &task_; |
160 | 90.4M | } |
161 | | |
162 | 90.2M | void InboundCall::RecordHandlingCompleted() { |
163 | | // Protect against multiple calls. |
164 | 90.2M | LOG_IF_WITH_PREFIX15.3k (DFATAL, timing_.time_completed.Initialized()) << "Already marked as completed"15.3k ; |
165 | 90.2M | timing_.time_completed = MonoTime::Now(); |
166 | 18.4E | VLOG_WITH_PREFIX(4) << "Completed handling"; |
167 | 90.2M | if (rpc_method_handler_latency_) { |
168 | 90.0M | rpc_method_handler_latency_->Increment( |
169 | 90.0M | (timing_.time_completed - timing_.time_handled).ToMicroseconds()); |
170 | 90.0M | } |
171 | 90.2M | } |
172 | | |
173 | 90.5M | bool InboundCall::ClientTimedOut() const { |
174 | 90.5M | auto deadline = GetClientDeadline(); |
175 | 90.5M | if (deadline == CoarseTimePoint::max()) { |
176 | 219k | return false; |
177 | 219k | } |
178 | | |
179 | 90.3M | return deadline < CoarseMonoClock::now(); |
180 | 90.5M | } |
181 | | |
182 | 79.9M | void InboundCall::QueueResponse(bool is_success) { |
183 | 79.9M | TRACE_TO(trace_, is_success ? "Queueing success response" : "Queueing failure response"); |
184 | 79.9M | LogTrace(); |
185 | 79.9M | bool expected = false; |
186 | 79.9M | if (responded_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)79.9M ) { |
187 | 79.9M | connection()->context().QueueResponse(connection(), shared_from(this)); |
188 | 18.4E | } else { |
189 | 18.4E | LOG_WITH_PREFIX(DFATAL) << "Response already queued"; |
190 | 18.4E | } |
191 | 79.9M | } |
192 | | |
193 | 275 | std::string InboundCall::LogPrefix() const { |
194 | 275 | return Format("$0: ", this); |
195 | 275 | } |
196 | | |
197 | 19.5k | bool InboundCall::RespondTimedOutIfPending(const char* message) { |
198 | 19.5k | if (!TryStartProcessing()) { |
199 | 15.6k | return false; |
200 | 15.6k | } |
201 | | |
202 | 3.89k | RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, STATUS(TimedOut, message)); |
203 | 3.89k | Clear(); |
204 | | |
205 | 3.89k | return true; |
206 | 19.5k | } |
207 | | |
208 | 3.89k | void InboundCall::Clear() { |
209 | 3.89k | serialized_request_.clear(); |
210 | 3.89k | request_data_.Reset(); |
211 | 3.89k | request_data_memory_usage_.store(0, std::memory_order_release); |
212 | 3.89k | } |
213 | | |
214 | 71.0M | size_t InboundCall::DynamicMemoryUsage() const { |
215 | 71.0M | return request_data_memory_usage_.load(std::memory_order_acquire) + DynamicMemoryUsageOf(trace_); |
216 | 71.0M | } |
217 | | |
218 | 90.5M | void InboundCall::InboundCallTask::Run() { |
219 | 90.5M | handler_->Handle(call_); |
220 | 90.5M | } |
221 | | |
222 | 90.4M | void InboundCall::InboundCallTask::Done(const Status& status) { |
223 | | // We should reset call_ after this function. So it is easiest way to do it. |
224 | 90.4M | auto call = std::move(call_); |
225 | 90.4M | if (!status.ok()) { |
226 | 16 | handler_->Failure(call, status); |
227 | 16 | } |
228 | 90.4M | } |
229 | | |
230 | 90.0M | void InboundCall::SetRpcMethodMetrics(std::reference_wrapper<const RpcMethodMetrics> value) { |
231 | 90.0M | const auto& metrics = value.get(); |
232 | 90.0M | rpc_method_response_bytes_ = metrics.response_bytes; |
233 | 90.0M | rpc_method_handler_latency_ = metrics.handler_latency; |
234 | 90.0M | if (metrics.request_bytes) { |
235 | 81.1M | auto request_size = request_data_.size(); |
236 | 81.1M | if (request_size) { |
237 | 70.2M | metrics.request_bytes->IncrementBy(request_size); |
238 | 70.2M | } |
239 | 81.1M | } |
240 | 90.0M | } |
241 | | |
242 | 79.9M | void InboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) { |
243 | 79.9M | size_t old_size = output->size(); |
244 | 79.9M | DoSerialize(output); |
245 | 79.9M | if (rpc_method_response_bytes_) { |
246 | 70.2M | auto response_size = 0; |
247 | 145M | for (size_t i = old_size; i != output->size(); ++i74.8M ) { |
248 | 74.8M | response_size += (*output)[i].size(); |
249 | 74.8M | } |
250 | 70.2M | if (response_size70.2M ) { |
251 | 70.2M | rpc_method_response_bytes_->IncrementBy(response_size); |
252 | 70.2M | } |
253 | 70.2M | } |
254 | 79.9M | } |
255 | | |
256 | | } // namespace rpc |
257 | | } // namespace yb |