/Users/deen/code/yugabyte-db/src/yb/consensus/replica_state.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/replica_state.h" |
34 | | |
35 | | #include <gflags/gflags.h> |
36 | | |
37 | | #include "yb/consensus/consensus.h" |
38 | | #include "yb/consensus/consensus_context.h" |
39 | | #include "yb/consensus/consensus_round.h" |
40 | | #include "yb/consensus/log_util.h" |
41 | | #include "yb/consensus/quorum_util.h" |
42 | | |
43 | | #include "yb/gutil/strings/substitute.h" |
44 | | |
45 | | #include "yb/util/atomic.h" |
46 | | #include "yb/util/debug/trace_event.h" |
47 | | #include "yb/util/enums.h" |
48 | | #include "yb/util/flag_tags.h" |
49 | | #include "yb/util/format.h" |
50 | | #include "yb/util/logging.h" |
51 | | #include "yb/util/opid.h" |
52 | | #include "yb/util/result.h" |
53 | | #include "yb/util/status.h" |
54 | | #include "yb/util/status_format.h" |
55 | | #include "yb/util/status_log.h" |
56 | | #include "yb/util/thread_restrictions.h" |
57 | | #include "yb/util/tostring.h" |
58 | | #include "yb/util/trace.h" |
59 | | |
60 | | using namespace std::literals; |
61 | | |
62 | | DEFINE_int32(inject_delay_commit_pre_voter_to_voter_secs, 0, |
63 | | "Amount of time to delay commit of a PRE_VOTER to VOTER transition. To be used for " |
64 | | "unit testing purposes only."); |
65 | | TAG_FLAG(inject_delay_commit_pre_voter_to_voter_secs, unsafe); |
66 | | TAG_FLAG(inject_delay_commit_pre_voter_to_voter_secs, hidden); |
67 | | |
68 | | namespace yb { |
69 | | namespace consensus { |
70 | | |
71 | | using std::string; |
72 | | using strings::Substitute; |
73 | | using strings::SubstituteAndAppend; |
74 | | |
75 | | ////////////////////////////////////////////////// |
76 | | // ReplicaState |
77 | | ////////////////////////////////////////////////// |
78 | | |
79 | | ReplicaState::ReplicaState( |
80 | | ConsensusOptions options, string peer_uuid, std::unique_ptr<ConsensusMetadata> cmeta, |
81 | | ConsensusContext* consensus_context, SafeOpIdWaiter* safe_op_id_waiter, |
82 | | RetryableRequests* retryable_requests, |
83 | | std::function<void(const OpIds&)> applied_ops_tracker) |
84 | | : options_(std::move(options)), |
85 | | peer_uuid_(std::move(peer_uuid)), |
86 | | cmeta_(std::move(cmeta)), |
87 | | context_(consensus_context), |
88 | | safe_op_id_waiter_(safe_op_id_waiter), |
89 | 150k | applied_ops_tracker_(std::move(applied_ops_tracker)) { |
90 | 18.4E | CHECK(cmeta_) << "ConsensusMeta passed as NULL"; |
91 | 150k | if (retryable_requests) { |
92 | 142k | retryable_requests_ = std::move(*retryable_requests); |
93 | 142k | } |
94 | | |
95 | 150k | CHECK(IsAcceptableAtomicImpl(leader_state_cache_)); |
96 | | |
97 | | // Actually we don't need this lock, but GetActiveRoleUnlocked checks that we are holding the |
98 | | // lock. |
99 | 150k | auto lock = LockForRead(); |
100 | 150k | CoarseTimePoint now; |
101 | 150k | RefreshLeaderStateCacheUnlocked(&now); |
102 | 150k | } |
103 | | |
104 | 75.6k | ReplicaState::~ReplicaState() { |
105 | 75.6k | } |
106 | | |
107 | 150k | Status ReplicaState::StartUnlocked(const OpIdPB& last_id_in_wal) { |
108 | 150k | DCHECK(IsLocked()); |
109 | | |
110 | | // Our last persisted term can be higher than the last persisted operation |
111 | | // (i.e. if we called an election) but reverse should never happen. |
112 | 150k | CHECK_LE(last_id_in_wal.term(), GetCurrentTermUnlocked()) << LogPrefix() |
113 | 0 | << "The last op in the WAL with id " << OpIdToString(last_id_in_wal) |
114 | 0 | << " has a term (" << last_id_in_wal.term() << ") that is greater " |
115 | 0 | << "than the latest recorded term, which is " << GetCurrentTermUnlocked(); |
116 | | |
117 | 150k | next_index_ = last_id_in_wal.index() + 1; |
118 | | |
119 | 150k | last_received_op_id_ = yb::OpId::FromPB(last_id_in_wal); |
120 | | |
121 | 150k | state_ = kRunning; |
122 | 150k | return Status::OK(); |
123 | 150k | } |
124 | | |
125 | 150k | Status ReplicaState::LockForStart(UniqueLock* lock) const { |
126 | 150k | ThreadRestrictions::AssertWaitAllowed(); |
127 | 150k | UniqueLock l(update_lock_); |
128 | 150k | CHECK_EQ(state_, kInitialized) << "Illegal state for Start()." |
129 | 0 | << " Replica is not in kInitialized state"; |
130 | 150k | lock->swap(l); |
131 | 150k | return Status::OK(); |
132 | 150k | } |
133 | | |
134 | 1.49G | bool ReplicaState::IsLocked() const { |
135 | 1.49G | std::unique_lock<std::mutex> lock(update_lock_, std::try_to_lock); |
136 | 1.49G | return !lock.owns_lock(); |
137 | 1.49G | } |
138 | | |
139 | 206M | ReplicaState::UniqueLock ReplicaState::LockForRead() const { |
140 | 206M | ThreadRestrictions::AssertWaitAllowed(); |
141 | 206M | return UniqueLock(update_lock_); |
142 | 206M | } |
143 | | |
144 | 0 | Status ReplicaState::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const { |
145 | 0 | DCHECK(!msg.has_id()) << "Should not have an ID yet: " << msg.ShortDebugString(); |
146 | 0 | CHECK(msg.has_op_type()); // TODO: better checking? |
147 | 0 | return LockForReplicate(lock); |
148 | 0 | } |
149 | | |
150 | 4.93M | Status ReplicaState::LockForReplicate(UniqueLock* lock) const { |
151 | 4.93M | ThreadRestrictions::AssertWaitAllowed(); |
152 | 4.93M | UniqueLock l(update_lock_); |
153 | 4.93M | if (PREDICT_FALSE(state_ != kRunning)) { |
154 | 0 | return STATUS(IllegalState, "Replica not in running state"); |
155 | 0 | } |
156 | | |
157 | 4.93M | lock->swap(l); |
158 | 4.93M | return Status::OK(); |
159 | 4.93M | } |
160 | | |
161 | 10.3M | Status ReplicaState::CheckIsActiveLeaderAndHasLease() const { |
162 | 10.3M | UniqueLock l(update_lock_); |
163 | 10.3M | if (PREDICT_FALSE(state_ != kRunning)) { |
164 | 0 | return STATUS(IllegalState, "Replica not in running state"); |
165 | 0 | } |
166 | 10.3M | return CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::NEED_LEASE); |
167 | 10.3M | } |
168 | | |
169 | 34.7M | Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) const { |
170 | 34.7M | TRACE_EVENT0("consensus", "ReplicaState::LockForMajorityReplicatedIndexUpdate"); |
171 | 34.7M | ThreadRestrictions::AssertWaitAllowed(); |
172 | 34.7M | UniqueLock l(update_lock_); |
173 | | |
174 | 34.7M | if (PREDICT_FALSE(state_ != kRunning)) { |
175 | 14 | return STATUS(IllegalState, "Replica not in running state"); |
176 | 14 | } |
177 | | |
178 | 34.7M | if (PREDICT_FALSE(GetActiveRoleUnlocked() != PeerRole::LEADER)) { |
179 | 114 | return STATUS(IllegalState, "Replica not LEADER"); |
180 | 114 | } |
181 | 34.7M | lock->swap(l); |
182 | 34.7M | return Status::OK(); |
183 | 34.7M | } |
184 | | |
185 | 81.9M | LeaderState ReplicaState::GetLeaderState(bool allow_stale) const { |
186 | 81.9M | auto cache = leader_state_cache_.load(boost::memory_order_acquire); |
187 | | |
188 | 81.9M | if (!allow_stale) { |
189 | 28.6M | CoarseTimePoint now = CoarseMonoClock::Now(); |
190 | 28.6M | if (now >= cache.expire_at) { |
191 | 1.22k | auto lock = LockForRead(); |
192 | 1.22k | return RefreshLeaderStateCacheUnlocked(&now); |
193 | 1.22k | } |
194 | 28.6M | } |
195 | | |
196 | 81.9M | LeaderState result = {cache.status()}; |
197 | 81.9M | if (result.status == LeaderStatus::LEADER_AND_READY) { |
198 | 50.0M | result.term = cache.extra_value(); |
199 | 50.0M | } else { |
200 | 31.8M | if (result.status == LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE) { |
201 | 12.8k | result.remaining_old_leader_lease = MonoDelta::FromMicroseconds(cache.extra_value()); |
202 | 12.8k | } |
203 | 31.8M | result.MakeNotReadyLeader(result.status); |
204 | 31.8M | } |
205 | | |
206 | 81.9M | return result; |
207 | 81.9M | } |
208 | | |
209 | | LeaderState ReplicaState::GetLeaderStateUnlocked( |
210 | 96.7M | LeaderLeaseCheckMode lease_check_mode, CoarseTimePoint* now) const { |
211 | 96.7M | LeaderState result; |
212 | | |
213 | 96.7M | if (GetActiveRoleUnlocked() != PeerRole::LEADER) { |
214 | 9.22M | return result.MakeNotReadyLeader(LeaderStatus::NOT_LEADER); |
215 | 9.22M | } |
216 | | |
217 | 87.5M | if (!leader_no_op_committed_) { |
218 | | // This will cause the client to retry on the same server (won't try to find the new leader). |
219 | 308k | return result.MakeNotReadyLeader(LeaderStatus::LEADER_BUT_NO_OP_NOT_COMMITTED); |
220 | 308k | } |
221 | | |
222 | 87.2M | const auto lease_status = lease_check_mode != LeaderLeaseCheckMode::DONT_NEED_LEASE |
223 | 87.2M | ? GetLeaderLeaseStatusUnlocked(&result.remaining_old_leader_lease, now)84.3M |
224 | 87.2M | : LeaderLeaseStatus::HAS_LEASE2.91M ; |
225 | 87.2M | switch (lease_status) { |
226 | 20.0k | case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE: |
227 | | // Will retry on the same server. |
228 | 20.0k | VLOG(1) << "Old leader lease might still be active for " |
229 | 0 | << result.remaining_old_leader_lease.ToString(); |
230 | 20.0k | return result.MakeNotReadyLeader( |
231 | 20.0k | LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE); |
232 | | |
233 | 21.8k | case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE: |
234 | | // Will retry to look up the leader, because it might have changed. |
235 | 21.8k | return result.MakeNotReadyLeader( |
236 | 21.8k | LeaderStatus::LEADER_BUT_NO_MAJORITY_REPLICATED_LEASE); |
237 | | |
238 | 87.2M | case LeaderLeaseStatus::HAS_LEASE: |
239 | 87.2M | result.status = LeaderStatus::LEADER_AND_READY; |
240 | 87.2M | result.term = GetCurrentTermUnlocked(); |
241 | 87.2M | return result; |
242 | 87.2M | } |
243 | | |
244 | 0 | FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, lease_status); |
245 | 0 | } |
246 | | |
247 | 13.2M | Status ReplicaState::CheckActiveLeaderUnlocked(LeaderLeaseCheckMode lease_check_mode) const { |
248 | 13.2M | auto state = GetLeaderStateUnlocked(lease_check_mode); |
249 | 13.2M | if (state.status == LeaderStatus::NOT_LEADER) { |
250 | 814 | ConsensusStatePB cstate = ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE); |
251 | 814 | return STATUS_FORMAT(IllegalState, |
252 | 814 | "Replica $0 is not leader of this config. Role: $1. Consensus state: $2", |
253 | 814 | peer_uuid_, PeerRole_Name(GetActiveRoleUnlocked()), cstate); |
254 | 814 | } |
255 | | |
256 | 13.2M | return state.CreateStatus(); |
257 | 13.2M | } |
258 | | |
259 | 6.41M | Status ReplicaState::LockForConfigChange(UniqueLock* lock) const { |
260 | 6.41M | TRACE_EVENT0("consensus", "ReplicaState::LockForConfigChange"); |
261 | | |
262 | 6.41M | ThreadRestrictions::AssertWaitAllowed(); |
263 | 6.41M | UniqueLock l(update_lock_); |
264 | | // Can only change the config on running replicas. |
265 | 6.41M | if (PREDICT_FALSE(state_ != kRunning)) { |
266 | 9 | return STATUS(IllegalState, "Unable to lock ReplicaState for config change", |
267 | 9 | Substitute("State = $0", state_)); |
268 | 9 | } |
269 | 6.41M | lock->swap(l); |
270 | 6.41M | return Status::OK(); |
271 | 6.41M | } |
272 | | |
273 | 75.6M | Status ReplicaState::LockForUpdate(UniqueLock* lock) const { |
274 | 75.6M | TRACE_EVENT0("consensus", "ReplicaState::LockForUpdate"); |
275 | 75.6M | ThreadRestrictions::AssertWaitAllowed(); |
276 | 75.6M | UniqueLock l(update_lock_); |
277 | 75.6M | if (PREDICT_FALSE(state_ != kRunning)) { |
278 | 11 | return STATUS(IllegalState, "Replica not in running state"); |
279 | 11 | } |
280 | 75.6M | lock->swap(l); |
281 | 75.6M | return Status::OK(); |
282 | 75.6M | } |
283 | | |
284 | 151k | Status ReplicaState::LockForShutdown(UniqueLock* lock) { |
285 | 151k | TRACE_EVENT0("consensus", "ReplicaState::LockForShutdown"); |
286 | 151k | ThreadRestrictions::AssertWaitAllowed(); |
287 | 151k | UniqueLock l(update_lock_); |
288 | 151k | if (state_ != kShuttingDown && state_ != kShutDown75.6k ) { |
289 | 75.6k | state_ = kShuttingDown; |
290 | 75.6k | } |
291 | 151k | lock->swap(l); |
292 | 151k | return Status::OK(); |
293 | 151k | } |
294 | | |
295 | 75.6k | Status ReplicaState::ShutdownUnlocked() { |
296 | 75.6k | DCHECK(IsLocked()); |
297 | 75.6k | CHECK_EQ(state_, kShuttingDown); |
298 | 75.6k | state_ = kShutDown; |
299 | 75.6k | return Status::OK(); |
300 | 75.6k | } |
301 | | |
302 | 46.5M | ConsensusStatePB ReplicaState::ConsensusStateUnlocked(ConsensusConfigType type) const { |
303 | 46.5M | return cmeta_->ToConsensusStatePB(type); |
304 | 46.5M | } |
305 | | |
306 | 266M | PeerRole ReplicaState::GetActiveRoleUnlocked() const { |
307 | 266M | DCHECK(IsLocked()); |
308 | 266M | return cmeta_->active_role(); |
309 | 266M | } |
310 | | |
311 | 8.81M | bool ReplicaState::IsConfigChangePendingUnlocked() const { |
312 | 8.81M | DCHECK(IsLocked()); |
313 | 8.81M | return cmeta_->has_pending_config(); |
314 | 8.81M | } |
315 | | |
316 | 14.0k | Status ReplicaState::CheckNoConfigChangePendingUnlocked() const { |
317 | 14.0k | DCHECK(IsLocked()); |
318 | 14.0k | if (IsConfigChangePendingUnlocked()) { |
319 | 317 | return STATUS(IllegalState, |
320 | 317 | Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n" |
321 | 317 | " Committed config: $0.\n Pending config: $1", |
322 | 317 | GetCommittedConfigUnlocked().ShortDebugString(), |
323 | 317 | GetPendingConfigUnlocked().ShortDebugString())); |
324 | 317 | } |
325 | 13.6k | return Status::OK(); |
326 | 14.0k | } |
327 | | |
328 | 19.5k | Status ReplicaState::SetPendingConfigUnlocked(const RaftConfigPB& new_config) { |
329 | 19.5k | DCHECK(IsLocked()); |
330 | 19.5k | RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM), |
331 | 19.5k | "Invalid config to set as pending"); |
332 | 19.5k | if (!new_config.unsafe_config_change()) { |
333 | 19.5k | CHECK(!cmeta_->has_pending_config()) |
334 | 2 | << "Attempt to set pending config while another is already pending! " |
335 | 2 | << "Existing pending config: " << cmeta_->pending_config().ShortDebugString() << "; " |
336 | 2 | << "Attempted new pending config: " << new_config.ShortDebugString(); |
337 | 19.5k | } else if (50 cmeta_->has_pending_config()50 ) { |
338 | 0 | LOG_WITH_PREFIX(INFO) << "Allowing unsafe config change even though there is a pending config! " |
339 | 0 | << "Existing pending config: " |
340 | 0 | << cmeta_->pending_config().ShortDebugString() << "; " |
341 | 0 | << "New pending config: " << new_config.ShortDebugString(); |
342 | 0 | } |
343 | 19.5k | cmeta_->set_pending_config(new_config); |
344 | 19.5k | CoarseTimePoint now; |
345 | 19.5k | RefreshLeaderStateCacheUnlocked(&now); |
346 | 19.5k | return Status::OK(); |
347 | 19.5k | } |
348 | | |
349 | 19 | Status ReplicaState::ClearPendingConfigUnlocked() { |
350 | 19 | DCHECK(IsLocked()); |
351 | 19 | if (!cmeta_->has_pending_config()) { |
352 | 0 | LOG(WARNING) << "Attempt to clear a non-existent pending config." |
353 | 0 | << "Existing committed config: " << cmeta_->committed_config().ShortDebugString(); |
354 | 0 | return STATUS(IllegalState, "Attempt to clear a non-existent pending config."); |
355 | 0 | } |
356 | 19 | cmeta_->clear_pending_config(); |
357 | 19 | CoarseTimePoint now; |
358 | 19 | RefreshLeaderStateCacheUnlocked(&now); |
359 | 19 | return Status::OK(); |
360 | 19 | } |
361 | | |
362 | 2.95M | const RaftConfigPB& ReplicaState::GetPendingConfigUnlocked() const { |
363 | 2.95M | DCHECK(IsLocked()); |
364 | 2.95M | CHECK(IsConfigChangePendingUnlocked()) << "No pending config"0 ; |
365 | 2.95M | return cmeta_->pending_config(); |
366 | 2.95M | } |
367 | | |
368 | 19.5k | Status ReplicaState::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) { |
369 | 19.5k | TRACE_EVENT0("consensus", "ReplicaState::SetCommittedConfigUnlocked"); |
370 | 19.5k | DCHECK(IsLocked()); |
371 | 19.5k | DCHECK(config_to_commit.IsInitialized()); |
372 | 19.5k | RETURN_NOT_OK_PREPEND( |
373 | 19.5k | VerifyRaftConfig(config_to_commit, COMMITTED_QUORUM), "Invalid config to set as committed"); |
374 | | |
375 | | // Compare committed with pending configuration, ensure they are the same. |
376 | | // In the event of an unsafe config change triggered by an administrator, |
377 | | // it is possible that the config being committed may not match the pending config |
378 | | // because unsafe config change allows multiple pending configs to exist. |
379 | | // Therefore we only need to validate that 'config_to_commit' matches the pending config |
380 | | // if the pending config does not have its 'unsafe_config_change' flag set. |
381 | 19.5k | if (IsConfigChangePendingUnlocked()) { |
382 | 19.5k | const RaftConfigPB& pending_config = GetPendingConfigUnlocked(); |
383 | 19.5k | if (!pending_config.unsafe_config_change()) { |
384 | | // Pending will not have an opid_index, so ignore that field. |
385 | 19.4k | RaftConfigPB config_no_opid = config_to_commit; |
386 | 19.4k | config_no_opid.clear_opid_index(); |
387 | | // Quorums must be exactly equal, even w.r.t. peer ordering. |
388 | 19.4k | CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(), config_no_opid.SerializeAsString()) |
389 | 0 | << Substitute( |
390 | 0 | "New committed config must equal pending config, but does not. " |
391 | 0 | "Pending config: $0, committed config: $1", |
392 | 0 | pending_config.ShortDebugString(), config_to_commit.ShortDebugString()); |
393 | 19.4k | } |
394 | 19.5k | } |
395 | 19.5k | cmeta_->set_committed_config(config_to_commit); |
396 | 19.5k | cmeta_->clear_pending_config(); |
397 | 19.5k | CoarseTimePoint now; |
398 | 19.5k | RefreshLeaderStateCacheUnlocked(&now); |
399 | 19.5k | CHECK_OK(cmeta_->Flush()); |
400 | 19.5k | return Status::OK(); |
401 | 19.5k | } |
402 | | |
403 | 80.8M | const RaftConfigPB& ReplicaState::GetCommittedConfigUnlocked() const { |
404 | 80.8M | DCHECK(IsLocked()); |
405 | 80.8M | return cmeta_->committed_config(); |
406 | 80.8M | } |
407 | | |
408 | 100M | const RaftConfigPB& ReplicaState::GetActiveConfigUnlocked() const { |
409 | 100M | DCHECK(IsLocked()); |
410 | 100M | return cmeta_->active_config(); |
411 | 100M | } |
412 | | |
413 | 33.8M | bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) { |
414 | 33.8M | DCHECK(IsLocked()); |
415 | | |
416 | 33.8M | *term_mismatch = false; |
417 | | |
418 | 33.8M | if (cmeta_ == nullptr) { |
419 | 0 | LOG(FATAL) << "cmeta_ cannot be NULL"; |
420 | 0 | } |
421 | | |
422 | 33.8M | int64_t committed_index = GetCommittedOpIdUnlocked().index; |
423 | 33.8M | if (op_id.index <= committed_index) { |
424 | 17.4M | return true; |
425 | 17.4M | } |
426 | | |
427 | 16.3M | int64_t last_received_index = GetLastReceivedOpIdUnlocked().index; |
428 | 16.3M | if (op_id.index > last_received_index) { |
429 | 8.40M | return false; |
430 | 8.40M | } |
431 | | |
432 | 7.93M | scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index); |
433 | 7.93M | if (round == nullptr) { |
434 | 0 | LOG_WITH_PREFIX(ERROR) |
435 | 0 | << "Consensus round not found for op id " << op_id << ": " |
436 | 0 | << "committed_index=" << committed_index << ", " |
437 | 0 | << "last_received_index=" << last_received_index << ", " |
438 | 0 | << "tablet: " << options_.tablet_id << ", current state: " |
439 | 0 | << ToStringUnlocked(); |
440 | 0 | DumpPendingOperationsUnlocked(); |
441 | 0 | CHECK(false); |
442 | 0 | } |
443 | | |
444 | 7.93M | if (round->id().term != op_id.term) { |
445 | 168 | *term_mismatch = true; |
446 | 168 | return false; |
447 | 168 | } |
448 | 7.93M | return true; |
449 | 7.93M | } |
450 | | |
451 | 757k | Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term) { |
452 | 757k | TRACE_EVENT1("consensus", "ReplicaState::SetCurrentTermUnlocked", |
453 | 757k | "term", new_term); |
454 | 757k | DCHECK(IsLocked()); |
455 | 757k | if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) { |
456 | 0 | return STATUS(IllegalState, |
457 | 0 | Substitute("Cannot change term to a term that is lower than or equal to the current one. " |
458 | 0 | "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term)); |
459 | 0 | } |
460 | 757k | cmeta_->set_current_term(new_term); |
461 | 757k | cmeta_->clear_voted_for(); |
462 | | // OK to flush before clearing the leader, because the leader UUID is not part of |
463 | | // ConsensusMetadataPB. |
464 | 757k | RETURN_NOT_OK(cmeta_->Flush()); |
465 | 182k | ClearLeaderUnlocked(); |
466 | 182k | last_received_op_id_current_leader_ = yb::OpId(); |
467 | 182k | return Status::OK(); |
468 | 757k | } |
469 | | |
470 | 246M | const int64_t ReplicaState::GetCurrentTermUnlocked() const { |
471 | 246M | DCHECK(IsLocked()); |
472 | 246M | return cmeta_->current_term(); |
473 | 246M | } |
474 | | |
475 | 676k | void ReplicaState::SetLeaderUuidUnlocked(const std::string& uuid) { |
476 | 676k | DCHECK(IsLocked()); |
477 | 676k | cmeta_->set_leader_uuid(uuid); |
478 | 676k | CoarseTimePoint now; |
479 | 676k | RefreshLeaderStateCacheUnlocked(&now); |
480 | 676k | } |
481 | | |
482 | 104M | const string& ReplicaState::GetLeaderUuidUnlocked() const { |
483 | 104M | DCHECK(IsLocked()); |
484 | 104M | return cmeta_->leader_uuid(); |
485 | 104M | } |
486 | | |
487 | 62.1k | const bool ReplicaState::HasVotedCurrentTermUnlocked() const { |
488 | 62.1k | DCHECK(IsLocked()); |
489 | 62.1k | return cmeta_->has_voted_for(); |
490 | 62.1k | } |
491 | | |
492 | 170k | Status ReplicaState::SetVotedForCurrentTermUnlocked(const std::string& uuid) { |
493 | 170k | TRACE_EVENT1("consensus", "ReplicaState::SetVotedForCurrentTermUnlocked", |
494 | 170k | "uuid", uuid); |
495 | 170k | DCHECK(IsLocked()); |
496 | 170k | cmeta_->set_voted_for(uuid); |
497 | 170k | CHECK_OK(cmeta_->Flush()); |
498 | 170k | return Status::OK(); |
499 | 170k | } |
500 | | |
501 | 2.53k | const std::string& ReplicaState::GetVotedForCurrentTermUnlocked() const { |
502 | 2.53k | DCHECK(IsLocked()); |
503 | 2.53k | DCHECK(cmeta_->has_voted_for()); |
504 | 2.53k | return cmeta_->voted_for(); |
505 | 2.53k | } |
506 | | |
507 | 29.1M | const string& ReplicaState::GetPeerUuid() const { |
508 | 29.1M | return peer_uuid_; |
509 | 29.1M | } |
510 | | |
511 | 808k | const ConsensusOptions& ReplicaState::GetOptions() const { |
512 | 808k | return options_; |
513 | 808k | } |
514 | | |
515 | 0 | void ReplicaState::DumpPendingOperationsUnlocked() { |
516 | 0 | DCHECK(IsLocked()); |
517 | 0 | LOG_WITH_PREFIX(INFO) << "Dumping " << pending_operations_.size() |
518 | 0 | << " pending operations."; |
519 | 0 | for (const auto &round : pending_operations_) { |
520 | 0 | LOG_WITH_PREFIX(INFO) << round->replicate_msg()->ShortDebugString(); |
521 | 0 | } |
522 | 0 | } |
523 | | |
524 | 75.6k | Status ReplicaState::CancelPendingOperations() { |
525 | 75.6k | { |
526 | 75.6k | ThreadRestrictions::AssertWaitAllowed(); |
527 | 75.6k | UniqueLock lock(update_lock_); |
528 | 75.6k | if (state_ != kShuttingDown) { |
529 | 0 | return STATUS(IllegalState, "Can only wait for pending commits on kShuttingDown state."); |
530 | 0 | } |
531 | 75.6k | if (pending_operations_.empty()) { |
532 | 75.4k | return Status::OK(); |
533 | 75.4k | } |
534 | | |
535 | 222 | LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_operations_.size() |
536 | 222 | << " pending operations because of shutdown."; |
537 | 222 | auto abort_status = STATUS(Aborted, "Operation aborted"); |
538 | 222 | int i = 0; |
539 | 613 | for (size_t idx = pending_operations_.size(); idx > 0; ) { |
540 | 391 | --idx; |
541 | | // We cancel only operations whose applies have not yet been triggered. |
542 | 391 | constexpr auto kLogAbortedOperationsNum = 10; |
543 | 391 | if (++i <= kLogAbortedOperationsNum) { |
544 | 382 | LOG_WITH_PREFIX(INFO) << "Aborting operation because of shutdown: " |
545 | 382 | << pending_operations_[idx]->replicate_msg()->ShortDebugString(); |
546 | 382 | } |
547 | 391 | NotifyReplicationFinishedUnlocked(pending_operations_[idx], abort_status, OpId::kUnknownTerm, |
548 | 391 | nullptr /* applied_op_ids */); |
549 | 391 | } |
550 | 222 | } |
551 | 0 | return Status::OK(); |
552 | 75.6k | } |
553 | | |
554 | | struct PendingOperationsComparator { |
555 | 0 | bool operator()(const ConsensusRoundPtr& lhs, int64_t rhs) const { |
556 | 0 | return lhs->id().index < rhs; |
557 | 0 | } |
558 | | |
559 | 0 | bool operator()(int64_t lhs, const ConsensusRoundPtr& rhs) const { |
560 | 0 | return lhs < rhs->id().index; |
561 | 0 | } |
562 | | }; |
563 | | |
564 | 7.94M | ReplicaState::PendingOperations::iterator ReplicaState::FindPendingOperation(int64_t index) { |
565 | 7.94M | if (pending_operations_.empty()) { |
566 | 0 | return pending_operations_.end(); |
567 | 0 | } |
568 | | |
569 | 7.94M | size_t offset = index - pending_operations_.front()->id().index; |
570 | | // If index < pending_operations_.front()->id().index() then offset will be very big positive |
571 | | // number, so could check both bounds in one comparison. |
572 | 7.94M | if (offset >= pending_operations_.size()) { |
573 | 164 | return pending_operations_.end(); |
574 | 164 | } |
575 | | |
576 | 7.94M | auto result = pending_operations_.begin() + offset; |
577 | 7.94M | DCHECK_EQ((**result).id().index, index); |
578 | 7.94M | return result; |
579 | 7.94M | } |
580 | | |
581 | 168 | Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) { |
582 | 168 | DCHECK(IsLocked()); |
583 | 168 | LOG_WITH_PREFIX(INFO) |
584 | 168 | << "Aborting all operations after (but not including): " |
585 | 168 | << new_preceding_idx << ". Current State: " << ToStringUnlocked(); |
586 | | |
587 | 168 | DCHECK_GE(new_preceding_idx, 0); |
588 | 168 | yb::OpId new_preceding; |
589 | | |
590 | 168 | auto preceding_op_iter = FindPendingOperation(new_preceding_idx); |
591 | | |
592 | | // Either the new preceding id is in the pendings set or it must be equal to the |
593 | | // committed index since we can't truncate already committed operations. |
594 | 168 | if (preceding_op_iter != pending_operations_.end()) { |
595 | 4 | new_preceding = (**preceding_op_iter).id(); |
596 | 4 | ++preceding_op_iter; |
597 | 164 | } else { |
598 | 164 | CHECK_EQ(new_preceding_idx, last_committed_op_id_.index); |
599 | 164 | new_preceding = last_committed_op_id_; |
600 | 164 | if (!pending_operations_.empty() && |
601 | 164 | pending_operations_.front()->id().index > new_preceding_idx) { |
602 | 164 | preceding_op_iter = pending_operations_.begin(); |
603 | 164 | } |
604 | 164 | } |
605 | | |
606 | | // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it |
607 | | // here to avoid the bounds check, since we're breaking monotonicity. |
608 | 168 | last_received_op_id_ = new_preceding; |
609 | 168 | last_received_op_id_current_leader_ = OpId(); |
610 | 168 | next_index_ = new_preceding.index + 1; |
611 | | |
612 | 168 | auto abort_status = STATUS(Aborted, "Operation aborted by new leader"); |
613 | 439 | for (auto it = pending_operations_.end(); it != preceding_op_iter;) { |
614 | 271 | const ConsensusRoundPtr& round = *--it; |
615 | 271 | auto op_id = OpId::FromPB(round->replicate_msg()->id()); |
616 | 271 | LOG_WITH_PREFIX(INFO) << "Aborting uncommitted operation due to leader change: " |
617 | 271 | << op_id << ", committed: " << last_committed_op_id_; |
618 | 271 | NotifyReplicationFinishedUnlocked(round, abort_status, OpId::kUnknownTerm, |
619 | 271 | nullptr /* applied_op_ids */); |
620 | 271 | } |
621 | | |
622 | | // Clear entries from pending operations. |
623 | 168 | pending_operations_.erase(preceding_op_iter, pending_operations_.end()); |
624 | 168 | CheckPendingOperationsHead(); |
625 | | |
626 | 168 | return Status::OK(); |
627 | 168 | } |
628 | | |
629 | 14.5M | Status ReplicaState::AddPendingOperation(const ConsensusRoundPtr& round, OperationMode mode) { |
630 | 14.5M | DCHECK(IsLocked()); |
631 | | |
632 | 14.5M | auto op_type = round->replicate_msg()->op_type(); |
633 | 14.5M | if (PREDICT_FALSE(state_ != kRunning)) { |
634 | | // Special case when we're configuring and this is a config change, refuse |
635 | | // everything else. |
636 | | // TODO: Don't require a NO_OP to get to kRunning state |
637 | 0 | if (op_type != NO_OP) { |
638 | 0 | return STATUS(IllegalState, "Cannot trigger prepare. Replica is not in kRunning state."); |
639 | 0 | } |
640 | 0 | } |
641 | | |
642 | 14.5M | if (mode == OperationMode::kLeader) { |
643 | 5.11M | RETURN_NOT_OK(context_->CheckOperationAllowed(round->id(), op_type)); |
644 | 5.11M | } |
645 | | |
646 | | // Mark pending configuration. |
647 | 14.5M | if (PREDICT_FALSE(op_type == CHANGE_CONFIG_OP)) { |
648 | 19.8k | DCHECK(round->replicate_msg()->change_config_record().has_old_config()); |
649 | 19.8k | DCHECK(round->replicate_msg()->change_config_record().old_config().has_opid_index()); |
650 | 19.8k | DCHECK(round->replicate_msg()->change_config_record().has_new_config()); |
651 | 19.8k | DCHECK(!round->replicate_msg()->change_config_record().new_config().has_opid_index()); |
652 | 19.8k | if (mode == OperationMode::kFollower) { |
653 | 13.9k | const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config(); |
654 | 13.9k | const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config(); |
655 | | // The leader has to mark the configuration as pending before it gets here |
656 | | // because the active configuration affects the replication queue. |
657 | | // Do one last sanity check. |
658 | 13.9k | Status s = CheckNoConfigChangePendingUnlocked(); |
659 | 13.9k | if (PREDICT_FALSE(!s.ok() && !new_config.unsafe_config_change())) { |
660 | 317 | s = s.CloneAndAppend(Format("New config: $0", new_config)); |
661 | 317 | LOG_WITH_PREFIX(INFO) << s; |
662 | 317 | return s; |
663 | 317 | } |
664 | | // Check if the pending Raft config has an OpId less than the committed |
665 | | // config. If so, this is a replay at startup in which the COMMIT |
666 | | // messages were delayed. |
667 | 13.6k | const RaftConfigPB& committed_config = GetCommittedConfigUnlocked(); |
668 | 13.6k | if (round->replicate_msg()->id().index() > committed_config.opid_index()13.6k ) { |
669 | 13.6k | CHECK_OK(SetPendingConfigUnlocked(new_config)); |
670 | 18.4E | } else { |
671 | 18.4E | LOG_WITH_PREFIX(INFO) |
672 | 18.4E | << "Ignoring setting pending config change with OpId " |
673 | 18.4E | << round->replicate_msg()->id() << " because the committed config has OpId index " |
674 | 18.4E | << committed_config.opid_index() << ". The config change we are ignoring is: " |
675 | 18.4E | << "Old config: { " << old_config.ShortDebugString() << " }. " |
676 | 18.4E | << "New config: { " << new_config.ShortDebugString() << " }"; |
677 | 18.4E | } |
678 | 13.6k | } |
679 | 14.5M | } else if (op_type == WRITE_OP) { |
680 | | // Leader registers an operation with RetryableRequests even before assigning an op id. |
681 | 8.55M | if (mode == OperationMode::kFollower && !retryable_requests_.Register(round)5.53M ) { |
682 | 0 | return STATUS(IllegalState, "Cannot register retryable request on follower"); |
683 | 0 | } |
684 | 8.55M | } else if (5.95M op_type == SPLIT_OP5.95M ) { |
685 | 71 | const auto& split_request = round->replicate_msg()->split_request(); |
686 | 71 | SCHECK_EQ( |
687 | 71 | split_request.tablet_id(), cmeta_->tablet_id(), InvalidArgument, |
688 | 71 | "Received split op for a different tablet."); |
689 | | // TODO(tsplit): if we get failures past this point we can't undo the tablet state. |
690 | | // Might be need some tool to be able to remove SPLIT_OP from Raft log. |
691 | 71 | } |
692 | | |
693 | 14.5M | LOG_IF_WITH_PREFIX1.39k ( |
694 | 1.39k | DFATAL, |
695 | 1.39k | !pending_operations_.empty() && |
696 | 1.39k | pending_operations_.back()->id().index + 1 != round->id().index) |
697 | 1.39k | << "Adding operation with wrong index: " << AsString(round) << ", last op id: " |
698 | 1.39k | << AsString(pending_operations_.back()->id()) << ", operations: " |
699 | 1.39k | << AsString(pending_operations_); |
700 | 14.5M | pending_operations_.push_back(round); |
701 | 14.5M | CheckPendingOperationsHead(); |
702 | 14.5M | return Status::OK(); |
703 | 14.5M | } |
704 | | |
705 | 7.94M | scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(int64_t index) { |
706 | 7.94M | DCHECK(IsLocked()); |
707 | 7.94M | auto it = FindPendingOperation(index); |
708 | 7.94M | if (it == pending_operations_.end()) { |
709 | 0 | return nullptr; |
710 | 0 | } |
711 | 7.94M | return *it; |
712 | 7.94M | } |
713 | | |
714 | | Status ReplicaState::UpdateMajorityReplicatedUnlocked( |
715 | | const OpId& majority_replicated, OpId* committed_op_id, |
716 | 34.7M | bool* committed_op_id_changed, OpId* last_applied_op_id) { |
717 | 34.7M | DCHECK(IsLocked()); |
718 | 34.7M | if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) { |
719 | 0 | return STATUS(ServiceUnavailable, "Cannot trigger apply. Replica is shutting down."); |
720 | 0 | } |
721 | 34.7M | if (PREDICT_FALSE(state_ != kRunning)) { |
722 | 0 | return STATUS(IllegalState, "Cannot trigger apply. Replica is not in kRunning state."); |
723 | 0 | } |
724 | | |
725 | | // If the last committed operation was in the current term (the normal case) |
726 | | // then 'committed_op_id' is simply equal to majority replicated. |
727 | 34.7M | if (last_committed_op_id_.term == GetCurrentTermUnlocked()) { |
728 | 34.6M | *committed_op_id_changed = VERIFY_RESULT(AdvanceCommittedOpIdUnlocked( |
729 | 0 | majority_replicated, CouldStop::kFalse)); |
730 | 0 | *committed_op_id = last_committed_op_id_; |
731 | 34.6M | *last_applied_op_id = GetLastAppliedOpIdUnlocked(); |
732 | 34.6M | return Status::OK(); |
733 | 34.6M | } |
734 | | |
735 | | // If the last committed operation is not in the current term (such as when |
736 | | // we change leaders) but 'majority_replicated' is then we can advance the |
737 | | // 'committed_op_id' too. |
738 | 127k | if (majority_replicated.term == GetCurrentTermUnlocked()) { |
739 | 61.9k | auto previous = last_committed_op_id_; |
740 | 61.9k | *committed_op_id_changed = VERIFY_RESULT(AdvanceCommittedOpIdUnlocked( |
741 | 0 | majority_replicated, CouldStop::kFalse)); |
742 | 0 | *committed_op_id = last_committed_op_id_; |
743 | 61.9k | *last_applied_op_id = GetLastAppliedOpIdUnlocked(); |
744 | 61.9k | LOG_WITH_PREFIX(INFO) |
745 | 61.9k | << "Advanced the committed_op_id across terms." |
746 | 61.9k | << " Last committed operation was: " << previous |
747 | 61.9k | << " New committed index is: " << last_committed_op_id_; |
748 | 61.9k | return Status::OK(); |
749 | 61.9k | } |
750 | | |
751 | 65.9k | *committed_op_id = last_committed_op_id_; |
752 | 65.9k | *last_applied_op_id = GetLastAppliedOpIdUnlocked(); |
753 | 65.9k | YB_LOG_EVERY_N_SECS(WARNING, 1) << LogPrefix() |
754 | 24.1k | << "Can't advance the committed index across term boundaries" |
755 | 24.1k | << " until operations from the current term are replicated." |
756 | 24.1k | << " Last committed operation was: " << last_committed_op_id_ << "," |
757 | 24.1k | << " New majority replicated is: " << majority_replicated << "," |
758 | 24.1k | << " Current term is: " << GetCurrentTermUnlocked(); |
759 | | |
760 | 65.9k | return Status::OK(); |
761 | 127k | } |
762 | | |
763 | 13.2M | void ReplicaState::SetLastCommittedIndexUnlocked(const yb::OpId& committed_op_id) { |
764 | 13.2M | DCHECK(IsLocked()); |
765 | 13.2M | CHECK_GE(last_received_op_id_.index, committed_op_id.index); |
766 | 13.2M | last_committed_op_id_ = committed_op_id; |
767 | 13.2M | CheckPendingOperationsHead(); |
768 | 13.2M | } |
769 | | |
770 | 150k | Status ReplicaState::InitCommittedOpIdUnlocked(const yb::OpId& committed_op_id) { |
771 | 150k | if (!last_committed_op_id_.empty()) { |
772 | 0 | return STATUS_FORMAT( |
773 | 0 | IllegalState, |
774 | 0 | "Committed index already initialized to: $0, tried to set $1", |
775 | 0 | last_committed_op_id_, |
776 | 0 | committed_op_id); |
777 | 0 | } |
778 | | |
779 | 150k | if (!pending_operations_.empty() && |
780 | 150k | committed_op_id.index >= pending_operations_.front()->id().index66 ) { |
781 | 0 | RETURN_NOT_OK(ApplyPendingOperationsUnlocked(committed_op_id, CouldStop::kFalse)); |
782 | 0 | } |
783 | | |
784 | 150k | SetLastCommittedIndexUnlocked(committed_op_id); |
785 | | |
786 | 150k | return Status::OK(); |
787 | 150k | } |
788 | | |
789 | 40.7M | void ReplicaState::CheckPendingOperationsHead() const { |
790 | 40.7M | if (pending_operations_.empty() || last_committed_op_id_.empty()28.1M || |
791 | 40.7M | pending_operations_.front()->id().index == last_committed_op_id_.index + 127.8M ) { |
792 | 40.7M | return; |
793 | 40.7M | } |
794 | | |
795 | 7.34k | LOG_WITH_PREFIX(FATAL) |
796 | 7.34k | << "The first pending operation's index is supposed to immediately follow the last committed " |
797 | 7.34k | << "operation's index. Committed op id: " << last_committed_op_id_ << ", pending operations: " |
798 | 7.34k | << AsString(pending_operations_); |
799 | 7.34k | } |
800 | | |
801 | | Result<bool> ReplicaState::AdvanceCommittedOpIdUnlocked( |
802 | 85.5M | const yb::OpId& committed_op_id, CouldStop could_stop) { |
803 | 85.5M | DCHECK(IsLocked()); |
804 | | // If we already committed up to (or past) 'id' return. |
805 | | // This can happen in the case that multiple UpdateConsensus() calls end |
806 | | // up in the RPC queue at the same time, and then might get interleaved out |
807 | | // of order. |
808 | 85.5M | if (last_committed_op_id_.index >= committed_op_id.index) { |
809 | 72.5M | VLOG_WITH_PREFIX884 (1) |
810 | 884 | << "Already marked ops through " << last_committed_op_id_ << " as committed. " |
811 | 884 | << "Now trying to mark " << committed_op_id << " which would be a no-op."; |
812 | 72.5M | return false; |
813 | 72.5M | } |
814 | | |
815 | 13.0M | if (pending_operations_.empty()) { |
816 | 0 | VLOG_WITH_PREFIX(1) << "No operations to mark as committed up to: " |
817 | 0 | << committed_op_id; |
818 | 0 | return STATUS_FORMAT( |
819 | 0 | NotFound, |
820 | 0 | "No pending entries, requested to advance last committed OpId from $0 to $1, " |
821 | 0 | "last received: $2", |
822 | 0 | last_committed_op_id_, committed_op_id, last_received_op_id_); |
823 | 0 | } |
824 | | |
825 | 13.0M | CheckPendingOperationsHead(); |
826 | | |
827 | 13.0M | auto old_index = last_committed_op_id_.index; |
828 | | |
829 | 13.0M | auto status = ApplyPendingOperationsUnlocked(committed_op_id, could_stop); |
830 | 13.0M | if (!status.ok()) { |
831 | 0 | return status; |
832 | 0 | } |
833 | | |
834 | 13.0M | return last_committed_op_id_.index != old_index; |
835 | 13.0M | } |
836 | | |
837 | | Status ReplicaState::ApplyPendingOperationsUnlocked( |
838 | 13.0M | const yb::OpId& committed_op_id, CouldStop could_stop) { |
839 | 13.0M | DCHECK(IsLocked()); |
840 | 18.4E | VLOG_WITH_PREFIX(1) << "Last triggered apply was: " << last_committed_op_id_; |
841 | | |
842 | | // Stop at the operation after the last one we must commit. This iterator by definition points to |
843 | | // the first entry greater than the committed index, so the entry preceding that must have the |
844 | | // OpId equal to committed_op_id. |
845 | | |
846 | 13.0M | auto prev_id = last_committed_op_id_; |
847 | 13.0M | yb::OpId max_allowed_op_id; |
848 | 13.0M | if (!safe_op_id_waiter_) { |
849 | 0 | max_allowed_op_id.index = std::numeric_limits<int64_t>::max(); |
850 | 0 | } |
851 | 13.0M | auto leader_term = GetLeaderStateUnlocked().term; |
852 | | |
853 | 13.0M | OpIds applied_op_ids; |
854 | 13.0M | applied_op_ids.reserve(committed_op_id.index - prev_id.index); |
855 | | |
856 | 13.0M | Status status; |
857 | | |
858 | 27.5M | while (!pending_operations_.empty()) { |
859 | 15.0M | auto round = pending_operations_.front(); |
860 | 15.0M | auto current_id = round->id(); |
861 | | |
862 | 15.0M | if (PREDICT_TRUE(prev_id)) { |
863 | 14.9M | CHECK_OK(CheckOpInSequence(prev_id, current_id)); |
864 | 14.9M | } |
865 | | |
866 | 15.0M | if (current_id.index > committed_op_id.index) { |
867 | 528k | break; |
868 | 528k | } |
869 | | |
870 | 14.5M | auto type = round->replicate_msg()->op_type(); |
871 | | |
872 | | // For write operations we block rocksdb flush, until appropriate records are written to the |
873 | | // log file. So we could apply them before adding to log. |
874 | 14.5M | if (type == OperationType::WRITE_OP) { |
875 | 8.55M | if (could_stop && !context_->ShouldApplyWrite()5.53M ) { |
876 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 5) << LogPrefix() |
877 | 0 | << "Stop apply pending operations, because of write delay required, last applied: " |
878 | 0 | << prev_id << " of " << committed_op_id; |
879 | 0 | break; |
880 | 0 | } |
881 | 8.55M | } else if (5.97M current_id.index > max_allowed_op_id.index5.97M || |
882 | 5.97M | current_id.term > max_allowed_op_id.term613k ) { |
883 | 5.36M | max_allowed_op_id = safe_op_id_waiter_->WaitForSafeOpIdToApply(current_id); |
884 | | // This situation should not happen. Prior to #4150 it could happen as follows. Suppose |
885 | | // replica A was the leader of term 1 and added operations 1.100 and 1.101 to the WAL but |
886 | | // has not committed them yet. Replica B decides that A is unavailable, starts and wins |
887 | | // term 2 election, and tries to replicate a no-op 2.100. Replica A starts and wins term 3 |
888 | | // election and then continues to replicate 1.100 and 1.101 and the new no-op 3.102. |
889 | | // Suppose an UpdateConsensus from replica A reaches replica B with a committed op id of |
890 | | // 3.102 (because perhaps some other replica has already received those entries). Replica B |
891 | | // will abort 2.100 and try to apply all three operations. Suppose the last op id flushed to |
892 | | // the WAL on replica B is currently 2.100, and current_id is 1.101. Then |
893 | | // WaitForSafeOpIdToApply would return 2.100 immediately as 2.100 > 1.101 in terms of OpId |
894 | | // comparison, and we will throw an error here. |
895 | | // |
896 | | // However, after the #4150 fix we are resetting flushed op id using ResetLastSynchedOpId |
897 | | // when aborting operations during term changes, so WaitForSafeOpIdToApply would correctly |
898 | | // wait until 1.101 is written and return 1.101 or 3.102 in the above example. |
899 | 5.36M | if (max_allowed_op_id.index < current_id.index5.36M || max_allowed_op_id.term < current_id.term) { |
900 | 0 | status = STATUS_FORMAT( |
901 | 0 | RuntimeError, |
902 | 0 | "Bad max allowed op id ($0), term/index must be no less than that of current op id " |
903 | 0 | "($1)", |
904 | 0 | max_allowed_op_id, current_id); |
905 | 0 | break; |
906 | 0 | } |
907 | 5.36M | } |
908 | | |
909 | 14.5M | pending_operations_.pop_front(); |
910 | | // Set committed configuration. |
911 | 14.5M | if (PREDICT_FALSE(type == OperationType::CHANGE_CONFIG_OP)) { |
912 | 19.5k | ApplyConfigChangeUnlocked(round); |
913 | 19.5k | } |
914 | | |
915 | 14.5M | prev_id = current_id; |
916 | 14.5M | NotifyReplicationFinishedUnlocked(round, Status::OK(), leader_term, &applied_op_ids); |
917 | 14.5M | } |
918 | | |
919 | 13.0M | SetLastCommittedIndexUnlocked(prev_id); |
920 | | |
921 | 13.0M | applied_ops_tracker_(applied_op_ids); |
922 | | |
923 | 13.0M | return status; |
924 | 13.0M | } |
925 | | |
926 | 19.5k | void ReplicaState::ApplyConfigChangeUnlocked(const ConsensusRoundPtr& round) { |
927 | 19.5k | DCHECK(round->replicate_msg()->change_config_record().has_old_config()); |
928 | 19.5k | DCHECK(round->replicate_msg()->change_config_record().has_new_config()); |
929 | 19.5k | RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config(); |
930 | 19.5k | RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config(); |
931 | 19.5k | DCHECK(old_config.has_opid_index()); |
932 | 19.5k | DCHECK(!new_config.has_opid_index()); |
933 | | |
934 | 19.5k | const OpId& current_id = round->id(); |
935 | | |
936 | 19.5k | if (PREDICT_FALSE(FLAGS_inject_delay_commit_pre_voter_to_voter_secs)) { |
937 | 7 | bool is_transit_to_voter = |
938 | 7 | CountVotersInTransition(old_config) > CountVotersInTransition(new_config); |
939 | 7 | if (is_transit_to_voter) { |
940 | 4 | LOG_WITH_PREFIX(INFO) |
941 | 4 | << "Commit skipped as inject_delay_commit_pre_voter_to_voter_secs flag is set to true.\n" |
942 | 4 | << " Old config: { " << old_config.ShortDebugString() << " }.\n" |
943 | 4 | << " New config: { " << new_config.ShortDebugString() << " }"; |
944 | 4 | SleepFor(MonoDelta::FromSeconds(FLAGS_inject_delay_commit_pre_voter_to_voter_secs)); |
945 | 4 | } |
946 | 7 | } |
947 | | |
948 | 19.5k | new_config.set_opid_index(current_id.index); |
949 | | // Check if the pending Raft config has an OpId less than the committed |
950 | | // config. If so, this is a replay at startup in which the COMMIT |
951 | | // messages were delayed. |
952 | 19.5k | const RaftConfigPB& committed_config = GetCommittedConfigUnlocked(); |
953 | 19.5k | if (new_config.opid_index() > committed_config.opid_index()) { |
954 | 19.5k | LOG_WITH_PREFIX(INFO) |
955 | 19.5k | << "Committing config change with OpId " |
956 | 19.5k | << current_id << ". Old config: { " << old_config.ShortDebugString() << " }. " |
957 | 19.5k | << "New config: { " << new_config.ShortDebugString() << " }"; |
958 | 19.5k | CHECK_OK(SetCommittedConfigUnlocked(new_config)); |
959 | 19.5k | } else { |
960 | 3 | LOG_WITH_PREFIX(INFO) |
961 | 3 | << "Ignoring commit of config change with OpId " |
962 | 3 | << current_id << " because the committed config has OpId index " |
963 | 3 | << committed_config.opid_index() << ". The config change we are ignoring is: " |
964 | 3 | << "Old config: { " << old_config.ShortDebugString() << " }. " |
965 | 3 | << "New config: { " << new_config.ShortDebugString() << " }"; |
966 | 3 | } |
967 | 19.5k | } |
968 | | |
969 | 233M | const yb::OpId& ReplicaState::GetCommittedOpIdUnlocked() const { |
970 | 233M | DCHECK(IsLocked()); |
971 | 233M | return last_committed_op_id_; |
972 | 233M | } |
973 | | |
974 | 24.2M | RestartSafeCoarseMonoClock& ReplicaState::Clock() { |
975 | 24.2M | return retryable_requests_.Clock(); |
976 | 24.2M | } |
977 | | |
978 | 0 | RetryableRequestsCounts ReplicaState::TEST_CountRetryableRequests() { |
979 | 0 | auto lock = LockForRead(); |
980 | 0 | return retryable_requests_.TEST_Counts(); |
981 | 0 | } |
982 | | |
983 | 2.91M | bool ReplicaState::AreCommittedAndCurrentTermsSameUnlocked() const { |
984 | 2.91M | int64_t term = GetCurrentTermUnlocked(); |
985 | 2.91M | const auto& opid = GetCommittedOpIdUnlocked(); |
986 | 2.91M | if (opid.term != term) { |
987 | 0 | LOG(INFO) << "committed term=" << opid.term << ", current term=" << term; |
988 | 0 | return false; |
989 | 0 | } |
990 | 2.91M | return true; |
991 | 2.91M | } |
992 | | |
993 | 13.2M | void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpIdPB& op_id) { |
994 | 13.2M | DCHECK(IsLocked()); |
995 | 13.2M | auto* trace = Trace::CurrentTrace(); |
996 | 18.4E | DCHECK(last_received_op_id_.term <= op_id.term() && last_received_op_id_.index <= op_id.index()) |
997 | 18.4E | << LogPrefix() << ": " |
998 | 18.4E | << "Previously received OpId: " << last_received_op_id_ |
999 | 18.4E | << ", updated OpId: " << op_id.ShortDebugString() |
1000 | 18.4E | << ", Trace:" << std::endl << (trace ? trace->DumpToString(true)0 : "No trace found"); |
1001 | | |
1002 | 13.2M | last_received_op_id_ = yb::OpId::FromPB(op_id); |
1003 | 13.2M | last_received_op_id_current_leader_ = last_received_op_id_; |
1004 | 13.2M | next_index_ = op_id.index() + 1; |
1005 | 13.2M | } |
1006 | | |
1007 | 98.6M | const yb::OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const { |
1008 | 98.6M | DCHECK(IsLocked()); |
1009 | 98.6M | return last_received_op_id_; |
1010 | 98.6M | } |
1011 | | |
1012 | 25.5M | const yb::OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const { |
1013 | 25.5M | DCHECK(IsLocked()); |
1014 | 25.5M | return last_received_op_id_current_leader_; |
1015 | 25.5M | } |
1016 | | |
1017 | 25.4M | OpId ReplicaState::GetLastPendingOperationOpIdUnlocked() const { |
1018 | 25.4M | DCHECK(IsLocked()); |
1019 | 25.4M | return pending_operations_.empty() ? OpId()17.4M : pending_operations_.back()->id()7.95M ; |
1020 | 25.4M | } |
1021 | | |
1022 | 5.11M | OpId ReplicaState::NewIdUnlocked() { |
1023 | 5.11M | DCHECK(IsLocked()); |
1024 | 5.11M | return OpId(GetCurrentTermUnlocked(), next_index_++); |
1025 | 5.11M | } |
1026 | | |
1027 | 23 | void ReplicaState::CancelPendingOperation(const OpId& id, bool should_exist) { |
1028 | 23 | DCHECK(IsLocked()); |
1029 | 23 | CHECK_EQ(GetCurrentTermUnlocked(), id.term); |
1030 | 23 | CHECK_EQ(next_index_, id.index + 1); |
1031 | 23 | next_index_ = id.index; |
1032 | | |
1033 | | // We don't use UpdateLastReceivedOpIdUnlocked because we're actually |
1034 | | // updating it back to a lower value and we need to avoid the checks |
1035 | | // that method has. |
1036 | | |
1037 | | // This is only ok if we do _not_ release the lock after calling |
1038 | | // NewIdUnlocked() (which we don't in RaftConsensus::Replicate()). |
1039 | | // The below is correct because we always have starting NoOp, that has the same term and was |
1040 | | // replicated. |
1041 | 23 | last_received_op_id_ = OpId(id.term, id.index - 1); |
1042 | 23 | if (should_exist) { |
1043 | 0 | CHECK(!pending_operations_.empty() && pending_operations_.back()->id() == id) |
1044 | 0 | << "Pending operations should end with: " << id << ", but there are: " |
1045 | 0 | << AsString(pending_operations_); |
1046 | 0 | pending_operations_.pop_back(); |
1047 | 23 | } else { |
1048 | | // It could happen only in leader, while we add new operations, since we already have no op |
1049 | | // in this term, that would not be cancelled, we could be sure that previous operation |
1050 | | // has the same term. |
1051 | 23 | OpId expected_last_op_id(id.term, id.index - 1); |
1052 | 23 | CHECK(pending_operations_.empty() || |
1053 | 0 | (pending_operations_.back()->id() == expected_last_op_id)) |
1054 | 0 | << "Pending operations should end with: " << expected_last_op_id << ", but there are: " |
1055 | 0 | << AsString(pending_operations_); |
1056 | 23 | } |
1057 | 23 | } |
1058 | | |
1059 | 9.49M | string ReplicaState::LogPrefix() const { |
1060 | 9.49M | auto role_and_term = cmeta_->GetRoleAndTerm(); |
1061 | 9.49M | return Substitute("T $0 P $1 [term $2 $3]: ", |
1062 | 9.49M | options_.tablet_id, |
1063 | 9.49M | peer_uuid_, |
1064 | 9.49M | role_and_term.second, |
1065 | 9.49M | PeerRole_Name(role_and_term.first)); |
1066 | 9.49M | } |
1067 | | |
1068 | 90.8k | ReplicaState::State ReplicaState::state() const { |
1069 | 90.8k | DCHECK(IsLocked()); |
1070 | 90.8k | return state_; |
1071 | 90.8k | } |
1072 | | |
1073 | 0 | string ReplicaState::ToString() const { |
1074 | 0 | ThreadRestrictions::AssertWaitAllowed(); |
1075 | 0 | ReplicaState::UniqueLock lock(update_lock_); |
1076 | 0 | return ToStringUnlocked(); |
1077 | 0 | } |
1078 | | |
1079 | 223k | string ReplicaState::ToStringUnlocked() const { |
1080 | 223k | DCHECK(IsLocked()); |
1081 | 223k | return Format( |
1082 | 223k | "Replica: $0, State: $1, Role: $2, Watermarks: {Received: $3 Committed: $4} Leader: $5", |
1083 | 223k | peer_uuid_, state_, PeerRole_Name(GetActiveRoleUnlocked()), |
1084 | 223k | last_received_op_id_, last_committed_op_id_, last_received_op_id_current_leader_); |
1085 | 223k | } |
1086 | | |
1087 | 24.3M | Status ReplicaState::CheckOpInSequence(const yb::OpId& previous, const yb::OpId& current) { |
1088 | 24.3M | if (current.term < previous.term) { |
1089 | 1 | return STATUS_FORMAT( |
1090 | 1 | Corruption, |
1091 | 1 | "New operation's term is not >= than the previous op's term. Current: $0. Previous: $1", |
1092 | 1 | current, previous); |
1093 | 1 | } |
1094 | | |
1095 | 24.3M | if (current.index != previous.index + 1) { |
1096 | 1 | return STATUS_FORMAT( |
1097 | 1 | Corruption, |
1098 | 1 | "New operation's index does not follow the previous op's index. Current: $0. Previous: $1", |
1099 | 1 | current, previous); |
1100 | 1 | } |
1101 | 24.3M | return Status::OK(); |
1102 | 24.3M | } |
1103 | | |
1104 | | void ReplicaState::UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked( |
1105 | 25.4M | const CoarseTimeLease& lease, const PhysicalComponentLease& ht_lease) { |
1106 | 25.4M | old_leader_lease_.TryUpdate(lease); |
1107 | 25.4M | old_leader_ht_lease_.TryUpdate(ht_lease); |
1108 | | |
1109 | | // Reset our lease, since we are non leader now. I.e. follower or candidate. |
1110 | 25.4M | auto existing_lease = majority_replicated_lease_expiration_; |
1111 | 25.4M | if (existing_lease != CoarseTimeLease::NoneValue()) { |
1112 | 159k | LOG_WITH_PREFIX(INFO) |
1113 | 159k | << "Reset our lease: " << MonoDelta(CoarseMonoClock::now() - existing_lease); |
1114 | 159k | majority_replicated_lease_expiration_ = CoarseTimeLease::NoneValue(); |
1115 | 159k | } |
1116 | | |
1117 | 25.4M | auto existing_ht_lease = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire); |
1118 | 25.4M | if (existing_ht_lease != PhysicalComponentLease::NoneValue()) { |
1119 | 10.8k | LOG_WITH_PREFIX(INFO) << "Reset our ht lease: " << HybridTime::FromMicros(existing_ht_lease); |
1120 | 10.8k | majority_replicated_ht_lease_expiration_.store(PhysicalComponentLease::NoneValue(), |
1121 | 10.8k | std::memory_order_release); |
1122 | 10.8k | cond_.notify_all(); |
1123 | 10.8k | } |
1124 | 25.4M | } |
1125 | | |
1126 | | template <class Policy> |
1127 | 94.2M | LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const { |
1128 | 94.2M | DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER); |
1129 | | |
1130 | 94.2M | if (!policy.Enabled()) { |
1131 | 1.37k | return LeaderLeaseStatus::HAS_LEASE; |
1132 | 1.37k | } |
1133 | | |
1134 | 94.2M | if (GetActiveConfigUnlocked().peers_size() == 1) { |
1135 | | // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because |
1136 | | // we are only reading it in this function (as of 08/09/2017). |
1137 | 8.91M | return LeaderLeaseStatus::HAS_LEASE; |
1138 | 8.91M | } |
1139 | | |
1140 | 85.2M | if (!policy.OldLeaderLeaseExpired()) { |
1141 | 20.2k | return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE; |
1142 | 20.2k | } |
1143 | | |
1144 | 85.2M | if (policy.MajorityReplicatedLeaseExpired()) { |
1145 | 26.1k | return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE; |
1146 | 26.1k | } |
1147 | | |
1148 | 85.2M | return LeaderLeaseStatus::HAS_LEASE; |
1149 | 85.2M | } yb::consensus::LeaderLeaseStatus yb::consensus::ReplicaState::GetLeaseStatusUnlocked<yb::consensus::GetLeaderLeaseStatusPolicy>(yb::consensus::GetLeaderLeaseStatusPolicy) const Line | Count | Source | 1127 | 89.2M | LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const { | 1128 | 89.2M | DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER); | 1129 | | | 1130 | 89.2M | if (!policy.Enabled()) { | 1131 | 0 | return LeaderLeaseStatus::HAS_LEASE; | 1132 | 0 | } | 1133 | | | 1134 | 89.2M | if (GetActiveConfigUnlocked().peers_size() == 1) { | 1135 | | // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because | 1136 | | // we are only reading it in this function (as of 08/09/2017). | 1137 | 8.49M | return LeaderLeaseStatus::HAS_LEASE; | 1138 | 8.49M | } | 1139 | | | 1140 | 80.7M | if (!policy.OldLeaderLeaseExpired()) { | 1141 | 20.2k | return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE; | 1142 | 20.2k | } | 1143 | | | 1144 | 80.7M | if (policy.MajorityReplicatedLeaseExpired()) { | 1145 | 26.1k | return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE; | 1146 | 26.1k | } | 1147 | | | 1148 | 80.7M | return LeaderLeaseStatus::HAS_LEASE; | 1149 | 80.7M | } |
yb::consensus::LeaderLeaseStatus yb::consensus::ReplicaState::GetLeaseStatusUnlocked<yb::consensus::GetHybridTimeLeaseStatusAtPolicy>(yb::consensus::GetHybridTimeLeaseStatusAtPolicy) const Line | Count | Source | 1127 | 4.92M | LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const { | 1128 | 4.92M | DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER); | 1129 | | | 1130 | 4.92M | if (!policy.Enabled()) { | 1131 | 1.37k | return LeaderLeaseStatus::HAS_LEASE; | 1132 | 1.37k | } | 1133 | | | 1134 | 4.92M | if (GetActiveConfigUnlocked().peers_size() == 1) { | 1135 | | // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because | 1136 | | // we are only reading it in this function (as of 08/09/2017). | 1137 | 416k | return LeaderLeaseStatus::HAS_LEASE; | 1138 | 416k | } | 1139 | | | 1140 | 4.50M | if (!policy.OldLeaderLeaseExpired()) { | 1141 | 0 | return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE; | 1142 | 0 | } | 1143 | | | 1144 | 4.50M | if (policy.MajorityReplicatedLeaseExpired()) { | 1145 | 28 | return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE; | 1146 | 28 | } | 1147 | | | 1148 | 4.50M | return LeaderLeaseStatus::HAS_LEASE; | 1149 | 4.50M | } |
|
1150 | | |
1151 | | // Policy that is used during leader lease calculation. |
1152 | | struct GetLeaderLeaseStatusPolicy { |
1153 | | const ReplicaState* replica_state; |
1154 | | MonoDelta* remaining_old_leader_lease; |
1155 | | CoarseTimePoint* now; |
1156 | | |
1157 | | GetLeaderLeaseStatusPolicy( |
1158 | | const ReplicaState* replica_state_, MonoDelta* remaining_old_leader_lease_, |
1159 | | CoarseTimePoint* now_) |
1160 | | : replica_state(replica_state_), remaining_old_leader_lease(remaining_old_leader_lease_), |
1161 | 89.2M | now(now_) { |
1162 | 89.2M | if (remaining_old_leader_lease) { |
1163 | 84.3M | *remaining_old_leader_lease = 0s; |
1164 | 84.3M | } |
1165 | 89.2M | } |
1166 | | |
1167 | 80.7M | bool OldLeaderLeaseExpired() { |
1168 | 80.7M | const auto remaining_old_leader_lease_duration = |
1169 | 80.7M | replica_state->RemainingOldLeaderLeaseDuration(now); |
1170 | 80.7M | if (remaining_old_leader_lease_duration) { |
1171 | 20.2k | if (remaining_old_leader_lease) { |
1172 | 20.1k | *remaining_old_leader_lease = remaining_old_leader_lease_duration; |
1173 | 20.1k | } |
1174 | 20.2k | return false; |
1175 | 20.2k | } |
1176 | 80.7M | return true; |
1177 | 80.7M | } |
1178 | | |
1179 | 80.7M | bool MajorityReplicatedLeaseExpired() { |
1180 | 80.7M | return replica_state->MajorityReplicatedLeaderLeaseExpired(now); |
1181 | 80.7M | } |
1182 | | |
1183 | 89.2M | bool Enabled() { |
1184 | 89.2M | return true; |
1185 | 89.2M | } |
1186 | | }; |
1187 | | |
1188 | 80.7M | bool ReplicaState::MajorityReplicatedLeaderLeaseExpired(CoarseTimePoint* now) const { |
1189 | 80.7M | if (majority_replicated_lease_expiration_ == CoarseTimePoint()) { |
1190 | 0 | return true; |
1191 | 0 | } |
1192 | | |
1193 | 80.7M | if (*now == CoarseTimePoint()) { |
1194 | 80.7M | *now = CoarseMonoClock::Now(); |
1195 | 80.7M | } |
1196 | | |
1197 | 80.7M | return *now >= majority_replicated_lease_expiration_; |
1198 | 80.7M | } |
1199 | | |
1200 | | LeaderLeaseStatus ReplicaState::GetLeaderLeaseStatusUnlocked( |
1201 | 89.2M | MonoDelta* remaining_old_leader_lease, CoarseTimePoint* now) const { |
1202 | 89.2M | if (now == nullptr) { |
1203 | 54.5M | CoarseTimePoint local_now; |
1204 | 54.5M | return GetLeaseStatusUnlocked(GetLeaderLeaseStatusPolicy( |
1205 | 54.5M | this, remaining_old_leader_lease, &local_now)); |
1206 | 54.5M | } |
1207 | 34.7M | return GetLeaseStatusUnlocked(GetLeaderLeaseStatusPolicy(this, remaining_old_leader_lease, now)); |
1208 | 89.2M | } |
1209 | | |
1210 | 4.50M | bool ReplicaState::MajorityReplicatedHybridTimeLeaseExpiredAt(MicrosTime hybrid_time) const { |
1211 | 4.50M | return hybrid_time >= majority_replicated_ht_lease_expiration_; |
1212 | 4.50M | } |
1213 | | |
1214 | | struct GetHybridTimeLeaseStatusAtPolicy { |
1215 | | const ReplicaState* replica_state; |
1216 | | MicrosTime micros_time; |
1217 | | |
1218 | | GetHybridTimeLeaseStatusAtPolicy(const ReplicaState* rs, MicrosTime ht) |
1219 | 4.92M | : replica_state(rs), micros_time(ht) {} |
1220 | | |
1221 | 4.50M | bool OldLeaderLeaseExpired() { |
1222 | 4.50M | return micros_time > replica_state->old_leader_ht_lease().expiration; |
1223 | 4.50M | } |
1224 | | |
1225 | 4.50M | bool MajorityReplicatedLeaseExpired() { |
1226 | 4.50M | return replica_state->MajorityReplicatedHybridTimeLeaseExpiredAt(micros_time); |
1227 | 4.50M | } |
1228 | | |
1229 | 4.93M | bool Enabled() { |
1230 | 4.93M | return FLAGS_ht_lease_duration_ms != 0; |
1231 | 4.93M | } |
1232 | | }; |
1233 | | |
1234 | | LeaderLeaseStatus ReplicaState::GetHybridTimeLeaseStatusAtUnlocked( |
1235 | 4.92M | MicrosTime micros_time) const { |
1236 | 4.92M | return GetLeaseStatusUnlocked(GetHybridTimeLeaseStatusAtPolicy(this, micros_time)); |
1237 | 4.92M | } |
1238 | | |
1239 | 81.9M | MonoDelta ReplicaState::RemainingOldLeaderLeaseDuration(CoarseTimePoint* now) const { |
1240 | 81.9M | MonoDelta result; |
1241 | 81.9M | if (old_leader_lease_) { |
1242 | 43.2k | CoarseTimePoint now_local; |
1243 | 43.2k | if (!now) { |
1244 | 22.4k | now = &now_local; |
1245 | 22.4k | } |
1246 | 43.2k | *now = CoarseMonoClock::Now(); |
1247 | | |
1248 | 43.2k | if (*now > old_leader_lease_.expiration) { |
1249 | | // Reset the old leader lease expiration time so that we don't have to check it anymore. |
1250 | 2.17k | old_leader_lease_.Reset(); |
1251 | 41.0k | } else { |
1252 | 41.0k | result = old_leader_lease_.expiration - *now; |
1253 | 41.0k | } |
1254 | 43.2k | } |
1255 | | |
1256 | 81.9M | return result; |
1257 | 81.9M | } |
1258 | | |
// Returns the majority-replicated hybrid time lease expiration once it is at least
// `min_allowed`, waiting on cond_ for it to advance if necessary.
//
// Parameters:
//   min_allowed - smallest lease expiration the caller will accept.
//   deadline    - latest time point to wait until; CoarseTimePoint::max() waits without a
//                 timeout.
//
// Returns:
//   - kMaxHybridTimePhysicalMicros when FLAGS_ht_lease_duration_ms is 0.
//   - The lease expiration value when it is >= min_allowed.
//   - TimedOut when the deadline passes before the predicate is satisfied.
//   - IllegalState("Not a leader") when the stored value is PhysicalComponentLease::NoneValue().
1259 | | Result<MicrosTime> ReplicaState::MajorityReplicatedHtLeaseExpiration( |
1260 | 73.5M | MicrosTime min_allowed, CoarseTimePoint deadline) const { |
1261 | 73.5M | if (FLAGS_ht_lease_duration_ms == 0) { |
1262 | 2.81k | return kMaxHybridTimePhysicalMicros; |
1263 | 2.81k | } |
1264 | | |
// Read the atomic without taking update_lock_; the lock is only needed when we have to wait.
1265 | 73.5M | auto result = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire); |
1266 | 73.5M | if (result >= min_allowed) { // Fast path |
1267 | 73.5M | return result; |
1268 | 73.5M | } |
1269 | | |
// Waiting is skipped when the value is already NoneValue(); that case is reported below.
1270 | 21.2k | if (result != PhysicalComponentLease::NoneValue()) { |
1271 | | // Slow path |
1272 | 0 | UniqueLock l(update_lock_); |
// The predicate re-reads the atomic on every evaluation and stores it in `result`, so after the
// wait `result` holds the value that ended it: either large enough, or NoneValue().
1273 | 0 | auto predicate = [this, &result, min_allowed] { |
1274 | 0 | result = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire); |
1275 | 0 | return result >= min_allowed || result == PhysicalComponentLease::NoneValue(); |
1276 | 0 | }; |
1277 | 0 | if (deadline == CoarseTimePoint::max()) { |
1278 | 0 | cond_.wait(l, predicate); |
1279 | 0 | } else if (!cond_.wait_until(l, deadline, predicate)) { |
1280 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting leader lease: $0", min_allowed); |
1281 | 0 | } |
1282 | 0 | } |
1283 | | |
1284 | 21.2k | if (result == PhysicalComponentLease::NoneValue()) { |
// The status object is built once and reused for every subsequent "not a leader" return.
1285 | 3 | static const Status kNotLeaderStatus = STATUS(IllegalState, "Not a leader"); |
1286 | 3 | return kNotLeaderStatus; |
1287 | 3 | } |
1288 | | |
1289 | 21.2k | return result; |
1290 | 21.2k | } |
1291 | | |
// Records the latest majority-replicated lease expirations and wakes up waiters.
//
// Parameters:
//   majority_replicated_data - supplies leader_lease_expiration and ht_lease_expiration.
//   flags - kResetOldLeaderLease / kResetOldLeaderHtLease request that the corresponding old
//           leader lease be logged as revoked and reset.
//
// After updating the leases, the leader state cache is refreshed and all threads waiting on
// cond_ are notified.
//
// NOTE(review): the "Unlocked" suffix presumably means the caller must already hold the state
// lock - confirm against the class header.
1292 | | void ReplicaState::SetMajorityReplicatedLeaseExpirationUnlocked( |
1293 | | const MajorityReplicatedData& majority_replicated_data, |
1294 | 34.7M | EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags) { |
1295 | 34.7M | majority_replicated_lease_expiration_ = majority_replicated_data.leader_lease_expiration; |
// The hybrid time lease is an atomic, stored here with release ordering.
1296 | 34.7M | majority_replicated_ht_lease_expiration_.store(majority_replicated_data.ht_lease_expiration, |
1297 | 34.7M | std::memory_order_release); |
1298 | | |
1299 | 34.7M | if (flags.Test(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease)) { |
// Log the holder and the time that was still left on the lease before dropping it.
1300 | 10.2k | LOG_WITH_PREFIX(INFO) |
1301 | 10.2k | << "Revoked old leader " << old_leader_lease_.holder_uuid << " lease: " |
1302 | 10.2k | << MonoDelta(old_leader_lease_.expiration - CoarseMonoClock::now()); |
1303 | 10.2k | old_leader_lease_.Reset(); |
1304 | 10.2k | } |
1305 | | |
1306 | 34.7M | if (flags.Test(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease)) { |
1307 | 10.7k | LOG_WITH_PREFIX(INFO) |
1308 | 10.7k | << "Revoked old leader " << old_leader_ht_lease_.holder_uuid << " ht lease: " |
1309 | 10.7k | << HybridTime::FromMicros(old_leader_ht_lease_.expiration); |
1310 | 10.7k | old_leader_ht_lease_.Reset(); |
1311 | 10.7k | } |
1312 | | |
// `now` is only a scratch out-parameter for the refresh; its value is not used afterwards.
1313 | 34.7M | CoarseTimePoint now; |
1314 | 34.7M | RefreshLeaderStateCacheUnlocked(&now); |
1315 | 34.7M | cond_.notify_all(); |
1316 | 34.7M | } |
1317 | | |
// Returns the on-disk size reported by cmeta_ (cmeta_->on_disk_size()).
1318 | 8.07k | uint64_t ReplicaState::OnDiskSize() const { |
1319 | 8.07k | return cmeta_->on_disk_size(); |
1320 | 8.07k | } |
1321 | | |
// Registers the given consensus round with retryable_requests_ and returns the result of
// retryable_requests_.Register(round) unchanged.
1322 | 3.02M | bool ReplicaState::RegisterRetryableRequest(const ConsensusRoundPtr& round) { |
1323 | 3.02M | return retryable_requests_.Register(round); |
1324 | 3.02M | } |
1325 | | |
// Returns the value of retryable_requests_.CleanExpiredReplicatedAndGetMinOpId(), called after
// acquiring the lock via LockForUpdate.
//
// If LockForUpdate fails, OpId::Min() is returned instead so that the log is not cleaned.
1326 | 50.1M | OpId ReplicaState::MinRetryableRequestOpId() { |
1327 | 50.1M | UniqueLock lock; |
1328 | 50.1M | auto status = LockForUpdate(&lock); |
1329 | 50.1M | if (!status.ok()) { |
1330 | 0 | return OpId::Min(); // return minimal op id, that prevents log from cleaning |
1331 | 0 | } |
1332 | 50.1M | return retryable_requests_.CleanExpiredReplicatedAndGetMinOpId(); |
1333 | 50.1M | } |
1334 | | |
// Reports the outcome of replicating `round`.
//
// First forwards `status`, `leader_term` and `applied_op_ids` to the round itself via
// round->NotifyReplicationFinished, then informs retryable_requests_ about the round's
// replicate message with the same status and term.
//
// NOTE(review): the "Unlocked" suffix presumably means the caller must already hold the state
// lock - confirm against the class header.
1335 | | void ReplicaState::NotifyReplicationFinishedUnlocked( |
1336 | | const ConsensusRoundPtr& round, const Status& status, int64_t leader_term, |
1337 | 14.5M | OpIds* applied_op_ids) { |
1338 | 14.5M | round->NotifyReplicationFinished(status, leader_term, applied_op_ids); |
1339 | | |
1340 | 14.5M | retryable_requests_.ReplicationFinished(*round->replicate_msg(), status, leader_term); |
1341 | 14.5M | } |
1342 | | |
// Recomputes the leader state with LeaderLeaseCheckMode::NEED_LEASE, publishes a summary of it
// in leader_state_cache_, and returns the freshly computed state.
//
// What is cached depends on the status:
//   - LEADER_AND_READY: the term, with majority_replicated_lease_expiration_ as the time point.
//   - LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE: the remaining old leader lease in microseconds, with
//     `*now + remaining_old_leader_lease` as the time point.
//   - any other status: 0 as the extra value and CoarseTimePoint::max() as the time point.
//
// `now` is passed through to GetLeaderStateUnlocked and is dereferenced in the old-leader-lease
// branch, so it must be non-null there.
// NOTE(review): that branch presumably relies on GetLeaderStateUnlocked having filled `*now` -
// confirm.
1343 | 35.7M | consensus::LeaderState ReplicaState::RefreshLeaderStateCacheUnlocked(CoarseTimePoint* now) const { |
1344 | 35.7M | auto result = GetLeaderStateUnlocked(LeaderLeaseCheckMode::NEED_LEASE, now); |
1345 | 35.7M | LeaderStateCache cache; |
1346 | 35.7M | if (result.status == LeaderStatus::LEADER_AND_READY) { |
1347 | 34.6M | cache.Set(result.status, result.term, majority_replicated_lease_expiration_); |
// NOTE(review): the two `1.05M` tokens on the next row look like per-region execution counts
// rendered inline by the coverage tool rather than part of the C++ expression - confirm against
// the original source file.
1348 | 34.6M | } else if (1.05M result.status == LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE1.05M ) { |
1349 | 10.0k | cache.Set(result.status, result.remaining_old_leader_lease.ToMicroseconds(), |
1350 | 10.0k | *now + result.remaining_old_leader_lease); |
1351 | 1.04M | } else { |
1352 | 1.04M | cache.Set(result.status, 0 /* extra_value */, CoarseTimePoint::max()); |
1353 | 1.04M | } |
1354 | | |
// Publish the new cache value with release ordering.
1355 | 35.7M | leader_state_cache_.store(cache, boost::memory_order_release); |
1356 | | |
1357 | 35.7M | return result; |
1358 | 35.7M | } |
1359 | | |
// Sets leader_no_op_committed_ to `value` and refreshes the leader state cache.
//
// Before the update, logs the new value together with the current committed and last received
// op ids at INFO level.
//
// NOTE(review): the "Unlocked" suffix presumably means the caller must already hold the state
// lock - confirm against the class header.
1360 | 123k | void ReplicaState::SetLeaderNoOpCommittedUnlocked(bool value) { |
1361 | 123k | LOG_WITH_PREFIX(INFO) |
1362 | 123k | << __func__ << "(" << value << "), committed: " << GetCommittedOpIdUnlocked() |
1363 | 123k | << ", received: " << GetLastReceivedOpIdUnlocked(); |
1364 | | |
1365 | 123k | leader_no_op_committed_ = value; |
// `now` is only a scratch out-parameter for the refresh; its value is not used afterwards.
1366 | 123k | CoarseTimePoint now; |
1367 | 123k | RefreshLeaderStateCacheUnlocked(&now); |
1368 | 123k | } |
1369 | | |
1370 | | } // namespace consensus |
1371 | | } // namespace yb |