YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_CONSENSUS_RAFT_CONSENSUS_H_
34
#define YB_CONSENSUS_RAFT_CONSENSUS_H_
35
36
#include <atomic>
37
#include <memory>
38
#include <mutex>
39
#include <string>
40
#include <utility>
41
#include <vector>
42
43
#include <boost/optional/optional_fwd.hpp>
44
45
#include "yb/common/entity_ids_types.h"
46
#include "yb/consensus/consensus.h"
47
#include "yb/consensus/consensus.pb.h"
48
#include "yb/consensus/consensus_meta.h"
49
#include "yb/consensus/consensus_queue.h"
50
#include "yb/consensus/multi_raft_batcher.h"
51
52
#include "yb/gutil/callback.h"
53
54
#include "yb/rpc/scheduler.h"
55
56
#include "yb/util/atomic.h"
57
#include "yb/util/opid.h"
58
#include "yb/util/random.h"
59
60
DECLARE_int32(leader_lease_duration_ms);
61
DECLARE_int32(ht_lease_duration_ms);
62
63
namespace yb {
64
65
typedef std::lock_guard<simple_spinlock> Lock;
66
typedef std::unique_ptr<Lock> ScopedLock;
67
68
class Counter;
69
class HostPort;
70
class ThreadPool;
71
class ThreadPoolToken;
72
73
namespace server {
74
class Clock;
75
}
76
77
namespace rpc {
78
class PeriodicTimer;
79
}
80
81
namespace consensus {
82
83
class ConsensusMetadata;
84
class Peer;
85
class PeerProxyFactory;
86
class PeerManager;
87
class ReplicaState;
88
struct ElectionResult;
89
90
constexpr int32_t kDefaultLeaderLeaseDurationMs = 2000;
91
92
YB_STRONGLY_TYPED_BOOL(WriteEmpty);
93
YB_STRONGLY_TYPED_BOOL(PreElected);
94
95
YB_DEFINE_ENUM(RejectMode, (kNone)(kAll)(kNonEmpty));
96
97
std::unique_ptr<ConsensusRoundCallback> MakeNonTrackedRoundCallback(
98
    ConsensusRound* round, const StdStatusCallback& callback);
99
100
class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
101
                      public Consensus,
102
                      public PeerMessageQueueObserver,
103
                      public SafeOpIdWaiter {
104
 public:
105
  class ConsensusFaultHooks;
106
107
  // Creates RaftConsensus.
108
  static std::shared_ptr<RaftConsensus> Create(
109
    const ConsensusOptions& options,
110
    std::unique_ptr<ConsensusMetadata> cmeta,
111
    const RaftPeerPB& local_peer_pb,
112
    const scoped_refptr<MetricEntity>& table_metric_entity,
113
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
114
    const scoped_refptr<server::Clock>& clock,
115
    ConsensusContext* consensus_context,
116
    rpc::Messenger* messenger,
117
    rpc::ProxyCache* proxy_cache,
118
    const scoped_refptr<log::Log>& log,
119
    const std::shared_ptr<MemTracker>& server_mem_tracker,
120
    const std::shared_ptr<MemTracker>& parent_mem_tracker,
121
    const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
122
    TableType table_type,
123
    ThreadPool* raft_pool,
124
    RetryableRequests* retryable_requests,
125
    MultiRaftManager* multi_raft_manager);
126
127
  // Creates RaftConsensus.
128
  RaftConsensus(
129
    const ConsensusOptions& options,
130
    std::unique_ptr<ConsensusMetadata> cmeta,
131
    std::unique_ptr<PeerProxyFactory> peer_proxy_factory,
132
    std::unique_ptr<PeerMessageQueue> queue,
133
    std::unique_ptr<PeerManager> peer_manager,
134
    std::unique_ptr<ThreadPoolToken> raft_pool_token,
135
    const scoped_refptr<MetricEntity>& table_metric_entity,
136
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
137
    const std::string& peer_uuid,
138
    const scoped_refptr<server::Clock>& clock,
139
    ConsensusContext* consensus_context,
140
    const scoped_refptr<log::Log>& log,
141
    std::shared_ptr<MemTracker> parent_mem_tracker,
142
    Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
143
    TableType table_type,
144
    RetryableRequests* retryable_requests);
145
146
  virtual ~RaftConsensus();
147
148
  virtual CHECKED_STATUS Start(const ConsensusBootstrapInfo& info) override;
149
150
  virtual bool IsRunning() const override;
151
152
  // Emulates an election by increasing the term number and asserting leadership
153
  // in the configuration by sending a NO_OP to other peers.
154
  // This is NOT safe to use in a distributed configuration with failure detection
155
  // enabled, as it could result in a split-brain scenario.
156
  CHECKED_STATUS EmulateElection() override;
157
158
  CHECKED_STATUS ElectionLostByProtege(const std::string& election_lost_by_uuid) override;
159
160
  CHECKED_STATUS WaitUntilLeaderForTests(const MonoDelta& timeout) override;
161
162
  CHECKED_STATUS StepDown(const LeaderStepDownRequestPB* req,
163
                          LeaderStepDownResponsePB* resp) override;
164
165
  CHECKED_STATUS TEST_Replicate(const ConsensusRoundPtr& round) override;
166
  CHECKED_STATUS ReplicateBatch(const ConsensusRounds& rounds) override;
167
168
  CHECKED_STATUS Update(
169
      ConsensusRequestPB* request,
170
      ConsensusResponsePB* response,
171
      CoarseTimePoint deadline) override;
172
173
  CHECKED_STATUS RequestVote(const VoteRequestPB* request,
174
                             VoteResponsePB* response) override;
175
176
  CHECKED_STATUS ChangeConfig(const ChangeConfigRequestPB& req,
177
                              const StdStatusCallback& client_cb,
178
                              boost::optional<tserver::TabletServerErrorPB::Code>* error_code)
179
                              override;
180
181
  CHECKED_STATUS UnsafeChangeConfig(
182
      const UnsafeChangeConfigRequestPB& req,
183
      boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override;
184
185
  PeerRole GetRoleUnlocked() const;
186
187
  PeerRole role() const override;
188
189
  LeaderState GetLeaderState(bool allow_stale = false) const override;
190
191
  std::string peer_uuid() const override;
192
193
  std::string tablet_id() const override;
194
195
  const TabletId& split_parent_tablet_id() const override;
196
197
  ConsensusStatePB ConsensusState(
198
      ConsensusConfigType type,
199
      LeaderLeaseStatus* leader_lease_status) const override;
200
201
  ConsensusStatePB ConsensusStateUnlocked(
202
      ConsensusConfigType type,
203
      LeaderLeaseStatus* leader_lease_status) const override;
204
205
  RaftConfigPB CommittedConfig() const override;
206
207
  void DumpStatusHtml(std::ostream& out) const override;
208
209
  void Shutdown() override;
210
211
  // Return the active (as opposed to committed) role.
212
  PeerRole GetActiveRole() const;
213
214
  // Returns the replica state for tests. This should never be used outside of
215
  // tests, in particular calling the LockFor* methods on the returned object
216
  // can cause consensus to deadlock.
217
  ReplicaState* GetReplicaStateForTests();
218
219
  // Test-only helper that forwards to UpdateMajorityReplicated().
  //
  // The braced list below initializes the MajorityReplicatedData argument from
  // 'majority_replicated', CoarseTimePoint::min() and the physical microseconds of
  // HybridTime::kMin. NOTE(review): which MajorityReplicatedData fields the last two values
  // populate is not visible in this header - confirm against its definition.
  //
  // Parameter naming differs from the callee: 'committed_index' is passed as
  // UpdateMajorityReplicated()'s 'committed_op_id' argument and 'last_committed_op_id' as its
  // 'last_applied_op_id' argument.
  void TEST_UpdateMajorityReplicated(
220
      const OpId& majority_replicated, OpId* committed_index, OpId* last_committed_op_id) {
221
    UpdateMajorityReplicated({ majority_replicated,
222
                               CoarseTimePoint::min(),
223
                               HybridTime::kMin.GetPhysicalValueMicros() },
224
                             committed_index, last_committed_op_id);
225
  }
226
227
  yb::OpId GetLastReceivedOpId() override;
228
229
  yb::OpId GetLastCommittedOpId() override;
230
231
  OpId GetLastCDCedOpId() override;
232
233
  yb::OpId GetLastAppliedOpId() override;
234
235
  yb::OpId GetAllAppliedOpId();
236
237
  Result<MicrosTime> MajorityReplicatedHtLeaseExpiration(
238
      MicrosTime min_allowed, CoarseTimePoint deadline) const override;
239
240
  // The on-disk size of the consensus metadata.
241
  uint64_t OnDiskSize() const;
242
243
  yb::OpId MinRetryableRequestOpId();
244
245
675k
  // Starts a leader election described by 'data'.
  // Thin wrapper: delegates to DoStartElection(), passing PreElected::kFalse as the
  // 'preelected' argument, and returns its status unchanged.
  CHECKED_STATUS StartElection(const LeaderElectionData& data) override {
246
675k
    return DoStartElection(data, PreElected::kFalse);
247
675k
  }
248
249
  size_t LogCacheSize();
250
  size_t EvictLogCache(size_t bytes_to_evict);
251
252
68
  // Returns a const reference to the log_ member (the scoped_refptr itself, not a copy).
  const scoped_refptr<log::Log>& log() { return log_; }
253
254
  RetryableRequestsCounts TEST_CountRetryableRequests();
255
256
0
  // Test-only setter: stores 'value' (kNone, kAll or kNonEmpty) into the atomic reject_mode_
  // member using release ordering.
  // NOTE(review): how reject_mode_ is consumed is not visible in this header - presumably it
  // makes the replica reject incoming updates; confirm in the implementation file.
  void TEST_RejectMode(RejectMode value) {
257
0
    reject_mode_.store(value, std::memory_order_release);
258
0
  }
259
260
0
  // Test-only setter: stores 'duration' into the atomic TEST_delay_update_ member using release
  // ordering.
  // NOTE(review): the member name suggests it delays Update() processing; the consumer is not
  // visible in this header - confirm in the implementation file.
  void TEST_DelayUpdate(MonoDelta duration) {
261
0
    TEST_delay_update_.store(duration, std::memory_order_release);
262
0
  }
263
264
  Result<ReadOpsResult> ReadReplicatedMessagesForCDC(const yb::OpId& from,
265
    int64_t* last_replicated_opid_index,
266
    const CoarseTimePoint deadline = CoarseTimePoint::max()) override;
267
268
  void UpdateCDCConsumerOpId(const yb::OpId& op_id) override;
269
270
  // Start memory tracking of following operation in case it is still present in our caches.
271
  void TrackOperationMemory(const yb::OpId& op_id);
272
273
2.88M
  // Returns the current value of the atomic majority_num_sst_files_ counter, read with acquire
  // ordering.
  // NOTE(review): presumably the value is maintained by MajorityReplicatedNumSSTFilesChanged();
  // the writer is not visible in this header - confirm.
  uint64_t MajorityNumSSTFiles() const {
274
2.88M
    return majority_num_sst_files_.load(std::memory_order_acquire);
275
2.88M
  }
276
277
  // Returns last op id from log cache with specified op id type and operation type.
278
  Result<OpId> TEST_GetLastOpIdWithType(OpIdType opid_type, OperationType op_type);
279
280
  int64_t TEST_LeaderTerm() const;
281
282
  // Trigger that a non-Operation ConsensusRound has finished replication.
283
  // If the replication was successful, 'status' will be OK. Otherwise, it
284
  // may be Aborted or some other error status.
285
  // If 'status' is OK, write a Commit message to the local WAL based on the
286
  // type of message it is.
287
  // The 'client_cb' will be invoked at the end of this execution.
288
  virtual void NonTrackedRoundReplicationFinished(
289
      ConsensusRound* round, const StdStatusCallback& client_cb, const Status& status);
290
291
 protected:
292
  // As a leader, append a new ConsensusRound to the queue.
293
  // Only virtual and protected for mocking purposes.
294
  virtual CHECKED_STATUS AppendNewRoundToQueueUnlocked(const ConsensusRoundPtr& round);
295
296
  // processed_rounds - out value for number of rounds that were processed.
297
  virtual CHECKED_STATUS AppendNewRoundsToQueueUnlocked(
298
      const ConsensusRounds& rounds, size_t* processed_rounds);
299
300
  CHECKED_STATUS CheckLeasesUnlocked(const ConsensusRoundPtr& round);
301
302
  // As a follower, start a consensus round not associated with an Operation.
303
  // Only virtual and protected for mocking purposes.
304
  virtual CHECKED_STATUS StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg);
305
306
  // Assuming we are the leader, wait until we have a valid leader lease (i.e. the old leader's
307
  // lease has expired, and we have replicated a new lease that has not expired yet).
308
  // This says "Imprecise" because there is a slight race condition where this could wait for an
309
  // additional short time interval (e.g. 100 ms) in case we've just acquired the lease and the
310
  // waiting thread missed the notification. However, as of 08/14/2017 this is only used in a
311
  // context where this does not matter, such as catalog manager initialization.
312
  CHECKED_STATUS WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) override;
313
314
  CHECKED_STATUS CheckIsActiveLeaderAndHasLease() const override;
315
316
 private:
317
  friend class ReplicaState;
318
  friend class RaftConsensusQuorumTest;
319
320
  // processed_rounds - out value for number of rounds that were processed.
321
  CHECKED_STATUS DoReplicateBatch(const ConsensusRounds& rounds, size_t* processed_rounds);
322
323
  CHECKED_STATUS DoStartElection(const LeaderElectionData& data, PreElected preelected);
324
325
  Result<LeaderElectionPtr> CreateElectionUnlocked(
326
      const LeaderElectionData& data,
327
      MonoDelta timeout,
328
      PreElection preelection);
329
330
  // Updates the committed_index, triggers the Apply()s for whatever
331
  // operations were pending and updates last_applied_op_id.
332
  // This is idempotent.
333
  void UpdateMajorityReplicated(
334
      const MajorityReplicatedData& data, OpId* committed_op_id, OpId* last_applied_op_id) override;
335
336
  void NotifyTermChange(int64_t term) override;
337
338
  void NotifyFailedFollower(const std::string& uuid,
339
                            int64_t term,
340
                            const std::string& reason) override;
341
342
  void MajorityReplicatedNumSSTFilesChanged(uint64_t majority_replicated_num_sst_files) override;
343
344
  // Control whether printing of log messages should be done for a particular
345
  // function call.
346
  // Taken as the 'allow_logging' parameter of SnoozeFailureDetector().
  enum AllowLogging {
347
    // The call should not print its log message.
    DO_NOT_LOG = 0,
348
    // The call is allowed to print its log message.
    ALLOW_LOGGING = 1,
349
  };
350
351
  // Helper struct that contains the messages from the leader that we need to
352
  // append to our log, after they've been deduplicated.
353
  struct LeaderRequest;
354
355
  std::string LogPrefix();
356
357
  // Set the leader UUID of the configuration and mark the tablet config dirty for
358
  // reporting to the master.
359
  void SetLeaderUuidUnlocked(const std::string& uuid);
360
361
  // Replicate (as leader) a pre-validated config change. This includes
362
  // updating the peers and setting the new_configuration as pending.
363
  CHECKED_STATUS ReplicateConfigChangeUnlocked(const ReplicateMsgPtr& replicate_ref,
364
                                               const RaftConfigPB& new_config,
365
                                               ChangeConfigType type,
366
                                               StdStatusCallback client_cb);
367
368
  // Update the peers and queue to be consistent with a new active configuration.
369
  // Should only be called by the leader.
370
  void RefreshConsensusQueueAndPeersUnlocked();
371
372
  // Makes the peer become leader.
373
  // Returns OK once the change config operation that has this peer as leader
374
  // has been enqueued, the operation will complete asynchronously.
375
  //
376
  // The ReplicaState must be locked for configuration change before calling.
377
  CHECKED_STATUS BecomeLeaderUnlocked();
378
379
  // Makes the peer become a replica, i.e. a FOLLOWER or a LEARNER.
380
  // initial_fd_wait is the initial wait time before the FailureDetector wakes up and triggers a
381
  // leader election.
382
  //
383
  // The ReplicaState must be locked for configuration change before calling.
384
  CHECKED_STATUS BecomeReplicaUnlocked(
385
      const std::string& new_leader_uuid,
386
      MonoDelta initial_fd_wait = MonoDelta());
387
388
  // Value produced by UpdateReplica() (returned wrapped in a Result<>).
  struct UpdateReplicaResult {
389
    // NOTE(review): presumably the op id the caller should wait on (compare the
    // 'wait_for_op_id' parameter of WaitForWrites()) - confirm in the implementation.
    OpId wait_for_op_id;
390
391
    // Start an election after the writes are committed?
392
    bool start_election = false;
393
394
    // Defaults to OpId::kUnknownTerm until set.
    // NOTE(review): presumably the term observed while processing the update (compare the
    // 'term' parameter of WaitForWrites()) - confirm in the implementation.
    int64_t current_term = OpId::kUnknownTerm;
395
  };
396
397
  // Updates the state in a replica by storing the received operations in the log
398
  // and triggering the required operations. This method won't return until all
399
  // operations have been stored in the log and all Prepares() have been completed,
400
  // and a replica cannot accept any more Update() requests until this is done.
401
  Result<UpdateReplicaResult> UpdateReplica(
402
      ConsensusRequestPB* request,
403
      ConsensusResponsePB* response);
404
405
  // Deduplicates an RPC request making sure that we get only messages that we
406
  // haven't appended to our log yet.
407
  // On return 'deduplicated_req' is instantiated with only the new messages
408
  // and the correct preceding id.
409
  CHECKED_STATUS DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
410
                                                  LeaderRequest* deduplicated_req);
411
412
  // Handles a request from a leader, refusing the request if the term is lower than
413
  // ours or stepping down if it's higher.
414
  CHECKED_STATUS HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
415
                                         ConsensusResponsePB* response);
