YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/multi_raft_batcher.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/consensus/multi_raft_batcher.h"
#include <memory>

#include "yb/common/wire_protocol.h"

#include "yb/consensus/consensus_meta.h"
#include "yb/consensus/consensus.proxy.h"

#include "yb/rpc/periodic.h"

#include "yb/util/flag_tags.h"
#include "yb/util/status.h"
24
25
using namespace std::literals;
26
using namespace std::placeholders;
27
28
// NOTE: For tests set this value to ~10ms
29
DEFINE_int32(multi_raft_heartbeat_interval_ms, 10,
30
             "The heartbeat interval for batch Raft replication.");
31
TAG_FLAG(multi_raft_heartbeat_interval_ms, experimental);
32
TAG_FLAG(multi_raft_heartbeat_interval_ms, hidden);
33
34
// TODO: Add MultiRaftUpdateConsensus to metrics.yml when flag is turned on
35
DEFINE_bool(enable_multi_raft_heartbeat_batcher, false,
36
            "Whether to enable the batching of raft heartbeats.");
37
TAG_FLAG(enable_multi_raft_heartbeat_batcher, experimental);
38
TAG_FLAG(enable_multi_raft_heartbeat_batcher, hidden);
39
40
// NOTE: For tests set this value to 1
41
DEFINE_uint64(multi_raft_batch_size, 1,
42
              "Maximum batch size for a multi-raft consensus payload.");
43
TAG_FLAG(multi_raft_batch_size, experimental);
44
TAG_FLAG(multi_raft_batch_size, hidden);
45
46
DECLARE_int32(consensus_rpc_timeout_ms);
47
48
namespace yb {
49
namespace consensus {
50
51
using rpc::PeriodicTimer;
52
53
// Everything that belongs to one batched MultiRaftUpdateConsensus RPC. An instance is shared
// (via shared_ptr) between the batcher and the RPC completion callback.
struct MultiRaftHeartbeatBatcher::MultiRaftConsensusData {
  // Outgoing batch; holds one ConsensusRequestPB per queued heartbeat.
  MultiRaftConsensusRequestPB batch_req;

  // Filled in by the RPC layer with the batched responses.
  MultiRaftConsensusResponsePB batch_res;

  // Controller for the single RPC that carries the whole batch.
  rpc::RpcController controller;

