YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/consensus_queue.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <shared_mutex>
38
#include <string>
39
#include <utility>
40
41
#include <boost/container/small_vector.hpp>
42
#include <glog/logging.h>
43
44
#include "yb/consensus/consensus_context.h"
45
#include "yb/consensus/log_util.h"
46
#include "yb/consensus/opid_util.h"
47
#include "yb/consensus/quorum_util.h"
48
#include "yb/consensus/raft_consensus.h"
49
#include "yb/consensus/replicate_msgs_holder.h"
50
51
#include "yb/gutil/bind.h"
52
#include "yb/gutil/dynamic_annotations.h"
53
#include "yb/gutil/map-util.h"
54
#include "yb/gutil/stl_util.h"
55
#include "yb/gutil/strings/substitute.h"
56
57
#include "yb/util/enums.h"
58
#include "yb/util/fault_injection.h"
59
#include "yb/util/flag_tags.h"
60
#include "yb/util/locks.h"
61
#include "yb/util/logging.h"
62
#include "yb/util/mem_tracker.h"
63
#include "yb/util/metrics.h"
64
#include "yb/util/monotime.h"
65
#include "yb/util/random_util.h"
66
#include "yb/util/result.h"
67
#include "yb/util/size_literals.h"
68
#include "yb/util/status_log.h"
69
#include "yb/util/threadpool.h"
70
#include "yb/util/tostring.h"
71
#include "yb/util/url-coding.h"
72
73
using namespace std::literals;
74
using namespace yb::size_literals;
75
76
DECLARE_uint64(rpc_max_message_size);
77
78
// We expect that consensus_max_batch_size_bytes + 1_KB would be less than rpc_max_message_size.
79
// Otherwise such batch would be rejected by RPC layer.
80
DEFINE_uint64(consensus_max_batch_size_bytes, 4_MB,
81
              "The maximum per-tablet RPC batch size when updating peers.");
82
TAG_FLAG(consensus_max_batch_size_bytes, advanced);
83
TAG_FLAG(consensus_max_batch_size_bytes, runtime);
84
85
DEFINE_int32(follower_unavailable_considered_failed_sec, 900,
86
             "Seconds that a leader is unable to successfully heartbeat to a "
87
             "follower after which the follower is considered to be failed and "
88
             "evicted from the config.");
89
TAG_FLAG(follower_unavailable_considered_failed_sec, advanced);
90
91
DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
92
             "Injects a random sleep between 0 and this many milliseconds into "
93
             "asynchronous notifications from the consensus queue back to the "
94
             "consensus implementation.");
95
TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
96
TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
97
98
DEFINE_int32(cdc_checkpoint_opid_interval_ms, 60 * 1000,
99
             "Interval up to which CDC consumer's checkpoint is considered for retaining log cache."
100
             "If we haven't received an updated checkpoint from CDC consumer within the interval "
101
             "specified by cdc_checkpoint_opid_interval, then log cache does not consider that "
102
             "consumer while determining which op IDs to evict.");
103
104
DEFINE_int64(cdc_intent_retention_ms, 4 * 3600 * 1000,
105
             "Interval up to which CDC consumer's checkpoint is considered for retaining intents."
106
             "If we haven't received an updated checkpoint from CDC consumer within the interval "
107
             "specified by cdc_checkpoint_opid_interval, then CDC does not consider that "
108
             "consumer while determining which op IDs to delete from the intent.");
109
110
DEFINE_bool(enable_consensus_exponential_backoff, true,
111
            "Whether exponential backoff based on number of retransmissions at tablet leader "
112
            "for number of entries to replicate to lagging follower is enabled.");
113
TAG_FLAG(enable_consensus_exponential_backoff, advanced);
114
TAG_FLAG(enable_consensus_exponential_backoff, runtime);
115
116
DEFINE_int32(consensus_lagging_follower_threshold, 10,
117
             "Number of retransmissions at tablet leader to mark a follower as lagging. "
118
             "-1 disables the feature.");
119
TAG_FLAG(consensus_lagging_follower_threshold, advanced);
120
TAG_FLAG(consensus_lagging_follower_threshold, runtime);
121
122
DEFINE_test_flag(bool, disallow_lmp_failures, false,
123
                 "Whether we disallow PRECEDING_ENTRY_DIDNT_MATCH failures for non new peers.");
124
125
namespace {
126
127
constexpr const auto kMinRpcThrottleThresholdBytes = 16;
128
129
10.3k
static bool RpcThrottleThresholdBytesValidator(const char* flagname, int64_t value) {
130
10.3k
  if (value > 0) {
131
10.3k
    if (value < kMinRpcThrottleThresholdBytes) {
132
0
      LOG(ERROR) << "Expect " << flagname << " to be at least " << kMinRpcThrottleThresholdBytes;
133
0
      return false;
134
10.3k
    } else if (implicit_cast<size_t>(value) >= FLAGS_consensus_max_batch_size_bytes) {
135
0
      LOG(ERROR) << "Expect " << flagname << " to be less than consensus_max_batch_size_bytes "
136
0
                 << "value (" << FLAGS_consensus_max_batch_size_bytes << ")";
137
0
      return false;
138
0
    }
139
10.3k
  }
140
10.3k
  return true;
141
10.3k
}
142
143
} // namespace
144
145
DECLARE_int64(rpc_throttle_threshold_bytes);
146
147
namespace yb {
148
namespace consensus {
149
150
using log::Log;
151
using std::unique_ptr;
152
using rpc::Messenger;
153
using strings::Substitute;
154
155
METRIC_DEFINE_gauge_int64(tablet, majority_done_ops, "Leader Operations Acked by Majority",
156
                          MetricUnit::kOperations,
157
                          "Number of operations in the leader queue ack'd by a majority but "
158
                          "not all peers.");
159
METRIC_DEFINE_gauge_int64(tablet, in_progress_ops, "Leader Operations in Progress",
160
                          MetricUnit::kOperations,
161
                          "Number of operations in the leader queue ack'd by a minority of "
162
                          "peers.");
163
164
const auto kCDCConsumerCheckpointInterval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms;
165
166
const auto kCDCConsumerIntentRetention = FLAGS_cdc_intent_retention_ms * 1ms;
167
168
0
std::string MajorityReplicatedData::ToString() const {
169
0
  return Format(
170
0
      "{ op_id: $0 leader_lease_expiration: $1 ht_lease_expiration: $2 num_sst_files: $3 }",
171
0
      op_id, leader_lease_expiration, ht_lease_expiration, num_sst_files);
172
0
}
173
174
68.6k
std::string PeerMessageQueue::TrackedPeer::ToString() const {
175
68.6k
  return Format(
176
68.6k
      "{ peer: $0 is_new: $1 last_received: $2 next_index: $3 last_known_committed_idx: $4 "
177
68.6k
      "is_last_exchange_successful: $5 needs_remote_bootstrap: $6 member_type: $7 "
178
68.6k
      "num_sst_files: $8 last_applied: $9 }",
179
68.6k
      uuid, is_new, last_received, next_index, last_known_committed_idx,
180
68.6k
      is_last_exchange_successful, needs_remote_bootstrap, PeerMemberType_Name(member_type),
181
68.6k
      num_sst_files, last_applied);
182
68.6k
}
183
184
44.3k
void PeerMessageQueue::TrackedPeer::ResetLeaderLeases() {
185
44.3k
  leader_lease_expiration.Reset();
186
44.3k
  leader_ht_lease_expiration.Reset();
187
44.3k
}
188
189
15.2M
void PeerMessageQueue::TrackedPeer::ResetLastRequest() {
190
  // Reset so that next transmission is not considered a re-transmission.
191
15.2M
  last_num_messages_sent = -1;
192
15.2M
  current_retransmissions = -1;
193
15.2M
}
194
195
#define INSTANTIATE_METRIC(x) \
196
  x.Instantiate(metric_entity, 0)
197
PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
198
  : num_majority_done_ops(INSTANTIATE_METRIC(METRIC_majority_done_ops)),
199
88.8k
    num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)) {
200
88.8k
}
201
#undef INSTANTIATE_METRIC
202
203
PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
204
                                   const scoped_refptr<log::Log>& log,
205
                                   const MemTrackerPtr& server_tracker,
206
                                   const MemTrackerPtr& parent_tracker,
207
                                   const RaftPeerPB& local_peer_pb,
208
                                   const string& tablet_id,
209
                                   const server::ClockPtr& clock,
210
                                   ConsensusContext* context,
211
                                   unique_ptr<ThreadPoolToken> raft_pool_token)
212
    : raft_pool_observers_token_(std::move(raft_pool_token)),
213
      local_peer_pb_(local_peer_pb),
214
      local_peer_uuid_(local_peer_pb_.has_permanent_uuid() ? local_peer_pb_.permanent_uuid()
215
                                                           : string()),
216
      tablet_id_(tablet_id),
217
      log_cache_(metric_entity, log, server_tracker, local_peer_pb.permanent_uuid(), tablet_id),
218
      operations_mem_tracker_(
219
          MemTracker::FindOrCreateTracker("OperationsFromDisk", parent_tracker)),
220
      metrics_(metric_entity),
221
      clock_(clock),
222
88.8k
      context_(context) {
223
88.8k
  DCHECK(local_peer_pb_.has_permanent_uuid());
224
88.8k
  DCHECK(!local_peer_pb_.last_known_private_addr().empty());
225
88.8k
}
226
227
88.7k
void PeerMessageQueue::Init(const OpId& last_locally_replicated) {
228
88.7k
  LockGuard lock(queue_lock_);
229
88.7k
  CHECK_EQ(queue_state_.state, State::kQueueConstructed);
230
88.7k
  log_cache_.Init(last_locally_replicated.ToPB<OpIdPB>());
231
88.7k
  queue_state_.last_appended = last_locally_replicated;
232
88.7k
  queue_state_.state = State::kQueueOpen;
233
88.7k
  local_peer_ = TrackPeerUnlocked(local_peer_uuid_);
234
235
88.7k
  if (context_) {
236
88.7k
    context_->ListenNumSSTFilesChanged(std::bind(&PeerMessageQueue::NumSSTFilesChanged, this));
237
88.7k
    installed_num_sst_files_changed_listener_ = true;
238
88.7k
  }
239
88.7k
}
240
241
void PeerMessageQueue::SetLeaderMode(const OpId& committed_op_id,
242
                                     int64_t current_term,
243
                                     const OpId& last_applied_op_id,
244
38.1k
                                     const RaftConfigPB& active_config) {
245
38.1k
  LockGuard lock(queue_lock_);
246
38.1k
  queue_state_.current_term = current_term;
247
38.1k
  queue_state_.committed_op_id = committed_op_id;
248
38.1k
  queue_state_.last_applied_op_id = last_applied_op_id;
249
38.1k
  queue_state_.majority_replicated_op_id = committed_op_id;
250
38.1k
  queue_state_.active_config.reset(new RaftConfigPB(active_config));
251
6
  CHECK(IsRaftConfigVoter(local_peer_uuid_, *queue_state_.active_config))
252
6
      << local_peer_pb_.ShortDebugString() << " not a voter in config: "
253
6
      << queue_state_.active_config->ShortDebugString();
254
38.1k
  queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config));
255
38.1k
  queue_state_.mode = Mode::LEADER;
256
257
38.1k
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
258
38.1k
      << queue_state_.ToString();
259
38.1k
  CheckPeersInActiveConfigIfLeaderUnlocked();
260
261
  // Reset last communication time with all peers to reset the clock on the
262
  // failure timeout.
263
38.1k
  MonoTime now(MonoTime::Now());
264
44.3k
  for (const PeersMap::value_type& entry : peers_map_) {
265
44.3k
    entry.second->ResetLeaderLeases();
266
44.3k
    entry.second->last_successful_communication_time = now;
267
44.3k
  }