416
417
  // Checks that the preceding op in 'req' is locally committed or pending and sets an
418
  // appropriate error message in 'response' if not.
419
  // If there is term mismatch between the preceding op id in 'req' and the local log's
420
  // pending operations, we proactively abort those pending operations after and including
421
  // the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache.
422
  CHECKED_STATUS EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
423
                                                   ConsensusResponsePB* response);
424
425
  // Checks that deduplicated messages in an UpdateConsensus request are in the right order.
426
  CHECKED_STATUS CheckLeaderRequestOpIdSequence(
427
      const LeaderRequest& deduped_req,
428
      ConsensusRequestPB* request);
429
430
  // Check a request received from a leader, making sure:
431
  // - The request is in the right term
432
  // - The log matching property holds
433
  // - Messages are de-duplicated so that we only process previously unprocessed requests.
434
  // - We abort operations if the leader sends operations that have the same index as
435
  //   operations currently on the pendings set, but different terms.
436
  // If this returns ok and the response has no errors, 'deduped_req' is set with only
437
  // the messages to add to our state machine.
438
  CHECKED_STATUS CheckLeaderRequestUnlocked(
439
      ConsensusRequestPB* request,
440
      ConsensusResponsePB* response,
441
      LeaderRequest* deduped_req);
442
443
  // Returns the most recent OpId written to the Log.
