/Users/deen/code/yugabyte-db/src/yb/consensus/multi_raft_batcher.cc
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Copyright (c) YugaByte, Inc. | 
| 2 |  | // | 
| 3 |  | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | 
| 4 |  | // in compliance with the License.  You may obtain a copy of the License at | 
| 5 |  | // | 
| 6 |  | // http://www.apache.org/licenses/LICENSE-2.0 | 
| 7 |  | // | 
| 8 |  | // Unless required by applicable law or agreed to in writing, software distributed under the License | 
| 9 |  | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | 
| 10 |  | // or implied.  See the License for the specific language governing permissions and limitations | 
| 11 |  | // under the License. | 
| 12 |  |  | 
| 13 |  | #include "yb/consensus/multi_raft_batcher.h" | 
| 14 |  | #include <memory> | 
| 15 |  |  | 
| 16 |  | #include "yb/common/wire_protocol.h" | 
| 17 |  |  | 
| 18 |  | #include "yb/consensus/consensus_meta.h" | 
| 19 |  | #include "yb/consensus/consensus.proxy.h" | 
| 20 |  |  | 
| 21 |  | #include "yb/rpc/periodic.h" | 
| 22 |  |  | 
| 23 |  | #include "yb/util/flag_tags.h" | 
| 24 |  |  | 
| 25 |  | using namespace std::literals; | 
| 26 |  | using namespace std::placeholders; | 
| 27 |  |  | 
// Period, in milliseconds, of the timer created in MultiRaftHeartbeatBatcher::Start() that
// sends whatever requests are pending in the current batch.
// NOTE: For tests set this value to ~10ms
DEFINE_int32(multi_raft_heartbeat_interval_ms, 10,
             "The heartbeat interval for batch Raft replication.");
TAG_FLAG(multi_raft_heartbeat_interval_ms, experimental);
TAG_FLAG(multi_raft_heartbeat_interval_ms, hidden);

// When false, MultiRaftManager::AddOrGetBatcher() returns nullptr and no batcher is created.
// TODO: Add MultiRaftUpdateConsensus to metrics.yml when flag is turned on
DEFINE_bool(enable_multi_raft_heartbeat_batcher, false,
            "Whether to enable the batching of raft heartbeats.");
TAG_FLAG(enable_multi_raft_heartbeat_batcher, experimental);
TAG_FLAG(enable_multi_raft_heartbeat_batcher, hidden);

// A batch is sent as soon as it holds at least this many requests. A value of 0 disables the
// size trigger in AddRequestToBatch(), leaving only the periodic timer to send batches.
// NOTE: For tests set this value to 1
DEFINE_uint64(multi_raft_batch_size, 1,
              "Maximum batch size for a multi-raft consensus payload.");
