/Users/deen/code/yugabyte-db/src/yb/rpc/yb_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rpc/yb_rpc.h" |
17 | | |
18 | | #include <google/protobuf/io/coded_stream.h> |
19 | | |
20 | | #include "yb/gutil/casts.h" |
21 | | #include "yb/gutil/endian.h" |
22 | | |
23 | | #include "yb/rpc/connection.h" |
24 | | #include "yb/rpc/messenger.h" |
25 | | #include "yb/rpc/reactor.h" |
26 | | #include "yb/rpc/rpc_context.h" |
27 | | #include "yb/rpc/rpc_introspection.pb.h" |
28 | | #include "yb/rpc/serialization.h" |
29 | | |
30 | | #include "yb/util/debug/trace_event.h" |
31 | | #include "yb/util/flag_tags.h" |
32 | | #include "yb/util/format.h" |
33 | | #include "yb/util/memory/memory.h" |
34 | | #include "yb/util/result.h" |
35 | | #include "yb/util/size_literals.h" |
36 | | #include "yb/util/status_format.h" |
37 | | |
38 | | using google::protobuf::io::CodedInputStream; |
39 | | using namespace yb::size_literals; |
40 | | using namespace std::literals; |
41 | | |
42 | | DECLARE_bool(rpc_dump_all_traces); |
43 | | DECLARE_uint64(rpc_max_message_size); |
44 | | |
45 | | DEFINE_bool(enable_rpc_keepalive, true, "Whether to enable RPC keepalive mechanism"); |
46 | | |
47 | | DEFINE_uint64(min_sidecar_buffer_size, 16_KB, "Minimal buffer to allocate for sidecar"); |
48 | | |
49 | | DEFINE_test_flag(uint64, yb_inbound_big_calls_parse_delay_ms, false, |
50 | | "Test flag for simulating slow parsing of inbound calls larger than " |
51 | | "rpc_throttle_threshold_bytes"); |
52 | | |
53 | | using std::placeholders::_1; |
54 | | DECLARE_uint64(rpc_connection_timeout_ms); |
55 | | DECLARE_int32(rpc_slow_query_threshold_ms); |
56 | | DECLARE_int64(rpc_throttle_threshold_bytes); |
57 | | |
58 | | namespace yb { |
59 | | namespace rpc { |
60 | | |
// The connection timeout is divided by this value to obtain the heartbeat period.
constexpr int kHeartbeatsPerTimeoutPeriod = 3;
62 | | |
63 | | namespace { |
64 | | |
// Bytes expected at the very start of a connection: "YB" followed by one byte that is reserved
// for future use. It could control type of connection.
constexpr char kConnectionHeaderBytes[] = "YB\1";
// Length of the header without the string literal's terminating NUL.
constexpr size_t kConnectionHeaderSize = sizeof(kConnectionHeaderBytes) - 1;
68 | | |
69 | 3.11M | OutboundDataPtr ConnectionHeaderInstance() { |
70 | 3.11M | static OutboundDataPtr result = std::make_shared<StringOutboundData>( |
71 | 3.11M | kConnectionHeaderBytes, kConnectionHeaderSize, "ConnectionHeader"); |
72 | 3.11M | return result; |
73 | 3.11M | } |
74 | | |
75 | | const char kEmptyMsgLengthPrefix[kMsgLengthPrefixLength] = {0}; |
76 | | |
77 | | class HeartbeatOutboundData : public StringOutboundData { |
78 | | public: |
79 | 9.30M | bool IsHeartbeat() const override { return true; } |
80 | | |
81 | 9.31M | static std::shared_ptr<HeartbeatOutboundData> Instance() { |
82 | 9.31M | static std::shared_ptr<HeartbeatOutboundData> instance(new HeartbeatOutboundData()); |
83 | 9.31M | return instance; |
84 | 9.31M | } |
85 | | |
86 | | private: |
87 | | HeartbeatOutboundData() : |
88 | 11.0k | StringOutboundData(kEmptyMsgLengthPrefix, kMsgLengthPrefixLength, "Heartbeat") {} |
89 | | }; |
90 | | |
91 | | } // namespace |
92 | | |
93 | | using google::protobuf::FieldDescriptor; |
94 | | using google::protobuf::Message; |
95 | | using google::protobuf::MessageLite; |
96 | | using google::protobuf::io::CodedOutputStream; |
97 | | |
98 | | YBConnectionContext::YBConnectionContext( |
99 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker, |
100 | | const MemTrackerPtr& call_tracker) |
101 | | : parser_(buffer_tracker, kMsgLengthPrefixLength, 0 /* size_offset */, |
102 | | FLAGS_rpc_max_message_size, IncludeHeader::kFalse, rpc::SkipEmptyMessages::kTrue, |
103 | | this), |
104 | | read_buffer_(receive_buffer_size, buffer_tracker), |
105 | 3.75M | call_tracker_(call_tracker) {} |
106 | | |
107 | 3.74M | void YBConnectionContext::SetEventLoop(ev::loop_ref* loop) { |
108 | 3.74M | loop_ = loop; |
109 | 3.74M | } |
110 | | |
111 | 6.31M | void YBConnectionContext::Shutdown(const Status& status) { |
112 | 6.31M | timer_.Shutdown(); |
113 | 6.31M | loop_ = nullptr; |
114 | 6.31M | } |
115 | | |
116 | 3.24M | YBConnectionContext::~YBConnectionContext() {} |
117 | | |
118 | | namespace { |
119 | | |
120 | 1.10G | CoarseMonoClock::Duration Timeout() { |
121 | 1.10G | return FLAGS_rpc_connection_timeout_ms * 1ms; |
122 | 1.10G | } |
123 | | |
124 | 1.10G | CoarseMonoClock::Duration HeartbeatPeriod() { |
125 | 1.10G | return Timeout() / kHeartbeatsPerTimeoutPeriod; |
126 | 1.10G | } |
127 | | |
128 | | } // namespace |
129 | | |
130 | 141M | uint64_t YBConnectionContext::ExtractCallId(InboundCall* call) { |
131 | 141M | return down_cast<YBInboundCall*>(call)->call_id(); |
132 | 141M | } |
133 | | |
134 | | Result<ProcessCallsResult> YBInboundConnectionContext::ProcessCalls( |
135 | 70.0M | const ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full) { |
136 | 70.0M | if (state_ == RpcConnectionPB::NEGOTIATING) { |
137 | | // We assume that header is fully contained in the first block. |
138 | 609k | if (data[0].iov_len < kConnectionHeaderSize) { |
139 | 0 | return ProcessCallsResult{ 0, Slice() }; |
140 | 0 | } |
141 | | |
142 | 609k | Slice slice(static_cast<const char*>(data[0].iov_base), data[0].iov_len); |
143 | 609k | if (!slice.starts_with(kConnectionHeaderBytes, kConnectionHeaderSize)) { |
144 | 0 | return STATUS_FORMAT(NetworkError, |
145 | 0 | "Invalid connection header: $0", |
146 | 0 | slice.ToDebugHexString()); |
147 | 0 | } |
148 | 609k | state_ = RpcConnectionPB::OPEN; |
149 | 609k | IoVecs data_copy(data); |
150 | 609k | data_copy[0].iov_len -= kConnectionHeaderSize; |
151 | 609k | data_copy[0].iov_base = const_cast<uint8_t*>(slice.data() + kConnectionHeaderSize); |
152 | 609k | auto result = VERIFY_RESULT( |
153 | 609k | parser().Parse(connection, data_copy, ReadBufferFull::kFalse, &call_tracker())); |
154 | 0 | result.consumed += kConnectionHeaderSize; |
155 | 609k | return result; |
156 | 609k | } |
157 | | |
158 | 69.3M | return parser().Parse(connection, data, read_buffer_full, &call_tracker()); |
159 | 70.0M | } |
160 | | |
161 | | namespace { |
162 | | |
163 | 141M | CHECKED_STATUS ThrottleRpcStatus(const MemTrackerPtr& throttle_tracker, const YBInboundCall& call) { |
164 | 141M | if (ShouldThrottleRpc(throttle_tracker, call.request_data().size(), "Rejecting RPC call: ")) { |
165 | 0 | return STATUS_FORMAT(ServiceUnavailable, "Call rejected due to memory pressure: $0", call); |
166 | 141M | } else { |
167 | 141M | return Status::OK(); |
168 | 141M | } |
169 | 141M | } |
170 | | |
171 | | } // namespace |
172 | | |
173 | | Status YBInboundConnectionContext::HandleCall( |
174 | 70.8M | const ConnectionPtr& connection, CallData* call_data) { |
175 | 70.8M | auto reactor = connection->reactor(); |
176 | 70.8M | DCHECK(reactor->IsCurrentThread()); |
177 | | |
178 | 70.8M | auto call = InboundCall::Create<YBInboundCall>(connection, call_processed_listener()); |
179 | | |
180 | 70.8M | Status s = call->ParseFrom(call_tracker(), call_data); |
181 | 70.8M | if (!s.ok()) { |
182 | 0 | return s; |
183 | 0 | } |
184 | | |
185 | 70.8M | s = Store(call.get()); |
186 | 70.8M | if (!s.ok()) { |
187 | 0 | return s; |
188 | 0 | } |
189 | | |
190 | 70.8M | auto throttle_status = ThrottleRpcStatus(call_tracker(), *call); |
191 | 70.8M | if (!throttle_status.ok()) { |
192 | 0 | call->RespondFailure(ErrorStatusPB::ERROR_APPLICATION, throttle_status); |
193 | 0 | return Status::OK(); |
194 | 0 | } |
195 | | |
196 | 70.8M | reactor->messenger()->Handle(call, Queue::kTrue); |
197 | | |
198 | 70.8M | return Status::OK(); |
199 | 70.8M | } |
200 | | |
201 | 609k | void YBInboundConnectionContext::Connected(const ConnectionPtr& connection) { |
202 | 609k | DCHECK_EQ(connection->direction(), Connection::Direction::SERVER); |
203 | | |
204 | 609k | state_ = RpcConnectionPB::NEGOTIATING; |
205 | | |
206 | 609k | connection_ = connection; |
207 | 609k | last_write_time_ = connection->reactor()->cur_time(); |
208 | 609k | if (FLAGS_enable_rpc_keepalive) { |
209 | 609k | timer_.Init(*loop_); |
210 | 609k | timer_.SetCallback< |
211 | 609k | YBInboundConnectionContext, &YBInboundConnectionContext::HandleTimeout>(this); |
212 | 609k | timer_.Start(HeartbeatPeriod()); |
213 | 609k | } |
214 | 609k | } |
215 | | |
216 | 79.4M | void YBInboundConnectionContext::UpdateLastWrite(const ConnectionPtr& connection) { |
217 | 79.4M | last_write_time_ = connection->reactor()->cur_time(); |
218 | 79.4M | VLOG(4) << connection->ToString() << ": " << "Updated last_write_time_=" |
219 | 27.9k | << AsString(last_write_time_); |
220 | 79.4M | } |
221 | | |
222 | 1.09G | void YBInboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents) { // NOLINT |
223 | 1.09G | const auto connection = connection_.lock(); |
224 | 1.09G | if (connection1.09G ) { |
225 | 1.09G | if (EV_ERROR & revents) { |
226 | 0 | LOG(WARNING) << connection->ToString() << ": " << "Got an error in handle timeout"; |
227 | 0 | return; |
228 | 0 | } |
229 | | |
230 | 1.09G | const auto now = connection->reactor()->cur_time(); |
231 | | |
232 | 1.09G | const auto deadline = |
233 | 1.09G | std::max(last_heartbeat_sending_time_, last_write_time_) + HeartbeatPeriod(); |
234 | 1.09G | if (now >= deadline) { |
235 | 9.31M | if (last_write_time_ >= last_heartbeat_sending_time_) { |
236 | | // last_write_time_ < last_heartbeat_sending_time_ means that last heartbeat we've queued |
237 | | // for sending is still in queue due to RPC/networking issues, so no need to queue |
238 | | // another one. |
239 | 9.31M | VLOG(4) << connection->ToString() << ": " << "Sending heartbeat, now: " << AsString(now) |
240 | 151 | << ", deadline: " << AsString(deadline) |
241 | 151 | << ", last_write_time_: " << AsString(last_write_time_) |
242 | 151 | << ", last_heartbeat_sending_time_: " << AsString(last_heartbeat_sending_time_); |
243 | 9.31M | connection->QueueOutboundData(HeartbeatOutboundData::Instance()); |
244 | 9.31M | last_heartbeat_sending_time_ = now; |
245 | 9.31M | } |
246 | 9.31M | timer_.Start(HeartbeatPeriod()); |
247 | 1.08G | } else { |
248 | 1.08G | timer_.Start(deadline - now); |
249 | 1.08G | } |
250 | 1.09G | } |
251 | 1.09G | } |
252 | | |
253 | | YBInboundCall::YBInboundCall(ConnectionPtr conn, CallProcessedListener call_processed_listener) |
254 | 70.8M | : InboundCall(std::move(conn), nullptr /* rpc_metrics */, std::move(call_processed_listener)) {} |
255 | | |
256 | | YBInboundCall::YBInboundCall(RpcMetrics* rpc_metrics, const RemoteMethod& remote_method) |
257 | 10.8M | : InboundCall(nullptr /* conn */, rpc_metrics, nullptr /* call_processed_listener */) { |
258 | 10.8M | header_.remote_method = remote_method.serialized_body(); |
259 | 10.8M | } |
260 | | |
261 | 81.6M | YBInboundCall::~YBInboundCall() {} |
262 | | |
263 | 184M | CoarseTimePoint YBInboundCall::GetClientDeadline() const { |
264 | 184M | if (header_.timeout_ms == 0) { |
265 | 20.3k | return CoarseTimePoint::max(); |
266 | 20.3k | } |
267 | 184M | return ToCoarse(timing_.time_received) + header_.timeout_ms * 1ms; |
268 | 184M | } |
269 | | |
270 | 70.8M | Status YBInboundCall::ParseFrom(const MemTrackerPtr& mem_tracker, CallData* call_data) { |
271 | 70.8M | TRACE_EVENT_FLOW_BEGIN0("rpc", "YBInboundCall", this); |
272 | 70.8M | TRACE_EVENT0("rpc", "YBInboundCall::ParseFrom"); |
273 | | |
274 | 70.8M | Slice source(call_data->data(), call_data->size()); |
275 | 70.8M | RETURN_NOT_OK(ParseYBMessage(source, &header_, &serialized_request_)); |
276 | 18.4E | DVLOG(4) << "Parsed YBInboundCall header: " << header_.call_id; |
277 | | |
278 | 70.8M | consumption_ = ScopedTrackedConsumption(mem_tracker, call_data->size()); |
279 | 70.8M | request_data_memory_usage_.store(call_data->size(), std::memory_order_release); |
280 | 70.8M | request_data_ = std::move(*call_data); |
281 | | |
282 | | // Adopt the service/method info from the header as soon as it's available. |
283 | 70.8M | if (PREDICT_FALSE(header_.remote_method.empty())) { |
284 | 0 | return STATUS(Corruption, "Non-connection context request header must specify remote_method"); |
285 | 0 | } |
286 | | |
287 | 70.8M | return Status::OK(); |
288 | 70.8M | } |
289 | | |
290 | 17.9M | size_t YBInboundCall::CopyToLastSidecarBuffer(const Slice& car) { |
291 | 17.9M | if (sidecar_buffers_.empty()) { |
292 | 4.09M | return 0; |
293 | 4.09M | } |
294 | 13.8M | auto& last_buffer = sidecar_buffers_.back(); |
295 | 13.8M | auto len = std::min(last_buffer.size() - filled_bytes_in_last_sidecar_buffer_, car.size()); |
296 | 13.8M | memcpy(last_buffer.data() + filled_bytes_in_last_sidecar_buffer_, car.data(), len); |
297 | 13.8M | filled_bytes_in_last_sidecar_buffer_ += len; |
298 | | |
299 | 13.8M | return len; |
300 | 17.9M | } |
301 | | |
302 | 17.9M | size_t YBInboundCall::AddRpcSidecar(Slice car) { |
303 | 17.9M | sidecar_offsets_.Add(narrow_cast<uint32_t>(total_sidecars_size_)); |
304 | 17.9M | total_sidecars_size_ += car.size(); |
305 | | // Copy start of sidecar to existing buffer if present. |
306 | 17.9M | car.remove_prefix(CopyToLastSidecarBuffer(car)); |
307 | | |
308 | | // If sidecar did not fit into last buffer, then we should allocate a new one. |
309 | 17.9M | if (!car.empty()) { |
310 | 4.09M | DCHECK(sidecar_buffers_.empty() || |
311 | 4.09M | filled_bytes_in_last_sidecar_buffer_ == sidecar_buffers_.back().size()); |
312 | | |
313 | | // Allocate new sidecar buffer and copy remaining part of sidecar to it. |
314 | 4.09M | AllocateSidecarBuffer(std::max<size_t>(car.size(), FLAGS_min_sidecar_buffer_size)); |
315 | 4.09M | memcpy(sidecar_buffers_.back().data(), car.data(), car.size()); |
316 | 4.09M | filled_bytes_in_last_sidecar_buffer_ = car.size(); |
317 | 4.09M | } |
318 | | |
319 | 17.9M | return num_sidecars_++; |
320 | 17.9M | } |
321 | | |
322 | 9.43M | void YBInboundCall::ResetRpcSidecars() { |
323 | 9.43M | if (consumption_) { |
324 | 2.05M | for (const auto& buffer : sidecar_buffers_) { |
325 | 0 | consumption_.Add(-buffer.size()); |
326 | 0 | } |
327 | 2.05M | } |
328 | 9.43M | num_sidecars_ = 0; |
329 | 9.43M | filled_bytes_in_last_sidecar_buffer_ = 0; |
330 | 9.43M | total_sidecars_size_ = 0; |
331 | 9.43M | sidecar_buffers_.clear(); |
332 | 9.43M | sidecar_offsets_.Clear(); |
333 | 9.43M | } |
334 | | |
335 | 633k | void YBInboundCall::ReserveSidecarSpace(size_t space) { |
336 | 633k | if (num_sidecars_ != 0) { |
337 | 0 | LOG(DFATAL) << "Attempt to ReserveSidecarSpace when there are already sidecars present"; |
338 | 0 | return; |
339 | 0 | } |
340 | | |
341 | 633k | AllocateSidecarBuffer(space); |
342 | 633k | } |
343 | | |
344 | 4.72M | void YBInboundCall::AllocateSidecarBuffer(size_t size) { |
345 | 4.72M | sidecar_buffers_.push_back(RefCntBuffer(size)); |
346 | 4.72M | if (consumption_) { |
347 | 4.52M | consumption_.Add(size); |
348 | 4.52M | } |
349 | 4.72M | } |
350 | | |
351 | 70.8M | Status YBInboundCall::SerializeResponseBuffer(AnyMessageConstPtr response, bool is_success) { |
352 | 70.8M | auto body_size = response.SerializedSize(); |
353 | | |
354 | 70.8M | ResponseHeader resp_hdr; |
355 | 70.8M | resp_hdr.set_call_id(header_.call_id); |
356 | 70.8M | resp_hdr.set_is_error(!is_success); |
357 | 70.8M | for (auto& offset : sidecar_offsets_) { |
358 | 17.9M | offset += body_size; |
359 | 17.9M | } |
360 | 70.8M | *resp_hdr.mutable_sidecar_offsets() = std::move(sidecar_offsets_); |
361 | | |
362 | 70.8M | response_buf_ = VERIFY_RESULT(SerializeRequest( |
363 | 0 | body_size, total_sidecars_size_, resp_hdr, response)); |
364 | 0 | return Status::OK(); |
365 | 70.8M | } |
366 | | |
367 | 9.26k | string YBInboundCall::ToString() const { |
368 | 9.26k | return Format("Call $0 $1 => $2 (request call id $3)", |
369 | 9.26k | header_.RemoteMethodAsString(), remote_address(), local_address(), header_.call_id); |
370 | 9.26k | } |
371 | | |
372 | | bool YBInboundCall::DumpPB(const DumpRunningRpcsRequestPB& req, |
373 | 1 | RpcCallInProgressPB* resp) { |
374 | 1 | header_.ToPB(resp->mutable_header()); |
375 | 1 | if (req.include_traces() && trace_) { |
376 | 1 | resp->set_trace_buffer(trace_->DumpToString(true)); |
377 | 1 | } |
378 | 1 | resp->set_elapsed_millis(MonoTime::Now().GetDeltaSince(timing_.time_received) |
379 | 1 | .ToMilliseconds()); |
380 | 1 | return true; |
381 | 1 | } |
382 | | |
383 | 70.8M | void YBInboundCall::LogTrace() const { |
384 | 70.8M | MonoTime now = MonoTime::Now(); |
385 | 70.8M | auto total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds(); |
386 | | |
387 | 70.8M | if (header_.timeout_ms > 0) { |
388 | 70.7M | double log_threshold = header_.timeout_ms * 0.75f; |
389 | 70.7M | if (total_time > log_threshold) { |
390 | | // TODO: consider pushing this onto another thread since it may be slow. |
391 | | // The traces may also be too large to fit in a log message. |
392 | 8.53k | LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout " |
393 | 8.53k | << header_.timeout_ms << "ms)."; |
394 | 8.53k | std::string s = trace_->DumpToString(1, true); |
395 | 8.53k | if (!s.empty()) { |
396 | 6.46k | LOG(WARNING) << "Trace:\n" << s; |
397 | 6.46k | } |
398 | 8.53k | return; |
399 | 8.53k | } |
400 | 70.7M | } |
401 | | |
402 | 70.8M | if (PREDICT_FALSE( |
403 | 70.8M | trace_->must_print() || |
404 | 70.8M | FLAGS_rpc_dump_all_traces || |
405 | 70.8M | total_time > FLAGS_rpc_slow_query_threshold_ms)) { |
406 | 177 | LOG(INFO) << ToString() << " took " << total_time << "ms. Trace:"; |
407 | 177 | trace_->Dump(&LOG(INFO), true); |
408 | 177 | } |
409 | 70.8M | } |
410 | | |
411 | 70.8M | void YBInboundCall::DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) { |
412 | 70.8M | TRACE_EVENT0("rpc", "YBInboundCall::Serialize"); |
413 | 70.8M | CHECK_GT(response_buf_.size(), 0); |
414 | 70.8M | output->push_back(std::move(response_buf_)); |
415 | 70.8M | if (!sidecar_buffers_.empty()) { |
416 | 4.52M | sidecar_buffers_.back().Shrink(filled_bytes_in_last_sidecar_buffer_); |
417 | 4.52M | for (auto& car : sidecar_buffers_) { |
418 | 4.52M | output->push_back(std::move(car)); |
419 | 4.52M | } |
420 | 4.52M | sidecar_buffers_.clear(); |
421 | 4.52M | } |
422 | 70.8M | } |
423 | | |
424 | 70.3M | Status YBInboundCall::ParseParam(RpcCallParams* params) { |
425 | 70.3M | RETURN_NOT_OK(ThrottleRpcStatus(consumption_.mem_tracker(), *this)); |
426 | | |
427 | 70.3M | auto consumption = params->ParseRequest(serialized_request()); |
428 | 70.3M | if (!consumption.ok()) { |
429 | 1 | auto status = consumption.status().CloneAndPrepend( |
430 | 1 | Format("Invalid parameter for call $0", header_.RemoteMethodAsString())); |
431 | 1 | LOG(WARNING) << status; |
432 | 1 | return status; |
433 | 1 | } |
434 | 70.3M | consumption_.Add(*consumption); |
435 | | |
436 | 70.3M | if (PREDICT_FALSE(FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms > 0 && |
437 | 70.3M | implicit_cast<ssize_t>(request_data_.size()) > FLAGS_rpc_throttle_threshold_bytes)) { |
438 | 8 | std::this_thread::sleep_for(FLAGS_TEST_yb_inbound_big_calls_parse_delay_ms * 1ms); |
439 | 8 | } |
440 | | |
441 | 70.3M | return Status::OK(); |
442 | 70.3M | } |
443 | | |
444 | 81.3M | void YBInboundCall::RespondSuccess(AnyMessageConstPtr response) { |
445 | 81.3M | TRACE_EVENT0("rpc", "InboundCall::RespondSuccess"); |
446 | 81.3M | Respond(response, true); |
447 | 81.3M | } |
448 | | |
449 | | void YBInboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, |
450 | 270k | const Status& status) { |
451 | 270k | TRACE_EVENT0("rpc", "InboundCall::RespondFailure"); |
452 | 270k | ErrorStatusPB err; |
453 | 270k | err.set_message(status.ToString()); |
454 | 270k | err.set_code(error_code); |
455 | | |
456 | 270k | Respond(AnyMessageConstPtr(&err), false); |
457 | 270k | } |
458 | | |
459 | | void YBInboundCall::RespondApplicationError(int error_ext_id, const std::string& message, |
460 | 2.03k | const MessageLite& app_error_pb) { |
461 | 2.03k | ErrorStatusPB err; |
462 | 2.03k | ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err); |
463 | 2.03k | Respond(AnyMessageConstPtr(&err), false); |
464 | 2.03k | } |
465 | | |
466 | | void YBInboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message, |
467 | | const google::protobuf::MessageLite& app_error_pb, |
468 | 2.03k | ErrorStatusPB* err) { |
469 | 2.03k | err->set_message(message); |
470 | 2.03k | const FieldDescriptor* app_error_field = |
471 | 2.03k | err->GetReflection()->FindKnownExtensionByNumber(error_ext_id); |
472 | 2.03k | if (app_error_field != nullptr2.03k ) { |
473 | 2.03k | err->GetReflection()->MutableMessage(err, app_error_field)->CheckTypeAndMergeFrom(app_error_pb); |
474 | 18.4E | } else { |
475 | 18.4E | LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id |
476 | 18.4E | << " (message=" << message << ")"; |
477 | 18.4E | } |
478 | 2.03k | } |
479 | | |
480 | 70.8M | void YBInboundCall::Respond(AnyMessageConstPtr response, bool is_success) { |
481 | 70.8M | TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this); |
482 | 70.8M | Status s = SerializeResponseBuffer(response, is_success); |
483 | 70.8M | if (PREDICT_FALSE(!s.ok())) { |
484 | 0 | RespondFailure(ErrorStatusPB::ERROR_APPLICATION, s); |
485 | 0 | return; |
486 | 0 | } |
487 | | |
488 | 70.8M | TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this, "method", method_name().ToBuffer()); |
489 | | |
490 | 70.8M | QueueResponse(is_success); |
491 | 70.8M | } |
492 | | |
493 | 38.6M | Slice YBInboundCall::method_name() const { |
494 | 38.6M | auto parsed_remote_method = ParseRemoteMethod(header_.remote_method); |
495 | 38.6M | return parsed_remote_method.ok() ? parsed_remote_method->method38.6M : Slice()14.2k ; |
496 | 38.6M | } |
497 | | |
498 | | Status YBOutboundConnectionContext::HandleCall( |
499 | 72.8M | const ConnectionPtr& connection, CallData* call_data) { |
500 | 72.8M | return connection->HandleCallResponse(call_data); |
501 | 72.8M | } |
502 | | |
503 | 605k | void YBOutboundConnectionContext::Connected(const ConnectionPtr& connection) { |
504 | 605k | DCHECK_EQ(connection->direction(), Connection::Direction::CLIENT); |
505 | 605k | connection_ = connection; |
506 | 605k | last_read_time_ = connection->reactor()->cur_time(); |
507 | 605k | if (FLAGS_enable_rpc_keepalive) { |
508 | 599k | timer_.Init(*loop_); |
509 | 599k | timer_.SetCallback< |
510 | 599k | YBOutboundConnectionContext, &YBOutboundConnectionContext::HandleTimeout>(this); |
511 | 599k | timer_.Start(Timeout()); |
512 | 599k | } |
513 | 605k | } |
514 | | |
515 | 3.12M | void YBOutboundConnectionContext::AssignConnection(const ConnectionPtr& connection) { |
516 | 3.12M | connection->QueueOutboundData(ConnectionHeaderInstance()); |
517 | 3.12M | } |
518 | | |
519 | | Result<ProcessCallsResult> YBOutboundConnectionContext::ProcessCalls( |
520 | 81.2M | const ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full) { |
521 | 81.2M | return parser().Parse(connection, data, read_buffer_full, nullptr /* tracker_for_throttle */); |
522 | 81.2M | } |
523 | | |
524 | 84.2M | void YBOutboundConnectionContext::UpdateLastRead(const ConnectionPtr& connection) { |
525 | 84.2M | last_read_time_ = connection->reactor()->cur_time(); |
526 | 84.2M | VLOG(4) << Format("$0: Updated last_read_time_=$1", connection, last_read_time_)78.4k ; |
527 | 84.2M | } |
528 | | |
529 | 7.65M | void YBOutboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents) { // NOLINT |
530 | 7.65M | const auto connection = connection_.lock(); |
531 | 7.65M | if (connection) { |
532 | 7.65M | VLOG(5) << Format("$0: YBOutboundConnectionContext::HandleTimeout", connection)102 ; |
533 | 7.65M | if (EV_ERROR & revents) { |
534 | 0 | LOG(WARNING) << connection->ToString() << ": " << "Got an error in handle timeout"; |
535 | 0 | return; |
536 | 0 | } |
537 | | |
538 | 7.65M | const auto now = connection->reactor()->cur_time(); |
539 | 7.65M | const MonoDelta timeout = Timeout(); |
540 | | |
541 | 7.65M | auto deadline = last_read_time_ + timeout; |
542 | 7.65M | VLOG(5) << Format( |
543 | 885 | "$0: YBOutboundConnectionContext::HandleTimeout last_read_time_: $1, timeout: $2", |
544 | 885 | connection, last_read_time_, timeout); |
545 | 7.65M | if (now > deadline) { |
546 | 33.1k | auto passed = now - last_read_time_; |
547 | 33.1k | const auto status = STATUS_FORMAT( |
548 | 33.1k | NetworkError, "Rpc timeout, passed: $0, timeout: $1, now: $2, last_read_time_: $3", |
549 | 33.1k | passed, timeout, now, last_read_time_); |
550 | 33.1k | LOG(WARNING) << connection->ToString() << ": " << status; |
551 | 33.1k | connection->reactor()->DestroyConnection(connection.get(), status); |
552 | 33.1k | return; |
553 | 33.1k | } |
554 | | |
555 | 7.62M | timer_.Start(deadline - now); |
556 | 7.62M | } |
557 | 7.65M | } |
558 | | |
559 | | } // namespace rpc |
560 | | } // namespace yb |