YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_CONSENSUS_CONSENSUS_QUEUE_H_
34
#define YB_CONSENSUS_CONSENSUS_QUEUE_H_
35
36
#include <iosfwd>
37
#include <map>
38
#include <set>
39
#include <string>
40
#include <unordered_map>
41
#include <utility>
42
#include <vector>
43
44
#include "yb/common/entity_ids_types.h"
45
#include "yb/common/hybrid_time.h"
46
47
#include "yb/consensus/metadata.pb.h"
48
#include "yb/consensus/log_cache.h"
49
#include "yb/consensus/opid_util.h"
50
51
#include "yb/gutil/ref_counted.h"
52
53
#include "yb/server/clock.h"
54
55
#include "yb/util/status_fwd.h"
56
#include "yb/util/locks.h"
57
58
namespace yb {
59
// Forward declarations of types that this header only uses by pointer/reference.
template<class T>
class AtomicGauge;
class MemTracker;
class MetricEntity;
class ThreadPoolToken;
64
65
namespace consensus {
66
67
class PeerMessageQueueObserver;
struct MajorityReplicatedData;

// The id for the server-wide consensus queue MemTracker.
extern const char kConsensusQueueParentTrackerId[];
72
73
// Utility structure to track value sent to and received by follower.
template <class Value>
struct FollowerWatermark {
  // Value assigned to both watermarks on construction and restored by Reset().
  const Value initial;

  // When value is sent to follower, its value is written to last_sent.
  Value last_sent;

  // After follower successfully processes our request, we copy value from last_sent to
  // last_received.
  Value last_received;

  explicit FollowerWatermark(const Value& initial_ = Value())
      : initial(initial_), last_sent(initial_), last_received(initial_) {}

  // Restores both last_sent and last_received to 'initial'.
  void Reset() {
    last_sent = initial;
    last_received = initial;
  }

  // Records that the follower acknowledged the value most recently stored in last_sent.
  void OnReplyFromFollower() {
    last_received = last_sent;
  }