444
  yb::OpId GetLatestOpIdFromLog();
445
446
  // Begin a replica operation. If the type of message in 'msg' is not a type
447
  // that uses operations, delegates to StartConsensusOnlyRoundUnlocked().
448
  CHECKED_STATUS StartReplicaOperationUnlocked(const ReplicateMsgPtr& msg,
449
                                               HybridTime propagated_safe_time);
450
451
  // Return header string for RequestVote log messages. The ReplicaState lock must be held.
452
  std::string GetRequestVoteLogPrefix(const VoteRequestPB& request) const;
453
454
  // Fills the response with the current status, if an update was successful.
455
  void FillConsensusResponseOKUnlocked(ConsensusResponsePB* response);
456
457
  // Fills the response with an error code and error message.
458
  void FillConsensusResponseError(ConsensusResponsePB* response,
459
                                  ConsensusErrorPB::Code error_code,
460
                                  const Status& status);
461
462
  // Fill VoteResponsePB with the following information:
463
  // - Update responder_term to current local term.
464
  // - Set vote_granted to true.
465
  void FillVoteResponseVoteGranted(const VoteRequestPB& request, VoteResponsePB* response);
466
467
  // Fill VoteResponsePB with the following information:
468
  // - Update responder_term to current local term.
469
  // - Set vote_granted to false.
