/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/consensus_queue.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | #include <shared_mutex> |
38 | | #include <string> |
39 | | #include <utility> |
40 | | |
41 | | #include <boost/container/small_vector.hpp> |
42 | | #include <glog/logging.h> |
43 | | |
44 | | #include "yb/consensus/consensus_context.h" |
45 | | #include "yb/consensus/log_util.h" |
46 | | #include "yb/consensus/opid_util.h" |
47 | | #include "yb/consensus/quorum_util.h" |
48 | | #include "yb/consensus/raft_consensus.h" |
49 | | #include "yb/consensus/replicate_msgs_holder.h" |
50 | | |
51 | | #include "yb/gutil/bind.h" |
52 | | #include "yb/gutil/dynamic_annotations.h" |
53 | | #include "yb/gutil/map-util.h" |
54 | | #include "yb/gutil/stl_util.h" |
55 | | #include "yb/gutil/strings/substitute.h" |
56 | | |
57 | | #include "yb/util/enums.h" |
58 | | #include "yb/util/fault_injection.h" |
59 | | #include "yb/util/flag_tags.h" |
60 | | #include "yb/util/locks.h" |
61 | | #include "yb/util/logging.h" |
62 | | #include "yb/util/mem_tracker.h" |
63 | | #include "yb/util/metrics.h" |
64 | | #include "yb/util/monotime.h" |
65 | | #include "yb/util/random_util.h" |
66 | | #include "yb/util/result.h" |
67 | | #include "yb/util/size_literals.h" |
68 | | #include "yb/util/status_log.h" |
69 | | #include "yb/util/threadpool.h" |
70 | | #include "yb/util/tostring.h" |
71 | | #include "yb/util/url-coding.h" |
72 | | |
73 | | using namespace std::literals; |
74 | | using namespace yb::size_literals; |
75 | | |
76 | | DECLARE_uint64(rpc_max_message_size); |
77 | | |
78 | | // We expect consensus_max_batch_size_bytes + 1_KB to be less than rpc_max_message_size. |
79 | | // Otherwise such a batch would be rejected by the RPC layer. |
80 | | DEFINE_uint64(consensus_max_batch_size_bytes, 4_MB, |
81 | | "The maximum per-tablet RPC batch size when updating peers."); |
82 | | TAG_FLAG(consensus_max_batch_size_bytes, advanced); |
83 | | TAG_FLAG(consensus_max_batch_size_bytes, runtime); |
84 | | |
85 | | DEFINE_int32(follower_unavailable_considered_failed_sec, 900, |
86 | | "Seconds that a leader is unable to successfully heartbeat to a " |
87 | | "follower after which the follower is considered to be failed and " |
88 | | "evicted from the config."); |
89 | | TAG_FLAG(follower_unavailable_considered_failed_sec, advanced); |
90 | | |
91 | | DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0, |
92 | | "Injects a random sleep between 0 and this many milliseconds into " |
93 | | "asynchronous notifications from the consensus queue back to the " |
94 | | "consensus implementation."); |
95 | | TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden); |
96 | | TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe); |
97 | | |
98 | | DEFINE_int32(cdc_checkpoint_opid_interval_ms, 60 * 1000, |
99 | | "Interval up to which CDC consumer's checkpoint is considered for retaining log cache." |
100 | | "If we haven't received an updated checkpoint from CDC consumer within the interval " |
101 | | "specified by cdc_checkpoint_opid_interval, then log cache does not consider that " |
102 | | "consumer while determining which op IDs to evict."); |
103 | | |
104 | | DEFINE_int64(cdc_intent_retention_ms, 4 * 3600 * 1000, |
105 | | "Interval up to which CDC consumer's checkpoint is considered for retaining intents." |
106 | | "If we haven't received an updated checkpoint from CDC consumer within the interval " |
107 | | "specified by cdc_checkpoint_opid_interval, then CDC does not consider that " |
108 | | "consumer while determining which op IDs to delete from the intent."); |
109 | | |
110 | | DEFINE_bool(enable_consensus_exponential_backoff, true, |
111 | | "Whether exponential backoff based on number of retransmissions at tablet leader " |
112 | | "for number of entries to replicate to lagging follower is enabled."); |
113 | | TAG_FLAG(enable_consensus_exponential_backoff, advanced); |
114 | | TAG_FLAG(enable_consensus_exponential_backoff, runtime); |
115 | | |
116 | | DEFINE_int32(consensus_lagging_follower_threshold, 10, |
117 | | "Number of retransmissions at tablet leader to mark a follower as lagging. " |
118 | | "-1 disables the feature."); |
119 | | TAG_FLAG(consensus_lagging_follower_threshold, advanced); |
120 | | TAG_FLAG(consensus_lagging_follower_threshold, runtime); |
121 | | |
122 | | DEFINE_test_flag(bool, disallow_lmp_failures, false, |
123 | | "Whether we disallow PRECEDING_ENTRY_DIDNT_MATCH failures for non new peers."); |
124 | | |
125 | | namespace { |
126 | | |
127 | | constexpr const auto kMinRpcThrottleThresholdBytes = 16; |
128 | | |
129 | 10.3k | static bool RpcThrottleThresholdBytesValidator(const char* flagname, int64_t value) { |
130 | 10.3k | if (value > 0) { |
131 | 10.3k | if (value < kMinRpcThrottleThresholdBytes) { |
132 | 0 | LOG(ERROR) << "Expect " << flagname << " to be at least " << kMinRpcThrottleThresholdBytes; |
133 | 0 | return false; |
134 | 10.3k | } else if (implicit_cast<size_t>(value) >= FLAGS_consensus_max_batch_size_bytes) { |
135 | 0 | LOG(ERROR) << "Expect " << flagname << " to be less than consensus_max_batch_size_bytes " |
136 | 0 | << "value (" << FLAGS_consensus_max_batch_size_bytes << ")"; |
137 | 0 | return false; |
138 | 0 | } |
139 | 10.3k | } |
140 | 10.3k | return true; |
141 | 10.3k | } |
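| |
| | // A minimal sketch (assuming the standard gflags API; this registration is
| | // illustrative, not part of this listing) of how such a validator is wired to
| | // its flag:
| |
| | //   static const bool rpc_throttle_threshold_bytes_validator_registered =
| | //       google::RegisterFlagValidator(&FLAGS_rpc_throttle_threshold_bytes,
| | //                                     &RpcThrottleThresholdBytesValidator);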
142 | | |
143 | | } // namespace |
144 | | |
145 | | DECLARE_int64(rpc_throttle_threshold_bytes); |
146 | | |
147 | | namespace yb { |
148 | | namespace consensus { |
149 | | |
150 | | using log::Log; |
151 | | using std::unique_ptr; |
152 | | using rpc::Messenger; |
153 | | using strings::Substitute; |
154 | | |
155 | | METRIC_DEFINE_gauge_int64(tablet, majority_done_ops, "Leader Operations Acked by Majority", |
156 | | MetricUnit::kOperations, |
157 | | "Number of operations in the leader queue ack'd by a majority but " |
158 | | "not all peers."); |
159 | | METRIC_DEFINE_gauge_int64(tablet, in_progress_ops, "Leader Operations in Progress", |
160 | | MetricUnit::kOperations, |
161 | | "Number of operations in the leader queue ack'd by a minority of " |
162 | | "peers."); |
163 | | |
164 | | const auto kCDCConsumerCheckpointInterval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms; |
165 | | |
166 | | const auto kCDCConsumerIntentRetention = FLAGS_cdc_intent_retention_ms * 1ms; |
167 | | |
168 | 0 | std::string MajorityReplicatedData::ToString() const { |
169 | 0 | return Format( |
170 | 0 | "{ op_id: $0 leader_lease_expiration: $1 ht_lease_expiration: $2 num_sst_files: $3 }", |
171 | 0 | op_id, leader_lease_expiration, ht_lease_expiration, num_sst_files); |
172 | 0 | } |
173 | | |
174 | 68.6k | std::string PeerMessageQueue::TrackedPeer::ToString() const { |
175 | 68.6k | return Format( |
176 | 68.6k | "{ peer: $0 is_new: $1 last_received: $2 next_index: $3 last_known_committed_idx: $4 " |
177 | 68.6k | "is_last_exchange_successful: $5 needs_remote_bootstrap: $6 member_type: $7 " |
178 | 68.6k | "num_sst_files: $8 last_applied: $9 }", |
179 | 68.6k | uuid, is_new, last_received, next_index, last_known_committed_idx, |
180 | 68.6k | is_last_exchange_successful, needs_remote_bootstrap, PeerMemberType_Name(member_type), |
181 | 68.6k | num_sst_files, last_applied); |
182 | 68.6k | } |
183 | | |
184 | 44.3k | void PeerMessageQueue::TrackedPeer::ResetLeaderLeases() { |
185 | 44.3k | leader_lease_expiration.Reset(); |
186 | 44.3k | leader_ht_lease_expiration.Reset(); |
187 | 44.3k | } |
188 | | |
189 | 15.2M | void PeerMessageQueue::TrackedPeer::ResetLastRequest() { |
190 | | // Reset so that next transmission is not considered a re-transmission. |
191 | 15.2M | last_num_messages_sent = -1; |
192 | 15.2M | current_retransmissions = -1; |
193 | 15.2M | } |
194 | | |
195 | | #define INSTANTIATE_METRIC(x) \ |
196 | | x.Instantiate(metric_entity, 0) |
197 | | PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity) |
198 | | : num_majority_done_ops(INSTANTIATE_METRIC(METRIC_majority_done_ops)), |
199 | 88.8k | num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)) { |
200 | 88.8k | } |
201 | | #undef INSTANTIATE_METRIC |
202 | | |
203 | | PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity, |
204 | | const scoped_refptr<log::Log>& log, |
205 | | const MemTrackerPtr& server_tracker, |
206 | | const MemTrackerPtr& parent_tracker, |
207 | | const RaftPeerPB& local_peer_pb, |
208 | | const string& tablet_id, |
209 | | const server::ClockPtr& clock, |
210 | | ConsensusContext* context, |
211 | | unique_ptr<ThreadPoolToken> raft_pool_token) |
212 | | : raft_pool_observers_token_(std::move(raft_pool_token)), |
213 | | local_peer_pb_(local_peer_pb), |
214 | | local_peer_uuid_(local_peer_pb_.has_permanent_uuid() ? local_peer_pb_.permanent_uuid() |
215 | | : string()), |
216 | | tablet_id_(tablet_id), |
217 | | log_cache_(metric_entity, log, server_tracker, local_peer_pb.permanent_uuid(), tablet_id), |
218 | | operations_mem_tracker_( |
219 | | MemTracker::FindOrCreateTracker("OperationsFromDisk", parent_tracker)), |
220 | | metrics_(metric_entity), |
221 | | clock_(clock), |
222 | 88.8k | context_(context) { |
223 | 88.8k | DCHECK(local_peer_pb_.has_permanent_uuid()); |
224 | 88.8k | DCHECK(!local_peer_pb_.last_known_private_addr().empty()); |
225 | 88.8k | } |
226 | | |
227 | 88.7k | void PeerMessageQueue::Init(const OpId& last_locally_replicated) { |
228 | 88.7k | LockGuard lock(queue_lock_); |
229 | 88.7k | CHECK_EQ(queue_state_.state, State::kQueueConstructed); |
230 | 88.7k | log_cache_.Init(last_locally_replicated.ToPB<OpIdPB>()); |
231 | 88.7k | queue_state_.last_appended = last_locally_replicated; |
232 | 88.7k | queue_state_.state = State::kQueueOpen; |
233 | 88.7k | local_peer_ = TrackPeerUnlocked(local_peer_uuid_); |
234 | | |
235 | 88.7k | if (context_) { |
236 | 88.7k | context_->ListenNumSSTFilesChanged(std::bind(&PeerMessageQueue::NumSSTFilesChanged, this)); |
237 | 88.7k | installed_num_sst_files_changed_listener_ = true; |
238 | 88.7k | } |
239 | 88.7k | } |
240 | | |
241 | | void PeerMessageQueue::SetLeaderMode(const OpId& committed_op_id, |
242 | | int64_t current_term, |
243 | | const OpId& last_applied_op_id, |
244 | 38.1k | const RaftConfigPB& active_config) { |
245 | 38.1k | LockGuard lock(queue_lock_); |
246 | 38.1k | queue_state_.current_term = current_term; |
247 | 38.1k | queue_state_.committed_op_id = committed_op_id; |
248 | 38.1k | queue_state_.last_applied_op_id = last_applied_op_id; |
249 | 38.1k | queue_state_.majority_replicated_op_id = committed_op_id; |
250 | 38.1k | queue_state_.active_config.reset(new RaftConfigPB(active_config)); |
251 | 6 | CHECK(IsRaftConfigVoter(local_peer_uuid_, *queue_state_.active_config)) |
252 | 6 | << local_peer_pb_.ShortDebugString() << " not a voter in config: " |
253 | 6 | << queue_state_.active_config->ShortDebugString(); |
254 | 38.1k | queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config)); |
255 | 38.1k | queue_state_.mode = Mode::LEADER; |
256 | | |
257 | 38.1k | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: " |
258 | 38.1k | << queue_state_.ToString(); |
259 | 38.1k | CheckPeersInActiveConfigIfLeaderUnlocked(); |
260 | | |
261 | | // Reset last communication time with all peers to reset the clock on the |
262 | | // failure timeout. |
263 | 38.1k | MonoTime now(MonoTime::Now()); |
264 | 44.3k | for (const PeersMap::value_type& entry : peers_map_) { |
265 | 44.3k | entry.second->ResetLeaderLeases(); |
266 | 44.3k | entry.second->last_successful_communication_time = now; |
267 | 44.3k | } |
268 | 38.1k | } |
269 | | |
270 | 93.6k | void PeerMessageQueue::SetNonLeaderMode() { |
271 | 93.6k | LockGuard lock(queue_lock_); |
272 | 93.6k | queue_state_.active_config.reset(); |
273 | 93.6k | queue_state_.mode = Mode::NON_LEADER; |
274 | 93.6k | queue_state_.majority_size_ = -1; |
275 | 93.6k | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: " |
276 | 93.6k | << queue_state_.ToString(); |
277 | 93.6k | } |
278 | | |
279 | 69.6k | void PeerMessageQueue::TrackPeer(const string& uuid) { |
280 | 69.6k | LockGuard lock(queue_lock_); |
281 | 69.6k | TrackPeerUnlocked(uuid); |
282 | 69.6k | } |
283 | | |
284 | 158k | PeerMessageQueue::TrackedPeer* PeerMessageQueue::TrackPeerUnlocked(const string& uuid) { |
285 | 27 | CHECK(!uuid.empty()) << "Got request to track peer with empty UUID"; |
286 | 158k | DCHECK_EQ(queue_state_.state, State::kQueueOpen); |
287 | | |
288 | 158k | TrackedPeer* tracked_peer = new TrackedPeer(uuid); |
289 | | |
290 | | // We don't know the last operation received by the peer so, following the Raft protocol, we set |
291 | | // next_index to one past the end of our own log. This way, if calling this method is the result |
292 | | // of a successful leader election and the logs between the new leader and remote peer match, the |
293 | | // peer->next_index will point to the index of the soon-to-be-written NO_OP entry that is used to |
294 | | // assert leadership. If we guessed wrong, and the peer does not have a log that matches ours, the |
295 | | // normal queue negotiation process will eventually find the right point to resume from. |
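| | //
| | // For example, if the leader's last_appended is {term: 3, index: 10}, a newly
| | // tracked peer starts at next_index = 11, the index the leader's term-3 NO_OP
| | // will take; a wrong guess surfaces as PRECEDING_ENTRY_DIDNT_MATCH and is
| | // repaired by the negotiation mentioned above.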
296 | 158k | tracked_peer->next_index = queue_state_.last_appended.index + 1; |
297 | 158k | InsertOrDie(&peers_map_, uuid, tracked_peer); |
298 | | |
299 | 158k | CheckPeersInActiveConfigIfLeaderUnlocked(); |
300 | | |
301 | | // We don't know how far back this peer is, so set the all replicated watermark to |
302 | | // MinimumOpId. We'll advance it when we know how far along the peer is. |
303 | 158k | queue_state_.all_replicated_op_id = OpId::Min(); |
304 | 158k | return tracked_peer; |
305 | 158k | } |
306 | | |
307 | 43.2k | void PeerMessageQueue::UntrackPeer(const string& uuid) { |
308 | 43.2k | LockGuard lock(queue_lock_); |
309 | 43.2k | TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid); |
310 | 43.2k | if (peer != nullptr) { |
311 | 43.2k | delete peer; |
312 | 43.2k | } |
313 | 43.2k | } |
314 | | |
315 | 196k | void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const { |
316 | 196k | if (queue_state_.mode != Mode::LEADER) return; |
317 | 107k | std::unordered_set<std::string> config_peer_uuids; |
318 | 328k | for (const RaftPeerPB& peer_pb : queue_state_.active_config->peers()) { |
319 | 328k | InsertOrDie(&config_peer_uuids, peer_pb.permanent_uuid()); |
320 | 328k | } |
321 | 223k | for (const PeersMap::value_type& entry : peers_map_) { |
322 | 223k | if (!ContainsKey(config_peer_uuids, entry.first)) { |
323 | 0 | LOG_WITH_PREFIX_UNLOCKED(FATAL) << Substitute("Peer $0 is not in the active config. " |
324 | 0 | "Queue state: $1", |
325 | 0 | entry.first, |
326 | 0 | queue_state_.ToString()); |
327 | 0 | } |
328 | 223k | } |
329 | 107k | } |
330 | | |
331 | 2.40k | void PeerMessageQueue::NumSSTFilesChanged() { |
332 | 2.40k | auto num_sst_files = context_->NumSSTFiles(); |
333 | | |
334 | 2.40k | uint64_t majority_replicated_num_sst_files; |
335 | 2.40k | { |
336 | 2.40k | LockGuard lock(queue_lock_); |
337 | 2.40k | if (queue_state_.mode != Mode::LEADER) { |
338 | 964 | return; |
339 | 964 | } |
340 | 1.44k | auto it = peers_map_.find(local_peer_uuid_); |
341 | 1.44k | if (it == peers_map_.end()) { |
342 | 0 | return; |
343 | 0 | } |
344 | 1.44k | it->second->num_sst_files = num_sst_files; |
345 | 1.44k | majority_replicated_num_sst_files = NumSSTFilesWatermark(); |
346 | 1.44k | } |
347 | | |
348 | 1.44k | NotifyObservers( |
349 | 1.44k | "majority replicated num SST files changed", |
350 | 1.44k | [majority_replicated_num_sst_files](PeerMessageQueueObserver* observer) { |
351 | 1.44k | observer->MajorityReplicatedNumSSTFilesChanged(majority_replicated_num_sst_files); |
352 | 1.44k | }); |
353 | 1.44k | } |
354 | | |
355 | 13.1M | void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id, const Status& status) { |
356 | 13.1M | CHECK_OK(status); |
357 | | |
358 | | // Fake an RPC response from the local peer. |
359 | | // TODO: we should probably refactor the ResponseFromPeer function so that we don't need to |
360 | | // construct this fake response, but this seems to work for now. |
361 | 13.1M | ConsensusResponsePB fake_response; |
362 | 13.1M | id.ToPB(fake_response.mutable_status()->mutable_last_received()); |
363 | 13.1M | id.ToPB(fake_response.mutable_status()->mutable_last_received_current_leader()); |
364 | 13.1M | if (context_) { |
365 | 13.1M | fake_response.set_num_sst_files(context_->NumSSTFiles()); |
366 | 13.1M | } |
367 | 13.1M | { |
368 | 13.1M | LockGuard lock(queue_lock_); |
369 | | |
370 | | // TODO This ugly fix is required because we unlock queue_lock_ while doing AppendOperations. |
371 | | // So LocalPeerAppendFinished could be invoked before the rest of AppendOperations completes. |
372 | 13.1M | if (queue_state_.last_appended.index < id.index) { |
373 | 47.7k | queue_state_.last_appended = id; |
374 | 47.7k | } |
375 | 13.1M | fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_op_id.index); |
376 | 13.1M | queue_state_.last_applied_op_id.ToPB(fake_response.mutable_status()->mutable_last_applied()); |
377 | | |
378 | 13.1M | if (queue_state_.mode != Mode::LEADER) { |
379 | 8.21M | log_cache_.EvictThroughOp(id.index); |
380 | | |
381 | 8.21M | UpdateMetrics(); |
382 | 8.21M | return; |
383 | 8.21M | } |
384 | 4.92M | } |
385 | 4.92M | ResponseFromPeer(local_peer_uuid_, fake_response); |
386 | 4.92M | } |
387 | | |
388 | 768 | Status PeerMessageQueue::TEST_AppendOperation(const ReplicateMsgPtr& msg) { |
389 | 768 | return AppendOperations( |
390 | 768 | { msg }, yb::OpId::FromPB(msg->committed_op_id()), RestartSafeCoarseMonoClock().Now()); |
391 | 768 | } |
392 | | |
393 | | Status PeerMessageQueue::AppendOperations(const ReplicateMsgs& msgs, |
394 | | const yb::OpId& committed_op_id, |
395 | 13.1M | RestartSafeCoarseTimePoint batch_mono_time) { |
396 | 13.1M | DFAKE_SCOPED_LOCK(append_fake_lock_); |
397 | 13.1M | OpId last_id; |
398 | 13.1M | if (!msgs.empty()) { |
399 | 7.14M | std::unique_lock<simple_spinlock> lock(queue_lock_); |
400 | | |
401 | 7.14M | last_id = OpId::FromPB(msgs.back()->id()); |
402 | | |
403 | 7.14M | if (last_id.term > queue_state_.current_term) { |
404 | 68.3k | queue_state_.current_term = last_id.term; |
405 | 68.3k | } |
406 | 5.99M | } else { |
407 | 5.99M | std::unique_lock<simple_spinlock> lock(queue_lock_); |
408 | 5.99M | last_id = queue_state_.last_appended; |
409 | 5.99M | } |
410 | | |
411 | | // Unlock ourselves during Append to prevent a deadlock: it's possible that the log buffer is |
412 | | // full, in which case AppendOperations would block. However, for the log buffer to empty, it may |
413 | | // need to call LocalPeerAppendFinished() which also needs queue_lock_. |
414 | | // |
415 | | // Since we invoke AppendOperations from only one thread, no concurrent calls can be |
416 | | // executed, and queue_state_.last_appended will be updated correctly. |
417 | 13.1M | RETURN_NOT_OK(log_cache_.AppendOperations( |
418 | 13.1M | msgs, committed_op_id, batch_mono_time, |
419 | 13.1M | Bind(&PeerMessageQueue::LocalPeerAppendFinished, Unretained(this), last_id))); |
420 | | |
421 | 13.1M | if (!msgs.empty()) { |
422 | 7.15M | std::unique_lock<simple_spinlock> lock(queue_lock_); |
423 | 7.15M | queue_state_.last_appended = last_id; |
424 | 7.15M | UpdateMetrics(); |
425 | 7.15M | } |
426 | | |
427 | 13.1M | return Status::OK(); |
428 | 13.1M | } |
429 | | |
430 | 3.05M | uint64_t GetNumMessagesToSendWithBackoff(int64_t last_num_messages_sent) { |
431 | 3.05M | return std::max<int64_t>((last_num_messages_sent >> 1) - 1, 0); |
432 | 3.05M | } |
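| |
| | // Worked example (illustrative): with repeated unacked requests the permitted
| | // batch size decays 100 -> 49 -> 23 -> 10 -> 4 -> 1 -> 0, i.e. roughly halving
| | // per retry until only status-only (empty) requests remain.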
433 | | |
434 | | Status PeerMessageQueue::RequestForPeer(const string& uuid, |
435 | | ConsensusRequestPB* request, |
436 | | ReplicateMsgsHolder* msgs_holder, |
437 | | bool* needs_remote_bootstrap, |
438 | | PeerMemberType* member_type, |
439 | 13.3M | bool* last_exchange_successful) { |
440 | 13.3M | static constexpr uint64_t kSendUnboundedLogOps = std::numeric_limits<uint64_t>::max(); |
441 | 14.6k | DCHECK(request->ops().empty()) << request->ShortDebugString(); |
442 | | |
443 | 13.3M | OpId preceding_id; |
444 | 13.3M | MonoDelta unreachable_time = MonoDelta::kMin; |
445 | 13.3M | bool is_voter = false; |
446 | 13.3M | bool is_new; |
447 | 13.3M | int64_t previously_sent_index; |
448 | 13.3M | uint64_t num_log_ops_to_send; |
449 | 13.3M | HybridTime propagated_safe_time; |
450 | | |
451 | | // Should be before now_ht, i.e. not greater than propagated_hybrid_time. |
452 | 13.3M | if (context_) { |
453 | 13.3M | propagated_safe_time = VERIFY_RESULT(context_->PreparePeerRequest()); |
454 | 13.3M | } |
455 | | |
456 | 13.3M | { |
457 | 13.3M | LockGuard lock(queue_lock_); |
458 | 13.3M | DCHECK_EQ(queue_state_.state, State::kQueueOpen); |
459 | 13.3M | DCHECK_NE(uuid, local_peer_uuid_); |
460 | | |
461 | 13.3M | auto peer = FindPtrOrNull(peers_map_, uuid); |
462 | 13.3M | if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) { |
463 | 20 | return STATUS(NotFound, "Peer not tracked or queue not in leader mode."); |
464 | 20 | } |
465 | | |
466 | 13.3M | HybridTime now_ht; |
467 | | |
468 | 13.3M | is_new = peer->is_new; |
469 | 13.3M | if (!is_new) { |
470 | 13.2M | now_ht = clock_->Now(); |
471 | | |
472 | 13.2M | auto ht_lease_expiration_micros = now_ht.GetPhysicalValueMicros() + |
473 | 13.2M | FLAGS_ht_lease_duration_ms * 1000; |
474 | 13.2M | auto leader_lease_duration_ms = GetAtomicFlag(&FLAGS_leader_lease_duration_ms); |
475 | 13.2M | request->set_leader_lease_duration_ms(leader_lease_duration_ms); |
476 | 13.2M | request->set_ht_lease_expiration(ht_lease_expiration_micros); |
477 | | |
478 | | // As noted here: |
479 | | // https://red.ht/2sCSErb |
480 | | // |
481 | | // The _COARSE variants are faster to read and have a precision (also known as resolution) of |
482 | | // one millisecond (ms). |
483 | | // |
484 | | // Coarse clock precision is 1 millisecond. |
485 | 13.2M | const auto kCoarseClockPrecision = 1ms; |
486 | | |
487 | | // Because of coarse clocks we subtract 2 ms, to be sure that our local version of the |
488 | | // lease does not expire after it expires at the follower. |
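| | // Example: with leader_lease_duration_ms = 2000 (an assumed value), last_sent
| | // works out to now + 2000 ms - 2 ms, so the leader's local view of the lease
| | // lapses no later than the follower's view of it.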
489 | 13.2M | peer->leader_lease_expiration.last_sent = |
490 | 13.2M | CoarseMonoClock::Now() + leader_lease_duration_ms * 1ms - kCoarseClockPrecision * 2; |
491 | 13.2M | peer->leader_ht_lease_expiration.last_sent = ht_lease_expiration_micros; |
492 | 93.9k | } else { |
493 | 93.9k | now_ht = clock_->Now(); |
494 | 93.9k | request->clear_leader_lease_duration_ms(); |
495 | 93.9k | request->clear_ht_lease_expiration(); |
496 | 93.9k | peer->leader_lease_expiration.Reset(); |
497 | 93.9k | peer->leader_ht_lease_expiration.Reset(); |
498 | 93.9k | } |
499 | | // This is initialized to the queue's last appended op but gets set to the id of the |
500 | | // log entry preceding the first one in 'messages' if messages are found for the peer. |
501 | | // |
502 | | // The leader does not know the actual state of a peer but it should always send a value of |
503 | | // preceding_id that is present in the leader's own log, so the follower can verify the log |
504 | | // matching property. |
505 | | // |
506 | | // In case we decide not to send any messages to the follower this time due to exponential |
507 | | // backoff to an unresponsive follower, we will keep preceding_id equal to last_appended. |
508 | | // This is safe because unless the follower already has that operation, it will fail to find |
509 | | // it in its pending operations in EnforceLogMatchingPropertyMatchesUnlocked and will return |
510 | | // a log matching property violation error without applying any incorrect messages from its log. |
511 | | // |
512 | | // See this scenario for more context on the issue we are trying to avoid: |
513 | | // https://github.com/yugabyte/yugabyte-db/issues/8150#issuecomment-827821784 |
514 | 13.3M | preceding_id = queue_state_.last_appended; |
515 | | |
516 | 13.3M | request->set_propagated_hybrid_time(now_ht.ToUint64()); |
517 | | |
518 | | // NOTE: committed_op_id may be overwritten later. |
519 | | // In our system committed_op_id means that the operation was also applied. |
520 | | // If an operation takes significant time to apply, followers would not know that it |
521 | | // is committed until it has been applied on the leader. |
522 | | // To address this we use majority_replicated_op_id, which is updated before apply. |
523 | | // But we can use it only when its term matches the current term; see Fig. 8 in the Raft paper. |
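| | //
| | // Example: with current_term = 3, committed = {3, 10}, and majority-replicated
| | // = {3, 12}, we advertise index 12; were the majority-replicated entry from an
| | // older term, we would fall back to the committed id, per the Fig. 8 caveat.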
524 | 13.3M | if (queue_state_.majority_replicated_op_id.index > queue_state_.committed_op_id.index && |
525 | 375k | queue_state_.majority_replicated_op_id.term == queue_state_.current_term) { |
526 | 375k | queue_state_.majority_replicated_op_id.ToPB(request->mutable_committed_op_id()); |
527 | 13.0M | } else { |
528 | 13.0M | queue_state_.committed_op_id.ToPB(request->mutable_committed_op_id()); |
529 | 13.0M | } |
530 | | |
531 | 13.3M | request->set_caller_term(queue_state_.current_term); |
532 | 13.3M | unreachable_time = |
533 | 13.3M | MonoTime::Now().GetDeltaSince(peer->last_successful_communication_time); |
534 | 13.3M | if (member_type) *member_type = peer->member_type; |
535 | 13.3M | if (last_exchange_successful) *last_exchange_successful = peer->is_last_exchange_successful; |
536 | 13.3M | *needs_remote_bootstrap = peer->needs_remote_bootstrap; |
537 | | |
538 | 13.3M | previously_sent_index = peer->next_index - 1; |
539 | 13.3M | if (FLAGS_enable_consensus_exponential_backoff && peer->last_num_messages_sent >= 0) { |
540 | | // Previous request to peer has not been acked. Reduce number of entries to be sent |
541 | | // in this attempt using exponential backoff. Note that to_index is inclusive. |
542 | 3.05M | num_log_ops_to_send = GetNumMessagesToSendWithBackoff(peer->last_num_messages_sent); |
543 | 10.3M | } else { |
544 | | // Previous request to peer has been acked or a heartbeat response has been received. |
545 | | // Transmit as many entries as allowed. |
546 | 10.3M | num_log_ops_to_send = kSendUnboundedLogOps; |
547 | 10.3M | } |
548 | | |
549 | 13.3M | peer->current_retransmissions++; |
550 | | |
551 | 13.3M | if (peer->member_type == PeerMemberType::VOTER) { |
552 | 10.1M | is_voter = true; |
553 | 10.1M | } |
554 | 13.3M | } |
555 | | |
556 | 13.3M | if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) { |
557 | 144 | if (!is_voter || CountVoters(*queue_state_.active_config) > 2) { |
558 | | // We never drop from 2 voters to 1 voter automatically, at least for now (12/4/18). We may |
559 | | // want to revisit this later, we're just being cautious with this. |
560 | | // We remove unconditionally any failed non-voter replica (PRE_VOTER, PRE_OBSERVER, OBSERVER). |
561 | 144 | string msg = Substitute("Leader has been unable to successfully communicate " |
562 | 144 | "with Peer $0 for more than $1 seconds ($2)", |
563 | 144 | uuid, |
564 | 144 | FLAGS_follower_unavailable_considered_failed_sec, |
565 | 144 | unreachable_time.ToString()); |
566 | 144 | NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg); |
567 | 144 | } |
568 | 144 | } |
569 | | |
570 | 13.3M | if (PREDICT_FALSE(*needs_remote_bootstrap)) { |
571 | 272 | YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS(INFO, 30) |
572 | 272 | << "Peer needs remote bootstrap: " << uuid; |
573 | 4.98k | return Status::OK(); |
574 | 4.98k | } |
575 | 13.3M | *needs_remote_bootstrap = false; |
576 | | |
577 | 13.3M | request->clear_propagated_safe_time(); |
578 | | |
579 | | // If we've never communicated with the peer, we don't know what messages to send, so we'll |
580 | | // send a status-only request. If exponential backoff (after repeated unacked requests) has |
581 | | // shrunk the batch to the point that to_index == next_index, we also send a status-only |
582 | | // request. Otherwise, we grab messages from the log starting at the last_received point. |
583 | 13.3M | if (!is_new && num_log_ops_to_send > 0) { |
584 | | // The batch of messages to send to the peer. |
585 | 10.2M | auto max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong(); |
586 | 10.2M | auto to_index = num_log_ops_to_send == kSendUnboundedLogOps ? |
587 | 10.2M | 0 : previously_sent_index + num_log_ops_to_send; |
588 | 10.2M | auto result = ReadFromLogCache(previously_sent_index, to_index, max_batch_size, uuid); |
589 | | |
590 | 10.2M | if (PREDICT_FALSE(!result.ok())) { |
591 | 0 | if (PREDICT_TRUE(result.status().IsNotFound())) { |
592 | 0 | std::string msg = Format("The logs necessary to catch up peer $0 have been " |
593 | 0 | "garbage collected. The follower will never be able " |
594 | 0 | "to catch up ($1)", uuid, result.status()); |
595 | 0 | NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg); |
596 | 0 | } |
597 | 0 | return result.status(); |
598 | 0 | } |
599 | | |
600 | 10.2M | preceding_id = result->preceding_op; |
601 | | // We use AddAllocated rather than copy, because we pin the log cache at the "all replicated" |
602 | | // point. At some point we may want to allow partially loading (and not pinning) earlier |
603 | | // messages. At that point we'll need to do something smarter here, like copy or ref-count. |
604 | 5.19M | for (const auto& msg : result->messages) { |
605 | 5.19M | request->mutable_ops()->AddAllocated(msg.get()); |
606 | 5.19M | } |
607 | | |
608 | 10.2M | { |
609 | 10.2M | LockGuard lock(queue_lock_); |
610 | 10.2M | auto peer = FindPtrOrNull(peers_map_, uuid); |
611 | 10.2M | if (PREDICT_FALSE(peer == nullptr)) { |
612 | 0 | return STATUS(NotFound, "Peer not tracked."); |
613 | 0 | } |
614 | | |
615 | 10.2M | peer->last_num_messages_sent = result->messages.size(); |
616 | 10.2M | } |
617 | | |
618 | 10.2M | ScopedTrackedConsumption consumption; |
619 | 10.2M | if (result->read_from_disk_size) { |
620 | 1.17k | consumption = ScopedTrackedConsumption(operations_mem_tracker_, result->read_from_disk_size); |
621 | 1.17k | } |
622 | 10.2M | *msgs_holder = ReplicateMsgsHolder( |
623 | 10.2M | request->mutable_ops(), std::move(result->messages), std::move(consumption)); |
624 | | |
625 | 10.2M | if (propagated_safe_time && |
626 | 10.2M | !result->have_more_messages && |
627 | 10.2M | num_log_ops_to_send == kSendUnboundedLogOps) { |
628 | | // Get the current local safe time on the leader and propagate it to the follower. |
629 | 10.2M | request->set_propagated_safe_time(propagated_safe_time.ToUint64()); |
630 | 10.2M | } |
631 | 10.2M | } |
632 | | |
633 | 13.3M | preceding_id.ToPB(request->mutable_preceding_id()); |
634 | | |
635 | | // Not all entries committed at the leader may be available at a lagging follower. |
636 | | // The `committed_op_id` in this request may make a lagging follower aware of the |
637 | | // highest committed op index at the leader. We have a sanity check during tablet |
638 | | // bootstrap, in TabletBootstrap::PlaySegments(), that this tablet did not lose a |
639 | | // committed operation. Hence we avoid sending a committed op id that is too large |
640 | | // to such a lagging follower. |
641 | | // If we send operations to it, then the last operation known to this follower will be |
642 | | // the last sent operation. If we don't send any operations, then the last known |
643 | | // operation will be the preceding operation. |
644 | | // We don't have to change committed_op_id when it is not greater than |
645 | | // max_allowed_committed_op_id, because then it carries the actual committed_op_id |
646 | | // value, and that operation is already known to the follower. |
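| | //
| | // Example: if this request carries ops only up to index 10 while the leader's
| | // committed index is 15, we clamp committed_op_id to 10, never advertising a
| | // committed index beyond what the follower can hold after this request.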
647 | 13.3M | const auto max_allowed_committed_op_id = !request->ops().empty() |
648 | 8.87M | ? OpId::FromPB(request->ops().rbegin()->id()) : preceding_id; |
649 | 13.3M | if (max_allowed_committed_op_id.index < request->committed_op_id().index()) { |
650 | 2.05k | max_allowed_committed_op_id.ToPB(request->mutable_committed_op_id()); |
651 | 2.05k | } |
652 | | |
653 | 13.3M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
654 | 0 | if (request->ops_size() > 0) { |
655 | 0 | VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid |
656 | 0 | << ". Size: " << request->ops_size() |
657 | 0 | << ". From: " << request->ops(0).id().ShortDebugString() << ". To: " |
658 | 0 | << request->ops(request->ops_size() - 1).id().ShortDebugString(); |
659 | 0 | VLOG_WITH_PREFIX_UNLOCKED(3) << "Operations: " << yb::ToString(request->ops()); |
660 | 0 | } else { |
661 | 0 | VLOG_WITH_PREFIX_UNLOCKED(2) |
662 | 0 | << "Sending " << (is_new ? "new " : "") << "status only request to Peer: " << uuid |
663 | 0 | << ": " << request->ShortDebugString(); |
664 | 0 | } |
665 | 0 | } |
666 | | |
667 | 13.3M | return Status::OK(); |
668 | 13.3M | } |
669 | | |
670 | | Result<ReadOpsResult> PeerMessageQueue::ReadFromLogCache(int64_t after_index, |
671 | | int64_t to_index, |
672 | | size_t max_batch_size, |
673 | | const std::string& peer_uuid, |
674 | 10.2M | const CoarseTimePoint deadline) { |
675 | 10.2M | DCHECK_LT(FLAGS_consensus_max_batch_size_bytes + 1_KB, FLAGS_rpc_max_message_size); |
676 | | |
677 | | // We try to get the follower's next_index from our log. |
678 | | // Note this is not using "term" and needs to change |
679 | 10.2M | auto result = log_cache_.ReadOps(after_index, to_index, max_batch_size, deadline); |
680 | 10.2M | if (PREDICT_FALSE(!result.ok())) { |
681 | 0 | auto s = result.status(); |
682 | 0 | if (PREDICT_TRUE(s.IsNotFound())) { |
683 | 0 | return s; |
684 | 0 | } else if (s.IsIncomplete()) { |
685 | | // IsIncomplete() means that we tried to read beyond the head of the log (in the future). |
686 | | // KUDU-1078 points to a fix of this log spew issue that we've ported. This should not |
687 | | // happen under normal circumstances. |
688 | 0 | LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log " |
689 | 0 | << "while preparing peer request: " |
690 | 0 | << s.ToString() << ". Destination peer: " |
691 | 0 | << peer_uuid; |
692 | 0 | return s; |
693 | 0 | } else { |
694 | 0 | LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: " |
695 | 0 | << s.ToString() << ". Destination peer: " |
696 | 0 | << peer_uuid; |
697 | 0 | return s; |
698 | 0 | } |
699 | 10.2M | } |
700 | 10.2M | return result; |
701 | 10.2M | } |
702 | | |
703 | | // Read majority replicated messages from cache for CDC. |
704 | | // CDC producer will use this to get the messages to send in response to cdc::GetChanges RPC. |
705 | | Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForCDC( |
706 | | const yb::OpId& last_op_id, |
707 | | int64_t* repl_index, |
708 | 310 | const CoarseTimePoint deadline) { |
709 | | // The batch of messages read from cache. |
710 | | |
711 | 310 | int64_t to_index; |
712 | 310 | bool pending_messages = false; |
713 | 310 | { |
714 | 310 | LockGuard lock(queue_lock_); |
715 | | // Use committed_op_id because it's already been processed by the Transaction codepath. |
716 | 310 | to_index = queue_state_.committed_op_id.index; |
717 | | // Determine if there are pending operations in RAFT but not yet LogCache. |
718 | 310 | pending_messages = to_index != queue_state_.majority_replicated_op_id.index; |
719 | 310 | } |
720 | 310 | if (repl_index) { |
721 | 307 | *repl_index = to_index; |
722 | 307 | } |
723 | | |
724 | 310 | if (last_op_id.index >= to_index) { |
725 | | // Nothing to read. |
726 | 0 | return ReadOpsResult(); |
727 | 0 | } |
728 | | |
729 | | // An empty OpId is only sent on the first read request; in that case, start at the earliest known entry. |
730 | 310 | int64_t after_op_index = last_op_id.empty() ? |
731 | 302 | max(log_cache_.earliest_op_index(), last_op_id.index) : |
732 | 8 | last_op_id.index; |
733 | | |
734 | 310 | auto result = ReadFromLogCache( |
735 | 310 | after_op_index, to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, deadline); |
736 | 310 | if (PREDICT_FALSE(!result.ok()) && PREDICT_TRUE(result.status().IsNotFound())) { |
737 | 0 | LOG_WITH_PREFIX_UNLOCKED(INFO) << Format( |
738 | 0 | "The logs from index $0 have been garbage collected and cannot be read ($1)", |
739 | 0 | after_op_index, result.status()); |
740 | 0 | } |
741 | 310 | if (result.ok()) { |
742 | 310 | result->have_more_messages |= pending_messages; |
743 | 310 | } |
744 | 310 | return result; |
745 | 310 | } |
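| |
| | // Illustrative use (caller names hypothetical): a CDC producer answering a
| | // cdc::GetChanges RPC might drive this as
| |
| | //   int64_t committed_index = 0;
| | //   auto read_result = VERIFY_RESULT(queue->ReadReplicatedMessagesForCDC(
| | //       last_streamed_op_id, &committed_index, deadline));
| | //   // read_result.messages now holds committed entries after last_streamed_op_id.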
746 | | |
747 | | Status PeerMessageQueue::GetRemoteBootstrapRequestForPeer(const string& uuid, |
748 | 4.91k | StartRemoteBootstrapRequestPB* req) { |
749 | 4.91k | TrackedPeer* peer = nullptr; |
750 | 4.91k | { |
751 | 4.91k | LockGuard lock(queue_lock_); |
752 | 4.91k | DCHECK_EQ(queue_state_.state, State::kQueueOpen); |
753 | 4.91k | DCHECK_NE(uuid, local_peer_uuid_); |
754 | 4.91k | peer = FindPtrOrNull(peers_map_, uuid); |
755 | 4.91k | if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) { |
756 | 0 | return STATUS(NotFound, "Peer not tracked or queue not in leader mode."); |
757 | 0 | } |
758 | 4.91k | } |
759 | | |
760 | 4.91k | if (PREDICT_FALSE(!peer->needs_remote_bootstrap)) { |
761 | 0 | return STATUS(IllegalState, "Peer does not need to remotely bootstrap", uuid); |
762 | 0 | } |
763 | | |
764 | 4.91k | if (peer->member_type == PeerMemberType::VOTER || peer->member_type == PeerMemberType::OBSERVER) { |
765 | 356 | LOG(INFO) << "Remote bootstrapping peer " << uuid << " with type " |
766 | 356 | << PeerMemberType_Name(peer->member_type); |
767 | 356 | } |
768 | | |
769 | 4.91k | req->Clear(); |
770 | 4.91k | req->set_dest_uuid(uuid); |
771 | 4.91k | req->set_tablet_id(tablet_id_); |
772 | 4.91k | req->set_bootstrap_peer_uuid(local_peer_uuid_); |
773 | 4.91k | *req->mutable_source_private_addr() = local_peer_pb_.last_known_private_addr(); |
774 | 4.91k | *req->mutable_source_broadcast_addr() = local_peer_pb_.last_known_broadcast_addr(); |
775 | 4.91k | *req->mutable_source_cloud_info() = local_peer_pb_.cloud_info(); |
776 | 4.91k | req->set_caller_term(queue_state_.current_term); |
777 | 4.91k | peer->needs_remote_bootstrap = false; // Now reset the flag. |
778 | 4.91k | return Status::OK(); |
779 | 4.91k | } |
780 | | |
781 | 485 | void PeerMessageQueue::UpdateCDCConsumerOpId(const yb::OpId& op_id) { |
782 | 485 | std::lock_guard<rw_spinlock> l(cdc_consumer_lock_); |
783 | 485 | cdc_consumer_op_id_ = op_id; |
784 | 485 | cdc_consumer_op_id_last_updated_ = CoarseMonoClock::Now(); |
785 | 485 | } |
786 | | |
787 | 15.1M | yb::OpId PeerMessageQueue::GetCDCConsumerOpIdToEvict() { |
788 | 15.1M | std::shared_lock<rw_spinlock> l(cdc_consumer_lock_); |
789 | | // For log cache eviction, we only want to include CDC consumers that are actively polling. |
790 | | // If CDC consumer checkpoint has not been updated recently, we exclude it. |
791 | 15.1M | if (CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_ <= kCDCConsumerCheckpointInterval) { |
792 | 5.63k | return cdc_consumer_op_id_; |
793 | 15.1M | } else { |
794 | 15.1M | return yb::OpId::Max(); |
795 | 15.1M | } |
796 | 15.1M | } |
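| |
| | // Example: a consumer that checkpointed 30 s ago (inside the 60 s default
| | // interval) pins log-cache eviction at its checkpoint, while one silent for,
| | // say, two minutes is ignored and OpId::Max() imposes no CDC constraint.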
797 | | |
798 | 1.96M | yb::OpId PeerMessageQueue::GetCDCConsumerOpIdForIntentRemoval() { |
799 | 1.96M | std::shared_lock<rw_spinlock> l(cdc_consumer_lock_); |
800 | 1.96M | if (CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_ <= kCDCConsumerIntentRetention) { |
801 | 542 | return cdc_consumer_op_id_; |
802 | 1.96M | } else { |
803 | 1.96M | return yb::OpId::Max(); |
804 | 1.96M | } |
805 | 1.96M | } |
806 | | |
807 | 15.1M | void PeerMessageQueue::UpdateAllReplicatedOpId(OpId* result) { |
808 | 15.1M | OpId new_op_id = OpId::Max(); |
809 | | |
810 | 44.9M | for (const auto& peer : peers_map_) { |
811 | 44.9M | if (!peer.second->is_last_exchange_successful) { |
812 | 266k | return; |
813 | 266k | } |
814 | 44.6M | if (peer.second->last_received.index < new_op_id.index) { |
815 | 17.8M | new_op_id = peer.second->last_received; |
816 | 17.8M | } |
817 | 44.6M | } |
818 | | |
819 | 14.8M | CHECK_NE(OpId::Max(), new_op_id); |
820 | 14.8M | *result = new_op_id; |
821 | 14.8M | } |
822 | | |
823 | 30.2M | void PeerMessageQueue::UpdateAllAppliedOpId(OpId* result) { |
824 | 30.2M | OpId all_applied_op_id = OpId::Max(); |
825 | 89.9M | for (const auto& peer : peers_map_) { |
826 | 89.9M | if (!peer.second->is_last_exchange_successful) { |
827 | 524k | return; |
828 | 524k | } |
829 | 89.3M | all_applied_op_id = std::min(all_applied_op_id, peer.second->last_applied); |
830 | 89.3M | } |
831 | | |
832 | 29.7M | CHECK_NE(OpId::Max(), all_applied_op_id); |
833 | 29.7M | *result = all_applied_op_id; |
834 | 29.7M | } |
835 | | |
836 | 15.1M | void PeerMessageQueue::UpdateAllNonLaggingReplicatedOpId(int32_t threshold) { |
837 | 15.1M | OpId new_op_id = OpId::Max(); |
838 | | |
839 | 45.4M | for (const auto& peer : peers_map_) { |
840 | | // Ignore lagging follower. |
841 | 45.4M | if (peer.second->current_retransmissions >= threshold) { |
842 | 74.7k | continue; |
843 | 74.7k | } |
844 | 45.3M | if (peer.second->last_received.index < new_op_id.index) { |
845 | 18.0M | new_op_id = peer.second->last_received; |
846 | 18.0M | } |
847 | 45.3M | } |
848 | | |
849 | 15.1M | if (new_op_id == OpId::Max()) { |
850 | 0 | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Non lagging peer(s) not found."; |
851 | 0 | new_op_id = queue_state_.all_replicated_op_id; |
852 | 0 | } |
853 | | |
854 | 15.1M | if (queue_state_.all_nonlagging_replicated_op_id.index < new_op_id.index) { |
855 | 2.37M | queue_state_.all_nonlagging_replicated_op_id = new_op_id; |
856 | 2.37M | } |
857 | 15.1M | } |
858 | | |
859 | | HAS_MEMBER_FUNCTION(InfiniteWatermarkForLocalPeer); |
860 | | |
861 | | template <class Policy, bool HasMemberFunction_InfiniteWatermarkForLocalPeer> |
862 | | struct GetInfiniteWatermarkForLocalPeer; |
863 | | |
864 | | template <class Policy> |
865 | | struct GetInfiniteWatermarkForLocalPeer<Policy, true> { |
866 | 595k | static auto Apply() { |
867 | 595k | return Policy::InfiniteWatermarkForLocalPeer(); |
868 | 595k | } |
869 | | }; |
870 | | |
871 | | template <class Policy> |
872 | | struct GetInfiniteWatermarkForLocalPeer<Policy, false> { |
873 | | // Should not be invoked, but we have to define it to keep the compiler happy. |
874 | 0 | static typename Policy::result_type Apply() { |
875 | 0 | LOG(DFATAL) << "Invoked Apply when InfiniteWatermarkForLocalPeer is not defined"; |
876 | 0 | return typename Policy::result_type(); |
877 | 0 | } |
878 | | }; |
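| |
| | // HAS_MEMBER_FUNCTION is a compile-time member-detection idiom. A minimal
| | // sketch (assuming a typical SFINAE formulation; the real macro may differ):
| |
| | //   template <class T, class = void>
| | //   struct HasInfiniteWatermarkForLocalPeer : std::false_type {};
| | //   template <class T>
| | //   struct HasInfiniteWatermarkForLocalPeer<
| | //       T, std::void_t<decltype(T::InfiniteWatermarkForLocalPeer())>>
| | //       : std::true_type {};
| |
| | // GetWatermark() below uses the resulting ::value to pick between the two
| | // specializations above.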
879 | | |
880 | | template <class Policy> |
881 | 60.6M | typename Policy::result_type PeerMessageQueue::GetWatermark() { |
882 | 60.6M | DCHECK(queue_lock_.is_locked()); |
883 | 60.6M | const auto num_peers_required = queue_state_.majority_size_; |
884 | 60.6M | if (num_peers_required == kUninitializedMajoritySize) { |
885 | | // We don't even know the quorum majority size yet. |
886 | 0 | return Policy::NotEnoughPeersValue(); |
887 | 0 | } |
888 | 60.6M | CHECK_GE(num_peers_required, 0); |
889 | | |
890 | 60.6M | const ssize_t num_peers = peers_map_.size(); |
891 | 60.6M | if (num_peers < num_peers_required) { |
892 | 500 | return Policy::NotEnoughPeersValue(); |
893 | 500 | } |
894 | | |
895 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" |
896 | | // replicated value of the dimension that we are computing a watermark for. There is a difference |
897 | | // in logic between handling of OpIds vs. leader leases: |
898 | | // - For OpIds, the local peer might actually be less up-to-date than followers. |
899 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. |
900 | 60.6M | const bool local_peer_infinite_watermark = |
901 | 60.6M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; |
902 | | |
903 | 60.6M | if (num_peers_required == 1 && local_peer_infinite_watermark) { |
904 | | // We give "infinite lease" to ourselves. |
905 | 595k | return GetInfiniteWatermarkForLocalPeer< |
906 | 595k | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); |
907 | 595k | } |
908 | | |
909 | 60.0M | constexpr size_t kMaxPracticalReplicationFactor = 5; |
910 | 60.0M | boost::container::small_vector< |
911 | 60.0M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; |
912 | 60.0M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); |
913 | | |
914 | 181M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { |
915 | 181M | const TrackedPeer &peer = *peer_map_entry.second; |
916 | 181M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) { |
917 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" |
918 | | // value of the watermark. |
919 | 29.7M | continue; |
920 | 29.7M | } |
921 | 151M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { |
922 | | // Only votes from VOTERs in the active config should be taken into consideration |
923 | 1.63M | continue; |
924 | 1.63M | } |
925 | 149M | if (peer.is_last_exchange_successful) { |
926 | 148M | watermarks.push_back(Policy::ExtractValue(peer)); |
927 | 148M | } |
928 | 149M | } |
929 | | |
930 | | // We always assume that the local peer has the most recent information. |
931 | 60.0M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; |
932 | | |
933 | 60.0M | if (num_responsive_peers < num_peers_required) { |
934 | 18 | VLOG_WITH_PREFIX_UNLOCKED(2) |
935 | 18 | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) |
936 | 18 | << ", num_peers_required=" << num_peers_required |
937 | 18 | << ", num_responsive_peers=" << num_responsive_peers |
938 | 18 | << ", not enough responsive peers"; |
939 | | // There are not enough peers with which the last message exchange was successful. |
940 | 136k | return Policy::NotEnoughPeersValue(); |
941 | 136k | } |
942 | | |
943 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated |
944 | | // something to 3 of them and 4th is our local peer, there are two possibilities: |
945 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, |
946 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. |
947 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or |
948 | | // num_responsive_peers - num_peers_required. |
949 | | // |
950 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we |
951 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to |
952 | | // itself. Then watermarks.size() is 3 (because we skip the local peer when populating |
953 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. |
954 | | |
955 | 59.8M | const size_t index_of_interest = num_responsive_peers - num_peers_required; |
956 | 59.8M | DCHECK_LT(index_of_interest, watermarks.size()); |
957 | | |
958 | 59.8M | auto nth = watermarks.begin() + index_of_interest; |
959 | 59.8M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); |
960 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) |
961 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) |
962 | 18.4E | << ", num_peers_required=" << num_peers_required |
963 | 18.4E | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark |
964 | 18.4E | << ", watermark: " << yb::ToString(*nth); |
965 | | |
966 | 59.8M | return *nth; |
967 | 59.8M | } |
| 940 | 34.1k | return Policy::NotEnoughPeersValue(); | 941 | 34.1k | } | 942 | | | 943 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated | 944 | | // something to 3 of them and 4th is our local peer, there are two possibilities: | 945 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, | 946 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. | 947 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or | 948 | | // num_responsive_peers - num_peers_required. | 949 | | // | 950 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we | 951 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to | 952 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating | 953 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. | 954 | | | 955 | 15.1M | const size_t index_of_interest = num_responsive_peers - num_peers_required; | 956 | 15.1M | DCHECK_LT(index_of_interest, watermarks.size()); | 957 | | | 958 | 15.1M | auto nth = watermarks.begin() + index_of_interest; | 959 | 15.1M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); | 960 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) | 961 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 962 | 18.4E | << ", num_peers_required=" << num_peers_required | 963 | 18.4E | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark | 964 | 18.4E | << ", watermark: " << yb::ToString(*nth); | 965 | | | 966 | 15.1M | return *nth; | 967 | 15.1M | } |
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue12GetWatermarkIZNS1_13OpIdWatermarkEvE6PolicyEENT_11result_typeEv Line | Count | Source | 881 | 15.1M | typename Policy::result_type PeerMessageQueue::GetWatermark() { | 882 | 15.1M | DCHECK(queue_lock_.is_locked()); | 883 | 15.1M | const auto num_peers_required = queue_state_.majority_size_; | 884 | 15.1M | if (num_peers_required == kUninitializedMajoritySize) { | 885 | | // We don't even know the quorum majority size yet. | 886 | 0 | return Policy::NotEnoughPeersValue(); | 887 | 0 | } | 888 | 15.1M | CHECK_GE(num_peers_required, 0); | 889 | | | 890 | 15.1M | const ssize_t num_peers = peers_map_.size(); | 891 | 15.1M | if (num_peers < num_peers_required) { | 892 | 125 | return Policy::NotEnoughPeersValue(); | 893 | 125 | } | 894 | | | 895 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" | 896 | | // replicated value of the dimension that we are computing a watermark for. There is a difference | 897 | | // in logic between handling of OpIds vs. leader leases: | 898 | | // - For OpIds, the local peer might actually be less up-to-date than followers. | 899 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. | 900 | 15.1M | const bool local_peer_infinite_watermark = | 901 | 15.1M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; | 902 | | | 903 | 15.1M | if (num_peers_required == 1 && local_peer_infinite_watermark) { | 904 | | // We give "infinite lease" to ourselves. | 905 | 0 | return GetInfiniteWatermarkForLocalPeer< | 906 | 0 | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); | 907 | 0 | } | 908 | | | 909 | 15.1M | constexpr size_t kMaxPracticalReplicationFactor = 5; | 910 | 15.1M | boost::container::small_vector< | 911 | 15.1M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; | 912 | 15.1M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); | 913 | | | 914 | 45.4M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { | 915 | 45.4M | const TrackedPeer &peer = *peer_map_entry.second; | 916 | 45.4M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_) { | 917 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" | 918 | | // value of the watermark. | 919 | 0 | continue; | 920 | 0 | } | 921 | 45.4M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { | 922 | | // Only votes from VOTERs in the active config should be taken into consideration | 923 | 412k | continue; | 924 | 412k | } | 925 | 45.0M | if (peer.is_last_exchange_successful) { | 926 | 44.8M | watermarks.push_back(Policy::ExtractValue(peer)); | 927 | 44.8M | } | 928 | 45.0M | } | 929 | | | 930 | | // We always assume that local peer has most recent information. | 931 | 15.1M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; | 932 | | | 933 | 15.1M | if (num_responsive_peers < num_peers_required) { | 934 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) | 935 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 936 | 18.4E | << ", num_peers_required=" << num_peers_required | 937 | 18.4E | << ", num_responsive_peers=" << num_responsive_peers | 938 | 18.4E | << ", not enough responsive peers"; | 939 | | // There are not enough peers with which the last message exchange was successful. 
| 940 | 34.1k | return Policy::NotEnoughPeersValue(); | 941 | 34.1k | } | 942 | | | 943 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated | 944 | | // something to 3 of them and 4th is our local peer, there are two possibilities: | 945 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, | 946 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. | 947 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or | 948 | | // num_responsive_peers - num_peers_required. | 949 | | // | 950 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we | 951 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to | 952 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating | 953 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. | 954 | | | 955 | 15.1M | const size_t index_of_interest = num_responsive_peers - num_peers_required; | 956 | 15.1M | DCHECK_LT(index_of_interest, watermarks.size()); | 957 | | | 958 | 15.1M | auto nth = watermarks.begin() + index_of_interest; | 959 | 15.1M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); | 960 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) | 961 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 962 | 18.4E | << ", num_peers_required=" << num_peers_required | 963 | 18.4E | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark | 964 | 18.4E | << ", watermark: " << yb::ToString(*nth); | 965 | | | 966 | 15.1M | return *nth; | 967 | 15.1M | } |
|
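The selection in all of the GetWatermark() instantiations above is a majority-quantile pick: after the partial sort, the element at index num_responsive_peers - num_peers_required has num_peers_required elements at or beyond it in the comparator's order, so a majority is guaranteed to have reached the returned value. A minimal standalone sketch of the same selection under the ascending ordering used for OpIds (MajorityWatermark and its types are illustrative, not the YugabyteDB API):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative sketch of the majority-watermark selection in
// PeerMessageQueue::GetWatermark(): returns the highest value that at least
// num_peers_required peers have reached.
int64_t MajorityWatermark(std::vector<int64_t> watermarks, size_t num_peers_required) {
  if (watermarks.size() < num_peers_required) {
    return INT64_MIN;  // Analogous to Policy::NotEnoughPeersValue().
  }
  // After nth_element, *nth and everything after it (num_peers_required
  // elements in total) are >= *nth under ascending order.
  auto nth = watermarks.begin() + (watermarks.size() - num_peers_required);
  std::nth_element(watermarks.begin(), nth, watermarks.end());
  return *nth;
}

int main() {
  // Five peers, majority of three: exactly three peers are at index 95 or higher.
  std::cout << MajorityWatermark({100, 90, 95, 80, 99}, 3) << "\n";  // Prints 95.
}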
968 | | |
969 | 15.1M | CoarseTimePoint PeerMessageQueue::LeaderLeaseExpirationWatermark() { |
970 | 15.1M | struct Policy { |
971 | 15.1M | typedef CoarseTimePoint result_type; |
972 | | // Workaround for a gcc bug that does not understand that Comparator is actually being used.
973 | 15.1M | __attribute__((unused)) typedef std::less<result_type> Comparator; |
974 | | |
975 | 34.2k | static result_type NotEnoughPeersValue() { |
976 | 34.2k | return result_type::min(); |
977 | 34.2k | } |
978 | | |
979 | 297k | static result_type InfiniteWatermarkForLocalPeer() { |
980 | 297k | return result_type::max(); |
981 | 297k | } |
982 | | |
983 | 29.6M | static result_type ExtractValue(const TrackedPeer& peer) { |
984 | 29.6M | auto lease_exp = peer.leader_lease_expiration.last_received; |
985 | 29.6M | return lease_exp != CoarseTimePoint() ? lease_exp : CoarseTimePoint::min(); |
986 | 29.6M | } |
987 | | |
988 | 2 | static const char* Name() { |
989 | 2 | return "Leader lease expiration"; |
990 | 2 | } |
991 | 15.1M | }; |
992 | | |
993 | 15.1M | return GetWatermark<Policy>(); |
994 | 15.1M | } |
995 | | |
996 | 15.1M | MicrosTime PeerMessageQueue::HybridTimeLeaseExpirationWatermark() { |
997 | 15.1M | struct Policy { |
998 | 15.1M | typedef MicrosTime result_type; |
999 | | // Workaround for a gcc bug that does not understand that Comparator is actually being used.
1000 | 15.1M | __attribute__((unused)) typedef std::less<result_type> Comparator; |
1001 | | |
1002 | 34.2k | static result_type NotEnoughPeersValue() { |
1003 | 34.2k | return HybridTime::kMin.GetPhysicalValueMicros(); |
1004 | 34.2k | } |
1005 | | |
1006 | 297k | static result_type InfiniteWatermarkForLocalPeer() { |
1007 | 297k | return HybridTime::kMax.GetPhysicalValueMicros(); |
1008 | 297k | } |
1009 | | |
1010 | 29.6M | static result_type ExtractValue(const TrackedPeer& peer) { |
1011 | 29.6M | return peer.leader_ht_lease_expiration.last_received; |
1012 | 29.6M | } |
1013 | | |
1014 | 2 | static const char* Name() { |
1015 | 2 | return "Hybrid time leader lease expiration"; |
1016 | 2 | } |
1017 | 15.1M | }; |
1018 | | |
1019 | 15.1M | return GetWatermark<Policy>(); |
1020 | 15.1M | } |
1021 | | |
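Every watermark accessor in this file follows the same shape as LeaderLeaseExpirationWatermark() and HybridTimeLeaseExpirationWatermark() above: declare a local Policy struct, then delegate to the shared GetWatermark<Policy>() template. A compressed sketch of that compile-time policy pattern, with hypothetical names (FakePeer, ComputeWatermark, IndexPolicy) standing in for the real types:

#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for the per-peer state a policy reads.
struct FakePeer {
  int64_t last_received_index;
};

// Each policy supplies the value type, extraction, ordering, and fallback;
// the selection algorithm itself lives in one shared template.
template <class Policy>
typename Policy::result_type ComputeWatermark(
    const std::vector<FakePeer>& peers, size_t num_peers_required) {
  std::vector<typename Policy::result_type> watermarks;
  for (const auto& peer : peers) {
    watermarks.push_back(Policy::ExtractValue(peer));
  }
  if (watermarks.size() < num_peers_required) {
    return Policy::NotEnoughPeersValue();
  }
  auto nth = watermarks.begin() + (watermarks.size() - num_peers_required);
  std::nth_element(watermarks.begin(), nth, watermarks.end(),
                   typename Policy::Comparator());
  return *nth;
}

struct IndexPolicy {
  typedef int64_t result_type;
  typedef std::less<result_type> Comparator;
  static result_type NotEnoughPeersValue() { return -1; }
  static result_type ExtractValue(const FakePeer& peer) { return peer.last_received_index; }
};

// Usage: ComputeWatermark<IndexPolicy>({{5}, {7}, {6}}, 2) returns 6.

The design keeps the numeric selection in one place while each watermark dimension contributes only a handful of static members, which is why the three instantiation tables above share identical source.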
1022 | 15.1M | uint64_t PeerMessageQueue::NumSSTFilesWatermark() { |
1023 | 15.1M | struct Policy { |
1024 | 15.1M | typedef uint64_t result_type; |
1025 | | // Workaround for a gcc bug that does not understand that Comparator is actually being used.
1026 | 15.1M | __attribute__((unused)) typedef std::greater<result_type> Comparator; |
1027 | | |
1028 | 34.3k | static result_type NotEnoughPeersValue() { |
1029 | 34.3k | return 0; |
1030 | 34.3k | } |
1031 | | |
1032 | 44.8M | static result_type ExtractValue(const TrackedPeer& peer) { |
1033 | 44.8M | return peer.num_sst_files; |
1034 | 44.8M | } |
1035 | | |
1036 | 0 | static const char* Name() { |
1037 | 0 | return "Num SST files"; |
1038 | 0 | } |
1039 | 15.1M | }; |
1040 | | |
1041 | 15.1M | auto watermark = GetWatermark<Policy>(); |
1042 | 15.1M | return std::max(watermark, local_peer_->num_sst_files); |
1043 | 15.1M | } |
1044 | | |
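Note the flipped Comparator in NumSSTFilesWatermark(): with std::greater, the element nth_element selects is a count that at least num_peers_required peers are at or below, i.e. the smallest "maximum SST-file count" over any possible majority, presumably so that a single straggler with a huge count cannot dominate the signal by itself; the final std::max with the local peer then keeps the leader's own count from being under-reported. A small sketch of the flipped selection (names are illustrative):

#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// With std::greater, nth_element orders descending, so the selected element
// is a count that at least num_peers_required peers are at or below --
// equivalently, the smallest maximum over any majority of peers.
uint64_t NumSSTFilesMajority(std::vector<uint64_t> counts, size_t num_peers_required) {
  if (counts.size() < num_peers_required) {
    return 0;  // Analogous to Policy::NotEnoughPeersValue().
  }
  auto nth = counts.begin() + (counts.size() - num_peers_required);
  std::nth_element(counts.begin(), nth, counts.end(), std::greater<uint64_t>());
  return *nth;
}

// Example: NumSSTFilesMajority({10, 3, 7}, 2) == 7 -- the best-case majority
// {3, 7} tops out at 7 SST files.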
1045 | 15.1M | OpId PeerMessageQueue::OpIdWatermark() { |
1046 | 15.1M | struct Policy { |
1047 | 15.1M | typedef OpId result_type; |
1048 | | |
1049 | 34.3k | static result_type NotEnoughPeersValue() { |
1050 | 34.3k | return OpId::Min(); |
1051 | 34.3k | } |
1052 | | |
1053 | 44.8M | static result_type ExtractValue(const TrackedPeer& peer) { |
1054 | 44.8M | return peer.last_received; |
1055 | 44.8M | } |
1056 | | |
1057 | 15.1M | struct Comparator { |
1058 | 33.1M | bool operator()(const OpId& lhs, const OpId& rhs) { |
1059 | 33.1M | return lhs.index < rhs.index; |
1060 | 33.1M | } |
1061 | 15.1M | }; |
1062 | | |
1063 | 0 | static const char* Name() { |
1064 | 0 | return "OpId"; |
1065 | 0 | } |
1066 | 15.1M | }; |
1067 | | |
1068 | 15.1M | return GetWatermark<Policy>(); |
1069 | 15.1M | } |
1070 | | |
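Unlike the lease policies, OpIdWatermark() defines its own Comparator rather than reusing std::less, and the ordering deliberately considers only the index; term divergence between peers is handled separately by the catch-up logic in ResponseFromPeer() below. A minimal illustration of index-only ordering (MiniOpId is a hypothetical stand-in for consensus::OpId):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for consensus::OpId, for illustration only.
struct MiniOpId {
  int64_t term;
  int64_t index;
};

// Mirrors Policy::Comparator above: order by index alone.
struct IndexOnlyLess {
  bool operator()(const MiniOpId& lhs, const MiniOpId& rhs) const {
    return lhs.index < rhs.index;
  }
};

int main() {
  std::vector<MiniOpId> ids = {{2, 7}, {1, 9}, {2, 8}};
  std::sort(ids.begin(), ids.end(), IndexOnlyLess());
  // Sorted by index: (2,7), (2,8), (1,9) -- the term plays no part.
}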
1071 | 11.0k | void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) { |
1072 | 11.0k | LockGuard l(queue_lock_); |
1073 | 11.0k | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1074 | 11.0k | if (!peer) return; |
1075 | 11.0k | peer->last_successful_communication_time = MonoTime::Now(); |
1076 | 11.0k | } |
1077 | | |
1078 | 13.2k | void PeerMessageQueue::RequestWasNotSent(const std::string& peer_uuid) { |
1079 | 13.2k | LockGuard scoped_lock(queue_lock_); |
1080 | 13.2k | DCHECK_NE(State::kQueueConstructed, queue_state_.state); |
1081 | | |
1082 | 13.2k | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1083 | 13.2k | if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) { |
1084 | 0 | LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked."; |
1085 | 0 | return; |
1086 | 0 | } |
1087 | | |
1088 | 13.2k | peer->ResetLastRequest(); |
1089 | 13.2k | } |
1090 | | |
1091 | | |
1092 | | bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid, |
1093 | 15.2M | const ConsensusResponsePB& response) { |
1094 | 19.9k | DCHECK(response.IsInitialized()) << "Error: Uninitialized: " |
1095 | 19.9k | << response.InitializationErrorString() << ". Response: " << response.ShortDebugString(); |
1096 | | |
1097 | 15.2M | MajorityReplicatedData majority_replicated; |
1098 | 15.2M | Mode mode_copy; |
1099 | 15.2M | bool result = false; |
1100 | 15.2M | { |
1101 | 15.2M | LockGuard scoped_lock(queue_lock_); |
1102 | 15.2M | DCHECK_NE(State::kQueueConstructed, queue_state_.state); |
1103 | | |
1104 | 15.2M | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1105 | 15.2M | if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) { |
1106 | 5 | LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding " |
1107 | 5 | "peer response. Response: " << response.ShortDebugString(); |
1108 | 5 | return false; |
1109 | 5 | } |
1110 | | |
1111 | | // Remotely bootstrap the peer if the tablet is not found or deleted. |
1112 | 15.2M | if (response.has_error()) { |
1113 | | // We only let special types of errors through to this point from the peer. |
1114 | 0 | CHECK_EQ(tserver::TabletServerErrorPB::TABLET_NOT_FOUND, response.error().code()) |
1115 | 0 | << response.ShortDebugString(); |
1116 | | |
1117 | 4.91k | peer->needs_remote_bootstrap = true; |
1118 | | // Since we received a response from the peer, we know it is alive. So we need to update |
1119 | | // peer->last_successful_communication_time, otherwise, we will remove this peer from the |
1120 | | // configuration if the remote bootstrap is not completed within |
1121 | | // FLAGS_follower_unavailable_considered_failed_sec seconds. |
1122 | 4.91k | peer->last_successful_communication_time = MonoTime::Now(); |
1123 | 272 | YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS(INFO, 30) |
1124 | 272 | << "Marked peer as needing remote bootstrap: " << peer->ToString(); |
1125 | 4.91k | return true; |
1126 | 4.91k | } |
1127 | | |
1128 | 15.2M | if (queue_state_.active_config) { |
1129 | 15.2M | RaftPeerPB peer_pb; |
1130 | 15.2M | if (!GetRaftConfigMember(*queue_state_.active_config, peer_uuid, &peer_pb).ok()) { |
1131 | 0 | LOG(FATAL) << "Peer " << peer_uuid << " not in active config"; |
1132 | 0 | } |
1133 | 15.2M | peer->member_type = peer_pb.member_type(); |
1134 | 207 | } else { |
1135 | 207 | peer->member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE; |
1136 | 207 | } |
1137 | | |
1138 | | // Application level errors should be handled elsewhere |
1139 | 15.2M | DCHECK(!response.has_error()); |
1140 | | |
1141 | | // Take a snapshot of the current peer status. |
1142 | 15.2M | TrackedPeer previous = *peer; |
1143 | | |
1144 | | // Update the peer status based on the response. |
1145 | 15.2M | peer->is_new = false; |
1146 | 15.2M | peer->last_successful_communication_time = MonoTime::Now(); |
1147 | | |
1148 | 15.2M | peer->ResetLastRequest(); |
1149 | | |
1150 | 15.2M | if (response.has_status()) { |
1151 | 15.2M | const auto& status = response.status(); |
1152 | | // Sanity checks. Some of these can be eventually removed, but they are handy for now. |
1153 | 39 | DCHECK(status.IsInitialized()) << "Error: Uninitialized: " |
1154 | 39 | << response.InitializationErrorString() |
1155 | 39 | << ". Response: " << response.ShortDebugString(); |
1156 | | // The status must always have a last received op id and a last committed index. |
1157 | 15.2M | DCHECK(status.has_last_received()); |
1158 | 15.2M | DCHECK(status.has_last_received_current_leader()); |
1159 | 15.2M | DCHECK(status.has_last_committed_idx()); |
1160 | | |
1161 | 15.2M | peer->last_known_committed_idx = status.last_committed_idx(); |
1162 | 15.2M | peer->last_applied = OpId::FromPB(status.last_applied()); |
1163 | | |
1164 | | // If the reported last-received op for the replica is in our local log, then resume sending |
1165 | | // entries from that point onward. Otherwise, resume after the last op they received from us. |
1166 | | // If we've never successfully sent them anything, start after the last-committed op in their |
1167 | | // log, which is guaranteed by the Raft protocol to be a valid op. |
1168 | | |
1169 | 15.2M | bool peer_has_prefix_of_log = IsOpInLog(yb::OpId::FromPB(status.last_received())); |
1170 | 15.2M | if (peer_has_prefix_of_log) { |
1171 | | // If the latest thing in their log is in our log, we are in sync. |
1172 | 15.2M | peer->last_received = OpId::FromPB(status.last_received()); |
1173 | 15.2M | peer->next_index = peer->last_received.index + 1; |
1174 | | |
1175 | 18.4E | } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) { |
1176 | | // Their log may have diverged from ours, however we are in the process of replicating our |
1177 | | // ops to them, so continue doing so. Eventually, we will cause the divergent entry in their |
1178 | | // log to be overwritten. |
1179 | 4 | peer->last_received = OpId::FromPB(status.last_received_current_leader()); |
1180 | 4 | peer->next_index = peer->last_received.index + 1; |
1181 | 18.4E | } else { |
1182 | | // The peer is divergent and they have not (successfully) received anything from us yet. |
1183 | | // Start sending from their last committed index. This logic differs from the Raft spec |
1184 | | // slightly because instead of stepping back one-by-one from the end until we no longer have |
1185 | | // an LMP error, we jump back to the last committed op indicated by the peer with the hope |
1186 | | // that doing so will result in a faster catch-up process. |
1187 | 18.4E | DCHECK_GE(peer->last_known_committed_idx, 0); |
1188 | 18.4E | peer->next_index = peer->last_known_committed_idx + 1; |
1189 | 18.4E | } |
1190 | | |
1191 | 15.2M | if (PREDICT_FALSE(status.has_error())) { |
1192 | 68.2k | peer->is_last_exchange_successful = false; |
1193 | 68.2k | switch (status.error().code()) { |
1194 | 68.2k | case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH: { |
1195 | 68.2k | DCHECK(status.has_last_received()); |
1196 | 68.2k | if (previous.is_new) { |
1197 | | // That's currently how we can detect that we are able to connect to a peer.
1198 | 68.1k | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString(); |
1199 | 94 | } else { |
1200 | 94 | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: " |
1201 | 94 | << peer->ToString(); |
1202 | 94 | CHECK(!FLAGS_TEST_disallow_lmp_failures); |
1203 | 94 | } |
1204 | 68.2k | return true; |
1205 | 0 | } |
1206 | 19 | case ConsensusErrorPB::INVALID_TERM: { |
1207 | 19 | CHECK(response.has_responder_term()); |
1208 | 19 | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString() |
1209 | 19 | << ". Peer's new term: " << response.responder_term(); |
1210 | 19 | NotifyObserversOfTermChange(response.responder_term()); |
1211 | 19 | return false; |
1212 | 0 | } |
1213 | 0 | default: { |
1214 | 0 | LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: " |
1215 | 0 | << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: " |
1216 | 0 | << response.ShortDebugString(); |
1217 | 0 | } |
1218 | 68.2k | } |
1219 | 68.2k | } |
1220 | 15.2M | } |
1221 | | |
1222 | 15.1M | peer->is_last_exchange_successful = true; |
1223 | 15.1M | peer->num_sst_files = response.num_sst_files(); |
1224 | | |
1225 | 15.1M | if (response.has_responder_term()) { |
1226 | | // The peer must have responded with a term that is greater than or equal to the last known |
1227 | | // term for that peer. |
1228 | 10.2M | peer->CheckMonotonicTerms(response.responder_term()); |
1229 | | |
1230 | | // If the responder didn't send an error back that must mean that it has a term that is the |
1231 | | // same or lower than ours. |
1232 | 10.2M | CHECK_LE(response.responder_term(), queue_state_.current_term); |
1233 | 10.2M | } |
1234 | | |
1235 | 15.1M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
1236 | 0 | VLOG_WITH_PREFIX_UNLOCKED(2) << "Received Response from Peer (" << peer->ToString() << "). " |
1237 | 0 | << "Response: " << response.ShortDebugString(); |
1238 | 0 | } |
1239 | | |
1240 | | // If our log has the next request for the peer or if the peer's committed index is lower than |
1241 | | // our own, set 'more_pending' to true. |
1242 | 15.1M | result = log_cache_.HasOpBeenWritten(peer->next_index) || |
1243 | 13.9M | (peer->last_known_committed_idx < queue_state_.committed_op_id.index); |
1244 | | |
1245 | 15.1M | mode_copy = queue_state_.mode; |
1246 | 15.1M | if (mode_copy == Mode::LEADER) { |
1247 | 15.1M | auto new_majority_replicated_opid = OpIdWatermark(); |
1248 | 15.1M | if (new_majority_replicated_opid != OpId::Min()) { |
1249 | 15.1M | if (new_majority_replicated_opid.index == MaximumOpId().index()) { |
1250 | 0 | queue_state_.majority_replicated_op_id = local_peer_->last_received; |
1251 | 15.1M | } else { |
1252 | 15.1M | queue_state_.majority_replicated_op_id = new_majority_replicated_opid; |
1253 | 15.1M | } |
1254 | 15.1M | } |
1255 | | |
1256 | 15.1M | peer->leader_lease_expiration.OnReplyFromFollower(); |
1257 | 15.1M | peer->leader_ht_lease_expiration.OnReplyFromFollower(); |
1258 | | |
1259 | 15.1M | majority_replicated.op_id = queue_state_.majority_replicated_op_id; |
1260 | 15.1M | majority_replicated.leader_lease_expiration = LeaderLeaseExpirationWatermark(); |
1261 | 15.1M | majority_replicated.ht_lease_expiration = HybridTimeLeaseExpirationWatermark(); |
1262 | 15.1M | majority_replicated.num_sst_files = NumSSTFilesWatermark(); |
1263 | 15.1M | if (peer->last_received == queue_state_.last_applied_op_id) { |
1264 | 9.06M | majority_replicated.peer_got_all_ops = peer->uuid; |
1265 | 9.06M | } |
1266 | 15.1M | } |
1267 | | |
1268 | 15.1M | UpdateAllReplicatedOpId(&queue_state_.all_replicated_op_id); |
1269 | 15.1M | UpdateAllAppliedOpId(&queue_state_.all_applied_op_id); |
1270 | | |
1271 | 15.1M | auto evict_index = GetCDCConsumerOpIdToEvict().index; |
1272 | | |
1273 | 15.1M | int32_t lagging_follower_threshold = FLAGS_consensus_lagging_follower_threshold; |
1274 | 15.1M | if (lagging_follower_threshold > 0) { |
1275 | 15.1M | UpdateAllNonLaggingReplicatedOpId(lagging_follower_threshold); |
1276 | 15.1M | evict_index = std::min(evict_index, queue_state_.all_nonlagging_replicated_op_id.index); |
1277 | 2.52k | } else { |
1278 | 2.52k | evict_index = std::min(evict_index, queue_state_.all_replicated_op_id.index); |
1279 | 2.52k | } |
1280 | | |
1281 | 15.1M | log_cache_.EvictThroughOp(evict_index); |
1282 | | |
1283 | 15.1M | UpdateMetrics(); |
1284 | 15.1M | } |
1285 | | |
1286 | 15.1M | if (mode_copy == Mode::LEADER) { |
1287 | 15.1M | NotifyObserversOfMajorityReplOpChange(majority_replicated); |
1288 | 15.1M | } |
1289 | | |
1290 | 15.1M | return result; |
1291 | 15.2M | } |
1292 | | |
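The subtlest part of ResponseFromPeer() above is the three-way choice of where to resume sending entries to a follower, spelled out in the commented branches around status.last_received(). Condensed into a standalone function (every name here is hypothetical, chosen only to mirror the fields the real code consults):

#include <cstdint>

// Condensed sketch of the resume-point decision in ResponseFromPeer().
struct ResumeInputs {
  bool peer_has_prefix_of_log;        // status.last_received() found in our log.
  bool received_from_current_leader;  // last_received_current_leader != MinimumOpId.
  int64_t last_received_index;
  int64_t last_received_current_leader_index;
  int64_t last_known_committed_idx;
};

int64_t NextIndexToSend(const ResumeInputs& in) {
  if (in.peer_has_prefix_of_log) {
    // In sync: continue right after the last op the peer holds.
    return in.last_received_index + 1;
  }
  if (in.received_from_current_leader) {
    // Diverged, but we already replicated some ops this term; keep going from
    // there and eventually overwrite the divergent suffix.
    return in.last_received_current_leader_index + 1;
  }
  // Diverged with nothing received from us yet: jump straight back to the
  // peer's last committed op instead of stepping back one entry per round
  // trip -- the deliberate deviation from the Raft spec noted in the comment.
  return in.last_known_committed_idx + 1;
}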
1293 | 11 | PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(string uuid) { |
1294 | 11 | LockGuard scoped_lock(queue_lock_); |
1295 | 11 | TrackedPeer* tracked = FindOrDie(peers_map_, uuid); |
1296 | 11 | return *tracked; |
1297 | 11 | } |
1298 | | |
1299 | 13 | OpId PeerMessageQueue::TEST_GetAllReplicatedIndex() const { |
1300 | 13 | LockGuard lock(queue_lock_); |
1301 | 13 | return queue_state_.all_replicated_op_id; |
1302 | 13 | } |
1303 | | |
1304 | 68 | OpId PeerMessageQueue::GetAllAppliedOpId() const { |
1305 | 68 | LockGuard lock(queue_lock_); |
1306 | 68 | return queue_state_.all_applied_op_id; |
1307 | 68 | } |
1308 | | |
1309 | 5 | OpId PeerMessageQueue::TEST_GetCommittedIndex() const { |
1310 | 5 | LockGuard lock(queue_lock_); |
1311 | 5 | return queue_state_.committed_op_id; |
1312 | 5 | } |
1313 | | |
1314 | 11 | OpId PeerMessageQueue::TEST_GetMajorityReplicatedOpId() const { |
1315 | 11 | LockGuard lock(queue_lock_); |
1316 | 11 | return queue_state_.majority_replicated_op_id; |
1317 | 11 | } |
1318 | | |
1319 | 2 | OpId PeerMessageQueue::TEST_GetLastAppended() const { |
1320 | 2 | LockGuard lock(queue_lock_); |
1321 | 2 | return queue_state_.last_appended; |
1322 | 2 | } |
1323 | | |
1324 | 16 | OpId PeerMessageQueue::TEST_GetLastAppliedOpId() const { |
1325 | 16 | LockGuard lock(queue_lock_); |
1326 | 16 | return queue_state_.last_applied_op_id; |
1327 | 16 | } |
1328 | | |
1329 | 30.5M | void PeerMessageQueue::UpdateMetrics() { |
1330 | | // Since operations have consecutive indices we can update the metrics based on simple index math. |
1331 | 30.5M | metrics_.num_majority_done_ops->set_value( |
1332 | 30.5M | queue_state_.committed_op_id.index - queue_state_.all_replicated_op_id.index); |
1333 | 30.5M | metrics_.num_in_progress_ops->set_value( |
1334 | 30.5M | queue_state_.last_appended.index - queue_state_.committed_op_id.index); |
1335 | 30.5M | } |
1336 | | |
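UpdateMetrics() relies on Raft operation indexes being consecutive, so both gauges reduce to subtraction rather than a walk over queued operations. A tiny sketch with hypothetical names:

#include <cstdint>

struct QueueGauges {
  int64_t num_majority_done_ops;
  int64_t num_in_progress_ops;
};

// Mirrors the index math in UpdateMetrics(); illustrative only.
QueueGauges ComputeGauges(int64_t all_replicated, int64_t committed, int64_t last_appended) {
  return QueueGauges{committed - all_replicated, last_appended - committed};
}

// Example: ComputeGauges(95, 100, 103) -> {5, 3}: five ops are committed by
// the majority but not yet replicated everywhere, three are still in flight.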
1337 | 47 | void PeerMessageQueue::DumpToHtml(std::ostream& out) const { |
1338 | 47 | using std::endl; |
1339 | | |
1340 | 47 | LockGuard lock(queue_lock_); |
1341 | 47 | out << "<h3>Watermarks</h3>" << endl; |
1342 | 47 | out << "<table>" << endl;
1343 | 47 | out << " <tr><th>Peer</th><th>Watermark</th></tr>" << endl; |
1344 | 126 | for (const PeersMap::value_type& entry : peers_map_) { |
1345 | 126 | out << Substitute(" <tr><td>$0</td><td>$1</td></tr>", |
1346 | 126 | EscapeForHtmlToString(entry.first), |
1347 | 126 | EscapeForHtmlToString(entry.second->ToString())) << endl; |
1348 | 126 | } |
1349 | 47 | out << "</table>" << endl; |
1350 | | |
1351 | 47 | log_cache_.DumpToHtml(out); |
1352 | 47 | } |
1353 | | |
1354 | 95.6k | void PeerMessageQueue::ClearUnlocked() { |
1355 | 95.6k | STLDeleteValues(&peers_map_); |
1356 | 95.6k | queue_state_.state = State::kQueueClosed; |
1357 | 95.6k | } |
1358 | | |
1359 | 95.6k | void PeerMessageQueue::Close() { |
1360 | 95.6k | if (installed_num_sst_files_changed_listener_) { |
1361 | 47.7k | context_->ListenNumSSTFilesChanged(std::function<void()>()); |
1362 | 47.7k | installed_num_sst_files_changed_listener_ = false; |
1363 | 47.7k | } |
1364 | 95.6k | raft_pool_observers_token_->Shutdown(); |
1365 | 95.6k | LockGuard lock(queue_lock_); |
1366 | 95.6k | ClearUnlocked(); |
1367 | 95.6k | } |
1368 | | |
1369 | 94 | string PeerMessageQueue::ToString() const { |
1370 | | // Even though metrics are thread-safe, obtain the lock so that we get a "consistent" snapshot of
1371 | | // the metrics. |
1372 | 94 | LockGuard lock(queue_lock_); |
1373 | 94 | return ToStringUnlocked(); |
1374 | 94 | } |
1375 | | |
1376 | 94 | string PeerMessageQueue::ToStringUnlocked() const { |
1377 | 94 | return Substitute("Consensus queue metrics: "
1378 | 94 | "Only Majority Done Ops: $0, In Progress Ops: $1, Cache: $2", |
1379 | 94 | metrics_.num_majority_done_ops->value(), metrics_.num_in_progress_ops->value(), |
1380 | 94 | log_cache_.StatsString()); |
1381 | 94 | } |
1382 | | |
1383 | 35.1k | void PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) { |
1384 | 35.1k | LockGuard lock(queue_lock_); |
1385 | 35.1k | auto iter = std::find(observers_.begin(), observers_.end(), observer); |
1386 | 35.1k | if (iter == observers_.end()) { |
1387 | 35.1k | observers_.push_back(observer); |
1388 | 35.1k | } |
1389 | 35.1k | } |
1390 | | |
1391 | 93.6k | Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) { |
1392 | 93.6k | LockGuard lock(queue_lock_); |
1393 | 93.6k | auto iter = std::find(observers_.begin(), observers_.end(), observer); |
1394 | 93.6k | if (iter == observers_.end()) { |
1395 | 88.7k | return STATUS(NotFound, "Can't find observer."); |
1396 | 88.7k | } |
1397 | 4.90k | observers_.erase(iter); |
1398 | 4.90k | return Status::OK(); |
1399 | 4.90k | } |
1400 | | |
1401 | 332k | const char* PeerMessageQueue::ModeToStr(Mode mode) { |
1402 | 332k | switch (mode) { |
1403 | 145k | case Mode::LEADER: return "LEADER"; |
1404 | 187k | case Mode::NON_LEADER: return "NON_LEADER"; |
1405 | 0 | } |
1406 | 0 | FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::Mode, mode); |
1407 | 0 | } |
1408 | | |
1409 | 131k | const char* PeerMessageQueue::StateToStr(State state) { |
1410 | 131k | switch (state) { |
1411 | 0 | case State::kQueueConstructed: |
1412 | 0 | return "QUEUE_CONSTRUCTED"; |
1413 | 131k | case State::kQueueOpen: |
1414 | 131k | return "QUEUE_OPEN"; |
1415 | 0 | case State::kQueueClosed: |
1416 | 0 | return "QUEUE_CLOSED"; |
1417 | | |
1418 | 0 | } |
1419 | 0 | FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::State, state); |
1420 | 0 | } |
1421 | | |
1422 | 15.2M | bool PeerMessageQueue::IsOpInLog(const yb::OpId& desired_op) const { |
1423 | 15.2M | auto result = log_cache_.LookupOpId(desired_op.index); |
1424 | 15.2M | if (PREDICT_TRUE(result.ok())) { |
1425 | 15.2M | return desired_op == *result; |
1426 | 15.2M | } |
1427 | 18.4E | if (PREDICT_TRUE(result.status().IsNotFound() || result.status().IsIncomplete())) { |
1428 | 3 | return false; |
1429 | 3 | } |
1430 | 18.4E | LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error while reading the log: " << result.status(); |
1431 | 18.4E | return false; // Unreachable; here to squelch GCC warning. |
1432 | 18.4E | } |
1433 | | |
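IsOpInLog() above looks the index up in the log cache and then compares the complete OpId, so an entry that exists at the desired index but carries a different term is reported as absent, which is exactly the divergence signal ResponseFromPeer() depends on. A minimal sketch of that check (LogEntryId and the std::map stand in, illustratively, for the log cache):

#include <cstdint>
#include <map>

// Hypothetical stand-in for yb::OpId.
struct LogEntryId {
  int64_t term;
  int64_t index;
  bool operator==(const LogEntryId& other) const {
    return term == other.term && index == other.index;
  }
};

// Sketch of IsOpInLog(): find by index, then require the full id to match,
// so a same-index entry from another term counts as divergent.
bool OpInLog(const std::map<int64_t, LogEntryId>& log, const LogEntryId& desired) {
  auto it = log.find(desired.index);
  if (it == log.end()) {
    return false;  // The real code maps NotFound/Incomplete statuses to false.
  }
  return it->second == desired;
}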
1434 | | void PeerMessageQueue::NotifyObserversOfMajorityReplOpChange( |
1435 | 15.1M | const MajorityReplicatedData& majority_replicated_data) { |
1436 | 15.1M | WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( |
1437 | 15.1M | Bind(&PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask, |
1438 | 15.1M | Unretained(this), |
1439 | 15.1M | majority_replicated_data)), |
1440 | 15.1M | LogPrefixUnlocked() + "Unable to notify RaftConsensus of " |
1441 | 15.1M | "majority replicated op change."); |
1442 | 15.1M | } |
1443 | | |
1444 | | template <class Func> |
1445 | 1.62k | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) { |
1446 | 1.62k | WARN_NOT_OK( |
1447 | 1.62k | raft_pool_observers_token_->SubmitFunc( |
1448 | 1.62k | [this, func = std::move(func)] { |
1449 | 1.62k | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); |
1450 | 1.62k | std::vector<PeerMessageQueueObserver*> copy; |
1451 | 1.62k | { |
1452 | 1.62k | LockGuard lock(queue_lock_); |
1453 | 1.62k | copy = observers_; |
1454 | 1.62k | } |
1455 | | |
1456 | 1.62k | for (PeerMessageQueueObserver* observer : copy) { |
1457 | 1.62k | func(observer); |
1458 | 1.62k | } |
1459 | 1.62k | }), |
1460 | 1.62k | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title)); |
1461 | 1.62k | }
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue15NotifyObserversIZNS1_18NumSSTFilesChangedEvE3$_0EEvPKcOT_
Line | Count | Source
1445 | 1.44k | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446 | 1.44k | WARN_NOT_OK(
1447 | 1.44k | raft_pool_observers_token_->SubmitFunc(
1448 | 1.44k | [this, func = std::move(func)] {
1449 | 1.44k | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450 | 1.44k | std::vector<PeerMessageQueueObserver*> copy;
1451 | 1.44k | {
1452 | 1.44k | LockGuard lock(queue_lock_);
1453 | 1.44k | copy = observers_;
1454 | 1.44k | }
1455 | |
1456 | 1.44k | for (PeerMessageQueueObserver* observer : copy) {
1457 | 1.44k | func(observer);
1458 | 1.44k | }
1459 | 1.44k | }),
1460 | 1.44k | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461 | 1.44k | }
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue15NotifyObserversIZNS1_27NotifyObserversOfTermChangeExE3$_1EEvPKcOT_
Line | Count | Source
1445 | 19 | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446 | 19 | WARN_NOT_OK(
1447 | 19 | raft_pool_observers_token_->SubmitFunc(
1448 | 19 | [this, func = std::move(func)] {
1449 | 19 | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450 | 19 | std::vector<PeerMessageQueueObserver*> copy;
1451 | 19 | {
1452 | 19 | LockGuard lock(queue_lock_);
1453 | 19 | copy = observers_;
1454 | 19 | }
1455 | |
1456 | 19 | for (PeerMessageQueueObserver* observer : copy) {
1457 | 19 | func(observer);
1458 | 19 | }
1459 | 19 | }),
1460 | 19 | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461 | 19 | }
consensus_queue.cc:_ZN2yb9consensus16PeerMessageQueue15NotifyObserversIZNS1_31NotifyObserversOfFailedFollowerERKNSt3__112basic_stringIcNS3_11char_traitsIcEENS3_9allocatorIcEEEExSB_E3$_2EEvPKcOT_
Line | Count | Source
1445 | 160 | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) {
1446 | 160 | WARN_NOT_OK(
1447 | 160 | raft_pool_observers_token_->SubmitFunc(
1448 | 160 | [this, func = std::move(func)] {
1449 | 160 | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
1450 | 160 | std::vector<PeerMessageQueueObserver*> copy;
1451 | 160 | {
1452 | 160 | LockGuard lock(queue_lock_);
1453 | 160 | copy = observers_;
1454 | 160 | }
1455 | |
1456 | 160 | for (PeerMessageQueueObserver* observer : copy) {
1457 | 160 | func(observer);
1458 | 160 | }
1459 | 160 | }),
1460 | 160 | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title));
1461 | 160 | }
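All three instantiations above share the same discipline: the observer list is copied while queue_lock_ is held, and the callbacks run without the lock (on the Raft pool token), so an observer may safely call back into the queue without deadlocking. A minimal sketch of that copy-then-notify idiom (Notifier is hypothetical, and it invokes inline where the real code dispatches through a thread pool):

#include <functional>
#include <mutex>
#include <vector>

// Sketch of the notify-outside-the-lock idiom used by NotifyObservers():
// snapshot the observer list under the lock, then invoke callbacks unlocked.
class Notifier {
 public:
  void Register(std::function<void()> cb) {
    std::lock_guard<std::mutex> lock(mutex_);
    observers_.push_back(std::move(cb));
  }

  void NotifyAll() {
    std::vector<std::function<void()>> copy;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      copy = observers_;  // Snapshot; the lock is not held during callbacks.
    }
    for (auto& cb : copy) {
      cb();
    }
  }

 private:
  std::mutex mutex_;
  std::vector<std::function<void()>> observers_;
};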
1462 | | |
1463 | 19 | void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) { |
1464 | 18 | NotifyObservers("term change", [term](PeerMessageQueueObserver* observer) { |
1465 | 18 | observer->NotifyTermChange(term); |
1466 | 18 | }); |
1467 | 19 | } |
1468 | | |
1469 | | void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask( |
1470 | 15.1M | const MajorityReplicatedData& majority_replicated_data) { |
1471 | 15.1M | std::vector<PeerMessageQueueObserver*> copy; |
1472 | 15.1M | { |
1473 | 15.1M | LockGuard lock(queue_lock_); |
1474 | 15.1M | copy = observers_; |
1475 | 15.1M | } |
1476 | | |
1477 | | // TODO move commit index advancement here so that the queue is not dependent on consensus at all, |
1478 | | // but that requires a bit more work. |
1479 | 15.1M | OpId new_committed_op_id; |
1480 | 15.1M | OpId last_applied_op_id; |
1481 | 15.1M | for (PeerMessageQueueObserver* observer : copy) { |
1482 | 15.1M | observer->UpdateMajorityReplicated( |
1483 | 15.1M | majority_replicated_data, &new_committed_op_id, &last_applied_op_id); |
1484 | 15.1M | } |
1485 | | |
1486 | 15.1M | { |
1487 | 15.1M | LockGuard lock(queue_lock_); |
1488 | 15.1M | if (!new_committed_op_id.empty() && |
1489 | 15.1M | new_committed_op_id.index > queue_state_.committed_op_id.index) { |
1490 | 2.47M | queue_state_.committed_op_id = new_committed_op_id; |
1491 | 2.47M | } |
1492 | 15.1M | queue_state_.last_applied_op_id.MakeAtLeast(last_applied_op_id); |
1493 | 15.1M | local_peer_->last_applied = queue_state_.last_applied_op_id; |
1494 | 15.1M | UpdateAllAppliedOpId(&queue_state_.all_applied_op_id); |
1495 | 15.1M | } |
1496 | 15.1M | } |
1497 | | |
1498 | | void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid, |
1499 | 16 | const string& reason) { |
1500 | 16 | int64_t current_term; |
1501 | 16 | { |
1502 | 16 | LockGuard lock(queue_lock_); |
1503 | 16 | current_term = queue_state_.current_term; |
1504 | 16 | } |
1505 | 16 | NotifyObserversOfFailedFollower(uuid, current_term, reason); |
1506 | 16 | } |
1507 | | |
1508 | | void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid, |
1509 | | int64_t term, |
1510 | 160 | const string& reason) { |
1511 | 160 | NotifyObservers("failed follower", [uuid, term, reason](PeerMessageQueueObserver* observer) { |
1512 | 160 | observer->NotifyFailedFollower(uuid, term, reason); |
1513 | 160 | }); |
1514 | 160 | } |
1515 | | |
1516 | 153k | bool PeerMessageQueue::PeerAcceptedOurLease(const std::string& uuid) const { |
1517 | 153k | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1518 | 153k | TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid); |
1519 | 153k | if (peer == nullptr) { |
1520 | 14.1k | return false; |
1521 | 14.1k | } |
1522 | | |
1523 | 139k | return peer->leader_lease_expiration.last_received != CoarseTimePoint(); |
1524 | 139k | } |
1525 | | |
1526 | 5.09k | bool PeerMessageQueue::CanPeerBecomeLeader(const std::string& peer_uuid) const { |
1527 | 5.09k | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1528 | 5.09k | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1529 | 5.09k | if (peer == nullptr) { |
1530 | 0 | LOG(ERROR) << "Invalid peer UUID: " << peer_uuid; |
1531 | 0 | return false; |
1532 | 0 | } |
1533 | 5.09k | const bool peer_can_be_leader = peer->last_received >= queue_state_.majority_replicated_op_id; |
1534 | 5.09k | if (!peer_can_be_leader) { |
1535 | 185 | LOG(INFO) << Format( |
1536 | 185 | "Peer $0 cannot become Leader as it is not caught up: Majority OpId $1, Peer OpId $2", |
1537 | 185 | peer_uuid, queue_state_.majority_replicated_op_id, peer->last_received); |
1538 | 185 | } |
1539 | 5.09k | return peer_can_be_leader; |
1540 | 5.09k | } |
1541 | | |
1542 | 4.85k | OpId PeerMessageQueue::PeerLastReceivedOpId(const TabletServerId& uuid) const { |
1543 | 4.85k | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1544 | 4.85k | TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid); |
1545 | 4.85k | if (peer == nullptr) { |
1546 | 0 | LOG(ERROR) << "Invalid peer UUID: " << uuid; |
1547 | 0 | return OpId::Min(); |
1548 | 0 | } |
1549 | 4.85k | return peer->last_received; |
1550 | 4.85k | } |
1551 | | |
1552 | 274 | string PeerMessageQueue::GetUpToDatePeer() const { |
1553 | 274 | OpId highest_op_id = OpId::Min(); |
1554 | 274 | std::vector<std::string> candidates; |
1555 | | |
1556 | 274 | { |
1557 | 274 | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1558 | 1.03k | for (const PeersMap::value_type& entry : peers_map_) { |
1559 | 1.03k | if (local_peer_uuid_ == entry.first) { |
1560 | 274 | continue; |
1561 | 274 | } |
1562 | 762 | if (highest_op_id > entry.second->last_received) { |
1563 | 39 | continue; |
1564 | 723 | } else if (highest_op_id == entry.second->last_received) { |
1565 | 448 | candidates.push_back(entry.first); |
1566 | 275 | } else { |
1567 | 275 | candidates = {entry.first}; |
1568 | 275 | highest_op_id = entry.second->last_received; |
1569 | 275 | } |
1570 | 762 | } |
1571 | 274 | } |
1572 | | |
1573 | 274 | if (candidates.empty()) { |
1574 | 0 | return string(); |
1575 | 0 | } |
1576 | 274 | size_t index = 0; |
1577 | 274 | if (candidates.size() > 1) { |
1578 | | // choose randomly among candidates at the same opid |
1579 | 245 | index = RandomUniformInt<size_t>(0, candidates.size() - 1); |
1580 | 245 | } |
1581 | 274 | return candidates[index]; |
1582 | 274 | } |
1583 | | |
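GetUpToDatePeer() above makes a single pass that keeps every remote peer tied at the highest last-received OpId and then picks uniformly at random among the ties, spreading the load when several peers are equally up to date. A standalone sketch (PickMostUpToDate and the pair-based peer list are illustrative):

#include <cstdint>
#include <random>
#include <string>
#include <utility>
#include <vector>

// Sketch of GetUpToDatePeer(): collect all peers tied at the maximum value
// in one pass, then pick uniformly at random among the ties.
std::string PickMostUpToDate(const std::vector<std::pair<std::string, int64_t>>& peers) {
  int64_t highest = INT64_MIN;
  std::vector<std::string> candidates;
  for (const auto& [uuid, last_received] : peers) {
    if (last_received < highest) {
      continue;
    }
    if (last_received == highest) {
      candidates.push_back(uuid);
    } else {
      candidates = {uuid};
      highest = last_received;
    }
  }
  if (candidates.empty()) {
    return {};
  }
  std::mt19937 rng{std::random_device{}()};
  std::uniform_int_distribution<size_t> dist(0, candidates.size() - 1);
  return candidates[dist(rng)];
}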
1584 | 47.8k | PeerMessageQueue::~PeerMessageQueue() { |
1585 | 47.8k | Close(); |
1586 | 47.8k | } |
1587 | | |
1588 | 200k | string PeerMessageQueue::LogPrefixUnlocked() const { |
1589 | | // TODO: we should probably use an atomic here. We'll just annotate away the TSAN error for now, |
1590 | | // since the worst case is a slightly out-of-date log message, and not very likely. |
1591 | 200k | Mode mode = ANNOTATE_UNPROTECTED_READ(queue_state_.mode); |
1592 | 200k | return Substitute("T $0 P $1 [$2]: ", |
1593 | 200k | tablet_id_, |
1594 | 200k | local_peer_uuid_, |
1595 | 200k | ModeToStr(mode)); |
1596 | 200k | } |
1597 | | |
1598 | 131k | string PeerMessageQueue::QueueState::ToString() const { |
1599 | 131k | return Format( |
1600 | 131k | "All replicated op: $0, Majority replicated op: $1, Committed index: $2, Last applied: $3, " |
1601 | 131k | "Last appended: $4, Current term: $5, Majority size: $6, State: $7, Mode: $8$9", |
1602 | 131k | /* 0 */ all_replicated_op_id, |
1603 | 131k | /* 1 */ majority_replicated_op_id, |
1604 | 131k | /* 2 */ committed_op_id, |
1605 | 131k | /* 3 */ last_applied_op_id, |
1606 | 131k | /* 4 */ last_appended, |
1607 | 131k | /* 5 */ current_term, |
1608 | 131k | /* 6 */ majority_size_, |
1609 | 131k | /* 7 */ StateToStr(state), |
1610 | 131k | /* 8 */ ModeToStr(mode), |
1611 | 93.6k | /* 9 */ active_config ? ", active raft config: " + active_config->ShortDebugString() : ""); |
1612 | 131k | } |
1613 | | |
1614 | 0 | size_t PeerMessageQueue::LogCacheSize() { |
1615 | 0 | return log_cache_.BytesUsed(); |
1616 | 0 | } |
1617 | | |
1618 | 0 | size_t PeerMessageQueue::EvictLogCache(size_t bytes_to_evict) { |
1619 | 0 | return log_cache_.EvictThroughOp(std::numeric_limits<int64_t>::max(), bytes_to_evict); |
1620 | 0 | } |
1621 | | |
1622 | 0 | Status PeerMessageQueue::FlushLogIndex() { |
1623 | 0 | return log_cache_.FlushIndex(); |
1624 | 0 | } |
1625 | | |
1626 | 7.03M | void PeerMessageQueue::TrackOperationsMemory(const OpIds& op_ids) { |
1627 | 7.03M | log_cache_.TrackOperationsMemory(op_ids); |
1628 | 7.03M | } |
1629 | | |
1630 | | Result<OpId> PeerMessageQueue::TEST_GetLastOpIdWithType( |
1631 | 3 | int64_t max_allowed_index, OperationType op_type) { |
1632 | 3 | return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type); |
1633 | 3 | } |
1634 | | |
1635 | 10.3k | Status ValidateFlags() { |
1636 | | // Normally we would have used |
1637 | | // DEFINE_validator(rpc_throttle_threshold_bytes, &RpcThrottleThresholdBytesValidator); |
1638 | | // right after defining the rpc_throttle_threshold_bytes flag. However, this leads to a segfault |
1639 | | // in the LTO-enabled build, presumably due to indeterminate order of static initialization. |
1640 | | // Instead, we invoke this function from master/tserver main() functions when static |
1641 | | // initialization is already finished. |
1642 | 10.3k | if (!RpcThrottleThresholdBytesValidator( |
1643 | 0 | "rpc_throttle_threshold_bytes", FLAGS_rpc_throttle_threshold_bytes)) { |
1644 | 0 | return STATUS(InvalidArgument, "Flag validation failed"); |
1645 | 0 | } |
1646 | | |
1647 | 10.3k | return Status::OK(); |
1648 | 10.3k | } |
1649 | | |
1650 | | } // namespace consensus |
1651 | | } // namespace yb |