YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/replica_state.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/replica_state.h"
34
35
#include <gflags/gflags.h>
36
37
#include "yb/consensus/consensus.h"
38
#include "yb/consensus/consensus_context.h"
39
#include "yb/consensus/consensus_round.h"
40
#include "yb/consensus/log_util.h"
41
#include "yb/consensus/quorum_util.h"
42
43
#include "yb/gutil/strings/substitute.h"
44
45
#include "yb/util/atomic.h"
46
#include "yb/util/debug/trace_event.h"
47
#include "yb/util/enums.h"
48
#include "yb/util/flag_tags.h"
49
#include "yb/util/format.h"
50
#include "yb/util/logging.h"
51
#include "yb/util/opid.h"
52
#include "yb/util/result.h"
53
#include "yb/util/status.h"
54
#include "yb/util/status_format.h"
55
#include "yb/util/status_log.h"
56
#include "yb/util/thread_restrictions.h"
57
#include "yb/util/tostring.h"
58
#include "yb/util/trace.h"
59
60
using namespace std::literals;
61
62
DEFINE_int32(inject_delay_commit_pre_voter_to_voter_secs, 0,
63
             "Amount of time to delay commit of a PRE_VOTER to VOTER transition. To be used for "
64
             "unit testing purposes only.");
65
TAG_FLAG(inject_delay_commit_pre_voter_to_voter_secs, unsafe);
66
TAG_FLAG(inject_delay_commit_pre_voter_to_voter_secs, hidden);
67
68
namespace yb {
69
namespace consensus {
70
71
using std::string;
72
using strings::Substitute;
73
using strings::SubstituteAndAppend;
74
75
//////////////////////////////////////////////////
76
// ReplicaState
77
//////////////////////////////////////////////////
78
79
ReplicaState::ReplicaState(
80
    ConsensusOptions options, string peer_uuid, std::unique_ptr<ConsensusMetadata> cmeta,
81
    ConsensusContext* consensus_context, SafeOpIdWaiter* safe_op_id_waiter,
82
    RetryableRequests* retryable_requests,
83
    std::function<void(const OpIds&)> applied_ops_tracker)
84
    : options_(std::move(options)),
85
      peer_uuid_(std::move(peer_uuid)),
86
      cmeta_(std::move(cmeta)),
87
      context_(consensus_context),
88
      safe_op_id_waiter_(safe_op_id_waiter),
89
88.7k
      applied_ops_tracker_(std::move(applied_ops_tracker)) {
90
11
  CHECK(cmeta_) << "ConsensusMeta passed as NULL";
91
88.7k
  if (retryable_requests) {
92
83.3k
    retryable_requests_ = std::move(*retryable_requests);
93
83.3k
  }
94
95
88.7k
  CHECK(IsAcceptableAtomicImpl(leader_state_cache_));
96
97
  // Actually we don't need this lock, but GetActiveRoleUnlocked checks that we are holding the
98
  // lock.
99
88.7k
  auto lock = LockForRead();
100
88.7k
  CoarseTimePoint now;
101
88.7k
  RefreshLeaderStateCacheUnlocked(&now);
102
88.7k
}
103
104
47.8k
ReplicaState::~ReplicaState() {
105
47.8k
}
106
107
88.7k
Status ReplicaState::StartUnlocked(const OpIdPB& last_id_in_wal) {
108
88.7k
  DCHECK(IsLocked());
109
110
  // Our last persisted term can be higher than the last persisted operation
111
  // (i.e. if we called an election) but reverse should never happen.
112
0
  CHECK_LE(last_id_in_wal.term(), GetCurrentTermUnlocked()) << LogPrefix()
113
0
      << "The last op in the WAL with id " << OpIdToString(last_id_in_wal)
114
0
      << " has a term (" << last_id_in_wal.term() << ") that is greater "
115
0
      << "than the latest recorded term, which is " << GetCurrentTermUnlocked();
116
117
88.7k
  next_index_ = last_id_in_wal.index() + 1;
118
119
88.7k
  last_received_op_id_ = yb::OpId::FromPB(last_id_in_wal);
120
121
88.7k
  state_ = kRunning;
122
88.7k
  return Status::OK();
123
88.7k
}
124
125
88.7k
Status ReplicaState::LockForStart(UniqueLock* lock) const {
126
88.7k
  ThreadRestrictions::AssertWaitAllowed();
127
88.7k
  UniqueLock l(update_lock_);
128
0
  CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
129
0
      << " Replica is not in kInitialized state";
130
88.7k
  lock->swap(l);
131
88.7k
  return Status::OK();
132
88.7k
}
133
134
629M
bool ReplicaState::IsLocked() const {
135
629M
  std::unique_lock<std::mutex> lock(update_lock_, std::try_to_lock);
136
629M
  return !lock.owns_lock();
137
629M
}
138
139
69.3M
ReplicaState::UniqueLock ReplicaState::LockForRead() const {
140
69.3M
  ThreadRestrictions::AssertWaitAllowed();
141
69.3M
  return UniqueLock(update_lock_);
142
69.3M
}
143
144
0
Status ReplicaState::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
145
0
  DCHECK(!msg.has_id()) << "Should not have an ID yet: " << msg.ShortDebugString();
146
0
  CHECK(msg.has_op_type());  // TODO: better checking?
147
0
  return LockForReplicate(lock);
148
0
}
149
150
2.60M
Status ReplicaState::LockForReplicate(UniqueLock* lock) const {
151
2.60M
  ThreadRestrictions::AssertWaitAllowed();
152
2.60M
  UniqueLock l(update_lock_);
153
2.60M
  if (PREDICT_FALSE(state_ != kRunning)) {
154
0
    return STATUS(IllegalState, "Replica not in running state");
155
0
  }
156
157
2.60M
  lock->swap(l);
158
2.60M
  return Status::OK();
159
2.60M
}
160
161
2.64M
Status ReplicaState::CheckIsActiveLeaderAndHasLease() const {
162
2.64M
  UniqueLock l(update_lock_);
163
2.64M
  if (PREDICT_FALSE(state_ != kRunning)) {
164
0
    return STATUS(IllegalState, "Replica not in running state");
165
0
  }
166
2.64M
  return CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::NEED_LEASE);
167
2.64M
}
168
169
15.1M
Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) const {
170
15.1M
  TRACE_EVENT0("consensus", "ReplicaState::LockForMajorityReplicatedIndexUpdate");
171
15.1M
  ThreadRestrictions::AssertWaitAllowed();
172
15.1M
  UniqueLock l(update_lock_);
173
174
15.1M
  if (PREDICT_FALSE(state_ != kRunning)) {
175
31
    return STATUS(IllegalState, "Replica not in running state");
176
31
  }
177
178
15.1M
  if (PREDICT_FALSE(GetActiveRoleUnlocked() != PeerRole::LEADER)) {
179
50
    return STATUS(IllegalState, "Replica not LEADER");
180
50
  }
181
15.1M
  lock->swap(l);
182
15.1M
  return Status::OK();
183
15.1M
}
184
185
33.2M
LeaderState ReplicaState::GetLeaderState(bool allow_stale) const {
186
33.2M
  auto cache = leader_state_cache_.load(boost::memory_order_acquire);
187
188
33.2M
  if (!allow_stale) {
189
13.3M
    CoarseTimePoint now = CoarseMonoClock::Now();
190
13.3M
    if (now >= cache.expire_at) {
191
33
      auto lock = LockForRead();
192
33
      return RefreshLeaderStateCacheUnlocked(&now);
193
33
    }
194
33.2M
  }
195
196
33.2M
  LeaderState result = {cache.status()};
197
33.2M
  if (result.status == LeaderStatus::LEADER_AND_READY) {
198
21.1M
    result.term = cache.extra_value();
199
12.1M
  } else {
200
12.1M
    if (result.status == LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE) {
201
7.82k
      result.remaining_old_leader_lease = MonoDelta::FromMicroseconds(cache.extra_value());
202
7.82k
    }
203
12.1M
    result.MakeNotReadyLeader(result.status);
204
12.1M
  }
205
206
33.2M
  return result;
207
33.2M
}
208
209
LeaderState ReplicaState::GetLeaderStateUnlocked(
210
43.6M
    LeaderLeaseCheckMode lease_check_mode, CoarseTimePoint* now) const {
211
43.6M
  LeaderState result;
212
213
43.6M
  if (GetActiveRoleUnlocked() != PeerRole::LEADER) {
214
5.05M
    return result.MakeNotReadyLeader(LeaderStatus::NOT_LEADER);
215
5.05M
  }
216
217
38.5M
  if (!leader_no_op_committed_) {
218
    // This will cause the client to retry on the same server (won't try to find the new leader).
219
174k
    return result.MakeNotReadyLeader(LeaderStatus::LEADER_BUT_NO_OP_NOT_COMMITTED);
220
174k
  }
221
222
38.3M
  const auto lease_status = lease_check_mode != LeaderLeaseCheckMode::DONT_NEED_LEASE
223
35.3M
      ? GetLeaderLeaseStatusUnlocked(&result.remaining_old_leader_lease, now)
224
3.04M
      : LeaderLeaseStatus::HAS_LEASE;
225
38.3M
  switch (lease_status) {
226
10.5k
    case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE:
227
      // Will retry on the same server.
228
0
      VLOG(1) << "Old leader lease might still be active for "
229
0
              << result.remaining_old_leader_lease.ToString();
230
10.5k
      return result.MakeNotReadyLeader(
231
10.5k
          LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE);
232
233
8.44k
    case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE:
234
      // Will retry to look up the leader, because it might have changed.
235
8.44k
      return result.MakeNotReadyLeader(
236
8.44k
          LeaderStatus::LEADER_BUT_NO_MAJORITY_REPLICATED_LEASE);
237
238
38.3M
    case LeaderLeaseStatus::HAS_LEASE:
239
38.3M
      result.status = LeaderStatus::LEADER_AND_READY;
240
38.3M
      result.term = GetCurrentTermUnlocked();
241
38.3M
      return result;
242
0
  }
243
244
0
  FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, lease_status);
