YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus.cc
Line | Count | Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/raft_consensus.h"
34
35
#include <algorithm>
36
#include <memory>
37
#include <mutex>
38
39
#include <boost/optional.hpp>
40
#include <gflags/gflags.h>
41
42
#include "yb/common/wire_protocol.h"
43
44
#include "yb/consensus/consensus.pb.h"
45
#include "yb/consensus/consensus_context.h"
46
#include "yb/consensus/consensus_peers.h"
47
#include "yb/consensus/consensus_round.h"
48
#include "yb/consensus/leader_election.h"
49
#include "yb/consensus/log.h"
50
#include "yb/consensus/peer_manager.h"
51
#include "yb/consensus/quorum_util.h"
52
#include "yb/consensus/replica_state.h"
53
#include "yb/consensus/state_change_context.h"
54
55
#include "yb/gutil/casts.h"
56
#include "yb/gutil/map-util.h"
57
#include "yb/gutil/stringprintf.h"
58
59
#include "yb/rpc/messenger.h"
60
#include "yb/rpc/periodic.h"
61
#include "yb/rpc/rpc_controller.h"
62
63
#include "yb/server/clock.h"
64
65
#include "yb/util/debug-util.h"
66
#include "yb/util/debug/long_operation_tracker.h"
67
#include "yb/util/debug/trace_event.h"
68
#include "yb/util/enums.h"
69
#include "yb/util/flag_tags.h"
70
#include "yb/util/format.h"
71
#include "yb/util/logging.h"
72
#include "yb/util/metrics.h"
73
#include "yb/util/net/dns_resolver.h"
74
#include "yb/util/random.h"
75
#include "yb/util/random_util.h"
76
#include "yb/util/scope_exit.h"
77
#include "yb/util/status_format.h"
78
#include "yb/util/status_log.h"
79
#include "yb/util/threadpool.h"
80
#include "yb/util/tostring.h"
81
#include "yb/util/trace.h"
82
#include "yb/util/tsan_util.h"
83
#include "yb/util/url-coding.h"
84
85
using namespace std::literals;
86
using namespace std::placeholders;
87
88
DEFINE_int32(raft_heartbeat_interval_ms, yb::NonTsanVsTsan(500, 1000),
89
             "The heartbeat interval for Raft replication. The leader produces heartbeats "
90
             "to followers at this interval. The followers expect a heartbeat at this interval "
91
             "and consider a leader to have failed if it misses several in a row.");
92
TAG_FLAG(raft_heartbeat_interval_ms, advanced);
93
94
DEFINE_double(leader_failure_max_missed_heartbeat_periods, 6.0,
95
              "Maximum heartbeat periods that the leader can fail to heartbeat in before we "
96
              "consider the leader to be failed. The total failure timeout in milliseconds is "
97
              "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. "
98
              "The value passed to this flag may be fractional.");
99
TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced);
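// [Illustrative sketch, not part of the original raft_consensus.cc] The effective leader failure
// timeout is the product of the two flags above; with the defaults (500 ms non-TSAN heartbeat,
// 6.0 missed periods) a leader is considered failed after 3000 ms. A minimal helper expressing
// that arithmetic (hypothetical name, shown only to make the relationship concrete):
namespace illustrative_example {
inline int LeaderFailureTimeoutMs(int heartbeat_interval_ms, double max_missed_periods) {
  // raft_heartbeat_interval_ms * leader_failure_max_missed_heartbeat_periods.
  return static_cast<int>(heartbeat_interval_ms * max_missed_periods);
}
}  // namespace illustrative_example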
100
101
DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000,
102
             "Maximum time to sleep in between leader election retries, in addition to the "
103
             "regular timeout. When leader election fails the interval in between retries "
104
             "increases exponentially, up to this value.");
105
TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental);
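// [Illustrative sketch, not part of the original source] This flag only caps the extra delay added
// between election retries; the delay itself grows exponentially with consecutive failures. A
// minimal standalone model of a capped exponential backoff of that shape (hypothetical helper,
// not the actual LeaderElectionExpBackoffDeltaUnlocked implementation):
namespace illustrative_example {
inline int CappedExpBackoffMs(int failed_attempts, int base_ms, int max_delta_ms) {
  int delta = base_ms;
  // Double the delay for each additional failed attempt, but never exceed the configured cap.
  for (int i = 1; i < failed_attempts && delta < max_delta_ms; ++i) {
    delta *= 2;
  }
  return delta < max_delta_ms ? delta : max_delta_ms;
}
}  // namespace illustrative_example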
106
107
DEFINE_bool(enable_leader_failure_detection, true,
108
            "Whether to enable failure detection of tablet leaders. If enabled, attempts will be "
109
            "made to elect a follower as a new leader when the leader is detected to have failed.");
110
TAG_FLAG(enable_leader_failure_detection, unsafe);
111
112
DEFINE_test_flag(bool, do_not_start_election_test_only, false,
113
                 "Do not start election even if leader failure is detected. ");
114
TAG_FLAG(TEST_do_not_start_election_test_only, runtime);
115
116
DEFINE_bool(evict_failed_followers, true,
117
            "Whether to evict followers from the Raft config that have fallen "
118
            "too far behind the leader's log to catch up normally or have been "
119
            "unreachable by the leader for longer than "
120
            "follower_unavailable_considered_failed_sec");
121
TAG_FLAG(evict_failed_followers, advanced);
122
123
DEFINE_test_flag(bool, follower_reject_update_consensus_requests, false,
124
                 "Whether a follower will return an error for all UpdateConsensus() requests.");
125
126
DEFINE_test_flag(bool, follower_pause_update_consensus_requests, false,
127
                 "Whether a follower will pause all UpdateConsensus() requests.");
128
129
DEFINE_test_flag(int32, follower_reject_update_consensus_requests_seconds, 0,
130
                 "Whether a follower will return an error for all UpdateConsensus() requests for "
131
                 "the first TEST_follower_reject_update_consensus_requests_seconds seconds after "
132
                 "the Consensus objet is created.");
133
134
DEFINE_test_flag(bool, follower_fail_all_prepare, false,
135
                 "Whether a follower will fail preparing all operations.");
136
137
DEFINE_int32(after_stepdown_delay_election_multiplier, 5,
138
             "After a peer steps down as a leader, the factor with which to multiply "
139
             "leader_failure_max_missed_heartbeat_periods to get the delay time before starting a "
140
             "new election.");
141
TAG_FLAG(after_stepdown_delay_election_multiplier, advanced);
142
TAG_FLAG(after_stepdown_delay_election_multiplier, hidden);
143
144
DECLARE_int32(memory_limit_warn_threshold_percentage);
145
146
DEFINE_test_flag(int32, inject_delay_leader_change_role_append_secs, 0,
147
                 "Amount of time to delay leader from sending replicate of change role.");
148
149
DEFINE_test_flag(double, return_error_on_change_config, 0.0,
150
                 "Fraction of the time when ChangeConfig will return an error.");
151
152
METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections,
153
                      "Follower Memory Pressure Rejections",
154
                      yb::MetricUnit::kRequests,
155
                      "Number of RPC requests rejected due to "
156
                      "memory pressure while FOLLOWER.");
157
METRIC_DEFINE_gauge_int64(tablet, raft_term,
158
                          "Current Raft Consensus Term",
159
                          yb::MetricUnit::kUnits,
160
                          "Current Term of the Raft Consensus algorithm. This number increments "
161
                          "each time a leader election is started.");
162
163
METRIC_DEFINE_lag(tablet, follower_lag_ms,
164
                  "Follower lag from leader",
165
                  "The amount of time since the last UpdateConsensus request from the "
166
                  "leader.", {0, yb::AggregationFunction::kMax} /* optional_args */);
167
168
METRIC_DEFINE_gauge_int64(tablet, is_raft_leader,
169
                          "Is tablet raft leader",
170
                          yb::MetricUnit::kUnits,
171
                          "Keeps track whether tablet is raft leader"
172
                          "1 indicates that the tablet is raft leader");
173
174
METRIC_DEFINE_coarse_histogram(
175
  table, dns_resolve_latency_during_update_raft_config,
176
  "yb.consensus.RaftConsensus.UpdateRaftConfig DNS Resolve",
177
  yb::MetricUnit::kMicroseconds,
178
  "Microseconds spent resolving DNS requests during RaftConsensus::UpdateRaftConfig");
179
180
DEFINE_int32(leader_lease_duration_ms, yb::consensus::kDefaultLeaderLeaseDurationMs,
181
             "Leader lease duration. A leader keeps establishing a new lease or extending the "
182
             "existing one with every UpdateConsensus. A new server is not allowed to serve as a "
183
             "leader (i.e. serve up-to-date read requests or acknowledge write requests) until a "
184
             "lease of this duration has definitely expired on the old leader's side.");
185
186
DEFINE_int32(ht_lease_duration_ms, 2000,
187
             "Hybrid time leader lease duration. A leader keeps establishing a new lease or "
188
             "extending the existing one with every UpdateConsensus. A new server is not allowed "
189
             "to add entries to RAFT log until a lease of the old leader is expired. 0 to disable."
190
             );
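// [Illustrative sketch, not part of the original source] Both lease flags above express the same
// safety rule: a newly elected leader must assume the previous leader may still hold an unexpired
// lease, and must hold off the corresponding activity (serving up-to-date reads for the regular
// lease, appending entries for the hybrid time lease) until that lease has definitely expired. A
// minimal standalone model of that check (hypothetical names):
namespace illustrative_example {
struct NewLeaderLeaseState {
  // Latest possible expiration of the old leader's lease, on this server's clock, in ms.
  long long old_leader_lease_expiration_ms;
};

inline bool OldLeaderLeaseDefinitelyExpired(const NewLeaderLeaseState& state, long long now_ms) {
  // Only after this returns true may the new leader act in the lease-protected role.
  return now_ms >= state.old_leader_lease_expiration_ms;
}
}  // namespace illustrative_example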
191
192
DEFINE_int32(min_leader_stepdown_retry_interval_ms,
193
             20 * 1000,
194
             "Minimum amount of time between successive attempts to perform the leader stepdown "
195
             "for the same combination of tablet and intended (target) leader. This is needed "
196
             "to avoid infinite leader stepdown loops when the current leader never has a chance "
197
             "to update the intended leader with its latest records.");
198
199
DEFINE_bool(use_preelection, true, "Whether to use pre-election before doing the actual election.");
200
201
DEFINE_int32(temporary_disable_preelections_timeout_ms, 10 * 60 * 1000,
202
             "If some of nodes does not support preelections, then we disable them for this "
203
             "amount of time.");
204
205
DEFINE_test_flag(bool, pause_update_replica, false,
206
                 "Pause RaftConsensus::UpdateReplica processing before snoozing failure detector.");
207
208
DEFINE_test_flag(bool, pause_update_majority_replicated, false,
209
                 "Pause RaftConsensus::UpdateMajorityReplicated.");
210
211
DEFINE_test_flag(int32, log_change_config_every_n, 1,
212
                 "How often to log change config information. "
213
                 "Used to reduce the number of lines being printed for change config requests "
214
                 "when a test simulates a failure that would generate a log of these requests.");
215
216
DEFINE_bool(enable_lease_revocation, true, "Enables lease revocation mechanism");
217
218
DEFINE_bool(quick_leader_election_on_create, false,
219
            "Do we trigger quick leader elections on table creation.");
220
TAG_FLAG(quick_leader_election_on_create, advanced);
221
TAG_FLAG(quick_leader_election_on_create, hidden);
222
223
DEFINE_bool(
224
    stepdown_disable_graceful_transition, false,
225
    "During a leader stepdown, disable graceful leadership transfer "
226
    "to an up to date peer");
227
228
DEFINE_bool(
229
    raft_disallow_concurrent_outstanding_report_failure_tasks, true,
230
    "If true, only submit a new report failure task if there is not one outstanding.");
231
TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, advanced);
232
TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, hidden);
233
234
DEFINE_int64(protege_synchronization_timeout_ms, 1000,
235
             "Timeout to synchronize protege before performing step down. "
236
             "0 to disable synchronization.");
237
238
namespace yb {
239
namespace consensus {
240
241
using log::LogEntryBatch;
242
using rpc::PeriodicTimer;
243
using std::shared_ptr;
244
using std::unique_ptr;
245
using std::weak_ptr;
246
using strings::Substitute;
247
using tserver::TabletServerErrorPB;
248
249
namespace {
250
251
4.94k
const RaftPeerPB* FindPeer(const RaftConfigPB& active_config, const std::string& uuid) {
252
10.3k
  for (const RaftPeerPB& peer : active_config.peers()) {
253
10.3k
    if (peer.permanent_uuid() == uuid) {
254
4.94k
      return &peer;
255
4.94k
    }
256
10.3k
  }
257
258
0
  return nullptr;
259
4.94k
}
260
261
// Helper function to check if the op is a consensus-only op, i.e. one not backed by an Operation.
262
7.99M
bool IsConsensusOnlyOperation(OperationType op_type) {
263
7.99M
  return op_type == NO_OP || op_type == CHANGE_CONFIG_OP;
264
7.99M
}
265
266
// Helper to check if the op is a change config op.
267
187k
bool IsChangeConfigOperation(OperationType op_type) {
268
187k
  return op_type == CHANGE_CONFIG_OP;
269
187k
}
270
271
class NonTrackedRoundCallback : public ConsensusRoundCallback {
272
 public:
273
  explicit NonTrackedRoundCallback(ConsensusRound* round, const StdStatusCallback& callback)
274
113k
      : round_(round), callback_(callback) {
275
113k
  }
276
277
38.2k
  void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override {
278
38.2k
    auto& replicate_msg = *round_->replicate_msg();
279
38.2k
    op_id.ToPB(replicate_msg.mutable_id());
280
38.2k
    committed_op_id.ToPB(replicate_msg.mutable_committed_op_id());
281
38.2k
  }
282
283
  void ReplicationFinished(
284
113k
      const Status& status, int64_t leader_term, OpIds* applied_op_ids) override {
285
113k
    down_cast<RaftConsensus*>(round_->consensus())->NonTrackedRoundReplicationFinished(
286
113k
        round_, callback_, status);
287
113k
  }
288
289
 private:
290
  ConsensusRound* round_;
291
  StdStatusCallback callback_;
292
};
293
294
} // namespace
295
296
std::unique_ptr<ConsensusRoundCallback> MakeNonTrackedRoundCallback(
297
113k
    ConsensusRound* round, const StdStatusCallback& callback) {
298
113k
  return std::make_unique<NonTrackedRoundCallback>(round, callback);
299
113k
}
300
301
struct RaftConsensus::LeaderRequest {
302
  std::string leader_uuid;
303
  yb::OpId preceding_op_id;
304
  yb::OpId committed_op_id;
305
  ReplicateMsgs messages;
306
  // The positional index of the first message selected to be appended, in the
307
  // original leader's request message sequence.
308
  int64_t first_message_idx;
309
310
  std::string OpsRangeString() const;
311
};
312
313
shared_ptr<RaftConsensus> RaftConsensus::Create(
314
    const ConsensusOptions& options,
315
    std::unique_ptr<ConsensusMetadata> cmeta,
316
    const RaftPeerPB& local_peer_pb,
317
    const scoped_refptr<MetricEntity>& table_metric_entity,
318
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
319
    const scoped_refptr<server::Clock>& clock,
320
    ConsensusContext* consensus_context,
321
    rpc::Messenger* messenger,
322
    rpc::ProxyCache* proxy_cache,
323
    const scoped_refptr<log::Log>& log,
324
    const shared_ptr<MemTracker>& server_mem_tracker,
325
    const shared_ptr<MemTracker>& parent_mem_tracker,
326
    const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
327
    TableType table_type,
328
    ThreadPool* raft_pool,
329
    RetryableRequests* retryable_requests,
330
88.7k
    MultiRaftManager* multi_raft_manager) {
331
332
88.7k
  auto rpc_factory = std::make_unique<RpcPeerProxyFactory>(
333
88.7k
    messenger, proxy_cache, local_peer_pb.cloud_info());
334
335
  // The message queue that keeps track of which operations need to be replicated
336
  // where.
337
88.7k
  auto queue = std::make_unique<PeerMessageQueue>(
338
88.7k
      tablet_metric_entity,
339
88.7k
      log,
340
88.7k
      server_mem_tracker,
341
88.7k
      parent_mem_tracker,
342
88.7k
      local_peer_pb,
343
88.7k
      options.tablet_id,
344
88.7k
      clock,
345
88.7k
      consensus_context,
346
88.7k
      raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL));
347
348
88.7k
  DCHECK(local_peer_pb.has_permanent_uuid());
349
88.7k
  const string& peer_uuid = local_peer_pb.permanent_uuid();
350
351
  // A single Raft thread pool token is shared between RaftConsensus and
352
  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
353
  // raw pointer to the token, to emphasize that RaftConsensus is responsible
354
  // for destroying the token.
355
88.7k
  unique_ptr<ThreadPoolToken> raft_pool_token(raft_pool->NewToken(
356
88.7k
      ThreadPool::ExecutionMode::CONCURRENT));
357
358
  // A manager for the set of peers that actually send the operations both remotely
359
  // and to the local wal.
360
88.7k
  auto peer_manager = std::make_unique<PeerManager>(
361
88.7k
      options.tablet_id,
362
88.7k
      peer_uuid,
363
88.7k
      rpc_factory.get(),
364
88.7k
      queue.get(),
365
88.7k
      raft_pool_token.get(),
366
88.7k
      multi_raft_manager);
367
368
88.7k
  return std::make_shared<RaftConsensus>(
369
88.7k
      options,
370
88.7k
      std::move(cmeta),
371
88.7k
      std::move(rpc_factory),
372
88.7k
      std::move(queue),
373
88.7k
      std::move(peer_manager),
374
88.7k
      std::move(raft_pool_token),
375
88.7k
      table_metric_entity,
376
88.7k
      tablet_metric_entity,
377
88.7k
      peer_uuid,
378
88.7k
      clock,
379
88.7k
      consensus_context,
380
88.7k
      log,
381
88.7k
      parent_mem_tracker,
382
88.7k
      mark_dirty_clbk,
383
88.7k
      table_type,
384
88.7k
      retryable_requests);
385
88.7k
}
386
387
RaftConsensus::RaftConsensus(
388
    const ConsensusOptions& options, std::unique_ptr<ConsensusMetadata> cmeta,
389
    std::unique_ptr<PeerProxyFactory> proxy_factory,
390
    std::unique_ptr<PeerMessageQueue> queue,
391
    std::unique_ptr<PeerManager> peer_manager,
392
    std::unique_ptr<ThreadPoolToken> raft_pool_token,
393
    const scoped_refptr<MetricEntity>& table_metric_entity,
394
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
395
    const std::string& peer_uuid, const scoped_refptr<server::Clock>& clock,
396
    ConsensusContext* consensus_context, const scoped_refptr<log::Log>& log,
397
    shared_ptr<MemTracker> parent_mem_tracker,
398
    Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
399
    TableType table_type,
400
    RetryableRequests* retryable_requests)
401
    : raft_pool_token_(std::move(raft_pool_token)),
402
      log_(log),
403
      clock_(clock),
404
      peer_proxy_factory_(std::move(proxy_factory)),
405
      peer_manager_(std::move(peer_manager)),
406
      queue_(std::move(queue)),
407
      rng_(GetRandomSeed32()),
408
      withhold_votes_until_(MonoTime::Min()),
409
      step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()),
410
      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
411
      shutdown_(false),
412
      follower_memory_pressure_rejections_(tablet_metric_entity->FindOrCreateCounter(
413
          &METRIC_follower_memory_pressure_rejections)),
414
      term_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_raft_term,
415
                                                    cmeta->current_term())),
416
      follower_last_update_time_ms_metric_(
417
          tablet_metric_entity->FindOrCreateAtomicMillisLag(&METRIC_follower_lag_ms)),
418
      is_raft_leader_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_is_raft_leader,
419
                                                              static_cast<int64_t>(0))),
420
      parent_mem_tracker_(std::move(parent_mem_tracker)),
421
      table_type_(table_type),
422
      update_raft_config_dns_latency_(
423
          METRIC_dns_resolve_latency_during_update_raft_config.Instantiate(table_metric_entity)),
424
      split_parent_tablet_id_(
425
88.7k
          cmeta->has_split_parent_tablet_id() ? cmeta->split_parent_tablet_id() : "") {
426
88.7k
  DCHECK_NOTNULL(log_.get());
427
428
88.7k
  if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) {
429
18
    withold_replica_updates_until_ = MonoTime::Now() +
430
18
        MonoDelta::FromSeconds(FLAGS_TEST_follower_reject_update_consensus_requests_seconds);
431
18
  }
432
433
88.7k
  state_ = std::make_unique<ReplicaState>(
434
88.7k
      options,
435
88.7k
      peer_uuid,
436
88.7k
      std::move(cmeta),
437
88.7k
      DCHECK_NOTNULL(consensus_context),
438
88.7k
      this,
439
88.7k
      retryable_requests,
440
88.7k
      std::bind(&PeerMessageQueue::TrackOperationsMemory, queue_.get(), _1));
441
442
88.7k
  peer_manager_->SetConsensus(this);
443
88.7k
}
444
445
47.8k
RaftConsensus::~RaftConsensus() {
446
47.8k
  Shutdown();
447
47.8k
}
448
449
88.7k
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
450
88.7k
  RETURN_NOT_OK(ExecuteHook(PRE_START));
451
452
  // Capture a weak_ptr reference into the functor so it can safely handle
453
  // outliving the consensus instance.
454
88.7k
  std::weak_ptr<RaftConsensus> w = shared_from_this();
455
88.7k
  failure_detector_ = PeriodicTimer::Create(
456
88.7k
      peer_proxy_factory_->messenger(),
457
8.02k
      [w]() {
458
8.02k
        if (auto consensus = w.lock()) {
459
8.02k
          consensus->ReportFailureDetected();
460
8.02k
        }
461
8.02k
      },
462
88.7k
      MinimumElectionTimeout());
463
464
88.7k
  {
465
88.7k
    ReplicaState::UniqueLock lock;
466
88.7k
    RETURN_NOT_OK(state_->LockForStart(&lock));
467
88.7k
    state_->ClearLeaderUnlocked();
468
469
88.7k
    RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id),
470
88.7k
                          "Unable to start RAFT ReplicaState");
471
472
88.7k
    LOG_WITH_PREFIX(INFO) << "Replica starting. Triggering "
473
88.7k
                          << info.orphaned_replicates.size()
474
88.7k
                          << " pending operations. Active config: "
475
88.7k
                          << state_->GetActiveConfigUnlocked().ShortDebugString();
476
129
    for (const auto& replicate : info.orphaned_replicates) {
477
129
      ReplicateMsgPtr replicate_ptr = std::make_shared<ReplicateMsg>(*replicate);
478
129
      RETURN_NOT_OK(StartReplicaOperationUnlocked(replicate_ptr, HybridTime::kInvalid));
479
129
    }
480
481
88.7k
    RETURN_NOT_OK(state_->InitCommittedOpIdUnlocked(yb::OpId::FromPB(info.last_committed_id)));
482
483
88.7k
    queue_->Init(state_->GetLastReceivedOpIdUnlocked());
484
88.7k
  }
485
486
88.7k
  {
487
88.7k
    ReplicaState::UniqueLock lock;
488
88.7k
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
489
490
    // If this is the first term, expire the FD immediately so that we have a fast first
491
    // election; otherwise we just let the timer expire normally.
492
88.7k
    MonoDelta initial_delta = MonoDelta();
493
88.7k
    if (state_->GetCurrentTermUnlocked() == 0) {
494
      // The failure detector is initialized to a low value to trigger an early election
495
      // (unless someone else requested a vote from us first, which resets the
496
      // election timer). We do it this way instead of immediately running an
497
      // election to get a higher likelihood of enough servers being available
498
      // when the first one attempts an election to avoid multiple election
499
      // cycles on startup, while keeping that "waiting period" random. If there is only one peer,
500
      // trigger an election right away.
501
87.4k
      if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
502
87.2k
        LOG_WITH_PREFIX(INFO) << "Consensus starting up: Expiring fail detector timer "
503
87.2k
                                 "to make a prompt election more likely";
504
        // Quick leader elections on table creation are gated behind a flag, since prompt
505
        // leader elections are more likely to fail due to uninitialized peers or conflicting
506
        // elections, which could have unforeseen consequences.
507
87.2k
        if (FLAGS_quick_leader_election_on_create) {
508
0
          initial_delta = (state_->GetCommittedConfigUnlocked().peers_size() == 1) ?
509
0
              MonoDelta::kZero :
510
0
              MonoDelta::FromMilliseconds(rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
511
0
        }
512
87.2k
      }
513
87.4k
    }
514
88.7k
    RETURN_NOT_OK(BecomeReplicaUnlocked(std::string(), initial_delta));
515
88.7k
  }
516
517
88.7k
  RETURN_NOT_OK(ExecuteHook(POST_START));
518
519
  // The context records that the current caller does not hold the lock for consensus state,
520
  // so the mark-dirty callback (e.g. consensus->ConsensusState() for the master consensus
521
  // callback SysCatalogStateChanged) can acquire the lock when needed.
522
88.7k
  auto context = std::make_shared<StateChangeContext>(StateChangeReason::CONSENSUS_STARTED, false);
523
  // Report becoming visible to the Master.
524
88.7k
  MarkDirty(context);
525
526
88.7k
  return Status::OK();
527
88.7k
}
528
529
5.39k
bool RaftConsensus::IsRunning() const {
530
5.39k
  auto lock = state_->LockForRead();
531
5.39k
  return state_->state() == ReplicaState::kRunning;
532
5.39k
}
533
534
86
Status RaftConsensus::EmulateElection() {
535
86
  ReplicaState::UniqueLock lock;
536
86
  RETURN_NOT_OK(state_->LockForConfigChange(&lock));
537
538
86
  LOG_WITH_PREFIX(INFO) << "Emulating election...";
539
540
  // Assume leadership of new term.
541
86
  RETURN_NOT_OK(IncrementTermUnlocked());
542
86
  SetLeaderUuidUnlocked(state_->GetPeerUuid());
543
86
  return BecomeLeaderUnlocked();
544
86
}
545
546
76.9k
Status RaftConsensus::DoStartElection(const LeaderElectionData& data, PreElected preelected) {
547
76.9k
  TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
548
76.9k
               "peer", peer_uuid(),
549
76.9k
               "tablet", tablet_id());
550
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(1) << data.ToString();
551
76.9k
  if (FLAGS_TEST_do_not_start_election_test_only) {
552
8
    LOG(INFO) << "Election start skipped as TEST_do_not_start_election_test_only flag "
553
8
                 "is set to true.";
554
8
    return Status::OK();
555
8
  }
556
557
  // If pre-elections are disabled or we already won the pre-election, then start a regular
558
  // election; otherwise a pre-election is started.
559
  // Pre-elections can be disabled via a flag, or temporarily if some nodes do not support them.
560
76.9k
  auto preelection = ANNOTATE_UNPROTECTED_READ(FLAGS_use_preelection) && !preelected &&
561
40.8k
                     disable_pre_elections_until_ < CoarseMonoClock::now();
562
40.8k
  const char* election_name = preelection ? "pre-election" : "election";
563
564
76.9k
  LeaderElectionPtr election;
565
76.9k
  {
566
76.9k
    ReplicaState::UniqueLock lock;
567
76.9k
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
568
569
76.9k
    if (data.initial_election && state_->GetCurrentTermUnlocked() != 0) {
570
0
      LOG_WITH_PREFIX(INFO) << "Not starting initial " << election_name << " -- non zero term";
571
0
      return Status::OK();
572
0
    }
573
574
76.9k
    PeerRole active_role = state_->GetActiveRoleUnlocked();
575
76.9k
    if (active_role == PeerRole::LEADER) {
576
37
      LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- already leader";
577
37
      return Status::OK();
578
37
    }
579
76.8k
    if (active_role == PeerRole::LEARNER || active_role == PeerRole::READ_REPLICA) {
580
104
      LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- role is " << active_role
581
104
                            << ", pending = " << state_->IsConfigChangePendingUnlocked()
582
104
                            << ", active_role=" << active_role;
583
104
      return Status::OK();
584
104
    }
585
76.7k
    if (PREDICT_FALSE(active_role == PeerRole::NON_PARTICIPANT)) {
586
0
      VLOG_WITH_PREFIX(1) << "Not starting " << election_name << " -- non participant";
587
      // Avoid excessive election noise while in this state.
588
10
      SnoozeFailureDetector(DO_NOT_LOG);
589
10
      return STATUS_FORMAT(
590
10
          IllegalState,
591
10
          "Not starting $0: Node is currently a non-participant in the raft config: $1",
592
10
          election_name, state_->GetActiveConfigUnlocked());
593
10
    }
594
595
    // Default is to start the election now. But if we are starting a pending election, check
596
    // whether there is indeed an op id we are pending upon and whether it has been committed to
597
    // the log. The op id could have been cleared if the pending election has already been started
598
    // or another peer has jumped in before we could start.
599
76.7k
    bool start_now = true;
600
76.7k
    if (data.pending_commit) {
601
0
      const auto required_id = !data.must_be_committed_opid
602
0
          ? state_->GetPendingElectionOpIdUnlocked() : data.must_be_committed_opid;
603
0
      const Status advance_committed_index_status = ResultToStatus(
604
0
          state_->AdvanceCommittedOpIdUnlocked(required_id, CouldStop::kFalse));
605
0
      if (!advance_committed_index_status.ok()) {
606
0
        LOG(WARNING) << "Starting an " << election_name << " but the latest committed OpId is not "
607
0
                        "present in this peer's log: "
608
0
                     << required_id << ". " << "Status: " << advance_committed_index_status;
609
0
      }
610
0
      start_now = required_id.index <= state_->GetCommittedOpIdUnlocked().index;
611
0
    }
612
613
76.8k
    if (start_now) {
614
76.8k
      if (state_->HasLeaderUnlocked()) {
615
11.4k
        LOG_WITH_PREFIX(INFO) << "Fail of leader " << state_->GetLeaderUuidUnlocked()
616
11.4k
                              << " detected. Triggering leader " << election_name
617
11.4k
                              << ", mode=" << data.mode;
618
65.3k
      } else {
619
65.3k
        LOG_WITH_PREFIX(INFO) << "Triggering leader " << election_name << ", mode=" << data.mode;
620
65.3k
      }
621
622
      // Snooze to avoid the election timer firing again as much as possible.
623
      // We do not disable the election timer while running an election.
624
76.8k
      MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked();
625
76.8k
      SnoozeFailureDetector(ALLOW_LOGGING, timeout);
626
627
75.9k
      election = VERIFY_RESULT(CreateElectionUnlocked(data, timeout, PreElection(preelection)));
628
18.4E
    } else if (data.pending_commit && !data.must_be_committed_opid.empty()) {
629
      // Queue up the pending op id if specified.
630
0
      state_->SetPendingElectionOpIdUnlocked(data.must_be_committed_opid);
631
0
      LOG_WITH_PREFIX(INFO)
632
0
          << "Leader " << election_name << " is pending upon log commitment of OpId "
633
0
          << data.must_be_committed_opid;
634
18.4E
    } else {
635
18.4E
      LOG_WITH_PREFIX(INFO) << "Ignore " << __func__ << " existing wait on op id";
636
18.4E
    }
637
76.7k
  }
638
639
  // Start the election outside the lock.
640
75.9k
  if (election) {
641
75.9k
    election->Run();
642
75.9k
  }
643
644
75.8k
  return Status::OK();
645
76.7k
}
646
647
Result<LeaderElectionPtr> RaftConsensus::CreateElectionUnlocked(
648
76.8k
    const LeaderElectionData& data, MonoDelta timeout, PreElection preelection) {
649
76.8k
  int64_t new_term;
650
76.8k
  if (preelection) {
651
40.7k
    new_term = state_->GetCurrentTermUnlocked() + 1;
652
36.0k
  } else {
653
    // Increment the term.
654
36.0k
    RETURN_NOT_OK(IncrementTermUnlocked());
655
35.1k
    new_term = state_->GetCurrentTermUnlocked();
656
35.1k
  }
657
658
75.9k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
659
75.9k
  LOG_WITH_PREFIX(INFO) << "Starting " << (preelection ? "pre-" : "") << "election with config: "
660
75.9k
                        << active_config.ShortDebugString();
661
662
  // Initialize the VoteCounter.
663
75.9k
  auto num_voters = CountVoters(active_config);
664
75.9k
  auto majority_size = MajoritySize(num_voters);
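  // [Illustrative note, not part of the original source] Assuming MajoritySize implements the
  // usual strict-majority quorum rule, majority = floor(num_voters / 2) + 1, e.g.
  // 1 voter -> 1, 3 voters -> 2, 5 voters -> 3.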
665
666
  // Vote for ourselves.
667
75.9k
  if (!preelection) {
668
    // TODO: Consider using a separate Mutex for voting, which must sync to disk.
669
35.1k
    RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid()));
670
35.1k
  }
671
672
75.9k
  auto counter = std::make_unique<VoteCounter>(num_voters, majority_size);
673
75.9k
  bool duplicate;
674
75.9k
  RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), ElectionVote::kGranted, &duplicate));
675
16
  CHECK(!duplicate) << state_->LogPrefix()
676
16
                    << "Inexplicable duplicate self-vote for term "
677
16
                    << state_->GetCurrentTermUnlocked();
678
679
75.9k
  VoteRequestPB request;
680
75.9k
  request.set_ignore_live_leader(data.mode == ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE);
681
75.9k
  request.set_candidate_uuid(state_->GetPeerUuid());
682
75.9k
  request.set_candidate_term(new_term);
683
75.9k
  request.set_tablet_id(state_->GetOptions().tablet_id);
684
75.9k
  request.set_preelection(preelection);
685
75.9k
  state_->GetLastReceivedOpIdUnlocked().ToPB(
686
75.9k
      request.mutable_candidate_status()->mutable_last_received());
687
688
75.9k
  LeaderElectionPtr result(new LeaderElection(
689
75.9k
      active_config,
690
75.9k
      peer_proxy_factory_.get(),
691
75.9k
      request,
692
75.9k
      std::move(counter),
693
75.9k
      timeout,
694
75.9k
      preelection,
695
75.9k
      data.suppress_vote_request,
696
75.9k
      std::bind(&RaftConsensus::ElectionCallback, shared_from_this(), data, _1)));
697
698
75.9k
  if (!preelection) {
699
    // Clear the pending election op id so that we won't start the same pending election again.
700
    // Pre-election does not change state, so should not do it in this case.
701
35.1k
    state_->ClearPendingElectionOpIdUnlocked();
702
35.1k
  }
703
704
75.9k
  return result;
705
75.9k
}
706
707
8
Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
708
8
  MonoTime deadline = MonoTime::Now();
709
8
  deadline.AddDelta(timeout);
710
349
  while (MonoTime::Now().ComesBefore(deadline)) {
711
349
    if (GetLeaderStatus() == LeaderStatus::LEADER_AND_READY) {
712
8
      return Status::OK();
713
8
    }
714
341
    SleepFor(MonoDelta::FromMilliseconds(10));
715
341
  }
716
717
0
  return STATUS(TimedOut, Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
718
8
                                     peer_uuid(), tablet_id(), timeout.ToString(), role()));
719
8
}
720
721
5.46k
string RaftConsensus::ServersInTransitionMessage() {
722
5.46k
  string err_msg;
723
5.46k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
724
5.46k
  const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
725
5.46k
  auto servers_in_transition = CountServersInTransition(active_config);
726
5.46k
  auto committed_servers_in_transition = CountServersInTransition(committed_config);
727
5.46k
  LOG(INFO) << Substitute("Active config has $0 and committed has $1 servers in transition.",
728
5.46k
                          servers_in_transition, committed_servers_in_transition);
729
5.46k
  if (servers_in_transition != 0 || committed_servers_in_transition != 0) {
730
101
    err_msg = Substitute("Leader not ready to step down as there are $0 active config peers"
731
101
                         " in transition, $1 in committed. Configs:\nactive=$2\ncommit=$3",
732
101
                         servers_in_transition, committed_servers_in_transition,
733
101
                         active_config.ShortDebugString(), committed_config.ShortDebugString());
734
101
    LOG(INFO) << err_msg;
735
101
  }
736
5.46k
  return err_msg;
737
5.46k
}
738
739
4.84k
Status RaftConsensus::StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful) {
740
4.84k
  auto election_state = std::make_shared<RunLeaderElectionState>();
741
4.84k
  election_state->proxy = peer_proxy_factory_->NewProxy(peer);
742
4.84k
  election_state->req.set_originator_uuid(state_->GetPeerUuid());
743
4.84k
  election_state->req.set_dest_uuid(peer.permanent_uuid());
744
4.84k
  election_state->req.set_tablet_id(state_->GetOptions().tablet_id);
745
4.84k
  election_state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
746
4.84k
  election_state->proxy->RunLeaderElectionAsync(
747
4.84k
      &election_state->req, &election_state->resp, &election_state->rpc,
748
4.84k
      std::bind(&RaftConsensus::RunLeaderElectionResponseRpcCallback, this,
749
4.84k
          election_state));
750
751
4.84k
  LOG_WITH_PREFIX(INFO) << "Transferring leadership to " << peer.permanent_uuid();
752
753
4.84k
  return BecomeReplicaUnlocked(
754
4.58k
      graceful ? std::string() : peer.permanent_uuid(), MonoDelta());
755
4.84k
}
756
757
50
void RaftConsensus::CheckDelayedStepDown(const Status& status) {
758
50
  ReplicaState::UniqueLock lock;
759
50
  auto lock_status = state_->LockForConfigChange(&lock);
760
50
  if (!lock_status.ok()) {
761
10
    LOG_WITH_PREFIX(INFO) << "Failed to check delayed election: " << lock_status;
762
10
    return;
763
10
  }
764
765
40
  if (state_->GetCurrentTermUnlocked() != delayed_step_down_.term) {
766
32
    return;
767
32
  }
768
769
8
  const auto& config = state_->GetActiveConfigUnlocked();
770
8
  const auto* peer = FindPeer(config, delayed_step_down_.protege);
771
8
  if (peer) {
772
8
    LOG_WITH_PREFIX(INFO) << "Step down in favor on not synchronized protege: "
773
8
                          << delayed_step_down_.protege;
774
8
    WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful),
775
8
                "Start step down failed");
776
0
  } else {
777
0
    LOG_WITH_PREFIX(INFO) << "Failed to synchronize with protege " << delayed_step_down_.protege
778
0
                          << " and cannot find it in config: " << config.ShortDebugString();
779
0
    delayed_step_down_.term = OpId::kUnknownTerm;
780
0
  }
781
8
}
782
783
5.88k
Status RaftConsensus::StepDown(const LeaderStepDownRequestPB* req, LeaderStepDownResponsePB* resp) {
784
5.88k
  TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
785
5.88k
  ReplicaState::UniqueLock lock;
786
5.88k
  RETURN_NOT_OK(state_->LockForConfigChange(&lock));
787
788
  // A sanity check that this request was routed to the correct RaftConsensus.
789
5.88k
  const auto& tablet_id = req->tablet_id();
790
5.88k
  if (tablet_id != this->tablet_id()) {
791
0
    resp->mutable_error()->set_code(TabletServerErrorPB::UNKNOWN_ERROR);
792
0
    const auto msg = Format(
793
0
        "Received a leader stepdown operation for wrong tablet id: $0, must be: $1",
794
0
        tablet_id, this->tablet_id());
795
0
    LOG_WITH_PREFIX(ERROR) << msg;
796
0
    StatusToPB(STATUS(IllegalState, msg), resp->mutable_error()->mutable_status());
797
0
    return Status::OK();
798
0
  }
799
800
5.88k
  if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) {
801
418
    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
802
418
    StatusToPB(STATUS(IllegalState, "Not currently leader"),
803
418
               resp->mutable_error()->mutable_status());
804
    // We return OK so that the tablet service won't overwrite the error code.
805
418
    return Status::OK();
806
418
  }
807
808
  // The leader needs to be ready to perform a step down. There should be no PRE_VOTER in either
809
  // the active or the committed config - ENG-557.
810
5.46k
  const string err_msg = ServersInTransitionMessage();
811
5.46k
  if (!err_msg.empty()) {
812
101
    resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
813
101
    StatusToPB(STATUS(IllegalState, err_msg), resp->mutable_error()->mutable_status());
814
101
    return Status::OK();
815
101
  }
816
817
5.36k
  std::string new_leader_uuid;
818
  // If a new leader is nominated, find it among peers to send RunLeaderElection request.
819
  // See https://ramcloud.stanford.edu/~ongaro/thesis.pdf, section 3.10 for this mechanism
820
  // to transfer the leadership.
821
5.36k
  const bool forced = (req->has_force_step_down() && req->force_step_down());
822
5.36k
  if (req->has_new_leader_uuid()) {
823
5.09k
    new_leader_uuid = req->new_leader_uuid();
824
5.09k
    if (!forced && !queue_->CanPeerBecomeLeader(new_leader_uuid)) {
825
185
      resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
826
185
      StatusToPB(
827
185
          STATUS(IllegalState, "Suggested peer is not caught up yet"),
828
185
          resp->mutable_error()->mutable_status());
829
      // We return OK so that the tablet service won't overwrite the error code.
830
185
      return Status::OK();
831
185
    }
832
5.18k
  }
833
834
5.18k
  bool graceful_stepdown = false;
835
5.18k
  if (new_leader_uuid.empty() && !FLAGS_stepdown_disable_graceful_transition &&
836
272
      !(req->has_disable_graceful_transition() && req->disable_graceful_transition())) {
837
271
    new_leader_uuid = queue_->GetUpToDatePeer();
838
271
    LOG_WITH_PREFIX(INFO) << "Selected up to date candidate protege leader [" << new_leader_uuid
839
271
                          << "]";
840
271
    graceful_stepdown = true;
841
271
  }
842
843
5.18k
  const auto& local_peer_uuid = state_->GetPeerUuid();
844
5.18k
  if (!new_leader_uuid.empty()) {
845
5.18k
    const auto leadership_transfer_description =
846
5.18k
        Format("tablet $0 from $1 to $2", tablet_id, local_peer_uuid, new_leader_uuid);
847
5.18k
    if (!forced && new_leader_uuid == protege_leader_uuid_ && election_lost_by_protege_at_) {
848
337
      const MonoDelta time_since_election_loss_by_protege =
849
337
          MonoTime::Now() - election_lost_by_protege_at_;
850
337
      if (time_since_election_loss_by_protege.ToMilliseconds() <
851
327
              FLAGS_min_leader_stepdown_retry_interval_ms) {
852
327
        LOG_WITH_PREFIX(INFO) << "Unable to execute leadership transfer for "
853
327
                              << leadership_transfer_description
854
327
                              << " because the intended leader already lost an election only "
855
327
                              << ToString(time_since_election_loss_by_protege) << " ago (within "
856
327
                              << FLAGS_min_leader_stepdown_retry_interval_ms << " ms).";
857
327
        if (req->has_new_leader_uuid()) {
858
327
          LOG_WITH_PREFIX(INFO) << "Rejecting leader stepdown request for "
859
327
                                << leadership_transfer_description;
860
327
          resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
861
327
          resp->set_time_since_election_failure_ms(
862
327
              time_since_election_loss_by_protege.ToMilliseconds());
863
327
          StatusToPB(
864
327
              STATUS(IllegalState, "Suggested peer lost an election recently"),
865
327
              resp->mutable_error()->mutable_status());
866
          // We return OK so that the tablet service won't overwrite the error code.
867
327
          return Status::OK();
868
0
        } else {
869
          // we were attempting a graceful transfer of our own choice
870
          // which is no longer possible
871
0
          new_leader_uuid.clear();
872
0
        }
873
327
      }
874
10
      election_lost_by_protege_at_ = MonoTime();
875
10
    }
876
5.18k
  }
877
878
4.85k
  if (!new_leader_uuid.empty()) {
879
4.85k
    const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), new_leader_uuid);
880
4.85k
    if (peer && peer->member_type() == PeerMemberType::VOTER) {
881
4.85k
      auto timeout_ms = FLAGS_protege_synchronization_timeout_ms;
882
4.85k
      if (timeout_ms != 0 &&
883
4.85k
          queue_->PeerLastReceivedOpId(new_leader_uuid) < GetLatestOpIdFromLog()) {
884
50
        delayed_step_down_ = DelayedStepDown {
885
50
          .term = state_->GetCurrentTermUnlocked(),
886
50
          .protege = new_leader_uuid,
887
50
          .graceful = graceful_stepdown,
888
50
        };
889
50
        LOG_WITH_PREFIX(INFO) << "Delay step down: " << delayed_step_down_.ToString();
890
50
        step_down_check_tracker_.Schedule(
891
50
            std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1),
892
50
            1ms * timeout_ms);
893
50
        return Status::OK();
894
50
      }
895
896
4.80k
      return StartStepDownUnlocked(*peer, graceful_stepdown);
897
4.80k
    }
898
899
3
    LOG_WITH_PREFIX(WARNING) << "New leader " << new_leader_uuid << " not found.";
900
3
    if (req->has_new_leader_uuid()) {
901
0
      resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
902
0
      StatusToPB(
903
0
          STATUS(IllegalState, "New leader not found among peers"),
904
0
          resp->mutable_error()->mutable_status());
905
      // We return OK so that the tablet service won't overwrite the error code.
906
0
      return Status::OK();
907
3
    } else {
908
      // we were attempting a graceful transfer of our own choice
909
      // which is no longer possible
910
3
      new_leader_uuid.clear();
911
3
    }
912
3
  }
913
914
4
  if (graceful_stepdown) {
915
4
    new_leader_uuid.clear();
916
4
  }
917
4
  RETURN_NOT_OK(BecomeReplicaUnlocked(new_leader_uuid, MonoDelta()));
918
919
4
  return Status::OK();
920
4
}
921
922
44
Status RaftConsensus::ElectionLostByProtege(const std::string& election_lost_by_uuid) {
923
44
  if (election_lost_by_uuid.empty()) {
924
0
    return STATUS(InvalidArgument, "election_lost_by_uuid could not be empty");
925
0
  }
926
927
44
  auto start_election = false;
928
44
  {
929
44
    ReplicaState::UniqueLock lock;
930
44
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
931
44
    if (election_lost_by_uuid == protege_leader_uuid_) {
932
36
      LOG_WITH_PREFIX(INFO) << "Our protege " << election_lost_by_uuid
933
36
                            << ", lost election. Has leader: "
934
36
                            << state_->HasLeaderUnlocked();
935
36
      withhold_election_start_until_.store(MonoTime::Min(), std::memory_order_relaxed);
936
36
      election_lost_by_protege_at_ = MonoTime::Now();
937
938
36
      start_election = !state_->HasLeaderUnlocked();
939
36
    }
940
44
  }
941
942
44
  if (start_election) {
943
35
    return StartElection({ElectionMode::NORMAL_ELECTION});
944
35
  }
945
946
9
  return Status::OK();
947
9
}
948
949
4.90k
void RaftConsensus::WithholdElectionAfterStepDown(const std::string& protege_uuid) {
950
4.90k
  DCHECK(state_->IsLocked());
951
4.90k
  protege_leader_uuid_ = protege_uuid;
952
4.90k
  auto timeout = MonoDelta::FromMilliseconds(
953
4.90k
      FLAGS_leader_failure_max_missed_heartbeat_periods *
954
4.90k
      FLAGS_raft_heartbeat_interval_ms);
955
4.90k
  if (!protege_uuid.empty()) {
956
    // Actually we have 2 kinds of step downs.
957
    // 1) We step down in favor of some protege.
958
    // 2) We step down because term was advanced or just started.
959
    // In the second case we should not withhold the election for a long period of time.
960
4.58k
    timeout *= FLAGS_after_stepdown_delay_election_multiplier;
961
4.58k
  }
962
4.90k
  auto deadline = MonoTime::Now() + timeout;
963
0
  VLOG(2) << "Withholding election for " << timeout;
964
4.90k
  withhold_election_start_until_.store(deadline, std::memory_order_release);
965
4.90k
  election_lost_by_protege_at_ = MonoTime();
966
4.90k
}
967
968
void RaftConsensus::RunLeaderElectionResponseRpcCallback(
969
4.84k
    shared_ptr<RunLeaderElectionState> election_state) {
970
  // Check for RPC errors.
971
4.84k
  if (!election_state->rpc.status().ok()) {
972
9
    LOG_WITH_PREFIX(WARNING) << "RPC error from RunLeaderElection() call to peer "
973
9
                             << election_state->req.dest_uuid() << ": "
974
9
                             << election_state->rpc.status();
975
  // Check for tablet errors.
976
4.83k
  } else if (election_state->resp.has_error()) {
977
0
    LOG_WITH_PREFIX(WARNING) << "Tablet error from RunLeaderElection() call to peer "
978
0
                             << election_state->req.dest_uuid() << ": "
979
0
                             << StatusFromPB(election_state->resp.error().status());
980
0
  }
981
4.84k
}
982
983
8.01k
void RaftConsensus::ReportFailureDetectedTask() {
984
8.01k
  auto scope_exit = ScopeExit([this] {
985
8.01k
    outstanding_report_failure_task_.clear(std::memory_order_release);
986
8.01k
  });
987
988
8.01k
  MonoTime now;
989
8.01k
  for (;;) {
990
    // Do not start an election for an extended period of time if we recently stepped down.
991
8.01k
    auto old_value = withhold_election_start_until_.load(std::memory_order_acquire);
992
993
8.01k
    if (old_value == MonoTime::Min()) {
994
7.93k
      break;
995
7.93k
    }
996
997
83
    if (!now.Initialized()) {
998
83
      now = MonoTime::Now();
999
83
    }
1000
1001
83
    if (now < old_value) {
1002
0
      VLOG(1) << "Skipping election due to delayed timeout for " << (old_value - now);
1003
51
      return;
1004
51
    }
1005
1006
    // If we previously stepped down and the delayed election start deadline has now passed,
1007
    // reset the withhold time to record that we are out of that extra delay mode.
1008
32
    if (withhold_election_start_until_.compare_exchange_weak(
1009
32
        old_value, MonoTime::Min(), std::memory_order_release)) {
1010
32
      break;
1011
32
    }
1012
32
  }
1013
1014
  // Start an election.
1015
7.96k
  LOG_WITH_PREFIX(INFO) << "ReportFailDetected: Starting NORMAL_ELECTION...";
1016
7.96k
  Status s = StartElection({ElectionMode::NORMAL_ELECTION});
1017
7.96k
  if (PREDICT_FALSE(!s.ok())) {
1018
10
    LOG_WITH_PREFIX(WARNING) << "Failed to trigger leader election: " << s.ToString();
1019
10
  }
1020
7.96k
}
1021
1022
8.02k
void RaftConsensus::ReportFailureDetected() {
1023
8.02k
  if (FLAGS_raft_disallow_concurrent_outstanding_report_failure_tasks &&
1024
8.02k
      outstanding_report_failure_task_.test_and_set(std::memory_order_acq_rel)) {
1025
0
    VLOG(4)
1026
0
        << "Returning from ReportFailureDetected as there is already an outstanding report task.";
1027
8.01k
  } else {
1028
    // We're running on a timer thread; start an election on a different thread pool.
1029
8.01k
    auto s = raft_pool_token_->SubmitFunc(
1030
8.01k
        std::bind(&RaftConsensus::ReportFailureDetectedTask, shared_from_this()));
1031
8.01k
    WARN_NOT_OK(s, "Failed to submit failure detected task");
1032
8.01k
    if (!s.ok()) {
1033
0
      outstanding_report_failure_task_.clear(std::memory_order_release);
1034
0
    }
1035
8.01k
  }
1036
8.02k
}
1037
1038
35.1k
Status RaftConsensus::BecomeLeaderUnlocked() {
1039
35.1k
  DCHECK(state_->IsLocked());
1040
35.1k
  TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
1041
35.1k
               "peer", peer_uuid(),
1042
35.1k
               "tablet", tablet_id());
1043
35.1k
  LOG_WITH_PREFIX(INFO) << "Becoming Leader. State: " << state_->ToStringUnlocked();
1044
1045
  // Disable FD while we are leader.
1046
35.1k
  DisableFailureDetector();
1047
1048
  // Don't vote for anyone if we're a leader.
1049
35.1k
  withhold_votes_until_.store(MonoTime::Max(), std::memory_order_release);
1050
1051
35.1k
  queue_->RegisterObserver(this);
1052
1053
  // Refresh queue and peers before initiating NO_OP.
1054
35.1k
  RefreshConsensusQueueAndPeersUnlocked();
1055
1056
  // Initiate a NO_OP operation that is sent at the beginning of every term
1057
  // change in raft.
1058
35.1k
  auto replicate = std::make_shared<ReplicateMsg>();
1059
35.1k
  replicate->set_op_type(NO_OP);
1060
35.1k
  replicate->mutable_noop_request(); // Define the no-op request field.
1061
35.1k
  LOG(INFO) << "Sending NO_OP at op " << state_->GetCommittedOpIdUnlocked();
1062
  // This committed OpId is used for tablet bootstrap for RocksDB-backed tables.
1063
35.1k
  state_->GetCommittedOpIdUnlocked().ToPB(replicate->mutable_committed_op_id());
1064
1065
  // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT
1066
  // operations. See KUDU-798.
1067
  // Note: This hybrid_time has no meaning from a serialization perspective
1068
  // because this method is not executed on the TabletPeer's prepare thread.
1069
35.1k
  replicate->set_hybrid_time(clock_->Now().ToUint64());
1070
1071
35.1k
  auto round = make_scoped_refptr<ConsensusRound>(this, replicate);
1072
35.1k
  round->SetCallback(MakeNonTrackedRoundCallback(round.get(),
1073
35.0k
      [this, term = state_->GetCurrentTermUnlocked()](const Status& status) {
1074
    // Set 'Leader is ready to serve' flag only for committed NoOp operation
1075
    // and only if the term is up-to-date.
1076
    // It is guaranteed that successful notification is called only while holding replicate state
1077
    // mutex.
1078
35.1k
    if (status.ok() && term == state_->GetCurrentTermUnlocked()) {
1079
35.1k
      state_->SetLeaderNoOpCommittedUnlocked(true);
1080
35.1k
    }
1081
35.0k
  }));
1082
35.1k
  RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
1083
1084
35.1k
  peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
1085
1086
  // Set the timestamp to max int64_t so that every time this metric is queried, the returned
1087
  // lag is 0. We will need to restore the timestamp once this peer steps down.
1088
35.1k
  follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds(
1089
35.1k
      std::numeric_limits<int64_t>::max());
1090
35.1k
  is_raft_leader_metric_->set_value(1);
1091
1092
35.1k
  return Status::OK();
1093
35.1k
}
1094
1095
Status RaftConsensus::BecomeReplicaUnlocked(
1096
93.6k
    const std::string& new_leader_uuid, MonoDelta initial_fd_wait) {
1097
93.6k
  LOG_WITH_PREFIX(INFO)
1098
93.6k
      << "Becoming Follower/Learner. State: " << state_->ToStringUnlocked()
1099
93.6k
      << ", new leader: " << new_leader_uuid << ", initial_fd_wait: " << initial_fd_wait;
1100
1101
93.6k
  if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
1102
4.90k
    WithholdElectionAfterStepDown(new_leader_uuid);
1103
4.90k
  }
1104
1105
93.6k
  state_->ClearLeaderUnlocked();
1106
1107
  // FD should be running while we are a follower.
1108
93.6k
  EnableFailureDetector(initial_fd_wait);
1109
1110
  // Now that we're a replica, we can allow voting for other nodes.
1111
93.6k
  withhold_votes_until_.store(MonoTime::Min(), std::memory_order_release);
1112
1113
93.6k
  const Status unregister_observer_status = queue_->UnRegisterObserver(this);
1114
93.6k
  if (!unregister_observer_status.IsNotFound()) {
1115
4.90k
    RETURN_NOT_OK(unregister_observer_status);
1116
4.90k
  }
1117
  // Deregister ourselves from the queue. We don't care what gets replicated, since
1118
  // we're stepping down.
1119
93.6k
  queue_->SetNonLeaderMode();
1120
1121
93.6k
  peer_manager_->Close();
1122
1123
  // TODO: https://github.com/yugabyte/yugabyte-db/issues/5522. Add unit tests for this metric.
1124
  // We update the follower lag metric timestamp here because it's possible that a leader
1125
  // that steps down could get partitioned before it receives any replicate message. If we
1126
  // don't update the timestamp here, and the above scenario happens, the metric will keep the
1127
  // int64_t max value, which would make the metric return a 0 lag every time it is queried,
1128
  // even though that's not the case.
1129
93.6k
  follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds(
1130
93.6k
      clock_->Now().GetPhysicalValueMicros() / 1000);
1131
93.6k
  is_raft_leader_metric_->set_value(0);
1132
1133
93.6k
  return Status::OK();
1134
93.6k
}
1135
1136
195
Status RaftConsensus::TEST_Replicate(const ConsensusRoundPtr& round) {
1137
195
  return ReplicateBatch({ round });
1138
195
}
1139
1140
2.60M
Status RaftConsensus::ReplicateBatch(const ConsensusRounds& rounds) {
1141
2.60M
  size_t processed_rounds = 0;
1142
2.60M
  auto status = DoReplicateBatch(rounds, &processed_rounds);
1143
2.60M
  if (!status.ok()) {
1144
0
    VLOG_WITH_PREFIX_AND_FUNC(1)
1145
0
        << "Failed with status " << status << ", treating all " << rounds.size()
1146
0
        << " operations as failed with that status";
1147
    // Treat all the operations in the batch as failed.
1148
45
    for (size_t i = rounds.size(); i != processed_rounds;) {
1149
17
      rounds[--i]->callback()->ReplicationFailed(status);
1150
17
    }
1151
28
  }
1152
2.60M
  return status;
1153
2.60M
}
1154
1155
2.60M
Status RaftConsensus::DoReplicateBatch(const ConsensusRounds& rounds, size_t* processed_rounds) {
1156
2.60M
  RETURN_NOT_OK(ExecuteHook(PRE_REPLICATE));
1157
2.60M
  {
1158
2.60M
    ReplicaState::UniqueLock lock;
1159
2.60M
#ifndef NDEBUG
1160
2.69M
    for (const auto& round : rounds) {
1161
129
      DCHECK(!round->replicate_msg()->has_id()) << "Should not have an OpId yet: "
1162
129
                                                << round->replicate_msg()->DebugString();
1163
2.69M
    }
1164
2.60M
#endif
1165
2.60M
    RETURN_NOT_OK(state_->LockForReplicate(&lock));
1166
2.60M
    auto current_term = state_->GetCurrentTermUnlocked();
1167
2.60M
    if (current_term == delayed_step_down_.term) {
1168
9
      return STATUS(Aborted, "Rejecting because of planned step down");
1169
9
    }
1170
1171
2.69M
    for (const auto& round : rounds) {
1172
2.69M
      RETURN_NOT_OK(round->CheckBoundTerm(current_term));
1173
2.69M
    }
1174
2.60M
    auto status = AppendNewRoundsToQueueUnlocked(rounds, processed_rounds);
1175
2.60M
    if (!status.ok()) {
1176
      // In general we have 3 kinds of rounds in case of failure:
1177
      // 1) Rounds that were rejected by retryable requests.
1178
      //    We should not call ReplicationFinished for them.
1179
      // 2) Rounds that were registered with retryable requests.
1180
      //    We should call state_->NotifyReplicationFinishedUnlocked for them.
1181
      // 3) Rounds that were not registered with retryable requests.
1182
      //    We should call ReplicationFinished directly for them. Could do it after releasing
1183
      //    the lock. I.e. in ReplicateBatch.
1184
      //
1185
      // (3) is all rounds starting with index *processed_rounds and above.
1186
      // For (1) we reset the bound term, so we can use it to distinguish between (1) and (2).
1187
29
      for (size_t i = *processed_rounds; i != 0;) {
1188
12
        --i;
1189
12
        if (rounds[i]->bound_term() == OpId::kUnknownTerm) {
1190
          // Already rejected by retryable requests.
1191
0
          continue;
1192
0
        }
1193
12
        state_->NotifyReplicationFinishedUnlocked(
1194
12
            rounds[i], status, OpId::kUnknownTerm, /* applied_op_ids */ nullptr);
1195
12
      }
1196
17
      return status;
1197
17
    }
1198
2.60M
  }
1199
1200
2.60M
  peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
1201
2.60M
  RETURN_NOT_OK(ExecuteHook(POST_REPLICATE));
1202
2.60M
  return Status::OK();
1203
2.60M
}
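// [Editorial sketch; not part of raft_consensus.cc] The error path above distinguishes three
// groups of rounds: (1) rejected by retryable requests (their bound term was reset), which get
// no notification here; (2) registered with retryable requests, notified under the lock; and
// (3) never processed, which ReplicateBatch fails afterwards. Below is a condensed classifier
// under those assumptions, with illustrative names.

#include <cstddef>
#include <cstdint>

enum class IllustrativeFailureGroup { kSkipNotification, kNotifyUnderLock, kFailInCaller };

IllustrativeFailureGroup ClassifyFailedRound(size_t round_index, size_t processed_rounds,
                                             int64_t bound_term, int64_t unknown_term) {
  if (round_index >= processed_rounds) {
    return IllustrativeFailureGroup::kFailInCaller;  // Group (3): handled in ReplicateBatch.
  }
  return bound_term == unknown_term
      ? IllustrativeFailureGroup::kSkipNotification   // Group (1): already rejected.
      : IllustrativeFailureGroup::kNotifyUnderLock;   // Group (2): NotifyReplicationFinishedUnlocked.
}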
1204
1205
38.1k
Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
1206
38.1k
  size_t processed_rounds = 0;
1207
38.1k
  return AppendNewRoundsToQueueUnlocked({ round }, &processed_rounds);
1208
38.1k
}
1209
1210
2.63M
Status RaftConsensus::CheckLeasesUnlocked(const ConsensusRoundPtr& round) {
1211
2.63M
  auto op_type = round->replicate_msg()->op_type();
1212
  // When we do not have a hybrid time leader lease we allow 2 operation types to be added to RAFT.
1213
  // NO_OP - because even empty heartbeat messages could be used to obtain the lease.
1214
  // CHANGE_CONFIG_OP - because we should be able to update consensus even w/o lease.
1215
  // Both of them are safe, since they don't affect user reads or writes.
1216
2.63M
  if (IsConsensusOnlyOperation(op_type)) {
1217
38.2k
    return Status::OK();
1218
38.2k
  }
1219
1220
2.59M
  auto lease_status = state_->GetHybridTimeLeaseStatusAtUnlocked(
1221
2.59M
      HybridTime(round->replicate_msg()->hybrid_time()).GetPhysicalValueMicros());
1222
2.59M
  static_assert(LeaderLeaseStatus_ARRAYSIZE == 3, "Please update logic below to adapt new state");
1223
2.59M
  if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) {
1224
0
    return STATUS_FORMAT(LeaderHasNoLease,
1225
0
                         "Old leader may have hybrid time lease, while adding: $0",
1226
0
                         OperationType_Name(op_type));
1227
0
  }
1228
2.59M
  lease_status = state_->GetLeaderLeaseStatusUnlocked(nullptr);
1229
2.59M
  if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) {
1230
0
    return STATUS_FORMAT(LeaderHasNoLease,
1231
0
                         "Old leader may have lease, while adding: $0",
1232
0
                         OperationType_Name(op_type));
1233
0
  }
1234
1235
2.59M
  return Status::OK();
1236
2.59M
}
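// [Editorial sketch; not part of raft_consensus.cc] CheckLeasesUnlocked above admits
// consensus-only operations (NO_OP, CHANGE_CONFIG_OP) unconditionally and rejects everything
// else while the old leader may still hold either lease. Below is a condensed model of that
// decision; the enum and function are illustrative assumptions, not the real types.

enum class IllustrativeLeaseStatus { kHasLease, kNoLease, kOldLeaderMayHaveLease };

bool IllustrativeCanAppend(bool is_consensus_only_op,
                           IllustrativeLeaseStatus hybrid_time_lease,
                           IllustrativeLeaseStatus leader_lease) {
  if (is_consensus_only_op) {
    return true;  // NO_OP / CHANGE_CONFIG_OP do not affect user reads or writes.
  }
  return hybrid_time_lease != IllustrativeLeaseStatus::kOldLeaderMayHaveLease &&
         leader_lease != IllustrativeLeaseStatus::kOldLeaderMayHaveLease;
}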
1237
1238
Status RaftConsensus::AppendNewRoundsToQueueUnlocked(
1239
2.64M
    const ConsensusRounds& rounds, size_t* processed_rounds) {
1240
2.64M
  SCHECK(!rounds.empty(), InvalidArgument, "Attempted to add zero rounds to the queue");
1241
1242
2.64M
  auto role = state_->GetActiveRoleUnlocked();
1243
2.64M
  if (role != PeerRole::LEADER) {
1244
5
    return STATUS_FORMAT(IllegalState, "Appending new rounds while not the leader but $0",
1245
5
                         PeerRole_Name(role));
1246
5
  }
1247
1248
2.64M
  std::vector<ReplicateMsgPtr> replicate_msgs;
1249
2.64M
  replicate_msgs.reserve(rounds.size());
1250
2.64M
  const OpId& committed_op_id = state_->GetCommittedOpIdUnlocked();
1251
1252
2.73M
  for (const auto& round : rounds) {
1253
2.73M
    ++*processed_rounds;
1254
1255
2.73M
    if (round->replicate_msg()->op_type() == OperationType::WRITE_OP &&
1256
1.54M
        !state_->RegisterRetryableRequest(round)) {
1257
1
      round->BindToTerm(OpId::kUnknownTerm); // Mark round as non-replicating.
1258
1
      continue;
1259
1
    }
1260
1261
2.73M
    OpId op_id = state_->NewIdUnlocked();
1262
1263
    // We use this callback to transform write operations by substituting the hybrid_time into
1264
    // the write batch inside the write operation.
1265
    //
1266
    // TODO: we could allocate multiple HybridTimes in batch, only reading system clock once.
1267
2.73M
    round->callback()->AddedToLeader(op_id, committed_op_id);
1268
1269
2.73M
    Status s = state_->AddPendingOperation(round, OperationMode::kLeader);
1270
2.73M
    if (!s.ok()) {
1271
12
      RollbackIdAndDeleteOpId(round->replicate_msg(), false /* should_exists */);
1272
1273
      // Iterate rounds in reverse order and release their ids.
1274
12
      while (!replicate_msgs.empty()) {
1275
0
        RollbackIdAndDeleteOpId(replicate_msgs.back(), true /* should_exists */);
1276
0
        replicate_msgs.pop_back();
1277
0
      }
1278
12
      return s;
1279
12
    }
1280
1281
2.73M
    replicate_msgs.push_back(round->replicate_msg());
1282
2.73M
  }
1283
1284
2.64M
  if (replicate_msgs.empty()) {
1285
1
    return Status::OK();
1286
1
  }
1287
1288
  // We only need to check the lease for the latest operation in the batch, because it has the greatest
1289
  // hybrid time and therefore requires the most advanced lease.
1290
2.64M
  auto s = CheckLeasesUnlocked(rounds.back());
1291
1292
2.64M
  if (s.ok()) {
1293
2.64M
    s = queue_->AppendOperations(replicate_msgs, committed_op_id, state_->Clock().Now());
1294
2.64M
  }
1295
1296
  // Handle Status::ServiceUnavailable(), which means the queue is full.
1297
  // TODO: what are we doing about other errors here? Should we also release OpIds in those cases?
1298
2.64M
  if (PREDICT_FALSE(!s.ok())) {
1299
0
    LOG_WITH_PREFIX(WARNING) << "Could not append replicate request: " << s << ", queue status: "
1300
0
                             << queue_->ToString();
1301
0
    for (auto iter = replicate_msgs.rbegin(); iter != replicate_msgs.rend(); ++iter) {
1302
0
      RollbackIdAndDeleteOpId(*iter, true /* should_exists */);
1303
      // TODO Possibly evict a dangling peer from the configuration here.
1304
      // TODO: Add a counter for the number of ops that failed due to consensus queue overflow.
1305
0
    }
1306
1307
0
    return s.CloneAndPrepend("Unable to append operations to consensus queue");
1308
0
  }
1309
1310
2.64M
  state_->UpdateLastReceivedOpIdUnlocked(replicate_msgs.back()->id());
1311
2.64M
  return Status::OK();
1312
2.64M
}
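// [Editorial sketch; not part of raft_consensus.cc] When appending fails part-way through the
// batch above, the already-assigned messages are rolled back in reverse order, newest op id
// first. Below is the same shape in miniature; release_id stands in for RollbackIdAndDeleteOpId
// and the types are simplified for illustration.

#include <cstdint>
#include <functional>
#include <vector>

void IllustrativeRollbackInReverse(std::vector<int64_t>* assigned_ids,
                                   const std::function<void(int64_t)>& release_id) {
  while (!assigned_ids->empty()) {
    release_id(assigned_ids->back());  // Mirrors the rbegin()/rend() walk over replicate_msgs.
    assigned_ids->pop_back();
  }
}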
1313
1314
void RaftConsensus::MajorityReplicatedNumSSTFilesChanged(
1315
1.44k
    uint64_t majority_replicated_num_sst_files) {
1316
1.44k
  majority_num_sst_files_.store(majority_replicated_num_sst_files, std::memory_order_release);
1317
1.44k
}
1318
1319
void RaftConsensus::UpdateMajorityReplicated(
1320
    const MajorityReplicatedData& majority_replicated_data, OpId* committed_op_id,
1321
15.1M
    OpId* last_applied_op_id) {
1322
15.1M
  TEST_PAUSE_IF_FLAG(TEST_pause_update_majority_replicated);
1323
15.1M
  ReplicaState::UniqueLock lock;
1324
15.1M
  Status s = state_->LockForMajorityReplicatedIndexUpdate(&lock);
1325
15.1M
  if (PREDICT_FALSE(!s.ok())) {
1326
81
    LOG_WITH_PREFIX(WARNING)
1327
81
        << "Unable to take state lock to update committed index: "
1328
81
        << s.ToString();
1329
81
    return;
1330
81
  }
1331
1332
15.1M
  EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags;
1333
15.1M
  if (GetAtomicFlag(&FLAGS_enable_lease_revocation)) {
1334
15.1M
    if (!state_->old_leader_lease().holder_uuid.empty() &&
1335
15.2k
        queue_->PeerAcceptedOurLease(state_->old_leader_lease().holder_uuid)) {
1336
4.80k
      flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease);
1337
4.80k
    }
1338
1339
15.1M
    if (!state_->old_leader_ht_lease().holder_uuid.empty() &&
1340
138k
        queue_->PeerAcceptedOurLease(state_->old_leader_ht_lease().holder_uuid)) {
1341
4.85k
      flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease);
1342
4.85k
    }
1343
15.1M
  }
1344
1345
15.1M
  state_->SetMajorityReplicatedLeaseExpirationUnlocked(majority_replicated_data, flags);
1346
15.1M
  leader_lease_wait_cond_.notify_all();
1347
1348
3.82k
  VLOG_WITH_PREFIX(1) << "Marking majority replicated up to "
1349
3.82k
      << majority_replicated_data.ToString();
1350
15.1M
  TRACE("Marking majority replicated up to $0", majority_replicated_data.op_id.ToString());
1351
15.1M
  bool committed_index_changed = false;
1352
15.1M
  s = state_->UpdateMajorityReplicatedUnlocked(
1353
15.1M
      majority_replicated_data.op_id, committed_op_id, &committed_index_changed,
1354
15.1M
      last_applied_op_id);
1355
15.1M
  auto leader_state = state_->GetLeaderStateUnlocked();
1356
15.1M
  if (leader_state.ok() && leader_state.status == LeaderStatus::LEADER_AND_READY) {
1357
15.1M
    state_->context()->MajorityReplicated();
1358
15.1M
  }
1359
15.1M
  if (PREDICT_FALSE(!s.ok())) {
1360
0
    string msg = Format("Unable to mark committed up to $0: $1", majority_replicated_data.op_id, s);
1361
0
    TRACE(msg);
1362
0
    LOG_WITH_PREFIX(WARNING) << msg;
1363
0
    return;
1364
0
  }
1365
1366
15.1M
  majority_num_sst_files_.store(majority_replicated_data.num_sst_files, std::memory_order_release);
1367
1368
15.1M
  if (!majority_replicated_data.peer_got_all_ops.empty() &&
1369
9.06M
      delayed_step_down_.term == state_->GetCurrentTermUnlocked() &&
1370
110
      majority_replicated_data.peer_got_all_ops == delayed_step_down_.protege) {
1371
42
    LOG_WITH_PREFIX(INFO) << "Protege synchronized: " << delayed_step_down_.ToString();
1372
42
    const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), delayed_step_down_.protege);
1373
42
    if (peer) {
1374
42
      WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful),
1375
42
                  "Start step down failed");
1376
42
    }
1377
42
    delayed_step_down_.term = OpId::kUnknownTerm;
1378
42
  }
1379
1380
15.1M
  if (committed_index_changed &&
1381
2.47M
      state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
1382
    // If all operations were just committed, and we don't have pending operations, then
1383
    // we write an empty batch that contains the committed index.
1384
    // This affects only our local log, because followers have different logic in this scenario.
1385
2.47M
    if (*committed_op_id == state_->GetLastReceivedOpIdUnlocked()) {
1386
2.28M
      auto status = queue_->AppendOperations({}, *committed_op_id, state_->Clock().Now());
1387
62
      LOG_IF_WITH_PREFIX(DFATAL, !status.ok() && !status.IsServiceUnavailable())
1388
62
          << "Failed to append empty batch: " << status;
1389
2.28M
    }
1390
1391
2.47M
    lock.unlock();
1392
    // No need to hold the lock while calling SignalRequest.
1393
2.47M
    peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
1394
2.47M
  }
1395
15.1M
}
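// [Editorial sketch; not part of raft_consensus.cc] The delayed step-down handling above fires
// once the designated protege has caught up: the peer reported as having all ops must match the
// recorded protege for the current term. Below is a condensed model of that trigger, with
// illustrative parameter names.

#include <cstdint>
#include <string>

bool IllustrativeShouldStartStepDown(const std::string& peer_with_all_ops,
                                     const std::string& protege,
                                     int64_t current_term, int64_t planned_step_down_term) {
  return !peer_with_all_ops.empty() &&
         planned_step_down_term == current_term &&
         peer_with_all_ops == protege;
}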
1396
1397
0
void RaftConsensus::AppendEmptyBatchToLeaderLog() {
1398
0
  auto lock = state_->LockForRead();
1399
0
  auto committed_op_id = state_->GetCommittedOpIdUnlocked();
1400
0
  if (committed_op_id == state_->GetLastReceivedOpIdUnlocked()) {
1401
0
    auto status = queue_->AppendOperations({}, committed_op_id, state_->Clock().Now());
1402
0
    LOG_IF_WITH_PREFIX(DFATAL, !status.ok()) << "Failed to append empty batch: " << status;
1403
0
  }
1404
0
}
1405
1406
18
void RaftConsensus::NotifyTermChange(int64_t term) {
1407
18
  ReplicaState::UniqueLock lock;
1408
18
  Status s = state_->LockForConfigChange(&lock);
1409
18
  if (PREDICT_FALSE(!s.ok())) {
1410
0
    LOG_WITH_PREFIX(WARNING) << "Unable to lock ReplicaState for config change"
1411
0
                             << " when notified of new term " << term << ": " << s;
1412
0
    return;
1413
0
  }
1414
18
  WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
1415
18
}
1416
1417
void RaftConsensus::NotifyFailedFollower(const string& uuid,
1418
                                         int64_t term,
1419
160
                                         const std::string& reason) {
1420
  // Common info used in all of the log messages within this method.
1421
160
  string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ",
1422
160
                               uuid, term, reason);
1423
1424
160
  if (!FLAGS_evict_failed_followers) {
1425
0
    LOG_WITH_PREFIX(INFO) << fail_msg << "Eviction of failed followers is disabled. Doing nothing.";
1426
0
    return;
1427
0
  }
1428
1429
160
  RaftConfigPB committed_config;
1430
160
  {
1431
160
    auto lock = state_->LockForRead();
1432
1433
160
    int64_t current_term = state_->GetCurrentTermUnlocked();
1434
160
    if (current_term != term) {
1435
0
      LOG_WITH_PREFIX(INFO) << fail_msg << "Notified about a follower failure in "
1436
0
                            << "previous term " << term << ", but a leader election "
1437
0
                            << "likely occurred since the failure was detected. "
1438
0
                            << "Doing nothing.";
1439
0
      return;
1440
0
    }
1441
1442
160
    if (state_->IsConfigChangePendingUnlocked()) {
1443
0
      LOG_WITH_PREFIX(INFO) << fail_msg << "There is already a config change operation "
1444
0
                            << "in progress. Unable to evict follower until it completes. "
1445
0
                            << "Doing nothing.";
1446
0
      return;
1447
0
    }
1448
160
    committed_config = state_->GetCommittedConfigUnlocked();
1449
160
  }
1450
1451
  // Run config change on thread pool after dropping ReplicaState lock.
1452
160
  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask,
1453
160
                                               shared_from_this(), uuid, committed_config, reason)),
1454
160
              state_->LogPrefix() + "Unable to start RemoteFollowerTask");
1455
160
}
1456
1457
void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
1458
                                          const RaftConfigPB& committed_config,
1459
160
                                          const std::string& reason) {
1460
160
  ChangeConfigRequestPB req;
1461
160
  req.set_tablet_id(tablet_id());
1462
160
  req.mutable_server()->set_permanent_uuid(uuid);
1463
160
  req.set_type(REMOVE_SERVER);
1464
160
  req.set_cas_config_opid_index(committed_config.opid_index());
1465
160
  LOG_WITH_PREFIX(INFO)
1466
160
      << "Attempting to remove follower " << uuid << " from the Raft config at commit index "
1467
160
      << committed_config.opid_index() << ". Reason: " << reason;
1468
160
  boost::optional<TabletServerErrorPB::Code> error_code;
1469
160
  WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
1470
160
              state_->LogPrefix() + "Unable to remove follower " + uuid);
1471
160
}
1472
1473
Status RaftConsensus::Update(ConsensusRequestPB* request,
1474
                             ConsensusResponsePB* response,
1475
10.2M
                             CoarseTimePoint deadline) {
1476
10.2M
  if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests)) {
1477
327
    return STATUS(IllegalState, "Rejected: --TEST_follower_reject_update_consensus_requests "
1478
327
                                "is set to true.");
1479
327
  }
1480
10.2M
  TEST_PAUSE_IF_FLAG(TEST_follower_pause_update_consensus_requests);
1481
1482
10.2M
  auto reject_mode = reject_mode_.load(std::memory_order_acquire);
1483
10.2M
  if (reject_mode != RejectMode::kNone) {
1484
0
    if (reject_mode == RejectMode::kAll ||
1485
0
        (reject_mode == RejectMode::kNonEmpty && !request->ops().empty())) {
1486
0
      auto result = STATUS_FORMAT(IllegalState, "Rejected because of reject mode: $0",
1487
0
                                  ToString(reject_mode));
1488
0
      LOG_WITH_PREFIX(INFO) << result;
1489
0
      return result;
1490
0
    }
1491
0
    LOG_WITH_PREFIX(INFO) << "Accepted: " << request->ShortDebugString();
1492
0
  }
1493
1494
10.2M
  if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) {
1495
169
    if (MonoTime::Now() < withold_replica_updates_until_) {
1496
169
      LOG(INFO) << "Rejecting Update for tablet: " << tablet_id()
1497
169
                << " tserver uuid: " << peer_uuid();
1498
169
      return STATUS_SUBSTITUTE(IllegalState,
1499
169
          "Rejected: --TEST_follower_reject_update_consensus_requests_seconds is set to $0",
1500
169
          FLAGS_TEST_follower_reject_update_consensus_requests_seconds);
1501
169
    }
1502
10.2M
  }
1503
1504
10.2M
  RETURN_NOT_OK(ExecuteHook(PRE_UPDATE));
1505
10.2M
  response->set_responder_uuid(state_->GetPeerUuid());
1506
1507
11.6k
  VLOG_WITH_PREFIX(2) << "Replica received request: " << request->ShortDebugString();
1508
1509
10.2M
  UpdateReplicaResult result;
1510
10.2M
  {
1511
    // see var declaration
1512
10.2M
    auto wait_start = CoarseMonoClock::now();
1513
10.2M
    auto wait_duration = deadline != CoarseTimePoint::max() ? deadline - wait_start
1514
12.0k
                                                            : CoarseDuration::max();
1515
10.2M
    auto lock = LockMutex(&update_mutex_, wait_duration);
1516
10.2M
    if (!lock.owns_lock()) {
1517
8
      return STATUS_FORMAT(TimedOut, "Unable to lock update mutex for $0", wait_duration);
1518
8
    }
1519
1520
10.2M
    LongOperationTracker operation_tracker("UpdateReplica", 1s);
1521
10.2M
    result = VERIFY_RESULT(UpdateReplica(request, response));
1522
1523
10.2M
    auto delay = TEST_delay_update_.load(std::memory_order_acquire);
1524
10.2M
    if (delay != MonoDelta::kZero) {
1525
0
      std::this_thread::sleep_for(delay.ToSteadyDuration());
1526
0
    }
1527
10.2M
  }
1528
1529
  // Release the lock while we wait for the log append to finish so that commits can go through.
1530
10.2M
  if (!result.wait_for_op_id.empty()) {
1531
4.50M
    RETURN_NOT_OK(WaitForWrites(result.current_term, result.wait_for_op_id));
1532
4.50M
  }
1533
1534
10.2M
  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
1535
0
    VLOG_WITH_PREFIX(2) << "Replica updated. "
1536
0
        << state_->ToString() << " Request: " << request->ShortDebugString();
1537
0
  }
1538
1539
  // If an election is pending on a specific op id and it has just been committed, start it now.
1540
  // StartElection will ensure the pending election will be started just once only even if
1541
  // UpdateReplica happens in multiple threads in parallel.
1542
10.2M
  if (result.start_election) {
1543
0
    RETURN_NOT_OK(StartElection(
1544
0
        {consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE, true /* pending_commit */}));
1545
0
  }
1546
1547
10.2M
  RETURN_NOT_OK(ExecuteHook(POST_UPDATE));
1548
10.2M
  return Status::OK();
1549
10.2M
}
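// [Editorial sketch; not part of raft_consensus.cc] Update() above bounds how long it may block
// on update_mutex_ by the RPC deadline. A comparable pattern with a std::timed_mutex is shown
// below as an illustration only; the actual LockMutex helper and mutex type may differ.

#include <chrono>
#include <mutex>

template <class MutexType>
std::unique_lock<MutexType> IllustrativeLockFor(MutexType* mutex, std::chrono::nanoseconds wait) {
  std::unique_lock<MutexType> lock(*mutex, std::defer_lock);
  if (!lock.try_lock_for(wait)) {
    return std::unique_lock<MutexType>();  // Caller checks owns_lock() and returns TimedOut.
  }
  return lock;
}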
1550
1551
Status RaftConsensus::StartReplicaOperationUnlocked(
1552
5.17M
    const ReplicateMsgPtr& msg, HybridTime propagated_safe_time) {
1553
5.17M
  if (IsConsensusOnlyOperation(msg->op_type())) {
1554
74.9k
    return StartConsensusOnlyRoundUnlocked(msg);
1555
74.9k
  }
1556
1557
5.09M
  if (PREDICT_FALSE(FLAGS_TEST_follower_fail_all_prepare)) {
1558
1
    return STATUS(IllegalState, "Rejected: --TEST_follower_fail_all_prepare "
1559
1
                                "is set to true.");
1560
1
  }
1561
1562
5.09M
  VLOG_WITH_PREFIX(1) << "Starting operation: " << msg->id().ShortDebugString();
1563
5.09M
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
1564
5.09M
  ConsensusRound* round_ptr = round.get();
1565
5.09M
  RETURN_NOT_OK(state_->context()->StartReplicaOperation(round, propagated_safe_time));
1566
5.09M
  auto result = state_->AddPendingOperation(round_ptr, OperationMode::kFollower);
1567
5.09M
  if (!result.ok()) {
1568
2
    round_ptr->NotifyReplicationFinished(result, OpId::kUnknownTerm, /* applied_op_ids */ nullptr);
1569
2
  }
1570
5.09M
  return result;
1571
5.09M
}
1572
1573
8
std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
1574
8
  std::string ret;
1575
8
  ret.reserve(100);
1576
8
  ret.push_back('[');
1577
8
  if (!messages.empty()) {
1578
3
    const OpIdPB& first_op = (*messages.begin())->id();
1579
3
    const OpIdPB& last_op = (*messages.rbegin())->id();
1580
3
    strings::SubstituteAndAppend(&ret, "$0.$1-$2.$3",
1581
3
                                 first_op.term(), first_op.index(),
1582
3
                                 last_op.term(), last_op.index());
1583
3
  }
1584
8
  ret.push_back(']');
1585
8
  return ret;
1586
8
}
1587
1588
Status RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
1589
10.2M
                                                       LeaderRequest* deduplicated_req) {
1590
10.2M
  const auto& last_committed = state_->GetCommittedOpIdUnlocked();
1591
1592
  // The leader's preceding id.
1593
10.2M
  deduplicated_req->preceding_op_id = yb::OpId::FromPB(rpc_req->preceding_id());
1594
1595
10.2M
  int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index;
1596
1597
10.2M
  deduplicated_req->first_message_idx = -1;
1598
1599
  // In this loop we discard duplicates and advance the leader's preceding id
1600
  // accordingly.
1601
15.4M
  for (int i = 0; i < rpc_req->ops_size(); i++) {
1602
5.17M
    ReplicateMsg* leader_msg = rpc_req->mutable_ops(i);
1603
1604
5.17M
    if (leader_msg->id().index() <= last_committed.index) {
1605
0
      VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id()
1606
0
                          << " (already committed)";
1607
508
      deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id());
1608
508
      continue;
1609
508
    }
1610
1611
5.17M
    if (leader_msg->id().index() <= dedup_up_to_index) {
1612
      // If the index is uncommitted and below our match index, then it must be in the
1613
      // pending set.
1614
102
      scoped_refptr<ConsensusRound> round =
1615
102
          state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index());
1616
102
      if (!round) {
1617
        // This could happen if we received an outdated leader request, so we should just reject it.
1618
0
        return STATUS_FORMAT(IllegalState, "Round not found for index: $0",
1619
0
                             leader_msg->id().index());
1620
0
      }
1621
1622
      // If the OpIds match, i.e. if they have the same term and index, then this is just
1623
      // a duplicate and we skip it...
1624
102
      if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) {
1625
0
        VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id()
1626
0
                            << " (already replicated)";
1627
3
        deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id());
1628
3
        continue;
1629
3
      }
1630
1631
      // ... otherwise we must adjust our match index, i.e. all messages from now on
1632
      // are "new"
1633
99
      dedup_up_to_index = leader_msg->id().index();
1634
99
    }
1635
1636
5.17M
    if (deduplicated_req->first_message_idx == -1) {
1637
4.50M
      deduplicated_req->first_message_idx = i;
1638
4.50M
    }
1639
5.17M
    deduplicated_req->messages.emplace_back(leader_msg);
1640
5.17M
  }
1641
1642
10.2M
  if (deduplicated_req->messages.size() != implicit_cast<size_t>(rpc_req->ops_size())) {
1643
8
    LOG_WITH_PREFIX(INFO) << "Deduplicated request from leader. Original: "
1644
8
                          << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req)
1645
8
                          << "   Dedup: " << deduplicated_req->preceding_op_id << "->"
1646
8
                          << deduplicated_req->OpsRangeString()
1647
8
                          << ", known committed: " << last_committed << ", received committed: "
1648
8
                          << OpId::FromPB(rpc_req->committed_op_id());
1649
8
  }
1650
1651
10.2M
  return Status::OK();
1652
10.2M
}
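// [Editorial sketch; not part of raft_consensus.cc] Conceptually, the deduplication above drops
// every incoming op that is already committed or already pending with a matching OpId, advancing
// the leader's preceding id past each dropped op, and keeps the rest as new. Below is a
// simplified model over bare (term, index) pairs; the struct, helper predicate, and function are
// assumptions made for illustration.

#include <cstdint>
#include <functional>
#include <vector>

struct IllustrativeOpId { int64_t term; int64_t index; };

std::vector<IllustrativeOpId> IllustrativeDedup(
    const std::vector<IllustrativeOpId>& ops, int64_t last_committed_index,
    const std::function<bool(const IllustrativeOpId&)>& already_pending_with_same_id,
    IllustrativeOpId* preceding_op_id) {
  std::vector<IllustrativeOpId> kept;
  for (const auto& op : ops) {
    if (op.index <= last_committed_index || already_pending_with_same_id(op)) {
      *preceding_op_id = op;  // Skip the duplicate and move the preceding id forward.
      continue;
    }
    kept.push_back(op);  // From the first genuinely new op onward, everything is kept.
  }
  return kept;
}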
1653
1654
Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
1655
10.2M
                                                      ConsensusResponsePB* response) {
1656
  // Do term checks first:
1657
10.2M
  if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) {
1658
1659
    // If less, reject.
1660
6.39k
    if (request->caller_term() < state_->GetCurrentTermUnlocked()) {
1661
93
      string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
1662
93
                              "Current term is $2. Ops: $3",
1663
1664
93
                              request->caller_uuid(),
1665
93
                              request->caller_term(),
1666
93
                              state_->GetCurrentTermUnlocked(),
1667
93
                              OpsRangeString(*request));
1668
93
      LOG_WITH_PREFIX(INFO) << msg;
1669
93
      FillConsensusResponseError(response,
1670
93
                                 ConsensusErrorPB::INVALID_TERM,
1671
93
                                 STATUS(IllegalState, msg));
1672
93
      return Status::OK();
1673
6.29k
    } else {
1674
6.29k
      RETURN_NOT_OK(HandleTermAdvanceUnlocked(request->caller_term()));
1675
6.29k
    }
1676
6.39k
  }
1677
10.2M
  return Status::OK();
1678
10.2M
}
1679
1680
Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
1681
10.2M
                                                                ConsensusResponsePB* response) {
1682
1683
10.2M
  bool term_mismatch;
1684
10.2M
  if (state_->IsOpCommittedOrPending(req.preceding_op_id, &term_mismatch)) {
1685
10.2M
    return Status::OK();
1686
10.2M
  }
1687
1688
63.4k
  string error_msg = Format(
1689
63.4k
    "Log matching property violated."
1690
63.4k
    " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)",
1691
63.3k
    state_->GetLastReceivedOpIdUnlocked(), req.preceding_op_id, term_mismatch ? "term" : "index");
1692
1693
63.4k
  FillConsensusResponseError(response,
1694
63.4k
                             ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH,
1695
63.4k
                             STATUS(IllegalState, error_msg));
1696
1697
63.4k
  LOG_WITH_PREFIX(INFO) << "Refusing update from remote peer "
1698
63.4k
                        << req.leader_uuid << ": " << error_msg;
1699
1700
  // If the terms mismatch we abort down to the index before the leader's preceding,
1701
  // since we know that is the last opid that has a chance of not being overwritten.
1702
  // Aborting preemptively here avoids us reporting a last received index that is
1703
  // possibly higher than the leader's causing an avoidable cache miss on the leader's
1704
  // queue.
1705
  //
1706
  // TODO: this isn't just an optimization! if we comment this out, we get
1707
  // failures on raft_consensus-itest a couple percent of the time! Should investigate
1708
  // why this is actually critical to do here, as opposed to just on requests that
1709
  // append some ops.
1710
63.4k
  if (term_mismatch) {
1711
14
    return state_->AbortOpsAfterUnlocked(req.preceding_op_id.index - 1);
1712
14
  }
1713
1714
63.3k
  return Status::OK();
1715
63.3k
}
1716
1717
Status RaftConsensus::CheckLeaderRequestOpIdSequence(
1718
    const LeaderRequest& deduped_req,
1719
10.2M
    ConsensusRequestPB* request) {
1720
10.2M
  Status sequence_check_status;
1721
10.2M
  yb::OpId prev = deduped_req.preceding_op_id;
1722
5.17M
  for (const auto& message : deduped_req.messages) {
1723
5.17M
    auto current = yb::OpId::FromPB(message->id());
1724
5.17M
    sequence_check_status = ReplicaState::CheckOpInSequence(prev, current);
1725
5.17M
    if (PREDICT_FALSE(!sequence_check_status.ok())) {
1726
2
      LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: "
1727
2
          << sequence_check_status.ToString() << ". Leader Request: "
1728
2
          << request->ShortDebugString();
1729
2
      break;
1730
2
    }
1731
5.17M
    prev = current;
1732
5.17M
  }
1733
1734
  // We only release the messages from the request after the above check so that we can print
1735
  // the original request, if it fails.
1736
10.2M
  if (!deduped_req.messages.empty()) {
1737
    // We take ownership of the deduped ops.
1738
4.50M
    DCHECK_GE(deduped_req.first_message_idx, 0);
1739
4.50M
    request->mutable_ops()->ExtractSubrange(
1740
4.50M
        narrow_cast<int>(deduped_req.first_message_idx),
1741
4.50M
        narrow_cast<int>(deduped_req.messages.size()),
1742
4.50M
        nullptr);
1743
4.50M
  }
1744
1745
  // We don't need request->ops() anymore, so could release them to avoid unnecessary memory
1746
  // consumption.
1747
10.2M
  request->mutable_ops()->Clear();
1748
1749
10.2M
  return sequence_check_status;
1750
10.2M
}
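// [Editorial sketch; not part of raft_consensus.cc] The sequence check above delegates to
// ReplicaState::CheckOpInSequence. A plausible reading of that invariant -- stated here as an
// assumption, not a quote of the real implementation -- is that each op id must directly follow
// its predecessor: the index grows by exactly one and the term never decreases.

#include <cstdint>

bool IllustrativeOpInSequence(int64_t prev_term, int64_t prev_index,
                              int64_t cur_term, int64_t cur_index) {
  // Term may stay the same or advance; index must be exactly prev_index + 1.
  return cur_term >= prev_term && cur_index == prev_index + 1;
}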
1751
1752
Status RaftConsensus::CheckLeaderRequestUnlocked(ConsensusRequestPB* request,
1753
                                                 ConsensusResponsePB* response,
1754
10.2M
                                                 LeaderRequest* deduped_req) {
1755
10.2M
  RETURN_NOT_OK(DeduplicateLeaderRequestUnlocked(request, deduped_req));
1756
1757
  // This is an additional check for KUDU-639 that makes sure the message's index
1758
  // and term are in the right sequence in the request, after we've deduplicated
1759
  // them. We do this before we change any of the internal state.
1760
  //
1761
  // TODO move this to raft_consensus-state or whatever we transform that into.
1762
  // We should be able to do this check for each append, but right now the way
1763
  // we initialize raft_consensus-state is preventing us from doing so.
1764
10.2M
  RETURN_NOT_OK(CheckLeaderRequestOpIdSequence(*deduped_req, request));
1765
1766
10.2M
  RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response));
1767
1768
10.2M
  if (response->status().has_error()) {
1769
93
    return Status::OK();
1770
93
  }
1771
1772
10.2M
  RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response));
1773
1774
10.2M
  if (response->status().has_error()) {
1775
68.3k
    return Status::OK();
1776
68.3k
  }
1777
1778
  // If the first of the messages to apply is not in our log, either it follows the last
1779
  // received message or it replaces some in-flight.
1780
10.2M
  if (!deduped_req->messages.empty()) {
1781
4.50M
    auto first_id = yb::OpId::FromPB(deduped_req->messages[0]->id());
1782
4.50M
    bool term_mismatch;
1783
4.50M
    if (state_->IsOpCommittedOrPending(first_id, &term_mismatch)) {
1784
0
      return STATUS_FORMAT(IllegalState,
1785
0
                           "First deduped message $0 is committed or pending",
1786
0
                           first_id);
1787
0
    }
1788
1789
    // If the index is in our log but the terms are not the same abort down to the leader's
1790
    // preceding id.
1791
4.50M
    if (term_mismatch) {
1792
      // Since we are holding the lock, ApplyPendingOperationsUnlocked cannot be invoked between
1793
      // these two calls.
1794
99
      RETURN_NOT_OK(state_->AbortOpsAfterUnlocked(deduped_req->preceding_op_id.index));
1795
99
      RETURN_NOT_OK(log_->ResetLastSyncedEntryOpId(deduped_req->preceding_op_id));
1796
99
    }
1797
4.50M
  }
1798
1799
  // If all of the above logic was successful then we can consider this to be
1800
  // the effective leader of the configuration. If they are not currently marked as
1801
  // the leader locally, mark them as leader now.
1802
10.2M
  const string& caller_uuid = request->caller_uuid();
1803
10.2M
  if (PREDICT_FALSE(state_->HasLeaderUnlocked() &&
1804
0
                    state_->GetLeaderUuidUnlocked() != caller_uuid)) {
1805
0
    LOG_WITH_PREFIX(FATAL)
1806
0
        << "Unexpected new leader in same term! "
1807
0
        << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", "
1808
0
        << "new leader UUID: " << caller_uuid;
1809
0
  }
1810
10.2M
  if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) {
1811
68.2k
    SetLeaderUuidUnlocked(caller_uuid);
1812
68.2k
  }
1813
1814
10.2M
  return Status::OK();
1815
10.2M
}
1816
1817
Result<RaftConsensus::UpdateReplicaResult> RaftConsensus::UpdateReplica(
1818
10.2M
    ConsensusRequestPB* request, ConsensusResponsePB* response) {
1819
10.2M
  TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
1820
10.2M
               "peer", peer_uuid(),
1821
10.2M
               "tablet", tablet_id());
1822
1823
10.2M
  if (request->has_propagated_hybrid_time()) {
1824
10.2M
    clock_->Update(HybridTime(request->propagated_hybrid_time()));
1825
10.2M
  }
1826
1827
  // The ordering of the following operations is crucial, read on for details.
1828
  //
1829
  // The main requirements explained in more detail below are:
1830
  //
1831
  //   1) We must enqueue the prepares before we write to our local log.
1832
  //   2) If we were able to enqueue a prepare then we must be able to log it.
1833
  //   3) If we fail to enqueue a prepare, we must not attempt to enqueue any
1834
  //      later-indexed prepare or apply.
1835
  //
1836
  // See below for detailed rationale.
1837
  //
1838
  // The steps are:
1839
  //
1840
  // 0 - Dedup
1841
  //
1842
  // We make sure that we don't do anything on Replicate operations we've already received in a
1843
  // previous call. This essentially makes this method idempotent.
1844
  //
1845
  // 1 - We mark as many pending operations as committed as we can.
1846
  //
1847
  // We may have some pending operations that, according to the leader, are now
1848
  // committed. We Apply them early, because:
1849
  // - Soon (step 2) we may reject the call due to excessive memory pressure. One
1850
  //   way to relieve the pressure is by flushing the MRS, and applying these
1851
  //   operations may unblock an in-flight Flush().
1852
  // - The Apply and subsequent Prepares (step 2) can take place concurrently.
1853
  //
1854
  // 2 - We enqueue the Prepare of the operations.
1855
  //
1856
  // The actual prepares are enqueued in order but happen asynchronously so we don't
1857
  // have decoding/acquiring locks on the critical path.
1858
  //
1859
  // We need to do this now for a number of reasons:
1860
  // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the
1861
  //   state machine so, were we to crash afterwards, having the prepares in-flight
1862
  //   won't hurt.
1863
  // - Prepares depend on factors external to consensus (the operation drivers and
1864
  //   the tablet peer) so if for some reason they cannot be enqueued we must know
1865
  //   before we try to write them to the WAL. Once enqueued, we assume that prepare will
1866
  //   always succeed on a replica operation (because the leader already prepared them
1867
  //   successfully, and thus we know they are valid).
1868
  // - The prepares corresponding to every operation that was logged must be in-flight
1869
  //   first. This is because, should we need to abort certain operations (say a new leader
1870
  //   says they are not committed) we need to have those prepares in-flight so that
1871
  //   the operations can be continued (in the abort path).
1872
  // - Failure to enqueue prepares is OK, we can continue and let the leader know that
1873
  //   we only went so far. The leader will re-send the remaining messages.
1874
  // - Prepares represent new operations, and operations consume memory. Thus, if the
1875
  //   overall memory pressure on the server is too high, we will reject the prepares.
1876
  //
1877
  // 3 - We enqueue the writes to the WAL.
1878
  //
1879
  // We enqueue writes to the WAL, but only the operations that were successfully
1880
  // enqueued for prepare (for the reasons introduced above). This means that even
1881
  // if a prepare fails to enqueue, if any of the previous prepares were successfully
1882
  // submitted they must be written to the WAL.
1883
  // If writing to the WAL fails, we're in an inconsistent state and we crash. In this
1884
  // case, no one will ever know of the operations we previously prepared so those are
1885
  // inconsequential.
1886
  //
1887
  // 4 - We mark the operations as committed.
1888
  //
1889
  // For each operation which has been committed by the leader, we update the
1890
  // operation state to reflect that. If the logging has already succeeded for that
1891
  // operation, this will trigger the Apply phase. Otherwise, Apply will be triggered
1892
  // when the logging completes. In both cases the Apply phase executes asynchronously.
1893
  // This must, of course, happen after the prepares have been triggered as the same batch
1894
  // can both replicate/prepare and commit/apply an operation.
1895
  //
1896
  // Currently, if a prepare failed to enqueue we still trigger all applies for operations
1897
  // with an id lower than it (if we have them). This is important now as the leader will
1898
  // not re-send those commit messages. This will be moot when we move to the commit
1899
  // commitIndex way of doing things as we can simply ignore the applies as we know
1900
  // they will be triggered with the next successful batch.
1901
  //
1902
  // 5 - We wait for the writes to be durable.
1903
  //
1904
  // Before replying to the leader we wait for the writes to be durable. We then
1905
  // just update the last replicated watermark and respond.
1906
  //
1907
  // TODO - These failure scenarios need to be exercised in an unit
1908
  //        test. Moreover we need to add more fault injection spots (well that
1909
  //        and actually use them) for each of these steps.
1910
10.2M
  TRACE("Updating replica for $0 ops", request->ops_size());
1911
1912
  // The deduplicated request.
1913
10.2M
  LeaderRequest deduped_req;
1914
1915
10.2M
  ReplicaState::UniqueLock lock;
1916
10.2M
  RETURN_NOT_OK(state_->LockForUpdate(&lock));
1917
1918
10.2M
  const auto old_leader = state_->GetLeaderUuidUnlocked();
1919
1920
10.2M
  auto prev_committed_op_id = state_->GetCommittedOpIdUnlocked();
1921
1922
10.2M
  deduped_req.leader_uuid = request->caller_uuid();
1923
1924
10.2M
  RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
1925
1926
10.2M
  if (response->status().has_error()) {
1927
68.4k
    LOG_WITH_PREFIX(INFO)
1928
68.4k
        << "Returning from UpdateConsensus because of error: " << AsString(response->status());
1929
    // We had an error, like an invalid term, but we still fill the response.
1930
68.4k
    FillConsensusResponseOKUnlocked(response);
1931
68.4k
    return UpdateReplicaResult();
1932
68.4k
  }
1933
1934
10.2M
  TEST_PAUSE_IF_FLAG(TEST_pause_update_replica);
1935
1936
  // Snooze the failure detector as soon as we decide to accept the message.
1937
  // We are guaranteed to be acting as a FOLLOWER at this point by the above
1938
  // sanity check.
1939
10.2M
  SnoozeFailureDetector(DO_NOT_LOG);
1940
1941
10.2M
  auto now = MonoTime::Now();
1942
1943
  // Update the expiration time of the current leader's lease, so that when this follower becomes
1944
  // a leader, it can wait out the time interval while the old leader might still be active.
1945
10.2M
  if (request->has_leader_lease_duration_ms()) {
1946
10.2M
    state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked(
1947
10.2M
        CoarseTimeLease(deduped_req.leader_uuid,
1948
10.2M
                        CoarseMonoClock::now() + request->leader_lease_duration_ms() * 1ms),
1949
10.2M
        PhysicalComponentLease(deduped_req.leader_uuid, request->ht_lease_expiration()));
1950
10.2M
  }
1951
1952
  // Also prohibit voting for anyone for the minimum election timeout.
1953
10.2M
  withhold_votes_until_.store(now + MinimumElectionTimeout(), std::memory_order_release);
1954
1955
  // 1 - Early commit pending (and committed) operations
1956
10.2M
  RETURN_NOT_OK(EarlyCommitUnlocked(*request, deduped_req));
1957
1958
  // 2 - Enqueue the prepares
1959
10.2M
  if (!VERIFY_RESULT(EnqueuePreparesUnlocked(*request, &deduped_req, response))) {
1960
5
    return UpdateReplicaResult();
1961
5
  }
1962
1963
10.2M
  if (deduped_req.committed_op_id.index < prev_committed_op_id.index) {
1964
11
    deduped_req.committed_op_id = prev_committed_op_id;
1965
11
  }
1966
1967
  // 3 - Enqueue the writes.
1968
10.2M
  auto last_from_leader = EnqueueWritesUnlocked(
1969
10.2M
      deduped_req, WriteEmpty(prev_committed_op_id != deduped_req.committed_op_id));
1970
1971
  // 4 - Mark operations as committed
1972
10.2M
  RETURN_NOT_OK(MarkOperationsAsCommittedUnlocked(*request, deduped_req, last_from_leader));
1973
1974
  // Fill the response with the current state. We will not mutate anymore state until
1975
  // we actually reply to the leader, we'll just wait for the messages to be durable.
1976
10.2M
  FillConsensusResponseOKUnlocked(response);
1977
1978
10.2M
  UpdateReplicaResult result;
1979
1980
  // Check if there is an election pending and the op id pending upon has just been committed.
1981
10.2M
  const auto& pending_election_op_id = state_->GetPendingElectionOpIdUnlocked();
1982
10.2M
  result.start_election =
1983
10.2M
      !pending_election_op_id.empty() &&
1984
0
      pending_election_op_id.index <= state_->GetCommittedOpIdUnlocked().index;
1985
1986
10.2M
  if (!deduped_req.messages.empty()) {
1987
4.50M
    result.wait_for_op_id = state_->GetLastReceivedOpIdUnlocked();
1988
4.50M
  }
1989
10.2M
  result.current_term = state_->GetCurrentTermUnlocked();
1990
1991
10.2M
  uint64_t update_time_ms = 0;
1992
10.2M
  if (request->has_propagated_hybrid_time()) {
1993
10.2M
    update_time_ms =  HybridTime::FromPB(
1994
10.2M
        request->propagated_hybrid_time()).GetPhysicalValueMicros() / 1000;
1995
2.99k
  } else if (!deduped_req.messages.empty()) {
1996
106
    update_time_ms = HybridTime::FromPB(
1997
106
        deduped_req.messages.back()->hybrid_time()).GetPhysicalValueMicros() / 1000;
1998
106
  }
1999
10.2M
  follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds(
2000
10.2M
      (update_time_ms > 0 ? update_time_ms : clock_->Now().GetPhysicalValueMicros() / 1000));
2001
10.2M
  TRACE("UpdateReplica() finished");
2002
10.2M
  return result;
2003
10.2M
}
2004
2005
Status RaftConsensus::EarlyCommitUnlocked(const ConsensusRequestPB& request,
2006
10.2M
                                          const LeaderRequest& deduped_req) {
2007
  // What should we commit?
2008
  // 1. As many pending operations as we can, except...
2009
  // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639
2010
  //    ("Leader doesn't overwrite demoted follower's log properly"), and...
2011
  // 3. ...the leader's committed index is always our upper bound.
2012
10.2M
  auto early_apply_up_to = state_->GetLastPendingOperationOpIdUnlocked();
2013
10.2M
  if (deduped_req.preceding_op_id.index < early_apply_up_to.index) {
2014
1
    early_apply_up_to = deduped_req.preceding_op_id;
2015
1
  }
2016
10.2M
  if (request.committed_op_id().index() < early_apply_up_to.index) {
2017
18.3k
    early_apply_up_to = yb::OpId::FromPB(request.committed_op_id());
2018
18.3k
  }
2019
2020
18.4E
  VLOG_WITH_PREFIX(1) << "Early marking committed up to " << early_apply_up_to;
2021
10.2M
  TRACE("Early marking committed up to $0.$1", early_apply_up_to.term, early_apply_up_to.index);
2022
10.2M
  return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(early_apply_up_to, CouldStop::kTrue));
2023
10.2M
}
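// [Editorial sketch; not part of raft_consensus.cc] The early-commit bound computed above is
// effectively the minimum, by index, of the last pending operation, the leader's preceding op
// id, and the leader's committed op id. For example, with last pending = 105, leader
// preceding = 103, and leader committed = 110, only ops up to index 103 are applied early.
// Below is a condensed form of that computation, with illustrative names.

#include <algorithm>
#include <cstdint>

int64_t IllustrativeEarlyApplyUpTo(int64_t last_pending_index, int64_t leader_preceding_index,
                                   int64_t leader_committed_index) {
  return std::min({last_pending_index, leader_preceding_index, leader_committed_index});
}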
2024
2025
Result<bool> RaftConsensus::EnqueuePreparesUnlocked(const ConsensusRequestPB& request,
2026
                                                    LeaderRequest* deduped_req_ptr,
2027
10.2M
                                                    ConsensusResponsePB* response) {
2028
10.2M
  LeaderRequest& deduped_req = *deduped_req_ptr;
2029
10.2M
  TRACE("Triggering prepare for $0 ops", deduped_req.messages.size());
2030
2031
10.2M
  Status prepare_status;
2032
10.2M
  auto iter = deduped_req.messages.begin();
2033
2034
10.2M
  if (PREDICT_TRUE(!deduped_req.messages.empty())) {
2035
    // TODO Temporary until the leader explicitly propagates the safe hybrid_time.
2036
    // TODO: what if there is a failure here because the updated time is too far in the future?
2037
4.50M
    clock_->Update(HybridTime(deduped_req.messages.back()->hybrid_time()));
2038
4.50M
  }
2039
2040
10.2M
  HybridTime propagated_safe_time;
2041
10.2M
  if (request.has_propagated_safe_time()) {
2042
10.2M
    propagated_safe_time = HybridTime(request.propagated_safe_time());
2043
10.2M
    if (deduped_req.messages.empty()) {
2044
5.70M
      state_->context()->SetPropagatedSafeTime(propagated_safe_time);
2045
5.70M
    }
2046
10.2M
  }
2047
2048
10.2M
  if (iter != deduped_req.messages.end()) {
2049
5.17M
    for (;;) {
2050
5.17M
      const ReplicateMsgPtr& msg = *iter;
2051
5.17M
      ++iter;
2052
5.17M
      bool last = iter == deduped_req.messages.end();
2053
5.17M
      prepare_status = StartReplicaOperationUnlocked(
2054
4.50M
          msg, last ? propagated_safe_time : HybridTime::kInvalid);
2055
5.17M
      if (PREDICT_FALSE(!prepare_status.ok())) {
2056
136
        --iter;
2057
136
        LOG_WITH_PREFIX(WARNING) << "StartReplicaOperationUnlocked failed: " << prepare_status;
2058
136
        break;
2059
136
      }
2060
5.17M
      if (last) {
2061
4.51M
        break;
2062
4.51M
      }
2063
5.17M
    }
2064
4.50M
  }
2065
2066
  // If we stopped before reaching the end we failed to prepare some message(s) and need
2067
  // to perform cleanup, namely trimming deduped_req.messages to only contain the messages
2068
  // that were actually prepared, and deleting the other ones since we've taken ownership
2069
  // when we first deduped.
2070
10.2M
  bool incomplete = iter != deduped_req.messages.end();
2071
10.2M
  if (incomplete) {
2072
136
    {
2073
136
      const ReplicateMsgPtr msg = *iter;
2074
136
      LOG_WITH_PREFIX(WARNING)
2075
136
          << "Could not prepare operation for op: "
2076
136
          << msg->id() << ". Suppressed " << (deduped_req.messages.end() - iter - 1)
2077
136
          << " other warnings. Status for this op: " << prepare_status;
2078
136
      deduped_req.messages.erase(iter, deduped_req.messages.end());
2079
136
    }
2080
2081
    // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
2082
    // else we can do. The leader will detect this and retry later.
2083
136
    if (deduped_req.messages.empty()) {
2084
5
      auto msg = Format("Rejecting Update request from peer $0 for term $1. "
2085
5
                        "Could not prepare a single operation due to: $2",
2086
5
                        request.caller_uuid(),
2087
5
                        request.caller_term(),
2088
5
                        prepare_status);
2089
5
      LOG_WITH_PREFIX(INFO) << msg;
2090
5
      FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE,
2091
5
                                 STATUS(IllegalState, msg));
2092
5
      FillConsensusResponseOKUnlocked(response);
2093
5
      return false;
2094
5
    }
2095
10.2M
  }
2096
2097
10.2M
  deduped_req.committed_op_id = yb::OpId::FromPB(request.committed_op_id());
2098
10.2M
  if (!deduped_req.messages.empty()) {
2099
4.51M
    auto last_op_id = yb::OpId::FromPB(deduped_req.messages.back()->id());
2100
4.51M
    if (deduped_req.committed_op_id > last_op_id) {
2101
0
      LOG_IF_WITH_PREFIX(DFATAL, !incomplete)
2102
0
          << "Received committed op id: " << deduped_req.committed_op_id
2103
0
          << ", past last known op id: " << last_op_id;
2104
2105
      // It is possible that we failed to prepare some of the messages,
2106
      // so limit the committed op id to avoid having it point past the last known op id.
2107
127
      deduped_req.committed_op_id = last_op_id;
2108
127
    }
2109
4.51M
  }
2110
2111
10.2M
  return true;
2112
10.2M
}
2113
2114
yb::OpId RaftConsensus::EnqueueWritesUnlocked(const LeaderRequest& deduped_req,
2115
10.2M
                                              WriteEmpty write_empty) {
2116
  // Now that we've triggered the prepares, enqueue the operations to be written
2117
  // to the WAL.
2118
10.2M
  if (PREDICT_TRUE(!deduped_req.messages.empty()) || write_empty) {
2119
    // Trigger the log append asap; if fsync() is on, this might take a while
2120
    // and we can't reply until this is done.
2121
    //
2122
    // Since we've prepared, we need to be able to append (or we risk trying to apply
2123
    // later something that wasn't logged). We crash if we can't.
2124
8.21M
    CHECK_OK(queue_->AppendOperations(
2125
8.21M
        deduped_req.messages, deduped_req.committed_op_id, state_->Clock().Now()));
2126
8.21M
  }
2127
2128
10.2M
  return !deduped_req.messages.empty() ?
2129
5.71M
      yb::OpId::FromPB(deduped_req.messages.back()->id()) : deduped_req.preceding_op_id;
2130
10.2M
}
2131
2132
4.50M
Status RaftConsensus::WaitForWrites(int64_t term, const OpId& wait_for_op_id) {
2133
  // 5 - We wait for the writes to be durable.
2134
2135
  // Note that this is safe because dist consensus now only supports a single outstanding
2136
  // request at a time and this way we can allow commits to proceed while we wait.
2137
4.50M
  TRACE("Waiting on the replicates to finish logging");
2138
4.50M
  TRACE_EVENT0("consensus", "Wait for log");
2139
4.50M
  for (;;) {
2140
4.50M
    auto wait_result = log_->WaitForSafeOpIdToApply(
2141
4.50M
        wait_for_op_id, MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
2142
    // If we are just waiting for our own log append to finish, let's snooze the timer.
2143
    // We don't want to fire leader election because we're waiting on our own log.
2144
4.50M
    if (!wait_result.empty()) {
2145
4.50M
      break;
2146
4.50M
    }
2147
18.4E
    int64_t new_term;
2148
18.4E
    {
2149
18.4E
      auto lock = state_->LockForRead();
2150
18.4E
      new_term = state_->GetCurrentTermUnlocked();
2151
18.4E
    }
2152
18.4E
    if (term != new_term) {
2153
0
      return STATUS_FORMAT(IllegalState, "Term changed to $0 while waiting for writes in term $1",
2154
0
                           new_term, term);
2155
0
    }
2156
2157
18.4E
    SnoozeFailureDetector(DO_NOT_LOG);
2158
2159
18.4E
    const auto election_timeout_at = MonoTime::Now() + MinimumElectionTimeout();
2160
18.4E
    UpdateAtomicMax(&withhold_votes_until_, election_timeout_at);
2161
18.4E
  }
2162
4.50M
  TRACE("Finished waiting on the replicates to finish logging");
2163
2164
4.50M
  return Status::OK();
2165
4.50M
}
2166
2167
Status RaftConsensus::MarkOperationsAsCommittedUnlocked(const ConsensusRequestPB& request,
2168
                                                        const LeaderRequest& deduped_req,
2169
10.2M
                                                        yb::OpId last_from_leader) {
2170
  // Choose the last operation to be applied. This will either be 'committed_index', if
2171
  // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
2172
  // the last successfully enqueued prepare, if some prepare failed to enqueue.
2173
10.2M
  yb::OpId apply_up_to;
2174
10.2M
  if (last_from_leader.index < request.committed_op_id().index()) {
2175
    // we should never apply anything later than what we received in this request
2176
128
    apply_up_to = last_from_leader;
2177
2178
128
    LOG_WITH_PREFIX(INFO)
2179
128
        << "Received commit index " << request.committed_op_id()
2180
128
        << " from the leader but only marked up to " << apply_up_to << " as committed.";
2181
10.2M
  } else {
2182
10.2M
    apply_up_to = yb::OpId::FromPB(request.committed_op_id());
2183
10.2M
  }
2184
2185
  // We can now update the last received watermark.
2186
  //
2187
  // We do it here (and before we actually hear back from the wal whether things
2188
  // are durable) so that, if we receive another, possible duplicate, message
2189
  // that exercises this path we don't handle these messages twice.
2190
  //
2191
  // If any messages failed to be started locally, then we already have removed them
2192
  // from 'deduped_req' at this point. So, we can simply update our last-received
2193
  // watermark to the last message that remains in 'deduped_req'.
2194
  //
2195
  // It's possible that the leader didn't send us any new data -- it might be a completely
2196
  // duplicate request. In that case, we don't need to update LastReceived at all.
2197
10.2M
  if (!deduped_req.messages.empty()) {
2198
4.50M
    OpIdPB last_appended = deduped_req.messages.back()->id();
2199
4.50M
    TRACE(Substitute("Updating last received op as $0", last_appended.ShortDebugString()));
2200
4.50M
    state_->UpdateLastReceivedOpIdUnlocked(last_appended);
2201
5.71M
  } else if (state_->GetLastReceivedOpIdUnlocked().index < deduped_req.preceding_op_id.index) {
2202
0
    return STATUS_FORMAT(InvalidArgument,
2203
0
                         "Bad preceding_opid: $0, last received: $1",
2204
0
                         deduped_req.preceding_op_id,
2205
0
                         state_->GetLastReceivedOpIdUnlocked());
2206
0
  }
2207
2208
10.2M
  VLOG_WITH_PREFIX(1) << "Marking committed up to " << apply_up_to;
2209
10.2M
  TRACE(Format("Marking committed up to $0", apply_up_to));
2210
10.2M
  return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(apply_up_to, CouldStop::kTrue));
2211
10.2M
}
2212
2213
10.2M
void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
2214
10.2M
  TRACE("Filling consensus response to leader.");
2215
10.2M
  response->set_responder_term(state_->GetCurrentTermUnlocked());
2216
10.2M
  state_->GetLastReceivedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_received());
2217
10.2M
  state_->GetLastReceivedOpIdCurLeaderUnlocked().ToPB(
2218
10.2M
      response->mutable_status()->mutable_last_received_current_leader());
2219
10.2M
  response->mutable_status()->set_last_committed_idx(state_->GetCommittedOpIdUnlocked().index);
2220
10.2M
  state_->GetLastAppliedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_applied());
2221
10.2M
}
2222
2223
void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
2224
                                               ConsensusErrorPB::Code error_code,
2225
68.4k
                                               const Status& status) {
2226
68.4k
  ConsensusErrorPB* error = response->mutable_status()->mutable_error();
2227
68.4k
  error->set_code(error_code);
2228
68.4k
  StatusToPB(status, error->mutable_status());
2229
68.4k
}
2230
2231
125k
Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
2232
125k
  TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
2233
125k
               "peer", peer_uuid(),
2234
125k
               "tablet", tablet_id());
2235
125k
  bool preelection = request->preelection();
2236
2237
125k
  response->set_responder_uuid(state_->GetPeerUuid());
2238
125k
  response->set_preelection(preelection);
2239
2240
  // We must acquire the update lock in order to ensure that this vote action
2241
  // takes place between requests.
2242
  // Lock ordering: The update lock must be acquired before the ReplicaState lock.
2243
125k
  std::unique_lock<decltype(update_mutex_)> update_guard(update_mutex_, std::defer_lock);
2244
125k
  if (FLAGS_enable_leader_failure_detection) {
2245
124k
    update_guard.try_lock();
2246
374
  } else {
2247
    // If failure detection is not enabled, then we can't just reject the vote,
2248
    // because there will be no automatic retry later. So, block for the lock.
2249
374
    update_guard.lock();
2250
374
  }
2251
125k
  if (!update_guard.owns_lock()) {
2252
    // There is another vote or update concurrent with the vote. In that case, that
2253
    // other request is likely to reset the timer, and we'll end up just voting
2254
    // "NO" after waiting. To avoid starving RPC handlers and causing cascading
2255
    // timeouts, just vote a quick NO.
2256
    //
2257
    // We still need to take the state lock in order to respond with term info, etc.
2258
1.98k
    ReplicaState::UniqueLock state_guard;
2259
1.98k
    RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
2260
1.98k
    return RequestVoteRespondIsBusy(request, response);
2261
123k
  }
2262
2263
  // Acquire the replica state lock so we can read / modify the consensus state.
2264
123k
  ReplicaState::UniqueLock state_guard;
2265
123k
  RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
2266
2267
  // If the node is not in the configuration, allow the vote (this is required by Raft)
2268
  // but log an informational message anyway.
2269
123k
  if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) {
2270
35
    LOG_WITH_PREFIX(INFO) << "Handling vote request from an unknown peer "
2271
35
                          << request->candidate_uuid();
2272
35
  }
2273
2274
  // If we've heard recently from the leader, then we should ignore the request
2275
  // (except if it is the leader itself requesting a vote -- something that might
2276
  //  happen if the leader were to step down and call an election). Otherwise,
2277
  // it might be from a "disruptive" server. This could happen in a few cases:
2278
  //
2279
  // 1) Network partitions
2280
  // If the leader can talk to a majority of the nodes, but is partitioned from a
2281
  // bad node, the bad node's failure detector will trigger. If the bad node is
2282
  // able to reach other nodes in the cluster, it will continuously trigger elections.
2283
  //
2284
  // 2) An abandoned node
2285
  // It's possible that a node has fallen behind the log GC mark of the leader. In that
2286
  // case, the leader will stop sending it requests. Eventually, the configuration
2287
  // will change to eject the abandoned node, but until that point, we don't want the
2288
  // abandoned follower to disturb the other nodes.
2289
  //
2290
  // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
2291
  // section 4.2.3.
2292
123k
  MonoTime now = MonoTime::Now();
2293
123k
  if (request->candidate_uuid() != state_->GetLeaderUuidUnlocked() &&
2294
122k
      !request->ignore_live_leader() &&
2295
24.9k
      now < withhold_votes_until_.load(std::memory_order_acquire)) {
2296
319
    return RequestVoteRespondLeaderIsAlive(request, response);
2297
319
  }
2298
2299
  // Candidate is running behind.
2300
122k
  if (request->candidate_term() < state_->GetCurrentTermUnlocked()) {
2301
113
    return RequestVoteRespondInvalidTerm(request, response);
2302
113
  }
2303
2304
  // We already voted this term.
2305
122k
  if (request->candidate_term() == state_->GetCurrentTermUnlocked() &&
2306
696
      state_->HasVotedCurrentTermUnlocked()) {
2307
2308
    // Already voted for the same candidate in the current term.
2309
239
    if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
2310
48
      return RequestVoteRespondVoteAlreadyGranted(request, response);
2311
48
    }
2312
2313
    // Voted for someone else in current term.
2314
191
    return RequestVoteRespondAlreadyVotedForOther(request, response);
2315
191
  }
2316
2317
  // The term advanced.
2318
122k
  if (request->candidate_term() > state_->GetCurrentTermUnlocked() && !preelection) {
2319
60.9k
    RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term()),
2320
60.9k
        Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
2321
60.9k
            state_->GetCurrentTermUnlocked(), request->candidate_term()));
2322
60.9k
  }
2323
2324
  // Candidate must have last-logged OpId at least as large as our own to get our vote.
2325
122k
  OpIdPB local_last_logged_opid;
2326
122k
  GetLatestOpIdFromLog().ToPB(&local_last_logged_opid);
2327
122k
  if (OpIdLessThan(request->candidate_status().last_received(), local_last_logged_opid)) {
2328
166
    return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response);
2329
166
  }
2330
2331
122k
  if (!preelection) {
2332
    // Clear the pending election op id if any before granting the vote. If another peer jumps in
2333
    // before we can catch up and start the election, let's not disrupt the quorum with another
2334
    // election.
2335
61.0k
    state_->ClearPendingElectionOpIdUnlocked();
2336
61.0k
  }
2337
2338
122k
  auto remaining_old_leader_lease = state_->RemainingOldLeaderLeaseDuration();
2339
2340
122k
  if (remaining_old_leader_lease.Initialized()) {
2341
10.0k
    response->set_remaining_leader_lease_duration_ms(
2342
10.0k
        narrow_cast<int32_t>(remaining_old_leader_lease.ToMilliseconds()));
2343
10.0k
    response->set_leader_lease_uuid(state_->old_leader_lease().holder_uuid);
2344
10.0k
  }
2345
2346
122k
  const auto& old_leader_ht_lease = state_->old_leader_ht_lease();
2347
122k
  if (old_leader_ht_lease) {
2348
11.0k
    response->set_leader_ht_lease_expiration(old_leader_ht_lease.expiration);
2349
11.0k
    response->set_leader_ht_lease_uuid(old_leader_ht_lease.holder_uuid);
2350
11.0k
  }
2351
2352
  // Passed all our checks. Vote granted.
2353
122k
  if (preelection) {
2354
61.1k
    LOG_WITH_PREFIX(INFO) << "Pre-election. Granting vote for candidate "
2355
61.1k
                          << request->candidate_uuid() << " in term " << request->candidate_term();
2356
61.1k
    FillVoteResponseVoteGranted(*request, response);
2357
61.1k
    return Status::OK();
2358
61.1k
  }
2359
2360
61.0k
  return RequestVoteRespondVoteGranted(request, response);
2361
61.0k
}
2362
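RequestVote() above applies its checks in a fixed order: refuse quickly if another update or vote already holds the update lock, withhold the vote while a live leader is believed to exist, reject stale terms, enforce one vote per term, step the local term forward for a real (non-pre) election, and finally require the candidate's log to be at least as up to date as ours. A compressed sketch of that decision order, assuming simplified stand-in types rather than the real protobuf messages:

#include <cstdint>
#include <string>

enum class VoteDecision {
  kBusy, kLeaderAlive, kInvalidTerm, kAlreadyVoted, kLastOpIdTooOld, kGranted
};

struct VoteRequest {                  // hypothetical subset of VoteRequestPB
  std::string candidate_uuid;
  int64_t candidate_term = 0;
  int64_t candidate_last_index = 0;
  bool ignore_live_leader = false;
};

struct ReplicaVoteState {             // hypothetical subset of ReplicaState
  std::string leader_uuid;
  std::string voted_for;              // empty if no vote cast in current_term
  int64_t current_term = 0;
  int64_t last_logged_index = 0;
  bool update_lock_available = true;
  bool leader_recently_heard_from = false;
};

VoteDecision DecideVote(const ReplicaVoteState& s, const VoteRequest& req) {
  if (!s.update_lock_available) return VoteDecision::kBusy;
  if (req.candidate_uuid != s.leader_uuid && !req.ignore_live_leader &&
      s.leader_recently_heard_from) {
    return VoteDecision::kLeaderAlive;     // withhold: avoid disruptive servers
  }
  if (req.candidate_term < s.current_term) return VoteDecision::kInvalidTerm;
  if (req.candidate_term == s.current_term && !s.voted_for.empty() &&
      s.voted_for != req.candidate_uuid) {
    return VoteDecision::kAlreadyVoted;    // one vote per term
  }
  if (req.candidate_last_index < s.last_logged_index) {
    return VoteDecision::kLastOpIdTooOld;  // candidate's log is behind ours
  }
  return VoteDecision::kGranted;
}

The real function additionally piggybacks any remaining old-leader lease information onto a granted response, so the candidate knows how long it must wait before exercising leadership.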
2363
Status RaftConsensus::IsLeaderReadyForChangeConfigUnlocked(ChangeConfigType type,
2364
3.04M
                                                           const string& server_uuid) {
2365
3.04M
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
2366
3.04M
  size_t servers_in_transition = 0;
2367
3.04M
  if (type == ADD_SERVER) {
2368
1.20k
    servers_in_transition = CountServersInTransition(active_config);
2369
3.04M
  } else if (type == REMOVE_SERVER) {
2370
    // If we are trying to remove the server in transition, then servers_in_transition shouldn't
2371
    // count it so we can proceed with the operation.
2372
1.13k
    servers_in_transition = CountServersInTransition(active_config, server_uuid);
2373
1.13k
  }
2374
2375
  // Check that all the following requirements are met:
2376
  // 1. We are required by Raft to reject config change operations until we have
2377
  //    committed at least one operation in our current term as leader.
2378
  //    See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
2379
  // 2. Ensure there is no other pending change config.
2380
  // 3. There are no peers that are in the process of becoming VOTERs or OBSERVERs.
2381
3.04M
  if (!state_->AreCommittedAndCurrentTermsSameUnlocked() ||
2382
3.04M
      state_->IsConfigChangePendingUnlocked() ||
2383
3.04M
      servers_in_transition != 0) {
2384
3.04M
    return STATUS_FORMAT(IllegalState,
2385
3.04M
                         "Leader is not ready for Config Change, can try again. "
2386
3.04M
                         "Num peers in transit: $0. Type: $1. Has opid: $2. Committed config: $3. "
2387
3.04M
                         "Pending config: $4. Current term: $5. Committed op id: $6.",
2388
3.04M
                         servers_in_transition, ChangeConfigType_Name(type),
2389
3.04M
                         active_config.has_opid_index(),
2390
3.04M
                         state_->GetCommittedConfigUnlocked().ShortDebugString(),
2391
3.04M
                         state_->IsConfigChangePendingUnlocked() ?
2392
3.04M
                             state_->GetPendingConfigUnlocked().ShortDebugString() : "",
2393
3.04M
                         state_->GetCurrentTermUnlocked(), state_->GetCommittedOpIdUnlocked());
2394
3.04M
  }
2395
2396
2.96k
  return Status::OK();
2397
2.96k
}
2398
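The readiness test above boils down to three booleans. A minimal sketch, where the struct is a hypothetical summary of the relevant leader state rather than a YugabyteDB type:

#include <cstddef>

struct ConfigChangeReadiness {               // hypothetical summary of leader state
  bool committed_op_in_current_term = false; // Raft requires one committed op this term
  bool config_change_pending = false;        // at most one change config in flight
  std::size_t servers_in_transition = 0;     // peers still in PRE_VOTER / PRE_OBSERVER
};

bool LeaderReadyForConfigChange(const ConfigChangeReadiness& r) {
  return r.committed_op_in_current_term &&
         !r.config_change_pending &&
         r.servers_in_transition == 0;
}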
2399
Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
2400
                                   const StdStatusCallback& client_cb,
2401
3.04M
                                   boost::optional<TabletServerErrorPB::Code>* error_code) {
2402
3.04M
  if (PREDICT_FALSE(!req.has_type())) {
2403
0
    return STATUS(InvalidArgument, "Must specify 'type' argument to ChangeConfig()",
2404
0
                                   req.ShortDebugString());
2405
0
  }
2406
3.04M
  if (PREDICT_FALSE(!req.has_server())) {
2407
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2408
0
    return STATUS(InvalidArgument, "Must specify 'server' argument to ChangeConfig()",
2409
0
                                   req.ShortDebugString());
2410
0
  }
2411
3.04M
  YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
2412
4.57k
      << "Received ChangeConfig request " << req.ShortDebugString();
2413
3.04M
  ChangeConfigType type = req.type();
2414
3.04M
  bool use_hostport = req.has_use_host() && req.use_host();
2415
2416
3.04M
  if (type != REMOVE_SERVER && use_hostport) {
2417
0
    return STATUS_SUBSTITUTE(InvalidArgument, "Cannot set use_host for change config type $0, "
2418
0
                             "only allowed with REMOVE_SERVER.", type);
2419
0
  }
2420
2421
3.04M
  if (PREDICT_FALSE(FLAGS_TEST_return_error_on_change_config != 0.0 && type == CHANGE_ROLE)) {
2422
0
    DCHECK(FLAGS_TEST_return_error_on_change_config >= 0.0 &&
2423
0
           FLAGS_TEST_return_error_on_change_config <= 1.0);
2424
0
    if (clock_->Now().ToUint64() % 100 < 100 * FLAGS_TEST_return_error_on_change_config) {
2425
0
      return STATUS(IllegalState, "Returning error for unit test");
2426
0
    }
2427
3.04M
  }
2428
3.04M
  RaftPeerPB* new_peer = nullptr;
2429
3.04M
  const RaftPeerPB& server = req.server();
2430
3.04M
  if (!use_hostport && !server.has_permanent_uuid()) {
2431
0
    return STATUS(InvalidArgument,
2432
0
                  Substitute("server must have permanent_uuid or use_host specified: $0",
2433
0
                             req.ShortDebugString()));
2434
0
  }
2435
3.04M
  {
2436
3.04M
    ReplicaState::UniqueLock lock;
2437
3.04M
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
2438
3.04M
    Status s = state_->CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::DONT_NEED_LEASE);
2439
3.04M
    if (!s.ok()) {
2440
331
      *error_code = TabletServerErrorPB::NOT_THE_LEADER;
2441
331
      return s;
2442
331
    }
2443
2444
3.04M
    const string& server_uuid = server.has_permanent_uuid() ? server.permanent_uuid() : "";
2445
3.04M
    s = IsLeaderReadyForChangeConfigUnlocked(type, server_uuid);
2446
3.04M
    if (!s.ok()) {
2447
3.04M
      YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
2448
1.38k
          << "Returning not ready for " << ChangeConfigType_Name(type)
2449
1.38k
          << " due to error " << s.ToString();
2450
3.04M
      *error_code = TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG;
2451
3.04M
      return s;
2452
3.04M
    }
2453
2454
2.96k
    const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
2455
2456
    // Support atomic ChangeConfig requests.
2457
2.96k
    if (req.has_cas_config_opid_index()) {
2458
1.85k
      if (committed_config.opid_index() != req.cas_config_opid_index()) {
2459
3
        *error_code = TabletServerErrorPB::CAS_FAILED;
2460
3
        return STATUS(IllegalState, Substitute("Request specified cas_config_opid_index "
2461
3
                                               "of $0 but the committed config has opid_index "
2462
3
                                               "of $1",
2463
3
                                               req.cas_config_opid_index(),
2464
3
                                               committed_config.opid_index()));
2465
3
      }
2466
2.95k
    }
2467
2468
2.95k
    RaftConfigPB new_config = committed_config;
2469
2.95k
    new_config.clear_opid_index();
2470
2.95k
    switch (type) {
2471
1.04k
      case ADD_SERVER:
2472
        // Ensure the server we are adding is not already a member of the configuration.
2473
1.04k
        if (IsRaftConfigMember(server_uuid, committed_config)) {
2474
0
          *error_code = TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT;
2475
0
          return STATUS(IllegalState,
2476
0
              Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
2477
0
                        server_uuid, committed_config.ShortDebugString()));
2478
0
        }
2479
1.04k
        if (!server.has_member_type()) {
2480
0
          return STATUS(InvalidArgument,
2481
0
                        Substitute("Server must have member_type specified. Request: $0",
2482
0
                                   req.ShortDebugString()));
2483
0
        }
2484
1.04k
        if (server.member_type() != PeerMemberType::PRE_VOTER &&
2485
60
            server.member_type() != PeerMemberType::PRE_OBSERVER) {
2486
0
          return STATUS(InvalidArgument,
2487
0
              Substitute("Server with UUID $0 must be of member_type PRE_VOTER or PRE_OBSERVER. "
2488
0
                         "member_type received: $1", server_uuid,
2489
0
                         PeerMemberType_Name(server.member_type())));
2490
0
        }
2491
1.04k
        if (server.last_known_private_addr().empty()) {
2492
0
          return STATUS(InvalidArgument, "server must have last_known_addr specified",
2493
0
                                         req.ShortDebugString());
2494
0
        }
2495
1.04k
        new_peer = new_config.add_peers();
2496
1.04k
        *new_peer = server;
2497
1.04k
        break;
2498
2499
925
      case REMOVE_SERVER:
2500
925
        if (use_hostport) {
2501
4
          if (server.last_known_private_addr().empty()) {
2502
0
            return STATUS(InvalidArgument, "Must have last_known_addr specified.",
2503
0
                          req.ShortDebugString());
2504
0
          }
2505
4
          HostPort leader_hp;
2506
4
          RETURN_NOT_OK(GetHostPortFromConfig(
2507
4
              new_config, peer_uuid(), queue_->local_cloud_info(), &leader_hp));
2508
4
          for (const auto& host_port : server.last_known_private_addr()) {
2509
4
            if (leader_hp.port() == host_port.port() && leader_hp.host() == host_port.host()) {
2510
0
              return STATUS(InvalidArgument, "Cannot remove live leader using hostport.",
2511
0
                            req.ShortDebugString());
2512
0
            }
2513
4
          }
2514
4
        }
2515
925
        if (server_uuid == peer_uuid()) {
2516
24
          *error_code = TabletServerErrorPB::LEADER_NEEDS_STEP_DOWN;
2517
24
          return STATUS(InvalidArgument,
2518
24
              Substitute("Cannot remove peer $0 from the config because it is the leader. "
2519
24
                         "Force another leader to be elected to remove this server. "
2520
24
                         "Active consensus state: $1", server_uuid,
2521
24
                         state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE)
2522
24
                            .ShortDebugString()));
2523
24
        }
2524
901
        if (!RemoveFromRaftConfig(&new_config, req)) {
2525
0
          *error_code = TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT;
2526
0
          return STATUS(NotFound,
2527
0
              Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
2528
0
                        server_uuid, committed_config.ShortDebugString()));
2529
0
        }
2530
901
        break;
2531
2532
989
      case CHANGE_ROLE:
2533
989
        if (server_uuid == peer_uuid()) {
2534
0
          return STATUS(InvalidArgument,
2535
0
              Substitute("Cannot change role of peer $0 because it is the leader. Force "
2536
0
                         "another leader to be elected. Active consensus state: $1", server_uuid,
2537
0
                         state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE)
2538
0
                             .ShortDebugString()));
2539
0
        }
2540
0
        VLOG(3) << "config before CHANGE_ROLE: " << new_config.DebugString();
2541
2542
989
        if (!GetMutableRaftConfigMember(&new_config, server_uuid, &new_peer).ok()) {
2543
0
          return STATUS(NotFound,
2544
0
            Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
2545
0
                       server_uuid, new_config.ShortDebugString()));
2546
0
        }
2547
989
        if (new_peer->member_type() != PeerMemberType::PRE_OBSERVER &&
2548
932
            new_peer->member_type() != PeerMemberType::PRE_VOTER) {
2549
0
          return STATUS(IllegalState, Substitute("Cannot change role of server with UUID $0 "
2550
0
                                                 "because its member type is $1",
2551
0
                                                 server_uuid, new_peer->member_type()));
2552
0
        }
2553
989
        if (new_peer->member_type() == PeerMemberType::PRE_OBSERVER) {
2554
57
          new_peer->set_member_type(PeerMemberType::OBSERVER);
2555
932
        } else {
2556
932
          new_peer->set_member_type(PeerMemberType::VOTER);
2557
932
        }
2558
2559
0
        VLOG(3) << "config after CHANGE_ROLE: " << new_config.DebugString();
2560
989
        break;
2561
0
      default:
2562
0
        return STATUS(InvalidArgument, Substitute("Unsupported type $0",
2563
2.93k
                                                  ChangeConfigType_Name(type)));
2564
2.93k
    }
2565
2566
2.93k
    auto cc_replicate = std::make_shared<ReplicateMsg>();
2567
2.93k
    cc_replicate->set_op_type(CHANGE_CONFIG_OP);
2568
2.93k
    ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
2569
2.93k
    cc_req->set_tablet_id(tablet_id());
2570
2.93k
    *cc_req->mutable_old_config() = committed_config;
2571
2.93k
    *cc_req->mutable_new_config() = new_config;
2572
    // Note: This hybrid_time has no meaning from a serialization perspective
2573
    // because this method is not executed on the TabletPeer's prepare thread.
2574
2.93k
    cc_replicate->set_hybrid_time(clock_->Now().ToUint64());
2575
2.93k
    state_->GetCommittedOpIdUnlocked().ToPB(cc_replicate->mutable_committed_op_id());
2576
2577
2.93k
    auto context = std::make_shared<StateChangeContext>(
2578
2.93k
        StateChangeReason::LEADER_CONFIG_CHANGE_COMPLETE, *cc_req,
2579
2.03k
        type == REMOVE_SERVER ? server_uuid : "");
2580
2581
2.93k
    RETURN_NOT_OK(
2582
2.93k
        ReplicateConfigChangeUnlocked(cc_replicate,
2583
2.93k
                                      new_config,
2584
2.93k
                                      type,
2585
2.93k
                                      std::bind(&RaftConsensus::MarkDirtyOnSuccess,
2586
2.93k
                                           this,
2587
2.93k
                                           std::move(context),
2588
2.93k
                                           std::move(client_cb), std::placeholders::_1)));
2589
2.93k
  }
2590
2591
2.93k
  peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
2592
2593
2.93k
  return Status::OK();
2594
2.93k
}
2595
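ChangeConfig() above supports an optional compare-and-swap guard: when the caller sets cas_config_opid_index, the change is rejected with CAS_FAILED unless that index matches the committed config's opid_index. A minimal sketch of just that guard, with a simplified stand-in for the config:

#include <cstdint>
#include <optional>

struct CommittedConfig {              // simplified stand-in for RaftConfigPB
  int64_t opid_index = 0;
};

// Returns true when the change may proceed; false models the CAS_FAILED error.
bool CasGuardPasses(const CommittedConfig& committed,
                    const std::optional<int64_t>& cas_config_opid_index) {
  if (!cas_config_opid_index.has_value()) {
    return true;                      // no CAS requested: always proceed
  }
  return *cas_config_opid_index == committed.opid_index;
}

This lets callers issue "change the config only if it is still the one I last read" requests, avoiding lost updates when several config changes race.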
2596
Status RaftConsensus::UnsafeChangeConfig(
2597
    const UnsafeChangeConfigRequestPB& req,
2598
0
    boost::optional<tserver::TabletServerErrorPB::Code>* error_code) {
2599
0
  if (PREDICT_FALSE(!req.has_new_config())) {
2600
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2601
0
    return STATUS(InvalidArgument, "Request must contain 'new_config' argument "
2602
0
                  "to UnsafeChangeConfig()", yb::ToString(req));
2603
0
  }
2604
0
  if (PREDICT_FALSE(!req.has_caller_id())) {
2605
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2606
0
    return STATUS(InvalidArgument, "Must specify 'caller_id' argument to UnsafeChangeConfig()",
2607
0
                  yb::ToString(req));
2608
0
  }
2609
2610
  // Grab the committed config and current term on this node.
2611
0
  int64_t current_term;
2612
0
  RaftConfigPB committed_config;
2613
0
  OpId last_committed_opid;
2614
0
  OpId preceding_opid;
2615
0
  string local_peer_uuid;
2616
0
  {
2617
    // Take the snapshot of the replica state and queue state so that
2618
    // we can stick them in the consensus update request later.
2619
0
    auto lock = state_->LockForRead();
2620
0
    local_peer_uuid = state_->GetPeerUuid();
2621
0
    current_term = state_->GetCurrentTermUnlocked();
2622
0
    committed_config = state_->GetCommittedConfigUnlocked();
2623
0
    if (state_->IsConfigChangePendingUnlocked()) {
2624
0
      LOG_WITH_PREFIX(WARNING) << "Replica has a pending config, but the new config "
2625
0
                               << "will be unsafely changed anyway. "
2626
0
                               << "Currently pending config on the node: "
2627
0
                               << yb::ToString(state_->GetPendingConfigUnlocked());
2628
0
    }
2629
0
    last_committed_opid = state_->GetCommittedOpIdUnlocked();
2630
0
    preceding_opid = state_->GetLastAppliedOpIdUnlocked();
2631
0
  }
2632
2633
  // Validate that passed replica uuids are part of the committed config
2634
  // on this node.  This allows a manual recovery tool to only have to specify
2635
  // the uuid of each replica in the new config without having to know the
2636
  // addresses of each server (since we can get the address information from
2637
  // the committed config). Additionally, only a subset of the committed config
2638
  // is required for typical cluster repair scenarios.
2639
0
  std::unordered_set<string> retained_peer_uuids;
2640
0
  const RaftConfigPB& config = req.new_config();
2641
0
  for (const RaftPeerPB& new_peer : config.peers()) {
2642
0
    const string& peer_uuid = new_peer.permanent_uuid();
2643
0
    retained_peer_uuids.insert(peer_uuid);
2644
0
    if (!IsRaftConfigMember(peer_uuid, committed_config)) {
2645
0
      *error_code = TabletServerErrorPB::INVALID_CONFIG;
2646
0
      return STATUS(InvalidArgument, Substitute("Peer with uuid $0 is not in the committed  "
2647
0
                                                "config on this replica, rejecting the  "
2648
0
                                                "unsafe config change request for tablet $1. "
2649
0
                                                "Committed config: $2",
2650
0
                                                peer_uuid, req.tablet_id(),
2651
0
                                                yb::ToString(committed_config)));
2652
0
    }
2653
0
  }
2654
2655
0
  RaftConfigPB new_config = committed_config;
2656
0
  for (const auto& peer : committed_config.peers()) {
2657
0
    const string& peer_uuid = peer.permanent_uuid();
2658
0
    if (!ContainsKey(retained_peer_uuids, peer_uuid)) {
2659
0
      ChangeConfigRequestPB req;
2660
0
      req.set_tablet_id(tablet_id());
2661
0
      req.mutable_server()->set_permanent_uuid(peer_uuid);
2662
0
      req.set_type(REMOVE_SERVER);
2663
0
      req.set_cas_config_opid_index(committed_config.opid_index());
2664
0
      CHECK(RemoveFromRaftConfig(&new_config, req));
2665
0
    }
2666
0
  }
2667
  // Check that local peer is part of the new config and is a VOTER.
2668
  // Although it is valid for a local replica to not have itself
2669
  // in the committed config, it is rare and a replica without itself
2670
  // in the latest config is definitely not caught up with the latest leader's log.
2671
0
  if (!IsRaftConfigVoter(local_peer_uuid, new_config)) {
2672
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2673
0
    return STATUS(InvalidArgument, Substitute("Local replica uuid $0 is not "
2674
0
                                              "a VOTER in the new config, "
2675
0
                                              "rejecting the unsafe config "
2676
0
                                              "change request for tablet $1. "
2677
0
                                              "Rejected config: $2" ,
2678
0
                                              local_peer_uuid, req.tablet_id(),
2679
0
                                              yb::ToString(new_config)));
2680
0
  }
2681
0
  new_config.set_unsafe_config_change(true);
2682
0
  int64 replicate_opid_index = preceding_opid.index + 1;
2683
0
  new_config.clear_opid_index();
2684
2685
  // Sanity check the new config.
2686
0
  Status s = VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM);
2687
0
  if (!s.ok()) {
2688
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2689
0
    return STATUS(InvalidArgument, Substitute("The resulting new config for tablet $0  "
2690
0
                                              "from passed parameters has failed raft "
2691
0
                                              "config sanity check: $1",
2692
0
                                              req.tablet_id(), s.ToString()));
2693
0
  }
2694
2695
  // Prepare the consensus request as if the request is being generated
2696
  // from a different leader.
2697
0
  ConsensusRequestPB consensus_req;
2698
0
  consensus_req.set_caller_uuid(req.caller_id());
2699
  // Bumping up the term for the consensus request being generated.
2700
  // This makes this request appear to come from a new leader that
2701
  // the local replica doesn't know about yet. If the local replica
2702
  // happens to be the leader, this will cause it to step down.
2703
0
  const int64 new_term = current_term + 1;
2704
0
  consensus_req.set_caller_term(new_term);
2705
0
  preceding_opid.ToPB(consensus_req.mutable_preceding_id());
2706
0
  last_committed_opid.ToPB(consensus_req.mutable_committed_op_id());
2707
2708
  // Prepare the replicate msg to be replicated.
2709
0
  ReplicateMsg* replicate = consensus_req.add_ops();
2710
0
  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
2711
0
  cc_req->set_tablet_id(req.tablet_id());
2712
0
  *cc_req->mutable_old_config() = committed_config;
2713
0
  *cc_req->mutable_new_config() = new_config;
2714
0
  OpIdPB* id = replicate->mutable_id();
2715
  // Bumping up both the term and the opid_index from what's found in the log.
2716
0
  id->set_term(new_term);
2717
0
  id->set_index(replicate_opid_index);
2718
0
  replicate->set_op_type(CHANGE_CONFIG_OP);
2719
0
  replicate->set_hybrid_time(clock_->Now().ToUint64());
2720
0
  last_committed_opid.ToPB(replicate->mutable_committed_op_id());
2721
2722
0
  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
2723
0
                      << yb::ToString(consensus_req);
2724
2725
0
  LOG_WITH_PREFIX(WARNING) << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
2726
0
                           << "COMMITTED CONFIG: " << yb::ToString(committed_config)
2727
0
                           << "NEW CONFIG: " << yb::ToString(new_config);
2728
2729
0
  const auto deadline = CoarseMonoClock::Now() + 15s;  // TODO: fix me
2730
0
  ConsensusResponsePB consensus_resp;
2731
0
  s = Update(&consensus_req, &consensus_resp, deadline);
2732
0
  if (!s.ok() || consensus_resp.has_error()) {
2733
0
    *error_code = TabletServerErrorPB::UNKNOWN_ERROR;
2734
0
  }
2735
0
  if (s.ok() && consensus_resp.has_error()) {
2736
0
    s = StatusFromPB(consensus_resp.error().status());
2737
0
  }
2738
0
  return s;
2739
0
}
2740
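UnsafeChangeConfig() above works by forging a consensus request that appears to come from a leader the replica has never heard of: the caller's uuid, the current term plus one, and a CHANGE_CONFIG_OP whose op id is one past the last applied op. A minimal sketch of that construction, with simplified stand-in types instead of the real protobufs:

#include <cstdint>
#include <string>

struct OpId { int64_t term = 0; int64_t index = 0; };

struct ForgedConfigChange {           // hypothetical summary of the forged request
  std::string caller_uuid;            // the tool's caller_id
  int64_t caller_term = 0;            // current term + 1: forces a leader to step down
  OpId preceding_id;                  // snapshot of the last applied op id
  OpId replicate_id;                  // {caller_term, preceding.index + 1}
};

ForgedConfigChange BuildUnsafeChange(const std::string& caller_id,
                                     int64_t current_term,
                                     const OpId& last_applied) {
  ForgedConfigChange f;
  f.caller_uuid = caller_id;
  f.caller_term = current_term + 1;
  f.preceding_id = last_applied;
  f.replicate_id = OpId{f.caller_term, last_applied.index + 1};
  return f;
}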
2741
95.5k
void RaftConsensus::Shutdown() {
2742
95.5k
  LOG_WITH_PREFIX(INFO) << "Shutdown.";
2743
2744
  // Avoid taking locks if already shut down so we don't violate
2745
  // ThreadRestrictions assertions in the case where the RaftConsensus
2746
  // destructor runs on the reactor thread due to an election callback being
2747
  // the last outstanding reference.
2748
95.5k
  if (shutdown_.Load(kMemOrderAcquire)) return;
2749
2750
47.8k
  CHECK_OK(ExecuteHook(PRE_SHUTDOWN));
2751
2752
47.8k
  {
2753
47.8k
    ReplicaState::UniqueLock lock;
2754
    // Transition to kShuttingDown state.
2755
47.8k
    CHECK_OK(state_->LockForShutdown(&lock));
2756
47.8k
    step_down_check_tracker_.StartShutdown();
2757
47.8k
  }
2758
47.8k
  step_down_check_tracker_.CompleteShutdown();
2759
2760
  // Close the peer manager.
2761
47.8k
  peer_manager_->Close();
2762
2763
  // We must close the queue after we close the peers.
2764
47.8k
  queue_->Close();
2765
2766
47.8k
  CHECK_OK(state_->CancelPendingOperations());
2767
2768
47.8k
  {
2769
47.8k
    ReplicaState::UniqueLock lock;
2770
47.8k
    CHECK_OK(state_->LockForShutdown(&lock));
2771
47.8k
    CHECK_EQ(ReplicaState::kShuttingDown, state_->state());
2772
47.8k
    CHECK_OK(state_->ShutdownUnlocked());
2773
47.8k
    LOG_WITH_PREFIX(INFO) << "Raft consensus is shut down!";
2774
47.8k
  }
2775
2776
  // Shut down things that might acquire locks during destruction.
2777
47.8k
  raft_pool_token_->Shutdown();
2778
  // We might not have run Start yet, so make sure we have a FD.
2779
47.8k
  if (failure_detector_) {
2780
47.8k
    DisableFailureDetector();
2781
47.8k
  }
2782
2783
47.8k
  CHECK_OK(ExecuteHook(POST_SHUTDOWN));
2784
2785
47.8k
  shutdown_.Store(true, kMemOrderRelease);
2786
47.8k
}
2787
2788
0
PeerRole RaftConsensus::GetActiveRole() const {
2789
0
  auto lock = state_->LockForRead();
2790
0
  return state_->GetActiveRoleUnlocked();
2791
0
}
2792
2793
127k
yb::OpId RaftConsensus::GetLatestOpIdFromLog() {
2794
127k
  return log_->GetLatestEntryOpId();
2795
127k
}
2796
2797
74.9k
Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg) {
2798
74.9k
  OperationType op_type = msg->op_type();
2799
74.9k
  if (!IsConsensusOnlyOperation(op_type)) {
2800
0
    return STATUS_FORMAT(InvalidArgument,
2801
0
                         "Expected a consensus-only op type, got $0: $1",
2802
0
                         OperationType_Name(op_type),
2803
0
                         *msg);
2804
0
  }
2805
74.9k
  VLOG_WITH_PREFIX(1) << "Starting consensus round: "
2806
5
                      << msg->id().ShortDebugString();
2807
74.9k
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
2808
74.9k
  std::shared_ptr<StateChangeContext> context = nullptr;
2809
2810
  // We are here for NO_OP or CHANGE_CONFIG_OP type ops. We need to set the change record for an
2811
  // actual config change operation. The NO_OP does not update the config; it is only the
2812
  // replicate message for a new leader's term change, so it keeps the same config.
2813
74.9k
  if (IsChangeConfigOperation(op_type)) {
2814
7.31k
    context =
2815
7.31k
      std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_CONFIG_CHANGE_COMPLETE,
2816
7.31k
                                           msg->change_config_record());
2817
67.6k
  } else {
2818
67.6k
    context = std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_NO_OP_COMPLETE);
2819
67.6k
  }
2820
2821
74.9k
  StdStatusCallback client_cb =
2822
74.9k
      std::bind(&RaftConsensus::MarkDirtyOnSuccess,
2823
74.9k
                this,
2824
74.9k
                context,
2825
74.9k
                &DoNothingStatusCB,
2826
74.9k
                std::placeholders::_1);
2827
74.9k
  round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb)));
2828
74.9k
  return state_->AddPendingOperation(round, OperationMode::kFollower);
2829
74.9k
}
2830
2831
2.01k
Status RaftConsensus::WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) {
2832
2.01k
  CoarseTimePoint start = CoarseMonoClock::now();
2833
5.07k
  for (;;) {
2834
5.07k
    MonoDelta remaining_old_leader_lease;
2835
5.07k
    LeaderLeaseStatus leader_lease_status;
2836
5.07k
    ReplicaState::State state;
2837
5.07k
    {
2838
5.07k
      auto lock = state_->LockForRead();
2839
5.07k
      state = state_->state();
2840
5.07k
      if (state != ReplicaState::kRunning) {
2841
0
        return STATUS_FORMAT(IllegalState, "Consensus is not running: $0", state);
2842
0
      }
2843
5.07k
      if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) {
2844
1
        return STATUS_FORMAT(IllegalState, "Not the leader: $0", state_->GetActiveRoleUnlocked());
2845
1
      }
2846
5.07k
      leader_lease_status = state_->GetLeaderLeaseStatusUnlocked(&remaining_old_leader_lease);
2847
5.07k
    }
2848
5.07k
    if (leader_lease_status == LeaderLeaseStatus::HAS_LEASE) {
2849
2.00k
      return Status::OK();
2850
2.00k
    }
2851
3.06k
    CoarseTimePoint now = CoarseMonoClock::now();
2852
3.06k
    if (now > deadline) {
2853
0
      return STATUS_FORMAT(
2854
0
          TimedOut, "Waited for $0 to acquire a leader lease, state $1, lease status: $2",
2855
0
          now - start, state, LeaderLeaseStatus_Name(leader_lease_status));
2856
0
    }
2857
3.06k
    switch (leader_lease_status) {
2858
0
      case LeaderLeaseStatus::HAS_LEASE:
2859
0
        return Status::OK();
2860
3.04k
      case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE:
2861
3.04k
        {
2862
3.04k
          std::unique_lock<decltype(leader_lease_wait_mtx_)> lock(leader_lease_wait_mtx_);
2863
          // Because we're not taking the same lock (leader_lease_wait_mtx_) when we check the
2864
          // leader lease status, there is a possibility of a race condition when we miss the
2865
          // notification and by this point we already have a lease. Rather than re-taking the
2866
          // ReplicaState lock and re-checking, here we simply block for up to 100ms in that case,
2867
          // because this function is currently (08/14/2017) only used in a context where it is OK,
2868
          // such as catalog manager initialization.
2869
3.04k
          leader_lease_wait_cond_.wait_for(
2870
3.04k
              lock, std::max<MonoDelta>(100ms, deadline - now).ToSteadyDuration());
2871
3.04k
        }
2872
3.04k
        continue;
2873
21
      case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE: {
2874
21
        auto wait_deadline = std::min({deadline, now + 100ms, now + remaining_old_leader_lease});
2875
21
        std::this_thread::sleep_until(wait_deadline);
2876
21
      } continue;
2877
0
    }
2878
0
    FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, leader_lease_status);
2879
0
  }
2880
2.01k
}
2881
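Because the lease status in the loop above is read under the ReplicaState lock while the wait happens under leader_lease_wait_mtx_, a notification can be missed; the code therefore relies on a bounded wait so that a miss only delays the next re-check rather than stalling until the deadline. A generic sketch of that bounded wait-and-recheck pattern (hypothetical names; the exact wait duration chosen by the real code differs):

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>

using Clock = std::chrono::steady_clock;

// Returns true if pred() became true before the deadline.
bool WaitUntilWithRecheck(const std::function<bool()>& pred,
                          Clock::time_point deadline,
                          std::mutex& mu,
                          std::condition_variable& cv) {
  for (;;) {
    if (pred()) return true;                 // checked outside mu, as in the loop above
    const auto now = Clock::now();
    if (now > deadline) return false;
    std::unique_lock<std::mutex> lock(mu);
    // Bounded wait: a missed notification costs at most one 100 ms re-check.
    cv.wait_for(lock, std::min<Clock::duration>(deadline - now,
                                                std::chrono::milliseconds(100)));
  }
}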
2882
2.64M
Status RaftConsensus::CheckIsActiveLeaderAndHasLease() const {
2883
2.64M
  return state_->CheckIsActiveLeaderAndHasLease();
2884
2.64M
}
2885
2886
Result<MicrosTime> RaftConsensus::MajorityReplicatedHtLeaseExpiration(
2887
33.4M
    MicrosTime min_allowed, CoarseTimePoint deadline) const {
2888
33.4M
  return state_->MajorityReplicatedHtLeaseExpiration(min_allowed, deadline);
2889
33.4M
}
2890
2891
63.8k
std::string RaftConsensus::GetRequestVoteLogPrefix(const VoteRequestPB& request) const {
2892
63.8k
  return Format("$0 Leader $1election vote request",
2893
63.1k
                state_->LogPrefix(), request.preelection() ? "pre-" : "");
2894
63.8k
}
2895
2896
void RaftConsensus::FillVoteResponseVoteGranted(
2897
122k
    const VoteRequestPB& request, VoteResponsePB* response) {
2898
122k
  response->set_responder_term(request.candidate_term());
2899
122k
  response->set_vote_granted(true);
2900
122k
}
2901
2902
void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
2903
2.76k
                                               VoteResponsePB* response) {
2904
2.76k
  response->set_responder_term(state_->GetCurrentTermUnlocked());
2905
2.76k
  response->set_vote_granted(false);
2906
2.76k
  response->mutable_consensus_error()->set_code(error_code);
2907
2.76k
}
2908
2909
void RaftConsensus::RequestVoteRespondVoteDenied(
2910
    ConsensusErrorPB::Code error_code, const std::string& message_suffix,
2911
470
    const VoteRequestPB& request, VoteResponsePB* response) {
2912
470
  auto status = STATUS_FORMAT(
2913
470
      InvalidArgument, "$0: Denying vote to candidate $1 $2",
2914
470
      GetRequestVoteLogPrefix(request), request.candidate_uuid(), message_suffix);
2915
470
  FillVoteResponseVoteDenied(error_code, response);
2916
470
  LOG(INFO) << status.message().ToBuffer();
2917
470
  StatusToPB(status, response->mutable_consensus_error()->mutable_status());
2918
470
}
2919
2920
Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
2921
113
                                                    VoteResponsePB* response) {
2922
113
  auto message_suffix = Format(
2923
113
      "for earlier term $0. Current term is $1.",
2924
113
      request->candidate_term(), state_->GetCurrentTermUnlocked());
2925
113
  RequestVoteRespondVoteDenied(ConsensusErrorPB::INVALID_TERM, message_suffix, *request, response);
2926
113
  return Status::OK();
2927
113
}
2928
2929
Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request,
2930
48
                                                           VoteResponsePB* response) {
2931
48
  FillVoteResponseVoteGranted(*request, response);
2932
48
  LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. "
2933
48
                          "Re-sending same reply.",
2934
48
                          GetRequestVoteLogPrefix(*request),
2935
48
                          request->candidate_uuid(),
2936
48
                          request->candidate_term());
2937
48
  return Status::OK();
2938
48
}
2939
2940
Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request,
2941
191
                                                             VoteResponsePB* response) {
2942
191
  auto message_suffix = Format(
2943
191
      "in current term $0: Already voted for candidate $1 in this term.",
2944
191
      state_->GetCurrentTermUnlocked(), state_->GetVotedForCurrentTermUnlocked());
2945
191
  RequestVoteRespondVoteDenied(ConsensusErrorPB::ALREADY_VOTED, message_suffix, *request, response);
2946
191
  return Status::OK();
2947
191
}
2948
2949
Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpIdPB& local_last_logged_opid,
2950
                                                       const VoteRequestPB* request,
2951
166
                                                       VoteResponsePB* response) {
2952
166
  auto message_suffix = Format(
2953
166
      "for term $0 because replica has last-logged OpId of $1, which is greater than that of the "
2954
166
          "candidate, which has last-logged OpId of $2.",
2955
166
      request->candidate_term(), local_last_logged_opid,
2956
166
      request->candidate_status().last_received());
2957
166
  RequestVoteRespondVoteDenied(
2958
166
      ConsensusErrorPB::LAST_OPID_TOO_OLD, message_suffix, *request, response);
2959
166
  return Status::OK();
2960
166
}
2961
2962
Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request,
2963
319
                                                      VoteResponsePB* response) {
2964
319
  FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response);
2965
319
  std::string msg = Format(
2966
319
      "$0: Denying vote to candidate $1 for term $2 because replica is either leader or believes a "
2967
319
      "valid leader to be alive. Time left: $3",
2968
319
      GetRequestVoteLogPrefix(*request), request->candidate_uuid(), request->candidate_term(),
2969
319
      withhold_votes_until_.load(std::memory_order_acquire) - MonoTime::Now());
2970
319
  LOG(INFO) << msg;
2971
319
  StatusToPB(STATUS(InvalidArgument, msg), response->mutable_consensus_error()->mutable_status());
2972
319
  return Status::OK();
2973
319
}
2974
2975
Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request,
2976
1.97k
                                               VoteResponsePB* response) {
2977
1.97k
  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response);
2978
1.97k
  string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
2979
1.97k
                          "replica is already servicing an update from a current leader "
2980
1.97k
                          "or another vote.",
2981
1.97k
                          GetRequestVoteLogPrefix(*request),
2982
1.97k
                          request->candidate_uuid(),
2983
1.97k
                          request->candidate_term());
2984
1.97k
  LOG(INFO) << msg;
2985
1.97k
  StatusToPB(STATUS(ServiceUnavailable, msg),
2986
1.97k
             response->mutable_consensus_error()->mutable_status());
2987
1.97k
  return Status::OK();
2988
1.97k
}
2989
2990
Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request,
2991
61.0k
                                                    VoteResponsePB* response) {
2992
  // We know our vote will be "yes", so avoid triggering an election while we
2993
  // persist our vote to disk. We use an exponential backoff to avoid too much
2994
  // split-vote contention when nodes display high latencies.
2995
61.0k
  MonoDelta additional_backoff = LeaderElectionExpBackoffDeltaUnlocked();
2996
61.0k
  SnoozeFailureDetector(ALLOW_LOGGING, additional_backoff);
2997
2998
  // Persist our vote to disk.
2999
61.0k
  RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
3000
3001
61.0k
  FillVoteResponseVoteGranted(*request, response);
3002
3003
  // Give peer time to become leader. Snooze one more time after persisting our
3004
  // vote. When disk latency is high, this should help reduce churn.
3005
61.0k
  SnoozeFailureDetector(DO_NOT_LOG, additional_backoff);
3006
3007
61.0k
  LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
3008
61.0k
                          GetRequestVoteLogPrefix(*request),
3009
61.0k
                          request->candidate_uuid(),
3010
61.0k
                          state_->GetCurrentTermUnlocked());
3011
61.0k
  return Status::OK();
3012
61.0k
}
3013
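Granting a vote involves a disk write (persisting the vote), so the function above snoozes the local failure detector on both sides of that write: once so the pending write cannot trigger our own election, and once more so the candidate gets a full backoff window in which to actually win. A minimal sketch of that pattern with hypothetical names:

#include <chrono>
#include <functional>
#include <string>

struct FailureDetector {              // hypothetical stand-in
  std::chrono::steady_clock::time_point wake_at;
  void Snooze(std::chrono::milliseconds delta) {
    wake_at = std::chrono::steady_clock::now() + delta;
  }
};

// persist_vote durably records the vote and may be slow on a loaded disk.
bool GrantVote(FailureDetector& fd,
               std::chrono::milliseconds backoff,
               const std::function<bool(const std::string&)>& persist_vote,
               const std::string& candidate_uuid) {
  fd.Snooze(backoff);                 // don't start our own election meanwhile
  if (!persist_vote(candidate_uuid)) return false;
  fd.Snooze(backoff);                 // give the candidate time to become leader
  return true;
}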
3014
3.91M
PeerRole RaftConsensus::GetRoleUnlocked() const {
3015
3.91M
  DCHECK(state_->IsLocked());
3016
3.91M
  return state_->GetActiveRoleUnlocked();
3017
3.91M
}
3018
3019
3.90M
PeerRole RaftConsensus::role() const {
3020
3.90M
  auto lock = state_->LockForRead();
3021
3.90M
  return GetRoleUnlocked();
3022
3.90M
}
3023
3024
33.3M
LeaderState RaftConsensus::GetLeaderState(bool allow_stale) const {
3025
33.3M
  return state_->GetLeaderState(allow_stale);
3026
33.3M
}
3027
3028
1.47M
std::string RaftConsensus::LogPrefix() {
3029
1.47M
  return state_->LogPrefix();
3030
1.47M
}
3031
3032
103k
void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
3033
103k
  failed_elections_since_stable_leader_.store(0, std::memory_order_release);
3034
103k
  state_->SetLeaderUuidUnlocked(uuid);
3035
103k
  auto context = std::make_shared<StateChangeContext>(StateChangeReason::NEW_LEADER_ELECTED, uuid);
3036
103k
  MarkDirty(context);
3037
103k
}
3038
3039
Status RaftConsensus::ReplicateConfigChangeUnlocked(const ReplicateMsgPtr& replicate_ref,
3040
                                                    const RaftConfigPB& new_config,
3041
                                                    ChangeConfigType type,
3042
2.93k
                                                    StdStatusCallback client_cb) {
3043
2.93k
  LOG(INFO) << "Setting replicate pending config " << new_config.ShortDebugString()
3044
2.93k
            << ", type = " << ChangeConfigType_Name(type);
3045
3046
2.93k
  RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config));
3047
3048
2.93k
  if (type == CHANGE_ROLE &&
3049
989
      PREDICT_FALSE(FLAGS_TEST_inject_delay_leader_change_role_append_secs)) {
3050
1
    LOG(INFO) << "Adding change role sleep for "
3051
1
              << FLAGS_TEST_inject_delay_leader_change_role_append_secs << " secs.";
3052
1
    SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_delay_leader_change_role_append_secs));
3053
1
  }
3054
3055
  // Set as pending.
3056
2.93k
  RefreshConsensusQueueAndPeersUnlocked();
3057
3058
2.93k
  auto round = make_scoped_refptr<ConsensusRound>(this, replicate_ref);
3059
2.93k
  round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb)));
3060
2.93k
  auto status = AppendNewRoundToQueueUnlocked(round);
3061
2.93k
  if (!status.ok()) {
3062
    // We could just cancel the pending config, because there can be only one pending config.
3063
0
    auto clear_status = state_->ClearPendingConfigUnlocked();
3064
0
    if (!clear_status.ok()) {
3065
0
      LOG(WARNING) << "Could not clear pending config: " << clear_status;
3066
0
    }
3067
0
  }
3068
2.93k
  return status;
3069
2.93k
}
3070
3071
38.1k
void RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
3072
38.1k
  DCHECK_EQ(PeerRole::LEADER, state_->GetActiveRoleUnlocked());
3073
38.1k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
3074
3075
  // Change the peers so that we're able to replicate messages remotely and
3076
  // locally. Peer manager connections are updated using the active config. Connections to peers
3077
  // that are not part of active_config are closed. New connections are created for those peers
3078
  // that are present in active_config but have no connections. When the queue is in LEADER
3079
  // mode, it checks that all registered peers are a part of the active config.
3080
38.1k
  peer_manager_->ClosePeersNotInConfig(active_config);
3081
38.1k
  queue_->SetLeaderMode(state_->GetCommittedOpIdUnlocked(),
3082
38.1k
                        state_->GetCurrentTermUnlocked(),
3083
38.1k
                        state_->GetLastAppliedOpIdUnlocked(),
3084
38.1k
                        active_config);
3085
3086
38.1k
  ScopedDnsTracker dns_tracker(update_raft_config_dns_latency_.get());
3087
38.1k
  peer_manager_->UpdateRaftConfig(active_config);
3088
38.1k
}
3089
3090
2.29k
string RaftConsensus::peer_uuid() const {
3091
2.29k
  return state_->GetPeerUuid();
3092
2.29k
}
3093
3094
9.15k
string RaftConsensus::tablet_id() const {
3095
9.15k
  return state_->GetOptions().tablet_id;
3096
9.15k
}
3097
3098
4.99k
const TabletId& RaftConsensus::split_parent_tablet_id() const {
3099
4.99k
  return split_parent_tablet_id_;
3100
4.99k
}
3101
3102
ConsensusStatePB RaftConsensus::ConsensusState(
3103
    ConsensusConfigType type,
3104
7.27M
    LeaderLeaseStatus* leader_lease_status) const {
3105
7.27M
  auto lock = state_->LockForRead();
3106
7.27M
  return ConsensusStateUnlocked(type, leader_lease_status);
3107
7.27M
}
3108
3109
ConsensusStatePB RaftConsensus::ConsensusStateUnlocked(
3110
    ConsensusConfigType type,
3111
7.28M
    LeaderLeaseStatus* leader_lease_status) const {
3112
7.28M
  CHECK(state_->IsLocked());
3113
7.28M
  if (leader_lease_status) {
3114
3.68k
    if (GetRoleUnlocked() == PeerRole::LEADER) {
3115
1.39k
      *leader_lease_status = state_->GetLeaderLeaseStatusUnlocked();
3116
2.29k
    } else {
3117
      // We'll still return a valid value if we're not a leader.
3118
2.29k
      *leader_lease_status = LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
3119
2.29k
    }
3120
3.68k
  }
3121
7.28M
  return state_->ConsensusStateUnlocked(type);
3122
7.28M
}
3123
3124
50.6M
RaftConfigPB RaftConsensus::CommittedConfig() const {
3125
50.6M
  auto lock = state_->LockForRead();
3126
50.6M
  return state_->GetCommittedConfigUnlocked();
3127
50.6M
}
3128
3129
47
void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
3130
47
  out << "<h1>Raft Consensus State</h1>" << std::endl;
3131
3132
47
  out << "<h2>State</h2>" << std::endl;
3133
47
  out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
3134
3135
  // Dump the queues on a leader.
3136
47
  PeerRole role;
3137
47
  {
3138
47
    auto lock = state_->LockForRead();
3139
47
    role = state_->GetActiveRoleUnlocked();
3140
47
  }
3141
47
  if (role == PeerRole::LEADER) {
3142
47
    out << "<h2>Queue overview</h2>" << std::endl;
3143
47
    out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
3144
47
    out << "<hr/>" << std::endl;
3145
47
    out << "<h2>Queue details</h2>" << std::endl;
3146
47
    queue_->DumpToHtml(out);
3147
47
  }
3148
47
}
3149
3150
46
ReplicaState* RaftConsensus::GetReplicaStateForTests() {
3151
46
  return state_.get();
3152
46
}
3153
3154
void RaftConsensus::ElectionCallback(const LeaderElectionData& data,
3155
75.9k
                                     const ElectionResult& result) {
3156
  // The election callback runs on a reactor thread, so we need to defer to our
3157
  // threadpool. If the threadpool is already shut down for some reason, it's OK --
3158
  // we're OK with the callback never running.
3159
75.9k
  WARN_NOT_OK(raft_pool_token_->SubmitFunc(
3160
75.9k
              std::bind(&RaftConsensus::DoElectionCallback, shared_from_this(), data, result)),
3161
75.9k
              state_->LogPrefix() + "Unable to run election callback");
3162
75.9k
}
3163
3164
4.76k
void RaftConsensus::NotifyOriginatorAboutLostElection(const std::string& originator_uuid) {
3165
4.76k
  if (originator_uuid.empty()) {
3166
4.71k
    return;
3167
4.71k
  }
3168
3169
44
  ReplicaState::UniqueLock lock;
3170
44
  Status s = state_->LockForConfigChange(&lock);
3171
44
  if (PREDICT_FALSE(!s.ok())) {
3172
0
    LOG_WITH_PREFIX(INFO) << "Unable to notify originator about lost election, lock failed: "
3173
0
                          << s.ToString();
3174
0
    return;
3175
0
  }
3176
3177
44
  const auto& active_config = state_->GetActiveConfigUnlocked();
3178
44
  const auto * peer = FindPeer(active_config, originator_uuid);
3179
44
  if (!peer) {
3180
0
    LOG_WITH_PREFIX(WARNING) << "Failed to find originators peer: " << originator_uuid
3181
0
                             << ", config: " << active_config.ShortDebugString();
3182
0
    return;
3183
0
  }
3184
3185
44
  auto proxy = peer_proxy_factory_->NewProxy(*peer);
3186
44
  LeaderElectionLostRequestPB req;
3187
44
  req.set_dest_uuid(originator_uuid);
3188
44
  req.set_election_lost_by_uuid(state_->GetPeerUuid());
3189
44
  req.set_tablet_id(state_->GetOptions().tablet_id);
3190
44
  auto resp = std::make_shared<LeaderElectionLostResponsePB>();
3191
44
  auto rpc = std::make_shared<rpc::RpcController>();
3192
44
  rpc->set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
3193
44
  auto log_prefix = state_->LogPrefix();
3194
44
  proxy->LeaderElectionLostAsync(&req, resp.get(), rpc.get(), [log_prefix, resp, rpc] {
3195
44
    if (!rpc->status().ok()) {
3196
0
      LOG(WARNING) << log_prefix << "Notify about lost election RPC failure: "
3197
0
                   << rpc->status().ToString();
3198
44
    } else if (resp->has_error()) {
3199
0
      LOG(WARNING) << log_prefix << "Notify about lost election failed: "
3200
0
                   << StatusFromPB(resp->error().status()).ToString();
3201
0
    }
3202
44
  });
3203
44
}
3204
3205
void RaftConsensus::DoElectionCallback(const LeaderElectionData& data,
3206
75.9k
                                       const ElectionResult& result) {
3207
40.7k
  const char* election_name = result.preelection ? "Pre-election" : "election";
3208
71.1k
  const char* decision_name = result.decision == ElectionVote::kGranted ? "won" : "lost";
3209
  // Snooze to avoid the election timer firing again as much as possible.
3210
75.9k
  {
3211
75.9k
    auto lock = state_->LockForRead();
3212
    // We need to snooze when we win and when we lose:
3213
    // - When we win, because we're about to disable the timer and become leader.
3214
    // - When we lose, because otherwise we can fall into a cycle, where everyone keeps
3215
    //   triggering elections but no election ever completes because by the time they
3216
    //   finish, another one has already been triggered.
3217
    // We ignore the status as we don't want to fail if the timer is
3218
    // disabled.
3219
75.9k
    SnoozeFailureDetector(ALLOW_LOGGING, LeaderElectionExpBackoffDeltaUnlocked());
3220
3221
75.9k
    if (!result.preelections_not_supported_by_uuid.empty()) {
3222
0
      disable_pre_elections_until_ =
3223
0
          CoarseMonoClock::now() + FLAGS_temporary_disable_preelections_timeout_ms * 1ms;
3224
0
      LOG_WITH_PREFIX(WARNING)
3225
0
          << "Disable pre-elections until " << ToString(disable_pre_elections_until_)
3226
0
          << ", because " << result.preelections_not_supported_by_uuid << " does not support them.";
3227
0
    }
3228
75.9k
  }
3229
75.9k
  if (result.decision == ElectionVote::kDenied) {
3230
4.76k
    failed_elections_since_stable_leader_.fetch_add(1, std::memory_order_acq_rel);
3231
4.76k
    LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " lost for term "
3232
4.76k
                          << result.election_term << ". Reason: "
3233
4.65k
                          << (!result.message.empty() ? result.message : "None given")
3234
4.76k
                          << ". Originator: " << data.originator_uuid;
3235
4.76k
    NotifyOriginatorAboutLostElection(data.originator_uuid);
3236
3237
4.76k
    if (result.higher_term) {
3238
108
      ReplicaState::UniqueLock lock;
3239
108
      Status s = state_->LockForConfigChange(&lock);
3240
108
      if (s.ok()) {
3241
108
        s = HandleTermAdvanceUnlocked(*result.higher_term);
3242
108
      }
3243
108
      if (!s.ok()) {
3244
94
        LOG_WITH_PREFIX(INFO) << "Unable to advance term as " << election_name << " result: " << s;
3245
94
      }
3246
108
    }
3247
3248
4.76k
    return;
3249
4.76k
  }
3250
3251
71.1k
  ReplicaState::UniqueLock lock;
3252
71.1k
  Status s = state_->LockForConfigChange(&lock);
3253
71.1k
  if (PREDICT_FALSE(!s.ok())) {
3254
0
    LOG_WITH_PREFIX(INFO) << "Received " << election_name << " callback for term "
3255
0
                          << result.election_term << " while not running: "
3256
0
                          << s.ToString();
3257
0
    return;
3258
0
  }
3259
3260
71.1k
  auto desired_term = state_->GetCurrentTermUnlocked() + (result.preelection ? 1 : 0);
3261
71.1k
  if (result.election_term != desired_term) {
3262
15
    LOG_WITH_PREFIX(INFO)
3263
15
        << "Leader " << election_name << " decision for defunct term "
3264
15
        << result.election_term << ": " << decision_name;
3265
15
    return;
3266
15
  }
3267
3268
71.1k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
3269
71.1k
  if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) {
3270
0
    LOG_WITH_PREFIX(WARNING)
3271
0
        << "Leader " << election_name << " decision while not in active config. "
3272
0
        << "Result: Term " << result.election_term << ": " << decision_name
3273
0
        << ". RaftConfig: " << active_config.ShortDebugString();
3274
0
    return;
3275
0
  }
3276
3277
71.1k
  if (result.preelection) {
3278
36.0k
    LOG_WITH_PREFIX(INFO) << "Leader pre-election won for term " << result.election_term;
3279
36.0k
    lock.unlock();
3280
36.0k
    WARN_NOT_OK(DoStartElection(data, PreElected::kTrue), "Start election failed: ");
3281
36.0k
    return;
3282
36.0k
  }
3283
3284
35.0k
  if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
3285
0
    LOG_WITH_PREFIX(DFATAL)
3286
0
        << "Leader " << election_name << " callback while already leader! Result: Term "
3287
0
        << result.election_term << ": "
3288
0
        << decision_name;
3289
0
    return;
3290
0
  }
3291
3292
35.0k
  LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " won for term " << result.election_term;
3293
3294
  // Apply lease updates that were possibly received from voters.
3295
35.0k
  state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked(
3296
35.0k
      result.old_leader_lease, result.old_leader_ht_lease);
3297
3298
35.0k
  state_->SetLeaderNoOpCommittedUnlocked(false);
3299
  // Convert role to LEADER.
3300
35.0k
  SetLeaderUuidUnlocked(state_->GetPeerUuid());
3301
3302
  // TODO: BecomeLeaderUnlocked() can fail due to state checks during shutdown.
3303
  // It races with the above state check.
3304
  // This could be a problem during tablet deletion.
3305
35.0k
  auto status = BecomeLeaderUnlocked();
3306
35.0k
  if (!status.ok()) {
3307
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to become leader: " << status.ToString();
3308
0
  }
3309
35.0k
}
3310
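The "defunct term" guard in DoElectionCallback() above hinges on one detail: a pre-election is run for the term the replica would bump to, so its result must match current_term + 1, while a real election's result must match current_term exactly. As a one-line sketch:

#include <cstdint>

bool ElectionResultIsCurrent(int64_t current_term, int64_t election_term,
                             bool preelection) {
  return election_term == current_term + (preelection ? 1 : 0);
}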
3311
340
yb::OpId RaftConsensus::GetLastReceivedOpId() {
3312
340
  auto lock = state_->LockForRead();
3313
340
  return state_->GetLastReceivedOpIdUnlocked();
3314
340
}
3315
3316
7.37M
yb::OpId RaftConsensus::GetLastCommittedOpId() {
3317
7.37M
  auto lock = state_->LockForRead();
3318
7.37M
  return state_->GetCommittedOpIdUnlocked();
3319
7.37M
}
3320
3321
1.96M
yb::OpId RaftConsensus::GetLastCDCedOpId() {
3322
1.96M
  return queue_->GetCDCConsumerOpIdForIntentRemoval();
3323
1.96M
}
3324
3325
0
yb::OpId RaftConsensus::GetLastAppliedOpId() {
3326
0
  auto lock = state_->LockForRead();
3327
0
  return state_->GetLastAppliedOpIdUnlocked();
3328
0
}
3329
3330
62
yb::OpId RaftConsensus::GetAllAppliedOpId() {
3331
62
  return queue_->GetAllAppliedOpId();
3332
62
}
3333
3334
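The getters above expose distinct replication watermarks. As a rough mental model (a sketch under the usual Raft assumptions, with illustrative types rather than YugabyteDB's OpId), the applied index trails the committed index, which in turn trails the last index received into the local log:

// Illustrative only: the ordering one would normally expect between the watermarks.
#include <cassert>
#include <cstdint>

struct OpId {
  int64_t term;
  int64_t index;
};

struct ReplicaWatermarks {
  OpId last_received;   // highest entry appended to the local log
  OpId last_committed;  // highest entry known to be replicated to a majority
  OpId last_applied;    // highest entry applied to the tablet's state machine
};

void CheckWatermarkOrdering(const ReplicaWatermarks& w) {
  assert(w.last_applied.index <= w.last_committed.index);
  assert(w.last_committed.index <= w.last_received.index);
}

int main() {
  ReplicaWatermarks w{{2, 12}, {2, 10}, {2, 9}};
  CheckWatermarkOrdering(w);  // passes: 9 <= 10 <= 12
}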
269k
void RaftConsensus::MarkDirty(std::shared_ptr<StateChangeContext> context) {
3335
269k
  LOG_WITH_PREFIX(INFO) << "Calling mark dirty synchronously for reason code " << context->reason;
3336
269k
  mark_dirty_clbk_.Run(context);
3337
269k
}
3338
3339
void RaftConsensus::MarkDirtyOnSuccess(std::shared_ptr<StateChangeContext> context,
3340
                                       const StdStatusCallback& client_cb,
3341
77.6k
                                       const Status& status) {
3342
77.6k
  if (PREDICT_TRUE(status.ok())) {
3343
77.4k
    MarkDirty(context);
3344
77.4k
  }
3345
77.6k
  client_cb(status);
3346
77.6k
}
3347
3348
void RaftConsensus::NonTrackedRoundReplicationFinished(ConsensusRound* round,
3349
                                                       const StdStatusCallback& client_cb,
3350
113k
                                                       const Status& status) {
3351
113k
  DCHECK(state_->IsLocked());
3352
113k
  OperationType op_type = round->replicate_msg()->op_type();
3353
113k
  string op_str = Format("$0 [$1]", OperationType_Name(op_type), round->id());
3354
113k
  if (!IsConsensusOnlyOperation(op_type)) {
3355
0
    LOG_WITH_PREFIX(ERROR) << "Unexpected op type: " << op_str;
3356
0
    return;
3357
0
  }
3358
113k
  if (!status.ok()) {
3359
    // TODO: Do something with the status on failure?
3360
59
    LOG_WITH_PREFIX(INFO) << op_str << " replication failed: " << status << "\n" << GetStackTrace();
3361
3362
    // Clear out the pending state (ENG-590).
3363
59
    if (IsChangeConfigOperation(op_type)) {
3364
4
      WARN_NOT_OK(state_->ClearPendingConfigUnlocked(), "Could not clear pending state");
3365
4
    }
3366
112k
  } else if (IsChangeConfigOperation(op_type)) {
3367
    // Notify the TabletPeer owner object.
3368
10.0k
    state_->context()->ChangeConfigReplicated(state_->GetCommittedConfigUnlocked());
3369
10.0k
  }
3370
3371
113k
  client_cb(status);
3372
113k
}
3373
3374
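For the config-change branch above, the point of clearing the pending state on failure is that the replica must fall back to its last committed configuration rather than keep acting on a change that never replicated. A minimal sketch of that idea, using hypothetical types rather than the real ReplicaState API:

// Toy config state, illustrative only: a failed change is dropped, a successful one is promoted.
#include <optional>
#include <string>
#include <vector>

struct RaftConfig {
  std::vector<std::string> voter_uuids;
};

struct ToyConfigState {
  RaftConfig committed;               // last configuration replicated to a majority
  std::optional<RaftConfig> pending;  // configuration change in flight, if any

  // The "active" config is the pending one while a change is in flight.
  const RaftConfig& Active() const { return pending ? *pending : committed; }

  void OnConfigChangeReplicationFinished(bool ok) {
    if (!pending) {
      return;  // nothing in flight
    }
    if (ok) {
      committed = *pending;  // the change is durable; promote it
    }
    pending.reset();         // either way, nothing is pending any more
  }
};

int main() {
  ToyConfigState state;
  state.pending = RaftConfig{{"peer-a", "peer-b", "peer-c"}};
  state.OnConfigChangeReplicationFinished(/*ok=*/false);   // change failed: fall back
  return state.pending.has_value() ? 1 : 0;                // returns 0: nothing pending
}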
93.6k
void RaftConsensus::EnableFailureDetector(MonoDelta delta) {
3375
93.6k
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
3376
93.4k
    failure_detector_->Start(delta);
3377
93.4k
  }
3378
93.6k
}
3379
3380
82.9k
void RaftConsensus::DisableFailureDetector() {
3381
82.9k
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
3382
82.8k
    failure_detector_->Stop();
3383
82.8k
  }
3384
82.9k
}
3385
3386
10.4M
void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging, MonoDelta delta) {
3387
10.4M
  if (PREDICT_TRUE(GetAtomicFlag(&FLAGS_enable_leader_failure_detection))) {
3388
10.4M
    if (allow_logging == ALLOW_LOGGING) {
3389
213k
      LOG_WITH_PREFIX(INFO) << Format("Snoozing leader timeout detection for $0",
3390
18.4E
                                      delta.Initialized() ? delta.ToString() : "election timeout");
3391
213k
    }
3392
3393
10.4M
    if (!delta.Initialized()) {
3394
10.1M
      delta = MinimumElectionTimeout();
3395
10.1M
    }
3396
10.4M
    failure_detector_->Snooze(delta);
3397
10.4M
  }
3398
10.4M
}
3399
3400
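Snoozing is how a healthy leader keeps its followers from starting elections: every accepted heartbeat or append pushes the failure-detection deadline forward, by the minimum election timeout unless the caller supplies its own delta. A toy sketch of that mechanism (a hypothetical class, not the real failure_detector_ implementation):

// Toy failure detector: if the deadline passes without a snooze, the caller would start an election.
#include <chrono>

class ToyFailureDetector {
 public:
  explicit ToyFailureDetector(std::chrono::milliseconds default_timeout)
      : default_timeout_(default_timeout) {
    Snooze(default_timeout_);
  }

  // Push the deadline forward; callers may pass a custom delta, e.g. an election backoff.
  void Snooze(std::chrono::milliseconds delta) {
    deadline_ = std::chrono::steady_clock::now() + delta;
  }
  void Snooze() { Snooze(default_timeout_); }

  bool LeaderSuspected() const {
    return std::chrono::steady_clock::now() >= deadline_;
  }

 private:
  std::chrono::milliseconds default_timeout_;
  std::chrono::steady_clock::time_point deadline_;
};

int main() {
  ToyFailureDetector detector(std::chrono::milliseconds(3000));
  detector.Snooze();                  // e.g. after a heartbeat from the leader
  return detector.LeaderSuspected();  // returns 0: the deadline is still in the future
}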
20.7M
MonoDelta RaftConsensus::MinimumElectionTimeout() const {
3401
20.7M
  int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
3402
20.7M
      FLAGS_raft_heartbeat_interval_ms;
3403
3404
20.7M
  return MonoDelta::FromMilliseconds(failure_timeout);
3405
20.7M
}
3406
3407
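The minimum election timeout is simply the heartbeat interval multiplied by the number of heartbeat periods a follower will tolerate missing before suspecting the leader. A worked example with assumed flag values (the actual defaults may differ):

// Worked example with hypothetical values: 6 missed periods * 500 ms = 3000 ms.
#include <chrono>
#include <cstdint>
#include <iostream>

int main() {
  const int32_t raft_heartbeat_interval_ms = 500;               // assumed value
  const int32_t leader_failure_max_missed_heartbeat_periods = 6; // assumed value

  const auto min_election_timeout = std::chrono::milliseconds(
      leader_failure_max_missed_heartbeat_periods * raft_heartbeat_interval_ms);

  std::cout << min_election_timeout.count() << " ms\n";  // prints "3000 ms"
}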
213k
MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
3408
  // Compute a backoff factor based on how many leader elections have
3409
  // taken place since a stable leader was last seen.
3410
213k
  double backoff_factor = pow(
3411
213k
      1.1,
3412
213k
      failed_elections_since_stable_leader_.load(std::memory_order_acquire) + 1);
3413
213k
  double min_timeout = MinimumElectionTimeout().ToMilliseconds();
3414
213k
  double max_timeout = std::min<double>(
3415
213k
      min_timeout * backoff_factor,
3416
213k
      FLAGS_leader_failure_exp_backoff_max_delta_ms);
3417
213k
  if (max_timeout < min_timeout) {
3418
558
    LOG(INFO) << "Resetting max_timeout from " <<  max_timeout << " to " << min_timeout
3419
558
              << ", max_delta_flag=" << FLAGS_leader_failure_exp_backoff_max_delta_ms;
3420
558
    max_timeout = min_timeout;
3421
558
  }
3422
  // Randomize the timeout between the minimum and the calculated value.
3423
  // We do this after the above capping to the max. Otherwise, after a
3424
  // churny period, we'd be highly likely to back off by exactly the max
3425
  // amount.
3426
213k
  double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction();
3427
213k
  DCHECK_GE(timeout, min_timeout);
3428
3429
213k
  return MonoDelta::FromMilliseconds(timeout);
3430
213k
}
3431
3432
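The computation above grows the timeout geometrically (a factor of 1.1 per failed election since the last stable leader), caps it at FLAGS_leader_failure_exp_backoff_max_delta_ms, and only then adds uniform jitter between the minimum and the capped maximum, so a churny cluster does not converge on exactly the cap. A self-contained sketch of the same arithmetic, with assumed input values:

// Standalone sketch of the backoff-with-jitter formula; the cap and minimum are assumptions.
#include <algorithm>
#include <cmath>
#include <iostream>
#include <random>

double BackoffTimeoutMs(int failed_elections, double min_timeout_ms,
                        double max_delta_ms, std::mt19937& rng) {
  const double backoff_factor = std::pow(1.1, failed_elections + 1);
  double max_timeout_ms = std::min(min_timeout_ms * backoff_factor, max_delta_ms);
  max_timeout_ms = std::max(max_timeout_ms, min_timeout_ms);  // never below the minimum
  std::uniform_real_distribution<double> jitter(0.0, 1.0);
  return min_timeout_ms + (max_timeout_ms - min_timeout_ms) * jitter(rng);
}

int main() {
  std::mt19937 rng{42};
  // With a 3000 ms minimum and a 20000 ms cap (both assumed), five failed elections
  // give a timeout somewhere in [3000, 3000 * 1.1^6] ~= [3000, 5315] ms.
  std::cout << BackoffTimeoutMs(5, 3000, 20000, rng) << " ms\n";
}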
36.1k
Status RaftConsensus::IncrementTermUnlocked() {
3433
36.1k
  return HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1);
3434
36.1k
}
3435
3436
103k
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) {
3437
103k
  if (new_term <= state_->GetCurrentTermUnlocked()) {
3438
31
    return STATUS(IllegalState, Substitute("Can't advance term to: $0 current term: $1 is higher.",
3439
31
                                           new_term, state_->GetCurrentTermUnlocked()));
3440
31
  }
3441
3442
103k
  if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
3443
47
    LOG_WITH_PREFIX(INFO) << "Stepping down as leader of term "
3444
47
                          << state_->GetCurrentTermUnlocked()
3445
47
                          << " since new term is " << new_term;
3446
3447
47
    RETURN_NOT_OK(BecomeReplicaUnlocked(std::string()));
3448
47
  }
3449
3450
103k
  LOG_WITH_PREFIX(INFO) << "Advancing to term " << new_term;
3451
103k
  RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term));
3452
102k
  term_metric_->set_value(new_term);
3453
102k
  return Status::OK();
3454
103k
}
3455
3456
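HandleTermAdvanceUnlocked enforces the Raft rule that terms only move forward: a request to adopt a term no higher than the current one is rejected, and a current leader steps down to replica before the new term is persisted. A minimal illustrative sketch (hypothetical types, not the real state machine):

// Toy term-advance rule: reject non-increasing terms, step down before adopting a higher one.
#include <cstdint>
#include <stdexcept>

enum class Role { kLeader, kFollower };

struct ToyReplica {
  int64_t current_term = 0;
  Role role = Role::kFollower;

  void AdvanceTerm(int64_t new_term) {
    if (new_term <= current_term) {
      throw std::invalid_argument("can't advance to a term that is not higher");
    }
    if (role == Role::kLeader) {
      role = Role::kFollower;  // step down before adopting the higher term
    }
    current_term = new_term;
  }
};

int main() {
  ToyReplica replica;
  replica.role = Role::kLeader;
  replica.AdvanceTerm(3);                          // leader observes term 3 and steps down
  return replica.role == Role::kFollower ? 0 : 1;  // returns 0
}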
Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForCDC(const yb::OpId& from,
3457
  int64_t* last_replicated_opid_index,
3458
307
  const CoarseTimePoint deadline) {
3459
307
  return queue_->ReadReplicatedMessagesForCDC(from, last_replicated_opid_index, deadline);
3460
307
}
3461
3462
485
void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
3463
485
  return queue_->UpdateCDCConsumerOpId(op_id);
3464
485
}
3465
3466
void RaftConsensus::RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg,
3467
12
                                            bool should_exists) {
3468
12
  state_->CancelPendingOperation(OpId::FromPB(replicate_msg->id()), should_exists);
3469
12
}
3470
3471
7.29k
uint64_t RaftConsensus::OnDiskSize() const {
3472
7.29k
  return state_->OnDiskSize();
3473
7.29k
}
3474
3475
2.96M
yb::OpId RaftConsensus::WaitForSafeOpIdToApply(const yb::OpId& op_id) {
3476
2.96M
  return log_->WaitForSafeOpIdToApply(op_id);
3477
2.96M
}
3478
3479
6.40M
yb::OpId RaftConsensus::MinRetryableRequestOpId() {
3480
6.40M
  return state_->MinRetryableRequestOpId();
3481
6.40M
}
3482
3483
0
size_t RaftConsensus::LogCacheSize() {
3484
0
  return queue_->LogCacheSize();
3485
0
}
3486
3487
0
size_t RaftConsensus::EvictLogCache(size_t bytes_to_evict) {
3488
0
  return queue_->EvictLogCache(bytes_to_evict);
3489
0
}
3490
3491
0
RetryableRequestsCounts RaftConsensus::TEST_CountRetryableRequests() {
3492
0
  return state_->TEST_CountRetryableRequests();
3493
0
}
3494
3495
410
void RaftConsensus::TrackOperationMemory(const yb::OpId& op_id) {
3496
410
  queue_->TrackOperationsMemory({op_id});
3497
410
}
3498
3499
23
int64_t RaftConsensus::TEST_LeaderTerm() const {
3500
23
  auto lock = state_->LockForRead();
3501
23
  return state_->GetCurrentTermUnlocked();
3502
23
}
3503
3504
92
std::string RaftConsensus::DelayedStepDown::ToString() const {
3505
92
  return YB_STRUCT_TO_STRING(term, protege, graceful);
3506
92
}
3507
3508
3
Result<OpId> RaftConsensus::TEST_GetLastOpIdWithType(OpIdType opid_type, OperationType op_type) {
3509
3
  return queue_->TEST_GetLastOpIdWithType(VERIFY_RESULT(GetLastOpId(opid_type)).index, op_type);
3510
3
}
3511
3512
}  // namespace consensus
3513
}  // namespace yb