470
  // - Set consensus_error.code to the given code.
471
  void FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, VoteResponsePB* response);
472
473
  void RequestVoteRespondVoteDenied(
474
      ConsensusErrorPB::Code error_code, const std::string& message_suffix,
475
      const VoteRequestPB& request, VoteResponsePB* response);
476
477
  // Respond to VoteRequest that the candidate has an old term.
478
  CHECKED_STATUS RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
479
                                               VoteResponsePB* response);
480
481
  // Respond to VoteRequest that we already granted our vote to the candidate.
482
  CHECKED_STATUS RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request,
483
                                              VoteResponsePB* response);
484
485
  // Respond to VoteRequest that we already granted our vote to someone else.
486
  CHECKED_STATUS RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request,
487
                                                VoteResponsePB* response);
488
489
  // Respond to VoteRequest that the candidate's last-logged OpId is too old.
490
  CHECKED_STATUS RequestVoteRespondLastOpIdTooOld(const OpIdPB& local_last_opid,
491
                                                  const VoteRequestPB* request,
492
                                                  VoteResponsePB* response);
493
494
  // Respond to VoteRequest that the vote was not granted because we believe
495
  // the leader to be alive.
496
  CHECKED_STATUS RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request,
497
                                         VoteResponsePB* response);
498
499
  // Respond to VoteRequest that the replica is already in the middle of servicing
