YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/replica_state.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/replica_state.h"
34
35
#include <gflags/gflags.h>
36
37
#include "yb/consensus/consensus.h"
38
#include "yb/consensus/consensus_context.h"
39
#include "yb/consensus/consensus_round.h"
40
#include "yb/consensus/log_util.h"
41
#include "yb/consensus/quorum_util.h"
42
43
#include "yb/gutil/strings/substitute.h"
44
45
#include "yb/util/atomic.h"
46
#include "yb/util/debug/trace_event.h"
47
#include "yb/util/enums.h"
48
#include "yb/util/flag_tags.h"
49
#include "yb/util/format.h"
50
#include "yb/util/logging.h"
51
#include "yb/util/opid.h"
52
#include "yb/util/result.h"
53
#include "yb/util/status.h"
54
#include "yb/util/status_format.h"
55
#include "yb/util/status_log.h"
56
#include "yb/util/thread_restrictions.h"
57
#include "yb/util/tostring.h"
58
#include "yb/util/trace.h"
59
60
using namespace std::literals;
61
62
DEFINE_int32(inject_delay_commit_pre_voter_to_voter_secs, 0,
63
             "Amount of time to delay commit of a PRE_VOTER to VOTER transition. To be used for "
64
             "unit testing purposes only.");
65
TAG_FLAG(inject_delay_commit_pre_voter_to_voter_secs, unsafe);
66
TAG_FLAG(inject_delay_commit_pre_voter_to_voter_secs, hidden);
67
68
namespace yb {
69
namespace consensus {
70
71
using std::string;
72
using strings::Substitute;
73
using strings::SubstituteAndAppend;
74
75
//////////////////////////////////////////////////
76
// ReplicaState
77
//////////////////////////////////////////////////
78
79
ReplicaState::ReplicaState(
80
    ConsensusOptions options, string peer_uuid, std::unique_ptr<ConsensusMetadata> cmeta,
81
    ConsensusContext* consensus_context, SafeOpIdWaiter* safe_op_id_waiter,
82
    RetryableRequests* retryable_requests,
83
    std::function<void(const OpIds&)> applied_ops_tracker)
84
    : options_(std::move(options)),
85
      peer_uuid_(std::move(peer_uuid)),
86
      cmeta_(std::move(cmeta)),
87
      context_(consensus_context),
88
      safe_op_id_waiter_(safe_op_id_waiter),
89
150k
      applied_ops_tracker_(std::move(applied_ops_tracker)) {
90
18.4E
  CHECK(cmeta_) << "ConsensusMeta passed as NULL";
91
150k
  if (retryable_requests) {
92
142k
    retryable_requests_ = std::move(*retryable_requests);
93
142k
  }
94
95
150k
  CHECK(IsAcceptableAtomicImpl(leader_state_cache_));
96
97
  // Actually we don't need this lock, but GetActiveRoleUnlocked checks that we are holding the
98
  // lock.
99
150k
  auto lock = LockForRead();
100
150k
  CoarseTimePoint now;
101
150k
  RefreshLeaderStateCacheUnlocked(&now);
102
150k
}
103
104
75.6k
ReplicaState::~ReplicaState() {
105
75.6k
}
106
107
150k
Status ReplicaState::StartUnlocked(const OpIdPB& last_id_in_wal) {
108
150k
  DCHECK(IsLocked());
109
110
  // Our last persisted term can be higher than the last persisted operation
111
  // (i.e. if we called an election) but reverse should never happen.
112
150k
  CHECK_LE(last_id_in_wal.term(), GetCurrentTermUnlocked()) << LogPrefix()
113
0
      << "The last op in the WAL with id " << OpIdToString(last_id_in_wal)
114
0
      << " has a term (" << last_id_in_wal.term() << ") that is greater "
115
0
      << "than the latest recorded term, which is " << GetCurrentTermUnlocked();
116
117
150k
  next_index_ = last_id_in_wal.index() + 1;
118
119
150k
  last_received_op_id_ = yb::OpId::FromPB(last_id_in_wal);
120
121
150k
  state_ = kRunning;
122
150k
  return Status::OK();
123
150k
}
124
125
150k
Status ReplicaState::LockForStart(UniqueLock* lock) const {
126
150k
  ThreadRestrictions::AssertWaitAllowed();
127
150k
  UniqueLock l(update_lock_);
128
150k
  CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
129
0
      << " Replica is not in kInitialized state";
130
150k
  lock->swap(l);
131
150k
  return Status::OK();
132
150k
}
133
134
1.49G
bool ReplicaState::IsLocked() const {
135
1.49G
  std::unique_lock<std::mutex> lock(update_lock_, std::try_to_lock);
136
1.49G
  return !lock.owns_lock();
137
1.49G
}
138
139
206M
ReplicaState::UniqueLock ReplicaState::LockForRead() const {
140
206M
  ThreadRestrictions::AssertWaitAllowed();
141
206M
  return UniqueLock(update_lock_);
142
206M
}
143
144
0
Status ReplicaState::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
145
0
  DCHECK(!msg.has_id()) << "Should not have an ID yet: " << msg.ShortDebugString();
146
0
  CHECK(msg.has_op_type());  // TODO: better checking?
147
0
  return LockForReplicate(lock);
148
0
}
149
150
4.93M
Status ReplicaState::LockForReplicate(UniqueLock* lock) const {
151
4.93M
  ThreadRestrictions::AssertWaitAllowed();
152
4.93M
  UniqueLock l(update_lock_);
153
4.93M
  if (PREDICT_FALSE(state_ != kRunning)) {
154
0
    return STATUS(IllegalState, "Replica not in running state");
155
0
  }
156
157
4.93M
  lock->swap(l);
158
4.93M
  return Status::OK();
159
4.93M
}
160
161
10.3M
Status ReplicaState::CheckIsActiveLeaderAndHasLease() const {
162
10.3M
  UniqueLock l(update_lock_);
163
10.3M
  if (PREDICT_FALSE(state_ != kRunning)) {
164
0
    return STATUS(IllegalState, "Replica not in running state");
165
0
  }
166
10.3M
  return CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::NEED_LEASE);
167
10.3M
}
168
169
34.7M
Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) const {
170
34.7M
  TRACE_EVENT0("consensus", "ReplicaState::LockForMajorityReplicatedIndexUpdate");
171
34.7M
  ThreadRestrictions::AssertWaitAllowed();
172
34.7M
  UniqueLock l(update_lock_);
173
174
34.7M
  if (PREDICT_FALSE(state_ != kRunning)) {
175
14
    return STATUS(IllegalState, "Replica not in running state");
176
14
  }
177
178
34.7M
  if (PREDICT_FALSE(GetActiveRoleUnlocked() != PeerRole::LEADER)) {
179
114
    return STATUS(IllegalState, "Replica not LEADER");
180
114
  }
181
34.7M
  lock->swap(l);
182
34.7M
  return Status::OK();
183
34.7M
}
184
185
81.9M
LeaderState ReplicaState::GetLeaderState(bool allow_stale) const {
186
81.9M
  auto cache = leader_state_cache_.load(boost::memory_order_acquire);
187
188
81.9M
  if (!allow_stale) {
189
28.6M
    CoarseTimePoint now = CoarseMonoClock::Now();
190
28.6M
    if (now >= cache.expire_at) {
191
1.22k
      auto lock = LockForRead();
192
1.22k
      return RefreshLeaderStateCacheUnlocked(&now);
193
1.22k
    }
194
28.6M
  }
195
196
81.9M
  LeaderState result = {cache.status()};
197
81.9M
  if (result.status == LeaderStatus::LEADER_AND_READY) {
198
50.0M
    result.term = cache.extra_value();
199
50.0M
  } else {
200
31.8M
    if (result.status == LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE) {
201
12.8k
      result.remaining_old_leader_lease = MonoDelta::FromMicroseconds(cache.extra_value());
202
12.8k
    }
203
31.8M
    result.MakeNotReadyLeader(result.status);
204
31.8M
  }
205
206
81.9M
  return result;
207
81.9M
}
208
209
LeaderState ReplicaState::GetLeaderStateUnlocked(
210
96.7M
    LeaderLeaseCheckMode lease_check_mode, CoarseTimePoint* now) const {
211
96.7M
  LeaderState result;
212
213
96.7M
  if (GetActiveRoleUnlocked() != PeerRole::LEADER) {
214
9.22M
    return result.MakeNotReadyLeader(LeaderStatus::NOT_LEADER);
215
9.22M
  }
216
217
87.5M
  if (!leader_no_op_committed_) {
218
    // This will cause the client to retry on the same server (won't try to find the new leader).
219
308k
    return result.MakeNotReadyLeader(LeaderStatus::LEADER_BUT_NO_OP_NOT_COMMITTED);
220
308k
  }
221
222
87.2M
  const auto lease_status = lease_check_mode != LeaderLeaseCheckMode::DONT_NEED_LEASE
223
87.2M
      ? 
GetLeaderLeaseStatusUnlocked(&result.remaining_old_leader_lease, now)84.3M
224
87.2M
      : 
LeaderLeaseStatus::HAS_LEASE2.91M
;
225
87.2M
  switch (lease_status) {
226
20.0k
    case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE:
227
      // Will retry on the same server.
228
20.0k
      VLOG(1) << "Old leader lease might still be active for "
229
0
              << result.remaining_old_leader_lease.ToString();
230
20.0k
      return result.MakeNotReadyLeader(
231
20.0k
          LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE);
232
233
21.8k
    case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE:
234
      // Will retry to look up the leader, because it might have changed.
235
21.8k
      return result.MakeNotReadyLeader(
236
21.8k
          LeaderStatus::LEADER_BUT_NO_MAJORITY_REPLICATED_LEASE);
237
238
87.2M
    case LeaderLeaseStatus::HAS_LEASE:
239
87.2M
      result.status = LeaderStatus::LEADER_AND_READY;
240
87.2M
      result.term = GetCurrentTermUnlocked();
241
87.2M
      return result;
242
87.2M
  }
243
244
0
  FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, lease_status);
