YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/leader_election.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/leader_election.h"
34
35
#include <functional>
36
#include <mutex>
37
38
#include <glog/logging.h>
39
40
#include "yb/common/wire_protocol.h"
41
42
#include "yb/consensus/consensus_peers.h"
43
#include "yb/consensus/metadata.pb.h"
44
45
#include "yb/gutil/map-util.h"
46
#include "yb/gutil/port.h"
47
#include "yb/gutil/strings/join.h"
48
49
#include "yb/rpc/rpc_controller.h"
50
51
#include "yb/util/format.h"
52
#include "yb/util/logging.h"
53
#include "yb/util/net/net_util.h"
54
#include "yb/util/status.h"
55
#include "yb/util/status_format.h"
56
57
using namespace std::literals;
58
59
namespace yb {
60
namespace consensus {
61
62
using std::string;
63
using strings::Substitute;
64
65
///////////////////////////////////////////////////
66
// VoteCounter
67
///////////////////////////////////////////////////
68
69
// Creates a counter for an election with `num_voters` participants in which `majority_size`
// votes of the same kind decide the outcome. Inconsistent arguments abort the process (CHECK).
VoteCounter::VoteCounter(size_t num_voters, size_t majority_size)
    : num_voters_(num_voters), majority_size_(majority_size) {
  // A majority can never require more votes than there are voters.
  CHECK_LE(majority_size, num_voters);
  // Both quantities must be strictly positive.
  CHECK_GT(num_voters_, 0);
  CHECK_GT(majority_size_, 0);
}
76
77
// Registers a vote cast by `voter_uuid`.
//
// Returns InvalidArgument if:
//   - the voter already voted a different way in this election,
//   - accepting the vote would exceed the expected number of voters, or
//   - `vote` is neither kGranted nor kDenied.
// An invalid vote is rejected before any state is touched, so it is never stored in votes_.
//
// On success, `*is_duplicate` is set to true when an identical vote from this voter had already
// been recorded (the voting record is then left unchanged), and to false when the vote was newly
// counted.
Status VoteCounter::RegisterVote(const std::string& voter_uuid, ElectionVote vote,
                                 bool* is_duplicate) {
  // Handle repeated votes with a single lookup.
  auto existing = votes_.find(voter_uuid);
  if (PREDICT_FALSE(existing != votes_.end())) {
    // Detect changed votes.
    const ElectionVote prior_vote = existing->second;
    if (PREDICT_FALSE(prior_vote != vote)) {
      return STATUS_FORMAT(
          InvalidArgument,
          "Peer $0 voted a different way twice in the same election. "
          "First vote: $1, second vote: $2.",
          voter_uuid, prior_vote, vote);
    }

    // This was just a duplicate. Allow the caller to log it but don't change
    // the voting record.
    *is_duplicate = true;
    return Status::OK();
  }

  // Sanity check to ensure we did not exceed the allowed number of voters.
  if (PREDICT_FALSE(yes_votes_ + no_votes_ == num_voters_)) {
    // More unique voters than allowed!
    return STATUS(InvalidArgument, Substitute(
        "Vote from peer $0 would cause the number of votes to exceed the expected number of "
        "voters, which is $1. Votes already received from the following peers: {$2}",
        voter_uuid,
        num_voters_,
        JoinKeysIterator(votes_.begin(), votes_.end(), ", ")));
  }

  // Validate BEFORE storing: previously an invalid vote was inserted into votes_ and only then
  // rejected, leaving a stored-but-uncounted entry behind.
  if (PREDICT_FALSE(vote != ElectionVote::kGranted && vote != ElectionVote::kDenied)) {
    return STATUS_FORMAT(InvalidArgument, "Invalid vote: $0", vote);
  }

  // This is a valid vote, so store and count it.
  InsertOrDie(&votes_, voter_uuid, vote);
  if (vote == ElectionVote::kGranted) {
    ++yes_votes_;
  } else {
    ++no_votes_;
  }
  *is_duplicate = false;
  return Status::OK();
}
123
124
164k
// Returns the election outcome implied by the votes counted so far:
//   - kGranted once at least majority_size_ "yes" votes are in;
//   - kDenied once so many "no" votes are in that a "yes" majority is no longer reachable;
//   - kUnknown otherwise.
ElectionVote VoteCounter::GetDecision() const {
  if (yes_votes_ >= majority_size_) {
    return ElectionVote::kGranted;
  }
  // The largest number of "no" votes that still leaves room for a "yes" majority.
  const auto max_tolerable_no_votes = num_voters_ - majority_size_;
  return no_votes_ > max_tolerable_no_votes ? ElectionVote::kDenied : ElectionVote::kUnknown;
}
133
134
152k
size_t VoteCounter::GetTotalVotesCounted() const {
135
152k
  return yes_votes_ + no_votes_;
136
152k
}
137
138
20
bool VoteCounter::AreAllVotesIn() const {
139
20
  return GetTotalVotesCounted() == num_voters_;
140
20
}
141
142
///////////////////////////////////////////////////
143
// LeaderElection
144
///////////////////////////////////////////////////
145
146
// Prepares an election described by `request` over the peers in `config`.
//
// For every peer that is not the candidate and whose member_type is VOTER, this:
//   - records the peer's UUID in voting_follower_uuids_, and
//   - creates a VoterState holding a proxy obtained from `proxy_factory`.
// Non-voter peers are skipped with an INFO log line.
//
// CHECK-fails unless `vote_counter` already holds exactly one vote (the candidate's own), and
// unless that vote plus one per voting follower equals the counter's expected total.
LeaderElection::LeaderElection(const RaftConfigPB& config,
                               PeerProxyFactory* proxy_factory,
                               const VoteRequestPB& request,
                               std::unique_ptr<VoteCounter> vote_counter,
                               MonoDelta timeout,
                               PreElection preelection,
                               TEST_SuppressVoteRequest suppress_vote_request,
                               ElectionDecisionCallback decision_callback)
    : request_(request),
      result_(preelection, request.candidate_term()),
      vote_counter_(std::move(vote_counter)),
      timeout_(timeout),
      suppress_vote_request_(suppress_vote_request),
      decision_callback_(std::move(decision_callback)) {
  for (const RaftPeerPB& peer : config.peers()) {
    // The candidate's own vote is already in the counter; no request is sent to itself.
    if (request.candidate_uuid() == peer.permanent_uuid()) {
      continue;
    }
    // Only peers with member_type == VOTER are allowed to vote.
    if (peer.member_type() != PeerMemberType::VOTER) {
      // Use the same prefix as every other log line of this election. request_ and result_ are
      // already initialized here, so LogPrefix() is safe to call.
      LOG_WITH_PREFIX(INFO) << "Ignoring peer " << peer.permanent_uuid()
                            << " vote because its member type is "
                            << PeerMemberType_Name(peer.member_type());
      continue;
    }

    voting_follower_uuids_.push_back(peer.permanent_uuid());

    auto state = std::make_unique<VoterState>();
    state->proxy = proxy_factory->NewProxy(peer);
    CHECK(voter_state_.emplace(peer.permanent_uuid(), std::move(state)).second);
  }

  // Ensure that the candidate has already voted for itself.
  CHECK_EQ(1, vote_counter_->GetTotalVotesCounted()) << "Candidate must vote for itself first";

  // Ensure that existing votes + future votes add up to the expected total.
  CHECK_EQ(vote_counter_->GetTotalVotesCounted() + voting_follower_uuids_.size(),
           vote_counter_->GetTotalExpectedVotes())
      << "Expected different number of followers. Follower UUIDs: ["
      << yb::ToString(voting_follower_uuids_)
      << "]; RaftConfig: {" << config.ShortDebugString() << "}";
}
186
187
76.1k
// Tears down the election. Debug builds verify that the decision callback was already invoked.
// The per-voter state is released while lock_ is held.
LeaderElection::~LeaderElection() {
  std::lock_guard<Lock> held(lock_);
  DCHECK(has_responded_); // We must always call the callback exactly once.
  voter_state_.clear();
}
192
193
76.0k
void LeaderElection::Run() {
194
18.4E
  VLOG_WITH_PREFIX(1) << "Running leader election.";
195
196
  // Check if we have already won the election (relevant if this is a
197
  // single-node configuration, since we always pre-vote for ourselves).
198
76.0k
  CheckForDecision();
199
200
  // The rest of the code below is for a typical multi-node configuration.
201
146k
  for (const std::string& voter_uuid : voting_follower_uuids_) {
202
146k
    VoterState* state = nullptr;
203
146k
    {
204
146k
      std::lock_guard<Lock> guard(lock_);
205
146k
      if (result_.decided()) { // Already have result.
206
43
        break;
207
43
      }
208
146k
      auto it = voter_state_.find(voter_uuid);
209
146k
      CHECK(it != voter_state_.end());
210
146k
      state = it->second.get();
211
      // Safe to drop the lock because voter_state_ is not mutated outside of
212
      // the constructor / destructor. We do this to avoid deadlocks below.
213
146k
    }
214
215
    // Send the RPC request.
216
146k
    LOG_WITH_PREFIX(INFO) << "Requesting vote from peer " << voter_uuid;
217
146k
    state->rpc.set_timeout(timeout_);
218
219
146k
    state->request = request_;
220
146k
    state->request.set_dest_uuid(voter_uuid);
221
222
146k
    LeaderElectionPtr retained_self = this;
223
146k
    if (!suppress_vote_request_) {
224
146k
      state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
225
146k
      state->proxy->RequestConsensusVoteAsync(
226
146k
          &state->request, &state->response, &state->rpc,
227
146k
          std::bind(&LeaderElection::VoteResponseRpcCallback, this, voter_uuid, retained_self));
228
18.4E
    } else {
229
18.4E
      state->response.set_responder_uuid(voter_uuid);
230
18.4E
      VoteResponseRpcCallback(voter_uuid, retained_self);
231
18.4E
    }
232
146k
  }
233
76.0k
}
234
235
164k
void LeaderElection::CheckForDecision() {
236
164k
  bool to_respond = false;
237
164k
  {
238
164k
    std::lock_guard<Lock> guard(lock_);
239
    // Check if the vote has been newly decided.
240
164k
    auto decision = vote_counter_->GetDecision();
241
164k
    if (!result_.decided() && decision != ElectionVote::kUnknown) {
242
76.0k
      LOG_WITH_PREFIX(INFO) << "Election decided. Result: candidate "
243
71.3k
                << ((decision == ElectionVote::kGranted) ? "won." : "lost.");
244
76.0k
      result_.decision = decision;
245
76.0k
    }
246
    // Check whether to respond. This can happen as a result of either getting
247
    // a majority vote or of something invalidating the election, like
248
    // observing a higher term.
249
164k
    if (result_.decided() && !has_responded_) {
250
76.1k
      has_responded_ = true;
251
76.1k
      to_respond = true;
252
76.1k
    }
253
164k
  }
254
255
  // Respond outside of the lock.
256
164k
  if (to_respond) {
257
    // This is thread-safe since result_ is write-once.
258
76.1k
    decision_callback_(result_);
259
76.1k
  }
260
164k
}
261
262
void LeaderElection::VoteResponseRpcCallback(const std::string& voter_uuid,
263
145k
                                             const LeaderElectionPtr& self) {
264
145k
  {
265
145k
    std::lock_guard<Lock> guard(lock_);
266
267
145k
    if (has_responded_) {
268
57.1k
      return;
269
57.1k
    }
270
271
88.7k
    auto it = voter_state_.find(voter_uuid);
272
88.7k
    CHECK(it != voter_state_.end());
273
88.7k
    VoterState* state = it->second.get();
274
275
    // Check for RPC errors.
276
88.7k
    if (!state->rpc.status().ok()) {
277
1.44k
      LOG_WITH_PREFIX(WARNING) << "RPC error from VoteRequest() call to peer " << voter_uuid
278
1.44k
                  << ": " << state->rpc.status().ToString();
279
1.44k
      RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
280
281
    // Check for tablet errors.
282
87.3k
    } else if (state->response.has_error()) {
283
16.6k
      LOG_WITH_PREFIX(WARNING) << "Tablet error from VoteRequest() call to peer "
284
16.6k
                   << voter_uuid << ": "
285
16.6k
                   << StatusFromPB(state->response.error().status()).ToString();
286
16.6k
      RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
287
288
    // If the peer changed their IP address, we shouldn't count this vote since
289
    // our knowledge of the configuration is in an inconsistent state.
290
70.7k
    } else if (PREDICT_FALSE(voter_uuid != state->response.responder_uuid())) {
291
0
      LOG_WITH_PREFIX(DFATAL) << "Received vote response from peer we thought had UUID "
292
0
                  << voter_uuid << ", but its actual UUID is " << state->response.responder_uuid();
293
0
      RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
294
295
    // Node does not support preelection
296
70.7k
    } else if (result_.preelection && !state->response.preelection()) {
297
0
      result_.preelections_not_supported_by_uuid = voter_uuid;
298
0
      HandleVoteDeniedUnlocked(voter_uuid, *state);
299
300
    // Count the granted votes.
301
70.7k
    } else if (state->response.vote_granted()) {
302
69.7k
      HandleVoteGrantedUnlocked(voter_uuid, *state);
303
304
    // Anything else is a denied vote.
305
955
    } else {
306
955
      HandleVoteDeniedUnlocked(voter_uuid, *state);
307
955
    }
308
88.7k
  }
309
310
  // Check for a decision outside the lock.
311
88.7k
  CheckForDecision();
312
88.7k
}
313
314
88.7k
void LeaderElection::RecordVoteUnlocked(const std::string& voter_uuid, ElectionVote vote) {
315
88.7k
  DCHECK(lock_.is_locked());
316
317
  // Record the vote.
318
88.7k
  bool duplicate;
319
88.7k
  Status s = vote_counter_->RegisterVote(voter_uuid, vote, &duplicate);
320
88.7k
  if (!s.ok()) {
321
0
    LOG_WITH_PREFIX(WARNING) << "Error registering vote for peer " << voter_uuid
322
0
                 << ": " << s.ToString();
323
0
    return;
324
0
  }
325
88.7k
  if (duplicate) {
326
    // Note: This is DFATAL because at the time of writing we do not support
327
    // retrying vote requests, so this should be impossible. It may be valid to
328
    // receive duplicate votes in the future if we implement retry.
329
0
    LOG_WITH_PREFIX(DFATAL) << "Duplicate vote received from peer " << voter_uuid;
330
0
  }
331
88.7k
}
332
333
109
void LeaderElection::HandleHigherTermUnlocked(const string& voter_uuid, const VoterState& state) {
334
109
  DCHECK(lock_.is_locked());
335
109
  DCHECK_GT(state.response.responder_term(), consensus_term());
336
337
109
  string msg = Substitute("Vote denied by peer $0 with higher term. Message: $1",
338
109
                          state.response.responder_uuid(),
339
109
                          StatusFromPB(state.response.consensus_error().status()).ToString());
340
109
  LOG_WITH_PREFIX(WARNING) << msg;
341
342
109
  if (!result_.decided()) {
343
109
    LOG_WITH_PREFIX(INFO) << "Cancelling election due to peer responding with higher term";
344
109
    result_.decision = ElectionVote::kDenied;
345
109
    result_.higher_term = state.response.responder_term();
346
109
    result_.message = msg;
347
109
  }
348
109
}
349
350
69.7k
void LeaderElection::HandleVoteGrantedUnlocked(const string& voter_uuid, const VoterState& state) {
351
69.7k
  DCHECK(lock_.is_locked());
352
69.7k
  DCHECK_EQ(state.response.responder_term(), election_term());
353
69.7k
  DCHECK(state.response.vote_granted());
354
69.7k
  if (state.response.has_remaining_leader_lease_duration_ms()) {
355
4.30k
    CoarseTimeLease lease(
356
4.30k
        state.response.leader_lease_uuid(),
357
4.30k
        CoarseMonoClock::Now() + state.response.remaining_leader_lease_duration_ms() * 1ms);
358
4.30k
    result_.old_leader_lease.TryUpdate(lease);
359
4.30k
  }
360
361
69.7k
  if (state.response.has_leader_ht_lease_expiration()) {
362
5.17k
    PhysicalComponentLease lease(
363
5.17k
        state.response.leader_ht_lease_uuid(), state.response.leader_ht_lease_expiration());
364
5.17k
    result_.old_leader_ht_lease.TryUpdate(lease);
365
5.17k
  }
366
367
69.7k
  LOG_WITH_PREFIX(INFO) << "Vote granted by peer " << voter_uuid;
368
69.7k
  RecordVoteUnlocked(voter_uuid, ElectionVote::kGranted);
369
69.7k
}
370
371
991
void LeaderElection::HandleVoteDeniedUnlocked(const string& voter_uuid, const VoterState& state) {
372
991
  DCHECK(lock_.is_locked());
373
991
  DCHECK(!state.response.vote_granted());
374
375
  // If one of the voters responds with a greater term than our own, and we
376
  // have not yet triggered the decision callback, it cancels the election.
377
991
  if (state.response.responder_term() > consensus_term()) {
378
109
    return HandleHigherTermUnlocked(voter_uuid, state);
379
109
  }
380
381
882
  LOG_WITH_PREFIX(INFO) << "Vote denied by peer " << voter_uuid << ". Message: "
382
882
            << StatusFromPB(state.response.consensus_error().status()).ToString();
383
882
  RecordVoteUnlocked(voter_uuid, ElectionVote::kDenied);
384
882
}
385
386
311k
std::string LeaderElection::LogPrefix() const {
387
311k
  return Substitute("T $0 P $1 [CANDIDATE]: Term $2 $3election: ",
388
311k
                    request_.tablet_id(),
389
311k
                    request_.candidate_uuid(),
390
311k
                    request_.candidate_term(),
391
169k
                    (result_.preelection ? "pre-" : ""));
392
311k
}
393
394
} // namespace consensus
395
} // namespace yb