500
  // another vote request or an update from a valid leader.
501
  CHECKED_STATUS RequestVoteRespondIsBusy(const VoteRequestPB* request,
502
                                  VoteResponsePB* response);
503
504
  // Respond to VoteRequest that the vote is granted for candidate.
505
  CHECKED_STATUS RequestVoteRespondVoteGranted(const VoteRequestPB* request,
506
                                       VoteResponsePB* response);
507
508
  // Callback for leader election driver. ElectionCallback is run on the
509
  // reactor thread, so it simply defers its work to DoElectionCallback.
510
  void ElectionCallback(const LeaderElectionData& data, const ElectionResult& result);
511
  void DoElectionCallback(const LeaderElectionData& data, const ElectionResult& result);
512
  void NotifyOriginatorAboutLostElection(const std::string& originator_uuid);
513
514
  // Helper struct that tracks the RunLeaderElection as part of leadership transferral.
515
  // Bundles the proxy, request, response and RPC controller of one RunLeaderElection call.
  // An instance is handed to RunLeaderElectionResponseRpcCallback() through a shared_ptr.
  struct RunLeaderElectionState {
516
    // Proxy object for the peer involved in the call.
    // NOTE(review): presumably the peer being asked to run the election - confirm.
    PeerProxyPtr proxy;
517
    // Request message of the RunLeaderElection call.
    RunLeaderElectionRequestPB req;
518
    // Response message of the RunLeaderElection call.
    RunLeaderElectionResponsePB resp;
519
    // RPC controller associated with the call.
    rpc::RpcController rpc;
520
  };