245
0
}
246
247
13.2M
Status ReplicaState::CheckActiveLeaderUnlocked(LeaderLeaseCheckMode lease_check_mode) const {
248
13.2M
  auto state = GetLeaderStateUnlocked(lease_check_mode);
249
13.2M
  if (state.status == LeaderStatus::NOT_LEADER) {
250
814
    ConsensusStatePB cstate = ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE);
251
814
    return STATUS_FORMAT(IllegalState,
252
814
                         "Replica $0 is not leader of this config. Role: $1. Consensus state: $2",
253
814
                         peer_uuid_, PeerRole_Name(GetActiveRoleUnlocked()), cstate);
254
814
  }
255
256
13.2M
  return state.CreateStatus();
257
13.2M
}
258
259
6.41M
Status ReplicaState::LockForConfigChange(UniqueLock* lock) const {
260
6.41M
  TRACE_EVENT0("consensus", "ReplicaState::LockForConfigChange");
261
262
6.41M
  ThreadRestrictions::AssertWaitAllowed();
263
6.41M
  UniqueLock l(update_lock_);
264
  // Can only change the config on running replicas.
265
6.41M
  if (PREDICT_FALSE(state_ != kRunning)) {
266
9
    return STATUS(IllegalState, "Unable to lock ReplicaState for config change",
267
9
                                Substitute("State = $0", state_));
268
9
  }
269
6.41M
  lock->swap(l);
270
6.41M
  return Status::OK();
271
6.41M
}
272
273
75.6M
Status ReplicaState::LockForUpdate(UniqueLock* lock) const {
274
75.6M
  TRACE_EVENT0("consensus", "ReplicaState::LockForUpdate");
275
75.6M
  ThreadRestrictions::AssertWaitAllowed();
276
75.6M
  UniqueLock l(update_lock_);
277
75.6M
  if (PREDICT_FALSE(state_ != kRunning)) {
278
11
    return STATUS(IllegalState, "Replica not in running state");
279
11
  }
280
75.6M
  lock->swap(l);
281
75.6M
  return Status::OK();
282
75.6M
}
283
284
151k
Status ReplicaState::LockForShutdown(UniqueLock* lock) {
285
151k
  TRACE_EVENT0("consensus", "ReplicaState::LockForShutdown");
286
151k
  ThreadRestrictions::AssertWaitAllowed();
287
151k
  UniqueLock l(update_lock_);
288
151k
  if (state_ != kShuttingDown && 
state_ != kShutDown75.6k
) {
289
75.6k
    state_ = kShuttingDown;
290
75.6k
  }
291
151k
  lock->swap(l);
292
151k
  return Status::OK();
293
151k
}
294
295
75.6k
Status ReplicaState::ShutdownUnlocked() {
296
75.6k
  DCHECK(IsLocked());
297
75.6k
  CHECK_EQ(state_, kShuttingDown);
298
75.6k
  state_ = kShutDown;
299
75.6k
  return Status::OK();
300
75.6k
}
301
302
46.5M
ConsensusStatePB ReplicaState::ConsensusStateUnlocked(ConsensusConfigType type) const {
303
46.5M
  return cmeta_->ToConsensusStatePB(type);
304
46.5M
}
305
306
266M
PeerRole ReplicaState::GetActiveRoleUnlocked() const {
307
266M
  DCHECK(IsLocked());
308
266M
  return cmeta_->active_role();
309
266M
}
310
311
8.81M
bool ReplicaState::IsConfigChangePendingUnlocked() const {
312
8.81M
  DCHECK(IsLocked());
313
8.81M
  return cmeta_->has_pending_config();
314
8.81M
}
315
316
14.0k
Status ReplicaState::CheckNoConfigChangePendingUnlocked() const {
317
14.0k
  DCHECK(IsLocked());
318
14.0k
  if (IsConfigChangePendingUnlocked()) {
319
317
    return STATUS(IllegalState,
320
317
        Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
321
317
                   "  Committed config: $0.\n  Pending config: $1",
322
317
                   GetCommittedConfigUnlocked().ShortDebugString(),
323
317
                   GetPendingConfigUnlocked().ShortDebugString()));
324
317
  }
325
13.6k
  return Status::OK();
326
14.0k
}
327
328
19.5k
Status ReplicaState::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
329
19.5k
  DCHECK(IsLocked());
330
19.5k
  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM),
331
19.5k
                        "Invalid config to set as pending");
332
19.5k
  if (!new_config.unsafe_config_change()) {
333
19.5k
    CHECK(!cmeta_->has_pending_config())
334
2
        << "Attempt to set pending config while another is already pending! "
335
2
        << "Existing pending config: " << cmeta_->pending_config().ShortDebugString() << "; "
336
2
        << "Attempted new pending config: " << new_config.ShortDebugString();
337
19.5k
  } else 
if (50
cmeta_->has_pending_config()50
) {
338
0
    LOG_WITH_PREFIX(INFO) << "Allowing unsafe config change even though there is a pending config! "
339
0
                          << "Existing pending config: "
340
0
                          << cmeta_->pending_config().ShortDebugString() << "; "
341
0
                          << "New pending config: " << new_config.ShortDebugString();
342
0
  }
343
19.5k
  cmeta_->set_pending_config(new_config);
344
19.5k
  CoarseTimePoint now;
345
19.5k
  RefreshLeaderStateCacheUnlocked(&now);
346
19.5k
  return Status::OK();
347
19.5k
}
348
349
19
Status ReplicaState::ClearPendingConfigUnlocked() {
350
19
  DCHECK(IsLocked());
351
19
  if (!cmeta_->has_pending_config()) {
352
0
    LOG(WARNING) << "Attempt to clear a non-existent pending config."
353
0
                 << "Existing committed config: " << cmeta_->committed_config().ShortDebugString();
354
0
    return STATUS(IllegalState, "Attempt to clear a non-existent pending config.");
355
0
  }
356
19
  cmeta_->clear_pending_config();
357
19
  CoarseTimePoint now;
358
19
  RefreshLeaderStateCacheUnlocked(&now);
359
19
  return Status::OK();
360
19
}
361
362
2.95M
const RaftConfigPB& ReplicaState::GetPendingConfigUnlocked() const {
363
2.95M
  DCHECK(IsLocked());
364
2.95M
  CHECK
(IsConfigChangePendingUnlocked()) << "No pending config"0
;
365
2.95M
  return cmeta_->pending_config();
366
2.95M
}
367
368
19.5k
Status ReplicaState::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
369
19.5k
  TRACE_EVENT0("consensus", "ReplicaState::SetCommittedConfigUnlocked");
370
19.5k
  DCHECK(IsLocked());
371
19.5k
  DCHECK(config_to_commit.IsInitialized());
372
19.5k
  RETURN_NOT_OK_PREPEND(
373
19.5k
      VerifyRaftConfig(config_to_commit, COMMITTED_QUORUM), "Invalid config to set as committed");
374
375
  // Compare committed with pending configuration, ensure they are the same.
376
  // In the event of an unsafe config change triggered by an administrator,
377
  // it is possible that the config being committed may not match the pending config
378
  // because unsafe config change allows multiple pending configs to exist.
379
  // Therefore we only need to validate that 'config_to_commit' matches the pending config
380
  // if the pending config does not have its 'unsafe_config_change' flag set.
381
19.5k
  if (IsConfigChangePendingUnlocked()) {
382
19.5k
    const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
383
19.5k
    if (!pending_config.unsafe_config_change()) {
384
      // Pending will not have an opid_index, so ignore that field.
385
19.4k
      RaftConfigPB config_no_opid = config_to_commit;
386
19.4k
      config_no_opid.clear_opid_index();
387
      // Quorums must be exactly equal, even w.r.t. peer ordering.
388
19.4k
      CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(), config_no_opid.SerializeAsString())
389
0
          << Substitute(
390
0
                 "New committed config must equal pending config, but does not. "
391
0
                 "Pending config: $0, committed config: $1",
392
0
                 pending_config.ShortDebugString(), config_to_commit.ShortDebugString());
393
19.4k
    }
