YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/consensus_queue.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <shared_mutex>
38
#include <string>
39
#include <utility>
40
41
#include <boost/container/small_vector.hpp>
42
#include <glog/logging.h>
43
44
#include "yb/consensus/consensus_context.h"
45
#include "yb/consensus/log_util.h"
46
#include "yb/consensus/opid_util.h"
47
#include "yb/consensus/quorum_util.h"
48
#include "yb/consensus/raft_consensus.h"
49
#include "yb/consensus/replicate_msgs_holder.h"
50
51
#include "yb/gutil/bind.h"
52
#include "yb/gutil/dynamic_annotations.h"
53
#include "yb/gutil/map-util.h"
54
#include "yb/gutil/stl_util.h"
55
#include "yb/gutil/strings/substitute.h"
56
57
#include "yb/util/enums.h"
58
#include "yb/util/fault_injection.h"
59
#include "yb/util/flag_tags.h"
60
#include "yb/util/locks.h"
61
#include "yb/util/logging.h"
62
#include "yb/util/mem_tracker.h"
63
#include "yb/util/metrics.h"
64
#include "yb/util/monotime.h"
65
#include "yb/util/random_util.h"
66
#include "yb/util/result.h"
67
#include "yb/util/size_literals.h"
68
#include "yb/util/status_log.h"
69
#include "yb/util/threadpool.h"
70
#include "yb/util/tostring.h"
71
#include "yb/util/url-coding.h"
72
73
using namespace std::literals;
74
using namespace yb::size_literals;
75
76
DECLARE_uint64(rpc_max_message_size);
77
78
// We expect that consensus_max_batch_size_bytes + 1_KB would be less than rpc_max_message_size.
79
// Otherwise such batch would be rejected by RPC layer.
80
DEFINE_uint64(consensus_max_batch_size_bytes, 4_MB,
81
              "The maximum per-tablet RPC batch size when updating peers.");
82
TAG_FLAG(consensus_max_batch_size_bytes, advanced);
83
TAG_FLAG(consensus_max_batch_size_bytes, runtime);
84
85
DEFINE_int32(follower_unavailable_considered_failed_sec, 900,
86
             "Seconds that a leader is unable to successfully heartbeat to a "
87
             "follower after which the follower is considered to be failed and "
88
             "evicted from the config.");
89
TAG_FLAG(follower_unavailable_considered_failed_sec, advanced);
90
91
DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
92
             "Injects a random sleep between 0 and this many milliseconds into "
93
             "asynchronous notifications from the consensus queue back to the "
94
             "consensus implementation.");
95
TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
96
TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
97
98
// How long a CDC consumer's checkpoint keeps entries pinned in the log cache.
// The help text pieces are concatenated by the compiler, so every piece that is followed by
// another one must end with a space.
DEFINE_int32(cdc_checkpoint_opid_interval_ms, 60 * 1000,
             "Interval up to which CDC consumer's checkpoint is considered for retaining log "
             "cache. If we haven't received an updated checkpoint from CDC consumer within the "
             "interval specified by cdc_checkpoint_opid_interval_ms, then log cache does not "
             "consider that consumer while determining which op IDs to evict.");
103
104
// How long a CDC consumer's checkpoint keeps intents from being deleted.
// The help text pieces are concatenated by the compiler, so every piece that is followed by
// another one must end with a space. The text refers to this flag by its own name.
DEFINE_int64(cdc_intent_retention_ms, 4 * 3600 * 1000,
             "Interval up to which CDC consumer's checkpoint is considered for retaining "
             "intents. If we haven't received an updated checkpoint from CDC consumer within the "
             "interval specified by cdc_intent_retention_ms, then CDC does not consider that "
             "consumer while determining which op IDs to delete from the intent.");
109
110
DEFINE_bool(enable_consensus_exponential_backoff, true,
111
            "Whether exponential backoff based on number of retransmissions at tablet leader "
112
            "for number of entries to replicate to lagging follower is enabled.");
113
TAG_FLAG(enable_consensus_exponential_backoff, advanced);
114
TAG_FLAG(enable_consensus_exponential_backoff, runtime);
115
116
DEFINE_int32(consensus_lagging_follower_threshold, 10,
117
             "Number of retransmissions at tablet leader to mark a follower as lagging. "
118
             "-1 disables the feature.");
119
TAG_FLAG(consensus_lagging_follower_threshold, advanced);
120
TAG_FLAG(consensus_lagging_follower_threshold, runtime);
121
122
DEFINE_test_flag(bool, disallow_lmp_failures, false,
123
                 "Whether we disallow PRECEDING_ENTRY_DIDNT_MATCH failures for non new peers.");
124
125
namespace {
126
127
constexpr const auto kMinRpcThrottleThresholdBytes = 16;
128
129
14.6k
static bool RpcThrottleThresholdBytesValidator(const char* flagname, int64_t value) {
130
14.6k
  if (value > 0) {
131
14.6k
    if (value < kMinRpcThrottleThresholdBytes) {
132
0
      LOG(ERROR) << "Expect " << flagname << " to be at least " << kMinRpcThrottleThresholdBytes;
133
0
      return false;
134
14.6k
    } else if (implicit_cast<size_t>(value) >= FLAGS_consensus_max_batch_size_bytes) {
135
0
      LOG(ERROR) << "Expect " << flagname << " to be less than consensus_max_batch_size_bytes "
136
0
                 << "value (" << FLAGS_consensus_max_batch_size_bytes << ")";
137
0
      return false;
138
0
    }
139
14.6k
  }
140
14.6k
  return true;
141
14.6k
}
142
143
} // namespace
144
145
DECLARE_int64(rpc_throttle_threshold_bytes);
146
147
namespace yb {
148
namespace consensus {
149
150
using log::Log;
151
using std::unique_ptr;
152
using rpc::Messenger;
153
using strings::Substitute;
154
155
METRIC_DEFINE_gauge_int64(tablet, majority_done_ops, "Leader Operations Acked by Majority",
156
                          MetricUnit::kOperations,
157
                          "Number of operations in the leader queue ack'd by a majority but "
158
                          "not all peers.");
159
METRIC_DEFINE_gauge_int64(tablet, in_progress_ops, "Leader Operations in Progress",
160
                          MetricUnit::kOperations,
161
                          "Number of operations in the leader queue ack'd by a minority of "
162
                          "peers.");
163
164
// CDC checkpoint interval as a chrono duration, derived from FLAGS_cdc_checkpoint_opid_interval_ms.
// This is a namespace-scope constant, so the flag is read once, when this initializer runs;
// later changes to the flag are not reflected here.
// NOTE(review): presumably this initializer runs before command-line flags are parsed, in which
// case it always holds the flag's compiled-in default -- confirm.
const auto kCDCConsumerCheckpointInterval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms;
165
166
// CDC intent retention period as a chrono duration, derived from FLAGS_cdc_intent_retention_ms.
// This is a namespace-scope constant, so the flag is read once, when this initializer runs;
// later changes to the flag are not reflected here.
// NOTE(review): presumably this initializer runs before command-line flags are parsed, in which
// case it always holds the flag's compiled-in default -- confirm.
const auto kCDCConsumerIntentRetention = FLAGS_cdc_intent_retention_ms * 1ms;
167
168
0
std::string MajorityReplicatedData::ToString() const {
169
0
  return Format(
170
0
      "{ op_id: $0 leader_lease_expiration: $1 ht_lease_expiration: $2 num_sst_files: $3 }",
171
0
      op_id, leader_lease_expiration, ht_lease_expiration, num_sst_files);
172
0
}
173
174
122k
std::string PeerMessageQueue::TrackedPeer::ToString() const {
175
122k
  return Format(
176
122k
      "{ peer: $0 is_new: $1 last_received: $2 next_index: $3 last_known_committed_idx: $4 "
177
122k
      "is_last_exchange_successful: $5 needs_remote_bootstrap: $6 member_type: $7 "
178
122k
      "num_sst_files: $8 last_applied: $9 }",
179
122k
      uuid, is_new, last_received, next_index, last_known_committed_idx,
180
122k
      is_last_exchange_successful, needs_remote_bootstrap, PeerMemberType_Name(member_type),
181
122k
      num_sst_files, last_applied);
182
122k
}
183
184
79.5k
void PeerMessageQueue::TrackedPeer::ResetLeaderLeases() {
185
79.5k
  leader_lease_expiration.Reset();
186
79.5k
  leader_ht_lease_expiration.Reset();
187
79.5k
}
188
189
34.9M
void PeerMessageQueue::TrackedPeer::ResetLastRequest() {
190
  // Reset so that next transmission is not considered a re-transmission.
191
34.9M
  last_num_messages_sent = -1;
192
34.9M
  current_retransmissions = -1;
193
34.9M
}
194
195
#define INSTANTIATE_METRIC(x) \
196
  x.Instantiate(metric_entity, 0)
197
PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
198
  : num_majority_done_ops(INSTANTIATE_METRIC(METRIC_majority_done_ops)),
199
150k
    num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)) {
200
150k
}
201
#undef INSTANTIATE_METRIC
202
203
// Constructs the queue. The queue takes ownership of the raft pool token, copies the local peer
// descriptor and tablet id, creates the log cache and finds or creates the "OperationsFromDisk"
// memory tracker under parent_tracker.
//
// In debug builds the local peer is required to carry a permanent UUID and a non-empty list of
// last known private addresses.
PeerMessageQueue::PeerMessageQueue(
    const scoped_refptr<MetricEntity>& metric_entity,
    const scoped_refptr<log::Log>& log,
    const MemTrackerPtr& server_tracker,
    const MemTrackerPtr& parent_tracker,
    const RaftPeerPB& local_peer_pb,
    const string& tablet_id,
    const server::ClockPtr& clock,
    ConsensusContext* context,
    unique_ptr<ThreadPoolToken> raft_pool_token)
    : raft_pool_observers_token_(std::move(raft_pool_token)),
      local_peer_pb_(local_peer_pb),
      // Falls back to an empty UUID when the descriptor does not carry one.
      local_peer_uuid_(
          local_peer_pb_.has_permanent_uuid() ? local_peer_pb_.permanent_uuid() : string()),
      tablet_id_(tablet_id),
      log_cache_(metric_entity, log, server_tracker, local_peer_pb.permanent_uuid(), tablet_id),
      operations_mem_tracker_(
          MemTracker::FindOrCreateTracker("OperationsFromDisk", parent_tracker)),
      metrics_(metric_entity),
      clock_(clock),
      context_(context) {
  DCHECK(local_peer_pb_.has_permanent_uuid());
  DCHECK(!local_peer_pb_.last_known_private_addr().empty());
}
226
227
150k
void PeerMessageQueue::Init(const OpId& last_locally_replicated) {
228
150k
  LockGuard lock(queue_lock_);
229
150k
  CHECK_EQ(queue_state_.state, State::kQueueConstructed);
230
150k
  log_cache_.Init(last_locally_replicated.ToPB<OpIdPB>());
231
150k
  queue_state_.last_appended = last_locally_replicated;
232
150k
  queue_state_.state = State::kQueueOpen;
233
150k
  local_peer_ = TrackPeerUnlocked(local_peer_uuid_);
234
235
150k
  if (context_) {
236
150k
    context_->ListenNumSSTFilesChanged(std::bind(&PeerMessageQueue::NumSSTFilesChanged, this));
237
150k
    installed_num_sst_files_changed_listener_ = true;
238
150k
  }
239
150k
}
240
241
void PeerMessageQueue::SetLeaderMode(const OpId& committed_op_id,
242
                                     int64_t current_term,
243
                                     const OpId& last_applied_op_id,
244
67.9k
                                     const RaftConfigPB& active_config) {
245
67.9k
  LockGuard lock(queue_lock_);
246
67.9k
  queue_state_.current_term = current_term;
247
67.9k
  queue_state_.committed_op_id = committed_op_id;
248
67.9k
  queue_state_.last_applied_op_id = last_applied_op_id;
249
67.9k
  queue_state_.majority_replicated_op_id = committed_op_id;
250
67.9k
  queue_state_.active_config.reset(new RaftConfigPB(active_config));
251
67.9k
  CHECK(IsRaftConfigVoter(local_peer_uuid_, *queue_state_.active_config))
252
11
      << local_peer_pb_.ShortDebugString() << " not a voter in config: "
253
11
      << queue_state_.active_config->ShortDebugString();
254
67.9k
  queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config));
255
67.9k
  queue_state_.mode = Mode::LEADER;
256
257
67.9k
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
258
67.9k
      << queue_state_.ToString();
259
67.9k
  CheckPeersInActiveConfigIfLeaderUnlocked();
260
261
  // Reset last communication time with all peers to reset the clock on the