245
0
}
246
247
5.69M
Status ReplicaState::CheckActiveLeaderUnlocked(LeaderLeaseCheckMode lease_check_mode) const {
248
5.69M
  auto state = GetLeaderStateUnlocked(lease_check_mode);
249
5.69M
  if (state.status == LeaderStatus::NOT_LEADER) {
250
325
    ConsensusStatePB cstate = ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE);
251
325
    return STATUS_FORMAT(IllegalState,
252
325
                         "Replica $0 is not leader of this config. Role: $1. Consensus state: $2",
253
325
                         peer_uuid_, PeerRole_Name(GetActiveRoleUnlocked()), cstate);
254
325
  }
255
256
5.69M
  return state.CreateStatus();
257
5.69M
}
258
259
3.41M
Status ReplicaState::LockForConfigChange(UniqueLock* lock) const {
260
3.41M
  TRACE_EVENT0("consensus", "ReplicaState::LockForConfigChange");
261
262
3.41M
  ThreadRestrictions::AssertWaitAllowed();
263
3.41M
  UniqueLock l(update_lock_);
264
  // Can only change the config on running replicas.
265
3.41M
  if (PREDICT_FALSE(state_ != kRunning)) {
266
10
    return STATUS(IllegalState, "Unable to lock ReplicaState for config change",
267
10
                                Substitute("State = $0", state_));
268
10
  }
269
3.41M
  lock->swap(l);
270
3.41M
  return Status::OK();
271
3.41M
}
272
273
16.6M
Status ReplicaState::LockForUpdate(UniqueLock* lock) const {
274
16.6M
  TRACE_EVENT0("consensus", "ReplicaState::LockForUpdate");
275
16.6M
  ThreadRestrictions::AssertWaitAllowed();
276
16.6M
  UniqueLock l(update_lock_);
277
16.6M
  if (PREDICT_FALSE(state_ != kRunning)) {
278
12
    return STATUS(IllegalState, "Replica not in running state");
279
12
  }
280
16.6M
  lock->swap(l);
281
16.6M
  return Status::OK();
282
16.6M
}
283
284
95.6k
Status ReplicaState::LockForShutdown(UniqueLock* lock) {
285
95.6k
  TRACE_EVENT0("consensus", "ReplicaState::LockForShutdown");
286
95.6k
  ThreadRestrictions::AssertWaitAllowed();
287
95.6k
  UniqueLock l(update_lock_);
288
95.6k
  if (state_ != kShuttingDown && state_ != kShutDown) {
289
47.8k
    state_ = kShuttingDown;
290
47.8k
  }
291
95.6k
  lock->swap(l);
292
95.6k
  return Status::OK();
293
95.6k
}
294
295
47.8k
Status ReplicaState::ShutdownUnlocked() {
296
47.8k
  DCHECK(IsLocked());
297
47.8k
  CHECK_EQ(state_, kShuttingDown);
298
47.8k
  state_ = kShutDown;
299
47.8k
  return Status::OK();
300
47.8k
}
301
302
7.28M
ConsensusStatePB ReplicaState::ConsensusStateUnlocked(ConsensusConfigType type) const {
303
7.28M
  return cmeta_->ToConsensusStatePB(type);
304
7.28M
}
305
306
108M
PeerRole ReplicaState::GetActiveRoleUnlocked() const {
307
108M
  DCHECK(IsLocked());
308
108M
  return cmeta_->active_role();
309
108M
}
310
311
9.17M
bool ReplicaState::IsConfigChangePendingUnlocked() const {
312
9.17M
  DCHECK(IsLocked());
313
9.17M
  return cmeta_->has_pending_config();
314
9.17M
}
315
316
7.31k
Status ReplicaState::CheckNoConfigChangePendingUnlocked() const {
317
7.31k
  DCHECK(IsLocked());
318
7.31k
  if (IsConfigChangePendingUnlocked()) {
319
129
    return STATUS(IllegalState,
320
129
        Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
321
129
                   "  Committed config: $0.\n  Pending config: $1",
322
129
                   GetCommittedConfigUnlocked().ShortDebugString(),
323
129
                   GetPendingConfigUnlocked().ShortDebugString()));
324
129
  }
325
7.18k
  return Status::OK();
326
7.18k
}
327
328
8.34k
Status ReplicaState::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
329
8.34k
  DCHECK(IsLocked());
330
8.34k
  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM),
331
8.34k
                        "Invalid config to set as pending");
332
8.34k
  if (!new_config.unsafe_config_change()) {
333
4
    CHECK(!cmeta_->has_pending_config())
334
4
        << "Attempt to set pending config while another is already pending! "
335
4
        << "Existing pending config: " << cmeta_->pending_config().ShortDebugString() << "; "
336
4
        << "Attempted new pending config: " << new_config.ShortDebugString();
337
18.4E
  } else if (cmeta_->has_pending_config()) {
338
0
    LOG_WITH_PREFIX(INFO) << "Allowing unsafe config change even though there is a pending config! "
339
0
                          << "Existing pending config: "
340
0
                          << cmeta_->pending_config().ShortDebugString() << "; "
341
0
                          << "New pending config: " << new_config.ShortDebugString();
342
0
  }
343
8.34k
  cmeta_->set_pending_config(new_config);
344
8.34k
  CoarseTimePoint now;
345
8.34k
  RefreshLeaderStateCacheUnlocked(&now);
346
8.34k
  return Status::OK();
347
8.34k
}
348
349
4
Status ReplicaState::ClearPendingConfigUnlocked() {
350
4
  DCHECK(IsLocked());
351
4
  if (!cmeta_->has_pending_config()) {
352
0
    LOG(WARNING) << "Attempt to clear a non-existent pending config."
353
0
                 << "Existing committed config: " << cmeta_->committed_config().ShortDebugString();
354
0
    return STATUS(IllegalState, "Attempt to clear a non-existent pending config.");
355
0
  }
356
4
  cmeta_->clear_pending_config();
357
4
  CoarseTimePoint now;
358
4
  RefreshLeaderStateCacheUnlocked(&now);
359
4
  return Status::OK();
360
4
}
361
362
3.06M
const RaftConfigPB& ReplicaState::GetPendingConfigUnlocked() const {
363
3.06M
  DCHECK(IsLocked());
364
0
  CHECK(IsConfigChangePendingUnlocked()) << "No pending config";
365
3.06M
  return cmeta_->pending_config();
366
3.06M
}
367
368
8.34k
Status ReplicaState::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
369
8.34k
  TRACE_EVENT0("consensus", "ReplicaState::SetCommittedConfigUnlocked");
370
8.34k
  DCHECK(IsLocked());
371
8.34k
  DCHECK(config_to_commit.IsInitialized());
372
8.34k
  RETURN_NOT_OK_PREPEND(
373
8.34k
      VerifyRaftConfig(config_to_commit, COMMITTED_QUORUM), "Invalid config to set as committed");
374
375
  // Compare committed with pending configuration, ensure they are the same.
376
  // In the event of an unsafe config change triggered by an administrator,
377
  // it is possible that the config being committed may not match the pending config
378
  // because unsafe config change allows multiple pending configs to exist.
379
  // Therefore we only need to validate that 'config_to_commit' matches the pending config
380
  // if the pending config does not have its 'unsafe_config_change' flag set.
381
8.34k
  if (IsConfigChangePendingUnlocked()) {
382
8.34k
    const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
383
8.34k
    if (!pending_config.unsafe_config_change()) {
384
      // Pending will not have an opid_index, so ignore that field.
385
8.34k
      RaftConfigPB config_no_opid = config_to_commit;
386
8.34k
      config_no_opid.clear_opid_index();
387
      // Quorums must be exactly equal, even w.r.t. peer ordering.
388
0
      CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(), config_no_opid.SerializeAsString())
389
0
          << Substitute(
390
0
                 "New committed config must equal pending config, but does not. "
391
0
                 "Pending config: $0, committed config: $1",
392
0
                 pending_config.ShortDebugString(), config_to_commit.ShortDebugString());
393
8.34k
    }
