/Users/deen/code/yugabyte-db/src/yb/consensus/leader_election.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/leader_election.h" |
34 | | |
35 | | #include <functional> |
36 | | #include <mutex> |
37 | | |
38 | | #include <glog/logging.h> |
39 | | |
40 | | #include "yb/common/wire_protocol.h" |
41 | | |
42 | | #include "yb/consensus/consensus_peers.h" |
43 | | #include "yb/consensus/metadata.pb.h" |
44 | | |
45 | | #include "yb/gutil/map-util.h" |
46 | | #include "yb/gutil/port.h" |
47 | | #include "yb/gutil/strings/join.h" |
48 | | |
49 | | #include "yb/rpc/rpc_controller.h" |
50 | | |
51 | | #include "yb/util/format.h" |
52 | | #include "yb/util/logging.h" |
53 | | #include "yb/util/net/net_util.h" |
54 | | #include "yb/util/status.h" |
55 | | #include "yb/util/status_format.h" |
56 | | |
57 | | using namespace std::literals; |
58 | | |
59 | | namespace yb { |
60 | | namespace consensus { |
61 | | |
62 | | using std::string; |
63 | | using strings::Substitute; |
64 | | |
65 | | /////////////////////////////////////////////////// |
66 | | // VoteCounter |
67 | | /////////////////////////////////////////////////// |
68 | | |
69 | | VoteCounter::VoteCounter(size_t num_voters, size_t majority_size) |
70 | | : num_voters_(num_voters), |
71 | 76.1k | majority_size_(majority_size) { |
72 | 76.1k | CHECK_LE(majority_size, num_voters); |
73 | 76.1k | CHECK_GT(num_voters_, 0); |
74 | 76.1k | CHECK_GT(majority_size_, 0); |
75 | 76.1k | } |
76 | | |
77 | | Status VoteCounter::RegisterVote(const std::string& voter_uuid, ElectionVote vote, |
78 | 164k | bool* is_duplicate) { |
79 | | // Handle repeated votes. |
80 | 164k | if (PREDICT_FALSE(ContainsKey(votes_, voter_uuid))) { |
81 | | // Detect changed votes. |
82 | 2 | ElectionVote prior_vote = votes_[voter_uuid]; |
83 | 2 | if (PREDICT_FALSE(prior_vote != vote)) { |
84 | 1 | return STATUS_FORMAT( |
85 | 1 | InvalidArgument, |
86 | 1 | "Peer $0 voted a different way twice in the same election. " |
87 | 1 | "First vote: $1, second vote: $2.", |
88 | 1 | voter_uuid, prior_vote, vote); |
89 | 1 | } |
90 | | |
91 | | // This was just a duplicate. Allow the caller to log it but don't change |
92 | | // the voting record. |
93 | 1 | *is_duplicate = true; |
94 | 1 | return Status::OK(); |
95 | 1 | } |
96 | | |
97 | | // Sanity check to ensure we did not exceed the allowed number of voters. |
98 | 164k | if (PREDICT_FALSE(yes_votes_ + no_votes_ == num_voters_)) { |
99 | | // More unique voters than allowed! |
100 | 1 | return STATUS(InvalidArgument, Substitute( |
101 | 1 | "Vote from peer $0 would cause the number of votes to exceed the expected number of " |
102 | 1 | "voters, which is $1. Votes already received from the following peers: {$2}", |
103 | 1 | voter_uuid, |
104 | 1 | num_voters_, |
105 | 1 | JoinKeysIterator(votes_.begin(), votes_.end(), ", "))); |
106 | 1 | } |
107 | | |
108 | | // This is a valid vote, so store it. |
109 | 164k | InsertOrDie(&votes_, voter_uuid, vote); |
110 | 164k | switch (vote) { |
111 | 145k | case ElectionVote::kGranted: |
112 | 145k | ++yes_votes_; |
113 | 145k | break; |
114 | 18.9k | case ElectionVote::kDenied: |
115 | 18.9k | ++no_votes_; |
116 | 18.9k | break; |
117 | 0 | case ElectionVote::kUnknown: |
118 | 0 | return STATUS_FORMAT(InvalidArgument, "Invalid vote: $0", vote); |
119 | 164k | } |
120 | 164k | *is_duplicate = false; |
121 | 164k | return Status::OK(); |
122 | 164k | } |
123 | | |
124 | 164k | ElectionVote VoteCounter::GetDecision() const { |
125 | 164k | if (yes_votes_ >= majority_size_) { |
126 | 72.5k | return ElectionVote::kGranted; |
127 | 72.5k | } |
128 | 92.4k | if (no_votes_ > num_voters_ - majority_size_) { |
129 | 4.68k | return ElectionVote::kDenied; |
130 | 4.68k | } |
131 | 87.7k | return ElectionVote::kUnknown; |
132 | 87.7k | } |
133 | | |
134 | 152k | size_t VoteCounter::GetTotalVotesCounted() const { |
135 | 152k | return yes_votes_ + no_votes_; |
136 | 152k | } |
137 | | |
138 | 20 | bool VoteCounter::AreAllVotesIn() const { |
139 | 20 | return GetTotalVotesCounted() == num_voters_; |
140 | 20 | } |
141 | | |
142 | | /////////////////////////////////////////////////// |
143 | | // LeaderElection |
144 | | /////////////////////////////////////////////////// |
145 | | |
146 | | LeaderElection::LeaderElection(const RaftConfigPB& config, |
147 | | PeerProxyFactory* proxy_factory, |
148 | | const VoteRequestPB& request, |
149 | | std::unique_ptr<VoteCounter> vote_counter, |
150 | | MonoDelta timeout, |
151 | | PreElection preelection, |
152 | | TEST_SuppressVoteRequest suppress_vote_request, |
153 | | ElectionDecisionCallback decision_callback) |
154 | | : request_(request), |
155 | | result_(preelection, request.candidate_term()), |
156 | | vote_counter_(std::move(vote_counter)), |
157 | | timeout_(timeout), |
158 | | suppress_vote_request_(suppress_vote_request), |
159 | 76.0k | decision_callback_(std::move(decision_callback)) { |
160 | 224k | for (const RaftPeerPB& peer : config.peers()) { |
161 | 224k | if (request.candidate_uuid() == peer.permanent_uuid()) continue; |
162 | | // Only peers with member_type == VOTER are allowed to vote. |
163 | 148k | if (peer.member_type() != PeerMemberType::VOTER) { |
164 | 2.29k | LOG(INFO) << "Ignoring peer " << peer.permanent_uuid() << " vote because its member type is " |
165 | 2.29k | << PeerMemberType_Name(peer.member_type()); |
166 | 2.29k | continue; |
167 | 2.29k | } |
168 | | |
169 | 146k | voting_follower_uuids_.push_back(peer.permanent_uuid()); |
170 | | |
171 | 146k | auto state = std::make_unique<VoterState>(); |
172 | 146k | state->proxy = proxy_factory->NewProxy(peer); |
173 | 146k | CHECK(voter_state_.emplace(peer.permanent_uuid(), std::move(state)).second); |
174 | 146k | } |
175 | | |
176 | | // Ensure that the candidate has already voted for itself. |
177 | 0 | CHECK_EQ(1, vote_counter_->GetTotalVotesCounted()) << "Candidate must vote for itself first"; |
178 | | |
179 | | // Ensure that existing votes + future votes add up to the expected total. |
180 | 0 | CHECK_EQ(vote_counter_->GetTotalVotesCounted() + voting_follower_uuids_.size(), |
181 | 0 | vote_counter_->GetTotalExpectedVotes()) |
182 | 0 | << "Expected different number of followers. Follower UUIDs: [" |
183 | 0 | << yb::ToString(voting_follower_uuids_) |
184 | 0 | << "]; RaftConfig: {" << config.ShortDebugString() << "}"; |
185 | 76.0k | } |
186 | | |
187 | 76.1k | LeaderElection::~LeaderElection() { |
188 | 76.1k | std::lock_guard<Lock> guard(lock_); |
189 | 76.1k | DCHECK(has_responded_); // We must always call the callback exactly once. |
190 | 76.1k | voter_state_.clear(); |
191 | 76.1k | } |
192 | | |
193 | 76.0k | void LeaderElection::Run() { |
194 | 18.4E | VLOG_WITH_PREFIX(1) << "Running leader election."; |
195 | | |
196 | | // Check if we have already won the election (relevant if this is a |
197 | | // single-node configuration, since we always pre-vote for ourselves). |
198 | 76.0k | CheckForDecision(); |
199 | | |
200 | | // The rest of the code below is for a typical multi-node configuration. |
201 | 146k | for (const std::string& voter_uuid : voting_follower_uuids_) { |
202 | 146k | VoterState* state = nullptr; |
203 | 146k | { |
204 | 146k | std::lock_guard<Lock> guard(lock_); |
205 | 146k | if (result_.decided()) { // Already have result. |
206 | 43 | break; |
207 | 43 | } |
208 | 146k | auto it = voter_state_.find(voter_uuid); |
209 | 146k | CHECK(it != voter_state_.end()); |
210 | 146k | state = it->second.get(); |
211 | | // Safe to drop the lock because voter_state_ is not mutated outside of |
212 | | // the constructor / destructor. We do this to avoid deadlocks below. |
213 | 146k | } |
214 | | |
215 | | // Send the RPC request. |
216 | 146k | LOG_WITH_PREFIX(INFO) << "Requesting vote from peer " << voter_uuid; |
217 | 146k | state->rpc.set_timeout(timeout_); |
218 | | |
219 | 146k | state->request = request_; |
220 | 146k | state->request.set_dest_uuid(voter_uuid); |
221 | | |
222 | 146k | LeaderElectionPtr retained_self = this; |
223 | 146k | if (!suppress_vote_request_) { |
224 | 146k | state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh); |
225 | 146k | state->proxy->RequestConsensusVoteAsync( |
226 | 146k | &state->request, &state->response, &state->rpc, |
227 | 146k | std::bind(&LeaderElection::VoteResponseRpcCallback, this, voter_uuid, retained_self)); |
228 | 18.4E | } else { |
229 | 18.4E | state->response.set_responder_uuid(voter_uuid); |
230 | 18.4E | VoteResponseRpcCallback(voter_uuid, retained_self); |
231 | 18.4E | } |
232 | 146k | } |
233 | 76.0k | } |
234 | | |
235 | 164k | void LeaderElection::CheckForDecision() { |
236 | 164k | bool to_respond = false; |
237 | 164k | { |
238 | 164k | std::lock_guard<Lock> guard(lock_); |
239 | | // Check if the vote has been newly decided. |
240 | 164k | auto decision = vote_counter_->GetDecision(); |
241 | 164k | if (!result_.decided() && decision != ElectionVote::kUnknown) { |
242 | 76.0k | LOG_WITH_PREFIX(INFO) << "Election decided. Result: candidate " |
243 | 71.3k | << ((decision == ElectionVote::kGranted) ? "won." : "lost."); |
244 | 76.0k | result_.decision = decision; |
245 | 76.0k | } |
246 | | // Check whether to respond. This can happen as a result of either getting |
247 | | // a majority vote or of something invalidating the election, like |
248 | | // observing a higher term. |
249 | 164k | if (result_.decided() && !has_responded_) { |
250 | 76.1k | has_responded_ = true; |
251 | 76.1k | to_respond = true; |
252 | 76.1k | } |
253 | 164k | } |
254 | | |
255 | | // Respond outside of the lock. |
256 | 164k | if (to_respond) { |
257 | | // This is thread-safe since result_ is write-once. |
258 | 76.1k | decision_callback_(result_); |
259 | 76.1k | } |
260 | 164k | } |
261 | | |
262 | | void LeaderElection::VoteResponseRpcCallback(const std::string& voter_uuid, |
263 | 145k | const LeaderElectionPtr& self) { |
264 | 145k | { |
265 | 145k | std::lock_guard<Lock> guard(lock_); |
266 | | |
267 | 145k | if (has_responded_) { |
268 | 57.1k | return; |
269 | 57.1k | } |
270 | | |
271 | 88.7k | auto it = voter_state_.find(voter_uuid); |
272 | 88.7k | CHECK(it != voter_state_.end()); |
273 | 88.7k | VoterState* state = it->second.get(); |
274 | | |
275 | | // Check for RPC errors. |
276 | 88.7k | if (!state->rpc.status().ok()) { |
277 | 1.44k | LOG_WITH_PREFIX(WARNING) << "RPC error from VoteRequest() call to peer " << voter_uuid |
278 | 1.44k | << ": " << state->rpc.status().ToString(); |
279 | 1.44k | RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied); |
280 | | |
281 | | // Check for tablet errors. |
282 | 87.3k | } else if (state->response.has_error()) { |
283 | 16.6k | LOG_WITH_PREFIX(WARNING) << "Tablet error from VoteRequest() call to peer " |
284 | 16.6k | << voter_uuid << ": " |
285 | 16.6k | << StatusFromPB(state->response.error().status()).ToString(); |
286 | 16.6k | RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied); |
287 | | |
288 | | // If the peer changed their IP address, we shouldn't count this vote since |
289 | | // our knowledge of the configuration is in an inconsistent state. |
290 | 70.7k | } else if (PREDICT_FALSE(voter_uuid != state->response.responder_uuid())) { |
291 | 0 | LOG_WITH_PREFIX(DFATAL) << "Received vote response from peer we thought had UUID " |
292 | 0 | << voter_uuid << ", but its actual UUID is " << state->response.responder_uuid(); |
293 | 0 | RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied); |
294 | | |
295 | | // Node does not support preelection |
296 | 70.7k | } else if (result_.preelection && !state->response.preelection()) { |
297 | 0 | result_.preelections_not_supported_by_uuid = voter_uuid; |
298 | 0 | HandleVoteDeniedUnlocked(voter_uuid, *state); |
299 | | |
300 | | // Count the granted votes. |
301 | 70.7k | } else if (state->response.vote_granted()) { |
302 | 69.7k | HandleVoteGrantedUnlocked(voter_uuid, *state); |
303 | | |
304 | | // Anything else is a denied vote. |
305 | 955 | } else { |
306 | 955 | HandleVoteDeniedUnlocked(voter_uuid, *state); |
307 | 955 | } |
308 | 88.7k | } |
309 | | |
310 | | // Check for a decision outside the lock. |
311 | 88.7k | CheckForDecision(); |
312 | 88.7k | } |
313 | | |
314 | 88.7k | void LeaderElection::RecordVoteUnlocked(const std::string& voter_uuid, ElectionVote vote) { |
315 | 88.7k | DCHECK(lock_.is_locked()); |
316 | | |
317 | | // Record the vote. |
318 | 88.7k | bool duplicate; |
319 | 88.7k | Status s = vote_counter_->RegisterVote(voter_uuid, vote, &duplicate); |
320 | 88.7k | if (!s.ok()) { |
321 | 0 | LOG_WITH_PREFIX(WARNING) << "Error registering vote for peer " << voter_uuid |
322 | 0 | << ": " << s.ToString(); |
323 | 0 | return; |
324 | 0 | } |
325 | 88.7k | if (duplicate) { |
326 | | // Note: This is DFATAL because at the time of writing we do not support |
327 | | // retrying vote requests, so this should be impossible. It may be valid to |
328 | | // receive duplicate votes in the future if we implement retry. |
329 | 0 | LOG_WITH_PREFIX(DFATAL) << "Duplicate vote received from peer " << voter_uuid; |
330 | 0 | } |
331 | 88.7k | } |
332 | | |
333 | 109 | void LeaderElection::HandleHigherTermUnlocked(const string& voter_uuid, const VoterState& state) { |
334 | 109 | DCHECK(lock_.is_locked()); |
335 | 109 | DCHECK_GT(state.response.responder_term(), consensus_term()); |
336 | | |
337 | 109 | string msg = Substitute("Vote denied by peer $0 with higher term. Message: $1", |
338 | 109 | state.response.responder_uuid(), |
339 | 109 | StatusFromPB(state.response.consensus_error().status()).ToString()); |
340 | 109 | LOG_WITH_PREFIX(WARNING) << msg; |
341 | | |
342 | 109 | if (!result_.decided()) { |
343 | 109 | LOG_WITH_PREFIX(INFO) << "Cancelling election due to peer responding with higher term"; |
344 | 109 | result_.decision = ElectionVote::kDenied; |
345 | 109 | result_.higher_term = state.response.responder_term(); |
346 | 109 | result_.message = msg; |
347 | 109 | } |
348 | 109 | } |
349 | | |
350 | 69.7k | void LeaderElection::HandleVoteGrantedUnlocked(const string& voter_uuid, const VoterState& state) { |
351 | 69.7k | DCHECK(lock_.is_locked()); |
352 | 69.7k | DCHECK_EQ(state.response.responder_term(), election_term()); |
353 | 69.7k | DCHECK(state.response.vote_granted()); |
354 | 69.7k | if (state.response.has_remaining_leader_lease_duration_ms()) { |
355 | 4.30k | CoarseTimeLease lease( |
356 | 4.30k | state.response.leader_lease_uuid(), |
357 | 4.30k | CoarseMonoClock::Now() + state.response.remaining_leader_lease_duration_ms() * 1ms); |
358 | 4.30k | result_.old_leader_lease.TryUpdate(lease); |
359 | 4.30k | } |
360 | | |
361 | 69.7k | if (state.response.has_leader_ht_lease_expiration()) { |
362 | 5.17k | PhysicalComponentLease lease( |
363 | 5.17k | state.response.leader_ht_lease_uuid(), state.response.leader_ht_lease_expiration()); |
364 | 5.17k | result_.old_leader_ht_lease.TryUpdate(lease); |
365 | 5.17k | } |
366 | | |
367 | 69.7k | LOG_WITH_PREFIX(INFO) << "Vote granted by peer " << voter_uuid; |
368 | 69.7k | RecordVoteUnlocked(voter_uuid, ElectionVote::kGranted); |
369 | 69.7k | } |
370 | | |
371 | 991 | void LeaderElection::HandleVoteDeniedUnlocked(const string& voter_uuid, const VoterState& state) { |
372 | 991 | DCHECK(lock_.is_locked()); |
373 | 991 | DCHECK(!state.response.vote_granted()); |
374 | | |
375 | | // If one of the voters responds with a greater term than our own, and we |
376 | | // have not yet triggered the decision callback, it cancels the election. |
377 | 991 | if (state.response.responder_term() > consensus_term()) { |
378 | 109 | return HandleHigherTermUnlocked(voter_uuid, state); |
379 | 109 | } |
380 | | |
381 | 882 | LOG_WITH_PREFIX(INFO) << "Vote denied by peer " << voter_uuid << ". Message: " |
382 | 882 | << StatusFromPB(state.response.consensus_error().status()).ToString(); |
383 | 882 | RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied); |
384 | 882 | } |
385 | | |
386 | 311k | std::string LeaderElection::LogPrefix() const { |
387 | 311k | return Substitute("T $0 P $1 [CANDIDATE]: Term $2 $3election: ", |
388 | 311k | request_.tablet_id(), |
389 | 311k | request_.candidate_uuid(), |
390 | 311k | request_.candidate_term(), |
391 | 169k | (result_.preelection ? "pre-" : "")); |
392 | 311k | } |
393 | | |
394 | | } // namespace consensus |
395 | | } // namespace yb |