521
522
  // Callback for RunLeaderElection async request.
523
  void RunLeaderElectionResponseRpcCallback(std::shared_ptr<RunLeaderElectionState> election_state);
524
525
  // Start tracking the leader for failures. This typically occurs at startup
526
  // and when the local peer steps down as leader.
527
  //
528
  // If 'delta' is set, it is used as the initial failure period. Otherwise,
529
  // the minimum election timeout is used.
530
  //
531
  // If the failure detector is already registered, has no effect.
532
  void EnableFailureDetector(MonoDelta delta = MonoDelta());
533
534
  // Stop tracking the current leader for failures.
535
  // This typically happens when the local peer becomes leader.
536
  // If the failure detector is already disabled, has no effect.
537
  void DisableFailureDetector();
538
539
  // "Reset" the failure detector to indicate leader activity.
540
  // When this is called a failure is guaranteed not to be detected
541
  // before 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
542
  // 'FLAGS_raft_heartbeat_interval_ms' has elapsed, unless 'delta' is set, in
543
  // which case its value is used as the next failure period.
544
  // If 'allow_logging' is set to ALLOW_LOGGING, then this method
545
  // will print a log message when called.
546
  // If the failure detector is not registered, this method has no effect.
547
  void SnoozeFailureDetector(AllowLogging allow_logging,
548
                             MonoDelta delta = MonoDelta());
549
550
  // Return the minimum election timeout. Due to backoff and random
551
  // jitter, election timeouts may be longer than this.
552
  MonoDelta MinimumElectionTimeout() const;
553
554
  // Calculates a snooze delta for leader election.
555
  // The delta increases exponentially with the difference
556
  // between the current term and the term of the last committed
557
  // operation.
558
  // The maximum delta is capped by 'FLAGS_leader_failure_exp_backoff_max_delta_ms'.
559
  MonoDelta LeaderElectionExpBackoffDeltaUnlocked();
560
561
  // Checks if the leader is ready to process a change config request (one requirement for this is
562
  // for it to have at least one committed op in the current term). Also checks that there are no
563
  // voters in transition in the active config state. CHECKED_STATUS OK() implies leader is ready.
564
  // server_uuid is the uuid of the server that we are trying to remove, add, or change its
565
  // role.
566
  CHECKED_STATUS IsLeaderReadyForChangeConfigUnlocked(ChangeConfigType type,
567
                                              const std::string& server_uuid);
568
569
  // Increment the term to the next term, resetting the current leader, etc.
570
  CHECKED_STATUS IncrementTermUnlocked();
571
572
  // Handle when the term has advanced beyond the current term.