394
8.34k
  }
395
8.34k
  cmeta_->set_committed_config(config_to_commit);
396
8.34k
  cmeta_->clear_pending_config();
397
8.34k
  CoarseTimePoint now;
398
8.34k
  RefreshLeaderStateCacheUnlocked(&now);
399
8.34k
  CHECK_OK(cmeta_->Flush());
400
8.34k
  return Status::OK();
401
8.34k
}
402
403
53.7M
const RaftConfigPB& ReplicaState::GetCommittedConfigUnlocked() const {
404
53.7M
  DCHECK(IsLocked());
405
53.7M
  return cmeta_->committed_config();
406
53.7M
}
407
408
43.9M
const RaftConfigPB& ReplicaState::GetActiveConfigUnlocked() const {
409
43.9M
  DCHECK(IsLocked());
410
43.9M
  return cmeta_->active_config();
411
43.9M
}
412
413
14.7M
bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) {
414
14.7M
  DCHECK(IsLocked());
415
416
14.7M
  *term_mismatch = false;
417
418
14.7M
  if (cmeta_ == nullptr) {
419
0
    LOG(FATAL) << "cmeta_ cannot be NULL";
420
0
  }
421
422
14.7M
  int64_t committed_index = GetCommittedOpIdUnlocked().index;
423
14.7M
  if (op_id.index <= committed_index) {
424
5.92M
    return true;
425
5.92M
  }
426
427
8.86M
  int64_t last_received_index = GetLastReceivedOpIdUnlocked().index;
428
8.86M
  if (op_id.index > last_received_index) {
429
4.57M
    return false;
430
4.57M
  }
431
432
4.28M
  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index);
433
4.28M
  if (round == nullptr) {
434
0
    LOG_WITH_PREFIX(ERROR)
435
0
        << "Consensus round not found for op id " << op_id << ": "
436
0
        << "committed_index=" << committed_index << ", "
437
0
        << "last_received_index=" << last_received_index << ", "
438
0
        << "tablet: " << options_.tablet_id << ", current state: "
439
0
        << ToStringUnlocked();
440
0
    DumpPendingOperationsUnlocked();
441
0
    CHECK(false);
442
0
  }
443
444
4.28M
  if (round->id().term != op_id.term) {
445
113
    *term_mismatch = true;
446
113
    return false;
447
113
  }
448
4.28M
  return true;
449
4.28M
}
450
451
103k
Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term) {
452
103k
  TRACE_EVENT1("consensus", "ReplicaState::SetCurrentTermUnlocked",
453
103k
               "term", new_term);
454
103k
  DCHECK(IsLocked());
455
103k
  if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
456
0
    return STATUS(IllegalState,
457
0
        Substitute("Cannot change term to a term that is lower than or equal to the current one. "
458
0
                   "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
459
0
  }
460
103k
  cmeta_->set_current_term(new_term);
461
103k
  cmeta_->clear_voted_for();
462
  // OK to flush before clearing the leader, because the leader UUID is not part of
463
  // ConsensusMetadataPB.
464
103k
  RETURN_NOT_OK(cmeta_->Flush());
465
102k
  ClearLeaderUnlocked();
466
102k
  last_received_op_id_current_leader_ = yb::OpId();
467
102k
  return Status::OK();
468
103k
}
469
470
106M
const int64_t ReplicaState::GetCurrentTermUnlocked() const {
471
106M
  DCHECK(IsLocked());
472
106M
  return cmeta_->current_term();
473
106M
}
474
475
388k
void ReplicaState::SetLeaderUuidUnlocked(const std::string& uuid) {
476
388k
  DCHECK(IsLocked());
477
388k
  cmeta_->set_leader_uuid(uuid);
478
388k
  CoarseTimePoint now;
479
388k
  RefreshLeaderStateCacheUnlocked(&now);
480
388k
}
481
482
41.0M
const string& ReplicaState::GetLeaderUuidUnlocked() const {
483
41.0M
  DCHECK(IsLocked());
484
41.0M
  return cmeta_->leader_uuid();
485
41.0M
}
486
487
696
const bool ReplicaState::HasVotedCurrentTermUnlocked() const {
488
696
  DCHECK(IsLocked());
489
696
  return cmeta_->has_voted_for();
490
696
}
491
492
96.2k
Status ReplicaState::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
493
96.2k
  TRACE_EVENT1("consensus", "ReplicaState::SetVotedForCurrentTermUnlocked",
494
96.2k
               "uuid", uuid);
495
96.2k
  DCHECK(IsLocked());
496
96.2k
  cmeta_->set_voted_for(uuid);
497
96.2k
  CHECK_OK(cmeta_->Flush());
498
96.2k
  return Status::OK();
499
96.2k
}
500
501
430
const std::string& ReplicaState::GetVotedForCurrentTermUnlocked() const {
502
430
  DCHECK(IsLocked());
503
430
  DCHECK(cmeta_->has_voted_for());
504
430
  return cmeta_->voted_for();
505
430
}
506
507
10.7M
const string& ReplicaState::GetPeerUuid() const {
508
10.7M
  return peer_uuid_;
509
10.7M
}
510
511
89.9k
const ConsensusOptions& ReplicaState::GetOptions() const {
512
89.9k
  return options_;
513
89.9k
}
514
515
0
void ReplicaState::DumpPendingOperationsUnlocked() {
516
0
  DCHECK(IsLocked());
517
0
  LOG_WITH_PREFIX(INFO) << "Dumping " << pending_operations_.size()
518
0
                                 << " pending operations.";
519
0
  for (const auto &round : pending_operations_) {
520
0
    LOG_WITH_PREFIX(INFO) << round->replicate_msg()->ShortDebugString();
521
0
  }
522
0
}
523
524
47.8k
Status ReplicaState::CancelPendingOperations() {
525
47.8k
  {
526
47.8k
    ThreadRestrictions::AssertWaitAllowed();
527
47.8k
    UniqueLock lock(update_lock_);
528
47.8k
    if (state_ != kShuttingDown) {
529
0
      return STATUS(IllegalState, "Can only wait for pending commits on kShuttingDown state.");
530
0
    }
531
47.8k
    if (pending_operations_.empty()) {
532
47.6k
      return Status::OK();
533
47.6k
    }
534
535
163
    LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_operations_.size()
536
163
                          << " pending operations because of shutdown.";
537
163
    auto abort_status = STATUS(Aborted, "Operation aborted");
538
163
    int i = 0;
539
391
    for (size_t idx = pending_operations_.size(); idx > 0; ) {
540
228
      --idx;
541
      // We cancel only operations whose applies have not yet been triggered.
542
228
      constexpr auto kLogAbortedOperationsNum = 10;
543
228
      if (++i <= kLogAbortedOperationsNum) {
544
228
        LOG_WITH_PREFIX(INFO) << "Aborting operation because of shutdown: "
545
228
                              << pending_operations_[idx]->replicate_msg()->ShortDebugString();
546
228
      }
547
228
      NotifyReplicationFinishedUnlocked(pending_operations_[idx], abort_status, OpId::kUnknownTerm,
548
228
                                        nullptr /* applied_op_ids */);
549
228
    }
550
163
  }
551
163
  return Status::OK();
552
163
}
553
554
struct PendingOperationsComparator {
555
0
  bool operator()(const ConsensusRoundPtr& lhs, int64_t rhs) const {
556
0
    return lhs->id().index < rhs;
557
0
  }
558
559
0
  bool operator()(int64_t lhs, const ConsensusRoundPtr& rhs) const {
560
0
    return lhs < rhs->id().index;
561
0
  }
562
};
563
564
4.29M
ReplicaState::PendingOperations::iterator ReplicaState::FindPendingOperation(int64_t index) {
565
4.29M
  if (pending_operations_.empty()) {
566
0
    return pending_operations_.end();
567
0
  }
568
569
4.29M
  size_t offset = index - pending_operations_.front()->id().index;
570
  // If index < pending_operations_.front()->id().index() then offset will be very big positive
571
  // number, so could check both bounds in one comparison.
572
4.29M
  if (offset >= pending_operations_.size()) {
573
112
    return pending_operations_.end();
574
112
  }
575
576
4.29M
  auto result = pending_operations_.begin() + offset;
577
4.29M
  DCHECK_EQ((**result).id().index, index);
578
4.29M
  return result;
579
4.29M
}
580
581
113
Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) {
582
113
  DCHECK(IsLocked());
583
113
  LOG_WITH_PREFIX(INFO)
584
113
      << "Aborting all operations after (but not including): "
585
113
      << new_preceding_idx << ". Current State: " << ToStringUnlocked();
586
587
113
  DCHECK_GE(new_preceding_idx, 0);
588
113
  yb::OpId new_preceding;
589
590
113
  auto preceding_op_iter = FindPendingOperation(new_preceding_idx);
591
592
  // Either the new preceding id is in the pendings set or it must be equal to the
593
  // committed index since we can't truncate already committed operations.
594
113
  if (preceding_op_iter != pending_operations_.end()) {
595
1
    new_preceding = (**preceding_op_iter).id();
596
1
    ++preceding_op_iter;
597
112
  } else {
598
112
    CHECK_EQ(new_preceding_idx, last_committed_op_id_.index);
599
112
    new_preceding = last_committed_op_id_;
600
112
    if (!pending_operations_.empty() &&
601
112
        pending_operations_.front()->id().index > new_preceding_idx) {
602
112
      preceding_op_iter = pending_operations_.begin();
603
112
    }
604
112
  }
605
606
  // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
607
  // here to avoid the bounds check, since we're breaking monotonicity.
608
113
  last_received_op_id_ = new_preceding;
609
113
  last_received_op_id_current_leader_ = OpId();
610
113
  next_index_ = new_preceding.index + 1;
611
612
113
  auto abort_status = STATUS(Aborted, "Operation aborted by new leader");
613
329
  for (auto it = pending_operations_.end(); it != preceding_op_iter;) {
614
216
    const ConsensusRoundPtr& round = *--it;
615
216
    auto op_id = OpId::FromPB(round->replicate_msg()->id());
616
216
    LOG_WITH_PREFIX(INFO) << "Aborting uncommitted operation due to leader change: "
617
216
                          << op_id << ", committed: " << last_committed_op_id_;
618
216
    NotifyReplicationFinishedUnlocked(round, abort_status, OpId::kUnknownTerm,
619
216
                                      nullptr /* applied_op_ids */);
620
216
  }
621
622
  // Clear entries from pending operations.
623
113
  pending_operations_.erase(preceding_op_iter, pending_operations_.end());
624
113
  CheckPendingOperationsHead();
625
626
113
  return Status::OK();
627
113
}
628
629
7.11M
Status ReplicaState::AddPendingOperation(const ConsensusRoundPtr& round, OperationMode mode) {
630
7.11M
  DCHECK(IsLocked());
631
632
7.11M
  auto op_type = round->replicate_msg()->op_type();
633
7.11M
  if (PREDICT_FALSE(state_ != kRunning)) {
634
    // Special case when we're configuring and this is a config change, refuse
635
    // everything else.
636
    // TODO: Don't require a NO_OP to get to kRunning state
637
0
    if (op_type != NO_OP) {
638
0
      return STATUS(IllegalState, "Cannot trigger prepare. Replica is not in kRunning state.");
639
0
    }
640
7.11M
  }
641
642
7.11M
  if (mode == OperationMode::kLeader) {
643
2.43M
    RETURN_NOT_OK(context_->CheckOperationAllowed(round->id(), op_type));
644
2.43M
  }
645
646
  // Mark pending configuration.
647
7.11M
  if (PREDICT_FALSE(op_type == CHANGE_CONFIG_OP)) {
648
8.47k
    DCHECK(round->replicate_msg()->change_config_record().has_old_config());
649
8.47k
    DCHECK(round->replicate_msg()->change_config_record().old_config().has_opid_index());
650
8.47k
    DCHECK(round->replicate_msg()->change_config_record().has_new_config());
651
8.47k
    DCHECK(!round->replicate_msg()->change_config_record().new_config().has_opid_index());
652
8.47k
    if (mode == OperationMode::kFollower) {
653
6.19k
      const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
654
6.19k
      const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
655
      // The leader has to mark the configuration as pending before it gets here
656
      // because the active configuration affects the replication queue.
657
      // Do one last sanity check.
658
6.19k
      Status s = CheckNoConfigChangePendingUnlocked();
659
6.19k
      if (PREDICT_FALSE(!s.ok() && !new_config.unsafe_config_change())) {
660
128
        s = s.CloneAndAppend(Format("New config: $0", new_config));
661
128
        LOG_WITH_PREFIX(INFO) << s;
662
128
        return s;
663
128
      }
664
      // Check if the pending Raft config has an OpId less than the committed
665
      // config. If so, this is a replay at startup in which the COMMIT
666
      // messages were delayed.
667
6.06k
      const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
668
6.06k
      if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
669
6.06k
        CHECK_OK(SetPendingConfigUnlocked(new_config));
670
4
      } else {
671
4
        LOG_WITH_PREFIX(INFO)
672
4
            << "Ignoring setting pending config change with OpId "
673
4
            << round->replicate_msg()->id() << " because the committed config has OpId index "
674
4
            << committed_config.opid_index() << ". The config change we are ignoring is: "
675
4
            << "Old config: { " << old_config.ShortDebugString() << " }. "
676
4
            << "New config: { " << new_config.ShortDebugString() << " }";
677
4
      }
678
6.06k
    }
679
7.11M
  } else if (op_type == WRITE_OP) {
680
    // Leader registers an operation with RetryableRequests even before assigning an op id.
681
4.14M
    if (mode == OperationMode::kFollower && !retryable_requests_.Register(round)) {
682
0
      return STATUS(IllegalState, "Cannot register retryable request on follower");
683
0
    }
684
2.96M
  } else if (op_type == SPLIT_OP) {
685
0
    const auto& split_request = round->replicate_msg()->split_request();
686
0
    SCHECK_EQ(
687
0
        split_request.tablet_id(), cmeta_->tablet_id(), InvalidArgument,
688
0
        "Received split op for a different tablet.");
689
    // TODO(tsplit): if we get failures past this point we can't undo the tablet state.
690
    // Might be need some tool to be able to remove SPLIT_OP from Raft log.
691
0
  }