262
  // failure timeout.
263
67.9k
  MonoTime now(MonoTime::Now());
264
79.5k
  for (const PeersMap::value_type& entry : peers_map_) {
265
79.5k
    entry.second->ResetLeaderLeases();
266
79.5k
    entry.second->last_successful_communication_time = now;
267
79.5k
  }
268
67.9k
}
269
270
161k
void PeerMessageQueue::SetNonLeaderMode() {
271
161k
  LockGuard lock(queue_lock_);
272
161k
  queue_state_.active_config.reset();
273
161k
  queue_state_.mode = Mode::NON_LEADER;
274
161k
  queue_state_.majority_size_ = -1;
275
161k
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
276
161k
      << queue_state_.ToString();
277
161k
}
278
279
123k
void PeerMessageQueue::TrackPeer(const string& uuid) {
280
123k
  LockGuard lock(queue_lock_);
281
123k
  TrackPeerUnlocked(uuid);
282
123k
}
283
284
273k
// Starts tracking the peer with the given (non-empty) UUID and returns its tracking record.
// The record is stored in peers_map_; inserting a UUID that is already tracked is fatal.
// The caller must hold queue_lock_.
PeerMessageQueue::TrackedPeer* PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
  CHECK(!uuid.empty()) << "Got request to track peer with empty UUID";
  DCHECK_EQ(queue_state_.state, State::kQueueOpen);

  auto* new_peer = new TrackedPeer(uuid);

  // The last operation received by the peer is unknown, so, following the Raft protocol,
  // next_index starts one past the end of our own log. If this call results from a successful
  // leader election and the new leader's log matches the remote peer's, next_index then points at
  // the soon-to-be-written NO_OP entry that asserts leadership. If the guess is wrong and the
  // peer's log does not match ours, the normal queue negotiation process will eventually find the
  // right point to resume from.
  new_peer->next_index = queue_state_.last_appended.index + 1;
  InsertOrDie(&peers_map_, uuid, new_peer);

  CheckPeersInActiveConfigIfLeaderUnlocked();

  // How far back this peer is remains unknown, so the all-replicated watermark drops to the
  // minimum op id. It advances again once the peer reports its progress.
  queue_state_.all_replicated_op_id = OpId::Min();
  return new_peer;
}
306
307
75.4k
void PeerMessageQueue::UntrackPeer(const string& uuid) {
308
75.4k
  LockGuard lock(queue_lock_);
309
75.4k
  TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid);
310
75.4k
  if (peer != nullptr) {
311
75.4k
    delete peer;
312
75.4k
  }
313
75.4k
}
314
315
341k
void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const {
316
341k
  if (queue_state_.mode != Mode::LEADER) 
return150k
;
317
191k
  std::unordered_set<std::string> config_peer_uuids;
318
581k
  for (const RaftPeerPB& peer_pb : queue_state_.active_config->peers()) {
319
581k
    InsertOrDie(&config_peer_uuids, peer_pb.permanent_uuid());
320
581k
  }
321
396k
  for (const PeersMap::value_type& entry : peers_map_) {
322
396k
    if (!ContainsKey(config_peer_uuids, entry.first)) {
323
0
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << Substitute("Peer $0 is not in the active config. "
324
0
                                                    "Queue state: $1",
325
0
                                                    entry.first,
326
0
                                                    queue_state_.ToString());
327
0
    }
328
396k
  }
329
191k
}
330
331
3.41k
void PeerMessageQueue::NumSSTFilesChanged() {
332
3.41k
  auto num_sst_files = context_->NumSSTFiles();
333
334
3.41k
  uint64_t majority_replicated_num_sst_files;
335
3.41k
  {
336
3.41k
    LockGuard lock(queue_lock_);
337
3.41k
    if (queue_state_.mode != Mode::LEADER) {
338
1.34k
      return;
339
1.34k
    }
340
2.06k
    auto it = peers_map_.find(local_peer_uuid_);
341
2.06k
    if (it == peers_map_.end()) {
342
0
      return;
343
0
    }
344
2.06k
    it->second->num_sst_files = num_sst_files;
345
2.06k
    majority_replicated_num_sst_files = NumSSTFilesWatermark();
346
2.06k
  }
347
348
0
  NotifyObservers(
349
2.06k
      "majority replicated num SST files changed",
350
2.06k
      [majority_replicated_num_sst_files](PeerMessageQueueObserver* observer) {
351
2.06k
    observer->MajorityReplicatedNumSSTFilesChanged(majority_replicated_num_sst_files);
352
2.06k
  });
353
2.06k
}
354
355
24.3M
void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id, const Status& status) {
356
24.3M
  CHECK_OK(status);
357
358
  // Fake an RPC response from the local peer.
359
  // TODO: we should probably refactor the ResponseFromPeer function so that we don't need to
360
  // construct this fake response, but this seems to work for now.
361
24.3M
  ConsensusResponsePB fake_response;
362
24.3M
  id.ToPB(fake_response.mutable_status()->mutable_last_received());
363
24.3M
  id.ToPB(fake_response.mutable_status()->mutable_last_received_current_leader());
364
24.3M
  if (context_) {
365
24.2M
    fake_response.set_num_sst_files(context_->NumSSTFiles());
366
24.2M
  }
367
24.3M
  {
368
24.3M
    LockGuard lock(queue_lock_);
369
370
    // TODO This ugly fix is required because we unlock queue_lock_ while doing AppendOperations.
371
    // So LocalPeerAppendFinished could be invoked before rest of AppendOperations.
372
24.3M
    if (queue_state_.last_appended.index < id.index) {
373
88.5k
      queue_state_.last_appended = id;
374
88.5k
    }
375
24.3M
    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_op_id.index);
376
24.3M
    queue_state_.last_applied_op_id.ToPB(fake_response.mutable_status()->mutable_last_applied());
377
378
24.3M
    if (queue_state_.mode != Mode::LEADER) {
379
15.0M
      log_cache_.EvictThroughOp(id.index);
380
381
15.0M
      UpdateMetrics();
382
15.0M
      return;
383
15.0M
    }
384
24.3M
  }
385
9.28M
  ResponseFromPeer(local_peer_uuid_, fake_response);