573
  CHECKED_STATUS HandleTermAdvanceUnlocked(ConsensusTerm new_term);
574
575
  // Notify the tablet peer that the consensus configuration
576
  // has changed, thus reporting it back to the master. This is performed inline.
577
  void MarkDirty(std::shared_ptr<StateChangeContext> context);
578
579
  // Calls MarkDirty() if 'status' == OK. Then, always calls 'client_cb' with
580
  // 'status' as its argument.
581
  void MarkDirtyOnSuccess(std::shared_ptr<StateChangeContext> context,
582
                          const StdStatusCallback& client_cb,
583
                          const Status& status);
584
585
  // Attempt to remove the follower with the specified 'uuid' from the config,
586
  // if the 'committed_config' is still the committed config and if the current
587
  // node is the leader.
588
  //
589
  // Since this is inherently an asynchronous operation run on a thread pool,
590
  // it may fail due to the configuration changing, the local node losing
591
  // leadership, or the tablet shutting down.
592
  // Logs a warning on failure.
593
  void TryRemoveFollowerTask(const std::string& uuid,
594
                             const RaftConfigPB& committed_config,
595
                             const std::string& reason);
596
597
  // Called when the failure detector expires.
598
  // Submits ReportFailureDetectedTask() to a thread pool.
599
  void ReportFailureDetected();
600
601
  // Call StartElection(), log a warning if the call fails (usually due to
602
  // being shut down).
603
  void ReportFailureDetectedTask();
604
605
  // Helper API to check if the pending/committed configuration has a PRE_VOTER. Non-empty return
606
  // string implies there are servers in transit.
607
  string ServersInTransitionMessage();
608
609
  // Prevent starting new election for some time, after we stepped down.
610
  // protege_uuid - in case of step down we remember our protege.
611
  // After that we use its UUID to check whether node that lost election is our active protege.
612
  // There could be case that we already initiated another stepdown, and after that we received
613
  // delayed packet from old protege.
614
  // So this field allows us to filter out this situation.
615
  // Also we could introduce serial number of stepdown and filter using it.
616
  // That would be more robust, since it also handles the situation when we tried to step down
617
  // to the same node twice, and first retry was delayed, but second procedure is on the way.
618
  void WithholdElectionAfterStepDown(const std::string& protege_uuid);
619
620
  // Steps of UpdateReplica.
621
  CHECKED_STATUS EarlyCommitUnlocked(const ConsensusRequestPB& request,
622
                                     const LeaderRequest& deduped_req);
623
  Result<bool> EnqueuePreparesUnlocked(const ConsensusRequestPB& request,
624
                                       LeaderRequest* deduped_req,
625
                                       ConsensusResponsePB* response);
626
  // Returns last op id received from leader.
627
  yb::OpId EnqueueWritesUnlocked(const LeaderRequest& deduped_req, WriteEmpty write_empty);
628
  CHECKED_STATUS MarkOperationsAsCommittedUnlocked(const ConsensusRequestPB& request,
629
                                                   const LeaderRequest& deduped_req,
630
                                                   OpId last_from_leader);
631
632
  // Wait until the operation with op id equal to wait_for_op_id is flushed in the WAL.
633
  // If term was changed during wait from the specified one - exit with error.
634
  CHECKED_STATUS WaitForWrites(int64_t term, const OpId& wait_for_op_id);
635
636
  // See comment for ReplicaState::CancelPendingOperation
637
  void RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg, bool should_exists);
638
639
  yb::OpId WaitForSafeOpIdToApply(const yb::OpId& op_id) override;
640
641
  void AppendEmptyBatchToLeaderLog();
642
643
  // Step down in favor of peer.
644
  // When graceful is true, protege would not be stored and election would not take place in case
645
  // of protege election failure.
646
  CHECKED_STATUS StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful);
647
648
  // Checks whether we should start step down when protege did not synchronize before timeout.
649
  void CheckDelayedStepDown(const Status& status);
650
651
  // Threadpool token for constructing requests to peers, handling RPC callbacks,
652
  // etc.
653
  std::unique_ptr<ThreadPoolToken> raft_pool_token_;
654
655
  scoped_refptr<log::Log> log_;
656
  scoped_refptr<server::Clock> clock_;
657
  std::unique_ptr<PeerProxyFactory> peer_proxy_factory_;
658
659
  std::unique_ptr<PeerManager> peer_manager_;
660
661
  // The queue of messages that must be sent to peers.