394
19.5k
  }
395
19.5k
  cmeta_->set_committed_config(config_to_commit);
396
19.5k
  cmeta_->clear_pending_config();
397
19.5k
  CoarseTimePoint now;
398
19.5k
  RefreshLeaderStateCacheUnlocked(&now);
399
19.5k
  CHECK_OK(cmeta_->Flush());
400
19.5k
  return Status::OK();
401
19.5k
}
402
403
80.8M
const RaftConfigPB& ReplicaState::GetCommittedConfigUnlocked() const {
404
80.8M
  DCHECK(IsLocked());
405
80.8M
  return cmeta_->committed_config();
406
80.8M
}
407
408
100M
const RaftConfigPB& ReplicaState::GetActiveConfigUnlocked() const {
409
100M
  DCHECK(IsLocked());
410
100M
  return cmeta_->active_config();
411
100M
}
412
413
33.8M
bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
414
33.8M
  DCHECK(IsLocked());
415
416
33.8M
  *term_mismatch = false;
417
418
33.8M
  if (cmeta_ == nullptr) {
419
0
    LOG(FATAL) << "cmeta_ cannot be NULL";
420
0
  }
421
422
33.8M
  int64_t committed_index = GetCommittedOpIdUnlocked().index;
423
33.8M
  if (op_id.index <= committed_index) {
424
17.4M
    return true;
425
17.4M
  }
426
427
16.3M
  int64_t last_received_index = GetLastReceivedOpIdUnlocked().index;
428
16.3M
  if (op_id.index > last_received_index) {
429
8.40M
    return false;
430
8.40M
  }
431
432
7.93M
  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index);
433
7.93M
  if (round == nullptr) {
434
0
    LOG_WITH_PREFIX(ERROR)
435
0
        << "Consensus round not found for op id " << op_id << ": "
436
0
        << "committed_index=" << committed_index << ", "
437
0
        << "last_received_index=" << last_received_index << ", "
438
0
        << "tablet: " << options_.tablet_id << ", current state: "
439
0
        << ToStringUnlocked();
440
0
    DumpPendingOperationsUnlocked();
441
0
    CHECK(false);
442
0
  }
443
444
7.93M
  if (round->id().term != op_id.term) {
445
168
    *term_mismatch = true;
446
168
    return false;
447
168
  }
448
7.93M
  return true;
449
7.93M
}
450
451
757k
Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term) {
452
757k
  TRACE_EVENT1("consensus", "ReplicaState::SetCurrentTermUnlocked",
453
757k
               "term", new_term);
454
757k
  DCHECK(IsLocked());
455
757k
  if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
456
0
    return STATUS(IllegalState,
457
0
        Substitute("Cannot change term to a term that is lower than or equal to the current one. "
458
0
                   "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
459
0
  }
460
757k
  cmeta_->set_current_term(new_term);
461
757k
  cmeta_->clear_voted_for();
462
  // OK to flush before clearing the leader, because the leader UUID is not part of
463
  // ConsensusMetadataPB.
464
757k
  RETURN_NOT_OK(cmeta_->Flush());
465
182k
  ClearLeaderUnlocked();
466
182k
  last_received_op_id_current_leader_ = yb::OpId();
467
182k
  return Status::OK();
468
757k
}
469
470
246M
const int64_t ReplicaState::GetCurrentTermUnlocked() const {
471
246M
  DCHECK(IsLocked());
472
246M
  return cmeta_->current_term();
473
246M
}
474
475
676k
void ReplicaState::SetLeaderUuidUnlocked(const std::string& uuid) {
476
676k
  DCHECK(IsLocked());
477
676k
  cmeta_->set_leader_uuid(uuid);
478
676k
  CoarseTimePoint now;
479
676k
  RefreshLeaderStateCacheUnlocked(&now);
480
676k
}
481
482
104M
const string& ReplicaState::GetLeaderUuidUnlocked() const {
483
104M
  DCHECK(IsLocked());
484
104M
  return cmeta_->leader_uuid();
485
104M
}
486
487
62.1k
const bool ReplicaState::HasVotedCurrentTermUnlocked() const {
488
62.1k
  DCHECK(IsLocked());
489
62.1k
  return cmeta_->has_voted_for();
490
62.1k
}
491
492
170k
Status ReplicaState::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
493
170k
  TRACE_EVENT1("consensus", "ReplicaState::SetVotedForCurrentTermUnlocked",
494
170k
               "uuid", uuid);
495
170k
  DCHECK(IsLocked());
496
170k
  cmeta_->set_voted_for(uuid);
497
170k
  CHECK_OK(cmeta_->Flush());
498
170k
  return Status::OK();
499
170k
}
500
501
2.53k
const std::string& ReplicaState::GetVotedForCurrentTermUnlocked() const {
502
2.53k
  DCHECK(IsLocked());
503
2.53k
  DCHECK(cmeta_->has_voted_for());
504
2.53k
  return cmeta_->voted_for();
505
2.53k
}
506
507
29.1M
const string& ReplicaState::GetPeerUuid() const {
508
29.1M
  return peer_uuid_;
509
29.1M
}
510
511
808k
const ConsensusOptions& ReplicaState::GetOptions() const {
512
808k
  return options_;
513
808k
}
514
515
0
void ReplicaState::DumpPendingOperationsUnlocked() {
516
0
  DCHECK(IsLocked());
517
0
  LOG_WITH_PREFIX(INFO) << "Dumping " << pending_operations_.size()
518
0
                                 << " pending operations.";
519
0
  for (const auto &round : pending_operations_) {
520
0
    LOG_WITH_PREFIX(INFO) << round->replicate_msg()->ShortDebugString();
521
0
  }
522
0
}
523
524
75.6k
Status ReplicaState::CancelPendingOperations() {
525
75.6k
  {
526
75.6k
    ThreadRestrictions::AssertWaitAllowed();
527
75.6k
    UniqueLock lock(update_lock_);
528
75.6k
    if (state_ != kShuttingDown) {
529
0
      return STATUS(IllegalState, "Can only wait for pending commits on kShuttingDown state.");
530
0
    }
531
75.6k
    if (pending_operations_.empty()) {
532
75.4k
      return Status::OK();
533
75.4k
    }
534
535
222
    LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_operations_.size()
536
222
                          << " pending operations because of shutdown.";
537
222
    auto abort_status = STATUS(Aborted, "Operation aborted");
538
222
    int i = 0;
539
613
    for (size_t idx = pending_operations_.size(); idx > 0; ) {
540
391
      --idx;
541
      // We cancel only operations whose applies have not yet been triggered.
542
391
      constexpr auto kLogAbortedOperationsNum = 10;
543
391
      if (++i <= kLogAbortedOperationsNum) {
544
382
        LOG_WITH_PREFIX(INFO) << "Aborting operation because of shutdown: "
545
382
                              << pending_operations_[idx]->replicate_msg()->ShortDebugString();
546
382
      }
547
391
      NotifyReplicationFinishedUnlocked(pending_operations_[idx], abort_status, OpId::kUnknownTerm,
548
391
                                        nullptr /* applied_op_ids */);
549
391
    }
550
222
  }
551
0
  return Status::OK();
552
75.6k
}
553
554
struct PendingOperationsComparator {
555
0
  bool operator()(const ConsensusRoundPtr& lhs, int64_t rhs) const {
556
0
    return lhs->id().index < rhs;
557
0
  }
558
559
0
  bool operator()(int64_t lhs, const ConsensusRoundPtr& rhs) const {
560
0
    return lhs < rhs->id().index;
561
0
  }
562
};
563
564
7.94M
ReplicaState::PendingOperations::iterator ReplicaState::FindPendingOperation(int64_t index) {
565
7.94M
  if (pending_operations_.empty()) {
566
0
    return pending_operations_.end();
567
0
  }
568
569
7.94M
  size_t offset = index - pending_operations_.front()->id().index;
570
  // If index < pending_operations_.front()->id().index() then offset will be very big positive
571
  // number, so could check both bounds in one comparison.
572
7.94M
  if (offset >= pending_operations_.size()) {
573
164
    return pending_operations_.end();
574
164
  }
575
576
7.94M
  auto result = pending_operations_.begin() + offset;
577
7.94M
  DCHECK_EQ((**result).id().index, index);
578
7.94M
  return result;
579
7.94M
}
580
581
168
Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
582
168
  DCHECK(IsLocked());
583
168
  LOG_WITH_PREFIX(INFO)
584
168
      << "Aborting all operations after (but not including): "
585
168
      << new_preceding_idx << ". Current State: " << ToStringUnlocked();