692
693
1.35k
  LOG_IF_WITH_PREFIX(
694
1.35k
      DFATAL,
695
1.35k
      !pending_operations_.empty() &&
696
1.35k
          pending_operations_.back()->id().index + 1 != round->id().index)
697
1.35k
      << "Adding operation with wrong index: " << AsString(round) << ", last op id: "
698
1.35k
      << AsString(pending_operations_.back()->id()) << ", operations: "
699
1.35k
      << AsString(pending_operations_);
700
7.11M
  pending_operations_.push_back(round);
701
7.11M
  CheckPendingOperationsHead();
702
7.11M
  return Status::OK();
703
7.11M
}
704
705
4.29M
scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(int64_t index) {
706
4.29M
  DCHECK(IsLocked());
707
4.29M
  auto it = FindPendingOperation(index);
708
4.29M
  if (it == pending_operations_.end()) {
709
0
    return nullptr;
710
0
  }
711
4.29M
  return *it;
712
4.29M
}
713
714
Status ReplicaState::UpdateMajorityReplicatedUnlocked(
715
    const OpId& majority_replicated, OpId* committed_op_id,
716
15.1M
    bool* committed_op_id_changed, OpId* last_applied_op_id) {
717
15.1M
  DCHECK(IsLocked());
718
15.1M
  if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) {
719
0
    return STATUS(ServiceUnavailable, "Cannot trigger apply. Replica is shutting down.");
720
0
  }
721
15.1M
  if (PREDICT_FALSE(state_ != kRunning)) {
722
0
    return STATUS(IllegalState, "Cannot trigger apply. Replica is not in kRunning state.");
723
0
  }
724
725
  // If the last committed operation was in the current term (the normal case)
726
  // then 'committed_op_id' is simply equal to majority replicated.
727
15.1M
  if (last_committed_op_id_.term == GetCurrentTermUnlocked()) {
728
15.0M
    *committed_op_id_changed = VERIFY_RESULT(AdvanceCommittedOpIdUnlocked(
729
15.0M
        majority_replicated, CouldStop::kFalse));
730
15.0M
    *committed_op_id = last_committed_op_id_;
731
15.0M
    *last_applied_op_id = GetLastAppliedOpIdUnlocked();
732
15.0M
    return Status::OK();
733
72.1k
  }
734
735
  // If the last committed operation is not in the current term (such as when
736
  // we change leaders) but 'majority_replicated' is then we can advance the
737
  // 'committed_op_id' too.
738
72.1k
  if (majority_replicated.term == GetCurrentTermUnlocked()) {
739
35.1k
    auto previous = last_committed_op_id_;
740
35.1k
    *committed_op_id_changed = VERIFY_RESULT(AdvanceCommittedOpIdUnlocked(
741
35.1k
        majority_replicated, CouldStop::kFalse));
742
35.1k
    *committed_op_id = last_committed_op_id_;
743
35.1k
    *last_applied_op_id = GetLastAppliedOpIdUnlocked();
744
35.1k
    LOG_WITH_PREFIX(INFO)
745
35.1k
        << "Advanced the committed_op_id across terms."
746
35.1k
        << " Last committed operation was: " << previous
747
35.1k
        << " New committed index is: " << last_committed_op_id_;
748
35.1k
    return Status::OK();
749
37.0k
  }
