/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus.cc
Line | Count | Source
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/raft_consensus.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <memory> |
37 | | #include <mutex> |
38 | | |
39 | | #include <boost/optional.hpp> |
40 | | #include <gflags/gflags.h> |
41 | | |
42 | | #include "yb/common/wire_protocol.h" |
43 | | |
44 | | #include "yb/consensus/consensus.pb.h" |
45 | | #include "yb/consensus/consensus_context.h" |
46 | | #include "yb/consensus/consensus_peers.h" |
47 | | #include "yb/consensus/consensus_round.h" |
48 | | #include "yb/consensus/leader_election.h" |
49 | | #include "yb/consensus/log.h" |
50 | | #include "yb/consensus/peer_manager.h" |
51 | | #include "yb/consensus/quorum_util.h" |
52 | | #include "yb/consensus/replica_state.h" |
53 | | #include "yb/consensus/state_change_context.h" |
54 | | |
55 | | #include "yb/gutil/casts.h" |
56 | | #include "yb/gutil/map-util.h" |
57 | | #include "yb/gutil/stringprintf.h" |
58 | | |
59 | | #include "yb/rpc/messenger.h" |
60 | | #include "yb/rpc/periodic.h" |
61 | | #include "yb/rpc/rpc_controller.h" |
62 | | |
63 | | #include "yb/server/clock.h" |
64 | | |
65 | | #include "yb/util/debug-util.h" |
66 | | #include "yb/util/debug/long_operation_tracker.h" |
67 | | #include "yb/util/debug/trace_event.h" |
68 | | #include "yb/util/enums.h" |
69 | | #include "yb/util/flag_tags.h" |
70 | | #include "yb/util/format.h" |
71 | | #include "yb/util/logging.h" |
72 | | #include "yb/util/metrics.h" |
73 | | #include "yb/util/net/dns_resolver.h" |
74 | | #include "yb/util/random.h" |
75 | | #include "yb/util/random_util.h" |
76 | | #include "yb/util/scope_exit.h" |
77 | | #include "yb/util/status_format.h" |
78 | | #include "yb/util/status_log.h" |
79 | | #include "yb/util/threadpool.h" |
80 | | #include "yb/util/tostring.h" |
81 | | #include "yb/util/trace.h" |
82 | | #include "yb/util/tsan_util.h" |
83 | | #include "yb/util/url-coding.h" |
84 | | |
85 | | using namespace std::literals; |
86 | | using namespace std::placeholders; |
87 | | |
88 | | DEFINE_int32(raft_heartbeat_interval_ms, yb::NonTsanVsTsan(500, 1000), |
89 | | "The heartbeat interval for Raft replication. The leader produces heartbeats " |
90 | | "to followers at this interval. The followers expect a heartbeat at this interval " |
91 | | "and consider a leader to have failed if it misses several in a row."); |
92 | | TAG_FLAG(raft_heartbeat_interval_ms, advanced); |
93 | | |
94 | | DEFINE_double(leader_failure_max_missed_heartbeat_periods, 6.0, |
95 | | "Maximum heartbeat periods that the leader can fail to heartbeat in before we " |
96 | | "consider the leader to be failed. The total failure timeout in milliseconds is " |
97 | | "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. " |
98 | | "The value passed to this flag may be fractional."); |
99 | | TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced); |
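
A minimal standalone sketch (not YB code; the constants below are the defaults quoted above, not values read from gflags) of how these two flags combine into the effective leader failure timeout:

    #include <chrono>
    #include <cstdint>
    #include <iostream>

    int main() {
      const int32_t raft_heartbeat_interval_ms = 500;                  // non-TSAN default above
      const double leader_failure_max_missed_heartbeat_periods = 6.0;  // default above
      // Total time without a heartbeat before a follower considers the leader failed.
      const auto failure_timeout = std::chrono::milliseconds(static_cast<int64_t>(
          raft_heartbeat_interval_ms * leader_failure_max_missed_heartbeat_periods));
      std::cout << "leader considered failed after " << failure_timeout.count() << " ms\n";
      return 0;
    }
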
100 | | |
101 | | DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000, |
102 | | "Maximum time to sleep in between leader election retries, in addition to the " |
103 | | "regular timeout. When leader election fails the interval in between retries " |
104 | | "increases exponentially, up to this value."); |
105 | | TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental); |
106 | | |
107 | | DEFINE_bool(enable_leader_failure_detection, true, |
108 | | "Whether to enable failure detection of tablet leaders. If enabled, attempts will be " |
109 | | "made to elect a follower as a new leader when the leader is detected to have failed."); |
110 | | TAG_FLAG(enable_leader_failure_detection, unsafe); |
111 | | |
112 | | DEFINE_test_flag(bool, do_not_start_election_test_only, false, |
113 | | "Do not start election even if leader failure is detected. "); |
114 | | TAG_FLAG(TEST_do_not_start_election_test_only, runtime); |
115 | | |
116 | | DEFINE_bool(evict_failed_followers, true, |
117 | | "Whether to evict followers from the Raft config that have fallen " |
118 | | "too far behind the leader's log to catch up normally or have been " |
119 | | "unreachable by the leader for longer than " |
120 | | "follower_unavailable_considered_failed_sec"); |
121 | | TAG_FLAG(evict_failed_followers, advanced); |
122 | | |
123 | | DEFINE_test_flag(bool, follower_reject_update_consensus_requests, false, |
124 | | "Whether a follower will return an error for all UpdateConsensus() requests."); |
125 | | |
126 | | DEFINE_test_flag(bool, follower_pause_update_consensus_requests, false, |
127 | | "Whether a follower will pause all UpdateConsensus() requests."); |
128 | | |
129 | | DEFINE_test_flag(int32, follower_reject_update_consensus_requests_seconds, 0, |
130 | | "Whether a follower will return an error for all UpdateConsensus() requests for " |
131 | | "the first TEST_follower_reject_update_consensus_requests_seconds seconds after " |
132 | | "the Consensus object is created.");
133 | | |
134 | | DEFINE_test_flag(bool, follower_fail_all_prepare, false, |
135 | | "Whether a follower will fail preparing all operations."); |
136 | | |
137 | | DEFINE_int32(after_stepdown_delay_election_multiplier, 5, |
138 | | "After a peer steps down as a leader, the factor with which to multiply " |
139 | | "leader_failure_max_missed_heartbeat_periods to get the delay time before starting a " |
140 | | "new election."); |
141 | | TAG_FLAG(after_stepdown_delay_election_multiplier, advanced); |
142 | | TAG_FLAG(after_stepdown_delay_election_multiplier, hidden); |
143 | | |
144 | | DECLARE_int32(memory_limit_warn_threshold_percentage); |
145 | | |
146 | | DEFINE_test_flag(int32, inject_delay_leader_change_role_append_secs, 0, |
147 | | "Amount of time to delay the leader from sending the replicate message for a change role operation.");
148 | | |
149 | | DEFINE_test_flag(double, return_error_on_change_config, 0.0, |
150 | | "Fraction of the time when ChangeConfig will return an error."); |
151 | | |
152 | | METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections, |
153 | | "Follower Memory Pressure Rejections", |
154 | | yb::MetricUnit::kRequests, |
155 | | "Number of RPC requests rejected due to " |
156 | | "memory pressure while FOLLOWER."); |
157 | | METRIC_DEFINE_gauge_int64(tablet, raft_term, |
158 | | "Current Raft Consensus Term", |
159 | | yb::MetricUnit::kUnits, |
160 | | "Current Term of the Raft Consensus algorithm. This number increments " |
161 | | "each time a leader election is started."); |
162 | | |
163 | | METRIC_DEFINE_lag(tablet, follower_lag_ms, |
164 | | "Follower lag from leader", |
165 | | "The amount of time since the last UpdateConsensus request from the " |
166 | | "leader.", {0, yb::AggregationFunction::kMax} /* optional_args */); |
167 | | |
168 | | METRIC_DEFINE_gauge_int64(tablet, is_raft_leader, |
169 | | "Is tablet raft leader", |
170 | | yb::MetricUnit::kUnits, |
171 | | "Keeps track of whether the tablet is the raft leader. "
172 | | "1 indicates that the tablet is the raft leader.");
173 | | |
174 | | METRIC_DEFINE_coarse_histogram( |
175 | | table, dns_resolve_latency_during_update_raft_config, |
176 | | "yb.consensus.RaftConsensus.UpdateRaftConfig DNS Resolve", |
177 | | yb::MetricUnit::kMicroseconds, |
178 | | "Microseconds spent resolving DNS requests during RaftConsensus::UpdateRaftConfig"); |
179 | | |
180 | | DEFINE_int32(leader_lease_duration_ms, yb::consensus::kDefaultLeaderLeaseDurationMs, |
181 | | "Leader lease duration. A leader keeps establishing a new lease or extending the " |
182 | | "existing one with every UpdateConsensus. A new server is not allowed to serve as a " |
183 | | "leader (i.e. serve up-to-date read requests or acknowledge write requests) until a " |
184 | | "lease of this duration has definitely expired on the old leader's side."); |
185 | | |
186 | | DEFINE_int32(ht_lease_duration_ms, 2000, |
187 | | "Hybrid time leader lease duration. A leader keeps establishing a new lease or " |
188 | | "extending the existing one with every UpdateConsensus. A new server is not allowed " |
189 | | "to add entries to the RAFT log until the old leader's lease has expired. 0 to disable."
190 | | ); |
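
A hedged sketch of the lease rule these two flags describe (illustrative only, with hypothetical names; not YB's lease implementation): a newly elected leader must not serve up-to-date reads, nor append entries under the hybrid time lease, until it is certain the old leader's lease has expired.

    #include <chrono>

    // Returns true once the old leader's lease, granted at old_lease_start for
    // lease_duration, can no longer be honored, so the new leader may act.
    bool OldLeaderLeaseExpired(std::chrono::steady_clock::time_point old_lease_start,
                               std::chrono::milliseconds lease_duration) {
      return std::chrono::steady_clock::now() >= old_lease_start + lease_duration;
    }
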
191 | | |
192 | | DEFINE_int32(min_leader_stepdown_retry_interval_ms, |
193 | | 20 * 1000, |
194 | | "Minimum amount of time between successive attempts to perform the leader stepdown " |
195 | | "for the same combination of tablet and intended (target) leader. This is needed " |
196 | | "to avoid infinite leader stepdown loops when the current leader never has a chance " |
197 | | "to update the intended leader with its latest records."); |
198 | | |
199 | | DEFINE_bool(use_preelection, true, "Whether to use pre election, before doing actual election."); |
200 | | |
201 | | DEFINE_int32(temporary_disable_preelections_timeout_ms, 10 * 60 * 1000, |
202 | | "If some nodes do not support preelections, then we disable them for this "
203 | | "amount of time."); |
204 | | |
205 | | DEFINE_test_flag(bool, pause_update_replica, false, |
206 | | "Pause RaftConsensus::UpdateReplica processing before snoozing failure detector."); |
207 | | |
208 | | DEFINE_test_flag(bool, pause_update_majority_replicated, false, |
209 | | "Pause RaftConsensus::UpdateMajorityReplicated."); |
210 | | |
211 | | DEFINE_test_flag(int32, log_change_config_every_n, 1, |
212 | | "How often to log change config information. " |
213 | | "Used to reduce the number of lines being printed for change config requests " |
214 | | "when a test simulates a failure that would generate a lot of these requests.");
215 | | |
216 | | DEFINE_bool(enable_lease_revocation, true, "Enables lease revocation mechanism"); |
217 | | |
218 | | DEFINE_bool(quick_leader_election_on_create, false, |
219 | | "Whether to trigger quick leader elections on table creation.");
220 | | TAG_FLAG(quick_leader_election_on_create, advanced); |
221 | | TAG_FLAG(quick_leader_election_on_create, hidden); |
222 | | |
223 | | DEFINE_bool( |
224 | | stepdown_disable_graceful_transition, false, |
225 | | "During a leader stepdown, disable graceful leadership transfer " |
226 | | "to an up to date peer"); |
227 | | |
228 | | DEFINE_bool( |
229 | | raft_disallow_concurrent_outstanding_report_failure_tasks, true, |
230 | | "If true, only submit a new report failure task if there is not one outstanding."); |
231 | | TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, advanced); |
232 | | TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, hidden); |
233 | | |
234 | | DEFINE_int64(protege_synchronization_timeout_ms, 1000, |
235 | | "Timeout to synchronize protege before performing step down. " |
236 | | "0 to disable synchronization."); |
237 | | |
238 | | namespace yb { |
239 | | namespace consensus { |
240 | | |
241 | | using log::LogEntryBatch; |
242 | | using rpc::PeriodicTimer; |
243 | | using std::shared_ptr; |
244 | | using std::unique_ptr; |
245 | | using std::weak_ptr; |
246 | | using strings::Substitute; |
247 | | using tserver::TabletServerErrorPB; |
248 | | |
249 | | namespace { |
250 | | |
251 | 10.1k | const RaftPeerPB* FindPeer(const RaftConfigPB& active_config, const std::string& uuid) { |
252 | 20.7k | for (const RaftPeerPB& peer : active_config.peers()) { |
253 | 20.7k | if (peer.permanent_uuid() == uuid) { |
254 | 10.1k | return &peer; |
255 | 10.1k | } |
256 | 20.7k | } |
257 | | |
258 | 18.4E | return nullptr; |
259 | 10.1k | } |
260 | | |
261 | | // Helper function to check if the op is a consensus-only op, i.e. one with no associated Operation.
262 | 14.7M | bool IsConsensusOnlyOperation(OperationType op_type) { |
263 | 14.7M | return op_type == NO_OP || op_type == CHANGE_CONFIG_OP14.2M ; |
264 | 14.7M | } |
265 | | |
266 | | // Helper to check if the op is a Change Config op.
267 | 333k | bool IsChangeConfigOperation(OperationType op_type) { |
268 | 333k | return op_type == CHANGE_CONFIG_OP; |
269 | 333k | } |
270 | | |
271 | | class NonTrackedRoundCallback : public ConsensusRoundCallback { |
272 | | public: |
273 | | explicit NonTrackedRoundCallback(ConsensusRound* round, const StdStatusCallback& callback) |
274 | 201k | : round_(round), callback_(callback) { |
275 | 201k | } |
276 | | |
277 | 67.9k | void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override { |
278 | 67.9k | auto& replicate_msg = *round_->replicate_msg(); |
279 | 67.9k | op_id.ToPB(replicate_msg.mutable_id()); |
280 | 67.9k | committed_op_id.ToPB(replicate_msg.mutable_committed_op_id()); |
281 | 67.9k | } |
282 | | |
283 | | void ReplicationFinished( |
284 | 200k | const Status& status, int64_t leader_term, OpIds* applied_op_ids) override { |
285 | 200k | down_cast<RaftConsensus*>(round_->consensus())->NonTrackedRoundReplicationFinished( |
286 | 200k | round_, callback_, status); |
287 | 200k | } |
288 | | |
289 | | private: |
290 | | ConsensusRound* round_; |
291 | | StdStatusCallback callback_; |
292 | | }; |
293 | | |
294 | | } // namespace |
295 | | |
296 | | std::unique_ptr<ConsensusRoundCallback> MakeNonTrackedRoundCallback( |
297 | 201k | ConsensusRound* round, const StdStatusCallback& callback) { |
298 | 201k | return std::make_unique<NonTrackedRoundCallback>(round, callback); |
299 | 201k | } |
300 | | |
301 | | struct RaftConsensus::LeaderRequest { |
302 | | std::string leader_uuid; |
303 | | yb::OpId preceding_op_id; |
304 | | yb::OpId committed_op_id; |
305 | | ReplicateMsgs messages; |
306 | | // The positional index of the first message selected to be appended, in the |
307 | | // original leader's request message sequence. |
308 | | int64_t first_message_idx; |
309 | | |
310 | | std::string OpsRangeString() const; |
311 | | }; |
312 | | |
313 | | shared_ptr<RaftConsensus> RaftConsensus::Create( |
314 | | const ConsensusOptions& options, |
315 | | std::unique_ptr<ConsensusMetadata> cmeta, |
316 | | const RaftPeerPB& local_peer_pb, |
317 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
318 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
319 | | const scoped_refptr<server::Clock>& clock, |
320 | | ConsensusContext* consensus_context, |
321 | | rpc::Messenger* messenger, |
322 | | rpc::ProxyCache* proxy_cache, |
323 | | const scoped_refptr<log::Log>& log, |
324 | | const shared_ptr<MemTracker>& server_mem_tracker, |
325 | | const shared_ptr<MemTracker>& parent_mem_tracker, |
326 | | const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, |
327 | | TableType table_type, |
328 | | ThreadPool* raft_pool, |
329 | | RetryableRequests* retryable_requests, |
330 | 150k | MultiRaftManager* multi_raft_manager) { |
331 | | |
332 | 150k | auto rpc_factory = std::make_unique<RpcPeerProxyFactory>( |
333 | 150k | messenger, proxy_cache, local_peer_pb.cloud_info()); |
334 | | |
335 | | // The message queue that keeps track of which operations need to be replicated |
336 | | // where. |
337 | 150k | auto queue = std::make_unique<PeerMessageQueue>( |
338 | 150k | tablet_metric_entity, |
339 | 150k | log, |
340 | 150k | server_mem_tracker, |
341 | 150k | parent_mem_tracker, |
342 | 150k | local_peer_pb, |
343 | 150k | options.tablet_id, |
344 | 150k | clock, |
345 | 150k | consensus_context, |
346 | 150k | raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL)); |
347 | | |
348 | 150k | DCHECK(local_peer_pb.has_permanent_uuid()); |
349 | 150k | const string& peer_uuid = local_peer_pb.permanent_uuid(); |
350 | | |
351 | | // A single Raft thread pool token is shared between RaftConsensus and |
352 | | // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a |
353 | | // raw pointer to the token, to emphasize that RaftConsensus is responsible |
354 | | // for destroying the token. |
355 | 150k | unique_ptr<ThreadPoolToken> raft_pool_token(raft_pool->NewToken( |
356 | 150k | ThreadPool::ExecutionMode::CONCURRENT)); |
357 | | |
358 | | // A manager for the set of peers that actually send the operations both remotely |
359 | | // and to the local wal. |
360 | 150k | auto peer_manager = std::make_unique<PeerManager>( |
361 | 150k | options.tablet_id, |
362 | 150k | peer_uuid, |
363 | 150k | rpc_factory.get(), |
364 | 150k | queue.get(), |
365 | 150k | raft_pool_token.get(), |
366 | 150k | multi_raft_manager); |
367 | | |
368 | 150k | return std::make_shared<RaftConsensus>( |
369 | 150k | options, |
370 | 150k | std::move(cmeta), |
371 | 150k | std::move(rpc_factory), |
372 | 150k | std::move(queue), |
373 | 150k | std::move(peer_manager), |
374 | 150k | std::move(raft_pool_token), |
375 | 150k | table_metric_entity, |
376 | 150k | tablet_metric_entity, |
377 | 150k | peer_uuid, |
378 | 150k | clock, |
379 | 150k | consensus_context, |
380 | 150k | log, |
381 | 150k | parent_mem_tracker, |
382 | 150k | mark_dirty_clbk, |
383 | 150k | table_type, |
384 | 150k | retryable_requests); |
385 | 150k | } |
386 | | |
387 | | RaftConsensus::RaftConsensus( |
388 | | const ConsensusOptions& options, std::unique_ptr<ConsensusMetadata> cmeta, |
389 | | std::unique_ptr<PeerProxyFactory> proxy_factory, |
390 | | std::unique_ptr<PeerMessageQueue> queue, |
391 | | std::unique_ptr<PeerManager> peer_manager, |
392 | | std::unique_ptr<ThreadPoolToken> raft_pool_token, |
393 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
394 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
395 | | const std::string& peer_uuid, const scoped_refptr<server::Clock>& clock, |
396 | | ConsensusContext* consensus_context, const scoped_refptr<log::Log>& log, |
397 | | shared_ptr<MemTracker> parent_mem_tracker, |
398 | | Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, |
399 | | TableType table_type, |
400 | | RetryableRequests* retryable_requests) |
401 | | : raft_pool_token_(std::move(raft_pool_token)), |
402 | | log_(log), |
403 | | clock_(clock), |
404 | | peer_proxy_factory_(std::move(proxy_factory)), |
405 | | peer_manager_(std::move(peer_manager)), |
406 | | queue_(std::move(queue)), |
407 | | rng_(GetRandomSeed32()), |
408 | | withhold_votes_until_(MonoTime::Min()), |
409 | | step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()), |
410 | | mark_dirty_clbk_(std::move(mark_dirty_clbk)), |
411 | | shutdown_(false), |
412 | | follower_memory_pressure_rejections_(tablet_metric_entity->FindOrCreateCounter( |
413 | | &METRIC_follower_memory_pressure_rejections)), |
414 | | term_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_raft_term, |
415 | | cmeta->current_term())), |
416 | | follower_last_update_time_ms_metric_( |
417 | | tablet_metric_entity->FindOrCreateAtomicMillisLag(&METRIC_follower_lag_ms)), |
418 | | is_raft_leader_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_is_raft_leader, |
419 | | static_cast<int64_t>(0))), |
420 | | parent_mem_tracker_(std::move(parent_mem_tracker)), |
421 | | table_type_(table_type), |
422 | | update_raft_config_dns_latency_( |
423 | | METRIC_dns_resolve_latency_during_update_raft_config.Instantiate(table_metric_entity)), |
424 | | split_parent_tablet_id_( |
425 | 150k | cmeta->has_split_parent_tablet_id() ? cmeta->split_parent_tablet_id() : "") { |
426 | 150k | DCHECK_NOTNULL(log_.get()); |
427 | | |
428 | 150k | if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) { |
429 | 36 | withold_replica_updates_until_ = MonoTime::Now() + |
430 | 36 | MonoDelta::FromSeconds(FLAGS_TEST_follower_reject_update_consensus_requests_seconds); |
431 | 36 | } |
432 | | |
433 | 150k | state_ = std::make_unique<ReplicaState>( |
434 | 150k | options, |
435 | 150k | peer_uuid, |
436 | 150k | std::move(cmeta), |
437 | 150k | DCHECK_NOTNULL(consensus_context), |
438 | 150k | this, |
439 | 150k | retryable_requests, |
440 | 150k | std::bind(&PeerMessageQueue::TrackOperationsMemory, queue_.get(), _1)); |
441 | | |
442 | 150k | peer_manager_->SetConsensus(this); |
443 | 150k | } |
444 | | |
445 | 75.4k | RaftConsensus::~RaftConsensus() { |
446 | 75.4k | Shutdown(); |
447 | 75.4k | } |
448 | | |
449 | 150k | Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { |
450 | 150k | RETURN_NOT_OK(ExecuteHook(PRE_START)); |
451 | | |
452 | | // Capture a weak_ptr reference into the functor so it can safely handle |
453 | | // outliving the consensus instance. |
454 | 150k | std::weak_ptr<RaftConsensus> w = shared_from_this(); |
455 | 150k | failure_detector_ = PeriodicTimer::Create( |
456 | 150k | peer_proxy_factory_->messenger(), |
457 | 818k | [w]() { |
458 | 818k | if (auto consensus = w.lock()) { |
459 | 818k | consensus->ReportFailureDetected(); |
460 | 818k | } |
461 | 818k | }, |
462 | 150k | MinimumElectionTimeout()); |
463 | | |
464 | 150k | { |
465 | 150k | ReplicaState::UniqueLock lock; |
466 | 150k | RETURN_NOT_OK(state_->LockForStart(&lock)); |
467 | 150k | state_->ClearLeaderUnlocked(); |
468 | | |
469 | 150k | RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id), |
470 | 150k | "Unable to start RAFT ReplicaState"); |
471 | | |
472 | 150k | LOG_WITH_PREFIX(INFO) << "Replica starting. Triggering " |
473 | 150k | << info.orphaned_replicates.size() |
474 | 150k | << " pending operations. Active config: " |
475 | 150k | << state_->GetActiveConfigUnlocked().ShortDebugString(); |
476 | 150k | for (const auto& replicate : info.orphaned_replicates) { |
477 | 156 | ReplicateMsgPtr replicate_ptr = std::make_shared<ReplicateMsg>(*replicate); |
478 | 156 | RETURN_NOT_OK(StartReplicaOperationUnlocked(replicate_ptr, HybridTime::kInvalid)); |
479 | 156 | } |
480 | | |
481 | 150k | RETURN_NOT_OK(state_->InitCommittedOpIdUnlocked(yb::OpId::FromPB(info.last_committed_id))); |
482 | | |
483 | 150k | queue_->Init(state_->GetLastReceivedOpIdUnlocked()); |
484 | 150k | } |
485 | | |
486 | 0 | { |
487 | 150k | ReplicaState::UniqueLock lock; |
488 | 150k | RETURN_NOT_OK(state_->LockForConfigChange(&lock)); |
489 | | |
490 | | // If this is the first term expire the FD immediately so that we have a fast first |
491 | | // election, otherwise we just let the timer expire normally. |
492 | 150k | MonoDelta initial_delta = MonoDelta(); |
493 | 150k | if (state_->GetCurrentTermUnlocked() == 0) { |
494 | | // The failure detector is initialized to a low value to trigger an early election |
495 | | // (unless someone else requested a vote from us first, which resets the |
496 | | // election timer). We do it this way instead of immediately running an |
497 | | // election to get a higher likelihood of enough servers being available |
498 | | // when the first one attempts an election to avoid multiple election |
499 | | // cycles on startup, while keeping that "waiting period" random. If there is only one peer, |
500 | | // trigger an election right away. |
501 | 147k | if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
502 | 147k | LOG_WITH_PREFIX(INFO) << "Consensus starting up: Expiring fail detector timer " |
503 | 147k | "to make a prompt election more likely"; |
504 | | // Gating quick leader elections on table creation since prompter leader elections are |
505 | | // more likely to fail due to uninitialized peers or conflicting elections, which could |
506 | | // have unforeseen consequences.
507 | 147k | if (FLAGS_quick_leader_election_on_create) { |
508 | 0 | initial_delta = (state_->GetCommittedConfigUnlocked().peers_size() == 1) ? |
509 | 0 | MonoDelta::kZero : |
510 | 0 | MonoDelta::FromMilliseconds(rng_.Uniform(FLAGS_raft_heartbeat_interval_ms)); |
511 | 0 | } |
512 | 147k | } |
513 | 147k | } |
514 | 150k | RETURN_NOT_OK(BecomeReplicaUnlocked(std::string(), initial_delta)); |
515 | 150k | } |
516 | | |
517 | 150k | RETURN_NOT_OK(ExecuteHook(POST_START)); |
518 | | |
519 | | // The context tracks that the current caller does not hold the lock for consensus state. |
520 | | // So mark dirty callback, e.g., consensus->ConsensusState() for master consensus callback of |
521 | | // SysCatalogStateChanged, can get the lock when needed. |
522 | 150k | auto context = std::make_shared<StateChangeContext>(StateChangeReason::CONSENSUS_STARTED, false); |
523 | | // Report become visible to the Master. |
524 | 150k | MarkDirty(context); |
525 | | |
526 | 150k | return Status::OK(); |
527 | 150k | } |
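
A standalone sketch of the randomized first-election delay chosen above when quick_leader_election_on_create is enabled (names and the heartbeat value are assumptions for illustration): a single-peer config elects immediately, while every other peer waits a random fraction of one heartbeat interval so that startup elections are unlikely to collide.

    #include <chrono>
    #include <random>

    std::chrono::milliseconds InitialElectionDelay(int num_peers, std::mt19937& rng) {
      const int heartbeat_ms = 500;  // raft_heartbeat_interval_ms default (assumed)
      if (num_peers == 1) {
        return std::chrono::milliseconds::zero();  // lone peer: elect right away
      }
      // Mirrors rng_.Uniform(FLAGS_raft_heartbeat_interval_ms): pick a delay in [0, interval).
      std::uniform_int_distribution<int> dist(0, heartbeat_ms - 1);
      return std::chrono::milliseconds(dist(rng));
    }
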
528 | | |
529 | 7.96k | bool RaftConsensus::IsRunning() const { |
530 | 7.96k | auto lock = state_->LockForRead(); |
531 | 7.96k | return state_->state() == ReplicaState::kRunning; |
532 | 7.96k | } |
533 | | |
534 | 86 | Status RaftConsensus::EmulateElection() { |
535 | 86 | ReplicaState::UniqueLock lock; |
536 | 86 | RETURN_NOT_OK(state_->LockForConfigChange(&lock)); |
537 | | |
538 | 86 | LOG_WITH_PREFIX(INFO) << "Emulating election..."; |
539 | | |
540 | | // Assume leadership of new term. |
541 | 86 | RETURN_NOT_OK(IncrementTermUnlocked()); |
542 | 86 | SetLeaderUuidUnlocked(state_->GetPeerUuid()); |
543 | 86 | return BecomeLeaderUnlocked(); |
544 | 86 | } |
545 | | |
546 | 1.22M | Status RaftConsensus::DoStartElection(const LeaderElectionData& data, PreElected preelected) { |
547 | 1.22M | TRACE_EVENT2("consensus", "RaftConsensus::StartElection", |
548 | 1.22M | "peer", peer_uuid(), |
549 | 1.22M | "tablet", tablet_id()); |
550 | 1.22M | VLOG_WITH_PREFIX_AND_FUNC47 (1) << data.ToString()47 ; |
551 | 1.22M | if (FLAGS_TEST_do_not_start_election_test_only) { |
552 | 11 | LOG(INFO) << "Election start skipped as TEST_do_not_start_election_test_only flag " |
553 | 11 | "is set to true."; |
554 | 11 | return Status::OK(); |
555 | 11 | } |
556 | | |
557 | | // If pre-elections are disabled or we already won the pre-election, start a regular election;
558 | | // otherwise a pre-election is started.
559 | | // Pre-elections can be disabled via a flag, or temporarily if some nodes do not support them.
560 | 1.22M | auto preelection = ANNOTATE_UNPROTECTED_READ(FLAGS_use_preelection) && !preelected1.22M && |
561 | 1.22M | disable_pre_elections_until_ < CoarseMonoClock::now()674k ; |
562 | 1.22M | const char* election_name = preelection ? "pre-election"675k : "election"550k ; |
563 | | |
564 | 1.22M | LeaderElectionPtr election; |
565 | 1.22M | { |
566 | 1.22M | ReplicaState::UniqueLock lock; |
567 | 1.22M | RETURN_NOT_OK(state_->LockForConfigChange(&lock)); |
568 | | |
569 | 1.22M | if (data.initial_election && state_->GetCurrentTermUnlocked() != 089.5k ) { |
570 | 1 | LOG_WITH_PREFIX(INFO) << "Not starting initial " << election_name << " -- non zero term"; |
571 | 1 | return Status::OK(); |
572 | 1 | } |
573 | | |
574 | 1.22M | PeerRole active_role = state_->GetActiveRoleUnlocked(); |
575 | 1.22M | if (active_role == PeerRole::LEADER) { |
576 | 50 | LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- already leader"; |
577 | 50 | return Status::OK(); |
578 | 50 | } |
579 | 1.22M | if (1.22M active_role == PeerRole::LEARNER1.22M || active_role == PeerRole::READ_REPLICA) { |
580 | 217 | LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- role is " << active_role |
581 | 217 | << ", pending = " << state_->IsConfigChangePendingUnlocked() |
582 | 217 | << ", active_role=" << active_role; |
583 | 217 | return Status::OK(); |
584 | 217 | } |
585 | 1.22M | if (PREDICT_FALSE(active_role == PeerRole::NON_PARTICIPANT)) { |
586 | 33 | VLOG_WITH_PREFIX0 (1) << "Not starting " << election_name << " -- non participant"0 ; |
587 | | // Avoid excessive election noise while in this state. |
588 | 33 | SnoozeFailureDetector(DO_NOT_LOG); |
589 | 33 | return STATUS_FORMAT( |
590 | 33 | IllegalState, |
591 | 33 | "Not starting $0: Node is currently a non-participant in the raft config: $1", |
592 | 33 | election_name, state_->GetActiveConfigUnlocked()); |
593 | 33 | } |
594 | | |
595 | | // The default is to start the election now. But if we are starting a pending election, check
596 | | // whether there is indeed an op id we are pending on and whether it has been committed to the
597 | | // log. The op id could have been cleared if the pending election has already been started or
598 | | // another peer has jumped in before we could start.
599 | 1.22M | bool start_now = true; |
600 | 1.22M | if (data.pending_commit) { |
601 | 0 | const auto required_id = !data.must_be_committed_opid |
602 | 0 | ? state_->GetPendingElectionOpIdUnlocked() : data.must_be_committed_opid; |
603 | 0 | const Status advance_committed_index_status = ResultToStatus( |
604 | 0 | state_->AdvanceCommittedOpIdUnlocked(required_id, CouldStop::kFalse)); |
605 | 0 | if (!advance_committed_index_status.ok()) { |
606 | 0 | LOG(WARNING) << "Starting an " << election_name << " but the latest committed OpId is not " |
607 | 0 | "present in this peer's log: " |
608 | 0 | << required_id << ". " << "Status: " << advance_committed_index_status; |
609 | 0 | } |
610 | 0 | start_now = required_id.index <= state_->GetCommittedOpIdUnlocked().index; |
611 | 0 | } |
612 | | |
613 | 1.22M | if (start_now1.22M ) { |
614 | 1.22M | if (state_->HasLeaderUnlocked()) { |
615 | 492k | LOG_WITH_PREFIX(INFO) << "Fail of leader " << state_->GetLeaderUuidUnlocked() |
616 | 492k | << " detected. Triggering leader " << election_name |
617 | 492k | << ", mode=" << data.mode; |
618 | 733k | } else { |
619 | 733k | LOG_WITH_PREFIX(INFO) << "Triggering leader " << election_name << ", mode=" << data.mode; |
620 | 733k | } |
621 | | |
622 | | // Snooze to avoid the election timer firing again as much as possible. |
623 | | // We do not disable the election timer while running an election. |
624 | 1.22M | MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked(); |
625 | 1.22M | SnoozeFailureDetector(ALLOW_LOGGING, timeout); |
626 | | |
627 | 1.22M | election = VERIFY_RESULT737k (CreateElectionUnlocked(data, timeout, PreElection(preelection))); |
628 | 18.4E | } else if (data.pending_commit && !data.must_be_committed_opid.empty()0 ) { |
629 | | // Queue up the pending op id if specified. |
630 | 0 | state_->SetPendingElectionOpIdUnlocked(data.must_be_committed_opid); |
631 | 0 | LOG_WITH_PREFIX(INFO) |
632 | 0 | << "Leader " << election_name << " is pending upon log commitment of OpId " |
633 | 0 | << data.must_be_committed_opid; |
634 | 18.4E | } else { |
635 | 18.4E | LOG_WITH_PREFIX(INFO) << "Ignore " << __func__ << " existing wait on op id"; |
636 | 18.4E | } |
637 | 1.22M | } |
638 | | |
639 | | // Start the election outside the lock. |
640 | 737k | if (737k election737k ) { |
641 | 737k | election->Run(); |
642 | 737k | } |
643 | | |
644 | 737k | return Status::OK(); |
645 | 1.22M | } |
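
An illustrative sketch (assumed parameter names) of the pre-election decision made at the top of DoStartElection: a pre-election is run only if the flag allows it, this attempt has not already won a pre-election, and pre-elections are not temporarily disabled for this peer.

    #include <chrono>

    bool ShouldRunPreElection(bool use_preelection_flag,
                              bool already_preelected,
                              std::chrono::steady_clock::time_point disable_pre_elections_until) {
      // Matches the condition computed above before choosing "pre-election" vs "election".
      return use_preelection_flag &&
             !already_preelected &&
             disable_pre_elections_until < std::chrono::steady_clock::now();
    }
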
646 | | |
647 | | Result<LeaderElectionPtr> RaftConsensus::CreateElectionUnlocked( |
648 | 1.22M | const LeaderElectionData& data, MonoDelta timeout, PreElection preelection) { |
649 | 1.22M | int64_t new_term; |
650 | 1.22M | if (preelection) { |
651 | 674k | new_term = state_->GetCurrentTermUnlocked() + 1; |
652 | 674k | } else { |
653 | | // Increment the term. |
654 | 550k | RETURN_NOT_OK(IncrementTermUnlocked()); |
655 | 62.8k | new_term = state_->GetCurrentTermUnlocked(); |
656 | 62.8k | } |
657 | | |
658 | 737k | const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); |
659 | 737k | LOG_WITH_PREFIX(INFO) << "Starting " << (preelection ? "pre-"674k : ""62.8k ) << "election with config: " |
660 | 737k | << active_config.ShortDebugString(); |
661 | | |
662 | | // Initialize the VoteCounter. |
663 | 737k | auto num_voters = CountVoters(active_config); |
664 | 737k | auto majority_size = MajoritySize(num_voters); |
665 | | |
666 | | // Vote for ourselves. |
667 | 737k | if (!preelection) { |
668 | | // TODO: Consider using a separate Mutex for voting, which must sync to disk. |
669 | 62.8k | RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid())); |
670 | 62.8k | } |
671 | | |
672 | 737k | auto counter = std::make_unique<VoteCounter>(num_voters, majority_size); |
673 | 737k | bool duplicate; |
674 | 737k | RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), ElectionVote::kGranted, &duplicate)); |
675 | 737k | CHECK(!duplicate) << state_->LogPrefix() |
676 | 71 | << "Inexplicable duplicate self-vote for term " |
677 | 71 | << state_->GetCurrentTermUnlocked(); |
678 | | |
679 | 737k | VoteRequestPB request; |
680 | 737k | request.set_ignore_live_leader(data.mode == ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE); |
681 | 737k | request.set_candidate_uuid(state_->GetPeerUuid()); |
682 | 737k | request.set_candidate_term(new_term); |
683 | 737k | request.set_tablet_id(state_->GetOptions().tablet_id); |
684 | 737k | request.set_preelection(preelection); |
685 | 737k | state_->GetLastReceivedOpIdUnlocked().ToPB( |
686 | 737k | request.mutable_candidate_status()->mutable_last_received()); |
687 | | |
688 | 737k | LeaderElectionPtr result(new LeaderElection( |
689 | 737k | active_config, |
690 | 737k | peer_proxy_factory_.get(), |
691 | 737k | request, |
692 | 737k | std::move(counter), |
693 | 737k | timeout, |
694 | 737k | preelection, |
695 | 737k | data.suppress_vote_request, |
696 | 737k | std::bind(&RaftConsensus::ElectionCallback, shared_from_this(), data, _1))); |
697 | | |
698 | 737k | if (!preelection) { |
699 | | // Clear the pending election op id so that we won't start the same pending election again. |
700 | | // Pre-election does not change state, so should not do it in this case. |
701 | 62.8k | state_->ClearPendingElectionOpIdUnlocked(); |
702 | 62.8k | } |
703 | | |
704 | 737k | return result; |
705 | 737k | } |
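
A minimal sketch of the vote math set up above (standard Raft majority, assumed to match what CountVoters/MajoritySize compute): with N voters a candidate needs floor(N/2) + 1 granted votes, and it registers a vote for itself before contacting anyone else.

    #include <cstddef>

    constexpr size_t MajoritySizeOf(size_t num_voters) {
      return num_voters / 2 + 1;
    }

    static_assert(MajoritySizeOf(1) == 1, "a lone voter elects itself");
    static_assert(MajoritySizeOf(3) == 2, "a 3-voter config needs 2 votes");
    static_assert(MajoritySizeOf(5) == 3, "a 5-voter config needs 3 votes");
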
706 | | |
707 | 8 | Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) { |
708 | 8 | MonoTime deadline = MonoTime::Now(); |
709 | 8 | deadline.AddDelta(timeout); |
710 | 286 | while (MonoTime::Now().ComesBefore(deadline)) { |
711 | 286 | if (GetLeaderStatus() == LeaderStatus::LEADER_AND_READY) { |
712 | 8 | return Status::OK(); |
713 | 8 | } |
714 | 278 | SleepFor(MonoDelta::FromMilliseconds(10)); |
715 | 278 | } |
716 | | |
717 | 0 | return STATUS(TimedOut, Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3", |
718 | 8 | peer_uuid(), tablet_id(), timeout.ToString(), role())); |
719 | 8 | } |
720 | | |
721 | 11.3k | string RaftConsensus::ServersInTransitionMessage() { |
722 | 11.3k | string err_msg; |
723 | 11.3k | const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); |
724 | 11.3k | const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); |
725 | 11.3k | auto servers_in_transition = CountServersInTransition(active_config); |
726 | 11.3k | auto committed_servers_in_transition = CountServersInTransition(committed_config); |
727 | 11.3k | LOG(INFO) << Substitute("Active config has $0 and committed has $1 servers in transition.", |
728 | 11.3k | servers_in_transition, committed_servers_in_transition); |
729 | 11.3k | if (servers_in_transition != 0 || committed_servers_in_transition != 011.1k ) { |
730 | 225 | err_msg = Substitute("Leader not ready to step down as there are $0 active config peers" |
731 | 225 | " in transition, $1 in committed. Configs:\nactive=$2\ncommit=$3", |
732 | 225 | servers_in_transition, committed_servers_in_transition, |
733 | 225 | active_config.ShortDebugString(), committed_config.ShortDebugString()); |
734 | 225 | LOG(INFO) << err_msg; |
735 | 225 | } |
736 | 11.3k | return err_msg; |
737 | 11.3k | } |
738 | | |
739 | 10.0k | Status RaftConsensus::StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful) { |
740 | 10.0k | auto election_state = std::make_shared<RunLeaderElectionState>(); |
741 | 10.0k | election_state->proxy = peer_proxy_factory_->NewProxy(peer); |
742 | 10.0k | election_state->req.set_originator_uuid(state_->GetPeerUuid()); |
743 | 10.0k | election_state->req.set_dest_uuid(peer.permanent_uuid()); |
744 | 10.0k | election_state->req.set_tablet_id(state_->GetOptions().tablet_id); |
745 | 10.0k | election_state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh); |
746 | 10.0k | election_state->proxy->RunLeaderElectionAsync( |
747 | 10.0k | &election_state->req, &election_state->resp, &election_state->rpc, |
748 | 10.0k | std::bind(&RaftConsensus::RunLeaderElectionResponseRpcCallback, this, |
749 | 10.0k | election_state)); |
750 | | |
751 | 10.0k | LOG_WITH_PREFIX(INFO) << "Transferring leadership to " << peer.permanent_uuid(); |
752 | | |
753 | 10.0k | return BecomeReplicaUnlocked( |
754 | 10.0k | graceful ? std::string()569 : peer.permanent_uuid()9.43k , MonoDelta()); |
755 | 10.0k | } |
756 | | |
757 | 86 | void RaftConsensus::CheckDelayedStepDown(const Status& status) { |
758 | 86 | ReplicaState::UniqueLock lock; |
759 | 86 | auto lock_status = state_->LockForConfigChange(&lock); |
760 | 86 | if (!lock_status.ok()) { |
761 | 9 | LOG_WITH_PREFIX(INFO) << "Failed to check delayed election: " << lock_status; |
762 | 9 | return; |
763 | 9 | } |
764 | | |
765 | 77 | if (state_->GetCurrentTermUnlocked() != delayed_step_down_.term) { |
766 | 68 | return; |
767 | 68 | } |
768 | | |
769 | 9 | const auto& config = state_->GetActiveConfigUnlocked(); |
770 | 9 | const auto* peer = FindPeer(config, delayed_step_down_.protege); |
771 | 9 | if (peer) { |
772 | 9 | LOG_WITH_PREFIX(INFO) << "Step down in favor of not yet synchronized protege: "
773 | 9 | << delayed_step_down_.protege; |
774 | 9 | WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful), |
775 | 9 | "Start step down failed"); |
776 | 9 | } else { |
777 | 0 | LOG_WITH_PREFIX(INFO) << "Failed to synchronize with protege " << delayed_step_down_.protege |
778 | 0 | << " and cannot find it in config: " << config.ShortDebugString(); |
779 | 0 | delayed_step_down_.term = OpId::kUnknownTerm; |
780 | 0 | } |
781 | 9 | } |
782 | | |
783 | 54.1k | Status RaftConsensus::StepDown(const LeaderStepDownRequestPB* req, LeaderStepDownResponsePB* resp) { |
784 | 54.1k | TRACE_EVENT0("consensus", "RaftConsensus::StepDown"); |
785 | 54.1k | ReplicaState::UniqueLock lock; |
786 | 54.1k | RETURN_NOT_OK(state_->LockForConfigChange(&lock)); |
787 | | |
788 | | // A sanity check that this request was routed to the correct RaftConsensus. |
789 | 54.1k | const auto& tablet_id = req->tablet_id(); |
790 | 54.1k | if (tablet_id != this->tablet_id()) { |
791 | 0 | resp->mutable_error()->set_code(TabletServerErrorPB::UNKNOWN_ERROR); |
792 | 0 | const auto msg = Format( |
793 | 0 | "Received a leader stepdown operation for wrong tablet id: $0, must be: $1", |
794 | 0 | tablet_id, this->tablet_id()); |
795 | 0 | LOG_WITH_PREFIX(ERROR) << msg; |
796 | 0 | StatusToPB(STATUS(IllegalState, msg), resp->mutable_error()->mutable_status()); |
797 | 0 | return Status::OK(); |
798 | 0 | } |
799 | | |
800 | 54.1k | if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) { |
801 | 42.7k | resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); |
802 | 42.7k | StatusToPB(STATUS(IllegalState, "Not currently leader"), |
803 | 42.7k | resp->mutable_error()->mutable_status()); |
804 | | // We return OK so that the tablet service won't overwrite the error code. |
805 | 42.7k | return Status::OK(); |
806 | 42.7k | } |
807 | | |
808 | | // The leader needs to be ready to perform a step down. There should be no PRE_VOTER in both |
809 | | // active and committed configs - ENG-557. |
810 | 11.3k | const string err_msg = ServersInTransitionMessage(); |
811 | 11.3k | if (!err_msg.empty()) { |
812 | 225 | resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); |
813 | 225 | StatusToPB(STATUS(IllegalState, err_msg), resp->mutable_error()->mutable_status()); |
814 | 225 | return Status::OK(); |
815 | 225 | } |
816 | | |
817 | 11.0k | std::string new_leader_uuid; |
818 | | // If a new leader is nominated, find it among peers to send RunLeaderElection request. |
819 | | // See https://ramcloud.stanford.edu/~ongaro/thesis.pdf, section 3.10 for this mechanism |
820 | | // to transfer the leadership. |
821 | 11.0k | const bool forced = (req->has_force_step_down() && req->force_step_down()0 ); |
822 | 11.0k | if (req->has_new_leader_uuid()) { |
823 | 10.5k | new_leader_uuid = req->new_leader_uuid(); |
824 | 10.5k | if (!forced && !queue_->CanPeerBecomeLeader(new_leader_uuid)) { |
825 | 166 | resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); |
826 | 166 | StatusToPB( |
827 | 166 | STATUS(IllegalState, "Suggested peer is not caught up yet"), |
828 | 166 | resp->mutable_error()->mutable_status()); |
829 | | // We return OK so that the tablet service won't overwrite the error code. |
830 | 166 | return Status::OK(); |
831 | 166 | } |
832 | 10.5k | } |
833 | | |
834 | 10.9k | bool graceful_stepdown = false; |
835 | 10.9k | if (new_leader_uuid.empty() && !FLAGS_stepdown_disable_graceful_transition589 && |
836 | 10.9k | !(589 req->has_disable_graceful_transition()589 && req->disable_graceful_transition()1 )) { |
837 | 588 | new_leader_uuid = queue_->GetUpToDatePeer(); |
838 | 588 | LOG_WITH_PREFIX(INFO) << "Selected up to date candidate protege leader [" << new_leader_uuid |
839 | 588 | << "]"; |
840 | 588 | graceful_stepdown = true; |
841 | 588 | } |
842 | | |
843 | 10.9k | const auto& local_peer_uuid = state_->GetPeerUuid(); |
844 | 10.9k | if (!new_leader_uuid.empty()10.9k ) { |
845 | 10.9k | const auto leadership_transfer_description = |
846 | 10.9k | Format("tablet $0 from $1 to $2", tablet_id, local_peer_uuid, new_leader_uuid); |
847 | 10.9k | if (!forced10.9k && new_leader_uuid == protege_leader_uuid_ && election_lost_by_protege_at_1.08k ) { |
848 | 923 | const MonoDelta time_since_election_loss_by_protege = |
849 | 923 | MonoTime::Now() - election_lost_by_protege_at_; |
850 | 923 | if (time_since_election_loss_by_protege.ToMilliseconds() < |
851 | 923 | FLAGS_min_leader_stepdown_retry_interval_ms) { |
852 | 908 | LOG_WITH_PREFIX(INFO) << "Unable to execute leadership transfer for " |
853 | 908 | << leadership_transfer_description |
854 | 908 | << " because the intended leader already lost an election only " |
855 | 908 | << ToString(time_since_election_loss_by_protege) << " ago (within " |
856 | 908 | << FLAGS_min_leader_stepdown_retry_interval_ms << " ms)."; |
857 | 908 | if (req->has_new_leader_uuid()) { |
858 | 903 | LOG_WITH_PREFIX(INFO) << "Rejecting leader stepdown request for " |
859 | 903 | << leadership_transfer_description; |
860 | 903 | resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); |
861 | 903 | resp->set_time_since_election_failure_ms( |
862 | 903 | time_since_election_loss_by_protege.ToMilliseconds()); |
863 | 903 | StatusToPB( |
864 | 903 | STATUS(IllegalState, "Suggested peer lost an election recently"), |
865 | 903 | resp->mutable_error()->mutable_status()); |
866 | | // We return OK so that the tablet service won't overwrite the error code. |
867 | 903 | return Status::OK(); |
868 | 903 | } else { |
869 | | // we were attempting a graceful transfer of our own choice |
870 | | // which is no longer possible |
871 | 5 | new_leader_uuid.clear(); |
872 | 5 | } |
873 | 908 | } |
874 | 20 | election_lost_by_protege_at_ = MonoTime(); |
875 | 20 | } |
876 | 10.9k | } |
877 | | |
878 | 10.0k | if (!new_leader_uuid.empty()) { |
879 | 10.0k | const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), new_leader_uuid); |
880 | 10.0k | if (peer10.0k && peer->member_type() == PeerMemberType::VOTER) { |
881 | 10.0k | auto timeout_ms = FLAGS_protege_synchronization_timeout_ms; |
882 | 10.0k | if (timeout_ms != 0 && |
883 | 10.0k | queue_->PeerLastReceivedOpId(new_leader_uuid) < GetLatestOpIdFromLog()10.0k ) { |
884 | 86 | delayed_step_down_ = DelayedStepDown { |
885 | 86 | .term = state_->GetCurrentTermUnlocked(), |
886 | 86 | .protege = new_leader_uuid, |
887 | 86 | .graceful = graceful_stepdown, |
888 | 86 | }; |
889 | 86 | LOG_WITH_PREFIX(INFO) << "Delay step down: " << delayed_step_down_.ToString(); |
890 | 86 | step_down_check_tracker_.Schedule( |
891 | 86 | std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1), |
892 | 86 | 1ms * timeout_ms); |
893 | 86 | return Status::OK(); |
894 | 86 | } |
895 | | |
896 | 9.92k | return StartStepDownUnlocked(*peer, graceful_stepdown); |
897 | 10.0k | } |
898 | | |
899 | 10 | LOG_WITH_PREFIX(WARNING) << "New leader " << new_leader_uuid << " not found."; |
900 | 10 | if (req->has_new_leader_uuid()) { |
901 | 0 | resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); |
902 | 0 | StatusToPB( |
903 | 0 | STATUS(IllegalState, "New leader not found among peers"), |
904 | 0 | resp->mutable_error()->mutable_status()); |
905 | | // We return OK so that the tablet service won't overwrite the error code. |
906 | 0 | return Status::OK(); |
907 | 10 | } else { |
908 | | // we were attempting a graceful transfer of our own choice |
909 | | // which is no longer possible |
910 | 10 | new_leader_uuid.clear(); |
911 | 10 | } |
912 | 10 | } |
913 | | |
914 | 19 | if (15 graceful_stepdown15 ) { |
915 | 19 | new_leader_uuid.clear(); |
916 | 19 | } |
917 | 15 | RETURN_NOT_OK(BecomeReplicaUnlocked(new_leader_uuid, MonoDelta())); |
918 | | |
919 | 15 | return Status::OK(); |
920 | 15 | } |
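
A standalone sketch (illustrative names) of the protege cool-down check performed in StepDown above: a transfer to a peer that just lost an election is refused until min_leader_stepdown_retry_interval_ms has elapsed, which is what prevents stepdown loops.

    #include <chrono>

    bool ProtegeRecentlyLostElection(
        std::chrono::steady_clock::time_point election_lost_by_protege_at,
        std::chrono::milliseconds min_retry_interval) {
      const auto since_loss = std::chrono::steady_clock::now() - election_lost_by_protege_at;
      return since_loss < min_retry_interval;  // too soon: reject the stepdown for now
    }
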
921 | | |
922 | 0 | return STATUS(InvalidArgument, "election_lost_by_uuid must not be empty");
923 | 77 | if (election_lost_by_uuid.empty()) { |
924 | 0 | return STATUS(InvalidArgument, "election_lost_by_uuid could not be empty"); |
925 | 0 | } |
926 | | |
927 | 77 | auto start_election = false; |
928 | 77 | { |
929 | 77 | ReplicaState::UniqueLock lock; |
930 | 77 | RETURN_NOT_OK(state_->LockForConfigChange(&lock)); |
931 | 77 | if (election_lost_by_uuid == protege_leader_uuid_) { |
932 | 69 | LOG_WITH_PREFIX(INFO) << "Our protege " << election_lost_by_uuid |
933 | 69 | << ", lost election. Has leader: " |
934 | 69 | << state_->HasLeaderUnlocked(); |
935 | 69 | withhold_election_start_until_.store(MonoTime::Min(), std::memory_order_relaxed); |
936 | 69 | election_lost_by_protege_at_ = MonoTime::Now(); |
937 | | |
938 | 69 | start_election = !state_->HasLeaderUnlocked(); |
939 | 69 | } |
940 | 77 | } |
941 | | |
942 | 77 | if (start_election) { |
943 | 68 | return StartElection({ElectionMode::NORMAL_ELECTION}); |
944 | 68 | } |
945 | | |
946 | 9 | return Status::OK(); |
947 | 77 | } |
948 | | |
949 | 10.9k | void RaftConsensus::WithholdElectionAfterStepDown(const std::string& protege_uuid) { |
950 | 10.9k | DCHECK(state_->IsLocked()); |
951 | 10.9k | protege_leader_uuid_ = protege_uuid; |
952 | 10.9k | auto timeout = MonoDelta::FromMilliseconds( |
953 | 10.9k | FLAGS_leader_failure_max_missed_heartbeat_periods * |
954 | 10.9k | FLAGS_raft_heartbeat_interval_ms); |
955 | 10.9k | if (!protege_uuid.empty()) { |
956 | | // Actually we have 2 kinds of step downs. |
957 | | // 1) We step down in favor of some protege. |
958 | | // 2) We step down because term was advanced or just started. |
959 | | // In second case we should not withhold election for a long period of time. |
960 | 9.44k | timeout *= FLAGS_after_stepdown_delay_election_multiplier; |
961 | 9.44k | } |
962 | 10.9k | auto deadline = MonoTime::Now() + timeout; |
963 | 10.9k | VLOG(2) << "Withholding election for " << timeout0 ; |
964 | 10.9k | withhold_election_start_until_.store(deadline, std::memory_order_release); |
965 | 10.9k | election_lost_by_protege_at_ = MonoTime(); |
966 | 10.9k | } |
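
A standalone sketch of the withhold window computed above, using the defaults quoted earlier (assumed values, not read from gflags): the base window is one leader failure timeout, stretched by after_stepdown_delay_election_multiplier when the step down was in favor of a specific protege.

    #include <chrono>
    #include <cstdint>
    #include <string>

    std::chrono::milliseconds ElectionWithholdWindow(const std::string& protege_uuid) {
      const double heartbeat_ms = 500;    // raft_heartbeat_interval_ms (assumed default)
      const double missed_periods = 6.0;  // leader_failure_max_missed_heartbeat_periods
      const int stepdown_multiplier = 5;  // after_stepdown_delay_election_multiplier
      double window_ms = missed_periods * heartbeat_ms;
      if (!protege_uuid.empty()) {
        window_ms *= stepdown_multiplier;  // stepped down for a protege: hold off longer
      }
      return std::chrono::milliseconds(static_cast<int64_t>(window_ms));
    }
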
967 | | |
968 | | void RaftConsensus::RunLeaderElectionResponseRpcCallback( |
969 | 10.0k | shared_ptr<RunLeaderElectionState> election_state) { |
970 | | // Check for RPC errors. |
971 | 10.0k | if (!election_state->rpc.status().ok()) { |
972 | 15 | LOG_WITH_PREFIX(WARNING) << "RPC error from RunLeaderElection() call to peer " |
973 | 15 | << election_state->req.dest_uuid() << ": " |
974 | 15 | << election_state->rpc.status(); |
975 | | // Check for tablet errors. |
976 | 9.99k | } else if (election_state->resp.has_error()) { |
977 | 0 | LOG_WITH_PREFIX(WARNING) << "Tablet error from RunLeaderElection() call to peer " |
978 | 0 | << election_state->req.dest_uuid() << ": " |
979 | 0 | << StatusFromPB(election_state->resp.error().status()); |
980 | 0 | } |
981 | 10.0k | } |
982 | | |
983 | 617k | void RaftConsensus::ReportFailureDetectedTask() { |
984 | 617k | auto scope_exit = ScopeExit([this] { |
985 | 617k | outstanding_report_failure_task_.clear(std::memory_order_release); |
986 | 617k | }); |
987 | | |
988 | 617k | MonoTime now; |
989 | 617k | for (;;) { |
990 | | // Do not start election for an extended period of time if we were recently stepped down. |
991 | 617k | auto old_value = withhold_election_start_until_.load(std::memory_order_acquire); |
992 | | |
993 | 617k | if (old_value == MonoTime::Min()) { |
994 | 616k | break; |
995 | 616k | } |
996 | | |
997 | 1.39k | if (1.39k !now.Initialized()1.39k ) { |
998 | 1.39k | now = MonoTime::Now(); |
999 | 1.39k | } |
1000 | | |
1001 | 1.39k | if (now < old_value) { |
1002 | 361 | VLOG(1) << "Skipping election due to delayed timeout for " << (old_value - now)0 ; |
1003 | 361 | return; |
1004 | 361 | } |
1005 | | |
1006 | | // If we ever stepped down and a delayed election start was scheduled, reset so that we
1007 | | // are out of that extra delay mode.
1008 | 1.03k | if (withhold_election_start_until_.compare_exchange_weak( |
1009 | 1.03k | old_value, MonoTime::Min(), std::memory_order_release)) { |
1010 | 1.03k | break; |
1011 | 1.03k | } |
1012 | 1.03k | } |
1013 | | |
1014 | | // Start an election. |
1015 | 617k | LOG_WITH_PREFIX(INFO) << "ReportFailDetected: Starting NORMAL_ELECTION..."; |
1016 | 617k | Status s = StartElection({ElectionMode::NORMAL_ELECTION}); |
1017 | 617k | if (PREDICT_FALSE(!s.ok())) { |
1018 | 33 | LOG_WITH_PREFIX(WARNING) << "Failed to trigger leader election: " << s.ToString(); |
1019 | 33 | } |
1020 | 617k | } |
1021 | | |
1022 | 818k | void RaftConsensus::ReportFailureDetected() { |
1023 | 818k | if (FLAGS_raft_disallow_concurrent_outstanding_report_failure_tasks && |
1024 | 818k | outstanding_report_failure_task_.test_and_set(std::memory_order_acq_rel)818k ) { |
1025 | 201k | VLOG(4) |
1026 | 2 | << "Returning from ReportFailureDetected as there is already an outstanding report task."; |
1027 | 617k | } else { |
1028 | | // We're running on a timer thread; start an election on a different thread pool. |
1029 | 617k | auto s = raft_pool_token_->SubmitFunc( |
1030 | 617k | std::bind(&RaftConsensus::ReportFailureDetectedTask, shared_from_this())); |
1031 | 617k | WARN_NOT_OK(s, "Failed to submit failure detected task"); |
1032 | 617k | if (!s.ok()) { |
1033 | 0 | outstanding_report_failure_task_.clear(std::memory_order_release); |
1034 | 0 | } |
1035 | 617k | } |
1036 | 818k | } |
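
A minimal sketch of the single-outstanding-task guard used in ReportFailureDetected above (illustrative and detached from YB's thread pool): an atomic_flag is claimed before submitting the task and cleared by the task itself, so at most one failure-report task is queued or running at a time.

    #include <atomic>

    std::atomic_flag outstanding_report_failure_task = ATOMIC_FLAG_INIT;

    // Returns true if the caller should go ahead and submit a new report task.
    bool TryClaimReportFailureTask() {
      // test_and_set returns the previous value: true means a task is already outstanding.
      return !outstanding_report_failure_task.test_and_set(std::memory_order_acq_rel);
    }

    // Called by the task when it finishes, or by the submitter if submission fails.
    void ReleaseReportFailureTask() {
      outstanding_report_failure_task.clear(std::memory_order_release);
    }
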
1037 | | |
1038 | 62.0k | Status RaftConsensus::BecomeLeaderUnlocked() { |
1039 | 62.0k | DCHECK(state_->IsLocked()); |
1040 | 62.0k | TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked", |
1041 | 62.0k | "peer", peer_uuid(), |
1042 | 62.0k | "tablet", tablet_id()); |
1043 | 62.0k | LOG_WITH_PREFIX(INFO) << "Becoming Leader. State: " << state_->ToStringUnlocked(); |
1044 | | |
1045 | | // Disable FD while we are leader. |
1046 | 62.0k | DisableFailureDetector(); |
1047 | | |
1048 | | // Don't vote for anyone if we're a leader. |
1049 | 62.0k | withhold_votes_until_.store(MonoTime::Max(), std::memory_order_release); |
1050 | | |
1051 | 62.0k | queue_->RegisterObserver(this); |
1052 | | |
1053 | | // Refresh queue and peers before initiating NO_OP. |
1054 | 62.0k | RefreshConsensusQueueAndPeersUnlocked(); |
1055 | | |
1056 | | // Initiate a NO_OP operation that is sent at the beginning of every term |
1057 | | // change in raft. |
1058 | 62.0k | auto replicate = std::make_shared<ReplicateMsg>(); |
1059 | 62.0k | replicate->set_op_type(NO_OP); |
1060 | 62.0k | replicate->mutable_noop_request(); // Define the no-op request field. |
1061 | 62.0k | LOG(INFO) << "Sending NO_OP at op " << state_->GetCommittedOpIdUnlocked(); |
1062 | | // This committed OpId is used for tablet bootstrap for RocksDB-backed tables. |
1063 | 62.0k | state_->GetCommittedOpIdUnlocked().ToPB(replicate->mutable_committed_op_id()); |
1064 | | |
1065 | | // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT |
1066 | | // operations. See KUDU-798. |
1067 | | // Note: This hybrid_time has no meaning from a serialization perspective |
1068 | | // because this method is not executed on the TabletPeer's prepare thread. |
1069 | 62.0k | replicate->set_hybrid_time(clock_->Now().ToUint64()); |
1070 | | |
1071 | 62.0k | auto round = make_scoped_refptr<ConsensusRound>(this, replicate); |
1072 | 62.0k | round->SetCallback(MakeNonTrackedRoundCallback(round.get(), |
1073 | 62.0k | [this, term = state_->GetCurrentTermUnlocked()](const Status& status) { |
1074 | | // Set 'Leader is ready to serve' flag only for committed NoOp operation |
1075 | | // and only if the term is up-to-date. |
1076 | | // It is guaranteed that successful notification is called only while holding replicate state |
1077 | | // mutex. |
1078 | 61.8k | if (status.ok()61.8k && term == state_->GetCurrentTermUnlocked()61.7k ) { |
1079 | 61.8k | state_->SetLeaderNoOpCommittedUnlocked(true); |
1080 | 61.8k | } |
1081 | 61.8k | })); |
1082 | 62.0k | RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); |
1083 | | |
1084 | 62.0k | peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); |
1085 | | |
1086 | | // Set the timestamp to max uint64_t so that every time this metric is queried, the returned |
1087 | | // lag is 0. We will need to restore the timestamp once this peer steps down. |
1088 | 62.0k | follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds( |
1089 | 62.0k | std::numeric_limits<int64_t>::max()); |
1090 | 62.0k | is_raft_leader_metric_->set_value(1); |
1091 | | |
1092 | 62.0k | return Status::OK(); |
1093 | 62.0k | } |
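
A hedged sketch of the follower-lag trick applied at the end of BecomeLeaderUnlocked (illustrative types, not YB's metric classes): while this peer is the leader, the "last update from leader" timestamp is pinned to the maximum int64 value so the computed lag clamps to zero until the peer steps down again.

    #include <atomic>
    #include <cstdint>
    #include <limits>

    struct FollowerLagMetric {
      std::atomic<int64_t> last_update_time_ms{0};

      void PinWhileLeader() {
        last_update_time_ms.store(std::numeric_limits<int64_t>::max());
      }

      int64_t LagMs(int64_t now_ms) const {
        const int64_t last = last_update_time_ms.load();
        // With the timestamp pinned to max, now_ms - last would be negative, so clamp to 0.
        return now_ms > last ? now_ms - last : 0;
      }
    };
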
1094 | | |
1095 | | Status RaftConsensus::BecomeReplicaUnlocked( |
1096 | 161k | const std::string& new_leader_uuid, MonoDelta initial_fd_wait) { |
1097 | 161k | LOG_WITH_PREFIX(INFO) |
1098 | 161k | << "Becoming Follower/Learner. State: " << state_->ToStringUnlocked() |
1099 | 161k | << ", new leader: " << new_leader_uuid << ", initial_fd_wait: " << initial_fd_wait; |
1100 | | |
1101 | 161k | if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { |
1102 | 10.9k | WithholdElectionAfterStepDown(new_leader_uuid); |
1103 | 10.9k | } |
1104 | | |
1105 | 161k | state_->ClearLeaderUnlocked(); |
1106 | | |
1107 | | // FD should be running while we are a follower. |
1108 | 161k | EnableFailureDetector(initial_fd_wait); |
1109 | | |
1110 | | // Now that we're a replica, we can allow voting for other nodes. |
1111 | 161k | withhold_votes_until_.store(MonoTime::Min(), std::memory_order_release); |
1112 | | |
1113 | 161k | const Status unregister_observer_status = queue_->UnRegisterObserver(this); |
1114 | 161k | if (!unregister_observer_status.IsNotFound()) { |
1115 | 10.9k | RETURN_NOT_OK(unregister_observer_status); |
1116 | 10.9k | } |
1117 | | // Deregister ourselves from the queue. We don't care what gets replicated, since
1118 | | // we're stepping down. |
1119 | 161k | queue_->SetNonLeaderMode(); |
1120 | | |
1121 | 161k | peer_manager_->Close(); |
1122 | | |
1123 | | // TODO: https://github.com/yugabyte/yugabyte-db/issues/5522. Add unit tests for this metric. |
1124 | | // We update the follower lag metric timestamp here because it's possible that a leader |
1125 |  |   // that steps down could get partitioned before it receives any replicate message. If we
1126 | | // don't update the timestamp here, and the above scenario happens, the metric will keep the |
1127 | | // uint64_t max value, which would make the metric return a 0 lag every time it is queried, |
1128 | | // even though that's not the case. |
1129 | 161k | follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds( |
1130 | 161k | clock_->Now().GetPhysicalValueMicros() / 1000); |
1131 | 161k | is_raft_leader_metric_->set_value(0); |
1132 | | |
1133 | 161k | return Status::OK(); |
1134 | 161k | } |
1135 | | |
1136 | 195 | Status RaftConsensus::TEST_Replicate(const ConsensusRoundPtr& round) { |
1137 | 195 | return ReplicateBatch({ round }); |
1138 | 195 | } |
1139 | | |
1140 | 4.93M | Status RaftConsensus::ReplicateBatch(const ConsensusRounds& rounds) { |
1141 | 4.93M | size_t processed_rounds = 0; |
1142 | 4.93M | auto status = DoReplicateBatch(rounds, &processed_rounds); |
1143 | 4.93M | if (!status.ok()) { |
1144 |     84 |     VLOG_WITH_PREFIX_AND_FUNC(1)
1145 | 0 | << "Failed with status " << status << ", treating all " << rounds.size() |
1146 | 0 | << " operations as failed with that status"; |
1147 | | // Treat all the operations in the batch as failed. |
1148 | 147 | for (size_t i = rounds.size(); i != processed_rounds;) { |
1149 | 63 | rounds[--i]->callback()->ReplicationFailed(status); |
1150 | 63 | } |
1151 | 84 | } |
1152 | 4.93M | return status; |
1153 | 4.93M | } |
1154 | | |
1155 | 4.93M | Status RaftConsensus::DoReplicateBatch(const ConsensusRounds& rounds, size_t* processed_rounds) { |
1156 | 4.93M | RETURN_NOT_OK(ExecuteHook(PRE_REPLICATE)); |
1157 | 4.93M | { |
1158 | 4.93M | ReplicaState::UniqueLock lock; |
1159 | 4.93M | #ifndef NDEBUG |
1160 | 5.04M | for (const auto& round : rounds) { |
1161 | 18.4E | DCHECK(!round->replicate_msg()->has_id()) << "Should not have an OpId yet: " |
1162 | 18.4E | << round->replicate_msg()->DebugString(); |
1163 | 5.04M | } |
1164 | 4.93M | #endif |
1165 | 4.93M | RETURN_NOT_OK(state_->LockForReplicate(&lock)); |
1166 | 4.93M | auto current_term = state_->GetCurrentTermUnlocked(); |
1167 | 4.93M | if (current_term == delayed_step_down_.term) { |
1168 | 15 | return STATUS(Aborted, "Rejecting because of planned step down"); |
1169 | 15 | } |
1170 | | |
1171 | 5.04M |     for (const auto& round : rounds) {
1172 | 5.04M | RETURN_NOT_OK(round->CheckBoundTerm(current_term)); |
1173 | 5.04M | } |
1174 | 4.93M | auto status = AppendNewRoundsToQueueUnlocked(rounds, processed_rounds); |
1175 | 4.93M | if (!status.ok()) { |
1176 | | // In general we have 3 kinds of rounds in case of failure: |
1177 | | // 1) Rounds that were rejected by retryable requests. |
1178 | | // We should not call ReplicationFinished for them. |
1179 | | // 2) Rounds that were registered with retryable requests. |
1180 | | // We should call state_->NotifyReplicationFinishedUnlocked for them. |
1181 | | // 3) Rounds that were not registered with retryable requests. |
1182 | | // We should call ReplicationFinished directly for them. Could do it after releasing |
1183 | | // the lock. I.e. in ReplicateBatch. |
1184 | | // |
1185 | | // (3) is all rounds starting with index *processed_rounds and above. |
1186 |  |       // For (1) we reset the bound term, so we can use it to distinguish between (1) and (2).
1187 | 58 | for (size_t i = *processed_rounds; i != 0;) { |
1188 | 23 | --i; |
1189 | 23 | if (rounds[i]->bound_term() == OpId::kUnknownTerm) { |
1190 | | // Already rejected by retryable requests. |
1191 | 0 | continue; |
1192 | 0 | } |
1193 | 23 | state_->NotifyReplicationFinishedUnlocked( |
1194 | 23 | rounds[i], status, OpId::kUnknownTerm, /* applied_op_ids */ nullptr); |
1195 | 23 | } |
1196 | 35 | return status; |
1197 | 35 | } |
1198 | 4.93M | } |
1199 | | |
1200 | 4.93M | peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); |
1201 | 4.93M | RETURN_NOT_OK(ExecuteHook(POST_REPLICATE)); |
1202 | 4.93M | return Status::OK(); |
1203 | 4.93M | } |
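
(Illustrative note, not part of raft_consensus.cc.) A minimal sketch of the two-level failure handling in ReplicateBatch/DoReplicateBatch above, using hypothetical stand-in types rather than the real ConsensusRound: the inner call reports how many rounds it already handled via processed_rounds, and the caller fails the remaining tail in reverse order.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct FakeRound {
  std::string name;
  void ReplicationFailed(const std::string& status) {
    std::cout << name << " failed: " << status << std::endl;
  }
};

// Plays the role of DoReplicateBatch: stops part way through and reports how
// many rounds it already dealt with via *processed_rounds.
bool DoBatch(const std::vector<FakeRound>& rounds, size_t* processed_rounds) {
  for (const auto& r : rounds) {
    if (r.name == "bad") return false;  // failure detected mid-batch
    ++*processed_rounds;                // this round was fully handled here
  }
  return true;
}

int main() {
  std::vector<FakeRound> rounds{{"a"}, {"b"}, {"bad"}, {"c"}};
  size_t processed = 0;
  if (!DoBatch(rounds, &processed)) {
    // Like ReplicateBatch: everything at index >= processed is failed by the
    // caller, walking backwards from the end of the batch.
    for (size_t i = rounds.size(); i != processed;) {
      rounds[--i].ReplicationFailed("batch aborted");
    }
  }
}
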
1204 | | |
1205 | 67.9k | Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) { |
1206 | 67.9k | size_t processed_rounds = 0; |
1207 | 67.9k | return AppendNewRoundsToQueueUnlocked({ round }, &processed_rounds); |
1208 | 67.9k | } |
1209 | | |
1210 | 4.99M | Status RaftConsensus::CheckLeasesUnlocked(const ConsensusRoundPtr& round) { |
1211 | 4.99M | auto op_type = round->replicate_msg()->op_type(); |
1212 | | // When we do not have a hybrid time leader lease we allow 2 operation types to be added to RAFT. |
1213 | | // NO_OP - because even empty heartbeat messages could be used to obtain the lease. |
1214 | | // CHANGE_CONFIG_OP - because we should be able to update consensus even w/o lease. |
1215 | | // Both of them are safe, since they don't affect user reads or writes. |
1216 | 4.99M | if (IsConsensusOnlyOperation(op_type)) { |
1217 | 67.9k | return Status::OK(); |
1218 | 67.9k | } |
1219 | | |
1220 | 4.92M | auto lease_status = state_->GetHybridTimeLeaseStatusAtUnlocked( |
1221 | 4.92M | HybridTime(round->replicate_msg()->hybrid_time()).GetPhysicalValueMicros()); |
1222 | 4.92M | static_assert(LeaderLeaseStatus_ARRAYSIZE == 3, "Please update logic below to adapt new state"); |
1223 | 4.92M | if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) { |
1224 | 0 | return STATUS_FORMAT(LeaderHasNoLease, |
1225 | 0 | "Old leader may have hybrid time lease, while adding: $0", |
1226 | 0 | OperationType_Name(op_type)); |
1227 | 0 | } |
1228 | 4.92M | lease_status = state_->GetLeaderLeaseStatusUnlocked(nullptr); |
1229 | 4.92M | if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) { |
1230 | 0 | return STATUS_FORMAT(LeaderHasNoLease, |
1231 | 0 | "Old leader may have lease, while adding: $0", |
1232 | 0 | OperationType_Name(op_type)); |
1233 | 0 | } |
1234 | | |
1235 | 4.92M | return Status::OK(); |
1236 | 4.92M | } |
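
(Illustrative note, not part of raft_consensus.cc.) A standalone sketch of the lease rule above, with a simplified enum instead of the real protobuf types: without a hybrid-time leader lease, only consensus-only operations (NO_OP, CHANGE_CONFIG_OP) may be appended, since they never affect user reads or writes.

#include <iostream>

enum class OpType { kNoOp, kChangeConfig, kWrite };

// Consensus-only operations are safe to append even without a leader lease.
bool AllowedWithoutLease(OpType op) {
  return op == OpType::kNoOp || op == OpType::kChangeConfig;
}

int main() {
  std::cout << std::boolalpha
            << AllowedWithoutLease(OpType::kNoOp) << " "          // true
            << AllowedWithoutLease(OpType::kWrite) << std::endl;  // false
}
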
1237 | | |
1238 | | Status RaftConsensus::AppendNewRoundsToQueueUnlocked( |
1239 | 4.99M | const ConsensusRounds& rounds, size_t* processed_rounds) { |
1240 | 4.99M | SCHECK(!rounds.empty(), InvalidArgument, "Attempted to add zero rounds to the queue"); |
1241 | | |
1242 | 4.99M | auto role = state_->GetActiveRoleUnlocked(); |
1243 | 4.99M | if (role != PeerRole::LEADER) { |
1244 | 12 | return STATUS_FORMAT(IllegalState, "Appending new rounds while not the leader but $0", |
1245 | 12 | PeerRole_Name(role)); |
1246 | 12 | } |
1247 | | |
1248 | 4.99M | std::vector<ReplicateMsgPtr> replicate_msgs; |
1249 | 4.99M | replicate_msgs.reserve(rounds.size()); |
1250 | 4.99M | const OpId& committed_op_id = state_->GetCommittedOpIdUnlocked(); |
1251 | | |
1252 | 5.11M | for (const auto& round : rounds) { |
1253 | 5.11M | ++*processed_rounds; |
1254 | | |
1255 | 5.11M | if (round->replicate_msg()->op_type() == OperationType::WRITE_OP && |
1256 | 5.11M |         !state_->RegisterRetryableRequest(round)) {
1257 | 1 | round->BindToTerm(OpId::kUnknownTerm); // Mark round as non replicating |
1258 | 1 | continue; |
1259 | 1 | } |
1260 | | |
1261 | 5.11M | OpId op_id = state_->NewIdUnlocked(); |
1262 | | |
1263 | | // We use this callback to transform write operations by substituting the hybrid_time into |
1264 | | // the write batch inside the write operation. |
1265 | | // |
1266 | | // TODO: we could allocate multiple HybridTimes in batch, only reading system clock once. |
1267 | 5.11M | round->callback()->AddedToLeader(op_id, committed_op_id); |
1268 | | |
1269 | 5.11M | Status s = state_->AddPendingOperation(round, OperationMode::kLeader); |
1270 | 5.11M | if (!s.ok()) { |
1271 | 23 | RollbackIdAndDeleteOpId(round->replicate_msg(), false /* should_exists */); |
1272 | | |
1273 | | // Iterate rounds in the reverse order and release ids. |
1274 | 23 | while (!replicate_msgs.empty()) { |
1275 | 0 | RollbackIdAndDeleteOpId(replicate_msgs.back(), true /* should_exists */); |
1276 | 0 | replicate_msgs.pop_back(); |
1277 | 0 | } |
1278 | 23 | return s; |
1279 | 23 | } |
1280 | | |
1281 | 5.11M | replicate_msgs.push_back(round->replicate_msg()); |
1282 | 5.11M | } |
1283 | | |
1284 | 4.99M | if (replicate_msgs.empty()) { |
1285 | 1 | return Status::OK(); |
1286 | 1 | } |
1287 | | |
1288 |  |   // We only need to check the lease for the latest operation in the batch, because it has the
1289 |  |   // greatest hybrid time and therefore requires the most advanced lease.
1290 | 4.99M | auto s = CheckLeasesUnlocked(rounds.back()); |
1291 | | |
1292 | 4.99M | if (s.ok()) { |
1293 | 4.99M | s = queue_->AppendOperations(replicate_msgs, committed_op_id, state_->Clock().Now()); |
1294 | 4.99M | } |
1295 | | |
1296 | | // Handle Status::ServiceUnavailable(), which means the queue is full. |
1297 | | // TODO: what are we doing about other errors here? Should we also release OpIds in those cases? |
1298 | 4.99M | if (PREDICT_FALSE(!s.ok())) { |
1299 | 0 | LOG_WITH_PREFIX(WARNING) << "Could not append replicate request: " << s << ", queue status: " |
1300 | 0 | << queue_->ToString(); |
1301 | 0 | for (auto iter = replicate_msgs.rbegin(); iter != replicate_msgs.rend(); ++iter) { |
1302 | 0 | RollbackIdAndDeleteOpId(*iter, true /* should_exists */); |
1303 | | // TODO Possibly evict a dangling peer from the configuration here. |
1304 | | // TODO count of number of ops failed due to consensus queue overflow. |
1305 | 0 | } |
1306 |  |
1307 | 0 | return s.CloneAndPrepend("Unable to append operations to consensus queue"); |
1308 | 0 | } |
1309 | | |
1310 | 4.99M | state_->UpdateLastReceivedOpIdUnlocked(replicate_msgs.back()->id()); |
1311 | 4.99M | return Status::OK(); |
1312 | 4.99M | } |
1313 | | |
1314 | | void RaftConsensus::MajorityReplicatedNumSSTFilesChanged( |
1315 | 2.06k | uint64_t majority_replicated_num_sst_files) { |
1316 | 2.06k | majority_num_sst_files_.store(majority_replicated_num_sst_files, std::memory_order_release); |
1317 | 2.06k | } |
1318 | | |
1319 | | void RaftConsensus::UpdateMajorityReplicated( |
1320 | | const MajorityReplicatedData& majority_replicated_data, OpId* committed_op_id, |
1321 | 34.7M | OpId* last_applied_op_id) { |
1322 | 34.7M | TEST_PAUSE_IF_FLAG(TEST_pause_update_majority_replicated); |
1323 | 34.7M | ReplicaState::UniqueLock lock; |
1324 | 34.7M | Status s = state_->LockForMajorityReplicatedIndexUpdate(&lock); |
1325 | 34.7M | if (PREDICT_FALSE(!s.ok())) { |
1326 | 128 | LOG_WITH_PREFIX(WARNING) |
1327 | 128 | << "Unable to take state lock to update committed index: " |
1328 | 128 | << s.ToString(); |
1329 | 128 | return; |
1330 | 128 | } |
1331 | | |
1332 | 34.7M | EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags; |
1333 | 34.7M | if (GetAtomicFlag(&FLAGS_enable_lease_revocation)) { |
1334 | 34.7M | if (!state_->old_leader_lease().holder_uuid.empty() && |
1335 | 34.7M |         queue_->PeerAcceptedOurLease(state_->old_leader_lease().holder_uuid)) {
1336 | 10.2k | flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease); |
1337 | 10.2k | } |
1338 | | |
1339 | 34.7M | if (!state_->old_leader_ht_lease().holder_uuid.empty() && |
1340 | 34.7M |         queue_->PeerAcceptedOurLease(state_->old_leader_ht_lease().holder_uuid)) {
1341 | 10.7k | flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease); |
1342 | 10.7k | } |
1343 | 34.7M | } |
1344 | | |
1345 | 34.7M | state_->SetMajorityReplicatedLeaseExpirationUnlocked(majority_replicated_data, flags); |
1346 | 34.7M | leader_lease_wait_cond_.notify_all(); |
1347 | | |
1348 | 34.7M |   VLOG_WITH_PREFIX(1) << "Marking majority replicated up to "
1349 | 15.0k | << majority_replicated_data.ToString(); |
1350 | 34.7M | TRACE("Marking majority replicated up to $0", majority_replicated_data.op_id.ToString()); |
1351 | 34.7M | bool committed_index_changed = false; |
1352 | 34.7M | s = state_->UpdateMajorityReplicatedUnlocked( |
1353 | 34.7M | majority_replicated_data.op_id, committed_op_id, &committed_index_changed, |
1354 | 34.7M | last_applied_op_id); |
1355 | 34.7M | auto leader_state = state_->GetLeaderStateUnlocked(); |
1356 | 34.7M |   if (leader_state.ok() && leader_state.status == LeaderStatus::LEADER_AND_READY) {
1357 | 34.6M | state_->context()->MajorityReplicated(); |
1358 | 34.6M | } |
1359 | 34.7M | if (PREDICT_FALSE(!s.ok())) { |
1360 | 0 | string msg = Format("Unable to mark committed up to $0: $1", majority_replicated_data.op_id, s); |
1361 | 0 | TRACE(msg); |
1362 | 0 | LOG_WITH_PREFIX(WARNING) << msg; |
1363 | 0 | return; |
1364 | 0 | } |
1365 | | |
1366 | 34.7M | majority_num_sst_files_.store(majority_replicated_data.num_sst_files, std::memory_order_release); |
1367 | | |
1368 | 34.7M | if (!majority_replicated_data.peer_got_all_ops.empty() && |
1369 | 34.7M |       delayed_step_down_.term == state_->GetCurrentTermUnlocked() &&
1370 | 34.7M |       majority_replicated_data.peer_got_all_ops == delayed_step_down_.protege) {
1371 | 75 | LOG_WITH_PREFIX(INFO) << "Protege synchronized: " << delayed_step_down_.ToString(); |
1372 | 75 | const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), delayed_step_down_.protege); |
1373 | 75 | if (peer) { |
1374 | 75 | WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful), |
1375 | 75 | "Start step down failed"); |
1376 | 75 | } |
1377 | 75 | delayed_step_down_.term = OpId::kUnknownTerm; |
1378 | 75 | } |
1379 | | |
1380 | 34.7M | if (committed_index_changed && |
1381 | 34.7M |       state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
1382 | | // If all operations were just committed, and we don't have pending operations, then |
1383 | | // we write an empty batch that contains committed index. |
1384 | | // This affects only our local log, because followers have different logic in this scenario. |
1385 | 4.69M | if (*committed_op_id == state_->GetLastReceivedOpIdUnlocked()) { |
1386 | 4.28M | auto status = queue_->AppendOperations({}, *committed_op_id, state_->Clock().Now()); |
1387 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, !status.ok() && !status.IsServiceUnavailable()) |
1388 | 18.4E | << "Failed to append empty batch: " << status; |
1389 | 4.28M | } |
1390 | | |
1391 | 4.69M | lock.unlock(); |
1392 | | // No need to hold the lock while calling SignalRequest. |
1393 | 4.69M | peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); |
1394 | 4.69M | } |
1395 | 34.7M | } |
1396 | | |
1397 | 0 | void RaftConsensus::AppendEmptyBatchToLeaderLog() { |
1398 | 0 | auto lock = state_->LockForRead(); |
1399 | 0 | auto committed_op_id = state_->GetCommittedOpIdUnlocked(); |
1400 | 0 | if (committed_op_id == state_->GetLastReceivedOpIdUnlocked()) { |
1401 | 0 | auto status = queue_->AppendOperations({}, committed_op_id, state_->Clock().Now()); |
1402 | 0 | LOG_IF_WITH_PREFIX(DFATAL, !status.ok()) << "Failed to append empty batch: " << status; |
1403 | 0 | } |
1404 | 0 | } |
1405 | | |
1406 | 801 | void RaftConsensus::NotifyTermChange(int64_t term) { |
1407 | 801 | ReplicaState::UniqueLock lock; |
1408 | 801 | Status s = state_->LockForConfigChange(&lock); |
1409 | 801 | if (PREDICT_FALSE(!s.ok())) { |
1410 | 0 | LOG_WITH_PREFIX(WARNING) << "Unable to lock ReplicaState for config change" |
1411 | 0 | << " when notified of new term " << term << ": " << s; |
1412 | 0 | return; |
1413 | 0 | } |
1414 | 801 | WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term."); |
1415 | 801 | } |
1416 | | |
1417 | | void RaftConsensus::NotifyFailedFollower(const string& uuid, |
1418 | | int64_t term, |
1419 | 330 | const std::string& reason) { |
1420 | | // Common info used in all of the log messages within this method. |
1421 | 330 | string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ", |
1422 | 330 | uuid, term, reason); |
1423 | | |
1424 | 330 | if (!FLAGS_evict_failed_followers) { |
1425 | 0 | LOG_WITH_PREFIX(INFO) << fail_msg << "Eviction of failed followers is disabled. Doing nothing."; |
1426 | 0 | return; |
1427 | 0 | } |
1428 | | |
1429 | 330 | RaftConfigPB committed_config; |
1430 | 330 | { |
1431 | 330 | auto lock = state_->LockForRead(); |
1432 | | |
1433 | 330 | int64_t current_term = state_->GetCurrentTermUnlocked(); |
1434 | 330 | if (current_term != term) { |
1435 | 0 | LOG_WITH_PREFIX(INFO) << fail_msg << "Notified about a follower failure in " |
1436 | 0 | << "previous term " << term << ", but a leader election " |
1437 | 0 | << "likely occurred since the failure was detected. " |
1438 | 0 | << "Doing nothing."; |
1439 | 0 | return; |
1440 | 0 | } |
1441 | | |
1442 | 330 | if (state_->IsConfigChangePendingUnlocked()) { |
1443 | 84 | LOG_WITH_PREFIX(INFO) << fail_msg << "There is already a config change operation " |
1444 | 84 | << "in progress. Unable to evict follower until it completes. " |
1445 | 84 | << "Doing nothing."; |
1446 | 84 | return; |
1447 | 84 | } |
1448 | 246 | committed_config = state_->GetCommittedConfigUnlocked(); |
1449 | 246 | } |
1450 | | |
1451 | | // Run config change on thread pool after dropping ReplicaState lock. |
1452 | 246 | WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask, |
1453 | 246 | shared_from_this(), uuid, committed_config, reason)), |
1454 | 246 | state_->LogPrefix() + "Unable to start RemoteFollowerTask"); |
1455 | 246 | } |
1456 | | |
1457 | | void RaftConsensus::TryRemoveFollowerTask(const string& uuid, |
1458 | | const RaftConfigPB& committed_config, |
1459 | 246 | const std::string& reason) { |
1460 | 246 | ChangeConfigRequestPB req; |
1461 | 246 | req.set_tablet_id(tablet_id()); |
1462 | 246 | req.mutable_server()->set_permanent_uuid(uuid); |
1463 | 246 | req.set_type(REMOVE_SERVER); |
1464 | 246 | req.set_cas_config_opid_index(committed_config.opid_index()); |
1465 | 246 | LOG_WITH_PREFIX(INFO) |
1466 | 246 | << "Attempting to remove follower " << uuid << " from the Raft config at commit index " |
1467 | 246 | << committed_config.opid_index() << ". Reason: " << reason; |
1468 | 246 | boost::optional<TabletServerErrorPB::Code> error_code; |
1469 | 246 | WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code), |
1470 | 246 | state_->LogPrefix() + "Unable to remove follower " + uuid); |
1471 | 246 | } |
1472 | | |
1473 | | Status RaftConsensus::Update(ConsensusRequestPB* request, |
1474 | | ConsensusResponsePB* response, |
1475 | 25.5M | CoarseTimePoint deadline) { |
1476 | 25.5M | if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests)) { |
1477 | 329 | return STATUS(IllegalState, "Rejected: --TEST_follower_reject_update_consensus_requests " |
1478 | 329 | "is set to true."); |
1479 | 329 | } |
1480 | 25.5M | TEST_PAUSE_IF_FLAG(TEST_follower_pause_update_consensus_requests); |
1481 | | |
1482 | 25.5M | auto reject_mode = reject_mode_.load(std::memory_order_acquire); |
1483 | 25.5M | if (reject_mode != RejectMode::kNone) { |
1484 | 0 | if (reject_mode == RejectMode::kAll || |
1485 | 0 | (reject_mode == RejectMode::kNonEmpty && !request->ops().empty())) { |
1486 | 0 | auto result = STATUS_FORMAT(IllegalState, "Rejected because of reject mode: $0", |
1487 | 0 | ToString(reject_mode)); |
1488 | 0 | LOG_WITH_PREFIX(INFO) << result; |
1489 | 0 | return result; |
1490 | 0 | } |
1491 | 0 | LOG_WITH_PREFIX(INFO) << "Accepted: " << request->ShortDebugString(); |
1492 | 0 | } |
1493 | | |
1494 | 25.5M | if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) { |
1495 | 385 | if (MonoTime::Now() < withold_replica_updates_until_) { |
1496 | 385 | LOG(INFO) << "Rejecting Update for tablet: " << tablet_id() |
1497 | 385 | << " tserver uuid: " << peer_uuid(); |
1498 | 385 | return STATUS_SUBSTITUTE(IllegalState, |
1499 | 385 | "Rejected: --TEST_follower_reject_update_consensus_requests_seconds is set to $0", |
1500 | 385 | FLAGS_TEST_follower_reject_update_consensus_requests_seconds); |
1501 | 385 | } |
1502 | 385 | } |
1503 | | |
1504 | 25.5M | RETURN_NOT_OK(ExecuteHook(PRE_UPDATE)); |
1505 | 25.5M | response->set_responder_uuid(state_->GetPeerUuid()); |
1506 | | |
1507 | 25.5M |   VLOG_WITH_PREFIX(2) << "Replica received request: " << request->ShortDebugString();
1508 | | |
1509 | 25.5M | UpdateReplicaResult result; |
1510 | 25.5M | { |
1511 | | // see var declaration |
1512 | 25.5M | auto wait_start = CoarseMonoClock::now(); |
1513 | 25.5M |     auto wait_duration = deadline != CoarseTimePoint::max() ? deadline - wait_start
1514 | 25.5M |                                                              : CoarseDuration::max();
1515 | 25.5M | auto lock = LockMutex(&update_mutex_, wait_duration); |
1516 | 25.5M | if (!lock.owns_lock()) { |
1517 | 1.20k | return STATUS_FORMAT(TimedOut, "Unable to lock update mutex for $0", wait_duration); |
1518 | 1.20k | } |
1519 | | |
1520 | 25.5M | LongOperationTracker operation_tracker("UpdateReplica", 1s); |
1521 | 25.5M |     result = VERIFY_RESULT(UpdateReplica(request, response));
1522 | | |
1523 | 0 | auto delay = TEST_delay_update_.load(std::memory_order_acquire); |
1524 | 25.5M | if (delay != MonoDelta::kZero) { |
1525 | 0 | std::this_thread::sleep_for(delay.ToSteadyDuration()); |
1526 | 0 | } |
1527 | 25.5M | } |
1528 | | |
1529 | | // Release the lock while we wait for the log append to finish so that commits can go through. |
1530 | 25.5M | if (!result.wait_for_op_id.empty()) { |
1531 | 8.27M | RETURN_NOT_OK(WaitForWrites(result.current_term, result.wait_for_op_id)); |
1532 | 8.27M | } |
1533 | | |
1534 | 25.5M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
1535 | 0 | VLOG_WITH_PREFIX(2) << "Replica updated. " |
1536 | 0 | << state_->ToString() << " Request: " << request->ShortDebugString(); |
1537 | 0 | } |
1538 | | |
1539 |  |   // If an election is pending on a specific op id and that op has just been committed, start it now.
1540 |  |   // StartElection will ensure the pending election is started only once, even if
1541 |  |   // UpdateReplica happens in multiple threads in parallel.
1542 | 25.5M | if (result.start_election) { |
1543 | 0 | RETURN_NOT_OK(StartElection( |
1544 | 0 | {consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE, true /* pending_commit */})); |
1545 | 0 | } |
1546 | | |
1547 | 25.5M | RETURN_NOT_OK(ExecuteHook(POST_UPDATE)); |
1548 | 25.5M | return Status::OK(); |
1549 | 25.5M | } |
1550 | | |
1551 | | Status RaftConsensus::StartReplicaOperationUnlocked( |
1552 | 9.41M | const ReplicateMsgPtr& msg, HybridTime propagated_safe_time) { |
1553 | 9.41M | if (IsConsensusOnlyOperation(msg->op_type())) { |
1554 | 133k | return StartConsensusOnlyRoundUnlocked(msg); |
1555 | 133k | } |
1556 | | |
1557 | 9.28M | if (PREDICT_FALSE(FLAGS_TEST_follower_fail_all_prepare)) { |
1558 | 1 | return STATUS(IllegalState, "Rejected: --TEST_follower_fail_all_prepare " |
1559 | 1 | "is set to true."); |
1560 | 1 | } |
1561 | | |
1562 | 9.28M |   VLOG_WITH_PREFIX(1) << "Starting operation: " << msg->id().ShortDebugString();
1563 | 9.28M | scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg)); |
1564 | 9.28M | ConsensusRound* round_ptr = round.get(); |
1565 | 9.28M | RETURN_NOT_OK(state_->context()->StartReplicaOperation(round, propagated_safe_time)); |
1566 | 9.28M | auto result = state_->AddPendingOperation(round_ptr, OperationMode::kFollower); |
1567 | 9.28M | if (!result.ok()) { |
1568 | 0 | round_ptr->NotifyReplicationFinished(result, OpId::kUnknownTerm, /* applied_op_ids */ nullptr); |
1569 | 0 | } |
1570 | 9.28M | return result; |
1571 | 9.28M | } |
1572 | | |
1573 | 34 | std::string RaftConsensus::LeaderRequest::OpsRangeString() const { |
1574 | 34 | std::string ret; |
1575 | 34 | ret.reserve(100); |
1576 | 34 | ret.push_back('['); |
1577 | 34 | if (!messages.empty()) { |
1578 | 7 | const OpIdPB& first_op = (*messages.begin())->id(); |
1579 | 7 | const OpIdPB& last_op = (*messages.rbegin())->id(); |
1580 | 7 | strings::SubstituteAndAppend(&ret, "$0.$1-$2.$3", |
1581 | 7 | first_op.term(), first_op.index(), |
1582 | 7 | last_op.term(), last_op.index()); |
1583 | 7 | } |
1584 | 34 | ret.push_back(']'); |
1585 | 34 | return ret; |
1586 | 34 | } |
1587 | | |
1588 | | Status RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req, |
1589 | 25.5M | LeaderRequest* deduplicated_req) { |
1590 | 25.5M | const auto& last_committed = state_->GetCommittedOpIdUnlocked(); |
1591 | | |
1592 | | // The leader's preceding id. |
1593 | 25.5M | deduplicated_req->preceding_op_id = yb::OpId::FromPB(rpc_req->preceding_id()); |
1594 | | |
1595 | 25.5M | int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index; |
1596 | | |
1597 | 25.5M | deduplicated_req->first_message_idx = -1; |
1598 | | |
1599 | | // In this loop we discard duplicates and advance the leader's preceding id |
1600 | | // accordingly. |
1601 | 34.9M | for (int i = 0; i < rpc_req->ops_size(); i++9.41M ) { |
1602 | 9.41M | ReplicateMsg* leader_msg = rpc_req->mutable_ops(i); |
1603 | | |
1604 | 9.41M | if (leader_msg->id().index() <= last_committed.index) { |
1605 |    603 |       VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id()
1606 | 0 | << " (already committed)"; |
1607 | 603 | deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id()); |
1608 | 603 | continue; |
1609 | 603 | } |
1610 | | |
1611 | 9.41M | if (leader_msg->id().index() <= dedup_up_to_index) { |
1612 | | // If the index is uncommitted and below our match index, then it must be in the |
1613 |  |       // pending set.
1614 | 114 | scoped_refptr<ConsensusRound> round = |
1615 | 114 | state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index()); |
1616 | 114 | if (!round) { |
1617 |  |         // This could happen if we received an outdated leader request, so we should just reject it.
1618 | 0 | return STATUS_FORMAT(IllegalState, "Round not found for index: $0", |
1619 | 0 | leader_msg->id().index()); |
1620 | 0 | } |
1621 | | |
1622 | | // If the OpIds match, i.e. if they have the same term and id, then this is just |
1623 |  |       // a duplicate, so we skip it...
1624 | 114 | if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) { |
1625 |      8 |         VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id()
1626 | 0 | << " (already replicated)"; |
1627 | 8 | deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id()); |
1628 | 8 | continue; |
1629 | 8 | } |
1630 | | |
1631 | | // ... otherwise we must adjust our match index, i.e. all messages from now on |
1632 | | // are "new" |
1633 | 106 | dedup_up_to_index = leader_msg->id().index(); |
1634 | 106 | } |
1635 | | |
1636 | 9.41M | if (deduplicated_req->first_message_idx == -1) { |
1637 | 8.27M | deduplicated_req->first_message_idx = i; |
1638 | 8.27M | } |
1639 | 9.41M | deduplicated_req->messages.emplace_back(leader_msg); |
1640 | 9.41M | } |
1641 | | |
1642 | 25.5M | if (deduplicated_req->messages.size() != implicit_cast<size_t>(rpc_req->ops_size())) { |
1643 | 34 | LOG_WITH_PREFIX(INFO) << "Deduplicated request from leader. Original: " |
1644 | 34 | << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req) |
1645 | 34 | << " Dedup: " << deduplicated_req->preceding_op_id << "->" |
1646 | 34 | << deduplicated_req->OpsRangeString() |
1647 | 34 | << ", known committed: " << last_committed << ", received committed: " |
1648 | 34 | << OpId::FromPB(rpc_req->committed_op_id()); |
1649 | 34 | } |
1650 | | |
1651 | 25.5M | return Status::OK(); |
1652 | 25.5M | } |
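
(Illustrative note, not part of raft_consensus.cc.) A simplified standalone sketch of the deduplication rule above, with plain op indexes in place of OpIds and the term comparison omitted: ops at or below the committed index are skipped, ops at or below the last received index are skipped when they match an already pending entry, and everything else is treated as new.

#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

int main() {
  const int64_t committed = 10;       // last committed index on this replica
  const int64_t dedup_up_to = 12;     // last received index on this replica
  std::set<int64_t> pending{11, 12};  // uncommitted ops we already hold
  std::vector<int64_t> leader_ops{9, 10, 11, 12, 13, 14};

  std::vector<int64_t> deduped;
  for (int64_t op : leader_ops) {
    if (op <= committed) continue;                         // already committed, skip
    if (op <= dedup_up_to && pending.count(op)) continue;  // duplicate of a pending op
    deduped.push_back(op);                                 // genuinely new from here on
  }
  for (int64_t op : deduped) std::cout << op << " ";       // prints: 13 14
  std::cout << std::endl;
}
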
1653 | | |
1654 | | Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request, |
1655 | 25.5M | ConsensusResponsePB* response) { |
1656 | | // Do term checks first: |
1657 | 25.5M | if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) { |
1658 | | |
1659 | | // If less, reject. |
1660 | 12.1k | if (request->caller_term() < state_->GetCurrentTermUnlocked()) { |
1661 | 1.56k | string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. " |
1662 | 1.56k | "Current term is $2. Ops: $3", |
1663 | | |
1664 | 1.56k | request->caller_uuid(), |
1665 | 1.56k | request->caller_term(), |
1666 | 1.56k | state_->GetCurrentTermUnlocked(), |
1667 | 1.56k | OpsRangeString(*request)); |
1668 | 1.56k | LOG_WITH_PREFIX(INFO) << msg; |
1669 | 1.56k | FillConsensusResponseError(response, |
1670 | 1.56k | ConsensusErrorPB::INVALID_TERM, |
1671 | 1.56k | STATUS(IllegalState, msg)); |
1672 | 1.56k | return Status::OK(); |
1673 | 10.6k | } else { |
1674 | 10.6k | RETURN_NOT_OK(HandleTermAdvanceUnlocked(request->caller_term())); |
1675 | 10.6k | } |
1676 | 12.1k | } |
1677 | 25.5M | return Status::OK(); |
1678 | 25.5M | } |
1679 | | |
1680 | | Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req, |
1681 | 25.5M | ConsensusResponsePB* response) { |
1682 | | |
1683 | 25.5M | bool term_mismatch; |
1684 | 25.5M | if (state_->IsOpCommittedOrPending(req.preceding_op_id, &term_mismatch)) { |
1685 | 25.4M | return Status::OK(); |
1686 | 25.4M | } |
1687 | | |
1688 | 100k | string error_msg = Format( |
1689 | 100k | "Log matching property violated." |
1690 | 100k | " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)", |
1691 |   100k |       state_->GetLastReceivedOpIdUnlocked(), req.preceding_op_id, term_mismatch ? "term" : "index");
1692 | | |
1693 | 100k | FillConsensusResponseError(response, |
1694 | 100k | ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, |
1695 | 100k | STATUS(IllegalState, error_msg)); |
1696 | | |
1697 | 100k | LOG_WITH_PREFIX(INFO) << "Refusing update from remote peer " |
1698 | 100k | << req.leader_uuid << ": " << error_msg; |
1699 | | |
1700 | | // If the terms mismatch we abort down to the index before the leader's preceding, |
1701 | | // since we know that is the last opid that has a chance of not being overwritten. |
1702 | | // Aborting preemptively here avoids us reporting a last received index that is |
1703 | | // possibly higher than the leader's causing an avoidable cache miss on the leader's |
1704 | | // queue. |
1705 | | // |
1706 | | // TODO: this isn't just an optimization! if we comment this out, we get |
1707 | | // failures on raft_consensus-itest a couple percent of the time! Should investigate |
1708 | | // why this is actually critical to do here, as opposed to just on requests that |
1709 | | // append some ops. |
1710 | 100k | if (term_mismatch) { |
1711 | 65 | return state_->AbortOpsAfterUnlocked(req.preceding_op_id.index - 1); |
1712 | 65 | } |
1713 | | |
1714 | 100k | return Status::OK(); |
1715 | 100k | } |
1716 | | |
1717 | | Status RaftConsensus::CheckLeaderRequestOpIdSequence( |
1718 | | const LeaderRequest& deduped_req, |
1719 | 25.5M | ConsensusRequestPB* request) { |
1720 | 25.5M | Status sequence_check_status; |
1721 | 25.5M | yb::OpId prev = deduped_req.preceding_op_id; |
1722 | 25.5M | for (const auto& message : deduped_req.messages) { |
1723 | 9.41M | auto current = yb::OpId::FromPB(message->id()); |
1724 | 9.41M | sequence_check_status = ReplicaState::CheckOpInSequence(prev, current); |
1725 | 9.41M | if (PREDICT_FALSE(!sequence_check_status.ok())) { |
1726 | 2 | LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: " |
1727 | 2 | << sequence_check_status.ToString() << ". Leader Request: " |
1728 | 2 | << request->ShortDebugString(); |
1729 | 2 | break; |
1730 | 2 | } |
1731 | 9.41M | prev = current; |
1732 | 9.41M | } |
1733 | | |
1734 |  |   // We only release the messages from the request after the above check so that we can print
1735 | | // the original request, if it fails. |
1736 | 25.5M | if (!deduped_req.messages.empty()) { |
1737 | | // We take ownership of the deduped ops. |
1738 | 8.27M | DCHECK_GE(deduped_req.first_message_idx, 0); |
1739 | 8.27M | request->mutable_ops()->ExtractSubrange( |
1740 | 8.27M | narrow_cast<int>(deduped_req.first_message_idx), |
1741 | 8.27M | narrow_cast<int>(deduped_req.messages.size()), |
1742 | 8.27M | nullptr); |
1743 | 8.27M | } |
1744 | | |
1745 |  |   // We don't need request->ops() anymore, so we release them to avoid unnecessary memory
1746 | | // consumption. |
1747 | 25.5M | request->mutable_ops()->Clear(); |
1748 | | |
1749 | 25.5M | return sequence_check_status; |
1750 | 25.5M | } |
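
(Illustrative note, not part of raft_consensus.cc.) A standalone sketch of the per-batch sequence check above, under the assumption that each message's index must follow the previous one by exactly one and its term must never decrease (hypothetical Op struct, not the real OpId/ReplicaState code).

#include <cstdint>
#include <iostream>
#include <vector>

struct Op { int64_t term; int64_t index; };

bool InSequence(const Op& prev, const Op& cur) {
  return cur.index == prev.index + 1 && cur.term >= prev.term;
}

int main() {
  Op prev{2, 100};  // the preceding op id from the leader
  std::vector<Op> batch{{2, 101}, {2, 102}, {3, 103}, {3, 105}};  // gap before the last op
  for (const Op& cur : batch) {
    if (!InSequence(prev, cur)) {
      std::cout << "out of sequence at " << cur.term << "." << cur.index << std::endl;
      break;
    }
    prev = cur;
  }
}
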
1751 | | |
1752 | | Status RaftConsensus::CheckLeaderRequestUnlocked(ConsensusRequestPB* request, |
1753 | | ConsensusResponsePB* response, |
1754 | 25.5M | LeaderRequest* deduped_req) { |
1755 | 25.5M | RETURN_NOT_OK(DeduplicateLeaderRequestUnlocked(request, deduped_req)); |
1756 | | |
1757 | | // This is an additional check for KUDU-639 that makes sure the message's index |
1758 | | // and term are in the right sequence in the request, after we've deduplicated |
1759 | | // them. We do this before we change any of the internal state. |
1760 | | // |
1761 | | // TODO move this to raft_consensus-state or whatever we transform that into. |
1762 | | // We should be able to do this check for each append, but right now the way |
1763 | | // we initialize raft_consensus-state is preventing us from doing so. |
1764 | 25.5M | RETURN_NOT_OK(CheckLeaderRequestOpIdSequence(*deduped_req, request)); |
1765 | | |
1766 | 25.5M | RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response)); |
1767 | | |
1768 | 25.5M | if (response->status().has_error()) { |
1769 | 1.56k | return Status::OK(); |
1770 | 1.56k | } |
1771 | | |
1772 | 25.5M | RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response)); |
1773 | | |
1774 | 25.5M | if (response->status().has_error()) { |
1775 | 120k | return Status::OK(); |
1776 | 120k | } |
1777 | | |
1778 | | // If the first of the messages to apply is not in our log, either it follows the last |
1779 | | // received message or it replaces some in-flight. |
1780 | 25.4M | if (!deduped_req->messages.empty()) { |
1781 | 8.27M | auto first_id = yb::OpId::FromPB(deduped_req->messages[0]->id()); |
1782 | 8.27M | bool term_mismatch; |
1783 | 8.27M | if (state_->IsOpCommittedOrPending(first_id, &term_mismatch)) { |
1784 | 0 | return STATUS_FORMAT(IllegalState, |
1785 | 0 | "First deduped message $0 is committed or pending", |
1786 | 0 | first_id); |
1787 | 0 | } |
1788 | | |
1789 | | // If the index is in our log but the terms are not the same abort down to the leader's |
1790 | | // preceding id. |
1791 | 8.27M | if (term_mismatch) { |
1792 | | // Since we are holding the lock ApplyPendingOperationsUnlocked would be invoked between |
1793 | | // those two. |
1794 | 103 | RETURN_NOT_OK(state_->AbortOpsAfterUnlocked(deduped_req->preceding_op_id.index)); |
1795 | 103 | RETURN_NOT_OK(log_->ResetLastSyncedEntryOpId(deduped_req->preceding_op_id)); |
1796 | 103 | } |
1797 | 8.27M | } |
1798 | | |
1799 | | // If all of the above logic was successful then we can consider this to be |
1800 | | // the effective leader of the configuration. If they are not currently marked as |
1801 | | // the leader locally, mark them as leader now. |
1802 | 25.4M | const string& caller_uuid = request->caller_uuid(); |
1803 | 25.4M | if (PREDICT_FALSE(state_->HasLeaderUnlocked() && |
1804 | 25.4M | state_->GetLeaderUuidUnlocked() != caller_uuid)) { |
1805 | 0 | LOG_WITH_PREFIX(FATAL) |
1806 | 0 | << "Unexpected new leader in same term! " |
1807 | 0 | << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", " |
1808 | 0 | << "new leader UUID: " << caller_uuid; |
1809 | 0 | } |
1810 | 25.4M | if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) { |
1811 | 120k | SetLeaderUuidUnlocked(caller_uuid); |
1812 | 120k | } |
1813 | | |
1814 | 25.4M | return Status::OK(); |
1815 | 25.4M | } |
1816 | | |
1817 | | Result<RaftConsensus::UpdateReplicaResult> RaftConsensus::UpdateReplica( |
1818 | 25.5M | ConsensusRequestPB* request, ConsensusResponsePB* response) { |
1819 | 25.5M | TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica", |
1820 | 25.5M | "peer", peer_uuid(), |
1821 | 25.5M | "tablet", tablet_id()); |
1822 | | |
1823 | 25.5M | if (request->has_propagated_hybrid_time()) { |
1824 | 25.5M | clock_->Update(HybridTime(request->propagated_hybrid_time())); |
1825 | 25.5M | } |
1826 | | |
1827 | | // The ordering of the following operations is crucial, read on for details. |
1828 | | // |
1829 | | // The main requirements explained in more detail below are: |
1830 | | // |
1831 | | // 1) We must enqueue the prepares before we write to our local log. |
1832 | | // 2) If we were able to enqueue a prepare then we must be able to log it. |
1833 | | // 3) If we fail to enqueue a prepare, we must not attempt to enqueue any |
1834 | | // later-indexed prepare or apply. |
1835 | | // |
1836 | | // See below for detailed rationale. |
1837 | | // |
1838 | | // The steps are: |
1839 | | // |
1840 | | // 0 - Dedup |
1841 | | // |
1842 | | // We make sure that we don't do anything on Replicate operations we've already received in a |
1843 | | // previous call. This essentially makes this method idempotent. |
1844 | | // |
1845 | | // 1 - We mark as many pending operations as committed as we can. |
1846 | | // |
1847 | | // We may have some pending operations that, according to the leader, are now |
1848 | | // committed. We Apply them early, because: |
1849 | | // - Soon (step 2) we may reject the call due to excessive memory pressure. One |
1850 | | // way to relieve the pressure is by flushing the MRS, and applying these |
1851 | | // operations may unblock an in-flight Flush(). |
1852 | | // - The Apply and subsequent Prepares (step 2) can take place concurrently. |
1853 | | // |
1854 | | // 2 - We enqueue the Prepare of the operations. |
1855 | | // |
1856 | | // The actual prepares are enqueued in order but happen asynchronously so we don't |
1857 | | // have decoding/acquiring locks on the critical path. |
1858 | | // |
1859 | | // We need to do this now for a number of reasons: |
1860 | | // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the |
1861 | | // state machine so, were we to crash afterwards, having the prepares in-flight |
1862 | | // won't hurt. |
1863 | | // - Prepares depend on factors external to consensus (the operation drivers and |
1864 | | // the tablet peer) so if for some reason they cannot be enqueued we must know |
1865 |  |   //     before we try to write them to the WAL. Once enqueued, we assume that prepare will
1866 | | // always succeed on a replica operation (because the leader already prepared them |
1867 | | // successfully, and thus we know they are valid). |
1868 | | // - The prepares corresponding to every operation that was logged must be in-flight |
1869 |  |   //     first. This is because, should we need to abort certain operations (say a new leader
1870 | | // says they are not committed) we need to have those prepares in-flight so that |
1871 | | // the operations can be continued (in the abort path). |
1872 | | // - Failure to enqueue prepares is OK, we can continue and let the leader know that |
1873 | | // we only went so far. The leader will re-send the remaining messages. |
1874 | | // - Prepares represent new operations, and operations consume memory. Thus, if the |
1875 | | // overall memory pressure on the server is too high, we will reject the prepares. |
1876 | | // |
1877 | | // 3 - We enqueue the writes to the WAL. |
1878 | | // |
1879 | | // We enqueue writes to the WAL, but only the operations that were successfully |
1880 | | // enqueued for prepare (for the reasons introduced above). This means that even |
1881 | | // if a prepare fails to enqueue, if any of the previous prepares were successfully |
1882 | | // submitted they must be written to the WAL. |
1883 | | // If writing to the WAL fails, we're in an inconsistent state and we crash. In this |
1884 | | // case, no one will ever know of the operations we previously prepared so those are |
1885 | | // inconsequential. |
1886 | | // |
1887 | | // 4 - We mark the operations as committed. |
1888 | | // |
1889 | | // For each operation which has been committed by the leader, we update the |
1890 | | // operation state to reflect that. If the logging has already succeeded for that |
1891 | | // operation, this will trigger the Apply phase. Otherwise, Apply will be triggered |
1892 | | // when the logging completes. In both cases the Apply phase executes asynchronously. |
1893 | | // This must, of course, happen after the prepares have been triggered as the same batch |
1894 | | // can both replicate/prepare and commit/apply an operation. |
1895 | | // |
1896 | | // Currently, if a prepare failed to enqueue we still trigger all applies for operations |
1897 | | // with an id lower than it (if we have them). This is important now as the leader will |
1898 |  |   // not re-send those commit messages. This will be moot when we move to the
1899 |  |   // commitIndex way of doing things, as we can simply ignore the applies since we know
1900 | | // they will be triggered with the next successful batch. |
1901 | | // |
1902 | | // 5 - We wait for the writes to be durable. |
1903 | | // |
1904 | | // Before replying to the leader we wait for the writes to be durable. We then |
1905 | | // just update the last replicated watermark and respond. |
1906 | | // |
1907 |  |   // TODO - These failure scenarios need to be exercised in a unit
1908 | | // test. Moreover we need to add more fault injection spots (well that |
1909 | | // and actually use them) for each of these steps. |
1910 | 25.5M | TRACE("Updating replica for $0 ops", request->ops_size()); |
1911 | | |
1912 | | // The deduplicated request. |
1913 | 25.5M | LeaderRequest deduped_req; |
1914 | | |
1915 | 25.5M | ReplicaState::UniqueLock lock; |
1916 | 25.5M | RETURN_NOT_OK(state_->LockForUpdate(&lock)); |
1917 | | |
1918 | 25.5M | const auto old_leader = state_->GetLeaderUuidUnlocked(); |
1919 | | |
1920 | 25.5M | auto prev_committed_op_id = state_->GetCommittedOpIdUnlocked(); |
1921 | | |
1922 | 25.5M | deduped_req.leader_uuid = request->caller_uuid(); |
1923 | | |
1924 | 25.5M | RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req)); |
1925 | | |
1926 | 25.5M | if (response->status().has_error()) { |
1927 | 122k | LOG_WITH_PREFIX(INFO) |
1928 | 122k | << "Returning from UpdateConsensus because of error: " << AsString(response->status()); |
1929 | | // We had an error, like an invalid term, we still fill the response. |
1930 | 122k | FillConsensusResponseOKUnlocked(response); |
1931 | 122k | return UpdateReplicaResult(); |
1932 | 122k | } |
1933 | | |
1934 | 25.4M | TEST_PAUSE_IF_FLAG(TEST_pause_update_replica); |
1935 | | |
1936 | | // Snooze the failure detector as soon as we decide to accept the message. |
1937 | | // We are guaranteed to be acting as a FOLLOWER at this point by the above |
1938 | | // sanity check. |
1939 | 25.4M | SnoozeFailureDetector(DO_NOT_LOG); |
1940 | | |
1941 | 25.4M | auto now = MonoTime::Now(); |
1942 | | |
1943 | | // Update the expiration time of the current leader's lease, so that when this follower becomes |
1944 | | // a leader, it can wait out the time interval while the old leader might still be active. |
1945 | 25.4M | if (request->has_leader_lease_duration_ms()) { |
1946 | 25.4M | state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked( |
1947 | 25.4M | CoarseTimeLease(deduped_req.leader_uuid, |
1948 | 25.4M | CoarseMonoClock::now() + request->leader_lease_duration_ms() * 1ms), |
1949 | 25.4M | PhysicalComponentLease(deduped_req.leader_uuid, request->ht_lease_expiration())); |
1950 | 25.4M | } |
1951 | | |
1952 | | // Also prohibit voting for anyone for the minimum election timeout. |
1953 | 25.4M | withhold_votes_until_.store(now + MinimumElectionTimeout(), std::memory_order_release); |
1954 | | |
1955 | | // 1 - Early commit pending (and committed) operations |
1956 | 25.4M | RETURN_NOT_OK(EarlyCommitUnlocked(*request, deduped_req)); |
1957 | | |
1958 | | // 2 - Enqueue the prepares |
1959 | 25.4M | if (!VERIFY_RESULT(EnqueuePreparesUnlocked(*request, &deduped_req, response))) { |
1960 | 1 | return UpdateReplicaResult(); |
1961 | 1 | } |
1962 | | |
1963 | 25.4M | if (deduped_req.committed_op_id.index < prev_committed_op_id.index) { |
1964 | 75 | deduped_req.committed_op_id = prev_committed_op_id; |
1965 | 75 | } |
1966 | | |
1967 | | // 3 - Enqueue the writes. |
1968 | 25.4M | auto last_from_leader = EnqueueWritesUnlocked( |
1969 | 25.4M | deduped_req, WriteEmpty(prev_committed_op_id != deduped_req.committed_op_id)); |
1970 | | |
1971 | | // 4 - Mark operations as committed |
1972 | 25.4M | RETURN_NOT_OK(MarkOperationsAsCommittedUnlocked(*request, deduped_req, last_from_leader)); |
1973 | | |
1974 | | // Fill the response with the current state. We will not mutate anymore state until |
1975 | | // we actually reply to the leader, we'll just wait for the messages to be durable. |
1976 | 25.4M | FillConsensusResponseOKUnlocked(response); |
1977 | | |
1978 | 25.4M | UpdateReplicaResult result; |
1979 | | |
1980 | | // Check if there is an election pending and the op id pending upon has just been committed. |
1981 | 25.4M | const auto& pending_election_op_id = state_->GetPendingElectionOpIdUnlocked(); |
1982 | 25.4M | result.start_election = |
1983 | 25.4M | !pending_election_op_id.empty() && |
1984 | 25.4M |       pending_election_op_id.index <= state_->GetCommittedOpIdUnlocked().index;
1985 | | |
1986 | 25.4M | if (!deduped_req.messages.empty()) { |
1987 | 8.28M | result.wait_for_op_id = state_->GetLastReceivedOpIdUnlocked(); |
1988 | 8.28M | } |
1989 | 25.4M | result.current_term = state_->GetCurrentTermUnlocked(); |
1990 | | |
1991 | 25.4M | uint64_t update_time_ms = 0; |
1992 | 25.4M | if (request->has_propagated_hybrid_time()) { |
1993 | 25.4M | update_time_ms = HybridTime::FromPB( |
1994 | 25.4M | request->propagated_hybrid_time()).GetPhysicalValueMicros() / 1000; |
1995 | 25.4M |   } else if (!deduped_req.messages.empty()) {
1996 | 112 | update_time_ms = HybridTime::FromPB( |
1997 | 112 | deduped_req.messages.back()->hybrid_time()).GetPhysicalValueMicros() / 1000; |
1998 | 112 | } |
1999 | 25.4M | follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds( |
2000 | 25.4M |       (update_time_ms > 0 ? update_time_ms : clock_->Now().GetPhysicalValueMicros() / 1000));
2001 | 25.4M | TRACE("UpdateReplica() finished"); |
2002 | 25.4M | return result; |
2003 | 25.4M | } |
2004 | | |
2005 | | Status RaftConsensus::EarlyCommitUnlocked(const ConsensusRequestPB& request, |
2006 | 25.4M | const LeaderRequest& deduped_req) { |
2007 | | // What should we commit? |
2008 | | // 1. As many pending operations as we can, except... |
2009 | | // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639 |
2010 | | // ("Leader doesn't overwrite demoted follower's log properly"), and... |
2011 | | // 3. ...the leader's committed index is always our upper bound. |
2012 | 25.4M | auto early_apply_up_to = state_->GetLastPendingOperationOpIdUnlocked(); |
2013 | 25.4M | if (deduped_req.preceding_op_id.index < early_apply_up_to.index) { |
2014 | 5 | early_apply_up_to = deduped_req.preceding_op_id; |
2015 | 5 | } |
2016 | 25.4M | if (request.committed_op_id().index() < early_apply_up_to.index) { |
2017 | 40.8k | early_apply_up_to = yb::OpId::FromPB(request.committed_op_id()); |
2018 | 40.8k | } |
2019 | | |
2020 | 25.4M |   VLOG_WITH_PREFIX(1) << "Early marking committed up to " << early_apply_up_to;
2021 | 25.4M | TRACE("Early marking committed up to $0.$1", early_apply_up_to.term, early_apply_up_to.index); |
2022 | 25.4M | return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(early_apply_up_to, CouldStop::kTrue)); |
2023 | 25.4M | } |
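
(Illustrative note, not part of raft_consensus.cc.) A standalone sketch of the "early apply" bound computed above: the replica applies up to the minimum of its last pending op, the leader's preceding op, and the leader's committed op, shown here with hypothetical plain indexes instead of real OpIds.

#include <algorithm>
#include <cstdint>
#include <iostream>

int64_t EarlyApplyUpTo(int64_t last_pending, int64_t leader_preceding,
                       int64_t leader_committed) {
  // Mirrors the three successive "take the smaller" steps above.
  return std::min({last_pending, leader_preceding, leader_committed});
}

int main() {
  // The leader's committed index caps what may be applied early.
  std::cout << EarlyApplyUpTo(/*last_pending=*/120, /*leader_preceding=*/118,
                              /*leader_committed=*/115) << std::endl;  // prints 115
}
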
2024 | | |
2025 | | Result<bool> RaftConsensus::EnqueuePreparesUnlocked(const ConsensusRequestPB& request, |
2026 | | LeaderRequest* deduped_req_ptr, |
2027 | 25.4M | ConsensusResponsePB* response) { |
2028 | 25.4M | LeaderRequest& deduped_req = *deduped_req_ptr; |
2029 | 25.4M | TRACE("Triggering prepare for $0 ops", deduped_req.messages.size()); |
2030 | | |
2031 | 25.4M | Status prepare_status; |
2032 | 25.4M | auto iter = deduped_req.messages.begin(); |
2033 | | |
2034 | 25.4M | if (PREDICT_TRUE(!deduped_req.messages.empty())) { |
2035 | | // TODO Temporary until the leader explicitly propagates the safe hybrid_time. |
2036 | | // TODO: what if there is a failure here because the updated time is too far in the future? |
2037 | 8.27M | clock_->Update(HybridTime(deduped_req.messages.back()->hybrid_time())); |
2038 | 8.27M | } |
2039 | | |
2040 | 25.4M | HybridTime propagated_safe_time; |
2041 | 25.4M | if (request.has_propagated_safe_time()) { |
2042 | 25.4M | propagated_safe_time = HybridTime(request.propagated_safe_time()); |
2043 | 25.4M | if (deduped_req.messages.empty()) { |
2044 | 17.1M | state_->context()->SetPropagatedSafeTime(propagated_safe_time); |
2045 | 17.1M | } |
2046 | 25.4M | } |
2047 | | |
2048 | 25.4M | if (iter != deduped_req.messages.end()) { |
2049 | 9.41M | for (;;) { |
2050 | 9.41M | const ReplicateMsgPtr& msg = *iter; |
2051 | 9.41M | ++iter; |
2052 | 9.41M | bool last = iter == deduped_req.messages.end(); |
2053 | 9.41M | prepare_status = StartReplicaOperationUnlocked( |
2054 | 9.41M |           msg, last ? propagated_safe_time : HybridTime::kInvalid);
2055 | 9.41M | if (PREDICT_FALSE(!prepare_status.ok())) { |
2056 | 322 | --iter; |
2057 | 322 | LOG_WITH_PREFIX(WARNING) << "StartReplicaOperationUnlocked failed: " << prepare_status; |
2058 | 322 | break; |
2059 | 322 | } |
2060 | 9.41M | if (last) { |
2061 | 8.28M | break; |
2062 | 8.28M | } |
2063 | 9.41M | } |
2064 | 8.27M | } |
2065 | | |
2066 | | // If we stopped before reaching the end we failed to prepare some message(s) and need |
2067 | | // to perform cleanup, namely trimming deduped_req.messages to only contain the messages |
2068 | | // that were actually prepared, and deleting the other ones since we've taken ownership |
2069 | | // when we first deduped. |
2070 | 25.4M | bool incomplete = iter != deduped_req.messages.end(); |
2071 | 25.4M | if (incomplete) { |
2072 | 322 | { |
2073 | 322 | const ReplicateMsgPtr msg = *iter; |
2074 | 322 | LOG_WITH_PREFIX(WARNING) |
2075 | 322 | << "Could not prepare operation for op: " |
2076 | 322 | << msg->id() << ". Suppressed " << (deduped_req.messages.end() - iter - 1) |
2077 | 322 | << " other warnings. Status for this op: " << prepare_status; |
2078 | 322 | deduped_req.messages.erase(iter, deduped_req.messages.end()); |
2079 | 322 | } |
2080 | | |
2081 | | // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing |
2082 | | // else we can do. The leader will detect this and retry later. |
2083 | 322 | if (deduped_req.messages.empty()) { |
2084 | 1 | auto msg = Format("Rejecting Update request from peer $0 for term $1. " |
2085 | 1 | "Could not prepare a single operation due to: $2", |
2086 | 1 | request.caller_uuid(), |
2087 | 1 | request.caller_term(), |
2088 | 1 | prepare_status); |
2089 | 1 | LOG_WITH_PREFIX(INFO) << msg; |
2090 | 1 | FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE, |
2091 | 1 | STATUS(IllegalState, msg)); |
2092 | 1 | FillConsensusResponseOKUnlocked(response); |
2093 | 1 | return false; |
2094 | 1 | } |
2095 | 322 | } |
2096 | | |
2097 | 25.4M | deduped_req.committed_op_id = yb::OpId::FromPB(request.committed_op_id()); |
2098 | 25.4M | if (!deduped_req.messages.empty()) { |
2099 | 8.28M | auto last_op_id = yb::OpId::FromPB(deduped_req.messages.back()->id()); |
2100 | 8.28M | if (deduped_req.committed_op_id > last_op_id) { |
2101 |    299 |       LOG_IF_WITH_PREFIX(DFATAL, !incomplete)
2102 | 0 | << "Received committed op id: " << deduped_req.committed_op_id |
2103 | 0 | << ", past last known op id: " << last_op_id; |
2104 | | |
2105 |  |       // It is possible that we failed to prepare some of the messages,
2106 |  |       // so limit the committed op id to avoid it moving past the last known op id.
2107 | 299 | deduped_req.committed_op_id = last_op_id; |
2108 | 299 | } |
2109 | 8.28M | } |
2110 | | |
2111 | 25.4M | return true; |
2112 | 25.4M | } |
2113 | | |
2114 | | yb::OpId RaftConsensus::EnqueueWritesUnlocked(const LeaderRequest& deduped_req, |
2115 | 25.4M | WriteEmpty write_empty) { |
2116 | | // Now that we've triggered the prepares enqueue the operations to be written |
2117 | | // to the WAL. |
2118 | 25.4M |   if (PREDICT_TRUE(!deduped_req.messages.empty()) || write_empty) {
2119 | | // Trigger the log append asap, if fsync() is on this might take a while |
2120 | | // and we can't reply until this is done. |
2121 | | // |
2122 | | // Since we've prepared, we need to be able to append (or we risk trying to apply |
2123 | | // later something that wasn't logged). We crash if we can't. |
2124 | 15.0M | CHECK_OK(queue_->AppendOperations( |
2125 | 15.0M | deduped_req.messages, deduped_req.committed_op_id, state_->Clock().Now())); |
2126 | 15.0M | } |
2127 | | |
2128 | 25.4M | return !deduped_req.messages.empty() ? |
2129 | 17.1M |       yb::OpId::FromPB(deduped_req.messages.back()->id()) : deduped_req.preceding_op_id;
2130 | 25.4M | } |
2131 | | |
2132 | 8.27M | Status RaftConsensus::WaitForWrites(int64_t term, const OpId& wait_for_op_id) { |
2133 | | // 5 - We wait for the writes to be durable. |
2134 | | |
2135 | | // Note that this is safe because dist consensus now only supports a single outstanding |
2136 | | // request at a time and this way we can allow commits to proceed while we wait. |
2137 | 8.27M | TRACE("Waiting on the replicates to finish logging"); |
2138 | 8.27M | TRACE_EVENT0("consensus", "Wait for log"); |
2139 | 8.27M | for (;;) { |
2140 | 8.27M | auto wait_result = log_->WaitForSafeOpIdToApply( |
2141 | 8.27M | wait_for_op_id, MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms)); |
2142 |  |     // If we're just waiting for our own log append to finish, let's snooze the timer.
2143 |  |     // We don't want to fire a leader election because we're waiting on our own log.
2144 | 8.28M |     if (!wait_result.empty()) {
2145 | 8.28M | break; |
2146 | 8.28M | } |
2147 | 18.4E | int64_t new_term; |
2148 | 18.4E | { |
2149 | 18.4E | auto lock = state_->LockForRead(); |
2150 | 18.4E | new_term = state_->GetCurrentTermUnlocked(); |
2151 | 18.4E | } |
2152 | 18.4E | if (term != new_term) { |
2153 | 0 | return STATUS_FORMAT(IllegalState, "Term changed to $0 while waiting for writes in term $1", |
2154 | 0 | new_term, term); |
2155 | 0 | } |
2156 | | |
2157 | 18.4E | SnoozeFailureDetector(DO_NOT_LOG); |
2158 | | |
2159 | 18.4E | const auto election_timeout_at = MonoTime::Now() + MinimumElectionTimeout(); |
2160 | 18.4E | UpdateAtomicMax(&withhold_votes_until_, election_timeout_at); |
2161 | 18.4E | } |
2162 | 8.27M | TRACE("Finished waiting on the replicates to finish logging"); |
2163 | | |
2164 | 8.27M | return Status::OK(); |
2165 | 8.27M | } |
2166 | | |
2167 | | Status RaftConsensus::MarkOperationsAsCommittedUnlocked(const ConsensusRequestPB& request, |
2168 | | const LeaderRequest& deduped_req, |
2169 | 25.4M | yb::OpId last_from_leader) { |
2170 | | // Choose the last operation to be applied. This will either be 'committed_index', if |
2171 | | // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of |
2172 | | // the last successfully enqueued prepare, if some prepare failed to enqueue. |
2173 | 25.4M | yb::OpId apply_up_to; |
2174 | 25.4M | if (last_from_leader.index < request.committed_op_id().index()) { |
2175 | | // we should never apply anything later than what we received in this request |
2176 | 300 | apply_up_to = last_from_leader; |
2177 | | |
2178 | 300 | LOG_WITH_PREFIX(INFO) |
2179 | 300 | << "Received commit index " << request.committed_op_id() |
2180 | 300 | << " from the leader but only marked up to " << apply_up_to << " as committed."; |
2181 | 25.4M | } else { |
2182 | 25.4M | apply_up_to = yb::OpId::FromPB(request.committed_op_id()); |
2183 | 25.4M | } |
2184 | | |
2185 | | // We can now update the last received watermark. |
2186 | | // |
2187 | | // We do it here (and before we actually hear back from the wal whether things |
2188 |  |   // are durable) so that, if we receive another, possibly duplicate, message
2189 |  |   // that exercises this path, we don't handle these messages twice.
2190 | | // |
2191 | | // If any messages failed to be started locally, then we already have removed them |
2192 | | // from 'deduped_req' at this point. So, we can simply update our last-received |
2193 | | // watermark to the last message that remains in 'deduped_req'. |
2194 | | // |
2195 | | // It's possible that the leader didn't send us any new data -- it might be a completely |
2196 | | // duplicate request. In that case, we don't need to update LastReceived at all. |
2197 | 25.4M | if (!deduped_req.messages.empty()) { |
2198 | 8.28M | OpIdPB last_appended = deduped_req.messages.back()->id(); |
2199 | 8.28M | TRACE(Substitute("Updating last received op as $0", last_appended.ShortDebugString())); |
2200 | 8.28M | state_->UpdateLastReceivedOpIdUnlocked(last_appended); |
2201 | 17.1M | } else if (state_->GetLastReceivedOpIdUnlocked().index < deduped_req.preceding_op_id.index) { |
2202 | 0 | return STATUS_FORMAT(InvalidArgument, |
2203 | 0 | "Bad preceding_opid: $0, last received: $1", |
2204 | 0 | deduped_req.preceding_op_id, |
2205 | 0 | state_->GetLastReceivedOpIdUnlocked()); |
2206 | 0 | } |
2207 | | |
2208 | 18.4E | VLOG_WITH_PREFIX(1) << "Marking committed up to " << apply_up_to; |
2209 | 25.4M | TRACE(Format("Marking committed up to $0", apply_up_to)); |
2210 | 25.4M | return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(apply_up_to, CouldStop::kTrue)); |
2211 | 25.4M | } |
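 | | // Worked example of the apply_up_to choice above (illustrative annotation, not |
 | | // part of the original source): suppose the leader's committed_op_id has index |
 | | // 105 but a prepare failed to enqueue, so last_from_leader stops at index 103. |
 | | // We then only mark up to 103 as committed and log the INFO message above. If |
 | | // every message was enqueued (last_from_leader index >= 105), we advance |
 | | // straight to the leader's committed index 105. |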
2212 | | |
2213 | 25.5M | void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) { |
2214 | 25.5M | TRACE("Filling consensus response to leader."); |
2215 | 25.5M | response->set_responder_term(state_->GetCurrentTermUnlocked()); |
2216 | 25.5M | state_->GetLastReceivedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_received()); |
2217 | 25.5M | state_->GetLastReceivedOpIdCurLeaderUnlocked().ToPB( |
2218 | 25.5M | response->mutable_status()->mutable_last_received_current_leader()); |
2219 | 25.5M | response->mutable_status()->set_last_committed_idx(state_->GetCommittedOpIdUnlocked().index); |
2220 | 25.5M | state_->GetLastAppliedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_applied()); |
2221 | 25.5M | } |
2222 | | |
2223 | | void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response, |
2224 | | ConsensusErrorPB::Code error_code, |
2225 | 122k | const Status& status) { |
2226 | 122k | ConsensusErrorPB* error = response->mutable_status()->mutable_error(); |
2227 | 122k | error->set_code(error_code); |
2228 | 122k | StatusToPB(status, error->mutable_status()); |
2229 | 122k | } |
2230 | | |
2231 | 1.36M | Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) { |
2232 | 1.36M | TRACE_EVENT2("consensus", "RaftConsensus::RequestVote", |
2233 | 1.36M | "peer", peer_uuid(), |
2234 | 1.36M | "tablet", tablet_id()); |
2235 | 1.36M | bool preelection = request->preelection(); |
2236 | | |
2237 | 1.36M | response->set_responder_uuid(state_->GetPeerUuid()); |
2238 | 1.36M | response->set_preelection(preelection); |
2239 | | |
2240 | | // We must acquire the update lock in order to ensure that this vote action |
2241 | | // takes place between requests. |
2242 | | // Lock ordering: The update lock must be acquired before the ReplicaState lock. |
2243 | 1.36M | std::unique_lock<decltype(update_mutex_)> update_guard(update_mutex_, std::defer_lock); |
2244 | 1.36M | if (FLAGS_enable_leader_failure_detection) { |
2245 | 1.36M | update_guard.try_lock(); |
2246 | 1.36M | } else { |
2247 | | // If failure detection is not enabled, then we can't just reject the vote, |
2248 | | // because there will be no automatic retry later. So, block for the lock. |
2249 | 770 | update_guard.lock(); |
2250 | 770 | } |
2251 | 1.36M | if (!update_guard.owns_lock()) { |
2252 | | // There is another vote or update concurrent with the vote. In that case, that |
2253 | | // other request is likely to reset the timer, and we'll end up just voting |
2254 | | // "NO" after waiting. To avoid starving RPC handlers and causing cascading |
2255 | | // timeouts, just vote a quick NO. |
2256 | | // |
2257 | | // We still need to take the state lock in order to respond with term info, etc. |
2258 | 11.1k | ReplicaState::UniqueLock state_guard; |
2259 | 11.1k | RETURN_NOT_OK(state_->LockForConfigChange(&state_guard)); |
2260 | 11.1k | return RequestVoteRespondIsBusy(request, response); |
2261 | 11.1k | } |
2262 | | |
2263 | | // Acquire the replica state lock so we can read / modify the consensus state. |
2264 | 1.35M | ReplicaState::UniqueLock state_guard; |
2265 | 1.35M | RETURN_NOT_OK(state_->LockForConfigChange(&state_guard)); |
2266 | | |
2267 | | // If the node is not in the configuration, allow the vote (this is required by Raft) |
2268 | | // but log an informational message anyway. |
2269 | 1.35M | if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) { |
2270 | 90 | LOG_WITH_PREFIX(INFO) << "Handling vote request from an unknown peer " |
2271 | 90 | << request->candidate_uuid(); |
2272 | 90 | } |
2273 | | |
2274 | | // If we've heard recently from the leader, then we should ignore the request |
2275 | | // (except if it is the leader itself requesting a vote -- something that might |
2276 | | // happen if the leader were to step down and call an election). Otherwise, |
2277 | | // it might be from a "disruptive" server. This could happen in a few cases: |
2278 | | // |
2279 | | // 1) Network partitions |
2280 | | // If the leader can talk to a majority of the nodes, but is partitioned from a |
2281 | | // bad node, the bad node's failure detector will trigger. If the bad node is |
2282 | | // able to reach other nodes in the cluster, it will continuously trigger elections. |
2283 | | // |
2284 | | // 2) An abandoned node |
2285 | | // It's possible that a node has fallen behind the log GC mark of the leader. In that |
2286 | | // case, the leader will stop sending it requests. Eventually, the configuration |
2287 | | // will change to eject the abandoned node, but until that point, we don't want the |
2288 | | // abandoned follower to disturb the other nodes. |
2289 | | // |
2290 | | // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf |
2291 | | // section 4.2.3. |
2292 | 1.35M | MonoTime now = MonoTime::Now(); |
2293 | 1.35M | if (request->candidate_uuid() != state_->GetLeaderUuidUnlocked() && |
2294 | 1.35M | !request->ignore_live_leader()723k && |
2295 | 1.35M | now < withhold_votes_until_.load(std::memory_order_acquire)545k ) { |
2296 | 8.89k | return RequestVoteRespondLeaderIsAlive(request, response); |
2297 | 8.89k | } |
2298 | | |
2299 | | // Candidate is running behind. |
2300 | 1.34M | if (request->candidate_term() < state_->GetCurrentTermUnlocked()) { |
2301 | 204k | return RequestVoteRespondInvalidTerm(request, response); |
2302 | 204k | } |
2303 | | |
2304 | | // We already voted this term. |
2305 | 1.13M | if (request->candidate_term() == state_->GetCurrentTermUnlocked() && |
2306 | 1.13M | state_->HasVotedCurrentTermUnlocked()62.1k ) { |
2307 | | |
2308 | | // Already voted for the same candidate in the current term. |
2309 | 1.37k | if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) { |
2310 | 209 | return RequestVoteRespondVoteAlreadyGranted(request, response); |
2311 | 209 | } |
2312 | | |
2313 | | // Voted for someone else in current term. |
2314 | 1.16k | return RequestVoteRespondAlreadyVotedForOther(request, response); |
2315 | 1.37k | } |
2316 | | |
2317 | | // The term advanced. |
2318 | 1.13M | if (request->candidate_term() > state_->GetCurrentTermUnlocked() && !preelection1.07M ) { |
2319 | 107k | RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term()), |
2320 | 107k | Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1", |
2321 | 107k | state_->GetCurrentTermUnlocked(), request->candidate_term())); |
2322 | 107k | } |
2323 | | |
2324 | | // Candidate must have last-logged OpId at least as large as our own to get our vote. |
2325 | 1.13M | OpIdPB local_last_logged_opid; |
2326 | 1.13M | GetLatestOpIdFromLog().ToPB(&local_last_logged_opid); |
2327 | 1.13M | if (OpIdLessThan(request->candidate_status().last_received(), local_last_logged_opid)) { |
2328 | 433 | return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response); |
2329 | 433 | } |
2330 | | |
2331 | 1.13M | if (!preelection) { |
2332 | | // Clear the pending election op id if any before granting the vote. If another peer jumps in |
2333 | | // before we can catch up and start the election, let's not disrupt the quorum with another |
2334 | | // election. |
2335 | 107k | state_->ClearPendingElectionOpIdUnlocked(); |
2336 | 107k | } |
2337 | | |
2338 | 1.13M | auto remaining_old_leader_lease = state_->RemainingOldLeaderLeaseDuration(); |
2339 | | |
2340 | 1.13M | if (remaining_old_leader_lease.Initialized()) { |
2341 | 20.7k | response->set_remaining_leader_lease_duration_ms( |
2342 | 20.7k | narrow_cast<int32_t>(remaining_old_leader_lease.ToMilliseconds())); |
2343 | 20.7k | response->set_leader_lease_uuid(state_->old_leader_lease().holder_uuid); |
2344 | 20.7k | } |
2345 | | |
2346 | 1.13M | const auto& old_leader_ht_lease = state_->old_leader_ht_lease(); |
2347 | 1.13M | if (old_leader_ht_lease) { |
2348 | 803k | response->set_leader_ht_lease_expiration(old_leader_ht_lease.expiration); |
2349 | 803k | response->set_leader_ht_lease_uuid(old_leader_ht_lease.holder_uuid); |
2350 | 803k | } |
2351 | | |
2352 | | // Passed all our checks. Vote granted. |
2353 | 1.13M | if (preelection) { |
2354 | 1.02M | LOG_WITH_PREFIX(INFO) << "Pre-election. Granting vote for candidate " |
2355 | 1.02M | << request->candidate_uuid() << " in term " << request->candidate_term(); |
2356 | 1.02M | FillVoteResponseVoteGranted(*request, response); |
2357 | 1.02M | return Status::OK(); |
2358 | 1.02M | } |
2359 | | |
2360 | 108k | return RequestVoteRespondVoteGranted(request, response); |
2361 | 1.13M | } |
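 | | // Decision order implemented by RequestVote() above, summarized (illustrative |
 | | // annotation, not part of the original source): |
 | | //   1. update_mutex_ busy                        -> respond CONSENSUS_BUSY |
 | | //   2. live leader heard from recently           -> respond LEADER_IS_ALIVE |
 | | //   3. candidate term < our term                 -> respond INVALID_TERM |
 | | //   4. already voted this term                   -> re-grant for the same candidate, |
 | | //                                                    else respond ALREADY_VOTED |
 | | //   5. real (non-pre) election with a higher term -> advance our term / step down |
 | | //   6. candidate's last-logged OpId behind ours  -> respond LAST_OPID_TOO_OLD |
 | | //   7. otherwise grant; a real election also persists the vote and snoozes the |
 | | //      failure detector, while a pre-election just replies "granted". |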
2362 | | |
2363 | | Status RaftConsensus::IsLeaderReadyForChangeConfigUnlocked(ChangeConfigType type, |
2364 | 2.91M | const string& server_uuid) { |
2365 | 2.91M | const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); |
2366 | 2.91M | size_t servers_in_transition = 0; |
2367 | 2.91M | if (type == ADD_SERVER) { |
2368 | 2.53k | servers_in_transition = CountServersInTransition(active_config); |
2369 | 2.91M | } else if (type == REMOVE_SERVER) { |
2370 | | // If we are trying to remove the server in transition, then servers_in_transition shouldn't |
2371 | | // count it so we can proceed with the operation. |
2372 | 2.18k | servers_in_transition = CountServersInTransition(active_config, server_uuid); |
2373 | 2.18k | } |
2374 | | |
2375 | | // Check that all the following requirements are met: |
2376 | | // 1. We are required by Raft to reject config change operations until we have |
2377 | | // committed at least one operation in our current term as leader. |
2378 | | // See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E |
2379 | | // 2. Ensure there is no other pending change config. |
2380 | | // 3. There are no peers that are in the process of becoming VOTERs or OBSERVERs. |
2381 | 2.91M | if (!state_->AreCommittedAndCurrentTermsSameUnlocked() || |
2382 | 2.91M | state_->IsConfigChangePendingUnlocked() || |
2383 | 2.91M | servers_in_transition != 06.18k ) { |
2384 | 2.91M | return STATUS_FORMAT(IllegalState, |
2385 | 2.91M | "Leader is not ready for Config Change, can try again. " |
2386 | 2.91M | "Num peers in transit: $0. Type: $1. Has opid: $2. Committed config: $3. " |
2387 | 2.91M | "Pending config: $4. Current term: $5. Committed op id: $6.", |
2388 | 2.91M | servers_in_transition, ChangeConfigType_Name(type), |
2389 | 2.91M | active_config.has_opid_index(), |
2390 | 2.91M | state_->GetCommittedConfigUnlocked().ShortDebugString(), |
2391 | 2.91M | state_->IsConfigChangePendingUnlocked() ? |
2392 | 2.91M | state_->GetPendingConfigUnlocked().ShortDebugString() : "", |
2393 | 2.91M | state_->GetCurrentTermUnlocked(), state_->GetCommittedOpIdUnlocked()); |
2394 | 2.91M | } |
2395 | | |
2396 | 5.92k | return Status::OK(); |
2397 | 2.91M | } |
2398 | | |
2399 | | Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, |
2400 | | const StdStatusCallback& client_cb, |
2401 | 2.91M | boost::optional<TabletServerErrorPB::Code>* error_code) { |
2402 | 2.91M | if (PREDICT_FALSE(!req.has_type())) { |
2403 | 0 | return STATUS(InvalidArgument, "Must specify 'type' argument to ChangeConfig()", |
2404 | 0 | req.ShortDebugString()); |
2405 | 0 | } |
2406 | 2.91M | if (PREDICT_FALSE(!req.has_server())) { |
2407 | 0 | *error_code = TabletServerErrorPB::INVALID_CONFIG; |
2408 | 0 | return STATUS(InvalidArgument, "Must specify 'server' argument to ChangeConfig()", |
2409 | 0 | req.ShortDebugString()); |
2410 | 0 | } |
2411 | 2.91M | YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n) |
2412 | 8.39k | << "Received ChangeConfig request " << req.ShortDebugString(); |
2413 | 2.91M | ChangeConfigType type = req.type(); |
2414 | 2.91M | bool use_hostport = req.has_use_host() && req.use_host()23 ; |
2415 | | |
2416 | 2.91M | if (type != REMOVE_SERVER && use_hostport2.91M ) { |
2417 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Cannot set use_host for change config type $0, " |
2418 | 0 | "only allowed with REMOVE_SERVER.", type); |
2419 | 0 | } |
2420 | | |
2421 | 2.91M | if (PREDICT_FALSE(FLAGS_TEST_return_error_on_change_config != 0.0 && type == CHANGE_ROLE)) { |
2422 | 0 | DCHECK(FLAGS_TEST_return_error_on_change_config >= 0.0 && |
2423 | 0 | FLAGS_TEST_return_error_on_change_config <= 1.0); |
2424 | 0 | if (clock_->Now().ToUint64() % 100 < 100 * FLAGS_TEST_return_error_on_change_config) { |
2425 | 0 | return STATUS(IllegalState, "Returning error for unit test"); |
2426 | 0 | } |
2427 | 0 | } |
2428 | 2.91M | RaftPeerPB* new_peer = nullptr; |
2429 | 2.91M | const RaftPeerPB& server = req.server(); |
2430 | 2.91M | if (!use_hostport2.91M && !server.has_permanent_uuid()) { |
2431 | 0 | return STATUS(InvalidArgument, |
2432 | 0 | Substitute("server must have permanent_uuid or use_host specified: $0", |
2433 | 0 | req.ShortDebugString())); |
2434 | 0 | } |
2435 | 2.91M | { |
2436 | 2.91M | ReplicaState::UniqueLock lock; |
2437 | 2.91M | RETURN_NOT_OK(state_->LockForConfigChange(&lock)); |
2438 | 2.91M | Status s = state_->CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::DONT_NEED_LEASE); |
2439 | 2.91M | if (!s.ok()) { |
2440 | 817 | *error_code = TabletServerErrorPB::NOT_THE_LEADER; |
2441 | 817 | return s; |
2442 | 817 | } |
2443 | | |
2444 | 18.4E | const string& server_uuid = 2.91M server.has_permanent_uuid()2.91M ? server.permanent_uuid()2.91M : ""; |
2445 | 2.91M | s = IsLeaderReadyForChangeConfigUnlocked(type, server_uuid); |
2446 | 2.91M | if (!s.ok()) { |
2447 | 2.91M | YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n) |
2448 | 1.79k | << "Returning not ready for " << ChangeConfigType_Name(type) |
2449 | 1.79k | << " due to error " << s.ToString(); |
2450 | 2.91M | *error_code = TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG; |
2451 | 2.91M | return s; |
2452 | 2.91M | } |
2453 | | |
2454 | 5.91k | const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); |
2455 | | |
2456 | | // Support atomic ChangeConfig requests. |
2457 | 5.91k | if (req.has_cas_config_opid_index()) { |
2458 | 3.75k | if (committed_config.opid_index() != req.cas_config_opid_index()) { |
2459 | 15 | *error_code = TabletServerErrorPB::CAS_FAILED; |
2460 | 15 | return STATUS(IllegalState, Substitute("Request specified cas_config_opid_index " |
2461 | 15 | "of $0 but the committed config has opid_index " |
2462 | 15 | "of $1", |
2463 | 15 | req.cas_config_opid_index(), |
2464 | 15 | committed_config.opid_index())); |
2465 | 15 | } |
2466 | 3.75k | } |
2467 | | |
2468 | 5.89k | RaftConfigPB new_config = committed_config; |
2469 | 5.89k | new_config.clear_opid_index(); |
2470 | 5.89k | switch (type) { |
2471 | 2.06k | case ADD_SERVER: |
2472 | | // Ensure the server we are adding is not already a member of the configuration. |
2473 | 2.06k | if (IsRaftConfigMember(server_uuid, committed_config)) { |
2474 | 0 | *error_code = TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT; |
2475 | 0 | return STATUS(IllegalState, |
2476 | 0 | Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1", |
2477 | 0 | server_uuid, committed_config.ShortDebugString())); |
2478 | 0 | } |
2479 | 2.06k | if (!server.has_member_type()) { |
2480 | 0 | return STATUS(InvalidArgument, |
2481 | 0 | Substitute("Server must have member_type specified. Request: $0", |
2482 | 0 | req.ShortDebugString())); |
2483 | 0 | } |
2484 | 2.06k | if (server.member_type() != PeerMemberType::PRE_VOTER && |
2485 | 2.06k | server.member_type() != PeerMemberType::PRE_OBSERVER79 ) { |
2486 | 0 | return STATUS(InvalidArgument, |
2487 | 0 | Substitute("Server with UUID $0 must be of member_type PRE_VOTER or PRE_OBSERVER. " |
2488 | 0 | "member_type received: $1", server_uuid, |
2489 | 0 | PeerMemberType_Name(server.member_type()))); |
2490 | 0 | } |
2491 | 2.06k | if (server.last_known_private_addr().empty()) { |
2492 | 0 | return STATUS(InvalidArgument, "server must have last_known_addr specified", |
2493 | 0 | req.ShortDebugString()); |
2494 | 0 | } |
2495 | 2.06k | new_peer = new_config.add_peers(); |
2496 | 2.06k | *new_peer = server; |
2497 | 2.06k | break; |
2498 | | |
2499 | 1.82k | case REMOVE_SERVER: |
2500 | 1.82k | if (use_hostport) { |
2501 | 5 | if (server.last_known_private_addr().empty()) { |
2502 | 0 | return STATUS(InvalidArgument, "Must have last_known_addr specified.", |
2503 | 0 | req.ShortDebugString()); |
2504 | 0 | } |
2505 | 5 | HostPort leader_hp; |
2506 | 5 | RETURN_NOT_OK(GetHostPortFromConfig( |
2507 | 5 | new_config, peer_uuid(), queue_->local_cloud_info(), &leader_hp)); |
2508 | 5 | for (const auto& host_port : server.last_known_private_addr()) { |
2509 | 5 | if (leader_hp.port() == host_port.port() && leader_hp.host() == host_port.host()0 ) { |
2510 | 0 | return STATUS(InvalidArgument, "Cannot remove live leader using hostport.", |
2511 | 0 | req.ShortDebugString()); |
2512 | 0 | } |
2513 | 5 | } |
2514 | 5 | } |
2515 | 1.82k | if (server_uuid == peer_uuid()) { |
2516 | 30 | *error_code = TabletServerErrorPB::LEADER_NEEDS_STEP_DOWN; |
2517 | 30 | return STATUS(InvalidArgument, |
2518 | 30 | Substitute("Cannot remove peer $0 from the config because it is the leader. " |
2519 | 30 | "Force another leader to be elected to remove this server. " |
2520 | 30 | "Active consensus state: $1", server_uuid, |
2521 | 30 | state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE) |
2522 | 30 | .ShortDebugString())); |
2523 | 30 | } |
2524 | 1.79k | if (!RemoveFromRaftConfig(&new_config, req)) { |
2525 | 0 | *error_code = TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT; |
2526 | 0 | return STATUS(NotFound, |
2527 | 0 | Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1", |
2528 | 0 | server_uuid, committed_config.ShortDebugString())); |
2529 | 0 | } |
2530 | 1.79k | break; |
2531 | | |
2532 | 2.01k | case CHANGE_ROLE: |
2533 | 2.01k | if (server_uuid == peer_uuid()) { |
2534 | 0 | return STATUS(InvalidArgument, |
2535 | 0 | Substitute("Cannot change role of peer $0 because it is the leader. Force " |
2536 | 0 | "another leader to be elected. Active consensus state: $1", server_uuid, |
2537 | 0 | state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE) |
2538 | 0 | .ShortDebugString())); |
2539 | 0 | } |
2540 | 2.01k | VLOG(3) << "config before CHANGE_ROLE: " << new_config.DebugString()0 ; |
2541 | | |
2542 | 2.01k | if (!GetMutableRaftConfigMember(&new_config, server_uuid, &new_peer).ok()) { |
2543 | 0 | return STATUS(NotFound, |
2544 | 0 | Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1", |
2545 | 0 | server_uuid, new_config.ShortDebugString())); |
2546 | 0 | } |
2547 | 2.01k | if (new_peer->member_type() != PeerMemberType::PRE_OBSERVER && |
2548 | 2.01k | new_peer->member_type() != PeerMemberType::PRE_VOTER1.93k ) { |
2549 | 0 | return STATUS(IllegalState, Substitute("Cannot change role of server with UUID $0 " |
2550 | 0 | "because its member type is $1", |
2551 | 0 | server_uuid, new_peer->member_type())); |
2552 | 0 | } |
2553 | 2.01k | if (new_peer->member_type() == PeerMemberType::PRE_OBSERVER) { |
2554 | 77 | new_peer->set_member_type(PeerMemberType::OBSERVER); |
2555 | 1.93k | } else { |
2556 | 1.93k | new_peer->set_member_type(PeerMemberType::VOTER); |
2557 | 1.93k | } |
2558 | | |
2559 | 2.01k | VLOG(3) << "config after CHANGE_ROLE: " << new_config.DebugString()0 ; |
2560 | 2.01k | break; |
2561 | 0 | default: |
2562 | 0 | return STATUS(InvalidArgument, Substitute("Unsupported type $0", |
2563 | 5.89k | ChangeConfigType_Name(type))); |
2564 | 5.89k | } |
2565 | | |
2566 | 5.87k | auto cc_replicate = std::make_shared<ReplicateMsg>(); |
2567 | 5.87k | cc_replicate->set_op_type(CHANGE_CONFIG_OP); |
2568 | 5.87k | ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record(); |
2569 | 5.87k | cc_req->set_tablet_id(tablet_id()); |
2570 | 5.87k | *cc_req->mutable_old_config() = committed_config; |
2571 | 5.87k | *cc_req->mutable_new_config() = new_config; |
2572 | | // Note: This hybrid_time has no meaning from a serialization perspective |
2573 | | // because this method is not executed on the TabletPeer's prepare thread. |
2574 | 5.87k | cc_replicate->set_hybrid_time(clock_->Now().ToUint64()); |
2575 | 5.87k | state_->GetCommittedOpIdUnlocked().ToPB(cc_replicate->mutable_committed_op_id()); |
2576 | | |
2577 | 5.87k | auto context = std::make_shared<StateChangeContext>( |
2578 | 5.87k | StateChangeReason::LEADER_CONFIG_CHANGE_COMPLETE, *cc_req, |
2579 | 5.87k | type == REMOVE_SERVER ? server_uuid1.80k : ""4.07k ); |
2580 | | |
2581 | 5.87k | RETURN_NOT_OK( |
2582 | 5.87k | ReplicateConfigChangeUnlocked(cc_replicate, |
2583 | 5.87k | new_config, |
2584 | 5.87k | type, |
2585 | 5.87k | std::bind(&RaftConsensus::MarkDirtyOnSuccess, |
2586 | 5.87k | this, |
2587 | 5.87k | std::move(context), |
2588 | 5.87k | std::move(client_cb), std::placeholders::_1))); |
2589 | 5.87k | } |
2590 | | |
2591 | 5.87k | peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); |
2592 | | |
2593 | 5.87k | return Status::OK(); |
2594 | 5.87k | } |
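 | | // A minimal caller-side sketch of an atomic (CAS-guarded) removal, assuming the |
 | | // generated protobuf setters for ChangeConfigRequestPB (illustrative only; the |
 | | // identifiers below are placeholders): |
 | | //   ChangeConfigRequestPB req; |
 | | //   req.set_tablet_id(tablet_id); |
 | | //   req.set_type(REMOVE_SERVER); |
 | | //   req.mutable_server()->set_permanent_uuid(peer_uuid); |
 | | //   req.set_cas_config_opid_index(committed_opid_index); |
 | | // If the committed config's opid_index no longer matches, ChangeConfig() above |
 | | // fails with CAS_FAILED instead of applying a stale change. |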
2595 | | |
2596 | | Status RaftConsensus::UnsafeChangeConfig( |
2597 | | const UnsafeChangeConfigRequestPB& req, |
2598 | 6 | boost::optional<tserver::TabletServerErrorPB::Code>* error_code) { |
2599 | 6 | if (PREDICT_FALSE(!req.has_new_config())) { |
2600 | 0 | *error_code = TabletServerErrorPB::INVALID_CONFIG; |
2601 | 0 | return STATUS(InvalidArgument, "Request must contain 'new_config' argument " |
2602 | 0 | "to UnsafeChangeConfig()", yb::ToString(req)); |
2603 | 0 | } |
2604 | 6 | if (PREDICT_FALSE(!req.has_caller_id())) { |
2605 | 0 | *error_code = TabletServerErrorPB::INVALID_CONFIG; |
2606 | 0 | return STATUS(InvalidArgument, "Must specify 'caller_id' argument to UnsafeChangeConfig()", |
2607 | 0 | yb::ToString(req)); |
2608 | 0 | } |
2609 | | |
2610 | | // Grab the committed config and current term on this node. |
2611 | 6 | int64_t current_term; |
2612 | 6 | RaftConfigPB committed_config; |
2613 | 6 | OpId last_committed_opid; |
2614 | 6 | OpId preceding_opid; |
2615 | 6 | string local_peer_uuid; |
2616 | 6 | { |
2617 | | // Take the snapshot of the replica state and queue state so that |
2618 | | // we can stick them in the consensus update request later. |
2619 | 6 | auto lock = state_->LockForRead(); |
2620 | 6 | local_peer_uuid = state_->GetPeerUuid(); |
2621 | 6 | current_term = state_->GetCurrentTermUnlocked(); |
2622 | 6 | committed_config = state_->GetCommittedConfigUnlocked(); |
2623 | 6 | if (state_->IsConfigChangePendingUnlocked()) { |
2624 | 3 | LOG_WITH_PREFIX(WARNING) << "Replica has a pending config, but the new config " |
2625 | 3 | << "will be unsafely changed anyway. " |
2626 | 3 | << "Currently pending config on the node: " |
2627 | 3 | << yb::ToString(state_->GetPendingConfigUnlocked()); |
2628 | 3 | } |
2629 | 6 | last_committed_opid = state_->GetCommittedOpIdUnlocked(); |
2630 | 6 | preceding_opid = state_->GetLastAppliedOpIdUnlocked(); |
2631 | 6 | } |
2632 | | |
2633 | | // Validate that passed replica uuids are part of the committed config |
2634 | | // on this node. This allows a manual recovery tool to only have to specify |
2635 | | // the uuid of each replica in the new config without having to know the |
2636 | | // addresses of each server (since we can get the address information from |
2637 | | // the committed config). Additionally, only a subset of the committed config |
2638 | | // is required for typical cluster repair scenarios. |
2639 | 6 | std::unordered_set<string> retained_peer_uuids; |
2640 | 6 | const RaftConfigPB& config = req.new_config(); |
2641 | 7 | for (const RaftPeerPB& new_peer : config.peers()) { |
2642 | 7 | const string& peer_uuid = new_peer.permanent_uuid(); |
2643 | 7 | retained_peer_uuids.insert(peer_uuid); |
2644 | 7 | if (!IsRaftConfigMember(peer_uuid, committed_config)) { |
2645 | 0 | *error_code = TabletServerErrorPB::INVALID_CONFIG; |
2646 | 0 | return STATUS(InvalidArgument, Substitute("Peer with uuid $0 is not in the committed " |
2647 | 0 | "config on this replica, rejecting the " |
2648 | 0 | "unsafe config change request for tablet $1. " |
2649 | 0 | "Committed config: $2", |
2650 | 0 | peer_uuid, req.tablet_id(), |
2651 | 0 | yb::ToString(committed_config))); |
2652 | 0 | } |
2653 | 7 | } |
2654 | | |
2655 | 6 | RaftConfigPB new_config = committed_config; |
2656 | 18 | for (const auto& peer : committed_config.peers()) { |
2657 | 18 | const string& peer_uuid = peer.permanent_uuid(); |
2658 | 18 | if (!ContainsKey(retained_peer_uuids, peer_uuid)) { |
2659 | 11 | ChangeConfigRequestPB req; |
2660 | 11 | req.set_tablet_id(tablet_id()); |
2661 | 11 | req.mutable_server()->set_permanent_uuid(peer_uuid); |
2662 | 11 | req.set_type(REMOVE_SERVER); |
2663 | 11 | req.set_cas_config_opid_index(committed_config.opid_index()); |
2664 | 11 | CHECK(RemoveFromRaftConfig(&new_config, req)); |
2665 | 11 | } |
2666 | 18 | } |
2667 | | // Check that local peer is part of the new config and is a VOTER. |
2668 | | // Although it is valid for a local replica to not have itself |
2669 | | // in the committed config, it is rare and a replica without itself |
2670 | | // in the latest config is definitely not caught up with the latest leader's log. |
2671 | 6 | if (!IsRaftConfigVoter(local_peer_uuid, new_config)) { |
2672 | 0 | *error_code = TabletServerErrorPB::INVALID_CONFIG; |
2673 | 0 | return STATUS(InvalidArgument, Substitute("Local replica uuid $0 is not " |
2674 | 0 | "a VOTER in the new config, " |
2675 | 0 | "rejecting the unsafe config " |
2676 | 0 | "change request for tablet $1. " |
2677 | 0 | "Rejected config: $2" , |
2678 | 0 | local_peer_uuid, req.tablet_id(), |
2679 | 0 | yb::ToString(new_config))); |
2680 | 0 | } |
2681 | 6 | new_config.set_unsafe_config_change(true); |
2682 | 6 | int64 replicate_opid_index = preceding_opid.index + 1; |
2683 | 6 | new_config.clear_opid_index(); |
2684 | | |
2685 | | // Sanity check the new config. |
2686 | 6 | Status s = VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM); |
2687 | 6 | if (!s.ok()) { |
2688 | 0 | *error_code = TabletServerErrorPB::INVALID_CONFIG; |
2689 | 0 | return STATUS(InvalidArgument, Substitute("The resulting new config for tablet $0 " |
2690 | 0 | "from passed parameters has failed raft " |
2691 | 0 | "config sanity check: $1", |
2692 | 0 | req.tablet_id(), s.ToString())); |
2693 | 0 | } |
2694 | | |
2695 | | // Prepare the consensus request as if the request is being generated |
2696 | | // from a different leader. |
2697 | 6 | ConsensusRequestPB consensus_req; |
2698 | 6 | consensus_req.set_caller_uuid(req.caller_id()); |
2699 | | // Bumping up the term for the consensus request being generated. |
2700 | | // This makes this request appear to come from a new leader that |
2701 | | // the local replica doesn't know about yet. If the local replica |
2702 | | // happens to be the leader, this will cause it to step down. |
2703 | 6 | const int64 new_term = current_term + 1; |
2704 | 6 | consensus_req.set_caller_term(new_term); |
2705 | 6 | preceding_opid.ToPB(consensus_req.mutable_preceding_id()); |
2706 | 6 | last_committed_opid.ToPB(consensus_req.mutable_committed_op_id()); |
2707 | | |
2708 | | // Prepare the replicate msg to be replicated. |
2709 | 6 | ReplicateMsg* replicate = consensus_req.add_ops(); |
2710 | 6 | ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record(); |
2711 | 6 | cc_req->set_tablet_id(req.tablet_id()); |
2712 | 6 | *cc_req->mutable_old_config() = committed_config; |
2713 | 6 | *cc_req->mutable_new_config() = new_config; |
2714 | 6 | OpIdPB* id = replicate->mutable_id(); |
2715 | | // Bumping up both the term and the opid_index from what's found in the log. |
2716 | 6 | id->set_term(new_term); |
2717 | 6 | id->set_index(replicate_opid_index); |
2718 | 6 | replicate->set_op_type(CHANGE_CONFIG_OP); |
2719 | 6 | replicate->set_hybrid_time(clock_->Now().ToUint64()); |
2720 | 6 | last_committed_opid.ToPB(replicate->mutable_committed_op_id()); |
2721 | | |
2722 | 6 | VLOG_WITH_PREFIX0 (3) << "UnsafeChangeConfig: Generated consensus request: " |
2723 | 0 | << yb::ToString(consensus_req); |
2724 | | |
2725 | 6 | LOG_WITH_PREFIX(WARNING) << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, " |
2726 | 6 | << "COMMITTED CONFIG: " << yb::ToString(committed_config) |
2727 | 6 | << "NEW CONFIG: " << yb::ToString(new_config); |
2728 | | |
2729 | 6 | const auto deadline = CoarseMonoClock::Now() + 15s; // TODO: fix me |
2730 | 6 | ConsensusResponsePB consensus_resp; |
2731 | 6 | s = Update(&consensus_req, &consensus_resp, deadline); |
2732 | 6 | if (!s.ok() || consensus_resp.has_error()) { |
2733 | 0 | *error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
2734 | 0 | } |
2735 | 6 | if (s.ok() && consensus_resp.has_error()) { |
2736 | 0 | s = StatusFromPB(consensus_resp.error().status()); |
2737 | 0 | } |
2738 | 6 | return s; |
2739 | 6 | } |
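 | | // A minimal sketch of an unsafe config change request, assuming the generated |
 | | // protobuf setters for UnsafeChangeConfigRequestPB (illustrative only; the |
 | | // identifiers below are placeholders): |
 | | //   UnsafeChangeConfigRequestPB req; |
 | | //   req.set_tablet_id(tablet_id); |
 | | //   req.set_caller_id(recovery_tool_uuid); |
 | | //   req.mutable_new_config()->add_peers()->set_permanent_uuid(surviving_peer_uuid); |
 | | // Every uuid listed must already be in the locally committed config; peers that |
 | | // are omitted get removed, and the change is replicated as if it came from a new |
 | | // leader at current_term + 1, which forces a local leader to step down. |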
2740 | | |
2741 | 151k | void RaftConsensus::Shutdown() { |
2742 | 151k | LOG_WITH_PREFIX(INFO) << "Shutdown."; |
2743 | | |
2744 | | // Avoid taking locks if already shut down so we don't violate |
2745 | | // ThreadRestrictions assertions in the case where the RaftConsensus |
2746 | | // destructor runs on the reactor thread due to an election callback being |
2747 | | // the last outstanding reference. |
2748 | 151k | if (shutdown_.Load(kMemOrderAcquire)) return75.6k ; |
2749 | | |
2750 | 75.6k | CHECK_OK(ExecuteHook(PRE_SHUTDOWN)); |
2751 | | |
2752 | 75.6k | { |
2753 | 75.6k | ReplicaState::UniqueLock lock; |
2754 | | // Transition to kShuttingDown state. |
2755 | 75.6k | CHECK_OK(state_->LockForShutdown(&lock)); |
2756 | 75.6k | step_down_check_tracker_.StartShutdown(); |
2757 | 75.6k | } |
2758 | 75.6k | step_down_check_tracker_.CompleteShutdown(); |
2759 | | |
2760 | | // Close the peer manager. |
2761 | 75.6k | peer_manager_->Close(); |
2762 | | |
2763 | | // We must close the queue after we close the peers. |
2764 | 75.6k | queue_->Close(); |
2765 | | |
2766 | 75.6k | CHECK_OK(state_->CancelPendingOperations()); |
2767 | | |
2768 | 75.6k | { |
2769 | 75.6k | ReplicaState::UniqueLock lock; |
2770 | 75.6k | CHECK_OK(state_->LockForShutdown(&lock)); |
2771 | 75.6k | CHECK_EQ(ReplicaState::kShuttingDown, state_->state()); |
2772 | 75.6k | CHECK_OK(state_->ShutdownUnlocked()); |
2773 | 75.6k | LOG_WITH_PREFIX(INFO) << "Raft consensus is shut down!"; |
2774 | 75.6k | } |
2775 | | |
2776 | | // Shut down things that might acquire locks during destruction. |
2777 | 75.6k | raft_pool_token_->Shutdown(); |
2778 | | // We might not have run Start yet, so make sure we have a FD. |
2779 | 75.6k | if (failure_detector_75.6k ) { |
2780 | 75.6k | DisableFailureDetector(); |
2781 | 75.6k | } |
2782 | | |
2783 | 75.6k | CHECK_OK(ExecuteHook(POST_SHUTDOWN)); |
2784 | | |
2785 | 75.6k | shutdown_.Store(true, kMemOrderRelease); |
2786 | 75.6k | } |
2787 | | |
2788 | 0 | PeerRole RaftConsensus::GetActiveRole() const { |
2789 | 0 | auto lock = state_->LockForRead(); |
2790 | 0 | return state_->GetActiveRoleUnlocked(); |
2791 | 0 | } |
2792 | | |
2793 | 1.14M | yb::OpId RaftConsensus::GetLatestOpIdFromLog() { |
2794 | 1.14M | return log_->GetLatestEntryOpId(); |
2795 | 1.14M | } |
2796 | | |
2797 | 133k | Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg) { |
2798 | 133k | OperationType op_type = msg->op_type(); |
2799 | 133k | if (!IsConsensusOnlyOperation(op_type)) { |
2800 | 0 | return STATUS_FORMAT(InvalidArgument, |
2801 | 0 | "Expected a consensus-only op type, got $0: $1", |
2802 | 0 | OperationType_Name(op_type), |
2803 | 0 | *msg); |
2804 | 0 | } |
2805 | 133k | VLOG_WITH_PREFIX10 (1) << "Starting consensus round: " |
2806 | 10 | << msg->id().ShortDebugString(); |
2807 | 133k | scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg)); |
2808 | 133k | std::shared_ptr<StateChangeContext> context = nullptr; |
2809 | | |
2810 | | // We are here for NO_OP or CHANGE_CONFIG_OP type ops. We need to set the change record for an |
2811 | | // actual config change operation. The NO_OP does not update the config, as it is used for a new |
2812 | | // leader election term change replicate message, which keeps the same config. |
2813 | 133k | if (IsChangeConfigOperation(op_type)) { |
2814 | 14.0k | context = |
2815 | 14.0k | std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_CONFIG_CHANGE_COMPLETE, |
2816 | 14.0k | msg->change_config_record()); |
2817 | 119k | } else { |
2818 | 119k | context = std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_NO_OP_COMPLETE); |
2819 | 119k | } |
2820 | | |
2821 | 133k | StdStatusCallback client_cb = |
2822 | 133k | std::bind(&RaftConsensus::MarkDirtyOnSuccess, |
2823 | 133k | this, |
2824 | 133k | context, |
2825 | 133k | &DoNothingStatusCB, |
2826 | 133k | std::placeholders::_1); |
2827 | 133k | round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb))); |
2828 | 133k | return state_->AddPendingOperation(round, OperationMode::kFollower); |
2829 | 133k | } |
2830 | | |
2831 | 3.02k | Status RaftConsensus::WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) { |
2832 | 3.02k | CoarseTimePoint start = CoarseMonoClock::now(); |
2833 | 7.26k | for (;;) { |
2834 | 7.26k | MonoDelta remaining_old_leader_lease; |
2835 | 7.26k | LeaderLeaseStatus leader_lease_status; |
2836 | 7.26k | ReplicaState::State state; |
2837 | 7.26k | { |
2838 | 7.26k | auto lock = state_->LockForRead(); |
2839 | 7.26k | state = state_->state(); |
2840 | 7.26k | if (state != ReplicaState::kRunning) { |
2841 | 0 | return STATUS_FORMAT(IllegalState, "Consensus is not running: $0", state); |
2842 | 0 | } |
2843 | 7.26k | if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) { |
2844 | 4 | return STATUS_FORMAT(IllegalState, "Not the leader: $0", state_->GetActiveRoleUnlocked()); |
2845 | 4 | } |
2846 | 7.25k | leader_lease_status = state_->GetLeaderLeaseStatusUnlocked(&remaining_old_leader_lease); |
2847 | 7.25k | } |
2848 | 7.25k | if (leader_lease_status == LeaderLeaseStatus::HAS_LEASE) { |
2849 | 3.01k | return Status::OK(); |
2850 | 3.01k | } |
2851 | 4.24k | CoarseTimePoint now = CoarseMonoClock::now(); |
2852 | 4.24k | if (now > deadline) { |
2853 | 0 | return STATUS_FORMAT( |
2854 | 0 | TimedOut, "Waited for $0 to acquire a leader lease, state $1, lease status: $2", |
2855 | 0 | now - start, state, LeaderLeaseStatus_Name(leader_lease_status)); |
2856 | 0 | } |
2857 | 4.24k | switch (leader_lease_status) { |
2858 | 0 | case LeaderLeaseStatus::HAS_LEASE: |
2859 | 0 | return Status::OK(); |
2860 | 4.20k | case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE: |
2861 | 4.20k | { |
2862 | 4.20k | std::unique_lock<decltype(leader_lease_wait_mtx_)> lock(leader_lease_wait_mtx_); |
2863 | | // Because we're not taking the same lock (leader_lease_wait_mtx_) when we check the |
2864 | | // leader lease status, there is a possibility of a race where we miss the |
2865 | | // notification and already have a lease by this point. Rather than re-taking the |
2866 | | // ReplicaState lock and re-checking, here we simply block for up to 100ms in that case, |
2867 | | // because this function is currently (08/14/2017) only used in a context where it is OK, |
2868 | | // such as catalog manager initialization. |
2869 | 4.20k | leader_lease_wait_cond_.wait_for( |
2870 | 4.20k | lock, std::max<MonoDelta>(100ms, deadline - now).ToSteadyDuration()); |
2871 | 4.20k | } |
2872 | 4.20k | continue; |
2873 | 35 | case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE: { |
2874 | 35 | auto wait_deadline = std::min({deadline, now + 100ms, now + remaining_old_leader_lease}); |
2875 | 35 | std::this_thread::sleep_until(wait_deadline); |
2876 | 35 | } continue; |
2877 | 4.24k | } |
2878 | 0 | FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, leader_lease_status); |
2879 | 0 | } |
2880 | 3.02k | } |
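 | | // Minimal usage sketch for the wait above (illustrative annotation; assumes the |
 | | // caller holds a shared_ptr<RaftConsensus> named consensus): |
 | | //   auto deadline = CoarseMonoClock::Now() + 15s; |
 | | //   Status s = consensus->WaitForLeaderLeaseImprecise(deadline); |
 | | // Returns OK once this peer holds the leader lease, IllegalState if it is not |
 | | // (or stops being) the leader, and TimedOut if the deadline passes first. |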
2881 | | |
2882 | 10.3M | Status RaftConsensus::CheckIsActiveLeaderAndHasLease() const { |
2883 | 10.3M | return state_->CheckIsActiveLeaderAndHasLease(); |
2884 | 10.3M | } |
2885 | | |
2886 | | Result<MicrosTime> RaftConsensus::MajorityReplicatedHtLeaseExpiration( |
2887 | 73.5M | MicrosTime min_allowed, CoarseTimePoint deadline) const { |
2888 | 73.5M | return state_->MajorityReplicatedHtLeaseExpiration(min_allowed, deadline); |
2889 | 73.5M | } |
2890 | | |
2891 | 329k | std::string RaftConsensus::GetRequestVoteLogPrefix(const VoteRequestPB& request) const { |
2892 | 329k | return Format("$0 Leader $1election vote request", |
2893 | 329k | state_->LogPrefix(), request.preelection() ? "pre-"216k : ""113k ); |
2894 | 329k | } |
2895 | | |
2896 | | void RaftConsensus::FillVoteResponseVoteGranted( |
2897 | 1.13M | const VoteRequestPB& request, VoteResponsePB* response) { |
2898 | 1.13M | response->set_responder_term(request.candidate_term()); |
2899 | 1.13M | response->set_vote_granted(true); |
2900 | 1.13M | } |
2901 | | |
2902 | | void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, |
2903 | 221k | VoteResponsePB* response) { |
2904 | 221k | response->set_responder_term(state_->GetCurrentTermUnlocked()); |
2905 | 221k | response->set_vote_granted(false); |
2906 | 221k | response->mutable_consensus_error()->set_code(error_code); |
2907 | 221k | } |
2908 | | |
2909 | | void RaftConsensus::RequestVoteRespondVoteDenied( |
2910 | | ConsensusErrorPB::Code error_code, const std::string& message_suffix, |
2911 | 205k | const VoteRequestPB& request, VoteResponsePB* response) { |
2912 | 205k | auto status = STATUS_FORMAT( |
2913 | 205k | InvalidArgument, "$0: Denying vote to candidate $1 $2", |
2914 | 205k | GetRequestVoteLogPrefix(request), request.candidate_uuid(), message_suffix); |
2915 | 205k | FillVoteResponseVoteDenied(error_code, response); |
2916 | 205k | LOG(INFO) << status.message().ToBuffer(); |
2917 | 205k | StatusToPB(status, response->mutable_consensus_error()->mutable_status()); |
2918 | 205k | } |
2919 | | |
2920 | | Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request, |
2921 | 204k | VoteResponsePB* response) { |
2922 | 204k | auto message_suffix = Format( |
2923 | 204k | "for earlier term $0. Current term is $1.", |
2924 | 204k | request->candidate_term(), state_->GetCurrentTermUnlocked()); |
2925 | 204k | RequestVoteRespondVoteDenied(ConsensusErrorPB::INVALID_TERM, message_suffix, *request, response); |
2926 | 204k | return Status::OK(); |
2927 | 204k | } |
2928 | | |
2929 | | Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request, |
2930 | 209 | VoteResponsePB* response) { |
2931 | 209 | FillVoteResponseVoteGranted(*request, response); |
2932 | 209 | LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. " |
2933 | 209 | "Re-sending same reply.", |
2934 | 209 | GetRequestVoteLogPrefix(*request), |
2935 | 209 | request->candidate_uuid(), |
2936 | 209 | request->candidate_term()); |
2937 | 209 | return Status::OK(); |
2938 | 209 | } |
2939 | | |
2940 | | Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request, |
2941 | 1.16k | VoteResponsePB* response) { |
2942 | 1.16k | auto message_suffix = Format( |
2943 | 1.16k | "in current term $0: Already voted for candidate $1 in this term.", |
2944 | 1.16k | state_->GetCurrentTermUnlocked(), state_->GetVotedForCurrentTermUnlocked()); |
2945 | 1.16k | RequestVoteRespondVoteDenied(ConsensusErrorPB::ALREADY_VOTED, message_suffix, *request, response); |
2946 | 1.16k | return Status::OK(); |
2947 | 1.16k | } |
2948 | | |
2949 | | Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpIdPB& local_last_logged_opid, |
2950 | | const VoteRequestPB* request, |
2951 | 433 | VoteResponsePB* response) { |
2952 | 433 | auto message_suffix = Format( |
2953 | 433 | "for term $0 because replica has last-logged OpId of $1, which is greater than that of the " |
2954 | 433 | "candidate, which has last-logged OpId of $2.", |
2955 | 433 | request->candidate_term(), local_last_logged_opid, |
2956 | 433 | request->candidate_status().last_received()); |
2957 | 433 | RequestVoteRespondVoteDenied( |
2958 | 433 | ConsensusErrorPB::LAST_OPID_TOO_OLD, message_suffix, *request, response); |
2959 | 433 | return Status::OK(); |
2960 | 433 | } |
2961 | | |
2962 | | Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request, |
2963 | 8.89k | VoteResponsePB* response) { |
2964 | 8.89k | FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response); |
2965 | 8.89k | std::string msg = Format( |
2966 | 8.89k | "$0: Denying vote to candidate $1 for term $2 because replica is either leader or believes a " |
2967 | 8.89k | "valid leader to be alive. Time left: $3", |
2968 | 8.89k | GetRequestVoteLogPrefix(*request), request->candidate_uuid(), request->candidate_term(), |
2969 | 8.89k | withhold_votes_until_.load(std::memory_order_acquire) - MonoTime::Now()); |
2970 | 8.89k | LOG(INFO) << msg; |
2971 | 8.89k | StatusToPB(STATUS(InvalidArgument, msg), response->mutable_consensus_error()->mutable_status()); |
2972 | 8.89k | return Status::OK(); |
2973 | 8.89k | } |
2974 | | |
2975 | | Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request, |
2976 | 6.88k | VoteResponsePB* response) { |
2977 | 6.88k | FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response); |
2978 | 6.88k | string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " |
2979 | 6.88k | "replica is already servicing an update from a current leader " |
2980 | 6.88k | "or another vote.", |
2981 | 6.88k | GetRequestVoteLogPrefix(*request), |
2982 | 6.88k | request->candidate_uuid(), |
2983 | 6.88k | request->candidate_term()); |
2984 | 6.88k | LOG(INFO) << msg; |
2985 | 6.88k | StatusToPB(STATUS(ServiceUnavailable, msg), |
2986 | 6.88k | response->mutable_consensus_error()->mutable_status()); |
2987 | 6.88k | return Status::OK(); |
2988 | 6.88k | } |
2989 | | |
2990 | | Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request, |
2991 | 107k | VoteResponsePB* response) { |
2992 | | // We know our vote will be "yes", so avoid triggering an election while we |
2993 | | // persist our vote to disk. We use an exponential backoff to avoid too much |
2994 | | // split-vote contention when nodes display high latencies. |
2995 | 107k | MonoDelta additional_backoff = LeaderElectionExpBackoffDeltaUnlocked(); |
2996 | 107k | SnoozeFailureDetector(ALLOW_LOGGING, additional_backoff); |
2997 | | |
2998 | | // Persist our vote to disk. |
2999 | 107k | RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid())); |
3000 | | |
3001 | 107k | FillVoteResponseVoteGranted(*request, response); |
3002 | | |
3003 | | // Give peer time to become leader. Snooze one more time after persisting our |
3004 | | // vote. When disk latency is high, this should help reduce churn. |
3005 | 107k | SnoozeFailureDetector(DO_NOT_LOG, additional_backoff); |
3006 | | |
3007 | 107k | LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.", |
3008 | 107k | GetRequestVoteLogPrefix(*request), |
3009 | 107k | request->candidate_uuid(), |
3010 | 107k | state_->GetCurrentTermUnlocked()); |
3011 | 107k | return Status::OK(); |
3012 | 107k | } |
3013 | | |
3014 | 28.9M | PeerRole RaftConsensus::GetRoleUnlocked() const { |
3015 | 28.9M | DCHECK(state_->IsLocked()); |
3016 | 28.9M | return state_->GetActiveRoleUnlocked(); |
3017 | 28.9M | } |
3018 | | |
3019 | 28.9M | PeerRole RaftConsensus::role() const { |
3020 | 28.9M | auto lock = state_->LockForRead(); |
3021 | 28.9M | return GetRoleUnlocked(); |
3022 | 28.9M | } |
3023 | | |
3024 | 82.0M | LeaderState RaftConsensus::GetLeaderState(bool allow_stale) const { |
3025 | 82.0M | return state_->GetLeaderState(allow_stale); |
3026 | 82.0M | } |
3027 | | |
3028 | 8.74M | std::string RaftConsensus::LogPrefix() { |
3029 | 8.74M | return state_->LogPrefix(); |
3030 | 8.74M | } |
3031 | | |
3032 | 182k | void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { |
3033 | 182k | failed_elections_since_stable_leader_.store(0, std::memory_order_release); |
3034 | 182k | state_->SetLeaderUuidUnlocked(uuid); |
3035 | 182k | auto context = std::make_shared<StateChangeContext>(StateChangeReason::NEW_LEADER_ELECTED, uuid); |
3036 | 182k | MarkDirty(context); |
3037 | 182k | } |
3038 | | |
3039 | | Status RaftConsensus::ReplicateConfigChangeUnlocked(const ReplicateMsgPtr& replicate_ref, |
3040 | | const RaftConfigPB& new_config, |
3041 | | ChangeConfigType type, |
3042 | 5.87k | StdStatusCallback client_cb) { |
3043 | 5.87k | LOG(INFO) << "Setting replicate pending config " << new_config.ShortDebugString() |
3044 | 5.87k | << ", type = " << ChangeConfigType_Name(type); |
3045 | | |
3046 | 5.87k | RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config)); |
3047 | | |
3048 | 5.87k | if (type == CHANGE_ROLE && |
3049 | 5.87k | PREDICT_FALSE2.01k (FLAGS_TEST_inject_delay_leader_change_role_append_secs)) { |
3050 | 1 | LOG(INFO) << "Adding change role sleep for " |
3051 | 1 | << FLAGS_TEST_inject_delay_leader_change_role_append_secs << " secs."; |
3052 | 1 | SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_delay_leader_change_role_append_secs)); |
3053 | 1 | } |
3054 | | |
3055 | | // Set as pending. |
3056 | 5.87k | RefreshConsensusQueueAndPeersUnlocked(); |
3057 | | |
3058 | 5.87k | auto round = make_scoped_refptr<ConsensusRound>(this, replicate_ref); |
3059 | 5.87k | round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb))); |
3060 | 5.87k | auto status = AppendNewRoundToQueueUnlocked(round); |
3061 | 5.87k | if (!status.ok()) { |
3062 | 0 | // We can just clear the pending config, because there can be only one pending config. |
3063 | 0 | auto clear_status = state_->ClearPendingConfigUnlocked(); |
3064 | 0 | if (!clear_status.ok()) { |
3065 | 0 | LOG(WARNING) << "Could not clear pending config: " << clear_status; |
3066 | 0 | } |
3067 | 0 | } |
3068 | 5.87k | return status; |
3069 | 5.87k | } |
3070 | | |
3071 | 67.9k | void RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() { |
3072 | 67.9k | DCHECK_EQ(PeerRole::LEADER, state_->GetActiveRoleUnlocked()); |
3073 | 67.9k | const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); |
3074 | | |
3075 | | // Change the peers so that we're able to replicate messages remotely and |
3076 | | // locally. Peer manager connections are updated using the active config. Connections to peers |
3077 | | // that are not part of active_config are closed. New connections are created for those peers |
3078 | | // that are present in active_config but have no connections. When the queue is in LEADER |
3079 | | // mode, it checks that all registered peers are a part of the active config. |
3080 | 67.9k | peer_manager_->ClosePeersNotInConfig(active_config); |
3081 | 67.9k | queue_->SetLeaderMode(state_->GetCommittedOpIdUnlocked(), |
3082 | 67.9k | state_->GetCurrentTermUnlocked(), |
3083 | 67.9k | state_->GetLastAppliedOpIdUnlocked(), |
3084 | 67.9k | active_config); |
3085 | | |
3086 | 67.9k | ScopedDnsTracker dns_tracker(update_raft_config_dns_latency_.get()); |
3087 | 67.9k | peer_manager_->UpdateRaftConfig(active_config); |
3088 | 67.9k | } |
3089 | | |
3090 | 4.44k | string RaftConsensus::peer_uuid() const { |
3091 | 4.44k | return state_->GetPeerUuid(); |
3092 | 4.44k | } |
3093 | | |
3094 | 60.6k | string RaftConsensus::tablet_id() const { |
3095 | 60.6k | return state_->GetOptions().tablet_id; |
3096 | 60.6k | } |
3097 | | |
3098 | 10.7k | const TabletId& RaftConsensus::split_parent_tablet_id() const { |
3099 | 10.7k | return split_parent_tablet_id_; |
3100 | 10.7k | } |
3101 | | |
3102 | | ConsensusStatePB RaftConsensus::ConsensusState( |
3103 | | ConsensusConfigType type, |
3104 | 46.5M | LeaderLeaseStatus* leader_lease_status) const { |
3105 | 46.5M | auto lock = state_->LockForRead(); |
3106 | 46.5M | return ConsensusStateUnlocked(type, leader_lease_status); |
3107 | 46.5M | } |
3108 | | |
3109 | | ConsensusStatePB RaftConsensus::ConsensusStateUnlocked( |
3110 | | ConsensusConfigType type, |
3111 | 46.5M | LeaderLeaseStatus* leader_lease_status) const { |
3112 | 46.5M | CHECK(state_->IsLocked()); |
3113 | 46.5M | if (leader_lease_status) { |
3114 | 3.88k | if (GetRoleUnlocked() == PeerRole::LEADER) { |
3115 | 1.45k | *leader_lease_status = state_->GetLeaderLeaseStatusUnlocked(); |
3116 | 2.42k | } else { |
3117 | | // We'll still return a valid value if we're not a leader. |
3118 | 2.42k | *leader_lease_status = LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE; |
3119 | 2.42k | } |
3120 | 3.88k | } |
3121 | 46.5M | return state_->ConsensusStateUnlocked(type); |
3122 | 46.5M | } |
3123 | | |
3124 | 77.9M | RaftConfigPB RaftConsensus::CommittedConfig() const { |
3125 | 77.9M | auto lock = state_->LockForRead(); |
3126 | 77.9M | return state_->GetCommittedConfigUnlocked(); |
3127 | 77.9M | } |
3128 | | |
3129 | 38 | void RaftConsensus::DumpStatusHtml(std::ostream& out) const { |
3130 | 38 | out << "<h1>Raft Consensus State</h1>" << std::endl; |
3131 | | |
3132 | 38 | out << "<h2>State</h2>" << std::endl; |
3133 | 38 | out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; |
3134 | | |
3135 | | // Dump the queues on a leader. |
3136 | 38 | PeerRole role; |
3137 | 38 | { |
3138 | 38 | auto lock = state_->LockForRead(); |
3139 | 38 | role = state_->GetActiveRoleUnlocked(); |
3140 | 38 | } |
3141 | 38 | if (role == PeerRole::LEADER) { |
3142 | 38 | out << "<h2>Queue overview</h2>" << std::endl; |
3143 | 38 | out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; |
3144 | 38 | out << "<hr/>" << std::endl; |
3145 | 38 | out << "<h2>Queue details</h2>" << std::endl; |
3146 | 38 | queue_->DumpToHtml(out); |
3147 | 38 | } |
3148 | 38 | } |
3149 | | |
3150 | 46 | ReplicaState* RaftConsensus::GetReplicaStateForTests() { |
3151 | 46 | return state_.get(); |
3152 | 46 | } |
3153 | | |
3154 | | void RaftConsensus::ElectionCallback(const LeaderElectionData& data, |
3155 | 737k | const ElectionResult& result) { |
3156 | | // The election callback runs on a reactor thread, so we need to defer to our |
3157 | | // threadpool. If the threadpool is already shut down for some reason, it's OK -- |
3158 | | // we're OK with the callback never running. |
3159 | 737k | WARN_NOT_OK(raft_pool_token_->SubmitFunc( |
3160 | 737k | std::bind(&RaftConsensus::DoElectionCallback, shared_from_this(), data, result)), |
3161 | 737k | state_->LogPrefix() + "Unable to run election callback"); |
3162 | 737k | } |
3163 | | |
3164 | 123k | void RaftConsensus::NotifyOriginatorAboutLostElection(const std::string& originator_uuid) { |
3165 | 123k | if (originator_uuid.empty()) { |
3166 | 123k | return; |
3167 | 123k | } |
3168 | | |
3169 | 77 | ReplicaState::UniqueLock lock; |
3170 | 77 | Status s = state_->LockForConfigChange(&lock); |
3171 | 77 | if (PREDICT_FALSE(!s.ok())) { |
3172 | 0 | LOG_WITH_PREFIX(INFO) << "Unable to notify originator about lost election, lock failed: " |
3173 | 0 | << s.ToString(); |
3174 | 0 | return; |
3175 | 0 | } |
3176 | | |
3177 | 77 | const auto& active_config = state_->GetActiveConfigUnlocked(); |
3178 | 77 | const auto * peer = FindPeer(active_config, originator_uuid); |
3179 | 77 | if (!peer) { |
3180 | 0 | LOG_WITH_PREFIX(WARNING) << "Failed to find originator's peer: " << originator_uuid |
3181 | 0 | << ", config: " << active_config.ShortDebugString(); |
3182 | 0 | return; |
3183 | 0 | } |
3184 | | |
3185 | 77 | auto proxy = peer_proxy_factory_->NewProxy(*peer); |
3186 | 77 | LeaderElectionLostRequestPB req; |
3187 | 77 | req.set_dest_uuid(originator_uuid); |
3188 | 77 | req.set_election_lost_by_uuid(state_->GetPeerUuid()); |
3189 | 77 | req.set_tablet_id(state_->GetOptions().tablet_id); |
3190 | 77 | auto resp = std::make_shared<LeaderElectionLostResponsePB>(); |
3191 | 77 | auto rpc = std::make_shared<rpc::RpcController>(); |
3192 | 77 | rpc->set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh); |
3193 | 77 | auto log_prefix = state_->LogPrefix(); |
3194 | 77 | proxy->LeaderElectionLostAsync(&req, resp.get(), rpc.get(), [log_prefix, resp, rpc] { |
3195 | 77 | if (!rpc->status().ok()) { |
3196 | 0 | LOG(WARNING) << log_prefix << "Notify about lost election RPC failure: " |
3197 | 0 | << rpc->status().ToString(); |
3198 | 77 | } else if (resp->has_error()) { |
3199 | 0 | LOG(WARNING) << log_prefix << "Notify about lost election failed: " |
3200 | 0 | << StatusFromPB(resp->error().status()).ToString(); |
3201 | 0 | } |
3202 | 77 | }); |
3203 | 77 | } |
3204 | | |
3205 | | void RaftConsensus::DoElectionCallback(const LeaderElectionData& data, |
3206 | 737k | const ElectionResult& result) { |
3207 | 737k | const char* election_name = result.preelection ? "Pre-election"674k : "election"62.8k ; |
3208 | 737k | const char* decision_name = result.decision == ElectionVote::kGranted ? "won"613k : "lost"123k ; |
3209 | | // Snooze to avoid the election timer firing again as much as possible. |
3210 | 737k | { |
3211 | 737k | auto lock = state_->LockForRead(); |
3212 | | // We need to snooze when we win and when we lose: |
3213 | | // - When we win because we're about to disable the timer and become leader. |
3214 | | // - When we lose, because otherwise we can fall into a cycle, where everyone keeps |
3215 | | // triggering elections but no election ever completes because by the time they |
3216 | | // finish another one is triggered already. |
3217 | | // We ignore the status, as we don't want to fail if the timer is |
3218 | | // disabled. |
3219 | 737k | SnoozeFailureDetector(ALLOW_LOGGING, LeaderElectionExpBackoffDeltaUnlocked()); |
3220 | | |
3221 | 737k | if (!result.preelections_not_supported_by_uuid.empty()) { |
3222 | 0 | disable_pre_elections_until_ = |
3223 | 0 | CoarseMonoClock::now() + FLAGS_temporary_disable_preelections_timeout_ms * 1ms; |
3224 | 0 | LOG_WITH_PREFIX(WARNING) |
3225 | 0 | << "Disable pre-elections until " << ToString(disable_pre_elections_until_) |
3226 | 0 | << ", because " << result.preelections_not_supported_by_uuid << " does not support them."; |
3227 | 0 | } |
3228 | 737k | } |
3229 | 737k | if (result.decision == ElectionVote::kDenied) { |
3230 | 123k | failed_elections_since_stable_leader_.fetch_add(1, std::memory_order_acq_rel); |
3231 | 123k | LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " lost for term " |
3232 | 123k | << result.election_term << ". Reason: " |
3233 | 123k | << (!result.message.empty() ? result.message88.2k : "None given"35.4k ) |
3234 | 123k | << ". Originator: " << data.originator_uuid; |
3235 | 123k | NotifyOriginatorAboutLostElection(data.originator_uuid); |
3236 | | |
3237 | 123k | if (result.higher_term) { |
3238 | 88.2k | ReplicaState::UniqueLock lock; |
3239 | 88.2k | Status s = state_->LockForConfigChange(&lock); |
3240 | 88.2k | if (s.ok()) { |
3241 | 88.2k | s = HandleTermAdvanceUnlocked(*result.higher_term); |
3242 | 88.2k | } |
3243 | 88.2k | if (!s.ok()) { |
3244 | 87.7k | LOG_WITH_PREFIX(INFO) << "Unable to advance term as " << election_name << " result: " << s; |
3245 | 87.7k | } |
3246 | 88.2k | } |
3247 | | |
3248 | 123k | return; |
3249 | 123k | } |
3250 | | |
3251 | 613k | ReplicaState::UniqueLock lock; |
3252 | 613k | Status s = state_->LockForConfigChange(&lock); |
3253 | 613k | if (PREDICT_FALSE(!s.ok())) { |
3254 | 0 | LOG_WITH_PREFIX(INFO) << "Received " << election_name << " callback for term " |
3255 | 0 | << result.election_term << " while not running: " |
3256 | 0 | << s.ToString(); |
3257 | 0 | return; |
3258 | 0 | } |
3259 | | |
3260 | 613k | auto desired_term = state_->GetCurrentTermUnlocked() + (result.preelection ? 1551k : 061.9k ); |
3261 | 613k | if (result.election_term != desired_term) { |
3262 | 1.21k | LOG_WITH_PREFIX(INFO) |
3263 | 1.21k | << "Leader " << election_name << " decision for defunct term " |
3264 | 1.21k | << result.election_term << ": " << decision_name; |
3265 | 1.21k | return; |
3266 | 1.21k | } |
3267 | | |
3268 | 612k | const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); |
3269 | 612k | if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) { |
3270 | 0 | LOG_WITH_PREFIX(WARNING) |
3271 | 0 | << "Leader " << election_name << " decision while not in active config. " |
3272 | 0 | << "Result: Term " << result.election_term << ": " << decision_name |
3273 | 0 | << ". RaftConfig: " << active_config.ShortDebugString(); |
3274 | 0 | return; |
3275 | 0 | } |
3276 | | |
3277 | 612k | if (result.preelection) { |
3278 | 550k | LOG_WITH_PREFIX(INFO) << "Leader pre-election won for term " << result.election_term; |
3279 | 550k | lock.unlock(); |
3280 | 550k | WARN_NOT_OK(DoStartElection(data, PreElected::kTrue), "Start election failed: "); |
3281 | 550k | return; |
3282 | 550k | } |
3283 | | |
3284 | 61.9k | if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { |
3285 | 0 | LOG_WITH_PREFIX(DFATAL) |
3286 | 0 | << "Leader " << election_name << " callback while already leader! Result: Term " |
3287 | 0 | << result.election_term << ": " |
3288 | 0 | << decision_name; |
3289 | 0 | return; |
3290 | 0 | } |
3291 | | |
3292 | 61.9k | LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " won for term " << result.election_term; |
3293 | | |
3294 | | // Apply lease updates that were possibly received from voters.
3295 | 61.9k | state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked( |
3296 | 61.9k | result.old_leader_lease, result.old_leader_ht_lease); |
3297 | | |
3298 | 61.9k | state_->SetLeaderNoOpCommittedUnlocked(false); |
3299 | | // Convert role to LEADER. |
3300 | 61.9k | SetLeaderUuidUnlocked(state_->GetPeerUuid()); |
3301 | | |
3302 | | // TODO: BecomeLeaderUnlocked() can fail due to state checks during shutdown. |
3303 | | // It races with the above state check. |
3304 | | // This could be a problem during tablet deletion. |
3305 | 61.9k | auto status = BecomeLeaderUnlocked(); |
3306 | 61.9k | if (!status.ok()) { |
3307 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to become leader: " << status.ToString(); |
3308 | 0 | } |
3309 | 61.9k | } |
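
Note on the stale-result check above: a pre-election campaigns for the next term without bumping the persistent term, while a real election only runs after the term has already been advanced, so the callback expects election_term == current_term + 1 for pre-elections and election_term == current_term otherwise. A minimal standalone sketch of that check (names simplified for illustration; the real code reads the current term under the ReplicaState lock):

// Sketch: recognizing a stale election result. Illustrative only.
#include <cstdint>

bool IsCurrentElectionResult(int64_t current_term, int64_t election_term, bool preelection) {
  // Pre-elections report results tagged with current_term + 1; real elections
  // have already advanced the persistent term before campaigning.
  const int64_t desired_term = current_term + (preelection ? 1 : 0);
  return election_term == desired_term;
}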
3310 | | |
3311 | 341 | yb::OpId RaftConsensus::GetLastReceivedOpId() { |
3312 | 341 | auto lock = state_->LockForRead(); |
3313 | 341 | return state_->GetLastReceivedOpIdUnlocked(); |
3314 | 341 | } |
3315 | | |
3316 | 51.8M | yb::OpId RaftConsensus::GetLastCommittedOpId() { |
3317 | 51.8M | auto lock = state_->LockForRead(); |
3318 | 51.8M | return state_->GetCommittedOpIdUnlocked(); |
3319 | 51.8M | } |
3320 | | |
3321 | 3.82M | yb::OpId RaftConsensus::GetLastCDCedOpId() { |
3322 | 3.82M | return queue_->GetCDCConsumerOpIdForIntentRemoval(); |
3323 | 3.82M | } |
3324 | | |
3325 | 0 | yb::OpId RaftConsensus::GetLastAppliedOpId() { |
3326 | 0 | auto lock = state_->LockForRead(); |
3327 | 0 | return state_->GetLastAppliedOpIdUnlocked(); |
3328 | 0 | } |
3329 | | |
3330 | 59 | yb::OpId RaftConsensus::GetAllAppliedOpId() { |
3331 | 59 | return queue_->GetAllAppliedOpId(); |
3332 | 59 | } |
3333 | | |
3334 | 471k | void RaftConsensus::MarkDirty(std::shared_ptr<StateChangeContext> context) { |
3335 | 471k | LOG_WITH_PREFIX(INFO) << "Calling mark dirty synchronously for reason code " << context->reason; |
3336 | 471k | mark_dirty_clbk_.Run(context); |
3337 | 471k | } |
3338 | | |
3339 | | void RaftConsensus::MarkDirtyOnSuccess(std::shared_ptr<StateChangeContext> context, |
3340 | | const StdStatusCallback& client_cb, |
3341 | 138k | const Status& status) { |
3342 | 138k | if (PREDICT_TRUE(status.ok())) { |
3343 | 138k | MarkDirty(context); |
3344 | 138k | } |
3345 | 138k | client_cb(status); |
3346 | 138k | } |
3347 | | |
3348 | | void RaftConsensus::NonTrackedRoundReplicationFinished(ConsensusRound* round, |
3349 | | const StdStatusCallback& client_cb, |
3350 | 200k | const Status& status) { |
3351 | 200k | DCHECK(state_->IsLocked()); |
3352 | 200k | OperationType op_type = round->replicate_msg()->op_type(); |
3353 | 200k | string op_str = Format("$0 [$1]", OperationType_Name(op_type), round->id()); |
3354 | 200k | if (!IsConsensusOnlyOperation(op_type)) { |
3355 | 0 | LOG_WITH_PREFIX(ERROR) << "Unexpected op type: " << op_str; |
3356 | 0 | return; |
3357 | 0 | } |
3358 | 200k | if (!status.ok()) { |
3359 | | // TODO: Do something with the status on failure? |
3360 | 138 | LOG_WITH_PREFIX(INFO) << op_str << " replication failed: " << status << "\n" << GetStackTrace(); |
3361 | | |
3362 | | // Clear out the pending state (ENG-590). |
3363 | 138 | if (IsChangeConfigOperation(op_type)) { |
3364 | 19 | WARN_NOT_OK(state_->ClearPendingConfigUnlocked(), "Could not clear pending state"); |
3365 | 19 | } |
3366 | 200k | } else if (IsChangeConfigOperation(op_type)) { |
3367 | | // Notify the TabletPeer owner object. |
3368 | 19.5k | state_->context()->ChangeConfigReplicated(state_->GetCommittedConfigUnlocked()); |
3369 | 19.5k | } |
3370 | | |
3371 | 200k | client_cb(status); |
3372 | 200k | } |
3373 | | |
3374 | 161k | void RaftConsensus::EnableFailureDetector(MonoDelta delta) { |
3375 | 161k | if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
3376 | 161k | failure_detector_->Start(delta); |
3377 | 161k | } |
3378 | 161k | } |
3379 | | |
3380 | 137k | void RaftConsensus::DisableFailureDetector() { |
3381 | 137k | if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { |
3382 | 137k | failure_detector_->Stop(); |
3383 | 137k | } |
3384 | 137k | } |
3385 | | |
3386 | 27.6M | void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging, MonoDelta delta) { |
3387 | 27.6M | if (PREDICT_TRUE(GetAtomicFlag(&FLAGS_enable_leader_failure_detection))) { |
3388 | 26.5M | if (allow_logging == ALLOW_LOGGING) { |
3389 | 2.07M | LOG_WITH_PREFIX(INFO) << Format("Snoozing leader timeout detection for $0", |
3390 | 18.4E | delta.Initialized() ? delta.ToString() : "election timeout");
3391 | 2.07M | } |
3392 | | |
3393 | 26.5M | if (!delta.Initialized()) { |
3394 | 24.3M | delta = MinimumElectionTimeout(); |
3395 | 24.3M | } |
3396 | 26.5M | failure_detector_->Snooze(delta); |
3397 | 26.5M | } |
3398 | 27.6M | } |
3399 | | |
3400 | 52.0M | MonoDelta RaftConsensus::MinimumElectionTimeout() const { |
3401 | 52.0M | int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods * |
3402 | 52.0M | FLAGS_raft_heartbeat_interval_ms; |
3403 | | |
3404 | 52.0M | return MonoDelta::FromMilliseconds(failure_timeout); |
3405 | 52.0M | } |
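
For a rough sense of scale: the minimum election timeout above is simply FLAGS_leader_failure_max_missed_heartbeat_periods * FLAGS_raft_heartbeat_interval_ms. With illustrative values of 6 missed periods and a 500 ms heartbeat (assumptions, not necessarily the shipped defaults), that works out to 6 * 500 = 3000 ms.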
3406 | | |
3407 | 2.07M | MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() { |
3408 | | // Compute a backoff factor based on how many leader elections have |
3409 | | // taken place since a stable leader was last seen. |
3410 | 2.07M | double backoff_factor = pow( |
3411 | 2.07M | 1.1, |
3412 | 2.07M | failed_elections_since_stable_leader_.load(std::memory_order_acquire) + 1); |
3413 | 2.07M | double min_timeout = MinimumElectionTimeout().ToMilliseconds(); |
3414 | 2.07M | double max_timeout = std::min<double>( |
3415 | 2.07M | min_timeout * backoff_factor, |
3416 | 2.07M | FLAGS_leader_failure_exp_backoff_max_delta_ms); |
3417 | 2.07M | if (max_timeout < min_timeout) { |
3418 | 1.23k | LOG(INFO) << "Resetting max_timeout from " << max_timeout << " to " << min_timeout |
3419 | 1.23k | << ", max_delta_flag=" << FLAGS_leader_failure_exp_backoff_max_delta_ms; |
3420 | 1.23k | max_timeout = min_timeout; |
3421 | 1.23k | } |
3422 | | // Randomize the timeout between the minimum and the calculated value. |
3423 | | // We do this after the above capping to the max. Otherwise, after a |
3424 | | // churny period, we'd be highly likely to back off by exactly the max
3425 | | // amount.
3426 | 2.07M | double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction(); |
3427 | 2.07M | DCHECK_GE(timeout, min_timeout); |
3428 | | |
3429 | 2.07M | return MonoDelta::FromMilliseconds(timeout); |
3430 | 2.07M | } |
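
A minimal standalone sketch of the backoff math above, assuming illustrative flag values (500 ms heartbeat, 6 missed periods, a 20 s backoff cap) and a fixed std::mt19937 seed in place of the member RNG; the real flag defaults and Random implementation may differ:

// Sketch of LeaderElectionExpBackoffDeltaUnlocked's timeout computation.
// Flag values and the RNG are assumptions chosen for illustration.
#include <algorithm>
#include <cmath>
#include <cstdio>
#include <random>

int main() {
  const double heartbeat_ms = 500;    // stands in for FLAGS_raft_heartbeat_interval_ms
  const double missed_periods = 6;    // stands in for FLAGS_leader_failure_max_missed_heartbeat_periods
  const double max_delta_ms = 20000;  // stands in for FLAGS_leader_failure_exp_backoff_max_delta_ms
  std::mt19937 rng(42);
  std::uniform_real_distribution<double> frac(0.0, 1.0);

  for (int failed_elections = 0; failed_elections <= 8; ++failed_elections) {
    const double min_timeout = missed_periods * heartbeat_ms;
    double max_timeout = std::min(min_timeout * std::pow(1.1, failed_elections + 1), max_delta_ms);
    max_timeout = std::max(max_timeout, min_timeout);  // same clamp as the code above
    const double timeout = min_timeout + (max_timeout - min_timeout) * frac(rng);
    std::printf("failed=%d range=[%.0f, %.0f] ms -> %.0f ms\n",
                failed_elections, min_timeout, max_timeout, timeout);
  }
  return 0;
}

Randomizing between the minimum and the capped maximum (rather than always using the maximum) is what keeps replicas from repeatedly colliding at the same timeout after a churny period.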
3431 | | |
3432 | 550k | Status RaftConsensus::IncrementTermUnlocked() { |
3433 | 550k | return HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1); |
3434 | 550k | } |
3435 | | |
3436 | 758k | Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) { |
3437 | 758k | if (new_term <= state_->GetCurrentTermUnlocked()) { |
3438 | 1.09k | return STATUS(IllegalState, Substitute("Can't advance term to: $0 current term: $1 is higher.", |
3439 | 1.09k | new_term, state_->GetCurrentTermUnlocked())); |
3440 | 1.09k | } |
3441 | | |
3442 | 757k | if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { |
3443 | 945 | LOG_WITH_PREFIX(INFO) << "Stepping down as leader of term " |
3444 | 945 | << state_->GetCurrentTermUnlocked() |
3445 | 945 | << " since new term is " << new_term; |
3446 | | |
3447 | 945 | RETURN_NOT_OK(BecomeReplicaUnlocked(std::string())); |
3448 | 945 | } |
3449 | | |
3450 | 757k | LOG_WITH_PREFIX(INFO) << "Advancing to term " << new_term; |
3451 | 757k | RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term)); |
3452 | 182k | term_metric_->set_value(new_term); |
3453 | 182k | return Status::OK(); |
3454 | 757k | } |
3455 | | |
3456 | | Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForCDC(const yb::OpId& from, |
3457 | | int64_t* last_replicated_opid_index, |
3458 | 617 | const CoarseTimePoint deadline) { |
3459 | 617 | return queue_->ReadReplicatedMessagesForCDC(from, last_replicated_opid_index, deadline); |
3460 | 617 | } |
3461 | | |
3462 | 874 | void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) { |
3463 | 874 | return queue_->UpdateCDCConsumerOpId(op_id); |
3464 | 874 | } |
3465 | | |
3466 | | void RaftConsensus::RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg, |
3467 | 23 | bool should_exists) { |
3468 | 23 | state_->CancelPendingOperation(OpId::FromPB(replicate_msg->id()), should_exists); |
3469 | 23 | } |
3470 | | |
3471 | 8.07k | uint64_t RaftConsensus::OnDiskSize() const { |
3472 | 8.07k | return state_->OnDiskSize(); |
3473 | 8.07k | } |
3474 | | |
3475 | 5.36M | yb::OpId RaftConsensus::WaitForSafeOpIdToApply(const yb::OpId& op_id) { |
3476 | 5.36M | return log_->WaitForSafeOpIdToApply(op_id); |
3477 | 5.36M | } |
3478 | | |
3479 | 50.1M | yb::OpId RaftConsensus::MinRetryableRequestOpId() { |
3480 | 50.1M | return state_->MinRetryableRequestOpId(); |
3481 | 50.1M | } |
3482 | | |
3483 | 0 | size_t RaftConsensus::LogCacheSize() { |
3484 | 0 | return queue_->LogCacheSize(); |
3485 | 0 | } |
3486 | | |
3487 | 0 | size_t RaftConsensus::EvictLogCache(size_t bytes_to_evict) { |
3488 | 0 | return queue_->EvictLogCache(bytes_to_evict); |
3489 | 0 | } |
3490 | | |
3491 | 0 | RetryableRequestsCounts RaftConsensus::TEST_CountRetryableRequests() { |
3492 | 0 | return state_->TEST_CountRetryableRequests(); |
3493 | 0 | } |
3494 | | |
3495 | 605 | void RaftConsensus::TrackOperationMemory(const yb::OpId& op_id) { |
3496 | 605 | queue_->TrackOperationsMemory({op_id}); |
3497 | 605 | } |
3498 | | |
3499 | 23 | int64_t RaftConsensus::TEST_LeaderTerm() const { |
3500 | 23 | auto lock = state_->LockForRead(); |
3501 | 23 | return state_->GetCurrentTermUnlocked(); |
3502 | 23 | } |
3503 | | |
3504 | 161 | std::string RaftConsensus::DelayedStepDown::ToString() const { |
3505 | 161 | return YB_STRUCT_TO_STRING(term, protege, graceful); |
3506 | 161 | } |
3507 | | |
3508 | 3 | Result<OpId> RaftConsensus::TEST_GetLastOpIdWithType(OpIdType opid_type, OperationType op_type) { |
3509 | 3 | return queue_->TEST_GetLastOpIdWithType(VERIFY_RESULT(GetLastOpId(opid_type)).index, op_type); |
3510 | 3 | } |
3511 | | |
3512 | | } // namespace consensus |
3513 | | } // namespace yb |