  std::string ToString() const {
    return Format("{ last_sent: $0 last_received: $1 }", last_sent, last_received);
  }
};
100
101
102
// Tracks the state of the peers and which transactions they have replicated.  Owns the LogCache
103
// which actually holds the replicate messages which are en route to the various peers.
104
//
105
// This also takes care of pushing requests to peers as new operations are added, and notifying
106
// RaftConsensus when the commit index advances.
107
//
108
// TODO Currently this class is able to track one outstanding operation per peer. If we want to have
109
// more than one outstanding RPC we need to modify it.
110
class PeerMessageQueue {
111
 public:
112
  struct TrackedPeer {
113
    explicit TrackedPeer(std::string uuid)
114
        : uuid(std::move(uuid)),
115
          last_known_committed_idx(OpId::Min().index),
116
273k
          last_successful_communication_time(MonoTime::Now()) {}
117
118
    // Check that the terms seen from a given peer only increase monotonically.
119
25.4M
    void CheckMonotonicTerms(int64_t term) {
120
25.4M
      DCHECK_GE(term, last_seen_term_);
121
25.4M
      last_seen_term_ = term;
122
25.4M
    }
123
124
    std::string ToString() const;
125
126
    void ResetLeaderLeases();
127
128
    void ResetLastRequest();
129
130
    // UUID of the peer.
131
    const std::string uuid;
132
133
    // Whether this is a newly tracked peer.
134
    bool is_new = true;
135
136
    // Next index to send to the peer.  This corresponds to "nextIndex" as specified in Raft.
137
    int64_t next_index = kInvalidOpIdIndex;
138
139
    // Number of ops starting from next_index_ to retransmit.
140
    int64_t last_num_messages_sent = -1;
141
142
    // Number of retransmissions from same next_index_.
143
    int64_t current_retransmissions = -1;
144
145
    // The last operation that we've sent to this peer and that it acked. Used for watermark
146
    // movement.
147
    OpId last_received = yb::OpId::Min();
148
149
    // The last committed index this peer knows about.
150
    int64_t last_known_committed_idx;
151
152
    // The ID of the operation last applied by this peer.
153
    OpId last_applied;
154
155
    // Whether the last exchange with this peer was successful.
156
    bool is_last_exchange_successful = false;
157
158
    // The time of the last communication with the peer.
159
    // Defaults to the time of construction, so does not necessarily mean that
160
    // successful communication ever took place.
161
    MonoTime last_successful_communication_time;
162
163
    // Leader lease expiration from this follower's point of view.
164
    FollowerWatermark<CoarseTimePoint> leader_lease_expiration;
165
166
    // Leader hybrid time lease expiration from this follower's point of view.
167
    FollowerWatermark<MicrosTime> leader_ht_lease_expiration{
168
        HybridTime::kMin.GetPhysicalValueMicros()};
169
170
    // History cutoff from this follower's point of view.
171
    FollowerWatermark<HybridTime> history_cutoff{HybridTime::kMin};
172
173
    // Whether the follower was detected to need remote bootstrap.
174
    bool needs_remote_bootstrap = false;
175
176
    // Member type of this peer in the config.
177
    PeerMemberType member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE;
178
179
    uint64_t num_sst_files = 0;
180
181
   private:
182
    // The last term we saw from a given peer.
183
    // This is only used for sanity checking that a peer doesn't
184
    // go backwards in time.
185
    int64_t last_seen_term_ = 0;
186
  };
187
188
  PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
189
                   const scoped_refptr<log::Log>& log,
190
                   const std::shared_ptr<MemTracker>& server_tracker,
191
                   const std::shared_ptr<MemTracker>& parent_tracker,
192
                   const RaftPeerPB& local_peer_pb,
193
                   const std::string& tablet_id,
194
                   const server::ClockPtr& clock,
195
                   ConsensusContext* context,
196
                   std::unique_ptr<ThreadPoolToken> raft_pool_observers_token);
197
198
  // Initialize the queue.
199
  virtual void Init(const OpId& last_locally_replicated);
200
201
  // Changes the queue to leader mode, meaning it tracks majority replicated operations and notifies
202
  // observers when those change.
203
  //
204
  // 'committed_index' corresponds to the id of the last committed operation, i.e. operations with
205
  // ids <= 'committed_index' should be considered committed.
206
  //
207
  // 'current_term' corresponds to the leader's current term, this is different from
208
  // 'committed_index.term()' if the leader has not yet committed an operation in the current term.
209
  //
210
  // 'active_config' is the currently-active Raft config. This must always be a superset of the
211
  // tracked peers, and that is enforced with runtime CHECKs.
212
  virtual void SetLeaderMode(const OpId& committed_op_id,
213
                             int64_t current_term,
214
                             const OpId& last_applied_op_id,
215
                             const RaftConfigPB& active_config);
216
217
  // Changes the queue to non-leader mode. Currently tracked peers will still be tracked so that the
218
  // cache is only evicted when the peers no longer need the operations but the queue will no longer
219
  // advance the majority replicated index or notify observers of its advancement.
220
  virtual void SetNonLeaderMode();
221
222
  // Makes the queue track this peer.
223
  virtual void TrackPeer(const std::string& peer_uuid);
224
225
  // Makes the queue untrack this peer.
226
  virtual void UntrackPeer(const std::string& peer_uuid);
227
228
  // Appends a single message to be replicated to the peers.  Returns OK unless the message could
229
  // not be added to the queue for some reason (e.g. the queue reached max size).
230
  //
231
  // If it returns OK the queue takes ownership of 'msg'.
232
  //
233
  // This is thread-safe against all of the read methods, but not thread-safe with concurrent Append
234
  // calls.
235
  CHECKED_STATUS TEST_AppendOperation(const ReplicateMsgPtr& msg);
236
237
  // Appends a vector of messages to be replicated to the peers.  Returns OK unless the message
238
  // could not be added to the queue for some reason (e.g. the queue reached max size). Calls
239
  // 'log_append_callback' when the messages are durable in the local Log.
240
  //
241
  // If it returns OK the queue takes ownership of 'msgs'.
242
  //
243
  // This is thread-safe against all of the read methods, but not thread-safe with concurrent Append
244
  // calls.
245
  //
246
  // It is possible that this method will be invoked with empty list of messages, when
247
  // we update committed op id.
248
  virtual CHECKED_STATUS AppendOperations(
249
      const ReplicateMsgs& msgs, const yb::OpId& committed_op_id,
250
      RestartSafeCoarseTimePoint batch_mono_time);
251
252
  // Assembles a request for a peer, adding entries past 'op_id' up to
253
  // 'consensus_max_batch_size_bytes'.
254
  //
255
  // Returns OK if the request was assembled, or STATUS(NotFound, "") if the peer with 'uuid' was
256
  // not tracked, or if the queue is not in leader mode.
257
  //
258
  // Returns STATUS(Incomplete, "") if we try to read an operation index from the log that has not
259
  // been written.
260
  //
261
  // WARNING: In order to avoid copying the same messages to every peer, entries are added to
262
  // 'request' via AddAllocated() methods.  The owner of 'request' is expected not to delete the
263
  // request prior to removing the entries through ExtractSubRange() or any other method that does
264
  // not delete the entries. The simplest way is to pass the same instance of ConsensusRequestPB to
265
  // RequestForPeer(): the buffer will replace the old entries with new ones without de-allocating
266
  // the old ones if they are still required.
267
  virtual CHECKED_STATUS RequestForPeer(
268
      const std::string& uuid,
269
      ConsensusRequestPB* request,
270
      ReplicateMsgsHolder* msgs_holder,
271
      bool* needs_remote_bootstrap,
272
      PeerMemberType* member_type = nullptr,
273
      bool* last_exchange_successful = nullptr);
274
275
  // Fill in a StartRemoteBootstrapRequest for the specified peer.  If that peer should not remotely
276
  // bootstrap, returns a non-OK status.  On success, also internally resets
277
  // peer->needs_remote_bootstrap to false.
278
  CHECKED_STATUS GetRemoteBootstrapRequestForPeer(
279
      const std::string& uuid,
280
      StartRemoteBootstrapRequestPB* req);
281
282
  // Update the last successful communication timestamp for the given peer to the current time. This
283
  // should be called when a non-network related error is received from the peer, indicating that it
284
  // is alive, even if it may not be fully up and running or able to accept updates.
285
  void NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid);
286
287
  // Updates the request queue with the latest response of a peer, returns whether this peer has
288
  // more requests pending.
289
  virtual bool ResponseFromPeer(const std::string& peer_uuid,
290
                                const ConsensusResponsePB& response);
291
292
  void RequestWasNotSent(const std::string& peer_uuid);
293
294
  // Closes the queue, peers are still allowed to call UntrackPeer() and ResponseFromPeer() but no
295
  // additional peers can be tracked or messages queued.
296
  virtual void Close();
297
298
  // Returns the last message replicated by all peers, for tests.
299
  OpId TEST_GetAllReplicatedIndex() const;
300
301
  OpId TEST_GetCommittedIndex() const;
302
303
  OpId GetAllAppliedOpId() const;
304
305
  // Returns the current majority replicated OpId, for tests.
306
  OpId TEST_GetMajorityReplicatedOpId() const;
307
308
  OpId TEST_GetLastAppended() const;
309
310
  OpId TEST_GetLastAppliedOpId() const;
311
312
  // Returns true if specified peer accepted our lease request.
313
  bool PeerAcceptedOurLease(const std::string& uuid) const;
314
315
  // Returns a copy of the TrackedPeer with 'uuid' or crashes if the peer is not being tracked.
316
  TrackedPeer GetTrackedPeerForTests(std::string uuid);
317
318
  std::string ToString() const;
319
320
  void DumpToHtml(std::ostream& out) const;
321
322
  void RegisterObserver(PeerMessageQueueObserver* observer);
323
324
  CHECKED_STATUS UnRegisterObserver(PeerMessageQueueObserver* observer);
325
326
  bool CanPeerBecomeLeader(const std::string& peer_uuid) const;
327
328
  OpId PeerLastReceivedOpId(const TabletServerId& uuid) const;
329
330
  std::string GetUpToDatePeer() const;
331
332
  struct Metrics {
333
    // Keeps track of the number of ops. that are completed by a majority but still need
334
    // to be replicated to a minority (IsDone() is true, IsAllDone() is false).
335
    scoped_refptr<AtomicGauge<int64_t> > num_majority_done_ops;
336
    // Keeps track of the number of ops. that are still in progress (IsDone() returns false).
337
    scoped_refptr<AtomicGauge<int64_t> > num_in_progress_ops;
338
339
    explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity);
340
  };
