/Users/deen/code/yugabyte-db/src/yb/rpc/connection.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/connection.h" |
34 | | |
35 | | #include <thread> |
36 | | #include <utility> |
37 | | |
38 | | #include "yb/gutil/map-util.h" |
39 | | #include "yb/gutil/strings/substitute.h" |
40 | | |
41 | | #include "yb/rpc/connection_context.h" |
42 | | #include "yb/rpc/messenger.h" |
43 | | #include "yb/rpc/reactor.h" |
44 | | #include "yb/rpc/rpc_controller.h" |
45 | | #include "yb/rpc/rpc_introspection.pb.h" |
46 | | #include "yb/rpc/rpc_metrics.h" |
47 | | |
48 | | #include "yb/util/enums.h" |
49 | | #include "yb/util/format.h" |
50 | | #include "yb/util/logging.h" |
51 | | #include "yb/util/metrics.h" |
52 | | #include "yb/util/result.h" |
53 | | #include "yb/util/status_format.h" |
54 | | #include "yb/util/string_util.h" |
55 | | #include "yb/util/trace.h" |
56 | | #include "yb/util/tsan_util.h" |
57 | | |
58 | | using namespace std::literals; |
59 | | using namespace std::placeholders; |
60 | | using std::shared_ptr; |
61 | | using std::vector; |
62 | | using strings::Substitute; |
63 | | |
64 | | DEFINE_uint64(rpc_connection_timeout_ms, yb::NonTsanVsTsan(15000, 30000), |
65 | | "Timeout for RPC connection operations"); |
66 | | |
67 | | METRIC_DEFINE_histogram_with_percentiles( |
68 | | server, handler_latency_outbound_transfer, "Time taken to transfer the response ", |
69 | | yb::MetricUnit::kMicroseconds, "Microseconds spent to queue and write the response to the wire", |
70 | | 60000000LU, 2); |
71 | | |
72 | | namespace yb { |
73 | | namespace rpc { |
74 | | |
75 | | /// |
76 | | /// Connection |
77 | | /// |
78 | | Connection::Connection(Reactor* reactor, |
79 | | std::unique_ptr<Stream> stream, |
80 | | Direction direction, |
81 | | RpcMetrics* rpc_metrics, |
82 | | std::unique_ptr<ConnectionContext> context) |
83 | | : reactor_(reactor), |
84 | | stream_(std::move(stream)), |
85 | | direction_(direction), |
86 | | last_activity_time_(CoarseMonoClock::Now()), |
87 | | rpc_metrics_(rpc_metrics), |
88 | 3.76M | context_(std::move(context)) { |
89 | 3.76M | const auto metric_entity = reactor->messenger()->metric_entity(); |
90 | 3.76M | handler_latency_outbound_transfer_ = metric_entity ? |
91 | 3.70M | METRIC_handler_latency_outbound_transfer.Instantiate(metric_entity) : nullptr61.7k ; |
92 | 3.76M | IncrementCounter(rpc_metrics_->connections_created); |
93 | 3.76M | IncrementGauge(rpc_metrics_->connections_alive); |
94 | 3.76M | } |
95 | | |
96 | 3.25M | Connection::~Connection() { |
97 | 3.25M | DecrementGauge(rpc_metrics_->connections_alive); |
98 | 3.25M | } |
99 | | |
100 | 0 | void UpdateIdleReason(const char* message, bool* result, std::string* reason) { |
101 | 0 | *result = false; |
102 | 0 | if (reason) { |
103 | 0 | AppendWithSeparator(message, reason); |
104 | 0 | } |
105 | 0 | } |
106 | | |
107 | 916M | bool Connection::Idle(std::string* reason_not_idle) const { |
108 | 916M | DCHECK(reactor_->IsCurrentThread()); |
109 | | |
110 | 916M | bool result = stream_->Idle(reason_not_idle); |
111 | | |
112 | | // Connection is not idle if calls are waiting for a response. |
113 | 916M | if (!awaiting_response_.empty()) { |
114 | 0 | UpdateIdleReason("awaiting response", &result, reason_not_idle); |
115 | 0 | } |
116 | | |
117 | | // Check upstream logic (i.e. processing calls, etc.) |
118 | 916M | return context_->Idle(reason_not_idle) && result906M ; |
119 | 916M | } |
120 | | |
121 | 0 | std::string Connection::ReasonNotIdle() const { |
122 | 0 | std::string reason; |
123 | 0 | Idle(&reason); |
124 | 0 | return reason; |
125 | 0 | } |
126 | | |
127 | 6.33M | void Connection::Shutdown(const Status& status) { |
128 | 6.33M | DCHECK(reactor_->IsCurrentThread()); |
129 | | |
130 | 6.33M | { |
131 | 6.33M | std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_); |
132 | 6.33M | outbound_data_being_processed_.swap(outbound_data_to_process_); |
133 | 6.33M | shutdown_status_ = status; |
134 | 6.33M | } |
135 | | |
136 | | // Clear any calls which have been sent and were awaiting a response. |
137 | 6.33M | for (auto& v : awaiting_response_) { |
138 | 33.3k | if (v.second) { |
139 | 14.0k | v.second->SetFailed(status); |
140 | 14.0k | } |
141 | 33.3k | } |
142 | 6.33M | awaiting_response_.clear(); |
143 | | |
144 | 6.33M | for (auto& call : outbound_data_being_processed_) { |
145 | 479 | call->Transferred(status, this); |
146 | 479 | } |
147 | 6.33M | outbound_data_being_processed_.clear(); |
148 | | |
149 | 6.33M | context_->Shutdown(status); |
150 | | |
151 | 6.33M | stream_->Shutdown(status); |
152 | 6.33M | timer_.Shutdown(); |
153 | | |
154 | | // TODO(bogdan): re-enable once we decide how to control verbose logs better... |
155 | | // LOG_WITH_PREFIX(INFO) << "Connection::Shutdown completed, status: " << status; |
156 | 6.33M | } |
157 | | |
158 | 165M | void Connection::OutboundQueued() { |
159 | 165M | DCHECK(reactor_->IsCurrentThread()); |
160 | | |
161 | 165M | auto status = stream_->TryWrite(); |
162 | 165M | if (!status.ok()) { |
163 | 1.47k | VLOG_WITH_PREFIX2 (1) << "Write failed: " << status2 ; |
164 | 1.47k | auto scheduled = reactor_->ScheduleReactorTask( |
165 | 1.47k | MakeFunctorReactorTask( |
166 | 1.47k | std::bind(&Reactor::DestroyConnection, reactor_, this, status), |
167 | 1.47k | shared_from_this(), SOURCE_LOCATION())); |
168 | 1.47k | LOG_IF_WITH_PREFIX0 (WARNING, !scheduled) << "Failed to schedule destroy"0 ; |
169 | 1.47k | } |
170 | 165M | } |
171 | | |
172 | 47.6M | void Connection::HandleTimeout(ev::timer& watcher, int revents) { // NOLINT |
173 | 47.6M | DCHECK(reactor_->IsCurrentThread()); |
174 | 47.6M | DVLOG_WITH_PREFIX138k (5) << "Connection::HandleTimeout revents: " << revents |
175 | 138k | << " connected: " << stream_->IsConnected(); |
176 | | |
177 | 47.6M | if (EV_ERROR & revents) { |
178 | 0 | LOG_WITH_PREFIX(WARNING) << "Got an error in handle timeout"; |
179 | 0 | return; |
180 | 0 | } |
181 | | |
182 | 47.6M | auto now = CoarseMonoClock::Now(); |
183 | | |
184 | 47.6M | CoarseTimePoint deadline = CoarseTimePoint::max(); |
185 | 47.6M | if (!stream_->IsConnected()) { |
186 | 986 | const MonoDelta timeout = FLAGS_rpc_connection_timeout_ms * 1ms; |
187 | 986 | deadline = last_activity_time_ + timeout; |
188 | 986 | DVLOG_WITH_PREFIX0 (5) << Format("now: $0, deadline: $1, timeout: $2", now, deadline, timeout)0 ; |
189 | 986 | if (now > deadline) { |
190 | 178 | auto passed = reactor_->cur_time() - last_activity_time_; |
191 | 178 | reactor_->DestroyConnection( |
192 | 178 | this, |
193 | 178 | STATUS_FORMAT(NetworkError, "Connect timeout, passed: $0, timeout: $1", passed, timeout)); |
194 | 178 | return; |
195 | 178 | } |
196 | 986 | } |
197 | | |
198 | 111M | while (47.6M !expiration_queue_.empty() && expiration_queue_.top().time <= now107M ) { |
199 | 63.7M | auto& top = expiration_queue_.top(); |
200 | 63.7M | auto call = top.call.lock(); |
201 | 63.7M | auto handle = top.handle; |
202 | 63.7M | expiration_queue_.pop(); |
203 | 63.7M | if (call && !call->IsFinished()123k ) { |
204 | 38.4k | call->SetTimedOut(); |
205 | 38.4k | if (handle != std::numeric_limits<size_t>::max()) { |
206 | 29.5k | stream_->Cancelled(handle); |
207 | 29.5k | } |
208 | 38.4k | auto i = awaiting_response_.find(call->call_id()); |
209 | 38.4k | if (i != awaiting_response_.end()) { |
210 | 34.4k | i->second.reset(); |
211 | 34.4k | } |
212 | 38.4k | } |
213 | 63.7M | } |
214 | | |
215 | 47.6M | if (!expiration_queue_.empty()) { |
216 | 43.5M | deadline = std::min(deadline, expiration_queue_.top().time); |
217 | 43.5M | } |
218 | | |
219 | 47.6M | if (deadline != CoarseTimePoint::max()) { |
220 | 43.5M | timer_.Start(deadline - now); |
221 | 43.5M | } |
222 | 47.6M | } |
223 | | |
224 | 75.4M | void Connection::QueueOutboundCall(const OutboundCallPtr& call) { |
225 | 75.4M | DCHECK(call); |
226 | 75.4M | DCHECK_EQ(direction_, Direction::CLIENT); |
227 | | |
228 | 75.4M | auto handle = DoQueueOutboundData(call, true); |
229 | | |
230 | | // Set up the timeout timer. |
231 | 75.4M | const MonoDelta& timeout = call->controller()->timeout(); |
232 | 75.4M | if (timeout.Initialized()) { |
233 | 75.4M | auto expires_at = CoarseMonoClock::Now() + timeout.ToSteadyDuration(); |
234 | 75.4M | auto reschedule = expiration_queue_.empty() || expiration_queue_.top().time > expires_at67.9M ; |
235 | 75.4M | expiration_queue_.push({expires_at, call, handle}); |
236 | 75.4M | if (reschedule && (8.10M stream_->IsConnected()8.10M || |
237 | 8.10M | expires_at < last_activity_time_ + FLAGS_rpc_connection_timeout_ms * 1ms3.18M )) { |
238 | 7.56M | timer_.Start(timeout.ToSteadyDuration()); |
239 | 7.56M | } |
240 | 75.4M | } |
241 | | |
242 | 75.4M | call->SetQueued(); |
243 | 75.4M | } |
244 | | |
245 | 167M | size_t Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch) { |
246 | 167M | DCHECK(reactor_->IsCurrentThread()); |
247 | 18.4E | DVLOG_WITH_PREFIX(4) << "Connection::DoQueueOutboundData: " << AsString(outbound_data); |
248 | | |
249 | 167M | if (!shutdown_status_.ok()) { |
250 | 1 | YB_LOG_EVERY_N_SECS(INFO, 5) << "Connection::DoQueueOutboundData data: " |
251 | 1 | << AsString(outbound_data) << " shutdown_status_: " |
252 | 1 | << shutdown_status_; |
253 | 1 | outbound_data->Transferred(shutdown_status_, this); |
254 | 1 | return std::numeric_limits<size_t>::max(); |
255 | 1 | } |
256 | | |
257 | | // If the connection is torn down, then the QueueOutbound() call that |
258 | | // eventually runs in the reactor thread will take care of calling |
259 | | // ResponseTransferCallbacks::NotifyTransferAborted. |
260 | | |
261 | | // Check before and after calling Send. Before to reset state, if we |
262 | | // were over the limit; but are now back in good standing. After, to |
263 | | // check if we are now over the limit. |
264 | 167M | Status s = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes()); |
265 | 167M | if (!s.ok()) { |
266 | 1 | Shutdown(s); |
267 | 1 | return std::numeric_limits<size_t>::max(); |
268 | 1 | } |
269 | 167M | auto result = stream_->Send(std::move(outbound_data)); |
270 | 167M | if (!result.ok()) { |
271 | 0 | Shutdown(result.status()); |
272 | 0 | return std::numeric_limits<size_t>::max(); |
273 | 0 | } |
274 | 167M | s = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes()); |
275 | 167M | if (!s.ok()) { |
276 | 2 | Shutdown(s); |
277 | 2 | return std::numeric_limits<size_t>::max(); |
278 | 2 | } |
279 | | |
280 | 167M | if (!batch) { |
281 | 12.7M | OutboundQueued(); |
282 | 12.7M | } |
283 | | |
284 | 167M | return *result; |
285 | 167M | } |
286 | | |
287 | 0 | void Connection::ParseReceived() { |
288 | 0 | stream_->ParseReceived(); |
289 | 0 | } |
290 | | |
291 | 159M | Result<size_t> Connection::ProcessReceived(ReadBufferFull read_buffer_full) { |
292 | 159M | auto result = context_->ProcessCalls( |
293 | 159M | shared_from_this(), ReadBuffer().AppendedVecs(), read_buffer_full); |
294 | 18.4E | VLOG_WITH_PREFIX(4) << "context_->ProcessCalls result: " << AsString(result); |
295 | 159M | if (PREDICT_FALSE(!result.ok())) { |
296 | 2.22k | LOG_WITH_PREFIX(WARNING) << "Command sequence failure: " << result.status(); |
297 | 2.22k | return result.status(); |
298 | 2.22k | } |
299 | | |
300 | 159M | if (!result->consumed && ReadBuffer().Full()37.7k && context_->Idle()0 ) { |
301 | 0 | return STATUS_FORMAT( |
302 | 0 | InvalidArgument, "Command is greater than read buffer, exist data: $0", |
303 | 0 | IoVecsFullSize(ReadBuffer().AppendedVecs())); |
304 | 0 | } |
305 | | |
306 | 159M | ReadBuffer().Consume(result->consumed, result->buffer); |
307 | | |
308 | 159M | return result->bytes_to_skip; |
309 | 159M | } |
310 | | |
311 | 72.8M | Status Connection::HandleCallResponse(CallData* call_data) { |
312 | 72.8M | DCHECK(reactor_->IsCurrentThread()); |
313 | 72.8M | CallResponse resp; |
314 | 72.8M | RETURN_NOT_OK(resp.ParseFrom(call_data)); |
315 | | |
316 | 72.8M | ++responded_call_count_; |
317 | 72.8M | auto awaiting = awaiting_response_.find(resp.call_id()); |
318 | 72.8M | if (awaiting == awaiting_response_.end()) { |
319 | 0 | LOG_WITH_PREFIX(DFATAL) << "Got a response for call id " << resp.call_id() << " which " |
320 | 0 | << "was not pending! Ignoring."; |
321 | 0 | return Status::OK(); |
322 | 0 | } |
323 | 72.8M | auto call = awaiting->second; |
324 | 72.8M | awaiting_response_.erase(awaiting); |
325 | | |
326 | 72.8M | if (PREDICT_FALSE(!call)) { |
327 | | // The call already failed due to a timeout. |
328 | 18.4E | VLOG_WITH_PREFIX(1) << "Got response to call id " << resp.call_id() |
329 | 18.4E | << " after client already timed out"; |
330 | 13.6k | return Status::OK(); |
331 | 13.6k | } |
332 | | |
333 | 72.7M | call->SetResponse(std::move(resp)); |
334 | | |
335 | 72.7M | return Status::OK(); |
336 | 72.8M | } |
337 | | |
338 | 72.6M | void Connection::CallSent(OutboundCallPtr call) { |
339 | 72.6M | DCHECK(reactor_->IsCurrentThread()); |
340 | | |
341 | 72.6M | awaiting_response_.emplace(call->call_id(), !call->IsFinished() ? call72.3M : nullptr258k ); |
342 | 72.6M | } |
343 | | |
344 | 193k | std::string Connection::ToString() const { |
345 | | // This may be called from other threads, so we cannot |
346 | | // include anything in the output about the current state, |
347 | | // which might concurrently change from another thread. |
348 | 193k | static const char* format = "Connection ($0) $1 $2 => $3"; |
349 | 193k | const void* self = this; |
350 | 193k | if (direction_ == Direction::SERVER) { |
351 | 155k | return Format(format, self, "server", remote(), local()); |
352 | 155k | } else { |
353 | 37.6k | return Format(format, self, "client", local(), remote()); |
354 | 37.6k | } |
355 | 193k | } |
356 | | |
357 | | Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req, |
358 | 21 | RpcConnectionPB* resp) { |
359 | 21 | DCHECK(reactor_->IsCurrentThread()); |
360 | 21 | resp->set_remote_ip(yb::ToString(remote())); |
361 | 21 | resp->set_state(context_->State()); |
362 | | |
363 | 21 | const uint64_t processed_call_count = |
364 | 21 | direction_ == Direction::CLIENT ? responded_call_count_.load(std::memory_order_acquire)10 |
365 | 21 | : context_->ProcessedCallCount()11 ; |
366 | 21 | if (processed_call_count > 0) { |
367 | 18 | resp->set_processed_call_count(processed_call_count); |
368 | 18 | } |
369 | | |
370 | 21 | context_->DumpPB(req, resp); |
371 | | |
372 | 21 | if (direction_ == Direction::CLIENT) { |
373 | 10 | auto call_in_flight = resp->add_calls_in_flight(); |
374 | 10 | for (auto& entry : awaiting_response_) { |
375 | 2 | if (entry.second && entry.second->DumpPB(req, call_in_flight)1 ) { |
376 | 1 | call_in_flight = resp->add_calls_in_flight(); |
377 | 1 | } |
378 | 2 | } |
379 | 10 | resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1); |
380 | 10 | stream_->DumpPB(req, resp); |
381 | 11 | } else if (direction_ != Direction::SERVER) { |
382 | 0 | LOG(FATAL) << "Invalid direction: " << to_underlying(direction_); |
383 | 0 | } |
384 | | |
385 | 21 | return Status::OK(); |
386 | 21 | } |
387 | | |
388 | 209k | void Connection::QueueOutboundDataBatch(const OutboundDataBatch& batch) { |
389 | 209k | DCHECK(reactor_->IsCurrentThread()); |
390 | | |
391 | 209k | for (const auto& call : batch) { |
392 | 209k | DoQueueOutboundData(call, /* batch */ true); |
393 | 209k | } |
394 | | |
395 | 209k | OutboundQueued(); |
396 | 209k | } |
397 | | |
398 | 92.1M | void Connection::QueueOutboundData(OutboundDataPtr outbound_data) { |
399 | 92.1M | if (reactor_->IsCurrentThread()) { |
400 | 12.7M | DoQueueOutboundData(std::move(outbound_data), /* batch */ false); |
401 | 12.7M | return; |
402 | 12.7M | } |
403 | | |
404 | 79.4M | bool was_empty; |
405 | 79.4M | { |
406 | 79.4M | std::unique_lock<simple_spinlock> lock(outbound_data_queue_lock_); |
407 | 79.4M | if (!shutdown_status_.ok()) { |
408 | 3.24k | auto task = MakeFunctorReactorTaskWithAbort( |
409 | 3.24k | std::bind(&OutboundData::Transferred, outbound_data, _2, /* conn */ nullptr), |
410 | 3.24k | SOURCE_LOCATION()); |
411 | 3.24k | lock.unlock(); |
412 | 3.24k | auto scheduled = reactor_->ScheduleReactorTask(task, true /* schedule_even_closing */); |
413 | 3.24k | LOG_IF_WITH_PREFIX1 (DFATAL, !scheduled) << "Failed to schedule OutboundData::Transferred"1 ; |
414 | 3.24k | return; |
415 | 3.24k | } |
416 | 79.4M | was_empty = outbound_data_to_process_.empty(); |
417 | 79.4M | outbound_data_to_process_.push_back(std::move(outbound_data)); |
418 | 79.4M | if (was_empty && !process_response_queue_task_78.0M ) { |
419 | 616k | process_response_queue_task_ = |
420 | 616k | MakeFunctorReactorTask(std::bind(&Connection::ProcessResponseQueue, this), |
421 | 616k | shared_from_this(), SOURCE_LOCATION()); |
422 | 616k | } |
423 | 79.4M | } |
424 | | |
425 | 79.4M | if (was_empty) { |
426 | | // TODO: what happens if the reactor is shutting down? Currently Abort is ignored. |
427 | 78.2M | auto scheduled = reactor_->ScheduleReactorTask(process_response_queue_task_); |
428 | 18.4E | LOG_IF_WITH_PREFIX(WARNING, !scheduled) |
429 | 18.4E | << "Failed to schedule Connection::ProcessResponseQueue"; |
430 | 78.2M | } |
431 | 79.4M | } |
432 | | |
433 | 78.2M | void Connection::ProcessResponseQueue() { |
434 | 78.2M | DCHECK(reactor_->IsCurrentThread()); |
435 | | |
436 | 78.2M | { |
437 | 78.2M | std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_); |
438 | 78.2M | outbound_data_to_process_.swap(outbound_data_being_processed_); |
439 | 78.2M | } |
440 | | |
441 | 78.2M | if (!outbound_data_being_processed_.empty()78.2M ) { |
442 | 79.5M | for (auto &call : outbound_data_being_processed_) { |
443 | 79.5M | DoQueueOutboundData(std::move(call), /* batch */ true); |
444 | 79.5M | } |
445 | 78.2M | outbound_data_being_processed_.clear(); |
446 | 78.2M | OutboundQueued(); |
447 | 78.2M | } |
448 | 78.2M | } |
449 | | |
450 | 3.75M | Status Connection::Start(ev::loop_ref* loop) { |
451 | 3.75M | DCHECK(reactor_->IsCurrentThread()); |
452 | | |
453 | 3.75M | context_->SetEventLoop(loop); |
454 | | |
455 | 3.75M | RETURN_NOT_OK(stream_->Start(direction_ == Direction::CLIENT, loop, this)); |
456 | | |
457 | 3.75M | timer_.Init(*loop); |
458 | 3.75M | timer_.SetCallback<Connection, &Connection::HandleTimeout>(this); // NOLINT |
459 | | |
460 | 3.75M | if (!stream_->IsConnected()) { |
461 | 3.72M | timer_.Start(FLAGS_rpc_connection_timeout_ms * 1ms); |
462 | 3.72M | } |
463 | | |
464 | 3.75M | auto self = shared_from_this(); |
465 | 3.75M | context_->AssignConnection(self); |
466 | | |
467 | 3.75M | return Status::OK(); |
468 | 3.75M | } |
469 | | |
470 | 1.22M | void Connection::Connected() { |
471 | 1.22M | context_->Connected(shared_from_this()); |
472 | 1.22M | } |
473 | | |
474 | 1.87G | StreamReadBuffer& Connection::ReadBuffer() { |
475 | 1.87G | return context_->ReadBuffer(); |
476 | 1.87G | } |
477 | | |
478 | 53.2M | const Endpoint& Connection::remote() const { |
479 | 53.2M | return stream_->Remote(); |
480 | 53.2M | } |
481 | | |
482 | 23.0M | const Protocol* Connection::protocol() const { |
483 | 23.0M | return stream_->GetProtocol(); |
484 | 23.0M | } |
485 | | |
486 | 216k | const Endpoint& Connection::local() const { |
487 | 216k | return stream_->Local(); |
488 | 216k | } |
489 | | |
490 | 7.50k | void Connection::Close() { |
491 | 7.50k | stream_->Close(); |
492 | 7.50k | } |
493 | | |
494 | 381M | void Connection::UpdateLastActivity() { |
495 | 381M | last_activity_time_ = reactor_->cur_time(); |
496 | 381M | VLOG_WITH_PREFIX481k (4) << "Updated last_activity_time_=" << AsString(last_activity_time_)481k ; |
497 | 381M | } |
498 | | |
499 | 162M | void Connection::UpdateLastRead() { |
500 | 162M | context_->UpdateLastRead(shared_from_this()); |
501 | 162M | } |
502 | | |
503 | 160M | void Connection::UpdateLastWrite() { |
504 | 160M | context_->UpdateLastWrite(shared_from_this()); |
505 | 160M | } |
506 | | |
507 | 167M | void Connection::Transferred(const OutboundDataPtr& data, const Status& status) { |
508 | 167M | data->Transferred(status, this); |
509 | 167M | } |
510 | | |
511 | 3.04M | void Connection::Destroy(const Status& status) { |
512 | 3.04M | reactor_->DestroyConnection(this, status); |
513 | 3.04M | } |
514 | | |
515 | 2.26k | std::string Connection::LogPrefix() const { |
516 | 2.26k | return ToString() + ": "; |
517 | 2.26k | } |
518 | | |
519 | | } // namespace rpc |
520 | | } // namespace yb |