586
587
168
  DCHECK_GE(new_preceding_idx, 0);
588
168
  yb::OpId new_preceding;
589
590
168
  auto preceding_op_iter = FindPendingOperation(new_preceding_idx);
591
592
  // Either the new preceding id is in the pendings set or it must be equal to the
593
  // committed index since we can't truncate already committed operations.
594
168
  if (preceding_op_iter != pending_operations_.end()) {
595
4
    new_preceding = (**preceding_op_iter).id();
596
4
    ++preceding_op_iter;
597
164
  } else {
598
164
    CHECK_EQ(new_preceding_idx, last_committed_op_id_.index);
599
164
    new_preceding = last_committed_op_id_;
600
164
    if (!pending_operations_.empty() &&
601
164
        pending_operations_.front()->id().index > new_preceding_idx) {
602
164
      preceding_op_iter = pending_operations_.begin();
603
164
    }
604
164
  }
605
606
  // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
607
  // here to avoid the bounds check, since we're breaking monotonicity.
608
168
  last_received_op_id_ = new_preceding;
609
168
  last_received_op_id_current_leader_ = OpId();
610
168
  next_index_ = new_preceding.index + 1;
611
612
168
  auto abort_status = STATUS(Aborted, "Operation aborted by new leader");
613
439
  for (auto it = pending_operations_.end(); it != preceding_op_iter;) {
614
271
    const ConsensusRoundPtr& round = *--it;
615
271
    auto op_id = OpId::FromPB(round->replicate_msg()->id());
616
271
    LOG_WITH_PREFIX(INFO) << "Aborting uncommitted operation due to leader change: "
617
271
                          << op_id << ", committed: " << last_committed_op_id_;
618
271
    NotifyReplicationFinishedUnlocked(round, abort_status, OpId::kUnknownTerm,
619
271
                                      nullptr /* applied_op_ids */);
620
271
  }
621
622
  // Clear entries from pending operations.
623
168
  pending_operations_.erase(preceding_op_iter, pending_operations_.end());
624
168
  CheckPendingOperationsHead();
625
626
168
  return Status::OK();
627
168
}
628
629
14.5M
Status ReplicaState::AddPendingOperation(const ConsensusRoundPtr& round, OperationMode mode) {
630
14.5M
  DCHECK(IsLocked());
631
632
14.5M
  auto op_type = round->replicate_msg()->op_type();
633
14.5M
  if (PREDICT_FALSE(state_ != kRunning)) {
634
    // Special case when we're configuring and this is a config change, refuse
635
    // everything else.
636
    // TODO: Don't require a NO_OP to get to kRunning state
637
0
    if (op_type != NO_OP) {
638
0
      return STATUS(IllegalState, "Cannot trigger prepare. Replica is not in kRunning state.");
639
0
    }
640
0
  }
641
642
14.5M
  if (mode == OperationMode::kLeader) {
643
5.11M
    RETURN_NOT_OK(context_->CheckOperationAllowed(round->id(), op_type));
644
5.11M
  }
645
646
  // Mark pending configuration.
647
14.5M
  if (PREDICT_FALSE(op_type == CHANGE_CONFIG_OP)) {
648
19.8k
    DCHECK(round->replicate_msg()->change_config_record().has_old_config());
649
19.8k
    DCHECK(round->replicate_msg()->change_config_record().old_config().has_opid_index());
650
19.8k
    DCHECK(round->replicate_msg()->change_config_record().has_new_config());
651
19.8k
    DCHECK(!round->replicate_msg()->change_config_record().new_config().has_opid_index());
652
19.8k
    if (mode == OperationMode::kFollower) {
653
13.9k
      const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
654
13.9k
      const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
655
      // The leader has to mark the configuration as pending before it gets here
656
      // because the active configuration affects the replication queue.
657
      // Do one last sanity check.
658
13.9k
      Status s = CheckNoConfigChangePendingUnlocked();
659
13.9k
      if (PREDICT_FALSE(!s.ok() && !new_config.unsafe_config_change())) {
660
317
        s = s.CloneAndAppend(Format("New config: $0", new_config));
661
317
        LOG_WITH_PREFIX(INFO) << s;
662
317
        return s;
663
317
      }
664
      // Check if the pending Raft config has an OpId less than the committed
665
      // config. If so, this is a replay at startup in which the COMMIT
666
      // messages were delayed.
667
13.6k
      const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
668
13.6k
      if (
round->replicate_msg()->id().index() > committed_config.opid_index()13.6k
) {
669
13.6k
        CHECK_OK(SetPendingConfigUnlocked(new_config));
670
18.4E
      } else {
671
18.4E
        LOG_WITH_PREFIX(INFO)
672
18.4E
            << "Ignoring setting pending config change with OpId "
673
18.4E
            << round->replicate_msg()->id() << " because the committed config has OpId index "
674
18.4E
            << committed_config.opid_index() << ". The config change we are ignoring is: "
675
18.4E
            << "Old config: { " << old_config.ShortDebugString() << " }. "
676
18.4E
            << "New config: { " << new_config.ShortDebugString() << " }";
677
18.4E
      }
678
13.6k
    }
679
14.5M
  } else if (op_type == WRITE_OP) {
680
    // Leader registers an operation with RetryableRequests even before assigning an op id.
681
8.55M
    if (mode == OperationMode::kFollower && 
!retryable_requests_.Register(round)5.53M
) {
682
0
      return STATUS(IllegalState, "Cannot register retryable request on follower");
683
0
    }
684
8.55M
  } else 
if (5.95M
op_type == SPLIT_OP5.95M
) {
685
71
    const auto& split_request = round->replicate_msg()->split_request();
686
71
    SCHECK_EQ(
687
71
        split_request.tablet_id(), cmeta_->tablet_id(), InvalidArgument,
688
71
        "Received split op for a different tablet.");
689
    // TODO(tsplit): if we get failures past this point we can't undo the tablet state.
690
    // Might be need some tool to be able to remove SPLIT_OP from Raft log.
691
71
  }
692
693
14.5M
  
LOG_IF_WITH_PREFIX1.39k
(
694
1.39k
      DFATAL,
695
1.39k
      !pending_operations_.empty() &&
696
1.39k
          pending_operations_.back()->id().index + 1 != round->id().index)
697
1.39k
      << "Adding operation with wrong index: " << AsString(round) << ", last op id: "
698
1.39k
      << AsString(pending_operations_.back()->id()) << ", operations: "
699
1.39k
      << AsString(pending_operations_);
700
14.5M
  pending_operations_.push_back(round);
701
14.5M
  CheckPendingOperationsHead();
702
14.5M
  return Status::OK();
703
14.5M
}
704
705
7.94M
scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(int64_t index) {
706
7.94M
  DCHECK(IsLocked());
707
7.94M
  auto it = FindPendingOperation(index);
708
7.94M
  if (it == pending_operations_.end()) {
709
0
    return nullptr;
710
0
  }
711
7.94M
  return *it;
712
7.94M
}
713
714
Status ReplicaState::UpdateMajorityReplicatedUnlocked(
715
    const OpId& majority_replicated, OpId* committed_op_id,
716
34.7M
    bool* committed_op_id_changed, OpId* last_applied_op_id) {
717
34.7M
  DCHECK(IsLocked());
718
34.7M
  if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) {
719
0
    return STATUS(ServiceUnavailable, "Cannot trigger apply. Replica is shutting down.");
720
0
  }
721
34.7M
  if (PREDICT_FALSE(state_ != kRunning)) {
722
0
    return STATUS(IllegalState, "Cannot trigger apply. Replica is not in kRunning state.");
723
0
  }
724
725
  // If the last committed operation was in the current term (the normal case)
726
  // then 'committed_op_id' is simply equal to majority replicated.
727
34.7M
  if (last_committed_op_id_.term == GetCurrentTermUnlocked()) {
728
34.6M
    *committed_op_id_changed = VERIFY_RESULT(AdvanceCommittedOpIdUnlocked(
729
0
        majority_replicated, CouldStop::kFalse));
730
0
    *committed_op_id = last_committed_op_id_;
731
34.6M
    *last_applied_op_id = GetLastAppliedOpIdUnlocked();
732
34.6M
    return Status::OK();
733
34.6M
  }
734
735
  // If the last committed operation is not in the current term (such as when
736
  // we change leaders) but 'majority_replicated' is then we can advance the
737
  // 'committed_op_id' too.
738
127k
  if (majority_replicated.term == GetCurrentTermUnlocked()) {
739
61.9k
    auto previous = last_committed_op_id_;
740
61.9k
    *committed_op_id_changed = VERIFY_RESULT(AdvanceCommittedOpIdUnlocked(
741
0
        majority_replicated, CouldStop::kFalse));
742
0
    *committed_op_id = last_committed_op_id_;
743
61.9k
    *last_applied_op_id = GetLastAppliedOpIdUnlocked();
744
61.9k
    LOG_WITH_PREFIX(INFO)
745
61.9k
        << "Advanced the committed_op_id across terms."
746
61.9k
        << " Last committed operation was: " << previous
747
61.9k
        << " New committed index is: " << last_committed_op_id_;
748
61.9k
    return Status::OK();
749
61.9k
  }
