/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_queue.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rpc/rpc_with_queue.h" |
17 | | |
18 | | #include "yb/gutil/casts.h" |
19 | | |
20 | | #include "yb/rpc/connection.h" |
21 | | #include "yb/rpc/messenger.h" |
22 | | #include "yb/rpc/reactor.h" |
23 | | #include "yb/rpc/rpc_introspection.pb.h" |
24 | | |
25 | | #include "yb/util/string_util.h" |
26 | | |
27 | | namespace yb { |
28 | | namespace rpc { |
29 | | |
30 | | ConnectionContextWithQueue::ConnectionContextWithQueue( |
31 | | size_t max_concurrent_calls, |
32 | | size_t max_queued_bytes) |
33 | 528 | : max_concurrent_calls_(max_concurrent_calls), max_queued_bytes_(max_queued_bytes) { |
34 | 528 | } |
35 | | |
36 | 125 | ConnectionContextWithQueue::~ConnectionContextWithQueue() { |
37 | 125 | } |
38 | | |
39 | | void ConnectionContextWithQueue::DumpPB(const DumpRunningRpcsRequestPB& req, |
40 | 0 | RpcConnectionPB* resp) { |
41 | 0 | for (auto& call : calls_queue_) { |
42 | 0 | call->DumpPB(req, resp->add_calls_in_flight()); |
43 | 0 | } |
44 | 0 | } |
45 | | |
46 | 104k | bool ConnectionContextWithQueue::Idle(std::string* reason_not_idle) { |
47 | 104k | if (calls_queue_.empty()) { |
48 | 104k | return true; |
49 | 104k | } |
50 | | |
51 | 1 | if (reason_not_idle) { |
52 | 0 | AppendWithSeparator(Format("$0 calls", calls_queue_.size()), reason_not_idle); |
53 | 0 | } |
54 | | |
55 | 1 | return false; |
56 | 1 | } |
57 | | |
58 | 104k | void ConnectionContextWithQueue::Enqueue(std::shared_ptr<QueueableInboundCall> call) { |
59 | 104k | auto reactor = call->connection()->reactor(); |
60 | 104k | DCHECK(reactor->IsCurrentThread()); |
61 | | |
62 | 104k | calls_queue_.push_back(call); |
63 | 104k | queued_bytes_ += call->weight_in_bytes(); |
64 | | |
65 | 104k | size_t size = calls_queue_.size(); |
66 | 104k | if (size == replies_being_sent_ + 1) { |
67 | 104k | first_without_reply_.store(call.get(), std::memory_order_release); |
68 | 104k | } |
69 | 104k | if (size <= max_concurrent_calls_) { |
70 | 104k | reactor->messenger()->Handle(call, Queue::kTrue); |
71 | 104k | } |
72 | 104k | } |
73 | | |
74 | 249 | void ConnectionContextWithQueue::Shutdown(const Status& status) { |
75 | | // Could erase calls, that we did not start to process yet. |
76 | 249 | if (calls_queue_.size() > max_concurrent_calls_) { |
77 | 0 | calls_queue_.erase(calls_queue_.begin() + max_concurrent_calls_, calls_queue_.end()); |
78 | 0 | } |
79 | | |
80 | 0 | for (auto& call : calls_queue_) { |
81 | 0 | call->Abort(status); |
82 | 0 | } |
83 | 249 | } |
84 | | |
85 | 104k | void ConnectionContextWithQueue::CallProcessed(InboundCall* call) { |
86 | 104k | ++processed_call_count_; |
87 | 104k | auto reactor = call->connection()->reactor(); |
88 | 104k | DCHECK(reactor->IsCurrentThread()); |
89 | | |
90 | 104k | DCHECK(!calls_queue_.empty()); |
91 | 104k | DCHECK_EQ(calls_queue_.front().get(), call); |
92 | 104k | DCHECK_GT(replies_being_sent_, 0); |
93 | | |
94 | 104k | bool could_enqueue = can_enqueue(); |
95 | 104k | auto call_weight_in_bytes = down_cast<QueueableInboundCall*>(call)->weight_in_bytes(); |
96 | 104k | queued_bytes_ -= call_weight_in_bytes; |
97 | | |
98 | 104k | calls_queue_.pop_front(); |
99 | 104k | --replies_being_sent_; |
100 | 104k | if (calls_queue_.size() >= max_concurrent_calls_) { |
101 | 0 | auto call_ptr = calls_queue_[max_concurrent_calls_ - 1]; |
102 | 0 | reactor->messenger()->Handle(call_ptr, Queue::kTrue); |
103 | 0 | } |
104 | 104k | if (Idle() && idle_listener_) { |
105 | 0 | idle_listener_(); |
106 | 0 | } |
107 | | |
108 | 104k | if (!could_enqueue && can_enqueue()) { |
109 | 0 | call->connection()->ParseReceived(); |
110 | 0 | } |
111 | 104k | } |
112 | | |
113 | | void ConnectionContextWithQueue::QueueResponse(const ConnectionPtr& conn, |
114 | 104k | InboundCallPtr call) { |
115 | 104k | QueueableInboundCall* queueable_call = down_cast<QueueableInboundCall*>(call.get()); |
116 | 104k | queueable_call->SetHasReply(); |
117 | 104k | if (queueable_call == first_without_reply_.load(std::memory_order_acquire)) { |
118 | 104k | auto scheduled = conn->reactor()->ScheduleReactorTask(flush_outbound_queue_task_); |
119 | 0 | LOG_IF(WARNING, !scheduled) << "Failed to schedule flush outbound queue"; |
120 | 104k | } |
121 | 104k | } |
122 | | |
123 | 104k | void ConnectionContextWithQueue::FlushOutboundQueue(Connection* conn) { |
124 | 104k | DCHECK(conn->reactor()->IsCurrentThread()); |
125 | | |
126 | 104k | const size_t begin = replies_being_sent_; |
127 | 104k | size_t end = begin; |
128 | 104k | for (;;) { |
129 | 104k | size_t queue_size = calls_queue_.size(); |
130 | 209k | while (end < queue_size) { |
131 | 104k | if (!calls_queue_[end]->has_reply()) { |
132 | 0 | break; |
133 | 0 | } |
134 | 104k | ++end; |
135 | 104k | } |
136 | 104k | auto new_first_without_reply = end < queue_size ? calls_queue_[end].get() : nullptr; |
137 | 104k | first_without_reply_.store(new_first_without_reply, std::memory_order_release); |
138 | | // It is usual case that we break here, but sometimes there could happen that before updating |
139 | | // first_without_reply_ we did QueueResponse for this call. |
140 | 104k | if (!new_first_without_reply || !new_first_without_reply->has_reply()) { |
141 | 104k | break; |
142 | 104k | } |
143 | 104k | } |
144 | | |
145 | 104k | if (begin != end) { |
146 | 104k | replies_being_sent_ = end; |
147 | 104k | boost::container::small_vector<OutboundDataPtr, 64> batch( |
148 | 104k | calls_queue_.begin() + begin, |
149 | 104k | calls_queue_.begin() + end); |
150 | 104k | conn->QueueOutboundDataBatch(batch); |
151 | 104k | } |
152 | 104k | } |
153 | | |
154 | 528 | void ConnectionContextWithQueue::AssignConnection(const ConnectionPtr& conn) { |
155 | 528 | flush_outbound_queue_task_ = MakeFunctorReactorTask( |
156 | 528 | std::bind(&ConnectionContextWithQueue::FlushOutboundQueue, this, conn.get()), conn, |
157 | 528 | SOURCE_LOCATION()); |
158 | 528 | } |
159 | | |
160 | | } // namespace rpc |
161 | | } // namespace yb |