386
9.28M
}
387
388
768
// Test-only helper: appends a single message, using the committed op id carried by the message
// itself and the current restart-safe coarse time as the batch time.
Status PeerMessageQueue::TEST_AppendOperation(const ReplicateMsgPtr& msg) {
  const auto committed_op_id = yb::OpId::FromPB(msg->committed_op_id());
  const auto batch_mono_time = RestartSafeCoarseMonoClock().Now();
  return AppendOperations({ msg }, committed_op_id, batch_mono_time);
}
392
393
// Appends the given messages to the log cache.
//
// The id passed to the append-finished callback is that of the last message, or the queue's
// current last-appended id when msgs is empty. A non-empty batch may raise the queue's current
// term and, once the log cache has accepted it, advances last_appended and refreshes the metrics.
// Returns the log cache error if the append is rejected.
Status PeerMessageQueue::AppendOperations(const ReplicateMsgs& msgs,
                                          const yb::OpId& committed_op_id,
                                          RestartSafeCoarseTimePoint batch_mono_time) {
  DFAKE_SCOPED_LOCK(append_fake_lock_);

  const bool has_messages = !msgs.empty();
  OpId last_id;
  {
    LockGuard lock(queue_lock_);
    if (has_messages) {
      last_id = OpId::FromPB(msgs.back()->id());
      if (queue_state_.current_term < last_id.term) {
        queue_state_.current_term = last_id.term;
      }
    } else {
      last_id = queue_state_.last_appended;
    }
  }

  // The queue lock is released during the append to prevent a deadlock: the log buffer may be
  // full, in which case AppendOperations would block. For the log buffer to empty, it may need to
  // call LocalPeerAppendFinished(), which also needs queue_lock_.
  //
  // AppendOperations is only called from one thread, so no concurrent AppendOperations can run
  // and queue_state_.last_appended is updated correctly.
  RETURN_NOT_OK(log_cache_.AppendOperations(
      msgs, committed_op_id, batch_mono_time,
      Bind(&PeerMessageQueue::LocalPeerAppendFinished, Unretained(this), last_id)));

  if (has_messages) {
    LockGuard lock(queue_lock_);
    queue_state_.last_appended = last_id;
    UpdateMetrics();
  }

  return Status::OK();
}
429
430
3.17M
// Returns how many messages to send on a retransmission: half of the previously sent count,
// minus one, clamped at zero.
uint64_t GetNumMessagesToSendWithBackoff(int64_t last_num_messages_sent) {
  const int64_t reduced = (last_num_messages_sent >> 1) - 1;
  if (reduced <= 0) {
    return 0;
  }
  return reduced;
}
433
434
// Assembles the next consensus request for the peer `uuid`.
//
// Parameters:
//   uuid                     - peer to build the request for; must not be the local peer.
//   request                  - filled in place; must arrive with no ops.
//   msgs_holder              - receives ownership of the messages attached to the request.
//   needs_remote_bootstrap   - set to whether the peer needs remote bootstrap; when true the
//                              function returns OK without attaching any operations.
//   member_type              - optional; receives the peer's member type.
//   last_exchange_successful - optional; receives whether the last exchange succeeded.
//
// Returns NotFound when the peer is not tracked or the queue is not in leader mode, the error
// from PreparePeerRequest() or from reading the log cache when either fails, and OK otherwise.
//
// Everything needed from queue_state_ is captured while queue_lock_ is held. After the lock is
// released the queue may change mode concurrently, which resets the active config, so
// queue_state_ is not read again without the lock.
Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                        ConsensusRequestPB* request,
                                        ReplicateMsgsHolder* msgs_holder,
                                        bool* needs_remote_bootstrap,
                                        PeerMemberType* member_type,
                                        bool* last_exchange_successful) {
  static constexpr uint64_t kSendUnboundedLogOps = std::numeric_limits<uint64_t>::max();
  DCHECK(request->ops().empty()) << request->ShortDebugString();

  OpId preceding_id;
  MonoDelta unreachable_time = MonoDelta::kMin;
  bool is_new;
  int64_t previously_sent_index;
  uint64_t num_log_ops_to_send;
  HybridTime propagated_safe_time;
  // Snapshots of queue state taken under queue_lock_ for use after the lock is released.
  int64_t current_term;
  bool report_failed_follower = false;

  // Should be before now_ht, i.e. not greater than propagated_hybrid_time.
  if (context_) {
    propagated_safe_time = VERIFY_RESULT(context_->PreparePeerRequest());
  }

  {
    LockGuard lock(queue_lock_);
    DCHECK_EQ(queue_state_.state, State::kQueueOpen);
    DCHECK_NE(uuid, local_peer_uuid_);

    auto peer = FindPtrOrNull(peers_map_, uuid);
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) {
      return STATUS(NotFound, "Peer not tracked or queue not in leader mode.");
    }

    current_term = queue_state_.current_term;

    const HybridTime now_ht = clock_->Now();

    is_new = peer->is_new;
    if (!is_new) {
      auto ht_lease_expiration_micros = now_ht.GetPhysicalValueMicros() +
                                        FLAGS_ht_lease_duration_ms * 1000;
      auto leader_lease_duration_ms = GetAtomicFlag(&FLAGS_leader_lease_duration_ms);
      request->set_leader_lease_duration_ms(leader_lease_duration_ms);
      request->set_ht_lease_expiration(ht_lease_expiration_micros);

      // As noted here:
      // https://red.ht/2sCSErb
      //
      // The _COARSE variants are faster to read and have a precision (also known as resolution) of
      // one millisecond (ms).
      //
      // Coarse clock precision is 1 millisecond.
      const auto kCoarseClockPrecision = 1ms;

      // Because of coarse clocks we subtract 2ms, to be sure that our local version of lease
      // does not expire after it expires at follower.
      peer->leader_lease_expiration.last_sent =
          CoarseMonoClock::Now() + leader_lease_duration_ms * 1ms - kCoarseClockPrecision * 2;
      peer->leader_ht_lease_expiration.last_sent = ht_lease_expiration_micros;
    } else {
      request->clear_leader_lease_duration_ms();
      request->clear_ht_lease_expiration();
      peer->leader_lease_expiration.Reset();
      peer->leader_ht_lease_expiration.Reset();
    }
    // This is initialized to the queue's last appended op but gets set to the id of the
    // log entry preceding the first one in 'messages' if messages are found for the peer.
    //
    // The leader does not know the actual state of a peer but it should always send a value of
    // preceding_id that is present in the leader's own log, so the follower can verify the log
    // matching property.
    //
    // In case we decide not to send any messages to the follower this time due to exponential
    // backoff to an unresponsive follower, we will keep preceding_id equal to last_appended.
    // This is safe because unless the follower already has that operation, it will fail to find
    // it in its pending operations in EnforceLogMatchingPropertyMatchesUnlocked and will return
    // a log matching property violation error without applying any incorrect messages from its log.
    //
    // See this scenario for more context on the issue we are trying to avoid:
    // https://github.com/yugabyte/yugabyte-db/issues/8150#issuecomment-827821784
    preceding_id = queue_state_.last_appended;

    request->set_propagated_hybrid_time(now_ht.ToUint64());

    // NOTE: committed_op_id may be overwritten later.
    // In our system committed_op_id means that this operation was also applied.
    // If we have operation that applied significant time, followers would not know that this
    // operation is committed until it is applied in the leader.
    // To address this issue we use majority_replicated_op_id, that is updated before apply.
    // But we could use it only when its term matches current term, see Fig.8 in Raft paper.
    if (queue_state_.majority_replicated_op_id.index > queue_state_.committed_op_id.index &&
        queue_state_.majority_replicated_op_id.term == queue_state_.current_term) {
      queue_state_.majority_replicated_op_id.ToPB(request->mutable_committed_op_id());
    } else {
      queue_state_.committed_op_id.ToPB(request->mutable_committed_op_id());
    }

    request->set_caller_term(current_term);
    unreachable_time =
        MonoTime::Now().GetDeltaSince(peer->last_successful_communication_time);
    if (member_type) *member_type = peer->member_type;
    if (last_exchange_successful) *last_exchange_successful = peer->is_last_exchange_successful;
    *needs_remote_bootstrap = peer->needs_remote_bootstrap;

    previously_sent_index = peer->next_index - 1;
    if (FLAGS_enable_consensus_exponential_backoff && peer->last_num_messages_sent >= 0) {
      // Previous request to peer has not been acked. Reduce number of entries to be sent
      // in this attempt using exponential backoff. Note that to_index is inclusive.
      num_log_ops_to_send = GetNumMessagesToSendWithBackoff(peer->last_num_messages_sent);
    } else {
      // Previous request to peer has been acked or a heartbeat response has been received.
      // Transmit as many entries as allowed.
      num_log_ops_to_send = kSendUnboundedLogOps;
    }

    peer->current_retransmissions++;

    // Decide under the lock whether the follower has to be reported as failed: the decision reads
    // the active config, which must not be touched once the lock is released.
    if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) {
      // We never drop from 2 voters to 1 voter automatically, at least for now (12/4/18). We may
      // want to revisit this later, we're just being cautious with this.
      // We remove unconditionally any failed non-voter replica (PRE_VOTER, PRE_OBSERVER, OBSERVER).
      const bool is_voter = peer->member_type == PeerMemberType::VOTER;
      report_failed_follower = !is_voter || CountVoters(*queue_state_.active_config) > 2;
    }
  }

  // Observers are notified outside of the lock, using the term captured above.
  if (report_failed_follower) {
    string msg = Substitute("Leader has been unable to successfully communicate "
                            "with Peer $0 for more than $1 seconds ($2)",
                            uuid,
                            FLAGS_follower_unavailable_considered_failed_sec,
                            unreachable_time.ToString());
    NotifyObserversOfFailedFollower(uuid, current_term, msg);
  }

  if (PREDICT_FALSE(*needs_remote_bootstrap)) {
    YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS(INFO, 30)
        << "Peer needs remote bootstrap: " << uuid;
    return Status::OK();
  }
  *needs_remote_bootstrap = false;

  request->clear_propagated_safe_time();

  // If we've never communicated with the peer, we don't know what messages to send, so we'll send a
  // status-only request. If the peer has not responded to the point that our to_index == next_index
  // due to exponential backoff of replicated segment size, we also send a status-only request.
  // Otherwise, we grab requests from the log starting at the last_received point.
  if (!is_new && num_log_ops_to_send > 0) {
    // The batch of messages to send to the peer.
    auto max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();
    auto to_index = num_log_ops_to_send == kSendUnboundedLogOps ?
        0 : previously_sent_index + num_log_ops_to_send;
    auto result = ReadFromLogCache(previously_sent_index, to_index, max_batch_size, uuid);

    if (PREDICT_FALSE(!result.ok())) {
      if (PREDICT_TRUE(result.status().IsNotFound())) {
        std::string msg = Format("The logs necessary to catch up peer $0 have been "
                                 "garbage collected. The follower will never be able "
                                 "to catch up ($1)", uuid, result.status());
        NotifyObserversOfFailedFollower(uuid, current_term, msg);
      }
      return result.status();
    }

    // Record the batch size before any message is attached to the request. If the peer was
    // untracked in the meantime we return here, while the request does not yet hold pointers
    // into result->messages, which is destroyed on return.
    {
      LockGuard lock(queue_lock_);
      auto peer = FindPtrOrNull(peers_map_, uuid);
      if (PREDICT_FALSE(peer == nullptr)) {
        return STATUS(NotFound, "Peer not tracked.");
      }

      peer->last_num_messages_sent = result->messages.size();
    }

    preceding_id = result->preceding_op;
    // We use AddAllocated rather than copy, because we pin the log cache at the "all replicated"
    // point. At some point we may want to allow partially loading (and not pinning) earlier
    // messages. At that point we'll need to do something smarter here, like copy or ref-count.
    for (const auto& msg : result->messages) {
      request->mutable_ops()->AddAllocated(msg.get());
    }

    ScopedTrackedConsumption consumption;
    if (result->read_from_disk_size) {
      consumption = ScopedTrackedConsumption(operations_mem_tracker_, result->read_from_disk_size);
    }
    *msgs_holder = ReplicateMsgsHolder(
        request->mutable_ops(), std::move(result->messages), std::move(consumption));

    if (propagated_safe_time &&
        !result->have_more_messages &&
        num_log_ops_to_send == kSendUnboundedLogOps) {
      // Get the current local safe time on the leader and propagate it to the follower.
      request->set_propagated_safe_time(propagated_safe_time.ToUint64());
    }
  }

  preceding_id.ToPB(request->mutable_preceding_id());

  // All entries committed at leader may not be available at lagging follower.
  // `commited_op_id` in this request may make a lagging follower aware of the
  // highest committed op index at the leader. We have a sanity check during tablet
  // bootstrap, in TabletBootstrap::PlaySegments(), that this tablet did not lose a
  // committed operation. Hence avoid sending a committed op id that is too large
  // to such a lagging follower.
  // If we send operations to it, then last know operation to this follower will be last sent
  // operation. If we don't send any operation, then last known operation will be preceding
  // operation.
  // We don't have to change committed_op_id when it is less than max_allowed_committed_op_id,
  // because it will have actual committed_op_id value and this operation is known to the
  // follower.
  const auto max_allowed_committed_op_id = !request->ops().empty()
      ? OpId::FromPB(request->ops().rbegin()->id()) : preceding_id;
  if (max_allowed_committed_op_id.index < request->committed_op_id().index()) {
    max_allowed_committed_op_id.ToPB(request->mutable_committed_op_id());
  }

  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
    if (request->ops_size() > 0) {
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid
          << ". Size: " << request->ops_size()
          << ". From: " << request->ops(0).id().ShortDebugString() << ". To: "
          << request->ops(request->ops_size() - 1).id().ShortDebugString();
      VLOG_WITH_PREFIX_UNLOCKED(3) << "Operations: " << yb::ToString(request->ops());
    } else {
      VLOG_WITH_PREFIX_UNLOCKED(2)
          << "Sending " << (is_new ? "new " : "") << "status only request to Peer: " << uuid
          << ": " << request->ShortDebugString();
    }
  }

  return Status::OK();
}
669
670
// Reads operations from the log cache: those after `after_index`, bounded by `to_index` and
// `max_batch_size`, giving up at `deadline`.
//
// A successful read is returned as is. On failure:
//   - NotFound is returned silently;
//   - Incomplete is logged as an error and returned;
//   - any other status is logged as fatal.
// `peer_uuid` is only used in the log messages.
Result<ReadOpsResult> PeerMessageQueue::ReadFromLogCache(int64_t after_index,
                                                         int64_t to_index,
                                                         size_t max_batch_size,
                                                         const std::string& peer_uuid,
                                                         const CoarseTimePoint deadline) {
  DCHECK_LT(FLAGS_consensus_max_batch_size_bytes + 1_KB, FLAGS_rpc_max_message_size);

  // We try to get the follower's next_index from our log.
  // Note this is not using "term" and needs to change
  auto result = log_cache_.ReadOps(after_index, to_index, max_batch_size, deadline);
  if (PREDICT_TRUE(result.ok())) {
    return result;
  }

  auto s = result.status();
  if (PREDICT_TRUE(s.IsNotFound())) {
    return s;
  }

  if (s.IsIncomplete()) {
    // IsIncomplete() means that we tried to read beyond the head of the log (in the future).
    // KUDU-1078 points to a fix of this log spew issue that we've ported. This should not
    // happen under normal circumstances.
    LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log "
                                    << "while preparing peer request: "
                                    << s.ToString() << ". Destination peer: "
                                    << peer_uuid;
    return s;
  }

  LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
                                  << s.ToString() << ". Destination peer: "
                                  << peer_uuid;
  return s;
}
702
703
// Read majority replicated messages from cache for CDC.
704
// CDC producer will use this to get the messages to send in response to cdc::GetChanges RPC.
705
Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForCDC(
706
  const yb::OpId& last_op_id,
707
  int64_t* repl_index,
708
620
  const CoarseTimePoint deadline) {
709
  // The batch of messages read from cache.
710
711
620
  int64_t to_index;
712
620
  bool pending_messages = false;
713
620
  {
714
620
    LockGuard lock(queue_lock_);
715
    // Use committed_op_id because it's already been processed by the Transaction codepath.
716
620
    to_index = queue_state_.committed_op_id.index;
717
    // Determine if there are pending operations in RAFT but not yet LogCache.
718
620
    pending_messages = to_index != queue_state_.majority_replicated_op_id.index;
719
620
  }
720
620
  if (repl_index) {
721
617
    *repl_index = to_index;
722
617
  }
723
724
620
  if (last_op_id.index >= to_index) {
725
    // Nothing to read.
726
0
    return ReadOpsResult();
727
0
  }
728
729
  // If an empty OpID is only sent on the first read request, start at the earliest known entry.
730
620
  int64_t after_op_index = last_op_id.empty() ?
731
606
                             max(log_cache_.earliest_op_index(), last_op_id.index) :
732
620
                             
last_op_id.index14
;
733
734
620
  auto result = ReadFromLogCache(
735
620
      after_op_index, to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, deadline);
736
620
  if (PREDICT_FALSE(!result.ok()) && 
PREDICT_TRUE0
(result.status().IsNotFound())) {
737
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << Format(
738
0
        "The logs from index $0 have been garbage collected and cannot be read ($1)",
739
0
        after_op_index, result.status());
740
0
  }
741
620
  if (result.ok()) {
742
620
    result->have_more_messages = HaveMoreMessages(result->have_more_messages.get() ||
743
620
                                                  pending_messages);
744
620
  }
745
620
  return result;
746
620
}
747
748
// Fills `req` with a remote bootstrap request addressed to the tracked peer `uuid`.
//
// Returns:
//   - NotFound if the peer is not tracked or the queue is not in leader mode;
//   - IllegalState if the peer is not flagged as needing remote bootstrap;
//   - OK otherwise, after populating `req` and clearing the peer's needs_remote_bootstrap flag.
//
// All reads of queue_state_ and of the tracked peer happen while queue_lock_ is held: the
// values needed later are snapshotted and the flag is cleared inside the critical section, so
// the TrackedPeer pointer obtained from peers_map_ is never dereferenced after the lock is
// released.
Status PeerMessageQueue::GetRemoteBootstrapRequestForPeer(const string& uuid,
                                                          StartRemoteBootstrapRequestPB* req) {
  int64_t caller_term = 0;
  PeerMemberType member_type;
  {
    LockGuard lock(queue_lock_);
    DCHECK_EQ(queue_state_.state, State::kQueueOpen);
    DCHECK_NE(uuid, local_peer_uuid_);
    TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) {
      return STATUS(NotFound, "Peer not tracked or queue not in leader mode.");
    }

    if (PREDICT_FALSE(!peer->needs_remote_bootstrap)) {
      return STATUS(IllegalState, "Peer does not need to remotely bootstrap", uuid);
    }

    // Snapshot everything needed below so nothing guarded by queue_lock_ is touched once the
    // lock is dropped.
    member_type = peer->member_type;
    caller_term = queue_state_.current_term;
    peer->needs_remote_bootstrap = false; // Now reset the flag.
  }

  if (member_type == PeerMemberType::VOTER || member_type == PeerMemberType::OBSERVER) {
    LOG(INFO) << "Remote bootstrapping peer " << uuid << " with type "
              << PeerMemberType_Name(member_type);
  }

  req->Clear();
  req->set_dest_uuid(uuid);
  req->set_tablet_id(tablet_id_);
  req->set_bootstrap_peer_uuid(local_peer_uuid_);
  *req->mutable_source_private_addr() = local_peer_pb_.last_known_private_addr();
  *req->mutable_source_broadcast_addr() = local_peer_pb_.last_known_broadcast_addr();
  *req->mutable_source_cloud_info() = local_peer_pb_.cloud_info();
  req->set_caller_term(caller_term);
  return Status::OK();
}
781
782
874
// Records `op_id` as the CDC consumer's OpId and stamps the time of this update.
// Both fields are written under an exclusive hold of cdc_consumer_lock_.
void PeerMessageQueue::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
  std::lock_guard<rw_spinlock> exclusive_guard(cdc_consumer_lock_);
  cdc_consumer_op_id_ = op_id;
  cdc_consumer_op_id_last_updated_ = CoarseMonoClock::Now();
}
787
788
34.7M
// Returns the CDC consumer OpId to take into account for log cache eviction.
//
// For log cache eviction, we only want to include CDC consumers that are actively polling:
// if the consumer OpId was last updated more than kCDCConsumerCheckpointInterval ago it is
// excluded and OpId::Max() is returned instead.
yb::OpId PeerMessageQueue::GetCDCConsumerOpIdToEvict() {
  std::shared_lock<rw_spinlock> shared_guard(cdc_consumer_lock_);
  const auto time_since_update = CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_;
  if (time_since_update > kCDCConsumerCheckpointInterval) {
    return yb::OpId::Max();
  }
  return cdc_consumer_op_id_;
}
798
799
3.82M
// Returns the CDC consumer OpId to take into account for intent removal.
//
// The stored consumer OpId is returned only if it was updated within the last
// kCDCConsumerIntentRetention; otherwise OpId::Max() is returned.
yb::OpId PeerMessageQueue::GetCDCConsumerOpIdForIntentRemoval() {
  std::shared_lock<rw_spinlock> shared_guard(cdc_consumer_lock_);
  const auto time_since_update = CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_;
  if (time_since_update > kCDCConsumerIntentRetention) {
    return yb::OpId::Max();
  }
  return cdc_consumer_op_id_;
}
807
808
34.7M
// Computes the last_received OpId with the smallest index across all tracked peers and stores
// it in `*result`.
//
// If the last exchange with any peer was unsuccessful, returns early and leaves `*result`
// untouched. Aborts (CHECK) if the loop leaves the candidate at OpId::Max().
void PeerMessageQueue::UpdateAllReplicatedOpId(OpId* result) {
  OpId new_op_id = OpId::Max();

  for (const auto& entry : peers_map_) {
    const auto& tracked_peer = *entry.second;
    if (!tracked_peer.is_last_exchange_successful) {
      return;
    }
    const auto& received = tracked_peer.last_received;
    if (received.index < new_op_id.index) {
      new_op_id = received;
    }
  }

  CHECK_NE(OpId::Max(), new_op_id);
  *result = new_op_id;
}
823
824
69.4M
// Computes the smallest last_applied OpId across all tracked peers and stores it in `*result`.
//
// If the last exchange with any peer was unsuccessful, returns early and leaves `*result`
// untouched. Aborts (CHECK) if the loop leaves the candidate at OpId::Max().
void PeerMessageQueue::UpdateAllAppliedOpId(OpId* result) {
  OpId all_applied_op_id = OpId::Max();
  for (const auto& entry : peers_map_) {
    const auto& tracked_peer = *entry.second;
    if (!tracked_peer.is_last_exchange_successful) {
      return;
    }
    if (tracked_peer.last_applied < all_applied_op_id) {
      all_applied_op_id = tracked_peer.last_applied;
    }
  }

  CHECK_NE(OpId::Max(), all_applied_op_id);
  *result = all_applied_op_id;
}
836
837
34.7M
void PeerMessageQueue::UpdateAllNonLaggingReplicatedOpId(int32_t threshold) {
838
34.7M
  OpId new_op_id = OpId::Max();
839
840
103M
  for (const auto& peer : peers_map_) {
841
    // Ignore lagging follower.
842
103M
    if (peer.second->current_retransmissions >= threshold) {
843
146k
      continue;
844
146k
    }
845
103M
    if (peer.second->last_received.index < new_op_id.index) {
846
39.8M
      new_op_id = peer.second->last_received;
847
39.8M
    }
848
103M
  }
849
850
34.7M
  if (new_op_id == OpId::Max()) {
851
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Non lagging peer(s) not found.";
852
0
    new_op_id = queue_state_.all_replicated_op_id;
853
0
  }
854
855
34.7M
  if (queue_state_.all_nonlagging_replicated_op_id.index < new_op_id.index) {
856
4.50M
    queue_state_.all_nonlagging_replicated_op_id = new_op_id;
857
4.50M
  }
858
34.7M
}
859
860
// Presumably defines the HasMemberFunction_InfiniteWatermarkForLocalPeer<T> trait that
// GetWatermark() queries via ::value -- TODO confirm against the macro definition.
HAS_MEMBER_FUNCTION(InfiniteWatermarkForLocalPeer);
861
862
// Primary template, declared only. It is specialized on whether the policy provides
// InfiniteWatermarkForLocalPeer(), as indicated by the boolean parameter.
template <class Policy, bool HasMemberFunction_InfiniteWatermarkForLocalPeer>
863
struct GetInfiniteWatermarkForLocalPeer;
864
865
template <class Policy>
866
struct GetInfiniteWatermarkForLocalPeer<Policy, true> {
867
1.68M
  static auto Apply() {
868
1.68M
    return Policy::InfiniteWatermarkForLocalPeer();
869
1.68M
  }
consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::LeaderLeaseExpirationWatermark()::Policy, true>::Apply()
Line
Count
Source
867
844k
  static auto Apply() {
868
844k
    return Policy::InfiniteWatermarkForLocalPeer();
869
844k
  }
consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::HybridTimeLeaseExpirationWatermark()::Policy, true>::Apply()
Line
Count
Source
867
844k
  static auto Apply() {
868
844k
    return Policy::InfiniteWatermarkForLocalPeer();
869
844k
  }
870
};
871
872
template <class Policy>
873
struct GetInfiniteWatermarkForLocalPeer<Policy, false> {
874
  // Should not be invoked, but have to define to make compiler happy.
875
0
  static typename Policy::result_type Apply() {
876
0
    LOG(DFATAL) << "Invoked Apply when InfiniteWatermarkForLocalPeer is not defined";
877
0
    return typename Policy::result_type();
878
0
  }
Unexecuted instantiation: consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::NumSSTFilesWatermark()::Policy, false>::Apply()
Unexecuted instantiation: consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::OpIdWatermark()::Policy, false>::Apply()
879
};
880
881
template <class Policy>
882
139M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
883
139M
  DCHECK(queue_lock_.is_locked());