TAG_FLAG(multi_raft_batch_size, experimental);
TAG_FLAG(multi_raft_batch_size, hidden);
| 45 |  |  | 
| 46 |  | DECLARE_int32(consensus_rpc_timeout_ms); | 
| 47 |  |  | 
| 48 |  | namespace yb { | 
| 49 |  | namespace consensus { | 
| 50 |  |  | 
| 51 |  | using rpc::PeriodicTimer; | 
| 52 |  |  | 
// State of a single batched MultiRaftUpdateConsensus RPC: the combined request, the slot the
// combined response is written to, the RPC controller, and the per-request callback info.
struct MultiRaftHeartbeatBatcher::MultiRaftConsensusData {
  // Combined request; AddRequestToBatch() appends one consensus_request entry per call.
  MultiRaftConsensusRequestPB batch_req;
  // Combined response passed to the async RPC; on success its i-th consensus_response is
  // swapped into the response object registered for the i-th request.
  MultiRaftConsensusResponsePB batch_res;
  // Controller of the batch RPC; its status is handed to every registered callback.
  rpc::RpcController controller;
  // One entry per request in batch_req, stored in the same order (index i matches request i).
  std::vector<ResponseCallbackData> response_callback_data;
};
| 59 |  |  | 
| 60 |  | MultiRaftHeartbeatBatcher::MultiRaftHeartbeatBatcher(const yb::HostPort& hostport, | 
| 61 |  |                                                      rpc::ProxyCache* proxy_cache, | 
| 62 |  |                                                      rpc::Messenger* messenger): | 
| 63 |  |     messenger_(messenger), | 
| 64 |  |     consensus_proxy_(std::make_unique<ConsensusServiceProxy>(proxy_cache, hostport)), | 
| 65 | 0 |     current_batch_(std::make_shared<MultiRaftConsensusData>()) {} | 
| 66 |  |  | 
| 67 | 0 | void MultiRaftHeartbeatBatcher::Start() { | 
| 68 | 0 |   std::weak_ptr<MultiRaftHeartbeatBatcher> weak_peer = shared_from_this(); | 
| 69 | 0 |   batch_sender_ = PeriodicTimer::Create( | 
| 70 | 0 |     messenger_, | 
| 71 | 0 |     [weak_peer]() { | 
| 72 | 0 |       if (auto peer = weak_peer.lock()) { | 
| 73 | 0 |         peer->PrepareAndSendBatchRequest(); | 
| 74 | 0 |       } | 
| 75 | 0 |     }, | 
| 76 | 0 |     MonoDelta::FromMilliseconds(FLAGS_multi_raft_heartbeat_interval_ms)); | 
| 77 | 0 |   batch_sender_->Start(); | 
| 78 | 0 | } | 
| 79 |  |  | 
// Destructor is defaulted here, in the translation unit that defines MultiRaftConsensusData.
MultiRaftHeartbeatBatcher::~MultiRaftHeartbeatBatcher() = default;
| 81 |  |  | 
| 82 |  | void MultiRaftHeartbeatBatcher::AddRequestToBatch(ConsensusRequestPB* request, | 
| 83 |  |                                                   ConsensusResponsePB* response, | 
| 84 | 0 |                                                   HeartbeatResponseCallback callback) { | 
| 85 | 0 |   std::shared_ptr<MultiRaftConsensusData> data = nullptr; | 
| 86 | 0 |   { | 
| 87 | 0 |     std::lock_guard<std::mutex> lock(mutex_); | 
| 88 | 0 |     current_batch_->response_callback_data.push_back({ | 
| 89 | 0 |       response, | 
| 90 | 0 |       std::move(callback) | 
| 91 | 0 |     }); | 
| 92 |  |     // Add a ConsensusRequestPB to the batch | 
| 93 | 0 |     current_batch_->batch_req.add_consensus_request()->Swap(request); | 
| 94 | 0 |     if (FLAGS_multi_raft_batch_size > 0 | 
| 95 | 0 |         && current_batch_->response_callback_data.size() >= FLAGS_multi_raft_batch_size) { | 
| 96 | 0 |       data = PrepareNextBatchRequest(); | 
| 97 | 0 |     } | 
| 98 | 0 |   } | 
| 99 | 0 |   if (data) { | 
| 100 | 0 |     SendBatchRequest(data); | 
| 101 | 0 |   } | 
| 102 | 0 | } | 
| 103 |  |  | 
| 104 | 0 | void MultiRaftHeartbeatBatcher::PrepareAndSendBatchRequest() { | 
| 105 | 0 |   std::shared_ptr<MultiRaftConsensusData> data; | 
| 106 | 0 |   { | 
| 107 | 0 |     std::lock_guard<std::mutex> lock(mutex_); | 
| 108 | 0 |     data = PrepareNextBatchRequest(); | 
| 109 | 0 |   } | 
| 110 | 0 |   SendBatchRequest(data); | 
| 111 | 0 | } | 
| 112 |  |  | 
| 113 |  | std::shared_ptr<MultiRaftHeartbeatBatcher::MultiRaftConsensusData> | 
| 114 | 0 |     MultiRaftHeartbeatBatcher::PrepareNextBatchRequest() { | 
| 115 | 0 |   if (current_batch_->batch_req.consensus_request_size() == 0) { | 
| 116 | 0 |     return nullptr; | 
| 117 | 0 |   } | 
| 118 | 0 |   batch_sender_->Snooze(); | 
| 119 | 0 |   auto data = std::move(current_batch_); | 
| 120 | 0 |   current_batch_ = std::make_shared<MultiRaftConsensusData>(); | 
| 121 | 0 |   return data; | 
| 122 | 0 | } | 
| 123 |  |  | 
| 124 | 0 | void MultiRaftHeartbeatBatcher::SendBatchRequest(std::shared_ptr<MultiRaftConsensusData> data) { | 
| 125 | 0 |   if (!data) { | 
| 126 | 0 |     return; | 
| 127 | 0 |   } | 
| 128 |  |  | 
| 129 | 0 |   data->controller.Reset(); | 
| 130 | 0 |   data->controller.set_timeout(MonoDelta::FromMilliseconds( | 
| 131 | 0 |     FLAGS_consensus_rpc_timeout_ms * data->batch_req.consensus_request_size())); | 
| 132 | 0 |   consensus_proxy_->MultiRaftUpdateConsensusAsync( | 
| 133 | 0 |     data->batch_req, &data->batch_res, &data->controller, | 
| 134 | 0 |     std::bind(&MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback, | 
| 135 | 0 |               shared_from_this(), data)); | 
| 136 | 0 | } | 
| 137 |  |  | 
| 138 |  | void MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback( | 
| 139 | 0 |     std::shared_ptr<MultiRaftConsensusData> data) { | 
| 140 | 0 |   auto status = data->controller.status(); | 
| 141 | 0 |   for (int i = 0; i < data->batch_req.consensus_request_size(); i++) { | 
| 142 | 0 |     auto callback_data = data->response_callback_data[i]; | 
| 143 | 0 |     if (status.ok()) { | 
| 144 | 0 |       callback_data.resp->Swap(data->batch_res.mutable_consensus_response(i)); | 
| 145 | 0 |     } | 
| 146 | 0 |     callback_data.callback(status); | 
| 147 | 0 |   } | 
| 148 | 0 | } | 
| 149 |  |  | 
| 150 |  | MultiRaftManager::MultiRaftManager(rpc::Messenger* messenger, | 
| 151 |  |                                    rpc::ProxyCache* proxy_cache, | 
| 152 |  |                                    CloudInfoPB local_peer_cloud_info_pb): | 
| 153 |  |     messenger_(messenger), proxy_cache_(proxy_cache), | 
| 154 | 11.1k |     local_peer_cloud_info_pb_(std::move(local_peer_cloud_info_pb)) {} | 
| 155 |  |  | 
| 156 | 69.5k | MultiRaftHeartbeatBatcherPtr MultiRaftManager::AddOrGetBatcher(const RaftPeerPB& remote_peer_pb) { | 
| 157 | 69.5k |   if (!FLAGS_enable_multi_raft_heartbeat_batcher) { | 
| 158 | 69.5k |     return nullptr; | 
| 159 | 69.5k |   } | 
| 160 |  |  | 
| 161 | 7 |   auto hostport = HostPortFromPB(DesiredHostPort(remote_peer_pb, local_peer_cloud_info_pb_)); | 
| 162 | 7 |   std::lock_guard<std::mutex> lock(mutex_); | 
| 163 | 7 |   MultiRaftHeartbeatBatcherPtr batcher; | 
| 164 |  |  | 
| 165 |  |   // After taking the lock, check if there is already a batcher | 
| 166 |  |   // for the same remote host and return it. | 
| 167 | 7 |   auto res = batchers_.find(hostport); | 
| 168 | 7 |   if (res != batchers_.end() && (batcher = res->second.lock())) { | 
| 169 | 0 |     return batcher; | 
| 170 | 0 |   } | 
| 171 | 7 |   batcher = std::make_shared<MultiRaftHeartbeatBatcher>(hostport, proxy_cache_, messenger_); | 
| 172 | 7 |   batchers_[hostport] = batcher; | 
| 173 | 7 |   batcher->Start(); | 
| 174 | 7 |   return batcher; | 
| 175 | 7 | } | 
| 176 |  |  | 
| 177 |  | }  // namespace consensus | 
| 178 |  | }  // namespace yb |