268
38.1k
}
269
270
93.6k
void PeerMessageQueue::SetNonLeaderMode() {
271
93.6k
  LockGuard lock(queue_lock_);
272
93.6k
  queue_state_.active_config.reset();
273
93.6k
  queue_state_.mode = Mode::NON_LEADER;
274
93.6k
  queue_state_.majority_size_ = -1;
275
93.6k
  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
276
93.6k
      << queue_state_.ToString();
277
93.6k
}
278
279
69.6k
void PeerMessageQueue::TrackPeer(const string& uuid) {
280
69.6k
  LockGuard lock(queue_lock_);
281
69.6k
  TrackPeerUnlocked(uuid);
282
69.6k
}
283
284
158k
PeerMessageQueue::TrackedPeer* PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
285
27
  CHECK(!uuid.empty()) << "Got request to track peer with empty UUID";
286
158k
  DCHECK_EQ(queue_state_.state, State::kQueueOpen);
287
288
158k
  TrackedPeer* tracked_peer = new TrackedPeer(uuid);
289
290
  // We don't know the last operation received by the peer so, following the Raft protocol, we set
291
  // next_index to one past the end of our own log. This way, if calling this method is the result
292
  // of a successful leader election and the logs between the new leader and remote peer match, the
293
  // peer->next_index will point to the index of the soon-to-be-written NO_OP entry that is used to
294
  // assert leadership. If we guessed wrong, and the peer does not have a log that matches ours, the
295
  // normal queue negotiation process will eventually find the right point to resume from.
296
158k
  tracked_peer->next_index = queue_state_.last_appended.index + 1;
297
158k
  InsertOrDie(&peers_map_, uuid, tracked_peer);
298
299
158k
  CheckPeersInActiveConfigIfLeaderUnlocked();
300
301
  // We don't know how far back this peer is, so set the all replicated watermark to
302
  // MinimumOpId. We'll advance it when we know how far along the peer is.
303
158k
  queue_state_.all_replicated_op_id = OpId::Min();
304
158k
  return tracked_peer;
305
158k
}
306
307
43.2k
void PeerMessageQueue::UntrackPeer(const string& uuid) {
308
43.2k
  LockGuard lock(queue_lock_);
309
43.2k
  TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid);
310
43.2k
  if (peer != nullptr) {
311
43.2k
    delete peer;
312
43.2k
  }
313
43.2k
}
314
315
196k
void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const {
316
196k
  if (queue_state_.mode != Mode::LEADER) return;
317
107k
  std::unordered_set<std::string> config_peer_uuids;
318
328k
  for (const RaftPeerPB& peer_pb : queue_state_.active_config->peers()) {
319
328k
    InsertOrDie(&config_peer_uuids, peer_pb.permanent_uuid());
320
328k
  }
321
223k
  for (const PeersMap::value_type& entry : peers_map_) {
322
223k
    if (!ContainsKey(config_peer_uuids, entry.first)) {
323
0
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << Substitute("Peer $0 is not in the active config. "
324
0
                                                    "Queue state: $1",
325
0
                                                    entry.first,
326
0
                                                    queue_state_.ToString());
327
0
    }
328
223k
  }
329
107k
}
330
331
2.40k
void PeerMessageQueue::NumSSTFilesChanged() {
332
2.40k
  auto num_sst_files = context_->NumSSTFiles();
333
334
2.40k
  uint64_t majority_replicated_num_sst_files;
335
2.40k
  {
336
2.40k
    LockGuard lock(queue_lock_);
337
2.40k
    if (queue_state_.mode != Mode::LEADER) {
338
964
      return;
339
964
    }
340
1.44k
    auto it = peers_map_.find(local_peer_uuid_);
341
1.44k
    if (it == peers_map_.end()) {
342
0
      return;
343
0
    }
344
1.44k
    it->second->num_sst_files = num_sst_files;
345
1.44k
    majority_replicated_num_sst_files = NumSSTFilesWatermark();
346
1.44k
  }
347
348
1.44k
  NotifyObservers(
349
1.44k
      "majority replicated num SST files changed",
350
1.44k
      [majority_replicated_num_sst_files](PeerMessageQueueObserver* observer) {
351
1.44k
    observer->MajorityReplicatedNumSSTFilesChanged(majority_replicated_num_sst_files);
352
1.44k
  });
353
1.44k
}
354
355
13.1M
void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id, const Status& status) {
356
13.1M
  CHECK_OK(status);
357
358
  // Fake an RPC response from the local peer.
359
  // TODO: we should probably refactor the ResponseFromPeer function so that we don't need to
360
  // construct this fake response, but this seems to work for now.
361
13.1M
  ConsensusResponsePB fake_response;
362
13.1M
  id.ToPB(fake_response.mutable_status()->mutable_last_received());
363
13.1M
  id.ToPB(fake_response.mutable_status()->mutable_last_received_current_leader());
364
13.1M
  if (context_) {
365
13.1M
    fake_response.set_num_sst_files(context_->NumSSTFiles());
366
13.1M
  }
367
13.1M
  {
368
13.1M
    LockGuard lock(queue_lock_);
369
370
    // TODO This ugly fix is required because we unlock queue_lock_ while doing AppendOperations.
371
    // So LocalPeerAppendFinished could be invoked before rest of AppendOperations.
372
13.1M
    if (queue_state_.last_appended.index < id.index) {
373
47.7k
      queue_state_.last_appended = id;
374
47.7k
    }
375
13.1M
    fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_op_id.index);
376
13.1M
    queue_state_.last_applied_op_id.ToPB(fake_response.mutable_status()->mutable_last_applied());
377
378
13.1M
    if (queue_state_.mode != Mode::LEADER) {
379
8.21M
      log_cache_.EvictThroughOp(id.index);
380
381
8.21M
      UpdateMetrics();
382
8.21M
      return;
383
8.21M
    }
384
4.92M
  }
385
4.92M
  ResponseFromPeer(local_peer_uuid_, fake_response);
386
4.92M
}
387
388
768
Status PeerMessageQueue::TEST_AppendOperation(const ReplicateMsgPtr& msg) {
389
768
  return AppendOperations(
390
768
      { msg }, yb::OpId::FromPB(msg->committed_op_id()), RestartSafeCoarseMonoClock().Now());
391
768
}
392
393
Status PeerMessageQueue::AppendOperations(const ReplicateMsgs& msgs,
394
                                          const yb::OpId& committed_op_id,
395
13.1M
                                          RestartSafeCoarseTimePoint batch_mono_time) {
396
13.1M
  DFAKE_SCOPED_LOCK(append_fake_lock_);
397
13.1M
  OpId last_id;
398
13.1M
  if (!msgs.empty()) {
399
7.14M
    std::unique_lock<simple_spinlock> lock(queue_lock_);
400
401
7.14M
    last_id = OpId::FromPB(msgs.back()->id());
402
403
7.14M
    if (last_id.term > queue_state_.current_term) {
404
68.3k
      queue_state_.current_term = last_id.term;
405
68.3k
    }
406
5.99M
  } else {
407
5.99M
    std::unique_lock<simple_spinlock> lock(queue_lock_);
408
5.99M
    last_id = queue_state_.last_appended;
409
5.99M
  }
410
411
  // Unlock ourselves during Append to prevent a deadlock: it's possible that the log buffer is
412
  // full, in which case AppendOperations would block. However, for the log buffer to empty, it may
413
  // need to call LocalPeerAppendFinished() which also needs queue_lock_.
414
  //
415
  // Since we are doing AppendOperations only in one thread, no concurrent AppendOperations could
416
  // be executed and queue_state_.last_appended will be updated correctly.
417
13.1M
  RETURN_NOT_OK(log_cache_.AppendOperations(
418
13.1M
      msgs, committed_op_id, batch_mono_time,
419
13.1M
      Bind(&PeerMessageQueue::LocalPeerAppendFinished, Unretained(this), last_id)));
420
421
13.1M
  if (!msgs.empty()) {
422
7.15M
    std::unique_lock<simple_spinlock> lock(queue_lock_);
423
7.15M
    queue_state_.last_appended = last_id;
424
7.15M
    UpdateMetrics();
425
7.15M
  }
426
427
13.1M
  return Status::OK();
428
13.1M
}
429
430
3.05M
uint64_t GetNumMessagesToSendWithBackoff(int64_t last_num_messages_sent) {
431
3.05M
  return std::max<int64_t>((last_num_messages_sent >> 1) - 1, 0);
432
3.05M
}
433
434
Status PeerMessageQueue::RequestForPeer(const string& uuid,
435
                                        ConsensusRequestPB* request,
436
                                        ReplicateMsgsHolder* msgs_holder,
437
                                        bool* needs_remote_bootstrap,
438
                                        PeerMemberType* member_type,
439
13.3M
                                        bool* last_exchange_successful) {
440
13.3M
  static constexpr uint64_t kSendUnboundedLogOps = std::numeric_limits<uint64_t>::max();
441
14.6k
  DCHECK(request->ops().empty()) << request->ShortDebugString();
442
443
13.3M
  OpId preceding_id;
444
13.3M
  MonoDelta unreachable_time = MonoDelta::kMin;
445
13.3M
  bool is_voter = false;
446
13.3M
  bool is_new;
447
13.3M
  int64_t previously_sent_index;
448
13.3M
  uint64_t num_log_ops_to_send;
449
13.3M
  HybridTime propagated_safe_time;
450
451
  // Should be before now_ht, i.e. not greater than propagated_hybrid_time.
452
13.3M
  if (context_) {
453
13.3M
    propagated_safe_time = VERIFY_RESULT(context_->PreparePeerRequest());
454
13.3M
  }
455
456
13.3M
  {
457
13.3M
    LockGuard lock(queue_lock_);
458
13.3M
    DCHECK_EQ(queue_state_.state, State::kQueueOpen);
459
13.3M
    DCHECK_NE(uuid, local_peer_uuid_);
460
461
13.3M
    auto peer = FindPtrOrNull(peers_map_, uuid);
462
13.3M
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) {
463
20
      return STATUS(NotFound, "Peer not tracked or queue not in leader mode.");
464
20
    }
465
466
13.3M
    HybridTime now_ht;
467
468
13.3M
    is_new = peer->is_new;
469
13.3M
    if (!is_new) {
470
13.2M
      now_ht = clock_->Now();
471
472
13.2M
      auto ht_lease_expiration_micros = now_ht.GetPhysicalValueMicros() +
473
13.2M
                                        FLAGS_ht_lease_duration_ms * 1000;
474
13.2M
      auto leader_lease_duration_ms = GetAtomicFlag(&FLAGS_leader_lease_duration_ms);
475
13.2M
      request->set_leader_lease_duration_ms(leader_lease_duration_ms);
476
13.2M
      request->set_ht_lease_expiration(ht_lease_expiration_micros);
477
478
      // As noted here:
479
      // https://red.ht/2sCSErb
480
      //
481
      // The _COARSE variants are faster to read and have a precision (also known as resolution) of
482
      // one millisecond (ms).
483
      //
484
      // Coarse clock precision is 1 millisecond.
485
13.2M
      const auto kCoarseClockPrecision = 1ms;
486
487
      // Because of coarse clocks we subtract 2ms, to be sure that our local version of lease
488
      // does not expire after it expires at follower.
489
13.2M
      peer->leader_lease_expiration.last_sent =
490
13.2M
          CoarseMonoClock::Now() + leader_lease_duration_ms * 1ms - kCoarseClockPrecision * 2;
491
13.2M
      peer->leader_ht_lease_expiration.last_sent = ht_lease_expiration_micros;
492
93.9k
    } else {
493
93.9k
      now_ht = clock_->Now();
494
93.9k
      request->clear_leader_lease_duration_ms();
495
93.9k
      request->clear_ht_lease_expiration();
496
93.9k
      peer->leader_lease_expiration.Reset();
497
93.9k
      peer->leader_ht_lease_expiration.Reset();
498
93.9k
    }
499
    // This is initialized to the queue's last appended op but gets set to the id of the
500
    // log entry preceding the first one in 'messages' if messages are found for the peer.
501
    //
502
    // The leader does not know the actual state of a peer but it should always send a value of
503
    // preceding_id that is present in the leader's own log, so the follower can verify the log
504
    // matching property.
505
    //
506
    // In case we decide not to send any messages to the follower this time due to exponential
507
    // backoff to an unresponsive follower, we will keep preceding_id equal to last_appended.
508
    // This is safe because unless the follower already has that operation, it will fail to find
509
    // it in its pending operations in EnforceLogMatchingPropertyMatchesUnlocked and will return
510
    // a log matching property violation error without applying any incorrect messages from its log.
511
    //
512
    // See this scenario for more context on the issue we are trying to avoid:
513
    // https://github.com/yugabyte/yugabyte-db/issues/8150#issuecomment-827821784
514
13.3M
    preceding_id = queue_state_.last_appended;
515
516
13.3M
    request->set_propagated_hybrid_time(now_ht.ToUint64());
517
518
    // NOTE: committed_op_id may be overwritten later.
519
    // In our system committed_op_id means that this operation was also applied.
520
    // If we have operation that applied significant time, followers would not know that this
521
    // operation is committed until it is applied in the leader.
522
    // To address this issue we use majority_replicated_op_id, that is updated before apply.
523
    // But we could use it only when its term matches current term, see Fig.8 in Raft paper.
524
13.3M
    if (queue_state_.majority_replicated_op_id.index > queue_state_.committed_op_id.index &&
525
375k
        queue_state_.majority_replicated_op_id.term == queue_state_.current_term) {
526
375k
      queue_state_.majority_replicated_op_id.ToPB(request->mutable_committed_op_id());
527
13.0M
    } else {
528
13.0M
      queue_state_.committed_op_id.ToPB(request->mutable_committed_op_id());
529
13.0M
    }
530
531
13.3M
    request->set_caller_term(queue_state_.current_term);
532
13.3M
    unreachable_time =
533
13.3M
        MonoTime::Now().GetDeltaSince(peer->last_successful_communication_time);
534
13.3M
    if (member_type) *member_type = peer->member_type;
535
13.3M
    if (last_exchange_successful) *last_exchange_successful = peer->is_last_exchange_successful;
536
13.3M
    *needs_remote_bootstrap = peer->needs_remote_bootstrap;
537
538
13.3M
    previously_sent_index = peer->next_index - 1;
539
13.3M
    if (FLAGS_enable_consensus_exponential_backoff && peer->last_num_messages_sent >= 0) {
540
      // Previous request to peer has not been acked. Reduce number of entries to be sent
541
      // in this attempt using exponential backoff. Note that to_index is inclusive.
542
3.05M
      num_log_ops_to_send = GetNumMessagesToSendWithBackoff(peer->last_num_messages_sent);
543
10.3M
    } else {
544
      // Previous request to peer has been acked or a heartbeat response has been received.
545
      // Transmit as many entries as allowed.
546
10.3M
      num_log_ops_to_send = kSendUnboundedLogOps;
547
10.3M
    }
548
549
13.3M
    peer->current_retransmissions++;
550
551
13.3M
    if (peer->member_type == PeerMemberType::VOTER) {
552
10.1M
      is_voter = true;
553
10.1M
    }
554
13.3M
  }