884
139M
  const auto num_peers_required = queue_state_.majority_size_;
885
139M
  if (num_peers_required == kUninitializedMajoritySize) {
886
    // We don't even know the quorum majority size yet.
887
0
    return Policy::NotEnoughPeersValue();
888
0
  }
889
139M
  CHECK_GE(num_peers_required, 0);
890
891
139M
  const ssize_t num_peers = peers_map_.size();
892
139M
  if (num_peers < num_peers_required) {
893
964
    return Policy::NotEnoughPeersValue();
894
964
  }
895
896
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
897
  // replicated value of the dimension that we are computing a watermark for. There is a difference
898
  // in logic between handling of OpIds vs. leader leases:
899
  // - For OpIds, the local peer might actually be less up-to-date than followers.
900
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
901
139M
  const bool local_peer_infinite_watermark =
902
139M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
903
904
139M
  if (num_peers_required == 1 && 
local_peer_infinite_watermark3.37M
) {
905
    // We give "infinite lease" to ourselves.
906
1.68M
    return GetInfiniteWatermarkForLocalPeer<
907
1.68M
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
908
1.68M
  }
909
910
137M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
911
137M
  boost::container::small_vector<
912
137M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
913
137M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
914
915
411M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
916
411M
    const TrackedPeer &peer = *peer_map_entry.second;
917
411M
    if (local_peer_infinite_watermark && 
peer.uuid == local_peer_uuid_205M
) {
918
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
919
      // value of the watermark.
920
67.8M
      continue;
921
67.8M
    }
922
344M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
923
      // Only votes from VOTERs in the active config should be taken into consideration
924
3.26M
      continue;
925
3.26M
    }
926
340M
    if (peer.is_last_exchange_successful) {
927
339M
      watermarks.push_back(Policy::ExtractValue(peer));
928
339M
    }
929
340M
  }
930
931
  // We always assume that local peer has most recent information.
932
137M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
933
934
137M
  if (num_responsive_peers < num_peers_required) {
935
18.4E
    VLOG_WITH_PREFIX_UNLOCKED(2)
936
18.4E
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
937
18.4E
        << ", num_peers_required=" << num_peers_required
938
18.4E
        << ", num_responsive_peers=" << num_responsive_peers
939
18.4E
        << ", not enough responsive peers";
940
    // There are not enough peers with which the last message exchange was successful.
941
239k
    return Policy::NotEnoughPeersValue();
942
239k
  }
943
944
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
945
  // something to 3 of them and 4th is our local peer, there are two possibilities:
946
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
947
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
948
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
949
  //   num_responsive_peers - num_peers_required.
950
  //
951
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
952
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
953
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
954
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
955
956
137M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
957
137M
  DCHECK_LT(index_of_interest, watermarks.size());
958
959
137M
  auto nth = watermarks.begin() + index_of_interest;
960
137M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
961
137M
  
VLOG_WITH_PREFIX_UNLOCKED14.0k
(2)
962
14.0k
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
963
14.0k
      << ", num_peers_required=" << num_peers_required
964
14.0k
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
965
14.0k
      << ", watermark: " << yb::ToString(*nth);
966
967
137M
  return *nth;
968
137M
}
consensus_queue.cc:yb::consensus::PeerMessageQueue::LeaderLeaseExpirationWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::LeaderLeaseExpirationWatermark()::Policy>()
Line
Count
Source
882
34.7M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
883
34.7M
  DCHECK(queue_lock_.is_locked());
884
34.7M
  const auto num_peers_required = queue_state_.majority_size_;
885
34.7M
  if (num_peers_required == kUninitializedMajoritySize) {
886
    // We don't even know the quorum majority size yet.
887
0
    return Policy::NotEnoughPeersValue();
888
0
  }
889
34.7M
  CHECK_GE(num_peers_required, 0);
890
891
34.7M
  const ssize_t num_peers = peers_map_.size();
892
34.7M
  if (num_peers < num_peers_required) {
893
241
    return Policy::NotEnoughPeersValue();
894
241
  }
895
896
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
897
  // replicated value of the dimension that we are computing a watermark for. There is a difference
898
  // in logic between handling of OpIds vs. leader leases:
899
  // - For OpIds, the local peer might actually be less up-to-date than followers.
900
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
901
34.7M
  const bool local_peer_infinite_watermark =
902
34.7M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
903
904
34.7M
  if (num_peers_required == 1 && 
local_peer_infinite_watermark844k
) {
905
    // We give "infinite lease" to ourselves.
906
844k
    return GetInfiniteWatermarkForLocalPeer<
907
844k
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
908
844k
  }
909
910
33.9M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
911
33.9M
  boost::container::small_vector<
912
33.9M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
913
33.9M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
914
915
102M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
916
102M
    const TrackedPeer &peer = *peer_map_entry.second;
917
102M
    if (local_peer_infinite_watermark && 
peer.uuid == local_peer_uuid_102M
) {
918
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
919
      // value of the watermark.
920
33.9M
      continue;
921
33.9M
    }
922
68.6M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
923
      // Only votes from VOTERs in the active config should be taken into consideration
924
812k
      continue;
925
812k
    }
926
67.8M
    if (peer.is_last_exchange_successful) {
927
67.4M
      watermarks.push_back(Policy::ExtractValue(peer));
928
67.4M
    }
929
67.8M
  }
930
931
  // We always assume that local peer has most recent information.
932
33.9M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
933
934
33.9M
  if (num_responsive_peers < num_peers_required) {
935
59.9k
    
VLOG_WITH_PREFIX_UNLOCKED44
(2)
936
44
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
937
44
        << ", num_peers_required=" << num_peers_required
938
44
        << ", num_responsive_peers=" << num_responsive_peers
939
44
        << ", not enough responsive peers";
940
    // There are not enough peers with which the last message exchange was successful.
941
59.9k
    return Policy::NotEnoughPeersValue();
942
59.9k
  }
