YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/raft_consensus.h"
34
35
#include <algorithm>
36
#include <memory>
37
#include <mutex>
38
39
#include <boost/optional.hpp>
40
#include <gflags/gflags.h>
41
42
#include "yb/common/wire_protocol.h"
43
44
#include "yb/consensus/consensus.pb.h"
45
#include "yb/consensus/consensus_context.h"
46
#include "yb/consensus/consensus_peers.h"
47
#include "yb/consensus/consensus_round.h"
48
#include "yb/consensus/leader_election.h"
49
#include "yb/consensus/log.h"
50
#include "yb/consensus/peer_manager.h"
51
#include "yb/consensus/quorum_util.h"
52
#include "yb/consensus/replica_state.h"
53
#include "yb/consensus/state_change_context.h"
54
55
#include "yb/gutil/casts.h"
56
#include "yb/gutil/map-util.h"
57
#include "yb/gutil/stringprintf.h"
58
59
#include "yb/rpc/messenger.h"
60
#include "yb/rpc/periodic.h"
61
#include "yb/rpc/rpc_controller.h"
62
63
#include "yb/server/clock.h"
64
65
#include "yb/util/debug-util.h"
66
#include "yb/util/debug/long_operation_tracker.h"
67
#include "yb/util/debug/trace_event.h"
68
#include "yb/util/enums.h"
69
#include "yb/util/flag_tags.h"
70
#include "yb/util/format.h"
71
#include "yb/util/logging.h"
72
#include "yb/util/metrics.h"
73
#include "yb/util/net/dns_resolver.h"
74
#include "yb/util/random.h"
75
#include "yb/util/random_util.h"
76
#include "yb/util/scope_exit.h"
77
#include "yb/util/status_format.h"
78
#include "yb/util/status_log.h"
79
#include "yb/util/threadpool.h"
80
#include "yb/util/tostring.h"
81
#include "yb/util/trace.h"
82
#include "yb/util/tsan_util.h"
83
#include "yb/util/url-coding.h"
84
85
using namespace std::literals;
86
using namespace std::placeholders;
87
88
DEFINE_int32(raft_heartbeat_interval_ms, yb::NonTsanVsTsan(500, 1000),
89
             "The heartbeat interval for Raft replication. The leader produces heartbeats "
90
             "to followers at this interval. The followers expect a heartbeat at this interval "
91
             "and consider a leader to have failed if it misses several in a row.");
92
TAG_FLAG(raft_heartbeat_interval_ms, advanced);
93
94
DEFINE_double(leader_failure_max_missed_heartbeat_periods, 6.0,
95
              "Maximum heartbeat periods that the leader can fail to heartbeat in before we "
96
              "consider the leader to be failed. The total failure timeout in milliseconds is "
97
              "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. "
98
              "The value passed to this flag may be fractional.");
99
TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced);
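// Worked example (a sketch, assuming the non-TSAN defaults above): the effective leader failure
// timeout is simply the product of the two flags.
//
//   constexpr int kRaftHeartbeatIntervalMs = 500;          // raft_heartbeat_interval_ms default
//   constexpr double kMaxMissedHeartbeatPeriods = 6.0;     // flag defined above
//   constexpr int kLeaderFailureTimeoutMs =
//       static_cast<int>(kRaftHeartbeatIntervalMs * kMaxMissedHeartbeatPeriods);  // 3000 ms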
100
101
DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000,
102
             "Maximum time to sleep in between leader election retries, in addition to the "
103
             "regular timeout. When leader election fails the interval in between retries "
104
             "increases exponentially, up to this value.");
105
TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental);
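// A minimal sketch of the capped exponential backoff described above (the base delay and helper
// name are assumptions; the real logic lives in LeaderElectionExpBackoffDeltaUnlocked, not shown
// in this file excerpt):
//
//   int BackoffDeltaMs(int failed_attempts) {
//     // Assumed 100 ms base; the extra delay doubles per failed attempt, capped by the flag.
//     const int64_t delta_ms = int64_t{100} << std::min(failed_attempts, 20);
//     return static_cast<int>(
//         std::min<int64_t>(delta_ms, FLAGS_leader_failure_exp_backoff_max_delta_ms));
//   }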
106
107
DEFINE_bool(enable_leader_failure_detection, true,
108
            "Whether to enable failure detection of tablet leaders. If enabled, attempts will be "
109
            "made to elect a follower as a new leader when the leader is detected to have failed.");
110
TAG_FLAG(enable_leader_failure_detection, unsafe);
111
112
DEFINE_test_flag(bool, do_not_start_election_test_only, false,
113
                 "Do not start election even if leader failure is detected. ");
114
TAG_FLAG(TEST_do_not_start_election_test_only, runtime);
115
116
DEFINE_bool(evict_failed_followers, true,
117
            "Whether to evict followers from the Raft config that have fallen "
118
            "too far behind the leader's log to catch up normally or have been "
119
            "unreachable by the leader for longer than "
120
            "follower_unavailable_considered_failed_sec");
121
TAG_FLAG(evict_failed_followers, advanced);
122
123
DEFINE_test_flag(bool, follower_reject_update_consensus_requests, false,
124
                 "Whether a follower will return an error for all UpdateConsensus() requests.");
125
126
DEFINE_test_flag(bool, follower_pause_update_consensus_requests, false,
127
                 "Whether a follower will pause all UpdateConsensus() requests.");
128
129
DEFINE_test_flag(int32, follower_reject_update_consensus_requests_seconds, 0,
130
                 "Whether a follower will return an error for all UpdateConsensus() requests for "
131
                 "the first TEST_follower_reject_update_consensus_requests_seconds seconds after "
132
                 "the Consensus objet is created.");
133
134
DEFINE_test_flag(bool, follower_fail_all_prepare, false,
135
                 "Whether a follower will fail preparing all operations.");
136
137
DEFINE_int32(after_stepdown_delay_election_multiplier, 5,
138
             "After a peer steps down as a leader, the factor with which to multiply "
139
             "leader_failure_max_missed_heartbeat_periods to get the delay time before starting a "
140
             "new election.");
141
TAG_FLAG(after_stepdown_delay_election_multiplier, advanced);
142
TAG_FLAG(after_stepdown_delay_election_multiplier, hidden);
143
144
DECLARE_int32(memory_limit_warn_threshold_percentage);
145
146
DEFINE_test_flag(int32, inject_delay_leader_change_role_append_secs, 0,
147
                 "Amount of time to delay leader from sending replicate of change role.");
148
149
DEFINE_test_flag(double, return_error_on_change_config, 0.0,
150
                 "Fraction of the time when ChangeConfig will return an error.");
151
152
METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections,
153
                      "Follower Memory Pressure Rejections",
154
                      yb::MetricUnit::kRequests,
155
                      "Number of RPC requests rejected due to "
156
                      "memory pressure while FOLLOWER.");
157
METRIC_DEFINE_gauge_int64(tablet, raft_term,
158
                          "Current Raft Consensus Term",
159
                          yb::MetricUnit::kUnits,
160
                          "Current Term of the Raft Consensus algorithm. This number increments "
161
                          "each time a leader election is started.");
162
163
METRIC_DEFINE_lag(tablet, follower_lag_ms,
164
                  "Follower lag from leader",
165
                  "The amount of time since the last UpdateConsensus request from the "
166
                  "leader.", {0, yb::AggregationFunction::kMax} /* optional_args */);
167
168
METRIC_DEFINE_gauge_int64(tablet, is_raft_leader,
169
                          "Is tablet raft leader",
170
                          yb::MetricUnit::kUnits,
171
                          "Keeps track whether tablet is raft leader"
172
                          "1 indicates that the tablet is raft leader");
173
174
METRIC_DEFINE_coarse_histogram(
175
  table, dns_resolve_latency_during_update_raft_config,
176
  "yb.consensus.RaftConsensus.UpdateRaftConfig DNS Resolve",
177
  yb::MetricUnit::kMicroseconds,
178
  "Microseconds spent resolving DNS requests during RaftConsensus::UpdateRaftConfig");
179
180
DEFINE_int32(leader_lease_duration_ms, yb::consensus::kDefaultLeaderLeaseDurationMs,
181
             "Leader lease duration. A leader keeps establishing a new lease or extending the "
182
             "existing one with every UpdateConsensus. A new server is not allowed to serve as a "
183
             "leader (i.e. serve up-to-date read requests or acknowledge write requests) until a "
184
             "lease of this duration has definitely expired on the old leader's side.");
185
186
DEFINE_int32(ht_lease_duration_ms, 2000,
187
             "Hybrid time leader lease duration. A leader keeps establishing a new lease or "
188
             "extending the existing one with every UpdateConsensus. A new server is not allowed "
189
             "to add entries to RAFT log until a lease of the old leader is expired. 0 to disable."
190
             );
191
192
DEFINE_int32(min_leader_stepdown_retry_interval_ms,
193
             20 * 1000,
194
             "Minimum amount of time between successive attempts to perform the leader stepdown "
195
             "for the same combination of tablet and intended (target) leader. This is needed "
196
             "to avoid infinite leader stepdown loops when the current leader never has a chance "
197
             "to update the intended leader with its latest records.");
198
199
DEFINE_bool(use_preelection, true, "Whether to use pre election, before doing actual election.");
200
201
DEFINE_int32(temporary_disable_preelections_timeout_ms, 10 * 60 * 1000,
202
             "If some of nodes does not support preelections, then we disable them for this "
203
             "amount of time.");
204
205
DEFINE_test_flag(bool, pause_update_replica, false,
206
                 "Pause RaftConsensus::UpdateReplica processing before snoozing failure detector.");
207
208
DEFINE_test_flag(bool, pause_update_majority_replicated, false,
209
                 "Pause RaftConsensus::UpdateMajorityReplicated.");
210
211
DEFINE_test_flag(int32, log_change_config_every_n, 1,
212
                 "How often to log change config information. "
213
                 "Used to reduce the number of lines being printed for change config requests "
214
                 "when a test simulates a failure that would generate a log of these requests.");
215
216
DEFINE_bool(enable_lease_revocation, true, "Enables lease revocation mechanism");
217
218
DEFINE_bool(quick_leader_election_on_create, false,
219
            "Do we trigger quick leader elections on table creation.");
220
TAG_FLAG(quick_leader_election_on_create, advanced);
221
TAG_FLAG(quick_leader_election_on_create, hidden);
222
223
DEFINE_bool(
224
    stepdown_disable_graceful_transition, false,
225
    "During a leader stepdown, disable graceful leadership transfer "
226
    "to an up to date peer");
227
228
DEFINE_bool(
229
    raft_disallow_concurrent_outstanding_report_failure_tasks, true,
230
    "If true, only submit a new report failure task if there is not one outstanding.");
231
TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, advanced);
232
TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, hidden);
233
234
DEFINE_int64(protege_synchronization_timeout_ms, 1000,
235
             "Timeout to synchronize protege before performing step down. "
236
             "0 to disable synchronization.");
237
238
namespace yb {
239
namespace consensus {
240
241
using log::LogEntryBatch;
242
using rpc::PeriodicTimer;
243
using std::shared_ptr;
244
using std::unique_ptr;
245
using std::weak_ptr;
246
using strings::Substitute;
247
using tserver::TabletServerErrorPB;
248
249
namespace {
250
251
10.1k
const RaftPeerPB* FindPeer(const RaftConfigPB& active_config, const std::string& uuid) {
252
20.7k
  for (const RaftPeerPB& peer : active_config.peers()) {
253
20.7k
    if (peer.permanent_uuid() == uuid) {
254
10.1k
      return &peer;
255
10.1k
    }
256
20.7k
  }
257
258
18.4E
  return nullptr;
259
10.1k
}
260
261
// Helper function to check if the op is a consensus-only op (i.e. not backed by a user operation).
262
14.7M
bool IsConsensusOnlyOperation(OperationType op_type) {
263
14.7M
  return op_type == NO_OP || op_type == CHANGE_CONFIG_OP;
264
14.7M
}
265
266
// Helper to check if the op is a Change Config op.
267
333k
bool IsChangeConfigOperation(OperationType op_type) {
268
333k
  return op_type == CHANGE_CONFIG_OP;
269
333k
}
270
271
class NonTrackedRoundCallback : public ConsensusRoundCallback {
272
 public:
273
  explicit NonTrackedRoundCallback(ConsensusRound* round, const StdStatusCallback& callback)
274
201k
      : round_(round), callback_(callback) {
275
201k
  }
276
277
67.9k
  void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override {
278
67.9k
    auto& replicate_msg = *round_->replicate_msg();
279
67.9k
    op_id.ToPB(replicate_msg.mutable_id());
280
67.9k
    committed_op_id.ToPB(replicate_msg.mutable_committed_op_id());
281
67.9k
  }
282
283
  void ReplicationFinished(
284
200k
      const Status& status, int64_t leader_term, OpIds* applied_op_ids) override {
285
200k
    down_cast<RaftConsensus*>(round_->consensus())->NonTrackedRoundReplicationFinished(
286
200k
        round_, callback_, status);
287
200k
  }
288
289
 private:
290
  ConsensusRound* round_;
291
  StdStatusCallback callback_;
292
};
293
294
} // namespace
295
296
std::unique_ptr<ConsensusRoundCallback> MakeNonTrackedRoundCallback(
297
201k
    ConsensusRound* round, const StdStatusCallback& callback) {
298
201k
  return std::make_unique<NonTrackedRoundCallback>(round, callback);
299
201k
}
300
301
struct RaftConsensus::LeaderRequest {
302
  std::string leader_uuid;
303
  yb::OpId preceding_op_id;
304
  yb::OpId committed_op_id;
305
  ReplicateMsgs messages;
306
  // The positional index of the first message selected to be appended, in the
307
  // original leader's request message sequence.
308
  int64_t first_message_idx;
309
310
  std::string OpsRangeString() const;
311
};
312
313
shared_ptr<RaftConsensus> RaftConsensus::Create(
314
    const ConsensusOptions& options,
315
    std::unique_ptr<ConsensusMetadata> cmeta,
316
    const RaftPeerPB& local_peer_pb,
317
    const scoped_refptr<MetricEntity>& table_metric_entity,
318
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
319
    const scoped_refptr<server::Clock>& clock,
320
    ConsensusContext* consensus_context,
321
    rpc::Messenger* messenger,
322
    rpc::ProxyCache* proxy_cache,
323
    const scoped_refptr<log::Log>& log,
324
    const shared_ptr<MemTracker>& server_mem_tracker,
325
    const shared_ptr<MemTracker>& parent_mem_tracker,
326
    const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
327
    TableType table_type,
328
    ThreadPool* raft_pool,
329
    RetryableRequests* retryable_requests,
330
150k
    MultiRaftManager* multi_raft_manager) {
331
332
150k
  auto rpc_factory = std::make_unique<RpcPeerProxyFactory>(
333
150k
    messenger, proxy_cache, local_peer_pb.cloud_info());
334
335
  // The message queue that keeps track of which operations need to be replicated
336
  // where.
337
150k
  auto queue = std::make_unique<PeerMessageQueue>(
338
150k
      tablet_metric_entity,
339
150k
      log,
340
150k
      server_mem_tracker,
341
150k
      parent_mem_tracker,
342
150k
      local_peer_pb,
343
150k
      options.tablet_id,
344
150k
      clock,
345
150k
      consensus_context,
346
150k
      raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL));
347
348
150k
  DCHECK(local_peer_pb.has_permanent_uuid());
349
150k
  const string& peer_uuid = local_peer_pb.permanent_uuid();
350
351
  // A single Raft thread pool token is shared between RaftConsensus and
352
  // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
353
  // raw pointer to the token, to emphasize that RaftConsensus is responsible
354
  // for destroying the token.
355
150k
  unique_ptr<ThreadPoolToken> raft_pool_token(raft_pool->NewToken(
356
150k
      ThreadPool::ExecutionMode::CONCURRENT));
357
358
  // A manager for the set of peers that actually send the operations both remotely
359
  // and to the local wal.
360
150k
  auto peer_manager = std::make_unique<PeerManager>(
361
150k
      options.tablet_id,
362
150k
      peer_uuid,
363
150k
      rpc_factory.get(),
364
150k
      queue.get(),
365
150k
      raft_pool_token.get(),
366
150k
      multi_raft_manager);
367
368
150k
  return std::make_shared<RaftConsensus>(
369
150k
      options,
370
150k
      std::move(cmeta),
371
150k
      std::move(rpc_factory),
372
150k
      std::move(queue),
373
150k
      std::move(peer_manager),
374
150k
      std::move(raft_pool_token),
375
150k
      table_metric_entity,
376
150k
      tablet_metric_entity,
377
150k
      peer_uuid,
378
150k
      clock,
379
150k
      consensus_context,
380
150k
      log,
381
150k
      parent_mem_tracker,
382
150k
      mark_dirty_clbk,
383
150k
      table_type,
384
150k
      retryable_requests);
385
150k
}
386
387
RaftConsensus::RaftConsensus(
388
    const ConsensusOptions& options, std::unique_ptr<ConsensusMetadata> cmeta,
389
    std::unique_ptr<PeerProxyFactory> proxy_factory,
390
    std::unique_ptr<PeerMessageQueue> queue,
391
    std::unique_ptr<PeerManager> peer_manager,
392
    std::unique_ptr<ThreadPoolToken> raft_pool_token,
393
    const scoped_refptr<MetricEntity>& table_metric_entity,
394
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
395
    const std::string& peer_uuid, const scoped_refptr<server::Clock>& clock,
396
    ConsensusContext* consensus_context, const scoped_refptr<log::Log>& log,
397
    shared_ptr<MemTracker> parent_mem_tracker,
398
    Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk,
399
    TableType table_type,
400
    RetryableRequests* retryable_requests)
401
    : raft_pool_token_(std::move(raft_pool_token)),
402
      log_(log),
403
      clock_(clock),
404
      peer_proxy_factory_(std::move(proxy_factory)),
405
      peer_manager_(std::move(peer_manager)),
406
      queue_(std::move(queue)),
407
      rng_(GetRandomSeed32()),
408
      withhold_votes_until_(MonoTime::Min()),
409
      step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()),
410
      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
411
      shutdown_(false),
412
      follower_memory_pressure_rejections_(tablet_metric_entity->FindOrCreateCounter(
413
          &METRIC_follower_memory_pressure_rejections)),
414
      term_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_raft_term,
415
                                                    cmeta->current_term())),
416
      follower_last_update_time_ms_metric_(
417
          tablet_metric_entity->FindOrCreateAtomicMillisLag(&METRIC_follower_lag_ms)),
418
      is_raft_leader_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_is_raft_leader,
419
                                                              static_cast<int64_t>(0))),
420
      parent_mem_tracker_(std::move(parent_mem_tracker)),
421
      table_type_(table_type),
422
      update_raft_config_dns_latency_(
423
          METRIC_dns_resolve_latency_during_update_raft_config.Instantiate(table_metric_entity)),
424
      split_parent_tablet_id_(
425
150k
          cmeta->has_split_parent_tablet_id() ? cmeta->split_parent_tablet_id() : "") {
426
150k
  DCHECK_NOTNULL(log_.get());
427
428
150k
  if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) {
429
36
    withold_replica_updates_until_ = MonoTime::Now() +
430
36
        MonoDelta::FromSeconds(FLAGS_TEST_follower_reject_update_consensus_requests_seconds);
431
36
  }
432
433
150k
  state_ = std::make_unique<ReplicaState>(
434
150k
      options,
435
150k
      peer_uuid,
436
150k
      std::move(cmeta),
437
150k
      DCHECK_NOTNULL(consensus_context),
438
150k
      this,
439
150k
      retryable_requests,
440
150k
      std::bind(&PeerMessageQueue::TrackOperationsMemory, queue_.get(), _1));
441
442
150k
  peer_manager_->SetConsensus(this);
443
150k
}
444
445
75.4k
RaftConsensus::~RaftConsensus() {
446
75.4k
  Shutdown();
447
75.4k
}
448
449
150k
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
450
150k
  RETURN_NOT_OK(ExecuteHook(PRE_START));
451
452
  // Capture a weak_ptr reference into the functor so it can safely handle
453
  // outliving the consensus instance.
454
150k
  std::weak_ptr<RaftConsensus> w = shared_from_this();
455
150k
  failure_detector_ = PeriodicTimer::Create(
456
150k
      peer_proxy_factory_->messenger(),
457
818k
      [w]() {
458
818k
        if (auto consensus = w.lock()) {
459
818k
          consensus->ReportFailureDetected();
460
818k
        }
461
818k
      },
462
150k
      MinimumElectionTimeout());
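// A minimal sketch of the weak_ptr capture idiom used above: the periodic callback can outlive
// this RaftConsensus instance, so it holds a weak_ptr and only acts if lock() still succeeds.
//
//   std::weak_ptr<RaftConsensus> weak_self = shared_from_this();
//   auto on_timeout = [weak_self]() {
//     if (auto consensus = weak_self.lock()) {
//       consensus->ReportFailureDetected();  // Safe: the instance is still alive.
//     }
//     // Otherwise the consensus object was already destroyed; do nothing.
//   };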
463
464
150k
  {
465
150k
    ReplicaState::UniqueLock lock;
466
150k
    RETURN_NOT_OK(state_->LockForStart(&lock));
467
150k
    state_->ClearLeaderUnlocked();
468
469
150k
    RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id),
470
150k
                          "Unable to start RAFT ReplicaState");
471
472
150k
    LOG_WITH_PREFIX(INFO) << "Replica starting. Triggering "
473
150k
                          << info.orphaned_replicates.size()
474
150k
                          << " pending operations. Active config: "
475
150k
                          << state_->GetActiveConfigUnlocked().ShortDebugString();
476
150k
    for (const auto& replicate : info.orphaned_replicates) {
477
156
      ReplicateMsgPtr replicate_ptr = std::make_shared<ReplicateMsg>(*replicate);
478
156
      RETURN_NOT_OK(StartReplicaOperationUnlocked(replicate_ptr, HybridTime::kInvalid));
479
156
    }
480
481
150k
    RETURN_NOT_OK(state_->InitCommittedOpIdUnlocked(yb::OpId::FromPB(info.last_committed_id)));
482
483
150k
    queue_->Init(state_->GetLastReceivedOpIdUnlocked());
484
150k
  }
485
486
0
  {
487
150k
    ReplicaState::UniqueLock lock;
488
150k
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
489
490
    // If this is the first term expire the FD immediately so that we have a fast first
491
    // election, otherwise we just let the timer expire normally.
492
150k
    MonoDelta initial_delta = MonoDelta();
493
150k
    if (state_->GetCurrentTermUnlocked() == 0) {
494
      // The failure detector is initialized to a low value to trigger an early election
495
      // (unless someone else requested a vote from us first, which resets the
496
      // election timer). We do it this way instead of immediately running an
497
      // election to get a higher likelihood of enough servers being available
498
      // when the first one attempts an election to avoid multiple election
499
      // cycles on startup, while keeping that "waiting period" random. If there is only one peer,
500
      // trigger an election right away.
501
147k
      if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
502
147k
        LOG_WITH_PREFIX(INFO) << "Consensus starting up: Expiring fail detector timer "
503
147k
                                 "to make a prompt election more likely";
504
        // Gating quick leader elections on table creation since prompter leader elections are
505
        // more likely to fail due to uninitialized peers or conflicting elections, which could
506
        // have unforeseen consequences.
507
147k
        if (FLAGS_quick_leader_election_on_create) {
508
0
          initial_delta = (state_->GetCommittedConfigUnlocked().peers_size() == 1) ?
509
0
              MonoDelta::kZero :
510
0
              MonoDelta::FromMilliseconds(rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
511
0
        }
512
147k
      }
513
147k
    }
514
150k
    RETURN_NOT_OK(BecomeReplicaUnlocked(std::string(), initial_delta));
515
150k
  }
516
517
150k
  RETURN_NOT_OK(ExecuteHook(POST_START));
518
519
  // The context tracks that the current caller does not hold the lock for consensus state.
520
  // So mark dirty callback, e.g., consensus->ConsensusState() for master consensus callback of
521
  // SysCatalogStateChanged, can get the lock when needed.
522
150k
  auto context = std::make_shared<StateChangeContext>(StateChangeReason::CONSENSUS_STARTED, false);
523
  // Report becoming visible to the Master.
524
150k
  MarkDirty(context);
525
526
150k
  return Status::OK();
527
150k
}
528
529
7.96k
bool RaftConsensus::IsRunning() const {
530
7.96k
  auto lock = state_->LockForRead();
531
7.96k
  return state_->state() == ReplicaState::kRunning;
532
7.96k
}
533
534
86
Status RaftConsensus::EmulateElection() {
535
86
  ReplicaState::UniqueLock lock;
536
86
  RETURN_NOT_OK(state_->LockForConfigChange(&lock));
537
538
86
  LOG_WITH_PREFIX(INFO) << "Emulating election...";
539
540
  // Assume leadership of new term.
541
86
  RETURN_NOT_OK(IncrementTermUnlocked());
542
86
  SetLeaderUuidUnlocked(state_->GetPeerUuid());
543
86
  return BecomeLeaderUnlocked();
544
86
}
545
546
1.22M
Status RaftConsensus::DoStartElection(const LeaderElectionData& data, PreElected preelected) {
547
1.22M
  TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
548
1.22M
               "peer", peer_uuid(),
549
1.22M
               "tablet", tablet_id());
550
1.22M
  
  VLOG_WITH_PREFIX_AND_FUNC(1) << data.ToString();
551
1.22M
  if (FLAGS_TEST_do_not_start_election_test_only) {
552
11
    LOG(INFO) << "Election start skipped as TEST_do_not_start_election_test_only flag "
553
11
                 "is set to true.";
554
11
    return Status::OK();
555
11
  }
556
557
  // If pre-elections are disabled or we already won the pre-election, then start a regular election,
558
  // otherwise pre-election is started.
559
  // Pre-elections could be disabled via flag, or temporarily if some nodes do not support them.
560
1.22M
  auto preelection = ANNOTATE_UNPROTECTED_READ(FLAGS_use_preelection) && !preelected &&
                     disable_pre_elections_until_ < CoarseMonoClock::now();
562
1.22M
  const char* election_name = preelection ? "pre-election" : "election";
563
564
1.22M
  LeaderElectionPtr election;
565
1.22M
  {
566
1.22M
    ReplicaState::UniqueLock lock;
567
1.22M
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
568
569
1.22M
    if (data.initial_election && state_->GetCurrentTermUnlocked() != 0) {
570
1
      LOG_WITH_PREFIX(INFO) << "Not starting initial " << election_name << " -- non zero term";
571
1
      return Status::OK();
572
1
    }
573
574
1.22M
    PeerRole active_role = state_->GetActiveRoleUnlocked();
575
1.22M
    if (active_role == PeerRole::LEADER) {
576
50
      LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- already leader";
577
50
      return Status::OK();
578
50
    }
579
1.22M
    
    if (active_role == PeerRole::LEARNER || active_role == PeerRole::READ_REPLICA) {
580
217
      LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- role is " << active_role
581
217
                            << ", pending = " << state_->IsConfigChangePendingUnlocked()
582
217
                            << ", active_role=" << active_role;
583
217
      return Status::OK();
584
217
    }
585
1.22M
    if (PREDICT_FALSE(active_role == PeerRole::NON_PARTICIPANT)) {
586
33
      
      VLOG_WITH_PREFIX(1) << "Not starting " << election_name << " -- non participant";
587
      // Avoid excessive election noise while in this state.
588
33
      SnoozeFailureDetector(DO_NOT_LOG);
589
33
      return STATUS_FORMAT(
590
33
          IllegalState,
591
33
          "Not starting $0: Node is currently a non-participant in the raft config: $1",
592
33
          election_name, state_->GetActiveConfigUnlocked());
593
33
    }
594
595
    // Default is to start the election now. But if we are starting a pending election, see if
596
    // there is an op id pending upon indeed and if it has been committed to the log. The op id
597
    // could have been cleared if the pending election has already been started or another peer
598
    // has jumped before we can start.
599
1.22M
    bool start_now = true;
600
1.22M
    if (data.pending_commit) {
601
0
      const auto required_id = !data.must_be_committed_opid
602
0
          ? state_->GetPendingElectionOpIdUnlocked() : data.must_be_committed_opid;
603
0
      const Status advance_committed_index_status = ResultToStatus(
604
0
          state_->AdvanceCommittedOpIdUnlocked(required_id, CouldStop::kFalse));
605
0
      if (!advance_committed_index_status.ok()) {
606
0
        LOG(WARNING) << "Starting an " << election_name << " but the latest committed OpId is not "
607
0
                        "present in this peer's log: "
608
0
                     << required_id << ". " << "Status: " << advance_committed_index_status;
609
0
      }
610
0
      start_now = required_id.index <= state_->GetCommittedOpIdUnlocked().index;
611
0
    }
612
613
1.22M
    if (start_now) {
614
1.22M
      if (state_->HasLeaderUnlocked()) {
615
492k
        LOG_WITH_PREFIX(INFO) << "Fail of leader " << state_->GetLeaderUuidUnlocked()
616
492k
                              << " detected. Triggering leader " << election_name
617
492k
                              << ", mode=" << data.mode;
618
733k
      } else {
619
733k
        LOG_WITH_PREFIX(INFO) << "Triggering leader " << election_name << ", mode=" << data.mode;
620
733k
      }
621
622
      // Snooze to avoid the election timer firing again as much as possible.
623
      // We do not disable the election timer while running an election.
624
1.22M
      MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked();
625
1.22M
      SnoozeFailureDetector(ALLOW_LOGGING, timeout);
626
627
1.22M
      election = VERIFY_RESULT(CreateElectionUnlocked(data, timeout, PreElection(preelection)));
628
18.4E
    } else if (data.pending_commit && !data.must_be_committed_opid.empty()) {
629
      // Queue up the pending op id if specified.
630
0
      state_->SetPendingElectionOpIdUnlocked(data.must_be_committed_opid);
631
0
      LOG_WITH_PREFIX(INFO)
632
0
          << "Leader " << election_name << " is pending upon log commitment of OpId "
633
0
          << data.must_be_committed_opid;
634
18.4E
    } else {
635
18.4E
      LOG_WITH_PREFIX(INFO) << "Ignore " << __func__ << " existing wait on op id";
636
18.4E
    }
637
1.22M
  }
638
639
  // Start the election outside the lock.
640
737k
  
  if (election) {
641
737k
    election->Run();
642
737k
  }
643
644
737k
  return Status::OK();
645
1.22M
}
646
647
Result<LeaderElectionPtr> RaftConsensus::CreateElectionUnlocked(
648
1.22M
    const LeaderElectionData& data, MonoDelta timeout, PreElection preelection) {
649
1.22M
  int64_t new_term;
650
1.22M
  if (preelection) {
651
674k
    new_term = state_->GetCurrentTermUnlocked() + 1;
652
674k
  } else {
653
    // Increment the term.
654
550k
    RETURN_NOT_OK(IncrementTermUnlocked());
655
62.8k
    new_term = state_->GetCurrentTermUnlocked();
656
62.8k
  }
657
658
737k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
659
737k
  LOG_WITH_PREFIX(INFO) << "Starting " << (preelection ? 
"pre-"674k
:
""62.8k
) << "election with config: "
660
737k
                        << active_config.ShortDebugString();
661
662
  // Initialize the VoteCounter.
663
737k
  auto num_voters = CountVoters(active_config);
664
737k
  auto majority_size = MajoritySize(num_voters);
665
666
  // Vote for ourselves.
667
737k
  if (!preelection) {
668
    // TODO: Consider using a separate Mutex for voting, which must sync to disk.
669
62.8k
    RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid()));
670
62.8k
  }
671
672
737k
  auto counter = std::make_unique<VoteCounter>(num_voters, majority_size);
673
737k
  bool duplicate;
674
737k
  RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), ElectionVote::kGranted, &duplicate));
675
737k
  CHECK(!duplicate) << state_->LogPrefix()
676
71
                    << "Inexplicable duplicate self-vote for term "
677
71
                    << state_->GetCurrentTermUnlocked();
678
679
737k
  VoteRequestPB request;
680
737k
  request.set_ignore_live_leader(data.mode == ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE);
681
737k
  request.set_candidate_uuid(state_->GetPeerUuid());
682
737k
  request.set_candidate_term(new_term);
683
737k
  request.set_tablet_id(state_->GetOptions().tablet_id);
684
737k
  request.set_preelection(preelection);
685
737k
  state_->GetLastReceivedOpIdUnlocked().ToPB(
686
737k
      request.mutable_candidate_status()->mutable_last_received());
687
688
737k
  LeaderElectionPtr result(new LeaderElection(
689
737k
      active_config,
690
737k
      peer_proxy_factory_.get(),
691
737k
      request,
692
737k
      std::move(counter),
693
737k
      timeout,
694
737k
      preelection,
695
737k
      data.suppress_vote_request,
696
737k
      std::bind(&RaftConsensus::ElectionCallback, shared_from_this(), data, _1)));
697
698
737k
  if (!preelection) {
699
    // Clear the pending election op id so that we won't start the same pending election again.
700
    // Pre-election does not change state, so should not do it in this case.
701
62.8k
    state_->ClearPendingElectionOpIdUnlocked();
702
62.8k
  }
703
704
737k
  return result;
705
737k
}
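// Worked example for the vote counting above (a sketch; in standard Raft the majority is
// floor(num_voters / 2) + 1): with 3 voters the majority is 2, and with 5 voters it is 3. The
// candidate registers its own granted vote first, so it still needs majority_size - 1 remote
// grants before the election (or pre-election) is decided.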
706
707
8
Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
708
8
  MonoTime deadline = MonoTime::Now();
709
8
  deadline.AddDelta(timeout);
710
286
  while (MonoTime::Now().ComesBefore(deadline)) {
711
286
    if (GetLeaderStatus() == LeaderStatus::LEADER_AND_READY) {
712
8
      return Status::OK();
713
8
    }
714
278
    SleepFor(MonoDelta::FromMilliseconds(10));
715
278
  }
716
717
0
  return STATUS(TimedOut, Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
718
8
                                     peer_uuid(), tablet_id(), timeout.ToString(), role()));
719
8
}
720
721
11.3k
string RaftConsensus::ServersInTransitionMessage() {
722
11.3k
  string err_msg;
723
11.3k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
724
11.3k
  const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
725
11.3k
  auto servers_in_transition = CountServersInTransition(active_config);
726
11.3k
  auto committed_servers_in_transition = CountServersInTransition(committed_config);
727
11.3k
  LOG(INFO) << Substitute("Active config has $0 and committed has $1 servers in transition.",
728
11.3k
                          servers_in_transition, committed_servers_in_transition);
729
11.3k
  if (servers_in_transition != 0 || committed_servers_in_transition != 0) {
730
225
    err_msg = Substitute("Leader not ready to step down as there are $0 active config peers"
731
225
                         " in transition, $1 in committed. Configs:\nactive=$2\ncommit=$3",
732
225
                         servers_in_transition, committed_servers_in_transition,
733
225
                         active_config.ShortDebugString(), committed_config.ShortDebugString());
734
225
    LOG(INFO) << err_msg;
735
225
  }
736
11.3k
  return err_msg;
737
11.3k
}
738
739
10.0k
Status RaftConsensus::StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful) {
740
10.0k
  auto election_state = std::make_shared<RunLeaderElectionState>();
741
10.0k
  election_state->proxy = peer_proxy_factory_->NewProxy(peer);
742
10.0k
  election_state->req.set_originator_uuid(state_->GetPeerUuid());
743
10.0k
  election_state->req.set_dest_uuid(peer.permanent_uuid());
744
10.0k
  election_state->req.set_tablet_id(state_->GetOptions().tablet_id);
745
10.0k
  election_state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
746
10.0k
  election_state->proxy->RunLeaderElectionAsync(
747
10.0k
      &election_state->req, &election_state->resp, &election_state->rpc,
748
10.0k
      std::bind(&RaftConsensus::RunLeaderElectionResponseRpcCallback, this,
749
10.0k
          election_state));
750
751
10.0k
  LOG_WITH_PREFIX(INFO) << "Transferring leadership to " << peer.permanent_uuid();
752
753
10.0k
  return BecomeReplicaUnlocked(
754
10.0k
      graceful ? std::string() : peer.permanent_uuid(), MonoDelta());
755
10.0k
}
756
757
86
void RaftConsensus::CheckDelayedStepDown(const Status& status) {
758
86
  ReplicaState::UniqueLock lock;
759
86
  auto lock_status = state_->LockForConfigChange(&lock);
760
86
  if (!lock_status.ok()) {
761
9
    LOG_WITH_PREFIX(INFO) << "Failed to check delayed election: " << lock_status;
762
9
    return;
763
9
  }
764
765
77
  if (state_->GetCurrentTermUnlocked() != delayed_step_down_.term) {
766
68
    return;
767
68
  }
768
769
9
  const auto& config = state_->GetActiveConfigUnlocked();
770
9
  const auto* peer = FindPeer(config, delayed_step_down_.protege);
771
9
  if (peer) {
772
9
    LOG_WITH_PREFIX(INFO) << "Step down in favor on not synchronized protege: "
773
9
                          << delayed_step_down_.protege;
774
9
    WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful),
775
9
                "Start step down failed");
776
9
  } else {
777
0
    LOG_WITH_PREFIX(INFO) << "Failed to synchronize with protege " << delayed_step_down_.protege
778
0
                          << " and cannot find it in config: " << config.ShortDebugString();
779
0
    delayed_step_down_.term = OpId::kUnknownTerm;
780
0
  }
781
9
}
782
783
54.1k
Status RaftConsensus::StepDown(const LeaderStepDownRequestPB* req, LeaderStepDownResponsePB* resp) {
784
54.1k
  TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
785
54.1k
  ReplicaState::UniqueLock lock;
786
54.1k
  RETURN_NOT_OK(state_->LockForConfigChange(&lock));
787
788
  // A sanity check that this request was routed to the correct RaftConsensus.
789
54.1k
  const auto& tablet_id = req->tablet_id();
790
54.1k
  if (tablet_id != this->tablet_id()) {
791
0
    resp->mutable_error()->set_code(TabletServerErrorPB::UNKNOWN_ERROR);
792
0
    const auto msg = Format(
793
0
        "Received a leader stepdown operation for wrong tablet id: $0, must be: $1",
794
0
        tablet_id, this->tablet_id());
795
0
    LOG_WITH_PREFIX(ERROR) << msg;
796
0
    StatusToPB(STATUS(IllegalState, msg), resp->mutable_error()->mutable_status());
797
0
    return Status::OK();
798
0
  }
799
800
54.1k
  if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) {
801
42.7k
    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
802
42.7k
    StatusToPB(STATUS(IllegalState, "Not currently leader"),
803
42.7k
               resp->mutable_error()->mutable_status());
804
    // We return OK so that the tablet service won't overwrite the error code.
805
42.7k
    return Status::OK();
806
42.7k
  }
807
808
  // The leader needs to be ready to perform a step down. There should be no PRE_VOTER in both
809
  // active and committed configs - ENG-557.
810
11.3k
  const string err_msg = ServersInTransitionMessage();
811
11.3k
  if (!err_msg.empty()) {
812
225
    resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
813
225
    StatusToPB(STATUS(IllegalState, err_msg), resp->mutable_error()->mutable_status());
814
225
    return Status::OK();
815
225
  }
816
817
11.0k
  std::string new_leader_uuid;
818
  // If a new leader is nominated, find it among peers to send RunLeaderElection request.
819
  // See https://ramcloud.stanford.edu/~ongaro/thesis.pdf, section 3.10 for this mechanism
820
  // to transfer the leadership.
821
11.0k
  const bool forced = (req->has_force_step_down() && req->force_step_down());
822
11.0k
  if (req->has_new_leader_uuid()) {
823
10.5k
    new_leader_uuid = req->new_leader_uuid();
824
10.5k
    if (!forced && !queue_->CanPeerBecomeLeader(new_leader_uuid)) {
825
166
      resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
826
166
      StatusToPB(
827
166
          STATUS(IllegalState, "Suggested peer is not caught up yet"),
828
166
          resp->mutable_error()->mutable_status());
829
      // We return OK so that the tablet service won't overwrite the error code.
830
166
      return Status::OK();
831
166
    }
832
10.5k
  }
833
834
10.9k
  bool graceful_stepdown = false;
835
10.9k
  if (new_leader_uuid.empty() && !FLAGS_stepdown_disable_graceful_transition &&
      !(req->has_disable_graceful_transition() && req->disable_graceful_transition())) {
837
588
    new_leader_uuid = queue_->GetUpToDatePeer();
838
588
    LOG_WITH_PREFIX(INFO) << "Selected up to date candidate protege leader [" << new_leader_uuid
839
588
                          << "]";
840
588
    graceful_stepdown = true;
841
588
  }
842
843
10.9k
  const auto& local_peer_uuid = state_->GetPeerUuid();
844
10.9k
  if (!new_leader_uuid.empty()) {
845
10.9k
    const auto leadership_transfer_description =
846
10.9k
        Format("tablet $0 from $1 to $2", tablet_id, local_peer_uuid, new_leader_uuid);
847
10.9k
    if (!forced && new_leader_uuid == protege_leader_uuid_ && election_lost_by_protege_at_) {
848
923
      const MonoDelta time_since_election_loss_by_protege =
849
923
          MonoTime::Now() - election_lost_by_protege_at_;
850
923
      if (time_since_election_loss_by_protege.ToMilliseconds() <
851
923
              FLAGS_min_leader_stepdown_retry_interval_ms) {
852
908
        LOG_WITH_PREFIX(INFO) << "Unable to execute leadership transfer for "
853
908
                              << leadership_transfer_description
854
908
                              << " because the intended leader already lost an election only "
855
908
                              << ToString(time_since_election_loss_by_protege) << " ago (within "
856
908
                              << FLAGS_min_leader_stepdown_retry_interval_ms << " ms).";
857
908
        if (req->has_new_leader_uuid()) {
858
903
          LOG_WITH_PREFIX(INFO) << "Rejecting leader stepdown request for "
859
903
                                << leadership_transfer_description;
860
903
          resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
861
903
          resp->set_time_since_election_failure_ms(
862
903
              time_since_election_loss_by_protege.ToMilliseconds());
863
903
          StatusToPB(
864
903
              STATUS(IllegalState, "Suggested peer lost an election recently"),
865
903
              resp->mutable_error()->mutable_status());
866
          // We return OK so that the tablet service won't overwrite the error code.
867
903
          return Status::OK();
868
903
        } else {
869
          // we were attempting a graceful transfer of our own choice
870
          // which is no longer possible
871
5
          new_leader_uuid.clear();
872
5
        }
873
908
      }
874
20
      election_lost_by_protege_at_ = MonoTime();
875
20
    }
876
10.9k
  }
877
878
10.0k
  if (!new_leader_uuid.empty()) {
879
10.0k
    const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), new_leader_uuid);
880
10.0k
    if (peer && peer->member_type() == PeerMemberType::VOTER) {
881
10.0k
      auto timeout_ms = FLAGS_protege_synchronization_timeout_ms;
882
10.0k
      if (timeout_ms != 0 &&
883
10.0k
          
          queue_->PeerLastReceivedOpId(new_leader_uuid) < GetLatestOpIdFromLog()) {
884
86
        delayed_step_down_ = DelayedStepDown {
885
86
          .term = state_->GetCurrentTermUnlocked(),
886
86
          .protege = new_leader_uuid,
887
86
          .graceful = graceful_stepdown,
888
86
        };
889
86
        LOG_WITH_PREFIX(INFO) << "Delay step down: " << delayed_step_down_.ToString();
890
86
        step_down_check_tracker_.Schedule(
891
86
            std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1),
892
86
            1ms * timeout_ms);
893
86
        return Status::OK();
894
86
      }
895
896
9.92k
      return StartStepDownUnlocked(*peer, graceful_stepdown);
897
10.0k
    }
898
899
10
    LOG_WITH_PREFIX(WARNING) << "New leader " << new_leader_uuid << " not found.";
900
10
    if (req->has_new_leader_uuid()) {
901
0
      resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN);
902
0
      StatusToPB(
903
0
          STATUS(IllegalState, "New leader not found among peers"),
904
0
          resp->mutable_error()->mutable_status());
905
      // We return OK so that the tablet service won't overwrite the error code.
906
0
      return Status::OK();
907
10
    } else {
908
      // we were attempting a graceful transfer of our own choice
909
      // which is no longer possible
910
10
      new_leader_uuid.clear();
911
10
    }
912
10
  }
913
914
19
  
  if (graceful_stepdown) {
915
19
    new_leader_uuid.clear();
916
19
  }
917
15
  RETURN_NOT_OK(BecomeReplicaUnlocked(new_leader_uuid, MonoDelta()));
918
919
15
  return Status::OK();
920
15
}
921
922
77
Status RaftConsensus::ElectionLostByProtege(const std::string& election_lost_by_uuid) {
923
77
  if (election_lost_by_uuid.empty()) {
924
0
    return STATUS(InvalidArgument, "election_lost_by_uuid could not be empty");
925
0
  }
926
927
77
  auto start_election = false;
928
77
  {
929
77
    ReplicaState::UniqueLock lock;
930
77
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
931
77
    if (election_lost_by_uuid == protege_leader_uuid_) {
932
69
      LOG_WITH_PREFIX(INFO) << "Our protege " << election_lost_by_uuid
933
69
                            << ", lost election. Has leader: "
934
69
                            << state_->HasLeaderUnlocked();
935
69
      withhold_election_start_until_.store(MonoTime::Min(), std::memory_order_relaxed);
936
69
      election_lost_by_protege_at_ = MonoTime::Now();
937
938
69
      start_election = !state_->HasLeaderUnlocked();
939
69
    }
940
77
  }
941
942
77
  if (start_election) {
943
68
    return StartElection({ElectionMode::NORMAL_ELECTION});
944
68
  }
945
946
9
  return Status::OK();
947
77
}
948
949
10.9k
void RaftConsensus::WithholdElectionAfterStepDown(const std::string& protege_uuid) {
950
10.9k
  DCHECK(state_->IsLocked());
951
10.9k
  protege_leader_uuid_ = protege_uuid;
952
10.9k
  auto timeout = MonoDelta::FromMilliseconds(
953
10.9k
      FLAGS_leader_failure_max_missed_heartbeat_periods *
954
10.9k
      FLAGS_raft_heartbeat_interval_ms);
955
10.9k
  if (!protege_uuid.empty()) {
956
    // Actually we have 2 kinds of step downs.
957
    // 1) We step down in favor of some protege.
958
    // 2) We step down because term was advanced or just started.
959
    // In second case we should not withhold election for a long period of time.
960
9.44k
    timeout *= FLAGS_after_stepdown_delay_election_multiplier;
961
9.44k
  }
962
10.9k
  auto deadline = MonoTime::Now() + timeout;
963
10.9k
  VLOG(2) << "Withholding election for " << timeout;
964
10.9k
  withhold_election_start_until_.store(deadline, std::memory_order_release);
965
10.9k
  election_lost_by_protege_at_ = MonoTime();
966
10.9k
}
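// Worked example (a sketch, assuming the non-TSAN defaults): the base withhold timeout is
// leader_failure_max_missed_heartbeat_periods * raft_heartbeat_interval_ms = 6.0 * 500 ms = 3 s.
// When stepping down in favor of a protege, it is further multiplied by
// after_stepdown_delay_election_multiplier (5), giving 15 s before this peer will start a new
// election on its own.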
967
968
void RaftConsensus::RunLeaderElectionResponseRpcCallback(
969
10.0k
    shared_ptr<RunLeaderElectionState> election_state) {
970
  // Check for RPC errors.
971
10.0k
  if (!election_state->rpc.status().ok()) {
972
15
    LOG_WITH_PREFIX(WARNING) << "RPC error from RunLeaderElection() call to peer "
973
15
                             << election_state->req.dest_uuid() << ": "
974
15
                             << election_state->rpc.status();
975
  // Check for tablet errors.
976
9.99k
  } else if (election_state->resp.has_error()) {
977
0
    LOG_WITH_PREFIX(WARNING) << "Tablet error from RunLeaderElection() call to peer "
978
0
                             << election_state->req.dest_uuid() << ": "
979
0
                             << StatusFromPB(election_state->resp.error().status());
980
0
  }
981
10.0k
}
982
983
617k
void RaftConsensus::ReportFailureDetectedTask() {
984
617k
  auto scope_exit = ScopeExit([this] {
985
617k
    outstanding_report_failure_task_.clear(std::memory_order_release);
986
617k
  });
987
988
617k
  MonoTime now;
989
617k
  for (;;) {
990
    // Do not start election for an extended period of time if we were recently stepped down.
991
617k
    auto old_value = withhold_election_start_until_.load(std::memory_order_acquire);
992
993
617k
    if (old_value == MonoTime::Min()) {
994
616k
      break;
995
616k
    }
996
997
1.39k
    
    if (!now.Initialized()) {
998
1.39k
      now = MonoTime::Now();
999
1.39k
    }
1000
1001
1.39k
    if (now < old_value) {
1002
361
      VLOG(1) << "Skipping election due to delayed timeout for " << (old_value - now);
1003
361
      return;
1004
361
    }
1005
1006
    // If we ever stepped down and then delayed election start did get scheduled, reset that we
1007
    // are out of that extra delay mode.
1008
1.03k
    if (withhold_election_start_until_.compare_exchange_weak(
1009
1.03k
        old_value, MonoTime::Min(), std::memory_order_release)) {
1010
1.03k
      break;
1011
1.03k
    }
1012
1.03k
  }
1013
1014
  // Start an election.
1015
617k
  LOG_WITH_PREFIX(INFO) << "ReportFailDetected: Starting NORMAL_ELECTION...";
1016
617k
  Status s = StartElection({ElectionMode::NORMAL_ELECTION});
1017
617k
  if (PREDICT_FALSE(!s.ok())) {
1018
33
    LOG_WITH_PREFIX(WARNING) << "Failed to trigger leader election: " << s.ToString();
1019
33
  }
1020
617k
}
1021
1022
818k
void RaftConsensus::ReportFailureDetected() {
1023
818k
  if (FLAGS_raft_disallow_concurrent_outstanding_report_failure_tasks &&
1024
818k
      
      outstanding_report_failure_task_.test_and_set(std::memory_order_acq_rel)) {
1025
201k
    VLOG(4)
1026
2
        << "Returning from ReportFailureDetected as there is already an outstanding report task.";
1027
617k
  } else {
1028
    // We're running on a timer thread; start an election on a different thread pool.
1029
617k
    auto s = raft_pool_token_->SubmitFunc(
1030
617k
        std::bind(&RaftConsensus::ReportFailureDetectedTask, shared_from_this()));
1031
617k
    WARN_NOT_OK(s, "Failed to submit failure detected task");
1032
617k
    if (!s.ok()) {
1033
0
      outstanding_report_failure_task_.clear(std::memory_order_release);
1034
0
    }
1035
617k
  }
1036
818k
}
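// The gating above reduces to a minimal sketch (assuming a std::atomic_flag member and a generic
// submit helper): test_and_set() returns the previous value, so only the caller that flips the
// flag from clear to set submits a task; all others return until the task (or a failed
// submission) clears the flag again.
//
//   std::atomic_flag outstanding_task = ATOMIC_FLAG_INIT;
//
//   void MaybeSubmitReportTask() {
//     if (outstanding_task.test_and_set(std::memory_order_acq_rel)) {
//       return;  // A report-failure task is already queued or running.
//     }
//     Status s = SubmitToRaftPool([] {
//       // ... attempt the election ...
//       outstanding_task.clear(std::memory_order_release);
//     });
//     if (!s.ok()) {
//       outstanding_task.clear(std::memory_order_release);  // Allow a retry later.
//     }
//   }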
1037
1038
62.0k
Status RaftConsensus::BecomeLeaderUnlocked() {
1039
62.0k
  DCHECK(state_->IsLocked());
1040
62.0k
  TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
1041
62.0k
               "peer", peer_uuid(),
1042
62.0k
               "tablet", tablet_id());
1043
62.0k
  LOG_WITH_PREFIX(INFO) << "Becoming Leader. State: " << state_->ToStringUnlocked();
1044
1045
  // Disable FD while we are leader.
1046
62.0k
  DisableFailureDetector();
1047
1048
  // Don't vote for anyone if we're a leader.
1049
62.0k
  withhold_votes_until_.store(MonoTime::Max(), std::memory_order_release);
1050
1051
62.0k
  queue_->RegisterObserver(this);
1052
1053
  // Refresh queue and peers before initiating NO_OP.
1054
62.0k
  RefreshConsensusQueueAndPeersUnlocked();
1055
1056
  // Initiate a NO_OP operation that is sent at the beginning of every term
1057
  // change in raft.
1058
62.0k
  auto replicate = std::make_shared<ReplicateMsg>();
1059
62.0k
  replicate->set_op_type(NO_OP);
1060
62.0k
  replicate->mutable_noop_request(); // Define the no-op request field.
1061
62.0k
  LOG(INFO) << "Sending NO_OP at op " << state_->GetCommittedOpIdUnlocked();
1062
  // This committed OpId is used for tablet bootstrap for RocksDB-backed tables.
1063
62.0k
  state_->GetCommittedOpIdUnlocked().ToPB(replicate->mutable_committed_op_id());
1064
1065
  // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT
1066
  // operations. See KUDU-798.
1067
  // Note: This hybrid_time has no meaning from a serialization perspective
1068
  // because this method is not executed on the TabletPeer's prepare thread.
1069
62.0k
  replicate->set_hybrid_time(clock_->Now().ToUint64());
1070
1071
62.0k
  auto round = make_scoped_refptr<ConsensusRound>(this, replicate);
1072
62.0k
  round->SetCallback(MakeNonTrackedRoundCallback(round.get(),
1073
62.0k
      [this, term = state_->GetCurrentTermUnlocked()](const Status& status) {
1074
    // Set 'Leader is ready to serve' flag only for committed NoOp operation
1075
    // and only if the term is up-to-date.
1076
    // It is guaranteed that successful notification is called only while holding replicate state
1077
    // mutex.
1078
61.8k
    if (status.ok() && term == state_->GetCurrentTermUnlocked()) {
1079
61.8k
      state_->SetLeaderNoOpCommittedUnlocked(true);
1080
61.8k
    }
1081
61.8k
  }));
1082
62.0k
  RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
1083
1084
62.0k
  peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
1085
1086
  // Set the timestamp to max uint64_t so that every time this metric is queried, the returned
1087
  // lag is 0. We will need to restore the timestamp once this peer steps down.
1088
62.0k
  follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds(
1089
62.0k
      std::numeric_limits<int64_t>::max());
1090
62.0k
  is_raft_leader_metric_->set_value(1);
1091
1092
62.0k
  return Status::OK();
1093
62.0k
}
1094
1095
Status RaftConsensus::BecomeReplicaUnlocked(
1096
161k
    const std::string& new_leader_uuid, MonoDelta initial_fd_wait) {
1097
161k
  LOG_WITH_PREFIX(INFO)
1098
161k
      << "Becoming Follower/Learner. State: " << state_->ToStringUnlocked()
1099
161k
      << ", new leader: " << new_leader_uuid << ", initial_fd_wait: " << initial_fd_wait;
1100
1101
161k
  if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
1102
10.9k
    WithholdElectionAfterStepDown(new_leader_uuid);
1103
10.9k
  }
1104
1105
161k
  state_->ClearLeaderUnlocked();
1106
1107
  // FD should be running while we are a follower.
1108
161k
  EnableFailureDetector(initial_fd_wait);
1109
1110
  // Now that we're a replica, we can allow voting for other nodes.
1111
161k
  withhold_votes_until_.store(MonoTime::Min(), std::memory_order_release);
1112
1113
161k
  const Status unregister_observer_status = queue_->UnRegisterObserver(this);
1114
161k
  if (!unregister_observer_status.IsNotFound()) {
1115
10.9k
    RETURN_NOT_OK(unregister_observer_status);
1116
10.9k
  }
1117
  // Deregister ourselves from the queue. We don't care what gets replicated, since
1118
  // we're stepping down.
1119
161k
  queue_->SetNonLeaderMode();
1120
1121
161k
  peer_manager_->Close();
1122
1123
  // TODO: https://github.com/yugabyte/yugabyte-db/issues/5522. Add unit tests for this metric.
1124
  // We update the follower lag metric timestamp here because it's possible that a leader
1125
  // that steps down could get partitioned before it receives any replicate message. If we
1126
  // don't update the timestamp here, and the above scenario happens, the metric will keep the
1127
  // uint64_t max value, which would make the metric return a 0 lag every time it is queried,
1128
  // even though that's not the case.
1129
161k
  follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds(
1130
161k
      clock_->Now().GetPhysicalValueMicros() / 1000);
1131
161k
  is_raft_leader_metric_->set_value(0);
1132
1133
161k
  return Status::OK();
1134
161k
}
1135
1136
195
Status RaftConsensus::TEST_Replicate(const ConsensusRoundPtr& round) {
1137
195
  return ReplicateBatch({ round });
1138
195
}
1139
1140
4.93M
Status RaftConsensus::ReplicateBatch(const ConsensusRounds& rounds) {
1141
4.93M
  size_t processed_rounds = 0;
1142
4.93M
  auto status = DoReplicateBatch(rounds, &processed_rounds);
1143
4.93M
  if (!status.ok()) {
1144
84
    
    VLOG_WITH_PREFIX_AND_FUNC(1)
1145
0
        << "Failed with status " << status << ", treating all " << rounds.size()
1146
0
        << " operations as failed with that status";
1147
    // Treat all the operations in the batch as failed.
1148
147
    for (size_t i = rounds.size(); i != processed_rounds;) {
1149
63
      rounds[--i]->callback()->ReplicationFailed(status);
1150
63
    }
1151
84
  }
1152
4.93M
  return status;
1153
4.93M
}
1154
1155
4.93M
Status RaftConsensus::DoReplicateBatch(const ConsensusRounds& rounds, size_t* processed_rounds) {
1156
4.93M
  RETURN_NOT_OK(ExecuteHook(PRE_REPLICATE));
1157
4.93M
  {
1158
4.93M
    ReplicaState::UniqueLock lock;
1159
4.93M
#ifndef NDEBUG
1160
5.04M
    for (const auto& round : rounds) {
1161
18.4E
      DCHECK(!round->replicate_msg()->has_id()) << "Should not have an OpId yet: "
1162
18.4E
                                                << round->replicate_msg()->DebugString();
1163
5.04M
    }
1164
4.93M
#endif
1165
4.93M
    RETURN_NOT_OK(state_->LockForReplicate(&lock));
1166
4.93M
    auto current_term = state_->GetCurrentTermUnlocked();
1167
4.93M
    if (current_term == delayed_step_down_.term) {
1168
15
      return STATUS(Aborted, "Rejecting because of planned step down");
1169
15
    }
1170
1171
5.04M
    for (const auto& round : rounds) {
1172
5.04M
      RETURN_NOT_OK(round->CheckBoundTerm(current_term));
1173
5.04M
    }
1174
4.93M
    auto status = AppendNewRoundsToQueueUnlocked(rounds, processed_rounds);
1175
4.93M
    if (!status.ok()) {
1176
      // In general we have 3 kinds of rounds in case of failure:
1177
      // 1) Rounds that were rejected by retryable requests.
1178
      //    We should not call ReplicationFinished for them.
1179
      // 2) Rounds that were registered with retryable requests.
1180
      //    We should call state_->NotifyReplicationFinishedUnlocked for them.
1181
      // 3) Rounds that were not registered with retryable requests.
1182
      //    We should call ReplicationFinished directly for them. Could do it after releasing
1183
      //    the lock. I.e. in ReplicateBatch.
1184
      //
1185
      // (3) is all rounds starting with index *processed_rounds and above.
1186
      // For (1) we reset bound term, so could use it to distinguish between (1) and (2).
1187
58
      for (size_t i = *processed_rounds; i != 0;) {
1188
23
        --i;
1189
23
        if (rounds[i]->bound_term() == OpId::kUnknownTerm) {
1190
          // Already rejected by retryable requests.
1191
0
          continue;
1192
0
        }
1193
23
        state_->NotifyReplicationFinishedUnlocked(
1194
23
            rounds[i], status, OpId::kUnknownTerm, /* applied_op_ids */ nullptr);
1195
23
      }
1196
35
      return status;
1197
35
    }
1198
4.93M
  }
1199
1200
4.93M
  peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
1201
4.93M
  RETURN_NOT_OK(ExecuteHook(POST_REPLICATE));
1202
4.93M
  return Status::OK();
1203
4.93M
}
1204
1205
67.9k
Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
1206
67.9k
  size_t processed_rounds = 0;
1207
67.9k
  return AppendNewRoundsToQueueUnlocked({ round }, &processed_rounds);
1208
67.9k
}
1209
1210
4.99M
Status RaftConsensus::CheckLeasesUnlocked(const ConsensusRoundPtr& round) {
1211
4.99M
  auto op_type = round->replicate_msg()->op_type();
1212
  // When we do not have a hybrid time leader lease we allow 2 operation types to be added to RAFT.
1213
  // NO_OP - because even empty heartbeat messages could be used to obtain the lease.
1214
  // CHANGE_CONFIG_OP - because we should be able to update consensus even w/o lease.
1215
  // Both of them are safe, since they don't affect user reads or writes.
1216
4.99M
  if (IsConsensusOnlyOperation(op_type)) {
1217
67.9k
    return Status::OK();
1218
67.9k
  }
1219
1220
4.92M
  auto lease_status = state_->GetHybridTimeLeaseStatusAtUnlocked(
1221
4.92M
      HybridTime(round->replicate_msg()->hybrid_time()).GetPhysicalValueMicros());
1222
4.92M
  static_assert(LeaderLeaseStatus_ARRAYSIZE == 3, "Please update logic below to adapt new state");
1223
4.92M
  if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) {
1224
0
    return STATUS_FORMAT(LeaderHasNoLease,
1225
0
                         "Old leader may have hybrid time lease, while adding: $0",
1226
0
                         OperationType_Name(op_type));
1227
0
  }
1228
4.92M
  lease_status = state_->GetLeaderLeaseStatusUnlocked(nullptr);
1229
4.92M
  if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) {
1230
0
    return STATUS_FORMAT(LeaderHasNoLease,
1231
0
                         "Old leader may have lease, while adding: $0",
1232
0
                         OperationType_Name(op_type));
1233
0
  }
1234
1235
4.92M
  return Status::OK();
1236
4.92M
}
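The check above boils down to a small decision rule: consensus-only operations are always admitted, while every other operation is rejected as long as the previous leader might still hold either the hybrid time lease or the regular leader lease. A simplified standalone model of that rule (the enum and function below are illustrative, not the production types):

// Illustrative condensation of the lease gating performed by CheckLeasesUnlocked.
enum class LeaseCheck { HAS_LEASE, OLD_LEADER_MAY_HAVE_LEASE, NO_LEASE };

bool CanAppendWithoutLeaseViolation(bool is_consensus_only_op,
                                    LeaseCheck ht_lease_status,
                                    LeaseCheck leader_lease_status) {
  if (is_consensus_only_op) {
    return true;  // NO_OP and CHANGE_CONFIG_OP are safe without a lease.
  }
  return ht_lease_status != LeaseCheck::OLD_LEADER_MAY_HAVE_LEASE &&
         leader_lease_status != LeaseCheck::OLD_LEADER_MAY_HAVE_LEASE;
}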
1237
1238
Status RaftConsensus::AppendNewRoundsToQueueUnlocked(
1239
4.99M
    const ConsensusRounds& rounds, size_t* processed_rounds) {
1240
4.99M
  SCHECK(!rounds.empty(), InvalidArgument, "Attempted to add zero rounds to the queue");
1241
1242
4.99M
  auto role = state_->GetActiveRoleUnlocked();
1243
4.99M
  if (role != PeerRole::LEADER) {
1244
12
    return STATUS_FORMAT(IllegalState, "Appending new rounds while not the leader but $0",
1245
12
                         PeerRole_Name(role));
1246
12
  }
1247
1248
4.99M
  std::vector<ReplicateMsgPtr> replicate_msgs;
1249
4.99M
  replicate_msgs.reserve(rounds.size());
1250
4.99M
  const OpId& committed_op_id = state_->GetCommittedOpIdUnlocked();
1251
1252
5.11M
  for (const auto& round : rounds) {
1253
5.11M
    ++*processed_rounds;
1254
1255
5.11M
    if (round->replicate_msg()->op_type() == OperationType::WRITE_OP &&
1256
5.11M
        !state_->RegisterRetryableRequest(round)) {
1257
1
      round->BindToTerm(OpId::kUnknownTerm); // Mark round as non replicating
1258
1
      continue;
1259
1
    }
1260
1261
5.11M
    OpId op_id = state_->NewIdUnlocked();
1262
1263
    // We use this callback to transform write operations by substituting the hybrid_time into
1264
    // the write batch inside the write operation.
1265
    //
1266
    // TODO: we could allocate multiple HybridTimes in batch, only reading system clock once.
1267
5.11M
    round->callback()->AddedToLeader(op_id, committed_op_id);
1268
1269
5.11M
    Status s = state_->AddPendingOperation(round, OperationMode::kLeader);
1270
5.11M
    if (!s.ok()) {
1271
23
      RollbackIdAndDeleteOpId(round->replicate_msg(), false /* should_exists */);
1272
1273
      // Iterate rounds in the reverse order and release ids.
1274
23
      while (!replicate_msgs.empty()) {
1275
0
        RollbackIdAndDeleteOpId(replicate_msgs.back(), true /* should_exists */);
1276
0
        replicate_msgs.pop_back();
1277
0
      }
1278
23
      return s;
1279
23
    }
1280
1281
5.11M
    replicate_msgs.push_back(round->replicate_msg());
1282
5.11M
  }
1283
1284
4.99M
  if (replicate_msgs.empty()) {
1285
1
    return Status::OK();
1286
1
  }
1287
1288
  // We only need to check the lease for the latest operation in the batch, because it has the
1289
  // greatest hybrid time and therefore requires the most advanced lease.
1290
4.99M
  auto s = CheckLeasesUnlocked(rounds.back());
1291
1292
4.99M
  if (s.ok()) {
1293
4.99M
    s = queue_->AppendOperations(replicate_msgs, committed_op_id, state_->Clock().Now());
1294
4.99M
  }
1295
1296
  // Handle Status::ServiceUnavailable(), which means the queue is full.
1297
  // TODO: what are we doing about other errors here? Should we also release OpIds in those cases?
1298
4.99M
  if (PREDICT_FALSE(!s.ok())) {
1299
0
    LOG_WITH_PREFIX(WARNING) << "Could not append replicate request: " << s << ", queue status: "
1300
0
                             << queue_->ToString();
1301
0
    for (auto iter = replicate_msgs.rbegin(); iter != replicate_msgs.rend(); ++iter) {
1302
0
      RollbackIdAndDeleteOpId(*iter, true /* should_exists */);
1303
      // TODO Possibly evict a dangling peer from the configuration here.
1304
      // TODO count of number of ops failed due to consensus queue overflow.
1305
0
    }
1306
1307
0
    return s.CloneAndPrepend("Unable to append operations to consensus queue");
1308
0
  }
1309
1310
4.99M
  state_->UpdateLastReceivedOpIdUnlocked(replicate_msgs.back()->id());
1311
4.99M
  return Status::OK();
1312
4.99M
}
1313
1314
void RaftConsensus::MajorityReplicatedNumSSTFilesChanged(
1315
2.06k
    uint64_t majority_replicated_num_sst_files) {
1316
2.06k
  majority_num_sst_files_.store(majority_replicated_num_sst_files, std::memory_order_release);
1317
2.06k
}
1318
1319
void RaftConsensus::UpdateMajorityReplicated(
1320
    const MajorityReplicatedData& majority_replicated_data, OpId* committed_op_id,
1321
34.7M
    OpId* last_applied_op_id) {
1322
34.7M
  TEST_PAUSE_IF_FLAG(TEST_pause_update_majority_replicated);
1323
34.7M
  ReplicaState::UniqueLock lock;
1324
34.7M
  Status s = state_->LockForMajorityReplicatedIndexUpdate(&lock);
1325
34.7M
  if (PREDICT_FALSE(!s.ok())) {
1326
128
    LOG_WITH_PREFIX(WARNING)
1327
128
        << "Unable to take state lock to update committed index: "
1328
128
        << s.ToString();
1329
128
    return;
1330
128
  }
1331
1332
34.7M
  EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags;
1333
34.7M
  if (GetAtomicFlag(&FLAGS_enable_lease_revocation)) {
1334
34.7M
    if (!state_->old_leader_lease().holder_uuid.empty() &&
1335
34.7M
        queue_->PeerAcceptedOurLease(state_->old_leader_lease().holder_uuid)) {
1336
10.2k
      flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease);
1337
10.2k
    }
1338
1339
34.7M
    if (!state_->old_leader_ht_lease().holder_uuid.empty() &&
1340
34.7M
        queue_->PeerAcceptedOurLease(state_->old_leader_ht_lease().holder_uuid)) {
1341
10.7k
      flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease);
1342
10.7k
    }
1343
34.7M
  }
1344
1345
34.7M
  state_->SetMajorityReplicatedLeaseExpirationUnlocked(majority_replicated_data, flags);
1346
34.7M
  leader_lease_wait_cond_.notify_all();
1347
1348
34.7M
  VLOG_WITH_PREFIX(1) << "Marking majority replicated up to "
1349
15.0k
      << majority_replicated_data.ToString();
1350
34.7M
  TRACE("Marking majority replicated up to $0", majority_replicated_data.op_id.ToString());
1351
34.7M
  bool committed_index_changed = false;
1352
34.7M
  s = state_->UpdateMajorityReplicatedUnlocked(
1353
34.7M
      majority_replicated_data.op_id, committed_op_id, &committed_index_changed,
1354
34.7M
      last_applied_op_id);
1355
34.7M
  auto leader_state = state_->GetLeaderStateUnlocked();
1356
34.7M
  if (leader_state.ok() && leader_state.status == LeaderStatus::LEADER_AND_READY) {
1357
34.6M
    state_->context()->MajorityReplicated();
1358
34.6M
  }
1359
34.7M
  if (PREDICT_FALSE(!s.ok())) {
1360
0
    string msg = Format("Unable to mark committed up to $0: $1", majority_replicated_data.op_id, s);
1361
0
    TRACE(msg);
1362
0
    LOG_WITH_PREFIX(WARNING) << msg;
1363
0
    return;
1364
0
  }
1365
1366
34.7M
  majority_num_sst_files_.store(majority_replicated_data.num_sst_files, std::memory_order_release);
1367
1368
34.7M
  if (!majority_replicated_data.peer_got_all_ops.empty() &&
1369
34.7M
      delayed_step_down_.term == state_->GetCurrentTermUnlocked() &&
1370
34.7M
      majority_replicated_data.peer_got_all_ops == delayed_step_down_.protege) {
1371
75
    LOG_WITH_PREFIX(INFO) << "Protege synchronized: " << delayed_step_down_.ToString();
1372
75
    const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), delayed_step_down_.protege);
1373
75
    if (peer) {
1374
75
      WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful),
1375
75
                  "Start step down failed");
1376
75
    }
1377
75
    delayed_step_down_.term = OpId::kUnknownTerm;
1378
75
  }
1379
1380
34.7M
  if (committed_index_changed &&
1381
34.7M
      state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
1382
    // If all operations were just committed, and we don't have pending operations, then
1383
    // we write an empty batch that contains committed index.
1384
    // This affects only our local log, because followers have different logic in this scenario.
1385
4.69M
    if (*committed_op_id == state_->GetLastReceivedOpIdUnlocked()) {
1386
4.28M
      auto status = queue_->AppendOperations({}, *committed_op_id, state_->Clock().Now());
1387
18.4E
      LOG_IF_WITH_PREFIX(DFATAL, !status.ok() && !status.IsServiceUnavailable())
1388
18.4E
          << "Failed to append empty batch: " << status;
1389
4.28M
    }
1390
1391
4.69M
    lock.unlock();
1392
    // No need to hold the lock while calling SignalRequest.
1393
4.69M
    peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
1394
4.69M
  }
1395
34.7M
}
1396
1397
0
void RaftConsensus::AppendEmptyBatchToLeaderLog() {
1398
0
  auto lock = state_->LockForRead();
1399
0
  auto committed_op_id = state_->GetCommittedOpIdUnlocked();
1400
0
  if (committed_op_id == state_->GetLastReceivedOpIdUnlocked()) {
1401
0
    auto status = queue_->AppendOperations({}, committed_op_id, state_->Clock().Now());
1402
0
    LOG_IF_WITH_PREFIX(DFATAL, !status.ok()) << "Failed to append empty batch: " << status;
1403
0
  }
1404
0
}
1405
1406
801
void RaftConsensus::NotifyTermChange(int64_t term) {
1407
801
  ReplicaState::UniqueLock lock;
1408
801
  Status s = state_->LockForConfigChange(&lock);
1409
801
  if (PREDICT_FALSE(!s.ok())) {
1410
0
    LOG_WITH_PREFIX(WARNING) << "Unable to lock ReplicaState for config change"
1411
0
                             << " when notified of new term " << term << ": " << s;
1412
0
    return;
1413
0
  }
1414
801
  WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
1415
801
}
1416
1417
void RaftConsensus::NotifyFailedFollower(const string& uuid,
1418
                                         int64_t term,
1419
330
                                         const std::string& reason) {
1420
  // Common info used in all of the log messages within this method.
1421
330
  string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ",
1422
330
                               uuid, term, reason);
1423
1424
330
  if (!FLAGS_evict_failed_followers) {
1425
0
    LOG_WITH_PREFIX(INFO) << fail_msg << "Eviction of failed followers is disabled. Doing nothing.";
1426
0
    return;
1427
0
  }
1428
1429
330
  RaftConfigPB committed_config;
1430
330
  {
1431
330
    auto lock = state_->LockForRead();
1432
1433
330
    int64_t current_term = state_->GetCurrentTermUnlocked();
1434
330
    if (current_term != term) {
1435
0
      LOG_WITH_PREFIX(INFO) << fail_msg << "Notified about a follower failure in "
1436
0
                            << "previous term " << term << ", but a leader election "
1437
0
                            << "likely occurred since the failure was detected. "
1438
0
                            << "Doing nothing.";
1439
0
      return;
1440
0
    }
1441
1442
330
    if (state_->IsConfigChangePendingUnlocked()) {
1443
84
      LOG_WITH_PREFIX(INFO) << fail_msg << "There is already a config change operation "
1444
84
                            << "in progress. Unable to evict follower until it completes. "
1445
84
                            << "Doing nothing.";
1446
84
      return;
1447
84
    }
1448
246
    committed_config = state_->GetCommittedConfigUnlocked();
1449
246
  }
1450
1451
  // Run config change on thread pool after dropping ReplicaState lock.
1452
246
  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask,
1453
246
                                               shared_from_this(), uuid, committed_config, reason)),
1454
246
              state_->LogPrefix() + "Unable to start RemoteFollowerTask");
1455
246
}
1456
1457
void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
1458
                                          const RaftConfigPB& committed_config,
1459
246
                                          const std::string& reason) {
1460
246
  ChangeConfigRequestPB req;
1461
246
  req.set_tablet_id(tablet_id());
1462
246
  req.mutable_server()->set_permanent_uuid(uuid);
1463
246
  req.set_type(REMOVE_SERVER);
1464
246
  req.set_cas_config_opid_index(committed_config.opid_index());
1465
246
  LOG_WITH_PREFIX(INFO)
1466
246
      << "Attempting to remove follower " << uuid << " from the Raft config at commit index "
1467
246
      << committed_config.opid_index() << ". Reason: " << reason;
1468
246
  boost::optional<TabletServerErrorPB::Code> error_code;
1469
246
  WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
1470
246
              state_->LogPrefix() + "Unable to remove follower " + uuid);
1471
246
}
1472
1473
Status RaftConsensus::Update(ConsensusRequestPB* request,
1474
                             ConsensusResponsePB* response,
1475
25.5M
                             CoarseTimePoint deadline) {
1476
25.5M
  if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests)) {
1477
329
    return STATUS(IllegalState, "Rejected: --TEST_follower_reject_update_consensus_requests "
1478
329
                                "is set to true.");
1479
329
  }
1480
25.5M
  TEST_PAUSE_IF_FLAG(TEST_follower_pause_update_consensus_requests);
1481
1482
25.5M
  auto reject_mode = reject_mode_.load(std::memory_order_acquire);
1483
25.5M
  if (reject_mode != RejectMode::kNone) {
1484
0
    if (reject_mode == RejectMode::kAll ||
1485
0
        (reject_mode == RejectMode::kNonEmpty && !request->ops().empty())) {
1486
0
      auto result = STATUS_FORMAT(IllegalState, "Rejected because of reject mode: $0",
1487
0
                                  ToString(reject_mode));
1488
0
      LOG_WITH_PREFIX(INFO) << result;
1489
0
      return result;
1490
0
    }
1491
0
    LOG_WITH_PREFIX(INFO) << "Accepted: " << request->ShortDebugString();
1492
0
  }
1493
1494
25.5M
  if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) {
1495
385
    if (MonoTime::Now() < withold_replica_updates_until_) {
1496
385
      LOG(INFO) << "Rejecting Update for tablet: " << tablet_id()
1497
385
                << " tserver uuid: " << peer_uuid();
1498
385
      return STATUS_SUBSTITUTE(IllegalState,
1499
385
          "Rejected: --TEST_follower_reject_update_consensus_requests_seconds is set to $0",
1500
385
          FLAGS_TEST_follower_reject_update_consensus_requests_seconds);
1501
385
    }
1502
385
  }
1503
1504
25.5M
  RETURN_NOT_OK(ExecuteHook(PRE_UPDATE));
1505
25.5M
  response->set_responder_uuid(state_->GetPeerUuid());
1506
1507
25.5M
  VLOG_WITH_PREFIX(2) << "Replica received request: " << request->ShortDebugString();
1508
1509
25.5M
  UpdateReplicaResult result;
1510
25.5M
  {
1511
    // see var declaration
1512
25.5M
    auto wait_start = CoarseMonoClock::now();
1513
25.5M
    auto wait_duration = deadline != CoarseTimePoint::max() ? deadline - wait_start
1514
25.5M
                                                            : CoarseDuration::max();
1515
25.5M
    auto lock = LockMutex(&update_mutex_, wait_duration);
1516
25.5M
    if (!lock.owns_lock()) {
1517
1.20k
      return STATUS_FORMAT(TimedOut, "Unable to lock update mutex for $0", wait_duration);
1518
1.20k
    }
1519
1520
25.5M
    LongOperationTracker operation_tracker("UpdateReplica", 1s);
1521
25.5M
    result = VERIFY_RESULT(UpdateReplica(request, response));
1522
1523
0
    auto delay = TEST_delay_update_.load(std::memory_order_acquire);
1524
25.5M
    if (delay != MonoDelta::kZero) {
1525
0
      std::this_thread::sleep_for(delay.ToSteadyDuration());
1526
0
    }
1527
25.5M
  }
1528
1529
  // Release the lock while we wait for the log append to finish so that commits can go through.
1530
25.5M
  if (!result.wait_for_op_id.empty()) {
1531
8.27M
    RETURN_NOT_OK(WaitForWrites(result.current_term, result.wait_for_op_id));
1532
8.27M
  }
1533
1534
25.5M
  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
1535
0
    VLOG_WITH_PREFIX(2) << "Replica updated. "
1536
0
        << state_->ToString() << " Request: " << request->ShortDebugString();
1537
0
  }
1538
1539
  // If an election is pending on a specific op id and it has just been committed, start it now.
1540
  // StartElection will ensure the pending election will be started just once only even if
1541
  // UpdateReplica happens in multiple threads in parallel.
1542
25.5M
  if (result.start_election) {
1543
0
    RETURN_NOT_OK(StartElection(
1544
0
        {consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE, true /* pending_commit */}));
1545
0
  }
1546
1547
25.5M
  RETURN_NOT_OK(ExecuteHook(POST_UPDATE));
1548
25.5M
  return Status::OK();
1549
25.5M
}
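The deadline-bounded acquisition of update_mutex_ above follows the standard timed-lock pattern: try to take the mutex for at most the remaining RPC deadline and fail fast with TimedOut otherwise. A minimal standard-library equivalent, assuming LockMutex behaves like try_lock_for (LockMutex itself is a YugabyteDB utility, so this is only a sketch):

#include <chrono>
#include <mutex>

// Returns a lock that may not own the mutex; the caller must check owns_lock().
std::unique_lock<std::timed_mutex> LockWithDeadline(
    std::timed_mutex* mutex, std::chrono::steady_clock::duration timeout) {
  std::unique_lock<std::timed_mutex> lock(*mutex, std::defer_lock);
  lock.try_lock_for(timeout);  // Gives up after the timeout instead of blocking forever.
  return lock;
}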
1550
1551
Status RaftConsensus::StartReplicaOperationUnlocked(
1552
9.41M
    const ReplicateMsgPtr& msg, HybridTime propagated_safe_time) {
1553
9.41M
  if (IsConsensusOnlyOperation(msg->op_type())) {
1554
133k
    return StartConsensusOnlyRoundUnlocked(msg);
1555
133k
  }
1556
1557
9.28M
  if (PREDICT_FALSE(FLAGS_TEST_follower_fail_all_prepare)) {
1558
1
    return STATUS(IllegalState, "Rejected: --TEST_follower_fail_all_prepare "
1559
1
                                "is set to true.");
1560
1
  }
1561
1562
9.28M
  VLOG_WITH_PREFIX(1) << "Starting operation: " << msg->id().ShortDebugString();
1563
9.28M
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
1564
9.28M
  ConsensusRound* round_ptr = round.get();
1565
9.28M
  RETURN_NOT_OK(state_->context()->StartReplicaOperation(round, propagated_safe_time));
1566
9.28M
  auto result = state_->AddPendingOperation(round_ptr, OperationMode::kFollower);
1567
9.28M
  if (!result.ok()) {
1568
0
    round_ptr->NotifyReplicationFinished(result, OpId::kUnknownTerm, /* applied_op_ids */ nullptr);
1569
0
  }
1570
9.28M
  return result;
1571
9.28M
}
1572
1573
34
std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
1574
34
  std::string ret;
1575
34
  ret.reserve(100);
1576
34
  ret.push_back('[');
1577
34
  if (!messages.empty()) {
1578
7
    const OpIdPB& first_op = (*messages.begin())->id();
1579
7
    const OpIdPB& last_op = (*messages.rbegin())->id();
1580
7
    strings::SubstituteAndAppend(&ret, "$0.$1-$2.$3",
1581
7
                                 first_op.term(), first_op.index(),
1582
7
                                 last_op.term(), last_op.index());
1583
7
  }
1584
34
  ret.push_back(']');
1585
34
  return ret;
1586
34
}
1587
1588
Status RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
1589
25.5M
                                                       LeaderRequest* deduplicated_req) {
1590
25.5M
  const auto& last_committed = state_->GetCommittedOpIdUnlocked();
1591
1592
  // The leader's preceding id.
1593
25.5M
  deduplicated_req->preceding_op_id = yb::OpId::FromPB(rpc_req->preceding_id());
1594
1595
25.5M
  int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index;
1596
1597
25.5M
  deduplicated_req->first_message_idx = -1;
1598
1599
  // In this loop we discard duplicates and advance the leader's preceding id
1600
  // accordingly.
1601
34.9M
  for (int i = 0; i < rpc_req->ops_size(); i++) {
1602
9.41M
    ReplicateMsg* leader_msg = rpc_req->mutable_ops(i);
1603
1604
9.41M
    if (leader_msg->id().index() <= last_committed.index) {
1605
603
      VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id()
1606
0
                          << " (already committed)";
1607
603
      deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id());
1608
603
      continue;
1609
603
    }
1610
1611
9.41M
    if (leader_msg->id().index() <= dedup_up_to_index) {
1612
      // If the index is uncommitted and below our match index, then it must be in the
1613
      // pending set.
1614
114
      scoped_refptr<ConsensusRound> round =
1615
114
          state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index());
1616
114
      if (!round) {
1617
        // Could happen if we received an outdated leader request, so we should just reject it.
1618
0
        return STATUS_FORMAT(IllegalState, "Round not found for index: $0",
1619
0
                             leader_msg->id().index());
1620
0
      }
1621
1622
      // If the OpIds match, i.e. if they have the same term and id, then this is just
1623
      // duplicate, we skip...
1624
114
      if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) {
1625
8
        VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id()
1626
0
                            << " (already replicated)";
1627
8
        deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id());
1628
8
        continue;
1629
8
      }
1630
1631
      // ... otherwise we must adjust our match index, i.e. all messages from now on
1632
      // are "new"
1633
106
      dedup_up_to_index = leader_msg->id().index();
1634
106
    }
1635
1636
9.41M
    if (deduplicated_req->first_message_idx == -1) {
1637
8.27M
      deduplicated_req->first_message_idx = i;
1638
8.27M
    }
1639
9.41M
    deduplicated_req->messages.emplace_back(leader_msg);
1640
9.41M
  }
1641
1642
25.5M
  if (deduplicated_req->messages.size() != implicit_cast<size_t>(rpc_req->ops_size())) {
1643
34
    LOG_WITH_PREFIX(INFO) << "Deduplicated request from leader. Original: "
1644
34
                          << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req)
1645
34
                          << "   Dedup: " << deduplicated_req->preceding_op_id << "->"
1646
34
                          << deduplicated_req->OpsRangeString()
1647
34
                          << ", known committed: " << last_committed << ", received committed: "
1648
34
                          << OpId::FromPB(rpc_req->committed_op_id());
1649
34
  }
1650
1651
25.5M
  return Status::OK();
1652
25.5M
}
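As a worked example of the rules above (indices purely illustrative): with a committed index of 8 and a last received index of 10, a leader batch covering indices 7..12 is trimmed so that 7 and 8 are dropped as already committed, 9 and 10 are dropped only when their OpIds match the locally pending rounds, and 11 and 12 are always kept as new messages. A toy, index-only model of that filtering (the real code also compares terms and advances the preceding op id):

#include <cstdint>
#include <vector>

// Toy model over plain indices; assumes every duplicate index also has a matching term.
std::vector<int64_t> KeepOnlyNewIndices(const std::vector<int64_t>& leader_ops,
                                        int64_t last_committed_index,
                                        int64_t last_received_index) {
  std::vector<int64_t> new_ops;
  for (int64_t index : leader_ops) {
    if (index <= last_committed_index) {
      continue;  // Already committed locally.
    }
    if (index <= last_received_index) {
      continue;  // Already pending with the same OpId (assumed here).
    }
    new_ops.push_back(index);
  }
  return new_ops;
}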
1653
1654
Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
1655
25.5M
                                                      ConsensusResponsePB* response) {
1656
  // Do term checks first:
1657
25.5M
  if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) {
1658
1659
    // If less, reject.
1660
12.1k
    if (request->caller_term() < state_->GetCurrentTermUnlocked()) {
1661
1.56k
      string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
1662
1.56k
                              "Current term is $2. Ops: $3",
1663
1664
1.56k
                              request->caller_uuid(),
1665
1.56k
                              request->caller_term(),
1666
1.56k
                              state_->GetCurrentTermUnlocked(),
1667
1.56k
                              OpsRangeString(*request));
1668
1.56k
      LOG_WITH_PREFIX(INFO) << msg;
1669
1.56k
      FillConsensusResponseError(response,
1670
1.56k
                                 ConsensusErrorPB::INVALID_TERM,
1671
1.56k
                                 STATUS(IllegalState, msg));
1672
1.56k
      return Status::OK();
1673
10.6k
    } else {
1674
10.6k
      RETURN_NOT_OK(HandleTermAdvanceUnlocked(request->caller_term()));
1675
10.6k
    }
1676
12.1k
  }
1677
25.5M
  return Status::OK();
1678
25.5M
}
1679
1680
Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
1681
25.5M
                                                                ConsensusResponsePB* response) {
1682
1683
25.5M
  bool term_mismatch;
1684
25.5M
  if (state_->IsOpCommittedOrPending(req.preceding_op_id, &term_mismatch)) {
1685
25.4M
    return Status::OK();
1686
25.4M
  }
1687
1688
100k
  string error_msg = Format(
1689
100k
    "Log matching property violated."
1690
100k
    " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)",
1691
100k
    state_->GetLastReceivedOpIdUnlocked(), req.preceding_op_id, term_mismatch ? "term" : "index");
1692
1693
100k
  FillConsensusResponseError(response,
1694
100k
                             ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH,
1695
100k
                             STATUS(IllegalState, error_msg));
1696
1697
100k
  LOG_WITH_PREFIX(INFO) << "Refusing update from remote peer "
1698
100k
                        << req.leader_uuid << ": " << error_msg;
1699
1700
  // If the terms mismatch we abort down to the index before the leader's preceding,
1701
  // since we know that is the last opid that has a chance of not being overwritten.
1702
  // Aborting preemptively here avoids us reporting a last received index that is
1703
  // possibly higher than the leader's causing an avoidable cache miss on the leader's
1704
  // queue.
1705
  //
1706
  // TODO: this isn't just an optimization! if we comment this out, we get
1707
  // failures on raft_consensus-itest a couple percent of the time! Should investigate
1708
  // why this is actually critical to do here, as opposed to just on requests that
1709
  // append some ops.
1710
100k
  if (term_mismatch) {
1711
65
    return state_->AbortOpsAfterUnlocked(req.preceding_op_id.index - 1);
1712
65
  }
1713
1714
100k
  return Status::OK();
1715
100k
}
1716
1717
Status RaftConsensus::CheckLeaderRequestOpIdSequence(
1718
    const LeaderRequest& deduped_req,
1719
25.5M
    ConsensusRequestPB* request) {
1720
25.5M
  Status sequence_check_status;
1721
25.5M
  yb::OpId prev = deduped_req.preceding_op_id;
1722
25.5M
  for (const auto& message : deduped_req.messages) {
1723
9.41M
    auto current = yb::OpId::FromPB(message->id());
1724
9.41M
    sequence_check_status = ReplicaState::CheckOpInSequence(prev, current);
1725
9.41M
    if (PREDICT_FALSE(!sequence_check_status.ok())) {
1726
2
      LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: "
1727
2
          << sequence_check_status.ToString() << ". Leader Request: "
1728
2
          << request->ShortDebugString();
1729
2
      break;
1730
2
    }
1731
9.41M
    prev = current;
1732
9.41M
  }
1733
1734
  // We only release the messages from the request after the above check so that we can print
1735
  // the original request, if it fails.
1736
25.5M
  if (!deduped_req.messages.empty()) {
1737
    // We take ownership of the deduped ops.
1738
8.27M
    DCHECK_GE(deduped_req.first_message_idx, 0);
1739
8.27M
    request->mutable_ops()->ExtractSubrange(
1740
8.27M
        narrow_cast<int>(deduped_req.first_message_idx),
1741
8.27M
        narrow_cast<int>(deduped_req.messages.size()),
1742
8.27M
        nullptr);
1743
8.27M
  }
1744
1745
  // We don't need request->ops() anymore, so we release them to avoid unnecessary memory
1746
  // consumption.
1747
25.5M
  request->mutable_ops()->Clear();
1748
1749
25.5M
  return sequence_check_status;
1750
25.5M
}
1751
1752
Status RaftConsensus::CheckLeaderRequestUnlocked(ConsensusRequestPB* request,
1753
                                                 ConsensusResponsePB* response,
1754
25.5M
                                                 LeaderRequest* deduped_req) {
1755
25.5M
  RETURN_NOT_OK(DeduplicateLeaderRequestUnlocked(request, deduped_req));
1756
1757
  // This is an additional check for KUDU-639 that makes sure the message's index
1758
  // and term are in the right sequence in the request, after we've deduplicated
1759
  // them. We do this before we change any of the internal state.
1760
  //
1761
  // TODO move this to raft_consensus-state or whatever we transform that into.
1762
  // We should be able to do this check for each append, but right now the way
1763
  // we initialize raft_consensus-state is preventing us from doing so.
1764
25.5M
  RETURN_NOT_OK(CheckLeaderRequestOpIdSequence(*deduped_req, request));
1765
1766
25.5M
  RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response));
1767
1768
25.5M
  if (response->status().has_error()) {
1769
1.56k
    return Status::OK();
1770
1.56k
  }
1771
1772
25.5M
  RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response));
1773
1774
25.5M
  if (response->status().has_error()) {
1775
120k
    return Status::OK();
1776
120k
  }
1777
1778
  // If the first of the messages to apply is not in our log, either it follows the last
1779
  // received message or it replaces some in-flight.
1780
25.4M
  if (!deduped_req->messages.empty()) {
1781
8.27M
    auto first_id = yb::OpId::FromPB(deduped_req->messages[0]->id());
1782
8.27M
    bool term_mismatch;
1783
8.27M
    if (state_->IsOpCommittedOrPending(first_id, &term_mismatch)) {
1784
0
      return STATUS_FORMAT(IllegalState,
1785
0
                           "First deduped message $0 is committed or pending",
1786
0
                           first_id);
1787
0
    }
1788
1789
    // If the index is in our log but the terms are not the same abort down to the leader's
1790
    // preceding id.
1791
8.27M
    if (term_mismatch) {
1792
      // Since we are holding the lock ApplyPendingOperationsUnlocked would be invoked between
1793
      // those two.
1794
103
      RETURN_NOT_OK(state_->AbortOpsAfterUnlocked(deduped_req->preceding_op_id.index));
1795
103
      RETURN_NOT_OK(log_->ResetLastSyncedEntryOpId(deduped_req->preceding_op_id));
1796
103
    }
1797
8.27M
  }
1798
1799
  // If all of the above logic was successful then we can consider this to be
1800
  // the effective leader of the configuration. If they are not currently marked as
1801
  // the leader locally, mark them as leader now.
1802
25.4M
  const string& caller_uuid = request->caller_uuid();
1803
25.4M
  if (PREDICT_FALSE(state_->HasLeaderUnlocked() &&
1804
25.4M
                    state_->GetLeaderUuidUnlocked() != caller_uuid)) {
1805
0
    LOG_WITH_PREFIX(FATAL)
1806
0
        << "Unexpected new leader in same term! "
1807
0
        << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", "
1808
0
        << "new leader UUID: " << caller_uuid;
1809
0
  }
1810
25.4M
  if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) {
1811
120k
    SetLeaderUuidUnlocked(caller_uuid);
1812
120k
  }
1813
1814
25.4M
  return Status::OK();
1815
25.4M
}
1816
1817
Result<RaftConsensus::UpdateReplicaResult> RaftConsensus::UpdateReplica(
1818
25.5M
    ConsensusRequestPB* request, ConsensusResponsePB* response) {
1819
25.5M
  TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
1820
25.5M
               "peer", peer_uuid(),
1821
25.5M
               "tablet", tablet_id());
1822
1823
25.5M
  if (request->has_propagated_hybrid_time()) {
1824
25.5M
    clock_->Update(HybridTime(request->propagated_hybrid_time()));
1825
25.5M
  }
1826
1827
  // The ordering of the following operations is crucial, read on for details.
1828
  //
1829
  // The main requirements explained in more detail below are:
1830
  //
1831
  //   1) We must enqueue the prepares before we write to our local log.
1832
  //   2) If we were able to enqueue a prepare then we must be able to log it.
1833
  //   3) If we fail to enqueue a prepare, we must not attempt to enqueue any
1834
  //      later-indexed prepare or apply.
1835
  //
1836
  // See below for detailed rationale.
1837
  //
1838
  // The steps are:
1839
  //
1840
  // 0 - Dedup
1841
  //
1842
  // We make sure that we don't do anything on Replicate operations we've already received in a
1843
  // previous call. This essentially makes this method idempotent.
1844
  //
1845
  // 1 - We mark as many pending operations as committed as we can.
1846
  //
1847
  // We may have some pending operations that, according to the leader, are now
1848
  // committed. We Apply them early, because:
1849
  // - Soon (step 2) we may reject the call due to excessive memory pressure. One
1850
  //   way to relieve the pressure is by flushing the MRS, and applying these
1851
  //   operations may unblock an in-flight Flush().
1852
  // - The Apply and subsequent Prepares (step 2) can take place concurrently.
1853
  //
1854
  // 2 - We enqueue the Prepare of the operations.
1855
  //
1856
  // The actual prepares are enqueued in order but happen asynchronously so we don't
1857
  // have decoding/acquiring locks on the critical path.
1858
  //
1859
  // We need to do this now for a number of reasons:
1860
  // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the
1861
  //   state machine so, were we to crash afterwards, having the prepares in-flight
1862
  //   won't hurt.
1863
  // - Prepares depend on factors external to consensus (the operation drivers and
1864
  //   the tablet peer) so if for some reason they cannot be enqueued we must know
1865
  //   before we try write them to the WAL. Once enqueued, we assume that prepare will
1866
  //   always succeed on a replica operation (because the leader already prepared them
1867
  //   successfully, and thus we know they are valid).
1868
  // - The prepares corresponding to every operation that was logged must be in-flight
1869
  //   first. This because should we need to abort certain operations (say a new leader
1870
  //   says they are not committed) we need to have those prepares in-flight so that
1871
  //   the operations can be continued (in the abort path).
1872
  // - Failure to enqueue prepares is OK, we can continue and let the leader know that
1873
  //   we only went so far. The leader will re-send the remaining messages.
1874
  // - Prepares represent new operations, and operations consume memory. Thus, if the
1875
  //   overall memory pressure on the server is too high, we will reject the prepares.
1876
  //
1877
  // 3 - We enqueue the writes to the WAL.
1878
  //
1879
  // We enqueue writes to the WAL, but only the operations that were successfully
1880
  // enqueued for prepare (for the reasons introduced above). This means that even
1881
  // if a prepare fails to enqueue, if any of the previous prepares were successfully
1882
  // submitted they must be written to the WAL.
1883
  // If writing to the WAL fails, we're in an inconsistent state and we crash. In this
1884
  // case, no one will ever know of the operations we previously prepared so those are
1885
  // inconsequential.
1886
  //
1887
  // 4 - We mark the operations as committed.
1888
  //
1889
  // For each operation which has been committed by the leader, we update the
1890
  // operation state to reflect that. If the logging has already succeeded for that
1891
  // operation, this will trigger the Apply phase. Otherwise, Apply will be triggered
1892
  // when the logging completes. In both cases the Apply phase executes asynchronously.
1893
  // This must, of course, happen after the prepares have been triggered as the same batch
1894
  // can both replicate/prepare and commit/apply an operation.
1895
  //
1896
  // Currently, if a prepare failed to enqueue we still trigger all applies for operations
1897
  // with an id lower than it (if we have them). This is important now as the leader will
1898
  // not re-send those commit messages. This will be moot when we move to the commit
1899
  // commitIndex way of doing things as we can simply ignore the applies as we know
1900
  // they will be triggered with the next successful batch.
1901
  //
1902
  // 5 - We wait for the writes to be durable.
1903
  //
1904
  // Before replying to the leader we wait for the writes to be durable. We then
1905
  // just update the last replicated watermark and respond.
1906
  //
1907
  // TODO - These failure scenarios need to be exercised in a unit
1908
  //        test. Moreover we need to add more fault injection spots (well that
1909
  //        and actually use them) for each of these steps.
1910
25.5M
  TRACE("Updating replica for $0 ops", request->ops_size());
1911
1912
  // The deduplicated request.
1913
25.5M
  LeaderRequest deduped_req;
1914
1915
25.5M
  ReplicaState::UniqueLock lock;
1916
25.5M
  RETURN_NOT_OK(state_->LockForUpdate(&lock));
1917
1918
25.5M
  const auto old_leader = state_->GetLeaderUuidUnlocked();
1919
1920
25.5M
  auto prev_committed_op_id = state_->GetCommittedOpIdUnlocked();
1921
1922
25.5M
  deduped_req.leader_uuid = request->caller_uuid();
1923
1924
25.5M
  RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req));
1925
1926
25.5M
  if (response->status().has_error()) {
1927
122k
    LOG_WITH_PREFIX(INFO)
1928
122k
        << "Returning from UpdateConsensus because of error: " << AsString(response->status());
1929
    // We had an error, like an invalid term, but we still fill the response.
1930
122k
    FillConsensusResponseOKUnlocked(response);
1931
122k
    return UpdateReplicaResult();
1932
122k
  }
1933
1934
25.4M
  TEST_PAUSE_IF_FLAG(TEST_pause_update_replica);
1935
1936
  // Snooze the failure detector as soon as we decide to accept the message.
1937
  // We are guaranteed to be acting as a FOLLOWER at this point by the above
1938
  // sanity check.
1939
25.4M
  SnoozeFailureDetector(DO_NOT_LOG);
1940
1941
25.4M
  auto now = MonoTime::Now();
1942
1943
  // Update the expiration time of the current leader's lease, so that when this follower becomes
1944
  // a leader, it can wait out the time interval while the old leader might still be active.
1945
25.4M
  if (request->has_leader_lease_duration_ms()) {
1946
25.4M
    state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked(
1947
25.4M
        CoarseTimeLease(deduped_req.leader_uuid,
1948
25.4M
                        CoarseMonoClock::now() + request->leader_lease_duration_ms() * 1ms),
1949
25.4M
        PhysicalComponentLease(deduped_req.leader_uuid, request->ht_lease_expiration()));
1950
25.4M
  }
1951
1952
  // Also prohibit voting for anyone for the minimum election timeout.
1953
25.4M
  withhold_votes_until_.store(now + MinimumElectionTimeout(), std::memory_order_release);
1954
1955
  // 1 - Early commit pending (and committed) operations
1956
25.4M
  RETURN_NOT_OK(EarlyCommitUnlocked(*request, deduped_req));
1957
1958
  // 2 - Enqueue the prepares
1959
25.4M
  if (!VERIFY_RESULT(EnqueuePreparesUnlocked(*request, &deduped_req, response))) {
1960
1
    return UpdateReplicaResult();
1961
1
  }
1962
1963
25.4M
  if (deduped_req.committed_op_id.index < prev_committed_op_id.index) {
1964
75
    deduped_req.committed_op_id = prev_committed_op_id;
1965
75
  }
1966
1967
  // 3 - Enqueue the writes.
1968
25.4M
  auto last_from_leader = EnqueueWritesUnlocked(
1969
25.4M
      deduped_req, WriteEmpty(prev_committed_op_id != deduped_req.committed_op_id));
1970
1971
  // 4 - Mark operations as committed
1972
25.4M
  RETURN_NOT_OK(MarkOperationsAsCommittedUnlocked(*request, deduped_req, last_from_leader));
1973
1974
  // Fill the response with the current state. We will not mutate anymore state until
1975
  // we actually reply to the leader, we'll just wait for the messages to be durable.
1976
25.4M
  FillConsensusResponseOKUnlocked(response);
1977
1978
25.4M
  UpdateReplicaResult result;
1979
1980
  // Check if there is an election pending and the op id pending upon has just been committed.
1981
25.4M
  const auto& pending_election_op_id = state_->GetPendingElectionOpIdUnlocked();
1982
25.4M
  result.start_election =
1983
25.4M
      !pending_election_op_id.empty() &&
1984
25.4M
      pending_election_op_id.index <= state_->GetCommittedOpIdUnlocked().index;
1985
1986
25.4M
  if (!deduped_req.messages.empty()) {
1987
8.28M
    result.wait_for_op_id = state_->GetLastReceivedOpIdUnlocked();
1988
8.28M
  }
1989
25.4M
  result.current_term = state_->GetCurrentTermUnlocked();
1990
1991
25.4M
  uint64_t update_time_ms = 0;
1992
25.4M
  if (request->has_propagated_hybrid_time()) {
1993
25.4M
    update_time_ms =  HybridTime::FromPB(
1994
25.4M
        request->propagated_hybrid_time()).GetPhysicalValueMicros() / 1000;
1995
25.4M
  } else if (!deduped_req.messages.empty()) {
1996
112
    update_time_ms = HybridTime::FromPB(
1997
112
        deduped_req.messages.back()->hybrid_time()).GetPhysicalValueMicros() / 1000;
1998
112
  }
1999
25.4M
  follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds(
2000
25.4M
      (update_time_ms > 0 ? update_time_ms : clock_->Now().GetPhysicalValueMicros() / 1000));
2001
25.4M
  TRACE("UpdateReplica() finished");
2002
25.4M
  return result;
2003
25.4M
}
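The control flow of UpdateReplica follows the five numbered steps in the long comment at the top of the method. A condensed outline, using the helper names from this file and omitting locking, error handling, and metrics (a summary, not a drop-in implementation):

// 0 - Dedup and sanity checks:   CheckLeaderRequestUnlocked(request, response, &deduped_req)
// 1 - Early commit:              EarlyCommitUnlocked(*request, deduped_req)
// 2 - Enqueue prepares:          EnqueuePreparesUnlocked(*request, &deduped_req, response)
// 3 - Enqueue WAL writes:        EnqueueWritesUnlocked(deduped_req, ...)
// 4 - Mark operations committed: MarkOperationsAsCommittedUnlocked(*request, deduped_req, ...)
// 5 - Wait for durability (after releasing the lock, back in Update()): WaitForWrites(...)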
2004
2005
Status RaftConsensus::EarlyCommitUnlocked(const ConsensusRequestPB& request,
2006
25.4M
                                          const LeaderRequest& deduped_req) {
2007
  // What should we commit?
2008
  // 1. As many pending operations as we can, except...
2009
  // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639
2010
  //    ("Leader doesn't overwrite demoted follower's log properly"), and...
2011
  // 3. ...the leader's committed index is always our upper bound.
2012
25.4M
  auto early_apply_up_to = state_->GetLastPendingOperationOpIdUnlocked();
2013
25.4M
  if (deduped_req.preceding_op_id.index < early_apply_up_to.index) {
2014
5
    early_apply_up_to = deduped_req.preceding_op_id;
2015
5
  }
2016
25.4M
  if (request.committed_op_id().index() < early_apply_up_to.index) {
2017
40.8k
    early_apply_up_to = yb::OpId::FromPB(request.committed_op_id());
2018
40.8k
  }
2019
2020
25.4M
  VLOG_WITH_PREFIX(1) << "Early marking committed up to " << early_apply_up_to;
2021
25.4M
  TRACE("Early marking committed up to $0.$1", early_apply_up_to.term, early_apply_up_to.index);
2022
25.4M
  return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(early_apply_up_to, CouldStop::kTrue));
2023
25.4M
}
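The three rules above amount to taking the minimum of three indices. A standalone sketch of that computation over plain integers (function and parameter names are illustrative):

#include <algorithm>
#include <cstdint>

// early_apply_up_to = min(last pending op, leader's preceding op, leader's committed op).
int64_t EarlyApplyUpTo(int64_t last_pending_index,
                       int64_t leader_preceding_index,
                       int64_t leader_committed_index) {
  return std::min({last_pending_index, leader_preceding_index, leader_committed_index});
}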
2024
2025
Result<bool> RaftConsensus::EnqueuePreparesUnlocked(const ConsensusRequestPB& request,
2026
                                                    LeaderRequest* deduped_req_ptr,
2027
25.4M
                                                    ConsensusResponsePB* response) {
2028
25.4M
  LeaderRequest& deduped_req = *deduped_req_ptr;
2029
25.4M
  TRACE("Triggering prepare for $0 ops", deduped_req.messages.size());
2030
2031
25.4M
  Status prepare_status;
2032
25.4M
  auto iter = deduped_req.messages.begin();
2033
2034
25.4M
  if (PREDICT_TRUE(!deduped_req.messages.empty())) {
2035
    // TODO Temporary until the leader explicitly propagates the safe hybrid_time.
2036
    // TODO: what if there is a failure here because the updated time is too far in the future?
2037
8.27M
    clock_->Update(HybridTime(deduped_req.messages.back()->hybrid_time()));
2038
8.27M
  }
2039
2040
25.4M
  HybridTime propagated_safe_time;
2041
25.4M
  if (request.has_propagated_safe_time()) {
2042
25.4M
    propagated_safe_time = HybridTime(request.propagated_safe_time());
2043
25.4M
    if (deduped_req.messages.empty()) {
2044
17.1M
      state_->context()->SetPropagatedSafeTime(propagated_safe_time);
2045
17.1M
    }
2046
25.4M
  }
2047
2048
25.4M
  if (iter != deduped_req.messages.end()) {
2049
9.41M
    for (;;) {
2050
9.41M
      const ReplicateMsgPtr& msg = *iter;
2051
9.41M
      ++iter;
2052
9.41M
      bool last = iter == deduped_req.messages.end();
2053
9.41M
      prepare_status = StartReplicaOperationUnlocked(
2054
9.41M
          msg, last ? propagated_safe_time : HybridTime::kInvalid);
2055
9.41M
      if (PREDICT_FALSE(!prepare_status.ok())) {
2056
322
        --iter;
2057
322
        LOG_WITH_PREFIX(WARNING) << "StartReplicaOperationUnlocked failed: " << prepare_status;
2058
322
        break;
2059
322
      }
2060
9.41M
      if (last) {
2061
8.28M
        break;
2062
8.28M
      }
2063
9.41M
    }
2064
8.27M
  }
2065
2066
  // If we stopped before reaching the end we failed to prepare some message(s) and need
2067
  // to perform cleanup, namely trimming deduped_req.messages to only contain the messages
2068
  // that were actually prepared, and deleting the other ones since we've taken ownership
2069
  // when we first deduped.
2070
25.4M
  bool incomplete = iter != deduped_req.messages.end();
2071
25.4M
  if (incomplete) {
2072
322
    {
2073
322
      const ReplicateMsgPtr msg = *iter;
2074
322
      LOG_WITH_PREFIX(WARNING)
2075
322
          << "Could not prepare operation for op: "
2076
322
          << msg->id() << ". Suppressed " << (deduped_req.messages.end() - iter - 1)
2077
322
          << " other warnings. Status for this op: " << prepare_status;
2078
322
      deduped_req.messages.erase(iter, deduped_req.messages.end());
2079
322
    }
2080
2081
    // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing
2082
    // else we can do. The leader will detect this and retry later.
2083
322
    if (deduped_req.messages.empty()) {
2084
1
      auto msg = Format("Rejecting Update request from peer $0 for term $1. "
2085
1
                        "Could not prepare a single operation due to: $2",
2086
1
                        request.caller_uuid(),
2087
1
                        request.caller_term(),
2088
1
                        prepare_status);
2089
1
      LOG_WITH_PREFIX(INFO) << msg;
2090
1
      FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE,
2091
1
                                 STATUS(IllegalState, msg));
2092
1
      FillConsensusResponseOKUnlocked(response);
2093
1
      return false;
2094
1
    }
2095
322
  }
2096
2097
25.4M
  deduped_req.committed_op_id = yb::OpId::FromPB(request.committed_op_id());
2098
25.4M
  if (!deduped_req.messages.empty()) {
2099
8.28M
    auto last_op_id = yb::OpId::FromPB(deduped_req.messages.back()->id());
2100
8.28M
    if (deduped_req.committed_op_id > last_op_id) {
2101
299
      LOG_IF_WITH_PREFIX(DFATAL, !incomplete)
2102
0
          << "Received committed op id: " << deduped_req.committed_op_id
2103
0
          << ", past last known op id: " << last_op_id;
2104
2105
      // It is possible that we failed to prepare one of the messages,
2106
      // so limit the committed op id to avoid having it go past the last known op id.
2107
299
      deduped_req.committed_op_id = last_op_id;
2108
299
    }
2109
8.28M
  }
2110
2111
25.4M
  return true;
2112
25.4M
}
2113
2114
yb::OpId RaftConsensus::EnqueueWritesUnlocked(const LeaderRequest& deduped_req,
2115
25.4M
                                              WriteEmpty write_empty) {
2116
  // Now that we've triggered the prepares enqueue the operations to be written
2117
  // to the WAL.
2118
25.4M
  if (PREDICT_TRUE(!deduped_req.messages.empty()) || write_empty) {
2119
    // Trigger the log append asap, if fsync() is on this might take a while
2120
    // and we can't reply until this is done.
2121
    //
2122
    // Since we've prepared, we need to be able to append (or we risk trying to apply
2123
    // later something that wasn't logged). We crash if we can't.
2124
15.0M
    CHECK_OK(queue_->AppendOperations(
2125
15.0M
        deduped_req.messages, deduped_req.committed_op_id, state_->Clock().Now()));
2126
15.0M
  }
2127
2128
25.4M
  return !deduped_req.messages.empty() ?
2129
17.1M
      yb::OpId::FromPB(deduped_req.messages.back()->id()) : deduped_req.preceding_op_id;
2130
25.4M
}
2131
2132
8.27M
Status RaftConsensus::WaitForWrites(int64_t term, const OpId& wait_for_op_id) {
2133
  // 5 - We wait for the writes to be durable.
2134
2135
  // Note that this is safe because dist consensus now only supports a single outstanding
2136
  // request at a time and this way we can allow commits to proceed while we wait.
2137
8.27M
  TRACE("Waiting on the replicates to finish logging");
2138
8.27M
  TRACE_EVENT0("consensus", "Wait for log");
2139
8.27M
  for (;;) {
2140
8.27M
    auto wait_result = log_->WaitForSafeOpIdToApply(
2141
8.27M
        wait_for_op_id, MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
2142
    // If we are just waiting for our own log append to finish, let's snooze the timer.
2143
    // We don't want to fire leader election because we're waiting on our own log.
2144
8.28M
    if (!wait_result.empty()) {
2145
8.28M
      break;
2146
8.28M
    }
2147
18.4E
    int64_t new_term;
2148
18.4E
    {
2149
18.4E
      auto lock = state_->LockForRead();
2150
18.4E
      new_term = state_->GetCurrentTermUnlocked();
2151
18.4E
    }
2152
18.4E
    if (term != new_term) {
2153
0
      return STATUS_FORMAT(IllegalState, "Term changed to $0 while waiting for writes in term $1",
2154
0
                           new_term, term);
2155
0
    }
2156
2157
18.4E
    SnoozeFailureDetector(DO_NOT_LOG);
2158
2159
18.4E
    const auto election_timeout_at = MonoTime::Now() + MinimumElectionTimeout();
2160
18.4E
    UpdateAtomicMax(&withhold_votes_until_, election_timeout_at);
2161
18.4E
  }
2162
8.27M
  TRACE("Finished waiting on the replicates to finish logging");
2163
2164
8.27M
  return Status::OK();
2165
8.27M
}
2166
2167
Status RaftConsensus::MarkOperationsAsCommittedUnlocked(const ConsensusRequestPB& request,
2168
                                                        const LeaderRequest& deduped_req,
2169
25.4M
                                                        yb::OpId last_from_leader) {
2170
  // Choose the last operation to be applied. This will either be 'committed_index', if
2171
  // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of
2172
  // the last successfully enqueued prepare, if some prepare failed to enqueue.
2173
25.4M
  yb::OpId apply_up_to;
2174
25.4M
  if (last_from_leader.index < request.committed_op_id().index()) {
2175
    // we should never apply anything later than what we received in this request
2176
300
    apply_up_to = last_from_leader;
2177
2178
300
    LOG_WITH_PREFIX(INFO)
2179
300
        << "Received commit index " << request.committed_op_id()
2180
300
        << " from the leader but only marked up to " << apply_up_to << " as committed.";
2181
25.4M
  } else {
2182
25.4M
    apply_up_to = yb::OpId::FromPB(request.committed_op_id());
2183
25.4M
  }
2184
2185
  // We can now update the last received watermark.
2186
  //
2187
  // We do it here (and before we actually hear back from the wal whether things
2188
  // are durable) so that, if we receive another, possible duplicate, message
2189
  // that exercises this path we don't handle these messages twice.
2190
  //
2191
  // If any messages failed to be started locally, then we already have removed them
2192
  // from 'deduped_req' at this point. So, we can simply update our last-received
2193
  // watermark to the last message that remains in 'deduped_req'.
2194
  //
2195
  // It's possible that the leader didn't send us any new data -- it might be a completely
2196
  // duplicate request. In that case, we don't need to update LastReceived at all.
2197
25.4M
  if (!deduped_req.messages.empty()) {
2198
8.28M
    OpIdPB last_appended = deduped_req.messages.back()->id();
2199
8.28M
    TRACE(Substitute("Updating last received op as $0", last_appended.ShortDebugString()));
2200
8.28M
    state_->UpdateLastReceivedOpIdUnlocked(last_appended);
2201
17.1M
  } else if (state_->GetLastReceivedOpIdUnlocked().index < deduped_req.preceding_op_id.index) {
2202
0
    return STATUS_FORMAT(InvalidArgument,
2203
0
                         "Bad preceding_opid: $0, last received: $1",
2204
0
                         deduped_req.preceding_op_id,
2205
0
                         state_->GetLastReceivedOpIdUnlocked());
2206
0
  }
2207
2208
18.4E
  VLOG_WITH_PREFIX(1) << "Marking committed up to " << apply_up_to;
2209
25.4M
  TRACE(Format("Marking committed up to $0", apply_up_to));
2210
25.4M
  return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(apply_up_to, CouldStop::kTrue));
2211
25.4M
}
2212
2213
25.5M
void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
2214
25.5M
  TRACE("Filling consensus response to leader.");
2215
25.5M
  response->set_responder_term(state_->GetCurrentTermUnlocked());
2216
25.5M
  state_->GetLastReceivedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_received());
2217
25.5M
  state_->GetLastReceivedOpIdCurLeaderUnlocked().ToPB(
2218
25.5M
      response->mutable_status()->mutable_last_received_current_leader());
2219
25.5M
  response->mutable_status()->set_last_committed_idx(state_->GetCommittedOpIdUnlocked().index);
2220
25.5M
  state_->GetLastAppliedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_applied());
2221
25.5M
}
2222
2223
void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
2224
                                               ConsensusErrorPB::Code error_code,
2225
122k
                                               const Status& status) {
2226
122k
  ConsensusErrorPB* error = response->mutable_status()->mutable_error();
2227
122k
  error->set_code(error_code);
2228
122k
  StatusToPB(status, error->mutable_status());
2229
122k
}
2230
2231
1.36M
Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
2232
1.36M
  TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
2233
1.36M
               "peer", peer_uuid(),
2234
1.36M
               "tablet", tablet_id());
2235
1.36M
  bool preelection = request->preelection();
2236
2237
1.36M
  response->set_responder_uuid(state_->GetPeerUuid());
2238
1.36M
  response->set_preelection(preelection);
2239
2240
  // We must acquire the update lock in order to ensure that this vote action
2241
  // takes place between requests.
2242
  // Lock ordering: The update lock must be acquired before the ReplicaState lock.
2243
1.36M
  std::unique_lock<decltype(update_mutex_)> update_guard(update_mutex_, std::defer_lock);
2244
1.36M
  if (FLAGS_enable_leader_failure_detection) {
2245
1.36M
    update_guard.try_lock();
2246
1.36M
  } else {
2247
    // If failure detection is not enabled, then we can't just reject the vote,
2248
    // because there will be no automatic retry later. So, block for the lock.
2249
770
    update_guard.lock();
2250
770
  }
2251
1.36M
  if (!update_guard.owns_lock()) {
2252
    // There is another vote or update concurrent with the vote. In that case, that
2253
    // other request is likely to reset the timer, and we'll end up just voting
2254
    // "NO" after waiting. To avoid starving RPC handlers and causing cascading
2255
    // timeouts, just vote a quick NO.
2256
    //
2257
    // We still need to take the state lock in order to respond with term info, etc.
2258
11.1k
    ReplicaState::UniqueLock state_guard;
2259
11.1k
    RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
2260
11.1k
    return RequestVoteRespondIsBusy(request, response);
2261
11.1k
  }
2262
2263
  // Acquire the replica state lock so we can read / modify the consensus state.
2264
1.35M
  ReplicaState::UniqueLock state_guard;
2265
1.35M
  RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
2266
2267
  // If the node is not in the configuration, allow the vote (this is required by Raft)
2268
  // but log an informational message anyway.
2269
1.35M
  if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) {
2270
90
    LOG_WITH_PREFIX(INFO) << "Handling vote request from an unknown peer "
2271
90
                          << request->candidate_uuid();
2272
90
  }
2273
2274
  // If we've heard recently from the leader, then we should ignore the request
2275
  // (except if it is the leader itself requesting a vote -- something that might
2276
  //  happen if the leader were to stepdown and call an election.). Otherwise,
2277
  // it might be from a "disruptive" server. This could happen in a few cases:
2278
  //
2279
  // 1) Network partitions
2280
  // If the leader can talk to a majority of the nodes, but is partitioned from a
2281
  // bad node, the bad node's failure detector will trigger. If the bad node is
2282
  // able to reach other nodes in the cluster, it will continuously trigger elections.
2283
  //
2284
  // 2) An abandoned node
2285
  // It's possible that a node has fallen behind the log GC mark of the leader. In that
2286
  // case, the leader will stop sending it requests. Eventually, the configuration
2287
  // will change to eject the abandoned node, but until that point, we don't want the
2288
  // abandoned follower to disturb the other nodes.
2289
  //
2290
  // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
2291
  // section 4.2.3.
2292
1.35M
  MonoTime now = MonoTime::Now();
2293
1.35M
  if (request->candidate_uuid() != state_->GetLeaderUuidUnlocked() &&
2294
1.35M
      !request->ignore_live_leader() &&
2295
1.35M
      now < withhold_votes_until_.load(std::memory_order_acquire)) {
2296
8.89k
    return RequestVoteRespondLeaderIsAlive(request, response);
2297
8.89k
  }
2298
2299
  // Candidate is running behind.
2300
1.34M
  if (request->candidate_term() < state_->GetCurrentTermUnlocked()) {
2301
204k
    return RequestVoteRespondInvalidTerm(request, response);
2302
204k
  }
2303
2304
  // We already voted this term.
2305
1.13M
  if (request->candidate_term() == state_->GetCurrentTermUnlocked() &&
2306
1.13M
      state_->HasVotedCurrentTermUnlocked()) {
2307
2308
    // Already voted for the same candidate in the current term.
2309
1.37k
    if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
2310
209
      return RequestVoteRespondVoteAlreadyGranted(request, response);
2311
209
    }
2312
2313
    // Voted for someone else in current term.
2314
1.16k
    return RequestVoteRespondAlreadyVotedForOther(request, response);
2315
1.37k
  }
2316
2317
  // The term advanced.
2318
1.13M
  if (request->candidate_term() > state_->GetCurrentTermUnlocked() && !preelection) {
2319
107k
    RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term()),
2320
107k
        Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
2321
107k
            state_->GetCurrentTermUnlocked(), request->candidate_term()));
2322
107k
  }
2323
2324
  // Candidate must have last-logged OpId at least as large as our own to get our vote.
2325
1.13M
  OpIdPB local_last_logged_opid;
2326
1.13M
  GetLatestOpIdFromLog().ToPB(&local_last_logged_opid);
2327
1.13M
  if (OpIdLessThan(request->candidate_status().last_received(), local_last_logged_opid)) {
2328
433
    return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response);
2329
433
  }
2330
2331
1.13M
  if (!preelection) {
2332
    // Clear the pending election op id if any before granting the vote. If another peer jumps in
2333
    // before we can catch up and start the election, let's not disrupt the quorum with another
2334
    // election.
2335
107k
    state_->ClearPendingElectionOpIdUnlocked();
2336
107k
  }
2337
2338
1.13M
  auto remaining_old_leader_lease = state_->RemainingOldLeaderLeaseDuration();
2339
2340
1.13M
  if (remaining_old_leader_lease.Initialized()) {
2341
20.7k
    response->set_remaining_leader_lease_duration_ms(
2342
20.7k
        narrow_cast<int32_t>(remaining_old_leader_lease.ToMilliseconds()));
2343
20.7k
    response->set_leader_lease_uuid(state_->old_leader_lease().holder_uuid);
2344
20.7k
  }
2345
2346
1.13M
  const auto& old_leader_ht_lease = state_->old_leader_ht_lease();
2347
1.13M
  if (old_leader_ht_lease) {
2348
803k
    response->set_leader_ht_lease_expiration(old_leader_ht_lease.expiration);
2349
803k
    response->set_leader_ht_lease_uuid(old_leader_ht_lease.holder_uuid);
2350
803k
  }
2351
2352
  // Passed all our checks. Vote granted.
2353
1.13M
  if (preelection) {
2354
1.02M
    LOG_WITH_PREFIX(INFO) << "Pre-election. Granting vote for candidate "
2355
1.02M
                          << request->candidate_uuid() << " in term " << request->candidate_term();
2356
1.02M
    FillVoteResponseVoteGranted(*request, response);
2357
1.02M
    return Status::OK();
2358
1.02M
  }
2359
2360
108k
  return RequestVoteRespondVoteGranted(request, response);
2361
1.13M
}
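// Editor's note (summary of the checks above, not part of the original source):
// the vote request is evaluated roughly in this order -- busy (update lock held)
// -> live leader recently heard from -> candidate term behind ours -> already
// voted this term -> candidate's last-logged OpId older than ours -> vote
// granted. A pre-election grant returns immediately, while a regular grant goes
// through RequestVoteRespondVoteGranted(), which persists the vote to disk.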
2362
2363
Status RaftConsensus::IsLeaderReadyForChangeConfigUnlocked(ChangeConfigType type,
2364
2.91M
                                                           const string& server_uuid) {
2365
2.91M
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
2366
2.91M
  size_t servers_in_transition = 0;
2367
2.91M
  if (type == ADD_SERVER) {
2368
2.53k
    servers_in_transition = CountServersInTransition(active_config);
2369
2.91M
  } else if (type == REMOVE_SERVER) {
2370
    // If we are trying to remove the server in transition, then servers_in_transition shouldn't
2371
    // count it so we can proceed with the operation.
2372
2.18k
    servers_in_transition = CountServersInTransition(active_config, server_uuid);
2373
2.18k
  }
2374
2375
  // Check that all the following requirements are met:
2376
  // 1. We are required by Raft to reject config change operations until we have
2377
  //    committed at least one operation in our current term as leader.
2378
  //    See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
2379
  // 2. Ensure there is no other pending change config.
2380
  // 3. There are no peers that are in the process of becoming VOTERs or OBSERVERs.
2381
2.91M
  if (!state_->AreCommittedAndCurrentTermsSameUnlocked() ||
2382
2.91M
      state_->IsConfigChangePendingUnlocked() ||
2383
2.91M
      servers_in_transition != 0) {
2384
2.91M
    return STATUS_FORMAT(IllegalState,
2385
2.91M
                         "Leader is not ready for Config Change, can try again. "
2386
2.91M
                         "Num peers in transit: $0. Type: $1. Has opid: $2. Committed config: $3. "
2387
2.91M
                         "Pending config: $4. Current term: $5. Committed op id: $6.",
2388
2.91M
                         servers_in_transition, ChangeConfigType_Name(type),
2389
2.91M
                         active_config.has_opid_index(),
2390
2.91M
                         state_->GetCommittedConfigUnlocked().ShortDebugString(),
2391
2.91M
                         state_->IsConfigChangePendingUnlocked() ?
2392
2.91M
                             state_->GetPendingConfigUnlocked().ShortDebugString() : "",
2393
2.91M
                         state_->GetCurrentTermUnlocked(), state_->GetCommittedOpIdUnlocked());
2394
2.91M
  }
2395
2396
5.92k
  return Status::OK();
2397
2.91M
}
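// Editor's note, illustrative caller-side sketch (an assumption, not part of
// the original source): the IllegalState returned above surfaces to clients as
// LEADER_NOT_READY_CHANGE_CONFIG, which is retryable once the leader has
// committed an entry in its current term and any pending config change or
// in-transition peer has resolved, e.g.:
//
//   boost::optional<TabletServerErrorPB::Code> code;
//   Status s = consensus->ChangeConfig(req, client_cb, &code);
//   if (!s.ok() && code == TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG) {
//     // Back off and retry the same ChangeConfig request later.
//   }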
2398
2399
Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
2400
                                   const StdStatusCallback& client_cb,
2401
2.91M
                                   boost::optional<TabletServerErrorPB::Code>* error_code) {
2402
2.91M
  if (PREDICT_FALSE(!req.has_type())) {
2403
0
    return STATUS(InvalidArgument, "Must specify 'type' argument to ChangeConfig()",
2404
0
                                   req.ShortDebugString());
2405
0
  }
2406
2.91M
  if (PREDICT_FALSE(!req.has_server())) {
2407
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2408
0
    return STATUS(InvalidArgument, "Must specify 'server' argument to ChangeConfig()",
2409
0
                                   req.ShortDebugString());
2410
0
  }
2411
2.91M
  YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
2412
8.39k
      << "Received ChangeConfig request " << req.ShortDebugString();
2413
2.91M
  ChangeConfigType type = req.type();
2414
2.91M
  bool use_hostport = req.has_use_host() && req.use_host();
2415
2416
2.91M
  if (type != REMOVE_SERVER && use_hostport) {
2417
0
    return STATUS_SUBSTITUTE(InvalidArgument, "Cannot set use_host for change config type $0, "
2418
0
                             "only allowed with REMOVE_SERVER.", type);
2419
0
  }
2420
2421
2.91M
  if (PREDICT_FALSE(FLAGS_TEST_return_error_on_change_config != 0.0 && type == CHANGE_ROLE)) {
2422
0
    DCHECK(FLAGS_TEST_return_error_on_change_config >= 0.0 &&
2423
0
           FLAGS_TEST_return_error_on_change_config <= 1.0);
2424
0
    if (clock_->Now().ToUint64() % 100 < 100 * FLAGS_TEST_return_error_on_change_config) {
2425
0
      return STATUS(IllegalState, "Returning error for unit test");
2426
0
    }
2427
0
  }
2428
2.91M
  RaftPeerPB* new_peer = nullptr;
2429
2.91M
  const RaftPeerPB& server = req.server();
2430
2.91M
  if (!use_hostport && !server.has_permanent_uuid()) {
2431
0
    return STATUS(InvalidArgument,
2432
0
                  Substitute("server must have permanent_uuid or use_host specified: $0",
2433
0
                             req.ShortDebugString()));
2434
0
  }
2435
2.91M
  {
2436
2.91M
    ReplicaState::UniqueLock lock;
2437
2.91M
    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
2438
2.91M
    Status s = state_->CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::DONT_NEED_LEASE);
2439
2.91M
    if (!s.ok()) {
2440
817
      *error_code = TabletServerErrorPB::NOT_THE_LEADER;
2441
817
      return s;
2442
817
    }
2443
2444
18.4E
    const string& server_uuid = server.has_permanent_uuid() ? server.permanent_uuid() : "";
2445
2.91M
    s = IsLeaderReadyForChangeConfigUnlocked(type, server_uuid);
2446
2.91M
    if (!s.ok()) {
2447
2.91M
      YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
2448
1.79k
          << "Returning not ready for " << ChangeConfigType_Name(type)
2449
1.79k
          << " due to error " << s.ToString();
2450
2.91M
      *error_code = TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG;
2451
2.91M
      return s;
2452
2.91M
    }
2453
2454
5.91k
    const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
2455
2456
    // Support atomic ChangeConfig requests.
2457
5.91k
    if (req.has_cas_config_opid_index()) {
2458
3.75k
      if (committed_config.opid_index() != req.cas_config_opid_index()) {
2459
15
        *error_code = TabletServerErrorPB::CAS_FAILED;
2460
15
        return STATUS(IllegalState, Substitute("Request specified cas_config_opid_index "
2461
15
                                               "of $0 but the committed config has opid_index "
2462
15
                                               "of $1",
2463
15
                                               req.cas_config_opid_index(),
2464
15
                                               committed_config.opid_index()));
2465
15
      }
2466
3.75k
    }
2467
2468
5.89k
    RaftConfigPB new_config = committed_config;
2469
5.89k
    new_config.clear_opid_index();
2470
5.89k
    switch (type) {
2471
2.06k
      case ADD_SERVER:
2472
        // Ensure the server we are adding is not already a member of the configuration.
2473
2.06k
        if (IsRaftConfigMember(server_uuid, committed_config)) {
2474
0
          *error_code = TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT;
2475
0
          return STATUS(IllegalState,
2476
0
              Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
2477
0
                        server_uuid, committed_config.ShortDebugString()));
2478
0
        }
2479
2.06k
        if (!server.has_member_type()) {
2480
0
          return STATUS(InvalidArgument,
2481
0
                        Substitute("Server must have member_type specified. Request: $0",
2482
0
                                   req.ShortDebugString()));
2483
0
        }
2484
2.06k
        if (server.member_type() != PeerMemberType::PRE_VOTER &&
2485
2.06k
            server.member_type() != PeerMemberType::PRE_OBSERVER) {
2486
0
          return STATUS(InvalidArgument,
2487
0
              Substitute("Server with UUID $0 must be of member_type PRE_VOTER or PRE_OBSERVER. "
2488
0
                         "member_type received: $1", server_uuid,
2489
0
                         PeerMemberType_Name(server.member_type())));
2490
0
        }
2491
2.06k
        if (server.last_known_private_addr().empty()) {
2492
0
          return STATUS(InvalidArgument, "server must have last_known_addr specified",
2493
0
                                         req.ShortDebugString());
2494
0
        }
2495
2.06k
        new_peer = new_config.add_peers();
2496
2.06k
        *new_peer = server;
2497
2.06k
        break;
2498
2499
1.82k
      case REMOVE_SERVER:
2500
1.82k
        if (use_hostport) {
2501
5
          if (server.last_known_private_addr().empty()) {
2502
0
            return STATUS(InvalidArgument, "Must have last_known_addr specified.",
2503
0
                          req.ShortDebugString());
2504
0
          }
2505
5
          HostPort leader_hp;
2506
5
          RETURN_NOT_OK(GetHostPortFromConfig(
2507
5
              new_config, peer_uuid(), queue_->local_cloud_info(), &leader_hp));
2508
5
          for (const auto& host_port : server.last_known_private_addr()) {
2509
5
            if (leader_hp.port() == host_port.port() && leader_hp.host() == host_port.host()) {
2510
0
              return STATUS(InvalidArgument, "Cannot remove live leader using hostport.",
2511
0
                            req.ShortDebugString());
2512
0
            }
2513
5
          }
2514
5
        }
2515
1.82k
        if (server_uuid == peer_uuid()) {
2516
30
          *error_code = TabletServerErrorPB::LEADER_NEEDS_STEP_DOWN;
2517
30
          return STATUS(InvalidArgument,
2518
30
              Substitute("Cannot remove peer $0 from the config because it is the leader. "
2519
30
                         "Force another leader to be elected to remove this server. "
2520
30
                         "Active consensus state: $1", server_uuid,
2521
30
                         state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE)
2522
30
                            .ShortDebugString()));
2523
30
        }
2524
1.79k
        if (!RemoveFromRaftConfig(&new_config, req)) {
2525
0
          *error_code = TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT;
2526
0
          return STATUS(NotFound,
2527
0
              Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
2528
0
                        server_uuid, committed_config.ShortDebugString()));
2529
0
        }
2530
1.79k
        break;
2531
2532
2.01k
      case CHANGE_ROLE:
2533
2.01k
        if (server_uuid == peer_uuid()) {
2534
0
          return STATUS(InvalidArgument,
2535
0
              Substitute("Cannot change role of peer $0 because it is the leader. Force "
2536
0
                         "another leader to be elected. Active consensus state: $1", server_uuid,
2537
0
                         state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE)
2538
0
                             .ShortDebugString()));
2539
0
        }
2540
2.01k
        VLOG(3) << "config before CHANGE_ROLE: " << new_config.DebugString();
2541
2542
2.01k
        if (!GetMutableRaftConfigMember(&new_config, server_uuid, &new_peer).ok()) {
2543
0
          return STATUS(NotFound,
2544
0
            Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
2545
0
                       server_uuid, new_config.ShortDebugString()));
2546
0
        }
2547
2.01k
        if (new_peer->member_type() != PeerMemberType::PRE_OBSERVER &&
2548
2.01k
            new_peer->member_type() != PeerMemberType::PRE_VOTER) {
2549
0
          return STATUS(IllegalState, Substitute("Cannot change role of server with UUID $0 "
2550
0
                                                 "because its member type is $1",
2551
0
                                                 server_uuid, new_peer->member_type()));
2552
0
        }
2553
2.01k
        if (new_peer->member_type() == PeerMemberType::PRE_OBSERVER) {
2554
77
          new_peer->set_member_type(PeerMemberType::OBSERVER);
2555
1.93k
        } else {
2556
1.93k
          new_peer->set_member_type(PeerMemberType::VOTER);
2557
1.93k
        }
2558
2559
2.01k
        VLOG(3) << "config after CHANGE_ROLE: " << new_config.DebugString();
2560
2.01k
        break;
2561
0
      default:
2562
0
        return STATUS(InvalidArgument, Substitute("Unsupported type $0",
2563
5.89k
                                                  ChangeConfigType_Name(type)));
2564
5.89k
    }
2565
2566
5.87k
    auto cc_replicate = std::make_shared<ReplicateMsg>();
2567
5.87k
    cc_replicate->set_op_type(CHANGE_CONFIG_OP);
2568
5.87k
    ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
2569
5.87k
    cc_req->set_tablet_id(tablet_id());
2570
5.87k
    *cc_req->mutable_old_config() = committed_config;
2571
5.87k
    *cc_req->mutable_new_config() = new_config;
2572
    // Note: This hybrid_time has no meaning from a serialization perspective
2573
    // because this method is not executed on the TabletPeer's prepare thread.
2574
5.87k
    cc_replicate->set_hybrid_time(clock_->Now().ToUint64());
2575
5.87k
    state_->GetCommittedOpIdUnlocked().ToPB(cc_replicate->mutable_committed_op_id());
2576
2577
5.87k
    auto context = std::make_shared<StateChangeContext>(
2578
5.87k
        StateChangeReason::LEADER_CONFIG_CHANGE_COMPLETE, *cc_req,
2579
5.87k
        type == REMOVE_SERVER ? server_uuid : "");
2580
2581
5.87k
    RETURN_NOT_OK(
2582
5.87k
        ReplicateConfigChangeUnlocked(cc_replicate,
2583
5.87k
                                      new_config,
2584
5.87k
                                      type,
2585
5.87k
                                      std::bind(&RaftConsensus::MarkDirtyOnSuccess,
2586
5.87k
                                           this,
2587
5.87k
                                           std::move(context),
2588
5.87k
                                           std::move(client_cb), std::placeholders::_1)));
2589
5.87k
  }
2590
2591
5.87k
  peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly);
2592
2593
5.87k
  return Status::OK();
2594
5.87k
}
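// Editor's note, illustrative request sketch (an assumption, not part of the
// original source): an atomic ADD_SERVER guarded by cas_config_opid_index,
// using the fields validated above; tablet_id, committed_opid_index and
// new_peer_uuid are hypothetical placeholders:
//
//   ChangeConfigRequestPB req;
//   req.set_tablet_id(tablet_id);
//   req.set_type(ADD_SERVER);
//   req.set_cas_config_opid_index(committed_opid_index);  // CAS against the committed config
//   auto* server = req.mutable_server();
//   server->set_permanent_uuid(new_peer_uuid);
//   server->set_member_type(PeerMemberType::PRE_VOTER);   // or PRE_OBSERVER
//   // last_known_private_addr must also be populated, or the request is rejected.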
2595
2596
Status RaftConsensus::UnsafeChangeConfig(
2597
    const UnsafeChangeConfigRequestPB& req,
2598
6
    boost::optional<tserver::TabletServerErrorPB::Code>* error_code) {
2599
6
  if (PREDICT_FALSE(!req.has_new_config())) {
2600
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2601
0
    return STATUS(InvalidArgument, "Request must contain 'new_config' argument "
2602
0
                  "to UnsafeChangeConfig()", yb::ToString(req));
2603
0
  }
2604
6
  if (PREDICT_FALSE(!req.has_caller_id())) {
2605
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2606
0
    return STATUS(InvalidArgument, "Must specify 'caller_id' argument to UnsafeChangeConfig()",
2607
0
                  yb::ToString(req));
2608
0
  }
2609
2610
  // Grab the committed config and current term on this node.
2611
6
  int64_t current_term;
2612
6
  RaftConfigPB committed_config;
2613
6
  OpId last_committed_opid;
2614
6
  OpId preceding_opid;
2615
6
  string local_peer_uuid;
2616
6
  {
2617
    // Take the snapshot of the replica state and queue state so that
2618
    // we can stick them in the consensus update request later.
2619
6
    auto lock = state_->LockForRead();
2620
6
    local_peer_uuid = state_->GetPeerUuid();
2621
6
    current_term = state_->GetCurrentTermUnlocked();
2622
6
    committed_config = state_->GetCommittedConfigUnlocked();
2623
6
    if (state_->IsConfigChangePendingUnlocked()) {
2624
3
      LOG_WITH_PREFIX(WARNING) << "Replica has a pending config, but the new config "
2625
3
                               << "will be unsafely changed anyway. "
2626
3
                               << "Currently pending config on the node: "
2627
3
                               << yb::ToString(state_->GetPendingConfigUnlocked());
2628
3
    }
2629
6
    last_committed_opid = state_->GetCommittedOpIdUnlocked();
2630
6
    preceding_opid = state_->GetLastAppliedOpIdUnlocked();
2631
6
  }
2632
2633
  // Validate that passed replica uuids are part of the committed config
2634
  // on this node.  This allows a manual recovery tool to only have to specify
2635
  // the uuid of each replica in the new config without having to know the
2636
  // addresses of each server (since we can get the address information from
2637
  // the committed config). Additionally, only a subset of the committed config
2638
  // is required for typical cluster repair scenarios.
2639
6
  std::unordered_set<string> retained_peer_uuids;
2640
6
  const RaftConfigPB& config = req.new_config();
2641
7
  for (const RaftPeerPB& new_peer : config.peers()) {
2642
7
    const string& peer_uuid = new_peer.permanent_uuid();
2643
7
    retained_peer_uuids.insert(peer_uuid);
2644
7
    if (!IsRaftConfigMember(peer_uuid, committed_config)) {
2645
0
      *error_code = TabletServerErrorPB::INVALID_CONFIG;
2646
0
      return STATUS(InvalidArgument, Substitute("Peer with uuid $0 is not in the committed  "
2647
0
                                                "config on this replica, rejecting the  "
2648
0
                                                "unsafe config change request for tablet $1. "
2649
0
                                                "Committed config: $2",
2650
0
                                                peer_uuid, req.tablet_id(),
2651
0
                                                yb::ToString(committed_config)));
2652
0
    }
2653
7
  }
2654
2655
6
  RaftConfigPB new_config = committed_config;
2656
18
  for (const auto& peer : committed_config.peers()) {
2657
18
    const string& peer_uuid = peer.permanent_uuid();
2658
18
    if (!ContainsKey(retained_peer_uuids, peer_uuid)) {
2659
11
      ChangeConfigRequestPB req;
2660
11
      req.set_tablet_id(tablet_id());
2661
11
      req.mutable_server()->set_permanent_uuid(peer_uuid);
2662
11
      req.set_type(REMOVE_SERVER);
2663
11
      req.set_cas_config_opid_index(committed_config.opid_index());
2664
11
      CHECK(RemoveFromRaftConfig(&new_config, req));
2665
11
    }
2666
18
  }
2667
  // Check that local peer is part of the new config and is a VOTER.
2668
  // Although it is valid for a local replica to not have itself
2669
  // in the committed config, it is rare and a replica without itself
2670
  // in the latest config is definitely not caught up with the latest leader's log.
2671
6
  if (!IsRaftConfigVoter(local_peer_uuid, new_config)) {
2672
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2673
0
    return STATUS(InvalidArgument, Substitute("Local replica uuid $0 is not "
2674
0
                                              "a VOTER in the new config, "
2675
0
                                              "rejecting the unsafe config "
2676
0
                                              "change request for tablet $1. "
2677
0
                                              "Rejected config: $2" ,
2678
0
                                              local_peer_uuid, req.tablet_id(),
2679
0
                                              yb::ToString(new_config)));
2680
0
  }
2681
6
  new_config.set_unsafe_config_change(true);
2682
6
  int64 replicate_opid_index = preceding_opid.index + 1;
2683
6
  new_config.clear_opid_index();
2684
2685
  // Sanity check the new config.
2686
6
  Status s = VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM);
2687
6
  if (!s.ok()) {
2688
0
    *error_code = TabletServerErrorPB::INVALID_CONFIG;
2689
0
    return STATUS(InvalidArgument, Substitute("The resulting new config for tablet $0  "
2690
0
                                              "from passed parameters has failed raft "
2691
0
                                              "config sanity check: $1",
2692
0
                                              req.tablet_id(), s.ToString()));
2693
0
  }
2694
2695
  // Prepare the consensus request as if the request is being generated
2696
  // from a different leader.
2697
6
  ConsensusRequestPB consensus_req;
2698
6
  consensus_req.set_caller_uuid(req.caller_id());
2699
  // Bumping up the term for the consensus request being generated.
2700
  // This makes this request appear to come from a new leader that
2701
  // the local replica doesn't know about yet. If the local replica
2702
  // happens to be the leader, this will cause it to step down.
2703
6
  const int64 new_term = current_term + 1;
2704
6
  consensus_req.set_caller_term(new_term);
2705
6
  preceding_opid.ToPB(consensus_req.mutable_preceding_id());
2706
6
  last_committed_opid.ToPB(consensus_req.mutable_committed_op_id());
2707
2708
  // Prepare the replicate msg to be replicated.
2709
6
  ReplicateMsg* replicate = consensus_req.add_ops();
2710
6
  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
2711
6
  cc_req->set_tablet_id(req.tablet_id());
2712
6
  *cc_req->mutable_old_config() = committed_config;
2713
6
  *cc_req->mutable_new_config() = new_config;
2714
6
  OpIdPB* id = replicate->mutable_id();
2715
  // Bumping up both the term and the opid_index from what's found in the log.
2716
6
  id->set_term(new_term);
2717
6
  id->set_index(replicate_opid_index);
2718
6
  replicate->set_op_type(CHANGE_CONFIG_OP);
2719
6
  replicate->set_hybrid_time(clock_->Now().ToUint64());
2720
6
  last_committed_opid.ToPB(replicate->mutable_committed_op_id());
2721
2722
6
  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
2723
0
                      << yb::ToString(consensus_req);
2724
2725
6
  LOG_WITH_PREFIX(WARNING) << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
2726
6
                           << "COMMITTED CONFIG: " << yb::ToString(committed_config)
2727
6
                           << "NEW CONFIG: " << yb::ToString(new_config);
2728
2729
6
  const auto deadline = CoarseMonoClock::Now() + 15s;  // TODO: fix me
2730
6
  ConsensusResponsePB consensus_resp;
2731
6
  s = Update(&consensus_req, &consensus_resp, deadline);
2732
6
  if (!s.ok() || consensus_resp.has_error()) {
2733
0
    *error_code = TabletServerErrorPB::UNKNOWN_ERROR;
2734
0
  }
2735
6
  if (s.ok() && consensus_resp.has_error()) {
2736
0
    s = StatusFromPB(consensus_resp.error().status());
2737
0
  }
2738
6
  return s;
2739
6
}
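// Editor's note (summary, not part of the original source): the unsafe change
// is applied by feeding the locally built ConsensusRequestPB through the normal
// Update() path with a term one higher than the local term, so the replica
// treats it as coming from a newer leader and steps down if it was the leader.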
2740
2741
151k
void RaftConsensus::Shutdown() {
2742
151k
  LOG_WITH_PREFIX(INFO) << "Shutdown.";
2743
2744
  // Avoid taking locks if already shut down so we don't violate
2745
  // ThreadRestrictions assertions in the case where the RaftConsensus
2746
  // destructor runs on the reactor thread due to an election callback being
2747
  // the last outstanding reference.
2748
151k
  if (shutdown_.Load(kMemOrderAcquire)) return;
2749
2750
75.6k
  CHECK_OK(ExecuteHook(PRE_SHUTDOWN));
2751
2752
75.6k
  {
2753
75.6k
    ReplicaState::UniqueLock lock;
2754
    // Transition to kShuttingDown state.
2755
75.6k
    CHECK_OK(state_->LockForShutdown(&lock));
2756
75.6k
    step_down_check_tracker_.StartShutdown();
2757
75.6k
  }
2758
75.6k
  step_down_check_tracker_.CompleteShutdown();
2759
2760
  // Close the peer manager.
2761
75.6k
  peer_manager_->Close();
2762
2763
  // We must close the queue after we close the peers.
2764
75.6k
  queue_->Close();
2765
2766
75.6k
  CHECK_OK(state_->CancelPendingOperations());
2767
2768
75.6k
  {
2769
75.6k
    ReplicaState::UniqueLock lock;
2770
75.6k
    CHECK_OK(state_->LockForShutdown(&lock));
2771
75.6k
    CHECK_EQ(ReplicaState::kShuttingDown, state_->state());
2772
75.6k
    CHECK_OK(state_->ShutdownUnlocked());
2773
75.6k
    LOG_WITH_PREFIX(INFO) << "Raft consensus is shut down!";
2774
75.6k
  }
2775
2776
  // Shut down things that might acquire locks during destruction.
2777
75.6k
  raft_pool_token_->Shutdown();
2778
  // We might not have run Start yet, so make sure we have a FD.
2779
75.6k
  if (failure_detector_) {
2780
75.6k
    DisableFailureDetector();
2781
75.6k
  }
2782
2783
75.6k
  CHECK_OK(ExecuteHook(POST_SHUTDOWN));
2784
2785
75.6k
  shutdown_.Store(true, kMemOrderRelease);
2786
75.6k
}
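// Editor's note (not part of the original source): because shutdown_ is checked
// on entry and only stored at the very end, a second sequential Shutdown() call
// returns immediately, e.g.:
//
//   consensus->Shutdown();
//   consensus->Shutdown();  // no-op; shutdown_ was already stored as true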
2787
2788
0
PeerRole RaftConsensus::GetActiveRole() const {
2789
0
  auto lock = state_->LockForRead();
2790
0
  return state_->GetActiveRoleUnlocked();
2791
0
}
2792
2793
1.14M
yb::OpId RaftConsensus::GetLatestOpIdFromLog() {
2794
1.14M
  return log_->GetLatestEntryOpId();
2795
1.14M
}
2796
2797
133k
Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg) {
2798
133k
  OperationType op_type = msg->op_type();
2799
133k
  if (!IsConsensusOnlyOperation(op_type)) {
2800
0
    return STATUS_FORMAT(InvalidArgument,
2801
0
                         "Expected a consensus-only op type, got $0: $1",
2802
0
                         OperationType_Name(op_type),
2803
0
                         *msg);
2804
0
  }
2805
133k
  VLOG_WITH_PREFIX(1) << "Starting consensus round: "
2806
10
                      << msg->id().ShortDebugString();
2807
133k
  scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
2808
133k
  std::shared_ptr<StateChangeContext> context = nullptr;
2809
2810
  // We are here for NO_OP or CHANGE_CONFIG_OP type ops. We need to set the change record for an
2811
  // actual config change operation. The NO_OP does not update the config, as it is used for a new
2812
  // leader election term change replicate message, which keeps the same config.
2813
133k
  if (IsChangeConfigOperation(op_type)) {
2814
14.0k
    context =
2815
14.0k
      std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_CONFIG_CHANGE_COMPLETE,
2816
14.0k
                                           msg->change_config_record());
2817
119k
  } else {
2818
119k
    context = std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_NO_OP_COMPLETE);
2819
119k
  }
2820
2821
133k
  StdStatusCallback client_cb =
2822
133k
      std::bind(&RaftConsensus::MarkDirtyOnSuccess,
2823
133k
                this,
2824
133k
                context,
2825
133k
                &DoNothingStatusCB,
2826
133k
                std::placeholders::_1);
2827
133k
  round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb)));
2828
133k
  return state_->AddPendingOperation(round, OperationMode::kFollower);
2829
133k
}
2830
2831
3.02k
Status RaftConsensus::WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) {
2832
3.02k
  CoarseTimePoint start = CoarseMonoClock::now();
2833
7.26k
  for (;;) {
2834
7.26k
    MonoDelta remaining_old_leader_lease;
2835
7.26k
    LeaderLeaseStatus leader_lease_status;
2836
7.26k
    ReplicaState::State state;
2837
7.26k
    {
2838
7.26k
      auto lock = state_->LockForRead();
2839
7.26k
      state = state_->state();
2840
7.26k
      if (state != ReplicaState::kRunning) {
2841
0
        return STATUS_FORMAT(IllegalState, "Consensus is not running: $0", state);
2842
0
      }
2843
7.26k
      if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) {
2844
4
        return STATUS_FORMAT(IllegalState, "Not the leader: $0", state_->GetActiveRoleUnlocked());
2845
4
      }
2846
7.25k
      leader_lease_status = state_->GetLeaderLeaseStatusUnlocked(&remaining_old_leader_lease);
2847
7.25k
    }
2848
7.25k
    if (leader_lease_status == LeaderLeaseStatus::HAS_LEASE) {
2849
3.01k
      return Status::OK();
2850
3.01k
    }
2851
4.24k
    CoarseTimePoint now = CoarseMonoClock::now();
2852
4.24k
    if (now > deadline) {
2853
0
      return STATUS_FORMAT(
2854
0
          TimedOut, "Waited for $0 to acquire a leader lease, state $1, lease status: $2",
2855
0
          now - start, state, LeaderLeaseStatus_Name(leader_lease_status));
2856
0
    }
2857
4.24k
    switch (leader_lease_status) {
2858
0
      case LeaderLeaseStatus::HAS_LEASE:
2859
0
        return Status::OK();
2860
4.20k
      case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE:
2861
4.20k
        {
2862
4.20k
          std::unique_lock<decltype(leader_lease_wait_mtx_)> lock(leader_lease_wait_mtx_);
2863
          // Because we're not taking the same lock (leader_lease_wait_mtx_) when we check the
2864
          // leader lease status, there is a possibility of a race condition when we miss the
2865
          // notification and by this point we already have a lease. Rather than re-taking the
2866
          // ReplicaState lock and re-checking, here we simply block for up to 100ms in that case,
2867
          // because this function is currently (08/14/2017) only used in a context when it is OK,
2868
          // such as catalog manager initialization.
2869
4.20k
          leader_lease_wait_cond_.wait_for(
2870
4.20k
              lock, std::max<MonoDelta>(100ms, deadline - now).ToSteadyDuration());
2871
4.20k
        }
2872
4.20k
        continue;
2873
35
      case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE: {
2874
35
        auto wait_deadline = std::min({deadline, now + 100ms, now + remaining_old_leader_lease});
2875
35
        std::this_thread::sleep_until(wait_deadline);
2876
35
      } continue;
2877
4.24k
    }
2878
0
    FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, leader_lease_status);
2879
0
  }
2880
3.02k
}
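// Editor's note, illustrative usage sketch (an assumption, not part of the
// original source):
//
//   auto deadline = CoarseMonoClock::Now() + 15s;
//   Status s = consensus->WaitForLeaderLeaseImprecise(deadline);
//   if (!s.ok()) {
//     // Not the leader, consensus not running, or timed out waiting for the lease.
//   }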
2881
2882
10.3M
Status RaftConsensus::CheckIsActiveLeaderAndHasLease() const {
2883
10.3M
  return state_->CheckIsActiveLeaderAndHasLease();
2884
10.3M
}
2885
2886
Result<MicrosTime> RaftConsensus::MajorityReplicatedHtLeaseExpiration(
2887
73.5M
    MicrosTime min_allowed, CoarseTimePoint deadline) const {
2888
73.5M
  return state_->MajorityReplicatedHtLeaseExpiration(min_allowed, deadline);
2889
73.5M
}
2890
2891
329k
std::string RaftConsensus::GetRequestVoteLogPrefix(const VoteRequestPB& request) const {
2892
329k
  return Format("$0 Leader $1election vote request",
2893
329k
                state_->LogPrefix(), request.preelection() ? "pre-" : "");
2894
329k
}
2895
2896
void RaftConsensus::FillVoteResponseVoteGranted(
2897
1.13M
    const VoteRequestPB& request, VoteResponsePB* response) {
2898
1.13M
  response->set_responder_term(request.candidate_term());
2899
1.13M
  response->set_vote_granted(true);
2900
1.13M
}
2901
2902
void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
2903
221k
                                               VoteResponsePB* response) {
2904
221k
  response->set_responder_term(state_->GetCurrentTermUnlocked());
2905
221k
  response->set_vote_granted(false);
2906
221k
  response->mutable_consensus_error()->set_code(error_code);
2907
221k
}
2908
2909
void RaftConsensus::RequestVoteRespondVoteDenied(
2910
    ConsensusErrorPB::Code error_code, const std::string& message_suffix,
2911
205k
    const VoteRequestPB& request, VoteResponsePB* response) {
2912
205k
  auto status = STATUS_FORMAT(
2913
205k
      InvalidArgument, "$0: Denying vote to candidate $1 $2",
2914
205k
      GetRequestVoteLogPrefix(request), request.candidate_uuid(), message_suffix);
2915
205k
  FillVoteResponseVoteDenied(error_code, response);
2916
205k
  LOG(INFO) << status.message().ToBuffer();
2917
205k
  StatusToPB(status, response->mutable_consensus_error()->mutable_status());
2918
205k
}
2919
2920
Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
2921
204k
                                                    VoteResponsePB* response) {
2922
204k
  auto message_suffix = Format(
2923
204k
      "for earlier term $0. Current term is $1.",
2924
204k
      request->candidate_term(), state_->GetCurrentTermUnlocked());
2925
204k
  RequestVoteRespondVoteDenied(ConsensusErrorPB::INVALID_TERM, message_suffix, *request, response);
2926
204k
  return Status::OK();
2927
204k
}
2928
2929
Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request,
2930
209
                                                           VoteResponsePB* response) {
2931
209
  FillVoteResponseVoteGranted(*request, response);
2932
209
  LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. "
2933
209
                          "Re-sending same reply.",
2934
209
                          GetRequestVoteLogPrefix(*request),
2935
209
                          request->candidate_uuid(),
2936
209
                          request->candidate_term());
2937
209
  return Status::OK();
2938
209
}
2939
2940
Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request,
2941
1.16k
                                                             VoteResponsePB* response) {
2942
1.16k
  auto message_suffix = Format(
2943
1.16k
      "in current term $0: Already voted for candidate $1 in this term.",
2944
1.16k
      state_->GetCurrentTermUnlocked(), state_->GetVotedForCurrentTermUnlocked());
2945
1.16k
  RequestVoteRespondVoteDenied(ConsensusErrorPB::ALREADY_VOTED, message_suffix, *request, response);
2946
1.16k
  return Status::OK();
2947
1.16k
}
2948
2949
Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpIdPB& local_last_logged_opid,
2950
                                                       const VoteRequestPB* request,
2951
433
                                                       VoteResponsePB* response) {
2952
433
  auto message_suffix = Format(
2953
433
      "for term $0 because replica has last-logged OpId of $1, which is greater than that of the "
2954
433
          "candidate, which has last-logged OpId of $2.",
2955
433
      request->candidate_term(), local_last_logged_opid,
2956
433
      request->candidate_status().last_received());
2957
433
  RequestVoteRespondVoteDenied(
2958
433
      ConsensusErrorPB::LAST_OPID_TOO_OLD, message_suffix, *request, response);
2959
433
  return Status::OK();
2960
433
}
2961
2962
Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request,
2963
8.89k
                                                      VoteResponsePB* response) {
2964
8.89k
  FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response);
2965
8.89k
  std::string msg = Format(
2966
8.89k
      "$0: Denying vote to candidate $1 for term $2 because replica is either leader or believes a "
2967
8.89k
      "valid leader to be alive. Time left: $3",
2968
8.89k
      GetRequestVoteLogPrefix(*request), request->candidate_uuid(), request->candidate_term(),
2969
8.89k
      withhold_votes_until_.load(std::memory_order_acquire) - MonoTime::Now());
2970
8.89k
  LOG(INFO) << msg;
2971
8.89k
  StatusToPB(STATUS(InvalidArgument, msg), response->mutable_consensus_error()->mutable_status());
2972
8.89k
  return Status::OK();
2973
8.89k
}
2974
2975
Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request,
2976
6.88k
                                               VoteResponsePB* response) {
2977
6.88k
  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response);
2978
6.88k
  string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
2979
6.88k
                          "replica is already servicing an update from a current leader "
2980
6.88k
                          "or another vote.",
2981
6.88k
                          GetRequestVoteLogPrefix(*request),
2982
6.88k
                          request->candidate_uuid(),
2983
6.88k
                          request->candidate_term());
2984
6.88k
  LOG(INFO) << msg;
2985
6.88k
  StatusToPB(STATUS(ServiceUnavailable, msg),
2986
6.88k
             response->mutable_consensus_error()->mutable_status());
2987
6.88k
  return Status::OK();
2988
6.88k
}
2989
2990
Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request,
2991
107k
                                                    VoteResponsePB* response) {
2992
  // We know our vote will be "yes", so avoid triggering an election while we
2993
  // persist our vote to disk. We use an exponential backoff to avoid too much
2994
  // split-vote contention when nodes display high latencies.
2995
107k
  MonoDelta additional_backoff = LeaderElectionExpBackoffDeltaUnlocked();
2996
107k
  SnoozeFailureDetector(ALLOW_LOGGING, additional_backoff);
2997
2998
  // Persist our vote to disk.
2999
107k
  RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
3000
3001
107k
  FillVoteResponseVoteGranted(*request, response);
3002
3003
  // Give peer time to become leader. Snooze one more time after persisting our
3004
  // vote. When disk latency is high, this should help reduce churn.
3005
107k
  SnoozeFailureDetector(DO_NOT_LOG, additional_backoff);
3006
3007
107k
  LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
3008
107k
                          GetRequestVoteLogPrefix(*request),
3009
107k
                          request->candidate_uuid(),
3010
107k
                          state_->GetCurrentTermUnlocked());
3011
107k
  return Status::OK();
3012
107k
}
3013
3014
28.9M
PeerRole RaftConsensus::GetRoleUnlocked() const {
3015
28.9M
  DCHECK(state_->IsLocked());
3016
28.9M
  return state_->GetActiveRoleUnlocked();
3017
28.9M
}
3018
3019
28.9M
PeerRole RaftConsensus::role() const {
3020
28.9M
  auto lock = state_->LockForRead();
3021
28.9M
  return GetRoleUnlocked();
3022
28.9M
}
3023
3024
82.0M
LeaderState RaftConsensus::GetLeaderState(bool allow_stale) const {
3025
82.0M
  return state_->GetLeaderState(allow_stale);
3026
82.0M
}
3027
3028
8.74M
std::string RaftConsensus::LogPrefix() {
3029
8.74M
  return state_->LogPrefix();
3030
8.74M
}
3031
3032
182k
void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
3033
182k
  failed_elections_since_stable_leader_.store(0, std::memory_order_release);
3034
182k
  state_->SetLeaderUuidUnlocked(uuid);
3035
182k
  auto context = std::make_shared<StateChangeContext>(StateChangeReason::NEW_LEADER_ELECTED, uuid);
3036
182k
  MarkDirty(context);
3037
182k
}
3038
3039
Status RaftConsensus::ReplicateConfigChangeUnlocked(const ReplicateMsgPtr& replicate_ref,
3040
                                                    const RaftConfigPB& new_config,
3041
                                                    ChangeConfigType type,
3042
5.87k
                                                    StdStatusCallback client_cb) {
3043
5.87k
  LOG(INFO) << "Setting replicate pending config " << new_config.ShortDebugString()
3044
5.87k
            << ", type = " << ChangeConfigType_Name(type);
3045
3046
5.87k
  RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config));
3047
3048
5.87k
  if (type == CHANGE_ROLE &&
3049
5.87k
      PREDICT_FALSE(FLAGS_TEST_inject_delay_leader_change_role_append_secs)) {
3050
1
    LOG(INFO) << "Adding change role sleep for "
3051
1
              << FLAGS_TEST_inject_delay_leader_change_role_append_secs << " secs.";
3052
1
    SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_delay_leader_change_role_append_secs));
3053
1
  }
3054
3055
  // Set as pending.
3056
5.87k
  RefreshConsensusQueueAndPeersUnlocked();
3057
3058
5.87k
  auto round = make_scoped_refptr<ConsensusRound>(this, replicate_ref);
3059
5.87k
  round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb)));
3060
5.87k
  auto status = AppendNewRoundToQueueUnlocked(round);
3061
5.87k
  if (!status.ok()) {
3062
    // We could just cancel the pending config, because there can be only one pending config.
3063
0
    auto clear_status = state_->ClearPendingConfigUnlocked();
3064
0
    if (!clear_status.ok()) {
3065
0
      LOG(WARNING) << "Could not clear pending config: " << clear_status;
3066
0
    }
3067
0
  }
3068
5.87k
  return status;
3069
5.87k
}
3070
3071
67.9k
void RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
3072
67.9k
  DCHECK_EQ(PeerRole::LEADER, state_->GetActiveRoleUnlocked());
3073
67.9k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
3074
3075
  // Change the peers so that we're able to replicate messages remotely and
3076
  // locally. Peer manager connections are updated using the active config. Connections to peers
3077
  // that are not part of active_config are closed. New connections are created for those peers
3078
  // that are present in active_config but have no connections. When the queue is in LEADER
3079
  // mode, it checks that all registered peers are a part of the active config.
3080
67.9k
  peer_manager_->ClosePeersNotInConfig(active_config);
3081
67.9k
  queue_->SetLeaderMode(state_->GetCommittedOpIdUnlocked(),
3082
67.9k
                        state_->GetCurrentTermUnlocked(),
3083
67.9k
                        state_->GetLastAppliedOpIdUnlocked(),
3084
67.9k
                        active_config);
3085
3086
67.9k
  ScopedDnsTracker dns_tracker(update_raft_config_dns_latency_.get());
3087
67.9k
  peer_manager_->UpdateRaftConfig(active_config);
3088
67.9k
}
3089
3090
4.44k
string RaftConsensus::peer_uuid() const {
3091
4.44k
  return state_->GetPeerUuid();
3092
4.44k
}
3093
3094
60.6k
string RaftConsensus::tablet_id() const {
3095
60.6k
  return state_->GetOptions().tablet_id;
3096
60.6k
}
3097
3098
10.7k
const TabletId& RaftConsensus::split_parent_tablet_id() const {
3099
10.7k
  return split_parent_tablet_id_;
3100
10.7k
}
3101
3102
ConsensusStatePB RaftConsensus::ConsensusState(
3103
    ConsensusConfigType type,
3104
46.5M
    LeaderLeaseStatus* leader_lease_status) const {
3105
46.5M
  auto lock = state_->LockForRead();
3106
46.5M
  return ConsensusStateUnlocked(type, leader_lease_status);
3107
46.5M
}
3108
3109
ConsensusStatePB RaftConsensus::ConsensusStateUnlocked(
3110
    ConsensusConfigType type,
3111
46.5M
    LeaderLeaseStatus* leader_lease_status) const {
3112
46.5M
  CHECK(state_->IsLocked());
3113
46.5M
  if (leader_lease_status) {
3114
3.88k
    if (GetRoleUnlocked() == PeerRole::LEADER) {
3115
1.45k
      *leader_lease_status = state_->GetLeaderLeaseStatusUnlocked();
3116
2.42k
    } else {
3117
      // We'll still return a valid value if we're not a leader.
3118
2.42k
      *leader_lease_status = LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE;
3119
2.42k
    }
3120
3.88k
  }
3121
46.5M
  return state_->ConsensusStateUnlocked(type);
3122
46.5M
}
3123
3124
77.9M
RaftConfigPB RaftConsensus::CommittedConfig() const {
3125
77.9M
  auto lock = state_->LockForRead();
3126
77.9M
  return state_->GetCommittedConfigUnlocked();
3127
77.9M
}
3128
3129
38
void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
3130
38
  out << "<h1>Raft Consensus State</h1>" << std::endl;
3131
3132
38
  out << "<h2>State</h2>" << std::endl;
3133
38
  out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
3134
3135
  // Dump the queues on a leader.
3136
38
  PeerRole role;
3137
38
  {
3138
38
    auto lock = state_->LockForRead();
3139
38
    role = state_->GetActiveRoleUnlocked();
3140
38
  }
3141
38
  if (role == PeerRole::LEADER) {
3142
38
    out << "<h2>Queue overview</h2>" << std::endl;
3143
38
    out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
3144
38
    out << "<hr/>" << std::endl;
3145
38
    out << "<h2>Queue details</h2>" << std::endl;
3146
38
    queue_->DumpToHtml(out);
3147
38
  }
3148
38
}
3149
3150
46
ReplicaState* RaftConsensus::GetReplicaStateForTests() {
3151
46
  return state_.get();
3152
46
}
3153
3154
void RaftConsensus::ElectionCallback(const LeaderElectionData& data,
3155
737k
                                     const ElectionResult& result) {
3156
  // The election callback runs on a reactor thread, so we need to defer to our
3157
  // threadpool. If the threadpool is already shut down for some reason, it's OK --
3158
  // we're OK with the callback never running.
3159
737k
  WARN_NOT_OK(raft_pool_token_->SubmitFunc(
3160
737k
              std::bind(&RaftConsensus::DoElectionCallback, shared_from_this(), data, result)),
3161
737k
              state_->LogPrefix() + "Unable to run election callback");
3162
737k
}
3163
3164
123k
void RaftConsensus::NotifyOriginatorAboutLostElection(const std::string& originator_uuid) {
3165
123k
  if (originator_uuid.empty()) {
3166
123k
    return;
3167
123k
  }
3168
3169
77
  ReplicaState::UniqueLock lock;
3170
77
  Status s = state_->LockForConfigChange(&lock);
3171
77
  if (PREDICT_FALSE(!s.ok())) {
3172
0
    LOG_WITH_PREFIX(INFO) << "Unable to notify originator about lost election, lock failed: "
3173
0
                          << s.ToString();
3174
0
    return;
3175
0
  }
3176
3177
77
  const auto& active_config = state_->GetActiveConfigUnlocked();
3178
77
  const auto * peer = FindPeer(active_config, originator_uuid);
3179
77
  if (!peer) {
3180
0
    LOG_WITH_PREFIX(WARNING) << "Failed to find originators peer: " << originator_uuid
3181
0
                             << ", config: " << active_config.ShortDebugString();
3182
0
    return;
3183
0
  }
3184
3185
77
  auto proxy = peer_proxy_factory_->NewProxy(*peer);
3186
77
  LeaderElectionLostRequestPB req;
3187
77
  req.set_dest_uuid(originator_uuid);
3188
77
  req.set_election_lost_by_uuid(state_->GetPeerUuid());
3189
77
  req.set_tablet_id(state_->GetOptions().tablet_id);
3190
77
  auto resp = std::make_shared<LeaderElectionLostResponsePB>();
3191
77
  auto rpc = std::make_shared<rpc::RpcController>();
3192
77
  rpc->set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
3193
77
  auto log_prefix = state_->LogPrefix();
3194
77
  proxy->LeaderElectionLostAsync(&req, resp.get(), rpc.get(), [log_prefix, resp, rpc] {
3195
77
    if (!rpc->status().ok()) {
3196
0
      LOG(WARNING) << log_prefix << "Notify about lost election RPC failure: "
3197
0
                   << rpc->status().ToString();
3198
77
    } else if (resp->has_error()) {
3199
0
      LOG(WARNING) << log_prefix << "Notify about lost election failed: "
3200
0
                   << StatusFromPB(resp->error().status()).ToString();
3201
0
    }
3202
77
  });
3203
77
}
3204
3205
void RaftConsensus::DoElectionCallback(const LeaderElectionData& data,
3206
737k
                                       const ElectionResult& result) {
3207
737k
  const char* election_name = result.preelection ? "Pre-election" : "election";
3208
737k
  const char* decision_name = result.decision == ElectionVote::kGranted ? "won" : "lost";
3209
  // Snooze to avoid the election timer firing again as much as possible.
3210
737k
  {
3211
737k
    auto lock = state_->LockForRead();
3212
    // We need to snooze when we win and when we lose:
3213
    // - When we win because we're about to disable the timer and become leader.
3214
    // - When we lose, because otherwise we can fall into a cycle, where everyone keeps
3215
    //   triggering elections but no election ever completes because by the time they
3216
    //   finish another one is triggered already.
3217
    // We ignore the status as we don't want to fail if the timer is
3218
    // disabled.
3219
737k
    SnoozeFailureDetector(ALLOW_LOGGING, LeaderElectionExpBackoffDeltaUnlocked());
3220
3221
737k
    if (!result.preelections_not_supported_by_uuid.empty()) {
3222
0
      disable_pre_elections_until_ =
3223
0
          CoarseMonoClock::now() + FLAGS_temporary_disable_preelections_timeout_ms * 1ms;
3224
0
      LOG_WITH_PREFIX(WARNING)
3225
0
          << "Disable pre-elections until " << ToString(disable_pre_elections_until_)
3226
0
          << ", because " << result.preelections_not_supported_by_uuid << " does not support them.";
3227
0
    }
3228
737k
  }
3229
737k
  if (result.decision == ElectionVote::kDenied) {
3230
123k
    failed_elections_since_stable_leader_.fetch_add(1, std::memory_order_acq_rel);
3231
123k
    LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " lost for term "
3232
123k
                          << result.election_term << ". Reason: "
3233
123k
                          << (!result.message.empty() ? result.message : "None given")
3234
123k
                          << ". Originator: " << data.originator_uuid;
3235
123k
    NotifyOriginatorAboutLostElection(data.originator_uuid);
3236
3237
123k
    if (result.higher_term) {
3238
88.2k
      ReplicaState::UniqueLock lock;
3239
88.2k
      Status s = state_->LockForConfigChange(&lock);
3240
88.2k
      if (s.ok()) {
3241
88.2k
        s = HandleTermAdvanceUnlocked(*result.higher_term);
3242
88.2k
      }
3243
88.2k
      if (!s.ok()) {
3244
87.7k
        LOG_WITH_PREFIX(INFO) << "Unable to advance term as " << election_name << " result: " << s;
3245
87.7k
      }
3246
88.2k
    }
3247
3248
123k
    return;
3249
123k
  }
3250
3251
613k
  ReplicaState::UniqueLock lock;
3252
613k
  Status s = state_->LockForConfigChange(&lock);
3253
613k
  if (PREDICT_FALSE(!s.ok())) {
3254
0
    LOG_WITH_PREFIX(INFO) << "Received " << election_name << " callback for term "
3255
0
                          << result.election_term << " while not running: "
3256
0
                          << s.ToString();
3257
0
    return;
3258
0
  }
3259
3260
613k
  auto desired_term = state_->GetCurrentTermUnlocked() + (result.preelection ? 1 : 0);
3261
613k
  if (result.election_term != desired_term) {
3262
1.21k
    LOG_WITH_PREFIX(INFO)
3263
1.21k
        << "Leader " << election_name << " decision for defunct term "
3264
1.21k
        << result.election_term << ": " << decision_name;
3265
1.21k
    return;
3266
1.21k
  }
3267
3268
612k
  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
3269
612k
  if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) {
3270
0
    LOG_WITH_PREFIX(WARNING)
3271
0
        << "Leader " << election_name << " decision while not in active config. "
3272
0
        << "Result: Term " << result.election_term << ": " << decision_name
3273
0
        << ". RaftConfig: " << active_config.ShortDebugString();
3274
0
    return;
3275
0
  }
3276
3277
612k
  if (result.preelection) {
3278
550k
    LOG_WITH_PREFIX(INFO) << "Leader pre-election won for term " << result.election_term;
3279
550k
    lock.unlock();
3280
550k
    WARN_NOT_OK(DoStartElection(data, PreElected::kTrue), "Start election failed: ");
3281
550k
    return;
3282
550k
  }
3283
3284
61.9k
  if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
3285
0
    LOG_WITH_PREFIX(DFATAL)
3286
0
        << "Leader " << election_name << " callback while already leader! Result: Term "
3287
0
        << result.election_term << ": "
3288
0
        << decision_name;
3289
0
    return;
3290
0
  }
3291
3292
61.9k
  LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " won for term " << result.election_term;
3293
3294
  // Apply lease updates that were possibly received from voters.
3295
61.9k
  state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked(
3296
61.9k
      result.old_leader_lease, result.old_leader_ht_lease);
3297
3298
61.9k
  state_->SetLeaderNoOpCommittedUnlocked(false);
3299
  // Convert role to LEADER.
3300
61.9k
  SetLeaderUuidUnlocked(state_->GetPeerUuid());
3301
3302
  // TODO: BecomeLeaderUnlocked() can fail due to state checks during shutdown.
3303
  // It races with the above state check.
3304
  // This could be a problem during tablet deletion.
3305
61.9k
  auto status = BecomeLeaderUnlocked();
3306
61.9k
  if (!status.ok()) {
3307
0
    LOG_WITH_PREFIX(DFATAL) << "Failed to become leader: " << status.ToString();
3308
0
  }
3309
61.9k
}
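// Editor's note (summary, not part of the original source): a won pre-election
// does not make this peer leader directly; it re-enters the election path via
// DoStartElection(data, PreElected::kTrue), and only a won real election flips
// the role to LEADER via SetLeaderUuidUnlocked() and BecomeLeaderUnlocked().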
3310
3311
341
yb::OpId RaftConsensus::GetLastReceivedOpId() {
3312
341
  auto lock = state_->LockForRead();
3313
341
  return state_->GetLastReceivedOpIdUnlocked();
3314
341
}
3315
3316
51.8M
yb::OpId RaftConsensus::GetLastCommittedOpId() {
3317
51.8M
  auto lock = state_->LockForRead();
3318
51.8M
  return state_->GetCommittedOpIdUnlocked();
3319
51.8M
}
3320
3321
3.82M
yb::OpId RaftConsensus::GetLastCDCedOpId() {
3322
3.82M
  return queue_->GetCDCConsumerOpIdForIntentRemoval();
3323
3.82M
}
3324
3325
0
yb::OpId RaftConsensus::GetLastAppliedOpId() {
3326
0
  auto lock = state_->LockForRead();
3327
0
  return state_->GetLastAppliedOpIdUnlocked();
3328
0
}
3329
3330
59
yb::OpId RaftConsensus::GetAllAppliedOpId() {
3331
59
  return queue_->GetAllAppliedOpId();
3332
59
}
3333
3334
471k
void RaftConsensus::MarkDirty(std::shared_ptr<StateChangeContext> context) {
3335
471k
  LOG_WITH_PREFIX(INFO) << "Calling mark dirty synchronously for reason code " << context->reason;
3336
471k
  mark_dirty_clbk_.Run(context);
3337
471k
}
3338
3339
void RaftConsensus::MarkDirtyOnSuccess(std::shared_ptr<StateChangeContext> context,
3340
                                       const StdStatusCallback& client_cb,
3341
138k
                                       const Status& status) {
3342
138k
  if (PREDICT_TRUE(status.ok())) {
3343
138k
    MarkDirty(context);
3344
138k
  }
3345
138k
  client_cb(status);
3346
138k
}
3347
3348
void RaftConsensus::NonTrackedRoundReplicationFinished(ConsensusRound* round,
3349
                                                       const StdStatusCallback& client_cb,
3350
200k
                                                       const Status& status) {
3351
200k
  DCHECK(state_->IsLocked());
3352
200k
  OperationType op_type = round->replicate_msg()->op_type();
3353
200k
  string op_str = Format("$0 [$1]", OperationType_Name(op_type), round->id());
3354
200k
  if (!IsConsensusOnlyOperation(op_type)) {
3355
0
    LOG_WITH_PREFIX(ERROR) << "Unexpected op type: " << op_str;
3356
0
    return;
3357
0
  }
3358
200k
  if (!status.ok()) {
3359
    // TODO: Do something with the status on failure?
3360
138
    LOG_WITH_PREFIX(INFO) << op_str << " replication failed: " << status << "\n" << GetStackTrace();
3361
3362
    // Clear out the pending state (ENG-590).
3363
138
    if (IsChangeConfigOperation(op_type)) {
3364
19
      WARN_NOT_OK(state_->ClearPendingConfigUnlocked(), "Could not clear pending state");
3365
19
    }
3366
200k
  } else if (IsChangeConfigOperation(op_type)) {
3367
    // Notify the TabletPeer owner object.
3368
19.5k
    state_->context()->ChangeConfigReplicated(state_->GetCommittedConfigUnlocked());
3369
19.5k
  }
3370
3371
200k
  client_cb(status);
3372
200k
}
3373
3374
161k
void RaftConsensus::EnableFailureDetector(MonoDelta delta) {
3375
161k
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
3376
161k
    failure_detector_->Start(delta);
3377
161k
  }
3378
161k
}
3379
3380
137k
void RaftConsensus::DisableFailureDetector() {
3381
137k
  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
3382
137k
    failure_detector_->Stop();
3383
137k
  }
3384
137k
}
3385
3386
27.6M
void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging, MonoDelta delta) {
3387
27.6M
  if (PREDICT_TRUE(GetAtomicFlag(&FLAGS_enable_leader_failure_detection))) {
3388
26.5M
    if (allow_logging == ALLOW_LOGGING) {
3389
2.07M
      LOG_WITH_PREFIX(INFO) << Format("Snoozing leader timeout detection for $0",
3390
18.4E
                                      delta.Initialized() ? delta.ToString() : "election timeout");
3391
2.07M
    }
3392
3393
26.5M
    if (!delta.Initialized()) {
3394
24.3M
      delta = MinimumElectionTimeout();
3395
24.3M
    }
3396
26.5M
    failure_detector_->Snooze(delta);
3397
26.5M
  }
3398
27.6M
}
3399
3400
52.0M
MonoDelta RaftConsensus::MinimumElectionTimeout() const {
3401
52.0M
  int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
3402
52.0M
      FLAGS_raft_heartbeat_interval_ms;
3403
3404
52.0M
  return MonoDelta::FromMilliseconds(failure_timeout);
3405
52.0M
}
3406
3407
2.07M
MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
3408
  // Compute a backoff factor based on how many leader elections have
3409
  // taken place since a stable leader was last seen.
3410
2.07M
  double backoff_factor = pow(
3411
2.07M
      1.1,
3412
2.07M
      failed_elections_since_stable_leader_.load(std::memory_order_acquire) + 1);
3413
2.07M
  double min_timeout = MinimumElectionTimeout().ToMilliseconds();
3414
2.07M
  double max_timeout = std::min<double>(
3415
2.07M
      min_timeout * backoff_factor,
3416
2.07M
      FLAGS_leader_failure_exp_backoff_max_delta_ms);
3417
2.07M
  if (max_timeout < min_timeout) {
3418
1.23k
    LOG(INFO) << "Resetting max_timeout from " <<  max_timeout << " to " << min_timeout
3419
1.23k
              << ", max_delta_flag=" << FLAGS_leader_failure_exp_backoff_max_delta_ms;
3420
1.23k
    max_timeout = min_timeout;
3421
1.23k
  }
3422
  // Randomize the timeout between the minimum and the calculated value.
3423
  // We do this after the above capping to the max. Otherwise, after a
3424
  // churny period, we'd end up highly likely to backoff exactly the max
3425
  // amount.
3426
2.07M
  double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction();
3427
2.07M
  DCHECK_GE(timeout, min_timeout);
3428
3429
2.07M
  return MonoDelta::FromMilliseconds(timeout);
3430
2.07M
}
3431
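
MinimumElectionTimeout() above is simply FLAGS_leader_failure_max_missed_heartbeat_periods * FLAGS_raft_heartbeat_interval_ms, and LeaderElectionExpBackoffDeltaUnlocked() widens a window [min_timeout, min_timeout * 1.1^(failures + 1)], caps it at FLAGS_leader_failure_exp_backoff_max_delta_ms, and then draws a timeout uniformly inside it. Below is a small self-contained sketch of that arithmetic with illustrative values; the 3000 ms minimum and 20000 ms cap are assumptions for the example, not the build's flag defaults.

#include <algorithm>
#include <cmath>
#include <cstdio>

// Assumed values for illustration only.
constexpr double kMinTimeoutMs = 3000.0;  // stands in for MinimumElectionTimeout()
constexpr double kMaxDeltaMs = 20000.0;   // stands in for FLAGS_leader_failure_exp_backoff_max_delta_ms

int main() {
  for (int failed = 0; failed <= 12; failed += 3) {
    const double backoff_factor = std::pow(1.1, failed + 1);
    // Cap at the flag value, and never let the cap fall below the minimum,
    // mirroring the max_timeout < min_timeout reset in the function above.
    const double max_timeout =
        std::max(kMinTimeoutMs, std::min(kMinTimeoutMs * backoff_factor, kMaxDeltaMs));
    std::printf("failed elections = %2d -> timeout drawn uniformly from [%.0f, %.0f] ms\n",
                failed, kMinTimeoutMs, max_timeout);
  }
}
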
3432
550k
Status RaftConsensus::IncrementTermUnlocked() {
3433
550k
  return HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1);
3434
550k
}
3435
3436
758k
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) {
3437
758k
  if (new_term <= state_->GetCurrentTermUnlocked()) {
3438
1.09k
    return STATUS(IllegalState, Substitute("Can't advance term to: $0 current term: $1 is higher.",
3439
1.09k
                                           new_term, state_->GetCurrentTermUnlocked()));
3440
1.09k
  }
3441
3442
757k
  if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) {
3443
945
    LOG_WITH_PREFIX(INFO) << "Stepping down as leader of term "
3444
945
                          << state_->GetCurrentTermUnlocked()
3445
945
                          << " since new term is " << new_term;
3446
3447
945
    RETURN_NOT_OK(BecomeReplicaUnlocked(std::string()));
3448
945
  }
3449
3450
757k
  LOG_WITH_PREFIX(INFO) << "Advancing to term " << new_term;
3451
757k
  RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term));
3452
182k
  term_metric_->set_value(new_term);
3453
182k
  return Status::OK();
3454
757k
}
3455
3456
Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForCDC(const yb::OpId& from,
3457
  int64_t* last_replicated_opid_index,
3458
617
  const CoarseTimePoint deadline) {
3459
617
  return queue_->ReadReplicatedMessagesForCDC(from, last_replicated_opid_index, deadline);
3460
617
}
3461
3462
874
void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
3463
874
  return queue_->UpdateCDCConsumerOpId(op_id);
3464
874
}
3465
3466
void RaftConsensus::RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg,
3467
23
                                            bool should_exists) {
3468
23
  state_->CancelPendingOperation(OpId::FromPB(replicate_msg->id()), should_exists);
3469
23
}
3470
3471
8.07k
uint64_t RaftConsensus::OnDiskSize() const {
3472
8.07k
  return state_->OnDiskSize();
3473
8.07k
}
3474
3475
5.36M
yb::OpId RaftConsensus::WaitForSafeOpIdToApply(const yb::OpId& op_id) {
3476
5.36M
  return log_->WaitForSafeOpIdToApply(op_id);
3477
5.36M
}
3478
3479
50.1M
yb::OpId RaftConsensus::MinRetryableRequestOpId() {
3480
50.1M
  return state_->MinRetryableRequestOpId();
3481
50.1M
}
3482
3483
0
size_t RaftConsensus::LogCacheSize() {
3484
0
  return queue_->LogCacheSize();
3485
0
}
3486
3487
0
size_t RaftConsensus::EvictLogCache(size_t bytes_to_evict) {
3488
0
  return queue_->EvictLogCache(bytes_to_evict);
3489
0
}
3490
3491
0
RetryableRequestsCounts RaftConsensus::TEST_CountRetryableRequests() {
3492
0
  return state_->TEST_CountRetryableRequests();
3493
0
}
3494
3495
605
void RaftConsensus::TrackOperationMemory(const yb::OpId& op_id) {
3496
605
  queue_->TrackOperationsMemory({op_id});
3497
605
}
3498
3499
23
int64_t RaftConsensus::TEST_LeaderTerm() const {
3500
23
  auto lock = state_->LockForRead();
3501
23
  return state_->GetCurrentTermUnlocked();
3502
23
}
3503
3504
161
std::string RaftConsensus::DelayedStepDown::ToString() const {
3505
161
  return YB_STRUCT_TO_STRING(term, protege, graceful);
3506
161
}
3507
3508
3
Result<OpId> RaftConsensus::TEST_GetLastOpIdWithType(OpIdType opid_type, OperationType op_type) {
3509
3
  return queue_->TEST_GetLastOpIdWithType(VERIFY_RESULT(GetLastOpId(opid_type)).index, op_type);
3510
3
}
3511
3512
}  // namespace consensus
3513
}  // namespace yb