750
751
37.0k
  *committed_op_id = last_committed_op_id_;
752
37.0k
  *last_applied_op_id = GetLastAppliedOpIdUnlocked();
753
37.0k
  YB_LOG_EVERY_N_SECS(WARNING, 1) << LogPrefix()
754
13.1k
          << "Can't advance the committed index across term boundaries"
755
13.1k
          << " until operations from the current term are replicated."
756
13.1k
          << " Last committed operation was: " << last_committed_op_id_ << ","
757
13.1k
          << " New majority replicated is: " << majority_replicated << ","
758
13.1k
          << " Current term is: " << GetCurrentTermUnlocked();
759
760
37.0k
  return Status::OK();
761
37.0k
}
762
763
7.12M
void ReplicaState::SetLastCommittedIndexUnlocked(const yb::OpId& committed_op_id) {
764
7.12M
  DCHECK(IsLocked());
765
7.12M
  CHECK_GE(last_received_op_id_.index, committed_op_id.index);
766
7.12M
  last_committed_op_id_ = committed_op_id;
767
7.12M
  CheckPendingOperationsHead();
768
7.12M
}
769
770
88.7k
Status ReplicaState::InitCommittedOpIdUnlocked(const yb::OpId& committed_op_id) {
771
88.7k
  if (!last_committed_op_id_.empty()) {
772
0
    return STATUS_FORMAT(
773
0
        IllegalState,
774
0
        "Committed index already initialized to: $0, tried to set $1",
775
0
        last_committed_op_id_,
776
0
        committed_op_id);
777
0
  }
778
779
88.7k
  if (!pending_operations_.empty() &&
780
49
      committed_op_id.index >= pending_operations_.front()->id().index) {
781
0
    RETURN_NOT_OK(ApplyPendingOperationsUnlocked(committed_op_id, CouldStop::kFalse));
782
0
  }
783
784
88.7k
  SetLastCommittedIndexUnlocked(committed_op_id);
785
786
88.7k
  return Status::OK();
787
88.7k
}
788
789
22.0M
void ReplicaState::CheckPendingOperationsHead() const {
790
22.0M
  if (pending_operations_.empty() || last_committed_op_id_.empty() ||
791
22.0M
      pending_operations_.front()->id().index == last_committed_op_id_.index + 1) {
792
22.0M
    return;
793
22.0M
  }
794
795
4.79k
  LOG_WITH_PREFIX(FATAL)
796
4.79k
      << "The first pending operation's index is supposed to immediately follow the last committed "
797
4.79k
      << "operation's index. Committed op id: " << last_committed_op_id_ << ", pending operations: "
798
4.79k
      << AsString(pending_operations_);
799
4.79k
}
800
801
Result<bool> ReplicaState::AdvanceCommittedOpIdUnlocked(
802
35.5M
    const yb::OpId& committed_op_id, CouldStop could_stop) {
803
35.5M
  DCHECK(IsLocked());
804
  // If we already committed up to (or past) 'id' return.
805
  // This can happen in the case that multiple UpdateConsensus() calls end
806
  // up in the RPC queue at the same time, and then might get interleaved out
807
  // of order.
808
35.5M
  if (last_committed_op_id_.index >= committed_op_id.index) {
809
18.4E
    VLOG_WITH_PREFIX(1)
810
18.4E
        << "Already marked ops through " << last_committed_op_id_ << " as committed. "
811
18.4E
        << "Now trying to mark " << committed_op_id << " which would be a no-op.";
812
28.5M
    return false;
813
28.5M
  }
814
815
7.03M
  if (pending_operations_.empty()) {
816
0
    VLOG_WITH_PREFIX(1) << "No operations to mark as committed up to: "
817
0
                        << committed_op_id;
818
0
    return STATUS_FORMAT(
819
0
        NotFound,
820
0
        "No pending entries, requested to advance last committed OpId from $0 to $1, "
821
0
            "last received: $2",
822
0
        last_committed_op_id_, committed_op_id, last_received_op_id_);
823
0
  }
824
825
7.03M
  CheckPendingOperationsHead();
826
827
7.03M
  auto old_index = last_committed_op_id_.index;
828
829
7.03M
  auto status = ApplyPendingOperationsUnlocked(committed_op_id, could_stop);
830
7.03M
  if (!status.ok()) {
831
0
    return status;
832
0
  }
833
834
7.03M
  return last_committed_op_id_.index != old_index;
835
7.03M
}
836
837
Status ReplicaState::ApplyPendingOperationsUnlocked(
838
7.03M
    const yb::OpId& committed_op_id, CouldStop could_stop) {
839
7.03M
  DCHECK(IsLocked());
840
18.4E
  VLOG_WITH_PREFIX(1) << "Last triggered apply was: " <<  last_committed_op_id_;
841
842
  // Stop at the operation after the last one we must commit. This iterator by definition points to
843
  // the first entry greater than the committed index, so the entry preceding that must have the
844
  // OpId equal to committed_op_id.
845
846
7.03M
  auto prev_id = last_committed_op_id_;
847
7.03M
  yb::OpId max_allowed_op_id;
848
7.03M
  if (!safe_op_id_waiter_) {
849
0
    max_allowed_op_id.index = std::numeric_limits<int64_t>::max();
850
0
  }
851
7.03M
  auto leader_term = GetLeaderStateUnlocked().term;
852
853
7.03M
  OpIds applied_op_ids;
854
7.03M
  applied_op_ids.reserve(committed_op_id.index - prev_id.index);
855
856
7.03M
  Status status;
857
858
14.9M
  while (!pending_operations_.empty()) {
859
8.16M
    auto round = pending_operations_.front();
860
8.16M
    auto current_id = round->id();
861
862
8.16M
    if (PREDICT_TRUE(prev_id)) {
863
8.07M
      CHECK_OK(CheckOpInSequence(prev_id, current_id));
864
8.07M
    }
865
866
8.16M
    if (current_id.index > committed_op_id.index) {
867
259k
      break;
868
259k
    }
869
870
7.90M
    auto type = round->replicate_msg()->op_type();
871
872
    // For write operations we block rocksdb flush, until appropriate records are written to the
873
    // log file. So we could apply them before adding to log.
874
7.90M
    if (type == OperationType::WRITE_OP) {
875
4.47M
      if (could_stop && !context_->ShouldApplyWrite()) {
876
0
        YB_LOG_EVERY_N_SECS(WARNING, 5) << LogPrefix()
877
0
            << "Stop apply pending operations, because of write delay required, last applied: "
878
0
            << prev_id << " of " << committed_op_id;
879
0
        break;
880
0
      }
881
3.42M
    } else if (current_id.index > max_allowed_op_id.index ||
882
2.96M
               current_id.term > max_allowed_op_id.term) {
883
2.96M
      max_allowed_op_id = safe_op_id_waiter_->WaitForSafeOpIdToApply(current_id);
884
      // This situation should not happen. Prior to #4150 it could happen as follows.  Suppose
885
      // replica A was the leader of term 1 and added operations 1.100 and 1.101 to the WAL but
886
      // has not committed them yet.  Replica B decides that A is unavailable, starts and wins
887
      // term 2 election, and tries to replicate a no-op 2.100.  Replica A starts and wins term 3
888
      // election and then continues to replicate 1.100 and 1.101 and the new no-op 3.102.
889
      // Suppose an UpdateConsensus from replica A reaches replica B with a committed op id of
890
      // 3.102 (because perhaps some other replica has already received those entries).  Replica B
891
      // will abort 2.100 and try to apply all three operations. Suppose the last op id flushed to
892
      // the WAL on replica B is currently 2.100, and current_id is 1.101. Then
893
      // WaitForSafeOpIdToApply would return 2.100 immediately as 2.100 > 1.101 in terms of OpId
894
      // comparison, and we will throw an error here.
895
      //
896
      // However, after the #4150 fix we are resetting flushed op id using ResetLastSynchedOpId
897
      // when aborting operations during term changes, so WaitForSafeOpIdToApply would correctly
898
      // wait until 1.101 is written and return 1.101 or 3.102 in the above example.
899
2.96M
      if (max_allowed_op_id.index < current_id.index || max_allowed_op_id.term < current_id.term) {
900
0
        status = STATUS_FORMAT(
901
0
            RuntimeError,
902
0
            "Bad max allowed op id ($0), term/index must be no less than that of current op id "
903
0
            "($1)",
904
0
            max_allowed_op_id, current_id);
905
0
        break;
906
0
      }
907
7.90M
    }
908
909
7.90M
    pending_operations_.pop_front();
910
    // Set committed configuration.
911
7.90M
    if (PREDICT_FALSE(type == OperationType::CHANGE_CONFIG_OP)) {
912
10.1k
      ApplyConfigChangeUnlocked(round);
913
10.1k
    }
914
915
7.90M
    prev_id = current_id;
916
7.90M
    NotifyReplicationFinishedUnlocked(round, Status::OK(), leader_term, &applied_op_ids);
917
7.90M
  }
918
919
7.03M
  SetLastCommittedIndexUnlocked(prev_id);
920
921
7.03M
  applied_ops_tracker_(applied_op_ids);
922
923
7.03M
  return status;
924
7.03M
}
925
926
10.1k
void ReplicaState::ApplyConfigChangeUnlocked(const ConsensusRoundPtr& round) {
927
10.1k
  DCHECK(round->replicate_msg()->change_config_record().has_old_config());
928
10.1k
  DCHECK(round->replicate_msg()->change_config_record().has_new_config());
929
10.1k
  RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config();
930
10.1k
  RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
931
10.1k
  DCHECK(old_config.has_opid_index());
932
10.1k
  DCHECK(!new_config.has_opid_index());
933
934
10.1k
  const OpId& current_id = round->id();
935
936
10.1k
  if (PREDICT_FALSE(FLAGS_inject_delay_commit_pre_voter_to_voter_secs)) {
937
7
    bool is_transit_to_voter =
938
7
      CountVotersInTransition(old_config) > CountVotersInTransition(new_config);
939
7
    if (is_transit_to_voter) {
940
4
      LOG_WITH_PREFIX(INFO)
941
4
          << "Commit skipped as inject_delay_commit_pre_voter_to_voter_secs flag is set to true.\n"
942
4
          << "  Old config: { " << old_config.ShortDebugString() << " }.\n"
943
4
          << "  New config: { " << new_config.ShortDebugString() << " }";
944
4
      SleepFor(MonoDelta::FromSeconds(FLAGS_inject_delay_commit_pre_voter_to_voter_secs));
945
4
    }
946
7
  }
947
948
10.1k
  new_config.set_opid_index(current_id.index);
949
  // Check if the pending Raft config has an OpId less than the committed
950
  // config. If so, this is a replay at startup in which the COMMIT
951
  // messages were delayed.
952
10.1k
  const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
953
10.1k
  if (new_config.opid_index() > committed_config.opid_index()) {
954
10.1k
    LOG_WITH_PREFIX(INFO)
955
10.1k
        << "Committing config change with OpId "
956
10.1k
        << current_id << ". Old config: { " << old_config.ShortDebugString() << " }. "
957
10.1k
        << "New config: { " << new_config.ShortDebugString() << " }";
958
10.1k
    CHECK_OK(SetCommittedConfigUnlocked(new_config));
959
4
  } else {
960
4
    LOG_WITH_PREFIX(INFO)
961
4
        << "Ignoring commit of config change with OpId "
962
4
        << current_id << " because the committed config has OpId index "
963
4
        << committed_config.opid_index() << ". The config change we are ignoring is: "
964
4
        << "Old config: { " << old_config.ShortDebugString() << " }. "
965
4
        << "New config: { " << new_config.ShortDebugString() << " }";
966
4
  }
967
10.1k
}
968
969
87.4M
const yb::OpId& ReplicaState::GetCommittedOpIdUnlocked() const {
970
87.4M
  DCHECK(IsLocked());
971
87.4M
  return last_committed_op_id_;
972
87.4M
}
973
974
13.1M
RestartSafeCoarseMonoClock& ReplicaState::Clock() {
975
13.1M
  return retryable_requests_.Clock();
976
13.1M
}
977
978
0
RetryableRequestsCounts ReplicaState::TEST_CountRetryableRequests() {
979
0
  auto lock = LockForRead();
980
0
  return retryable_requests_.TEST_Counts();
981
0
}
982
983
3.04M
bool ReplicaState::AreCommittedAndCurrentTermsSameUnlocked() const {
984
3.04M
  int64_t term = GetCurrentTermUnlocked();
985
3.04M
  const auto& opid = GetCommittedOpIdUnlocked();
986
3.04M
  if (opid.term != term) {
987
0
    LOG(INFO) << "committed term=" << opid.term << ", current term=" << term;
988
0
    return false;
989
0
  }
990
3.04M
  return true;
991
3.04M
}
992
993
7.15M
void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpIdPB& op_id) {
994
7.15M
  DCHECK(IsLocked());
995
7.15M
  auto* trace = Trace::CurrentTrace();
996
371
  DCHECK(last_received_op_id_.term <= op_id.term() && last_received_op_id_.index <= op_id.index())
997
371
      << LogPrefix() << ": "
998
371
      << "Previously received OpId: " << last_received_op_id_
999
371
      << ", updated OpId: " << op_id.ShortDebugString()
1000
371
      << ", Trace:" << std::endl << (trace ? trace->DumpToString(true) : "No trace found");
1001
1002
7.15M
  last_received_op_id_ = yb::OpId::FromPB(op_id);
1003
7.15M
  last_received_op_id_current_leader_ = last_received_op_id_;
1004
7.15M
  next_index_ = op_id.index() + 1;
1005
7.15M
}
1006
1007
42.4M
const yb::OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const {
1008
42.4M
  DCHECK(IsLocked());
1009
42.4M
  return last_received_op_id_;
1010
42.4M
}
1011
1012
10.2M
const yb::OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const {
1013
10.2M
  DCHECK(IsLocked());
1014
10.2M
  return last_received_op_id_current_leader_;
1015
10.2M
}
1016
1017
10.2M
OpId ReplicaState::GetLastPendingOperationOpIdUnlocked() const {
1018
10.2M
  DCHECK(IsLocked());
1019
5.92M
  return pending_operations_.empty() ? OpId() : pending_operations_.back()->id();
1020
10.2M
}
1021
1022
2.73M
OpId ReplicaState::NewIdUnlocked() {
1023
2.73M
  DCHECK(IsLocked());
1024
2.73M
  return OpId(GetCurrentTermUnlocked(), next_index_++);
1025
2.73M
}
1026
1027
12
void ReplicaState::CancelPendingOperation(const OpId& id, bool should_exist) {
1028
12
  DCHECK(IsLocked());
1029
12
  CHECK_EQ(GetCurrentTermUnlocked(), id.term);
1030
12
  CHECK_EQ(next_index_, id.index + 1);
1031
12
  next_index_ = id.index;
1032
1033
  // We don't use UpdateLastReceivedOpIdUnlocked because we're actually
1034
  // updating it back to a lower value and we need to avoid the checks
1035
  // that method has.
1036
1037
  // This is only ok if we do _not_ release the lock after calling
1038
  // NewIdUnlocked() (which we don't in RaftConsensus::Replicate()).
1039
  // The below is correct because we always have starting NoOp, that has the same term and was
1040
  // replicated.
1041
12
  last_received_op_id_ = OpId(id.term, id.index - 1);
1042
12
  if (should_exist) {
1043
0
    CHECK(!pending_operations_.empty() && pending_operations_.back()->id() == id)
1044
0
        << "Pending operations should end with: " << id << ", but there are: "
1045
0
        << AsString(pending_operations_);
1046
0
    pending_operations_.pop_back();
1047
12
  } else {
1048
    // It could happen only in leader, while we add new operations, since we already have no op
1049
    // in this term, that would not be cancelled, we could be sure that previous operation
1050
    // has the same term.
1051
12
    OpId expected_last_op_id(id.term, id.index - 1);
1052
0
    CHECK(pending_operations_.empty() ||
1053
0
          (pending_operations_.back()->id() == expected_last_op_id))
1054
0
        << "Pending operations should end with: " << expected_last_op_id << ", but there are: "
1055
0
        << AsString(pending_operations_);
1056
12
  }
1057
12
}
1058
1059
1.77M
string ReplicaState::LogPrefix() const {
1060
1.77M
  auto role_and_term = cmeta_->GetRoleAndTerm();
1061
1.77M
  return Substitute("T $0 P $1 [term $2 $3]: ",
1062
1.77M
                    options_.tablet_id,
1063
1.77M
                    peer_uuid_,
1064
1.77M
                    role_and_term.second,
1065
1.77M
                    PeerRole_Name(role_and_term.first));
1066
1.77M
}
1067
1068
58.2k
ReplicaState::State ReplicaState::state() const {
1069
58.2k
  DCHECK(IsLocked());
1070
58.2k
  return state_;
1071
58.2k
}
1072
1073
0
string ReplicaState::ToString() const {
1074
0
  ThreadRestrictions::AssertWaitAllowed();
1075
0
  ReplicaState::UniqueLock lock(update_lock_);
1076
0
  return ToStringUnlocked();
1077
0
}
1078
1079
128k
string ReplicaState::ToStringUnlocked() const {
1080
128k
  DCHECK(IsLocked());
1081
128k
  return Format(
1082
128k
      "Replica: $0, State: $1, Role: $2, Watermarks: {Received: $3 Committed: $4} Leader: $5",
1083
128k
      peer_uuid_, state_, PeerRole_Name(GetActiveRoleUnlocked()),
1084
128k
      last_received_op_id_, last_committed_op_id_, last_received_op_id_current_leader_);
1085
128k
}
1086
1087
13.2M
Status ReplicaState::CheckOpInSequence(const yb::OpId& previous, const yb::OpId& current) {
1088
13.2M
  if (current.term < previous.term) {
1089
1
    return STATUS_FORMAT(
1090
1
        Corruption,
1091
1
        "New operation's term is not >= than the previous op's term. Current: $0. Previous: $1",
1092
1
        current, previous);
1093
1
  }
1094
1095
13.2M
  if (current.index != previous.index + 1) {
1096
1
    return STATUS_FORMAT(
1097
1
        Corruption,
1098
1
        "New operation's index does not follow the previous op's index. Current: $0. Previous: $1",
1099
1
        current, previous);
1100
1
  }
1101
13.2M
  return Status::OK();
1102
13.2M
}
1103
1104
void ReplicaState::UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked(
1105
10.2M
    const CoarseTimeLease& lease, const PhysicalComponentLease& ht_lease) {
1106
10.2M
  old_leader_lease_.TryUpdate(lease);
1107
10.2M
  old_leader_ht_lease_.TryUpdate(ht_lease);
1108
1109
  // Reset our lease, since we are non leader now. I.e. follower or candidate.
1110
10.2M
  auto existing_lease = majority_replicated_lease_expiration_;
1111
10.2M
  if (existing_lease != CoarseTimeLease::NoneValue()) {
1112
93.0k
    LOG_WITH_PREFIX(INFO)
1113
93.0k
        << "Reset our lease: " << MonoDelta(CoarseMonoClock::now() - existing_lease);
1114
93.0k
    majority_replicated_lease_expiration_ = CoarseTimeLease::NoneValue();
1115
93.0k
  }
1116
1117
10.2M
  auto existing_ht_lease = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire);
