YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/leader_election.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/leader_election.h"
34
35
#include <functional>
36
#include <mutex>
37
38
#include <glog/logging.h>
39
40
#include "yb/common/wire_protocol.h"
41
42
#include "yb/consensus/consensus_peers.h"
43
#include "yb/consensus/metadata.pb.h"
44
45
#include "yb/gutil/map-util.h"
46
#include "yb/gutil/port.h"
47
#include "yb/gutil/strings/join.h"
48
49
#include "yb/rpc/rpc_controller.h"
50
51
#include "yb/util/format.h"
52
#include "yb/util/logging.h"
53
#include "yb/util/net/net_util.h"
54
#include "yb/util/status.h"
55
#include "yb/util/status_format.h"
56
57
using namespace std::literals;
58
59
namespace yb {
60
namespace consensus {
61
62
using std::string;
63
using strings::Substitute;
64
65
///////////////////////////////////////////////////
66
// VoteCounter
67
///////////////////////////////////////////////////
68
69
// Creates a counter for an election with `num_voters` participants (candidate included) that is
// won once `majority_size` of them grant their vote. Both values must be positive and the
// majority cannot exceed the number of voters; violations abort the process via CHECK.
VoteCounter::VoteCounter(size_t num_voters, size_t majority_size)
    : num_voters_(num_voters), majority_size_(majority_size) {
  // Invariant relied on by GetDecision(): num_voters_ - majority_size_ never underflows.
  CHECK_LE(majority_size, num_voters);
  // An election with zero voters or a zero-sized majority is meaningless.
  CHECK_GT(num_voters_, 0);
  CHECK_GT(majority_size_, 0);
}
76
77
// Records `vote` from peer `voter_uuid`.
//
// Returns OK and sets *is_duplicate:
//   - true  if this peer already voted the same way (the record is left unchanged);
//   - false if the vote was newly recorded.
// Returns InvalidArgument (leaving *is_duplicate and all counters untouched) if:
//   - the peer previously voted differently;
//   - recording the vote would exceed the expected number of voters;
//   - `vote` is not a countable vote (i.e. kUnknown).
Status VoteCounter::RegisterVote(const std::string& voter_uuid, ElectionVote vote,
                                 bool* is_duplicate) {
  // Handle repeated votes. One lookup serves both the membership test and the read of the
  // prior vote.
  const auto existing = votes_.find(voter_uuid);
  if (PREDICT_FALSE(existing != votes_.end())) {
    const ElectionVote prior_vote = existing->second;
    // Detect changed votes.
    if (PREDICT_FALSE(prior_vote != vote)) {
      return STATUS_FORMAT(
          InvalidArgument,
          "Peer $0 voted a different way twice in the same election. "
          "First vote: $1, second vote: $2.",
          voter_uuid, prior_vote, vote);
    }

    // This was just a duplicate. Allow the caller to log it but don't change
    // the voting record.
    *is_duplicate = true;
    return Status::OK();
  }

  // Sanity check to ensure we did not exceed the allowed number of voters.
  if (PREDICT_FALSE(yes_votes_ + no_votes_ == num_voters_)) {
    // More unique voters than allowed!
    return STATUS(InvalidArgument, Substitute(
        "Vote from peer $0 would cause the number of votes to exceed the expected number of "
        "voters, which is $1. Votes already received from the following peers: {$2}",
        voter_uuid,
        num_voters_,
        JoinKeysIterator(votes_.begin(), votes_.end(), ", ")));
  }

  // Validate the vote BEFORE storing it. Only kGranted and kDenied are countable. If an invalid
  // vote were inserted first, the peer would sit in votes_ without being reflected in
  // yes_votes_/no_votes_. A later real vote from that peer would then be reported as a
  // "changed vote" instead of being counted, and votes_ could grow beyond num_voters_ because
  // the capacity check above only looks at the counters.
  if (PREDICT_FALSE(vote != ElectionVote::kGranted && vote != ElectionVote::kDenied)) {
    return STATUS_FORMAT(InvalidArgument, "Invalid vote: $0", vote);
  }

  // This is a valid vote, so store it and update the matching tally.
  InsertOrDie(&votes_, voter_uuid, vote);
  if (vote == ElectionVote::kGranted) {
    ++yes_votes_;
  } else {
    ++no_votes_;
  }
  *is_duplicate = false;
  return Status::OK();
}
123
124
1.54M
// Returns the outcome implied by the votes counted so far:
//   kGranted - at least majority_size_ peers granted their vote;
//   kDenied  - so many peers denied that a majority can no longer be reached;
//   kUnknown - the election is still open.
ElectionVote VoteCounter::GetDecision() const {
  const bool reached_majority = yes_votes_ >= majority_size_;
  if (reached_majority) {
    return ElectionVote::kGranted;
  }

  // The election is lost once the number of "no" votes exceeds the number of voters that can be
  // spared while still leaving enough potential "yes" votes for a majority. The constructor
  // CHECKs majority_size_ <= num_voters_, so this subtraction does not underflow.
  const auto tolerable_no_votes = num_voters_ - majority_size_;
  return no_votes_ > tolerable_no_votes ? ElectionVote::kDenied : ElectionVote::kUnknown;
}
133
134
1.47M
size_t VoteCounter::GetTotalVotesCounted() const {
135
1.47M
  return yes_votes_ + no_votes_;
136
1.47M
}
137
138
20
bool VoteCounter::AreAllVotesIn() const {
139
20
  return GetTotalVotesCounted() == num_voters_;
140
20
}
141
142
///////////////////////////////////////////////////
143
// LeaderElection
144
///////////////////////////////////////////////////
145
146
// Prepares an election described by `request` over the peers in `config`.
//
// The candidate itself (request.candidate_uuid()) is skipped. So is every peer whose member type
// is not VOTER, which is logged. For each remaining peer this records its UUID in
// voting_follower_uuids_ and creates a VoterState holding a proxy built by `proxy_factory`.
//
// `vote_counter` must already contain exactly one vote (the candidate's own). Its expected total
// must equal 1 + the number of voting followers found here; both conditions are CHECKed.
LeaderElection::LeaderElection(const RaftConfigPB& config,
                               PeerProxyFactory* proxy_factory,
                               const VoteRequestPB& request,
                               std::unique_ptr<VoteCounter> vote_counter,
                               MonoDelta timeout,
                               PreElection preelection,
                               TEST_SuppressVoteRequest suppress_vote_request,
                               ElectionDecisionCallback decision_callback)
    : request_(request),
      result_(preelection, request.candidate_term()),
      vote_counter_(std::move(vote_counter)),
      timeout_(timeout),
      suppress_vote_request_(suppress_vote_request),
      decision_callback_(std::move(decision_callback)) {
  for (const RaftPeerPB& peer : config.peers()) {
    const auto& uuid = peer.permanent_uuid();

    // The candidate does not send a vote request to itself.
    if (uuid == request.candidate_uuid()) {
      continue;
    }

    // Only peers with member_type == VOTER are allowed to vote.
    if (peer.member_type() != PeerMemberType::VOTER) {
      LOG(INFO) << "Ignoring peer " << uuid << " vote because its member type is "
                << PeerMemberType_Name(peer.member_type());
      continue;
    }

    voting_follower_uuids_.push_back(uuid);

    auto state = std::make_unique<VoterState>();
    state->proxy = proxy_factory->NewProxy(peer);
    // Duplicate UUIDs in the config would make emplace() fail, which is fatal.
    CHECK(voter_state_.emplace(peer.permanent_uuid(), std::move(state)).second);
  }

  // Ensure that the candidate has already voted for itself.
  CHECK_EQ(1, vote_counter_->GetTotalVotesCounted()) << "Candidate must vote for itself first";

  // Ensure that existing votes + future votes add up to the expected total.
  CHECK_EQ(vote_counter_->GetTotalVotesCounted() + voting_follower_uuids_.size(),
           vote_counter_->GetTotalExpectedVotes())
      << "Expected different number of followers. Follower UUIDs: ["
      << yb::ToString(voting_follower_uuids_)
      << "]; RaftConfig: {" << config.ShortDebugString() << "}";
}
186
187
737k
// Tears down per-voter state under lock_. In debug builds, DCHECKs that the decision callback
// was already invoked (has_responded_ is set exactly once, in CheckForDecision()).
LeaderElection::~LeaderElection() {
  std::lock_guard<Lock> locked(lock_);
  // We must always call the callback exactly once.
  DCHECK(has_responded_);
  voter_state_.clear();
}
192
193
737k
void LeaderElection::Run() {
194
18.4E
  VLOG_WITH_PREFIX(1) << "Running leader election.";
195
196
  // Check if we have already won the election (relevant if this is a
197
  // single-node configuration, since we always pre-vote for ourselves).
198
737k
  CheckForDecision();
199
200
  // The rest of the code below is for a typical multi-node configuration.
201
1.45M
  for (const std::string& voter_uuid : voting_follower_uuids_) {
202
1.45M
    VoterState* state = nullptr;
203
1.45M
    {
204
1.45M
      std::lock_guard<Lock> guard(lock_);
205
1.45M
      if (result_.decided()) { // Already have result.
206
111
        break;
207
111
      }
208
1.45M
      auto it = voter_state_.find(voter_uuid);
209
1.45M
      CHECK(it != voter_state_.end());
210
1.45M
      state = it->second.get();
211
      // Safe to drop the lock because voter_state_ is not mutated outside of
212
      // the constructor / destructor. We do this to avoid deadlocks below.
213
1.45M
    }
214
215
    // Send the RPC request.
216
1.45M
    LOG_WITH_PREFIX(INFO) << "Requesting vote from peer " << voter_uuid;
217
1.45M
    state->rpc.set_timeout(timeout_);
218
219
1.45M
    state->request = request_;
220
1.45M
    state->request.set_dest_uuid(voter_uuid);
221
222
1.45M
    LeaderElectionPtr retained_self = this;
223
1.45M
    if (
!suppress_vote_request_1.45M
) {
224
1.45M
      state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
225
1.45M
      state->proxy->RequestConsensusVoteAsync(
226
1.45M
          &state->request, &state->response, &state->rpc,
227
1.45M
          std::bind(&LeaderElection::VoteResponseRpcCallback, this, voter_uuid, retained_self));
228
18.4E
    } else {
229
18.4E
      state->response.set_responder_uuid(voter_uuid);
230
18.4E
      VoteResponseRpcCallback(voter_uuid, retained_self);
231
18.4E
    }
232
1.45M
  }
233
737k
}
234
235
1.54M
void LeaderElection::CheckForDecision() {
236
1.54M
  bool to_respond = false;
237
1.54M
  {
238
1.54M
    std::lock_guard<Lock> guard(lock_);
239
    // Check if the vote has been newly decided.
240
1.54M
    auto decision = vote_counter_->GetDecision();
241
1.54M
    if (!result_.decided() && 
decision != ElectionVote::kUnknown1.43M
) {
242
649k
      LOG_WITH_PREFIX(INFO) << "Election decided. Result: candidate "
243
649k
                << ((decision == ElectionVote::kGranted) ? 
"won."614k
:
"lost."35.4k
);
244
649k
      result_.decision = decision;
245
649k
    }
246
    // Check whether to respond. This can happen as a result of either getting
247
    // a majority vote or of something invalidating the election, like
248
    // observing a higher term.
249
1.54M
    if (result_.decided() && 
!has_responded_759k
) {
250
737k
      has_responded_ = true;
251
737k
      to_respond = true;
252
737k
    }
253
1.54M
  }
254
255
  // Respond outside of the lock.
256
1.54M
  if (to_respond) {
257
    // This is thread-safe since result_ is write-once.
258
737k
    decision_callback_(result_);
259
737k
  }
260
1.54M
}
261
262
void LeaderElection::VoteResponseRpcCallback(const std::string& voter_uuid,
263
1.45M
                                             const LeaderElectionPtr& self) {
264
1.45M
  {
265
1.45M
    std::lock_guard<Lock> guard(lock_);
266
267
1.45M
    if (has_responded_) {
268
645k
      return;
269
645k
    }
270
271
809k
    auto it = voter_state_.find(voter_uuid);
272
809k
    CHECK(it != voter_state_.end());
273
809k
    VoterState* state = it->second.get();
274
275
    // Check for RPC errors.
276
809k
    if (!state->rpc.status().ok()) {
277
55.9k
      LOG_WITH_PREFIX(WARNING) << "RPC error from VoteRequest() call to peer " << voter_uuid
278
55.9k
                  << ": " << state->rpc.status().ToString();
279
55.9k
      RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
280
281
    // Check for tablet errors.
282
753k
    } else if (state->response.has_error()) {
283
26.3k
      LOG_WITH_PREFIX(WARNING) << "Tablet error from VoteRequest() call to peer "
284
26.3k
                   << voter_uuid << ": "
285
26.3k
                   << StatusFromPB(state->response.error().status()).ToString();
286
26.3k
      RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
287
288
    // If the peer changed their IP address, we shouldn't count this vote since
289
    // our knowledge of the configuration is in an inconsistent state.
290
727k
    } else if (PREDICT_FALSE(voter_uuid != state->response.responder_uuid())) {
291
0
      LOG_WITH_PREFIX(DFATAL) << "Received vote response from peer we thought had UUID "
292
0
                  << voter_uuid << ", but its actual UUID is " << state->response.responder_uuid();
293
0
      RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
294
295
    // Node does not support preelection
296
727k
    } else if (result_.preelection && 
!state->response.preelection()664k
) {
297
0
      result_.preelections_not_supported_by_uuid = voter_uuid;
298
0
      HandleVoteDeniedUnlocked(voter_uuid, *state);
299
300
    // Count the granted votes.
301
727k
    } else if (state->response.vote_granted()) {
302
629k
      HandleVoteGrantedUnlocked(voter_uuid, *state);
303
304
    // Anything else is a denied vote.
305
629k
    } else {
306
97.8k
      HandleVoteDeniedUnlocked(voter_uuid, *state);
307
97.8k
    }
308
809k
  }
309
310
  // Check for a decision outside the lock.
311
0
  CheckForDecision();
312
809k
}
313
314
721k
void LeaderElection::RecordVoteUnlocked(const std::string& voter_uuid, ElectionVote vote) {
315
721k
  DCHECK(lock_.is_locked());
316
317
  // Record the vote.
318
721k
  bool duplicate;
319
721k
  Status s = vote_counter_->RegisterVote(voter_uuid, vote, &duplicate);
320
721k
  if (!s.ok()) {
321
0
    LOG_WITH_PREFIX(WARNING) << "Error registering vote for peer " << voter_uuid
322
0
                 << ": " << s.ToString();
323
0
    return;
324
0
  }
325
721k
  if (duplicate) {
326
    // Note: This is DFATAL because at the time of writing we do not support
327
    // retrying vote requests, so this should be impossible. It may be valid to
328
    // receive duplicate votes in the future if we implement retry.
329
0
    LOG_WITH_PREFIX(DFATAL) << "Duplicate vote received from peer " << voter_uuid;
330
0
  }
331
721k
}
332
333
88.8k
void LeaderElection::HandleHigherTermUnlocked(const string& voter_uuid, const VoterState& state) {
334
88.8k
  DCHECK(lock_.is_locked());
335
88.8k
  DCHECK_GT(state.response.responder_term(), consensus_term());
336
337
88.8k
  string msg = Substitute("Vote denied by peer $0 with higher term. Message: $1",
338
88.8k
                          state.response.responder_uuid(),
339
88.8k
                          StatusFromPB(state.response.consensus_error().status()).ToString());
340
88.8k
  LOG_WITH_PREFIX(WARNING) << msg;
341
342
88.8k
  if (!result_.decided()) {
343
88.2k
    LOG_WITH_PREFIX(INFO) << "Cancelling election due to peer responding with higher term";
344
88.2k
    result_.decision = ElectionVote::kDenied;
345
88.2k
    result_.higher_term = state.response.responder_term();
346
88.2k
    result_.message = msg;
347
88.2k
  }
348
88.8k
}
349
350
629k
void LeaderElection::HandleVoteGrantedUnlocked(const string& voter_uuid, const VoterState& state) {
351
629k
  DCHECK(lock_.is_locked());
352
629k
  DCHECK_EQ(state.response.responder_term(), election_term());
353
629k
  DCHECK(state.response.vote_granted());
354
629k
  if (state.response.has_remaining_leader_lease_duration_ms()) {
355
9.41k
    CoarseTimeLease lease(
356
9.41k
        state.response.leader_lease_uuid(),
357
9.41k
        CoarseMonoClock::Now() + state.response.remaining_leader_lease_duration_ms() * 1ms);
358
9.41k
    result_.old_leader_lease.TryUpdate(lease);
359
9.41k
  }
360
361
629k
  if (state.response.has_leader_ht_lease_expiration()) {
362
437k
    PhysicalComponentLease lease(
363
437k
        state.response.leader_ht_lease_uuid(), state.response.leader_ht_lease_expiration());
364
437k
    result_.old_leader_ht_lease.TryUpdate(lease);
365
437k
  }
366
367
629k
  LOG_WITH_PREFIX(INFO) << "Vote granted by peer " << voter_uuid;
368
629k
  RecordVoteUnlocked(voter_uuid, ElectionVote::kGranted);
369
629k
}
370
371
98.4k
void LeaderElection::HandleVoteDeniedUnlocked(const string& voter_uuid, const VoterState& state) {
372
98.4k
  DCHECK(lock_.is_locked());
373
98.4k
  DCHECK(!state.response.vote_granted());
374
375
  // If one of the voters responds with a greater term than our own, and we
376
  // have not yet triggered the decision callback, it cancels the election.
377
98.4k
  if (state.response.responder_term() > consensus_term()) {
378
88.8k
    return HandleHigherTermUnlocked(voter_uuid, state);
379
88.8k
  }
380
381
9.56k
  LOG_WITH_PREFIX(INFO) << "Vote denied by peer " << voter_uuid << ". Message: "
382
9.56k
            << StatusFromPB(state.response.consensus_error().status()).ToString();
383
9.56k
  RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
384
9.56k
}
385
386
3.00M
std::string LeaderElection::LogPrefix() const {
387
3.00M
  return Substitute("T $0 P $1 [CANDIDATE]: Term $2 $3election: ",
388
3.00M
                    request_.tablet_id(),
389
3.00M
                    request_.candidate_uuid(),
390
3.00M
                    request_.candidate_term(),
391
3.00M
                    (result_.preelection ? 
"pre-"2.75M
:
""252k
));
392
3.00M
}
393
394
} // namespace consensus
395
} // namespace yb