341
342
  virtual ~PeerMessageQueue();
343
344
  void NotifyObserversOfFailedFollower(const std::string& uuid,
345
                                       const std::string& reason);
346
347
0
  void SetContext(ConsensusContext* context) {
348
0
    context_ = context;
349
0
  }
350
351
5
  const CloudInfoPB& local_cloud_info() const {
352
5
    return local_peer_pb_.cloud_info();
353
5
  }
354
355
  // Read replicated log records starting from the OpId immediately after last_op_id.
356
  Result<ReadOpsResult> ReadReplicatedMessagesForCDC(
357
    const yb::OpId& last_op_id,
358
    int64_t* last_replicated_opid_index = nullptr,
359
    const CoarseTimePoint deadline = CoarseTimePoint::max());
360
361
  void UpdateCDCConsumerOpId(const yb::OpId& op_id);
362
363
  // Get the maximum op ID that can be evicted for CDC consumer from log cache.
364
  yb::OpId GetCDCConsumerOpIdToEvict();
365
  yb::OpId GetCDCConsumerOpIdForIntentRemoval();
366
367
368
  size_t LogCacheSize();
369
  size_t EvictLogCache(size_t bytes_to_evict);
370
371
  CHECKED_STATUS FlushLogIndex();
372
373
  // Start memory tracking of following operations in case they are still present in our caches.