1118
10.2M
  if (existing_ht_lease != PhysicalComponentLease::NoneValue()) {
1119
4.86k
    LOG_WITH_PREFIX(INFO) << "Reset our ht lease: " << HybridTime::FromMicros(existing_ht_lease);
1120
4.86k
    majority_replicated_ht_lease_expiration_.store(PhysicalComponentLease::NoneValue(),
1121
4.86k
                                                   std::memory_order_release);
1122
4.86k
    cond_.notify_all();
1123
4.86k
  }
1124
10.2M
}
1125
1126
template <class Policy>
1127
40.5M
LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const {
1128
40.5M
  DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER);
1129
1130
40.5M
  if (!policy.Enabled()) {
1131
1.37k
    return LeaderLeaseStatus::HAS_LEASE;
1132
1.37k
  }
1133
1134
40.5M
  if (GetActiveConfigUnlocked().peers_size() == 1) {
1135
    // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because
1136
    // we are only reading it in this function (as of 08/09/2017).
1137
1.32M
    return LeaderLeaseStatus::HAS_LEASE;
1138
1.32M
  }
1139
1140
39.2M
  if (!policy.OldLeaderLeaseExpired()) {
1141
10.7k
    return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE;
1142
10.7k
  }
1143
1144
39.2M
  if (policy.MajorityReplicatedLeaseExpired()) {
1145
11.5k
    return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
1146
11.5k
  }