943
944
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
945
  // something to 3 of them and 4th is our local peer, there are two possibilities:
946
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
947
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
948
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
949
  //   num_responsive_peers - num_peers_required.
950
  //
951
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
952
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
953
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
954
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
955
956
33.8M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
957
33.8M
  DCHECK_LT(index_of_interest, watermarks.size());
958
959
33.8M
  auto nth = watermarks.begin() + index_of_interest;
960
33.8M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
961
33.8M
  
VLOG_WITH_PREFIX_UNLOCKED17.0k
(2)
962
17.0k
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
963
17.0k
      << ", num_peers_required=" << num_peers_required
964
17.0k
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
965
17.0k
      << ", watermark: " << yb::ToString(*nth);
966
967
33.8M
  return *nth;
968
33.9M
}
consensus_queue.cc:yb::consensus::PeerMessageQueue::HybridTimeLeaseExpirationWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::HybridTimeLeaseExpirationWatermark()::Policy>()
Line
Count
Source
882
34.7M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
883
34.7M
  DCHECK(queue_lock_.is_locked());
884
34.7M
  const auto num_peers_required = queue_state_.majority_size_;
885
34.7M
  if (num_peers_required == kUninitializedMajoritySize) {
886
    // We don't even know the quorum majority size yet.
887
0
    return Policy::NotEnoughPeersValue();
888
0
  }
889
34.7M
  CHECK_GE(num_peers_required, 0);
890
891
34.7M
  const ssize_t num_peers = peers_map_.size();
892
34.7M
  if (num_peers < num_peers_required) {
893
241
    return Policy::NotEnoughPeersValue();
894
241
  }
895
896
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
897
  // replicated value of the dimension that we are computing a watermark for. There is a difference
898
  // in logic between handling of OpIds vs. leader leases:
899
  // - For OpIds, the local peer might actually be less up-to-date than followers.
900
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
901
34.7M
  const bool local_peer_infinite_watermark =
902
34.7M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
903
904
34.7M
  if (num_peers_required == 1 && 
local_peer_infinite_watermark844k
) {
905
    // We give "infinite lease" to ourselves.
906
844k
    return GetInfiniteWatermarkForLocalPeer<
907
844k
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
908
844k
  }
909
910
33.9M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
911
33.9M
  boost::container::small_vector<
912
33.9M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
913
33.9M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
914
915
102M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
916
102M
    const TrackedPeer &peer = *peer_map_entry.second;
917
102M
    if (local_peer_infinite_watermark && 
peer.uuid == local_peer_uuid_102M
) {
918
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
919
      // value of the watermark.
920
33.9M
      continue;
921
33.9M
    }
922
68.6M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
923
      // Only votes from VOTERs in the active config should be taken into consideration
924
812k
      continue;
925
812k
    }
926
67.8M
    if (peer.is_last_exchange_successful) {
927
67.4M
      watermarks.push_back(Policy::ExtractValue(peer));
928
67.4M
    }
929
67.8M
  }
930
931
  // We always assume that local peer has most recent information.
932
33.9M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
933
934
33.9M
  if (num_responsive_peers < num_peers_required) {
935
59.9k
    
VLOG_WITH_PREFIX_UNLOCKED28
(2)
936
28
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
937
28
        << ", num_peers_required=" << num_peers_required
938
28
        << ", num_responsive_peers=" << num_responsive_peers
939
28
        << ", not enough responsive peers";
940
    // There are not enough peers with which the last message exchange was successful.
941
59.9k
    return Policy::NotEnoughPeersValue();
942
59.9k
  }
943
944
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
945
  // something to 3 of them and 4th is our local peer, there are two possibilities:
946
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
947
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
948
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
949
  //   num_responsive_peers - num_peers_required.
950
  //
951
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
952
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
953
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
954
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
955
956
33.8M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
957
33.8M
  DCHECK_LT(index_of_interest, watermarks.size());
958
959
33.8M
  auto nth = watermarks.begin() + index_of_interest;
960
33.8M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
961
18.4E
  VLOG_WITH_PREFIX_UNLOCKED(2)
962
18.4E
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
963
18.4E
      << ", num_peers_required=" << num_peers_required
964
18.4E
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
965
18.4E
      << ", watermark: " << yb::ToString(*nth);
966
967
33.8M
  return *nth;
968
33.9M
}
consensus_queue.cc:yb::consensus::PeerMessageQueue::NumSSTFilesWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::NumSSTFilesWatermark()::Policy>()
Line
Count
Source
882
34.7M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
883
34.7M
  DCHECK(queue_lock_.is_locked());
884
34.7M
  const auto num_peers_required = queue_state_.majority_size_;
885
34.7M
  if (num_peers_required == kUninitializedMajoritySize) {
886
    // We don't even know the quorum majority size yet.
887
0
    return Policy::NotEnoughPeersValue();
888
0
  }
889
34.7M
  CHECK_GE(num_peers_required, 0);
890
891
34.7M
  const ssize_t num_peers = peers_map_.size();
892
34.7M
  if (num_peers < num_peers_required) {
893
241
    return Policy::NotEnoughPeersValue();
894
241
  }
895
896
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
897
  // replicated value of the dimension that we are computing a watermark for. There is a difference
898
  // in logic between handling of OpIds vs. leader leases:
899
  // - For OpIds, the local peer might actually be less up-to-date than followers.
900
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
901
34.7M
  const bool local_peer_infinite_watermark =
902
34.7M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
903
904
34.7M
  if (num_peers_required == 1 && 
local_peer_infinite_watermark845k
) {
905
    // We give "infinite lease" to ourselves.
906
0
    return GetInfiniteWatermarkForLocalPeer<
907
0
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
908
0
  }
909
910
34.7M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
911
34.7M
  boost::container::small_vector<
912
34.7M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
913
34.7M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
914
915
103M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
916
103M
    const TrackedPeer &peer = *peer_map_entry.second;
917
103M
    if (local_peer_infinite_watermark && 
peer.uuid == local_peer_uuid_0
) {
918
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
919
      // value of the watermark.
920
0
      continue;
921
0
    }
922
103M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
923
      // Only votes from VOTERs in the active config should be taken into consideration
924
822k
      continue;
925
822k
    }
926
102M
    if (peer.is_last_exchange_successful) {
927
102M
      watermarks.push_back(Policy::ExtractValue(peer));
928
102M
    }
929
102M
  }
930
931
  // We always assume that local peer has most recent information.
932
34.7M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
933
934
34.7M
  if (num_responsive_peers < num_peers_required) {
935
18.4E
    VLOG_WITH_PREFIX_UNLOCKED(2)
936
18.4E
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
937
18.4E
        << ", num_peers_required=" << num_peers_required
938
18.4E
        << ", num_responsive_peers=" << num_responsive_peers
939
18.4E
        << ", not enough responsive peers";
940
    // There are not enough peers with which the last message exchange was successful.
941
60.0k
    return Policy::NotEnoughPeersValue();
942
60.0k
  }
943
944
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
945
  // something to 3 of them and 4th is our local peer, there are two possibilities:
946
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
947
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
948
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
949
  //   num_responsive_peers - num_peers_required.
950
  //
951
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
952
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
953
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
954
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
955
956
34.6M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
957
34.6M
  DCHECK_LT(index_of_interest, watermarks.size());
958
959
34.6M
  auto nth = watermarks.begin() + index_of_interest;
960
34.6M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
961
34.6M
  
VLOG_WITH_PREFIX_UNLOCKED30
(2)
962
30
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
963
30
      << ", num_peers_required=" << num_peers_required
964
30
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
965
30
      << ", watermark: " << yb::ToString(*nth);
966
967
34.6M
  return *nth;
968
34.7M
}
consensus_queue.cc:yb::consensus::PeerMessageQueue::OpIdWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::OpIdWatermark()::Policy>()
Line
Count
Source
882
34.7M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
883
34.7M
  DCHECK(queue_lock_.is_locked());
884
34.7M
  const auto num_peers_required = queue_state_.majority_size_;
885
34.7M
  if (num_peers_required == kUninitializedMajoritySize) {
886
    // We don't even know the quorum majority size yet.
887
0
    return Policy::NotEnoughPeersValue();
888
0
  }
889
34.7M
  CHECK_GE(num_peers_required, 0);
890
891
34.7M
  const ssize_t num_peers = peers_map_.size();
892
34.7M
  if (num_peers < num_peers_required) {
893
241
    return Policy::NotEnoughPeersValue();
894
241
  }
895
896
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
897
  // replicated value of the dimension that we are computing a watermark for. There is a difference
898
  // in logic between handling of OpIds vs. leader leases:
899
  // - For OpIds, the local peer might actually be less up-to-date than followers.
900
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
901
34.7M
  const bool local_peer_infinite_watermark =
902
34.7M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
903
904
34.7M
  if (num_peers_required == 1 && 
local_peer_infinite_watermark844k
) {
905
    // We give "infinite lease" to ourselves.
906
0
    return GetInfiniteWatermarkForLocalPeer<
907
0
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
908
0
  }
909
910
34.7M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
911
34.7M
  boost::container::small_vector<
912
34.7M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
913
34.7M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
914
915
103M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
916
103M
    const TrackedPeer &peer = *peer_map_entry.second;
917
103M
    if (local_peer_infinite_watermark && 
peer.uuid == local_peer_uuid_0
) {
918
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
919
      // value of the watermark.
920
0
      continue;
921
0
    }
922
103M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
923
      // Only votes from VOTERs in the active config should be taken into consideration
924
821k
      continue;
925
821k
    }
926
102M
    if (peer.is_last_exchange_successful) {
927
102M
      watermarks.push_back(Policy::ExtractValue(peer));
928
102M
    }
929
102M
  }
930
931
  // We always assume that local peer has most recent information.
932
34.7M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
933
934
34.7M
  if (num_responsive_peers < num_peers_required) {
935
18.4E
    VLOG_WITH_PREFIX_UNLOCKED(2)
936
18.4E
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
937
18.4E
        << ", num_peers_required=" << num_peers_required
938
18.4E
        << ", num_responsive_peers=" << num_responsive_peers
939
18.4E
        << ", not enough responsive peers";
940
    // There are not enough peers with which the last message exchange was successful.
941
59.9k
    return Policy::NotEnoughPeersValue();
942
59.9k
  }
943
944
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
945
  // something to 3 of them and 4th is our local peer, there are two possibilities:
946
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
947
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
948
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
949
  //   num_responsive_peers - num_peers_required.
950
  //
951
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
952
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
953
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
954
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
955
956
34.6M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
957
34.6M
  DCHECK_LT(index_of_interest, watermarks.size());
958
959
34.6M
  auto nth = watermarks.begin() + index_of_interest;
960
34.6M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
961
34.6M
  
VLOG_WITH_PREFIX_UNLOCKED6.48k
(2)
962
6.48k
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
963
6.48k
      << ", num_peers_required=" << num_peers_required
964
6.48k
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
965
6.48k
      << ", watermark: " << yb::ToString(*nth);
966
967
34.6M
  return *nth;
