/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_CONSENSUS_RAFT_CONSENSUS_H_ |
34 | | #define YB_CONSENSUS_RAFT_CONSENSUS_H_ |
35 | | |
36 | | #include <atomic> |
37 | | #include <memory> |
38 | | #include <mutex> |
39 | | #include <string> |
40 | | #include <utility> |
41 | | #include <vector> |
42 | | |
43 | | #include <boost/optional/optional_fwd.hpp> |
44 | | |
45 | | #include "yb/common/entity_ids_types.h" |
46 | | #include "yb/consensus/consensus.h" |
47 | | #include "yb/consensus/consensus.pb.h" |
48 | | #include "yb/consensus/consensus_meta.h" |
49 | | #include "yb/consensus/consensus_queue.h" |
50 | | #include "yb/consensus/multi_raft_batcher.h" |
51 | | |
52 | | #include "yb/gutil/callback.h" |
53 | | |
54 | | #include "yb/rpc/scheduler.h" |
55 | | |
56 | | #include "yb/util/atomic.h" |
57 | | #include "yb/util/opid.h" |
58 | | #include "yb/util/random.h" |
59 | | |
60 | | DECLARE_int32(leader_lease_duration_ms); |
61 | | DECLARE_int32(ht_lease_duration_ms); |
62 | | |
63 | | namespace yb { |
64 | | |
65 | | typedef std::lock_guard<simple_spinlock> Lock; |
66 | | typedef std::unique_ptr<Lock> ScopedLock; |
67 | | |
68 | | class Counter; |
69 | | class HostPort; |
70 | | class ThreadPool; |
71 | | class ThreadPoolToken; |
72 | | |
73 | | namespace server { |
74 | | class Clock; |
75 | | } |
76 | | |
77 | | namespace rpc { |
78 | | class PeriodicTimer; |
79 | | } |
80 | | |
81 | | namespace consensus { |
82 | | |
83 | | class ConsensusMetadata; |
84 | | class Peer; |
85 | | class PeerProxyFactory; |
86 | | class PeerManager; |
87 | | class ReplicaState; |
88 | | struct ElectionResult; |
89 | | |
90 | | constexpr int32_t kDefaultLeaderLeaseDurationMs = 2000; |
91 | | |
92 | | YB_STRONGLY_TYPED_BOOL(WriteEmpty); |
93 | | YB_STRONGLY_TYPED_BOOL(PreElected); |
94 | | |
95 | | YB_DEFINE_ENUM(RejectMode, (kNone)(kAll)(kNonEmpty)); |
96 | | |
97 | | std::unique_ptr<ConsensusRoundCallback> MakeNonTrackedRoundCallback( |
98 | | ConsensusRound* round, const StdStatusCallback& callback); |
99 | | |
100 | | class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>, |
101 | | public Consensus, |
102 | | public PeerMessageQueueObserver, |
103 | | public SafeOpIdWaiter { |
104 | | public: |
105 | | class ConsensusFaultHooks; |
106 | | |
107 | | // Creates RaftConsensus. |
108 | | static std::shared_ptr<RaftConsensus> Create( |
109 | | const ConsensusOptions& options, |
110 | | std::unique_ptr<ConsensusMetadata> cmeta, |
111 | | const RaftPeerPB& local_peer_pb, |
112 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
113 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
114 | | const scoped_refptr<server::Clock>& clock, |
115 | | ConsensusContext* consensus_context, |
116 | | rpc::Messenger* messenger, |
117 | | rpc::ProxyCache* proxy_cache, |
118 | | const scoped_refptr<log::Log>& log, |
119 | | const std::shared_ptr<MemTracker>& server_mem_tracker, |
120 | | const std::shared_ptr<MemTracker>& parent_mem_tracker, |
121 | | const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, |
122 | | TableType table_type, |
123 | | ThreadPool* raft_pool, |
124 | | RetryableRequests* retryable_requests, |
125 | | MultiRaftManager* multi_raft_manager); |
126 | | |
127 | | // Creates RaftConsensus. |
128 | | RaftConsensus( |
129 | | const ConsensusOptions& options, |
130 | | std::unique_ptr<ConsensusMetadata> cmeta, |
131 | | std::unique_ptr<PeerProxyFactory> peer_proxy_factory, |
132 | | std::unique_ptr<PeerMessageQueue> queue, |
133 | | std::unique_ptr<PeerManager> peer_manager, |
134 | | std::unique_ptr<ThreadPoolToken> raft_pool_token, |
135 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
136 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
137 | | const std::string& peer_uuid, |
138 | | const scoped_refptr<server::Clock>& clock, |
139 | | ConsensusContext* consensus_context, |
140 | | const scoped_refptr<log::Log>& log, |
141 | | std::shared_ptr<MemTracker> parent_mem_tracker, |
142 | | Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, |
143 | | TableType table_type, |
144 | | RetryableRequests* retryable_requests); |
145 | | |
146 | | virtual ~RaftConsensus(); |
147 | | |
148 | | virtual CHECKED_STATUS Start(const ConsensusBootstrapInfo& info) override; |
149 | | |
150 | | virtual bool IsRunning() const override; |
151 | | |
152 | | // Emulates an election by increasing the term number and asserting leadership |
153 | | // in the configuration by sending a NO_OP to other peers. |
154 | | // This is NOT safe to use in a distributed configuration with failure detection |
155 | | // enabled, as it could result in a split-brain scenario. |
156 | | CHECKED_STATUS EmulateElection() override; |
157 | | |
158 | | CHECKED_STATUS ElectionLostByProtege(const std::string& election_lost_by_uuid) override; |
159 | | |
160 | | CHECKED_STATUS WaitUntilLeaderForTests(const MonoDelta& timeout) override; |
161 | | |
162 | | CHECKED_STATUS StepDown(const LeaderStepDownRequestPB* req, |
163 | | LeaderStepDownResponsePB* resp) override; |
164 | | |
165 | | CHECKED_STATUS TEST_Replicate(const ConsensusRoundPtr& round) override; |
166 | | CHECKED_STATUS ReplicateBatch(const ConsensusRounds& rounds) override; |
167 | | |
168 | | CHECKED_STATUS Update( |
169 | | ConsensusRequestPB* request, |
170 | | ConsensusResponsePB* response, |
171 | | CoarseTimePoint deadline) override; |
172 | | |
173 | | CHECKED_STATUS RequestVote(const VoteRequestPB* request, |
174 | | VoteResponsePB* response) override; |
175 | | |
176 | | CHECKED_STATUS ChangeConfig(const ChangeConfigRequestPB& req, |
177 | | const StdStatusCallback& client_cb, |
178 | | boost::optional<tserver::TabletServerErrorPB::Code>* error_code) |
179 | | override; |
180 | | |
181 | | CHECKED_STATUS UnsafeChangeConfig( |
182 | | const UnsafeChangeConfigRequestPB& req, |
183 | | boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override; |
184 | | |
185 | | PeerRole GetRoleUnlocked() const; |
186 | | |
187 | | PeerRole role() const override; |
188 | | |
189 | | LeaderState GetLeaderState(bool allow_stale = false) const override; |
190 | | |
191 | | std::string peer_uuid() const override; |
192 | | |
193 | | std::string tablet_id() const override; |
194 | | |
195 | | const TabletId& split_parent_tablet_id() const override; |
196 | | |
197 | | ConsensusStatePB ConsensusState( |
198 | | ConsensusConfigType type, |
199 | | LeaderLeaseStatus* leader_lease_status) const override; |
200 | | |
201 | | ConsensusStatePB ConsensusStateUnlocked( |
202 | | ConsensusConfigType type, |
203 | | LeaderLeaseStatus* leader_lease_status) const override; |
204 | | |
205 | | RaftConfigPB CommittedConfig() const override; |
206 | | |
207 | | void DumpStatusHtml(std::ostream& out) const override; |
208 | | |
209 | | void Shutdown() override; |
210 | | |
211 | | // Return the active (as opposed to committed) role. |
212 | | PeerRole GetActiveRole() const; |
213 | | |
214 | | // Returns the replica state for tests. This should never be used outside of |
215 | | // tests, in particular calling the LockFor* methods on the returned object |
216 | | // can cause consensus to deadlock. |
217 | | ReplicaState* GetReplicaStateForTests(); |
218 | | |
219 | | void TEST_UpdateMajorityReplicated( |
220 | 16 | const OpId& majority_replicated, OpId* committed_index, OpId* last_committed_op_id) { |
221 | 16 | UpdateMajorityReplicated({ majority_replicated, |
222 | 16 | CoarseTimePoint::min(), |
223 | 16 | HybridTime::kMin.GetPhysicalValueMicros() }, |
224 | 16 | committed_index, last_committed_op_id); |
225 | 16 | } |
226 | | |
227 | | yb::OpId GetLastReceivedOpId() override; |
228 | | |
229 | | yb::OpId GetLastCommittedOpId() override; |
230 | | |
231 | | OpId GetLastCDCedOpId() override; |
232 | | |
233 | | yb::OpId GetLastAppliedOpId() override; |
234 | | |
235 | | yb::OpId GetAllAppliedOpId(); |
236 | | |
237 | | Result<MicrosTime> MajorityReplicatedHtLeaseExpiration( |
238 | | MicrosTime min_allowed, CoarseTimePoint deadline) const override; |
239 | | |
240 | | // The on-disk size of the consensus metadata. |
241 | | uint64_t OnDiskSize() const; |
242 | | |
243 | | yb::OpId MinRetryableRequestOpId(); |
244 | | |
245 | 40.9k | CHECKED_STATUS StartElection(const LeaderElectionData& data) override { |
246 | 40.9k | return DoStartElection(data, PreElected::kFalse); |
247 | 40.9k | } |
248 | | |
249 | | size_t LogCacheSize(); |
250 | | size_t EvictLogCache(size_t bytes_to_evict); |
251 | | |
252 | 44 | const scoped_refptr<log::Log>& log() { return log_; } |
253 | | |
254 | | RetryableRequestsCounts TEST_CountRetryableRequests(); |
255 | | |
256 | 0 | void TEST_RejectMode(RejectMode value) { |
257 | 0 | reject_mode_.store(value, std::memory_order_release); |
258 | 0 | } |
259 | | |
260 | 0 | void TEST_DelayUpdate(MonoDelta duration) { |
261 | 0 | TEST_delay_update_.store(duration, std::memory_order_release); |
262 | 0 | } |
263 | | |
264 | | Result<ReadOpsResult> ReadReplicatedMessagesForCDC(const yb::OpId& from, |
265 | | int64_t* last_replicated_opid_index, |
266 | | const CoarseTimePoint deadline = CoarseTimePoint::max()) override; |
267 | | |
268 | | void UpdateCDCConsumerOpId(const yb::OpId& op_id) override; |
269 | | |
270 | | // Start memory tracking of following operation in case it is still present in our caches. |
271 | | void TrackOperationMemory(const yb::OpId& op_id); |
272 | | |
273 | 1.48M | uint64_t MajorityNumSSTFiles() const { |
274 | 1.48M | return majority_num_sst_files_.load(std::memory_order_acquire); |
275 | 1.48M | } |
276 | | |
277 | | // Returns last op id from log cache with specified op id type and operation type. |
278 | | Result<OpId> TEST_GetLastOpIdWithType(OpIdType opid_type, OperationType op_type); |
279 | | |
280 | | int64_t TEST_LeaderTerm() const; |
281 | | |
282 | | // Trigger that a non-Operation ConsensusRound has finished replication. |
283 | | // If the replication was successful, an status will be OK. Otherwise, it |
284 | | // may be Aborted or some other error status. |
285 | | // If 'status' is OK, write a Commit message to the local WAL based on the |
286 | | // type of message it is. |
287 | | // The 'client_cb' will be invoked at the end of this execution. |
288 | | virtual void NonTrackedRoundReplicationFinished( |
289 | | ConsensusRound* round, const StdStatusCallback& client_cb, const Status& status); |
290 | | |
291 | | protected: |
292 | | // As a leader, append a new ConsensusRound to the queue. |
293 | | // Only virtual and protected for mocking purposes. |
294 | | virtual CHECKED_STATUS AppendNewRoundToQueueUnlocked(const ConsensusRoundPtr& round); |
295 | | |
296 | | // processed_rounds - out value for number of rounds that were processed. |
297 | | virtual CHECKED_STATUS AppendNewRoundsToQueueUnlocked( |
298 | | const ConsensusRounds& rounds, size_t* processed_rounds); |
299 | | |
300 | | CHECKED_STATUS CheckLeasesUnlocked(const ConsensusRoundPtr& round); |
301 | | |
302 | | // As a follower, start a consensus round not associated with a Operation. |
303 | | // Only virtual and protected for mocking purposes. |
304 | | virtual CHECKED_STATUS StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg); |
305 | | |
306 | | // Assuming we are the leader, wait until we have a valid leader lease (i.e. the old leader's |
307 | | // lease has expired, and we have replicated a new lease that has not expired yet). |
308 | | // This says "Imprecise" because there is a slight race condition where this could wait for an |
309 | | // additional short time interval (e.g. 100 ms) in case we've just acquired the lease and the |
310 | | // waiting thread missed the notification. However, as of 08/14/2017 this is only used in a |
311 | | // context where this does not matter, such as catalog manager initialization. |
312 | | CHECKED_STATUS WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) override; |
313 | | |
314 | | CHECKED_STATUS CheckIsActiveLeaderAndHasLease() const override; |
315 | | |
316 | | private: |
317 | | friend class ReplicaState; |
318 | | friend class RaftConsensusQuorumTest; |
319 | | |
320 | | // processed_rounds - out value for number of rounds that were processed. |
321 | | CHECKED_STATUS DoReplicateBatch(const ConsensusRounds& rounds, size_t* processed_rounds); |
322 | | |
323 | | CHECKED_STATUS DoStartElection(const LeaderElectionData& data, PreElected preelected); |
324 | | |
325 | | Result<LeaderElectionPtr> CreateElectionUnlocked( |
326 | | const LeaderElectionData& data, |
327 | | MonoDelta timeout, |
328 | | PreElection preelection); |
329 | | |
330 | | // Updates the committed_index, triggers the Apply()s for whatever |
331 | | // operations were pending and updates last_applied_op_id. |
332 | | // This is idempotent. |
333 | | void UpdateMajorityReplicated( |
334 | | const MajorityReplicatedData& data, OpId* committed_op_id, OpId* last_applied_op_id) override; |
335 | | |
336 | | void NotifyTermChange(int64_t term) override; |
337 | | |
338 | | void NotifyFailedFollower(const std::string& uuid, |
339 | | int64_t term, |
340 | | const std::string& reason) override; |
341 | | |
342 | | void MajorityReplicatedNumSSTFilesChanged(uint64_t majority_replicated_num_sst_files) override; |
343 | | |
344 | | // Control whether printing of log messages should be done for a particular |
345 | | // function call. |
346 | | enum AllowLogging { |
347 | | DO_NOT_LOG = 0, |
348 | | ALLOW_LOGGING = 1, |
349 | | }; |
350 | | |
351 | | // Helper struct that contains the messages from the leader that we need to |
352 | | // append to our log, after they've been deduplicated. |
353 | | struct LeaderRequest; |
354 | | |
355 | | std::string LogPrefix(); |
356 | | |
357 | | // Set the leader UUID of the configuration and mark the tablet config dirty for |
358 | | // reporting to the master. |
359 | | void SetLeaderUuidUnlocked(const std::string& uuid); |
360 | | |
361 | | // Replicate (as leader) a pre-validated config change. This includes |
362 | | // updating the peers and setting the new_configuration as pending. |
363 | | CHECKED_STATUS ReplicateConfigChangeUnlocked(const ReplicateMsgPtr& replicate_ref, |
364 | | const RaftConfigPB& new_config, |
365 | | ChangeConfigType type, |
366 | | StdStatusCallback client_cb); |
367 | | |
368 | | // Update the peers and queue to be consistent with a new active configuration. |
369 | | // Should only be called by the leader. |
370 | | void RefreshConsensusQueueAndPeersUnlocked(); |
371 | | |
372 | | // Makes the peer become leader. |
373 | | // Returns OK once the change config operation that has this peer as leader |
374 | | // has been enqueued, the operation will complete asynchronously. |
375 | | // |
376 | | // The ReplicaState must be locked for configuration change before calling. |
377 | | CHECKED_STATUS BecomeLeaderUnlocked(); |
378 | | |
379 | | // Makes the peer become a replica, i.e. a FOLLOWER or a LEARNER. |
380 | | // initial_fd_wait is the initial wait time before the FailureDetector wakes up and triggers a |
381 | | // leader election. |
382 | | // |
383 | | // The ReplicaState must be locked for configuration change before calling. |
384 | | CHECKED_STATUS BecomeReplicaUnlocked( |
385 | | const std::string& new_leader_uuid, |
386 | | MonoDelta initial_fd_wait = MonoDelta()); |
387 | | |
388 | | struct UpdateReplicaResult { |
389 | | OpId wait_for_op_id; |
390 | | |
391 | | // Start an election after the writes are committed? |
392 | | bool start_election = false; |
393 | | |
394 | | int64_t current_term = OpId::kUnknownTerm; |
395 | | }; |
396 | | |
397 | | // Updates the state in a replica by storing the received operations in the log |
398 | | // and triggering the required operations. This method won't return until all |
399 | | // operations have been stored in the log and all Prepares() have been completed, |
400 | | // and a replica cannot accept any more Update() requests until this is done. |
401 | | Result<UpdateReplicaResult> UpdateReplica( |
402 | | ConsensusRequestPB* request, |
403 | | ConsensusResponsePB* response); |
404 | | |
405 | | // Deduplicates an RPC request making sure that we get only messages that we |
406 | | // haven't appended to our log yet. |
407 | | // On return 'deduplicated_req' is instantiated with only the new messages |
408 | | // and the correct preceding id. |
409 | | CHECKED_STATUS DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req, |
410 | | LeaderRequest* deduplicated_req); |
411 | | |
412 | | // Handles a request from a leader, refusing the request if the term is lower than |
413 | | // ours or stepping down if it's higher. |
414 | | CHECKED_STATUS HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request, |
415 | | ConsensusResponsePB* response); |
416 | | |
417 | | // Checks that the preceding op in 'req' is locally committed or pending and sets an |
418 | | // appropriate error message in 'response' if not. |
419 | | // If there is term mismatch between the preceding op id in 'req' and the local log's |
420 | | // pending operations, we proactively abort those pending operations after and including |
421 | | // the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache. |
422 | | CHECKED_STATUS EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req, |
423 | | ConsensusResponsePB* response); |
424 | | |
425 | | // Checks that deduplicated messages in an UpdateConsensus request are in the right order. |
426 | | CHECKED_STATUS CheckLeaderRequestOpIdSequence( |
427 | | const LeaderRequest& deduped_req, |
428 | | ConsensusRequestPB* request); |
429 | | |
430 | | // Check a request received from a leader, making sure: |
431 | | // - The request is in the right term |
432 | | // - The log matching property holds |
433 | | // - Messages are de-duplicated so that we only process previously unprocessed requests. |
434 | | // - We abort operations if the leader sends operations that have the same index as |
435 | | // operations currently on the pendings set, but different terms. |
436 | | // If this returns ok and the response has no errors, 'deduped_req' is set with only |
437 | | // the messages to add to our state machine. |
438 | | CHECKED_STATUS CheckLeaderRequestUnlocked( |
439 | | ConsensusRequestPB* request, |
440 | | ConsensusResponsePB* response, |
441 | | LeaderRequest* deduped_req); |
442 | | |
443 | | // Returns the most recent OpId written to the Log. |
444 | | yb::OpId GetLatestOpIdFromLog(); |
445 | | |
446 | | // Begin a replica operation. If the type of message in 'msg' is not a type |
447 | | // that uses operations, delegates to StartConsensusOnlyRoundUnlocked(). |
448 | | CHECKED_STATUS StartReplicaOperationUnlocked(const ReplicateMsgPtr& msg, |
449 | | HybridTime propagated_safe_time); |
450 | | |
451 | | // Return header string for RequestVote log messages. The ReplicaState lock must be held. |
452 | | std::string GetRequestVoteLogPrefix(const VoteRequestPB& request) const; |
453 | | |
454 | | // Fills the response with the current status, if an update was successful. |
455 | | void FillConsensusResponseOKUnlocked(ConsensusResponsePB* response); |
456 | | |
457 | | // Fills the response with an error code and error message. |
458 | | void FillConsensusResponseError(ConsensusResponsePB* response, |
459 | | ConsensusErrorPB::Code error_code, |
460 | | const Status& status); |
461 | | |
462 | | // Fill VoteResponsePB with the following information: |
463 | | // - Update responder_term to current local term. |
464 | | // - Set vote_granted to true. |
465 | | void FillVoteResponseVoteGranted(const VoteRequestPB& request, VoteResponsePB* response); |
466 | | |
467 | | // Fill VoteResponsePB with the following information: |
468 | | // - Update responder_term to current local term. |
469 | | // - Set vote_granted to false. |
470 | | // - Set consensus_error.code to the given code. |
471 | | void FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, VoteResponsePB* response); |
472 | | |
473 | | void RequestVoteRespondVoteDenied( |
474 | | ConsensusErrorPB::Code error_code, const std::string& message_suffix, |
475 | | const VoteRequestPB& request, VoteResponsePB* response); |
476 | | |
477 | | // Respond to VoteRequest that the candidate has an old term. |
478 | | CHECKED_STATUS RequestVoteRespondInvalidTerm(const VoteRequestPB* request, |
479 | | VoteResponsePB* response); |
480 | | |
481 | | // Respond to VoteRequest that we already granted our vote to the candidate. |
482 | | CHECKED_STATUS RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request, |
483 | | VoteResponsePB* response); |
484 | | |
485 | | // Respond to VoteRequest that we already granted our vote to someone else. |
486 | | CHECKED_STATUS RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request, |
487 | | VoteResponsePB* response); |
488 | | |
489 | | // Respond to VoteRequest that the candidate's last-logged OpId is too old. |
490 | | CHECKED_STATUS RequestVoteRespondLastOpIdTooOld(const OpIdPB& local_last_opid, |
491 | | const VoteRequestPB* request, |
492 | | VoteResponsePB* response); |
493 | | |
494 | | // Respond to VoteRequest that the vote was not granted because we believe |
495 | | // the leader to be alive. |
496 | | CHECKED_STATUS RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request, |
497 | | VoteResponsePB* response); |
498 | | |
499 | | // Respond to VoteRequest that the replica is already in the middle of servicing |
500 | | // another vote request or an update from a valid leader. |
501 | | CHECKED_STATUS RequestVoteRespondIsBusy(const VoteRequestPB* request, |
502 | | VoteResponsePB* response); |
503 | | |
504 | | // Respond to VoteRequest that the vote is granted for candidate. |
505 | | CHECKED_STATUS RequestVoteRespondVoteGranted(const VoteRequestPB* request, |
506 | | VoteResponsePB* response); |
507 | | |
508 | | // Callback for leader election driver. ElectionCallback is run on the |
509 | | // reactor thread, so it simply defers its work to DoElectionCallback. |
510 | | void ElectionCallback(const LeaderElectionData& data, const ElectionResult& result); |
511 | | void DoElectionCallback(const LeaderElectionData& data, const ElectionResult& result); |
512 | | void NotifyOriginatorAboutLostElection(const std::string& originator_uuid); |
513 | | |
514 | | // Helper struct that tracks the RunLeaderElection as part of leadership transferral. |
515 | | struct RunLeaderElectionState { |
516 | | PeerProxyPtr proxy; |
517 | | RunLeaderElectionRequestPB req; |
518 | | RunLeaderElectionResponsePB resp; |
519 | | rpc::RpcController rpc; |
520 | | }; |
521 | | |
522 | | // Callback for RunLeaderElection async request. |
523 | | void RunLeaderElectionResponseRpcCallback(std::shared_ptr<RunLeaderElectionState> election_state); |
524 | | |
525 | | // Start tracking the leader for failures. This typically occurs at startup |
526 | | // and when the local peer steps down as leader. |
527 | | // |
528 | | // If 'delta' is set, it is used as the initial failure period. Otherwise, |
529 | | // the minimum election timeout is used. |
530 | | // |
531 | | // If the failure detector is already registered, has no effect. |
532 | | void EnableFailureDetector(MonoDelta delta = MonoDelta()); |
533 | | |
534 | | // Stop tracking the current leader for failures. |
535 | | // This typically happens when the local peer becomes leader. |
536 | | // If the failure detector is already disabled, has no effect. |
537 | | void DisableFailureDetector(); |
538 | | |
539 | | // "Reset" the failure detector to indicate leader activity. |
540 | | // When this is called a failure is guaranteed not to be detected |
541 | | // before 'FLAGS_leader_failure_max_missed_heartbeat_periods' * |
542 | | // 'FLAGS_raft_heartbeat_interval_ms' has elapsed, unless 'delta' is set, in |
543 | | // which case its value is used as the next failure period. |
544 | | // If 'allow_logging' is set to ALLOW_LOGGING, then this method |
545 | | // will print a log message when called. |
546 | | // If the failure detector is not registered, this method has no effect. |
547 | | void SnoozeFailureDetector(AllowLogging allow_logging, |
548 | | MonoDelta delta = MonoDelta()); |
549 | | |
550 | | // Return the minimum election timeout. Due to backoff and random |
551 | | // jitter, election timeouts may be longer than this. |
552 | | MonoDelta MinimumElectionTimeout() const; |
553 | | |
554 | | // Calculates a snooze delta for leader election. |
555 | | // The delta increases exponentially with the difference |
556 | | // between the current term and the term of the last committed |
557 | | // operation. |
558 | | // The maximum delta is capped by 'FLAGS_leader_failure_exp_backoff_max_delta_ms'. |
559 | | MonoDelta LeaderElectionExpBackoffDeltaUnlocked(); |
560 | | |
561 | | // Checks if the leader is ready to process a change config request (one requirement for this is |
562 | | // for it to have at least one committed op in the current term). Also checks that there are no |
563 | | // voters in transition in the active config state. CHECKED_STATUS OK() implies leader is ready. |
564 | | // server_uuid is the uuid of the server that we are trying to remove, add, or change its |
565 | | // role. |
566 | | CHECKED_STATUS IsLeaderReadyForChangeConfigUnlocked(ChangeConfigType type, |
567 | | const std::string& server_uuid); |
568 | | |
569 | | // Increment the term to the next term, resetting the current leader, etc. |
570 | | CHECKED_STATUS IncrementTermUnlocked(); |
571 | | |
572 | | // Handle when the term has advanced beyond the current term. |
573 | | CHECKED_STATUS HandleTermAdvanceUnlocked(ConsensusTerm new_term); |
574 | | |
575 | | // Notify the tablet peer that the consensus configuration |
576 | | // has changed, thus reporting it back to the master. This is performed inline. |
577 | | void MarkDirty(std::shared_ptr<StateChangeContext> context); |
578 | | |
579 | | // Calls MarkDirty() if 'status' == OK. Then, always calls 'client_cb' with |
580 | | // 'status' as its argument. |
581 | | void MarkDirtyOnSuccess(std::shared_ptr<StateChangeContext> context, |
582 | | const StdStatusCallback& client_cb, |
583 | | const Status& status); |
584 | | |
585 | | // Attempt to remove the follower with the specified 'uuid' from the config, |
586 | | // if the 'committed_config' is still the committed config and if the current |
587 | | // node is the leader. |
588 | | // |
589 | | // Since this is inherently an asynchronous operation run on a thread pool, |
590 | | // it may fail due to the configuration changing, the local node losing |
591 | | // leadership, or the tablet shutting down. |
592 | | // Logs a warning on failure. |
593 | | void TryRemoveFollowerTask(const std::string& uuid, |
594 | | const RaftConfigPB& committed_config, |
595 | | const std::string& reason); |
596 | | |
597 | | // Called when the failure detector expires. |
598 | | // Submits ReportFailureDetectedTask() to a thread pool. |
599 | | void ReportFailureDetected(); |
600 | | |
601 | | // Call StartElection(), log a warning if the call fails (usually due to |
602 | | // being shut down). |
603 | | void ReportFailureDetectedTask(); |
604 | | |
605 | | // Helper API to check if the pending/committed configuration has a PRE_VOTER. Non-null return |
606 | | // string implies there are servers in transit. |
607 | | string ServersInTransitionMessage(); |
608 | | |
609 | | // Prevent starting new election for some time, after we stepped down. |
610 | | // protege_uuid - in case of step down we remember our protege. |
611 | | // After that we use its UUID to check whether node that lost election is our active protege. |
612 | | // There could be case that we already initiated another stepdown, and after that we received |
613 | | // delayed packet from old protege. |
614 | | // So this field allows us to filter out this situation. |
615 | | // Also we could introduce serial number of stepdown and filter using it. |
616 | | // That woule be more robust, since it handles also situation when we tried to stepdown |
617 | | // to the same node twice, and first retry was delayed, but second procedure is on the way. |
618 | | void WithholdElectionAfterStepDown(const std::string& protege_uuid); |
619 | | |
620 | | // Steps of UpdateReplica. |
621 | | CHECKED_STATUS EarlyCommitUnlocked(const ConsensusRequestPB& request, |
622 | | const LeaderRequest& deduped_req); |
623 | | Result<bool> EnqueuePreparesUnlocked(const ConsensusRequestPB& request, |
624 | | LeaderRequest* deduped_req, |
625 | | ConsensusResponsePB* response); |
626 | | // Returns last op id received from leader. |
627 | | yb::OpId EnqueueWritesUnlocked(const LeaderRequest& deduped_req, WriteEmpty write_empty); |
628 | | CHECKED_STATUS MarkOperationsAsCommittedUnlocked(const ConsensusRequestPB& request, |
629 | | const LeaderRequest& deduped_req, |
630 | | OpId last_from_leader); |
631 | | |
632 | | // Wait until the operation with op id equal to wait_for_op_id is flushed in the WAL. |
633 | | // If term was changed during wait from the specified one - exit with error. |
634 | | CHECKED_STATUS WaitForWrites(int64_t term, const OpId& wait_for_op_id); |
635 | | |
636 | | // See comment for ReplicaState::CancelPendingOperation |
637 | | void RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg, bool should_exists); |
638 | | |
639 | | yb::OpId WaitForSafeOpIdToApply(const yb::OpId& op_id) override; |
640 | | |
641 | | void AppendEmptyBatchToLeaderLog(); |
642 | | |
643 | | // Step down in favor of peer. |
644 | | // When graceful is true, protege would not be stored and election would not take place in case |
645 | | // of protege election failure. |
646 | | CHECKED_STATUS StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful); |
647 | | |
648 | | // Checked whether we should start step down when protege did not synchronize before timeout. |
649 | | void CheckDelayedStepDown(const Status& status); |
650 | | |
651 | | // Threadpool token for constructing requests to peers, handling RPC callbacks, |
652 | | // etc. |
653 | | std::unique_ptr<ThreadPoolToken> raft_pool_token_; |
654 | | |
655 | | scoped_refptr<log::Log> log_; |
656 | | scoped_refptr<server::Clock> clock_; |
657 | | std::unique_ptr<PeerProxyFactory> peer_proxy_factory_; |
658 | | |
659 | | std::unique_ptr<PeerManager> peer_manager_; |
660 | | |
661 | | // The queue of messages that must be sent to peers. |
662 | | std::unique_ptr<PeerMessageQueue> queue_; |
663 | | |
664 | | std::unique_ptr<ReplicaState> state_; |
665 | | |
666 | | Random rng_; |
667 | | |
668 | | std::shared_ptr<rpc::PeriodicTimer> failure_detector_; |
669 | | |
670 | | // If any RequestVote() RPC arrives before this hybrid time, |
671 | | // the request will be ignored. This prevents abandoned or partitioned |
672 | | // nodes from disturbing the healthy leader. |
673 | | std::atomic<MonoTime> withhold_votes_until_; |
674 | | |
675 | | // UUID of new desired leader during stepdown. |
676 | | TabletServerId protege_leader_uuid_; |
677 | | |
678 | | // This is the time (in the MonoTime's uint64 representation) for which election should not start |
679 | | // on this peer. |
680 | | std::atomic<MonoTime> withhold_election_start_until_{MonoTime::Min()}; |
681 | | |
682 | | // We record the moment at which we discover that an election has been lost by our "protege" |
683 | | // during leader stepdown. Then, when the master asks us to step down again in favor of the same |
684 | | // server, we'll reply with the amount of time that has passed to avoid leader stepdown loops.s |
685 | | MonoTime election_lost_by_protege_at_; |
686 | | |
687 | | struct DelayedStepDown { |
688 | | int64_t term = OpId::kUnknownTerm; |
689 | | TabletServerId protege; |
690 | | bool graceful; |
691 | | |
692 | | std::string ToString() const; |
693 | | }; |
694 | | |
695 | | DelayedStepDown delayed_step_down_; |
696 | | rpc::ScheduledTaskTracker step_down_check_tracker_; |
697 | | |
698 | | // The number of times this node has called and lost a leader election since |
699 | | // the last time it saw a stable leader (either itself or another node). |
700 | | // This is used to calculate back-off of the election timeout. |
701 | | std::atomic<int> failed_elections_since_stable_leader_{0}; |
702 | | |
703 | | const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk_; |
704 | | |
705 | | // Lock ordering note: If both this lock and the ReplicaState lock are to be |
706 | | // taken, this lock must be taken first. |
707 | | mutable std::timed_mutex update_mutex_; |
708 | | |
709 | | std::atomic_flag outstanding_report_failure_task_ = ATOMIC_FLAG_INIT; |
710 | | |
711 | | AtomicBool shutdown_; |
712 | | |
713 | | scoped_refptr<Counter> follower_memory_pressure_rejections_; |
714 | | scoped_refptr<AtomicGauge<int64_t>> term_metric_; |
715 | | scoped_refptr<AtomicMillisLag> follower_last_update_time_ms_metric_; |
716 | | scoped_refptr<AtomicGauge<int64_t>> is_raft_leader_metric_; |
717 | | std::shared_ptr<MemTracker> parent_mem_tracker_; |
718 | | |
719 | | TableType table_type_; |
720 | | |
721 | | // Mutex / condition used for waiting for acquiring a valid leader lease. |
722 | | std::mutex leader_lease_wait_mtx_; |
723 | | std::condition_variable leader_lease_wait_cond_; |
724 | | |
725 | | scoped_refptr<Histogram> update_raft_config_dns_latency_; |
726 | | |
727 | | // Used only when TEST_follower_reject_update_consensus_requests_seconds is greater than 0. |
728 | | // Any requests to update the replica will be rejected until this time. For testing only. |
729 | | MonoTime withold_replica_updates_until_ = MonoTime::kUninitialized; |
730 | | |
731 | | std::atomic<RejectMode> reject_mode_{RejectMode::kNone}; |
732 | | |
733 | | CoarseTimePoint disable_pre_elections_until_ = CoarseTimePoint::min(); |
734 | | |
735 | | std::atomic<MonoDelta> TEST_delay_update_{MonoDelta::kZero}; |
736 | | |
737 | | std::atomic<uint64_t> majority_num_sst_files_{0}; |
738 | | |
739 | | const TabletId split_parent_tablet_id_; |
740 | | |
741 | | DISALLOW_COPY_AND_ASSIGN(RaftConsensus); |
742 | | }; |
743 | | |
744 | | } // namespace consensus |
745 | | } // namespace yb |
746 | | |
747 | | #endif /* YB_CONSENSUS_RAFT_CONSENSUS_H_ */ |