555
556
13.3M
  if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) {
557
144
    if (!is_voter || CountVoters(*queue_state_.active_config) > 2) {
558
      // We never drop from 2 voters to 1 voter automatically, at least for now (12/4/18). We may
559
      // want to revisit this later, we're just being cautious with this.
560
      // We remove unconditionally any failed non-voter replica (PRE_VOTER, PRE_OBSERVER, OBSERVER).
561
144
      string msg = Substitute("Leader has been unable to successfully communicate "
562
144
                              "with Peer $0 for more than $1 seconds ($2)",
563
144
                              uuid,
564
144
                              FLAGS_follower_unavailable_considered_failed_sec,
565
144
                              unreachable_time.ToString());
566
144
      NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg);
567
144
    }
568
144
  }
569
570
13.3M
  if (PREDICT_FALSE(*needs_remote_bootstrap)) {
571
272
      YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS(INFO, 30)
572
272
          << "Peer needs remote bootstrap: " << uuid;
573
4.98k
    return Status::OK();
574
4.98k
  }
575
13.3M
  *needs_remote_bootstrap = false;
576
577
13.3M
  request->clear_propagated_safe_time();
578
579
  // If we've never communicated with the peer, we don't know what messages to send, so we'll send a
580
  // status-only request. If the peer has not responded to the point that our to_index == next_index
581
  // due to exponential backoff of replicated segment size, we also send a status-only request.
582
  // Otherwise, we grab requests from the log starting at the last_received point.
583
13.3M
  if (!is_new && num_log_ops_to_send > 0) {
584
    // The batch of messages to send to the peer.
585
10.2M
    auto max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong();
586
10.2M
    auto to_index = num_log_ops_to_send == kSendUnboundedLogOps ?
587
10.2M
        0 : previously_sent_index + num_log_ops_to_send;
588
10.2M
    auto result = ReadFromLogCache(previously_sent_index, to_index, max_batch_size, uuid);
589
590
10.2M
    if (PREDICT_FALSE(!result.ok())) {
591
0
      if (PREDICT_TRUE(result.status().IsNotFound())) {
592
0
        std::string msg = Format("The logs necessary to catch up peer $0 have been "
593
0
                                 "garbage collected. The follower will never be able "
594
0
                                 "to catch up ($1)", uuid, result.status());
595
0
        NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg);
596
0
      }
597
0
      return result.status();
598
0
    }
599
600
10.2M
    preceding_id = result->preceding_op;
601
    // We use AddAllocated rather than copy, because we pin the log cache at the "all replicated"
602
    // point. At some point we may want to allow partially loading (and not pinning) earlier
603
    // messages. At that point we'll need to do something smarter here, like copy or ref-count.
604
5.19M
    for (const auto& msg : result->messages) {
605
5.19M
      request->mutable_ops()->AddAllocated(msg.get());
606
5.19M
    }
607
608
10.2M
    {
609
10.2M
      LockGuard lock(queue_lock_);
610
10.2M
      auto peer = FindPtrOrNull(peers_map_, uuid);
611
10.2M
      if (PREDICT_FALSE(peer == nullptr)) {
612
0
        return STATUS(NotFound, "Peer not tracked.");
613
0
      }
614
615
10.2M
      peer->last_num_messages_sent = result->messages.size();
616
10.2M
    }
617
618
10.2M
    ScopedTrackedConsumption consumption;
619
10.2M
    if (result->read_from_disk_size) {
620
1.17k
      consumption = ScopedTrackedConsumption(operations_mem_tracker_, result->read_from_disk_size);
621
1.17k
    }
622
10.2M
    *msgs_holder = ReplicateMsgsHolder(
623
10.2M
        request->mutable_ops(), std::move(result->messages), std::move(consumption));
624
625
10.2M
    if (propagated_safe_time &&
626
10.2M
        !result->have_more_messages &&
627
10.2M
        num_log_ops_to_send == kSendUnboundedLogOps) {
628
      // Get the current local safe time on the leader and propagate it to the follower.
629
10.2M
      request->set_propagated_safe_time(propagated_safe_time.ToUint64());
630
10.2M
    }
631
10.2M
  }
632
633
13.3M
  preceding_id.ToPB(request->mutable_preceding_id());
634
635
  // All entries committed at leader may not be available at lagging follower.
636
  // `commited_op_id` in this request may make a lagging follower aware of the
637
  // highest committed op index at the leader. We have a sanity check during tablet
638
  // bootstrap, in TabletBootstrap::PlaySegments(), that this tablet did not lose a
639
  // committed operation. Hence avoid sending a committed op id that is too large
640
  // to such a lagging follower.
641
  // If we send operations to it, then last know operation to this follower will be last sent
642
  // operation. If we don't send any operation, then last known operation will be preceding
643
  // operation.
644
  // We don't have to change committed_op_id when it is less than max_allowed_committed_op_id,
645
  // because it will have actual committed_op_id value and this operation is known to the
646
  // follower.
647
13.3M
  const auto max_allowed_committed_op_id = !request->ops().empty()
648
8.87M
      ? OpId::FromPB(request->ops().rbegin()->id()) : preceding_id;
649
13.3M
  if (max_allowed_committed_op_id.index < request->committed_op_id().index()) {
650
2.05k
    max_allowed_committed_op_id.ToPB(request->mutable_committed_op_id());
651
2.05k
  }
652
653
13.3M
  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
654
0
    if (request->ops_size() > 0) {
655
0
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid
656
0
          << ". Size: " << request->ops_size()
657
0
          << ". From: " << request->ops(0).id().ShortDebugString() << ". To: "
658
0
          << request->ops(request->ops_size() - 1).id().ShortDebugString();
659
0
      VLOG_WITH_PREFIX_UNLOCKED(3) << "Operations: " << yb::ToString(request->ops());
660
0
    } else {
661
0
      VLOG_WITH_PREFIX_UNLOCKED(2)
662
0
          << "Sending " << (is_new ? "new " : "") << "status only request to Peer: " << uuid
663
0
          << ": " << request->ShortDebugString();
664
0
    }
665
0
  }
666
667
13.3M
  return Status::OK();
668
13.3M
}
669
670
Result<ReadOpsResult> PeerMessageQueue::ReadFromLogCache(int64_t after_index,
671
                                                         int64_t to_index,
672
                                                         size_t max_batch_size,
673
                                                         const std::string& peer_uuid,
674
10.2M
                                                         const CoarseTimePoint deadline) {
675
10.2M
  DCHECK_LT(FLAGS_consensus_max_batch_size_bytes + 1_KB, FLAGS_rpc_max_message_size);
676
677
  // We try to get the follower's next_index from our log.
678
  // Note this is not using "term" and needs to change
679
10.2M
  auto result = log_cache_.ReadOps(after_index, to_index, max_batch_size, deadline);
680
10.2M
  if (PREDICT_FALSE(!result.ok())) {
681
0
    auto s = result.status();
682
0
    if (PREDICT_TRUE(s.IsNotFound())) {
683
0
      return s;
684
0
    } else if (s.IsIncomplete()) {
685
      // IsIncomplete() means that we tried to read beyond the head of the log (in the future).
686
      // KUDU-1078 points to a fix of this log spew issue that we've ported. This should not
687
      // happen under normal circumstances.
688
0
      LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log "
689
0
                                      << "while preparing peer request: "
690
0
                                      << s.ToString() << ". Destination peer: "
691
0
                                      << peer_uuid;
692
0
      return s;
693
0
    } else {
694
0
      LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
695
0
                                      << s.ToString() << ". Destination peer: "
696
0
                                      << peer_uuid;
697
0
      return s;
698
0
    }
699
10.2M
  }
700
10.2M
  return result;
701
10.2M
}
702
703
// Read majority replicated messages from cache for CDC.
704
// CDC producer will use this to get the messages to send in response to cdc::GetChanges RPC.
705
Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForCDC(
706
  const yb::OpId& last_op_id,
707
  int64_t* repl_index,
708
310
  const CoarseTimePoint deadline) {
709
  // The batch of messages read from cache.
710
711
310
  int64_t to_index;
712
310
  bool pending_messages = false;
713
310
  {
714
310
    LockGuard lock(queue_lock_);
715
    // Use committed_op_id because it's already been processed by the Transaction codepath.
716
310
    to_index = queue_state_.committed_op_id.index;
717
    // Determine if there are pending operations in RAFT but not yet LogCache.
718
310
    pending_messages = to_index != queue_state_.majority_replicated_op_id.index;
719
310
  }
720
310
  if (repl_index) {
721
307
    *repl_index = to_index;
722
307
  }
723
724
310
  if (last_op_id.index >= to_index) {
725
    // Nothing to read.
726
0
    return ReadOpsResult();
727
0
  }
728
729
  // If an empty OpID is only sent on the first read request, start at the earliest known entry.
730
310
  int64_t after_op_index = last_op_id.empty() ?
731
302
                             max(log_cache_.earliest_op_index(), last_op_id.index) :
732
8
                             last_op_id.index;
733
734
310
  auto result = ReadFromLogCache(
735
310
      after_op_index, to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, deadline);
736
310
  if (PREDICT_FALSE(!result.ok()) && PREDICT_TRUE(result.status().IsNotFound())) {
737
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << Format(
738
0
        "The logs from index $0 have been garbage collected and cannot be read ($1)",
739
0
        after_op_index, result.status());
740
0
  }
741
310
  if (result.ok()) {
742
310
    result->have_more_messages |= pending_messages;
743
310
  }
744
310
  return result;
745
310
}
746
747
Status PeerMessageQueue::GetRemoteBootstrapRequestForPeer(const string& uuid,
748
4.91k
                                                          StartRemoteBootstrapRequestPB* req) {
749
4.91k
  TrackedPeer* peer = nullptr;
750
4.91k
  {
751
4.91k
    LockGuard lock(queue_lock_);
752
4.91k
    DCHECK_EQ(queue_state_.state, State::kQueueOpen);
753
4.91k
    DCHECK_NE(uuid, local_peer_uuid_);
754
4.91k
    peer = FindPtrOrNull(peers_map_, uuid);
755
4.91k
    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) {
756
0
      return STATUS(NotFound, "Peer not tracked or queue not in leader mode.");
757
0
    }
758
4.91k
  }