750
751
65.9k
  *committed_op_id = last_committed_op_id_;
752
65.9k
  *last_applied_op_id = GetLastAppliedOpIdUnlocked();
753
65.9k
  YB_LOG_EVERY_N_SECS(WARNING, 1) << LogPrefix()
754
24.1k
          << "Can't advance the committed index across term boundaries"
755
24.1k
          << " until operations from the current term are replicated."
756
24.1k
          << " Last committed operation was: " << last_committed_op_id_ << ","
757
24.1k
          << " New majority replicated is: " << majority_replicated << ","
758
24.1k
          << " Current term is: " << GetCurrentTermUnlocked();
759
760
65.9k
  return Status::OK();
761
127k
}
762
763
13.2M
void ReplicaState::SetLastCommittedIndexUnlocked(const yb::OpId& committed_op_id) {
764
13.2M
  DCHECK(IsLocked());
765
13.2M
  CHECK_GE(last_received_op_id_.index, committed_op_id.index);
766
13.2M
  last_committed_op_id_ = committed_op_id;
767
13.2M
  CheckPendingOperationsHead();
768
13.2M
}
769
770
150k
Status ReplicaState::InitCommittedOpIdUnlocked(const yb::OpId& committed_op_id) {
771
150k
  if (!last_committed_op_id_.empty()) {
772
0
    return STATUS_FORMAT(
773
0
        IllegalState,
774
0
        "Committed index already initialized to: $0, tried to set $1",
775
0
        last_committed_op_id_,
776
0
        committed_op_id);
777
0
  }
778
779
150k
  if (!pending_operations_.empty() &&
780
150k
      
committed_op_id.index >= pending_operations_.front()->id().index66
) {
781
0
    RETURN_NOT_OK(ApplyPendingOperationsUnlocked(committed_op_id, CouldStop::kFalse));
782
0
  }
783
784
150k
  SetLastCommittedIndexUnlocked(committed_op_id);
785
786
150k
  return Status::OK();
787
150k
}
788
789
40.7M
void ReplicaState::CheckPendingOperationsHead() const {
790
40.7M
  if (pending_operations_.empty() || 
last_committed_op_id_.empty()28.1M
||
791
40.7M
      
pending_operations_.front()->id().index == last_committed_op_id_.index + 127.8M
) {
792
40.7M
    return;
793
40.7M
  }
794
795
7.34k
  LOG_WITH_PREFIX(FATAL)
796
7.34k
      << "The first pending operation's index is supposed to immediately follow the last committed "
797
7.34k
      << "operation's index. Committed op id: " << last_committed_op_id_ << ", pending operations: "
798
7.34k
      << AsString(pending_operations_);
799
7.34k
}
800
801
Result<bool> ReplicaState::AdvanceCommittedOpIdUnlocked(
802
85.5M
    const yb::OpId& committed_op_id, CouldStop could_stop) {
803
85.5M
  DCHECK(IsLocked());
804
  // If we already committed up to (or past) 'id' return.
805
  // This can happen in the case that multiple UpdateConsensus() calls end
806
  // up in the RPC queue at the same time, and then might get interleaved out
807
  // of order.
808
85.5M
  if (last_committed_op_id_.index >= committed_op_id.index) {
809
72.5M
    
VLOG_WITH_PREFIX884
(1)
810
884
        << "Already marked ops through " << last_committed_op_id_ << " as committed. "
811
884
        << "Now trying to mark " << committed_op_id << " which would be a no-op.";
812
72.5M
    return false;
813
72.5M
  }
814
815
13.0M
  if (pending_operations_.empty()) {
816
0
    VLOG_WITH_PREFIX(1) << "No operations to mark as committed up to: "
817
0
                        << committed_op_id;
818
0
    return STATUS_FORMAT(
819
0
        NotFound,
820
0
        "No pending entries, requested to advance last committed OpId from $0 to $1, "
821
0
            "last received: $2",
822
0
        last_committed_op_id_, committed_op_id, last_received_op_id_);
823
0
  }
824
825
13.0M
  CheckPendingOperationsHead();
826
827
13.0M
  auto old_index = last_committed_op_id_.index;
828
829
13.0M
  auto status = ApplyPendingOperationsUnlocked(committed_op_id, could_stop);
830
13.0M
  if (!status.ok()) {
831
0
    return status;
832
0
  }
833
834
13.0M
  return last_committed_op_id_.index != old_index;
835
13.0M
}
836
837
Status ReplicaState::ApplyPendingOperationsUnlocked(
838
13.0M
    const yb::OpId& committed_op_id, CouldStop could_stop) {
839
13.0M
  DCHECK(IsLocked());
840
18.4E
  VLOG_WITH_PREFIX(1) << "Last triggered apply was: " <<  last_committed_op_id_;
841
842
  // Stop at the operation after the last one we must commit. This iterator by definition points to
843
  // the first entry greater than the committed index, so the entry preceding that must have the
844
  // OpId equal to committed_op_id.
845
846
13.0M
  auto prev_id = last_committed_op_id_;
847
13.0M
  yb::OpId max_allowed_op_id;
848
13.0M
  if (!safe_op_id_waiter_) {
849
0
    max_allowed_op_id.index = std::numeric_limits<int64_t>::max();
850
0
  }
851
13.0M
  auto leader_term = GetLeaderStateUnlocked().term;
852
853
13.0M
  OpIds applied_op_ids;
854
13.0M
  applied_op_ids.reserve(committed_op_id.index - prev_id.index);
855
856
13.0M
  Status status;
857
858
27.5M
  while (!pending_operations_.empty()) {
859
15.0M
    auto round = pending_operations_.front();
860
15.0M
    auto current_id = round->id();
861
862
15.0M
    if (PREDICT_TRUE(prev_id)) {
863
14.9M
      CHECK_OK(CheckOpInSequence(prev_id, current_id));
864
14.9M
    }
865
866
15.0M
    if (current_id.index > committed_op_id.index) {
867
528k
      break;
868
528k
    }
869
870
14.5M
    auto type = round->replicate_msg()->op_type();
871
872
    // For write operations we block rocksdb flush, until appropriate records are written to the
873
    // log file. So we could apply them before adding to log.
874
14.5M
    if (type == OperationType::WRITE_OP) {
875
8.55M
      if (could_stop && 
!context_->ShouldApplyWrite()5.53M
) {
876
0
        YB_LOG_EVERY_N_SECS(WARNING, 5) << LogPrefix()
877
0
            << "Stop apply pending operations, because of write delay required, last applied: "
878
0
            << prev_id << " of " << committed_op_id;
879
0
        break;
880
0
      }
881
8.55M
    } else 
if (5.97M
current_id.index > max_allowed_op_id.index5.97M
||
882
5.97M
               
current_id.term > max_allowed_op_id.term613k
) {
883
5.36M
      max_allowed_op_id = safe_op_id_waiter_->WaitForSafeOpIdToApply(current_id);
884
      // This situation should not happen. Prior to #4150 it could happen as follows.  Suppose
885
      // replica A was the leader of term 1 and added operations 1.100 and 1.101 to the WAL but
886
      // has not committed them yet.  Replica B decides that A is unavailable, starts and wins
887
      // term 2 election, and tries to replicate a no-op 2.100.  Replica A starts and wins term 3
888
      // election and then continues to replicate 1.100 and 1.101 and the new no-op 3.102.
889
      // Suppose an UpdateConsensus from replica A reaches replica B with a committed op id of
890
      // 3.102 (because perhaps some other replica has already received those entries).  Replica B
891
      // will abort 2.100 and try to apply all three operations. Suppose the last op id flushed to
892
      // the WAL on replica B is currently 2.100, and current_id is 1.101. Then
893
      // WaitForSafeOpIdToApply would return 2.100 immediately as 2.100 > 1.101 in terms of OpId
894
      // comparison, and we will throw an error here.
895
      //
896
      // However, after the #4150 fix we are resetting flushed op id using ResetLastSynchedOpId
897
      // when aborting operations during term changes, so WaitForSafeOpIdToApply would correctly
898
      // wait until 1.101 is written and return 1.101 or 3.102 in the above example.
899
5.36M
      if (
max_allowed_op_id.index < current_id.index5.36M
|| max_allowed_op_id.term < current_id.term) {
900
0
        status = STATUS_FORMAT(
901
0
            RuntimeError,
902
0
            "Bad max allowed op id ($0), term/index must be no less than that of current op id "
903
0
            "($1)",
904
0
            max_allowed_op_id, current_id);
905
0
        break;
906
0
      }
907
5.36M
    }
908
909
14.5M
    pending_operations_.pop_front();
910
    // Set committed configuration.
911
14.5M
    if (PREDICT_FALSE(type == OperationType::CHANGE_CONFIG_OP)) {
912
19.5k
      ApplyConfigChangeUnlocked(round);
913
19.5k
    }
914
915
14.5M
    prev_id = current_id;
916
14.5M
    NotifyReplicationFinishedUnlocked(round, Status::OK(), leader_term, &applied_op_ids);
917
14.5M
  }
918
919
13.0M
  SetLastCommittedIndexUnlocked(prev_id);
920
921
13.0M
  applied_ops_tracker_(applied_op_ids);
922
923
13.0M
  return status;
924
13.0M
}
925
926
19.5k
void ReplicaState::ApplyConfigChangeUnlocked(const ConsensusRoundPtr& round) {
927
19.5k
  DCHECK(round->replicate_msg()->change_config_record().has_old_config());
928
19.5k
  DCHECK(round->replicate_msg()->change_config_record().has_new_config());
929
19.5k
  RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config();
930
19.5k
  RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
931
19.5k
  DCHECK(old_config.has_opid_index());
932
19.5k
  DCHECK(!new_config.has_opid_index());
933
934
19.5k
  const OpId& current_id = round->id();
935
936
19.5k
  if (PREDICT_FALSE(FLAGS_inject_delay_commit_pre_voter_to_voter_secs)) {
937
7
    bool is_transit_to_voter =
938
7
      CountVotersInTransition(old_config) > CountVotersInTransition(new_config);
939
7
    if (is_transit_to_voter) {
940
4
      LOG_WITH_PREFIX(INFO)
941
4
          << "Commit skipped as inject_delay_commit_pre_voter_to_voter_secs flag is set to true.\n"
942
4
          << "  Old config: { " << old_config.ShortDebugString() << " }.\n"
943
4
          << "  New config: { " << new_config.ShortDebugString() << " }";
944
4
      SleepFor(MonoDelta::FromSeconds(FLAGS_inject_delay_commit_pre_voter_to_voter_secs));
945
4
    }
946
7
  }
947
948
19.5k
  new_config.set_opid_index(current_id.index);
949
  // Check if the pending Raft config has an OpId less than the committed
950
  // config. If so, this is a replay at startup in which the COMMIT
951
  // messages were delayed.
952
19.5k
  const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
953
19.5k
  if (new_config.opid_index() > committed_config.opid_index()) {
954
19.5k
    LOG_WITH_PREFIX(INFO)
955
19.5k
        << "Committing config change with OpId "
956
19.5k
        << current_id << ". Old config: { " << old_config.ShortDebugString() << " }. "
957
19.5k
        << "New config: { " << new_config.ShortDebugString() << " }";
958
19.5k
    CHECK_OK(SetCommittedConfigUnlocked(new_config));
959
19.5k
  } else {
960
3
    LOG_WITH_PREFIX(INFO)
961
3
        << "Ignoring commit of config change with OpId "
962
3
        << current_id << " because the committed config has OpId index "
963
3
        << committed_config.opid_index() << ". The config change we are ignoring is: "
964
3
        << "Old config: { " << old_config.ShortDebugString() << " }. "
965
3
        << "New config: { " << new_config.ShortDebugString() << " }";
966
3
  }
967
19.5k
}
968
969
233M
const yb::OpId& ReplicaState::GetCommittedOpIdUnlocked() const {
970
233M
  DCHECK(IsLocked());
971
233M
  return last_committed_op_id_;
972
233M
}
973
974
24.2M
RestartSafeCoarseMonoClock& ReplicaState::Clock() {
975
24.2M
  return retryable_requests_.Clock();
976
24.2M
}
977
978
0
RetryableRequestsCounts ReplicaState::TEST_CountRetryableRequests() {
979
0
  auto lock = LockForRead();
980
0
  return retryable_requests_.TEST_Counts();
981
0
}
982
983
2.91M
bool ReplicaState::AreCommittedAndCurrentTermsSameUnlocked() const {
984
2.91M
  int64_t term = GetCurrentTermUnlocked();
985
2.91M
  const auto& opid = GetCommittedOpIdUnlocked();
986
2.91M
  if (opid.term != term) {
987
0
    LOG(INFO) << "committed term=" << opid.term << ", current term=" << term;
988
0
    return false;
989
0
  }
990
2.91M
  return true;
991
2.91M
}
992
993
13.2M
void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpIdPB& op_id) {
994
13.2M
  DCHECK(IsLocked());
995
13.2M
  auto* trace = Trace::CurrentTrace();
996
18.4E
  DCHECK(last_received_op_id_.term <= op_id.term() && last_received_op_id_.index <= op_id.index())
997
18.4E
      << LogPrefix() << ": "
998
18.4E
      << "Previously received OpId: " << last_received_op_id_
999
18.4E
      << ", updated OpId: " << op_id.ShortDebugString()
1000
18.4E
      << ", Trace:" << std::endl << (trace ? 
trace->DumpToString(true)0
: "No trace found");
1001
1002
13.2M
  last_received_op_id_ = yb::OpId::FromPB(op_id);
1003
13.2M
  last_received_op_id_current_leader_ = last_received_op_id_;
1004
13.2M
  next_index_ = op_id.index() + 1;
1005
13.2M
}
1006
1007
98.6M
const yb::OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const {
1008
98.6M
  DCHECK(IsLocked());
1009
98.6M
  return last_received_op_id_;
1010
98.6M
}
1011
1012
25.5M
const yb::OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const {
1013
25.5M
  DCHECK(IsLocked());
1014
25.5M
  return last_received_op_id_current_leader_;
1015
25.5M
}
1016
1017
25.4M
OpId ReplicaState::GetLastPendingOperationOpIdUnlocked() const {
1018
25.4M
  DCHECK(IsLocked());
1019
25.4M
  return pending_operations_.empty() ? 
OpId()17.4M
:
pending_operations_.back()->id()7.95M
;
1020
25.4M
}
1021
1022
5.11M
OpId ReplicaState::NewIdUnlocked() {
1023
5.11M
  DCHECK(IsLocked());
1024
5.11M
  return OpId(GetCurrentTermUnlocked(), next_index_++);
1025
5.11M
}
1026
1027
23
void ReplicaState::CancelPendingOperation(const OpId& id, bool should_exist) {
1028
23
  DCHECK(IsLocked());
1029
23
  CHECK_EQ(GetCurrentTermUnlocked(), id.term);
1030
23
  CHECK_EQ(next_index_, id.index + 1);
1031
23
  next_index_ = id.index;
1032
1033
  // We don't use UpdateLastReceivedOpIdUnlocked because we're actually
1034
  // updating it back to a lower value and we need to avoid the checks
1035
  // that method has.
1036
1037
  // This is only ok if we do _not_ release the lock after calling
1038
  // NewIdUnlocked() (which we don't in RaftConsensus::Replicate()).
1039
  // The below is correct because we always have starting NoOp, that has the same term and was
1040
  // replicated.
1041
23
  last_received_op_id_ = OpId(id.term, id.index - 1);
1042
23
  if (should_exist) {
1043
0
    CHECK(!pending_operations_.empty() && pending_operations_.back()->id() == id)
1044
0
        << "Pending operations should end with: " << id << ", but there are: "
1045
0
        << AsString(pending_operations_);
1046
0
    pending_operations_.pop_back();
1047
23
  } else {
1048
    // It could happen only in leader, while we add new operations, since we already have no op
1049
    // in this term, that would not be cancelled, we could be sure that previous operation
1050
    // has the same term.
1051
23
    OpId expected_last_op_id(id.term, id.index - 1);
1052
23
    CHECK(pending_operations_.empty() ||
1053
0
          (pending_operations_.back()->id() == expected_last_op_id))
1054
0
        << "Pending operations should end with: " << expected_last_op_id << ", but there are: "
1055
0
        << AsString(pending_operations_);
1056
23
  }
1057
23
}
1058
1059
9.49M
string ReplicaState::LogPrefix() const {
1060
9.49M
  auto role_and_term = cmeta_->GetRoleAndTerm();
1061
9.49M
  return Substitute("T $0 P $1 [term $2 $3]: ",
1062
9.49M
                    options_.tablet_id,
1063
9.49M
                    peer_uuid_,
1064
9.49M
                    role_and_term.second,
1065
9.49M
                    PeerRole_Name(role_and_term.first));
1066
9.49M
}
1067
1068
90.8k
ReplicaState::State ReplicaState::state() const {
1069
90.8k
  DCHECK(IsLocked());
1070
90.8k
  return state_;
1071
90.8k
}
1072
1073
0
string ReplicaState::ToString() const {
1074
0
  ThreadRestrictions::AssertWaitAllowed();
1075
0
  ReplicaState::UniqueLock lock(update_lock_);
1076
0
  return ToStringUnlocked();
1077
0
}
1078
1079
223k
string ReplicaState::ToStringUnlocked() const {
1080
223k
  DCHECK(IsLocked());
1081
223k
  return Format(
1082
223k
      "Replica: $0, State: $1, Role: $2, Watermarks: {Received: $3 Committed: $4} Leader: $5",
1083
223k
      peer_uuid_, state_, PeerRole_Name(GetActiveRoleUnlocked()),
1084
223k
      last_received_op_id_, last_committed_op_id_, last_received_op_id_current_leader_);
1085
223k
}
1086
1087
24.3M
Status ReplicaState::CheckOpInSequence(const yb::OpId& previous, const yb::OpId& current) {
1088
24.3M
  if (current.term < previous.term) {
1089
1
    return STATUS_FORMAT(
1090
1
        Corruption,
1091
1
        "New operation's term is not >= than the previous op's term. Current: $0. Previous: $1",
1092
1
        current, previous);
1093
1
  }
1094
1095
24.3M
  if (current.index != previous.index + 1) {
1096
1
    return STATUS_FORMAT(
1097
1
        Corruption,
1098
1
        "New operation's index does not follow the previous op's index. Current: $0. Previous: $1",
1099
1
        current, previous);
1100
1
  }
1101
24.3M
  return Status::OK();
1102
24.3M
}
1103
1104
void ReplicaState::UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked(
1105
25.4M
    const CoarseTimeLease& lease, const PhysicalComponentLease& ht_lease) {
1106
25.4M
  old_leader_lease_.TryUpdate(lease);
1107
25.4M
  old_leader_ht_lease_.TryUpdate(ht_lease);
1108
1109
  // Reset our lease, since we are non leader now. I.e. follower or candidate.
1110
25.4M
  auto existing_lease = majority_replicated_lease_expiration_;
1111
25.4M
  if (existing_lease != CoarseTimeLease::NoneValue()) {
1112
159k
    LOG_WITH_PREFIX(INFO)
1113
159k
        << "Reset our lease: " << MonoDelta(CoarseMonoClock::now() - existing_lease);
1114
159k
    majority_replicated_lease_expiration_ = CoarseTimeLease::NoneValue();
1115
159k
  }
1116
1117
25.4M
  auto existing_ht_lease = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire);