374
  void TrackOperationsMemory(const OpIds& op_ids);
375
376
25.5M
  const server::ClockPtr& clock() const {
377
25.5M
    return clock_;
378
25.5M
  }
379
380
  Result<OpId> TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type);
381
382
 private:
383
  FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
384
  FRIEND_TEST(ConsensusQueueTest, TestReadReplicatedMessagesForCDC);
385
386
  // Mode specifies how the queue currently behaves:
387
  //
388
  // LEADER - Means the queue tracks remote peers and replicates whatever messages are appended.
389
  //          Observers are notified of changes.
390
  //
391
  // NON_LEADER - Means the queue only tracks the local peer (remote peers are ignored).  Observers
392
  //              are not notified of changes.
393
  enum class Mode {
394
    LEADER,
395
    NON_LEADER
396
  };
397
398
  static const char* ModeToStr(Mode mode);
399
  friend std::ostream& operator <<(std::ostream& out, Mode mode);
400
401
  enum class State {
402
    kQueueConstructed,
403
    kQueueOpen,
404
    kQueueClosed
405
  };
406
407
  static const char* StateToStr(State state);
408
  friend std::ostream& operator <<(std::ostream& out, State mode);
409
410
  static constexpr ssize_t kUninitializedMajoritySize = -1;
411
412
  struct QueueState {
413
414
    // The last operation that has been replicated to all currently tracked peers.
415
    OpId all_replicated_op_id = OpId::Min();
416
417
    // The last operation that has been replicated to all currently non-lagging tracked peers.
418
    OpId all_nonlagging_replicated_op_id = OpId::Min();
419
420
    // The last operation that has been applied by all currently tracked peers.
421
    OpId all_applied_op_id = OpId::Min();
422
423
    // The index of the last operation replicated to a majority.  This is usually the same as
424
    // 'committed_op_id' but might not be if the terms changed.
425
    OpId majority_replicated_op_id = OpId::Min();
426
427
    // The index of the last operation to be considered committed.
428
    OpId committed_op_id = OpId::Min();
429
430
    // The ID of the last applied operation.
431
    OpId last_applied_op_id = OpId::Min();
432
433
    // The opid of the last operation appended to the queue.
434
    OpId last_appended = OpId::Min();
435
436
    // The queue's owner current_term.  Set by the last appended operation.  If the queue owner's
437
    // term is less than the term observed from another peer the queue owner must step down.
438
    // TODO: it is likely to be cleaner to get this from the ConsensusMetadata rather than by
439
    // snooping on what operations are appended to the queue.
440
    int64_t current_term = OpId::Min().term;
441
442
    // The size of the majority for the queue.
443
    ssize_t majority_size_ = kUninitializedMajoritySize;
444
445
    State state = State::kQueueConstructed;
446
447
    // The current mode of the queue.
448
    Mode mode = Mode::NON_LEADER;
449
450
    // The currently-active raft config. Only set if in LEADER mode.
451
    std::unique_ptr<RaftConfigPB> active_config;
452
453
    std::string ToString() const;
454
  };
