/Users/deen/code/yugabyte-db/src/yb/rpc/connection.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/connection.h" |
34 | | |
35 | | #include <thread> |
36 | | #include <utility> |
37 | | |
38 | | #include "yb/gutil/map-util.h" |
39 | | #include "yb/gutil/strings/substitute.h" |
40 | | |
41 | | #include "yb/rpc/connection_context.h" |
42 | | #include "yb/rpc/messenger.h" |
43 | | #include "yb/rpc/reactor.h" |
44 | | #include "yb/rpc/rpc_controller.h" |
45 | | #include "yb/rpc/rpc_introspection.pb.h" |
46 | | #include "yb/rpc/rpc_metrics.h" |
47 | | |
48 | | #include "yb/util/enums.h" |
49 | | #include "yb/util/format.h" |
50 | | #include "yb/util/logging.h" |
51 | | #include "yb/util/metrics.h" |
52 | | #include "yb/util/result.h" |
53 | | #include "yb/util/status_format.h" |
54 | | #include "yb/util/string_util.h" |
55 | | #include "yb/util/trace.h" |
56 | | #include "yb/util/tsan_util.h" |
57 | | |
58 | | using namespace std::literals; |
59 | | using namespace std::placeholders; |
60 | | using std::shared_ptr; |
61 | | using std::vector; |
62 | | using strings::Substitute; |
63 | | |
64 | | DEFINE_uint64(rpc_connection_timeout_ms, yb::NonTsanVsTsan(15000, 30000), |
65 | | "Timeout for RPC connection operations"); |
66 | | |
67 | | METRIC_DEFINE_histogram_with_percentiles( |
68 | | server, handler_latency_outbound_transfer, "Time taken to transfer the response ", |
69 | | yb::MetricUnit::kMicroseconds, "Microseconds spent to queue and write the response to the wire", |
70 | | 60000000LU, 2); |
71 | | |
72 | | namespace yb { |
73 | | namespace rpc { |
74 | | |
75 | | /// |
76 | | /// Connection |
77 | | /// |
78 | | Connection::Connection(Reactor* reactor, |
79 | | std::unique_ptr<Stream> stream, |
80 | | Direction direction, |
81 | | RpcMetrics* rpc_metrics, |
82 | | std::unique_ptr<ConnectionContext> context) |
83 | | : reactor_(reactor), |
84 | | stream_(std::move(stream)), |
85 | | direction_(direction), |
86 | | last_activity_time_(CoarseMonoClock::Now()), |
87 | | rpc_metrics_(rpc_metrics), |
88 | 946k | context_(std::move(context)) { |
89 | 946k | const auto metric_entity = reactor->messenger()->metric_entity(); |
90 | 946k | handler_latency_outbound_transfer_ = metric_entity ? |
91 | 884k | METRIC_handler_latency_outbound_transfer.Instantiate(metric_entity) : nullptr; |
92 | 946k | IncrementCounter(rpc_metrics_->connections_created); |
93 | 946k | IncrementGauge(rpc_metrics_->connections_alive); |
94 | 946k | } |
95 | | |
96 | 623k | Connection::~Connection() { |
97 | 623k | DecrementGauge(rpc_metrics_->connections_alive); |
98 | 623k | } |
99 | | |
100 | 0 | void UpdateIdleReason(const char* message, bool* result, std::string* reason) { |
101 | 0 | *result = false; |
102 | 0 | if (reason) { |
103 | 0 | AppendWithSeparator(message, reason); |
104 | 0 | } |
105 | 0 | } |
106 | | |
107 | 59.6M | bool Connection::Idle(std::string* reason_not_idle) const { |
108 | 59.6M | DCHECK(reactor_->IsCurrentThread()); |
109 | | |
110 | 59.6M | bool result = stream_->Idle(reason_not_idle); |
111 | | |
112 | | // Connection is not idle if calls are waiting for a response. |
113 | 59.6M | if (!awaiting_response_.empty()) { |
114 | 0 | UpdateIdleReason("awaiting response", &result, reason_not_idle); |
115 | 0 | } |
116 | | |
117 | | // Check upstream logic (i.e. processing calls, etc.) |
118 | 59.6M | return context_->Idle(reason_not_idle) && result; |
119 | 59.6M | } |
120 | | |
121 | 0 | std::string Connection::ReasonNotIdle() const { |
122 | 0 | std::string reason; |
123 | 0 | Idle(&reason); |
124 | 0 | return reason; |
125 | 0 | } |
126 | | |
127 | 1.21M | void Connection::Shutdown(const Status& status) { |
128 | 1.21M | DCHECK(reactor_->IsCurrentThread()); |
129 | | |
130 | 1.21M | { |
131 | 1.21M | std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_); |
132 | 1.21M | outbound_data_being_processed_.swap(outbound_data_to_process_); |
133 | 1.21M | shutdown_status_ = status; |
134 | 1.21M | } |
135 | | |
136 | | // Clear any calls which have been sent and were awaiting a response. |
137 | 12.1k | for (auto& v : awaiting_response_) { |
138 | 12.1k | if (v.second) { |
139 | 10.4k | v.second->SetFailed(status); |
140 | 10.4k | } |
141 | 12.1k | } |
142 | 1.21M | awaiting_response_.clear(); |
143 | | |
144 | 134 | for (auto& call : outbound_data_being_processed_) { |
145 | 134 | call->Transferred(status, this); |
146 | 134 | } |
147 | 1.21M | outbound_data_being_processed_.clear(); |
148 | | |
149 | 1.21M | context_->Shutdown(status); |
150 | | |
151 | 1.21M | stream_->Shutdown(status); |
152 | 1.21M | timer_.Shutdown(); |
153 | | |
154 | | // TODO(bogdan): re-enable once we decide how to control verbose logs better... |
155 | | // LOG_WITH_PREFIX(INFO) << "Connection::Shutdown completed, status: " << status; |
156 | 1.21M | } |
157 | | |
158 | 44.0M | void Connection::OutboundQueued() { |
159 | 44.0M | DCHECK(reactor_->IsCurrentThread()); |
160 | | |
161 | 44.0M | auto status = stream_->TryWrite(); |
162 | 44.0M | if (!status.ok()) { |
163 | 0 | VLOG_WITH_PREFIX(1) << "Write failed: " << status; |
164 | 28 | auto scheduled = reactor_->ScheduleReactorTask( |
165 | 28 | MakeFunctorReactorTask( |
166 | 28 | std::bind(&Reactor::DestroyConnection, reactor_, this, status), |
167 | 28 | shared_from_this(), SOURCE_LOCATION())); |
168 | 0 | LOG_IF_WITH_PREFIX(WARNING, !scheduled) << "Failed to schedule destroy"; |
169 | 28 | } |
170 | 44.0M | } |
171 | | |
172 | 12.7M | void Connection::HandleTimeout(ev::timer& watcher, int revents) { // NOLINT |
173 | 12.7M | DCHECK(reactor_->IsCurrentThread()); |
174 | 31.9k | DVLOG_WITH_PREFIX(5) << "Connection::HandleTimeout revents: " << revents |
175 | 31.9k | << " connected: " << stream_->IsConnected(); |
176 | | |
177 | 12.7M | if (EV_ERROR & revents) { |
178 | 0 | LOG_WITH_PREFIX(WARNING) << "Got an error in handle timeout"; |
179 | 0 | return; |
180 | 0 | } |
181 | | |
182 | 12.7M | auto now = CoarseMonoClock::Now(); |
183 | | |
184 | 12.7M | CoarseTimePoint deadline = CoarseTimePoint::max(); |
185 | 12.7M | if (!stream_->IsConnected()) { |
186 | 646 | const MonoDelta timeout = FLAGS_rpc_connection_timeout_ms * 1ms; |
187 | 646 | deadline = last_activity_time_ + timeout; |
188 | 0 | DVLOG_WITH_PREFIX(5) << Format("now: $0, deadline: $1, timeout: $2", now, deadline, timeout); |
189 | 646 | if (now > deadline) { |
190 | 1 | auto passed = reactor_->cur_time() - last_activity_time_; |
191 | 1 | reactor_->DestroyConnection( |
192 | 1 | this, |
193 | 1 | STATUS_FORMAT(NetworkError, "Connect timeout, passed: $0, timeout: $1", passed, timeout)); |
194 | 1 | return; |
195 | 1 | } |
196 | 12.7M | } |
197 | | |
198 | 26.9M | while (!expiration_queue_.empty() && expiration_queue_.top().time <= now) { |
199 | 14.2M | auto& top = expiration_queue_.top(); |
200 | 14.2M | auto call = top.call.lock(); |
201 | 14.2M | auto handle = top.handle; |
202 | 14.2M | expiration_queue_.pop(); |
203 | 14.2M | if (call && !call->IsFinished()) { |
204 | 14.7k | call->SetTimedOut(); |
205 | 14.7k | if (handle != std::numeric_limits<size_t>::max()) { |
206 | 14.1k | stream_->Cancelled(handle); |
207 | 14.1k | } |
208 | 14.7k | auto i = awaiting_response_.find(call->call_id()); |
209 | 14.7k | if (i != awaiting_response_.end()) { |
210 | 9.92k | i->second.reset(); |
211 | 9.92k | } |
212 | 14.7k | } |
213 | 14.2M | } |
214 | | |
215 | 12.7M | if (!expiration_queue_.empty()) { |
216 | 12.5M | deadline = std::min(deadline, expiration_queue_.top().time); |
217 | 12.5M | } |
218 | | |
219 | 12.7M | if (deadline != CoarseTimePoint::max()) { |
220 | 12.5M | timer_.Start(deadline - now); |
221 | 12.5M | } |
222 | 12.7M | } |
223 | | |
224 | 19.8M | void Connection::QueueOutboundCall(const OutboundCallPtr& call) { |
225 | 19.8M | DCHECK(call); |
226 | 19.8M | DCHECK_EQ(direction_, Direction::CLIENT); |
227 | | |
228 | 19.8M | auto handle = DoQueueOutboundData(call, true); |
229 | | |
230 | | // Set up the timeout timer. |
231 | 19.8M | const MonoDelta& timeout = call->controller()->timeout(); |
232 | 19.9M | if (timeout.Initialized()) { |
233 | 19.9M | auto expires_at = CoarseMonoClock::Now() + timeout.ToSteadyDuration(); |
234 | 19.9M | auto reschedule = expiration_queue_.empty() || expiration_queue_.top().time > expires_at; |
235 | 19.9M | expiration_queue_.push({expires_at, call, handle}); |
236 | 19.9M | if (reschedule && (stream_->IsConnected() || |
237 | 1.15M | expires_at < last_activity_time_ + FLAGS_rpc_connection_timeout_ms * 1ms)) { |
238 | 1.15M | timer_.Start(timeout.ToSteadyDuration()); |
239 | 1.15M | } |
240 | 19.9M | } |
241 | | |
242 | 19.8M | call->SetQueued(); |
243 | 19.8M | } |
244 | | |
245 | 45.1M | size_t Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch) { |
246 | 45.1M | DCHECK(reactor_->IsCurrentThread()); |
247 | 18.4E | DVLOG_WITH_PREFIX(4) << "Connection::DoQueueOutboundData: " << AsString(outbound_data); |
248 | | |
249 | 45.1M | if (!shutdown_status_.ok()) { |
250 | 0 | YB_LOG_EVERY_N_SECS(INFO, 5) << "Connection::DoQueueOutboundData data: " |
251 | 0 | << AsString(outbound_data) << " shutdown_status_: " |
252 | 0 | << shutdown_status_; |
253 | 0 | outbound_data->Transferred(shutdown_status_, this); |
254 | 0 | return std::numeric_limits<size_t>::max(); |
255 | 0 | } |
256 | | |
257 | | // If the connection is torn down, then the QueueOutbound() call that |
258 | | // eventually runs in the reactor thread will take care of calling |
259 | | // ResponseTransferCallbacks::NotifyTransferAborted. |
260 | | |
261 | | // Check before and after calling Send. Before to reset state, if we |
262 | | // were over the limit; but are now back in good standing. After, to |
263 | | // check if we are now over the limit. |
264 | 45.1M | Status s = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes()); |
265 | 45.1M | if (!s.ok()) { |
266 | 0 | Shutdown(s); |
267 | 0 | return std::numeric_limits<size_t>::max(); |
268 | 0 | } |
269 | 45.1M | auto result = stream_->Send(std::move(outbound_data)); |
270 | 45.1M | if (!result.ok()) { |
271 | 0 | Shutdown(result.status()); |
272 | 0 | return std::numeric_limits<size_t>::max(); |
273 | 0 | } |
274 | 45.1M | s = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes()); |
275 | 45.1M | if (!s.ok()) { |
276 | 0 | Shutdown(s); |
277 | 0 | return std::numeric_limits<size_t>::max(); |
278 | 0 | } |
279 | | |
280 | 45.1M | if (!batch) { |
281 | 1.08M | OutboundQueued(); |
282 | 1.08M | } |
283 | | |
284 | 45.1M | return *result; |
285 | 45.1M | } |
286 | | |
287 | 0 | void Connection::ParseReceived() { |
288 | 0 | stream_->ParseReceived(); |
289 | 0 | } |
290 | | |
291 | 42.4M | Result<size_t> Connection::ProcessReceived(ReadBufferFull read_buffer_full) { |
292 | 42.4M | auto result = context_->ProcessCalls( |
293 | 42.4M | shared_from_this(), ReadBuffer().AppendedVecs(), read_buffer_full); |
294 | 8.24k | VLOG_WITH_PREFIX(4) << "context_->ProcessCalls result: " << AsString(result); |
295 | 42.4M | if (PREDICT_FALSE(!result.ok())) { |
296 | 6 | LOG_WITH_PREFIX(WARNING) << "Command sequence failure: " << result.status(); |
297 | 6 | return result.status(); |
298 | 6 | } |
299 | | |
300 | 42.4M | if (!result->consumed && ReadBuffer().Full() && context_->Idle()) { |
301 | 0 | return STATUS_FORMAT( |
302 | 0 | InvalidArgument, "Command is greater than read buffer, exist data: $0", |
303 | 0 | IoVecsFullSize(ReadBuffer().AppendedVecs())); |
304 | 0 | } |
305 | | |
306 | 42.4M | ReadBuffer().Consume(result->consumed, result->buffer); |
307 | | |
308 | 42.4M | return result->bytes_to_skip; |
309 | 42.4M | } |
310 | | |
311 | 19.5M | Status Connection::HandleCallResponse(CallData* call_data) { |
312 | 19.5M | DCHECK(reactor_->IsCurrentThread()); |
313 | 19.5M | CallResponse resp; |
314 | 19.5M | RETURN_NOT_OK(resp.ParseFrom(call_data)); |
315 | | |
316 | 19.5M | ++responded_call_count_; |
317 | 19.5M | auto awaiting = awaiting_response_.find(resp.call_id()); |
318 | 19.5M | if (awaiting == awaiting_response_.end()) { |
319 | 0 | LOG_WITH_PREFIX(DFATAL) << "Got a response for call id " << resp.call_id() << " which " |
320 | 0 | << "was not pending! Ignoring."; |
321 | 0 | return Status::OK(); |
322 | 0 | } |
323 | 19.5M | auto call = awaiting->second; |
324 | 19.5M | awaiting_response_.erase(awaiting); |
325 | | |
326 | 19.5M | if (PREDICT_FALSE(!call)) { |
327 | | // The call already failed due to a timeout. |
328 | 18.4E | VLOG_WITH_PREFIX(1) << "Got response to call id " << resp.call_id() |
329 | 18.4E | << " after client already timed out"; |
330 | 7.82k | return Status::OK(); |
331 | 7.82k | } |
332 | | |
333 | 19.5M | call->SetResponse(std::move(resp)); |
334 | | |
335 | 19.5M | return Status::OK(); |
336 | 19.5M | } |
337 | | |
338 | 19.4M | void Connection::CallSent(OutboundCallPtr call) { |
339 | 19.4M | DCHECK(reactor_->IsCurrentThread()); |
340 | | |
341 | 19.4M | awaiting_response_.emplace(call->call_id(), !call->IsFinished() ? call : nullptr); |
342 | 19.4M | } |
343 | | |
344 | 7.70k | std::string Connection::ToString() const { |
345 | | // This may be called from other threads, so we cannot |
346 | | // include anything in the output about the current state, |
347 | | // which might concurrently change from another thread. |
348 | 7.70k | static const char* format = "Connection ($0) $1 $2 => $3"; |
349 | 7.70k | const void* self = this; |
350 | 7.70k | if (direction_ == Direction::SERVER) { |
351 | 5.31k | return Format(format, self, "server", remote(), local()); |
352 | 2.38k | } else { |
353 | 2.38k | return Format(format, self, "client", local(), remote()); |
354 | 2.38k | } |
355 | 7.70k | } |
356 | | |
357 | | Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req, |
358 | 21 | RpcConnectionPB* resp) { |
359 | 21 | DCHECK(reactor_->IsCurrentThread()); |
360 | 21 | resp->set_remote_ip(yb::ToString(remote())); |
361 | 21 | resp->set_state(context_->State()); |
362 | | |
363 | 21 | const uint64_t processed_call_count = |
364 | 10 | direction_ == Direction::CLIENT ? responded_call_count_.load(std::memory_order_acquire) |
365 | 11 | : context_->ProcessedCallCount(); |
366 | 21 | if (processed_call_count > 0) { |
367 | 18 | resp->set_processed_call_count(processed_call_count); |
368 | 18 | } |
369 | | |
370 | 21 | context_->DumpPB(req, resp); |
371 | | |
372 | 21 | if (direction_ == Direction::CLIENT) { |
373 | 10 | auto call_in_flight = resp->add_calls_in_flight(); |
374 | 1 | for (auto& entry : awaiting_response_) { |
375 | 1 | if (entry.second && entry.second->DumpPB(req, call_in_flight)) { |
376 | 1 | call_in_flight = resp->add_calls_in_flight(); |
377 | 1 | } |
378 | 1 | } |
379 | 10 | resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1); |
380 | 10 | stream_->DumpPB(req, resp); |
381 | 11 | } else if (direction_ != Direction::SERVER) { |
382 | 0 | LOG(FATAL) << "Invalid direction: " << to_underlying(direction_); |
383 | 0 | } |
384 | | |
385 | 21 | return Status::OK(); |
386 | 21 | } |
387 | | |
388 | 104k | void Connection::QueueOutboundDataBatch(const OutboundDataBatch& batch) { |
389 | 104k | DCHECK(reactor_->IsCurrentThread()); |
390 | | |
391 | 104k | for (const auto& call : batch) { |
392 | 104k | DoQueueOutboundData(call, /* batch */ true); |
393 | 104k | } |
394 | | |
395 | 104k | OutboundQueued(); |
396 | 104k | } |
397 | | |
398 | 25.2M | void Connection::QueueOutboundData(OutboundDataPtr outbound_data) { |
399 | 25.2M | if (reactor_->IsCurrentThread()) { |
400 | 1.09M | DoQueueOutboundData(std::move(outbound_data), /* batch */ false); |
401 | 1.09M | return; |
402 | 1.09M | } |
403 | | |
404 | 24.1M | bool was_empty; |
405 | 24.1M | { |
406 | 24.1M | std::unique_lock<simple_spinlock> lock(outbound_data_queue_lock_); |
407 | 24.1M | if (!shutdown_status_.ok()) { |
408 | 423 | auto task = MakeFunctorReactorTaskWithAbort( |
409 | 423 | std::bind(&OutboundData::Transferred, outbound_data, _2, /* conn */ nullptr), |
410 | 423 | SOURCE_LOCATION()); |
411 | 423 | lock.unlock(); |
412 | 423 | auto scheduled = reactor_->ScheduleReactorTask(task, true /* schedule_even_closing */); |
413 | 0 | LOG_IF_WITH_PREFIX(DFATAL, !scheduled) << "Failed to schedule OutboundData::Transferred"; |
414 | 423 | return; |
415 | 423 | } |
416 | 24.1M | was_empty = outbound_data_to_process_.empty(); |
417 | 24.1M | outbound_data_to_process_.push_back(std::move(outbound_data)); |
418 | 24.1M | if (was_empty && !process_response_queue_task_) { |
419 | 313k | process_response_queue_task_ = |
420 | 313k | MakeFunctorReactorTask(std::bind(&Connection::ProcessResponseQueue, this), |
421 | 313k | shared_from_this(), SOURCE_LOCATION()); |
422 | 313k | } |
423 | 24.1M | } |
424 | | |
425 | 24.1M | if (was_empty) { |
426 | | // TODO: what happens if the reactor is shutting down? Currently Abort is ignored. |
427 | 23.3M | auto scheduled = reactor_->ScheduleReactorTask(process_response_queue_task_); |
428 | 18.4E | LOG_IF_WITH_PREFIX(WARNING, !scheduled) |
429 | 18.4E | << "Failed to schedule Connection::ProcessResponseQueue"; |
430 | 23.3M | } |
431 | 24.1M | } |
432 | | |
433 | 23.3M | void Connection::ProcessResponseQueue() { |
434 | 23.3M | DCHECK(reactor_->IsCurrentThread()); |
435 | | |
436 | 23.3M | { |
437 | 23.3M | std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_); |
438 | 23.3M | outbound_data_to_process_.swap(outbound_data_being_processed_); |
439 | 23.3M | } |
440 | | |
441 | 23.3M | if (!outbound_data_being_processed_.empty()) { |
442 | 24.1M | for (auto &call : outbound_data_being_processed_) { |
443 | 24.1M | DoQueueOutboundData(std::move(call), /* batch */ true); |
444 | 24.1M | } |
445 | 23.3M | outbound_data_being_processed_.clear(); |
446 | 23.3M | OutboundQueued(); |
447 | 23.3M | } |
448 | 23.3M | } |
449 | | |
450 | 944k | Status Connection::Start(ev::loop_ref* loop) { |
451 | 944k | DCHECK(reactor_->IsCurrentThread()); |
452 | | |
453 | 944k | context_->SetEventLoop(loop); |
454 | | |
455 | 944k | RETURN_NOT_OK(stream_->Start(direction_ == Direction::CLIENT, loop, this)); |
456 | | |
457 | 944k | timer_.Init(*loop); |
458 | 944k | timer_.SetCallback<Connection, &Connection::HandleTimeout>(this); // NOLINT |
459 | | |
460 | 944k | if (!stream_->IsConnected()) { |
461 | 933k | timer_.Start(FLAGS_rpc_connection_timeout_ms * 1ms); |
462 | 933k | } |
463 | | |
464 | 944k | auto self = shared_from_this(); |
465 | 944k | context_->AssignConnection(self); |
466 | | |
467 | 944k | return Status::OK(); |
468 | 944k | } |
469 | | |
470 | 613k | void Connection::Connected() { |
471 | 613k | context_->Connected(shared_from_this()); |
472 | 613k | } |
473 | | |
474 | 315M | StreamReadBuffer& Connection::ReadBuffer() { |
475 | 315M | return context_->ReadBuffer(); |
476 | 315M | } |
477 | | |
478 | 15.5M | const Endpoint& Connection::remote() const { |
479 | 15.5M | return stream_->Remote(); |
480 | 15.5M | } |
481 | | |
482 | 3.68M | const Protocol* Connection::protocol() const { |
483 | 3.68M | return stream_->GetProtocol(); |
484 | 3.68M | } |
485 | | |
486 | 23.6k | const Endpoint& Connection::local() const { |
487 | 23.6k | return stream_->Local(); |
488 | 23.6k | } |
489 | | |
490 | 10.0k | void Connection::Close() { |
491 | 10.0k | stream_->Close(); |
492 | 10.0k | } |
493 | | |
494 | 109M | void Connection::UpdateLastActivity() { |
495 | 109M | last_activity_time_ = reactor_->cur_time(); |
496 | 81.8k | VLOG_WITH_PREFIX(4) << "Updated last_activity_time_=" << AsString(last_activity_time_); |
497 | 109M | } |
498 | | |
499 | 42.9M | void Connection::UpdateLastRead() { |
500 | 42.9M | context_->UpdateLastRead(shared_from_this()); |
501 | 42.9M | } |
502 | | |
503 | 43.0M | void Connection::UpdateLastWrite() { |
504 | 43.0M | context_->UpdateLastWrite(shared_from_this()); |
505 | 43.0M | } |
506 | | |
507 | 45.2M | void Connection::Transferred(const OutboundDataPtr& data, const Status& status) { |
508 | 45.2M | data->Transferred(status, this); |
509 | 45.2M | } |
510 | | |
511 | 594k | void Connection::Destroy(const Status& status) { |
512 | 594k | reactor_->DestroyConnection(this, status); |
513 | 594k | } |
514 | | |
515 | 34 | std::string Connection::LogPrefix() const { |
516 | 34 | return ToString() + ": "; |
517 | 34 | } |
518 | | |
519 | | } // namespace rpc |
520 | | } // namespace yb |