1118
25.4M
  if (existing_ht_lease != PhysicalComponentLease::NoneValue()) {
1119
10.8k
    LOG_WITH_PREFIX(INFO) << "Reset our ht lease: " << HybridTime::FromMicros(existing_ht_lease);
1120
10.8k
    majority_replicated_ht_lease_expiration_.store(PhysicalComponentLease::NoneValue(),
1121
10.8k
                                                   std::memory_order_release);
1122
10.8k
    cond_.notify_all();
1123
10.8k
  }
1124
25.4M
}
1125
1126
template <class Policy>
1127
94.2M
LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const {
1128
94.2M
  DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER);
1129
1130
94.2M
  if (!policy.Enabled()) {
1131
1.37k
    return LeaderLeaseStatus::HAS_LEASE;
1132
1.37k
  }
1133
1134
94.2M
  if (GetActiveConfigUnlocked().peers_size() == 1) {
1135
    // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because
1136
    // we are only reading it in this function (as of 08/09/2017).
1137
8.91M
    return LeaderLeaseStatus::HAS_LEASE;
1138
8.91M
  }
1139
1140
85.2M
  if (!policy.OldLeaderLeaseExpired()) {
1141
20.2k
    return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE;
1142
20.2k
  }
1143
1144
85.2M
  if (policy.MajorityReplicatedLeaseExpired()) {
1145
26.1k
    return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
1146
26.1k
  }
