/Users/deen/code/yugabyte-db/src/yb/consensus/multi_raft_batcher.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/consensus/multi_raft_batcher.h" |
14 | | #include <memory> |
15 | | |
16 | | #include "yb/common/wire_protocol.h" |
17 | | |
18 | | #include "yb/consensus/consensus_meta.h" |
19 | | #include "yb/consensus/consensus.proxy.h" |
20 | | |
21 | | #include "yb/rpc/periodic.h" |
22 | | |
23 | | #include "yb/util/flag_tags.h" |
24 | | |
25 | | using namespace std::literals; |
26 | | using namespace std::placeholders; |
27 | | |
28 | | // NOTE: For tests set this value to ~10ms |
29 | | DEFINE_int32(multi_raft_heartbeat_interval_ms, 10, |
30 | | "The heartbeat interval for batch Raft replication."); |
31 | | TAG_FLAG(multi_raft_heartbeat_interval_ms, experimental); |
32 | | TAG_FLAG(multi_raft_heartbeat_interval_ms, hidden); |
33 | | |
34 | | // TODO: Add MultiRaftUpdateConsensus to metrics.yml when flag is turned on |
35 | | DEFINE_bool(enable_multi_raft_heartbeat_batcher, false, |
36 | | "Whether to enable the batching of raft heartbeats."); |
37 | | TAG_FLAG(enable_multi_raft_heartbeat_batcher, experimental); |
38 | | TAG_FLAG(enable_multi_raft_heartbeat_batcher, hidden); |
39 | | |
40 | | // NOTE: For tests set this value to 1 |
41 | | DEFINE_uint64(multi_raft_batch_size, 1, |
42 | | "Maximum batch size for a multi-raft consensus payload."); |
43 | | TAG_FLAG(multi_raft_batch_size, experimental); |
44 | | TAG_FLAG(multi_raft_batch_size, hidden); |
45 | | |
46 | | DECLARE_int32(consensus_rpc_timeout_ms); |
47 | | |
48 | | namespace yb { |
49 | | namespace consensus { |
50 | | |
51 | | using rpc::PeriodicTimer; |
52 | | |
53 | | struct MultiRaftHeartbeatBatcher::MultiRaftConsensusData { |
54 | | MultiRaftConsensusRequestPB batch_req; |
55 | | MultiRaftConsensusResponsePB batch_res; |
56 | | rpc::RpcController controller; |
57 | | std::vector<ResponseCallbackData> response_callback_data; |
58 | | }; |
59 | | |
60 | | MultiRaftHeartbeatBatcher::MultiRaftHeartbeatBatcher(const yb::HostPort& hostport, |
61 | | rpc::ProxyCache* proxy_cache, |
62 | | rpc::Messenger* messenger): |
63 | | messenger_(messenger), |
64 | | consensus_proxy_(std::make_unique<ConsensusServiceProxy>(proxy_cache, hostport)), |
65 | 0 | current_batch_(std::make_shared<MultiRaftConsensusData>()) {} |
66 | | |
67 | 0 | void MultiRaftHeartbeatBatcher::Start() { |
68 | 0 | std::weak_ptr<MultiRaftHeartbeatBatcher> weak_peer = shared_from_this(); |
69 | 0 | batch_sender_ = PeriodicTimer::Create( |
70 | 0 | messenger_, |
71 | 0 | [weak_peer]() { |
72 | 0 | if (auto peer = weak_peer.lock()) { |
73 | 0 | peer->PrepareAndSendBatchRequest(); |
74 | 0 | } |
75 | 0 | }, |
76 | 0 | MonoDelta::FromMilliseconds(FLAGS_multi_raft_heartbeat_interval_ms)); |
77 | 0 | batch_sender_->Start(); |
78 | 0 | } |
79 | | |
80 | 0 | MultiRaftHeartbeatBatcher::~MultiRaftHeartbeatBatcher() = default; |
81 | | |
82 | | void MultiRaftHeartbeatBatcher::AddRequestToBatch(ConsensusRequestPB* request, |
83 | | ConsensusResponsePB* response, |
84 | 0 | HeartbeatResponseCallback callback) { |
85 | 0 | std::shared_ptr<MultiRaftConsensusData> data = nullptr; |
86 | 0 | { |
87 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
88 | 0 | current_batch_->response_callback_data.push_back({ |
89 | 0 | response, |
90 | 0 | std::move(callback) |
91 | 0 | }); |
92 | | // Add a ConsensusRequestPB to the batch |
93 | 0 | current_batch_->batch_req.add_consensus_request()->Swap(request); |
94 | 0 | if (FLAGS_multi_raft_batch_size > 0 |
95 | 0 | && current_batch_->response_callback_data.size() >= FLAGS_multi_raft_batch_size) { |
96 | 0 | data = PrepareNextBatchRequest(); |
97 | 0 | } |
98 | 0 | } |
99 | 0 | if (data) { |
100 | 0 | SendBatchRequest(data); |
101 | 0 | } |
102 | 0 | } |
103 | | |
104 | 0 | void MultiRaftHeartbeatBatcher::PrepareAndSendBatchRequest() { |
105 | 0 | std::shared_ptr<MultiRaftConsensusData> data; |
106 | 0 | { |
107 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
108 | 0 | data = PrepareNextBatchRequest(); |
109 | 0 | } |
110 | 0 | SendBatchRequest(data); |
111 | 0 | } |
112 | | |
113 | | std::shared_ptr<MultiRaftHeartbeatBatcher::MultiRaftConsensusData> |
114 | 0 | MultiRaftHeartbeatBatcher::PrepareNextBatchRequest() { |
115 | 0 | if (current_batch_->batch_req.consensus_request_size() == 0) { |
116 | 0 | return nullptr; |
117 | 0 | } |
118 | 0 | batch_sender_->Snooze(); |
119 | 0 | auto data = std::move(current_batch_); |
120 | 0 | current_batch_ = std::make_shared<MultiRaftConsensusData>(); |
121 | 0 | return data; |
122 | 0 | } |
123 | | |
124 | 0 | void MultiRaftHeartbeatBatcher::SendBatchRequest(std::shared_ptr<MultiRaftConsensusData> data) { |
125 | 0 | if (!data) { |
126 | 0 | return; |
127 | 0 | } |
128 | | |
129 | 0 | data->controller.Reset(); |
130 | 0 | data->controller.set_timeout(MonoDelta::FromMilliseconds( |
131 | 0 | FLAGS_consensus_rpc_timeout_ms * data->batch_req.consensus_request_size())); |
132 | 0 | consensus_proxy_->MultiRaftUpdateConsensusAsync( |
133 | 0 | data->batch_req, &data->batch_res, &data->controller, |
134 | 0 | std::bind(&MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback, |
135 | 0 | shared_from_this(), data)); |
136 | 0 | } |
137 | | |
138 | | void MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback( |
139 | 0 | std::shared_ptr<MultiRaftConsensusData> data) { |
140 | 0 | auto status = data->controller.status(); |
141 | 0 | for (int i = 0; i < data->batch_req.consensus_request_size(); i++) { |
142 | 0 | auto callback_data = data->response_callback_data[i]; |
143 | 0 | if (status.ok()) { |
144 | 0 | callback_data.resp->Swap(data->batch_res.mutable_consensus_response(i)); |
145 | 0 | } |
146 | 0 | callback_data.callback(status); |
147 | 0 | } |
148 | 0 | } |
149 | | |
150 | | MultiRaftManager::MultiRaftManager(rpc::Messenger* messenger, |
151 | | rpc::ProxyCache* proxy_cache, |
152 | | CloudInfoPB local_peer_cloud_info_pb): |
153 | | messenger_(messenger), proxy_cache_(proxy_cache), |
154 | 11.1k | local_peer_cloud_info_pb_(std::move(local_peer_cloud_info_pb)) {} |
155 | | |
156 | 69.5k | MultiRaftHeartbeatBatcherPtr MultiRaftManager::AddOrGetBatcher(const RaftPeerPB& remote_peer_pb) { |
157 | 69.5k | if (!FLAGS_enable_multi_raft_heartbeat_batcher) { |
158 | 69.5k | return nullptr; |
159 | 69.5k | } |
160 | | |
161 | 7 | auto hostport = HostPortFromPB(DesiredHostPort(remote_peer_pb, local_peer_cloud_info_pb_)); |
162 | 7 | std::lock_guard<std::mutex> lock(mutex_); |
163 | 7 | MultiRaftHeartbeatBatcherPtr batcher; |
164 | | |
165 | | // After taking the lock, check if there is already a batcher |
166 | | // for the same remote host and return it. |
167 | 7 | auto res = batchers_.find(hostport); |
168 | 7 | if (res != batchers_.end() && (batcher = res->second.lock())) { |
169 | 0 | return batcher; |
170 | 0 | } |
171 | 7 | batcher = std::make_shared<MultiRaftHeartbeatBatcher>(hostport, proxy_cache_, messenger_); |
172 | 7 | batchers_[hostport] = batcher; |
173 | 7 | batcher->Start(); |
174 | 7 | return batcher; |
175 | 7 | } |
176 | | |
177 | | } // namespace consensus |
178 | | } // namespace yb |