/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus.cc
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  | // | 
| 18 |  | // The following only applies to changes made to this file as part of YugaByte development. | 
| 19 |  | // | 
| 20 |  | // Portions Copyright (c) YugaByte, Inc. | 
| 21 |  | // | 
| 22 |  | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | 
| 23 |  | // in compliance with the License.  You may obtain a copy of the License at | 
| 24 |  | // | 
| 25 |  | // http://www.apache.org/licenses/LICENSE-2.0 | 
| 26 |  | // | 
| 27 |  | // Unless required by applicable law or agreed to in writing, software distributed under the License | 
| 28 |  | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | 
| 29 |  | // or implied.  See the License for the specific language governing permissions and limitations | 
| 30 |  | // under the License. | 
| 31 |  | // | 
| 32 |  |  | 
| 33 |  | #include "yb/consensus/raft_consensus.h" | 
| 34 |  |  | 
| 35 |  | #include <algorithm> | 
| 36 |  | #include <memory> | 
| 37 |  | #include <mutex> | 
| 38 |  |  | 
| 39 |  | #include <boost/optional.hpp> | 
| 40 |  | #include <gflags/gflags.h> | 
| 41 |  |  | 
| 42 |  | #include "yb/common/wire_protocol.h" | 
| 43 |  |  | 
| 44 |  | #include "yb/consensus/consensus.pb.h" | 
| 45 |  | #include "yb/consensus/consensus_context.h" | 
| 46 |  | #include "yb/consensus/consensus_peers.h" | 
| 47 |  | #include "yb/consensus/consensus_round.h" | 
| 48 |  | #include "yb/consensus/leader_election.h" | 
| 49 |  | #include "yb/consensus/log.h" | 
| 50 |  | #include "yb/consensus/peer_manager.h" | 
| 51 |  | #include "yb/consensus/quorum_util.h" | 
| 52 |  | #include "yb/consensus/replica_state.h" | 
| 53 |  | #include "yb/consensus/state_change_context.h" | 
| 54 |  |  | 
| 55 |  | #include "yb/gutil/casts.h" | 
| 56 |  | #include "yb/gutil/map-util.h" | 
| 57 |  | #include "yb/gutil/stringprintf.h" | 
| 58 |  |  | 
| 59 |  | #include "yb/rpc/messenger.h" | 
| 60 |  | #include "yb/rpc/periodic.h" | 
| 61 |  | #include "yb/rpc/rpc_controller.h" | 
| 62 |  |  | 
| 63 |  | #include "yb/server/clock.h" | 
| 64 |  |  | 
| 65 |  | #include "yb/util/debug-util.h" | 
| 66 |  | #include "yb/util/debug/long_operation_tracker.h" | 
| 67 |  | #include "yb/util/debug/trace_event.h" | 
| 68 |  | #include "yb/util/enums.h" | 
| 69 |  | #include "yb/util/flag_tags.h" | 
| 70 |  | #include "yb/util/format.h" | 
| 71 |  | #include "yb/util/logging.h" | 
| 72 |  | #include "yb/util/metrics.h" | 
| 73 |  | #include "yb/util/net/dns_resolver.h" | 
| 74 |  | #include "yb/util/random.h" | 
| 75 |  | #include "yb/util/random_util.h" | 
| 76 |  | #include "yb/util/scope_exit.h" | 
| 77 |  | #include "yb/util/status_format.h" | 
| 78 |  | #include "yb/util/status_log.h" | 
| 79 |  | #include "yb/util/threadpool.h" | 
| 80 |  | #include "yb/util/tostring.h" | 
| 81 |  | #include "yb/util/trace.h" | 
| 82 |  | #include "yb/util/tsan_util.h" | 
| 83 |  | #include "yb/util/url-coding.h" | 
| 84 |  |  | 
| 85 |  | using namespace std::literals; | 
| 86 |  | using namespace std::placeholders; | 
| 87 |  |  | 
| 88 |  | DEFINE_int32(raft_heartbeat_interval_ms, yb::NonTsanVsTsan(500, 1000), | 
| 89 |  |              "The heartbeat interval for Raft replication. The leader produces heartbeats " | 
| 90 |  |              "to followers at this interval. The followers expect a heartbeat at this interval " | 
| 91 |  |              "and consider a leader to have failed if it misses several in a row."); | 
| 92 |  | TAG_FLAG(raft_heartbeat_interval_ms, advanced); | 
| 93 |  |  | 
| 94 |  | DEFINE_double(leader_failure_max_missed_heartbeat_periods, 6.0, | 
| 95 |  |               "Maximum heartbeat periods that the leader can fail to heartbeat in before we " | 
| 96 |  |               "consider the leader to be failed. The total failure timeout in milliseconds is " | 
| 97 |  |               "raft_heartbeat_interval_ms times leader_failure_max_missed_heartbeat_periods. " | 
| 98 |  |               "The value passed to this flag may be fractional."); | 
| 99 |  | TAG_FLAG(leader_failure_max_missed_heartbeat_periods, advanced); | 
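To make the arithmetic concrete: the effective leader-failure timeout is the product of the two flags above. A minimal sketch, assuming a hypothetical free function (the helper name and use of std::chrono are illustrative, not this file's API):

```cpp
#include <chrono>
#include <cstdint>

// Illustrative only: the follower-side failure timeout implied by the two
// flags above. With the non-TSAN defaults (500 ms interval, 6.0 periods)
// this comes out to 3000 ms.
std::chrono::milliseconds LeaderFailureTimeout(int32_t heartbeat_interval_ms,
                                               double max_missed_periods) {
  return std::chrono::milliseconds(
      static_cast<int64_t>(heartbeat_interval_ms * max_missed_periods));
}
```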
| 100 |  |  | 
| 101 |  | DEFINE_int32(leader_failure_exp_backoff_max_delta_ms, 20 * 1000, | 
| 102 |  |              "Maximum time to sleep in between leader election retries, in addition to the " | 
| 103 |  |              "regular timeout. When leader election fails the interval in between retries " | 
| 104 |  |              "increases exponentially, up to this value."); | 
| 105 |  | TAG_FLAG(leader_failure_exp_backoff_max_delta_ms, experimental); | 
| 106 |  |  | 
| 107 |  | DEFINE_bool(enable_leader_failure_detection, true, | 
| 108 |  |             "Whether to enable failure detection of tablet leaders. If enabled, attempts will be " | 
| 109 |  |             "made to elect a follower as a new leader when the leader is detected to have failed."); | 
| 110 |  | TAG_FLAG(enable_leader_failure_detection, unsafe); | 
| 111 |  |  | 
| 112 |  | DEFINE_test_flag(bool, do_not_start_election_test_only, false, | 
| 113 |  |                  "Do not start election even if leader failure is detected. "); | 
| 114 |  | TAG_FLAG(TEST_do_not_start_election_test_only, runtime); | 
| 115 |  |  | 
| 116 |  | DEFINE_bool(evict_failed_followers, true, | 
| 117 |  |             "Whether to evict followers from the Raft config that have fallen " | 
| 118 |  |             "too far behind the leader's log to catch up normally or have been " | 
| 119 |  |             "unreachable by the leader for longer than " | 
| 120 |  |             "follower_unavailable_considered_failed_sec"); | 
| 121 |  | TAG_FLAG(evict_failed_followers, advanced); | 
| 122 |  |  | 
| 123 |  | DEFINE_test_flag(bool, follower_reject_update_consensus_requests, false, | 
| 124 |  |                  "Whether a follower will return an error for all UpdateConsensus() requests."); | 
| 125 |  |  | 
| 126 |  | DEFINE_test_flag(bool, follower_pause_update_consensus_requests, false, | 
| 127 |  |                  "Whether a follower will pause all UpdateConsensus() requests."); | 
| 128 |  |  | 
| 129 |  | DEFINE_test_flag(int32, follower_reject_update_consensus_requests_seconds, 0, | 
| 130 |  |                  "Whether a follower will return an error for all UpdateConsensus() requests for " | 
| 131 |  |                  "the first TEST_follower_reject_update_consensus_requests_seconds seconds after " | 
| 132 |  |                  "the Consensus objet is created."); | 
| 133 |  |  | 
| 134 |  | DEFINE_test_flag(bool, follower_fail_all_prepare, false, | 
| 135 |  |                  "Whether a follower will fail preparing all operations."); | 
| 136 |  |  | 
| 137 |  | DEFINE_int32(after_stepdown_delay_election_multiplier, 5, | 
| 138 |  |              "After a peer steps down as a leader, the factor with which to multiply " | 
| 139 |  |              "leader_failure_max_missed_heartbeat_periods to get the delay time before starting a " | 
| 140 |  |              "new election."); | 
| 141 |  | TAG_FLAG(after_stepdown_delay_election_multiplier, advanced); | 
| 142 |  | TAG_FLAG(after_stepdown_delay_election_multiplier, hidden); | 
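A rough sketch of how this multiplier is applied, mirroring the logic in WithholdElectionAfterStepDown() later in this file (the standalone helper form is hypothetical):

```cpp
#include <chrono>
#include <cstdint>

// Illustrative only: after stepping down, a peer withholds its own election
// for one failure-detection window, stretched by
// after_stepdown_delay_election_multiplier when it stepped down in favor of
// a specific protege (so the protege has time to win).
std::chrono::milliseconds ElectionWithholdAfterStepDown(
    int32_t heartbeat_interval_ms, double max_missed_periods,
    int32_t stepdown_multiplier, bool stepped_down_for_protege) {
  auto timeout = std::chrono::milliseconds(
      static_cast<int64_t>(max_missed_periods * heartbeat_interval_ms));
  if (stepped_down_for_protege) {
    timeout *= stepdown_multiplier;
  }
  return timeout;
}
```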
| 143 |  |  | 
| 144 |  | DECLARE_int32(memory_limit_warn_threshold_percentage); | 
| 145 |  |  | 
| 146 |  | DEFINE_test_flag(int32, inject_delay_leader_change_role_append_secs, 0, | 
| 147 |  |                  "Amount of time to delay leader from sending replicate of change role."); | 
| 148 |  |  | 
| 149 |  | DEFINE_test_flag(double, return_error_on_change_config, 0.0, | 
| 150 |  |                  "Fraction of the time when ChangeConfig will return an error."); | 
| 151 |  |  | 
| 152 |  | METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections, | 
| 153 |  |                       "Follower Memory Pressure Rejections", | 
| 154 |  |                       yb::MetricUnit::kRequests, | 
| 155 |  |                       "Number of RPC requests rejected due to " | 
| 156 |  |                       "memory pressure while FOLLOWER."); | 
| 157 |  | METRIC_DEFINE_gauge_int64(tablet, raft_term, | 
| 158 |  |                           "Current Raft Consensus Term", | 
| 159 |  |                           yb::MetricUnit::kUnits, | 
| 160 |  |                           "Current Term of the Raft Consensus algorithm. This number increments " | 
| 161 |  |                           "each time a leader election is started."); | 
| 162 |  |  | 
| 163 |  | METRIC_DEFINE_lag(tablet, follower_lag_ms, | 
| 164 |  |                   "Follower lag from leader", | 
| 165 |  |                   "The amount of time since the last UpdateConsensus request from the " | 
| 166 |  |                   "leader.", {0, yb::AggregationFunction::kMax} /* optional_args */); | 
| 167 |  |  | 
| 168 |  | METRIC_DEFINE_gauge_int64(tablet, is_raft_leader, | 
| 169 |  |                           "Is tablet raft leader", | 
| 170 |  |                           yb::MetricUnit::kUnits, | 
| 171 |  |                           "Keeps track whether tablet is raft leader" | 
| 172 |  |                           "1 indicates that the tablet is raft leader"); | 
| 173 |  |  | 
| 174 |  | METRIC_DEFINE_coarse_histogram( | 
| 175 |  |   table, dns_resolve_latency_during_update_raft_config, | 
| 176 |  |   "yb.consensus.RaftConsensus.UpdateRaftConfig DNS Resolve", | 
| 177 |  |   yb::MetricUnit::kMicroseconds, | 
| 178 |  |   "Microseconds spent resolving DNS requests during RaftConsensus::UpdateRaftConfig"); | 
| 179 |  |  | 
| 180 |  | DEFINE_int32(leader_lease_duration_ms, yb::consensus::kDefaultLeaderLeaseDurationMs, | 
| 181 |  |              "Leader lease duration. A leader keeps establishing a new lease or extending the " | 
| 182 |  |              "existing one with every UpdateConsensus. A new server is not allowed to serve as a " | 
| 183 |  |              "leader (i.e. serve up-to-date read requests or acknowledge write requests) until a " | 
| 184 |  |              "lease of this duration has definitely expired on the old leader's side."); | 
| 185 |  |  | 
| 186 |  | DEFINE_int32(ht_lease_duration_ms, 2000, | 
| 187 |  |              "Hybrid time leader lease duration. A leader keeps establishing a new lease or " | 
| 188 |  |              "extending the existing one with every UpdateConsensus. A new server is not allowed " | 
| 189 |  |              "to add entries to RAFT log until a lease of the old leader is expired. 0 to disable." | 
| 190 |  |              ); | 
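Both lease flags enforce the same rule: the new leader must wait out the old leader's lease before acting on the corresponding capability. A minimal sketch of that check, assuming a hypothetical free function (the real lease tracking inside the consensus state is not shown here):

```cpp
#include <chrono>
#include <cstdint>

using SteadyClock = std::chrono::steady_clock;

// Illustrative only: a new leader may serve up-to-date reads (regular lease)
// or add entries to the Raft log (hybrid time lease) only once the lease the
// old leader held has definitely expired on the old leader's side.
bool OldLeaderLeaseDefinitelyExpired(SteadyClock::time_point lease_granted_at,
                                     int32_t lease_duration_ms) {
  return SteadyClock::now() >=
         lease_granted_at + std::chrono::milliseconds(lease_duration_ms);
}
```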
| 191 |  |  | 
| 192 |  | DEFINE_int32(min_leader_stepdown_retry_interval_ms, | 
| 193 |  |              20 * 1000, | 
| 194 |  |              "Minimum amount of time between successive attempts to perform the leader stepdown " | 
| 195 |  |              "for the same combination of tablet and intended (target) leader. This is needed " | 
| 196 |  |              "to avoid infinite leader stepdown loops when the current leader never has a chance " | 
| 197 |  |              "to update the intended leader with its latest records."); | 
| 198 |  |  | 
| 199 |  | DEFINE_bool(use_preelection, true, "Whether to use pre election, before doing actual election."); | 
| 200 |  |  | 
| 201 |  | DEFINE_int32(temporary_disable_preelections_timeout_ms, 10 * 60 * 1000, | 
| 202 |  |              "If some of nodes does not support preelections, then we disable them for this " | 
| 203 |  |              "amount of time."); | 
| 204 |  |  | 
| 205 |  | DEFINE_test_flag(bool, pause_update_replica, false, | 
| 206 |  |                  "Pause RaftConsensus::UpdateReplica processing before snoozing failure detector."); | 
| 207 |  |  | 
| 208 |  | DEFINE_test_flag(bool, pause_update_majority_replicated, false, | 
| 209 |  |                  "Pause RaftConsensus::UpdateMajorityReplicated."); | 
| 210 |  |  | 
| 211 |  | DEFINE_test_flag(int32, log_change_config_every_n, 1, | 
| 212 |  |                  "How often to log change config information. " | 
| 213 |  |                  "Used to reduce the number of lines being printed for change config requests " | 
| 214 |  |                  "when a test simulates a failure that would generate a log of these requests."); | 
| 215 |  |  | 
| 216 |  | DEFINE_bool(enable_lease_revocation, true, "Enables lease revocation mechanism"); | 
| 217 |  |  | 
| 218 |  | DEFINE_bool(quick_leader_election_on_create, false, | 
| 219 |  |             "Do we trigger quick leader elections on table creation."); | 
| 220 |  | TAG_FLAG(quick_leader_election_on_create, advanced); | 
| 221 |  | TAG_FLAG(quick_leader_election_on_create, hidden); | 
| 222 |  |  | 
| 223 |  | DEFINE_bool( | 
| 224 |  |     stepdown_disable_graceful_transition, false, | 
| 225 |  |     "During a leader stepdown, disable graceful leadership transfer " | 
| 226 |  |     "to an up to date peer"); | 
| 227 |  |  | 
| 228 |  | DEFINE_bool( | 
| 229 |  |     raft_disallow_concurrent_outstanding_report_failure_tasks, true, | 
| 230 |  |     "If true, only submit a new report failure task if there is not one outstanding."); | 
| 231 |  | TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, advanced); | 
| 232 |  | TAG_FLAG(raft_disallow_concurrent_outstanding_report_failure_tasks, hidden); | 
| 233 |  |  | 
| 234 |  | DEFINE_int64(protege_synchronization_timeout_ms, 1000, | 
| 235 |  |              "Timeout to synchronize protege before performing step down. " | 
| 236 |  |              "0 to disable synchronization."); | 
| 237 |  |  | 
| 238 |  | namespace yb { | 
| 239 |  | namespace consensus { | 
| 240 |  |  | 
| 241 |  | using log::LogEntryBatch; | 
| 242 |  | using rpc::PeriodicTimer; | 
| 243 |  | using std::shared_ptr; | 
| 244 |  | using std::unique_ptr; | 
| 245 |  | using std::weak_ptr; | 
| 246 |  | using strings::Substitute; | 
| 247 |  | using tserver::TabletServerErrorPB; | 
| 248 |  |  | 
| 249 |  | namespace { | 
| 250 |  |  | 
| 251 | 10.1k | const RaftPeerPB* FindPeer(const RaftConfigPB& active_config, const std::string& uuid) { | 
| 252 | 20.7k |   for (const RaftPeerPB& peer : active_config.peers()) { | 
| 253 | 20.7k |     if (peer.permanent_uuid() == uuid) { | 
| 254 | 10.1k |       return &peer; | 
| 255 | 10.1k |     } | 
| 256 | 20.7k |   } | 
| 257 |  |  | 
| 258 | 18.4E |   return nullptr; | 
| 259 | 10.1k | } | 
| 260 |  |  | 
| 261 |  | // Helper function to check if the op is a consensus-only op, i.e. not backed by an Operation. | 
| 262 | 14.7M | bool IsConsensusOnlyOperation(OperationType op_type) { | 
| 263 | 14.7M |   return op_type == NO_OP || op_type == CHANGE_CONFIG_OP; | 
| 264 | 14.7M | } | 
| 265 |  |  | 
| 266 |  | // Helper to check if the op is a change config op. | 
| 267 | 333k | bool IsChangeConfigOperation(OperationType op_type) { | 
| 268 | 333k |   return op_type == CHANGE_CONFIG_OP; | 
| 269 | 333k | } | 
| 270 |  |  | 
| 271 |  | class NonTrackedRoundCallback : public ConsensusRoundCallback { | 
| 272 |  |  public: | 
| 273 |  |   explicit NonTrackedRoundCallback(ConsensusRound* round, const StdStatusCallback& callback) | 
| 274 | 201k |       : round_(round), callback_(callback) { | 
| 275 | 201k |   } | 
| 276 |  |  | 
| 277 | 67.9k |   void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override { | 
| 278 | 67.9k |     auto& replicate_msg = *round_->replicate_msg(); | 
| 279 | 67.9k |     op_id.ToPB(replicate_msg.mutable_id()); | 
| 280 | 67.9k |     committed_op_id.ToPB(replicate_msg.mutable_committed_op_id()); | 
| 281 | 67.9k |   } | 
| 282 |  |  | 
| 283 |  |   void ReplicationFinished( | 
| 284 | 200k |       const Status& status, int64_t leader_term, OpIds* applied_op_ids) override { | 
| 285 | 200k |     down_cast<RaftConsensus*>(round_->consensus())->NonTrackedRoundReplicationFinished( | 
| 286 | 200k |         round_, callback_, status); | 
| 287 | 200k |   } | 
| 288 |  |  | 
| 289 |  |  private: | 
| 290 |  |   ConsensusRound* round_; | 
| 291 |  |   StdStatusCallback callback_; | 
| 292 |  | }; | 
| 293 |  |  | 
| 294 |  | } // namespace | 
| 295 |  |  | 
| 296 |  | std::unique_ptr<ConsensusRoundCallback> MakeNonTrackedRoundCallback( | 
| 297 | 201k |     ConsensusRound* round, const StdStatusCallback& callback) { | 
| 298 | 201k |   return std::make_unique<NonTrackedRoundCallback>(round, callback); | 
| 299 | 201k | } | 
| 300 |  |  | 
| 301 |  | struct RaftConsensus::LeaderRequest { | 
| 302 |  |   std::string leader_uuid; | 
| 303 |  |   yb::OpId preceding_op_id; | 
| 304 |  |   yb::OpId committed_op_id; | 
| 305 |  |   ReplicateMsgs messages; | 
| 306 |  |   // The positional index of the first message selected to be appended, in the | 
| 307 |  |   // original leader's request message sequence. | 
| 308 |  |   int64_t first_message_idx; | 
| 309 |  |  | 
| 310 |  |   std::string OpsRangeString() const; | 
| 311 |  | }; | 
| 312 |  |  | 
| 313 |  | shared_ptr<RaftConsensus> RaftConsensus::Create( | 
| 314 |  |     const ConsensusOptions& options, | 
| 315 |  |     std::unique_ptr<ConsensusMetadata> cmeta, | 
| 316 |  |     const RaftPeerPB& local_peer_pb, | 
| 317 |  |     const scoped_refptr<MetricEntity>& table_metric_entity, | 
| 318 |  |     const scoped_refptr<MetricEntity>& tablet_metric_entity, | 
| 319 |  |     const scoped_refptr<server::Clock>& clock, | 
| 320 |  |     ConsensusContext* consensus_context, | 
| 321 |  |     rpc::Messenger* messenger, | 
| 322 |  |     rpc::ProxyCache* proxy_cache, | 
| 323 |  |     const scoped_refptr<log::Log>& log, | 
| 324 |  |     const shared_ptr<MemTracker>& server_mem_tracker, | 
| 325 |  |     const shared_ptr<MemTracker>& parent_mem_tracker, | 
| 326 |  |     const Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, | 
| 327 |  |     TableType table_type, | 
| 328 |  |     ThreadPool* raft_pool, | 
| 329 |  |     RetryableRequests* retryable_requests, | 
| 330 | 150k |     MultiRaftManager* multi_raft_manager) { | 
| 331 |  |  | 
| 332 | 150k |   auto rpc_factory = std::make_unique<RpcPeerProxyFactory>( | 
| 333 | 150k |     messenger, proxy_cache, local_peer_pb.cloud_info()); | 
| 334 |  |  | 
| 335 |  |   // The message queue that keeps track of which operations need to be replicated | 
| 336 |  |   // where. | 
| 337 | 150k |   auto queue = std::make_unique<PeerMessageQueue>( | 
| 338 | 150k |       tablet_metric_entity, | 
| 339 | 150k |       log, | 
| 340 | 150k |       server_mem_tracker, | 
| 341 | 150k |       parent_mem_tracker, | 
| 342 | 150k |       local_peer_pb, | 
| 343 | 150k |       options.tablet_id, | 
| 344 | 150k |       clock, | 
| 345 | 150k |       consensus_context, | 
| 346 | 150k |       raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL)); | 
| 347 |  |  | 
| 348 | 150k |   DCHECK(local_peer_pb.has_permanent_uuid()); | 
| 349 | 150k |   const string& peer_uuid = local_peer_pb.permanent_uuid(); | 
| 350 |  |  | 
| 351 |  |   // A single Raft thread pool token is shared between RaftConsensus and | 
| 352 |  |   // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a | 
| 353 |  |   // raw pointer to the token, to emphasize that RaftConsensus is responsible | 
| 354 |  |   // for destroying the token. | 
| 355 | 150k |   unique_ptr<ThreadPoolToken> raft_pool_token(raft_pool->NewToken( | 
| 356 | 150k |       ThreadPool::ExecutionMode::CONCURRENT)); | 
| 357 |  |  | 
| 358 |  |   // A manager for the set of peers that actually send the operations both remotely | 
| 359 |  |   // and to the local WAL. | 
| 360 | 150k |   auto peer_manager = std::make_unique<PeerManager>( | 
| 361 | 150k |       options.tablet_id, | 
| 362 | 150k |       peer_uuid, | 
| 363 | 150k |       rpc_factory.get(), | 
| 364 | 150k |       queue.get(), | 
| 365 | 150k |       raft_pool_token.get(), | 
| 366 | 150k |       multi_raft_manager); | 
| 367 |  |  | 
| 368 | 150k |   return std::make_shared<RaftConsensus>( | 
| 369 | 150k |       options, | 
| 370 | 150k |       std::move(cmeta), | 
| 371 | 150k |       std::move(rpc_factory), | 
| 372 | 150k |       std::move(queue), | 
| 373 | 150k |       std::move(peer_manager), | 
| 374 | 150k |       std::move(raft_pool_token), | 
| 375 | 150k |       table_metric_entity, | 
| 376 | 150k |       tablet_metric_entity, | 
| 377 | 150k |       peer_uuid, | 
| 378 | 150k |       clock, | 
| 379 | 150k |       consensus_context, | 
| 380 | 150k |       log, | 
| 381 | 150k |       parent_mem_tracker, | 
| 382 | 150k |       mark_dirty_clbk, | 
| 383 | 150k |       table_type, | 
| 384 | 150k |       retryable_requests); | 
| 385 | 150k | } | 
| 386 |  |  | 
| 387 |  | RaftConsensus::RaftConsensus( | 
| 388 |  |     const ConsensusOptions& options, std::unique_ptr<ConsensusMetadata> cmeta, | 
| 389 |  |     std::unique_ptr<PeerProxyFactory> proxy_factory, | 
| 390 |  |     std::unique_ptr<PeerMessageQueue> queue, | 
| 391 |  |     std::unique_ptr<PeerManager> peer_manager, | 
| 392 |  |     std::unique_ptr<ThreadPoolToken> raft_pool_token, | 
| 393 |  |     const scoped_refptr<MetricEntity>& table_metric_entity, | 
| 394 |  |     const scoped_refptr<MetricEntity>& tablet_metric_entity, | 
| 395 |  |     const std::string& peer_uuid, const scoped_refptr<server::Clock>& clock, | 
| 396 |  |     ConsensusContext* consensus_context, const scoped_refptr<log::Log>& log, | 
| 397 |  |     shared_ptr<MemTracker> parent_mem_tracker, | 
| 398 |  |     Callback<void(std::shared_ptr<StateChangeContext> context)> mark_dirty_clbk, | 
| 399 |  |     TableType table_type, | 
| 400 |  |     RetryableRequests* retryable_requests) | 
| 401 |  |     : raft_pool_token_(std::move(raft_pool_token)), | 
| 402 |  |       log_(log), | 
| 403 |  |       clock_(clock), | 
| 404 |  |       peer_proxy_factory_(std::move(proxy_factory)), | 
| 405 |  |       peer_manager_(std::move(peer_manager)), | 
| 406 |  |       queue_(std::move(queue)), | 
| 407 |  |       rng_(GetRandomSeed32()), | 
| 408 |  |       withhold_votes_until_(MonoTime::Min()), | 
| 409 |  |       step_down_check_tracker_(&peer_proxy_factory_->messenger()->scheduler()), | 
| 410 |  |       mark_dirty_clbk_(std::move(mark_dirty_clbk)), | 
| 411 |  |       shutdown_(false), | 
| 412 |  |       follower_memory_pressure_rejections_(tablet_metric_entity->FindOrCreateCounter( | 
| 413 |  |           &METRIC_follower_memory_pressure_rejections)), | 
| 414 |  |       term_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_raft_term, | 
| 415 |  |                                                     cmeta->current_term())), | 
| 416 |  |       follower_last_update_time_ms_metric_( | 
| 417 |  |           tablet_metric_entity->FindOrCreateAtomicMillisLag(&METRIC_follower_lag_ms)), | 
| 418 |  |       is_raft_leader_metric_(tablet_metric_entity->FindOrCreateGauge(&METRIC_is_raft_leader, | 
| 419 |  |                                                               static_cast<int64_t>(0))), | 
| 420 |  |       parent_mem_tracker_(std::move(parent_mem_tracker)), | 
| 421 |  |       table_type_(table_type), | 
| 422 |  |       update_raft_config_dns_latency_( | 
| 423 |  |           METRIC_dns_resolve_latency_during_update_raft_config.Instantiate(table_metric_entity)), | 
| 424 |  |       split_parent_tablet_id_( | 
| 425 | 150k |           cmeta->has_split_parent_tablet_id() ? cmeta->split_parent_tablet_id() : "") { | 
| 426 | 150k |   DCHECK_NOTNULL(log_.get()); | 
| 427 |  |  | 
| 428 | 150k |   if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) { | 
| 429 | 36 |     withold_replica_updates_until_ = MonoTime::Now() + | 
| 430 | 36 |         MonoDelta::FromSeconds(FLAGS_TEST_follower_reject_update_consensus_requests_seconds); | 
| 431 | 36 |   } | 
| 432 |  |  | 
| 433 | 150k |   state_ = std::make_unique<ReplicaState>( | 
| 434 | 150k |       options, | 
| 435 | 150k |       peer_uuid, | 
| 436 | 150k |       std::move(cmeta), | 
| 437 | 150k |       DCHECK_NOTNULL(consensus_context), | 
| 438 | 150k |       this, | 
| 439 | 150k |       retryable_requests, | 
| 440 | 150k |       std::bind(&PeerMessageQueue::TrackOperationsMemory, queue_.get(), _1)); | 
| 441 |  |  | 
| 442 | 150k |   peer_manager_->SetConsensus(this); | 
| 443 | 150k | } | 
| 444 |  |  | 
| 445 | 75.4k | RaftConsensus::~RaftConsensus() { | 
| 446 | 75.4k |   Shutdown(); | 
| 447 | 75.4k | } | 
| 448 |  |  | 
| 449 | 150k | Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { | 
| 450 | 150k |   RETURN_NOT_OK(ExecuteHook(PRE_START)); | 
| 451 |  |  | 
| 452 |  |   // Capture a weak_ptr reference into the functor so it can safely handle | 
| 453 |  |   // outliving the consensus instance. | 
| 454 | 150k |   std::weak_ptr<RaftConsensus> w = shared_from_this(); | 
| 455 | 150k |   failure_detector_ = PeriodicTimer::Create( | 
| 456 | 150k |       peer_proxy_factory_->messenger(), | 
| 457 | 818k |       [w]() { | 
| 458 | 818k |         if (auto consensus = w.lock()) { | 
| 459 | 818k |           consensus->ReportFailureDetected(); | 
| 460 | 818k |         } | 
| 461 | 818k |       }, | 
| 462 | 150k |       MinimumElectionTimeout()); | 
| 463 |  |  | 
| 464 | 150k |   { | 
| 465 | 150k |     ReplicaState::UniqueLock lock; | 
| 466 | 150k |     RETURN_NOT_OK(state_->LockForStart(&lock)); | 
| 467 | 150k |     state_->ClearLeaderUnlocked(); | 
| 468 |  |  | 
| 469 | 150k |     RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id), | 
| 470 | 150k |                           "Unable to start RAFT ReplicaState"); | 
| 471 |  |  | 
| 472 | 150k |     LOG_WITH_PREFIX(INFO) << "Replica starting. Triggering " | 
| 473 | 150k |                           << info.orphaned_replicates.size() | 
| 474 | 150k |                           << " pending operations. Active config: " | 
| 475 | 150k |                           << state_->GetActiveConfigUnlocked().ShortDebugString(); | 
| 476 | 150k |     for (const auto& replicate : info.orphaned_replicates) { | 
| 477 | 156 |       ReplicateMsgPtr replicate_ptr = std::make_shared<ReplicateMsg>(*replicate); | 
| 478 | 156 |       RETURN_NOT_OK(StartReplicaOperationUnlocked(replicate_ptr, HybridTime::kInvalid)); | 
| 479 | 156 |     } | 
| 480 |  |  | 
| 481 | 150k |     RETURN_NOT_OK(state_->InitCommittedOpIdUnlocked(yb::OpId::FromPB(info.last_committed_id))); | 
| 482 |  |  | 
| 483 | 150k |     queue_->Init(state_->GetLastReceivedOpIdUnlocked()); | 
| 484 | 150k |   } | 
| 485 |  |  | 
| 486 | 0 |   { | 
| 487 | 150k |     ReplicaState::UniqueLock lock; | 
| 488 | 150k |     RETURN_NOT_OK(state_->LockForConfigChange(&lock)); | 
| 489 |  |  | 
| 490 |  |     // If this is the first term expire the FD immediately so that we have a fast first | 
| 491 |  |     // election, otherwise we just let the timer expire normally. | 
| 492 | 150k |     MonoDelta initial_delta = MonoDelta(); | 
| 493 | 150k |     if (state_->GetCurrentTermUnlocked() == 0) { | 
| 494 |  |       // The failure detector is initialized to a low value to trigger an early election | 
| 495 |  |       // (unless someone else requested a vote from us first, which resets the | 
| 496 |  |       // election timer). We do it this way instead of immediately running an | 
| 497 |  |       // election to get a higher likelihood of enough servers being available | 
| 498 |  |       // when the first one attempts an election to avoid multiple election | 
| 499 |  |       // cycles on startup, while keeping that "waiting period" random. If there is only one peer, | 
| 500 |  |       // trigger an election right away. | 
| 501 | 147k |       if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { | 
| 502 | 147k |         LOG_WITH_PREFIX(INFO) << "Consensus starting up: Expiring fail detector timer " | 
| 503 | 147k |                                  "to make a prompt election more likely"; | 
| 504 |  |         // Gating quick leader elections on table creation, since prompt leader elections are | 
| 505 |  |         // more likely to fail due to uninitialized peers or conflicting elections, which could | 
| 506 |  |         // have unforeseen consequences. | 
| 507 | 147k |         if (FLAGS_quick_leader_election_on_create) { | 
| 508 | 0 |           initial_delta = (state_->GetCommittedConfigUnlocked().peers_size() == 1) ? | 
| 509 | 0 |               MonoDelta::kZero : | 
| 510 | 0 |               MonoDelta::FromMilliseconds(rng_.Uniform(FLAGS_raft_heartbeat_interval_ms)); | 
| 511 | 0 |         } | 
| 512 | 147k |       } | 
| 513 | 147k |     } | 
| 514 | 150k |     RETURN_NOT_OK(BecomeReplicaUnlocked(std::string(), initial_delta)); | 
| 515 | 150k |   } | 
| 516 |  |  | 
| 517 | 150k |   RETURN_NOT_OK(ExecuteHook(POST_START)); | 
| 518 |  |  | 
| 519 |  |   // The context tracks that the current caller does not hold the lock for consensus state, | 
| 520 |  |   // so the mark-dirty callback (e.g. consensus->ConsensusState() in the master's | 
| 521 |  |   // SysCatalogStateChanged callback) can acquire the lock when needed. | 
| 522 | 150k |   auto context = std::make_shared<StateChangeContext>(StateChangeReason::CONSENSUS_STARTED, false); | 
| 523 |  |   // Report becoming visible to the Master. | 
| 524 | 150k |   MarkDirty(context); | 
| 525 |  |  | 
| 526 | 150k |   return Status::OK(); | 
| 527 | 150k | } | 
| 528 |  |  | 
| 529 | 7.96k | bool RaftConsensus::IsRunning() const { | 
| 530 | 7.96k |   auto lock = state_->LockForRead(); | 
| 531 | 7.96k |   return state_->state() == ReplicaState::kRunning; | 
| 532 | 7.96k | } | 
| 533 |  |  | 
| 534 | 86 | Status RaftConsensus::EmulateElection() { | 
| 535 | 86 |   ReplicaState::UniqueLock lock; | 
| 536 | 86 |   RETURN_NOT_OK(state_->LockForConfigChange(&lock)); | 
| 537 |  |  | 
| 538 | 86 |   LOG_WITH_PREFIX(INFO) << "Emulating election..."; | 
| 539 |  |  | 
| 540 |  |   // Assume leadership of new term. | 
| 541 | 86 |   RETURN_NOT_OK(IncrementTermUnlocked()); | 
| 542 | 86 |   SetLeaderUuidUnlocked(state_->GetPeerUuid()); | 
| 543 | 86 |   return BecomeLeaderUnlocked(); | 
| 544 | 86 | } | 
| 545 |  |  | 
| 546 | 1.22M | Status RaftConsensus::DoStartElection(const LeaderElectionData& data, PreElected preelected) { | 
| 547 | 1.22M |   TRACE_EVENT2("consensus", "RaftConsensus::StartElection", | 
| 548 | 1.22M |                "peer", peer_uuid(), | 
| 549 | 1.22M |                "tablet", tablet_id()); | 
| 550 | 1.22M |   VLOG_WITH_PREFIX_AND_FUNC(1) << data.ToString(); | 
| 551 | 1.22M |   if (FLAGS_TEST_do_not_start_election_test_only) { | 
| 552 | 11 |     LOG(INFO) << "Election start skipped as TEST_do_not_start_election_test_only flag " | 
| 553 | 11 |                  "is set to true."; | 
| 554 | 11 |     return Status::OK(); | 
| 555 | 11 |   } | 
| 556 |  |  | 
| 557 |  |   // If pre-elections are disabled or we already won the pre-election, then start a regular | 
| 558 |  |   // election; otherwise a pre-election is started. | 
| 559 |  |   // Pre-elections can be disabled via a flag, or temporarily if some nodes do not support them. | 
| 560 | 1.22M |   auto preelection = ANNOTATE_UNPROTECTED_READ(FLAGS_use_preelection) && !preelected && | 
| 561 | 1.22M |                      disable_pre_elections_until_ < CoarseMonoClock::now(); | 
| 562 | 1.22M |   const char* election_name = preelection ? "pre-election" : "election"; | 
| 563 |  |  | 
| 564 | 1.22M |   LeaderElectionPtr election; | 
| 565 | 1.22M |   { | 
| 566 | 1.22M |     ReplicaState::UniqueLock lock; | 
| 567 | 1.22M |     RETURN_NOT_OK(state_->LockForConfigChange(&lock)); | 
| 568 |  |  | 
| 569 | 1.22M |     if (data.initial_election && state_->GetCurrentTermUnlocked() != 0) { | 
| 570 | 1 |       LOG_WITH_PREFIX(INFO) << "Not starting initial " << election_name << " -- non zero term"; | 
| 571 | 1 |       return Status::OK(); | 
| 572 | 1 |     } | 
| 573 |  |  | 
| 574 | 1.22M |     PeerRole active_role = state_->GetActiveRoleUnlocked(); | 
| 575 | 1.22M |     if (active_role == PeerRole::LEADER) { | 
| 576 | 50 |       LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- already leader"; | 
| 577 | 50 |       return Status::OK(); | 
| 578 | 50 |     } | 
| 579 | 1.22M |     if (active_role == PeerRole::LEARNER || active_role == PeerRole::READ_REPLICA) { | 
| 580 | 217 |       LOG_WITH_PREFIX(INFO) << "Not starting " << election_name << " -- role is " << active_role | 
| 581 | 217 |                             << ", pending = " << state_->IsConfigChangePendingUnlocked() | 
| 582 | 217 |                             << ", active_role=" << active_role; | 
| 583 | 217 |       return Status::OK(); | 
| 584 | 217 |     } | 
| 585 | 1.22M |     if (PREDICT_FALSE(active_role == PeerRole::NON_PARTICIPANT)) { | 
| 586 | 33 |       VLOG_WITH_PREFIX(1) << "Not starting " << election_name << " -- non participant"; | 
| 587 |  |       // Avoid excessive election noise while in this state. | 
| 588 | 33 |       SnoozeFailureDetector(DO_NOT_LOG); | 
| 589 | 33 |       return STATUS_FORMAT( | 
| 590 | 33 |           IllegalState, | 
| 591 | 33 |           "Not starting $0: Node is currently a non-participant in the raft config: $1", | 
| 592 | 33 |           election_name, state_->GetActiveConfigUnlocked()); | 
| 593 | 33 |     } | 
| 594 |  |  | 
| 595 |  |     // Default is to start the election now. But if we are starting a pending election, see if | 
| 596 |  |     // there is indeed a pending op id and whether it has been committed to the log. The op id | 
| 597 |  |     // could have been cleared if the pending election has already been started or another peer | 
| 598 |  |     // has jumped in before we could start. | 
| 599 | 1.22M |     bool start_now = true; | 
| 600 | 1.22M |     if (data.pending_commit) { | 
| 601 | 0 |       const auto required_id = !data.must_be_committed_opid | 
| 602 | 0 |           ? state_->GetPendingElectionOpIdUnlocked() : data.must_be_committed_opid; | 
| 603 | 0 |       const Status advance_committed_index_status = ResultToStatus( | 
| 604 | 0 |           state_->AdvanceCommittedOpIdUnlocked(required_id, CouldStop::kFalse)); | 
| 605 | 0 |       if (!advance_committed_index_status.ok()) { | 
| 606 | 0 |         LOG(WARNING) << "Starting an " << election_name << " but the latest committed OpId is not " | 
| 607 | 0 |                         "present in this peer's log: " | 
| 608 | 0 |                      << required_id << ". " << "Status: " << advance_committed_index_status; | 
| 609 | 0 |       } | 
| 610 | 0 |       start_now = required_id.index <= state_->GetCommittedOpIdUnlocked().index; | 
| 611 | 0 |     } | 
| 612 |  |  | 
| 613 | 1.22M |     if (start_now) { | 
| 614 | 1.22M |       if (state_->HasLeaderUnlocked()) { | 
| 615 | 492k |         LOG_WITH_PREFIX(INFO) << "Fail of leader " << state_->GetLeaderUuidUnlocked() | 
| 616 | 492k |                               << " detected. Triggering leader " << election_name | 
| 617 | 492k |                               << ", mode=" << data.mode; | 
| 618 | 733k |       } else { | 
| 619 | 733k |         LOG_WITH_PREFIX(INFO) << "Triggering leader " << election_name << ", mode=" << data.mode; | 
| 620 | 733k |       } | 
| 621 |  |  | 
| 622 |  |       // Snooze to avoid the election timer firing again as much as possible. | 
| 623 |  |       // We do not disable the election timer while running an election. | 
| 624 | 1.22M |       MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked(); | 
| 625 | 1.22M |       SnoozeFailureDetector(ALLOW_LOGGING, timeout); | 
| 626 |  |  | 
| 627 | 1.22M |       election = VERIFY_RESULT(CreateElectionUnlocked(data, timeout, PreElection(preelection))); | 
| 628 | 18.4E |     } else if (data.pending_commit && !data.must_be_committed_opid.empty()) { | 
| 629 |  |       // Queue up the pending op id if specified. | 
| 630 | 0 |       state_->SetPendingElectionOpIdUnlocked(data.must_be_committed_opid); | 
| 631 | 0 |       LOG_WITH_PREFIX(INFO) | 
| 632 | 0 |           << "Leader " << election_name << " is pending upon log commitment of OpId " | 
| 633 | 0 |           << data.must_be_committed_opid; | 
| 634 | 18.4E |     } else { | 
| 635 | 18.4E |       LOG_WITH_PREFIX(INFO) << "Ignore " << __func__ << " existing wait on op id"; | 
| 636 | 18.4E |     } | 
| 637 | 1.22M |   } | 
| 638 |  |  | 
| 639 |  |   // Start the election outside the lock. | 
| 640 | 737k |   if (election) { | 
| 641 | 737k |     election->Run(); | 
| 642 | 737k |   } | 
| 643 |  |  | 
| 644 | 737k |   return Status::OK(); | 
| 645 | 1.22M | } | 
| 646 |  |  | 
| 647 |  | Result<LeaderElectionPtr> RaftConsensus::CreateElectionUnlocked( | 
| 648 | 1.22M |     const LeaderElectionData& data, MonoDelta timeout, PreElection preelection) { | 
| 649 | 1.22M |   int64_t new_term; | 
| 650 | 1.22M |   if (preelection) { | 
| 651 | 674k |     new_term = state_->GetCurrentTermUnlocked() + 1; | 
| 652 | 674k |   } else { | 
| 653 |  |     // Increment the term. | 
| 654 | 550k |     RETURN_NOT_OK(IncrementTermUnlocked()); | 
| 655 | 62.8k |     new_term = state_->GetCurrentTermUnlocked(); | 
| 656 | 62.8k |   } | 
| 657 |  |  | 
| 658 | 737k |   const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); | 
| 659 | 737k |   LOG_WITH_PREFIX(INFO) << "Starting " << (preelection ? "pre-"674k: ""62.8k) << "election with config: " | 
| 660 | 737k |                         << active_config.ShortDebugString(); | 
| 661 |  |  | 
| 662 |  |   // Initialize the VoteCounter. | 
| 663 | 737k |   auto num_voters = CountVoters(active_config); | 
| 664 | 737k |   auto majority_size = MajoritySize(num_voters); | 
| 665 |  |  | 
| 666 |  |   // Vote for ourselves. | 
| 667 | 737k |   if (!preelection) { | 
| 668 |  |     // TODO: Consider using a separate Mutex for voting, which must sync to disk. | 
| 669 | 62.8k |     RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid())); | 
| 670 | 62.8k |   } | 
| 671 |  |  | 
| 672 | 737k |   auto counter = std::make_unique<VoteCounter>(num_voters, majority_size); | 
| 673 | 737k |   bool duplicate; | 
| 674 | 737k |   RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), ElectionVote::kGranted, &duplicate)); | 
| 675 | 737k |   CHECK(!duplicate) << state_->LogPrefix() | 
| 676 | 71 |                     << "Inexplicable duplicate self-vote for term " | 
| 677 | 71 |                     << state_->GetCurrentTermUnlocked(); | 
| 678 |  |  | 
| 679 | 737k |   VoteRequestPB request; | 
| 680 | 737k |   request.set_ignore_live_leader(data.mode == ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE); | 
| 681 | 737k |   request.set_candidate_uuid(state_->GetPeerUuid()); | 
| 682 | 737k |   request.set_candidate_term(new_term); | 
| 683 | 737k |   request.set_tablet_id(state_->GetOptions().tablet_id); | 
| 684 | 737k |   request.set_preelection(preelection); | 
| 685 | 737k |   state_->GetLastReceivedOpIdUnlocked().ToPB( | 
| 686 | 737k |       request.mutable_candidate_status()->mutable_last_received()); | 
| 687 |  |  | 
| 688 | 737k |   LeaderElectionPtr result(new LeaderElection( | 
| 689 | 737k |       active_config, | 
| 690 | 737k |       peer_proxy_factory_.get(), | 
| 691 | 737k |       request, | 
| 692 | 737k |       std::move(counter), | 
| 693 | 737k |       timeout, | 
| 694 | 737k |       preelection, | 
| 695 | 737k |       data.suppress_vote_request, | 
| 696 | 737k |       std::bind(&RaftConsensus::ElectionCallback, shared_from_this(), data, _1))); | 
| 697 |  |  | 
| 698 | 737k |   if (!preelection) { | 
| 699 |  |     // Clear the pending election op id so that we won't start the same pending election again. | 
| 700 |  |     // Pre-election does not change state, so should not do it in this case. | 
| 701 | 62.8k |     state_->ClearPendingElectionOpIdUnlocked(); | 
| 702 | 62.8k |   } | 
| 703 |  |  | 
| 704 | 737k |   return result; | 
| 705 | 737k | } | 
| 706 |  |  | 
| 707 | 8 | Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) { | 
| 708 | 8 |   MonoTime deadline = MonoTime::Now(); | 
| 709 | 8 |   deadline.AddDelta(timeout); | 
| 710 | 286 |   while (MonoTime::Now().ComesBefore(deadline)) { | 
| 711 | 286 |     if (GetLeaderStatus() == LeaderStatus::LEADER_AND_READY) { | 
| 712 | 8 |       return Status::OK(); | 
| 713 | 8 |     } | 
| 714 | 278 |     SleepFor(MonoDelta::FromMilliseconds(10)); | 
| 715 | 278 |   } | 
| 716 |  |  | 
| 717 | 0 |   return STATUS(TimedOut, Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3", | 
| 718 | 8 |                                      peer_uuid(), tablet_id(), timeout.ToString(), role())); | 
| 719 | 8 | } | 
| 720 |  |  | 
| 721 | 11.3k | string RaftConsensus::ServersInTransitionMessage() { | 
| 722 | 11.3k |   string err_msg; | 
| 723 | 11.3k |   const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); | 
| 724 | 11.3k |   const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); | 
| 725 | 11.3k |   auto servers_in_transition = CountServersInTransition(active_config); | 
| 726 | 11.3k |   auto committed_servers_in_transition = CountServersInTransition(committed_config); | 
| 727 | 11.3k |   LOG(INFO) << Substitute("Active config has $0 and committed has $1 servers in transition.", | 
| 728 | 11.3k |                           servers_in_transition, committed_servers_in_transition); | 
| 729 | 11.3k |   if (servers_in_transition != 0 || committed_servers_in_transition != 0) { | 
| 730 | 225 |     err_msg = Substitute("Leader not ready to step down as there are $0 active config peers" | 
| 731 | 225 |                          " in transition, $1 in committed. Configs:\nactive=$2\ncommit=$3", | 
| 732 | 225 |                          servers_in_transition, committed_servers_in_transition, | 
| 733 | 225 |                          active_config.ShortDebugString(), committed_config.ShortDebugString()); | 
| 734 | 225 |     LOG(INFO) << err_msg; | 
| 735 | 225 |   } | 
| 736 | 11.3k |   return err_msg; | 
| 737 | 11.3k | } | 
| 738 |  |  | 
| 739 | 10.0k | Status RaftConsensus::StartStepDownUnlocked(const RaftPeerPB& peer, bool graceful) { | 
| 740 | 10.0k |   auto election_state = std::make_shared<RunLeaderElectionState>(); | 
| 741 | 10.0k |   election_state->proxy = peer_proxy_factory_->NewProxy(peer); | 
| 742 | 10.0k |   election_state->req.set_originator_uuid(state_->GetPeerUuid()); | 
| 743 | 10.0k |   election_state->req.set_dest_uuid(peer.permanent_uuid()); | 
| 744 | 10.0k |   election_state->req.set_tablet_id(state_->GetOptions().tablet_id); | 
| 745 | 10.0k |   election_state->rpc.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh); | 
| 746 | 10.0k |   election_state->proxy->RunLeaderElectionAsync( | 
| 747 | 10.0k |       &election_state->req, &election_state->resp, &election_state->rpc, | 
| 748 | 10.0k |       std::bind(&RaftConsensus::RunLeaderElectionResponseRpcCallback, this, | 
| 749 | 10.0k |           election_state)); | 
| 750 |  |  | 
| 751 | 10.0k |   LOG_WITH_PREFIX(INFO) << "Transferring leadership to " << peer.permanent_uuid(); | 
| 752 |  |  | 
| 753 | 10.0k |   return BecomeReplicaUnlocked( | 
| 754 | 10.0k |       graceful ? std::string() : peer.permanent_uuid(), MonoDelta()); | 
| 755 | 10.0k | } | 
| 756 |  |  | 
| 757 | 86 | void RaftConsensus::CheckDelayedStepDown(const Status& status) { | 
| 758 | 86 |   ReplicaState::UniqueLock lock; | 
| 759 | 86 |   auto lock_status = state_->LockForConfigChange(&lock); | 
| 760 | 86 |   if (!lock_status.ok()) { | 
| 761 | 9 |     LOG_WITH_PREFIX(INFO) << "Failed to check delayed election: " << lock_status; | 
| 762 | 9 |     return; | 
| 763 | 9 |   } | 
| 764 |  |  | 
| 765 | 77 |   if (state_->GetCurrentTermUnlocked() != delayed_step_down_.term) { | 
| 766 | 68 |     return; | 
| 767 | 68 |   } | 
| 768 |  |  | 
| 769 | 9 |   const auto& config = state_->GetActiveConfigUnlocked(); | 
| 770 | 9 |   const auto* peer = FindPeer(config, delayed_step_down_.protege); | 
| 771 | 9 |   if (peer) { | 
| 772 | 9 |     LOG_WITH_PREFIX(INFO) << "Step down in favor on not synchronized protege: " | 
| 773 | 9 |                           << delayed_step_down_.protege; | 
| 774 | 9 |     WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful), | 
| 775 | 9 |                 "Start step down failed"); | 
| 776 | 9 |   } else { | 
| 777 | 0 |     LOG_WITH_PREFIX(INFO) << "Failed to synchronize with protege " << delayed_step_down_.protege | 
| 778 | 0 |                           << " and cannot find it in config: " << config.ShortDebugString(); | 
| 779 | 0 |     delayed_step_down_.term = OpId::kUnknownTerm; | 
| 780 | 0 |   } | 
| 781 | 9 | } | 
| 782 |  |  | 
| 783 | 54.1k | Status RaftConsensus::StepDown(const LeaderStepDownRequestPB* req, LeaderStepDownResponsePB* resp) { | 
| 784 | 54.1k |   TRACE_EVENT0("consensus", "RaftConsensus::StepDown"); | 
| 785 | 54.1k |   ReplicaState::UniqueLock lock; | 
| 786 | 54.1k |   RETURN_NOT_OK(state_->LockForConfigChange(&lock)); | 
| 787 |  |  | 
| 788 |  |   // A sanity check that this request was routed to the correct RaftConsensus. | 
| 789 | 54.1k |   const auto& tablet_id = req->tablet_id(); | 
| 790 | 54.1k |   if (tablet_id != this->tablet_id()) { | 
| 791 | 0 |     resp->mutable_error()->set_code(TabletServerErrorPB::UNKNOWN_ERROR); | 
| 792 | 0 |     const auto msg = Format( | 
| 793 | 0 |         "Received a leader stepdown operation for wrong tablet id: $0, must be: $1", | 
| 794 | 0 |         tablet_id, this->tablet_id()); | 
| 795 | 0 |     LOG_WITH_PREFIX(ERROR) << msg; | 
| 796 | 0 |     StatusToPB(STATUS(IllegalState, msg), resp->mutable_error()->mutable_status()); | 
| 797 | 0 |     return Status::OK(); | 
| 798 | 0 |   } | 
| 799 |  |  | 
| 800 | 54.1k |   if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) { | 
| 801 | 42.7k |     resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); | 
| 802 | 42.7k |     StatusToPB(STATUS(IllegalState, "Not currently leader"), | 
| 803 | 42.7k |                resp->mutable_error()->mutable_status()); | 
| 804 |  |     // We return OK so that the tablet service won't overwrite the error code. | 
| 805 | 42.7k |     return Status::OK(); | 
| 806 | 42.7k |   } | 
| 807 |  |  | 
| 808 |  |   // The leader needs to be ready to perform a step down. There should be no PRE_VOTER in | 
| 809 |  |   // either the active or the committed config - ENG-557. | 
| 810 | 11.3k |   const string err_msg = ServersInTransitionMessage(); | 
| 811 | 11.3k |   if (!err_msg.empty()) { | 
| 812 | 225 |     resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); | 
| 813 | 225 |     StatusToPB(STATUS(IllegalState, err_msg), resp->mutable_error()->mutable_status()); | 
| 814 | 225 |     return Status::OK(); | 
| 815 | 225 |   } | 
| 816 |  |  | 
| 817 | 11.0k |   std::string new_leader_uuid; | 
| 818 |  |   // If a new leader is nominated, find it among peers to send RunLeaderElection request. | 
| 819 |  |   // See https://ramcloud.stanford.edu/~ongaro/thesis.pdf, section 3.10 for this mechanism | 
| 820 |  |   // to transfer the leadership. | 
| 821 | 11.0k |   const bool forced = (req->has_force_step_down() && req->force_step_down()); | 
| 822 | 11.0k |   if (req->has_new_leader_uuid()) { | 
| 823 | 10.5k |     new_leader_uuid = req->new_leader_uuid(); | 
| 824 | 10.5k |     if (!forced && !queue_->CanPeerBecomeLeader(new_leader_uuid)) { | 
| 825 | 166 |       resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); | 
| 826 | 166 |       StatusToPB( | 
| 827 | 166 |           STATUS(IllegalState, "Suggested peer is not caught up yet"), | 
| 828 | 166 |           resp->mutable_error()->mutable_status()); | 
| 829 |  |       // We return OK so that the tablet service won't overwrite the error code. | 
| 830 | 166 |       return Status::OK(); | 
| 831 | 166 |     } | 
| 832 | 10.5k |   } | 
| 833 |  |  | 
| 834 | 10.9k |   bool graceful_stepdown = false; | 
| 835 | 10.9k |   if (new_leader_uuid.empty() && !FLAGS_stepdown_disable_graceful_transition && | 
| 836 | 10.9k |       !(req->has_disable_graceful_transition() && req->disable_graceful_transition())) { | 
| 837 | 588 |     new_leader_uuid = queue_->GetUpToDatePeer(); | 
| 838 | 588 |     LOG_WITH_PREFIX(INFO) << "Selected up to date candidate protege leader [" << new_leader_uuid | 
| 839 | 588 |                           << "]"; | 
| 840 | 588 |     graceful_stepdown = true; | 
| 841 | 588 |   } | 
| 842 |  |  | 
| 843 | 10.9k |   const auto& local_peer_uuid = state_->GetPeerUuid(); | 
| 844 | 10.9k |   if (!new_leader_uuid.empty()) { | 
| 845 | 10.9k |     const auto leadership_transfer_description = | 
| 846 | 10.9k |         Format("tablet $0 from $1 to $2", tablet_id, local_peer_uuid, new_leader_uuid); | 
| 847 | 10.9k |     if (!forced && new_leader_uuid == protege_leader_uuid_ && election_lost_by_protege_at_) { | 
| 848 | 923 |       const MonoDelta time_since_election_loss_by_protege = | 
| 849 | 923 |           MonoTime::Now() - election_lost_by_protege_at_; | 
| 850 | 923 |       if (time_since_election_loss_by_protege.ToMilliseconds() < | 
| 851 | 923 |               FLAGS_min_leader_stepdown_retry_interval_ms) { | 
| 852 | 908 |         LOG_WITH_PREFIX(INFO) << "Unable to execute leadership transfer for " | 
| 853 | 908 |                               << leadership_transfer_description | 
| 854 | 908 |                               << " because the intended leader already lost an election only " | 
| 855 | 908 |                               << ToString(time_since_election_loss_by_protege) << " ago (within " | 
| 856 | 908 |                               << FLAGS_min_leader_stepdown_retry_interval_ms << " ms)."; | 
| 857 | 908 |         if (req->has_new_leader_uuid()) { | 
| 858 | 903 |           LOG_WITH_PREFIX(INFO) << "Rejecting leader stepdown request for " | 
| 859 | 903 |                                 << leadership_transfer_description; | 
| 860 | 903 |           resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); | 
| 861 | 903 |           resp->set_time_since_election_failure_ms( | 
| 862 | 903 |               time_since_election_loss_by_protege.ToMilliseconds()); | 
| 863 | 903 |           StatusToPB( | 
| 864 | 903 |               STATUS(IllegalState, "Suggested peer lost an election recently"), | 
| 865 | 903 |               resp->mutable_error()->mutable_status()); | 
| 866 |  |           // We return OK so that the tablet service won't overwrite the error code. | 
| 867 | 903 |           return Status::OK(); | 
| 868 | 903 |         } else { | 
| 869 |  |           // we were attempting a graceful transfer of our own choice | 
| 870 |  |           // which is no longer possible | 
| 871 | 5 |           new_leader_uuid.clear(); | 
| 872 | 5 |         } | 
| 873 | 908 |       } | 
| 874 | 20 |       election_lost_by_protege_at_ = MonoTime(); | 
| 875 | 20 |     } | 
| 876 | 10.9k |   } | 
| 877 |  |  | 
| 878 | 10.0k |   if (!new_leader_uuid.empty()) { | 
| 879 | 10.0k |     const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), new_leader_uuid); | 
| 880 | 10.0k |     if (peer && peer->member_type() == PeerMemberType::VOTER) { | 
| 881 | 10.0k |       auto timeout_ms = FLAGS_protege_synchronization_timeout_ms; | 
| 882 | 10.0k |       if (timeout_ms != 0 && | 
| 883 | 10.0k |           queue_->PeerLastReceivedOpId(new_leader_uuid) < GetLatestOpIdFromLog()) { | 
| 884 | 86 |         delayed_step_down_ = DelayedStepDown { | 
| 885 | 86 |           .term = state_->GetCurrentTermUnlocked(), | 
| 886 | 86 |           .protege = new_leader_uuid, | 
| 887 | 86 |           .graceful = graceful_stepdown, | 
| 888 | 86 |         }; | 
| 889 | 86 |         LOG_WITH_PREFIX(INFO) << "Delay step down: " << delayed_step_down_.ToString(); | 
| 890 | 86 |         step_down_check_tracker_.Schedule( | 
| 891 | 86 |             std::bind(&RaftConsensus::CheckDelayedStepDown, this, _1), | 
| 892 | 86 |             1ms * timeout_ms); | 
| 893 | 86 |         return Status::OK(); | 
| 894 | 86 |       } | 
| 895 |  |  | 
| 896 | 9.92k |       return StartStepDownUnlocked(*peer, graceful_stepdown); | 
| 897 | 10.0k |     } | 
| 898 |  |  | 
| 899 | 10 |     LOG_WITH_PREFIX(WARNING) << "New leader " << new_leader_uuid << " not found."; | 
| 900 | 10 |     if (req->has_new_leader_uuid()) { | 
| 901 | 0 |       resp->mutable_error()->set_code(TabletServerErrorPB::LEADER_NOT_READY_TO_STEP_DOWN); | 
| 902 | 0 |       StatusToPB( | 
| 903 | 0 |           STATUS(IllegalState, "New leader not found among peers"), | 
| 904 | 0 |           resp->mutable_error()->mutable_status()); | 
| 905 |  |       // We return OK so that the tablet service won't overwrite the error code. | 
| 906 | 0 |       return Status::OK(); | 
| 907 | 10 |     } else { | 
| 908 |  |       // we were attempting a graceful transfer of our own choice | 
| 909 |  |       // which is no longer possible | 
| 910 | 10 |       new_leader_uuid.clear(); | 
| 911 | 10 |     } | 
| 912 | 10 |   } | 
| 913 |  |  | 
| 914 | 19 |   if (graceful_stepdown) { | 
| 915 | 19 |     new_leader_uuid.clear(); | 
| 916 | 19 |   } | 
| 917 | 15 |   RETURN_NOT_OK(BecomeReplicaUnlocked(new_leader_uuid, MonoDelta())); | 
| 918 |  |  | 
| 919 | 15 |   return Status::OK(); | 
| 920 | 15 | } | 
| 921 |  |  | 
| 922 | 77 | Status RaftConsensus::ElectionLostByProtege(const std::string& election_lost_by_uuid) { | 
| 923 | 77 |   if (election_lost_by_uuid.empty()) { | 
| 924 | 0 |     return STATUS(InvalidArgument, "election_lost_by_uuid could not be empty"); | 
| 925 | 0 |   } | 
| 926 |  |  | 
| 927 | 77 |   auto start_election = false; | 
| 928 | 77 |   { | 
| 929 | 77 |     ReplicaState::UniqueLock lock; | 
| 930 | 77 |     RETURN_NOT_OK(state_->LockForConfigChange(&lock)); | 
| 931 | 77 |     if (election_lost_by_uuid == protege_leader_uuid_) { | 
| 932 | 69 |       LOG_WITH_PREFIX(INFO) << "Our protege " << election_lost_by_uuid | 
| 933 | 69 |                             << ", lost election. Has leader: " | 
| 934 | 69 |                             << state_->HasLeaderUnlocked(); | 
| 935 | 69 |       withhold_election_start_until_.store(MonoTime::Min(), std::memory_order_relaxed); | 
| 936 | 69 |       election_lost_by_protege_at_ = MonoTime::Now(); | 
| 937 |  |  | 
| 938 | 69 |       start_election = !state_->HasLeaderUnlocked(); | 
| 939 | 69 |     } | 
| 940 | 77 |   } | 
| 941 |  |  | 
| 942 | 77 |   if (start_election) { | 
| 943 | 68 |     return StartElection({ElectionMode::NORMAL_ELECTION}); | 
| 944 | 68 |   } | 
| 945 |  |  | 
| 946 | 9 |   return Status::OK(); | 
| 947 | 77 | } | 
| 948 |  |  | 
| 949 | 10.9k | void RaftConsensus::WithholdElectionAfterStepDown(const std::string& protege_uuid) { | 
| 950 | 10.9k |   DCHECK(state_->IsLocked()); | 
| 951 | 10.9k |   protege_leader_uuid_ = protege_uuid; | 
| 952 | 10.9k |   auto timeout = MonoDelta::FromMilliseconds( | 
| 953 | 10.9k |       FLAGS_leader_failure_max_missed_heartbeat_periods * | 
| 954 | 10.9k |       FLAGS_raft_heartbeat_interval_ms); | 
| 955 | 10.9k |   if (!protege_uuid.empty()) { | 
| 956 |  |     // Actually we have 2 kinds of step downs. | 
| 957 |  |     // 1) We step down in favor of some protege. | 
| 958 |  |     // 2) We step down because the term was advanced or just started. | 
| 959 |  |     // In the second case we should not withhold elections for a long period of time. | 
| 960 | 9.44k |     timeout *= FLAGS_after_stepdown_delay_election_multiplier; | 
| 961 | 9.44k |   } | 
| 962 | 10.9k |   auto deadline = MonoTime::Now() + timeout; | 
| 963 | 10.9k |   VLOG(2) << "Withholding election for " << timeout0; | 
| 964 | 10.9k |   withhold_election_start_until_.store(deadline, std::memory_order_release); | 
| 965 | 10.9k |   election_lost_by_protege_at_ = MonoTime(); | 
| 966 | 10.9k | } | 
| 967 |  |  | 
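
The withhold deadline computed above is plain flag arithmetic. As a rough illustration, here is a minimal sketch of the two step-down cases; the values below are hypothetical stand-ins, not the real gflag defaults:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical stand-ins for the gflags referenced above.
  const double leader_failure_max_missed_heartbeat_periods = 6.0;
  const int64_t raft_heartbeat_interval_ms = 500;
  const int64_t after_stepdown_delay_election_multiplier = 5;

  auto timeout = std::chrono::milliseconds(static_cast<int64_t>(
      leader_failure_max_missed_heartbeat_periods * raft_heartbeat_interval_ms));

  // Case 1: stepping down in favor of a protege -- withhold much longer so
  // the protege has time to win its own election.
  const bool has_protege = true;
  if (has_protege) {
    timeout *= after_stepdown_delay_election_multiplier;
  }
  std::cout << "withhold elections for " << timeout.count() << " ms\n";  // 15000
  return 0;
}
```
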
| 968 |  | void RaftConsensus::RunLeaderElectionResponseRpcCallback( | 
| 969 | 10.0k |     shared_ptr<RunLeaderElectionState> election_state) { | 
| 970 |  |   // Check for RPC errors. | 
| 971 | 10.0k |   if (!election_state->rpc.status().ok()) { | 
| 972 | 15 |     LOG_WITH_PREFIX(WARNING) << "RPC error from RunLeaderElection() call to peer " | 
| 973 | 15 |                              << election_state->req.dest_uuid() << ": " | 
| 974 | 15 |                              << election_state->rpc.status(); | 
| 975 |  |   // Check for tablet errors. | 
| 976 | 9.99k |   } else if (election_state->resp.has_error()) { | 
| 977 | 0 |     LOG_WITH_PREFIX(WARNING) << "Tablet error from RunLeaderElection() call to peer " | 
| 978 | 0 |                              << election_state->req.dest_uuid() << ": " | 
| 979 | 0 |                              << StatusFromPB(election_state->resp.error().status()); | 
| 980 | 0 |   } | 
| 981 | 10.0k | } | 
| 982 |  |  | 
| 983 | 617k | void RaftConsensus::ReportFailureDetectedTask() { | 
| 984 | 617k |   auto scope_exit = ScopeExit([this] { | 
| 985 | 617k |     outstanding_report_failure_task_.clear(std::memory_order_release); | 
| 986 | 617k |   }); | 
| 987 |  |  | 
| 988 | 617k |   MonoTime now; | 
| 989 | 617k |   for (;;) { | 
| 990 |  |     // Do not start election for an extended period of time if we were recently stepped down. | 
| 991 | 617k |     auto old_value = withhold_election_start_until_.load(std::memory_order_acquire); | 
| 992 |  |  | 
| 993 | 617k |     if (old_value == MonoTime::Min()) { | 
| 994 | 616k |       break; | 
| 995 | 616k |     } | 
| 996 |  |  | 
| 997 | 1.39k |     if (!now.Initialized()) { | 
| 998 | 1.39k |       now = MonoTime::Now(); | 
| 999 | 1.39k |     } | 
| 1000 |  |  | 
| 1001 | 1.39k |     if (now < old_value) { | 
| 1002 | 361 |       VLOG(1) << "Skipping election due to delayed timeout for " << (old_value - now); | 
| 1003 | 361 |       return; | 
| 1004 | 361 |     } | 
| 1005 |  |  | 
| 1006 |  |     // If we stepped down earlier and a delayed election start was scheduled, the | 
| 1007 |  |     // deadline has now passed, so reset the flag to leave that extra delay mode. | 
| 1008 | 1.03k |     if (withhold_election_start_until_.compare_exchange_weak( | 
| 1009 | 1.03k |         old_value, MonoTime::Min(), std::memory_order_release)) { | 
| 1010 | 1.03k |       break; | 
| 1011 | 1.03k |     } | 
| 1012 | 1.03k |   } | 
| 1013 |  |  | 
| 1014 |  |   // Start an election. | 
| 1015 | 617k |   LOG_WITH_PREFIX(INFO) << "ReportFailDetected: Starting NORMAL_ELECTION..."; | 
| 1016 | 617k |   Status s = StartElection({ElectionMode::NORMAL_ELECTION}); | 
| 1017 | 617k |   if (PREDICT_FALSE(!s.ok())) { | 
| 1018 | 33 |     LOG_WITH_PREFIX(WARNING) << "Failed to trigger leader election: " << s.ToString(); | 
| 1019 | 33 |   } | 
| 1020 | 617k | } | 
| 1021 |  |  | 
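
The loop above is a small lock-free protocol over the withhold deadline. A self-contained sketch of the same pattern, assuming a `std::atomic` over a trivially copyable time point (names are illustrative):

```cpp
#include <atomic>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Returns true when the caller may proceed to start an election, resetting
// the deadline to the sentinel; returns false while the deadline is ahead.
bool MayStartElection(std::atomic<Clock::time_point>* withhold_until) {
  for (;;) {
    auto old_value = withhold_until->load(std::memory_order_acquire);
    if (old_value == Clock::time_point::min()) {
      return true;  // Fast path: no withhold in effect.
    }
    if (Clock::now() < old_value) {
      return false;  // Still inside the withhold window; skip this election.
    }
    // Deadline passed: try to reset it to the sentinel. A CAS failure means a
    // concurrent writer changed the deadline; loop and re-check the new value.
    if (withhold_until->compare_exchange_weak(
            old_value, Clock::time_point::min(), std::memory_order_release)) {
      return true;
    }
  }
}
```
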
| 1022 | 818k | void RaftConsensus::ReportFailureDetected() { | 
| 1023 | 818k |   if (FLAGS_raft_disallow_concurrent_outstanding_report_failure_tasks && | 
| 1024 | 818k |       outstanding_report_failure_task_.test_and_set(std::memory_order_acq_rel)) { | 
| 1025 | 201k |     VLOG(4) | 
| 1026 | 2 |         << "Returning from ReportFailureDetected as there is already an outstanding report task."; | 
| 1027 | 617k |   } else { | 
| 1028 |  |     // We're running on a timer thread; start an election on a different thread pool. | 
| 1029 | 617k |     auto s = raft_pool_token_->SubmitFunc( | 
| 1030 | 617k |         std::bind(&RaftConsensus::ReportFailureDetectedTask, shared_from_this())); | 
| 1031 | 617k |     WARN_NOT_OK(s, "Failed to submit failure detected task"); | 
| 1032 | 617k |     if (!s.ok()) { | 
| 1033 | 0 |       outstanding_report_failure_task_.clear(std::memory_order_release); | 
| 1034 | 0 |     } | 
| 1035 | 617k |   } | 
| 1036 | 818k | } | 
| 1037 |  |  | 
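
The `test_and_set`/`clear` pair above implements a single-outstanding-task guard. A minimal sketch of the same idiom (the thread pool submission is faked; names are illustrative):

```cpp
#include <atomic>

std::atomic_flag outstanding_task = ATOMIC_FLAG_INIT;

bool SubmitToPool() { return true; }  // Stand-in for the raft pool submit.

void ReportFailureOnce() {
  // First caller wins; everyone else sees the flag already set and returns.
  if (outstanding_task.test_and_set(std::memory_order_acq_rel)) {
    return;
  }
  if (!SubmitToPool()) {
    // Submission failed, so no task will ever clear the flag: clear it here
    // so the next failure report can retry.
    outstanding_task.clear(std::memory_order_release);
  }
  // Otherwise the task itself clears the flag when it finishes (see the
  // ScopeExit at the top of ReportFailureDetectedTask above).
}
```
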
| 1038 | 62.0k | Status RaftConsensus::BecomeLeaderUnlocked() { | 
| 1039 | 62.0k |   DCHECK(state_->IsLocked()); | 
| 1040 | 62.0k |   TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked", | 
| 1041 | 62.0k |                "peer", peer_uuid(), | 
| 1042 | 62.0k |                "tablet", tablet_id()); | 
| 1043 | 62.0k |   LOG_WITH_PREFIX(INFO) << "Becoming Leader. State: " << state_->ToStringUnlocked(); | 
| 1044 |  |  | 
| 1045 |  |   // Disable FD while we are leader. | 
| 1046 | 62.0k |   DisableFailureDetector(); | 
| 1047 |  |  | 
| 1048 |  |   // Don't vote for anyone if we're a leader. | 
| 1049 | 62.0k |   withhold_votes_until_.store(MonoTime::Max(), std::memory_order_release); | 
| 1050 |  |  | 
| 1051 | 62.0k |   queue_->RegisterObserver(this); | 
| 1052 |  |  | 
| 1053 |  |   // Refresh queue and peers before initiating NO_OP. | 
| 1054 | 62.0k |   RefreshConsensusQueueAndPeersUnlocked(); | 
| 1055 |  |  | 
| 1056 |  |   // Initiate a NO_OP operation that is sent at the beginning of every term | 
| 1057 |  |   // change in raft. | 
| 1058 | 62.0k |   auto replicate = std::make_shared<ReplicateMsg>(); | 
| 1059 | 62.0k |   replicate->set_op_type(NO_OP); | 
| 1060 | 62.0k |   replicate->mutable_noop_request(); // Define the no-op request field. | 
| 1061 | 62.0k |   LOG(INFO) << "Sending NO_OP at op " << state_->GetCommittedOpIdUnlocked(); | 
| 1062 |  |   // This committed OpId is used for tablet bootstrap for RocksDB-backed tables. | 
| 1063 | 62.0k |   state_->GetCommittedOpIdUnlocked().ToPB(replicate->mutable_committed_op_id()); | 
| 1064 |  |  | 
| 1065 |  |   // TODO: We should have no-ops (?) and config changes be COMMIT_WAIT | 
| 1066 |  |   // operations. See KUDU-798. | 
| 1067 |  |   // Note: This hybrid_time has no meaning from a serialization perspective | 
| 1068 |  |   // because this method is not executed on the TabletPeer's prepare thread. | 
| 1069 | 62.0k |   replicate->set_hybrid_time(clock_->Now().ToUint64()); | 
| 1070 |  |  | 
| 1071 | 62.0k |   auto round = make_scoped_refptr<ConsensusRound>(this, replicate); | 
| 1072 | 62.0k |   round->SetCallback(MakeNonTrackedRoundCallback(round.get(), | 
| 1073 | 62.0k |       [this, term = state_->GetCurrentTermUnlocked()](const Status& status) { | 
| 1074 |  |     // Set 'Leader is ready to serve' flag only for committed NoOp operation | 
| 1075 |  |     // and only if the term is up-to-date. | 
| 1076 |  |     // It is guaranteed that successful notification is called only while holding replicate state | 
| 1077 |  |     // mutex. | 
| 1078 | 61.8k |     if (status.ok() && term == state_->GetCurrentTermUnlocked()) { | 
| 1079 | 61.8k |       state_->SetLeaderNoOpCommittedUnlocked(true); | 
| 1080 | 61.8k |     } | 
| 1081 | 61.8k |   })); | 
| 1082 | 62.0k |   RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); | 
| 1083 |  |  | 
| 1084 | 62.0k |   peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); | 
| 1085 |  |  | 
| 1086 |  |   // Set the timestamp to max uint64_t so that every time this metric is queried, the returned | 
| 1087 |  |   // lag is 0. We will need to restore the timestamp once this peer steps down. | 
| 1088 | 62.0k |   follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds( | 
| 1089 | 62.0k |       std::numeric_limits<int64_t>::max()); | 
| 1090 | 62.0k |   is_raft_leader_metric_->set_value(1); | 
| 1091 |  |  | 
| 1092 | 62.0k |   return Status::OK(); | 
| 1093 | 62.0k | } | 
| 1094 |  |  | 
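
The NO_OP replicated above is the standard Raft move for a new leader: entries from earlier terms can only be proven committed once some entry from the current term reaches a majority. A conceptual sketch using plain structs (not the real `ReplicateMsg` protobuf):

```cpp
#include <cstdint>

struct SimpleEntry {
  int64_t term;
  int64_t index;
  bool is_noop;
};

// A freshly elected leader appends one no-op in its own term; once that
// entry is majority-replicated, every earlier entry is transitively
// committed as well.
SimpleEntry MakeTermStartNoOp(int64_t current_term, int64_t last_index) {
  return SimpleEntry{current_term, last_index + 1, /*is_noop=*/true};
}
```
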
| 1095 |  | Status RaftConsensus::BecomeReplicaUnlocked( | 
| 1096 | 161k |     const std::string& new_leader_uuid, MonoDelta initial_fd_wait) { | 
| 1097 | 161k |   LOG_WITH_PREFIX(INFO) | 
| 1098 | 161k |       << "Becoming Follower/Learner. State: " << state_->ToStringUnlocked() | 
| 1099 | 161k |       << ", new leader: " << new_leader_uuid << ", initial_fd_wait: " << initial_fd_wait; | 
| 1100 |  |  | 
| 1101 | 161k |   if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { | 
| 1102 | 10.9k |     WithholdElectionAfterStepDown(new_leader_uuid); | 
| 1103 | 10.9k |   } | 
| 1104 |  |  | 
| 1105 | 161k |   state_->ClearLeaderUnlocked(); | 
| 1106 |  |  | 
| 1107 |  |   // FD should be running while we are a follower. | 
| 1108 | 161k |   EnableFailureDetector(initial_fd_wait); | 
| 1109 |  |  | 
| 1110 |  |   // Now that we're a replica, we can allow voting for other nodes. | 
| 1111 | 161k |   withhold_votes_until_.store(MonoTime::Min(), std::memory_order_release); | 
| 1112 |  |  | 
| 1113 | 161k |   const Status unregister_observer_status = queue_->UnRegisterObserver(this); | 
| 1114 | 161k |   if (!unregister_observer_status.IsNotFound()) { | 
| 1115 | 10.9k |     RETURN_NOT_OK(unregister_observer_status); | 
| 1116 | 10.9k |   } | 
| 1117 |  |   // Deregister ourselves from the queue. We don't care what gets replicated, since | 
| 1118 |  |   // we're stepping down. | 
| 1119 | 161k |   queue_->SetNonLeaderMode(); | 
| 1120 |  |  | 
| 1121 | 161k |   peer_manager_->Close(); | 
| 1122 |  |  | 
| 1123 |  |   // TODO: https://github.com/yugabyte/yugabyte-db/issues/5522. Add unit tests for this metric. | 
| 1124 |  |   // We update the follower lag metric timestamp here because it's possible that a leader | 
| 1125 |  |   // that steps down could get partitioned before it receives any replicate message. If we | 
| 1126 |  |   // don't update the timestamp here, and the above scenario happens, the metric will keep the | 
| 1127 |  |   // uint64_t max value, which would make the metric return a 0 lag every time it is queried, | 
| 1128 |  |   // even though that's not the case. | 
| 1129 | 161k |   follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds( | 
| 1130 | 161k |       clock_->Now().GetPhysicalValueMicros() / 1000); | 
| 1131 | 161k |   is_raft_leader_metric_->set_value(0); | 
| 1132 |  |  | 
| 1133 | 161k |   return Status::OK(); | 
| 1134 | 161k | } | 
| 1135 |  |  | 
| 1136 | 195 | Status RaftConsensus::TEST_Replicate(const ConsensusRoundPtr& round) { | 
| 1137 | 195 |   return ReplicateBatch({ round }); | 
| 1138 | 195 | } | 
| 1139 |  |  | 
| 1140 | 4.93M | Status RaftConsensus::ReplicateBatch(const ConsensusRounds& rounds) { | 
| 1141 | 4.93M |   size_t processed_rounds = 0; | 
| 1142 | 4.93M |   auto status = DoReplicateBatch(rounds, &processed_rounds); | 
| 1143 | 4.93M |   if (!status.ok()) { | 
| 1144 | 84 |     VLOG_WITH_PREFIX_AND_FUNC(1) | 
| 1145 | 0 |         << "Failed with status " << status << ", treating all " << rounds.size() | 
| 1146 | 0 |         << " operations as failed with that status"; | 
| 1147 |  |     // Treat all the operations in the batch as failed. | 
| 1148 | 147 |     for (size_t i = rounds.size(); i != processed_rounds;) { | 
| 1149 | 63 |       rounds[--i]->callback()->ReplicationFailed(status); | 
| 1150 | 63 |     } | 
| 1151 | 84 |   } | 
| 1152 | 4.93M |   return status; | 
| 1153 | 4.93M | } | 
| 1154 |  |  | 
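
The failure path above walks the batch from the end down to the first round that was never processed. A minimal sketch of that bookkeeping, with callbacks standing in for `ReplicationFailed`:

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Rounds with index >= processed_rounds were never handed to the queue, so
// on failure they are all notified, iterating from the back of the batch
// down to (and excluding) the rounds already dealt with by DoReplicateBatch.
void FailUnprocessedRounds(const std::vector<std::function<void()>>& fail_cbs,
                           size_t processed_rounds) {
  for (size_t i = fail_cbs.size(); i != processed_rounds;) {
    fail_cbs[--i]();  // e.g. round->callback()->ReplicationFailed(status)
  }
}
```
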
| 1155 | 4.93M | Status RaftConsensus::DoReplicateBatch(const ConsensusRounds& rounds, size_t* processed_rounds) { | 
| 1156 | 4.93M |   RETURN_NOT_OK(ExecuteHook(PRE_REPLICATE)); | 
| 1157 | 4.93M |   { | 
| 1158 | 4.93M |     ReplicaState::UniqueLock lock; | 
| 1159 | 4.93M | #ifndef NDEBUG | 
| 1160 | 5.04M |     for (const auto& round : rounds) { | 
| 1161 | 18.4E |       DCHECK(!round->replicate_msg()->has_id()) << "Should not have an OpId yet: " | 
| 1162 | 18.4E |                                                 << round->replicate_msg()->DebugString(); | 
| 1163 | 5.04M |     } | 
| 1164 | 4.93M | #endif | 
| 1165 | 4.93M |     RETURN_NOT_OK(state_->LockForReplicate(&lock)); | 
| 1166 | 4.93M |     auto current_term = state_->GetCurrentTermUnlocked(); | 
| 1167 | 4.93M |     if (current_term == delayed_step_down_.term) { | 
| 1168 | 15 |       return STATUS(Aborted, "Rejecting because of planned step down"); | 
| 1169 | 15 |     } | 
| 1170 |  |  | 
| 1171 | 5.04M |     for (const auto& round : rounds) { | 
| 1172 | 5.04M |       RETURN_NOT_OK(round->CheckBoundTerm(current_term)); | 
| 1173 | 5.04M |     } | 
| 1174 | 4.93M |     auto status = AppendNewRoundsToQueueUnlocked(rounds, processed_rounds); | 
| 1175 | 4.93M |     if (!status.ok()) { | 
| 1176 |  |       // In general we have 3 kinds of rounds in case of failure: | 
| 1177 |  |       // 1) Rounds that were rejected by retryable requests. | 
| 1178 |  |       //    We should not call ReplicationFinished for them. | 
| 1179 |  |       // 2) Rounds that were registered with retryable requests. | 
| 1180 |  |       //    We should call state_->NotifyReplicationFinishedUnlocked for them. | 
| 1181 |  |       // 3) Rounds that were not registered with retryable requests. | 
| 1182 |  |       //    We should call ReplicationFinished directly for them. Could do it after releasing | 
| 1183 |  |       //    the lock. I.e. in ReplicateBatch. | 
| 1184 |  |       // | 
| 1185 |  |       // (3) is all rounds starting with index *processed_rounds and above. | 
| 1186 |  |       // For (1) we reset bound term, so could use it to distinguish between (1) and (2). | 
| 1187 | 58 |       for (size_t i = *processed_rounds; i != 0;) { | 
| 1188 | 23 |         --i; | 
| 1189 | 23 |         if (rounds[i]->bound_term() == OpId::kUnknownTerm) { | 
| 1190 |  |           // Already rejected by retryable requests. | 
| 1191 | 0 |           continue; | 
| 1192 | 0 |         } | 
| 1193 | 23 |         state_->NotifyReplicationFinishedUnlocked( | 
| 1194 | 23 |             rounds[i], status, OpId::kUnknownTerm, /* applied_op_ids */ nullptr); | 
| 1195 | 23 |       } | 
| 1196 | 35 |       return status; | 
| 1197 | 35 |     } | 
| 1198 | 4.93M |   } | 
| 1199 |  |  | 
| 1200 | 4.93M |   peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); | 
| 1201 | 4.93M |   RETURN_NOT_OK(ExecuteHook(POST_REPLICATE)); | 
| 1202 | 4.93M |   return Status::OK(); | 
| 1203 | 4.93M | } | 
| 1204 |  |  | 
| 1205 | 67.9k | Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) { | 
| 1206 | 67.9k |   size_t processed_rounds = 0; | 
| 1207 | 67.9k |   return AppendNewRoundsToQueueUnlocked({ round }, &processed_rounds); | 
| 1208 | 67.9k | } | 
| 1209 |  |  | 
| 1210 | 4.99M | Status RaftConsensus::CheckLeasesUnlocked(const ConsensusRoundPtr& round) { | 
| 1211 | 4.99M |   auto op_type = round->replicate_msg()->op_type(); | 
| 1212 |  |   // When we do not have a hybrid time leader lease we allow 2 operation types to be added to RAFT. | 
| 1213 |  |   // NO_OP - because even empty heartbeat messages could be used to obtain the lease. | 
| 1214 |  |   // CHANGE_CONFIG_OP - because we should be able to update consensus even w/o lease. | 
| 1215 |  |   // Both of them are safe, since they don't affect user reads or writes. | 
| 1216 | 4.99M |   if (IsConsensusOnlyOperation(op_type)) { | 
| 1217 | 67.9k |     return Status::OK(); | 
| 1218 | 67.9k |   } | 
| 1219 |  |  | 
| 1220 | 4.92M |   auto lease_status = state_->GetHybridTimeLeaseStatusAtUnlocked( | 
| 1221 | 4.92M |       HybridTime(round->replicate_msg()->hybrid_time()).GetPhysicalValueMicros()); | 
| 1222 | 4.92M |   static_assert(LeaderLeaseStatus_ARRAYSIZE == 3, "Please update logic below to adapt new state"); | 
| 1223 | 4.92M |   if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) { | 
| 1224 | 0 |     return STATUS_FORMAT(LeaderHasNoLease, | 
| 1225 | 0 |                          "Old leader may have hybrid time lease, while adding: $0", | 
| 1226 | 0 |                          OperationType_Name(op_type)); | 
| 1227 | 0 |   } | 
| 1228 | 4.92M |   lease_status = state_->GetLeaderLeaseStatusUnlocked(nullptr); | 
| 1229 | 4.92M |   if (lease_status == LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE) { | 
| 1230 | 0 |     return STATUS_FORMAT(LeaderHasNoLease, | 
| 1231 | 0 |                          "Old leader may have lease, while adding: $0", | 
| 1232 | 0 |                          OperationType_Name(op_type)); | 
| 1233 | 0 |   } | 
| 1234 |  |  | 
| 1235 | 4.92M |   return Status::OK(); | 
| 1236 | 4.92M | } | 
| 1237 |  |  | 
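
The gate above reduces to a few lines. Here is a sketch of the decision; the enums are illustrative, and the real code distinguishes the hybrid-time lease from the regular leader lease:

```cpp
enum class OpType { kNoOp, kChangeConfig, kWrite };
enum class LeaseStatus { kHasLease, kLeaseExpired, kOldLeaderMayHaveLease };

// Consensus-only operations (NO_OP, CHANGE_CONFIG_OP) are always allowed:
// they do not affect user reads or writes, and heartbeats are themselves the
// mechanism for (re)acquiring the lease. Everything else must wait until the
// old leader's lease has definitely expired.
bool MayAppend(OpType op, LeaseStatus lease) {
  if (op == OpType::kNoOp || op == OpType::kChangeConfig) {
    return true;
  }
  return lease != LeaseStatus::kOldLeaderMayHaveLease;
}
```
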
| 1238 |  | Status RaftConsensus::AppendNewRoundsToQueueUnlocked( | 
| 1239 | 4.99M |     const ConsensusRounds& rounds, size_t* processed_rounds) { | 
| 1240 | 4.99M |   SCHECK(!rounds.empty(), InvalidArgument, "Attempted to add zero rounds to the queue"); | 
| 1241 |  |  | 
| 1242 | 4.99M |   auto role = state_->GetActiveRoleUnlocked(); | 
| 1243 | 4.99M |   if (role != PeerRole::LEADER) { | 
| 1244 | 12 |     return STATUS_FORMAT(IllegalState, "Appending new rounds while not the leader but $0", | 
| 1245 | 12 |                          PeerRole_Name(role)); | 
| 1246 | 12 |   } | 
| 1247 |  |  | 
| 1248 | 4.99M |   std::vector<ReplicateMsgPtr> replicate_msgs; | 
| 1249 | 4.99M |   replicate_msgs.reserve(rounds.size()); | 
| 1250 | 4.99M |   const OpId& committed_op_id = state_->GetCommittedOpIdUnlocked(); | 
| 1251 |  |  | 
| 1252 | 5.11M |   for (const auto& round : rounds) { | 
| 1253 | 5.11M |     ++*processed_rounds; | 
| 1254 |  |  | 
| 1255 | 5.11M |     if (round->replicate_msg()->op_type() == OperationType::WRITE_OP && | 
| 1256 | 5.11M |         !state_->RegisterRetryableRequest(round)) { | 
| 1257 | 1 |       round->BindToTerm(OpId::kUnknownTerm); // Mark the round as non-replicating. | 
| 1258 | 1 |       continue; | 
| 1259 | 1 |     } | 
| 1260 |  |  | 
| 1261 | 5.11M |     OpId op_id = state_->NewIdUnlocked(); | 
| 1262 |  |  | 
| 1263 |  |     // We use this callback to transform write operations by substituting the hybrid_time into | 
| 1264 |  |     // the write batch inside the write operation. | 
| 1265 |  |     // | 
| 1266 |  |     // TODO: we could allocate multiple HybridTimes in batch, only reading system clock once. | 
| 1267 | 5.11M |     round->callback()->AddedToLeader(op_id, committed_op_id); | 
| 1268 |  |  | 
| 1269 | 5.11M |     Status s = state_->AddPendingOperation(round, OperationMode::kLeader); | 
| 1270 | 5.11M |     if (!s.ok()) { | 
| 1271 | 23 |       RollbackIdAndDeleteOpId(round->replicate_msg(), false /* should_exists */); | 
| 1272 |  |  | 
| 1273 |  |       // Iterate rounds in the reverse order and release ids. | 
| 1274 | 23 |       while (!replicate_msgs.empty()) { | 
| 1275 | 0 |         RollbackIdAndDeleteOpId(replicate_msgs.back(), true /* should_exists */); | 
| 1276 | 0 |         replicate_msgs.pop_back(); | 
| 1277 | 0 |       } | 
| 1278 | 23 |       return s; | 
| 1279 | 23 |     } | 
| 1280 |  |  | 
| 1281 | 5.11M |     replicate_msgs.push_back(round->replicate_msg()); | 
| 1282 | 5.11M |   } | 
| 1283 |  |  | 
| 1284 | 4.99M |   if (replicate_msgs.empty()) { | 
| 1285 | 1 |     return Status::OK(); | 
| 1286 | 1 |   } | 
| 1287 |  |  | 
| 1288 |  |   // It suffices to check the lease for just the latest operation in the batch, because it | 
| 1289 |  |   // has the greatest hybrid time and therefore requires the most advanced lease. | 
| 1290 | 4.99M |   auto s = CheckLeasesUnlocked(rounds.back()); | 
| 1291 |  |  | 
| 1292 | 4.99M |   if (s.ok()) { | 
| 1293 | 4.99M |     s = queue_->AppendOperations(replicate_msgs, committed_op_id, state_->Clock().Now()); | 
| 1294 | 4.99M |   } | 
| 1295 |  |  | 
| 1296 |  |   // Handle Status::ServiceUnavailable(), which means the queue is full. | 
| 1297 |  |   // TODO: what are we doing about other errors here? Should we also release OpIds in those cases? | 
| 1298 | 4.99M |   if (PREDICT_FALSE(!s.ok())) { | 
| 1299 | 0 |     LOG_WITH_PREFIX(WARNING) << "Could not append replicate request: " << s << ", queue status: " | 
| 1300 | 0 |                              << queue_->ToString(); | 
| 1301 | 0 |     for (auto iter = replicate_msgs.rbegin(); iter != replicate_msgs.rend(); ++iter) { | 
| 1302 | 0 |       RollbackIdAndDeleteOpId(*iter, true /* should_exists */); | 
| 1303 |  |       // TODO: Possibly evict a dangling peer from the configuration here. | 
| 1304 |  |       // TODO: Count the number of ops that failed due to consensus queue overflow. | 
| 1305 | 0 |     } | 
| 1306 |  |  | 
| 1307 | 0 |     return s.CloneAndPrepend("Unable to append operations to consensus queue"); | 
| 1308 | 0 |   } | 
| 1309 |  |  | 
| 1310 | 4.99M |   state_->UpdateLastReceivedOpIdUnlocked(replicate_msgs.back()->id()); | 
| 1311 | 4.99M |   return Status::OK(); | 
| 1312 | 4.99M | } | 
| 1313 |  |  | 
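
On failure, the code above releases already-assigned op ids in reverse order so the id sequence stays contiguous. A toy sketch of that invariant, with a plain counter standing in for `NewIdUnlocked`/rollback:

```cpp
#include <cstdint>
#include <vector>

struct IdSequence {
  int64_t next = 1;
  int64_t Take() { return next++; }
  // Only the most recently taken id may be returned, which is why the
  // rollback below walks the batch in reverse.
  void GiveBack(int64_t id) { if (id == next - 1) --next; }
};

// Assign ids to a batch; if anything fails midway, undo from the back so the
// sequence has no holes when we return.
bool AssignIds(IdSequence* seq, size_t batch_size, size_t fail_at,
               std::vector<int64_t>* out) {
  for (size_t i = 0; i < batch_size; ++i) {
    if (i == fail_at) {
      while (!out->empty()) {
        seq->GiveBack(out->back());
        out->pop_back();
      }
      return false;
    }
    out->push_back(seq->Take());
  }
  return true;
}
```
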
| 1314 |  | void RaftConsensus::MajorityReplicatedNumSSTFilesChanged( | 
| 1315 | 2.06k |     uint64_t majority_replicated_num_sst_files) { | 
| 1316 | 2.06k |   majority_num_sst_files_.store(majority_replicated_num_sst_files, std::memory_order_release); | 
| 1317 | 2.06k | } | 
| 1318 |  |  | 
| 1319 |  | void RaftConsensus::UpdateMajorityReplicated( | 
| 1320 |  |     const MajorityReplicatedData& majority_replicated_data, OpId* committed_op_id, | 
| 1321 | 34.7M |     OpId* last_applied_op_id) { | 
| 1322 | 34.7M |   TEST_PAUSE_IF_FLAG(TEST_pause_update_majority_replicated); | 
| 1323 | 34.7M |   ReplicaState::UniqueLock lock; | 
| 1324 | 34.7M |   Status s = state_->LockForMajorityReplicatedIndexUpdate(&lock); | 
| 1325 | 34.7M |   if (PREDICT_FALSE(!s.ok())) { | 
| 1326 | 128 |     LOG_WITH_PREFIX(WARNING) | 
| 1327 | 128 |         << "Unable to take state lock to update committed index: " | 
| 1328 | 128 |         << s.ToString(); | 
| 1329 | 128 |     return; | 
| 1330 | 128 |   } | 
| 1331 |  |  | 
| 1332 | 34.7M |   EnumBitSet<SetMajorityReplicatedLeaseExpirationFlag> flags; | 
| 1333 | 34.7M |   if (GetAtomicFlag(&FLAGS_enable_lease_revocation)) { | 
| 1334 | 34.7M |     if (!state_->old_leader_lease().holder_uuid.empty() && | 
| 1335 | 34.7M |         queue_->PeerAcceptedOurLease(state_->old_leader_lease().holder_uuid)) { | 
| 1336 | 10.2k |       flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderLease); | 
| 1337 | 10.2k |     } | 
| 1338 |  |  | 
| 1339 | 34.7M |     if (!state_->old_leader_ht_lease().holder_uuid.empty() && | 
| 1340 | 34.7M |         queue_->PeerAcceptedOurLease(state_->old_leader_ht_lease().holder_uuid)) { | 
| 1341 | 10.7k |       flags.Set(SetMajorityReplicatedLeaseExpirationFlag::kResetOldLeaderHtLease); | 
| 1342 | 10.7k |     } | 
| 1343 | 34.7M |   } | 
| 1344 |  |  | 
| 1345 | 34.7M |   state_->SetMajorityReplicatedLeaseExpirationUnlocked(majority_replicated_data, flags); | 
| 1346 | 34.7M |   leader_lease_wait_cond_.notify_all(); | 
| 1347 |  |  | 
| 1348 | 34.7M |   VLOG_WITH_PREFIX(1) << "Marking majority replicated up to " | 
| 1349 | 15.0k |       << majority_replicated_data.ToString(); | 
| 1350 | 34.7M |   TRACE("Marking majority replicated up to $0", majority_replicated_data.op_id.ToString()); | 
| 1351 | 34.7M |   bool committed_index_changed = false; | 
| 1352 | 34.7M |   s = state_->UpdateMajorityReplicatedUnlocked( | 
| 1353 | 34.7M |       majority_replicated_data.op_id, committed_op_id, &committed_index_changed, | 
| 1354 | 34.7M |       last_applied_op_id); | 
| 1355 | 34.7M |   auto leader_state = state_->GetLeaderStateUnlocked(); | 
| 1356 | 34.7M |   if (leader_state.ok() && leader_state.status == LeaderStatus::LEADER_AND_READY) { | 
| 1357 | 34.6M |     state_->context()->MajorityReplicated(); | 
| 1358 | 34.6M |   } | 
| 1359 | 34.7M |   if (PREDICT_FALSE(!s.ok())) { | 
| 1360 | 0 |     string msg = Format("Unable to mark committed up to $0: $1", majority_replicated_data.op_id, s); | 
| 1361 | 0 |     TRACE(msg); | 
| 1362 | 0 |     LOG_WITH_PREFIX(WARNING) << msg; | 
| 1363 | 0 |     return; | 
| 1364 | 0 |   } | 
| 1365 |  |  | 
| 1366 | 34.7M |   majority_num_sst_files_.store(majority_replicated_data.num_sst_files, std::memory_order_release); | 
| 1367 |  |  | 
| 1368 | 34.7M |   if (!majority_replicated_data.peer_got_all_ops.empty() && | 
| 1369 | 34.7M |       delayed_step_down_.term == state_->GetCurrentTermUnlocked() && | 
| 1370 | 34.7M |       majority_replicated_data.peer_got_all_ops == delayed_step_down_.protege) { | 
| 1371 | 75 |     LOG_WITH_PREFIX(INFO) << "Protege synchronized: " << delayed_step_down_.ToString(); | 
| 1372 | 75 |     const auto* peer = FindPeer(state_->GetActiveConfigUnlocked(), delayed_step_down_.protege); | 
| 1373 | 75 |     if (peer) { | 
| 1374 | 75 |       WARN_NOT_OK(StartStepDownUnlocked(*peer, delayed_step_down_.graceful), | 
| 1375 | 75 |                   "Start step down failed"); | 
| 1376 | 75 |     } | 
| 1377 | 75 |     delayed_step_down_.term = OpId::kUnknownTerm; | 
| 1378 | 75 |   } | 
| 1379 |  |  | 
| 1380 | 34.7M |   if (committed_index_changed && | 
| 1381 | 34.7M |       state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { | 
| 1382 |  |     // If all operations were just committed, and we don't have pending operations, then | 
| 1383 |  |     // we write an empty batch that contains committed index. | 
| 1384 |  |     // This affects only our local log, because followers have different logic in this scenario. | 
| 1385 | 4.69M |     if (*committed_op_id == state_->GetLastReceivedOpIdUnlocked()) { | 
| 1386 | 4.28M |       auto status = queue_->AppendOperations({}, *committed_op_id, state_->Clock().Now()); | 
| 1387 | 18.4E |       LOG_IF_WITH_PREFIX(DFATAL, !status.ok() && !status.IsServiceUnavailable()) | 
| 1388 | 18.4E |           << "Failed to append empty batch: " << status; | 
| 1389 | 4.28M |     } | 
| 1390 |  |  | 
| 1391 | 4.69M |     lock.unlock(); | 
| 1392 |  |     // No need to hold the lock while calling SignalRequest. | 
| 1393 | 4.69M |     peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); | 
| 1394 | 4.69M |   } | 
| 1395 | 34.7M | } | 
| 1396 |  |  | 
| 1397 | 0 | void RaftConsensus::AppendEmptyBatchToLeaderLog() { | 
| 1398 | 0 |   auto lock = state_->LockForRead(); | 
| 1399 | 0 |   auto committed_op_id = state_->GetCommittedOpIdUnlocked(); | 
| 1400 | 0 |   if (committed_op_id == state_->GetLastReceivedOpIdUnlocked()) { | 
| 1401 | 0 |     auto status = queue_->AppendOperations({}, committed_op_id, state_->Clock().Now()); | 
| 1402 | 0 |     LOG_IF_WITH_PREFIX(DFATAL, !status.ok()) << "Failed to append empty batch: " << status; | 
| 1403 | 0 |   } | 
| 1404 | 0 | } | 
| 1405 |  |  | 
| 1406 | 801 | void RaftConsensus::NotifyTermChange(int64_t term) { | 
| 1407 | 801 |   ReplicaState::UniqueLock lock; | 
| 1408 | 801 |   Status s = state_->LockForConfigChange(&lock); | 
| 1409 | 801 |   if (PREDICT_FALSE(!s.ok())) { | 
| 1410 | 0 |     LOG_WITH_PREFIX(WARNING) << "Unable to lock ReplicaState for config change" | 
| 1411 | 0 |                              << " when notified of new term " << term << ": " << s; | 
| 1412 | 0 |     return; | 
| 1413 | 0 |   } | 
| 1414 | 801 |   WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term."); | 
| 1415 | 801 | } | 
| 1416 |  |  | 
| 1417 |  | void RaftConsensus::NotifyFailedFollower(const string& uuid, | 
| 1418 |  |                                          int64_t term, | 
| 1419 | 330 |                                          const std::string& reason) { | 
| 1420 |  |   // Common info used in all of the log messages within this method. | 
| 1421 | 330 |   string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ", | 
| 1422 | 330 |                                uuid, term, reason); | 
| 1423 |  |  | 
| 1424 | 330 |   if (!FLAGS_evict_failed_followers) { | 
| 1425 | 0 |     LOG_WITH_PREFIX(INFO) << fail_msg << "Eviction of failed followers is disabled. Doing nothing."; | 
| 1426 | 0 |     return; | 
| 1427 | 0 |   } | 
| 1428 |  |  | 
| 1429 | 330 |   RaftConfigPB committed_config; | 
| 1430 | 330 |   { | 
| 1431 | 330 |     auto lock = state_->LockForRead(); | 
| 1432 |  |  | 
| 1433 | 330 |     int64_t current_term = state_->GetCurrentTermUnlocked(); | 
| 1434 | 330 |     if (current_term != term) { | 
| 1435 | 0 |       LOG_WITH_PREFIX(INFO) << fail_msg << "Notified about a follower failure in " | 
| 1436 | 0 |                             << "previous term " << term << ", but a leader election " | 
| 1437 | 0 |                             << "likely occurred since the failure was detected. " | 
| 1438 | 0 |                             << "Doing nothing."; | 
| 1439 | 0 |       return; | 
| 1440 | 0 |     } | 
| 1441 |  |  | 
| 1442 | 330 |     if (state_->IsConfigChangePendingUnlocked()) { | 
| 1443 | 84 |       LOG_WITH_PREFIX(INFO) << fail_msg << "There is already a config change operation " | 
| 1444 | 84 |                             << "in progress. Unable to evict follower until it completes. " | 
| 1445 | 84 |                             << "Doing nothing."; | 
| 1446 | 84 |       return; | 
| 1447 | 84 |     } | 
| 1448 | 246 |     committed_config = state_->GetCommittedConfigUnlocked(); | 
| 1449 | 246 |   } | 
| 1450 |  |  | 
| 1451 |  |   // Run config change on thread pool after dropping ReplicaState lock. | 
| 1452 | 246 |   WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask, | 
| 1453 | 246 |                                                shared_from_this(), uuid, committed_config, reason)), | 
| 1454 | 246 |               state_->LogPrefix() + "Unable to start RemoveFollowerTask"); | 
| 1455 | 246 | } | 
| 1456 |  |  | 
| 1457 |  | void RaftConsensus::TryRemoveFollowerTask(const string& uuid, | 
| 1458 |  |                                           const RaftConfigPB& committed_config, | 
| 1459 | 246 |                                           const std::string& reason) { | 
| 1460 | 246 |   ChangeConfigRequestPB req; | 
| 1461 | 246 |   req.set_tablet_id(tablet_id()); | 
| 1462 | 246 |   req.mutable_server()->set_permanent_uuid(uuid); | 
| 1463 | 246 |   req.set_type(REMOVE_SERVER); | 
| 1464 | 246 |   req.set_cas_config_opid_index(committed_config.opid_index()); | 
| 1465 | 246 |   LOG_WITH_PREFIX(INFO) | 
| 1466 | 246 |       << "Attempting to remove follower " << uuid << " from the Raft config at commit index " | 
| 1467 | 246 |       << committed_config.opid_index() << ". Reason: " << reason; | 
| 1468 | 246 |   boost::optional<TabletServerErrorPB::Code> error_code; | 
| 1469 | 246 |   WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code), | 
| 1470 | 246 |               state_->LogPrefix() + "Unable to remove follower " + uuid); | 
| 1471 | 246 | } | 
| 1472 |  |  | 
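
The `cas_config_opid_index` set above makes the removal conditional: it only applies if the committed config is still the one the eviction decision was based on. A sketch of the guard using plain structs (not the real `ChangeConfigRequestPB` API):

```cpp
#include <cstdint>
#include <string>

struct RemoveServerRequest {
  std::string tablet_id;
  std::string server_uuid;
  int64_t cas_config_opid_index;  // Config version this request was based on.
};

// The leader rejects the change if another config change landed in between,
// so a stale eviction can never clobber a newer membership decision.
bool MayApply(const RemoveServerRequest& req, int64_t committed_opid_index) {
  return req.cas_config_opid_index == committed_opid_index;
}
```
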
| 1473 |  | Status RaftConsensus::Update(ConsensusRequestPB* request, | 
| 1474 |  |                              ConsensusResponsePB* response, | 
| 1475 | 25.5M |                              CoarseTimePoint deadline) { | 
| 1476 | 25.5M |   if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests)) { | 
| 1477 | 329 |     return STATUS(IllegalState, "Rejected: --TEST_follower_reject_update_consensus_requests " | 
| 1478 | 329 |                                 "is set to true."); | 
| 1479 | 329 |   } | 
| 1480 | 25.5M |   TEST_PAUSE_IF_FLAG(TEST_follower_pause_update_consensus_requests); | 
| 1481 |  |  | 
| 1482 | 25.5M |   auto reject_mode = reject_mode_.load(std::memory_order_acquire); | 
| 1483 | 25.5M |   if (reject_mode != RejectMode::kNone) { | 
| 1484 | 0 |     if (reject_mode == RejectMode::kAll || | 
| 1485 | 0 |         (reject_mode == RejectMode::kNonEmpty && !request->ops().empty())) { | 
| 1486 | 0 |       auto result = STATUS_FORMAT(IllegalState, "Rejected because of reject mode: $0", | 
| 1487 | 0 |                                   ToString(reject_mode)); | 
| 1488 | 0 |       LOG_WITH_PREFIX(INFO) << result; | 
| 1489 | 0 |       return result; | 
| 1490 | 0 |     } | 
| 1491 | 0 |     LOG_WITH_PREFIX(INFO) << "Accepted: " << request->ShortDebugString(); | 
| 1492 | 0 |   } | 
| 1493 |  |  | 
| 1494 | 25.5M |   if (PREDICT_FALSE(FLAGS_TEST_follower_reject_update_consensus_requests_seconds > 0)) { | 
| 1495 | 385 |     if (MonoTime::Now() < withold_replica_updates_until_) { | 
| 1496 | 385 |       LOG(INFO) << "Rejecting Update for tablet: " << tablet_id() | 
| 1497 | 385 |                 << " tserver uuid: " << peer_uuid(); | 
| 1498 | 385 |       return STATUS_SUBSTITUTE(IllegalState, | 
| 1499 | 385 |           "Rejected: --TEST_follower_reject_update_consensus_requests_seconds is set to $0", | 
| 1500 | 385 |           FLAGS_TEST_follower_reject_update_consensus_requests_seconds); | 
| 1501 | 385 |     } | 
| 1502 | 385 |   } | 
| 1503 |  |  | 
| 1504 | 25.5M |   RETURN_NOT_OK(ExecuteHook(PRE_UPDATE)); | 
| 1505 | 25.5M |   response->set_responder_uuid(state_->GetPeerUuid()); | 
| 1506 |  |  | 
| 1507 | 25.5M |   VLOG_WITH_PREFIX(2) << "Replica received request: " << request->ShortDebugString(); | 
| 1508 |  |  | 
| 1509 | 25.5M |   UpdateReplicaResult result; | 
| 1510 | 25.5M |   { | 
| 1511 |  |     // see var declaration | 
| 1512 | 25.5M |     auto wait_start = CoarseMonoClock::now(); | 
| 1513 | 25.5M |     auto wait_duration = deadline != CoarseTimePoint::max() ? deadline - wait_start | 
| 1514 | 25.5M |                                                             : CoarseDuration::max(); | 
| 1515 | 25.5M |     auto lock = LockMutex(&update_mutex_, wait_duration); | 
| 1516 | 25.5M |     if (!lock.owns_lock()) { | 
| 1517 | 1.20k |       return STATUS_FORMAT(TimedOut, "Unable to lock update mutex for $0", wait_duration); | 
| 1518 | 1.20k |     } | 
| 1519 |  |  | 
| 1520 | 25.5M |     LongOperationTracker operation_tracker("UpdateReplica", 1s); | 
| 1521 | 25.5M |     result = VERIFY_RESULT(UpdateReplica(request, response)); | 
| 1522 |  |  | 
| 1523 | 0 |     auto delay = TEST_delay_update_.load(std::memory_order_acquire); | 
| 1524 | 25.5M |     if (delay != MonoDelta::kZero) { | 
| 1525 | 0 |       std::this_thread::sleep_for(delay.ToSteadyDuration()); | 
| 1526 | 0 |     } | 
| 1527 | 25.5M |   } | 
| 1528 |  |  | 
| 1529 |  |   // Release the lock while we wait for the log append to finish so that commits can go through. | 
| 1530 | 25.5M |   if (!result.wait_for_op_id.empty()) { | 
| 1531 | 8.27M |     RETURN_NOT_OK(WaitForWrites(result.current_term, result.wait_for_op_id)); | 
| 1532 | 8.27M |   } | 
| 1533 |  |  | 
| 1534 | 25.5M |   if (PREDICT_FALSE(VLOG_IS_ON(2))) { | 
| 1535 | 0 |     VLOG_WITH_PREFIX(2) << "Replica updated. " | 
| 1536 | 0 |         << state_->ToString() << " Request: " << request->ShortDebugString(); | 
| 1537 | 0 |   } | 
| 1538 |  |  | 
| 1539 |  |   // If an election is pending on a specific op id and that op has just been committed, start | 
| 1540 |  |   // the election now. StartElection ensures the pending election is started only once, even if | 
| 1541 |  |   // UpdateReplica runs in multiple threads in parallel. | 
| 1542 | 25.5M |   if (result.start_election) { | 
| 1543 | 0 |     RETURN_NOT_OK(StartElection( | 
| 1544 | 0 |         {consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE, true /* pending_commit */})); | 
| 1545 | 0 |   } | 
| 1546 |  |  | 
| 1547 | 25.5M |   RETURN_NOT_OK(ExecuteHook(POST_UPDATE)); | 
| 1548 | 25.5M |   return Status::OK(); | 
| 1549 | 25.5M | } | 
| 1550 |  |  | 
| 1551 |  | Status RaftConsensus::StartReplicaOperationUnlocked( | 
| 1552 | 9.41M |     const ReplicateMsgPtr& msg, HybridTime propagated_safe_time) { | 
| 1553 | 9.41M |   if (IsConsensusOnlyOperation(msg->op_type())) { | 
| 1554 | 133k |     return StartConsensusOnlyRoundUnlocked(msg); | 
| 1555 | 133k |   } | 
| 1556 |  |  | 
| 1557 | 9.28M |   if (PREDICT_FALSE(FLAGS_TEST_follower_fail_all_prepare)) { | 
| 1558 | 1 |     return STATUS(IllegalState, "Rejected: --TEST_follower_fail_all_prepare " | 
| 1559 | 1 |                                 "is set to true."); | 
| 1560 | 1 |   } | 
| 1561 |  |  | 
| 1562 | 9.28M |   VLOG_WITH_PREFIX(1) << "Starting operation: " << msg->id().ShortDebugString(); | 
| 1563 | 9.28M |   scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg)); | 
| 1564 | 9.28M |   ConsensusRound* round_ptr = round.get(); | 
| 1565 | 9.28M |   RETURN_NOT_OK(state_->context()->StartReplicaOperation(round, propagated_safe_time)); | 
| 1566 | 9.28M |   auto result = state_->AddPendingOperation(round_ptr, OperationMode::kFollower); | 
| 1567 | 9.28M |   if (!result.ok()) { | 
| 1568 | 0 |     round_ptr->NotifyReplicationFinished(result, OpId::kUnknownTerm, /* applied_op_ids */ nullptr); | 
| 1569 | 0 |   } | 
| 1570 | 9.28M |   return result; | 
| 1571 | 9.28M | } | 
| 1572 |  |  | 
| 1573 | 34 | std::string RaftConsensus::LeaderRequest::OpsRangeString() const { | 
| 1574 | 34 |   std::string ret; | 
| 1575 | 34 |   ret.reserve(100); | 
| 1576 | 34 |   ret.push_back('['); | 
| 1577 | 34 |   if (!messages.empty()) { | 
| 1578 | 7 |     const OpIdPB& first_op = (*messages.begin())->id(); | 
| 1579 | 7 |     const OpIdPB& last_op = (*messages.rbegin())->id(); | 
| 1580 | 7 |     strings::SubstituteAndAppend(&ret, "$0.$1-$2.$3", | 
| 1581 | 7 |                                  first_op.term(), first_op.index(), | 
| 1582 | 7 |                                  last_op.term(), last_op.index()); | 
| 1583 | 7 |   } | 
| 1584 | 34 |   ret.push_back(']'); | 
| 1585 | 34 |   return ret; | 
| 1586 | 34 | } | 
| 1587 |  |  | 
| 1588 |  | Status RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req, | 
| 1589 | 25.5M |                                                        LeaderRequest* deduplicated_req) { | 
| 1590 | 25.5M |   const auto& last_committed = state_->GetCommittedOpIdUnlocked(); | 
| 1591 |  |  | 
| 1592 |  |   // The leader's preceding id. | 
| 1593 | 25.5M |   deduplicated_req->preceding_op_id = yb::OpId::FromPB(rpc_req->preceding_id()); | 
| 1594 |  |  | 
| 1595 | 25.5M |   int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index; | 
| 1596 |  |  | 
| 1597 | 25.5M |   deduplicated_req->first_message_idx = -1; | 
| 1598 |  |  | 
| 1599 |  |   // In this loop we discard duplicates and advance the leader's preceding id | 
| 1600 |  |   // accordingly. | 
| 1601 | 34.9M |   for (int i = 0; i < rpc_req->ops_size(); i++) { | 
| 1602 | 9.41M |     ReplicateMsg* leader_msg = rpc_req->mutable_ops(i); | 
| 1603 |  |  | 
| 1604 | 9.41M |     if (leader_msg->id().index() <= last_committed.index) { | 
| 1605 | 603 |       VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id() | 
| 1606 | 0 |                           << " (already committed)"; | 
| 1607 | 603 |       deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id()); | 
| 1608 | 603 |       continue; | 
| 1609 | 603 |     } | 
| 1610 |  |  | 
| 1611 | 9.41M |     if (leader_msg->id().index() <= dedup_up_to_index) { | 
| 1612 |  |       // If the index is uncommitted and below our match index, then it must be in the | 
| 1613 |  |       // pendings set. | 
| 1614 | 114 |       scoped_refptr<ConsensusRound> round = | 
| 1615 | 114 |           state_->GetPendingOpByIndexOrNullUnlocked(leader_msg->id().index()); | 
| 1616 | 114 |       if (!round) { | 
| 1617 |  |         // Could happen if we received outdated leader request. So should just reject it. | 
| 1618 | 0 |         return STATUS_FORMAT(IllegalState, "Round not found for index: $0", | 
| 1619 | 0 |                              leader_msg->id().index()); | 
| 1620 | 0 |       } | 
| 1621 |  |  | 
| 1622 |  |       // If the OpIds match, i.e. if they have the same term and id, then this is just | 
| 1623 |  |       // duplicate, we skip... | 
| 1624 | 114 |       if (OpIdEquals(round->replicate_msg()->id(), leader_msg->id())) { | 
| 1625 | 8 |         VLOG_WITH_PREFIX(2) << "Skipping op id " << leader_msg->id() | 
| 1626 | 0 |                             << " (already replicated)"; | 
| 1627 | 8 |         deduplicated_req->preceding_op_id = yb::OpId::FromPB(leader_msg->id()); | 
| 1628 | 8 |         continue; | 
| 1629 | 8 |       } | 
| 1630 |  |  | 
| 1631 |  |       // ... otherwise we must adjust our match index, i.e. all messages from now on | 
| 1632 |  |       // are "new" | 
| 1633 | 106 |       dedup_up_to_index = leader_msg->id().index(); | 
| 1634 | 106 |     } | 
| 1635 |  |  | 
| 1636 | 9.41M |     if (deduplicated_req->first_message_idx == -1) { | 
| 1637 | 8.27M |       deduplicated_req->first_message_idx = i; | 
| 1638 | 8.27M |     } | 
| 1639 | 9.41M |     deduplicated_req->messages.emplace_back(leader_msg); | 
| 1640 | 9.41M |   } | 
| 1641 |  |  | 
| 1642 | 25.5M |   if (deduplicated_req->messages.size() != implicit_cast<size_t>(rpc_req->ops_size())) { | 
| 1643 | 34 |     LOG_WITH_PREFIX(INFO) << "Deduplicated request from leader. Original: " | 
| 1644 | 34 |                           << rpc_req->preceding_id() << "->" << OpsRangeString(*rpc_req) | 
| 1645 | 34 |                           << "   Dedup: " << deduplicated_req->preceding_op_id << "->" | 
| 1646 | 34 |                           << deduplicated_req->OpsRangeString() | 
| 1647 | 34 |                           << ", known committed: " << last_committed << ", received committed: " | 
| 1648 | 34 |                           << OpId::FromPB(rpc_req->committed_op_id()); | 
| 1649 | 34 |   } | 
| 1650 |  |  | 
| 1651 | 25.5M |   return Status::OK(); | 
| 1652 | 25.5M | } | 
| 1653 |  |  | 
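
The dedup rules above reduce to two thresholds. A self-contained sketch with simplified op ids (a map stands in for the pending-operations set):

```cpp
#include <cstdint>
#include <map>
#include <vector>

struct SimpleOpId { int64_t term = 0; int64_t index = 0; };

// Entries at or below the committed index are always duplicates. Entries at
// or below the last-received index are duplicates only when the pending
// entry at that index has the same term; a term mismatch marks a divergence,
// and everything from that index on counts as new.
std::vector<SimpleOpId> Dedup(const std::vector<SimpleOpId>& leader_ops,
                              int64_t committed_index,
                              int64_t dedup_up_to_index,
                              const std::map<int64_t, int64_t>& pending_terms) {
  std::vector<SimpleOpId> fresh;
  for (const auto& op : leader_ops) {
    if (op.index <= committed_index) {
      continue;  // Already committed locally; skip.
    }
    if (op.index <= dedup_up_to_index) {
      auto it = pending_terms.find(op.index);
      if (it != pending_terms.end() && it->second == op.term) {
        continue;  // Exact duplicate of an in-flight entry; skip.
      }
      dedup_up_to_index = op.index;  // Divergence: treat the rest as new.
    }
    fresh.push_back(op);
  }
  return fresh;
}
```
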
| 1654 |  | Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request, | 
| 1655 | 25.5M |                                                       ConsensusResponsePB* response) { | 
| 1656 |  |   // Do term checks first: | 
| 1657 | 25.5M |   if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) { | 
| 1658 |  |  | 
| 1659 |  |     // If less, reject. | 
| 1660 | 12.1k |     if (request->caller_term() < state_->GetCurrentTermUnlocked()) { | 
| 1661 | 1.56k |       string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. " | 
| 1662 | 1.56k |                               "Current term is $2. Ops: $3", | 
| 1663 |  |  | 
| 1664 | 1.56k |                               request->caller_uuid(), | 
| 1665 | 1.56k |                               request->caller_term(), | 
| 1666 | 1.56k |                               state_->GetCurrentTermUnlocked(), | 
| 1667 | 1.56k |                               OpsRangeString(*request)); | 
| 1668 | 1.56k |       LOG_WITH_PREFIX(INFO) << msg; | 
| 1669 | 1.56k |       FillConsensusResponseError(response, | 
| 1670 | 1.56k |                                  ConsensusErrorPB::INVALID_TERM, | 
| 1671 | 1.56k |                                  STATUS(IllegalState, msg)); | 
| 1672 | 1.56k |       return Status::OK(); | 
| 1673 | 10.6k |     } else { | 
| 1674 | 10.6k |       RETURN_NOT_OK(HandleTermAdvanceUnlocked(request->caller_term())); | 
| 1675 | 10.6k |     } | 
| 1676 | 12.1k |   } | 
| 1677 | 25.5M |   return Status::OK(); | 
| 1678 | 25.5M | } | 
| 1679 |  |  | 
| 1680 |  | Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req, | 
| 1681 | 25.5M |                                                                 ConsensusResponsePB* response) { | 
| 1682 |  |  | 
| 1683 | 25.5M |   bool term_mismatch; | 
| 1684 | 25.5M |   if (state_->IsOpCommittedOrPending(req.preceding_op_id, &term_mismatch)) { | 
| 1685 | 25.4M |     return Status::OK(); | 
| 1686 | 25.4M |   } | 
| 1687 |  |  | 
| 1688 | 100k |   string error_msg = Format( | 
| 1689 | 100k |     "Log matching property violated." | 
| 1690 | 100k |     " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)", | 
| 1691 | 100k |     state_->GetLastReceivedOpIdUnlocked(), req.preceding_op_id, term_mismatch ? "term" : "index"); | 
| 1692 |  |  | 
| 1693 | 100k |   FillConsensusResponseError(response, | 
| 1694 | 100k |                              ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, | 
| 1695 | 100k |                              STATUS(IllegalState, error_msg)); | 
| 1696 |  |  | 
| 1697 | 100k |   LOG_WITH_PREFIX(INFO) << "Refusing update from remote peer " | 
| 1698 | 100k |                         << req.leader_uuid << ": " << error_msg; | 
| 1699 |  |  | 
| 1700 |  |   // If the terms mismatch we abort down to the index before the leader's preceding, | 
| 1701 |  |   // since we know that is the last opid that has a chance of not being overwritten. | 
| 1702 |  |   // Aborting preemptively here avoids us reporting a last received index that is | 
| 1703 |  |   // possibly higher than the leader's causing an avoidable cache miss on the leader's | 
| 1704 |  |   // queue. | 
| 1705 |  |   // | 
| 1706 |  |   // TODO: this isn't just an optimization! if we comment this out, we get | 
| 1707 |  |   // failures on raft_consensus-itest a couple percent of the time! Should investigate | 
| 1708 |  |   // why this is actually critical to do here, as opposed to just on requests that | 
| 1709 |  |   // append some ops. | 
| 1710 | 100k |   if (term_mismatch) { | 
| 1711 | 65 |     return state_->AbortOpsAfterUnlocked(req.preceding_op_id.index - 1); | 
| 1712 | 65 |   } | 
| 1713 |  |  | 
| 1714 | 100k |   return Status::OK(); | 
| 1715 | 100k | } | 
| 1716 |  |  | 
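
The check above is Raft's log matching property plus an eager truncation on term mismatch. A compact sketch of the decision, assuming the caller already looked up whether the preceding op is committed or pending and whether the miss was a term or an index mismatch:

```cpp
enum class MatchResult {
  kAccept,              // Preceding op is committed or pending locally.
  kRejectIndexMissing,  // We have not received that far yet.
  kRejectAndTruncate,   // Term mismatch: abort back to preceding index - 1,
                        // the last entry that can survive the divergence.
};

MatchResult CheckLogMatching(bool preceding_committed_or_pending,
                             bool term_mismatch) {
  if (preceding_committed_or_pending) {
    return MatchResult::kAccept;
  }
  return term_mismatch ? MatchResult::kRejectAndTruncate
                       : MatchResult::kRejectIndexMissing;
}
```
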
| 1717 |  | Status RaftConsensus::CheckLeaderRequestOpIdSequence( | 
| 1718 |  |     const LeaderRequest& deduped_req, | 
| 1719 | 25.5M |     ConsensusRequestPB* request) { | 
| 1720 | 25.5M |   Status sequence_check_status; | 
| 1721 | 25.5M |   yb::OpId prev = deduped_req.preceding_op_id; | 
| 1722 | 25.5M |   for (const auto& message : deduped_req.messages) { | 
| 1723 | 9.41M |     auto current = yb::OpId::FromPB(message->id()); | 
| 1724 | 9.41M |     sequence_check_status = ReplicaState::CheckOpInSequence(prev, current); | 
| 1725 | 9.41M |     if (PREDICT_FALSE(!sequence_check_status.ok())) { | 
| 1726 | 2 |       LOG(ERROR) << "Leader request contained out-of-sequence messages. Status: " | 
| 1727 | 2 |           << sequence_check_status.ToString() << ". Leader Request: " | 
| 1728 | 2 |           << request->ShortDebugString(); | 
| 1729 | 2 |       break; | 
| 1730 | 2 |     } | 
| 1731 | 9.41M |     prev = current; | 
| 1732 | 9.41M |   } | 
| 1733 |  |  | 
| 1734 |  |   // We only release the messages from the request after the above check so that we can print | 
| 1735 |  |   // the original request, if it fails. | 
| 1736 | 25.5M |   if (!deduped_req.messages.empty()) { | 
| 1737 |  |     // We take ownership of the deduped ops. | 
| 1738 | 8.27M |     DCHECK_GE(deduped_req.first_message_idx, 0); | 
| 1739 | 8.27M |     request->mutable_ops()->ExtractSubrange( | 
| 1740 | 8.27M |         narrow_cast<int>(deduped_req.first_message_idx), | 
| 1741 | 8.27M |         narrow_cast<int>(deduped_req.messages.size()), | 
| 1742 | 8.27M |         nullptr); | 
| 1743 | 8.27M |   } | 
| 1744 |  |  | 
| 1745 |  |   // We don't need request->ops() anymore, so could release them to avoid unnecessary memory | 
| 1746 |  |   // consumption. | 
| 1747 | 25.5M |   request->mutable_ops()->Clear(); | 
| 1748 |  |  | 
| 1749 | 25.5M |   return sequence_check_status; | 
| 1750 | 25.5M | } | 
| 1751 |  |  | 
| 1752 |  | Status RaftConsensus::CheckLeaderRequestUnlocked(ConsensusRequestPB* request, | 
| 1753 |  |                                                  ConsensusResponsePB* response, | 
| 1754 | 25.5M |                                                  LeaderRequest* deduped_req) { | 
| 1755 | 25.5M |   RETURN_NOT_OK(DeduplicateLeaderRequestUnlocked(request, deduped_req)); | 
| 1756 |  |  | 
| 1757 |  |   // This is an additional check for KUDU-639 that makes sure the message's index | 
| 1758 |  |   // and term are in the right sequence in the request, after we've deduplicated | 
| 1759 |  |   // them. We do this before we change any of the internal state. | 
| 1760 |  |   // | 
| 1761 |  |   // TODO move this to raft_consensus-state or whatever we transform that into. | 
| 1762 |  |   // We should be able to do this check for each append, but right now the way | 
| 1763 |  |   // we initialize raft_consensus-state is preventing us from doing so. | 
| 1764 | 25.5M |   RETURN_NOT_OK(CheckLeaderRequestOpIdSequence(*deduped_req, request)); | 
| 1765 |  |  | 
| 1766 | 25.5M |   RETURN_NOT_OK(HandleLeaderRequestTermUnlocked(request, response)); | 
| 1767 |  |  | 
| 1768 | 25.5M |   if (response->status().has_error()) { | 
| 1769 | 1.56k |     return Status::OK(); | 
| 1770 | 1.56k |   } | 
| 1771 |  |  | 
| 1772 | 25.5M |   RETURN_NOT_OK(EnforceLogMatchingPropertyMatchesUnlocked(*deduped_req, response)); | 
| 1773 |  |  | 
| 1774 | 25.5M |   if (response->status().has_error()) { | 
| 1775 | 120k |     return Status::OK(); | 
| 1776 | 120k |   } | 
| 1777 |  |  | 
| 1778 |  |   // If the first of the messages to apply is not in our log, either it follows the last | 
| 1779 |  |   // received message or it replaces some in-flight. | 
| 1780 | 25.4M |   if (!deduped_req->messages.empty()) { | 
| 1781 | 8.27M |     auto first_id = yb::OpId::FromPB(deduped_req->messages[0]->id()); | 
| 1782 | 8.27M |     bool term_mismatch; | 
| 1783 | 8.27M |     if (state_->IsOpCommittedOrPending(first_id, &term_mismatch)) { | 
| 1784 | 0 |       return STATUS_FORMAT(IllegalState, | 
| 1785 | 0 |                            "First deduped message $0 is committed or pending", | 
| 1786 | 0 |                            first_id); | 
| 1787 | 0 |     } | 
| 1788 |  |  | 
| 1789 |  |     // If the index is in our log but the terms are not the same abort down to the leader's | 
| 1790 |  |     // preceding id. | 
| 1791 | 8.27M |     if (term_mismatch) { | 
| 1792 |  |       // Since we are holding the lock, ApplyPendingOperationsUnlocked cannot be invoked | 
| 1793 |  |       // between these two calls. | 
| 1794 | 103 |       RETURN_NOT_OK(state_->AbortOpsAfterUnlocked(deduped_req->preceding_op_id.index)); | 
| 1795 | 103 |       RETURN_NOT_OK(log_->ResetLastSyncedEntryOpId(deduped_req->preceding_op_id)); | 
| 1796 | 103 |     } | 
| 1797 | 8.27M |   } | 
| 1798 |  |  | 
| 1799 |  |   // If all of the above logic was successful then we can consider this to be | 
| 1800 |  |   // the effective leader of the configuration. If they are not currently marked as | 
| 1801 |  |   // the leader locally, mark them as leader now. | 
| 1802 | 25.4M |   const string& caller_uuid = request->caller_uuid(); | 
| 1803 | 25.4M |   if (PREDICT_FALSE(state_->HasLeaderUnlocked() && | 
| 1804 | 25.4M |                     state_->GetLeaderUuidUnlocked() != caller_uuid)) { | 
| 1805 | 0 |     LOG_WITH_PREFIX(FATAL) | 
| 1806 | 0 |         << "Unexpected new leader in same term! " | 
| 1807 | 0 |         << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", " | 
| 1808 | 0 |         << "new leader UUID: " << caller_uuid; | 
| 1809 | 0 |   } | 
| 1810 | 25.4M |   if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) { | 
| 1811 | 120k |     SetLeaderUuidUnlocked(caller_uuid); | 
| 1812 | 120k |   } | 
| 1813 |  |  | 
| 1814 | 25.4M |   return Status::OK(); | 
| 1815 | 25.4M | } | 
| 1816 |  |  | 
| 1817 |  | Result<RaftConsensus::UpdateReplicaResult> RaftConsensus::UpdateReplica( | 
| 1818 | 25.5M |     ConsensusRequestPB* request, ConsensusResponsePB* response) { | 
| 1819 | 25.5M |   TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica", | 
| 1820 | 25.5M |                "peer", peer_uuid(), | 
| 1821 | 25.5M |                "tablet", tablet_id()); | 
| 1822 |  |  | 
| 1823 | 25.5M |   if (request->has_propagated_hybrid_time()) { | 
| 1824 | 25.5M |     clock_->Update(HybridTime(request->propagated_hybrid_time())); | 
| 1825 | 25.5M |   } | 
| 1826 |  |  | 
| 1827 |  |   // The ordering of the following operations is crucial, read on for details. | 
| 1828 |  |   // | 
| 1829 |  |   // The main requirements explained in more detail below are: | 
| 1830 |  |   // | 
| 1831 |  |   //   1) We must enqueue the prepares before we write to our local log. | 
| 1832 |  |   //   2) If we were able to enqueue a prepare then we must be able to log it. | 
| 1833 |  |   //   3) If we fail to enqueue a prepare, we must not attempt to enqueue any | 
| 1834 |  |   //      later-indexed prepare or apply. | 
| 1835 |  |   // | 
| 1836 |  |   // See below for detailed rationale. | 
| 1837 |  |   // | 
| 1838 |  |   // The steps are: | 
| 1839 |  |   // | 
| 1840 |  |   // 0 - Dedup | 
| 1841 |  |   // | 
| 1842 |  |   // We make sure that we don't do anything on Replicate operations we've already received in a | 
| 1843 |  |   // previous call. This essentially makes this method idempotent. | 
| 1844 |  |   // | 
| 1845 |  |   // 1 - We mark as many pending operations as committed as we can. | 
| 1846 |  |   // | 
| 1847 |  |   // We may have some pending operations that, according to the leader, are now | 
| 1848 |  |   // committed. We Apply them early, because: | 
| 1849 |  |   // - Soon (step 2) we may reject the call due to excessive memory pressure. One | 
| 1850 |  |   //   way to relieve the pressure is by flushing the MRS, and applying these | 
| 1851 |  |   //   operations may unblock an in-flight Flush(). | 
| 1852 |  |   // - The Apply and subsequent Prepares (step 2) can take place concurrently. | 
| 1853 |  |   // | 
| 1854 |  |   // 2 - We enqueue the Prepare of the operations. | 
| 1855 |  |   // | 
| 1856 |  |   // The actual prepares are enqueued in order but happen asynchronously so we don't | 
| 1857 |  |   // have decoding/acquiring locks on the critical path. | 
| 1858 |  |   // | 
| 1859 |  |   // We need to do this now for a number of reasons: | 
| 1860 |  |   // - Prepares, by themselves, are inconsequential, i.e. they do not mutate the | 
| 1861 |  |   //   state machine so, were we to crash afterwards, having the prepares in-flight | 
| 1862 |  |   //   won't hurt. | 
| 1863 |  |   // - Prepares depend on factors external to consensus (the operation drivers and | 
| 1864 |  |   //   the tablet peer) so if for some reason they cannot be enqueued we must know | 
| 1865 |  |   //   before we try write them to the WAL. Once enqueued, we assume that prepare will | 
| 1866 |  |   //   always succeed on a replica operation (because the leader already prepared them | 
| 1867 |  |   //   successfully, and thus we know they are valid). | 
| 1868 |  |   // - The prepares corresponding to every operation that was logged must be in-flight | 
| 1869 |  |   //   first. This because should we need to abort certain operations (say a new leader | 
| 1870 |  |   //   says they are not committed) we need to have those prepares in-flight so that | 
| 1871 |  |   //   the operations can be continued (in the abort path). | 
| 1872 |  |   // - Failure to enqueue prepares is OK, we can continue and let the leader know that | 
| 1873 |  |   //   we only went so far. The leader will re-send the remaining messages. | 
| 1874 |  |   // - Prepares represent new operations, and operations consume memory. Thus, if the | 
| 1875 |  |   //   overall memory pressure on the server is too high, we will reject the prepares. | 
| 1876 |  |   // | 
| 1877 |  |   // 3 - We enqueue the writes to the WAL. | 
| 1878 |  |   // | 
| 1879 |  |   // We enqueue writes to the WAL, but only the operations that were successfully | 
| 1880 |  |   // enqueued for prepare (for the reasons introduced above). This means that even | 
| 1881 |  |   // if a prepare fails to enqueue, if any of the previous prepares were successfully | 
| 1882 |  |   // submitted they must be written to the WAL. | 
| 1883 |  |   // If writing to the WAL fails, we're in an inconsistent state and we crash. In this | 
| 1884 |  |   // case, no one will ever know of the operations we previously prepared so those are | 
| 1885 |  |   // inconsequential. | 
| 1886 |  |   // | 
| 1887 |  |   // 4 - We mark the operations as committed. | 
| 1888 |  |   // | 
| 1889 |  |   // For each operation which has been committed by the leader, we update the | 
| 1890 |  |   // operation state to reflect that. If the logging has already succeeded for that | 
| 1891 |  |   // operation, this will trigger the Apply phase. Otherwise, Apply will be triggered | 
| 1892 |  |   // when the logging completes. In both cases the Apply phase executes asynchronously. | 
| 1893 |  |   // This must, of course, happen after the prepares have been triggered as the same batch | 
| 1894 |  |   // can both replicate/prepare and commit/apply an operation. | 
| 1895 |  |   // | 
| 1896 |  |   // Currently, if a prepare failed to enqueue we still trigger all applies for operations | 
| 1897 |  |   // with an id lower than it (if we have them). This is important now as the leader will | 
| 1898 |  |   // not re-send those commit messages. This will be moot when we move to the | 
| 1899 |  |   // commitIndex way of doing things, as we can simply ignore the applies as we know | 
| 1900 |  |   // they will be triggered with the next successful batch. | 
| 1901 |  |   // | 
| 1902 |  |   // 5 - We wait for the writes to be durable. | 
| 1903 |  |   // | 
| 1904 |  |   // Before replying to the leader we wait for the writes to be durable. We then | 
| 1905 |  |   // just update the last replicated watermark and respond. | 
| 1906 |  |   // | 
| 1907 |  |   // TODO - These failure scenarios need to be exercised in a unit | 
| 1908 |  |   //        test. Moreover we need to add more fault injection spots (well that | 
| 1909 |  |   //        and actually use them) for each of these steps. | 
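|  |  |   // | 
|  |  |   // As a rough sketch (illustrative only, using the helpers defined later in | 
|  |  |   // this file), the happy path below amounts to: | 
|  |  |   // | 
|  |  |   //   RETURN_NOT_OK(EarlyCommitUnlocked(*request, deduped_req));           // step 1 | 
|  |  |   //   VERIFY_RESULT(EnqueuePreparesUnlocked(*request, &deduped_req, ...)); // step 2 | 
|  |  |   //   auto last = EnqueueWritesUnlocked(deduped_req, ...);                 // step 3 | 
|  |  |   //   RETURN_NOT_OK(MarkOperationsAsCommittedUnlocked(..., last));         // step 4 | 
|  |  |   //   RETURN_NOT_OK(WaitForWrites(term, wait_for_op_id));   // step 5, in the caller | 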
| 1910 | 25.5M |   TRACE("Updating replica for $0 ops", request->ops_size()); | 
| 1911 |  |  | 
| 1912 |  |   // The deduplicated request. | 
| 1913 | 25.5M |   LeaderRequest deduped_req; | 
| 1914 |  |  | 
| 1915 | 25.5M |   ReplicaState::UniqueLock lock; | 
| 1916 | 25.5M |   RETURN_NOT_OK(state_->LockForUpdate(&lock)); | 
| 1917 |  |  | 
| 1918 | 25.5M |   const auto old_leader = state_->GetLeaderUuidUnlocked(); | 
| 1919 |  |  | 
| 1920 | 25.5M |   auto prev_committed_op_id = state_->GetCommittedOpIdUnlocked(); | 
| 1921 |  |  | 
| 1922 | 25.5M |   deduped_req.leader_uuid = request->caller_uuid(); | 
| 1923 |  |  | 
| 1924 | 25.5M |   RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req)); | 
| 1925 |  |  | 
| 1926 | 25.5M |   if (response->status().has_error()) { | 
| 1927 | 122k |     LOG_WITH_PREFIX(INFO) | 
| 1928 | 122k |         << "Returning from UpdateConsensus because of error: " << AsString(response->status()); | 
| 1929 |  |     // We had an error, like an invalid term, we still fill the response. | 
| 1930 | 122k |     FillConsensusResponseOKUnlocked(response); | 
| 1931 | 122k |     return UpdateReplicaResult(); | 
| 1932 | 122k |   } | 
| 1933 |  |  | 
| 1934 | 25.4M |   TEST_PAUSE_IF_FLAG(TEST_pause_update_replica); | 
| 1935 |  |  | 
| 1936 |  |   // Snooze the failure detector as soon as we decide to accept the message. | 
| 1937 |  |   // We are guaranteed to be acting as a FOLLOWER at this point by the above | 
| 1938 |  |   // sanity check. | 
| 1939 | 25.4M |   SnoozeFailureDetector(DO_NOT_LOG); | 
| 1940 |  |  | 
| 1941 | 25.4M |   auto now = MonoTime::Now(); | 
| 1942 |  |  | 
| 1943 |  |   // Update the expiration time of the current leader's lease, so that when this follower becomes | 
| 1944 |  |   // a leader, it can wait out the time interval while the old leader might still be active. | 
| 1945 | 25.4M |   if (request->has_leader_lease_duration_ms()) { | 
| 1946 | 25.4M |     state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked( | 
| 1947 | 25.4M |         CoarseTimeLease(deduped_req.leader_uuid, | 
| 1948 | 25.4M |                         CoarseMonoClock::now() + request->leader_lease_duration_ms() * 1ms), | 
| 1949 | 25.4M |         PhysicalComponentLease(deduped_req.leader_uuid, request->ht_lease_expiration())); | 
| 1950 | 25.4M |   } | 
| 1951 |  |  | 
| 1952 |  |   // Also prohibit voting for anyone for the minimum election timeout. | 
| 1953 | 25.4M |   withhold_votes_until_.store(now + MinimumElectionTimeout(), std::memory_order_release); | 
| 1954 |  |  | 
| 1955 |  |   // 1 - Early commit pending (and committed) operations | 
| 1956 | 25.4M |   RETURN_NOT_OK(EarlyCommitUnlocked(*request, deduped_req)); | 
| 1957 |  |  | 
| 1958 |  |   // 2 - Enqueue the prepares | 
| 1959 | 25.4M |   if (!VERIFY_RESULT(EnqueuePreparesUnlocked(*request, &deduped_req, response))) { | 
| 1960 | 1 |     return UpdateReplicaResult(); | 
| 1961 | 1 |   } | 
| 1962 |  |  | 
| 1963 | 25.4M |   if (deduped_req.committed_op_id.index < prev_committed_op_id.index) { | 
| 1964 | 75 |     deduped_req.committed_op_id = prev_committed_op_id; | 
| 1965 | 75 |   } | 
| 1966 |  |  | 
| 1967 |  |   // 3 - Enqueue the writes. | 
| 1968 | 25.4M |   auto last_from_leader = EnqueueWritesUnlocked( | 
| 1969 | 25.4M |       deduped_req, WriteEmpty(prev_committed_op_id != deduped_req.committed_op_id)); | 
| 1970 |  |  | 
| 1971 |  |   // 4 - Mark operations as committed | 
| 1972 | 25.4M |   RETURN_NOT_OK(MarkOperationsAsCommittedUnlocked(*request, deduped_req, last_from_leader)); | 
| 1973 |  |  | 
| 1974 |  |   // Fill the response with the current state. We will not mutate anymore state until | 
| 1975 |  |   // we actually reply to the leader, we'll just wait for the messages to be durable. | 
| 1976 | 25.4M |   FillConsensusResponseOKUnlocked(response); | 
| 1977 |  |  | 
| 1978 | 25.4M |   UpdateReplicaResult result; | 
| 1979 |  |  | 
| 1980 |  |   // Check if there is an election pending and the op id pending upon has just been committed. | 
| 1981 | 25.4M |   const auto& pending_election_op_id = state_->GetPendingElectionOpIdUnlocked(); | 
| 1982 | 25.4M |   result.start_election = | 
| 1983 | 25.4M |       !pending_election_op_id.empty() && | 
| 1984 | 25.4M |       pending_election_op_id.index <= state_->GetCommittedOpIdUnlocked().index; | 
| 1985 |  |  | 
| 1986 | 25.4M |   if (!deduped_req.messages.empty()) { | 
| 1987 | 8.28M |     result.wait_for_op_id = state_->GetLastReceivedOpIdUnlocked(); | 
| 1988 | 8.28M |   } | 
| 1989 | 25.4M |   result.current_term = state_->GetCurrentTermUnlocked(); | 
| 1990 |  |  | 
| 1991 | 25.4M |   uint64_t update_time_ms = 0; | 
| 1992 | 25.4M |   if (request->has_propagated_hybrid_time()) { | 
| 1993 | 25.4M |     update_time_ms =  HybridTime::FromPB( | 
| 1994 | 25.4M |         request->propagated_hybrid_time()).GetPhysicalValueMicros() / 1000; | 
| 1995 | 25.4M |   } else if (!deduped_req.messages.empty()) { | 
| 1996 | 112 |     update_time_ms = HybridTime::FromPB( | 
| 1997 | 112 |         deduped_req.messages.back()->hybrid_time()).GetPhysicalValueMicros() / 1000; | 
| 1998 | 112 |   } | 
| 1999 | 25.4M |   follower_last_update_time_ms_metric_->UpdateTimestampInMilliseconds( | 
| 2000 | 25.4M |       (update_time_ms > 0 ? update_time_ms : clock_->Now().GetPhysicalValueMicros() / 1000)); | 
| 2001 | 25.4M |   TRACE("UpdateReplica() finished"); | 
| 2002 | 25.4M |   return result; | 
| 2003 | 25.4M | } | 
| 2004 |  |  | 
| 2005 |  | Status RaftConsensus::EarlyCommitUnlocked(const ConsensusRequestPB& request, | 
| 2006 | 25.4M |                                           const LeaderRequest& deduped_req) { | 
| 2007 |  |   // What should we commit? | 
| 2008 |  |   // 1. As many pending operations as we can, except... | 
| 2009 |  |   // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639 | 
| 2010 |  |   //    ("Leader doesn't overwrite demoted follower's log properly"), and... | 
| 2011 |  |   // 3. ...the leader's committed index is always our upper bound. | 
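|  |  |   // For example (hypothetical values): with a last pending op of 5.12, a deduped | 
|  |  |   // preceding op of 5.10 and a leader committed op of 5.11, the clamping below | 
|  |  |   // yields early_apply_up_to = 5.10 -- we never early-commit past the point where | 
|  |  |   // our log is known to match the leader's. | 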
| 2012 | 25.4M |   auto early_apply_up_to = state_->GetLastPendingOperationOpIdUnlocked(); | 
| 2013 | 25.4M |   if (deduped_req.preceding_op_id.index < early_apply_up_to.index) { | 
| 2014 | 5 |     early_apply_up_to = deduped_req.preceding_op_id; | 
| 2015 | 5 |   } | 
| 2016 | 25.4M |   if (request.committed_op_id().index() < early_apply_up_to.index) { | 
| 2017 | 40.8k |     early_apply_up_to = yb::OpId::FromPB(request.committed_op_id()); | 
| 2018 | 40.8k |   } | 
| 2019 |  |  | 
| 2020 | 25.4M |   VLOG_WITH_PREFIX(1) << "Early marking committed up to " << early_apply_up_to; | 
| 2021 | 25.4M |   TRACE("Early marking committed up to $0.$1", early_apply_up_to.term, early_apply_up_to.index); | 
| 2022 | 25.4M |   return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(early_apply_up_to, CouldStop::kTrue)); | 
| 2023 | 25.4M | } | 
| 2024 |  |  | 
| 2025 |  | Result<bool> RaftConsensus::EnqueuePreparesUnlocked(const ConsensusRequestPB& request, | 
| 2026 |  |                                                     LeaderRequest* deduped_req_ptr, | 
| 2027 | 25.4M |                                                     ConsensusResponsePB* response) { | 
| 2028 | 25.4M |   LeaderRequest& deduped_req = *deduped_req_ptr; | 
| 2029 | 25.4M |   TRACE("Triggering prepare for $0 ops", deduped_req.messages.size()); | 
| 2030 |  |  | 
| 2031 | 25.4M |   Status prepare_status; | 
| 2032 | 25.4M |   auto iter = deduped_req.messages.begin(); | 
| 2033 |  |  | 
| 2034 | 25.4M |   if (PREDICT_TRUE(!deduped_req.messages.empty())) { | 
| 2035 |  |     // TODO Temporary until the leader explicitly propagates the safe hybrid_time. | 
| 2036 |  |     // TODO: what if there is a failure here because the updated time is too far in the future? | 
| 2037 | 8.27M |     clock_->Update(HybridTime(deduped_req.messages.back()->hybrid_time())); | 
| 2038 | 8.27M |   } | 
| 2039 |  |  | 
| 2040 | 25.4M |   HybridTime propagated_safe_time; | 
| 2041 | 25.4M |   if (request.has_propagated_safe_time()) { | 
| 2042 | 25.4M |     propagated_safe_time = HybridTime(request.propagated_safe_time()); | 
| 2043 | 25.4M |     if (deduped_req.messages.empty()) { | 
| 2044 | 17.1M |       state_->context()->SetPropagatedSafeTime(propagated_safe_time); | 
| 2045 | 17.1M |     } | 
| 2046 | 25.4M |   } | 
| 2047 |  |  | 
| 2048 | 25.4M |   if (iter != deduped_req.messages.end()) { | 
| 2049 | 9.41M |     for (;;) { | 
| 2050 | 9.41M |       const ReplicateMsgPtr& msg = *iter; | 
| 2051 | 9.41M |       ++iter; | 
| 2052 | 9.41M |       bool last = iter == deduped_req.messages.end(); | 
| 2053 | 9.41M |       prepare_status = StartReplicaOperationUnlocked( | 
| 2054 | 9.41M |           msg, last ? propagated_safe_time : HybridTime::kInvalid); | 
| 2055 | 9.41M |       if (PREDICT_FALSE(!prepare_status.ok())) { | 
| 2056 | 322 |         --iter; | 
| 2057 | 322 |         LOG_WITH_PREFIX(WARNING) << "StartReplicaOperationUnlocked failed: " << prepare_status; | 
| 2058 | 322 |         break; | 
| 2059 | 322 |       } | 
| 2060 | 9.41M |       if (last) { | 
| 2061 | 8.28M |         break; | 
| 2062 | 8.28M |       } | 
| 2063 | 9.41M |     } | 
| 2064 | 8.27M |   } | 
| 2065 |  |  | 
| 2066 |  |   // If we stopped before reaching the end we failed to prepare some message(s) and need | 
| 2067 |  |   // to perform cleanup, namely trimming deduped_req.messages to only contain the messages | 
| 2068 |  |   // that were actually prepared, and deleting the other ones since we've taken ownership | 
| 2069 |  |   // when we first deduped. | 
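|  |  |   // For example (hypothetical): if five messages were deduped and the fourth | 
|  |  |   // failed to prepare, the erase below trims messages four and five, and the | 
|  |  |   // committed_op_id clamp further down keeps us from ever marking anything past | 
|  |  |   // message three as committed. | 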
| 2070 | 25.4M |   bool incomplete = iter != deduped_req.messages.end(); | 
| 2071 | 25.4M |   if (incomplete) { | 
| 2072 | 322 |     { | 
| 2073 | 322 |       const ReplicateMsgPtr msg = *iter; | 
| 2074 | 322 |       LOG_WITH_PREFIX(WARNING) | 
| 2075 | 322 |           << "Could not prepare operation for op: " | 
| 2076 | 322 |           << msg->id() << ". Suppressed " << (deduped_req.messages.end() - iter - 1) | 
| 2077 | 322 |           << " other warnings. Status for this op: " << prepare_status; | 
| 2078 | 322 |       deduped_req.messages.erase(iter, deduped_req.messages.end()); | 
| 2079 | 322 |     } | 
| 2080 |  |  | 
| 2081 |  |     // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing | 
| 2082 |  |     // else we can do. The leader will detect this and retry later. | 
| 2083 | 322 |     if (deduped_req.messages.empty()) { | 
| 2084 | 1 |       auto msg = Format("Rejecting Update request from peer $0 for term $1. " | 
| 2085 | 1 |                         "Could not prepare a single operation due to: $2", | 
| 2086 | 1 |                         request.caller_uuid(), | 
| 2087 | 1 |                         request.caller_term(), | 
| 2088 | 1 |                         prepare_status); | 
| 2089 | 1 |       LOG_WITH_PREFIX(INFO) << msg; | 
| 2090 | 1 |       FillConsensusResponseError(response, ConsensusErrorPB::CANNOT_PREPARE, | 
| 2091 | 1 |                                  STATUS(IllegalState, msg)); | 
| 2092 | 1 |       FillConsensusResponseOKUnlocked(response); | 
| 2093 | 1 |       return false; | 
| 2094 | 1 |     } | 
| 2095 | 322 |   } | 
| 2096 |  |  | 
| 2097 | 25.4M |   deduped_req.committed_op_id = yb::OpId::FromPB(request.committed_op_id()); | 
| 2098 | 25.4M |   if (!deduped_req.messages.empty()) { | 
| 2099 | 8.28M |     auto last_op_id = yb::OpId::FromPB(deduped_req.messages.back()->id()); | 
| 2100 | 8.28M |     if (deduped_req.committed_op_id > last_op_id) { | 
| 2101 | 299 |       LOG_IF_WITH_PREFIX(DFATAL, !incomplete) | 
| 2102 | 0 |           << "Received committed op id: " << deduped_req.committed_op_id | 
| 2103 | 0 |           << ", past last known op id: " << last_op_id; | 
| 2104 |  |  | 
| 2105 |  |       // It is possible that we failed to prepare some of the messages, | 
| 2106 |  |       // so limit the committed op id to avoid advancing it past the last known op id. | 
| 2107 | 299 |       deduped_req.committed_op_id = last_op_id; | 
| 2108 | 299 |     } | 
| 2109 | 8.28M |   } | 
| 2110 |  |  | 
| 2111 | 25.4M |   return true; | 
| 2112 | 25.4M | } | 
| 2113 |  |  | 
| 2114 |  | yb::OpId RaftConsensus::EnqueueWritesUnlocked(const LeaderRequest& deduped_req, | 
| 2115 | 25.4M |                                               WriteEmpty write_empty) { | 
| 2116 |  |   // Now that we've triggered the prepares enqueue the operations to be written | 
| 2117 |  |   // to the WAL. | 
| 2118 | 25.4M |   if (PREDICT_TRUE(!deduped_req.messages.empty()) || write_empty) { | 
| 2119 |  |     // Trigger the log append asap, if fsync() is on this might take a while | 
| 2120 |  |     // and we can't reply until this is done. | 
| 2121 |  |     // | 
| 2122 |  |     // Since we've prepared, we need to be able to append (or we risk trying to apply | 
| 2123 |  |     // later something that wasn't logged). We crash if we can't. | 
| 2124 | 15.0M |     CHECK_OK(queue_->AppendOperations( | 
| 2125 | 15.0M |         deduped_req.messages, deduped_req.committed_op_id, state_->Clock().Now())); | 
| 2126 | 15.0M |   } | 
| 2127 |  |  | 
| 2128 | 25.4M |   return !deduped_req.messages.empty() ? | 
| 2129 | 17.1M |       yb::OpId::FromPB(deduped_req.messages.back()->id()) : deduped_req.preceding_op_id; | 
| 2130 | 25.4M | } | 
| 2131 |  |  | 
| 2132 | 8.27M | Status RaftConsensus::WaitForWrites(int64_t term, const OpId& wait_for_op_id) { | 
| 2133 |  |   // 5 - We wait for the writes to be durable. | 
| 2134 |  |  | 
| 2135 |  |   // Note that this is safe because dist consensus now only supports a single outstanding | 
| 2136 |  |   // request at a time and this way we can allow commits to proceed while we wait. | 
| 2137 | 8.27M |   TRACE("Waiting on the replicates to finish logging"); | 
| 2138 | 8.27M |   TRACE_EVENT0("consensus", "Wait for log"); | 
| 2139 | 8.27M |   for (;;) { | 
| 2140 | 8.27M |     auto wait_result = log_->WaitForSafeOpIdToApply( | 
| 2141 | 8.27M |         wait_for_op_id, MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms)); | 
| 2142 |  |     // If we are just waiting for our own log append to finish, let's snooze the timer. | 
| 2143 |  |     // We don't want to fire leader election because we're waiting on our own log. | 
| 2144 | 8.28M |     if (!wait_result.empty()) { | 
| 2145 | 8.28M |       break; | 
| 2146 | 8.28M |     } | 
| 2147 | 18.4E |     int64_t new_term; | 
| 2148 | 18.4E |     { | 
| 2149 | 18.4E |       auto lock = state_->LockForRead(); | 
| 2150 | 18.4E |       new_term = state_->GetCurrentTermUnlocked(); | 
| 2151 | 18.4E |     } | 
| 2152 | 18.4E |     if (term != new_term) { | 
| 2153 | 0 |       return STATUS_FORMAT(IllegalState, "Term changed to $0 while waiting for writes in term $1", | 
| 2154 | 0 |                            new_term, term); | 
| 2155 | 0 |     } | 
| 2156 |  |  | 
| 2157 | 18.4E |     SnoozeFailureDetector(DO_NOT_LOG); | 
| 2158 |  |  | 
| 2159 | 18.4E |     const auto election_timeout_at = MonoTime::Now() + MinimumElectionTimeout(); | 
| 2160 | 18.4E |     UpdateAtomicMax(&withhold_votes_until_, election_timeout_at); | 
| 2161 | 18.4E |   } | 
| 2162 | 8.27M |   TRACE("Finished waiting on the replicates to finish logging"); | 
| 2163 |  |  | 
| 2164 | 8.27M |   return Status::OK(); | 
| 2165 | 8.27M | } | 
| 2166 |  |  | 
| 2167 |  | Status RaftConsensus::MarkOperationsAsCommittedUnlocked(const ConsensusRequestPB& request, | 
| 2168 |  |                                                         const LeaderRequest& deduped_req, | 
| 2169 | 25.4M |                                                         yb::OpId last_from_leader) { | 
| 2170 |  |   // Choose the last operation to be applied. This will either be 'committed_index', if | 
| 2171 |  |   // no prepare enqueuing failed, or the minimum between 'committed_index' and the id of | 
| 2172 |  |   // the last successfully enqueued prepare, if some prepare failed to enqueue. | 
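|  |  |   // Hypothetical example: the leader reports committed index 20, but the last | 
|  |  |   // prepare we managed to enqueue was 18; we then only apply up to 18 and rely | 
|  |  |   // on the leader re-sending ops 19 and 20 in a later request. | 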
| 2173 | 25.4M |   yb::OpId apply_up_to; | 
| 2174 | 25.4M |   if (last_from_leader.index < request.committed_op_id().index()) { | 
| 2175 |  |     // We should never apply anything later than what we received in this request. | 
| 2176 | 300 |     apply_up_to = last_from_leader; | 
| 2177 |  |  | 
| 2178 | 300 |     LOG_WITH_PREFIX(INFO) | 
| 2179 | 300 |         << "Received commit index " << request.committed_op_id() | 
| 2180 | 300 |         << " from the leader but only marked up to " << apply_up_to << " as committed."; | 
| 2181 | 25.4M |   } else { | 
| 2182 | 25.4M |     apply_up_to = yb::OpId::FromPB(request.committed_op_id()); | 
| 2183 | 25.4M |   } | 
| 2184 |  |  | 
| 2185 |  |   // We can now update the last received watermark. | 
| 2186 |  |   // | 
| 2187 |  |   // We do it here (and before we actually hear back from the wal whether things | 
| 2188 |  |   // are durable) so that, if we receive another, possible duplicate, message | 
| 2189 |  |   // that exercises this path we don't handle these messages twice. | 
| 2190 |  |   // | 
| 2191 |  |   // If any messages failed to be started locally, then we already have removed them | 
| 2192 |  |   // from 'deduped_req' at this point. So, we can simply update our last-received | 
| 2193 |  |   // watermark to the last message that remains in 'deduped_req'. | 
| 2194 |  |   // | 
| 2195 |  |   // It's possible that the leader didn't send us any new data -- it might be a completely | 
| 2196 |  |   // duplicate request. In that case, we don't need to update LastReceived at all. | 
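|  |  |   // Hypothetical trace: if the leader re-sends ops [5.11..5.13] after a timeout | 
|  |  |   // and we already recorded 5.13 as last received, dedup leaves 'messages' empty | 
|  |  |   // and we only sanity-check preceding_op_id in the else-branch below. | 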
| 2197 | 25.4M |   if (!deduped_req.messages.empty()) { | 
| 2198 | 8.28M |     OpIdPB last_appended = deduped_req.messages.back()->id(); | 
| 2199 | 8.28M |     TRACE(Substitute("Updating last received op as $0", last_appended.ShortDebugString())); | 
| 2200 | 8.28M |     state_->UpdateLastReceivedOpIdUnlocked(last_appended); | 
| 2201 | 17.1M |   } else if (state_->GetLastReceivedOpIdUnlocked().index < deduped_req.preceding_op_id.index) { | 
| 2202 | 0 |     return STATUS_FORMAT(InvalidArgument, | 
| 2203 | 0 |                          "Bad preceding_opid: $0, last received: $1", | 
| 2204 | 0 |                          deduped_req.preceding_op_id, | 
| 2205 | 0 |                          state_->GetLastReceivedOpIdUnlocked()); | 
| 2206 | 0 |   } | 
| 2207 |  |  | 
| 2208 | 18.4E |   VLOG_WITH_PREFIX(1) << "Marking committed up to " << apply_up_to; | 
| 2209 | 25.4M |   TRACE(Format("Marking committed up to $0", apply_up_to)); | 
| 2210 | 25.4M |   return ResultToStatus(state_->AdvanceCommittedOpIdUnlocked(apply_up_to, CouldStop::kTrue)); | 
| 2211 | 25.4M | } | 
| 2212 |  |  | 
| 2213 | 25.5M | void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) { | 
| 2214 | 25.5M |   TRACE("Filling consensus response to leader."); | 
| 2215 | 25.5M |   response->set_responder_term(state_->GetCurrentTermUnlocked()); | 
| 2216 | 25.5M |   state_->GetLastReceivedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_received()); | 
| 2217 | 25.5M |   state_->GetLastReceivedOpIdCurLeaderUnlocked().ToPB( | 
| 2218 | 25.5M |       response->mutable_status()->mutable_last_received_current_leader()); | 
| 2219 | 25.5M |   response->mutable_status()->set_last_committed_idx(state_->GetCommittedOpIdUnlocked().index); | 
| 2220 | 25.5M |   state_->GetLastAppliedOpIdUnlocked().ToPB(response->mutable_status()->mutable_last_applied()); | 
| 2221 | 25.5M | } | 
| 2222 |  |  | 
| 2223 |  | void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response, | 
| 2224 |  |                                                ConsensusErrorPB::Code error_code, | 
| 2225 | 122k |                                                const Status& status) { | 
| 2226 | 122k |   ConsensusErrorPB* error = response->mutable_status()->mutable_error(); | 
| 2227 | 122k |   error->set_code(error_code); | 
| 2228 | 122k |   StatusToPB(status, error->mutable_status()); | 
| 2229 | 122k | } | 
| 2230 |  |  | 
| 2231 | 1.36M | Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) { | 
| 2232 | 1.36M |   TRACE_EVENT2("consensus", "RaftConsensus::RequestVote", | 
| 2233 | 1.36M |                "peer", peer_uuid(), | 
| 2234 | 1.36M |                "tablet", tablet_id()); | 
| 2235 | 1.36M |   bool preelection = request->preelection(); | 
| 2236 |  |  | 
| 2237 | 1.36M |   response->set_responder_uuid(state_->GetPeerUuid()); | 
| 2238 | 1.36M |   response->set_preelection(preelection); | 
| 2239 |  |  | 
| 2240 |  |   // We must acquire the update lock in order to ensure that this vote action | 
| 2241 |  |   // takes place between requests. | 
| 2242 |  |   // Lock ordering: The update lock must be acquired before the ReplicaState lock. | 
| 2243 | 1.36M |   std::unique_lock<decltype(update_mutex_)> update_guard(update_mutex_, std::defer_lock); | 
| 2244 | 1.36M |   if (FLAGS_enable_leader_failure_detection) { | 
| 2245 | 1.36M |     update_guard.try_lock(); | 
| 2246 | 1.36M |   } else { | 
| 2247 |  |     // If failure detection is not enabled, then we can't just reject the vote, | 
| 2248 |  |     // because there will be no automatic retry later. So, block for the lock. | 
| 2249 | 770 |     update_guard.lock(); | 
| 2250 | 770 |   } | 
| 2251 | 1.36M |   if (!update_guard.owns_lock()) { | 
| 2252 |  |     // There is another vote or update concurrent with the vote. In that case, that | 
| 2253 |  |     // other request is likely to reset the timer, and we'll end up just voting | 
| 2254 |  |     // "NO" after waiting. To avoid starving RPC handlers and causing cascading | 
| 2255 |  |     // timeouts, just vote a quick NO. | 
| 2256 |  |     // | 
| 2257 |  |     // We still need to take the state lock in order to respond with term info, etc. | 
| 2258 | 11.1k |     ReplicaState::UniqueLock state_guard; | 
| 2259 | 11.1k |     RETURN_NOT_OK(state_->LockForConfigChange(&state_guard)); | 
| 2260 | 11.1k |     return RequestVoteRespondIsBusy(request, response); | 
| 2261 | 11.1k |   } | 
| 2262 |  |  | 
| 2263 |  |   // Acquire the replica state lock so we can read / modify the consensus state. | 
| 2264 | 1.35M |   ReplicaState::UniqueLock state_guard; | 
| 2265 | 1.35M |   RETURN_NOT_OK(state_->LockForConfigChange(&state_guard)); | 
| 2266 |  |  | 
| 2267 |  |   // If the node is not in the configuration, allow the vote (this is required by Raft) | 
| 2268 |  |   // but log an informational message anyway. | 
| 2269 | 1.35M |   if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) { | 
| 2270 | 90 |     LOG_WITH_PREFIX(INFO) << "Handling vote request from an unknown peer " | 
| 2271 | 90 |                           << request->candidate_uuid(); | 
| 2272 | 90 |   } | 
| 2273 |  |  | 
| 2274 |  |   // If we've heard recently from the leader, then we should ignore the request | 
| 2275 |  |   // (except if it is the leader itself requesting a vote -- something that might | 
| 2276 |  |   //  happen if the leader were to step down and call an election). Otherwise, | 
| 2277 |  |   // it might be from a "disruptive" server. This could happen in a few cases: | 
| 2278 |  |   // | 
| 2279 |  |   // 1) Network partitions | 
| 2280 |  |   // If the leader can talk to a majority of the nodes, but is partitioned from a | 
| 2281 |  |   // bad node, the bad node's failure detector will trigger. If the bad node is | 
| 2282 |  |   // able to reach other nodes in the cluster, it will continuously trigger elections. | 
| 2283 |  |   // | 
| 2284 |  |   // 2) An abandoned node | 
| 2285 |  |   // It's possible that a node has fallen behind the log GC mark of the leader. In that | 
| 2286 |  |   // case, the leader will stop sending it requests. Eventually, the configuration | 
| 2287 |  |   // will change to eject the abandoned node, but until that point, we don't want the | 
| 2288 |  |   // abandoned follower to disturb the other nodes. | 
| 2289 |  |   // | 
| 2290 |  |   // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf | 
| 2291 |  |   // section 4.2.3. | 
| 2292 | 1.35M |   MonoTime now = MonoTime::Now(); | 
| 2293 | 1.35M |   if (request->candidate_uuid() != state_->GetLeaderUuidUnlocked() && | 
| 2294 | 1.35M |       !request->ignore_live_leader() && | 
| 2295 | 1.35M |       now < withhold_votes_until_.load(std::memory_order_acquire)) { | 
| 2296 | 8.89k |     return RequestVoteRespondLeaderIsAlive(request, response); | 
| 2297 | 8.89k |   } | 
| 2298 |  |  | 
| 2299 |  |   // Candidate is running behind. | 
| 2300 | 1.34M |   if (request->candidate_term() < state_->GetCurrentTermUnlocked()) { | 
| 2301 | 204k |     return RequestVoteRespondInvalidTerm(request, response); | 
| 2302 | 204k |   } | 
| 2303 |  |  | 
| 2304 |  |   // We already voted this term. | 
| 2305 | 1.13M |   if (request->candidate_term() == state_->GetCurrentTermUnlocked() && | 
| 2306 | 1.13M |       state_->HasVotedCurrentTermUnlocked()) { | 
| 2307 |  |  | 
| 2308 |  |     // Already voted for the same candidate in the current term. | 
| 2309 | 1.37k |     if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) { | 
| 2310 | 209 |       return RequestVoteRespondVoteAlreadyGranted(request, response); | 
| 2311 | 209 |     } | 
| 2312 |  |  | 
| 2313 |  |     // Voted for someone else in current term. | 
| 2314 | 1.16k |     return RequestVoteRespondAlreadyVotedForOther(request, response); | 
| 2315 | 1.37k |   } | 
| 2316 |  |  | 
| 2317 |  |   // The term advanced. | 
| 2318 | 1.13M |   if (request->candidate_term() > state_->GetCurrentTermUnlocked() && !preelection) { | 
| 2319 | 107k |     RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term()), | 
| 2320 | 107k |         Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1", | 
| 2321 | 107k |             state_->GetCurrentTermUnlocked(), request->candidate_term())); | 
| 2322 | 107k |   } | 
| 2323 |  |  | 
| 2324 |  |   // Candidate must have last-logged OpId at least as large as our own to get our vote. | 
| 2325 | 1.13M |   OpIdPB local_last_logged_opid; | 
| 2326 | 1.13M |   GetLatestOpIdFromLog().ToPB(&local_last_logged_opid); | 
| 2327 | 1.13M |   if (OpIdLessThan(request->candidate_status().last_received(), local_last_logged_opid)) { | 
| 2328 | 433 |     return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, response); | 
| 2329 | 433 |   } | 
| 2330 |  |  | 
| 2331 | 1.13M |   if (!preelection) { | 
| 2332 |  |     // Clear the pending election op id if any before granting the vote. If another peer jumps in | 
| 2333 |  |     // before we can catch up and start the election, let's not disrupt the quorum with another | 
| 2334 |  |     // election. | 
| 2335 | 107k |     state_->ClearPendingElectionOpIdUnlocked(); | 
| 2336 | 107k |   } | 
| 2337 |  |  | 
| 2338 | 1.13M |   auto remaining_old_leader_lease = state_->RemainingOldLeaderLeaseDuration(); | 
| 2339 |  |  | 
| 2340 | 1.13M |   if (remaining_old_leader_lease.Initialized()) { | 
| 2341 | 20.7k |     response->set_remaining_leader_lease_duration_ms( | 
| 2342 | 20.7k |         narrow_cast<int32_t>(remaining_old_leader_lease.ToMilliseconds())); | 
| 2343 | 20.7k |     response->set_leader_lease_uuid(state_->old_leader_lease().holder_uuid); | 
| 2344 | 20.7k |   } | 
| 2345 |  |  | 
| 2346 | 1.13M |   const auto& old_leader_ht_lease = state_->old_leader_ht_lease(); | 
| 2347 | 1.13M |   if (old_leader_ht_lease) { | 
| 2348 | 803k |     response->set_leader_ht_lease_expiration(old_leader_ht_lease.expiration); | 
| 2349 | 803k |     response->set_leader_ht_lease_uuid(old_leader_ht_lease.holder_uuid); | 
| 2350 | 803k |   } | 
| 2351 |  |  | 
| 2352 |  |   // Passed all our checks. Vote granted. | 
| 2353 | 1.13M |   if (preelection) { | 
| 2354 | 1.02M |     LOG_WITH_PREFIX(INFO) << "Pre-election. Granting vote for candidate " | 
| 2355 | 1.02M |                           << request->candidate_uuid() << " in term " << request->candidate_term(); | 
| 2356 | 1.02M |     FillVoteResponseVoteGranted(*request, response); | 
| 2357 | 1.02M |     return Status::OK(); | 
| 2358 | 1.02M |   } | 
| 2359 |  |  | 
| 2360 | 108k |   return RequestVoteRespondVoteGranted(request, response); | 
| 2361 | 1.13M | } | 
| 2362 |  |  | 
| 2363 |  | Status RaftConsensus::IsLeaderReadyForChangeConfigUnlocked(ChangeConfigType type, | 
| 2364 | 2.91M |                                                            const string& server_uuid) { | 
| 2365 | 2.91M |   const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); | 
| 2366 | 2.91M |   size_t servers_in_transition = 0; | 
| 2367 | 2.91M |   if (type == ADD_SERVER) { | 
| 2368 | 2.53k |     servers_in_transition = CountServersInTransition(active_config); | 
| 2369 | 2.91M |   } else if (type == REMOVE_SERVER) { | 
| 2370 |  |     // If we are trying to remove the server in transition, then servers_in_transition shouldn't | 
| 2371 |  |     // count it so we can proceed with the operation. | 
| 2372 | 2.18k |     servers_in_transition = CountServersInTransition(active_config, server_uuid); | 
| 2373 | 2.18k |   } | 
| 2374 |  |  | 
| 2375 |  |   // Check that all the following requirements are met: | 
| 2376 |  |   // 1. We are required by Raft to reject config change operations until we have | 
| 2377 |  |   //    committed at least one operation in our current term as leader. | 
| 2378 |  |   //    See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E | 
| 2379 |  |   // 2. Ensure there is no other pending change config. | 
| 2380 |  |   // 3. There are no peers that are in the process of becoming VOTERs or OBSERVERs. | 
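|  |  |   // For instance (hypothetical): right after winning an election, the committed | 
|  |  |   // term still lags the current term, so an ADD_SERVER request is bounced with | 
|  |  |   // LEADER_NOT_READY_CHANGE_CONFIG until the NO_OP for this term commits. | 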
| 2381 | 2.91M |   if (!state_->AreCommittedAndCurrentTermsSameUnlocked() || | 
| 2382 | 2.91M |       state_->IsConfigChangePendingUnlocked() || | 
| 2383 | 2.91M |       servers_in_transition != 0) { | 
| 2384 | 2.91M |     return STATUS_FORMAT(IllegalState, | 
| 2385 | 2.91M |                          "Leader is not ready for Config Change, can try again. " | 
| 2386 | 2.91M |                          "Num peers in transit: $0. Type: $1. Has opid: $2. Committed config: $3. " | 
| 2387 | 2.91M |                          "Pending config: $4. Current term: $5. Committed op id: $6.", | 
| 2388 | 2.91M |                          servers_in_transition, ChangeConfigType_Name(type), | 
| 2389 | 2.91M |                          active_config.has_opid_index(), | 
| 2390 | 2.91M |                          state_->GetCommittedConfigUnlocked().ShortDebugString(), | 
| 2391 | 2.91M |                          state_->IsConfigChangePendingUnlocked() ? | 
| 2392 | 2.91M |                              state_->GetPendingConfigUnlocked().ShortDebugString() : "", | 
| 2393 | 2.91M |                          state_->GetCurrentTermUnlocked(), state_->GetCommittedOpIdUnlocked()); | 
| 2394 | 2.91M |   } | 
| 2395 |  |  | 
| 2396 | 5.92k |   return Status::OK(); | 
| 2397 | 2.91M | } | 
| 2398 |  |  | 
| 2399 |  | Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, | 
| 2400 |  |                                    const StdStatusCallback& client_cb, | 
| 2401 | 2.91M |                                    boost::optional<TabletServerErrorPB::Code>* error_code) { | 
| 2402 | 2.91M |   if (PREDICT_FALSE(!req.has_type())) { | 
| 2403 | 0 |     return STATUS(InvalidArgument, "Must specify 'type' argument to ChangeConfig()", | 
| 2404 | 0 |                                    req.ShortDebugString()); | 
| 2405 | 0 |   } | 
| 2406 | 2.91M |   if (PREDICT_FALSE(!req.has_server())) { | 
| 2407 | 0 |     *error_code = TabletServerErrorPB::INVALID_CONFIG; | 
| 2408 | 0 |     return STATUS(InvalidArgument, "Must specify 'server' argument to ChangeConfig()", | 
| 2409 | 0 |                                    req.ShortDebugString()); | 
| 2410 | 0 |   } | 
| 2411 | 2.91M |   YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n) | 
| 2412 | 8.39k |       << "Received ChangeConfig request " << req.ShortDebugString(); | 
| 2413 | 2.91M |   ChangeConfigType type = req.type(); | 
| 2414 | 2.91M |   bool use_hostport = req.has_use_host() && req.use_host(); | 
| 2415 |  |  | 
| 2416 | 2.91M |   if (type != REMOVE_SERVER && use_hostport) { | 
| 2417 | 0 |     return STATUS_SUBSTITUTE(InvalidArgument, "Cannot set use_host for change config type $0, " | 
| 2418 | 0 |                              "only allowed with REMOVE_SERVER.", type); | 
| 2419 | 0 |   } | 
| 2420 |  |  | 
| 2421 | 2.91M |   if (PREDICT_FALSE(FLAGS_TEST_return_error_on_change_config != 0.0 && type == CHANGE_ROLE)) { | 
| 2422 | 0 |     DCHECK(FLAGS_TEST_return_error_on_change_config >= 0.0 && | 
| 2423 | 0 |            FLAGS_TEST_return_error_on_change_config <= 1.0); | 
| 2424 | 0 |     if (clock_->Now().ToUint64() % 100 < 100 * FLAGS_TEST_return_error_on_change_config) { | 
| 2425 | 0 |       return STATUS(IllegalState, "Returning error for unit test"); | 
| 2426 | 0 |     } | 
| 2427 | 0 |   } | 
| 2428 | 2.91M |   RaftPeerPB* new_peer = nullptr; | 
| 2429 | 2.91M |   const RaftPeerPB& server = req.server(); | 
| 2430 | 2.91M |   if (!use_hostport && !server.has_permanent_uuid()) { | 
| 2431 | 0 |     return STATUS(InvalidArgument, | 
| 2432 | 0 |                   Substitute("server must have permanent_uuid or use_host specified: $0", | 
| 2433 | 0 |                              req.ShortDebugString())); | 
| 2434 | 0 |   } | 
| 2435 | 2.91M |   { | 
| 2436 | 2.91M |     ReplicaState::UniqueLock lock; | 
| 2437 | 2.91M |     RETURN_NOT_OK(state_->LockForConfigChange(&lock)); | 
| 2438 | 2.91M |     Status s = state_->CheckActiveLeaderUnlocked(LeaderLeaseCheckMode::DONT_NEED_LEASE); | 
| 2439 | 2.91M |     if (!s.ok()) { | 
| 2440 | 817 |       *error_code = TabletServerErrorPB::NOT_THE_LEADER; | 
| 2441 | 817 |       return s; | 
| 2442 | 817 |     } | 
| 2443 |  |  | 
| 2444 | 18.4E |     const string& server_uuid = server.has_permanent_uuid() ? server.permanent_uuid() : ""; | 
| 2445 | 2.91M |     s = IsLeaderReadyForChangeConfigUnlocked(type, server_uuid); | 
| 2446 | 2.91M |     if (!s.ok()) { | 
| 2447 | 2.91M |       YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n) | 
| 2448 | 1.79k |           << "Returning not ready for " << ChangeConfigType_Name(type) | 
| 2449 | 1.79k |           << " due to error " << s.ToString(); | 
| 2450 | 2.91M |       *error_code = TabletServerErrorPB::LEADER_NOT_READY_CHANGE_CONFIG; | 
| 2451 | 2.91M |       return s; | 
| 2452 | 2.91M |     } | 
| 2453 |  |  | 
| 2454 | 5.91k |     const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); | 
| 2455 |  |  | 
| 2456 |  |     // Support atomic ChangeConfig requests. | 
| 2457 | 5.91k |     if (req.has_cas_config_opid_index()) { | 
| 2458 | 3.75k |       if (committed_config.opid_index() != req.cas_config_opid_index()) { | 
| 2459 | 15 |         *error_code = TabletServerErrorPB::CAS_FAILED; | 
| 2460 | 15 |         return STATUS(IllegalState, Substitute("Request specified cas_config_opid_index " | 
| 2461 | 15 |                                                "of $0 but the committed config has opid_index " | 
| 2462 | 15 |                                                "of $1", | 
| 2463 | 15 |                                                req.cas_config_opid_index(), | 
| 2464 | 15 |                                                committed_config.opid_index())); | 
| 2465 | 15 |       } | 
| 2466 | 3.75k |     } | 
| 2467 |  |  | 
| 2468 | 5.89k |     RaftConfigPB new_config = committed_config; | 
| 2469 | 5.89k |     new_config.clear_opid_index(); | 
| 2470 | 5.89k |     switch (type) { | 
| 2471 | 2.06k |       case ADD_SERVER: | 
| 2472 |  |         // Ensure the server we are adding is not already a member of the configuration. | 
| 2473 | 2.06k |         if (IsRaftConfigMember(server_uuid, committed_config)) { | 
| 2474 | 0 |           *error_code = TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT; | 
| 2475 | 0 |           return STATUS(IllegalState, | 
| 2476 | 0 |               Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1", | 
| 2477 | 0 |                         server_uuid, committed_config.ShortDebugString())); | 
| 2478 | 0 |         } | 
| 2479 | 2.06k |         if (!server.has_member_type()) { | 
| 2480 | 0 |           return STATUS(InvalidArgument, | 
| 2481 | 0 |                         Substitute("Server must have member_type specified. Request: $0", | 
| 2482 | 0 |                                    req.ShortDebugString())); | 
| 2483 | 0 |         } | 
| 2484 | 2.06k |         if (server.member_type() != PeerMemberType::PRE_VOTER && | 
| 2485 | 2.06k |             server.member_type() != PeerMemberType::PRE_OBSERVER) { | 
| 2486 | 0 |           return STATUS(InvalidArgument, | 
| 2487 | 0 |               Substitute("Server with UUID $0 must be of member_type PRE_VOTER or PRE_OBSERVER. " | 
| 2488 | 0 |                          "member_type received: $1", server_uuid, | 
| 2489 | 0 |                          PeerMemberType_Name(server.member_type()))); | 
| 2490 | 0 |         } | 
| 2491 | 2.06k |         if (server.last_known_private_addr().empty()) { | 
| 2492 | 0 |           return STATUS(InvalidArgument, "server must have last_known_addr specified", | 
| 2493 | 0 |                                          req.ShortDebugString()); | 
| 2494 | 0 |         } | 
| 2495 | 2.06k |         new_peer = new_config.add_peers(); | 
| 2496 | 2.06k |         *new_peer = server; | 
| 2497 | 2.06k |         break; | 
| 2498 |  |  | 
| 2499 | 1.82k |       case REMOVE_SERVER: | 
| 2500 | 1.82k |         if (use_hostport) { | 
| 2501 | 5 |           if (server.last_known_private_addr().empty()) { | 
| 2502 | 0 |             return STATUS(InvalidArgument, "Must have last_known_addr specified.", | 
| 2503 | 0 |                           req.ShortDebugString()); | 
| 2504 | 0 |           } | 
| 2505 | 5 |           HostPort leader_hp; | 
| 2506 | 5 |           RETURN_NOT_OK(GetHostPortFromConfig( | 
| 2507 | 5 |               new_config, peer_uuid(), queue_->local_cloud_info(), &leader_hp)); | 
| 2508 | 5 |           for (const auto& host_port : server.last_known_private_addr()) { | 
| 2509 | 5 |             if (leader_hp.port() == host_port.port() && leader_hp.host() == host_port.host()) { | 
| 2510 | 0 |               return STATUS(InvalidArgument, "Cannot remove live leader using hostport.", | 
| 2511 | 0 |                             req.ShortDebugString()); | 
| 2512 | 0 |             } | 
| 2513 | 5 |           } | 
| 2514 | 5 |         } | 
| 2515 | 1.82k |         if (server_uuid == peer_uuid()) { | 
| 2516 | 30 |           *error_code = TabletServerErrorPB::LEADER_NEEDS_STEP_DOWN; | 
| 2517 | 30 |           return STATUS(InvalidArgument, | 
| 2518 | 30 |               Substitute("Cannot remove peer $0 from the config because it is the leader. " | 
| 2519 | 30 |                          "Force another leader to be elected to remove this server. " | 
| 2520 | 30 |                          "Active consensus state: $1", server_uuid, | 
| 2521 | 30 |                          state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE) | 
| 2522 | 30 |                             .ShortDebugString())); | 
| 2523 | 30 |         } | 
| 2524 | 1.79k |         if (!RemoveFromRaftConfig(&new_config, req)) { | 
| 2525 | 0 |           *error_code = TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT; | 
| 2526 | 0 |           return STATUS(NotFound, | 
| 2527 | 0 |               Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1", | 
| 2528 | 0 |                         server_uuid, committed_config.ShortDebugString())); | 
| 2529 | 0 |         } | 
| 2530 | 1.79k |         break; | 
| 2531 |  |  | 
| 2532 | 2.01k |       case CHANGE_ROLE: | 
| 2533 | 2.01k |         if (server_uuid == peer_uuid()) { | 
| 2534 | 0 |           return STATUS(InvalidArgument, | 
| 2535 | 0 |               Substitute("Cannot change role of peer $0 because it is the leader. Force " | 
| 2536 | 0 |                          "another leader to be elected. Active consensus state: $1", server_uuid, | 
| 2537 | 0 |                          state_->ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE) | 
| 2538 | 0 |                              .ShortDebugString())); | 
| 2539 | 0 |         } | 
| 2540 | 2.01k |         VLOG(3) << "config before CHANGE_ROLE: " << new_config.DebugString()0; | 
| 2541 |  |  | 
| 2542 | 2.01k |         if (!GetMutableRaftConfigMember(&new_config, server_uuid, &new_peer).ok()) { | 
| 2543 | 0 |           return STATUS(NotFound, | 
| 2544 | 0 |             Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1", | 
| 2545 | 0 |                        server_uuid, new_config.ShortDebugString())); | 
| 2546 | 0 |         } | 
| 2547 | 2.01k |         if (new_peer->member_type() != PeerMemberType::PRE_OBSERVER && | 
| 2548 | 2.01k |             new_peer->member_type() != PeerMemberType::PRE_VOTER) { | 
| 2549 | 0 |           return STATUS(IllegalState, Substitute("Cannot change role of server with UUID $0 " | 
| 2550 | 0 |                                                  "because its member type is $1", | 
| 2551 | 0 |                                                  server_uuid, new_peer->member_type())); | 
| 2552 | 0 |         } | 
| 2553 | 2.01k |         if (new_peer->member_type() == PeerMemberType::PRE_OBSERVER) { | 
| 2554 | 77 |           new_peer->set_member_type(PeerMemberType::OBSERVER); | 
| 2555 | 1.93k |         } else { | 
| 2556 | 1.93k |           new_peer->set_member_type(PeerMemberType::VOTER); | 
| 2557 | 1.93k |         } | 
| 2558 |  |  | 
| 2559 | 2.01k |         VLOG(3) << "config after CHANGE_ROLE: " << new_config.DebugString()0; | 
| 2560 | 2.01k |         break; | 
| 2561 | 0 |       default: | 
| 2562 | 0 |         return STATUS(InvalidArgument, Substitute("Unsupported type $0", | 
| 2563 | 5.89k |                                                   ChangeConfigType_Name(type))); | 
| 2564 | 5.89k |     } | 
| 2565 |  |  | 
| 2566 | 5.87k |     auto cc_replicate = std::make_shared<ReplicateMsg>(); | 
| 2567 | 5.87k |     cc_replicate->set_op_type(CHANGE_CONFIG_OP); | 
| 2568 | 5.87k |     ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record(); | 
| 2569 | 5.87k |     cc_req->set_tablet_id(tablet_id()); | 
| 2570 | 5.87k |     *cc_req->mutable_old_config() = committed_config; | 
| 2571 | 5.87k |     *cc_req->mutable_new_config() = new_config; | 
| 2572 |  |     // Note: This hybrid_time has no meaning from a serialization perspective | 
| 2573 |  |     // because this method is not executed on the TabletPeer's prepare thread. | 
| 2574 | 5.87k |     cc_replicate->set_hybrid_time(clock_->Now().ToUint64()); | 
| 2575 | 5.87k |     state_->GetCommittedOpIdUnlocked().ToPB(cc_replicate->mutable_committed_op_id()); | 
| 2576 |  |  | 
| 2577 | 5.87k |     auto context = std::make_shared<StateChangeContext>( | 
| 2578 | 5.87k |         StateChangeReason::LEADER_CONFIG_CHANGE_COMPLETE, *cc_req, | 
| 2579 | 5.87k |         type == REMOVE_SERVER ? server_uuid : ""); | 
| 2580 |  |  | 
| 2581 | 5.87k |     RETURN_NOT_OK( | 
| 2582 | 5.87k |         ReplicateConfigChangeUnlocked(cc_replicate, | 
| 2583 | 5.87k |                                       new_config, | 
| 2584 | 5.87k |                                       type, | 
| 2585 | 5.87k |                                       std::bind(&RaftConsensus::MarkDirtyOnSuccess, | 
| 2586 | 5.87k |                                            this, | 
| 2587 | 5.87k |                                            std::move(context), | 
| 2588 | 5.87k |                                            std::move(client_cb), std::placeholders::_1))); | 
| 2589 | 5.87k |   } | 
| 2590 |  |  | 
| 2591 | 5.87k |   peer_manager_->SignalRequest(RequestTriggerMode::kNonEmptyOnly); | 
| 2592 |  |  | 
| 2593 | 5.87k |   return Status::OK(); | 
| 2594 | 5.87k | } | 
| 2595 |  |  | 
| 2596 |  | Status RaftConsensus::UnsafeChangeConfig( | 
| 2597 |  |     const UnsafeChangeConfigRequestPB& req, | 
| 2598 | 6 |     boost::optional<tserver::TabletServerErrorPB::Code>* error_code) { | 
| 2599 | 6 |   if (PREDICT_FALSE(!req.has_new_config())) { | 
| 2600 | 0 |     *error_code = TabletServerErrorPB::INVALID_CONFIG; | 
| 2601 | 0 |     return STATUS(InvalidArgument, "Request must contain 'new_config' argument " | 
| 2602 | 0 |                   "to UnsafeChangeConfig()", yb::ToString(req)); | 
| 2603 | 0 |   } | 
| 2604 | 6 |   if (PREDICT_FALSE(!req.has_caller_id())) { | 
| 2605 | 0 |     *error_code = TabletServerErrorPB::INVALID_CONFIG; | 
| 2606 | 0 |     return STATUS(InvalidArgument, "Must specify 'caller_id' argument to UnsafeChangeConfig()", | 
| 2607 | 0 |                   yb::ToString(req)); | 
| 2608 | 0 |   } | 
| 2609 |  |  | 
| 2610 |  |   // Grab the committed config and current term on this node. | 
| 2611 | 6 |   int64_t current_term; | 
| 2612 | 6 |   RaftConfigPB committed_config; | 
| 2613 | 6 |   OpId last_committed_opid; | 
| 2614 | 6 |   OpId preceding_opid; | 
| 2615 | 6 |   string local_peer_uuid; | 
| 2616 | 6 |   { | 
| 2617 |  |     // Take the snapshot of the replica state and queue state so that | 
| 2618 |  |     // we can stick them in the consensus update request later. | 
| 2619 | 6 |     auto lock = state_->LockForRead(); | 
| 2620 | 6 |     local_peer_uuid = state_->GetPeerUuid(); | 
| 2621 | 6 |     current_term = state_->GetCurrentTermUnlocked(); | 
| 2622 | 6 |     committed_config = state_->GetCommittedConfigUnlocked(); | 
| 2623 | 6 |     if (state_->IsConfigChangePendingUnlocked()) { | 
| 2624 | 3 |       LOG_WITH_PREFIX(WARNING) << "Replica has a pending config, but the new config " | 
| 2625 | 3 |                                << "will be unsafely changed anyway. " | 
| 2626 | 3 |                                << "Currently pending config on the node: " | 
| 2627 | 3 |                                << yb::ToString(state_->GetPendingConfigUnlocked()); | 
| 2628 | 3 |     } | 
| 2629 | 6 |     last_committed_opid = state_->GetCommittedOpIdUnlocked(); | 
| 2630 | 6 |     preceding_opid = state_->GetLastAppliedOpIdUnlocked(); | 
| 2631 | 6 |   } | 
| 2632 |  |  | 
| 2633 |  |   // Validate that passed replica uuids are part of the committed config | 
| 2634 |  |   // on this node.  This allows a manual recovery tool to only have to specify | 
| 2635 |  |   // the uuid of each replica in the new config without having to know the | 
| 2636 |  |   // addresses of each server (since we can get the address information from | 
| 2637 |  |   // the committed config). Additionally, only a subset of the committed config | 
| 2638 |  |   // is required for typical cluster repair scenarios. | 
| 2639 | 6 |   std::unordered_set<string> retained_peer_uuids; | 
| 2640 | 6 |   const RaftConfigPB& config = req.new_config(); | 
| 2641 | 7 |   for (const RaftPeerPB& new_peer : config.peers()) { | 
| 2642 | 7 |     const string& peer_uuid = new_peer.permanent_uuid(); | 
| 2643 | 7 |     retained_peer_uuids.insert(peer_uuid); | 
| 2644 | 7 |     if (!IsRaftConfigMember(peer_uuid, committed_config)) { | 
| 2645 | 0 |       *error_code = TabletServerErrorPB::INVALID_CONFIG; | 
| 2646 | 0 |       return STATUS(InvalidArgument, Substitute("Peer with uuid $0 is not in the committed  " | 
| 2647 | 0 |                                                 "config on this replica, rejecting the  " | 
| 2648 | 0 |                                                 "unsafe config change request for tablet $1. " | 
| 2649 | 0 |                                                 "Committed config: $2", | 
| 2650 | 0 |                                                 peer_uuid, req.tablet_id(), | 
| 2651 | 0 |                                                 yb::ToString(committed_config))); | 
| 2652 | 0 |     } | 
| 2653 | 7 |   } | 
| 2654 |  |  | 
| 2655 | 6 |   RaftConfigPB new_config = committed_config; | 
| 2656 | 18 |   for (const auto& peer : committed_config.peers()) { | 
| 2657 | 18 |     const string& peer_uuid = peer.permanent_uuid(); | 
| 2658 | 18 |     if (!ContainsKey(retained_peer_uuids, peer_uuid)) { | 
| 2659 | 11 |       ChangeConfigRequestPB req; | 
| 2660 | 11 |       req.set_tablet_id(tablet_id()); | 
| 2661 | 11 |       req.mutable_server()->set_permanent_uuid(peer_uuid); | 
| 2662 | 11 |       req.set_type(REMOVE_SERVER); | 
| 2663 | 11 |       req.set_cas_config_opid_index(committed_config.opid_index()); | 
| 2664 | 11 |       CHECK(RemoveFromRaftConfig(&new_config, req)); | 
| 2665 | 11 |     } | 
| 2666 | 18 |   } | 
| 2667 |  |   // Check that local peer is part of the new config and is a VOTER. | 
| 2668 |  |   // Although it is valid for a local replica to not have itself | 
| 2669 |  |   // in the committed config, it is rare and a replica without itself | 
| 2670 |  |   // in the latest config is definitely not caught up with the latest leader's log. | 
| 2671 | 6 |   if (!IsRaftConfigVoter(local_peer_uuid, new_config)) { | 
| 2672 | 0 |     *error_code = TabletServerErrorPB::INVALID_CONFIG; | 
| 2673 | 0 |     return STATUS(InvalidArgument, Substitute("Local replica uuid $0 is not " | 
| 2674 | 0 |                                               "a VOTER in the new config, " | 
| 2675 | 0 |                                               "rejecting the unsafe config " | 
| 2676 | 0 |                                               "change request for tablet $1. " | 
| 2677 | 0 |                                               "Rejected config: $2", | 
| 2678 | 0 |                                               local_peer_uuid, req.tablet_id(), | 
| 2679 | 0 |                                               yb::ToString(new_config))); | 
| 2680 | 0 |   } | 
| 2681 | 6 |   new_config.set_unsafe_config_change(true); | 
| 2682 | 6 |   int64 replicate_opid_index = preceding_opid.index + 1; | 
| 2683 | 6 |   new_config.clear_opid_index(); | 
| 2684 |  |  | 
| 2685 |  |   // Sanity check the new config. | 
| 2686 | 6 |   Status s = VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM); | 
| 2687 | 6 |   if (!s.ok()) { | 
| 2688 | 0 |     *error_code = TabletServerErrorPB::INVALID_CONFIG; | 
| 2689 | 0 |     return STATUS(InvalidArgument, Substitute("The resulting new config for tablet $0  " | 
| 2690 | 0 |                                               "from passed parameters has failed raft " | 
| 2691 | 0 |                                               "config sanity check: $1", | 
| 2692 | 0 |                                               req.tablet_id(), s.ToString())); | 
| 2693 | 0 |   } | 
| 2694 |  |  | 
| 2695 |  |   // Prepare the consensus request as if the request is being generated | 
| 2696 |  |   // from a different leader. | 
| 2697 | 6 |   ConsensusRequestPB consensus_req; | 
| 2698 | 6 |   consensus_req.set_caller_uuid(req.caller_id()); | 
| 2699 |  |   // Bumping up the term for the consensus request being generated. | 
| 2700 |  |   // This makes this request appear to come from a new leader that | 
| 2701 |  |   // the local replica doesn't know about yet. If the local replica | 
| 2702 |  |   // happens to be the leader, this will cause it to step down. | 
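|  |  |   // Hypothetical illustration: with current_term = 7 and preceding_opid = 7.100, | 
|  |  |   // the generated request carries caller_term 8 and a CHANGE_CONFIG_OP with id | 
|  |  |   // 8.101, so even a replica that is currently leader at term 7 steps down and | 
|  |  |   // accepts the new config. | 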
| 2703 | 6 |   const int64 new_term = current_term + 1; | 
| 2704 | 6 |   consensus_req.set_caller_term(new_term); | 
| 2705 | 6 |   preceding_opid.ToPB(consensus_req.mutable_preceding_id()); | 
| 2706 | 6 |   last_committed_opid.ToPB(consensus_req.mutable_committed_op_id()); | 
| 2707 |  |  | 
| 2708 |  |   // Prepare the replicate msg to be replicated. | 
| 2709 | 6 |   ReplicateMsg* replicate = consensus_req.add_ops(); | 
| 2710 | 6 |   ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record(); | 
| 2711 | 6 |   cc_req->set_tablet_id(req.tablet_id()); | 
| 2712 | 6 |   *cc_req->mutable_old_config() = committed_config; | 
| 2713 | 6 |   *cc_req->mutable_new_config() = new_config; | 
| 2714 | 6 |   OpIdPB* id = replicate->mutable_id(); | 
| 2715 |  |   // Bumping up both the term and the opid_index from what's found in the log. | 
| 2716 | 6 |   id->set_term(new_term); | 
| 2717 | 6 |   id->set_index(replicate_opid_index); | 
| 2718 | 6 |   replicate->set_op_type(CHANGE_CONFIG_OP); | 
| 2719 | 6 |   replicate->set_hybrid_time(clock_->Now().ToUint64()); | 
| 2720 | 6 |   last_committed_opid.ToPB(replicate->mutable_committed_op_id()); | 
| 2721 |  |  | 
| 2722 | 6 |   VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: " | 
| 2723 | 0 |                       << yb::ToString(consensus_req); | 
| 2724 |  |  | 
| 2725 | 6 |   LOG_WITH_PREFIX(WARNING) << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, " | 
| 2726 | 6 |                            << "COMMITTED CONFIG: " << yb::ToString(committed_config) | 
| 2727 | 6 |                            << " NEW CONFIG: " << yb::ToString(new_config); | 
| 2728 |  |  | 
| 2729 | 6 |   const auto deadline = CoarseMonoClock::Now() + 15s;  // TODO: fix me | 
| 2730 | 6 |   ConsensusResponsePB consensus_resp; | 
| 2731 | 6 |   s = Update(&consensus_req, &consensus_resp, deadline); | 
| 2732 | 6 |   if (!s.ok() || consensus_resp.has_error()) { | 
| 2733 | 0 |     *error_code = TabletServerErrorPB::UNKNOWN_ERROR; | 
| 2734 | 0 |   } | 
| 2735 | 6 |   if (s.ok() && consensus_resp.has_error()) { | 
| 2736 | 0 |     s = StatusFromPB(consensus_resp.error().status()); | 
| 2737 | 0 |   } | 
| 2738 | 6 |   return s; | 
| 2739 | 6 | } | 
| 2740 |  |  | 
| 2741 | 151k | void RaftConsensus::Shutdown() { | 
| 2742 | 151k |   LOG_WITH_PREFIX(INFO) << "Shutdown."; | 
| 2743 |  |  | 
| 2744 |  |   // Avoid taking locks if already shut down so we don't violate | 
| 2745 |  |   // ThreadRestrictions assertions in the case where the RaftConsensus | 
| 2746 |  |   // destructor runs on the reactor thread due to an election callback being | 
| 2747 |  |   // the last outstanding reference. | 
| 2748 | 151k |   if (shutdown_.Load(kMemOrderAcquire)) return; | 
| 2749 |  |  | 
| 2750 | 75.6k |   CHECK_OK(ExecuteHook(PRE_SHUTDOWN)); | 
| 2751 |  |  | 
| 2752 | 75.6k |   { | 
| 2753 | 75.6k |     ReplicaState::UniqueLock lock; | 
| 2754 |  |     // Transition to kShuttingDown state. | 
| 2755 | 75.6k |     CHECK_OK(state_->LockForShutdown(&lock)); | 
| 2756 | 75.6k |     step_down_check_tracker_.StartShutdown(); | 
| 2757 | 75.6k |   } | 
| 2758 | 75.6k |   step_down_check_tracker_.CompleteShutdown(); | 
| 2759 |  |  | 
| 2760 |  |   // Close the peer manager. | 
| 2761 | 75.6k |   peer_manager_->Close(); | 
| 2762 |  |  | 
| 2763 |  |   // We must close the queue after we close the peers. | 
| 2764 | 75.6k |   queue_->Close(); | 
| 2765 |  |  | 
| 2766 | 75.6k |   CHECK_OK(state_->CancelPendingOperations()); | 
| 2767 |  |  | 
| 2768 | 75.6k |   { | 
| 2769 | 75.6k |     ReplicaState::UniqueLock lock; | 
| 2770 | 75.6k |     CHECK_OK(state_->LockForShutdown(&lock)); | 
| 2771 | 75.6k |     CHECK_EQ(ReplicaState::kShuttingDown, state_->state()); | 
| 2772 | 75.6k |     CHECK_OK(state_->ShutdownUnlocked()); | 
| 2773 | 75.6k |     LOG_WITH_PREFIX(INFO) << "Raft consensus is shut down!"; | 
| 2774 | 75.6k |   } | 
| 2775 |  |  | 
| 2776 |  |   // Shut down things that might acquire locks during destruction. | 
| 2777 | 75.6k |   raft_pool_token_->Shutdown(); | 
| 2778 |  |   // We might not have run Start yet, so make sure we have a FD. | 
| 2779 | 75.6k |   if (failure_detector_) { | 
| 2780 | 75.6k |     DisableFailureDetector(); | 
| 2781 | 75.6k |   } | 
| 2782 |  |  | 
| 2783 | 75.6k |   CHECK_OK(ExecuteHook(POST_SHUTDOWN)); | 
| 2784 |  |  | 
| 2785 | 75.6k |   shutdown_.Store(true, kMemOrderRelease); | 
| 2786 | 75.6k | } | 
| 2787 |  |  | 
| 2788 | 0 | PeerRole RaftConsensus::GetActiveRole() const { | 
| 2789 | 0 |   auto lock = state_->LockForRead(); | 
| 2790 | 0 |   return state_->GetActiveRoleUnlocked(); | 
| 2791 | 0 | } | 
| 2792 |  |  | 
| 2793 | 1.14M | yb::OpId RaftConsensus::GetLatestOpIdFromLog() { | 
| 2794 | 1.14M |   return log_->GetLatestEntryOpId(); | 
| 2795 | 1.14M | } | 
| 2796 |  |  | 
| 2797 | 133k | Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateMsgPtr& msg) { | 
| 2798 | 133k |   OperationType op_type = msg->op_type(); | 
| 2799 | 133k |   if (!IsConsensusOnlyOperation(op_type)) { | 
| 2800 | 0 |     return STATUS_FORMAT(InvalidArgument, | 
| 2801 | 0 |                          "Expected a consensus-only op type, got $0: $1", | 
| 2802 | 0 |                          OperationType_Name(op_type), | 
| 2803 | 0 |                          *msg); | 
| 2804 | 0 |   } | 
| 2805 | 133k |   VLOG_WITH_PREFIX(1) << "Starting consensus round: " | 
| 2806 | 10 |                       << msg->id().ShortDebugString(); | 
| 2807 | 133k |   scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg)); | 
| 2808 | 133k |   std::shared_ptr<StateChangeContext> context = nullptr; | 
| 2809 |  |  | 
| 2810 |  |   // We are here for NO_OP or CHANGE_CONFIG_OP type ops. We need to set the change record for an | 
| 2811 |  |   // actual config change operation. The NO_OP does not update the config, as it is used for a new | 
| 2812 |  |   // leader election term change replicate message, which keeps the same config. | 
| 2813 | 133k |   if (IsChangeConfigOperation(op_type)) { | 
| 2814 | 14.0k |     context = | 
| 2815 | 14.0k |       std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_CONFIG_CHANGE_COMPLETE, | 
| 2816 | 14.0k |                                            msg->change_config_record()); | 
| 2817 | 119k |   } else { | 
| 2818 | 119k |     context = std::make_shared<StateChangeContext>(StateChangeReason::FOLLOWER_NO_OP_COMPLETE); | 
| 2819 | 119k |   } | 
| 2820 |  |  | 
| 2821 | 133k |   StdStatusCallback client_cb = | 
| 2822 | 133k |       std::bind(&RaftConsensus::MarkDirtyOnSuccess, | 
| 2823 | 133k |                 this, | 
| 2824 | 133k |                 context, | 
| 2825 | 133k |                 &DoNothingStatusCB, | 
| 2826 | 133k |                 std::placeholders::_1); | 
| 2827 | 133k |   round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb))); | 
| 2828 | 133k |   return state_->AddPendingOperation(round, OperationMode::kFollower); | 
| 2829 | 133k | } | 
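The function above accepts only consensus-only operations and picks the state-change reason by operation type. A compact sketch of that dispatch follows, with hypothetical `OpType`/`Reason` enums standing in for the protobuf types.

```cpp
#include <memory>

// Hypothetical enums standing in for the protobuf OperationType and
// StateChangeReason used above.
enum class OpType { kNoOp, kChangeConfig, kWrite };
enum class Reason { kFollowerNoOpComplete, kFollowerConfigChangeComplete };

struct StateChangeContext {
  explicit StateChangeContext(Reason r) : reason(r) {}
  Reason reason;
};

// Mirrors the filter above: only NO_OP and CHANGE_CONFIG_OP are consensus-only.
bool IsConsensusOnly(OpType type) {
  return type == OpType::kNoOp || type == OpType::kChangeConfig;
}

std::shared_ptr<StateChangeContext> MakeContext(OpType type) {
  // CHANGE_CONFIG_OP carries an actual config change; NO_OP keeps the same
  // config and merely marks a new leader's term.
  return std::make_shared<StateChangeContext>(
      type == OpType::kChangeConfig ? Reason::kFollowerConfigChangeComplete
                                    : Reason::kFollowerNoOpComplete);
}
```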
| 2830 |  |  | 
| 2831 | 3.02k | Status RaftConsensus::WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) { | 
| 2832 | 3.02k |   CoarseTimePoint start = CoarseMonoClock::now(); | 
| 2833 | 7.26k |   for (;;) { | 
| 2834 | 7.26k |     MonoDelta remaining_old_leader_lease; | 
| 2835 | 7.26k |     LeaderLeaseStatus leader_lease_status; | 
| 2836 | 7.26k |     ReplicaState::State state; | 
| 2837 | 7.26k |     { | 
| 2838 | 7.26k |       auto lock = state_->LockForRead(); | 
| 2839 | 7.26k |       state = state_->state(); | 
| 2840 | 7.26k |       if (state != ReplicaState::kRunning) { | 
| 2841 | 0 |         return STATUS_FORMAT(IllegalState, "Consensus is not running: $0", state); | 
| 2842 | 0 |       } | 
| 2843 | 7.26k |       if (state_->GetActiveRoleUnlocked() != PeerRole::LEADER) { | 
| 2844 | 4 |         return STATUS_FORMAT(IllegalState, "Not the leader: $0", state_->GetActiveRoleUnlocked()); | 
| 2845 | 4 |       } | 
| 2846 | 7.25k |       leader_lease_status = state_->GetLeaderLeaseStatusUnlocked(&remaining_old_leader_lease); | 
| 2847 | 7.25k |     } | 
| 2848 | 7.25k |     if (leader_lease_status == LeaderLeaseStatus::HAS_LEASE) { | 
| 2849 | 3.01k |       return Status::OK(); | 
| 2850 | 3.01k |     } | 
| 2851 | 4.24k |     CoarseTimePoint now = CoarseMonoClock::now(); | 
| 2852 | 4.24k |     if (now > deadline) { | 
| 2853 | 0 |       return STATUS_FORMAT( | 
| 2854 | 0 |           TimedOut, "Waited for $0 to acquire a leader lease, state $1, lease status: $2", | 
| 2855 | 0 |           now - start, state, LeaderLeaseStatus_Name(leader_lease_status)); | 
| 2856 | 0 |     } | 
| 2857 | 4.24k |     switch (leader_lease_status) { | 
| 2858 | 0 |       case LeaderLeaseStatus::HAS_LEASE: | 
| 2859 | 0 |         return Status::OK(); | 
| 2860 | 4.20k |       case LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE: | 
| 2861 | 4.20k |         { | 
| 2862 | 4.20k |           std::unique_lock<decltype(leader_lease_wait_mtx_)> lock(leader_lease_wait_mtx_); | 
| 2863 |  |           // Because we're not taking the same lock (leader_lease_wait_mtx_) when we check the | 
| 2864 |  |           // leader lease status, there is a possibility of a race condition when we miss the | 
| 2865 |  |           // notification and by this point we already have a lease. Rather than re-taking the | 
| 2866 |  |           // ReplicaState lock and re-checking, here we simply block for up to 100ms in that case, | 
| 2867 |  |           // because this function is currently (08/14/2017) only used in a context where that is OK, | 
| 2868 |  |           // such as catalog manager initialization. | 
| 2869 | 4.20k |           leader_lease_wait_cond_.wait_for( | 
| 2870 | 4.20k |               lock, std::min<MonoDelta>(100ms, deadline - now).ToSteadyDuration()); | 
| 2871 | 4.20k |         } | 
| 2872 | 4.20k |         continue; | 
| 2873 | 35 |       case LeaderLeaseStatus::OLD_LEADER_MAY_HAVE_LEASE: { | 
| 2874 | 35 |         auto wait_deadline = std::min({deadline, now + 100ms, now + remaining_old_leader_lease}); | 
| 2875 | 35 |         std::this_thread::sleep_until(wait_deadline); | 
| 2876 | 35 |       } continue; | 
| 2877 | 4.24k |     } | 
| 2878 | 0 |     FATAL_INVALID_ENUM_VALUE(LeaderLeaseStatus, leader_lease_status); | 
| 2879 | 0 |   } | 
| 2880 | 3.02k | } | 
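The loop above deliberately tolerates a missed wakeup: each condition-variable wait is bounded, and the predicate (the lease status) is re-checked on every iteration. A generic sketch of that shape, assuming nothing YB-specific:

```cpp
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Generic sketch of the wait loop above: bound every wait (100ms here, as in
// the lease loop) and re-check the predicate, so a missed notification costs
// at most one wait interval instead of blocking until the deadline.
template <typename Pred>
bool WaitWithRecheck(std::mutex& mu, std::condition_variable& cv, Pred pred,
                     std::chrono::steady_clock::time_point deadline) {
  using namespace std::chrono_literals;
  std::unique_lock<std::mutex> lock(mu);
  while (!pred()) {
    const auto now = std::chrono::steady_clock::now();
    if (now > deadline) return false;  // timed out before the predicate held
    cv.wait_until(lock, std::min<std::chrono::steady_clock::time_point>(
                            deadline, now + 100ms));
  }
  return true;
}
```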
| 2881 |  |  | 
| 2882 | 10.3M | Status RaftConsensus::CheckIsActiveLeaderAndHasLease() const { | 
| 2883 | 10.3M |   return state_->CheckIsActiveLeaderAndHasLease(); | 
| 2884 | 10.3M | } | 
| 2885 |  |  | 
| 2886 |  | Result<MicrosTime> RaftConsensus::MajorityReplicatedHtLeaseExpiration( | 
| 2887 | 73.5M |     MicrosTime min_allowed, CoarseTimePoint deadline) const { | 
| 2888 | 73.5M |   return state_->MajorityReplicatedHtLeaseExpiration(min_allowed, deadline); | 
| 2889 | 73.5M | } | 
| 2890 |  |  | 
| 2891 | 329k | std::string RaftConsensus::GetRequestVoteLogPrefix(const VoteRequestPB& request) const { | 
| 2892 | 329k |   return Format("$0 Leader $1election vote request", | 
| 2893 | 329k |                 state_->LogPrefix(), request.preelection() ? "pre-" : ""); | 
| 2894 | 329k | } | 
| 2895 |  |  | 
| 2896 |  | void RaftConsensus::FillVoteResponseVoteGranted( | 
| 2897 | 1.13M |     const VoteRequestPB& request, VoteResponsePB* response) { | 
| 2898 | 1.13M |   response->set_responder_term(request.candidate_term()); | 
| 2899 | 1.13M |   response->set_vote_granted(true); | 
| 2900 | 1.13M | } | 
| 2901 |  |  | 
| 2902 |  | void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, | 
| 2903 | 221k |                                                VoteResponsePB* response) { | 
| 2904 | 221k |   response->set_responder_term(state_->GetCurrentTermUnlocked()); | 
| 2905 | 221k |   response->set_vote_granted(false); | 
| 2906 | 221k |   response->mutable_consensus_error()->set_code(error_code); | 
| 2907 | 221k | } | 
| 2908 |  |  | 
| 2909 |  | void RaftConsensus::RequestVoteRespondVoteDenied( | 
| 2910 |  |     ConsensusErrorPB::Code error_code, const std::string& message_suffix, | 
| 2911 | 205k |     const VoteRequestPB& request, VoteResponsePB* response) { | 
| 2912 | 205k |   auto status = STATUS_FORMAT( | 
| 2913 | 205k |       InvalidArgument, "$0: Denying vote to candidate $1 $2", | 
| 2914 | 205k |       GetRequestVoteLogPrefix(request), request.candidate_uuid(), message_suffix); | 
| 2915 | 205k |   FillVoteResponseVoteDenied(error_code, response); | 
| 2916 | 205k |   LOG(INFO) << status.message().ToBuffer(); | 
| 2917 | 205k |   StatusToPB(status, response->mutable_consensus_error()->mutable_status()); | 
| 2918 | 205k | } | 
| 2919 |  |  | 
| 2920 |  | Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request, | 
| 2921 | 204k |                                                     VoteResponsePB* response) { | 
| 2922 | 204k |   auto message_suffix = Format( | 
| 2923 | 204k |       "for earlier term $0. Current term is $1.", | 
| 2924 | 204k |       request->candidate_term(), state_->GetCurrentTermUnlocked()); | 
| 2925 | 204k |   RequestVoteRespondVoteDenied(ConsensusErrorPB::INVALID_TERM, message_suffix, *request, response); | 
| 2926 | 204k |   return Status::OK(); | 
| 2927 | 204k | } | 
| 2928 |  |  | 
| 2929 |  | Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* request, | 
| 2930 | 209 |                                                            VoteResponsePB* response) { | 
| 2931 | 209 |   FillVoteResponseVoteGranted(*request, response); | 
| 2932 | 209 |   LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. " | 
| 2933 | 209 |                           "Re-sending same reply.", | 
| 2934 | 209 |                           GetRequestVoteLogPrefix(*request), | 
| 2935 | 209 |                           request->candidate_uuid(), | 
| 2936 | 209 |                           request->candidate_term()); | 
| 2937 | 209 |   return Status::OK(); | 
| 2938 | 209 | } | 
| 2939 |  |  | 
| 2940 |  | Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB* request, | 
| 2941 | 1.16k |                                                              VoteResponsePB* response) { | 
| 2942 | 1.16k |   auto message_suffix = Format( | 
| 2943 | 1.16k |       "in current term $0: Already voted for candidate $1 in this term.", | 
| 2944 | 1.16k |       state_->GetCurrentTermUnlocked(), state_->GetVotedForCurrentTermUnlocked()); | 
| 2945 | 1.16k |   RequestVoteRespondVoteDenied(ConsensusErrorPB::ALREADY_VOTED, message_suffix, *request, response); | 
| 2946 | 1.16k |   return Status::OK(); | 
| 2947 | 1.16k | } | 
| 2948 |  |  | 
| 2949 |  | Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpIdPB& local_last_logged_opid, | 
| 2950 |  |                                                        const VoteRequestPB* request, | 
| 2951 | 433 |                                                        VoteResponsePB* response) { | 
| 2952 | 433 |   auto message_suffix = Format( | 
| 2953 | 433 |       "for term $0 because replica has last-logged OpId of $1, which is greater than that of the " | 
| 2954 | 433 |           "candidate, which has last-logged OpId of $2.", | 
| 2955 | 433 |       request->candidate_term(), local_last_logged_opid, | 
| 2956 | 433 |       request->candidate_status().last_received()); | 
| 2957 | 433 |   RequestVoteRespondVoteDenied( | 
| 2958 | 433 |       ConsensusErrorPB::LAST_OPID_TOO_OLD, message_suffix, *request, response); | 
| 2959 | 433 |   return Status::OK(); | 
| 2960 | 433 | } | 
| 2961 |  |  | 
| 2962 |  | Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* request, | 
| 2963 | 8.89k |                                                       VoteResponsePB* response) { | 
| 2964 | 8.89k |   FillVoteResponseVoteDenied(ConsensusErrorPB::LEADER_IS_ALIVE, response); | 
| 2965 | 8.89k |   std::string msg = Format( | 
| 2966 | 8.89k |       "$0: Denying vote to candidate $1 for term $2 because replica is either leader or believes a " | 
| 2967 | 8.89k |       "valid leader to be alive. Time left: $3", | 
| 2968 | 8.89k |       GetRequestVoteLogPrefix(*request), request->candidate_uuid(), request->candidate_term(), | 
| 2969 | 8.89k |       withhold_votes_until_.load(std::memory_order_acquire) - MonoTime::Now()); | 
| 2970 | 8.89k |   LOG(INFO) << msg; | 
| 2971 | 8.89k |   StatusToPB(STATUS(InvalidArgument, msg), response->mutable_consensus_error()->mutable_status()); | 
| 2972 | 8.89k |   return Status::OK(); | 
| 2973 | 8.89k | } | 
| 2974 |  |  | 
| 2975 |  | Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request, | 
| 2976 | 6.88k |                                                VoteResponsePB* response) { | 
| 2977 | 6.88k |   FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response); | 
| 2978 | 6.88k |   string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " | 
| 2979 | 6.88k |                           "replica is already servicing an update from a current leader " | 
| 2980 | 6.88k |                           "or another vote.", | 
| 2981 | 6.88k |                           GetRequestVoteLogPrefix(*request), | 
| 2982 | 6.88k |                           request->candidate_uuid(), | 
| 2983 | 6.88k |                           request->candidate_term()); | 
| 2984 | 6.88k |   LOG(INFO) << msg; | 
| 2985 | 6.88k |   StatusToPB(STATUS(ServiceUnavailable, msg), | 
| 2986 | 6.88k |              response->mutable_consensus_error()->mutable_status()); | 
| 2987 | 6.88k |   return Status::OK(); | 
| 2988 | 6.88k | } | 
| 2989 |  |  | 
| 2990 |  | Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request, | 
| 2991 | 107k |                                                     VoteResponsePB* response) { | 
| 2992 |  |   // We know our vote will be "yes", so avoid triggering an election while we | 
| 2993 |  |   // persist our vote to disk. We use an exponential backoff to avoid too much | 
| 2994 |  |   // split-vote contention when nodes display high latencies. | 
| 2995 | 107k |   MonoDelta additional_backoff = LeaderElectionExpBackoffDeltaUnlocked(); | 
| 2996 | 107k |   SnoozeFailureDetector(ALLOW_LOGGING, additional_backoff); | 
| 2997 |  |  | 
| 2998 |  |   // Persist our vote to disk. | 
| 2999 | 107k |   RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid())); | 
| 3000 |  |  | 
| 3001 | 107k |   FillVoteResponseVoteGranted(*request, response); | 
| 3002 |  |  | 
| 3003 |  |   // Give peer time to become leader. Snooze one more time after persisting our | 
| 3004 |  |   // vote. When disk latency is high, this should help reduce churn. | 
| 3005 | 107k |   SnoozeFailureDetector(DO_NOT_LOG, additional_backoff); | 
| 3006 |  |  | 
| 3007 | 107k |   LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.", | 
| 3008 | 107k |                           GetRequestVoteLogPrefix(*request), | 
| 3009 | 107k |                           request->candidate_uuid(), | 
| 3010 | 107k |                           state_->GetCurrentTermUnlocked()); | 
| 3011 | 107k |   return Status::OK(); | 
| 3012 | 107k | } | 
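Granting a vote brackets the durable write with two failure-detector snoozes, so the local election timer cannot fire while the vote is being persisted. A minimal sketch of that bracket; `FailureDetector` and `persist_vote` here are hypothetical stand-ins.

```cpp
#include <chrono>
#include <functional>

// Hypothetical failure detector: Snooze() pushes the next possible election
// trigger out by the given delta.
struct FailureDetector {
  std::chrono::steady_clock::time_point fire_at;
  void Snooze(std::chrono::milliseconds delta) {
    fire_at = std::chrono::steady_clock::now() + delta;
  }
};

// The double-snooze bracket from above: once before the durable write so our
// own timer can't fire mid-vote, once after so the candidate has time to win.
bool GrantVote(FailureDetector* fd, std::chrono::milliseconds backoff,
               const std::function<bool()>& persist_vote) {
  fd->Snooze(backoff);
  if (!persist_vote()) {
    return false;  // the vote must be durable before we answer "yes"
  }
  fd->Snooze(backoff);
  return true;
}
```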
| 3013 |  |  | 
| 3014 | 28.9M | PeerRole RaftConsensus::GetRoleUnlocked() const { | 
| 3015 | 28.9M |   DCHECK(state_->IsLocked()); | 
| 3016 | 28.9M |   return state_->GetActiveRoleUnlocked(); | 
| 3017 | 28.9M | } | 
| 3018 |  |  | 
| 3019 | 28.9M | PeerRole RaftConsensus::role() const { | 
| 3020 | 28.9M |   auto lock = state_->LockForRead(); | 
| 3021 | 28.9M |   return GetRoleUnlocked(); | 
| 3022 | 28.9M | } | 
| 3023 |  |  | 
| 3024 | 82.0M | LeaderState RaftConsensus::GetLeaderState(bool allow_stale) const { | 
| 3025 | 82.0M |   return state_->GetLeaderState(allow_stale); | 
| 3026 | 82.0M | } | 
| 3027 |  |  | 
| 3028 | 8.74M | std::string RaftConsensus::LogPrefix() { | 
| 3029 | 8.74M |   return state_->LogPrefix(); | 
| 3030 | 8.74M | } | 
| 3031 |  |  | 
| 3032 | 182k | void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { | 
| 3033 | 182k |   failed_elections_since_stable_leader_.store(0, std::memory_order_release); | 
| 3034 | 182k |   state_->SetLeaderUuidUnlocked(uuid); | 
| 3035 | 182k |   auto context = std::make_shared<StateChangeContext>(StateChangeReason::NEW_LEADER_ELECTED, uuid); | 
| 3036 | 182k |   MarkDirty(context); | 
| 3037 | 182k | } | 
| 3038 |  |  | 
| 3039 |  | Status RaftConsensus::ReplicateConfigChangeUnlocked(const ReplicateMsgPtr& replicate_ref, | 
| 3040 |  |                                                     const RaftConfigPB& new_config, | 
| 3041 |  |                                                     ChangeConfigType type, | 
| 3042 | 5.87k |                                                     StdStatusCallback client_cb) { | 
| 3043 | 5.87k |   LOG(INFO) << "Setting replicate pending config " << new_config.ShortDebugString() | 
| 3044 | 5.87k |             << ", type = " << ChangeConfigType_Name(type); | 
| 3045 |  |  | 
| 3046 | 5.87k |   RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config)); | 
| 3047 |  |  | 
| 3048 | 5.87k |   if (type == CHANGE_ROLE && | 
| 3049 | 5.87k |       PREDICT_FALSE(FLAGS_TEST_inject_delay_leader_change_role_append_secs)) { | 
| 3050 | 1 |     LOG(INFO) << "Adding change role sleep for " | 
| 3051 | 1 |               << FLAGS_TEST_inject_delay_leader_change_role_append_secs << " secs."; | 
| 3052 | 1 |     SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_inject_delay_leader_change_role_append_secs)); | 
| 3053 | 1 |   } | 
| 3054 |  |  | 
| 3055 |  |   // Set as pending. | 
| 3056 | 5.87k |   RefreshConsensusQueueAndPeersUnlocked(); | 
| 3057 |  |  | 
| 3058 | 5.87k |   auto round = make_scoped_refptr<ConsensusRound>(this, replicate_ref); | 
| 3059 | 5.87k |   round->SetCallback(MakeNonTrackedRoundCallback(round.get(), std::move(client_cb))); | 
| 3060 | 5.87k |   auto status = AppendNewRoundToQueueUnlocked(round); | 
| 3061 | 5.87k |   if (!status.ok()) { | 
| 3062 |  |     // We can just clear the pending config, because there can only be one pending config. | 
| 3063 | 0 |     auto clear_status = state_->ClearPendingConfigUnlocked(); | 
| 3064 | 0 |     if (!clear_status.ok()) { | 
| 3065 | 0 |       LOG(WARNING) << "Could not clear pending config: " << clear_status; | 
| 3066 | 0 |     } | 
| 3067 | 0 |   } | 
| 3068 | 5.87k |   return status; | 
| 3069 | 5.87k | } | 
| 3070 |  |  | 
| 3071 | 67.9k | void RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() { | 
| 3072 | 67.9k |   DCHECK_EQ(PeerRole::LEADER, state_->GetActiveRoleUnlocked()); | 
| 3073 | 67.9k |   const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); | 
| 3074 |  |  | 
| 3075 |  |   // Change the peers so that we're able to replicate messages remotely and | 
| 3076 |  |   // locally. Peer manager connections are updated using the active config. Connections to peers | 
| 3077 |  |   // that are not part of active_config are closed. New connections are created for those peers | 
| 3078 |  |   // that are present in active_config but have no connections. When the queue is in LEADER | 
| 3079 |  |   // mode, it checks that all registered peers are a part of the active config. | 
| 3080 | 67.9k |   peer_manager_->ClosePeersNotInConfig(active_config); | 
| 3081 | 67.9k |   queue_->SetLeaderMode(state_->GetCommittedOpIdUnlocked(), | 
| 3082 | 67.9k |                         state_->GetCurrentTermUnlocked(), | 
| 3083 | 67.9k |                         state_->GetLastAppliedOpIdUnlocked(), | 
| 3084 | 67.9k |                         active_config); | 
| 3085 |  |  | 
| 3086 | 67.9k |   ScopedDnsTracker dns_tracker(update_raft_config_dns_latency_.get()); | 
| 3087 | 67.9k |   peer_manager_->UpdateRaftConfig(active_config); | 
| 3088 | 67.9k | } | 
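The peer-manager refresh described above is a reconcile: close connections to peers that left the active config, create connections for peers that joined. A sketch over plain UUID sets, with `connections` as a hypothetical stand-in for the peer manager's connection state:

```cpp
#include <iterator>
#include <set>
#include <string>

// 'connections' is a hypothetical stand-in for the peer manager's open
// connections, keyed by peer UUID.
void ReconcilePeers(const std::set<std::string>& active_config,
                    std::set<std::string>* connections) {
  // Close connections to peers that are no longer in the active config.
  for (auto it = connections->begin(); it != connections->end();) {
    it = active_config.count(*it) ? std::next(it) : connections->erase(it);
  }
  // Open connections for peers in the config that are not yet connected
  // (insert is a no-op for peers that already have a connection).
  connections->insert(active_config.begin(), active_config.end());
}
```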
| 3089 |  |  | 
| 3090 | 4.44k | string RaftConsensus::peer_uuid() const { | 
| 3091 | 4.44k |   return state_->GetPeerUuid(); | 
| 3092 | 4.44k | } | 
| 3093 |  |  | 
| 3094 | 60.6k | string RaftConsensus::tablet_id() const { | 
| 3095 | 60.6k |   return state_->GetOptions().tablet_id; | 
| 3096 | 60.6k | } | 
| 3097 |  |  | 
| 3098 | 10.7k | const TabletId& RaftConsensus::split_parent_tablet_id() const { | 
| 3099 | 10.7k |   return split_parent_tablet_id_; | 
| 3100 | 10.7k | } | 
| 3101 |  |  | 
| 3102 |  | ConsensusStatePB RaftConsensus::ConsensusState( | 
| 3103 |  |     ConsensusConfigType type, | 
| 3104 | 46.5M |     LeaderLeaseStatus* leader_lease_status) const { | 
| 3105 | 46.5M |   auto lock = state_->LockForRead(); | 
| 3106 | 46.5M |   return ConsensusStateUnlocked(type, leader_lease_status); | 
| 3107 | 46.5M | } | 
| 3108 |  |  | 
| 3109 |  | ConsensusStatePB RaftConsensus::ConsensusStateUnlocked( | 
| 3110 |  |     ConsensusConfigType type, | 
| 3111 | 46.5M |     LeaderLeaseStatus* leader_lease_status) const { | 
| 3112 | 46.5M |   CHECK(state_->IsLocked()); | 
| 3113 | 46.5M |   if (leader_lease_status) { | 
| 3114 | 3.88k |     if (GetRoleUnlocked() == PeerRole::LEADER) { | 
| 3115 | 1.45k |       *leader_lease_status = state_->GetLeaderLeaseStatusUnlocked(); | 
| 3116 | 2.42k |     } else { | 
| 3117 |  |       // We'll still return a valid value if we're not a leader. | 
| 3118 | 2.42k |       *leader_lease_status = LeaderLeaseStatus::NO_MAJORITY_REPLICATED_LEASE; | 
| 3119 | 2.42k |     } | 
| 3120 | 3.88k |   } | 
| 3121 | 46.5M |   return state_->ConsensusStateUnlocked(type); | 
| 3122 | 46.5M | } | 
| 3123 |  |  | 
| 3124 | 77.9M | RaftConfigPB RaftConsensus::CommittedConfig() const { | 
| 3125 | 77.9M |   auto lock = state_->LockForRead(); | 
| 3126 | 77.9M |   return state_->GetCommittedConfigUnlocked(); | 
| 3127 | 77.9M | } | 
| 3128 |  |  | 
| 3129 | 38 | void RaftConsensus::DumpStatusHtml(std::ostream& out) const { | 
| 3130 | 38 |   out << "<h1>Raft Consensus State</h1>" << std::endl; | 
| 3131 |  |  | 
| 3132 | 38 |   out << "<h2>State</h2>" << std::endl; | 
| 3133 | 38 |   out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; | 
| 3134 |  |  | 
| 3135 |  |   // Dump the queues on a leader. | 
| 3136 | 38 |   PeerRole role; | 
| 3137 | 38 |   { | 
| 3138 | 38 |     auto lock = state_->LockForRead(); | 
| 3139 | 38 |     role = state_->GetActiveRoleUnlocked(); | 
| 3140 | 38 |   } | 
| 3141 | 38 |   if (role == PeerRole::LEADER) { | 
| 3142 | 38 |     out << "<h2>Queue overview</h2>" << std::endl; | 
| 3143 | 38 |     out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; | 
| 3144 | 38 |     out << "<hr/>" << std::endl; | 
| 3145 | 38 |     out << "<h2>Queue details</h2>" << std::endl; | 
| 3146 | 38 |     queue_->DumpToHtml(out); | 
| 3147 | 38 |   } | 
| 3148 | 38 | } | 
| 3149 |  |  | 
| 3150 | 46 | ReplicaState* RaftConsensus::GetReplicaStateForTests() { | 
| 3151 | 46 |   return state_.get(); | 
| 3152 | 46 | } | 
| 3153 |  |  | 
| 3154 |  | void RaftConsensus::ElectionCallback(const LeaderElectionData& data, | 
| 3155 | 737k |                                      const ElectionResult& result) { | 
| 3156 |  |   // The election callback runs on a reactor thread, so we need to defer to our | 
| 3157 |  |   // threadpool. If the threadpool is already shut down for some reason, it's OK -- | 
| 3158 |  |   // we're OK with the callback never running. | 
| 3159 | 737k |   WARN_NOT_OK(raft_pool_token_->SubmitFunc( | 
| 3160 | 737k |               std::bind(&RaftConsensus::DoElectionCallback, shared_from_this(), data, result)), | 
| 3161 | 737k |               state_->LogPrefix() + "Unable to run election callback"); | 
| 3162 | 737k | } | 
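The hop from the reactor thread to the Raft pool keeps the object alive by binding `shared_from_this()` into the submitted task. A sketch of that lifetime trick; `ThreadPool` here is a hypothetical stand-in that runs the task inline.

```cpp
#include <functional>
#include <memory>

// Hypothetical pool stand-in; a real pool would run the task on a worker
// thread, and Submit would fail after shutdown.
struct ThreadPool {
  bool Submit(std::function<void()> task) { task(); return true; }
};

class Election : public std::enable_shared_from_this<Election> {
 public:
  void OnResult(ThreadPool* pool, int term) {
    // Binding shared_from_this() into the task keeps this object alive until
    // the pool runs (or drops) the callback, like the std::bind above.
    auto self = shared_from_this();
    pool->Submit([self, term] { self->DoCallback(term); });
    // If Submit fails because the pool is shut down, the callback simply
    // never runs; the comment above notes that this is acceptable.
  }

 private:
  void DoCallback(int term) { (void)term; /* off the reactor; locks are OK */ }
};

int main() {
  ThreadPool pool;
  auto election = std::make_shared<Election>();  // must be shared-owned
  election->OnResult(&pool, /*term=*/5);
  return 0;
}
```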
| 3163 |  |  | 
| 3164 | 123k | void RaftConsensus::NotifyOriginatorAboutLostElection(const std::string& originator_uuid) { | 
| 3165 | 123k |   if (originator_uuid.empty()) { | 
| 3166 | 123k |     return; | 
| 3167 | 123k |   } | 
| 3168 |  |  | 
| 3169 | 77 |   ReplicaState::UniqueLock lock; | 
| 3170 | 77 |   Status s = state_->LockForConfigChange(&lock); | 
| 3171 | 77 |   if (PREDICT_FALSE(!s.ok())) { | 
| 3172 | 0 |     LOG_WITH_PREFIX(INFO) << "Unable to notify originator about lost election, lock failed: " | 
| 3173 | 0 |                           << s.ToString(); | 
| 3174 | 0 |     return; | 
| 3175 | 0 |   } | 
| 3176 |  |  | 
| 3177 | 77 |   const auto& active_config = state_->GetActiveConfigUnlocked(); | 
| 3178 | 77 |   const auto * peer = FindPeer(active_config, originator_uuid); | 
| 3179 | 77 |   if (!peer) { | 
| 3180 | 0 |     LOG_WITH_PREFIX(WARNING) << "Failed to find originator's peer: " << originator_uuid | 
| 3181 | 0 |                              << ", config: " << active_config.ShortDebugString(); | 
| 3182 | 0 |     return; | 
| 3183 | 0 |   } | 
| 3184 |  |  | 
| 3185 | 77 |   auto proxy = peer_proxy_factory_->NewProxy(*peer); | 
| 3186 | 77 |   LeaderElectionLostRequestPB req; | 
| 3187 | 77 |   req.set_dest_uuid(originator_uuid); | 
| 3188 | 77 |   req.set_election_lost_by_uuid(state_->GetPeerUuid()); | 
| 3189 | 77 |   req.set_tablet_id(state_->GetOptions().tablet_id); | 
| 3190 | 77 |   auto resp = std::make_shared<LeaderElectionLostResponsePB>(); | 
| 3191 | 77 |   auto rpc = std::make_shared<rpc::RpcController>(); | 
| 3192 | 77 |   rpc->set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh); | 
| 3193 | 77 |   auto log_prefix = state_->LogPrefix(); | 
| 3194 | 77 |   proxy->LeaderElectionLostAsync(&req, resp.get(), rpc.get(), [log_prefix, resp, rpc] { | 
| 3195 | 77 |     if (!rpc->status().ok()) { | 
| 3196 | 0 |       LOG(WARNING) << log_prefix << "Notify about lost election RPC failure: " | 
| 3197 | 0 |                    << rpc->status().ToString(); | 
| 3198 | 77 |     } else if (resp->has_error()) { | 
| 3199 | 0 |       LOG(WARNING) << log_prefix << "Notify about lost election failed: " | 
| 3200 | 0 |                    << StatusFromPB(resp->error().status()).ToString(); | 
| 3201 | 0 |     } | 
| 3202 | 77 |   }); | 
| 3203 | 77 | } | 
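Note how the async RPC above keeps its response and controller alive: both are heap-allocated and captured by value in the completion lambda, so they outlive the enclosing function. A sketch of the same idiom with a hypothetical `AsyncCall` transport:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>

struct Response { bool has_error = false; };
struct Controller { bool ok = true; };

// Hypothetical async transport standing in for LeaderElectionLostAsync; a
// real one would invoke 'done' later, from another thread.
void AsyncCall(Response* resp, Controller* rpc, std::function<void()> done) {
  done();
}

void NotifyLostElection(const std::string& log_prefix) {
  auto resp = std::make_shared<Response>();
  auto rpc = std::make_shared<Controller>();
  // Capturing resp/rpc by value means the lambda owns them: they stay valid
  // until the RPC completes, even though this function returns immediately.
  AsyncCall(resp.get(), rpc.get(), [log_prefix, resp, rpc] {
    if (!rpc->ok || resp->has_error) {
      std::cout << log_prefix << "notify about lost election failed\n";
    }
  });
}
```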
| 3204 |  |  | 
| 3205 |  | void RaftConsensus::DoElectionCallback(const LeaderElectionData& data, | 
| 3206 | 737k |                                        const ElectionResult& result) { | 
| 3207 | 737k |   const char* election_name = result.preelection ? "Pre-election" : "election"; | 
| 3208 | 737k |   const char* decision_name = result.decision == ElectionVote::kGranted ? "won" : "lost"; | 
| 3209 |  |   // Snooze to avoid the election timer firing again as much as possible. | 
| 3210 | 737k |   { | 
| 3211 | 737k |     auto lock = state_->LockForRead(); | 
| 3212 |  |     // We need to snooze when we win and when we lose: | 
| 3213 |  |     // - When we win because we're about to disable the timer and become leader. | 
| 3214 |  |     // - When we lose, because otherwise we can fall into a cycle where everyone keeps | 
| 3215 |  |     //   triggering elections but no election ever completes, because by the time they | 
| 3216 |  |     //   finish another one has already been triggered. | 
| 3217 |  |     // We ignore the status as we don't want to fail if the timer is | 
| 3218 |  |     // disabled. | 
| 3219 | 737k |     SnoozeFailureDetector(ALLOW_LOGGING, LeaderElectionExpBackoffDeltaUnlocked()); | 
| 3220 |  |  | 
| 3221 | 737k |     if (!result.preelections_not_supported_by_uuid.empty()) { | 
| 3222 | 0 |       disable_pre_elections_until_ = | 
| 3223 | 0 |           CoarseMonoClock::now() + FLAGS_temporary_disable_preelections_timeout_ms * 1ms; | 
| 3224 | 0 |       LOG_WITH_PREFIX(WARNING) | 
| 3225 | 0 |           << "Disable pre-elections until " << ToString(disable_pre_elections_until_) | 
| 3226 | 0 |           << ", because " << result.preelections_not_supported_by_uuid << " does not support them."; | 
| 3227 | 0 |     } | 
| 3228 | 737k |   } | 
| 3229 | 737k |   if (result.decision == ElectionVote::kDenied) { | 
| 3230 | 123k |     failed_elections_since_stable_leader_.fetch_add(1, std::memory_order_acq_rel); | 
| 3231 | 123k |     LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " lost for term " | 
| 3232 | 123k |                           << result.election_term << ". Reason: " | 
| 3233 | 123k |                           << (!result.message.empty() ? result.message : "None given") | 
| 3234 | 123k |                           << ". Originator: " << data.originator_uuid; | 
| 3235 | 123k |     NotifyOriginatorAboutLostElection(data.originator_uuid); | 
| 3236 |  |  | 
| 3237 | 123k |     if (result.higher_term) { | 
| 3238 | 88.2k |       ReplicaState::UniqueLock lock; | 
| 3239 | 88.2k |       Status s = state_->LockForConfigChange(&lock); | 
| 3240 | 88.2k |       if (s.ok()) { | 
| 3241 | 88.2k |         s = HandleTermAdvanceUnlocked(*result.higher_term); | 
| 3242 | 88.2k |       } | 
| 3243 | 88.2k |       if (!s.ok()) { | 
| 3244 | 87.7k |         LOG_WITH_PREFIX(INFO) << "Unable to advance term as " << election_name << " result: " << s; | 
| 3245 | 87.7k |       } | 
| 3246 | 88.2k |     } | 
| 3247 |  |  | 
| 3248 | 123k |     return; | 
| 3249 | 123k |   } | 
| 3250 |  |  | 
| 3251 | 613k |   ReplicaState::UniqueLock lock; | 
| 3252 | 613k |   Status s = state_->LockForConfigChange(&lock); | 
| 3253 | 613k |   if (PREDICT_FALSE(!s.ok())) { | 
| 3254 | 0 |     LOG_WITH_PREFIX(INFO) << "Received " << election_name << " callback for term " | 
| 3255 | 0 |                           << result.election_term << " while not running: " | 
| 3256 | 0 |                           << s.ToString(); | 
| 3257 | 0 |     return; | 
| 3258 | 0 |   } | 
| 3259 |  |  | 
| 3260 | 613k |   auto desired_term = state_->GetCurrentTermUnlocked() + (result.preelection ? 1 : 0); | 
| 3261 | 613k |   if (result.election_term != desired_term) { | 
| 3262 | 1.21k |     LOG_WITH_PREFIX(INFO) | 
| 3263 | 1.21k |         << "Leader " << election_name << " decision for defunct term " | 
| 3264 | 1.21k |         << result.election_term << ": " << decision_name; | 
| 3265 | 1.21k |     return; | 
| 3266 | 1.21k |   } | 
| 3267 |  |  | 
| 3268 | 612k |   const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); | 
| 3269 | 612k |   if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) { | 
| 3270 | 0 |     LOG_WITH_PREFIX(WARNING) | 
| 3271 | 0 |         << "Leader " << election_name << " decision while not in active config. " | 
| 3272 | 0 |         << "Result: Term " << result.election_term << ": " << decision_name | 
| 3273 | 0 |         << ". RaftConfig: " << active_config.ShortDebugString(); | 
| 3274 | 0 |     return; | 
| 3275 | 0 |   } | 
| 3276 |  |  | 
| 3277 | 612k |   if (result.preelection) { | 
| 3278 | 550k |     LOG_WITH_PREFIX(INFO) << "Leader pre-election won for term " << result.election_term; | 
| 3279 | 550k |     lock.unlock(); | 
| 3280 | 550k |     WARN_NOT_OK(DoStartElection(data, PreElected::kTrue), "Start election failed: "); | 
| 3281 | 550k |     return; | 
| 3282 | 550k |   } | 
| 3283 |  |  | 
| 3284 | 61.9k |   if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { | 
| 3285 | 0 |     LOG_WITH_PREFIX(DFATAL) | 
| 3286 | 0 |         << "Leader " << election_name << " callback while already leader! Result: Term " | 
| 3287 | 0 |         << result.election_term << ": " | 
| 3288 | 0 |         << decision_name; | 
| 3289 | 0 |     return; | 
| 3290 | 0 |   } | 
| 3291 |  |  | 
| 3292 | 61.9k |   LOG_WITH_PREFIX(INFO) << "Leader " << election_name << " won for term " << result.election_term; | 
| 3293 |  |  | 
| 3294 |  |   // Apply lease updates that were possibly received from voters. | 
| 3295 | 61.9k |   state_->UpdateOldLeaderLeaseExpirationOnNonLeaderUnlocked( | 
| 3296 | 61.9k |       result.old_leader_lease, result.old_leader_ht_lease); | 
| 3297 |  |  | 
| 3298 | 61.9k |   state_->SetLeaderNoOpCommittedUnlocked(false); | 
| 3299 |  |   // Convert role to LEADER. | 
| 3300 | 61.9k |   SetLeaderUuidUnlocked(state_->GetPeerUuid()); | 
| 3301 |  |  | 
| 3302 |  |   // TODO: BecomeLeaderUnlocked() can fail due to state checks during shutdown. | 
| 3303 |  |   // It races with the above state check. | 
| 3304 |  |   // This could be a problem during tablet deletion. | 
| 3305 | 61.9k |   auto status = BecomeLeaderUnlocked(); | 
| 3306 | 61.9k |   if (!status.ok()) { | 
| 3307 | 0 |     LOG_WITH_PREFIX(DFATAL) << "Failed to become leader: " << status.ToString(); | 
| 3308 | 0 |   } | 
| 3309 | 61.9k | } | 
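A detail worth calling out from the callback above: a pre-election result is only current for `current_term + 1` (the term we would bump to), while a real election result must match the current term exactly; anything else is defunct and dropped. As a one-function sketch:

```cpp
#include <cstdint>

// A pre-election runs for the term we would bump to (current + 1); a real
// election runs for the current term. Any other term is defunct.
bool ElectionResultIsCurrent(int64_t current_term, int64_t result_term,
                             bool preelection) {
  const int64_t desired_term = current_term + (preelection ? 1 : 0);
  return result_term == desired_term;
}
```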
| 3310 |  |  | 
| 3311 | 341 | yb::OpId RaftConsensus::GetLastReceivedOpId() { | 
| 3312 | 341 |   auto lock = state_->LockForRead(); | 
| 3313 | 341 |   return state_->GetLastReceivedOpIdUnlocked(); | 
| 3314 | 341 | } | 
| 3315 |  |  | 
| 3316 | 51.8M | yb::OpId RaftConsensus::GetLastCommittedOpId() { | 
| 3317 | 51.8M |   auto lock = state_->LockForRead(); | 
| 3318 | 51.8M |   return state_->GetCommittedOpIdUnlocked(); | 
| 3319 | 51.8M | } | 
| 3320 |  |  | 
| 3321 | 3.82M | yb::OpId RaftConsensus::GetLastCDCedOpId() { | 
| 3322 | 3.82M |   return queue_->GetCDCConsumerOpIdForIntentRemoval(); | 
| 3323 | 3.82M | } | 
| 3324 |  |  | 
| 3325 | 0 | yb::OpId RaftConsensus::GetLastAppliedOpId() { | 
| 3326 | 0 |   auto lock = state_->LockForRead(); | 
| 3327 | 0 |   return state_->GetLastAppliedOpIdUnlocked(); | 
| 3328 | 0 | } | 
| 3329 |  |  | 
| 3330 | 59 | yb::OpId RaftConsensus::GetAllAppliedOpId() { | 
| 3331 | 59 |   return queue_->GetAllAppliedOpId(); | 
| 3332 | 59 | } | 
| 3333 |  |  | 
| 3334 | 471k | void RaftConsensus::MarkDirty(std::shared_ptr<StateChangeContext> context) { | 
| 3335 | 471k |   LOG_WITH_PREFIX(INFO) << "Calling mark dirty synchronously for reason code " << context->reason; | 
| 3336 | 471k |   mark_dirty_clbk_.Run(context); | 
| 3337 | 471k | } | 
| 3338 |  |  | 
| 3339 |  | void RaftConsensus::MarkDirtyOnSuccess(std::shared_ptr<StateChangeContext> context, | 
| 3340 |  |                                        const StdStatusCallback& client_cb, | 
| 3341 | 138k |                                        const Status& status) { | 
| 3342 | 138k |   if (PREDICT_TRUE(status.ok())) { | 
| 3343 | 138k |     MarkDirty(context); | 
| 3344 | 138k |   } | 
| 3345 | 138k |   client_cb(status); | 
| 3346 | 138k | } | 
| 3347 |  |  | 
| 3348 |  | void RaftConsensus::NonTrackedRoundReplicationFinished(ConsensusRound* round, | 
| 3349 |  |                                                        const StdStatusCallback& client_cb, | 
| 3350 | 200k |                                                        const Status& status) { | 
| 3351 | 200k |   DCHECK(state_->IsLocked()); | 
| 3352 | 200k |   OperationType op_type = round->replicate_msg()->op_type(); | 
| 3353 | 200k |   string op_str = Format("$0 [$1]", OperationType_Name(op_type), round->id()); | 
| 3354 | 200k |   if (!IsConsensusOnlyOperation(op_type)) { | 
| 3355 | 0 |     LOG_WITH_PREFIX(ERROR) << "Unexpected op type: " << op_str; | 
| 3356 | 0 |     return; | 
| 3357 | 0 |   } | 
| 3358 | 200k |   if (!status.ok()) { | 
| 3359 |  |     // TODO: Do something with the status on failure? | 
| 3360 | 138 |     LOG_WITH_PREFIX(INFO) << op_str << " replication failed: " << status << "\n" << GetStackTrace(); | 
| 3361 |  |  | 
| 3362 |  |     // Clear out the pending state (ENG-590). | 
| 3363 | 138 |     if (IsChangeConfigOperation(op_type)) { | 
| 3364 | 19 |       WARN_NOT_OK(state_->ClearPendingConfigUnlocked(), "Could not clear pending state"); | 
| 3365 | 19 |     } | 
| 3366 | 200k |   } else if (IsChangeConfigOperation(op_type)) { | 
| 3367 |  |     // Notify the TabletPeer owner object. | 
| 3368 | 19.5k |     state_->context()->ChangeConfigReplicated(state_->GetCommittedConfigUnlocked()); | 
| 3369 | 19.5k |   } | 
| 3370 |  |  | 
| 3371 | 200k |   client_cb(status); | 
| 3372 | 200k | } | 
| 3373 |  |  | 
| 3374 | 161k | void RaftConsensus::EnableFailureDetector(MonoDelta delta) { | 
| 3375 | 161k |   if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { | 
| 3376 | 161k |     failure_detector_->Start(delta); | 
| 3377 | 161k |   } | 
| 3378 | 161k | } | 
| 3379 |  |  | 
| 3380 | 137k | void RaftConsensus::DisableFailureDetector() { | 
| 3381 | 137k |   if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) { | 
| 3382 | 137k |     failure_detector_->Stop(); | 
| 3383 | 137k |   } | 
| 3384 | 137k | } | 
| 3385 |  |  | 
| 3386 | 27.6M | void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging, MonoDelta delta) { | 
| 3387 | 27.6M |   if (PREDICT_TRUE(GetAtomicFlag(&FLAGS_enable_leader_failure_detection))) { | 
| 3388 | 26.5M |     if (allow_logging == ALLOW_LOGGING) { | 
| 3389 | 2.07M |       LOG_WITH_PREFIX(INFO) << Format("Snoozing leader timeout detection for $0", | 
| 3390 | 18.4E |                                       delta.Initialized() ? delta.ToString() : "election timeout"); | 
| 3391 | 2.07M |     } | 
| 3392 |  |  | 
| 3393 | 26.5M |     if (!delta.Initialized()) { | 
| 3394 | 24.3M |       delta = MinimumElectionTimeout(); | 
| 3395 | 24.3M |     } | 
| 3396 | 26.5M |     failure_detector_->Snooze(delta); | 
| 3397 | 26.5M |   } | 
| 3398 | 27.6M | } | 
| 3399 |  |  | 
| 3400 | 52.0M | MonoDelta RaftConsensus::MinimumElectionTimeout() const { | 
| 3401 | 52.0M |   int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods * | 
| 3402 | 52.0M |       FLAGS_raft_heartbeat_interval_ms; | 
| 3403 |  |  | 
| 3404 | 52.0M |   return MonoDelta::FromMilliseconds(failure_timeout); | 
| 3405 | 52.0M | } | 
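The minimum election timeout is simply the heartbeat interval multiplied by the allowed number of missed heartbeat periods. A worked example follows, assuming defaults of a 500 ms heartbeat and 6 missed periods (an assumption; check the actual flag defaults for your build):

```cpp
#include <chrono>
#include <cstdint>

// Same formula as above, with the flag values passed in explicitly.
std::chrono::milliseconds MinimumElectionTimeout(int32_t heartbeat_interval_ms,
                                                 int32_t max_missed_periods) {
  return std::chrono::milliseconds(
      static_cast<int64_t>(max_missed_periods) * heartbeat_interval_ms);
}
// With the assumed defaults, MinimumElectionTimeout(500, 6) == 3000 ms: a
// leader must miss six consecutive heartbeats before an election is considered.
```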
| 3406 |  |  | 
| 3407 | 2.07M | MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() { | 
| 3408 |  |   // Compute a backoff factor based on how many leader elections have | 
| 3409 |  |   // taken place since a stable leader was last seen. | 
| 3410 | 2.07M |   double backoff_factor = pow( | 
| 3411 | 2.07M |       1.1, | 
| 3412 | 2.07M |       failed_elections_since_stable_leader_.load(std::memory_order_acquire) + 1); | 
| 3413 | 2.07M |   double min_timeout = MinimumElectionTimeout().ToMilliseconds(); | 
| 3414 | 2.07M |   double max_timeout = std::min<double>( | 
| 3415 | 2.07M |       min_timeout * backoff_factor, | 
| 3416 | 2.07M |       FLAGS_leader_failure_exp_backoff_max_delta_ms); | 
| 3417 | 2.07M |   if (max_timeout < min_timeout) { | 
| 3418 | 1.23k |     LOG(INFO) << "Resetting max_timeout from " <<  max_timeout << " to " << min_timeout | 
| 3419 | 1.23k |               << ", max_delta_flag=" << FLAGS_leader_failure_exp_backoff_max_delta_ms; | 
| 3420 | 1.23k |     max_timeout = min_timeout; | 
| 3421 | 1.23k |   } | 
| 3422 |  |   // Randomize the timeout between the minimum and the calculated value. | 
| 3423 |  |   // We do this after the above capping to the max. Otherwise, after a | 
| 3424 |  |   // churny period, we'd end up highly likely to backoff exactly the max | 
| 3425 |  |   // amount. | 
| 3426 | 2.07M |   double timeout = min_timeout + (max_timeout - min_timeout) * rng_.NextDoubleFraction(); | 
| 3427 | 2.07M |   DCHECK_GE(timeout, min_timeout); | 
| 3428 |  |  | 
| 3429 | 2.07M |   return MonoDelta::FromMilliseconds(timeout); | 
| 3430 | 2.07M | } | 
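Putting the backoff together: the cap grows as `min_timeout * 1.1^(failures + 1)`, is clamped by the flag, and the final value is drawn uniformly between the minimum and that cap. A self-contained sketch with a worked number:

```cpp
#include <algorithm>
#include <cmath>
#include <random>

// Sketch of the backoff above: the cap grows as min * 1.1^(failures + 1),
// is clamped to max_delta_ms (and never below the minimum), and the result
// is a uniform draw so nodes don't all back off by exactly the same amount.
double ElectionBackoffMs(int failed_elections, double min_timeout_ms,
                         double max_delta_ms, std::mt19937* rng) {
  const double backoff_factor = std::pow(1.1, failed_elections + 1);
  const double max_timeout_ms = std::max(
      min_timeout_ms, std::min(min_timeout_ms * backoff_factor, max_delta_ms));
  std::uniform_real_distribution<double> fraction(0.0, 1.0);
  return min_timeout_ms + (max_timeout_ms - min_timeout_ms) * fraction(*rng);
}
// Example: min_timeout_ms = 3000 and 5 prior failures give a cap of
// 3000 * 1.1^6 ≈ 5315 ms, so the timeout is drawn from [3000, 5315) ms.
```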
| 3431 |  |  | 
| 3432 | 550k | Status RaftConsensus::IncrementTermUnlocked() { | 
| 3433 | 550k |   return HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1); | 
| 3434 | 550k | } | 
| 3435 |  |  | 
| 3436 | 758k | Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) { | 
| 3437 | 758k |   if (new_term <= state_->GetCurrentTermUnlocked()) { | 
| 3438 | 1.09k |     return STATUS(IllegalState, Substitute("Can't advance term to: $0 current term: $1 is higher.", | 
| 3439 | 1.09k |                                            new_term, state_->GetCurrentTermUnlocked())); | 
| 3440 | 1.09k |   } | 
| 3441 |  |  | 
| 3442 | 757k |   if (state_->GetActiveRoleUnlocked() == PeerRole::LEADER) { | 
| 3443 | 945 |     LOG_WITH_PREFIX(INFO) << "Stepping down as leader of term " | 
| 3444 | 945 |                           << state_->GetCurrentTermUnlocked() | 
| 3445 | 945 |                           << " since new term is " << new_term; | 
| 3446 |  |  | 
| 3447 | 945 |     RETURN_NOT_OK(BecomeReplicaUnlocked(std::string())); | 
| 3448 | 945 |   } | 
| 3449 |  |  | 
| 3450 | 757k |   LOG_WITH_PREFIX(INFO) << "Advancing to term " << new_term; | 
| 3451 | 757k |   RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term)); | 
| 3452 | 182k |   term_metric_->set_value(new_term); | 
| 3453 | 182k |   return Status::OK(); | 
| 3454 | 757k | } | 
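Term advancement enforces two invariants visible above: terms only move forward, and a leader must step down before adopting a higher term. A minimal sketch of those invariants; `StepDown` here is a hypothetical stand-in for `BecomeReplicaUnlocked`.

```cpp
#include <cstdint>
#include <stdexcept>

// Sketch of the invariants above: terms are monotonic, and a leader observing
// a higher term must become a replica before adopting it.
struct Replica {
  int64_t current_term = 0;
  bool is_leader = false;

  void StepDown() { is_leader = false; }

  void AdvanceTerm(int64_t new_term) {
    if (new_term <= current_term) {
      throw std::invalid_argument("can't advance to a non-increasing term");
    }
    if (is_leader) StepDown();  // step down before adopting the new term
    current_term = new_term;    // persisted durably in the real implementation
  }
};
```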
| 3455 |  |  | 
| 3456 |  | Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForCDC(const yb::OpId& from, | 
| 3457 |  |   int64_t* last_replicated_opid_index, | 
| 3458 | 617 |   const CoarseTimePoint deadline) { | 
| 3459 | 617 |   return queue_->ReadReplicatedMessagesForCDC(from, last_replicated_opid_index, deadline); | 
| 3460 | 617 | } | 
| 3461 |  |  | 
| 3462 | 874 | void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) { | 
| 3463 | 874 |   return queue_->UpdateCDCConsumerOpId(op_id); | 
| 3464 | 874 | } | 
| 3465 |  |  | 
| 3466 |  | void RaftConsensus::RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg, | 
| 3467 | 23 |                                             bool should_exists) { | 
| 3468 | 23 |   state_->CancelPendingOperation(OpId::FromPB(replicate_msg->id()), should_exists); | 
| 3469 | 23 | } | 
| 3470 |  |  | 
| 3471 | 8.07k | uint64_t RaftConsensus::OnDiskSize() const { | 
| 3472 | 8.07k |   return state_->OnDiskSize(); | 
| 3473 | 8.07k | } | 
| 3474 |  |  | 
| 3475 | 5.36M | yb::OpId RaftConsensus::WaitForSafeOpIdToApply(const yb::OpId& op_id) { | 
| 3476 | 5.36M |   return log_->WaitForSafeOpIdToApply(op_id); | 
| 3477 | 5.36M | } | 
| 3478 |  |  | 
| 3479 | 50.1M | yb::OpId RaftConsensus::MinRetryableRequestOpId() { | 
| 3480 | 50.1M |   return state_->MinRetryableRequestOpId(); | 
| 3481 | 50.1M | } | 
| 3482 |  |  | 
| 3483 | 0 | size_t RaftConsensus::LogCacheSize() { | 
| 3484 | 0 |   return queue_->LogCacheSize(); | 
| 3485 | 0 | } | 
| 3486 |  |  | 
| 3487 | 0 | size_t RaftConsensus::EvictLogCache(size_t bytes_to_evict) { | 
| 3488 | 0 |   return queue_->EvictLogCache(bytes_to_evict); | 
| 3489 | 0 | } | 
| 3490 |  |  | 
| 3491 | 0 | RetryableRequestsCounts RaftConsensus::TEST_CountRetryableRequests() { | 
| 3492 | 0 |   return state_->TEST_CountRetryableRequests(); | 
| 3493 | 0 | } | 
| 3494 |  |  | 
| 3495 | 605 | void RaftConsensus::TrackOperationMemory(const yb::OpId& op_id) { | 
| 3496 | 605 |   queue_->TrackOperationsMemory({op_id}); | 
| 3497 | 605 | } | 
| 3498 |  |  | 
| 3499 | 23 | int64_t RaftConsensus::TEST_LeaderTerm() const { | 
| 3500 | 23 |   auto lock = state_->LockForRead(); | 
| 3501 | 23 |   return state_->GetCurrentTermUnlocked(); | 
| 3502 | 23 | } | 
| 3503 |  |  | 
| 3504 | 161 | std::string RaftConsensus::DelayedStepDown::ToString() const { | 
| 3505 | 161 |   return YB_STRUCT_TO_STRING(term, protege, graceful); | 
| 3506 | 161 | } | 
| 3507 |  |  | 
| 3508 | 3 | Result<OpId> RaftConsensus::TEST_GetLastOpIdWithType(OpIdType opid_type, OperationType op_type) { | 
| 3509 | 3 |   return queue_->TEST_GetLastOpIdWithType(VERIFY_RESULT(GetLastOpId(opid_type)).index, op_type); | 
| 3510 | 3 | } | 
| 3511 |  |  | 
| 3512 |  | }  // namespace consensus | 
| 3513 |  | }  // namespace yb |