968
34.7M
}
969
970
34.7M
// Returns the leader lease expiration watermark, computed by GetWatermark() with a policy that
// reads the last received leader lease expiration of every tracked peer.
CoarseTimePoint PeerMessageQueue::LeaderLeaseExpirationWatermark() {
  struct Policy {
    typedef CoarseTimePoint result_type;
    // Workaround for a gcc bug. That does not understand that Comparator is actually being used.
    __attribute__((unused)) typedef std::less<result_type> Comparator;

    // Label used by GetWatermark() in its verbose log lines.
    static const char* Name() {
      return "Leader lease expiration";
    }

    // Value GetWatermark() returns when too few peers had a successful last exchange.
    static result_type NotEnoughPeersValue() {
      return result_type::min();
    }

    // The presence of this member is what makes GetWatermark() treat the local peer as holding
    // an "infinite" lease instead of reading its tracked value.
    static result_type InfiniteWatermarkForLocalPeer() {
      return result_type::max();
    }

    // A default-constructed time point is mapped to the minimum time point; any other value is
    // returned unchanged.
    static result_type ExtractValue(const TrackedPeer& peer) {
      const auto received_expiration = peer.leader_lease_expiration.last_received;
      if (received_expiration == CoarseTimePoint()) {
        return CoarseTimePoint::min();
      }
      return received_expiration;
    }
  };

  return GetWatermark<Policy>();
}
996
997
34.7M
// Returns the hybrid time leader lease expiration watermark (in microseconds), computed by
// GetWatermark() with a policy that reads each tracked peer's last received hybrid time lease.
MicrosTime PeerMessageQueue::HybridTimeLeaseExpirationWatermark() {
  struct Policy {
    typedef MicrosTime result_type;
    // Workaround for a gcc bug. That does not understand that Comparator is actually being used.
    __attribute__((unused)) typedef std::less<result_type> Comparator;

    // Label used by GetWatermark() in its verbose log lines.
    static const char* Name() {
      return "Hybrid time leader lease expiration";
    }

    // Value GetWatermark() returns when too few peers had a successful last exchange.
    static result_type NotEnoughPeersValue() {
      return HybridTime::kMin.GetPhysicalValueMicros();
    }

    // The presence of this member is what makes GetWatermark() treat the local peer as holding
    // an "infinite" lease instead of reading its tracked value.
    static result_type InfiniteWatermarkForLocalPeer() {
      return HybridTime::kMax.GetPhysicalValueMicros();
    }

    // Per-peer value that participates in the watermark computation.
    static result_type ExtractValue(const TrackedPeer& peer) {
      return peer.leader_ht_lease_expiration.last_received;
    }
  };

  return GetWatermark<Policy>();
}
1022
1023
34.7M
uint64_t PeerMessageQueue::NumSSTFilesWatermark() {
1024
34.7M
  struct Policy {
1025
34.7M
    typedef uint64_t result_type;
1026
    // Workaround for a gcc bug. That does not understand that Comparator is actually being used.
1027
34.7M
    __attribute__((unused)) typedef std::greater<result_type> Comparator;
1028
1029
34.7M
    static result_type NotEnoughPeersValue() {
1030
60.3k
      return 0;
1031
60.3k
    }
1032
1033
102M
    static result_type ExtractValue(const TrackedPeer& peer) {
1034
102M
      return peer.num_sst_files;
1035
102M
    }
1036
1037
34.7M
    static const char* Name() {
1038
1
      return "Num SST files";
1039
1
    }
1040
34.7M
  };
1041
1042
34.7M
  auto watermark = GetWatermark<Policy>();
1043
34.7M
  return std::max(watermark, local_peer_->num_sst_files);
1044
34.7M
}
1045
1046
34.7M
// Returns the OpId watermark computed by GetWatermark() over the last received OpId of every
// tracked peer. This policy has no InfiniteWatermarkForLocalPeer member, so the local peer's
// tracked value is included like any other peer's.
OpId PeerMessageQueue::OpIdWatermark() {
  struct Policy {
    typedef OpId result_type;

    // Orders OpIds by index only; the term is not consulted.
    struct Comparator {
      bool operator()(const OpId& lhs, const OpId& rhs) {
        return lhs.index < rhs.index;
      }
    };

    // Label used by GetWatermark() in its verbose log lines.
    static const char* Name() {
      return "OpId";
    }

    // Value GetWatermark() returns when too few peers had a successful last exchange.
    static result_type NotEnoughPeersValue() {
      return OpId::Min();
    }

    // Per-peer value that participates in the watermark computation.
    static result_type ExtractValue(const TrackedPeer& peer) {
      return peer.last_received;
    }
  };

  return GetWatermark<Policy>();
}
1071
1072
15.7k
void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) {
1073
15.7k
  LockGuard l(queue_lock_);
1074
15.7k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1075
15.7k
  if (!peer) 
return0
;
1076
15.7k
  peer->last_successful_communication_time = MonoTime::Now();
1077
15.7k
}
1078
1079
23.2k
void PeerMessageQueue::RequestWasNotSent(const std::string& peer_uuid) {
1080
23.2k
  LockGuard scoped_lock(queue_lock_);
1081
23.2k
  DCHECK_NE(State::kQueueConstructed, queue_state_.state);
1082
1083
23.2k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1084
23.2k
  if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) {
1085
0
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked.";
1086
0
    return;
1087
0
  }
1088
1089
23.2k
  peer->ResetLastRequest();
1090
23.2k
}
1091
1092
1093
bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
1094
34.8M
                                        const ConsensusResponsePB& response) {
1095
34.8M
  DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
1096
4.53k
      << response.InitializationErrorString() << ". Response: " << response.ShortDebugString();
1097
1098
34.8M
  MajorityReplicatedData majority_replicated;
1099
34.8M
  Mode mode_copy;
1100
34.8M
  bool result = false;
1101
34.8M
  {
1102
34.8M
    LockGuard scoped_lock(queue_lock_);
1103
34.8M
    DCHECK_NE(State::kQueueConstructed, queue_state_.state);
1104
1105
34.8M
    TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1106
34.8M
    if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) {
1107
3
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding "
1108
3
          "peer response. Response: " << response.ShortDebugString();
1109
3
      return false;
1110
3
    }
1111
1112
    // Remotely bootstrap the peer if the tablet is not found or deleted.
1113
34.8M
    if (response.has_error()) {
1114
      // We only let special types of errors through to this point from the peer.
1115
10.6k
      CHECK_EQ(tserver::TabletServerErrorPB::TABLET_NOT_FOUND, response.error().code())
1116
0
          << response.ShortDebugString();
1117
1118
10.6k
      peer->needs_remote_bootstrap = true;
1119
      // Since we received a response from the peer, we know it is alive. So we need to update
1120
      // peer->last_successful_communication_time, otherwise, we will remove this peer from the
1121
      // configuration if the remote bootstrap is not completed within
1122
      // FLAGS_follower_unavailable_considered_failed_sec seconds.
1123
10.6k
      peer->last_successful_communication_time = MonoTime::Now();
1124
10.6k
      
YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS485
(INFO, 30)
1125
485
          << "Marked peer as needing remote bootstrap: " << peer->ToString();
1126
10.6k
      return true;
1127
10.6k
    }
1128
1129
34.8M
    
if (34.8M
queue_state_.active_config34.8M
) {
1130
34.8M
      RaftPeerPB peer_pb;
1131
34.8M
      if (!GetRaftConfigMember(*queue_state_.active_config, peer_uuid, &peer_pb).ok()) {
1132
0
        LOG(FATAL) << "Peer " << peer_uuid << " not in active config";
1133
0
      }
1134
34.8M
      peer->member_type = peer_pb.member_type();
1135
18.4E
    } else {
1136
18.4E
      peer->member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE;
1137
18.4E
    }
1138
1139
    // Application level errors should be handled elsewhere
1140
34.8M
    DCHECK(!response.has_error());
1141
1142
    // Take a snapshot of the current peer status.
1143
34.8M
    TrackedPeer previous = *peer;
1144
1145
    // Update the peer status based on the response.
1146
34.8M
    peer->is_new = false;
1147
34.8M
    peer->last_successful_communication_time = MonoTime::Now();
1148
1149
34.8M
    peer->ResetLastRequest();
1150
1151
34.8M
    if (response.has_status()) {
1152
34.8M
      const auto& status = response.status();
1153
      // Sanity checks.  Some of these can be eventually removed, but they are handy for now.
1154
34.8M
      DCHECK(status.IsInitialized()) << "Error: Uninitialized: "
1155
141
                                                << response.InitializationErrorString()
1156
141
                                                << ". Response: " << response.ShortDebugString();
1157
      // The status must always have a last received op id and a last committed index.
1158
34.8M
      DCHECK(status.has_last_received());
1159
34.8M
      DCHECK(status.has_last_received_current_leader());
1160
34.8M
      DCHECK(status.has_last_committed_idx());
1161
1162
34.8M
      peer->last_known_committed_idx = status.last_committed_idx();
1163
34.8M
      peer->last_applied = OpId::FromPB(status.last_applied());
1164
1165
      // If the reported last-received op for the replica is in our local log, then resume sending
1166
      // entries from that point onward. Otherwise, resume after the last op they received from us.
1167
      // If we've never successfully sent them anything, start after the last-committed op in their
1168
      // log, which is guaranteed by the Raft protocol to be a valid op.
1169
1170
34.8M
      bool peer_has_prefix_of_log = IsOpInLog(yb::OpId::FromPB(status.last_received()));
1171
34.8M
      if (
peer_has_prefix_of_log34.8M
) {
1172
        // If the latest thing in their log is in our log, we are in sync.
1173
34.8M
        peer->last_received = OpId::FromPB(status.last_received());
1174
34.8M
        peer->next_index = peer->last_received.index + 1;
1175
1176
18.4E
      } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
1177
        // Their log may have diverged from ours, however we are in the process of replicating our
1178
        // ops to them, so continue doing so. Eventually, we will cause the divergent entry in their
1179
        // log to be overwritten.
1180
53
        peer->last_received = OpId::FromPB(status.last_received_current_leader());
1181
53
        peer->next_index = peer->last_received.index + 1;
1182
18.4E
      } else {
1183
        // The peer is divergent and they have not (successfully) received anything from us yet.
1184
        // Start sending from their last committed index.  This logic differs from the Raft spec
1185
        // slightly because instead of stepping back one-by-one from the end until we no longer have
1186
        // an LMP error, we jump back to the last committed op indicated by the peer with the hope
1187
        // that doing so will result in a faster catch-up process.
1188
18.4E
        DCHECK_GE(peer->last_known_committed_idx, 0);
1189
18.4E
        peer->next_index = peer->last_known_committed_idx + 1;
1190
18.4E
      }
1191
1192
34.8M
      if (PREDICT_FALSE(status.has_error())) {
1193
121k
        peer->is_last_exchange_successful = false;
1194
121k
        switch (status.error().code()) {
1195
120k
          case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH: {
1196
120k
            DCHECK(status.has_last_received());
1197
120k
            if (previous.is_new) {
1198
              // That's currently how we can detect that we able to connect to a peer.
1199
120k
              LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString();
1200
120k
            } else {
1201
47
              LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
1202
47
                                             << peer->ToString();
1203
47
              CHECK(!FLAGS_TEST_disallow_lmp_failures);
1204
47
            }
1205
120k
            return true;
1206
0
          }
1207
877
          case ConsensusErrorPB::INVALID_TERM: {
1208
877
            CHECK(response.has_responder_term());
1209
877
            LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString()
1210
877
                                           << ". Peer's new term: " << response.responder_term();
1211
877
            NotifyObserversOfTermChange(response.responder_term());
1212
877
            return false;
1213
0
          }
1214
0
          default: {
1215
0
            LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: "
1216
0
                << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: "
1217
0
                << response.ShortDebugString();
1218
0
          }
1219
121k
        }
1220
121k
      }
1221
34.8M
    }
1222
1223
34.7M
    peer->is_last_exchange_successful = true;
1224
34.7M
    peer->num_sst_files = response.num_sst_files();
1225
1226
34.7M
    if (response.has_responder_term()) {
1227
      // The peer must have responded with a term that is greater than or equal to the last known
1228
      // term for that peer.
1229
25.4M
      peer->CheckMonotonicTerms(response.responder_term());
1230
1231
      // If the responder didn't send an error back that must mean that it has a term that is the
1232
      // same or lower than ours.
1233
25.4M
      CHECK_LE(response.responder_term(), queue_state_.current_term);
1234
25.4M
    }
1235
1236
34.7M
    if (PREDICT_FALSE(VLOG_IS_ON(2))) {
1237
1
      
VLOG_WITH_PREFIX_UNLOCKED0
(2) << "Received Response from Peer (" << peer->ToString() << "). "
1238
0
          << "Response: " << response.ShortDebugString();
1239
1
    }
1240
1241
    // If our log has the next request for the peer or if the peer's committed index is lower than
1242
    // our own, set 'more_pending' to true.
1243
34.7M
    result = log_cache_.HasOpBeenWritten(peer->next_index) ||
1244
34.7M
        
(peer->last_known_committed_idx < queue_state_.committed_op_id.index)32.4M
;
1245
1246
34.7M
    mode_copy = queue_state_.mode;
1247
34.7M
    if (
mode_copy == Mode::LEADER34.7M
) {
1248
34.7M
      auto new_majority_replicated_opid = OpIdWatermark();
1249
34.7M
      if (new_majority_replicated_opid != OpId::Min()) {
1250
34.6M
        if (new_majority_replicated_opid.index == MaximumOpId().index()) {
1251
0
          queue_state_.majority_replicated_op_id = local_peer_->last_received;
1252
34.6M
        } else {
1253
34.6M
          queue_state_.majority_replicated_op_id = new_majority_replicated_opid;
1254
34.6M
        }
1255
34.6M
      }
1256
1257
34.7M
      peer->leader_lease_expiration.OnReplyFromFollower();
1258
34.7M
      peer->leader_ht_lease_expiration.OnReplyFromFollower();
1259
1260
34.7M
      majority_replicated.op_id = queue_state_.majority_replicated_op_id;
1261
34.7M
      majority_replicated.leader_lease_expiration = LeaderLeaseExpirationWatermark();
1262
34.7M
      majority_replicated.ht_lease_expiration = HybridTimeLeaseExpirationWatermark();
1263
34.7M
      majority_replicated.num_sst_files = NumSSTFilesWatermark();
1264
34.7M
      if (peer->last_received == queue_state_.last_applied_op_id) {
1265
23.3M
        majority_replicated.peer_got_all_ops = peer->uuid;
1266
23.3M
      }
1267
34.7M
    }