1147
1148
85.2M
  return LeaderLeaseStatus::HAS_LEASE;
1149
85.2M
}
yb::consensus::LeaderLeaseStatus yb::consensus::ReplicaState::GetLeaseStatusUnlocked<yb::consensus::GetLeaderLeaseStatusPolicy>(yb::consensus::GetLeaderLeaseStatusPolicy) const
Line
Count
Source
1127
89.2M
LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const {
1128
89.2M
  DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER);
1129
1130
89.2M
  if (!policy.Enabled()) {
1131
0
    return LeaderLeaseStatus::HAS_LEASE;
1132
0
  }
1133
1134
89.2M
  if (GetActiveConfigUnlocked().peers_size() == 1) {
1135
    // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because
1136
    // we are only reading it in this function (as of 08/09/2017).
1137
8.49M
    return LeaderLeaseStatus::HAS_LEASE;
1138
8.49M
  }
1139
1140
80.7M
  if (!policy.OldLeaderLeaseExpired()) {
1141
20.2k
    return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE;
1142
20.2k
  }
1143
1144
80.7M
  if (policy.MajorityReplicatedLeaseExpired()) {
1145
26.1k
    return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
1146
26.1k
  }
1147
1148
80.7M
  return LeaderLeaseStatus::HAS_LEASE;
1149
80.7M
}
yb::consensus::LeaderLeaseStatus yb::consensus::ReplicaState::GetLeaseStatusUnlocked<yb::consensus::GetHybridTimeLeaseStatusAtPolicy>(yb::consensus::GetHybridTimeLeaseStatusAtPolicy) const
Line
Count
Source
1127
4.92M
LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const {
1128
4.92M
  DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER);
1129
1130
4.92M
  if (!policy.Enabled()) {
1131
1.37k
    return LeaderLeaseStatus::HAS_LEASE;
1132
1.37k
  }
1133
1134
4.92M
  if (GetActiveConfigUnlocked().peers_size() == 1) {
1135
    // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because
1136
    // we are only reading it in this function (as of 08/09/2017).
1137
416k
    return LeaderLeaseStatus::HAS_LEASE;
1138
416k
  }
1139
1140
4.50M
  if (!policy.OldLeaderLeaseExpired()) {
1141
0
    return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE;
1142
0
  }
1143
1144
4.50M
  if (policy.MajorityReplicatedLeaseExpired()) {
1145
28
    return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
1146
28
  }
1147
1148
4.50M
  return LeaderLeaseStatus::HAS_LEASE;