759
760
4.91k
  if (PREDICT_FALSE(!peer->needs_remote_bootstrap)) {
761
0
    return STATUS(IllegalState, "Peer does not need to remotely bootstrap", uuid);
762
0
  }
763
764
4.91k
  if (peer->member_type == PeerMemberType::VOTER || peer->member_type == PeerMemberType::OBSERVER) {
765
356
    LOG(INFO) << "Remote bootstrapping peer " << uuid << " with type "
766
356
              << PeerMemberType_Name(peer->member_type);
767
356
  }
768
769
4.91k
  req->Clear();
770
4.91k
  req->set_dest_uuid(uuid);
771
4.91k
  req->set_tablet_id(tablet_id_);
772
4.91k
  req->set_bootstrap_peer_uuid(local_peer_uuid_);
773
4.91k
  *req->mutable_source_private_addr() = local_peer_pb_.last_known_private_addr();
774
4.91k
  *req->mutable_source_broadcast_addr() = local_peer_pb_.last_known_broadcast_addr();
775
4.91k
  *req->mutable_source_cloud_info() = local_peer_pb_.cloud_info();
776
4.91k
  req->set_caller_term(queue_state_.current_term);
777
4.91k
  peer->needs_remote_bootstrap = false; // Now reset the flag.
778
4.91k
  return Status::OK();
779
4.91k
}
780
781
485
void PeerMessageQueue::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
782
485
  std::lock_guard<rw_spinlock> l(cdc_consumer_lock_);
783
485
  cdc_consumer_op_id_ = op_id;
784
485
  cdc_consumer_op_id_last_updated_ = CoarseMonoClock::Now();
785
485
}
786
787
15.1M
yb::OpId PeerMessageQueue::GetCDCConsumerOpIdToEvict() {
788
15.1M
  std::shared_lock<rw_spinlock> l(cdc_consumer_lock_);
789
  // For log cache eviction, we only want to include CDC consumers that are actively polling.
790
  // If CDC consumer checkpoint has not been updated recently, we exclude it.
791
15.1M
  if (CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_ <= kCDCConsumerCheckpointInterval) {
792
5.63k
    return cdc_consumer_op_id_;
793
15.1M
  } else {
794
15.1M
    return yb::OpId::Max();
795
15.1M
  }
796
15.1M
}
797
798
1.96M
yb::OpId PeerMessageQueue::GetCDCConsumerOpIdForIntentRemoval() {
799
1.96M
  std::shared_lock<rw_spinlock> l(cdc_consumer_lock_);
800
1.96M
  if (CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_ <= kCDCConsumerIntentRetention) {
801
542
    return cdc_consumer_op_id_;
802
1.96M
  } else {
803
1.96M
    return yb::OpId::Max();
804
1.96M
  }
805
1.96M
}
806
807
15.1M
void PeerMessageQueue::UpdateAllReplicatedOpId(OpId* result) {
808
15.1M
  OpId new_op_id = OpId::Max();
809
810
44.9M
  for (const auto& peer : peers_map_) {
811
44.9M
    if (!peer.second->is_last_exchange_successful) {
812
266k
      return;
813
266k
    }
814
44.6M
    if (peer.second->last_received.index < new_op_id.index) {
815
17.8M
      new_op_id = peer.second->last_received;
816
17.8M
    }
817
44.6M
  }
818
819
14.8M
  CHECK_NE(OpId::Max(), new_op_id);
820
14.8M
  *result = new_op_id;
821
14.8M
}
822
823
30.2M
void PeerMessageQueue::UpdateAllAppliedOpId(OpId* result) {
824
30.2M
  OpId all_applied_op_id = OpId::Max();
825
89.9M
  for (const auto& peer : peers_map_) {
826
89.9M
    if (!peer.second->is_last_exchange_successful) {
827
524k
      return;
828
524k
    }
829
89.3M
    all_applied_op_id = std::min(all_applied_op_id, peer.second->last_applied);
830
89.3M
  }
831
832
29.7M
  CHECK_NE(OpId::Max(), all_applied_op_id);
833
29.7M
  *result = all_applied_op_id;
834
29.7M
}
835
836
15.1M
void PeerMessageQueue::UpdateAllNonLaggingReplicatedOpId(int32_t threshold) {
837
15.1M
  OpId new_op_id = OpId::Max();
838
839
45.4M
  for (const auto& peer : peers_map_) {
840
    // Ignore lagging follower.
841
45.4M
    if (peer.second->current_retransmissions >= threshold) {
842
74.7k
      continue;
843
74.7k
    }
844
45.3M
    if (peer.second->last_received.index < new_op_id.index) {
845
18.0M
      new_op_id = peer.second->last_received;
846
18.0M
    }
847
45.3M
  }
848
849
15.1M
  if (new_op_id == OpId::Max()) {
850
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Non lagging peer(s) not found.";
851
0
    new_op_id = queue_state_.all_replicated_op_id;
852
0
  }
853
854
15.1M
  if (queue_state_.all_nonlagging_replicated_op_id.index < new_op_id.index) {
855
2.37M
    queue_state_.all_nonlagging_replicated_op_id = new_op_id;
856
2.37M
  }
857
15.1M
}
858
859
HAS_MEMBER_FUNCTION(InfiniteWatermarkForLocalPeer);
860
861
template <class Policy, bool HasMemberFunction_InfiniteWatermarkForLocalPeer>
862
struct GetInfiniteWatermarkForLocalPeer;
863
864
template <class Policy>
865
struct GetInfiniteWatermarkForLocalPeer<Policy, true> {
866
595k
  static auto Apply() {
867
595k
    return Policy::InfiniteWatermarkForLocalPeer();
868
595k
  }
consensus_queue.cc:_ZN2yb9consensus32GetInfiniteWatermarkForLocalPeerIZNS0_16PeerMessageQueue30LeaderLeaseExpirationWatermarkEvE6PolicyLb1EE5ApplyEv
Line
Count
Source
866
297k
  static auto Apply() {
867
297k
    return Policy::InfiniteWatermarkForLocalPeer();
868
297k
  }
consensus_queue.cc:_ZN2yb9consensus32GetInfiniteWatermarkForLocalPeerIZNS0_16PeerMessageQueue34HybridTimeLeaseExpirationWatermarkEvE6PolicyLb1EE5ApplyEv
Line
Count
Source
866
297k
  static auto Apply() {
867
297k
    return Policy::InfiniteWatermarkForLocalPeer();
868
297k
  }
869
};
870
871
template <class Policy>
872
struct GetInfiniteWatermarkForLocalPeer<Policy, false> {
873
  // Should not be invoked, but have to define to make compiler happy.
874
0
  static typename Policy::result_type Apply() {
875
0
    LOG(DFATAL) << "Invoked Apply when InfiniteWatermarkForLocalPeer is not defined";
876
0
    return typename Policy::result_type();
877
0
  }
Unexecuted instantiation: consensus_queue.cc:_ZN2yb9consensus32GetInfiniteWatermarkForLocalPeerIZNS0_16PeerMessageQueue20NumSSTFilesWatermarkEvE6PolicyLb0EE5ApplyEv
Unexecuted instantiation: consensus_queue.cc:_ZN2yb9consensus32GetInfiniteWatermarkForLocalPeerIZNS0_16PeerMessageQueue13OpIdWatermarkEvE6PolicyLb0EE5ApplyEv
878
};
879
880
template <class Policy>
881
60.6M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
882
60.6M
  DCHECK(queue_lock_.is_locked());
883
60.6M
  const auto num_peers_required = queue_state_.majority_size_;
884
60.6M
  if (num_peers_required == kUninitializedMajoritySize) {
885
    // We don't even know the quorum majority size yet.
886
0
    return Policy::NotEnoughPeersValue();
887
0
  }
888
60.6M
  CHECK_GE(num_peers_required, 0);
889
890
60.6M
  const ssize_t num_peers = peers_map_.size();
891
60.6M
  if (num_peers < num_peers_required) {
892
500
    return Policy::NotEnoughPeersValue();
893
500
  }
894
895
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
896
  // replicated value of the dimension that we are computing a watermark for. There is a difference
897
  // in logic between handling of OpIds vs. leader leases:
898
  // - For OpIds, the local peer might actually be less up-to-date than followers.
899
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
900
60.6M
  const bool local_peer_infinite_watermark =
901
60.6M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
902
903
60.6M
  if (num_peers_required == 1 && local_peer_infinite_watermark) {
904
    // We give "infinite lease" to ourselves.
905
595k
    return GetInfiniteWatermarkForLocalPeer<
906
595k
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
907
595k
  }
908
909
60.0M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
910
60.0M
  boost::container::small_vector<
911
60.0M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
912
60.0M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
913
914
181M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
915
181M
    const TrackedPeer &peer = *peer_map_entry.second;
916
181M
    if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) {
917
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
918
      // value of the watermark.
919
29.7M
      continue;
920
29.7M
    }
921
151M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
922
      // Only votes from VOTERs in the active config should be taken into consideration
923
1.63M
      continue;
924
1.63M
    }
925
149M
    if (peer.is_last_exchange_successful) {
926
148M
      watermarks.push_back(Policy::ExtractValue(peer));
927
148M
    }
928
149M
  }
929
930
  // We always assume that local peer has most recent information.
931
60.0M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
932
933
60.0M
  if (num_responsive_peers < num_peers_required) {
934
18
    VLOG_WITH_PREFIX_UNLOCKED(2)
935
18
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
936
18
        << ", num_peers_required=" << num_peers_required
937
18
        << ", num_responsive_peers=" << num_responsive_peers
938
18
        << ", not enough responsive peers";
939
    // There are not enough peers with which the last message exchange was successful.
940
136k
    return Policy::NotEnoughPeersValue();
941
136k
  }
942
943
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
944
  // something to 3 of them and 4th is our local peer, there are two possibilities:
945
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
946
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
947
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
948
  //   num_responsive_peers - num_peers_required.
949
  //
950
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
951
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
952
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
953
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
954
955
59.8M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
956
59.8M
  DCHECK_LT(index_of_interest, watermarks.size());
957
958
59.8M
  auto nth = watermarks.begin() + index_of_interest;
959
59.8M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
960
18.4E
  VLOG_WITH_PREFIX_UNLOCKED(2)
961
18.4E
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
962
18.4E
      << ", num_peers_required=" << num_peers_required
963
18.4E
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
964
18.4E
      << ", watermark: " << yb::ToString(*nth);
965
966
59.8M
  return *nth;
967
59.8M
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue12GetWatermarkIZNS1_30LeaderLeaseExpirationWatermarkEvE6PolicyEENT_11result_typeEv
Line
Count
Source
881
15.1M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
882
15.1M
  DCHECK(queue_lock_.is_locked());
883
15.1M
  const auto num_peers_required = queue_state_.majority_size_;
884
15.1M
  if (num_peers_required == kUninitializedMajoritySize) {
885
    // We don't even know the quorum majority size yet.
886
0
    return Policy::NotEnoughPeersValue();
887
0
  }
888
15.1M
  CHECK_GE(num_peers_required, 0);
889
890
15.1M
  const ssize_t num_peers = peers_map_.size();
891
15.1M
  if (num_peers < num_peers_required) {
892
125
    return Policy::NotEnoughPeersValue();
893
125
  }