455
456
  // Returns true iff given 'desired_op' is found in the local WAL.
457
  // If the op is not found, returns false.
458
  // If the log cache returns some error other than NotFound, crashes with a fatal error.
459
  bool IsOpInLog(const yb::OpId& desired_op) const;
460
461
  void NotifyObserversOfMajorityReplOpChange(const MajorityReplicatedData& data);
462
463
  void NotifyObserversOfMajorityReplOpChangeTask(const MajorityReplicatedData& data);
464
465
  void NotifyObserversOfTermChange(int64_t term);
466
467
  void NotifyObserversOfFailedFollower(const std::string& uuid,
468
                                       int64_t term,
469
                                       const std::string& reason);
470
471
  template <class Func>
472
  void NotifyObservers(const char* title, Func&& func);
473
474
  typedef std::unordered_map<std::string, TrackedPeer*> PeersMap;
475
476
  std::string ToStringUnlocked() const;
477
478
  std::string LogPrefixUnlocked() const;
479
480
  // Updates the metrics based on index math.
481
  void UpdateMetrics();
482
483
  void ClearUnlocked();
484
485
  // Returns the last operation in the message queue, or 'preceding_first_op_in_queue_' if the queue
486
  // is empty.
487
  const OpIdPB& GetLastOp() const;
488
489
  TrackedPeer* TrackPeerUnlocked(const std::string& uuid);
490
491
  // Checks that if the queue is in LEADER mode then all registered peers are in the active config.
492
  // Crashes with a FATAL log message if this invariant does not hold. If the queue is in NON_LEADER
493
  // mode, does nothing.
494
  void CheckPeersInActiveConfigIfLeaderUnlocked() const;
495
496
  // Callback when a REPLICATE message has finished appending to the local log.
497
  void LocalPeerAppendFinished(const OpId& id, const Status& status);
498
499
  void NumSSTFilesChanged();
500
501
  // Updates op id replicated on each node.
502
  void UpdateAllReplicatedOpId(OpId* result) REQUIRES(queue_lock_);
503
504
  // Updates op ID applied on each node.
505
  void UpdateAllAppliedOpId(OpId* result) REQUIRES(queue_lock_);
506
507
  // Updates op id replicated on each non-lagging node.
508
  void UpdateAllNonLaggingReplicatedOpId(int32_t threshold) REQUIRES(queue_lock_);
509
510
  // Policy is responsible for tuning of watermark calculation.
511
  // I.e. simple leader lease or hybrid time leader lease etc.
512
  // It should provide result type and a function for extracting a value from a peer.
513
  template <class Policy>
514
  typename Policy::result_type GetWatermark();
515
516
  CoarseTimePoint LeaderLeaseExpirationWatermark();
517
  MicrosTime HybridTimeLeaseExpirationWatermark();
518
  OpId OpIdWatermark();
519
  uint64_t NumSSTFilesWatermark();
520
521
  // Reads operations from the log cache in the range (after_index, to_index].
522
  //
523
  // If 'to_index' is 0, then all operations after 'after_index' will be included.
524
  Result<ReadOpsResult> ReadFromLogCache(
525
    int64_t after_index,
526
    int64_t to_index,
527
    size_t max_batch_size,
528
    const std::string& peer_uuid,
529
    const CoarseTimePoint deadline = CoarseTimePoint::max());
530
531
  std::vector<PeerMessageQueueObserver*> observers_;
532
533
  // The pool token which executes observer notifications.