662
  std::unique_ptr<PeerMessageQueue> queue_;
663
664
  std::unique_ptr<ReplicaState> state_;
665
666
  Random rng_;
667
668
  std::shared_ptr<rpc::PeriodicTimer> failure_detector_;
669
670
  // If any RequestVote() RPC arrives before this time,
671
  // the request will be ignored. This prevents abandoned or partitioned
672
  // nodes from disturbing the healthy leader.
673
  std::atomic<MonoTime> withhold_votes_until_;
674
675
  // UUID of new desired leader during stepdown.
676
  TabletServerId protege_leader_uuid_;
677
678
  // This is the time (stored as an atomic MonoTime) until which an election should not start
679
  // on this peer.
680
  std::atomic<MonoTime> withhold_election_start_until_{MonoTime::Min()};
681
682
  // We record the moment at which we discover that an election has been lost by our "protege"
683
  // during leader stepdown. Then, when the master asks us to step down again in favor of the same
684
  // server, we'll reply with the amount of time that has passed to avoid leader stepdown loops.
685
  MonoTime election_lost_by_protege_at_;
686
687
  // Bookkeeping for a step-down that has been recorded but not yet carried out; the single
  // instance is the delayed_step_down_ member.
  // NOTE(review): presumably evaluated by CheckDelayedStepDown() - confirm in the
  // implementation file.
  struct DelayedStepDown {
688
    // Term associated with the delayed step-down; OpId::kUnknownTerm until set.
    int64_t term = OpId::kUnknownTerm;
689
    // UUID of the peer in whose favor the step-down would happen.
    TabletServerId protege;
690
    // Explicitly defaulted so that a default-constructed instance never carries an
    // indeterminate value (reading an uninitialized bool is undefined behavior).
    // NOTE(review): presumably mirrors the 'graceful' argument of StartStepDownUnlocked() -
    // confirm.
    bool graceful = false;
691
692
    // Returns a human-readable description of this object (defined out of line).
    std::string ToString() const;
693
  };
694
695
  DelayedStepDown delayed_step_down_;
696
  rpc::ScheduledTaskTracker step_down_check_tracker_;
697
698
  // The number of times this node has called and lost a leader election since
699
  // the last time it saw a stable leader (either itself or another node).
700
  // This is used to calculate back-off of the election timeout.
701
  std::atomic<int> failed_elections_since_stable_leader_{0};
702
703
  const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk_;
704
705
  // Lock ordering note: If both this lock and the ReplicaState lock are to be
706
  // taken, this lock must be taken first.
707
  mutable std::timed_mutex update_mutex_;
708
709
  std::atomic_flag outstanding_report_failure_task_ = ATOMIC_FLAG_INIT;
710
711
  AtomicBool shutdown_;
712
713
  scoped_refptr<Counter> follower_memory_pressure_rejections_;
714
  scoped_refptr<AtomicGauge<int64_t>> term_metric_;
715
  scoped_refptr<AtomicMillisLag> follower_last_update_time_ms_metric_;
716
  scoped_refptr<AtomicGauge<int64_t>> is_raft_leader_metric_;
717
  std::shared_ptr<MemTracker> parent_mem_tracker_;
718
719
  TableType table_type_;
720
721
  // Mutex / condition used for waiting for acquiring a valid leader lease.
722
  std::mutex leader_lease_wait_mtx_;
723
  std::condition_variable leader_lease_wait_cond_;
724
725
  scoped_refptr<Histogram> update_raft_config_dns_latency_;
726
727
  // Used only when TEST_follower_reject_update_consensus_requests_seconds is greater than 0.
728
  // Any requests to update the replica will be rejected until this time. For testing only.
729
  MonoTime withold_replica_updates_until_ = MonoTime::kUninitialized;
730
731
  std::atomic<RejectMode> reject_mode_{RejectMode::kNone};
732
733
  CoarseTimePoint disable_pre_elections_until_ = CoarseTimePoint::min();
734
735
  std::atomic<MonoDelta> TEST_delay_update_{MonoDelta::kZero};
736
737
  std::atomic<uint64_t> majority_num_sst_files_{0};
738
739
  const TabletId split_parent_tablet_id_;
740
741
  DISALLOW_COPY_AND_ASSIGN(RaftConsensus);
742
};
743
744
}  // namespace consensus
745
}  // namespace yb
746
747
#endif /* YB_CONSENSUS_RAFT_CONSENSUS_H_ */