YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/connection.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/connection.h"
34
35
#include <thread>
36
#include <utility>
37
38
#include "yb/gutil/map-util.h"
39
#include "yb/gutil/strings/substitute.h"
40
41
#include "yb/rpc/connection_context.h"
42
#include "yb/rpc/messenger.h"
43
#include "yb/rpc/reactor.h"
44
#include "yb/rpc/rpc_controller.h"
45
#include "yb/rpc/rpc_introspection.pb.h"
46
#include "yb/rpc/rpc_metrics.h"
47
48
#include "yb/util/enums.h"
49
#include "yb/util/format.h"
50
#include "yb/util/logging.h"
51
#include "yb/util/metrics.h"
52
#include "yb/util/result.h"
53
#include "yb/util/status_format.h"
54
#include "yb/util/string_util.h"
55
#include "yb/util/trace.h"
56
#include "yb/util/tsan_util.h"
57
58
using namespace std::literals;
59
using namespace std::placeholders;
60
using std::shared_ptr;
61
using std::vector;
62
using strings::Substitute;
63
64
// Connect timeout in milliseconds. Connection::Start arms the connection timer with this value
// when the stream is not yet connected, and Connection::HandleTimeout destroys a connection that
// is still not connected once this much time has passed since its last recorded activity.
// NOTE(review): NonTsanVsTsan(15000, 30000) presumably selects the larger default for TSAN
// builds - confirm against yb/util/tsan_util.h.
DEFINE_uint64(rpc_connection_timeout_ms, yb::NonTsanVsTsan(15000, 30000),
65
    "Timeout for RPC connection operations");
66
67
// Histogram prototype (unit: microseconds) for the time spent queueing and writing a response to
// the wire. The Connection constructor instantiates it against the messenger's metric entity when
// one is available.
METRIC_DEFINE_histogram_with_percentiles(
68
    server, handler_latency_outbound_transfer, "Time taken to transfer the response ",
69
    yb::MetricUnit::kMicroseconds, "Microseconds spent to queue and write the response to the wire",
70
    60000000LU, 2);