  // Per-heartbeat response destination and completion callback; entry i corresponds to
  // request i in batch_req.
  std::vector<ResponseCallbackData> response_callback_data;
};
59
60
// Builds a batcher that talks to the consensus service at `hostport`.
// Starts with an empty batch; the periodic flush timer is only created by Start().
MultiRaftHeartbeatBatcher::MultiRaftHeartbeatBatcher(const yb::HostPort& hostport,
                                                     rpc::ProxyCache* proxy_cache,
                                                     rpc::Messenger* messenger)
    : messenger_(messenger),
      consensus_proxy_(std::make_unique<ConsensusServiceProxy>(proxy_cache, hostport)),
      current_batch_(std::make_shared<MultiRaftConsensusData>()) {
}
66
67
0
void MultiRaftHeartbeatBatcher::Start() {
68
0
  std::weak_ptr<MultiRaftHeartbeatBatcher> weak_peer = shared_from_this();
69
0
  batch_sender_ = PeriodicTimer::Create(
70
0
    messenger_,
71
0
    [weak_peer]() {
72
0
      if (auto peer = weak_peer.lock()) {
73
0
        peer->PrepareAndSendBatchRequest();
74
0
      }
75
0
    },
76
0
    MonoDelta::FromMilliseconds(FLAGS_multi_raft_heartbeat_interval_ms));
77
0
  batch_sender_->Start();
78
0
}
79
80
0
// No explicit cleanup is performed here: the defaulted destructor just destroys the members.
MultiRaftHeartbeatBatcher::~MultiRaftHeartbeatBatcher() = default;
81
82
void MultiRaftHeartbeatBatcher::AddRequestToBatch(ConsensusRequestPB* request,
83
                                                  ConsensusResponsePB* response,
84
0
                                                  HeartbeatResponseCallback callback) {
85
0
  std::shared_ptr<MultiRaftConsensusData> data = nullptr;
86
0
  {
87
0
    std::lock_guard<std::mutex> lock(mutex_);
88
0
    current_batch_->response_callback_data.push_back({
89
0
      response,
90
0
      std::move(callback)
91
0
    });
92
    // Add a ConsensusRequestPB to the batch
93
0
    current_batch_->batch_req.add_consensus_request()->Swap(request);
94
0
    if (FLAGS_multi_raft_batch_size > 0
95
0
        && current_batch_->response_callback_data.size() >= FLAGS_multi_raft_batch_size) {
96
0
      data = PrepareNextBatchRequest();
97
0
    }
98
0
  }
99
0
  if (data) {
100
0
    SendBatchRequest(data);
101
0
  }
102
0
}
103
104
0
void MultiRaftHeartbeatBatcher::PrepareAndSendBatchRequest() {
105
0
  std::shared_ptr<MultiRaftConsensusData> data;
106
0
  {
107
0
    std::lock_guard<std::mutex> lock(mutex_);
108
0
    data = PrepareNextBatchRequest();
109
0
  }
110
0
  SendBatchRequest(data);
111
0
}
112
113
std::shared_ptr<MultiRaftHeartbeatBatcher::MultiRaftConsensusData>
114
0
    MultiRaftHeartbeatBatcher::PrepareNextBatchRequest() {
115
0
  if (current_batch_->batch_req.consensus_request_size() == 0) {
116
0
    return nullptr;
117
0
  }
118
0
  batch_sender_->Snooze();
119
0
  auto data = std::move(current_batch_);
120
0
  current_batch_ = std::make_shared<MultiRaftConsensusData>();
121
0
  return data;
122
0
}
123
124
0
void MultiRaftHeartbeatBatcher::SendBatchRequest(std::shared_ptr<MultiRaftConsensusData> data) {
125
0
  if (!data) {
126
0
    return;
127
0
  }
128
129
0
  data->controller.Reset();
130
0
  data->controller.set_timeout(MonoDelta::FromMilliseconds(
131
0
    FLAGS_consensus_rpc_timeout_ms * data->batch_req.consensus_request_size()));
132
0
  consensus_proxy_->MultiRaftUpdateConsensusAsync(
133
0
    data->batch_req, &data->batch_res, &data->controller,
134
0
    std::bind(&MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback,
135
0
              shared_from_this(), data));
136
0
}
137
138
void MultiRaftHeartbeatBatcher::MultiRaftUpdateHeartbeatResponseCallback(
139
0
    std::shared_ptr<MultiRaftConsensusData> data) {
140
0
  auto status = data->controller.status();
141
0
  for (int i = 0; i < data->batch_req.consensus_request_size(); i++) {
142
0
    auto callback_data = data->response_callback_data[i];
143
0
    if (status.ok()) {
144
0
      callback_data.resp->Swap(data->batch_res.mutable_consensus_response(i));
145
0
    }
146
0
    callback_data.callback(status);
147
0
  }
148
0
}
149
150
// Stores the messenger, proxy cache and local cloud info that are later used to create one
// heartbeat batcher per remote host.
MultiRaftManager::MultiRaftManager(rpc::Messenger* messenger,
                                   rpc::ProxyCache* proxy_cache,
                                   CloudInfoPB local_peer_cloud_info_pb)
    : messenger_(messenger),
      proxy_cache_(proxy_cache),
      local_peer_cloud_info_pb_(std::move(local_peer_cloud_info_pb)) {
}
155
156
123k
// Returns the heartbeat batcher for the host that `remote_peer_pb` resolves to, creating and
// starting a new one when none is registered or the registered one has already been destroyed.
//
// Returns nullptr when FLAGS_enable_multi_raft_heartbeat_batcher is off.
// The registry only keeps weak references; ownership of a batcher lies with whoever holds the
// returned shared pointer.
MultiRaftHeartbeatBatcherPtr MultiRaftManager::AddOrGetBatcher(const RaftPeerPB& remote_peer_pb) {
  if (!FLAGS_enable_multi_raft_heartbeat_batcher) {
    return nullptr;
  }

  auto hostport = HostPortFromPB(DesiredHostPort(remote_peer_pb, local_peer_cloud_info_pb_));
  std::lock_guard<std::mutex> guard(mutex_);

  // With the lock held, reuse a live batcher for the same remote host if there is one.
  const auto existing_entry = batchers_.find(hostport);
  if (existing_entry != batchers_.end()) {
    MultiRaftHeartbeatBatcherPtr live_batcher = existing_entry->second.lock();
    if (live_batcher) {
      return live_batcher;
    }
  }

  // No entry, or the previous batcher has expired: create, register and start a new one.
  MultiRaftHeartbeatBatcherPtr new_batcher =
      std::make_shared<MultiRaftHeartbeatBatcher>(hostport, proxy_cache_, messenger_);
  batchers_[hostport] = new_batcher;
  new_batcher->Start();
  return new_batcher;
}
176
177
}  // namespace consensus
178
}  // namespace yb