/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_queue.h
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #ifndef YB_RPC_RPC_WITH_QUEUE_H |
17 | | #define YB_RPC_RPC_WITH_QUEUE_H |
18 | | |
19 | | #include <stdint.h> |
20 | | |
21 | | #include <functional> |
22 | | #include <mutex> |
23 | | #include <type_traits> |
24 | | #include <unordered_set> |
25 | | |
26 | | #include "yb/rpc/connection_context.h" |
27 | | #include "yb/rpc/inbound_call.h" |
28 | | |
29 | | #include "yb/util/size_literals.h" |
30 | | |
31 | | namespace yb { |
32 | | namespace rpc { |
33 | | |
34 | | class QueueableInboundCall : public InboundCall { |
35 | | public: |
36 | | QueueableInboundCall(ConnectionPtr conn, size_t weight_in_bytes, |
37 | | CallProcessedListener call_processed_listener) |
38 | | : InboundCall(std::move(conn), nullptr /* rpc_metrics */, std::move(call_processed_listener)), |
39 | 209k | weight_in_bytes_(weight_in_bytes) {} |
40 | | |
41 | 209k | void SetHasReply() { |
42 | 209k | has_reply_.store(true, std::memory_order_release); |
43 | 209k | } |
44 | | |
45 | 209k | bool has_reply() const { |
46 | 209k | return has_reply_.load(std::memory_order_acquire); |
47 | 209k | } |
48 | | |
49 | 2 | void Abort(const Status& status) { |
50 | 2 | aborted_.store(true, std::memory_order_release); |
51 | 2 | } |
52 | | |
53 | 208k | bool aborted() const { |
54 | 208k | return aborted_.load(std::memory_order_acquire); |
55 | 208k | } |
56 | | |
57 | | // Context with queue has limit on bytes used by queued commands. |
58 | | // `weight_in_bytes` function is used to determine how many bytes consumes this call. |
59 | 419k | size_t weight_in_bytes() const { return weight_in_bytes_; } |
60 | | |
61 | | private: |
62 | | std::atomic<bool> has_reply_{false}; |
63 | | std::atomic<bool> aborted_{false}; |
64 | | const size_t weight_in_bytes_; |
65 | | }; |
66 | | |
67 | | class ConnectionContextWithQueue : public ConnectionContextBase { |
68 | | protected: |
69 | | explicit ConnectionContextWithQueue( |
70 | | size_t max_concurrent_calls, |
71 | | size_t max_queued_bytes); |
72 | | |
73 | | ~ConnectionContextWithQueue(); |
74 | | |
75 | 209k | InboundCall::CallProcessedListener call_processed_listener() { |
76 | 209k | return std::bind(&ConnectionContextWithQueue::CallProcessed, this, std::placeholders::_1); |
77 | 209k | } |
78 | | |
79 | 424k | bool can_enqueue() const { |
80 | 424k | return queued_bytes_ <= max_queued_bytes_; |
81 | 424k | } |
82 | | |
83 | | void Enqueue(std::shared_ptr<QueueableInboundCall> call); |
84 | | |
85 | 0 | uint64_t ProcessedCallCount() override { |
86 | 0 | return processed_call_count_.load(std::memory_order_acquire); |
87 | 0 | } |
88 | | |
89 | | void Shutdown(const Status& status) override; |
90 | | |
91 | | private: |
92 | | void AssignConnection(const ConnectionPtr& conn) override; |
93 | | void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override; |
94 | | bool Idle(std::string* reason_not_idle = nullptr) override; |
95 | | void QueueResponse(const ConnectionPtr& conn, InboundCallPtr call) override; |
96 | 1 | void ListenIdle(IdleListener listener) override { idle_listener_ = std::move(listener); } |
97 | | |
98 | | void CallProcessed(InboundCall* call); |
99 | | void FlushOutboundQueue(Connection* conn); |
100 | | void FlushOutboundQueueAborted(const Status& status); |
101 | | |
102 | | const size_t max_concurrent_calls_; |
103 | | const size_t max_queued_bytes_; |
104 | | size_t replies_being_sent_ = 0; |
105 | | size_t queued_bytes_ = 0; |
106 | | |
107 | | // Calls that are being processed by this connection/context. |
108 | | // At the top or queue there are replies_being_sent_ calls, for which we are sending reply. |
109 | | // After that there are calls that are being processed. |
110 | | // first_without_reply_ points to the first of them. |
111 | | // There are not more than max_concurrent_calls_ entries in first two groups. |
112 | | // After end of queue there are calls that we received but processing did not start for them. |
113 | | std::deque<std::shared_ptr<QueueableInboundCall>> calls_queue_; |
114 | | std::shared_ptr<ReactorTask> flush_outbound_queue_task_; |
115 | | |
116 | | // First call that does not have reply yet. |
117 | | std::atomic<QueueableInboundCall*> first_without_reply_{nullptr}; |
118 | | std::atomic<uint64_t> processed_call_count_{0}; |
119 | | IdleListener idle_listener_; |
120 | | }; |
121 | | |
122 | | } // namespace rpc |
123 | | } // namespace yb |
124 | | |
125 | | #endif // YB_RPC_RPC_WITH_QUEUE_H |