71
72
namespace yb {
73
namespace rpc {
74
75
///
76
/// Connection
77
///
78
// Creates a connection served by `reactor`, taking ownership of the transport `stream` and of the
// protocol `context`. The last-activity timestamp starts at the current coarse monotonic time.
// Also bumps the "connections created" counter and the "connections alive" gauge in `rpc_metrics`.
Connection::Connection(Reactor* reactor,
                       std::unique_ptr<Stream> stream,
                       Direction direction,
                       RpcMetrics* rpc_metrics,
                       std::unique_ptr<ConnectionContext> context)
    : reactor_(reactor),
      stream_(std::move(stream)),
      direction_(direction),
      last_activity_time_(CoarseMonoClock::Now()),
      rpc_metrics_(rpc_metrics),
      context_(std::move(context)) {
  // The outbound transfer histogram exists only when the messenger has a metric entity;
  // otherwise the member is left null.
  const auto entity = reactor->messenger()->metric_entity();
  handler_latency_outbound_transfer_ =
      entity ? METRIC_handler_latency_outbound_transfer.Instantiate(entity) : nullptr;

  IncrementCounter(rpc_metrics_->connections_created);
  IncrementGauge(rpc_metrics_->connections_alive);
}
95
96
3.25M
// Balances the gauge increment performed by the constructor.
Connection::~Connection() {
  DecrementGauge(rpc_metrics_->connections_alive);
}
99
100
0
void UpdateIdleReason(const char* message, bool* result, std::string* reason) {
101
0
  *result = false;
102
0
  if (reason) {
103
0
    AppendWithSeparator(message, reason);
104
0
  }
105
0
}
106
107
916M
bool Connection::Idle(std::string* reason_not_idle) const {
108
916M
  DCHECK(reactor_->IsCurrentThread());
109
110
916M
  bool result = stream_->Idle(reason_not_idle);
111
112
  // Connection is not idle if calls are waiting for a response.
113
916M
  if (!awaiting_response_.empty()) {
114
0
    UpdateIdleReason("awaiting response", &result, reason_not_idle);
115
0
  }
116
117
  // Check upstream logic (i.e. processing calls, etc.)
118
916M
  return context_->Idle(reason_not_idle) && 
result906M
;
119
916M
}
120
121
0
std::string Connection::ReasonNotIdle() const {
122
0
  std::string reason;
123
0
  Idle(&reason);
124
0
  return reason;
125
0
}
126
127
6.33M
void Connection::Shutdown(const Status& status) {
128
6.33M
  DCHECK(reactor_->IsCurrentThread());
129
130
6.33M
  {
131
6.33M
    std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_);
132
6.33M
    outbound_data_being_processed_.swap(outbound_data_to_process_);
133
6.33M
    shutdown_status_ = status;
134
6.33M
  }
135
136
  // Clear any calls which have been sent and were awaiting a response.
137
6.33M
  for (auto& v : awaiting_response_) {
138
33.3k
    if (v.second) {
139
14.0k
      v.second->SetFailed(status);
140
14.0k
    }
141
33.3k
  }
142
6.33M
  awaiting_response_.clear();
143
144
6.33M
  for (auto& call : outbound_data_being_processed_) {
145
479
    call->Transferred(status, this);
146
479
  }
147
6.33M
  outbound_data_being_processed_.clear();
148
149
6.33M
  context_->Shutdown(status);
150
151
6.33M
  stream_->Shutdown(status);
152
6.33M
  timer_.Shutdown();
153
154
  // TODO(bogdan): re-enable once we decide how to control verbose logs better...
155
  // LOG_WITH_PREFIX(INFO) << "Connection::Shutdown completed, status: " << status;
156
6.33M
}
157
158
165M
void Connection::OutboundQueued() {
159
165M
  DCHECK(reactor_->IsCurrentThread());
160
161
165M
  auto status = stream_->TryWrite();
162
165M
  if (!status.ok()) {
163
1.47k
    
VLOG_WITH_PREFIX2
(1) << "Write failed: " << status2
;
164
1.47k
    auto scheduled = reactor_->ScheduleReactorTask(
165
1.47k
        MakeFunctorReactorTask(
166
1.47k
            std::bind(&Reactor::DestroyConnection, reactor_, this, status),
167
1.47k
            shared_from_this(), SOURCE_LOCATION()));
168
1.47k
    
LOG_IF_WITH_PREFIX0
(WARNING, !scheduled) << "Failed to schedule destroy"0
;
169
1.47k
  }
170
165M
}
171
172
47.6M
// Timer callback. Does three things, in order:
//   1. If the stream is still not connected and the connect timeout has elapsed since the last
//      recorded activity, destroys the connection and returns.
//   2. Times out every queued call whose expiration time has been reached.
//   3. Re-arms the timer for the nearest remaining deadline, if there is one.
// Must run on the reactor thread.
void Connection::HandleTimeout(ev::timer& watcher, int revents) {  // NOLINT
  DCHECK(reactor_->IsCurrentThread());
  DVLOG_WITH_PREFIX(5) << "Connection::HandleTimeout revents: " << revents
                       << " connected: " << stream_->IsConnected();

  if (revents & EV_ERROR) {
    LOG_WITH_PREFIX(WARNING) << "Got an error in handle timeout";
    return;
  }

  const auto now = CoarseMonoClock::Now();

  // max() means "nothing to wait for".
  CoarseTimePoint next_deadline = CoarseTimePoint::max();

  if (!stream_->IsConnected()) {
    const MonoDelta timeout = FLAGS_rpc_connection_timeout_ms * 1ms;
    next_deadline = last_activity_time_ + timeout;
    DVLOG_WITH_PREFIX(5)
        << Format("now: $0, deadline: $1, timeout: $2", now, next_deadline, timeout);
    if (now > next_deadline) {
      auto passed = reactor_->cur_time() - last_activity_time_;
      reactor_->DestroyConnection(
          this,
          STATUS_FORMAT(NetworkError, "Connect timeout, passed: $0, timeout: $1", passed, timeout));
      return;
    }
  }

  while (!expiration_queue_.empty()) {
    const auto& entry = expiration_queue_.top();
    if (entry.time > now) {
      break;
    }

    // Copy what is needed out of the entry before popping invalidates the reference.
    auto expired_call = entry.call.lock();
    auto handle = entry.handle;
    expiration_queue_.pop();

    // The call is gone or has already finished: nothing to time out.
    if (!expired_call || expired_call->IsFinished()) {
      continue;
    }

    expired_call->SetTimedOut();
    // The max() handle is what DoQueueOutboundData returns when nothing was handed to the stream.
    if (handle != std::numeric_limits<size_t>::max()) {
      stream_->Cancelled(handle);
    }
    // Keep the map entry but drop the call pointer, so that a late response is still recognized
    // by HandleCallResponse.
    auto pending = awaiting_response_.find(expired_call->call_id());
    if (pending != awaiting_response_.end()) {
      pending->second.reset();
    }
  }

  if (!expiration_queue_.empty()) {
    next_deadline = std::min(next_deadline, expiration_queue_.top().time);
  }

  if (next_deadline != CoarseTimePoint::max()) {
    timer_.Start(next_deadline - now);
  }
}
223
224
75.4M
void Connection::QueueOutboundCall(const OutboundCallPtr& call) {
225
75.4M
  DCHECK(call);
226
75.4M
  DCHECK_EQ(direction_, Direction::CLIENT);
227
228
75.4M
  auto handle = DoQueueOutboundData(call, true);
229
230
  // Set up the timeout timer.
231
75.4M
  const MonoDelta& timeout = call->controller()->timeout();
232
75.4M
  if (timeout.Initialized()) {
233
75.4M
    auto expires_at = CoarseMonoClock::Now() + timeout.ToSteadyDuration();
234
75.4M
    auto reschedule = expiration_queue_.empty() || 
expiration_queue_.top().time > expires_at67.9M
;
235
75.4M
    expiration_queue_.push({expires_at, call, handle});
236
75.4M
    if (reschedule && 
(8.10M
stream_->IsConnected()8.10M
||
237
8.10M
                       
expires_at < last_activity_time_ + FLAGS_rpc_connection_timeout_ms * 1ms3.18M
)) {
238
7.56M
      timer_.Start(timeout.ToSteadyDuration());
239
7.56M
    }
240
75.4M
  }
241
242
75.4M
  call->SetQueued();
243
75.4M
}
244
245
167M
// Hands `outbound_data` to the stream. Returns the handle produced by the stream's Send, or
// std::numeric_limits<size_t>::max() when the data was not sent: the connection is already shut
// down, the context rejected the pending write byte count, or Send failed. In the latter two
// cases the connection is shut down with the failing status. Unless `batch` is set, a write is
// attempted right away through OutboundQueued(). Must run on the reactor thread.
size_t Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch) {
  constexpr size_t kNoHandle = std::numeric_limits<size_t>::max();

  DCHECK(reactor_->IsCurrentThread());
  DVLOG_WITH_PREFIX(4) << "Connection::DoQueueOutboundData: " << AsString(outbound_data);

  if (!shutdown_status_.ok()) {
    YB_LOG_EVERY_N_SECS(INFO, 5) << "Connection::DoQueueOutboundData data: "
                                 << AsString(outbound_data) << " shutdown_status_: "
                                 << shutdown_status_;
    outbound_data->Transferred(shutdown_status_, this);
    return kNoHandle;
  }

  // If the connection is torn down, then the QueueOutbound() call that
  // eventually runs in the reactor thread will take care of calling
  // ResponseTransferCallbacks::NotifyTransferAborted.

  // The pending write bytes are reported both before and after Send: before, to reset state
  // if we were over the limit but are now back in good standing; after, to detect that this
  // send pushed us over the limit.
  Status report_status = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes());
  if (!report_status.ok()) {
    Shutdown(report_status);
    return kNoHandle;
  }

  auto send_result = stream_->Send(std::move(outbound_data));
  if (!send_result.ok()) {
    Shutdown(send_result.status());
    return kNoHandle;
  }

  report_status = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes());
  if (!report_status.ok()) {
    Shutdown(report_status);
    return kNoHandle;
  }

  if (!batch) {
    OutboundQueued();
  }

  return *send_result;
}
286
287
0
void Connection::ParseReceived() {
288
0
  stream_->ParseReceived();
289
0
}
290
291
159M
// Lets the context process the data currently appended to the read buffer, then consumes the
// processed part of the buffer. Returns the `bytes_to_skip` value reported by the context, or an
// error status when processing failed or when nothing could be consumed from a full buffer while
// the context is idle.
Result<size_t> Connection::ProcessReceived(ReadBufferFull read_buffer_full) {
  auto processed = context_->ProcessCalls(
      shared_from_this(), ReadBuffer().AppendedVecs(), read_buffer_full);
  VLOG_WITH_PREFIX(4) << "context_->ProcessCalls result: " << AsString(processed);

  if (PREDICT_FALSE(!processed.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Command sequence failure: " << processed.status();
    return processed.status();
  }

  // Nothing was consumed although the buffer is full and the context is idle: the pending
  // command can never fit into the read buffer.
  if (!processed->consumed) {
    if (ReadBuffer().Full() && context_->Idle()) {
      return STATUS_FORMAT(
          InvalidArgument, "Command is greater than read buffer, exist data: $0",
          IoVecsFullSize(ReadBuffer().AppendedVecs()));
    }
  }

  ReadBuffer().Consume(processed->consumed, processed->buffer);

  return processed->bytes_to_skip;
}
310
311
72.8M
// Parses a call response out of `call_data` and delivers it to the matching outbound call.
//
// Returns the parse error if the response cannot be parsed, and OK otherwise. A response whose
// call id is not pending is logged as DFATAL and ignored. A response for a call whose pointer was
// already dropped (HandleTimeout resets it when the call times out) is silently ignored.
// Must run on the reactor thread.
Status Connection::HandleCallResponse(CallData* call_data) {
  DCHECK(reactor_->IsCurrentThread());
  CallResponse resp;
  RETURN_NOT_OK(resp.ParseFrom(call_data));

  ++responded_call_count_;
  auto awaiting = awaiting_response_.find(resp.call_id());
  if (awaiting == awaiting_response_.end()) {
    LOG_WITH_PREFIX(DFATAL) << "Got a response for call id " << resp.call_id() << " which "
                            << "was not pending! Ignoring.";
    return Status::OK();
  }

  // The map entry is erased right away, so take the pointer out of it instead of copying it:
  // this saves a reference count increment/decrement pair on every response.
  auto call = std::move(awaiting->second);
  awaiting_response_.erase(awaiting);

  if (PREDICT_FALSE(!call)) {
    // The call already failed due to a timeout.
    VLOG_WITH_PREFIX(1) << "Got response to call id " << resp.call_id()
                        << " after client already timed out";
    return Status::OK();
  }

  call->SetResponse(std::move(resp));

  return Status::OK();
}
337
338
72.6M
// Records a sent call as awaiting a response, keyed by its call id. If the call has already
// finished, the id is still recorded but with an empty call pointer. Must run on the reactor
// thread.
void Connection::CallSent(OutboundCallPtr call) {
  DCHECK(reactor_->IsCurrentThread());

  OutboundCallPtr pending = call->IsFinished() ? nullptr : call;
  awaiting_response_.emplace(call->call_id(), std::move(pending));
}
343
344
193k
std::string Connection::ToString() const {
345
  // This may be called from other threads, so we cannot
346
  // include anything in the output about the current state,
347
  // which might concurrently change from another thread.
348
193k
  static const char* format = "Connection ($0) $1 $2 => $3";
349
193k
  const void* self = this;
350
193k
  if (direction_ == Direction::SERVER) {
351
155k
    return Format(format, self, "server", remote(), local());
352
155k
  } else {
353
37.6k
    return Format(format, self, "client", local(), remote());
354
37.6k
  }
355
193k
}
356
357
// Fills `resp` with a description of this connection: remote address, context state, the
// processed call count (only when non-zero), whatever the context adds, and - for client
// connections - the calls awaiting a response plus whatever the stream adds. Always returns OK.
// Must run on the reactor thread.
Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
                          RpcConnectionPB* resp) {
  DCHECK(reactor_->IsCurrentThread());
  resp->set_remote_ip(yb::ToString(remote()));
  resp->set_state(context_->State());

  const bool is_client = direction_ == Direction::CLIENT;

  // Clients count responses received; servers take the count from the context.
  uint64_t processed_calls;
  if (is_client) {
    processed_calls = responded_call_count_.load(std::memory_order_acquire);
  } else {
    processed_calls = context_->ProcessedCallCount();
  }
  if (processed_calls != 0) {
    resp->set_processed_call_count(processed_calls);
  }

  context_->DumpPB(req, resp);

  if (is_client) {
    // One spare entry is always kept at the tail: a call dumps itself into it, and a fresh
    // spare is appended only when that dump reports success.
    auto* spare = resp->add_calls_in_flight();
    for (auto& pending : awaiting_response_) {
      if (!pending.second) {
        continue;
      }
      if (pending.second->DumpPB(req, spare)) {
        spare = resp->add_calls_in_flight();
      }
    }
    // Drop the trailing spare entry.
    resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1);
    stream_->DumpPB(req, resp);
  } else if (direction_ != Direction::SERVER) {
    LOG(FATAL) << "Invalid direction: " << to_underlying(direction_);
  }

  return Status::OK();
}
387
388
209k
void Connection::QueueOutboundDataBatch(const OutboundDataBatch& batch) {
389
209k
  DCHECK(reactor_->IsCurrentThread());
390
391
209k
  for (const auto& call : batch) {
392
209k
    DoQueueOutboundData(call, /* batch */ true);
393
209k
  }
394
395
209k
  OutboundQueued();
396
209k
}
397
398
92.1M
// Queues `outbound_data` for sending; callable from any thread.
//  - On the reactor thread the data is handed to the stream immediately.
//  - On any other thread the data is appended to the locked queue, and ProcessResponseQueue is
//    scheduled on the reactor when the queue was empty before the append.
//  - If the connection is already shut down, a reactor task is scheduled that invokes
//    OutboundData::Transferred on the data with a null connection.
void Connection::QueueOutboundData(OutboundDataPtr outbound_data) {
  if (reactor_->IsCurrentThread()) {
    DoQueueOutboundData(std::move(outbound_data), /* batch */ false);
    return;
  }

  bool queue_was_empty;
  {
    std::unique_lock<simple_spinlock> queue_lock(outbound_data_queue_lock_);
    if (!shutdown_status_.ok()) {
      auto transferred_task = MakeFunctorReactorTaskWithAbort(
          std::bind(&OutboundData::Transferred, outbound_data, _2, /* conn */ nullptr),
          SOURCE_LOCATION());
      // Release the queue lock before calling into the reactor.
      queue_lock.unlock();
      auto scheduled =
          reactor_->ScheduleReactorTask(transferred_task, true /* schedule_even_closing */);
      LOG_IF_WITH_PREFIX(DFATAL, !scheduled) << "Failed to schedule OutboundData::Transferred";
      return;
    }

    queue_was_empty = outbound_data_to_process_.empty();
    outbound_data_to_process_.push_back(std::move(outbound_data));

    // The processing task is created once, on first use, and reused afterwards.
    if (queue_was_empty && !process_response_queue_task_) {
      process_response_queue_task_ =
          MakeFunctorReactorTask(std::bind(&Connection::ProcessResponseQueue, this),
                                 shared_from_this(), SOURCE_LOCATION());
    }
  }

  if (!queue_was_empty) {
    return;
  }

  // TODO: what happens if the reactor is shutting down? Currently Abort is ignored.
  auto scheduled = reactor_->ScheduleReactorTask(process_response_queue_task_);
  LOG_IF_WITH_PREFIX(WARNING, !scheduled)
      << "Failed to schedule Connection::ProcessResponseQueue";
}
432
433
78.2M
void Connection::ProcessResponseQueue() {
434
78.2M
  DCHECK(reactor_->IsCurrentThread());
435
436
78.2M
  {
437
78.2M
    std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_);
438
78.2M
    outbound_data_to_process_.swap(outbound_data_being_processed_);
439
78.2M
  }