894
895
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
896
  // replicated value of the dimension that we are computing a watermark for. There is a difference
897
  // in logic between handling of OpIds vs. leader leases:
898
  // - For OpIds, the local peer might actually be less up-to-date than followers.
899
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
900
15.1M
  const bool local_peer_infinite_watermark =
901
15.1M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
902
903
15.1M
  if (num_peers_required == 1 && local_peer_infinite_watermark) {
904
    // We give "infinite lease" to ourselves.
905
297k
    return GetInfiniteWatermarkForLocalPeer<
906
297k
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
907
297k
  }
908
909
14.8M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
910
14.8M
  boost::container::small_vector<
911
14.8M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
912
14.8M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
913
914
45.1M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
915
45.1M
    const TrackedPeer &peer = *peer_map_entry.second;
916
45.1M
    if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) {
917
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
918
      // value of the watermark.
919
14.8M
      continue;
920
14.8M
    }
921
30.3M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
922
      // Only votes from VOTERs in the active config should be taken into consideration
923
407k
      continue;
924
407k
    }
925
29.8M
    if (peer.is_last_exchange_successful) {
926
29.6M
      watermarks.push_back(Policy::ExtractValue(peer));
927
29.6M
    }
928
29.8M
  }
929
930
  // We always assume that local peer has most recent information.
931
14.8M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
932
933
14.8M
  if (num_responsive_peers < num_peers_required) {
934
18.4E
    VLOG_WITH_PREFIX_UNLOCKED(2)
935
18.4E
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
936
18.4E
        << ", num_peers_required=" << num_peers_required
937
18.4E
        << ", num_responsive_peers=" << num_responsive_peers
938
18.4E
        << ", not enough responsive peers";
939
    // There are not enough peers with which the last message exchange was successful.
940
34.1k
    return Policy::NotEnoughPeersValue();
941
34.1k
  }
942
943
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
944
  // something to 3 of them and 4th is our local peer, there are two possibilities:
945
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
946
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
947
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
948
  //   num_responsive_peers - num_peers_required.
949
  //
950
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
951
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
952
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
953
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
954
955
14.8M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
956
14.8M
  DCHECK_LT(index_of_interest, watermarks.size());
957
958
14.8M
  auto nth = watermarks.begin() + index_of_interest;
959
14.8M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
960
18.4E
  VLOG_WITH_PREFIX_UNLOCKED(2)
961
18.4E
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
962
18.4E
      << ", num_peers_required=" << num_peers_required
963
18.4E
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
964
18.4E
      << ", watermark: " << yb::ToString(*nth);
965
966
14.8M
  return *nth;
967
14.8M
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue12GetWatermarkIZNS1_34HybridTimeLeaseExpirationWatermarkEvE6PolicyEENT_11result_typeEv
Line
Count
Source
881
15.1M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
882
15.1M
  DCHECK(queue_lock_.is_locked());
883
15.1M
  const auto num_peers_required = queue_state_.majority_size_;
884
15.1M
  if (num_peers_required == kUninitializedMajoritySize) {
885
    // We don't even know the quorum majority size yet.
886
0
    return Policy::NotEnoughPeersValue();
887
0
  }
888
15.1M
  CHECK_GE(num_peers_required, 0);
889
890
15.1M
  const ssize_t num_peers = peers_map_.size();
891
15.1M
  if (num_peers < num_peers_required) {
892
125
    return Policy::NotEnoughPeersValue();
893
125
  }
894
895
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
896
  // replicated value of the dimension that we are computing a watermark for. There is a difference
897
  // in logic between handling of OpIds vs. leader leases:
898
  // - For OpIds, the local peer might actually be less up-to-date than followers.
899
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
900
15.1M
  const bool local_peer_infinite_watermark =
901
15.1M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
902
903
15.1M
  if (num_peers_required == 1 && local_peer_infinite_watermark) {
904
    // We give "infinite lease" to ourselves.
905
297k
    return GetInfiniteWatermarkForLocalPeer<
906
297k
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
907
297k
  }
908
909
14.8M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
910
14.8M
  boost::container::small_vector<
911
14.8M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
912
14.8M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
913
914
45.1M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
915
45.1M
    const TrackedPeer &peer = *peer_map_entry.second;
916
45.1M
    if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) {
917
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
918
      // value of the watermark.
919
14.8M
      continue;
920
14.8M
    }
921
30.3M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
922
      // Only votes from VOTERs in the active config should be taken into consideration
923
407k
      continue;
924
407k
    }
925
29.8M
    if (peer.is_last_exchange_successful) {
926
29.6M
      watermarks.push_back(Policy::ExtractValue(peer));
927
29.6M
    }
928
29.8M
  }
929
930
  // We always assume that local peer has most recent information.
931
14.8M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
932
933
14.8M
  if (num_responsive_peers < num_peers_required) {
934
31
    VLOG_WITH_PREFIX_UNLOCKED(2)
935
31
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
936
31
        << ", num_peers_required=" << num_peers_required
937
31
        << ", num_responsive_peers=" << num_responsive_peers
938
31
        << ", not enough responsive peers";
939
    // There are not enough peers with which the last message exchange was successful.
940
34.1k
    return Policy::NotEnoughPeersValue();
941
34.1k
  }
942
943
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
944
  // something to 3 of them and 4th is our local peer, there are two possibilities:
945
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
946
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
947
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
948
  //   num_responsive_peers - num_peers_required.
949
  //
950
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
951
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
952
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
953
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
954
955
14.8M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
956
14.8M
  DCHECK_LT(index_of_interest, watermarks.size());
957
958
14.8M
  auto nth = watermarks.begin() + index_of_interest;
959
14.8M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
960
889
  VLOG_WITH_PREFIX_UNLOCKED(2)
961
889
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
962
889
      << ", num_peers_required=" << num_peers_required
963
889
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
964
889
      << ", watermark: " << yb::ToString(*nth);
965
966
14.8M
  return *nth;
967
14.8M
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue12GetWatermarkIZNS1_20NumSSTFilesWatermarkEvE6PolicyEENT_11result_typeEv
Line
Count
Source
881
15.1M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
882
15.1M
  DCHECK(queue_lock_.is_locked());
883
15.1M
  const auto num_peers_required = queue_state_.majority_size_;
884
15.1M
  if (num_peers_required == kUninitializedMajoritySize) {
885
    // We don't even know the quorum majority size yet.
886
0
    return Policy::NotEnoughPeersValue();
887
0
  }
888
15.1M
  CHECK_GE(num_peers_required, 0);
889
890
15.1M
  const ssize_t num_peers = peers_map_.size();
891
15.1M
  if (num_peers < num_peers_required) {
892
125
    return Policy::NotEnoughPeersValue();
893
125
  }
894
895
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
896
  // replicated value of the dimension that we are computing a watermark for. There is a difference
897
  // in logic between handling of OpIds vs. leader leases:
898
  // - For OpIds, the local peer might actually be less up-to-date than followers.
899
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
900
15.1M
  const bool local_peer_infinite_watermark =
901
15.1M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
902
903
15.1M
  if (num_peers_required == 1 && local_peer_infinite_watermark) {
904
    // We give "infinite lease" to ourselves.
905
0
    return GetInfiniteWatermarkForLocalPeer<
906
0
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
907
0
  }
908
909
15.1M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
910
15.1M
  boost::container::small_vector<
911
15.1M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
912
15.1M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
913
914
45.4M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
915
45.4M
    const TrackedPeer &peer = *peer_map_entry.second;
916
45.4M
    if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) {
917
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
918
      // value of the watermark.
919
0
      continue;
920
0
    }
921
45.4M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
922
      // Only votes from VOTERs in the active config should be taken into consideration
923
413k
      continue;
924
413k
    }
925
45.0M
    if (peer.is_last_exchange_successful) {
926
44.8M
      watermarks.push_back(Policy::ExtractValue(peer));
927
44.8M
    }
928
45.0M
  }
929
930
  // We always assume that local peer has most recent information.
931
15.1M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
932
933
15.1M
  if (num_responsive_peers < num_peers_required) {
934
18.4E
    VLOG_WITH_PREFIX_UNLOCKED(2)
935
18.4E
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
936
18.4E
        << ", num_peers_required=" << num_peers_required
937
18.4E
        << ", num_responsive_peers=" << num_responsive_peers
938
18.4E
        << ", not enough responsive peers";
939
    // There are not enough peers with which the last message exchange was successful.
940
34.1k
    return Policy::NotEnoughPeersValue();
941
34.1k
  }
942
943
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
944
  // something to 3 of them and 4th is our local peer, there are two possibilities:
945
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
946
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
947
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
948
  //   num_responsive_peers - num_peers_required.
949
  //
950
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
951
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
952
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
953
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
954
955
15.1M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
956
15.1M
  DCHECK_LT(index_of_interest, watermarks.size());
957
958
15.1M
  auto nth = watermarks.begin() + index_of_interest;
959
15.1M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
960
18.4E
  VLOG_WITH_PREFIX_UNLOCKED(2)
961
18.4E
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
962
18.4E
      << ", num_peers_required=" << num_peers_required
963
18.4E
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
964
18.4E
      << ", watermark: " << yb::ToString(*nth);
965
966
15.1M
  return *nth;
967
15.1M
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue12GetWatermarkIZNS1_13OpIdWatermarkEvE6PolicyEENT_11result_typeEv
Line
Count
Source
881
15.1M
typename Policy::result_type PeerMessageQueue::GetWatermark() {
882
15.1M
  DCHECK(queue_lock_.is_locked());
883
15.1M
  const auto num_peers_required = queue_state_.majority_size_;
884
15.1M
  if (num_peers_required == kUninitializedMajoritySize) {
885
    // We don't even know the quorum majority size yet.
886
0
    return Policy::NotEnoughPeersValue();
887
0
  }
888
15.1M
  CHECK_GE(num_peers_required, 0);
889
890
15.1M
  const ssize_t num_peers = peers_map_.size();
891
15.1M
  if (num_peers < num_peers_required) {
892
125
    return Policy::NotEnoughPeersValue();
893
125
  }
894
895
  // This flag indicates whether to implicitly assume that the local peer has an "infinite"
896
  // replicated value of the dimension that we are computing a watermark for. There is a difference
897
  // in logic between handling of OpIds vs. leader leases:
898
  // - For OpIds, the local peer might actually be less up-to-date than followers.
899
  // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves.
900
15.1M
  const bool local_peer_infinite_watermark =
901
15.1M
      HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value;
902
903
15.1M
  if (num_peers_required == 1 && local_peer_infinite_watermark) {
904
    // We give "infinite lease" to ourselves.
905
0
    return GetInfiniteWatermarkForLocalPeer<
906
0
        Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply();
907
0
  }
908
909
15.1M
  constexpr size_t kMaxPracticalReplicationFactor = 5;
910
15.1M
  boost::container::small_vector<
911
15.1M
      typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks;
912
15.1M
  watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark);
913
914
45.4M
  for (const PeersMap::value_type &peer_map_entry : peers_map_) {
915
45.4M
    const TrackedPeer &peer = *peer_map_entry.second;
916
45.4M
    if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) {
917
      // Don't even include the local peer in the watermarks array. Assume it has an "infinite"
918
      // value of the watermark.
919
0
      continue;
920
0
    }
921
45.4M
    if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) {
922
      // Only votes from VOTERs in the active config should be taken into consideration
923
412k
      continue;
924
412k
    }
925
45.0M
    if (peer.is_last_exchange_successful) {
926
44.8M
      watermarks.push_back(Policy::ExtractValue(peer));
927
44.8M
    }
928
45.0M
  }
929
930
  // We always assume that local peer has most recent information.
931
15.1M
  const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark;
932
933
15.1M
  if (num_responsive_peers < num_peers_required) {
934
18.4E
    VLOG_WITH_PREFIX_UNLOCKED(2)
935
18.4E
        << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
936
18.4E
        << ", num_peers_required=" << num_peers_required
937
18.4E
        << ", num_responsive_peers=" << num_responsive_peers
938
18.4E
        << ", not enough responsive peers";
939
    // There are not enough peers with which the last message exchange was successful.
940
34.1k
    return Policy::NotEnoughPeersValue();
941
34.1k
  }
