/Users/deen/code/yugabyte-db/src/yb/rpc/outbound_call.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/outbound_call.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | #include <string> |
38 | | #include <vector> |
39 | | |
40 | | #include <boost/functional/hash.hpp> |
41 | | #include <gflags/gflags.h> |
42 | | |
43 | | #include "yb/gutil/strings/substitute.h" |
44 | | #include "yb/gutil/walltime.h" |
45 | | |
46 | | #include "yb/rpc/connection.h" |
47 | | #include "yb/rpc/constants.h" |
48 | | #include "yb/rpc/proxy_base.h" |
49 | | #include "yb/rpc/rpc_controller.h" |
50 | | #include "yb/rpc/rpc_introspection.pb.h" |
51 | | #include "yb/rpc/rpc_metrics.h" |
52 | | #include "yb/rpc/serialization.h" |
53 | | |
54 | | #include "yb/util/flag_tags.h" |
55 | | #include "yb/util/format.h" |
56 | | #include "yb/util/logging.h" |
57 | | #include "yb/util/memory/memory.h" |
58 | | #include "yb/util/metrics.h" |
59 | | #include "yb/util/pb_util.h" |
60 | | #include "yb/util/result.h" |
61 | | #include "yb/util/scope_exit.h" |
62 | | #include "yb/util/status_format.h" |
63 | | #include "yb/util/thread_restrictions.h" |
64 | | #include "yb/util/trace.h" |
65 | | #include "yb/util/tsan_util.h" |
66 | | |
67 | | METRIC_DEFINE_coarse_histogram( |
68 | | server, handler_latency_outbound_call_queue_time, "Time taken to queue the request ", |
69 | | yb::MetricUnit::kMicroseconds, "Microseconds spent to queue the request to the reactor"); |
70 | | METRIC_DEFINE_coarse_histogram( |
71 | | server, handler_latency_outbound_call_send_time, "Time taken to send the request ", |
72 | | yb::MetricUnit::kMicroseconds, "Microseconds spent to queue and write the request to the wire"); |
73 | | METRIC_DEFINE_coarse_histogram( |
74 | | server, handler_latency_outbound_call_time_to_response, "Time taken to get the response ", |
75 | | yb::MetricUnit::kMicroseconds, |
76 | | "Microseconds spent to send the request and get a response on the wire"); |
77 | | |
78 | | // 100M cycles should be about 50ms on a 2Ghz box. This should be high |
79 | | // enough that involuntary context switches don't trigger it, but low enough |
80 | | // that any serious blocking behavior on the reactor would. |
81 | | DEFINE_int64( |
82 | | rpc_callback_max_cycles, 100 * 1000 * 1000 * yb::kTimeMultiplier, |
83 | | "The maximum number of cycles for which an RPC callback " |
84 | | "should be allowed to run without emitting a warning." |
85 | | " (Advanced debugging option)"); |
86 | | TAG_FLAG(rpc_callback_max_cycles, advanced); |
87 | | TAG_FLAG(rpc_callback_max_cycles, runtime); |
88 | | DECLARE_bool(rpc_dump_all_traces); |
89 | | |
90 | | namespace yb { |
91 | | namespace rpc { |
92 | | |
93 | | using strings::Substitute; |
94 | | using google::protobuf::Message; |
95 | | using google::protobuf::io::CodedOutputStream; |
96 | | |
97 | | OutboundCallMetrics::OutboundCallMetrics(const scoped_refptr<MetricEntity>& entity) |
98 | | : queue_time(METRIC_handler_latency_outbound_call_queue_time.Instantiate(entity)), |
99 | | send_time(METRIC_handler_latency_outbound_call_send_time.Instantiate(entity)), |
100 | 206k | time_to_response(METRIC_handler_latency_outbound_call_time_to_response.Instantiate(entity)) { |
101 | 206k | } |
102 | | |
103 | | namespace { |
104 | | |
105 | | std::atomic<int32_t> call_id_ = {0}; |
106 | | |
107 | 86.4M | int32_t NextCallId() { |
108 | 86.4M | for (;;) { |
109 | 86.4M | auto result = call_id_.fetch_add(1, std::memory_order_acquire); |
110 | 86.4M | ++result; |
111 | 86.5M | if (result > 086.4M ) { |
112 | 86.5M | return result; |
113 | 86.5M | } |
114 | | // When call id overflows, we reset it to zero. |
115 | 18.4E | call_id_.compare_exchange_weak(result, 0); |
116 | 18.4E | } |
117 | 86.4M | } |
118 | | |
119 | | const std::string kEmptyString; |
120 | | |
121 | | } // namespace |
122 | | |
123 | 66.8M | void InvokeCallbackTask::Run() { |
124 | 66.8M | CHECK_NOTNULL(call_.get()); |
125 | 66.8M | call_->InvokeCallbackSync(); |
126 | 66.8M | } |
127 | | |
128 | 66.8M | void InvokeCallbackTask::Done(const Status& status) { |
129 | 66.8M | CHECK_NOTNULL(call_.get()); |
130 | 66.8M | if (!status.ok()) { |
131 | 283 | LOG(WARNING) << Format( |
132 | 283 | "Failed to schedule invoking callback on response for request $0 to $1: $2", |
133 | 283 | call_->remote_method(), call_->hostname(), status); |
134 | 283 | call_->SetThreadPoolFailure(status); |
135 | | // We are in the shutdown path, with the threadpool closing, so allow IO and wait. |
136 | 283 | ThreadRestrictions::SetWaitAllowed(true); |
137 | 283 | ThreadRestrictions::SetIOAllowed(true); |
138 | 283 | call_->InvokeCallbackSync(); |
139 | 283 | } |
140 | | // Clear the call, since it holds OutboundCall object. |
141 | 66.8M | call_ = nullptr; |
142 | 66.8M | } |
143 | | |
144 | | /// |
145 | | /// OutboundCall |
146 | | /// |
147 | | |
148 | | OutboundCall::OutboundCall(const RemoteMethod* remote_method, |
149 | | const std::shared_ptr<OutboundCallMetrics>& outbound_call_metrics, |
150 | | std::shared_ptr<const OutboundMethodMetrics> method_metrics, |
151 | | AnyMessagePtr response_storage, |
152 | | RpcController* controller, |
153 | | std::shared_ptr<RpcMetrics> rpc_metrics, |
154 | | ResponseCallback callback, |
155 | | ThreadPool* callback_thread_pool) |
156 | | : hostname_(&kEmptyString), |
157 | | start_(CoarseMonoClock::Now()), |
158 | | controller_(DCHECK_NOTNULL(controller)), |
159 | | response_(DCHECK_NOTNULL(response_storage)), |
160 | | trace_(new Trace), |
161 | | call_id_(NextCallId()), |
162 | | remote_method_(remote_method), |
163 | | callback_(std::move(callback)), |
164 | | callback_thread_pool_(callback_thread_pool), |
165 | | outbound_call_metrics_(outbound_call_metrics), |
166 | | rpc_metrics_(std::move(rpc_metrics)), |
167 | 86.4M | method_metrics_(std::move(method_metrics)) { |
168 | 86.4M | TRACE_TO_WITH_TIME(trace_, start_, "$0.", remote_method_->ToString()); |
169 | | |
170 | 86.4M | if (Trace::CurrentTrace()) { |
171 | 1.01M | Trace::CurrentTrace()->AddChildTrace(trace_.get()); |
172 | 1.01M | } |
173 | | |
174 | 86.4M | DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_) |
175 | 33.3k | << " and RPC timeout: " |
176 | 33.3k | << (controller_->timeout().Initialized() ? controller_->timeout().ToString()0 : "none"); |
177 | | |
178 | 86.4M | IncrementCounter(rpc_metrics_->outbound_calls_created); |
179 | 86.4M | IncrementGauge(rpc_metrics_->outbound_calls_alive); |
180 | 86.4M | } |
181 | | |
182 | 86.3M | OutboundCall::~OutboundCall() { |
183 | 86.3M | DCHECK(IsFinished()); |
184 | 86.3M | DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_)19.9k ; |
185 | | |
186 | 86.3M | if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) { |
187 | 0 | LOG(INFO) << ToString() << " took " |
188 | 0 | << MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds() << "us. Trace:"; |
189 | 0 | trace_->Dump(&LOG(INFO), true); |
190 | 0 | } |
191 | | |
192 | 86.3M | DecrementGauge(rpc_metrics_->outbound_calls_alive); |
193 | 86.3M | } |
194 | | |
195 | 74.8M | void OutboundCall::NotifyTransferred(const Status& status, Connection* conn) { |
196 | 74.8M | if (status.ok()) { |
197 | | // Even when call is already finished (timed out) we should notify connection that it was sent |
198 | | // because it should expect response with appropriate id. |
199 | 72.0M | conn->CallSent(shared_from(this)); |
200 | 72.0M | } |
201 | | |
202 | 74.8M | if (IsFinished()) { |
203 | 192 | LOG_IF_WITH_PREFIX0 (DFATAL, !IsTimedOut()) |
204 | 0 | << "Transferred call is in wrong state: " << state_.load(std::memory_order_acquire); |
205 | 74.8M | } else if (status.ok()) { |
206 | 72.4M | SetSent(); |
207 | 72.4M | } else { |
208 | 18.4E | VLOG_WITH_PREFIX(1) << "Connection torn down: " << status; |
209 | 2.40M | SetFailed(status); |
210 | 2.40M | } |
211 | 74.8M | } |
212 | | |
213 | 72.9M | void OutboundCall::Serialize(boost::container::small_vector_base<RefCntBuffer>* output) { |
214 | 72.9M | output->push_back(std::move(buffer_)); |
215 | 72.9M | buffer_consumption_ = ScopedTrackedConsumption(); |
216 | 72.9M | } |
217 | | |
218 | 75.6M | Status OutboundCall::SetRequestParam(AnyMessageConstPtr req, const MemTrackerPtr& mem_tracker) { |
219 | 75.6M | auto req_size = req.SerializedSize(); |
220 | 75.6M | size_t message_size = SerializedMessageSize(req_size, 0); |
221 | | |
222 | 75.6M | using Output = google::protobuf::io::CodedOutputStream; |
223 | 75.6M | auto timeout_ms = VERIFY_RESULT75.6M (TimeoutMs());75.6M |
224 | 0 | size_t call_id_size = Output::VarintSize32(call_id_); |
225 | 75.6M | size_t timeout_ms_size = Output::VarintSize32(timeout_ms); |
226 | 75.6M | auto serialized_remote_method = remote_method_->serialized(); |
227 | | |
228 | 75.6M | size_t header_pb_len = 1 + call_id_size + serialized_remote_method.size() + 1 + timeout_ms_size; |
229 | 75.6M | size_t header_size = |
230 | 75.6M | kMsgLengthPrefixLength // Int prefix for the total length. |
231 | 75.6M | + CodedOutputStream::VarintSize32( |
232 | 75.6M | narrow_cast<uint32_t>(header_pb_len)) // Varint delimiter for header PB. |
233 | 75.6M | + header_pb_len; // Length for the header PB itself. |
234 | 75.6M | size_t total_size = header_size + message_size; |
235 | | |
236 | 75.6M | buffer_ = RefCntBuffer(total_size); |
237 | 75.6M | uint8_t* dst = buffer_.udata(); |
238 | | |
239 | | // 1. The length for the whole request, not including the 4-byte |
240 | | // length prefix. |
241 | 75.6M | NetworkByteOrder::Store32(dst, narrow_cast<uint32_t>(total_size - kMsgLengthPrefixLength)); |
242 | 75.6M | dst += sizeof(uint32_t); |
243 | | |
244 | | // 2. The varint-prefixed RequestHeader PB |
245 | 75.6M | dst = CodedOutputStream::WriteVarint32ToArray(narrow_cast<uint32_t>(header_pb_len), dst); |
246 | 75.6M | dst = Output::WriteTagToArray(RequestHeader::kCallIdFieldNumber << 3, dst); |
247 | 75.6M | dst = Output::WriteVarint32ToArray(call_id_, dst); |
248 | 75.6M | memcpy(dst, serialized_remote_method.data(), serialized_remote_method.size()); |
249 | 75.6M | dst += serialized_remote_method.size(); |
250 | 75.6M | dst = CodedOutputStream::WriteTagToArray(RequestHeader::kTimeoutMillisFieldNumber << 3, dst); |
251 | 75.6M | dst = Output::WriteVarint32ToArray(timeout_ms, dst); |
252 | | |
253 | 75.6M | DCHECK_EQ(dst - buffer_.udata(), header_size); |
254 | | |
255 | 75.6M | if (mem_tracker) { |
256 | 75.5M | buffer_consumption_ = ScopedTrackedConsumption(mem_tracker, buffer_.size()); |
257 | 75.5M | } |
258 | 75.6M | RETURN_NOT_OK(SerializeMessage(req, req_size, buffer_, 0, header_size)); |
259 | 75.6M | if (method_metrics_) { |
260 | 73.7M | IncrementCounterBy(method_metrics_->request_bytes, buffer_.size()); |
261 | 73.7M | } |
262 | 75.6M | return Status::OK(); |
263 | 75.6M | } |
264 | | |
265 | 84.9M | Status OutboundCall::status() const { |
266 | 84.9M | std::lock_guard<simple_spinlock> l(lock_); |
267 | 84.9M | return status_; |
268 | 84.9M | } |
269 | | |
270 | 268k | const ErrorStatusPB* OutboundCall::error_pb() const { |
271 | 268k | std::lock_guard<simple_spinlock> l(lock_); |
272 | 268k | return error_pb_.get(); |
273 | 268k | } |
274 | | |
275 | 195 | string OutboundCall::StateName(State state) { |
276 | 195 | return RpcCallState_Name(state); |
277 | 195 | } |
278 | | |
279 | 1 | OutboundCall::State OutboundCall::state() const { |
280 | 1 | return state_.load(std::memory_order_acquire); |
281 | 1 | } |
282 | | |
283 | 673M | bool FinishedState(RpcCallState state) { |
284 | 673M | switch (state) { |
285 | 86.1M | case READY: |
286 | 394M | case ON_OUTBOUND_QUEUE: |
287 | 561M | case SENT: |
288 | 561M | return false; |
289 | 58.5k | case TIMED_OUT: |
290 | 3.35M | case FINISHED_ERROR: |
291 | 113M | case FINISHED_SUCCESS: |
292 | 113M | return true; |
293 | 673M | } |
294 | 0 | LOG(FATAL) << "Unknown call state: " << state; |
295 | 0 | return false; |
296 | 673M | } |
297 | | |
298 | 255M | bool ValidStateTransition(RpcCallState old_state, RpcCallState new_state) { |
299 | 255M | switch (new_state) { |
300 | 86.0M | case ON_OUTBOUND_QUEUE: |
301 | 86.0M | return old_state == READY; |
302 | 83.6M | case SENT: |
303 | 83.6M | return old_state == ON_OUTBOUND_QUEUE; |
304 | 38.4k | case TIMED_OUT: |
305 | 38.4k | return old_state == SENT || old_state == ON_OUTBOUND_QUEUE3.98k ; |
306 | 83.1M | case FINISHED_SUCCESS: |
307 | 83.1M | return old_state == SENT; |
308 | 3.01M | case FINISHED_ERROR: |
309 | 3.01M | return old_state == SENT || old_state == ON_OUTBOUND_QUEUE2.72M || old_state == READY160k ; |
310 | 0 | default: |
311 | | // No sanity checks for others. |
312 | 0 | return true; |
313 | 255M | } |
314 | 255M | } |
315 | | |
316 | 255M | bool OutboundCall::SetState(State new_state) { |
317 | 255M | auto old_state = state_.load(std::memory_order_acquire); |
318 | | // Sanity check state transitions. |
319 | 255M | DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from " << |
320 | 744k | StateName(old_state) << " to " << StateName(new_state); |
321 | 255M | for (;;) { |
322 | 255M | if (FinishedState(old_state)) { |
323 | 0 | VLOG(1) << "Call already finished: " << RpcCallState_Name(old_state) << ", new state: " |
324 | 0 | << RpcCallState_Name(new_state); |
325 | 0 | return false; |
326 | 0 | } |
327 | 255M | if (!ValidStateTransition(old_state, new_state)) { |
328 | 0 | LOG(DFATAL) |
329 | 0 | << "Invalid call state transition: " << RpcCallState_Name(old_state) << " => " |
330 | 0 | << RpcCallState_Name(new_state); |
331 | 0 | return false; |
332 | 0 | } |
333 | 255M | if (255M state_.compare_exchange_weak(old_state, new_state, std::memory_order_acq_rel)255M ) { |
334 | 255M | return true; |
335 | 255M | } |
336 | 255M | } |
337 | 255M | } |
338 | | |
339 | 86.1M | void OutboundCall::InvokeCallback() { |
340 | 86.1M | if (callback_thread_pool_) { |
341 | 66.5M | callback_task_.SetOutboundCall(shared_from(this)); |
342 | 66.5M | callback_thread_pool_->Enqueue(&callback_task_); |
343 | 66.5M | TRACE_TO(trace_, "Callback will be called asynchronously."); |
344 | 66.5M | } else { |
345 | 19.5M | InvokeCallbackSync(); |
346 | 19.5M | TRACE_TO(trace_, "Callback called synchronously."); |
347 | 19.5M | } |
348 | 86.1M | } |
349 | | |
350 | 86.4M | void OutboundCall::InvokeCallbackSync() { |
351 | 86.4M | if (!callback_) { |
352 | 0 | LOG(DFATAL) << "Callback has been already invoked."; |
353 | 0 | return; |
354 | 0 | } |
355 | | |
356 | 86.4M | int64_t start_cycles = CycleClock::Now(); |
357 | 86.4M | callback_(); |
358 | | // Clear the callback, since it may be holding onto reference counts |
359 | | // via bound parameters. We do this inside the timer because it's possible |
360 | | // the user has naughty destructors that block, and we want to account for that |
361 | | // time here if they happen to run on this thread. |
362 | 86.4M | callback_ = nullptr; |
363 | 86.4M | int64_t end_cycles = CycleClock::Now(); |
364 | 86.4M | int64_t wait_cycles = end_cycles - start_cycles; |
365 | 86.4M | if (PREDICT_FALSE(wait_cycles > FLAGS_rpc_callback_max_cycles)) { |
366 | 146 | auto time_spent = MonoDelta::FromSeconds( |
367 | 146 | static_cast<double>(wait_cycles) / base::CyclesPerSecond()); |
368 | | |
369 | 146 | LOG(WARNING) << "RPC callback for " << ToString() << " took " << time_spent; |
370 | 146 | } |
371 | | |
372 | | // Could be destroyed during callback. So reset it. |
373 | 86.4M | controller_ = nullptr; |
374 | 86.4M | response_ = nullptr; |
375 | 86.4M | } |
376 | | |
377 | 72.7M | void OutboundCall::SetResponse(CallResponse&& resp) { |
378 | 72.7M | DCHECK(!IsFinished()); |
379 | | |
380 | 72.7M | auto now = CoarseMonoClock::Now(); |
381 | 72.7M | TRACE_TO_WITH_TIME(trace_, now, "Response received."); |
382 | | // Avoid expensive conn_id.ToString() in production. |
383 | 72.7M | VTRACE_TO(1, trace_, "from $0", conn_id_.ToString()); |
384 | | // Track time taken to be responded. |
385 | | |
386 | 72.7M | if (outbound_call_metrics_) { |
387 | 71.1M | outbound_call_metrics_->time_to_response->Increment(MonoDelta(now - start_).ToMicroseconds()); |
388 | 71.1M | } |
389 | 72.7M | call_response_ = std::move(resp); |
390 | 72.7M | Slice r(call_response_.serialized_response()); |
391 | | |
392 | 72.7M | if (method_metrics_) { |
393 | 70.9M | IncrementCounterBy(method_metrics_->response_bytes, r.size()); |
394 | 70.9M | } |
395 | | |
396 | 72.7M | if (call_response_.is_success()) { |
397 | | // TODO: here we're deserializing the call response within the reactor thread, |
398 | | // which isn't great, since it would block processing of other RPCs in parallel. |
399 | | // Should look into a way to avoid this. |
400 | 72.2M | auto status = response_.ParseFromSlice(r); |
401 | 72.2M | if (!status.ok()) { |
402 | 0 | SetFailed(status); |
403 | 0 | return; |
404 | 0 | } |
405 | 72.2M | if (SetState(FINISHED_SUCCESS)) { |
406 | 72.2M | InvokeCallback(); |
407 | 72.2M | } else { |
408 | 35.6k | LOG(DFATAL) << "Success of already finished call: " |
409 | 35.6k | << RpcCallState_Name(state_.load(std::memory_order_acquire)); |
410 | 35.6k | } |
411 | 72.2M | } else { |
412 | | // Error |
413 | 460k | auto err = std::make_unique<ErrorStatusPB>(); |
414 | 460k | if (!pb_util::ParseFromArray(err.get(), r.data(), r.size()).IsOk()) { |
415 | 0 | SetFailed(STATUS(IOError, "Was an RPC error but could not parse error response", |
416 | 0 | err->InitializationErrorString())); |
417 | 0 | return; |
418 | 0 | } |
419 | 460k | auto status = STATUS(RemoteError, err->message()); |
420 | 460k | SetFailed(status, std::move(err)); |
421 | 460k | } |
422 | 72.7M | } |
423 | | |
424 | 86.0M | void OutboundCall::SetQueued() { |
425 | 86.0M | auto end_time = CoarseMonoClock::Now(); |
426 | | // Track time taken to be queued. |
427 | 86.0M | if (outbound_call_metrics_) { |
428 | 84.0M | outbound_call_metrics_->queue_time->Increment(MonoDelta(end_time - start_).ToMicroseconds()); |
429 | 84.0M | } |
430 | 86.0M | SetState(ON_OUTBOUND_QUEUE); |
431 | 86.0M | TRACE_TO_WITH_TIME(trace_, end_time, "Queued."); |
432 | 86.0M | } |
433 | | |
434 | 83.3M | void OutboundCall::SetSent() { |
435 | 83.3M | auto end_time = CoarseMonoClock::Now(); |
436 | | // Track time taken to be sent |
437 | 83.3M | if (outbound_call_metrics_) { |
438 | 81.4M | outbound_call_metrics_->send_time->Increment(MonoDelta(end_time - start_).ToMicroseconds()); |
439 | 81.4M | } |
440 | 83.3M | SetState(SENT); |
441 | 83.3M | TRACE_TO_WITH_TIME(trace_, end_time, "Call Sent."); |
442 | 83.3M | } |
443 | | |
444 | 10.7M | void OutboundCall::SetFinished() { |
445 | 10.7M | DCHECK(!IsFinished()); |
446 | | |
447 | | // Track time taken to be responded. |
448 | 10.7M | if (outbound_call_metrics_) { |
449 | 10.7M | outbound_call_metrics_->time_to_response->Increment( |
450 | 10.7M | MonoDelta(CoarseMonoClock::Now() - start_).ToMicroseconds()); |
451 | 10.7M | } |
452 | 10.8M | if (SetState(FINISHED_SUCCESS)10.7M ) { |
453 | 10.8M | InvokeCallback(); |
454 | 10.8M | } |
455 | 10.7M | } |
456 | | |
457 | 3.00M | void OutboundCall::SetFailed(const Status &status, std::unique_ptr<ErrorStatusPB> err_pb) { |
458 | 3.00M | DCHECK(!IsFinished()); |
459 | | |
460 | 3.00M | TRACE_TO(trace_, "Call Failed."); |
461 | 3.00M | bool invoke_callback; |
462 | 3.00M | { |
463 | 3.00M | std::lock_guard<simple_spinlock> l(lock_); |
464 | 3.00M | status_ = status; |
465 | 3.00M | if (status_.IsRemoteError()) { |
466 | 268k | CHECK(err_pb); |
467 | 268k | error_pb_ = std::move(err_pb); |
468 | 268k | if (error_pb_->has_code()) { |
469 | 266k | status_ = status_.CloneAndAddErrorCode(RpcError(error_pb_->code())); |
470 | 266k | } |
471 | 2.74M | } else { |
472 | 2.74M | CHECK(!err_pb); |
473 | 2.74M | } |
474 | 3.00M | invoke_callback = SetState(FINISHED_ERROR); |
475 | 3.00M | } |
476 | 3.01M | if (invoke_callback3.00M ) { |
477 | 3.01M | InvokeCallback(); |
478 | 3.01M | } |
479 | 3.00M | } |
480 | | |
481 | 38.4k | void OutboundCall::SetTimedOut() { |
482 | 38.4k | DCHECK(!IsFinished()); |
483 | | |
484 | 38.4k | TRACE_TO(trace_, "Call TimedOut."); |
485 | 38.4k | bool invoke_callback; |
486 | 38.4k | { |
487 | 38.4k | auto status = STATUS_FORMAT( |
488 | 38.4k | TimedOut, |
489 | 38.4k | "$0 RPC (request call id $3) to $1 timed out after $2", |
490 | 38.4k | remote_method_->method_name(), |
491 | 38.4k | conn_id_.remote(), |
492 | 38.4k | controller_->timeout(), |
493 | 38.4k | call_id_); |
494 | 38.4k | std::lock_guard<simple_spinlock> l(lock_); |
495 | 38.4k | status_ = std::move(status); |
496 | 38.4k | invoke_callback = SetState(TIMED_OUT); |
497 | 38.4k | } |
498 | 38.4k | if (invoke_callback) { |
499 | 38.4k | InvokeCallback(); |
500 | 38.4k | } |
501 | 38.4k | } |
502 | | |
503 | 192 | bool OutboundCall::IsTimedOut() const { |
504 | 192 | return state_.load(std::memory_order_acquire) == TIMED_OUT; |
505 | 192 | } |
506 | | |
507 | 419M | bool OutboundCall::IsFinished() const { |
508 | 419M | return FinishedState(state_.load(std::memory_order_acquire)); |
509 | 419M | } |
510 | | |
511 | 7.52M | Result<Slice> OutboundCall::GetSidecar(size_t idx) const { |
512 | 7.52M | auto ptr = VERIFY_RESULT(GetSidecarPtr(idx)); |
513 | 0 | return Slice(ptr[0], ptr[1]); |
514 | 7.52M | } |
515 | | |
516 | 10.7M | Result<const uint8_t*const*> OutboundCall::GetSidecarPtr(size_t idx) const { |
517 | 10.7M | return call_response_.GetSidecarPtr(idx); |
518 | 10.7M | } |
519 | | |
520 | 7.21M | Result<SidecarHolder> OutboundCall::GetSidecarHolder(size_t idx) const { |
521 | 7.21M | return call_response_.GetSidecarHolder(idx); |
522 | 7.21M | } |
523 | | |
524 | 165 | string OutboundCall::ToString() const { |
525 | 165 | return Format("RPC call $0 -> $1 , state=$2.", *remote_method_, conn_id_, StateName(state_)); |
526 | 165 | } |
527 | | |
528 | | bool OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req, |
529 | 1 | RpcCallInProgressPB* resp) { |
530 | 1 | std::lock_guard<simple_spinlock> l(lock_); |
531 | 1 | auto state_value = state(); |
532 | 1 | if (!req.dump_timed_out() && state_value == RpcCallState::TIMED_OUT) { |
533 | 0 | return false; |
534 | 0 | } |
535 | 1 | if (!InitHeader(resp->mutable_header()).ok() && !req.dump_timed_out()0 ) { |
536 | | // Note that if we proceed here due to req.dump_timed_out() being true, then the |
537 | | // header.timeout_millis() will be inaccurate/not-set. This is ok because DumpPB |
538 | | // is only used for dumping the PB and not to send the RPC over the wire. |
539 | 0 | return false; |
540 | 0 | } |
541 | 1 | resp->set_elapsed_millis(MonoDelta(CoarseMonoClock::Now() - start_).ToMilliseconds()); |
542 | 1 | resp->set_state(state_value); |
543 | 1 | if (req.include_traces() && trace_) { |
544 | 1 | resp->set_trace_buffer(trace_->DumpToString(true)); |
545 | 1 | } |
546 | 1 | return true; |
547 | 1 | } |
548 | | |
549 | 0 | std::string OutboundCall::LogPrefix() const { |
550 | 0 | return Format("{ OutboundCall@$0 } ", this); |
551 | 0 | } |
552 | | |
553 | 75.6M | Result<uint32_t> OutboundCall::TimeoutMs() const { |
554 | 75.6M | MonoDelta timeout = controller_->timeout(); |
555 | 75.6M | if (timeout.Initialized()75.6M ) { |
556 | 75.6M | auto timeout_millis = timeout.ToMilliseconds(); |
557 | 75.6M | if (timeout_millis <= 0) { |
558 | 1.37k | return STATUS(TimedOut, "Call timed out before sending"); |
559 | 1.37k | } |
560 | 75.6M | return narrow_cast<uint32_t>(timeout_millis); |
561 | 18.4E | } else { |
562 | 18.4E | return 0; |
563 | 18.4E | } |
564 | 75.6M | } |
565 | | |
566 | 1 | Status OutboundCall::InitHeader(RequestHeader* header) { |
567 | 1 | header->set_call_id(call_id_); |
568 | 1 | remote_method_->ToPB(header->mutable_remote_method()); |
569 | | |
570 | 1 | if (!IsFinished()) { |
571 | 1 | header->set_timeout_millis(VERIFY_RESULT(TimeoutMs())); |
572 | 1 | } |
573 | 1 | return Status::OK(); |
574 | 1 | } |
575 | | |
576 | | /// |
577 | | /// ConnectionId |
578 | | /// |
579 | | |
580 | 10.2k | string ConnectionId::ToString() const { |
581 | 10.2k | return Format("{ remote: $0 idx: $1 protocol: $2 }", remote_, idx_, protocol_); |
582 | 10.2k | } |
583 | | |
584 | 104M | size_t ConnectionId::HashCode() const { |
585 | 104M | size_t seed = 0; |
586 | 104M | boost::hash_combine(seed, hash_value(remote_)); |
587 | 104M | boost::hash_combine(seed, idx_); |
588 | 104M | boost::hash_combine(seed, protocol_); |
589 | 104M | return seed; |
590 | 104M | } |
591 | | |
592 | 104M | size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const { |
593 | 104M | return conn_id.HashCode(); |
594 | 104M | } |
595 | | |
596 | | /// |
597 | | /// CallResponse |
598 | | /// |
599 | | |
600 | | CallResponse::CallResponse() |
601 | 159M | : parsed_(false) { |
602 | 159M | } |
603 | | |
604 | 17.9M | Result<const uint8_t*const*> CallResponse::GetSidecarPtr(size_t idx) const { |
605 | 17.9M | DCHECK(parsed_); |
606 | 17.9M | if (idx + 1 >= sidecar_bounds_.size()) { |
607 | 0 | return STATUS_FORMAT(InvalidArgument, "Index $0 does not reference a valid sidecar", idx); |
608 | 0 | } |
609 | 17.9M | return &sidecar_bounds_[idx]; |
610 | 17.9M | } |
611 | | |
612 | 7.21M | Result<SidecarHolder> CallResponse::GetSidecarHolder(size_t idx) const { |
613 | 7.21M | auto bounds = VERIFY_RESULT(GetSidecarPtr(idx)); |
614 | 0 | return SidecarHolder(response_data_.buffer(), Slice(bounds[0], bounds[1])); |
615 | 7.21M | } |
616 | | |
617 | 72.7M | Status CallResponse::ParseFrom(CallData* call_data) { |
618 | 72.7M | CHECK(!parsed_); |
619 | 72.7M | Slice entire_message; |
620 | | |
621 | 72.7M | response_data_ = std::move(*call_data); |
622 | 72.7M | Slice source(response_data_.data(), response_data_.size()); |
623 | 72.7M | RETURN_NOT_OK(ParseYBMessage(source, &header_, &entire_message)); |
624 | | |
625 | | // Use information from header to extract the payload slices. |
626 | 72.7M | const size_t sidecars = header_.sidecar_offsets_size(); |
627 | | |
628 | 72.7M | if (sidecars > 0) { |
629 | 4.52M | serialized_response_ = Slice(entire_message.data(), |
630 | 4.52M | header_.sidecar_offsets(0)); |
631 | 4.52M | sidecar_bounds_.reserve(sidecars + 1); |
632 | | |
633 | 4.52M | uint32_t prev_offset = 0; |
634 | 17.9M | for (auto offset : header_.sidecar_offsets()) { |
635 | 17.9M | if (offset > entire_message.size()17.9M || offset < prev_offset) { |
636 | 0 | return STATUS_FORMAT( |
637 | 0 | Corruption, |
638 | 0 | "Invalid sidecar offsets; sidecar apparently starts at $0," |
639 | 0 | " ends at $1, but the entire message has length $2", |
640 | 0 | prev_offset, offset, entire_message.size()); |
641 | 0 | } |
642 | 17.9M | sidecar_bounds_.push_back(entire_message.data() + offset); |
643 | 17.9M | prev_offset = offset; |
644 | 17.9M | } |
645 | 4.52M | sidecar_bounds_.emplace_back(entire_message.end()); |
646 | 68.2M | } else { |
647 | 68.2M | serialized_response_ = entire_message; |
648 | 68.2M | } |
649 | | |
650 | 72.7M | parsed_ = true; |
651 | 72.7M | return Status::OK(); |
652 | 72.7M | } |
653 | | |
654 | | const std::string kRpcErrorCategoryName = "rpc error"; |
655 | | |
656 | | StatusCategoryRegisterer rpc_error_category_registerer( |
657 | | StatusCategoryDescription::Make<RpcErrorTag>(&kRpcErrorCategoryName)); |
658 | | |
659 | | } // namespace rpc |
660 | | } // namespace yb |