440
441
78.2M
  if (
!outbound_data_being_processed_.empty()78.2M
) {
442
79.5M
    for (auto &call : outbound_data_being_processed_) {
443
79.5M
      DoQueueOutboundData(std::move(call), /* batch */ true);
444
79.5M
    }
445
78.2M
    outbound_data_being_processed_.clear();
446
78.2M
    OutboundQueued();
447
78.2M
  }
448
78.2M
}
449
450
3.75M
// Attaches the connection to the event loop: passes the loop to the context, starts the stream,
// and sets up the timer with HandleTimeout as its callback. If the stream is not connected yet,
// the timer is armed with the connect timeout. Finally the context is given a shared pointer to
// this connection. Returns the stream's error if it fails to start (in which case the timer is
// not set up), OK otherwise. Must run on the reactor thread.
Status Connection::Start(ev::loop_ref* loop) {
  DCHECK(reactor_->IsCurrentThread());

  context_->SetEventLoop(loop);

  const bool is_client = direction_ == Direction::CLIENT;
  RETURN_NOT_OK(stream_->Start(is_client, loop, this));

  timer_.Init(*loop);
  timer_.SetCallback<Connection, &Connection::HandleTimeout>(this); // NOLINT

  if (!stream_->IsConnected()) {
    timer_.Start(FLAGS_rpc_connection_timeout_ms * 1ms);
  }

  auto shared_self = shared_from_this();
  context_->AssignConnection(shared_self);

  return Status::OK();
}
469
470
1.22M
void Connection::Connected() {
471
1.22M
  context_->Connected(shared_from_this());
472
1.22M
}
473
474
1.87G
// The read buffer is owned by the context; this is a plain accessor for it.
StreamReadBuffer& Connection::ReadBuffer() {
  return context_->ReadBuffer();
}
477
478
53.2M
// Remote endpoint, as reported by the stream.
const Endpoint& Connection::remote() const {
  return stream_->Remote();
}
481
482
23.0M
// Protocol, as reported by the stream.
const Protocol* Connection::protocol() const {
  return stream_->GetProtocol();
}
485
486
216k
// Local endpoint, as reported by the stream.
const Endpoint& Connection::local() const {
  return stream_->Local();
}
489
490
7.50k
void Connection::Close() {
491
7.50k
  stream_->Close();
492
7.50k
}
493
494
381M
void Connection::UpdateLastActivity() {
495
381M
  last_activity_time_ = reactor_->cur_time();
496
381M
  
VLOG_WITH_PREFIX481k
(4) << "Updated last_activity_time_=" << AsString(last_activity_time_)481k
;
497
381M
}
498
499
162M
void Connection::UpdateLastRead() {
500
162M
  context_->UpdateLastRead(shared_from_this());
501
162M
}
502
503
160M
void Connection::UpdateLastWrite() {
504
160M
  context_->UpdateLastWrite(shared_from_this());
505
160M
}
506
507
167M
void Connection::Transferred(const OutboundDataPtr& data, const Status& status) {
508
167M
  data->Transferred(status, this);
509
167M
}
510
511
3.04M
void Connection::Destroy(const Status& status) {
512
3.04M
  reactor_->DestroyConnection(this, status);
513
3.04M
}
514
515
2.26k
std::string Connection::LogPrefix() const {
516
2.26k
  return ToString() + ": ";
517
2.26k
}
518
519
}  // namespace rpc
520
}  // namespace yb