942
943
  // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated
944
  // something to 3 of them and 4th is our local peer, there are two possibilities:
945
  // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4,
946
  //   and we want an OpId value such that 3 or more peers have replicated that or greater value.
947
  //   Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or
948
  //   num_responsive_peers - num_peers_required.
949
  //
950
  // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we
951
  //   are assuming that the local peer (leader) has replicated an infinitely high watermark to
952
  //   itself. Then watermark.size() is 3 (because we skip the local peer when populating
953
  //   watermarks), but num_responsive_peers is still 4, and the expression stays the same.
954
955
15.1M
  const size_t index_of_interest = num_responsive_peers - num_peers_required;
956
15.1M
  DCHECK_LT(index_of_interest, watermarks.size());
957
958
15.1M
  auto nth = watermarks.begin() + index_of_interest;
959
15.1M
  std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator());
960
18.4E
  VLOG_WITH_PREFIX_UNLOCKED(2)
961
18.4E
      << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks)
962
18.4E
      << ", num_peers_required=" << num_peers_required
963
18.4E
      << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark
964
18.4E
      << ", watermark: " << yb::ToString(*nth);
965
966
15.1M
  return *nth;
967
15.1M
}
968
969
15.1M
CoarseTimePoint PeerMessageQueue::LeaderLeaseExpirationWatermark() {
970
15.1M
  struct Policy {
971
15.1M
    typedef CoarseTimePoint result_type;
972
    // Workaround for a gcc bug. That does not understand that Comparator is actually being used.
973
15.1M
    __attribute__((unused)) typedef std::less<result_type> Comparator;
974
975
34.2k
    static result_type NotEnoughPeersValue() {
976
34.2k
      return result_type::min();
977
34.2k
    }
978
979
297k
    static result_type InfiniteWatermarkForLocalPeer() {
980
297k
      return result_type::max();
981
297k
    }
982
983
29.6M
    static result_type ExtractValue(const TrackedPeer& peer) {
984
29.6M
      auto lease_exp = peer.leader_lease_expiration.last_received;
985
29.6M
      return lease_exp != CoarseTimePoint() ? lease_exp : CoarseTimePoint::min();
986
29.6M
    }
987
988
2
    static const char* Name() {
989
2
      return "Leader lease expiration";
990
2
    }
991
15.1M
  };
992
993
15.1M
  return GetWatermark<Policy>();
994
15.1M
}
995
996
15.1M
MicrosTime PeerMessageQueue::HybridTimeLeaseExpirationWatermark() {
997
15.1M
  struct Policy {
998
15.1M
    typedef MicrosTime result_type;
999
    // Workaround for a gcc bug. That does not understand that Comparator is actually being used.
1000
15.1M
    __attribute__((unused)) typedef std::less<result_type> Comparator;
1001
1002
34.2k
    static result_type NotEnoughPeersValue() {
1003
34.2k
      return HybridTime::kMin.GetPhysicalValueMicros();
1004
34.2k
    }
1005
1006
297k
    static result_type InfiniteWatermarkForLocalPeer() {
1007
297k
      return HybridTime::kMax.GetPhysicalValueMicros();
1008
297k
    }
1009
1010
29.6M
    static result_type ExtractValue(const TrackedPeer& peer) {
1011
29.6M
      return peer.leader_ht_lease_expiration.last_received;
1012
29.6M
    }
1013
1014
2
    static const char* Name() {
1015
2
      return "Hybrid time leader lease expiration";
1016
2
    }
1017
15.1M
  };
1018
1019
15.1M
  return GetWatermark<Policy>();
1020
15.1M
}
1021
1022
15.1M
uint64_t PeerMessageQueue::NumSSTFilesWatermark() {
1023
15.1M
  struct Policy {
1024
15.1M
    typedef uint64_t result_type;
1025
    // Workaround for a gcc bug. That does not understand that Comparator is actually being used.
1026
15.1M
    __attribute__((unused)) typedef std::greater<result_type> Comparator;
1027
1028
34.3k
    static result_type NotEnoughPeersValue() {
1029
34.3k
      return 0;
1030
34.3k
    }
1031
1032
44.8M
    static result_type ExtractValue(const TrackedPeer& peer) {
1033
44.8M
      return peer.num_sst_files;
1034
44.8M
    }
1035
1036
0
    static const char* Name() {
1037
0
      return "Num SST files";
1038
0
    }
1039
15.1M
  };
1040
1041
15.1M
  auto watermark = GetWatermark<Policy>();
1042
15.1M
  return std::max(watermark, local_peer_->num_sst_files);
1043
15.1M
}
1044
1045
15.1M
OpId PeerMessageQueue::OpIdWatermark() {
1046
15.1M
  struct Policy {
1047
15.1M
    typedef OpId result_type;
1048
1049
34.3k
    static result_type NotEnoughPeersValue() {
1050
34.3k
      return OpId::Min();
1051
34.3k
    }
1052
1053
44.8M
    static result_type ExtractValue(const TrackedPeer& peer) {
1054
44.8M
      return peer.last_received;
1055
44.8M
    }
1056
1057
15.1M
    struct Comparator {
1058
33.1M
      bool operator()(const OpId& lhs, const OpId& rhs) {
1059
33.1M
        return lhs.index < rhs.index;
1060
33.1M
      }
1061
15.1M
    };
1062
1063
0
    static const char* Name() {
1064
0
      return "OpId";
1065
0
    }
1066
15.1M
  };
1067
1068
15.1M
  return GetWatermark<Policy>();
1069
15.1M
}
1070
1071
11.0k
void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) {
1072
11.0k
  LockGuard l(queue_lock_);
1073
11.0k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1074
11.0k
  if (!peer) return;
1075
11.0k
  peer->last_successful_communication_time = MonoTime::Now();
1076
11.0k
}
1077
1078
13.2k
void PeerMessageQueue::RequestWasNotSent(const std::string& peer_uuid) {
1079
13.2k
  LockGuard scoped_lock(queue_lock_);
1080
13.2k
  DCHECK_NE(State::kQueueConstructed, queue_state_.state);
1081
1082
13.2k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1083
13.2k
  if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) {
1084
0
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked.";
1085
0
    return;
1086
0
  }
1087
1088
13.2k
  peer->ResetLastRequest();
1089
13.2k
}
1090
1091
1092
bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
1093
15.2M
                                        const ConsensusResponsePB& response) {
1094
19.9k
  DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
1095
19.9k
      << response.InitializationErrorString() << ". Response: " << response.ShortDebugString();
1096
1097
15.2M
  MajorityReplicatedData majority_replicated;
1098
15.2M
  Mode mode_copy;
1099
15.2M
  bool result = false;
1100
15.2M
  {
1101
15.2M
    LockGuard scoped_lock(queue_lock_);
1102
15.2M
    DCHECK_NE(State::kQueueConstructed, queue_state_.state);
1103
1104
15.2M
    TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1105
15.2M
    if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) {
1106
5
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding "
1107
5
          "peer response. Response: " << response.ShortDebugString();
1108
5
      return false;
1109
5
    }
1110
1111
    // Remotely bootstrap the peer if the tablet is not found or deleted.
1112
15.2M
    if (response.has_error()) {
1113
      // We only let special types of errors through to this point from the peer.
1114
0
      CHECK_EQ(tserver::TabletServerErrorPB::TABLET_NOT_FOUND, response.error().code())
1115
0
          << response.ShortDebugString();
1116
1117
4.91k
      peer->needs_remote_bootstrap = true;
1118
      // Since we received a response from the peer, we know it is alive. So we need to update
1119
      // peer->last_successful_communication_time, otherwise, we will remove this peer from the
1120
      // configuration if the remote bootstrap is not completed within
1121
      // FLAGS_follower_unavailable_considered_failed_sec seconds.
1122
4.91k
      peer->last_successful_communication_time = MonoTime::Now();
1123
272
      YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS(INFO, 30)
1124
272
          << "Marked peer as needing remote bootstrap: " << peer->ToString();
1125
4.91k
      return true;
1126
4.91k
    }
1127
1128
15.2M
    if (queue_state_.active_config) {
1129
15.2M
      RaftPeerPB peer_pb;
1130
15.2M
      if (!GetRaftConfigMember(*queue_state_.active_config, peer_uuid, &peer_pb).ok()) {
1131
0
        LOG(FATAL) << "Peer " << peer_uuid << " not in active config";
1132
0
      }
1133
15.2M
      peer->member_type = peer_pb.member_type();
1134
207
    } else {
1135
207
      peer->member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE;
1136
207
    }
1137
1138
    // Application level errors should be handled elsewhere
1139
15.2M
    DCHECK(!response.has_error());
1140
1141
    // Take a snapshot of the current peer status.
1142
15.2M
    TrackedPeer previous = *peer;
1143
1144
    // Update the peer status based on the response.
1145
15.2M
    peer->is_new = false;
1146
15.2M
    peer->last_successful_communication_time = MonoTime::Now();
1147
1148
15.2M
    peer->ResetLastRequest();
1149
1150
15.2M
    if (response.has_status()) {
1151
15.2M
      const auto& status = response.status();
1152
      // Sanity checks.  Some of these can be eventually removed, but they are handy for now.
1153
39
      DCHECK(status.IsInitialized()) << "Error: Uninitialized: "
1154
39
                                                << response.InitializationErrorString()
1155
39
                                                << ". Response: " << response.ShortDebugString();
1156
      // The status must always have a last received op id and a last committed index.
1157
15.2M
      DCHECK(status.has_last_received());
1158
15.2M
      DCHECK(status.has_last_received_current_leader());
1159
15.2M
      DCHECK(status.has_last_committed_idx());
1160
1161
15.2M
      peer->last_known_committed_idx = status.last_committed_idx();
1162
15.2M
      peer->last_applied = OpId::FromPB(status.last_applied());
1163
1164
      // If the reported last-received op for the replica is in our local log, then resume sending
1165
      // entries from that point onward. Otherwise, resume after the last op they received from us.
1166
      // If we've never successfully sent them anything, start after the last-committed op in their
1167
      // log, which is guaranteed by the Raft protocol to be a valid op.
1168
1169
15.2M
      bool peer_has_prefix_of_log = IsOpInLog(yb::OpId::FromPB(status.last_received()));
1170
15.2M
      if (peer_has_prefix_of_log) {
1171
        // If the latest thing in their log is in our log, we are in sync.
1172
15.2M
        peer->last_received = OpId::FromPB(status.last_received());
1173
15.2M
        peer->next_index = peer->last_received.index + 1;
1174
1175
18.4E
      } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
1176
        // Their log may have diverged from ours, however we are in the process of replicating our
1177
        // ops to them, so continue doing so. Eventually, we will cause the divergent entry in their
1178
        // log to be overwritten.
1179
4
        peer->last_received = OpId::FromPB(status.last_received_current_leader());
1180
4
        peer->next_index = peer->last_received.index + 1;
1181
18.4E
      } else {
1182
        // The peer is divergent and they have not (successfully) received anything from us yet.
1183
        // Start sending from their last committed index.  This logic differs from the Raft spec
1184
        // slightly because instead of stepping back one-by-one from the end until we no longer have
1185
        // an LMP error, we jump back to the last committed op indicated by the peer with the hope
1186
        // that doing so will result in a faster catch-up process.
1187
18.4E
        DCHECK_GE(peer->last_known_committed_idx, 0);
1188
18.4E
        peer->next_index = peer->last_known_committed_idx + 1;
1189
18.4E
      }
1190
1191
15.2M
      if (PREDICT_FALSE(status.has_error())) {
1192
68.2k
        peer->is_last_exchange_successful = false;
1193
68.2k
        switch (status.error().code()) {
1194
68.2k
          case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH: {
1195
68.2k
            DCHECK(status.has_last_received());
1196
68.2k
            if (previous.is_new) {
1197
              // That's currently how we can detect that we able to connect to a peer.
1198
68.1k
              LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString();
1199
94
            } else {
1200
94
              LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
1201
94
                                             << peer->ToString();
1202
94
              CHECK(!FLAGS_TEST_disallow_lmp_failures);
1203
94
            }
1204
68.2k
            return true;
1205
0
          }
1206
19
          case ConsensusErrorPB::INVALID_TERM: {
1207
19
            CHECK(response.has_responder_term());
1208
19
            LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString()
1209
19
                                           << ". Peer's new term: " << response.responder_term();
1210
19
            NotifyObserversOfTermChange(response.responder_term());
1211
19
            return false;
1212
0
          }
1213
0
          default: {
1214
0
            LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: "
1215
0
                << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: "
1216
0
                << response.ShortDebugString();
1217
0
          }
1218
68.2k
        }
1219
68.2k
      }
1220
15.2M
    }