534
  std::unique_ptr<ThreadPoolToken> raft_pool_observers_token_;
535
536
  // PB containing identifying information about the local peer.
537
  const RaftPeerPB local_peer_pb_;
538
  const yb::PeerId local_peer_uuid_;
539
540
  const TabletId tablet_id_;
541
542
  QueueState queue_state_;
543
544
  // The currently tracked peers.
545
  PeersMap peers_map_;
546
  TrackedPeer* local_peer_ = nullptr;
547
548
  using LockType = simple_spinlock;
549
  using LockGuard = std::lock_guard<LockType>;
550
  mutable LockType queue_lock_; // TODO: rename
551
552
  // We assume that we never have multiple threads racing to append to the queue.  This fake mutex
553
  // adds some extra assurance that this implementation property doesn't change.
554
  DFAKE_MUTEX(append_fake_lock_);
555
556
  LogCache log_cache_;
557
558
  std::shared_ptr<MemTracker> operations_mem_tracker_;
559
560
  Metrics metrics_;
561
562
  server::ClockPtr clock_;
563
564
  ConsensusContext* context_ = nullptr;
565
  bool installed_num_sst_files_changed_listener_ = false;
566
567
  // Used to protect cdc_consumer_op_id_ and cdc_consumer_op_id_last_updated_.
568
  mutable rw_spinlock cdc_consumer_lock_;
569
  yb::OpId cdc_consumer_op_id_ = yb::OpId::Max();
570
  CoarseTimePoint cdc_consumer_op_id_last_updated_ = ToCoarse(MonoTime::kMin);
571
};
572
573
0
inline std::ostream& operator <<(std::ostream& out, PeerMessageQueue::Mode mode) {
574
0
  return out << PeerMessageQueue::ModeToStr(mode);
575
0
}
576
577
0
inline std::ostream& operator <<(std::ostream& out, PeerMessageQueue::State state) {
578
0
  return out << PeerMessageQueue::StateToStr(state);
579
0
}
580
581
struct MajorityReplicatedData {
582
  OpId op_id;
583
  CoarseTimePoint leader_lease_expiration;
584
  MicrosTime ht_lease_expiration;
585
  uint64_t num_sst_files;
586
587
  // Update was caused by the following peer, that received all operations.
588
  TabletServerId peer_got_all_ops;
589
590
  std::string ToString() const;
591
};
592
593
// The interface between RaftConsensus and the PeerMessageQueue.
594
class PeerMessageQueueObserver {
595
 public:
596
  // Called by the queue each time the response for a peer is handled with the resulting majority
597
  // replicated index.  The consensus implementation decides the commit index based on that and
598
  // triggers the apply for pending transactions.
599
  //
600
  // 'committed_index' is set to the id of the last operation considered committed by consensus.
601
  // `last_applied_op_id` is set the ID of the last operation applied by consensus.
602
  //
603
  // The implementation is idempotent, i.e. independently of the ordering of calls to this method
604
  // only non-triggered applys will be started.
605
  virtual void UpdateMajorityReplicated(
606
      const MajorityReplicatedData& data, OpId* committed_index, OpId* last_applied_op_id) = 0;
607
608
  // Notify the Consensus implementation that a follower replied with a term higher than that
609
  // established in the queue.
610
  virtual void NotifyTermChange(int64_t term) = 0;
611
612
  // Notify Consensus that a peer is unable to catch up due to falling behind the leader's log GC
613
  // threshold.
614
  virtual void NotifyFailedFollower(const std::string& peer_uuid,
615
                                    int64_t term,
616
                                    const std::string& reason) = 0;
617
618
  virtual void MajorityReplicatedNumSSTFilesChanged(uint64_t majority_replicated_num_sst_files) = 0;
619
620
75.6k
  virtual ~PeerMessageQueueObserver() {}
621
};
622
623
// NOTE(review): implementation is not visible in this header; presumably validates the flag
// values used by this module and returns a non-OK Status on an invalid setting — TODO confirm.
Status ValidateFlags();
624
625
}  // namespace consensus
626
}  // namespace yb
627
628
#endif // YB_CONSENSUS_CONSENSUS_QUEUE_H_