1147
1148
39.1M
  return LeaderLeaseStatus::HAS_LEASE;
1149
39.1M
}
_ZNK2yb9consensus12ReplicaState22GetLeaseStatusUnlockedINS0_26GetLeaderLeaseStatusPolicyEEENS0_17LeaderLeaseStatusET_
Line
Count
Source
1127
37.9M
LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const {
1128
37.9M
  DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER);
1129
1130
37.9M
  if (!policy.Enabled()) {
1131
0
    return LeaderLeaseStatus::HAS_LEASE;
1132
0
  }
1133
1134
37.9M
  if (GetActiveConfigUnlocked().peers_size() == 1) {
1135
    // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because
1136
    // we are only reading it in this function (as of 08/09/2017).
1137
1.18M
    return LeaderLeaseStatus::HAS_LEASE;
1138
1.18M
  }
1139
1140
36.7M
  if (!policy.OldLeaderLeaseExpired()) {
1141
10.7k
    return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE;
1142
10.7k
  }
1143
1144
36.7M
  if (policy.MajorityReplicatedLeaseExpired()) {
1145
11.5k
    return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
1146
11.5k
  }
1147
1148
36.7M
  return LeaderLeaseStatus::HAS_LEASE;
1149
36.7M
}
_ZNK2yb9consensus12ReplicaState22GetLeaseStatusUnlockedINS0_32GetHybridTimeLeaseStatusAtPolicyEEENS0_17LeaderLeaseStatusET_
Line
Count
Source
1127
2.60M
LeaderLeaseStatus ReplicaState::GetLeaseStatusUnlocked(Policy policy) const {
1128
2.60M
  DCHECK_EQ(GetActiveRoleUnlocked(), PeerRole::LEADER);
1129
1130
2.60M
  if (!policy.Enabled()) {
1131
1.37k
    return LeaderLeaseStatus::HAS_LEASE;
1132
1.37k
  }
1133
1134
2.59M
  if (GetActiveConfigUnlocked().peers_size() == 1) {
1135
    // It is OK that majority_replicated_lease_expiration_ might be undefined in this case, because
1136
    // we are only reading it in this function (as of 08/09/2017).
1137
145k
    return LeaderLeaseStatus::HAS_LEASE;
1138
145k
  }
1139
1140
2.45M
  if (!policy.OldLeaderLeaseExpired()) {
1141
0
    return LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE;
1142
0
  }
1143
1144
2.45M
  if (policy.MajorityReplicatedLeaseExpired()) {
1145
7
    return LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
1146
7
  }
1147
1148
2.45M
  return LeaderLeaseStatus::HAS_LEASE;
1149
2.45M
}
1150
1151
// Policy that is used during leader lease calculation.
1152
struct GetLeaderLeaseStatusPolicy {
1153
  const ReplicaState* replica_state;
1154
  MonoDelta* remaining_old_leader_lease;
1155
  CoarseTimePoint* now;
1156
1157
  GetLeaderLeaseStatusPolicy(
1158
      const ReplicaState* replica_state_, MonoDelta* remaining_old_leader_lease_,
1159
      CoarseTimePoint* now_)
1160
      : replica_state(replica_state_), remaining_old_leader_lease(remaining_old_leader_lease_),
1161
37.9M
        now(now_) {
1162
37.9M
    if (remaining_old_leader_lease) {
1163
35.3M
      *remaining_old_leader_lease = 0s;
1164
35.3M
    }
1165
37.9M
  }
1166
1167
36.7M
  bool OldLeaderLeaseExpired() {
1168
36.7M
    const auto remaining_old_leader_lease_duration =
1169
36.7M
        replica_state->RemainingOldLeaderLeaseDuration(now);
1170
36.7M
    if (remaining_old_leader_lease_duration) {
1171
10.7k
      if (remaining_old_leader_lease) {
1172
10.5k
        *remaining_old_leader_lease = remaining_old_leader_lease_duration;
1173
10.5k
      }
1174
10.7k
      return false;
1175
10.7k
    }
1176
36.7M
    return true;
1177
36.7M
  }
1178
1179
36.7M
  bool MajorityReplicatedLeaseExpired() {
1180
36.7M
    return replica_state->MajorityReplicatedLeaderLeaseExpired(now);
1181
36.7M
  }
1182
1183
37.9M
  bool Enabled() {
1184
37.9M
    return true;
1185
37.9M
  }
1186
};
1187
1188
36.7M
bool ReplicaState::MajorityReplicatedLeaderLeaseExpired(CoarseTimePoint* now) const {
1189
36.7M
  if (majority_replicated_lease_expiration_ == CoarseTimePoint()) {
1190
0
    return true;
1191
0
  }
1192
1193
36.7M
  if (*now == CoarseTimePoint()) {
1194
36.7M
    *now = CoarseMonoClock::Now();
1195
36.7M
  }
1196
1197
36.7M
  return *now >= majority_replicated_lease_expiration_;
1198
36.7M
}
1199
1200
LeaderLeaseStatus ReplicaState::GetLeaderLeaseStatusUnlocked(
1201
37.9M
    MonoDelta* remaining_old_leader_lease, CoarseTimePoint* now) const {
1202
37.9M
  if (now == nullptr) {
1203
22.8M
    CoarseTimePoint local_now;
1204
22.8M
    return GetLeaseStatusUnlocked(GetLeaderLeaseStatusPolicy(
1205
22.8M
        this, remaining_old_leader_lease, &local_now));
1206
22.8M
  }
1207
15.1M
  return GetLeaseStatusUnlocked(GetLeaderLeaseStatusPolicy(this, remaining_old_leader_lease, now));
1208
15.1M
}
1209
1210
2.45M
bool ReplicaState::MajorityReplicatedHybridTimeLeaseExpiredAt(MicrosTime hybrid_time) const {
1211
2.45M
  return hybrid_time >= majority_replicated_ht_lease_expiration_;
1212
2.45M
}
1213
1214
struct GetHybridTimeLeaseStatusAtPolicy {
1215
  const ReplicaState* replica_state;
1216
  MicrosTime micros_time;
1217
1218
  GetHybridTimeLeaseStatusAtPolicy(const ReplicaState* rs, MicrosTime ht)
1219
2.60M
      : replica_state(rs), micros_time(ht) {}
1220
1221
2.45M
  bool OldLeaderLeaseExpired() {
1222
2.45M
    return micros_time > replica_state->old_leader_ht_lease().expiration;
1223
2.45M
  }
1224
1225
2.45M
  bool MajorityReplicatedLeaseExpired() {
1226
2.45M
    return replica_state->MajorityReplicatedHybridTimeLeaseExpiredAt(micros_time);
1227
2.45M
  }
1228
1229
2.60M
  bool Enabled() {
1230
2.60M
    return FLAGS_ht_lease_duration_ms != 0;
1231
2.60M
  }
1232
};
1233
1234
LeaderLeaseStatus ReplicaState::GetHybridTimeLeaseStatusAtUnlocked(
1235
2.59M
    MicrosTime micros_time) const {
1236
2.59M
  return GetLeaseStatusUnlocked(GetHybridTimeLeaseStatusAtPolicy(this, micros_time));
1237
2.59M
}
1238
1239
36.8M
MonoDelta ReplicaState::RemainingOldLeaderLeaseDuration(CoarseTimePoint* now) const {
1240
36.8M
  MonoDelta result;
1241
36.8M
  if (old_leader_lease_) {
1242
21.0k
    CoarseTimePoint now_local;
1243
21.0k
    if (!now) {
1244
10.2k
      now = &now_local;
1245
10.2k
    }
1246
21.0k
    *now = CoarseMonoClock::Now();
1247
1248
21.0k
    if (*now > old_leader_lease_.expiration) {
1249
      // Reset the old leader lease expiration time so that we don't have to check it anymore.
1250
309
      old_leader_lease_.Reset();
1251
20.7k
    } else {
1252
20.7k
      result = old_leader_lease_.expiration - *now;
1253
20.7k
    }
1254
21.0k
  }
1255
1256
36.8M
  return result;
1257
36.8M
}
1258
1259
Result<MicrosTime> ReplicaState::MajorityReplicatedHtLeaseExpiration(
1260
33.4M
    MicrosTime min_allowed, CoarseTimePoint deadline) const {
1261
33.4M
  if (FLAGS_ht_lease_duration_ms == 0) {
1262
2.81k
    return kMaxHybridTimePhysicalMicros;
1263
2.81k
  }
1264
1265
33.4M
  auto result = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire);