1221
1222
15.1M
    peer->is_last_exchange_successful = true;
1223
15.1M
    peer->num_sst_files = response.num_sst_files();
1224
1225
15.1M
    if (response.has_responder_term()) {
1226
      // The peer must have responded with a term that is greater than or equal to the last known
1227
      // term for that peer.
1228
10.2M
      peer->CheckMonotonicTerms(response.responder_term());
1229
1230
      // If the responder didn't send an error back that must mean that it has a term that is the
1231
      // same or lower than ours.
1232
10.2M
      CHECK_LE(response.responder_term(), queue_state_.current_term);
1233
10.2M
    }
1234
1235
15.1M
    if (PREDICT_FALSE(VLOG_IS_ON(2))) {
1236
0
      VLOG_WITH_PREFIX_UNLOCKED(2) << "Received Response from Peer (" << peer->ToString() << "). "
1237
0
          << "Response: " << response.ShortDebugString();
1238
0
    }
1239
1240
    // If our log has the next request for the peer or if the peer's committed index is lower than
1241
    // our own, set 'more_pending' to true.
1242
15.1M
    result = log_cache_.HasOpBeenWritten(peer->next_index) ||
1243
13.9M
        (peer->last_known_committed_idx < queue_state_.committed_op_id.index);
1244
1245
15.1M
    mode_copy = queue_state_.mode;
1246
15.1M
    if (mode_copy == Mode::LEADER) {
1247
15.1M
      auto new_majority_replicated_opid = OpIdWatermark();
1248
15.1M
      if (new_majority_replicated_opid != OpId::Min()) {
1249
15.1M
        if (new_majority_replicated_opid.index == MaximumOpId().index()) {
1250
0
          queue_state_.majority_replicated_op_id = local_peer_->last_received;
1251
15.1M
        } else {
1252
15.1M
          queue_state_.majority_replicated_op_id = new_majority_replicated_opid;
1253
15.1M
        }
1254
15.1M
      }
1255
1256
15.1M
      peer->leader_lease_expiration.OnReplyFromFollower();
1257
15.1M
      peer->leader_ht_lease_expiration.OnReplyFromFollower();
1258
1259
15.1M
      majority_replicated.op_id = queue_state_.majority_replicated_op_id;
1260
15.1M
      majority_replicated.leader_lease_expiration = LeaderLeaseExpirationWatermark();
1261
15.1M
      majority_replicated.ht_lease_expiration = HybridTimeLeaseExpirationWatermark();
1262
15.1M
      majority_replicated.num_sst_files = NumSSTFilesWatermark();
1263
15.1M
      if (peer->last_received == queue_state_.last_applied_op_id) {
1264
9.06M
        majority_replicated.peer_got_all_ops = peer->uuid;
1265
9.06M
      }
1266
15.1M
    }
1267
1268
15.1M
    UpdateAllReplicatedOpId(&queue_state_.all_replicated_op_id);
1269
15.1M
    UpdateAllAppliedOpId(&queue_state_.all_applied_op_id);
1270
1271
15.1M
    auto evict_index = GetCDCConsumerOpIdToEvict().index;
1272
1273
15.1M
    int32_t lagging_follower_threshold = FLAGS_consensus_lagging_follower_threshold;
1274
15.1M
    if (lagging_follower_threshold > 0) {
1275
15.1M
      UpdateAllNonLaggingReplicatedOpId(lagging_follower_threshold);
1276
15.1M
      evict_index = std::min(evict_index, queue_state_.all_nonlagging_replicated_op_id.index);
1277
2.52k
    } else {
1278
2.52k
      evict_index = std::min(evict_index, queue_state_.all_replicated_op_id.index);
1279
2.52k
    }
1280
1281
15.1M
    log_cache_.EvictThroughOp(evict_index);
1282
1283
15.1M
    UpdateMetrics();
1284
15.1M
  }
1285
1286
15.1M
  if (mode_copy == Mode::LEADER) {
1287
15.1M
    NotifyObserversOfMajorityReplOpChange(majority_replicated);
1288
15.1M
  }
1289
1290
15.1M
  return result;
1291
15.2M
}
1292
1293
11
PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(string uuid) {
1294
11
  LockGuard scoped_lock(queue_lock_);
1295
11
  TrackedPeer* tracked = FindOrDie(peers_map_, uuid);
1296
11
  return *tracked;
1297
11
}
1298
1299
13
OpId PeerMessageQueue::TEST_GetAllReplicatedIndex() const {
1300
13
  LockGuard lock(queue_lock_);
1301
13
  return queue_state_.all_replicated_op_id;
1302
13
}
1303
1304
68
OpId PeerMessageQueue::GetAllAppliedOpId() const {
1305
68
  LockGuard lock(queue_lock_);
1306
68
  return queue_state_.all_applied_op_id;
1307
68
}
1308
1309
5
OpId PeerMessageQueue::TEST_GetCommittedIndex() const {
1310
5
  LockGuard lock(queue_lock_);
1311
5
  return queue_state_.committed_op_id;
1312
5
}
1313
1314
11
OpId PeerMessageQueue::TEST_GetMajorityReplicatedOpId() const {
1315
11
  LockGuard lock(queue_lock_);
1316
11
  return queue_state_.majority_replicated_op_id;
1317
11
}
1318
1319
2
OpId PeerMessageQueue::TEST_GetLastAppended() const {
1320
2
  LockGuard lock(queue_lock_);
1321
2
  return queue_state_.last_appended;
1322
2
}
1323
1324
16
OpId PeerMessageQueue::TEST_GetLastAppliedOpId() const {
1325
16
  LockGuard lock(queue_lock_);
1326
16
  return queue_state_.last_applied_op_id;
1327
16
}
1328
1329
30.5M
void PeerMessageQueue::UpdateMetrics() {
1330
  // Since operations have consecutive indices we can update the metrics based on simple index math.
1331
30.5M
  metrics_.num_majority_done_ops->set_value(
1332
30.5M
      queue_state_.committed_op_id.index - queue_state_.all_replicated_op_id.index);
1333
30.5M
  metrics_.num_in_progress_ops->set_value(
1334
30.5M
      queue_state_.last_appended.index - queue_state_.committed_op_id.index);
1335
30.5M
}
1336
1337
47
void PeerMessageQueue::DumpToHtml(std::ostream& out) const {
1338
47
  using std::endl;
1339
1340
47
  LockGuard lock(queue_lock_);
1341
47
  out << "<h3>Watermarks</h3>" << endl;
1342
47
  out << "<table>" << endl;;
1343
47
  out << "  <tr><th>Peer</th><th>Watermark</th></tr>" << endl;
1344
126
  for (const PeersMap::value_type& entry : peers_map_) {
1345
126
    out << Substitute("  <tr><td>$0</td><td>$1</td></tr>",
1346
126
                      EscapeForHtmlToString(entry.first),
1347
126
                      EscapeForHtmlToString(entry.second->ToString())) << endl;
1348
126
  }
1349
47
  out << "</table>" << endl;
1350
1351
47
  log_cache_.DumpToHtml(out);
1352
47
}
1353
1354
95.6k
void PeerMessageQueue::ClearUnlocked() {
1355
95.6k
  STLDeleteValues(&peers_map_);
1356
95.6k
  queue_state_.state = State::kQueueClosed;
1357
95.6k
}
1358
1359
95.6k
void PeerMessageQueue::Close() {
1360
95.6k
  if (installed_num_sst_files_changed_listener_) {
1361
47.7k
    context_->ListenNumSSTFilesChanged(std::function<void()>());
1362
47.7k
    installed_num_sst_files_changed_listener_ = false;
1363
47.7k
  }
1364
95.6k
  raft_pool_observers_token_->Shutdown();
1365
95.6k
  LockGuard lock(queue_lock_);
1366
95.6k
  ClearUnlocked();
1367
95.6k
}
1368
1369
94
string PeerMessageQueue::ToString() const {
1370
  // Even though metrics are thread-safe obtain the lock so that we get a "consistent" snapshot of
1371
  // the metrics.
1372
94
  LockGuard lock(queue_lock_);
1373
94
  return ToStringUnlocked();
1374
94
}
1375
1376
94
string PeerMessageQueue::ToStringUnlocked() const {
1377
94
  return Substitute("Consensus queue metrics:"
1378
94
                    "Only Majority Done Ops: $0, In Progress Ops: $1, Cache: $2",
1379
94
                    metrics_.num_majority_done_ops->value(), metrics_.num_in_progress_ops->value(),
1380
94
                    log_cache_.StatsString());
1381
94
}
1382
1383
35.1k
void PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) {
1384
35.1k
  LockGuard lock(queue_lock_);
1385
35.1k
  auto iter = std::find(observers_.begin(), observers_.end(), observer);
1386
35.1k
  if (iter == observers_.end()) {
1387
35.1k
    observers_.push_back(observer);
1388
35.1k
  }
1389
35.1k
}
1390
1391
93.6k
Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) {
1392
93.6k
  LockGuard lock(queue_lock_);
1393
93.6k
  auto iter = std::find(observers_.begin(), observers_.end(), observer);
1394
93.6k
  if (iter == observers_.end()) {
1395
88.7k
    return STATUS(NotFound, "Can't find observer.");
1396
88.7k
  }
1397
4.90k
  observers_.erase(iter);
1398
4.90k
  return Status::OK();
1399
4.90k
}
1400
1401
332k
const char* PeerMessageQueue::ModeToStr(Mode mode) {
1402
332k
  switch (mode) {
1403
145k
    case Mode::LEADER: return "LEADER";
1404
187k
    case Mode::NON_LEADER: return "NON_LEADER";
1405
0
  }
1406
0
  FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::Mode, mode);
1407
0
}
1408
1409
131k
const char* PeerMessageQueue::StateToStr(State state) {
1410
131k
  switch (state) {
1411
0
    case State::kQueueConstructed:
1412
0
      return "QUEUE_CONSTRUCTED";
1413
131k
    case State::kQueueOpen:
1414
131k
      return "QUEUE_OPEN";
1415
0
    case State::kQueueClosed:
1416
0
      return "QUEUE_CLOSED";
1417
1418
0
  }
1419
0
  FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::State, state);
1420
0
}
1421
1422
15.2M
bool PeerMessageQueue::IsOpInLog(const yb::OpId& desired_op) const {
1423
15.2M
  auto result = log_cache_.LookupOpId(desired_op.index);
1424
15.2M
  if (PREDICT_TRUE(result.ok())) {
1425
15.2M
    return desired_op == *result;
1426
15.2M
  }
1427
18.4E
  if (PREDICT_TRUE(result.status().IsNotFound() || result.status().IsIncomplete())) {
1428
3
    return false;
1429
3
  }
1430
18.4E
  LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error while reading the log: " << result.status();
1431
18.4E
  return false; // Unreachable; here to squelch GCC warning.
1432
18.4E
}
1433
1434
void PeerMessageQueue::NotifyObserversOfMajorityReplOpChange(
1435
15.1M
    const MajorityReplicatedData& majority_replicated_data) {
1436
15.1M
  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
1437
15.1M
      Bind(&PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask,
1438
15.1M
           Unretained(this),
1439
15.1M
           majority_replicated_data)),
1440
15.1M
      LogPrefixUnlocked() + "Unable to notify RaftConsensus of "
1441
15.1M
                           "majority replicated op change.");
