YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/connection.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/connection.h"
34
35
#include <thread>
36
#include <utility>
37
38
#include "yb/gutil/map-util.h"
39
#include "yb/gutil/strings/substitute.h"
40
41
#include "yb/rpc/connection_context.h"
42
#include "yb/rpc/messenger.h"
43
#include "yb/rpc/reactor.h"
44
#include "yb/rpc/rpc_controller.h"
45
#include "yb/rpc/rpc_introspection.pb.h"
46
#include "yb/rpc/rpc_metrics.h"
47
48
#include "yb/util/enums.h"
49
#include "yb/util/format.h"
50
#include "yb/util/logging.h"
51
#include "yb/util/metrics.h"
52
#include "yb/util/result.h"
53
#include "yb/util/status_format.h"
54
#include "yb/util/string_util.h"
55
#include "yb/util/trace.h"
56
#include "yb/util/tsan_util.h"
57
58
using namespace std::literals;
59
using namespace std::placeholders;
60
using std::shared_ptr;
61
using std::vector;
62
using strings::Substitute;
63
64
// Timeout, in milliseconds, applied while a connection's stream is not yet connected: it arms the
// timer in Connection::Start and bounds the connect deadline in Connection::HandleTimeout.
// NOTE(review): NonTsanVsTsan presumably selects 15000 for regular builds and 30000 under TSAN --
// confirm against yb/util/tsan_util.h.
DEFINE_uint64(
    rpc_connection_timeout_ms,
    yb::NonTsanVsTsan(15000, 30000),
    "Timeout for RPC connection operations");

// Histogram prototype instantiated by the Connection constructor when the messenger exposes a
// metric entity.
METRIC_DEFINE_histogram_with_percentiles(
    server,
    handler_latency_outbound_transfer,
    "Time taken to transfer the response ",
    yb::MetricUnit::kMicroseconds,
    "Microseconds spent to queue and write the response to the wire",
    60000000LU,
    2);