1268
1269
34.7M
    UpdateAllReplicatedOpId(&queue_state_.all_replicated_op_id);
1270
34.7M
    UpdateAllAppliedOpId(&queue_state_.all_applied_op_id);
1271
1272
34.7M
    auto evict_index = GetCDCConsumerOpIdToEvict().index;
1273
1274
34.7M
    int32_t lagging_follower_threshold = FLAGS_consensus_lagging_follower_threshold;
1275
34.7M
    if (lagging_follower_threshold > 0) {
1276
34.7M
      UpdateAllNonLaggingReplicatedOpId(lagging_follower_threshold);
1277
34.7M
      evict_index = std::min(evict_index, queue_state_.all_nonlagging_replicated_op_id.index);
1278
34.7M
    } else {
1279
299
      evict_index = std::min(evict_index, queue_state_.all_replicated_op_id.index);
1280
299
    }
1281
1282
34.7M
    log_cache_.EvictThroughOp(evict_index);
1283
1284
34.7M
    UpdateMetrics();
1285
34.7M
  }
1286
1287
34.7M
  if (
mode_copy == Mode::LEADER34.7M
) {
1288
34.7M
    NotifyObserversOfMajorityReplOpChange(majority_replicated);
1289
34.7M
  }
1290
1291
34.7M
  return result;
1292
34.8M
}
1293
1294
10
// Test helper: returns a copy of the tracked state of the peer with the given uuid, taken while
// holding queue_lock_. The lookup uses FindOrDie, so the peer is expected to be tracked.
PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(string uuid) {
  LockGuard lock(queue_lock_);
  return *FindOrDie(peers_map_, uuid);
}
1299
1300
13
// Test helper: returns the current all-replicated OpId, read under queue_lock_.
OpId PeerMessageQueue::TEST_GetAllReplicatedIndex() const {
  LockGuard guard(queue_lock_);
  return queue_state_.all_replicated_op_id;
}
1304
1305
65
// Returns the current all-applied OpId, read under queue_lock_.
OpId PeerMessageQueue::GetAllAppliedOpId() const {
  LockGuard guard(queue_lock_);
  return queue_state_.all_applied_op_id;
}
1309
1310
5
// Test helper: returns the current committed OpId, read under queue_lock_.
OpId PeerMessageQueue::TEST_GetCommittedIndex() const {
  LockGuard guard(queue_lock_);
  return queue_state_.committed_op_id;
}
1314
1315
11
// Test helper: returns the current majority replicated OpId, read under queue_lock_.
OpId PeerMessageQueue::TEST_GetMajorityReplicatedOpId() const {
  LockGuard guard(queue_lock_);
  return queue_state_.majority_replicated_op_id;
}
1319
1320
2
// Test helper: returns the OpId of the last appended operation, read under queue_lock_.
OpId PeerMessageQueue::TEST_GetLastAppended() const {
  LockGuard guard(queue_lock_);
  return queue_state_.last_appended;
}
1324
1325
16
// Test helper: returns the last applied OpId known to the queue, read under queue_lock_.
OpId PeerMessageQueue::TEST_GetLastAppliedOpId() const {
  LockGuard guard(queue_lock_);
  return queue_state_.last_applied_op_id;
}
1329
1330
63.0M
void PeerMessageQueue::UpdateMetrics() {
1331
  // Since operations have consecutive indices we can update the metrics based on simple index math.
1332
63.0M
  metrics_.num_majority_done_ops->set_value(
1333
63.0M
      queue_state_.committed_op_id.index - queue_state_.all_replicated_op_id.index);
1334
63.0M
  metrics_.num_in_progress_ops->set_value(
1335
63.0M
      queue_state_.last_appended.index - queue_state_.committed_op_id.index);
1336
63.0M
}
1337
1338
38
void PeerMessageQueue::DumpToHtml(std::ostream& out) const {
1339
38
  using std::endl;
1340
1341
38
  LockGuard lock(queue_lock_);
1342
38
  out << "<h3>Watermarks</h3>" << endl;
1343
38
  out << "<table>" << endl;;
1344
38
  out << "  <tr><th>Peer</th><th>Watermark</th></tr>" << endl;
1345
101
  for (const PeersMap::value_type& entry : peers_map_) {
1346
101
    out << Substitute("  <tr><td>$0</td><td>$1</td></tr>",
1347
101
                      EscapeForHtmlToString(entry.first),
1348
101
                      EscapeForHtmlToString(entry.second->ToString())) << endl;
1349
101
  }
1350
38
  out << "</table>" << endl;
1351
1352
38
  log_cache_.DumpToHtml(out);
1353
38
}
1354
1355
151k
void PeerMessageQueue::ClearUnlocked() {
1356
151k
  STLDeleteValues(&peers_map_);
1357
151k
  queue_state_.state = State::kQueueClosed;
1358
151k
}
1359
1360
151k
void PeerMessageQueue::Close() {
1361
151k
  if (installed_num_sst_files_changed_listener_) {
1362
75.6k
    context_->ListenNumSSTFilesChanged(std::function<void()>());
1363
75.6k
    installed_num_sst_files_changed_listener_ = false;
1364
75.6k
  }
1365
151k
  raft_pool_observers_token_->Shutdown();
1366
151k
  LockGuard lock(queue_lock_);
1367
151k
  ClearUnlocked();
1368
151k
}
1369
1370
76
// Returns a human-readable summary of the queue metrics.
string PeerMessageQueue::ToString() const {
  // Even though metrics are thread-safe obtain the lock so that we get a "consistent" snapshot of
  // the metrics.
  LockGuard guard(queue_lock_);
  return ToStringUnlocked();
}
1376
1377
76
// Formats the two queue gauges and the log cache statistics into a single line.
// Takes no lock itself; ToString() calls it while holding queue_lock_.
string PeerMessageQueue::ToStringUnlocked() const {
  const auto majority_done_ops = metrics_.num_majority_done_ops->value();
  const auto in_progress_ops = metrics_.num_in_progress_ops->value();
  return Substitute("Consensus queue metrics:"
                    "Only Majority Done Ops: $0, In Progress Ops: $1, Cache: $2",
                    majority_done_ops, in_progress_ops, log_cache_.StatsString());
}
1383
1384
62.0k
// Adds 'observer' to the list of observers unless it is already registered.
void PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) {
  LockGuard guard(queue_lock_);
  const bool already_registered =
      std::find(observers_.begin(), observers_.end(), observer) != observers_.end();
  if (already_registered) {
    return;
  }
  observers_.push_back(observer);
}
1391
1392
161k
// Removes 'observer' from the list of observers.
// Returns a NotFound status if the observer was not registered, OK otherwise.
Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) {
  LockGuard guard(queue_lock_);
  const auto position = std::find(observers_.begin(), observers_.end(), observer);
  if (position != observers_.end()) {
    observers_.erase(position);
    return Status::OK();
  }
  return STATUS(NotFound, "Can't find observer.");
}
1401
1402
580k
// Returns the name of the given queue mode as a string literal.
// A value outside the enumeration is reported through FATAL_INVALID_ENUM_VALUE.
const char* PeerMessageQueue::ModeToStr(Mode mode) {
  switch (mode) {
    case Mode::LEADER:
      return "LEADER";
    case Mode::NON_LEADER:
      return "NON_LEADER";
  }
  FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::Mode, mode);
}
1409
1410
229k
// Returns the name of the given queue state as a string literal.
// A value outside the enumeration is reported through FATAL_INVALID_ENUM_VALUE.
const char* PeerMessageQueue::StateToStr(State state) {
  switch (state) {
    case State::kQueueConstructed: return "QUEUE_CONSTRUCTED";
    case State::kQueueOpen: return "QUEUE_OPEN";
    case State::kQueueClosed: return "QUEUE_CLOSED";
  }
  FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::State, state);
}
1422
1423
34.8M
// Returns true if the log cache lookup at desired_op.index yields an OpId equal to desired_op.
// Returns false if they differ, or if the lookup fails with NotFound or Incomplete.
// Any other lookup failure is logged as FATAL.
bool PeerMessageQueue::IsOpInLog(const yb::OpId& desired_op) const {
  auto lookup = log_cache_.LookupOpId(desired_op.index);
  if (PREDICT_TRUE(lookup.ok())) {
    return desired_op == *lookup;
  }

  const bool op_is_absent = lookup.status().IsNotFound() || lookup.status().IsIncomplete();
  if (PREDICT_TRUE(op_is_absent)) {
    return false;
  }

  LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error while reading the log: " << lookup.status();
  return false; // Unreachable; here to squelch GCC warning.
}
1434
1435
void PeerMessageQueue::NotifyObserversOfMajorityReplOpChange(
1436
34.7M
    const MajorityReplicatedData& majority_replicated_data) {
1437
34.7M
  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
1438
34.7M
      Bind(&PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask,
1439
34.7M
           Unretained(this),
1440
34.7M
           majority_replicated_data)),
1441
34.7M
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of "
1442
34.7M
                           "majority replicated op change.");
1443
34.7M
}
1444
1445
template <class Func>
1446
3.26k
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1447
3.26k
  WARN_NOT_OK(
1448
3.26k
      raft_pool_observers_token_->SubmitFunc(
1449
3.26k
          [this, func = std::move(func)] {
1450
3.26k
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1451
3.26k
        std::vector<PeerMessageQueueObserver*> copy;
1452
3.26k
        {
1453
3.26k
          LockGuard lock(queue_lock_);
1454
3.26k
          copy = observers_;
1455
3.26k
        }
1456
1457
3.26k
        for (PeerMessageQueueObserver* observer : copy) {
1458
3.26k
          func(observer);
1459
3.26k
        }
1460
3.26k
      }),
1461
3.26k
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1462
3.26k
}
consensus_queue.cc:void yb::consensus::PeerMessageQueue::NotifyObservers<yb::consensus::PeerMessageQueue::NumSSTFilesChanged()::$_0>(char const*, yb::consensus::PeerMessageQueue::NumSSTFilesChanged()::$_0&&)
Line
Count
Source
1446
2.06k
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1447
2.06k
  WARN_NOT_OK(
1448
2.06k
      raft_pool_observers_token_->SubmitFunc(
1449
2.06k
          [this, func = std::move(func)] {
1450
2.06k
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1451
2.06k
        std::vector<PeerMessageQueueObserver*> copy;
1452
2.06k
        {
1453
2.06k
          LockGuard lock(queue_lock_);
1454
2.06k
          copy = observers_;
1455
2.06k
        }
1456
1457
2.06k
        for (PeerMessageQueueObserver* observer : copy) {
1458
2.06k
          func(observer);
1459
2.06k
        }
1460
2.06k
      }),
1461
2.06k
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1462
2.06k
}
consensus_queue.cc:void yb::consensus::PeerMessageQueue::NotifyObservers<yb::consensus::PeerMessageQueue::NotifyObserversOfTermChange(long long)::$_1>(char const*, yb::consensus::PeerMessageQueue::NotifyObserversOfTermChange(long long)::$_1&&)
Line
Count
Source
1446
874
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1447
874
  WARN_NOT_OK(
1448
874
      raft_pool_observers_token_->SubmitFunc(
1449
874
          [this, func = std::move(func)] {
1450
874
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1451
874
        std::vector<PeerMessageQueueObserver*> copy;
1452
874
        {
1453
874
          LockGuard lock(queue_lock_);
1454
874
          copy = observers_;
1455
874
        }
1456
1457
874
        for (PeerMessageQueueObserver* observer : copy) {
1458
874
          func(observer);
1459
874
        }
1460
874
      }),
1461
874
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1462
874
}
consensus_queue.cc:void yb::consensus::PeerMessageQueue::NotifyObservers<yb::consensus::PeerMessageQueue::NotifyObserversOfFailedFollower(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, long long, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_2>(char const*, yb::consensus::PeerMessageQueue::NotifyObserversOfFailedFollower(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, long long, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_2&&)
Line
Count
Source
1446
329
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1447
329
  WARN_NOT_OK(
1448
329
      raft_pool_observers_token_->SubmitFunc(
1449
329
          [this, func = std::move(func)] {
1450
329
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1451
329
        std::vector<PeerMessageQueueObserver*> copy;
1452
329
        {
1453
329
          LockGuard lock(queue_lock_);
1454
329
          copy = observers_;
1455
329
        }
1456
1457
329
        for (PeerMessageQueueObserver* observer : copy) {
1458
329
          func(observer);
1459
329
        }
1460
329
      }),
1461
329
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1462
329
}
1463
1464
874
void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
1465
874
  NotifyObservers("term change", [term](PeerMessageQueueObserver* observer) {
1466
801
    observer->NotifyTermChange(term);
1467
801
  });
1468
874
}
1469
1470
void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask(
1471
34.7M
    const MajorityReplicatedData& majority_replicated_data) {
1472
34.7M
  std::vector<PeerMessageQueueObserver*> copy;
1473
34.7M
  {
1474
34.7M
    LockGuard lock(queue_lock_);
1475
34.7M
    copy = observers_;
1476
34.7M
  }
1477
1478
  // TODO move commit index advancement here so that the queue is not dependent on consensus at all,
1479
  // but that requires a bit more work.
1480
34.7M
  OpId new_committed_op_id;
1481
34.7M
  OpId last_applied_op_id;
1482
34.7M
  for (PeerMessageQueueObserver* observer : copy) {
1483
34.7M
    observer->UpdateMajorityReplicated(
1484
34.7M
        majority_replicated_data, &new_committed_op_id, &last_applied_op_id);
1485
34.7M
  }
1486
1487
34.7M
  {
1488
34.7M
    LockGuard lock(queue_lock_);
1489
34.7M
    if (!new_committed_op_id.empty() &&
1490
34.7M
        
new_committed_op_id.index > queue_state_.committed_op_id.index34.6M
) {
1491
4.69M
      queue_state_.committed_op_id = new_committed_op_id;
1492
4.69M
    }
1493
34.7M
    queue_state_.last_applied_op_id.MakeAtLeast(last_applied_op_id);
1494
34.7M
    local_peer_->last_applied = queue_state_.last_applied_op_id;
1495
34.7M
    UpdateAllAppliedOpId(&queue_state_.all_applied_op_id);
1496
34.7M
  }
1497
34.7M
}
1498
1499
void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
1500
95
                                                       const string& reason) {
1501
95
  int64_t current_term;
1502
95
  {
1503
95
    LockGuard lock(queue_lock_);
1504
95
    current_term = queue_state_.current_term;
1505
95
  }
1506
95
  NotifyObserversOfFailedFollower(uuid, current_term, reason);
1507
95
}
1508
1509
void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
1510
                                                       int64_t term,