1149
4.50M
}
1150
1151
// Policy that is used during leader lease calculation.
1152
struct GetLeaderLeaseStatusPolicy {
1153
  const ReplicaState* replica_state;
1154
  MonoDelta* remaining_old_leader_lease;
1155
  CoarseTimePoint* now;
1156
1157
  GetLeaderLeaseStatusPolicy(
1158
      const ReplicaState* replica_state_, MonoDelta* remaining_old_leader_lease_,
1159
      CoarseTimePoint* now_)
1160
      : replica_state(replica_state_), remaining_old_leader_lease(remaining_old_leader_lease_),
1161
89.2M
        now(now_) {
1162
89.2M
    if (remaining_old_leader_lease) {
1163
84.3M
      *remaining_old_leader_lease = 0s;
1164
84.3M
    }
1165
89.2M
  }
1166
1167
80.7M
  bool OldLeaderLeaseExpired() {
1168
80.7M
    const auto remaining_old_leader_lease_duration =
1169
80.7M
        replica_state->RemainingOldLeaderLeaseDuration(now);
1170
80.7M
    if (remaining_old_leader_lease_duration) {
1171
20.2k
      if (remaining_old_leader_lease) {
1172
20.1k
        *remaining_old_leader_lease = remaining_old_leader_lease_duration;
1173
20.1k
      }
1174
20.2k
      return false;
1175
20.2k
    }
1176
80.7M
    return true;
1177
80.7M
  }
1178
1179
80.7M
  bool MajorityReplicatedLeaseExpired() {
1180
80.7M
    return replica_state->MajorityReplicatedLeaderLeaseExpired(now);
1181
80.7M
  }
1182
1183
89.2M
  bool Enabled() {
1184
89.2M
    return true;
1185
89.2M
  }
1186
};
1187
1188
80.7M
bool ReplicaState::MajorityReplicatedLeaderLeaseExpired(CoarseTimePoint* now) const {
1189
80.7M
  if (majority_replicated_lease_expiration_ == CoarseTimePoint()) {
1190
0
    return true;
1191
0
  }
1192
1193
80.7M
  if (*now == CoarseTimePoint()) {
1194
80.7M
    *now = CoarseMonoClock::Now();
1195
80.7M
  }
1196
1197
80.7M
  return *now >= majority_replicated_lease_expiration_;
1198
80.7M
}
1199
1200
LeaderLeaseStatus ReplicaState::GetLeaderLeaseStatusUnlocked(
1201
89.2M
    MonoDelta* remaining_old_leader_lease, CoarseTimePoint* now) const {
1202
89.2M
  if (now == nullptr) {
1203
54.5M
    CoarseTimePoint local_now;
1204
54.5M
    return GetLeaseStatusUnlocked(GetLeaderLeaseStatusPolicy(
1205
54.5M
        this, remaining_old_leader_lease, &local_now));
1206
54.5M
  }
1207
34.7M
  return GetLeaseStatusUnlocked(GetLeaderLeaseStatusPolicy(this, remaining_old_leader_lease, now));
1208
89.2M
}
1209
1210
4.50M
bool ReplicaState::MajorityReplicatedHybridTimeLeaseExpiredAt(MicrosTime hybrid_time) const {
1211
4.50M
  return hybrid_time >= majority_replicated_ht_lease_expiration_;
1212
4.50M
}
1213
1214
struct GetHybridTimeLeaseStatusAtPolicy {
1215
  const ReplicaState* replica_state;
1216
  MicrosTime micros_time;
1217
1218
  GetHybridTimeLeaseStatusAtPolicy(const ReplicaState* rs, MicrosTime ht)
1219
4.92M
      : replica_state(rs), micros_time(ht) {}
1220
1221
4.50M
  bool OldLeaderLeaseExpired() {
1222
4.50M
    return micros_time > replica_state->old_leader_ht_lease().expiration;
1223
4.50M
  }
1224
1225
4.50M
  bool MajorityReplicatedLeaseExpired() {
1226
4.50M
    return replica_state->MajorityReplicatedHybridTimeLeaseExpiredAt(micros_time);
1227
4.50M
  }
1228
1229
4.93M
  bool Enabled() {
1230
4.93M
    return FLAGS_ht_lease_duration_ms != 0;
1231
4.93M
  }
1232
};
1233
1234
LeaderLeaseStatus ReplicaState::GetHybridTimeLeaseStatusAtUnlocked(
1235
4.92M
    MicrosTime micros_time) const {
1236
4.92M
  return GetLeaseStatusUnlocked(GetHybridTimeLeaseStatusAtPolicy(this, micros_time));
1237
4.92M
}
1238
1239
81.9M
MonoDelta ReplicaState::RemainingOldLeaderLeaseDuration(CoarseTimePoint* now) const {
1240
81.9M
  MonoDelta result;
1241
81.9M
  if (old_leader_lease_) {
1242
43.2k
    CoarseTimePoint now_local;
1243
43.2k
    if (!now) {
1244
22.4k
      now = &now_local;
1245
22.4k
    }
1246
43.2k
    *now = CoarseMonoClock::Now();
1247
1248
43.2k
    if (*now > old_leader_lease_.expiration) {
1249
      // Reset the old leader lease expiration time so that we don't have to check it anymore.
1250
2.17k
      old_leader_lease_.Reset();
1251
41.0k
    } else {
1252
41.0k
      result = old_leader_lease_.expiration - *now;
1253
41.0k
    }
1254
43.2k
  }
1255
1256
81.9M
  return result;
1257
81.9M
}
1258
1259
Result<MicrosTime> ReplicaState::MajorityReplicatedHtLeaseExpiration(
1260
73.5M
    MicrosTime min_allowed, CoarseTimePoint deadline) const {
1261
73.5M
  if (FLAGS_ht_lease_duration_ms == 0) {
1262
2.81k
    return kMaxHybridTimePhysicalMicros;
1263
2.81k
  }
1264
1265
73.5M
  auto result = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire);
1266
73.5M
  if (result >= min_allowed) { // Fast path
1267
73.5M
    return result;
1268
73.5M
  }
1269
1270
21.2k
  if (result != PhysicalComponentLease::NoneValue()) {
1271
    // Slow path
1272
0
    UniqueLock l(update_lock_);
1273
0
    auto predicate = [this, &result, min_allowed] {
1274
0
      result = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire);
1275
0
      return result >= min_allowed || result == PhysicalComponentLease::NoneValue();
1276
0
    };
1277
0
    if (deadline == CoarseTimePoint::max()) {
1278
0
      cond_.wait(l, predicate);
1279
0
    } else if (!cond_.wait_until(l, deadline, predicate)) {
1280
0
      return STATUS_FORMAT(TimedOut, "Timed out waiting leader lease: $0", min_allowed);
1281
0
    }
1282
0
  }
1283
1284
21.2k
  if (result == PhysicalComponentLease::NoneValue()) {
1285
3
    static const Status kNotLeaderStatus = STATUS(IllegalState, "Not a leader");
1286
3
    return kNotLeaderStatus;
1287
3
  }
1288
1289
21.2k
  return result;
1290
21.2k
}
1291
1292
void ReplicaState::SetMajorityReplicatedLeaseExpirationUnlocked(
1293
    const MajorityReplicatedData& majority_replicated_data,
1294
34.7M
    EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags) {
1295
34.7M
  majority_replicated_lease_expiration_ = majority_replicated_data.leader_lease_expiration;
1296
34.7M
  majority_replicated_ht_lease_expiration_.store(majority_replicated_data.ht_lease_expiration,
1297
34.7M
                                                 std::memory_order_release);
1298
1299
34.7M
  if (flags.Test(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease)) {
1300
10.2k
    LOG_WITH_PREFIX(INFO)
1301
10.2k
        << "Revoked old leader " << old_leader_lease_.holder_uuid << " lease: "
1302
10.2k
        << MonoDelta(old_leader_lease_.expiration - CoarseMonoClock::now());
1303
10.2k
    old_leader_lease_.Reset();
1304
10.2k
  }
1305
1306
34.7M
  if (flags.Test(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease)) {
1307
10.7k
    LOG_WITH_PREFIX(INFO)
1308
10.7k
        << "Revoked old leader " << old_leader_ht_lease_.holder_uuid << " ht lease: "
1309
10.7k
        << HybridTime::FromMicros(old_leader_ht_lease_.expiration);
1310
10.7k
    old_leader_ht_lease_.Reset();
1311
10.7k
  }
1312
1313
34.7M
  CoarseTimePoint now;
1314
34.7M
  RefreshLeaderStateCacheUnlocked(&now);
1315
34.7M
  cond_.notify_all();
1316
34.7M
}
1317
1318
8.07k
uint64_t ReplicaState::OnDiskSize() const {
1319
8.07k
  return cmeta_->on_disk_size();
1320
8.07k
}
1321
1322
3.02M
bool ReplicaState::RegisterRetryableRequest(const ConsensusRoundPtr& round) {
1323
3.02M
  return retryable_requests_.Register(round);
1324
3.02M
}
1325
1326
50.1M
OpId ReplicaState::MinRetryableRequestOpId() {
1327
50.1M
  UniqueLock lock;
1328
50.1M
  auto status = LockForUpdate(&lock);
1329
50.1M
  if (!status.ok()) {
1330
0
    return OpId::Min(); // return minimal op id, that prevents log from cleaning
1331
0
  }
1332
50.1M
  return retryable_requests_.CleanExpiredReplicatedAndGetMinOpId();
1333
50.1M
}
1334
1335
void ReplicaState::NotifyReplicationFinishedUnlocked(
1336
    const ConsensusRoundPtr& round, const Status& status, int64_t leader_term,
1337
14.5M
    OpIds* applied_op_ids) {
1338
14.5M
  round->NotifyReplicationFinished(status, leader_term, applied_op_ids);
1339
1340
14.5M
  retryable_requests_.ReplicationFinished(*round->replicate_msg(), status, leader_term);
1341
14.5M
}
1342
1343
35.7M
consensus::LeaderState ReplicaState::RefreshLeaderStateCacheUnlocked(CoarseTimePoint* now) const {
1344
35.7M
  auto result = GetLeaderStateUnlocked(LeaderLeaseCheckMode::NEED_LEASE, now);
1345
35.7M
  LeaderStateCache cache;
1346
35.7M
  if (result.status == LeaderStatus::LEADER_AND_READY) {
1347
34.6M
    cache.Set(result.status, result.term, majority_replicated_lease_expiration_);
1348
34.6M
  } else 
if (1.05M
result.status == LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE1.05M
) {
1349
10.0k
    cache.Set(result.status, result.remaining_old_leader_lease.ToMicroseconds(),
1350
10.0k
              *now + result.remaining_old_leader_lease);
1351
1.04M
  } else {
1352
1.04M
    cache.Set(result.status, 0 /* extra_value */, CoarseTimePoint::max());
1353
1.04M
  }
1354
1355
35.7M
  leader_state_cache_.store(cache, boost::memory_order_release);
1356
1357
35.7M
  return result;
1358
35.7M
}
1359
1360
123k
void ReplicaState::SetLeaderNoOpCommittedUnlocked(bool value) {
1361
123k
  LOG_WITH_PREFIX(INFO)
1362
123k
      << __func__ << "(" << value << "), committed: " << GetCommittedOpIdUnlocked()
1363
123k
      << ", received: " << GetLastReceivedOpIdUnlocked();
1364
1365
123k
  leader_no_op_committed_ = value;
1366
123k
  CoarseTimePoint now;
1367
123k
  RefreshLeaderStateCacheUnlocked(&now);
1368
123k
}
1369
1370
}  // namespace consensus
1371
}  // namespace yb