1442
15.1M
}
1443
1444
template <class Func>
1445
1.62k
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446
1.62k
  WARN_NOT_OK(
1447
1.62k
      raft_pool_observers_token_->SubmitFunc(
1448
1.62k
          [this, func = std::move(func)] {
1449
1.62k
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450
1.62k
        std::vector<PeerMessageQueueObserver*> copy;
1451
1.62k
        {
1452
1.62k
          LockGuard lock(queue_lock_);
1453
1.62k
          copy = observers_;
1454
1.62k
        }
1455
1456
1.62k
        for (PeerMessageQueueObserver* observer : copy) {
1457
1.62k
          func(observer);
1458
1.62k
        }
1459
1.62k
      }),
1460
1.62k
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461
1.62k
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue15NotifyObserversIZNS1_18NumSSTFilesChangedEvE3$_0EEvPKcOT_
Line
Count
Source
1445
1.44k
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446
1.44k
  WARN_NOT_OK(
1447
1.44k
      raft_pool_observers_token_->SubmitFunc(
1448
1.44k
          [this, func = std::move(func)] {
1449
1.44k
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450
1.44k
        std::vector<PeerMessageQueueObserver*> copy;
1451
1.44k
        {
1452
1.44k
          LockGuard lock(queue_lock_);
1453
1.44k
          copy = observers_;
1454
1.44k
        }
1455
1456
1.44k
        for (PeerMessageQueueObserver* observer : copy) {
1457
1.44k
          func(observer);
1458
1.44k
        }
1459
1.44k
      }),
1460
1.44k
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461
1.44k
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue15NotifyObserversIZNS1_27NotifyObserversOfTermChangeExE3$_1EEvPKcOT_
Line
Count
Source
1445
19
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446
19
  WARN_NOT_OK(
1447
19
      raft_pool_observers_token_->SubmitFunc(
1448
19
          [this, func = std::move(func)] {
1449
19
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450
19
        std::vector<PeerMessageQueueObserver*> copy;
1451
19
        {
1452
19
          LockGuard lock(queue_lock_);
1453
19
          copy = observers_;
1454
19
        }
1455
1456
19
        for (PeerMessageQueueObserver* observer : copy) {
1457
19
          func(observer);
1458
19
        }
1459
19
      }),
1460
19
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461
19
}
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue15NotifyObserversIZNS1_31NotifyObserversOfFailedFollowerERKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEExSB_E3$_2EEvPKcOT_
Line
Count
Source
1445
160
void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446
160
  WARN_NOT_OK(
1447
160
      raft_pool_observers_token_->SubmitFunc(
1448
160
          [this, func = std::move(func)] {
1449
160
        MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450
160
        std::vector<PeerMessageQueueObserver*> copy;
1451
160
        {
1452
160
          LockGuard lock(queue_lock_);
1453
160
          copy = observers_;
1454
160
        }
1455
1456
160
        for (PeerMessageQueueObserver* observer : copy) {
1457
160
          func(observer);
1458
160
        }
1459
160
      }),
1460
160
      Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461
160
}
1462
1463
19
void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) {
1464
18
  NotifyObservers("term change", [term](PeerMessageQueueObserver* observer) {
1465
18
    observer->NotifyTermChange(term);
1466
18
  });
1467
19
}
1468
1469
void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask(
1470
15.1M
    const MajorityReplicatedData& majority_replicated_data) {
1471
15.1M
  std::vector<PeerMessageQueueObserver*> copy;
1472
15.1M
  {
1473
15.1M
    LockGuard lock(queue_lock_);
1474
15.1M
    copy = observers_;
1475
15.1M
  }
1476
1477
  // TODO move commit index advancement here so that the queue is not dependent on consensus at all,
1478
  // but that requires a bit more work.
1479
15.1M
  OpId new_committed_op_id;
1480
15.1M
  OpId last_applied_op_id;
1481
15.1M
  for (PeerMessageQueueObserver* observer : copy) {
1482
15.1M
    observer->UpdateMajorityReplicated(
1483
15.1M
        majority_replicated_data, &new_committed_op_id, &last_applied_op_id);
1484
15.1M
  }
1485
1486
15.1M
  {
1487
15.1M
    LockGuard lock(queue_lock_);
1488
15.1M
    if (!new_committed_op_id.empty() &&
1489
15.1M
        new_committed_op_id.index > queue_state_.committed_op_id.index) {
1490
2.47M
      queue_state_.committed_op_id = new_committed_op_id;
1491
2.47M
    }
1492
15.1M
    queue_state_.last_applied_op_id.MakeAtLeast(last_applied_op_id);
1493
15.1M
    local_peer_->last_applied = queue_state_.last_applied_op_id;
1494
15.1M
    UpdateAllAppliedOpId(&queue_state_.all_applied_op_id);
1495
15.1M
  }
1496
15.1M
}
1497
1498
void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
1499
16
                                                       const string& reason) {
1500
16
  int64_t current_term;
1501
16
  {
1502
16
    LockGuard lock(queue_lock_);
1503
16
    current_term = queue_state_.current_term;
1504
16
  }
1505
16
  NotifyObserversOfFailedFollower(uuid, current_term, reason);
1506
16
}
1507
1508
void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid,
1509
                                                       int64_t term,
1510
160
                                                       const string& reason) {
1511
160
  NotifyObservers("failed follower", [uuid, term, reason](PeerMessageQueueObserver* observer) {
1512
160
    observer->NotifyFailedFollower(uuid, term, reason);
1513
160
  });
1514
160
}
1515
1516
153k
bool PeerMessageQueue::PeerAcceptedOurLease(const std::string& uuid) const {
1517
153k
  std::lock_guard<simple_spinlock> lock(queue_lock_);
1518
153k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
1519
153k
  if (peer == nullptr) {
1520
14.1k
    return false;
1521
14.1k
  }
1522
1523
139k
  return peer->leader_lease_expiration.last_received != CoarseTimePoint();
1524
139k
}
1525
1526
5.09k
bool PeerMessageQueue::CanPeerBecomeLeader(const std::string& peer_uuid) const {
1527
5.09k
  std::lock_guard<simple_spinlock> lock(queue_lock_);
1528
5.09k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
1529
5.09k
  if (peer == nullptr) {
1530
0
    LOG(ERROR) << "Invalid peer UUID: " << peer_uuid;
1531
0
    return false;
1532
0
  }
1533
5.09k
  const bool peer_can_be_leader = peer->last_received >= queue_state_.majority_replicated_op_id;
1534
5.09k
  if (!peer_can_be_leader) {
1535
185
    LOG(INFO) << Format(
1536
185
        "Peer $0 cannot become Leader as it is not caught up: Majority OpId $1, Peer OpId $2",
1537
185
        peer_uuid, queue_state_.majority_replicated_op_id, peer->last_received);
1538
185
  }
1539
5.09k
  return peer_can_be_leader;
1540
5.09k
}
1541
1542
4.85k
OpId PeerMessageQueue::PeerLastReceivedOpId(const TabletServerId& uuid) const {
1543
4.85k
  std::lock_guard<simple_spinlock> lock(queue_lock_);
1544
4.85k
  TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
1545
4.85k
  if (peer == nullptr) {
1546
0
    LOG(ERROR) << "Invalid peer UUID: " << uuid;
1547
0
    return OpId::Min();
1548
0
  }
1549
4.85k
  return peer->last_received;
1550
4.85k
}
1551
1552
274
string PeerMessageQueue::GetUpToDatePeer() const {
1553
274
  OpId highest_op_id = OpId::Min();
1554
274
  std::vector<std::string> candidates;
1555
1556
274
  {
1557
274
    std::lock_guard<simple_spinlock> lock(queue_lock_);
1558
1.03k
    for (const PeersMap::value_type& entry : peers_map_) {
1559
1.03k
      if (local_peer_uuid_ == entry.first) {
1560
274
        continue;
1561
274
      }
1562
762
      if (highest_op_id > entry.second->last_received) {
1563
39
        continue;
1564
723
      } else if (highest_op_id == entry.second->last_received) {
1565
448
        candidates.push_back(entry.first);
1566
275
      } else {
1567
275
        candidates = {entry.first};
1568
275
        highest_op_id = entry.second->last_received;
1569
275
      }
1570
762
    }
1571
274
  }
1572
1573
274
  if (candidates.empty()) {
1574
0
    return string();
1575
0
  }
1576
274
  size_t index = 0;
1577
274
  if (candidates.size() > 1) {
1578
    // choose randomly among candidates at the same opid
1579
245
    index = RandomUniformInt<size_t>(0, candidates.size() - 1);
1580
245
  }
1581
274
  return candidates[index];
1582
274
}
1583
1584
47.8k
PeerMessageQueue::~PeerMessageQueue() {
1585
47.8k
  Close();
1586
47.8k
}
1587
1588
200k
string PeerMessageQueue::LogPrefixUnlocked() const {
1589
  // TODO: we should probably use an atomic here. We'll just annotate away the TSAN error for now,
1590
  // since the worst case is a slightly out-of-date log message, and not very likely.
1591
200k
  Mode mode = ANNOTATE_UNPROTECTED_READ(queue_state_.mode);
1592
200k
  return Substitute("T $0 P $1 [$2]: ",
1593
200k
                    tablet_id_,
1594
200k
                    local_peer_uuid_,
1595
200k
                    ModeToStr(mode));
1596
200k
}
1597
1598
131k
string PeerMessageQueue::QueueState::ToString() const {
1599
131k
  return Format(
1600
131k
      "All replicated op: $0, Majority replicated op: $1, Committed index: $2, Last applied: $3, "
1601
131k
      "Last appended: $4, Current term: $5, Majority size: $6, State: $7, Mode: $8$9",
1602
131k
      /* 0 */ all_replicated_op_id,
1603
131k
      /* 1 */ majority_replicated_op_id,
1604
131k
      /* 2 */ committed_op_id,
1605
131k
      /* 3 */ last_applied_op_id,
1606
131k
      /* 4 */ last_appended,
1607
131k
      /* 5 */ current_term,
1608
131k
      /* 6 */ majority_size_,
1609
131k
      /* 7 */ StateToStr(state),
1610
131k
      /* 8 */ ModeToStr(mode),
1611
93.6k
      /* 9 */ active_config ? ", active raft config: " + active_config->ShortDebugString() : "");
1612
131k
}
1613
1614
0
size_t PeerMessageQueue::LogCacheSize() {
1615
0
  return log_cache_.BytesUsed();
1616
0
}
1617
1618
0
size_t PeerMessageQueue::EvictLogCache(size_t bytes_to_evict) {
1619
0
  return log_cache_.EvictThroughOp(std::numeric_limits<int64_t>::max(), bytes_to_evict);
1620
0
}
1621
1622
0
Status PeerMessageQueue::FlushLogIndex() {
1623
0
  return log_cache_.FlushIndex();
1624
0
}
1625
1626
7.03M
void PeerMessageQueue::TrackOperationsMemory(const OpIds& op_ids) {
1627
7.03M
  log_cache_.TrackOperationsMemory(op_ids);
1628
7.03M
}
1629
1630
Result<OpId> PeerMessageQueue::TEST_GetLastOpIdWithType(
1631
3
    int64_t max_allowed_index, OperationType op_type) {
1632
3
  return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type);
1633
3
}
1634
1635
10.3k
Status ValidateFlags() {
1636
  // Normally we would have used
1637
  //   DEFINE_validator(rpc_throttle_threshold_bytes, &RpcThrottleThresholdBytesValidator);
1638
  // right after defining the rpc_throttle_threshold_bytes flag. However, this leads to a segfault
1639
  // in the LTO-enabled build, presumably due to indeterminate order of static initialization.
1640
  // Instead, we invoke this function from master/tserver main() functions when static
1641
  // initialization is already finished.
1642
10.3k
  if (!RpcThrottleThresholdBytesValidator(
1643
0
      "rpc_throttle_threshold_bytes", FLAGS_rpc_throttle_threshold_bytes)) {
1644
0
    return STATUS(InvalidArgument, "Flag validation failed");
1645
0
  }
1646
1647
10.3k
  return Status::OK();
1648
10.3k
}
1649
1650
}  // namespace consensus
1651
}  // namespace yb