1511
328
                                                       const string& reason) {
1512
330
  NotifyObservers("failed follower", [uuid, term, reason](PeerMessageQueueObserver* observer) {
1513
330
    observer->NotifyFailedFollower(uuid, term, reason);
1514
330
  });
1515
328
}
1516
1517
312k
bool PeerMessageQueue::PeerAcceptedOurLease(const std::string& uuid) const {
1518
312k
  std::lock_guard<simple_spinlock> lock(queue_lock_);
1519
312k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
1520
312k
  if (peer == nullptr) {
1521
106k
    return false;
1522
106k
  }
1523
1524
206k
  return peer->leader_lease_expiration.last_received != CoarseTimePoint();
1525
312k
}
1526
1527
10.5k
// Returns true if the tracked peer `peer_uuid` has received operations at least up to the
// queue's majority-replicated op id (peer->last_received >= majority_replicated_op_id).
//
// Returns false and logs an ERROR when `peer_uuid` is not present in peers_map_. When the peer is
// known but behind, an INFO message with both op ids is logged. Runs under queue_lock_.
bool PeerMessageQueue::CanPeerBecomeLeader(const std::string& peer_uuid) const {
1528
10.5k
  std::lock_guard<simple_spinlock> lock(queue_lock_);
1529
10.5k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1530
10.5k
  if (peer == nullptr) {
1531
0
    LOG(ERROR) << "Invalid peer UUID: " << peer_uuid;
1532
0
    return false;
1533
0
  }
1534
10.5k
  const bool peer_can_be_leader = peer->last_received >= queue_state_.majority_replicated_op_id;
1535
10.5k
  if (!peer_can_be_leader) {
1536
166
    LOG(INFO) << Format(
1537
166
        "Peer $0 cannot become Leader as it is not caught up: Majority OpId $1, Peer OpId $2",
1538
166
        peer_uuid, queue_state_.majority_replicated_op_id, peer->last_received);
1539
166
  }
1540
10.5k
  return peer_can_be_leader;
1541
10.5k
}
1542
1543
10.0k
// Returns the last_received op id recorded for the tracked peer `uuid`.
//
// If `uuid` is not present in peers_map_, logs an ERROR and returns OpId::Min(). Runs under
// queue_lock_.
OpId PeerMessageQueue::PeerLastReceivedOpId(const TabletServerId& uuid) const {
1544
10.0k
  std::lock_guard<simple_spinlock> lock(queue_lock_);
1545
10.0k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
1546
10.0k
  if (peer == nullptr) {
1547
0
    LOG(ERROR) << "Invalid peer UUID: " << uuid;
1548
0
    return OpId::Min();
1549
0
  }
1550
10.0k
  return peer->last_received;
1551
10.0k
}
1552
1553
591
// Returns the UUID of a tracked peer, other than the local peer, that has the highest
// last_received op id.
//
// All peers tied for the highest op id are collected as candidates; when there is more than one,
// one of them is chosen via RandomUniformInt. Returns an empty string when there are no
// candidates (i.e. no entry other than the local peer was considered).
string PeerMessageQueue::GetUpToDatePeer() const {
1554
591
  // Start from the minimum op id so that any peer compares greater than or equal to it.
  OpId highest_op_id = OpId::Min();
1555
591
  std::vector<std::string> candidates;
1556
1557
591
  {
1558
591
    // The scan of peers_map_ is done under queue_lock_; the lock is released before the random
    // selection below.
    std::lock_guard<simple_spinlock> lock(queue_lock_);
1559
2.16k
    for (const PeersMap::value_type& entry : peers_map_) {
1560
2.16k
      // Never nominate the local peer.
      if (local_peer_uuid_ == entry.first) {
1561
591
        continue;
1562
591
      }
1563
1.57k
      if (highest_op_id > entry.second->last_received) {
1564
63
        // Behind the best op id seen so far: not a candidate.
        continue;
1565
1.50k
      } else if (highest_op_id == entry.second->last_received) {
1566
913
        // Tied with the best op id seen so far: add to the candidate set.
        candidates.push_back(entry.first);
1567
913
      } else {
1568
596
        // Strictly ahead: this peer replaces all previous candidates.
        candidates = {entry.first};
1569
596
        highest_op_id = entry.second->last_received;
1570
596
      }
1571
1.57k
    }
1572
591
  }
1573
1574
591
  if (candidates.empty()) {
1575
0
    return string();
1576
0
  }
1577
591
  size_t index = 0;
1578
591
  if (candidates.size() > 1) {
1579
    // choose randomly among candidates at the same opid
    // NOTE(review): this relies on RandomUniformInt treating both bounds as inclusive -- confirm.
1580
504
    index = RandomUniformInt<size_t>(0, candidates.size() - 1);
1581
504
  }
1582
591
  return candidates[index];
1583
591
}
1584
1585
75.6k
// Destructor: calls Close() on the queue before the object is destroyed.
PeerMessageQueue::~PeerMessageQueue() {
1586
75.6k
  Close();
1587
75.6k
}
1588
1589
351k
// Builds the log-line prefix "T <tablet_id> P <local_peer_uuid> [<mode>]: " for this queue.
//
// This function does not acquire queue_lock_; queue_state_.mode is read through
// ANNOTATE_UNPROTECTED_READ (see the TODO below).
string PeerMessageQueue::LogPrefixUnlocked() const {
1590
  // TODO: we should probably use an atomic here. We'll just annotate away the TSAN error for now,
1591
  // since the worst case is a slightly out-of-date log message, and not very likely.
1592
351k
  Mode mode = ANNOTATE_UNPROTECTED_READ(queue_state_.mode);
1593
351k
  return Substitute("T $0 P $1 [$2]: ",
1594
351k
                    tablet_id_,
1595
351k
                    local_peer_uuid_,
1596
351k
                    ModeToStr(mode));
1597
351k
}
1598
1599
229k
// Returns a single-line, human-readable dump of the queue state: the replicated / committed /
// applied / appended op ids, current term, majority size, state and mode.
//
// The ", active raft config: ..." suffix (argument $9) is appended only when active_config is
// set; otherwise that argument is the empty string.
// NOTE(review): the "Committed index" label is fed from committed_op_id, so it presumably prints
// the whole op id rather than only an index -- confirm against OpId formatting.
string PeerMessageQueue::QueueState::ToString() const {
1600
229k
  return Format(
1601
229k
      "All replicated op: $0, Majority replicated op: $1, Committed index: $2, Last applied: $3, "
1602
229k
      "Last appended: $4, Current term: $5, Majority size: $6, State: $7, Mode: $8$9",
1603
229k
      /* 0 */ all_replicated_op_id,
1604
229k
      /* 1 */ majority_replicated_op_id,
1605
229k
      /* 2 */ committed_op_id,
1606
229k
      /* 3 */ last_applied_op_id,
1607
229k
      /* 4 */ last_appended,
1608
229k
      /* 5 */ current_term,
1609
229k
      /* 6 */ majority_size_,
1610
229k
      /* 7 */ StateToStr(state),
1611
229k
      /* 8 */ ModeToStr(mode),
1612
229k
      /* 9 */ active_config ? 
", active raft config: " + active_config->ShortDebugString()67.8k
:
""161k
);
1613
229k
}
1614
1615
0
// Returns log_cache_.BytesUsed(), i.e. the byte usage reported by this queue's log cache.
size_t PeerMessageQueue::LogCacheSize() {
1616
0
  return log_cache_.BytesUsed();
1617
0
}
1618
1619
0
// Asks the log cache to evict, forwarding `bytes_to_evict` and returning whatever
// LogCache::EvictThroughOp() returns.
//
// The op-index bound is passed as the maximum int64_t value, so this call imposes no index limit
// of its own.
// NOTE(review): the return value is presumably the number of bytes actually evicted -- confirm
// against LogCache::EvictThroughOp().
size_t PeerMessageQueue::EvictLogCache(size_t bytes_to_evict) {
1620
0
  return log_cache_.EvictThroughOp(std::numeric_limits<int64_t>::max(), bytes_to_evict);
1621
0
}
1622
1623
0
// Forwards to log_cache_.FlushIndex() and returns its Status unchanged.
Status PeerMessageQueue::FlushLogIndex() {
1624
0
  return log_cache_.FlushIndex();
1625
0
}
1626
1627
13.0M
// Forwards `op_ids` to log_cache_.TrackOperationsMemory().
void PeerMessageQueue::TrackOperationsMemory(const OpIds& op_ids) {
1628
13.0M
  log_cache_.TrackOperationsMemory(op_ids);
1629
13.0M
}
1630
1631
// Forwards `max_allowed_index` and `op_type` to log_cache_.TEST_GetLastOpIdWithType() and returns
// its result unchanged.
// NOTE(review): the TEST_ prefix suggests this is intended for tests only -- confirm.
Result<OpId> PeerMessageQueue::TEST_GetLastOpIdWithType(
1632
3
    int64_t max_allowed_index, OperationType op_type) {
1633
3
  return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type);
1634
3
}
1635
1636
14.6k
// Validates the rpc_throttle_threshold_bytes flag by calling its validator function directly.
//
// Returns an InvalidArgument status ("Flag validation failed") when
// RpcThrottleThresholdBytesValidator() rejects the current flag value, and Status::OK() otherwise.
Status ValidateFlags() {
1637
  // Normally we would have used
1638
  //   DEFINE_validator(rpc_throttle_threshold_bytes, &RpcThrottleThresholdBytesValidator);
1639
  // right after defining the rpc_throttle_threshold_bytes flag. However, this leads to a segfault
1640
  // in the LTO-enabled build, presumably due to indeterminate order of static initialization.
1641
  // Instead, we invoke this function from master/tserver main() functions when static
1642
  // initialization is already finished.
1643
14.6k
  if (!RpcThrottleThresholdBytesValidator(
1644
14.6k
      "rpc_throttle_threshold_bytes", FLAGS_rpc_throttle_threshold_bytes)) {
1645
0
    return STATUS(InvalidArgument, "Flag validation failed");
1646
0
  }
1647
1648
14.6k
  return Status::OK();
1649
14.6k
}
1650
1651
}  // namespace consensus
1652
}  // namespace yb