1266
33.4M
  if (result >= min_allowed) { // Fast path
1267
33.4M
    return result;
1268
33.4M
  }
1269
1270
8.67k
  if (result != PhysicalComponentLease::NoneValue()) {
1271
    // Slow path
1272
0
    UniqueLock l(update_lock_);
1273
0
    auto predicate = [this, &result, min_allowed] {
1274
0
      result = majority_replicated_ht_lease_expiration_.load(std::memory_order_acquire);
1275
0
      return result >= min_allowed || result == PhysicalComponentLease::NoneValue();
1276
0
    };
1277
0
    if (deadline == CoarseTimePoint::max()) {
1278
0
      cond_.wait(l, predicate);
1279
0
    } else if (!cond_.wait_until(l, deadline, predicate)) {
1280
0
      return STATUS_FORMAT(TimedOut, "Timed out waiting leader lease: $0", min_allowed);
1281
0
    }
1282
8.67k
  }
1283
1284
8.67k
  if (result == PhysicalComponentLease::NoneValue()) {
1285
5
    static const Status kNotLeaderStatus = STATUS(IllegalState, "Not a leader");
1286
5
    return kNotLeaderStatus;
1287
5
  }
1288
1289
8.67k
  return result;
1290
8.67k
}
1291
1292
void ReplicaState::SetMajorityReplicatedLeaseExpirationUnlocked(
1293
    const MajorityReplicatedData& majority_replicated_data,
1294
15.1M
    EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags) {
1295
15.1M
  majority_replicated_lease_expiration_ = majority_replicated_data.leader_lease_expiration;
1296
15.1M
  majority_replicated_ht_lease_expiration_.store(majority_replicated_data.ht_lease_expiration,
1297
15.1M
                                                 std::memory_order_release);
1298
1299
15.1M
  if (flags.Test(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease)) {
1300
4.80k
    LOG_WITH_PREFIX(INFO)
1301
4.80k
        << "Revoked old leader " << old_leader_lease_.holder_uuid << " lease: "
1302
4.80k
        << MonoDelta(old_leader_lease_.expiration - CoarseMonoClock::now());
1303
4.80k
    old_leader_lease_.Reset();
1304
4.80k
  }
1305
1306
15.1M
  if (flags.Test(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease)) {
1307
4.85k
    LOG_WITH_PREFIX(INFO)
1308
4.85k
        << "Revoked old leader " << old_leader_ht_lease_.holder_uuid << " ht lease: "
1309
4.85k
        << HybridTime::FromMicros(old_leader_ht_lease_.expiration);
1310
4.85k
    old_leader_ht_lease_.Reset();
1311
4.85k
  }
1312
1313
15.1M
  CoarseTimePoint now;
1314
15.1M
  RefreshLeaderStateCacheUnlocked(&now);
1315
15.1M
  cond_.notify_all();
1316
15.1M
}
1317
1318
7.29k
uint64_t ReplicaState::OnDiskSize() const {
1319
7.29k
  return cmeta_->on_disk_size();
1320
7.29k
}
1321
1322
1.54M
bool ReplicaState::RegisterRetryableRequest(const ConsensusRoundPtr& round) {
1323
1.54M
  return retryable_requests_.Register(round);
1324
1.54M
}
1325
1326
6.40M
OpId ReplicaState::MinRetryableRequestOpId() {
1327
6.40M
  UniqueLock lock;
1328
6.40M
  auto status = LockForUpdate(&lock);
1329
6.40M
  if (!status.ok()) {
1330
0
    return OpId::Min(); // return minimal op id, that prevents log from cleaning
1331
0
  }
1332
6.40M
  return retryable_requests_.CleanExpiredReplicatedAndGetMinOpId();
1333
6.40M
}
1334
1335
void ReplicaState::NotifyReplicationFinishedUnlocked(
1336
    const ConsensusRoundPtr& round, const Status& status, int64_t leader_term,
1337
7.90M
    OpIds* applied_op_ids) {
1338
7.90M
  round->NotifyReplicationFinished(status, leader_term, applied_op_ids);
1339
1340
7.90M
  retryable_requests_.ReplicationFinished(*round->replicate_msg(), status, leader_term);
1341
7.90M
}
1342
1343
15.7M
consensus::LeaderState ReplicaState::RefreshLeaderStateCacheUnlocked(CoarseTimePoint* now) const {
1344
15.7M
  auto result = GetLeaderStateUnlocked(LeaderLeaseCheckMode::NEED_LEASE, now);
1345
15.7M
  LeaderStateCache cache;
1346
15.7M
  if (result.status == LeaderStatus::LEADER_AND_READY) {
1347
15.1M
    cache.Set(result.status, result.term, majority_replicated_lease_expiration_);
1348
605k
  } else if (result.status == LeaderStatus::LEADER_BUT_OLD_LEADER_MAY_HAVE_LEASE) {
1349
5.25k
    cache.Set(result.status, result.remaining_old_leader_lease.ToMicroseconds(),
1350
5.25k
              *now + result.remaining_old_leader_lease);
1351
599k
  } else {
1352
599k
    cache.Set(result.status, 0 /* extra_value */, CoarseTimePoint::max());
1353
599k
  }
1354
1355
15.7M
  leader_state_cache_.store(cache, boost::memory_order_release);
1356
1357
15.7M
  return result;
1358
15.7M
}
1359
1360
70.1k
void ReplicaState::SetLeaderNoOpCommittedUnlocked(bool value) {
1361
70.1k
  LOG_WITH_PREFIX(INFO)
1362
70.1k
      << __func__ << "(" << value << "), committed: " << GetCommittedOpIdUnlocked()
1363
70.1k
      << ", received: " << GetLastReceivedOpIdUnlocked();
1364
1365
70.1k
  leader_no_op_committed_ = value;
1366
70.1k
  CoarseTimePoint now;
1367
70.1k
  RefreshLeaderStateCacheUnlocked(&now);
1368
70.1k
}
1369
1370
}  // namespace consensus
1371
}  // namespace yb