71
72
namespace yb {
73
namespace rpc {
74
75
///
76
/// Connection
77
///
78
// Builds a connection owned by `reactor` that communicates over `stream` in the given `direction`.
//
// Ownership of `stream` and `context` is transferred to the connection; `reactor` and
// `rpc_metrics` are kept as raw pointers. The last-activity timestamp starts at the current coarse
// monotonic time. Construction bumps the connections_created counter and the connections_alive
// gauge (the latter is decremented by the destructor).
Connection::Connection(Reactor* reactor,
                       std::unique_ptr<Stream> stream,
                       Direction direction,
                       RpcMetrics* rpc_metrics,
                       std::unique_ptr<ConnectionContext> context)
    : reactor_(reactor),
      stream_(std::move(stream)),
      direction_(direction),
      last_activity_time_(CoarseMonoClock::Now()),
      rpc_metrics_(rpc_metrics),
      context_(std::move(context)) {
  // Without a metric entity there is nothing to attach the transfer-latency histogram to.
  const auto entity = reactor->messenger()->metric_entity();
  handler_latency_outbound_transfer_ =
      entity ? METRIC_handler_latency_outbound_transfer.Instantiate(entity) : nullptr;

  IncrementCounter(rpc_metrics_->connections_created);
  IncrementGauge(rpc_metrics_->connections_alive);
}
95
96
623k
// Undoes the connections_alive increment performed by the constructor.
Connection::~Connection() {
  const auto& alive_gauge = rpc_metrics_->connections_alive;
  DecrementGauge(alive_gauge);
}
99
100
0
void UpdateIdleReason(const char* message, bool* result, std::string* reason) {
101
0
  *result = false;
102
0
  if (reason) {
103
0
    AppendWithSeparator(message, reason);
104
0
  }
105
0
}
106
107
59.6M
bool Connection::Idle(std::string* reason_not_idle) const {
108
59.6M
  DCHECK(reactor_->IsCurrentThread());
109
110
59.6M
  bool result = stream_->Idle(reason_not_idle);
111
112
  // Connection is not idle if calls are waiting for a response.
113
59.6M
  if (!awaiting_response_.empty()) {
114
0
    UpdateIdleReason("awaiting response", &result, reason_not_idle);
115
0
  }
116
117
  // Check upstream logic (i.e. processing calls, etc.)
118
59.6M
  return context_->Idle(reason_not_idle) && result;
119
59.6M
}
120
121
0
std::string Connection::ReasonNotIdle() const {
122
0
  std::string reason;
123
0
  Idle(&reason);
124
0
  return reason;
125
0
}
126
127
1.21M
void Connection::Shutdown(const Status& status) {
128
1.21M
  DCHECK(reactor_->IsCurrentThread());
129
130
1.21M
  {
131
1.21M
    std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_);
132
1.21M
    outbound_data_being_processed_.swap(outbound_data_to_process_);
133
1.21M
    shutdown_status_ = status;
134
1.21M
  }
135
136
  // Clear any calls which have been sent and were awaiting a response.
137
12.1k
  for (auto& v : awaiting_response_) {
138
12.1k
    if (v.second) {
139
10.4k
      v.second->SetFailed(status);
140
10.4k
    }
141
12.1k
  }
142
1.21M
  awaiting_response_.clear();
143
144
134
  for (auto& call : outbound_data_being_processed_) {
145
134
    call->Transferred(status, this);
146
134
  }
147
1.21M
  outbound_data_being_processed_.clear();
148
149
1.21M
  context_->Shutdown(status);
150
151
1.21M
  stream_->Shutdown(status);
152
1.21M
  timer_.Shutdown();
153
154
  // TODO(bogdan): re-enable once we decide how to control verbose logs better...
155
  // LOG_WITH_PREFIX(INFO) << "Connection::Shutdown completed, status: " << status;
156
1.21M
}
157
158
44.0M
void Connection::OutboundQueued() {
159
44.0M
  DCHECK(reactor_->IsCurrentThread());
160
161
44.0M
  auto status = stream_->TryWrite();
162
44.0M
  if (!status.ok()) {
163
0
    VLOG_WITH_PREFIX(1) << "Write failed: " << status;
164
28
    auto scheduled = reactor_->ScheduleReactorTask(
165
28
        MakeFunctorReactorTask(
166
28
            std::bind(&Reactor::DestroyConnection, reactor_, this, status),
167
28
            shared_from_this(), SOURCE_LOCATION()));
168
0
    LOG_IF_WITH_PREFIX(WARNING, !scheduled) << "Failed to schedule destroy";
169
28
  }
170
44.0M
}
171
172
12.7M
// Timer callback. Must run on the reactor thread.
//
// 1. While the stream is not connected, enforces the connect deadline
//    (last_activity_time_ + FLAGS_rpc_connection_timeout_ms); once it has passed the connection
//    is destroyed with a NetworkError.
// 2. Times out every call in the expiration queue whose time is not later than now: the call is
//    marked timed out, its stream handle (if it has one) is cancelled, and its slot in
//    awaiting_response_ is reset so that a late response finds a null call.
// 3. Re-arms the timer for the nearest remaining deadline, if there is one.
void Connection::HandleTimeout(ev::timer& watcher, int revents) {  // NOLINT
  constexpr size_t kNoHandle = std::numeric_limits<size_t>::max();

  DCHECK(reactor_->IsCurrentThread());
  DVLOG_WITH_PREFIX(5) << "Connection::HandleTimeout revents: " << revents
                       << " connected: " << stream_->IsConnected();

  if (revents & EV_ERROR) {
    LOG_WITH_PREFIX(WARNING) << "Got an error in handle timeout";
    return;
  }

  auto now = CoarseMonoClock::Now();
  CoarseTimePoint next_deadline = CoarseTimePoint::max();

  if (!stream_->IsConnected()) {
    const MonoDelta timeout = FLAGS_rpc_connection_timeout_ms * 1ms;
    next_deadline = last_activity_time_ + timeout;
    DVLOG_WITH_PREFIX(5) << Format("now: $0, deadline: $1, timeout: $2", now, next_deadline, timeout);
    if (now > next_deadline) {
      auto passed = reactor_->cur_time() - last_activity_time_;
      reactor_->DestroyConnection(
          this,
          STATUS_FORMAT(NetworkError, "Connect timeout, passed: $0, timeout: $1", passed, timeout));
      return;
    }
  }

  while (!expiration_queue_.empty() && expiration_queue_.top().time <= now) {
    // Take what is needed from the top entry before pop() removes it.
    auto& expired = expiration_queue_.top();
    auto call = expired.call.lock();
    auto handle = expired.handle;
    expiration_queue_.pop();

    // Nothing to do for calls that are already gone or already finished.
    if (!call || call->IsFinished()) {
      continue;
    }

    call->SetTimedOut();
    if (handle != kNoHandle) {
      stream_->Cancelled(handle);
    }
    auto pending = awaiting_response_.find(call->call_id());
    if (pending != awaiting_response_.end()) {
      // Keep the entry but drop the call; HandleCallResponse treats a null call as timed out.
      pending->second.reset();
    }
  }

  if (!expiration_queue_.empty()) {
    next_deadline = std::min(next_deadline, expiration_queue_.top().time);
  }

  if (next_deadline != CoarseTimePoint::max()) {
    timer_.Start(next_deadline - now);
  }
}
223
224
19.8M
void Connection::QueueOutboundCall(const OutboundCallPtr& call) {
225
19.8M
  DCHECK(call);
226
19.8M
  DCHECK_EQ(direction_, Direction::CLIENT);
227
228
19.8M
  auto handle = DoQueueOutboundData(call, true);
229
230
  // Set up the timeout timer.
231
19.8M
  const MonoDelta& timeout = call->controller()->timeout();
232
19.9M
  if (timeout.Initialized()) {
233
19.9M
    auto expires_at = CoarseMonoClock::Now() + timeout.ToSteadyDuration();
234
19.9M
    auto reschedule = expiration_queue_.empty() || expiration_queue_.top().time > expires_at;
235
19.9M
    expiration_queue_.push({expires_at, call, handle});
236
19.9M
    if (reschedule && (stream_->IsConnected() ||
237
1.15M
                       expires_at < last_activity_time_ + FLAGS_rpc_connection_timeout_ms * 1ms)) {
238
1.15M
      timer_.Start(timeout.ToSteadyDuration());
239
1.15M
    }
240
19.9M
  }
241
242
19.8M
  call->SetQueued();
243
19.8M
}
244
245
45.1M
// Hands `outbound_data` to the stream for sending. Must be called on the reactor thread.
//
// Returns the handle produced by the stream's Send, or std::numeric_limits<size_t>::max() when
// the data was not accepted: the connection is already shut down (the data is then notified
// through Transferred with the shutdown status), the context rejected the pending-write-bytes
// report, or Send failed (in the latter two cases the connection is shut down).
// When `batch` is false a write is attempted immediately via OutboundQueued(); batching callers
// are expected to call OutboundQueued() themselves.
size_t Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch) {
  constexpr size_t kNoHandle = std::numeric_limits<size_t>::max();

  DCHECK(reactor_->IsCurrentThread());
  DVLOG_WITH_PREFIX(4) << "Connection::DoQueueOutboundData: " << AsString(outbound_data);

  if (!shutdown_status_.ok()) {
    YB_LOG_EVERY_N_SECS(INFO, 5) << "Connection::DoQueueOutboundData data: "
                                 << AsString(outbound_data) << " shutdown_status_: "
                                 << shutdown_status_;
    outbound_data->Transferred(shutdown_status_, this);
    return kNoHandle;
  }

  // If the connection is torn down, then the QueueOutbound() call that
  // eventually runs in the reactor thread will take care of calling
  // ResponseTransferCallbacks::NotifyTransferAborted.

  // The pending write size is reported both before and after Send: before, to reset state if we
  // were over the limit but are now back in good standing; after, to detect that we have just
  // gone over the limit.
  Status report_status = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes());
  if (!report_status.ok()) {
    Shutdown(report_status);
    return kNoHandle;
  }

  auto send_result = stream_->Send(std::move(outbound_data));
  if (!send_result.ok()) {
    Shutdown(send_result.status());
    return kNoHandle;
  }

  report_status = context_->ReportPendingWriteBytes(stream_->GetPendingWriteBytes());
  if (!report_status.ok()) {
    Shutdown(report_status);
    return kNoHandle;
  }

  if (!batch) {
    OutboundQueued();
  }

  return *send_result;
}
286
287
0
void Connection::ParseReceived() {
288
0
  stream_->ParseReceived();
289
0
}
290
291
42.4M
// Lets the connection context process the data accumulated in the read buffer.
//
// On success the processed bytes are consumed from the read buffer and the context's
// `bytes_to_skip` value is returned. Errors from the context are logged and propagated. An
// InvalidArgument error is returned when nothing was consumed even though the read buffer is full
// and the context is idle.
Result<size_t> Connection::ProcessReceived(ReadBufferFull read_buffer_full) {
  auto processed = context_->ProcessCalls(
      shared_from_this(), ReadBuffer().AppendedVecs(), read_buffer_full);
  VLOG_WITH_PREFIX(4) << "context_->ProcessCalls result: " << AsString(processed);

  if (PREDICT_FALSE(!processed.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Command sequence failure: " << processed.status();
    return processed.status();
  }

  const bool made_no_progress = !processed->consumed;
  if (made_no_progress && ReadBuffer().Full() && context_->Idle()) {
    return STATUS_FORMAT(
        InvalidArgument, "Command is greater than read buffer, exist data: $0",
        IoVecsFullSize(ReadBuffer().AppendedVecs()));
  }

  ReadBuffer().Consume(processed->consumed, processed->buffer);

  return processed->bytes_to_skip;
}
310
311
19.5M
// Handles a response received on a client-side connection. Must be called on the reactor thread.
//
// Parses `call_data` into a CallResponse (parse errors are returned to the caller), counts it in
// responded_call_count_, and looks up the call awaiting it by call id:
//   - unknown call id: logged at DFATAL and ignored (returns OK);
//   - null call in the entry: the call already timed out (see HandleTimeout); ignored;
//   - otherwise the response is moved into the call.
// The awaiting_response_ entry is removed in the last two cases.
Status Connection::HandleCallResponse(CallData* call_data) {
  DCHECK(reactor_->IsCurrentThread());
  CallResponse resp;
  RETURN_NOT_OK(resp.ParseFrom(call_data));

  ++responded_call_count_;
  auto awaiting = awaiting_response_.find(resp.call_id());
  if (awaiting == awaiting_response_.end()) {
    LOG_WITH_PREFIX(DFATAL) << "Got a response for call id " << resp.call_id() << " which "
                            << "was not pending! Ignoring.";
    return Status::OK();
  }

  // The entry is erased right away, so take over its reference instead of copying it; this avoids
  // a reference-count increment/decrement pair for every response.
  auto call = std::move(awaiting->second);
  awaiting_response_.erase(awaiting);

  if (PREDICT_FALSE(!call)) {
    // The call already failed due to a timeout.
    VLOG_WITH_PREFIX(1) << "Got response to call id " << resp.call_id()
                        << " after client already timed out";
    return Status::OK();
  }

  call->SetResponse(std::move(resp));

  return Status::OK();
}
337
338
19.4M
// Registers `call` as awaiting a response, keyed by its call id. Must be called on the reactor
// thread.
//
// If the call has already finished, the entry is still created but holds a null call, so that a
// response arriving later is recognized and ignored by HandleCallResponse.
void Connection::CallSent(OutboundCallPtr call) {
  DCHECK(reactor_->IsCurrentThread());

  // Read the id before `call` is moved from below.
  const auto call_id = call->call_id();
  if (call->IsFinished()) {
    call.reset();
  }
  // `call` is our own by-value copy: move it into the map rather than copying it again.
  awaiting_response_.emplace(call_id, std::move(call));
}
343
344
7.70k
std::string Connection::ToString() const {
345
  // This may be called from other threads, so we cannot
346
  // include anything in the output about the current state,
347
  // which might concurrently change from another thread.
348
7.70k
  static const char* format = "Connection ($0) $1 $2 => $3";
349
7.70k
  const void* self = this;
350
7.70k
  if (direction_ == Direction::SERVER) {
351
5.31k
    return Format(format, self, "server", remote(), local());
352
2.38k
  } else {
353
2.38k
    return Format(format, self, "client", local(), remote());
354
2.38k
  }
355
7.70k
}
356
357
// Fills `resp` with a description of this connection. Must be called on the reactor thread.
//
// Always sets the remote address and context state, sets the processed call count when it is
// non-zero (responses received for a client connection, the context's count otherwise), and lets
// the context add its own details. For client connections it also dumps the calls awaiting a
// response and the stream state. Any direction other than CLIENT or SERVER is a fatal error.
Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
                          RpcConnectionPB* resp) {
  DCHECK(reactor_->IsCurrentThread());
  resp->set_remote_ip(yb::ToString(remote()));
  resp->set_state(context_->State());

  const bool is_client = direction_ == Direction::CLIENT;
  const uint64_t processed_call_count =
      is_client ? responded_call_count_.load(std::memory_order_acquire)
                : context_->ProcessedCallCount();
  if (processed_call_count > 0) {
    resp->set_processed_call_count(processed_call_count);
  }

  context_->DumpPB(req, resp);

  if (is_client) {
    // A spare entry is always kept at the end of the list: each call dumps into the spare one and,
    // if it reports success, a new spare is appended. The unused spare is removed after the loop.
    auto spare_entry = resp->add_calls_in_flight();
    for (auto& entry : awaiting_response_) {
      auto& pending_call = entry.second;
      if (!pending_call) {
        continue;
      }
      if (pending_call->DumpPB(req, spare_entry)) {
        spare_entry = resp->add_calls_in_flight();
      }
    }
    resp->mutable_calls_in_flight()->DeleteSubrange(resp->calls_in_flight_size() - 1, 1);
    stream_->DumpPB(req, resp);
  } else if (direction_ != Direction::SERVER) {
    LOG(FATAL) << "Invalid direction: " << to_underlying(direction_);
  }

  return Status::OK();
}
387
388
104k
void Connection::QueueOutboundDataBatch(const OutboundDataBatch& batch) {
389
104k
  DCHECK(reactor_->IsCurrentThread());
390
391
104k
  for (const auto& call : batch) {
392
104k
    DoQueueOutboundData(call, /* batch */ true);
393
104k
  }
394
395
104k
  OutboundQueued();
396
104k
}
397
398
25.2M
// Queues `outbound_data` for sending; may be called from any thread.
//
// On the reactor thread the data is queued directly, with an immediate write attempt. On any other
// thread the data is appended, under outbound_data_queue_lock_, to outbound_data_to_process_, and
// a ProcessResponseQueue task is scheduled on the reactor when the queue was empty before the
// append. If the connection has already been shut down, a task is scheduled instead that invokes
// Transferred on the data with a null connection, forwarding the task's second argument as the
// status.
void Connection::QueueOutboundData(OutboundDataPtr outbound_data) {
  if (reactor_->IsCurrentThread()) {
    DoQueueOutboundData(std::move(outbound_data), /* batch */ false);
    return;
  }

  bool queue_was_empty;
  {
    std::unique_lock<simple_spinlock> guard(outbound_data_queue_lock_);
    if (!shutdown_status_.ok()) {
      auto abort_task = MakeFunctorReactorTaskWithAbort(
          std::bind(&OutboundData::Transferred, outbound_data, _2, /* conn */ nullptr),
          SOURCE_LOCATION());
      // Scheduling is done without holding the queue lock.
      guard.unlock();
      auto scheduled = reactor_->ScheduleReactorTask(abort_task, true /* schedule_even_closing */);
      LOG_IF_WITH_PREFIX(DFATAL, !scheduled) << "Failed to schedule OutboundData::Transferred";
      return;
    }

    queue_was_empty = outbound_data_to_process_.empty();
    outbound_data_to_process_.push_back(std::move(outbound_data));
    // The task object is created once, on first use, and reused afterwards.
    if (queue_was_empty && !process_response_queue_task_) {
      process_response_queue_task_ =
          MakeFunctorReactorTask(std::bind(&Connection::ProcessResponseQueue, this),
                                 shared_from_this(), SOURCE_LOCATION());
    }
  }

  if (!queue_was_empty) {
    return;
  }

  // TODO: what happens if the reactor is shutting down? Currently Abort is ignored.
  auto scheduled = reactor_->ScheduleReactorTask(process_response_queue_task_);
  LOG_IF_WITH_PREFIX(WARNING, !scheduled)
      << "Failed to schedule Connection::ProcessResponseQueue";
}
432
433
23.3M
void Connection::ProcessResponseQueue() {
434
23.3M
  DCHECK(reactor_->IsCurrentThread());
435
436
23.3M
  {
437
23.3M
    std::lock_guard<simple_spinlock> lock(outbound_data_queue_lock_);
438
23.3M
    outbound_data_to_process_.swap(outbound_data_being_processed_);
439
23.3M
  }
440
441
23.3M
  if (!outbound_data_being_processed_.empty()) {
442
24.1M
    for (auto &call : outbound_data_being_processed_) {
443
24.1M
      DoQueueOutboundData(std::move(call), /* batch */ true);
444
24.1M
    }
445
23.3M
    outbound_data_being_processed_.clear();
446
23.3M
    OutboundQueued();
447
23.3M
  }
448
23.3M
}
449
450
944k
// Attaches the connection to the event loop `loop`. Must be called on the reactor thread.
//
// Gives the loop to the context, starts the stream (returning its error, if any), sets up the
// timeout timer with HandleTimeout as its callback, arms that timer with the connection timeout
// when the stream is not connected yet, and finally hands a shared pointer to this connection to
// the context.
Status Connection::Start(ev::loop_ref* loop) {
  DCHECK(reactor_->IsCurrentThread());

  context_->SetEventLoop(loop);

  const bool is_client = direction_ == Direction::CLIENT;
  RETURN_NOT_OK(stream_->Start(is_client, loop, this));

  timer_.Init(*loop);
  timer_.SetCallback<Connection, &Connection::HandleTimeout>(this); // NOLINT

  const bool connect_pending = !stream_->IsConnected();
  if (connect_pending) {
    timer_.Start(FLAGS_rpc_connection_timeout_ms * 1ms);
  }

  auto shared_self = shared_from_this();
  context_->AssignConnection(shared_self);

  return Status::OK();
}
469
470
613k
void Connection::Connected() {
471
613k
  context_->Connected(shared_from_this());
472
613k
}
473
474
315M
// Returns the read buffer, which is owned by the connection context.
StreamReadBuffer& Connection::ReadBuffer() {
  StreamReadBuffer& buffer = context_->ReadBuffer();
  return buffer;
}
477
478
15.5M
// Returns the remote endpoint as reported by the stream.
const Endpoint& Connection::remote() const {
  const Endpoint& endpoint = stream_->Remote();
  return endpoint;
}
481
482
3.68M
// Returns the protocol as reported by the stream.
const Protocol* Connection::protocol() const {
  const Protocol* stream_protocol = stream_->GetProtocol();
  return stream_protocol;
}
485
486
23.6k
// Returns the local endpoint as reported by the stream.
const Endpoint& Connection::local() const {
  const Endpoint& endpoint = stream_->Local();
  return endpoint;
}
489
490
10.0k
void Connection::Close() {
491
10.0k
  stream_->Close();
492
10.0k
}
493
494
109M
void Connection::UpdateLastActivity() {
495
109M
  last_activity_time_ = reactor_->cur_time();
496
81.8k
  VLOG_WITH_PREFIX(4) << "Updated last_activity_time_=" << AsString(last_activity_time_);
497
109M
}
498
499
42.9M
void Connection::UpdateLastRead() {
500
42.9M
  context_->UpdateLastRead(shared_from_this());
501
42.9M
}
502
503
43.0M
void Connection::UpdateLastWrite() {
504
43.0M
  context_->UpdateLastWrite(shared_from_this());
505
43.0M
}
506
507
45.2M
void Connection::Transferred(const OutboundDataPtr& data, const Status& status) {
508
45.2M
  data->Transferred(status, this);
509
45.2M
}
510
511
594k
void Connection::Destroy(const Status& status) {
512
594k
  reactor_->DestroyConnection(this, status);
513
594k
}
514
515
34
std::string Connection::LogPrefix() const {
516
34
  return ToString() + ": ";
517
34
}
518
519
}  // namespace rpc
520
}  // namespace yb