YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_queue.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/rpc_with_queue.h"
17
18
#include "yb/gutil/casts.h"
19
20
#include "yb/rpc/connection.h"
21
#include "yb/rpc/messenger.h"
22
#include "yb/rpc/reactor.h"
23
#include "yb/rpc/rpc_introspection.pb.h"
24
25
#include "yb/util/string_util.h"
26
27
namespace yb {
28
namespace rpc {
29
30
// Constructs the context, storing the two limits it is given.
//
// max_concurrent_calls is kept in max_concurrent_calls_; Enqueue() hands a call to the messenger
// only while the queue holds at most that many calls.
// max_queued_bytes is kept in max_queued_bytes_; it is not otherwise used in this file.
ConnectionContextWithQueue::ConnectionContextWithQueue(
31
    size_t max_concurrent_calls,
32
    size_t max_queued_bytes)
33
528
    : max_concurrent_calls_(max_concurrent_calls), max_queued_bytes_(max_queued_bytes) {
34
528
}
35
36
125
// Destructor with an empty body: no explicit cleanup is performed here.
ConnectionContextWithQueue::~ConnectionContextWithQueue() {
37
125
}
38
39
// Dumps every call currently held in calls_queue_ into resp.
//
// For each queued call a new calls_in_flight entry is added to resp and the call's own DumpPB is
// invoked with req and that entry.
void ConnectionContextWithQueue::DumpPB(const DumpRunningRpcsRequestPB& req,
40
0
                                        RpcConnectionPB* resp) {
41
0
  for (auto& call : calls_queue_) {
42
0
    call->DumpPB(req, resp->add_calls_in_flight());
43
0
  }
44
0
}
45
46
104k
// Reports whether this context has no queued calls.
//
// Returns true when calls_queue_ is empty. Otherwise returns false and, when reason_not_idle is
// non-null, appends a "$0 calls" message (formatted with the queue size) to *reason_not_idle via
// AppendWithSeparator.
bool ConnectionContextWithQueue::Idle(std::string* reason_not_idle) {
47
104k
  if (calls_queue_.empty()) {
48
104k
    return true;
49
104k
  }
50
51
1
  // reason_not_idle is optional; the reason is reported only when the caller asked for it.
  if (reason_not_idle) {
52
0
    AppendWithSeparator(Format("$0 calls", calls_queue_.size()), reason_not_idle);
53
0
  }
54
55
1
  return false;
56
1
}
57
58
104k
// Appends an inbound call to the queue and, if the concurrency limit allows, starts handling it.
//
// Debug-checks that it runs on the thread of the reactor owning the call's connection.
// The call's weight_in_bytes() is added to queued_bytes_.
void ConnectionContextWithQueue::Enqueue(std::shared_ptr<QueueableInboundCall> call) {
59
104k
  auto reactor = call->connection()->reactor();
60
104k
  DCHECK(reactor->IsCurrentThread());
61
62
104k
  calls_queue_.push_back(call);
63
104k
  queued_bytes_ += call->weight_in_bytes();
64
65
104k
  size_t size = calls_queue_.size();
66
104k
  // The new call sits at index replies_being_sent_, i.e. every call before it is already counted
  // as having its reply sent, so it becomes the first call still waiting for a reply.
  if (size == replies_being_sent_ + 1) {
67
104k
    first_without_reply_.store(call.get(), std::memory_order_release);
68
104k
  }
69
104k
  // Only the first max_concurrent_calls_ entries are handed to the messenger right away; later
  // entries stay queued until CallProcessed() hands them over.
  if (size <= max_concurrent_calls_) {
70
104k
    reactor->messenger()->Handle(call, Queue::kTrue);
71
104k
  }
72
104k
}
73
74
249
// Shuts the context down: drops calls that were never started and aborts the remaining ones with
// the given status.
//
// Note that queued_bytes_ and replies_being_sent_ are not adjusted here.
void ConnectionContextWithQueue::Shutdown(const Status& status) {
75
  // Calls beyond the first max_concurrent_calls_ entries have not been handed to the messenger
  // yet (see Enqueue/CallProcessed), so they are simply erased without being aborted.
76
249
  if (calls_queue_.size() > max_concurrent_calls_) {
77
0
    calls_queue_.erase(calls_queue_.begin() + max_concurrent_calls_, calls_queue_.end());
78
0
  }
79
80
0
  // Every call still in the queue gets Abort(status).
  for (auto& call : calls_queue_) {
81
0
    call->Abort(status);
82
0
  }
83
249
}
84
85
104k
// Retires the call at the front of the queue once it has been processed.
//
// Debug-checks that it runs on the connection's reactor thread, that `call` is the front element
// of calls_queue_, and that replies_being_sent_ is positive. Then:
//   - increments processed_call_count_ and subtracts the call's weight from queued_bytes_;
//   - pops the call and decrements replies_being_sent_;
//   - hands the next not-yet-started call (if any) to the messenger;
//   - invokes idle_listener_ if the queue became empty and a listener is set;
//   - asks the connection to ParseReceived() if can_enqueue() flipped from false to true.
//
// NOTE(review): `call` is a raw pointer and is dereferenced after pop_front() released the
// queue's reference to it; this presumably relies on the caller keeping the call alive — confirm.
void ConnectionContextWithQueue::CallProcessed(InboundCall* call) {
86
104k
  ++processed_call_count_;
87
104k
  auto reactor = call->connection()->reactor();
88
104k
  DCHECK(reactor->IsCurrentThread());
89
90
104k
  DCHECK(!calls_queue_.empty());
91
104k
  DCHECK_EQ(calls_queue_.front().get(), call);
92
104k
  DCHECK_GT(replies_being_sent_, 0);
93
94
104k
  // Remember the state before the queue shrinks so the transition can be detected below.
  bool could_enqueue = can_enqueue();
95
104k
  auto call_weight_in_bytes = down_cast<QueueableInboundCall*>(call)->weight_in_bytes();
96
104k
  queued_bytes_ -= call_weight_in_bytes;
97
98
104k
  calls_queue_.pop_front();
99
104k
  --replies_being_sent_;
100
104k
  // After the pop, the entry at index max_concurrent_calls_ - 1 is the one that previously sat
  // just outside the concurrency window and so was not handed to the messenger by Enqueue().
  if (calls_queue_.size() >= max_concurrent_calls_) {
101
0
    auto call_ptr = calls_queue_[max_concurrent_calls_ - 1];
102
0
    reactor->messenger()->Handle(call_ptr, Queue::kTrue);
103
0
  }
104
104k
  if (Idle() && idle_listener_) {
105
0
    idle_listener_();
106
0
  }
107
108
104k
  // Enqueueing was not possible before this call was retired but is possible now, so the
  // connection is asked to parse what it has received.
  if (!could_enqueue && can_enqueue()) {
109
0
    call->connection()->ParseReceived();
110
0
  }
111
104k
}
112
113
// Marks the given call as having a reply and, if it is the first call still waiting for one,
// schedules flush_outbound_queue_task_ on the connection's reactor.
//
// `call` is down-cast to QueueableInboundCall. A warning is logged when scheduling the task fails.
// NOTE(review): first_without_reply_ is accessed atomically and there is no reactor-thread check
// here, so this is presumably callable from other threads — confirm against callers.
void ConnectionContextWithQueue::QueueResponse(const ConnectionPtr& conn,
114
104k
                                               InboundCallPtr call) {
115
104k
  QueueableInboundCall* queueable_call = down_cast<QueueableInboundCall*>(call.get());
116
104k
  queueable_call->SetHasReply();
117
104k
  // Replies for calls other than the first one without a reply are not flushed from here; they
  // are picked up when FlushOutboundQueue() scans past them.
  if (queueable_call == first_without_reply_.load(std::memory_order_acquire)) {
118
104k
    auto scheduled = conn->reactor()->ScheduleReactorTask(flush_outbound_queue_task_);
119
0
    LOG_IF(WARNING, !scheduled) << "Failed to schedule flush outbound queue";
120
104k
  }
121
104k
}
122
123
104k
// Sends, in queue order, the replies of all consecutive calls that already have a reply.
//
// Debug-checks that it runs on the connection's reactor thread. Starting at index
// replies_being_sent_, it advances over calls whose has_reply() is true, publishes the first call
// without a reply (or nullptr) in first_without_reply_, and queues the covered range
// [begin, end) on the connection as one batch. replies_being_sent_ is then set to end.
void ConnectionContextWithQueue::FlushOutboundQueue(Connection* conn) {
124
104k
  DCHECK(conn->reactor()->IsCurrentThread());
125
126
104k
  const size_t begin = replies_being_sent_;
127
104k
  size_t end = begin;
128
104k
  for (;;) {
129
104k
    size_t queue_size = calls_queue_.size();
130
209k
    // Advance `end` past every call that already has a reply.
    while (end < queue_size) {
131
104k
      if (!calls_queue_[end]->has_reply()) {
132
0
        break;
133
0
      }
134
104k
      ++end;
135
104k
    }
136
104k
    auto new_first_without_reply = end < queue_size ? calls_queue_[end].get() : nullptr;
137
104k
    first_without_reply_.store(new_first_without_reply, std::memory_order_release);
138
    // Usually we break here. However, QueueResponse could have run for this call before
139
    // first_without_reply_ was updated above; in that case it has a reply now, so scan again.
140
104k
    if (!new_first_without_reply || !new_first_without_reply->has_reply()) {
141
104k
      break;
142
104k
    }
143
104k
  }
144
145
104k
  // Nothing to send when no additional call acquired a reply since the previous flush.
  if (begin != end) {
146
104k
    replies_being_sent_ = end;
147
104k
    boost::container::small_vector<OutboundDataPtr, 64> batch(
148
104k
        calls_queue_.begin() + begin,
149
104k
        calls_queue_.begin() + end);
150
104k
    conn->QueueOutboundDataBatch(batch);
151
104k
  }
152
104k
}
153
154
528
// Creates flush_outbound_queue_task_, the reactor task that QueueResponse() schedules.
//
// The task is bound to FlushOutboundQueue with `this` and the raw connection pointer; conn itself
// is also passed to MakeFunctorReactorTask (presumably so the task holds a reference to the
// connection — confirm against MakeFunctorReactorTask).
void ConnectionContextWithQueue::AssignConnection(const ConnectionPtr& conn) {
155
528
  flush_outbound_queue_task_ = MakeFunctorReactorTask(
156
528
      std::bind(&ConnectionContextWithQueue::FlushOutboundQueue, this, conn.get()), conn,
157
528
      SOURCE_LOCATION());
158
528
}
159
160
} // namespace rpc
161
} // namespace yb