/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_CONSENSUS_CONSENSUS_QUEUE_H_ |
34 | | #define YB_CONSENSUS_CONSENSUS_QUEUE_H_ |
35 | | |
36 | | #include <iosfwd> |
37 | | #include <map> |
38 | | #include <set> |
39 | | #include <string> |
40 | | #include <unordered_map> |
41 | | #include <utility> |
42 | | #include <vector> |
43 | | |
44 | | #include "yb/common/entity_ids_types.h" |
45 | | #include "yb/common/hybrid_time.h" |
46 | | |
47 | | #include "yb/consensus/metadata.pb.h" |
48 | | #include "yb/consensus/log_cache.h" |
49 | | #include "yb/consensus/opid_util.h" |
50 | | |
51 | | #include "yb/gutil/ref_counted.h" |
52 | | |
53 | | #include "yb/server/clock.h" |
54 | | |
55 | | #include "yb/util/status_fwd.h" |
56 | | #include "yb/util/locks.h" |
57 | | |
58 | | namespace yb { |
59 | | template<class T> |
60 | | class AtomicGauge; |
61 | | class MemTracker; |
62 | | class MetricEntity; |
63 | | class ThreadPoolToken; |
64 | | |
65 | | namespace consensus { |
66 | | |
67 | | class PeerMessageQueueObserver; |
68 | | struct MajorityReplicatedData; |
69 | | |
70 | | // The id for the server-wide consensus queue MemTracker. |
71 | | extern const char kConsensusQueueParentTrackerId[]; |
72 | | |
73 | | // Utility structure to track value sent to and received by follower. |
74 | | template <class Value> |
75 | | struct FollowerWatermark { |
76 | | const Value initial; |
77 | | |
78 | | // When value is sent to follower, its value is written to last_sent. |
79 | | Value last_sent; |
80 | | |
81 | | // After the follower successfully processes our request, we copy the value from last_sent to last_received. |
82 | | Value last_received; |
83 | | |
84 | | explicit FollowerWatermark(const Value& initial_ = Value()) |
85 | 820k | : initial(initial_), last_sent(initial_), last_received(initial_) {} yb::consensus::FollowerWatermark<std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >::FollowerWatermark(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&) Line | Count | Source | 85 | 273k | : initial(initial_), last_sent(initial_), last_received(initial_) {} |
yb::consensus::FollowerWatermark<unsigned long long>::FollowerWatermark(unsigned long long const&) Line | Count | Source | 85 | 273k | : initial(initial_), last_sent(initial_), last_received(initial_) {} |
yb::consensus::FollowerWatermark<yb::HybridTime>::FollowerWatermark(yb::HybridTime const&) Line | Count | Source | 85 | 273k | : initial(initial_), last_sent(initial_), last_received(initial_) {} |
|
86 | | |
87 | 499k | void Reset() { |
88 | 499k | last_sent = initial; |
89 | 499k | last_received = initial; |
90 | 499k | } yb::consensus::FollowerWatermark<std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >::Reset() Line | Count | Source | 87 | 249k | void Reset() { | 88 | 249k | last_sent = initial; | 89 | 249k | last_received = initial; | 90 | 249k | } |
yb::consensus::FollowerWatermark<unsigned long long>::Reset() Line | Count | Source | 87 | 249k | void Reset() { | 88 | 249k | last_sent = initial; | 89 | 249k | last_received = initial; | 90 | 249k | } |
|
91 | | |
92 | 69.5M | void OnReplyFromFollower() { |
93 | 69.5M | last_received = last_sent; |
94 | 69.5M | } yb::consensus::FollowerWatermark<std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > >::OnReplyFromFollower() Line | Count | Source | 92 | 34.7M | void OnReplyFromFollower() { | 93 | 34.7M | last_received = last_sent; | 94 | 34.7M | } |
yb::consensus::FollowerWatermark<unsigned long long>::OnReplyFromFollower() Line | Count | Source | 92 | 34.7M | void OnReplyFromFollower() { | 93 | 34.7M | last_received = last_sent; | 94 | 34.7M | } |
|
95 | | |
96 | | std::string ToString() const { |
97 | | return Format("{ last_sent: $0 last_received: $1 }", last_sent, last_received); |
98 | | } |
99 | | }; |
100 | | |
101 | | |
102 | | // Tracks the state of the peers and which transactions they have replicated. Owns the LogCache |
103 | | // which actually holds the replicate messages which are en route to the various peers. |
104 | | // |
105 | | // This also takes care of pushing requests to peers as new operations are added, and notifying |
106 | | // RaftConsensus when the commit index advances. |
107 | | // |
108 | | // TODO Currently this class is able to track one outstanding operation per peer. If we want to have |
109 | | // more than one outstanding RPC we need to modify it. |
110 | | class PeerMessageQueue { |
111 | | public: |
112 | | struct TrackedPeer { |
113 | | explicit TrackedPeer(std::string uuid) |
114 | | : uuid(std::move(uuid)), |
115 | | last_known_committed_idx(OpId::Min().index), |
116 | 273k | last_successful_communication_time(MonoTime::Now()) {} |
117 | | |
118 | | // Check that the terms seen from a given peer only increase monotonically. |
119 | 25.4M | void CheckMonotonicTerms(int64_t term) { |
120 | 25.4M | DCHECK_GE(term, last_seen_term_); |
121 | 25.4M | last_seen_term_ = term; |
122 | 25.4M | } |
123 | | |
124 | | std::string ToString() const; |
125 | | |
126 | | void ResetLeaderLeases(); |
127 | | |
128 | | void ResetLastRequest(); |
129 | | |
130 | | // UUID of the peer. |
131 | | const std::string uuid; |
132 | | |
133 | | // Whether this is a newly tracked peer. |
134 | | bool is_new = true; |
135 | | |
136 | | // Next index to send to the peer. This corresponds to "nextIndex" as specified in Raft. |
137 | | int64_t next_index = kInvalidOpIdIndex; |
138 | | |
139 | | // Number of ops starting from next_index to retransmit. |
140 | | int64_t last_num_messages_sent = -1; |
141 | | |
142 | | // Number of retransmissions from the same next_index. |
143 | | int64_t current_retransmissions = -1; |
144 | | |
145 | | // The last operation that we've sent to this peer and that it acked. Used for watermark |
146 | | // movement. |
147 | | OpId last_received = yb::OpId::Min(); |
148 | | |
149 | | // The last committed index this peer knows about. |
150 | | int64_t last_known_committed_idx; |
151 | | |
152 | | // The ID of the operation last applied by this peer. |
153 | | OpId last_applied; |
154 | | |
155 | | // Whether the last exchange with this peer was successful. |
156 | | bool is_last_exchange_successful = false; |
157 | | |
158 | | // The time of the last communication with the peer. |
159 | | // Defaults to the time of construction, so does not necessarily mean that |
160 | | // successful communication ever took place. |
161 | | MonoTime last_successful_communication_time; |
162 | | |
163 | | // Leader lease expiration from this follower's point of view. |
164 | | FollowerWatermark<CoarseTimePoint> leader_lease_expiration; |
165 | | |
166 | | // Leader hybrid time lease expiration from this follower's point of view. |
167 | | FollowerWatermark<MicrosTime> leader_ht_lease_expiration{ |
168 | | HybridTime::kMin.GetPhysicalValueMicros()}; |
169 | | |
170 | | // History cutoff from this follower's point of view. |
171 | | FollowerWatermark<HybridTime> history_cutoff{HybridTime::kMin}; |
172 | | |
173 | | // Whether the follower was detected to need remote bootstrap. |
174 | | bool needs_remote_bootstrap = false; |
175 | | |
176 | | // Member type of this peer in the config. |
177 | | PeerMemberType member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE; |
178 | | |
179 | | uint64_t num_sst_files = 0; |
180 | | |
181 | | private: |
182 | | // The last term we saw from a given peer. |
183 | | // This is only used for sanity checking that a peer doesn't |
184 | | // go backwards in time. |
185 | | int64_t last_seen_term_ = 0; |
186 | | }; |
187 | | |
188 | | PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity, |
189 | | const scoped_refptr<log::Log>& log, |
190 | | const std::shared_ptr<MemTracker>& server_tracker, |
191 | | const std::shared_ptr<MemTracker>& parent_tracker, |
192 | | const RaftPeerPB& local_peer_pb, |
193 | | const std::string& tablet_id, |
194 | | const server::ClockPtr& clock, |
195 | | ConsensusContext* context, |
196 | | std::unique_ptr<ThreadPoolToken> raft_pool_observers_token); |
197 | | |
198 | | // Initialize the queue. |
199 | | virtual void Init(const OpId& last_locally_replicated); |
200 | | |
201 | | // Changes the queue to leader mode, meaning it tracks majority replicated operations and notifies |
202 | | // observers when those change. |
203 | | // |
204 | | // 'committed_op_id' corresponds to the id of the last committed operation, i.e. operations with |
205 | | // ids <= 'committed_op_id' should be considered committed. |
206 | | // |
207 | | // 'current_term' corresponds to the leader's current term; this is different from |
208 | | // 'committed_op_id.term' if the leader has not yet committed an operation in the current term. |
209 | | // |
210 | | // 'active_config' is the currently-active Raft config. This must always be a superset of the |
211 | | // tracked peers, and that is enforced with runtime CHECKs. |
212 | | virtual void SetLeaderMode(const OpId& committed_op_id, |
213 | | int64_t current_term, |
214 | | const OpId& last_applied_op_id, |
215 | | const RaftConfigPB& active_config); |
216 | | |
217 | | // Changes the queue to non-leader mode. Currently tracked peers will still be tracked so that the |
218 | | // cache is only evicted when the peers no longer need the operations but the queue will no longer |
219 | | // advance the majority replicated index or notify observers of its advancement. |
220 | | virtual void SetNonLeaderMode(); |
221 | | |
222 | | // Makes the queue track this peer. |
223 | | virtual void TrackPeer(const std::string& peer_uuid); |
224 | | |
225 | | // Makes the queue untrack this peer. |
226 | | virtual void UntrackPeer(const std::string& peer_uuid); |
227 | | |
228 | | // Appends a single message to be replicated to the peers. Returns OK unless the message could |
229 | | // not be added to the queue for some reason (e.g. the queue reached max size). |
230 | | // |
231 | | // If it returns OK the queue takes ownership of 'msg'. |
232 | | // |
233 | | // This is thread-safe against all of the read methods, but not thread-safe with concurrent Append |
234 | | // calls. |
235 | | CHECKED_STATUS TEST_AppendOperation(const ReplicateMsgPtr& msg); |
236 | | |
237 | | // Appends a vector of messages to be replicated to the peers. Returns OK unless the message |
238 | | // could not be added to the queue for some reason (e.g. the queue reached max size). Calls |
239 | | // 'log_append_callback' when the messages are durable in the local Log. |
240 | | // |
241 | | // If it returns OK the queue takes ownership of 'msgs'. |
242 | | // |
243 | | // This is thread-safe against all of the read methods, but not thread-safe with concurrent Append |
244 | | // calls. |
245 | | // |
246 | | // It is possible that this method will be invoked with an empty list of messages, when |
247 | | // we update committed op id. |
248 | | virtual CHECKED_STATUS AppendOperations( |
249 | | const ReplicateMsgs& msgs, const yb::OpId& committed_op_id, |
250 | | RestartSafeCoarseTimePoint batch_mono_time); |
251 | | |
252 | | // Assembles a request for a peer, adding entries past 'op_id' up to |
253 | | // 'consensus_max_batch_size_bytes'. |
254 | | // |
255 | | // Returns OK if the request was assembled, or STATUS(NotFound, "") if the peer with 'uuid' was |
256 | | // not tracked, or if the queue is not in leader mode. |
257 | | // |
258 | | // Returns STATUS(Incomplete, "") if we try to read an operation index from the log that has not |
259 | | // been written. |
260 | | // |
261 | | // WARNING: In order to avoid copying the same messages to every peer, entries are added to |
262 | | // 'request' via AddAllocated() methods. The owner of 'request' is expected not to delete the |
263 | | // request prior to removing the entries through ExtractSubRange() or any other method that does |
264 | | // not delete the entries. The simplest way is to pass the same instance of ConsensusRequestPB to |
265 | | // RequestForPeer(): the buffer will replace the old entries with new ones without de-allocating |
266 | | // the old ones if they are still required. |
267 | | virtual CHECKED_STATUS RequestForPeer( |
268 | | const std::string& uuid, |
269 | | ConsensusRequestPB* request, |
270 | | ReplicateMsgsHolder* msgs_holder, |
271 | | bool* needs_remote_bootstrap, |
272 | | PeerMemberType* member_type = nullptr, |
273 | | bool* last_exchange_successful = nullptr); |
274 | | |
275 | | // Fill in a StartRemoteBootstrapRequest for the specified peer. If that peer should not remotely |
276 | | // bootstrap, returns a non-OK status. On success, also internally resets |
277 | | // peer->needs_remote_bootstrap to false. |
278 | | CHECKED_STATUS GetRemoteBootstrapRequestForPeer( |
279 | | const std::string& uuid, |
280 | | StartRemoteBootstrapRequestPB* req); |
281 | | |
282 | | // Update the last successful communication timestamp for the given peer to the current time. This |
283 | | // should be called when a non-network related error is received from the peer, indicating that it |
284 | | // is alive, even if it may not be fully up and running or able to accept updates. |
285 | | void NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid); |
286 | | |
287 | | // Updates the request queue with the latest response of a peer, returns whether this peer has |
288 | | // more requests pending. |
289 | | virtual bool ResponseFromPeer(const std::string& peer_uuid, |
290 | | const ConsensusResponsePB& response); |
291 | | |
292 | | void RequestWasNotSent(const std::string& peer_uuid); |
293 | | |
294 | | // Closes the queue, peers are still allowed to call UntrackPeer() and ResponseFromPeer() but no |
295 | | // additional peers can be tracked or messages queued. |
296 | | virtual void Close(); |
297 | | |
298 | | // Returns the last message replicated by all peers, for tests. |
299 | | OpId TEST_GetAllReplicatedIndex() const; |
300 | | |
301 | | OpId TEST_GetCommittedIndex() const; |
302 | | |
303 | | OpId GetAllAppliedOpId() const; |
304 | | |
305 | | // Returns the current majority replicated OpId, for tests. |
306 | | OpId TEST_GetMajorityReplicatedOpId() const; |
307 | | |
308 | | OpId TEST_GetLastAppended() const; |
309 | | |
310 | | OpId TEST_GetLastAppliedOpId() const; |
311 | | |
312 | | // Returns true if specified peer accepted our lease request. |
313 | | bool PeerAcceptedOurLease(const std::string& uuid) const; |
314 | | |
315 | | // Returns a copy of the TrackedPeer with 'uuid' or crashes if the peer is not being tracked. |
316 | | TrackedPeer GetTrackedPeerForTests(std::string uuid); |
317 | | |
318 | | std::string ToString() const; |
319 | | |
320 | | void DumpToHtml(std::ostream& out) const; |
321 | | |
322 | | void RegisterObserver(PeerMessageQueueObserver* observer); |
323 | | |
324 | | CHECKED_STATUS UnRegisterObserver(PeerMessageQueueObserver* observer); |
325 | | |
326 | | bool CanPeerBecomeLeader(const std::string& peer_uuid) const; |
327 | | |
328 | | OpId PeerLastReceivedOpId(const TabletServerId& uuid) const; |
329 | | |
330 | | std::string GetUpToDatePeer() const; |
331 | | |
332 | | struct Metrics { |
333 | | // Keeps track of the number of ops. that are completed by a majority but still need |
334 | | // to be replicated to a minority (IsDone() is true, IsAllDone() is false). |
335 | | scoped_refptr<AtomicGauge<int64_t> > num_majority_done_ops; |
336 | | // Keeps track of the number of ops. that are still in progress (IsDone() returns false). |
337 | | scoped_refptr<AtomicGauge<int64_t> > num_in_progress_ops; |
338 | | |
339 | | explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity); |
340 | | }; |
341 | | |
342 | | virtual ~PeerMessageQueue(); |
343 | | |
344 | | void NotifyObserversOfFailedFollower(const std::string& uuid, |
345 | | const std::string& reason); |
346 | | |
347 | 0 | void SetContext(ConsensusContext* context) { |
348 | 0 | context_ = context; |
349 | 0 | } |
350 | | |
351 | 5 | const CloudInfoPB& local_cloud_info() const { |
352 | 5 | return local_peer_pb_.cloud_info(); |
353 | 5 | } |
354 | | |
355 | | // Read replicated log records starting from the OpId immediately after last_op_id. |
356 | | Result<ReadOpsResult> ReadReplicatedMessagesForCDC( |
357 | | const yb::OpId& last_op_id, |
358 | | int64_t* last_replicated_opid_index = nullptr, |
359 | | const CoarseTimePoint deadline = CoarseTimePoint::max()); |
360 | | |
361 | | void UpdateCDCConsumerOpId(const yb::OpId& op_id); |
362 | | |
363 | | // Get the maximum op ID that can be evicted for CDC consumer from log cache. |
364 | | yb::OpId GetCDCConsumerOpIdToEvict(); |
365 | | yb::OpId GetCDCConsumerOpIdForIntentRemoval(); |
366 | | |
367 | | |
368 | | size_t LogCacheSize(); |
369 | | size_t EvictLogCache(size_t bytes_to_evict); |
370 | | |
371 | | CHECKED_STATUS FlushLogIndex(); |
372 | | |
373 | | // Start memory tracking of following operations in case they are still present in our caches. |
374 | | void TrackOperationsMemory(const OpIds& op_ids); |
375 | | |
376 | 25.5M | const server::ClockPtr& clock() const { |
377 | 25.5M | return clock_; |
378 | 25.5M | } |
379 | | |
380 | | Result<OpId> TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type); |
381 | | |
382 | | private: |
383 | | FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex); |
384 | | FRIEND_TEST(ConsensusQueueTest, TestReadReplicatedMessagesForCDC); |
385 | | |
386 | | // Mode specifies how the queue currently behaves: |
387 | | // |
388 | | // LEADER - Means the queue tracks remote peers and replicates whatever messages are appended. |
389 | | // Observers are notified of changes. |
390 | | // |
391 | | // NON_LEADER - Means the queue only tracks the local peer (remote peers are ignored). Observers |
392 | | // are not notified of changes. |
393 | | enum class Mode { |
394 | | LEADER, |
395 | | NON_LEADER |
396 | | }; |
397 | | |
398 | | static const char* ModeToStr(Mode mode); |
399 | | friend std::ostream& operator <<(std::ostream& out, Mode mode); |
400 | | |
401 | | enum class State { |
402 | | kQueueConstructed, |
403 | | kQueueOpen, |
404 | | kQueueClosed |
405 | | }; |
406 | | |
407 | | static const char* StateToStr(State state); |
408 | | friend std::ostream& operator <<(std::ostream& out, State mode); |
409 | | |
410 | | static constexpr ssize_t kUninitializedMajoritySize = -1; |
411 | | |
412 | | struct QueueState { |
413 | | |
414 | | // The last operation that has been replicated to all currently tracked peers. |
415 | | OpId all_replicated_op_id = OpId::Min(); |
416 | | |
417 | | // The last operation that has been replicated to all currently non-lagging tracked peers. |
418 | | OpId all_nonlagging_replicated_op_id = OpId::Min(); |
419 | | |
420 | | // The last operation that has been applied by all currently tracked peers. |
421 | | OpId all_applied_op_id = OpId::Min(); |
422 | | |
423 | | // The index of the last operation replicated to a majority. This is usually the same as |
424 | | // 'committed_op_id' but might not be if the terms changed. |
425 | | OpId majority_replicated_op_id = OpId::Min(); |
426 | | |
427 | | // The index of the last operation to be considered committed. |
428 | | OpId committed_op_id = OpId::Min(); |
429 | | |
430 | | // The ID of the last applied operation. |
431 | | OpId last_applied_op_id = OpId::Min(); |
432 | | |
433 | | // The opid of the last operation appended to the queue. |
434 | | OpId last_appended = OpId::Min(); |
435 | | |
436 | | // The queue owner's current term. Set by the last appended operation. If the queue owner's |
437 | | // term is less than the term observed from another peer the queue owner must step down. |
438 | | // TODO: it is likely to be cleaner to get this from the ConsensusMetadata rather than by |
439 | | // snooping on what operations are appended to the queue. |
440 | | int64_t current_term = OpId::Min().term; |
441 | | |
442 | | // The size of the majority for the queue. |
443 | | ssize_t majority_size_ = kUninitializedMajoritySize; |
444 | | |
445 | | State state = State::kQueueConstructed; |
446 | | |
447 | | // The current mode of the queue. |
448 | | Mode mode = Mode::NON_LEADER; |
449 | | |
450 | | // The currently-active raft config. Only set if in LEADER mode. |
451 | | std::unique_ptr<RaftConfigPB> active_config; |
452 | | |
453 | | std::string ToString() const; |
454 | | }; |
455 | | |
456 | | // Returns true iff given 'desired_op' is found in the local WAL. |
457 | | // If the op is not found, returns false. |
458 | | // If the log cache returns some error other than NotFound, crashes with a fatal error. |
459 | | bool IsOpInLog(const yb::OpId& desired_op) const; |
460 | | |
461 | | void NotifyObserversOfMajorityReplOpChange(const MajorityReplicatedData& data); |
462 | | |
463 | | void NotifyObserversOfMajorityReplOpChangeTask(const MajorityReplicatedData& data); |
464 | | |
465 | | void NotifyObserversOfTermChange(int64_t term); |
466 | | |
467 | | void NotifyObserversOfFailedFollower(const std::string& uuid, |
468 | | int64_t term, |
469 | | const std::string& reason); |
470 | | |
471 | | template <class Func> |
472 | | void NotifyObservers(const char* title, Func&& func); |
473 | | |
474 | | typedef std::unordered_map<std::string, TrackedPeer*> PeersMap; |
475 | | |
476 | | std::string ToStringUnlocked() const; |
477 | | |
478 | | std::string LogPrefixUnlocked() const; |
479 | | |
480 | | // Updates the metrics based on index math. |
481 | | void UpdateMetrics(); |
482 | | |
483 | | void ClearUnlocked(); |
484 | | |
485 | | // Returns the last operation in the message queue, or 'preceding_first_op_in_queue_' if the queue |
486 | | // is empty. |
487 | | const OpIdPB& GetLastOp() const; |
488 | | |
489 | | TrackedPeer* TrackPeerUnlocked(const std::string& uuid); |
490 | | |
491 | | // Checks that if the queue is in LEADER mode then all registered peers are in the active config. |
492 | | // Crashes with a FATAL log message if this invariant does not hold. If the queue is in NON_LEADER |
493 | | // mode, does nothing. |
494 | | void CheckPeersInActiveConfigIfLeaderUnlocked() const; |
495 | | |
496 | | // Callback when a REPLICATE message has finished appending to the local log. |
497 | | void LocalPeerAppendFinished(const OpId& id, const Status& status); |
498 | | |
499 | | void NumSSTFilesChanged(); |
500 | | |
501 | | // Updates op id replicated on each node. |
502 | | void UpdateAllReplicatedOpId(OpId* result) REQUIRES(queue_lock_); |
503 | | |
504 | | // Updates op ID applied on each node. |
505 | | void UpdateAllAppliedOpId(OpId* result) REQUIRES(queue_lock_); |
506 | | |
507 | | // Updates op id replicated on each non-lagging node. |
508 | | void UpdateAllNonLaggingReplicatedOpId(int32_t threshold) REQUIRES(queue_lock_); |
509 | | |
510 | | // Policy is responsible for tuning of watermark calculation. |
511 | | // I.e. simple leader lease or hybrid time leader lease etc. |
512 | | // It should provide result type and a function for extracting a value from a peer. |
513 | | template <class Policy> |
514 | | typename Policy::result_type GetWatermark(); |
515 | | |
516 | | CoarseTimePoint LeaderLeaseExpirationWatermark(); |
517 | | MicrosTime HybridTimeLeaseExpirationWatermark(); |
518 | | OpId OpIdWatermark(); |
519 | | uint64_t NumSSTFilesWatermark(); |
520 | | |
521 | | // Reads operations from the log cache in the range (after_index, to_index]. |
522 | | // |
523 | | // If 'to_index' is 0, then all operations after 'after_index' will be included. |
524 | | Result<ReadOpsResult> ReadFromLogCache( |
525 | | int64_t after_index, |
526 | | int64_t to_index, |
527 | | size_t max_batch_size, |
528 | | const std::string& peer_uuid, |
529 | | const CoarseTimePoint deadline = CoarseTimePoint::max()); |
530 | | |
531 | | std::vector<PeerMessageQueueObserver*> observers_; |
532 | | |
533 | | // The pool token which executes observer notifications. |
534 | | std::unique_ptr<ThreadPoolToken> raft_pool_observers_token_; |
535 | | |
536 | | // PB containing identifying information about the local peer. |
537 | | const RaftPeerPB local_peer_pb_; |
538 | | const yb::PeerId local_peer_uuid_; |
539 | | |
540 | | const TabletId tablet_id_; |
541 | | |
542 | | QueueState queue_state_; |
543 | | |
544 | | // The currently tracked peers. |
545 | | PeersMap peers_map_; |
546 | | TrackedPeer* local_peer_ = nullptr; |
547 | | |
548 | | using LockType = simple_spinlock; |
549 | | using LockGuard = std::lock_guard<LockType>; |
550 | | mutable LockType queue_lock_; // TODO: rename |
551 | | |
552 | | // We assume that we never have multiple threads racing to append to the queue. This fake mutex |
553 | | // adds some extra assurance that this implementation property doesn't change. |
554 | | DFAKE_MUTEX(append_fake_lock_); |
555 | | |
556 | | LogCache log_cache_; |
557 | | |
558 | | std::shared_ptr<MemTracker> operations_mem_tracker_; |
559 | | |
560 | | Metrics metrics_; |
561 | | |
562 | | server::ClockPtr clock_; |
563 | | |
564 | | ConsensusContext* context_ = nullptr; |
565 | | bool installed_num_sst_files_changed_listener_ = false; |
566 | | |
567 | | // Used to protect cdc_consumer_op_id_ and cdc_consumer_op_id_last_updated_. |
568 | | mutable rw_spinlock cdc_consumer_lock_; |
569 | | yb::OpId cdc_consumer_op_id_ = yb::OpId::Max(); |
570 | | CoarseTimePoint cdc_consumer_op_id_last_updated_ = ToCoarse(MonoTime::kMin); |
571 | | }; |
572 | | |
573 | 0 | inline std::ostream& operator <<(std::ostream& out, PeerMessageQueue::Mode mode) { |
574 | 0 | return out << PeerMessageQueue::ModeToStr(mode); |
575 | 0 | } |
576 | | |
577 | 0 | inline std::ostream& operator <<(std::ostream& out, PeerMessageQueue::State state) { |
578 | 0 | return out << PeerMessageQueue::StateToStr(state); |
579 | 0 | } |
580 | | |
581 | | struct MajorityReplicatedData { |
582 | | OpId op_id; |
583 | | CoarseTimePoint leader_lease_expiration; |
584 | | MicrosTime ht_lease_expiration; |
585 | | uint64_t num_sst_files; |
586 | | |
587 | | // Update was caused by the following peer, that received all operations. |
588 | | TabletServerId peer_got_all_ops; |
589 | | |
590 | | std::string ToString() const; |
591 | | }; |
592 | | |
593 | | // The interface between RaftConsensus and the PeerMessageQueue. |
594 | | class PeerMessageQueueObserver { |
595 | | public: |
596 | | // Called by the queue each time the response for a peer is handled with the resulting majority |
597 | | // replicated index. The consensus implementation decides the commit index based on that and |
598 | | // triggers the apply for pending transactions. |
599 | | // |
600 | | // 'committed_index' is set to the id of the last operation considered committed by consensus. |
601 | | // `last_applied_op_id` is set to the ID of the last operation applied by consensus. |
602 | | // |
603 | | // The implementation is idempotent, i.e. independently of the ordering of calls to this method, |
604 | | // only non-triggered applies will be started. |
605 | | virtual void UpdateMajorityReplicated( |
606 | | const MajorityReplicatedData& data, OpId* committed_index, OpId* last_applied_op_id) = 0; |
607 | | |
608 | | // Notify the Consensus implementation that a follower replied with a term higher than that |
609 | | // established in the queue. |
610 | | virtual void NotifyTermChange(int64_t term) = 0; |
611 | | |
612 | | // Notify Consensus that a peer is unable to catch up due to falling behind the leader's log GC |
613 | | // threshold. |
614 | | virtual void NotifyFailedFollower(const std::string& peer_uuid, |
615 | | int64_t term, |
616 | | const std::string& reason) = 0; |
617 | | |
618 | | virtual void MajorityReplicatedNumSSTFilesChanged(uint64_t majority_replicated_num_sst_files) = 0; |
619 | | |
620 | 75.6k | virtual ~PeerMessageQueueObserver() {} |
621 | | }; |
622 | | |
623 | | Status ValidateFlags(); |
624 | | |
625 | | } // namespace consensus |
626 | | } // namespace yb |
627 | | |
628 | | #endif // YB_CONSENSUS_CONSENSUS_QUEUE_H_ |