/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/consensus_queue.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | #include <shared_mutex> |
38 | | #include <string> |
39 | | #include <utility> |
40 | | |
41 | | #include <boost/container/small_vector.hpp> |
42 | | #include <glog/logging.h> |
43 | | |
44 | | #include "yb/consensus/consensus_context.h" |
45 | | #include "yb/consensus/log_util.h" |
46 | | #include "yb/consensus/opid_util.h" |
47 | | #include "yb/consensus/quorum_util.h" |
48 | | #include "yb/consensus/raft_consensus.h" |
49 | | #include "yb/consensus/replicate_msgs_holder.h" |
50 | | |
51 | | #include "yb/gutil/bind.h" |
52 | | #include "yb/gutil/dynamic_annotations.h" |
53 | | #include "yb/gutil/map-util.h" |
54 | | #include "yb/gutil/stl_util.h" |
55 | | #include "yb/gutil/strings/substitute.h" |
56 | | |
57 | | #include "yb/util/enums.h" |
58 | | #include "yb/util/fault_injection.h" |
59 | | #include "yb/util/flag_tags.h" |
60 | | #include "yb/util/locks.h" |
61 | | #include "yb/util/logging.h" |
62 | | #include "yb/util/mem_tracker.h" |
63 | | #include "yb/util/metrics.h" |
64 | | #include "yb/util/monotime.h" |
65 | | #include "yb/util/random_util.h" |
66 | | #include "yb/util/result.h" |
67 | | #include "yb/util/size_literals.h" |
68 | | #include "yb/util/status_log.h" |
69 | | #include "yb/util/threadpool.h" |
70 | | #include "yb/util/tostring.h" |
71 | | #include "yb/util/url-coding.h" |
72 | | |
73 | | using namespace std::literals; |
74 | | using namespace yb::size_literals; |
75 | | |
76 | | DECLARE_uint64(rpc_max_message_size); |
77 | | |
78 | | // We expect that consensus_max_batch_size_bytes + 1_KB would be less than rpc_max_message_size. |
79 | | // Otherwise such batch would be rejected by RPC layer. |
80 | | DEFINE_uint64(consensus_max_batch_size_bytes, 4_MB, |
81 | | "The maximum per-tablet RPC batch size when updating peers."); |
82 | | TAG_FLAG(consensus_max_batch_size_bytes, advanced); |
83 | | TAG_FLAG(consensus_max_batch_size_bytes, runtime); |
84 | | |
85 | | DEFINE_int32(follower_unavailable_considered_failed_sec, 900, |
86 | | "Seconds that a leader is unable to successfully heartbeat to a " |
87 | | "follower after which the follower is considered to be failed and " |
88 | | "evicted from the config."); |
89 | | TAG_FLAG(follower_unavailable_considered_failed_sec, advanced); |
90 | | |
91 | | DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0, |
92 | | "Injects a random sleep between 0 and this many milliseconds into " |
93 | | "asynchronous notifications from the consensus queue back to the " |
94 | | "consensus implementation."); |
95 | | TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden); |
96 | | TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe); |
97 | | |
98 | | DEFINE_int32(cdc_checkpoint_opid_interval_ms, 60 * 1000, |
99 | | "Interval up to which CDC consumer's checkpoint is considered for retaining log cache." |
100 | | "If we haven't received an updated checkpoint from CDC consumer within the interval " |
101 | | "specified by cdc_checkpoint_opid_interval, then log cache does not consider that " |
102 | | "consumer while determining which op IDs to evict."); |
103 | | |
104 | | DEFINE_int64(cdc_intent_retention_ms, 4 * 3600 * 1000, |
105 | | "Interval up to which CDC consumer's checkpoint is considered for retaining intents." |
106 | | "If we haven't received an updated checkpoint from CDC consumer within the interval " |
107 | | "specified by cdc_checkpoint_opid_interval, then CDC does not consider that " |
108 | | "consumer while determining which op IDs to delete from the intent."); |
109 | | |
110 | | DEFINE_bool(enable_consensus_exponential_backoff, true, |
111 | | "Whether exponential backoff based on number of retransmissions at tablet leader " |
112 | | "for number of entries to replicate to lagging follower is enabled."); |
113 | | TAG_FLAG(enable_consensus_exponential_backoff, advanced); |
114 | | TAG_FLAG(enable_consensus_exponential_backoff, runtime); |
115 | | |
116 | | DEFINE_int32(consensus_lagging_follower_threshold, 10, |
117 | | "Number of retransmissions at tablet leader to mark a follower as lagging. " |
118 | | "-1 disables the feature."); |
119 | | TAG_FLAG(consensus_lagging_follower_threshold, advanced); |
120 | | TAG_FLAG(consensus_lagging_follower_threshold, runtime); |
121 | | |
122 | | DEFINE_test_flag(bool, disallow_lmp_failures, false, |
123 | | "Whether we disallow PRECEDING_ENTRY_DIDNT_MATCH failures for non new peers."); |
124 | | |
125 | | namespace { |
126 | | |
127 | | constexpr const auto kMinRpcThrottleThresholdBytes = 16; |
128 | | |
129 | 14.6k | static bool RpcThrottleThresholdBytesValidator(const char* flagname, int64_t value) { |
130 | 14.6k | if (value > 0) { |
131 | 14.6k | if (value < kMinRpcThrottleThresholdBytes) { |
132 | 0 | LOG(ERROR) << "Expect " << flagname << " to be at least " << kMinRpcThrottleThresholdBytes; |
133 | 0 | return false; |
134 | 14.6k | } else if (implicit_cast<size_t>(value) >= FLAGS_consensus_max_batch_size_bytes) { |
135 | 0 | LOG(ERROR) << "Expect " << flagname << " to be less than consensus_max_batch_size_bytes " |
136 | 0 | << "value (" << FLAGS_consensus_max_batch_size_bytes << ")"; |
137 | 0 | return false; |
138 | 0 | } |
139 | 14.6k | } |
140 | 14.6k | return true; |
141 | 14.6k | } |
142 | | |
143 | | } // namespace |
144 | | |
145 | | DECLARE_int64(rpc_throttle_threshold_bytes); |
146 | | |
147 | | namespace yb { |
148 | | namespace consensus { |
149 | | |
150 | | using log::Log; |
151 | | using std::unique_ptr; |
152 | | using rpc::Messenger; |
153 | | using strings::Substitute; |
154 | | |
155 | | METRIC_DEFINE_gauge_int64(tablet, majority_done_ops, "Leader Operations Acked by Majority", |
156 | | MetricUnit::kOperations, |
157 | | "Number of operations in the leader queue ack'd by a majority but " |
158 | | "not all peers."); |
159 | | METRIC_DEFINE_gauge_int64(tablet, in_progress_ops, "Leader Operations in Progress", |
160 | | MetricUnit::kOperations, |
161 | | "Number of operations in the leader queue ack'd by a minority of " |
162 | | "peers."); |
163 | | |
164 | | const auto kCDCConsumerCheckpointInterval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms; |
165 | | |
166 | | const auto kCDCConsumerIntentRetention = FLAGS_cdc_intent_retention_ms * 1ms; |
167 | | |
168 | 0 | std::string MajorityReplicatedData::ToString() const { |
169 | 0 | return Format( |
170 | 0 | "{ op_id: $0 leader_lease_expiration: $1 ht_lease_expiration: $2 num_sst_files: $3 }", |
171 | 0 | op_id, leader_lease_expiration, ht_lease_expiration, num_sst_files); |
172 | 0 | } |
173 | | |
174 | 122k | std::string PeerMessageQueue::TrackedPeer::ToString() const { |
175 | 122k | return Format( |
176 | 122k | "{ peer: $0 is_new: $1 last_received: $2 next_index: $3 last_known_committed_idx: $4 " |
177 | 122k | "is_last_exchange_successful: $5 needs_remote_bootstrap: $6 member_type: $7 " |
178 | 122k | "num_sst_files: $8 last_applied: $9 }", |
179 | 122k | uuid, is_new, last_received, next_index, last_known_committed_idx, |
180 | 122k | is_last_exchange_successful, needs_remote_bootstrap, PeerMemberType_Name(member_type), |
181 | 122k | num_sst_files, last_applied); |
182 | 122k | } |
183 | | |
184 | 79.5k | void PeerMessageQueue::TrackedPeer::ResetLeaderLeases() { |
185 | 79.5k | leader_lease_expiration.Reset(); |
186 | 79.5k | leader_ht_lease_expiration.Reset(); |
187 | 79.5k | } |
188 | | |
189 | 34.9M | void PeerMessageQueue::TrackedPeer::ResetLastRequest() { |
190 | | // Reset so that next transmission is not considered a re-transmission. |
191 | 34.9M | last_num_messages_sent = -1; |
192 | 34.9M | current_retransmissions = -1; |
193 | 34.9M | } |
194 | | |
195 | | #define INSTANTIATE_METRIC(x) \ |
196 | | x.Instantiate(metric_entity, 0) |
197 | | PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity) |
198 | | : num_majority_done_ops(INSTANTIATE_METRIC(METRIC_majority_done_ops)), |
199 | 150k | num_in_progress_ops(INSTANTIATE_METRIC(METRIC_in_progress_ops)) { |
200 | 150k | } |
201 | | #undef INSTANTIATE_METRIC |
202 | | |
203 | | PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity, |
204 | | const scoped_refptr<log::Log>& log, |
205 | | const MemTrackerPtr& server_tracker, |
206 | | const MemTrackerPtr& parent_tracker, |
207 | | const RaftPeerPB& local_peer_pb, |
208 | | const string& tablet_id, |
209 | | const server::ClockPtr& clock, |
210 | | ConsensusContext* context, |
211 | | unique_ptr<ThreadPoolToken> raft_pool_token) |
212 | | : raft_pool_observers_token_(std::move(raft_pool_token)), |
213 | | local_peer_pb_(local_peer_pb), |
214 | | local_peer_uuid_(local_peer_pb_.has_permanent_uuid() ? local_peer_pb_.permanent_uuid() |
215 | | : string()), |
216 | | tablet_id_(tablet_id), |
217 | | log_cache_(metric_entity, log, server_tracker, local_peer_pb.permanent_uuid(), tablet_id), |
218 | | operations_mem_tracker_( |
219 | | MemTracker::FindOrCreateTracker("OperationsFromDisk", parent_tracker)), |
220 | | metrics_(metric_entity), |
221 | | clock_(clock), |
222 | 150k | context_(context) { |
223 | 150k | DCHECK(local_peer_pb_.has_permanent_uuid()); |
224 | 150k | DCHECK(!local_peer_pb_.last_known_private_addr().empty()); |
225 | 150k | } |
226 | | |
227 | 150k | void PeerMessageQueue::Init(const OpId& last_locally_replicated) { |
228 | 150k | LockGuard lock(queue_lock_); |
229 | 150k | CHECK_EQ(queue_state_.state, State::kQueueConstructed); |
230 | 150k | log_cache_.Init(last_locally_replicated.ToPB<OpIdPB>()); |
231 | 150k | queue_state_.last_appended = last_locally_replicated; |
232 | 150k | queue_state_.state = State::kQueueOpen; |
233 | 150k | local_peer_ = TrackPeerUnlocked(local_peer_uuid_); |
234 | | |
235 | 150k | if (context_) { |
236 | 150k | context_->ListenNumSSTFilesChanged(std::bind(&PeerMessageQueue::NumSSTFilesChanged, this)); |
237 | 150k | installed_num_sst_files_changed_listener_ = true; |
238 | 150k | } |
239 | 150k | } |
240 | | |
241 | | void PeerMessageQueue::SetLeaderMode(const OpId& committed_op_id, |
242 | | int64_t current_term, |
243 | | const OpId& last_applied_op_id, |
244 | 67.9k | const RaftConfigPB& active_config) { |
245 | 67.9k | LockGuard lock(queue_lock_); |
246 | 67.9k | queue_state_.current_term = current_term; |
247 | 67.9k | queue_state_.committed_op_id = committed_op_id; |
248 | 67.9k | queue_state_.last_applied_op_id = last_applied_op_id; |
249 | 67.9k | queue_state_.majority_replicated_op_id = committed_op_id; |
250 | 67.9k | queue_state_.active_config.reset(new RaftConfigPB(active_config)); |
251 | 67.9k | CHECK(IsRaftConfigVoter(local_peer_uuid_, *queue_state_.active_config)) |
252 | 11 | << local_peer_pb_.ShortDebugString() << " not a voter in config: " |
253 | 11 | << queue_state_.active_config->ShortDebugString(); |
254 | 67.9k | queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config)); |
255 | 67.9k | queue_state_.mode = Mode::LEADER; |
256 | | |
257 | 67.9k | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: " |
258 | 67.9k | << queue_state_.ToString(); |
259 | 67.9k | CheckPeersInActiveConfigIfLeaderUnlocked(); |
260 | | |
261 | | // Reset last communication time with all peers to reset the clock on the |
262 | | // failure timeout. |
263 | 67.9k | MonoTime now(MonoTime::Now()); |
264 | 79.5k | for (const PeersMap::value_type& entry : peers_map_) { |
265 | 79.5k | entry.second->ResetLeaderLeases(); |
266 | 79.5k | entry.second->last_successful_communication_time = now; |
267 | 79.5k | } |
268 | 67.9k | } |
269 | | |
270 | 161k | void PeerMessageQueue::SetNonLeaderMode() { |
271 | 161k | LockGuard lock(queue_lock_); |
272 | 161k | queue_state_.active_config.reset(); |
273 | 161k | queue_state_.mode = Mode::NON_LEADER; |
274 | 161k | queue_state_.majority_size_ = -1; |
275 | 161k | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: " |
276 | 161k | << queue_state_.ToString(); |
277 | 161k | } |
278 | | |
279 | 123k | void PeerMessageQueue::TrackPeer(const string& uuid) { |
280 | 123k | LockGuard lock(queue_lock_); |
281 | 123k | TrackPeerUnlocked(uuid); |
282 | 123k | } |
283 | | |
284 | 273k | PeerMessageQueue::TrackedPeer* PeerMessageQueue::TrackPeerUnlocked(const string& uuid) { |
285 | 273k | CHECK(!uuid.empty()) << "Got request to track peer with empty UUID"49 ; |
286 | 273k | DCHECK_EQ(queue_state_.state, State::kQueueOpen); |
287 | | |
288 | 273k | TrackedPeer* tracked_peer = new TrackedPeer(uuid); |
289 | | |
290 | | // We don't know the last operation received by the peer so, following the Raft protocol, we set |
291 | | // next_index to one past the end of our own log. This way, if calling this method is the result |
292 | | // of a successful leader election and the logs between the new leader and remote peer match, the |
293 | | // peer->next_index will point to the index of the soon-to-be-written NO_OP entry that is used to |
294 | | // assert leadership. If we guessed wrong, and the peer does not have a log that matches ours, the |
295 | | // normal queue negotiation process will eventually find the right point to resume from. |
296 | 273k | tracked_peer->next_index = queue_state_.last_appended.index + 1; |
297 | 273k | InsertOrDie(&peers_map_, uuid, tracked_peer); |
298 | | |
299 | 273k | CheckPeersInActiveConfigIfLeaderUnlocked(); |
300 | | |
301 | | // We don't know how far back this peer is, so set the all replicated watermark to |
302 | | // MinimumOpId. We'll advance it when we know how far along the peer is. |
303 | 273k | queue_state_.all_replicated_op_id = OpId::Min(); |
304 | 273k | return tracked_peer; |
305 | 273k | } |
306 | | |
307 | 75.4k | void PeerMessageQueue::UntrackPeer(const string& uuid) { |
308 | 75.4k | LockGuard lock(queue_lock_); |
309 | 75.4k | TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid); |
310 | 75.4k | if (peer != nullptr) { |
311 | 75.4k | delete peer; |
312 | 75.4k | } |
313 | 75.4k | } |
314 | | |
315 | 341k | void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const { |
316 | 341k | if (queue_state_.mode != Mode::LEADER) return150k ; |
317 | 191k | std::unordered_set<std::string> config_peer_uuids; |
318 | 581k | for (const RaftPeerPB& peer_pb : queue_state_.active_config->peers()) { |
319 | 581k | InsertOrDie(&config_peer_uuids, peer_pb.permanent_uuid()); |
320 | 581k | } |
321 | 396k | for (const PeersMap::value_type& entry : peers_map_) { |
322 | 396k | if (!ContainsKey(config_peer_uuids, entry.first)) { |
323 | 0 | LOG_WITH_PREFIX_UNLOCKED(FATAL) << Substitute("Peer $0 is not in the active config. " |
324 | 0 | "Queue state: $1", |
325 | 0 | entry.first, |
326 | 0 | queue_state_.ToString()); |
327 | 0 | } |
328 | 396k | } |
329 | 191k | } |
330 | | |
331 | 3.41k | void PeerMessageQueue::NumSSTFilesChanged() { |
332 | 3.41k | auto num_sst_files = context_->NumSSTFiles(); |
333 | | |
334 | 3.41k | uint64_t majority_replicated_num_sst_files; |
335 | 3.41k | { |
336 | 3.41k | LockGuard lock(queue_lock_); |
337 | 3.41k | if (queue_state_.mode != Mode::LEADER) { |
338 | 1.34k | return; |
339 | 1.34k | } |
340 | 2.06k | auto it = peers_map_.find(local_peer_uuid_); |
341 | 2.06k | if (it == peers_map_.end()) { |
342 | 0 | return; |
343 | 0 | } |
344 | 2.06k | it->second->num_sst_files = num_sst_files; |
345 | 2.06k | majority_replicated_num_sst_files = NumSSTFilesWatermark(); |
346 | 2.06k | } |
347 | | |
348 | 0 | NotifyObservers( |
349 | 2.06k | "majority replicated num SST files changed", |
350 | 2.06k | [majority_replicated_num_sst_files](PeerMessageQueueObserver* observer) { |
351 | 2.06k | observer->MajorityReplicatedNumSSTFilesChanged(majority_replicated_num_sst_files); |
352 | 2.06k | }); |
353 | 2.06k | } |
354 | | |
355 | 24.3M | void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id, const Status& status) { |
356 | 24.3M | CHECK_OK(status); |
357 | | |
358 | | // Fake an RPC response from the local peer. |
359 | | // TODO: we should probably refactor the ResponseFromPeer function so that we don't need to |
360 | | // construct this fake response, but this seems to work for now. |
361 | 24.3M | ConsensusResponsePB fake_response; |
362 | 24.3M | id.ToPB(fake_response.mutable_status()->mutable_last_received()); |
363 | 24.3M | id.ToPB(fake_response.mutable_status()->mutable_last_received_current_leader()); |
364 | 24.3M | if (context_) { |
365 | 24.2M | fake_response.set_num_sst_files(context_->NumSSTFiles()); |
366 | 24.2M | } |
367 | 24.3M | { |
368 | 24.3M | LockGuard lock(queue_lock_); |
369 | | |
370 | | // TODO This ugly fix is required because we unlock queue_lock_ while doing AppendOperations. |
371 | | // So LocalPeerAppendFinished could be invoked before rest of AppendOperations. |
372 | 24.3M | if (queue_state_.last_appended.index < id.index) { |
373 | 88.5k | queue_state_.last_appended = id; |
374 | 88.5k | } |
375 | 24.3M | fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_op_id.index); |
376 | 24.3M | queue_state_.last_applied_op_id.ToPB(fake_response.mutable_status()->mutable_last_applied()); |
377 | | |
378 | 24.3M | if (queue_state_.mode != Mode::LEADER) { |
379 | 15.0M | log_cache_.EvictThroughOp(id.index); |
380 | | |
381 | 15.0M | UpdateMetrics(); |
382 | 15.0M | return; |
383 | 15.0M | } |
384 | 24.3M | } |
385 | 9.28M | ResponseFromPeer(local_peer_uuid_, fake_response); |
386 | 9.28M | } |
387 | | |
388 | 768 | Status PeerMessageQueue::TEST_AppendOperation(const ReplicateMsgPtr& msg) { |
389 | 768 | return AppendOperations( |
390 | 768 | { msg }, yb::OpId::FromPB(msg->committed_op_id()), RestartSafeCoarseMonoClock().Now()); |
391 | 768 | } |
392 | | |
393 | | Status PeerMessageQueue::AppendOperations(const ReplicateMsgs& msgs, |
394 | | const yb::OpId& committed_op_id, |
395 | 24.2M | RestartSafeCoarseTimePoint batch_mono_time) { |
396 | 24.2M | DFAKE_SCOPED_LOCK(append_fake_lock_); |
397 | 24.2M | OpId last_id; |
398 | 24.2M | if (!msgs.empty()) { |
399 | 13.2M | std::unique_lock<simple_spinlock> lock(queue_lock_); |
400 | | |
401 | 13.2M | last_id = OpId::FromPB(msgs.back()->id()); |
402 | | |
403 | 13.2M | if (last_id.term > queue_state_.current_term) { |
404 | 120k | queue_state_.current_term = last_id.term; |
405 | 120k | } |
406 | 13.2M | } else { |
407 | 11.0M | std::unique_lock<simple_spinlock> lock(queue_lock_); |
408 | 11.0M | last_id = queue_state_.last_appended; |
409 | 11.0M | } |
410 | | |
411 | | // Unlock ourselves during Append to prevent a deadlock: it's possible that the log buffer is |
412 | | // full, in which case AppendOperations would block. However, for the log buffer to empty, it may |
413 | | // need to call LocalPeerAppendFinished() which also needs queue_lock_. |
414 | | // |
415 | | // Since we are doing AppendOperations only in one thread, no concurrent AppendOperations could |
416 | | // be executed and queue_state_.last_appended will be updated correctly. |
417 | 24.2M | RETURN_NOT_OK(log_cache_.AppendOperations( |
418 | 24.2M | msgs, committed_op_id, batch_mono_time, |
419 | 24.2M | Bind(&PeerMessageQueue::LocalPeerAppendFinished, Unretained(this), last_id))); |
420 | | |
421 | 24.2M | if (!msgs.empty()) { |
422 | 13.2M | std::unique_lock<simple_spinlock> lock(queue_lock_); |
423 | 13.2M | queue_state_.last_appended = last_id; |
424 | 13.2M | UpdateMetrics(); |
425 | 13.2M | } |
426 | | |
427 | 24.2M | return Status::OK(); |
428 | 24.2M | } |
429 | | |
430 | 3.17M | uint64_t GetNumMessagesToSendWithBackoff(int64_t last_num_messages_sent) { |
431 | 3.17M | return std::max<int64_t>((last_num_messages_sent >> 1) - 1, 0); |
432 | 3.17M | } |
433 | | |
434 | | Status PeerMessageQueue::RequestForPeer(const string& uuid, |
435 | | ConsensusRequestPB* request, |
436 | | ReplicateMsgsHolder* msgs_holder, |
437 | | bool* needs_remote_bootstrap, |
438 | | PeerMemberType* member_type, |
439 | 28.8M | bool* last_exchange_successful) { |
440 | 28.8M | static constexpr uint64_t kSendUnboundedLogOps = std::numeric_limits<uint64_t>::max(); |
441 | 28.8M | DCHECK(request->ops().empty()) << request->ShortDebugString()61.4k ; |
442 | | |
443 | 28.8M | OpId preceding_id; |
444 | 28.8M | MonoDelta unreachable_time = MonoDelta::kMin; |
445 | 28.8M | bool is_voter = false; |
446 | 28.8M | bool is_new; |
447 | 28.8M | int64_t previously_sent_index; |
448 | 28.8M | uint64_t num_log_ops_to_send; |
449 | 28.8M | HybridTime propagated_safe_time; |
450 | | |
451 | | // Should be before now_ht, i.e. not greater than propagated_hybrid_time. |
452 | 28.8M | if (context_) { |
453 | 28.7M | propagated_safe_time = VERIFY_RESULT(context_->PreparePeerRequest()); |
454 | 28.7M | } |
455 | | |
456 | 28.8M | { |
457 | 28.8M | LockGuard lock(queue_lock_); |
458 | 28.8M | DCHECK_EQ(queue_state_.state, State::kQueueOpen); |
459 | 28.8M | DCHECK_NE(uuid, local_peer_uuid_); |
460 | | |
461 | 28.8M | auto peer = FindPtrOrNull(peers_map_, uuid); |
462 | 28.8M | if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) { |
463 | 69 | return STATUS(NotFound, "Peer not tracked or queue not in leader mode."); |
464 | 69 | } |
465 | | |
466 | 28.8M | HybridTime now_ht; |
467 | | |
468 | 28.8M | is_new = peer->is_new; |
469 | 28.8M | if (!is_new) { |
470 | 28.6M | now_ht = clock_->Now(); |
471 | | |
472 | 28.6M | auto ht_lease_expiration_micros = now_ht.GetPhysicalValueMicros() + |
473 | 28.6M | FLAGS_ht_lease_duration_ms * 1000; |
474 | 28.6M | auto leader_lease_duration_ms = GetAtomicFlag(&FLAGS_leader_lease_duration_ms); |
475 | 28.6M | request->set_leader_lease_duration_ms(leader_lease_duration_ms); |
476 | 28.6M | request->set_ht_lease_expiration(ht_lease_expiration_micros); |
477 | | |
478 | | // As noted here: |
479 | | // https://red.ht/2sCSErb |
480 | | // |
481 | | // The _COARSE variants are faster to read and have a precision (also known as resolution) of |
482 | | // one millisecond (ms). |
483 | | // |
484 | | // Coarse clock precision is 1 millisecond. |
485 | 28.6M | const auto kCoarseClockPrecision = 1ms; |
486 | | |
487 | | // Because of coarse clocks we subtract 2ms, to be sure that our local version of lease |
488 | | // does not expire after it expires at follower. |
489 | 28.6M | peer->leader_lease_expiration.last_sent = |
490 | 28.6M | CoarseMonoClock::Now() + leader_lease_duration_ms * 1ms - kCoarseClockPrecision * 2; |
491 | 28.6M | peer->leader_ht_lease_expiration.last_sent = ht_lease_expiration_micros; |
492 | 28.6M | } else { |
493 | 190k | now_ht = clock_->Now(); |
494 | 190k | request->clear_leader_lease_duration_ms(); |
495 | 190k | request->clear_ht_lease_expiration(); |
496 | 190k | peer->leader_lease_expiration.Reset(); |
497 | 190k | peer->leader_ht_lease_expiration.Reset(); |
498 | 190k | } |
499 | | // This is initialized to the queue's last appended op but gets set to the id of the |
500 | | // log entry preceding the first one in 'messages' if messages are found for the peer. |
501 | | // |
502 | | // The leader does not know the actual state of a peer but it should always send a value of |
503 | | // preceding_id that is present in the leader's own log, so the follower can verify the log |
504 | | // matching property. |
505 | | // |
506 | | // In case we decide not to send any messages to the follower this time due to exponential |
507 | | // backoff to an unresponsive follower, we will keep preceding_id equal to last_appended. |
508 | | // This is safe because unless the follower already has that operation, it will fail to find |
509 | | // it in its pending operations in EnforceLogMatchingPropertyMatchesUnlocked and will return |
510 | | // a log matching property violation error without applying any incorrect messages from its log. |
511 | | // |
512 | | // See this scenario for more context on the issue we are trying to avoid: |
513 | | // https://github.com/yugabyte/yugabyte-db/issues/8150#issuecomment-827821784 |
514 | 28.8M | preceding_id = queue_state_.last_appended; |
515 | | |
516 | 28.8M | request->set_propagated_hybrid_time(now_ht.ToUint64()); |
517 | | |
518 | | // NOTE: committed_op_id may be overwritten later. |
519 | | // In our system committed_op_id means that this operation was also applied. |
520 | | // If we have operation that applied significant time, followers would not know that this |
521 | | // operation is committed until it is applied in the leader. |
522 | | // To address this issue we use majority_replicated_op_id, that is updated before apply. |
523 | | // But we could use it only when its term matches current term, see Fig.8 in Raft paper. |
524 | 28.8M | if (queue_state_.majority_replicated_op_id.index > queue_state_.committed_op_id.index && |
525 | 28.8M | queue_state_.majority_replicated_op_id.term == queue_state_.current_term720k ) { |
526 | 720k | queue_state_.majority_replicated_op_id.ToPB(request->mutable_committed_op_id()); |
527 | 28.1M | } else { |
528 | 28.1M | queue_state_.committed_op_id.ToPB(request->mutable_committed_op_id()); |
529 | 28.1M | } |
530 | | |
531 | 28.8M | request->set_caller_term(queue_state_.current_term); |
532 | 28.8M | unreachable_time = |
533 | 28.8M | MonoTime::Now().GetDeltaSince(peer->last_successful_communication_time); |
534 | 28.8M | if (member_type28.8M ) *member_type = peer->member_type; |
535 | 28.8M | if (last_exchange_successful28.8M ) *last_exchange_successful = peer->is_last_exchange_successful; |
536 | 28.8M | *needs_remote_bootstrap = peer->needs_remote_bootstrap; |
537 | | |
538 | 28.8M | previously_sent_index = peer->next_index - 1; |
539 | 28.8M | if (FLAGS_enable_consensus_exponential_backoff && peer->last_num_messages_sent >= 028.8M ) { |
540 | | // Previous request to peer has not been acked. Reduce number of entries to be sent |
541 | | // in this attempt using exponential backoff. Note that to_index is inclusive. |
542 | 3.17M | num_log_ops_to_send = GetNumMessagesToSendWithBackoff(peer->last_num_messages_sent); |
543 | 25.6M | } else { |
544 | | // Previous request to peer has been acked or a heartbeat response has been received. |
545 | | // Transmit as many entries as allowed. |
546 | 25.6M | num_log_ops_to_send = kSendUnboundedLogOps; |
547 | 25.6M | } |
548 | | |
549 | 28.8M | peer->current_retransmissions++; |
550 | | |
551 | 28.8M | if (peer->member_type == PeerMemberType::VOTER) { |
552 | 25.5M | is_voter = true; |
553 | 25.5M | } |
554 | 28.8M | } |
555 | | |
556 | 28.8M | if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) { |
557 | 217k | if (!is_voter || CountVoters(*queue_state_.active_config) > 2217k ) { |
558 | | // We never drop from 2 voters to 1 voter automatically, at least for now (12/4/18). We may |
559 | | // want to revisit this later, we're just being cautious with this. |
560 | | // We remove unconditionally any failed non-voter replica (PRE_VOTER, PRE_OBSERVER, OBSERVER). |
561 | 235 | string msg = Substitute("Leader has been unable to successfully communicate " |
562 | 235 | "with Peer $0 for more than $1 seconds ($2)", |
563 | 235 | uuid, |
564 | 235 | FLAGS_follower_unavailable_considered_failed_sec, |
565 | 235 | unreachable_time.ToString()); |
566 | 235 | NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg); |
567 | 235 | } |
568 | 217k | } |
569 | | |
570 | 28.8M | if (PREDICT_FALSE(*needs_remote_bootstrap)) { |
571 | 10.7k | YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS486 (INFO, 30) |
572 | 486 | << "Peer needs remote bootstrap: " << uuid; |
573 | 10.7k | return Status::OK(); |
574 | 10.7k | } |
575 | 28.8M | *needs_remote_bootstrap = false; |
576 | | |
577 | 28.8M | request->clear_propagated_safe_time(); |
578 | | |
579 | | // If we've never communicated with the peer, we don't know what messages to send, so we'll send a |
580 | | // status-only request. If the peer has not responded to the point that our to_index == next_index |
581 | | // due to exponential backoff of replicated segment size, we also send a status-only request. |
582 | | // Otherwise, we grab requests from the log starting at the last_received point. |
583 | 28.8M | if (!is_new && num_log_ops_to_send > 028.6M ) { |
584 | | // The batch of messages to send to the peer. |
585 | 25.4M | auto max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSizeLong(); |
586 | 25.4M | auto to_index = num_log_ops_to_send == kSendUnboundedLogOps ? |
587 | 25.4M | 0 : previously_sent_index + num_log_ops_to_send26.9k ; |
588 | 25.4M | auto result = ReadFromLogCache(previously_sent_index, to_index, max_batch_size, uuid); |
589 | | |
590 | 25.4M | if (PREDICT_FALSE(!result.ok())) { |
591 | 0 | if (PREDICT_TRUE(result.status().IsNotFound())) { |
592 | 0 | std::string msg = Format("The logs necessary to catch up peer $0 have been " |
593 | 0 | "garbage collected. The follower will never be able " |
594 | 0 | "to catch up ($1)", uuid, result.status()); |
595 | 0 | NotifyObserversOfFailedFollower(uuid, queue_state_.current_term, msg); |
596 | 0 | } |
597 | 0 | return result.status(); |
598 | 0 | } |
599 | | |
600 | 25.4M | preceding_id = result->preceding_op; |
601 | | // We use AddAllocated rather than copy, because we pin the log cache at the "all replicated" |
602 | | // point. At some point we may want to allow partially loading (and not pinning) earlier |
603 | | // messages. At that point we'll need to do something smarter here, like copy or ref-count. |
604 | 25.4M | for (const auto& msg : result->messages) { |
605 | 9.43M | request->mutable_ops()->AddAllocated(msg.get()); |
606 | 9.43M | } |
607 | | |
608 | 25.4M | { |
609 | 25.4M | LockGuard lock(queue_lock_); |
610 | 25.4M | auto peer = FindPtrOrNull(peers_map_, uuid); |
611 | 25.4M | if (PREDICT_FALSE(peer == nullptr)) { |
612 | 0 | return STATUS(NotFound, "Peer not tracked."); |
613 | 0 | } |
614 | | |
615 | 25.4M | peer->last_num_messages_sent = result->messages.size(); |
616 | 25.4M | } |
617 | | |
618 | 0 | ScopedTrackedConsumption consumption; |
619 | 25.4M | if (result->read_from_disk_size) { |
620 | 1.33k | consumption = ScopedTrackedConsumption(operations_mem_tracker_, result->read_from_disk_size); |
621 | 1.33k | } |
622 | 25.4M | *msgs_holder = ReplicateMsgsHolder( |
623 | 25.4M | request->mutable_ops(), std::move(result->messages), std::move(consumption)); |
624 | | |
625 | 25.4M | if (propagated_safe_time && |
626 | 25.4M | !result->have_more_messages25.4M && |
627 | 25.4M | num_log_ops_to_send == kSendUnboundedLogOps25.4M ) { |
628 | | // Get the current local safe time on the leader and propagate it to the follower. |
629 | 25.4M | request->set_propagated_safe_time(propagated_safe_time.ToUint64()); |
630 | 25.4M | } |
631 | 25.4M | } |
632 | | |
633 | 28.8M | preceding_id.ToPB(request->mutable_preceding_id()); |
634 | | |
635 | | // All entries committed at leader may not be available at lagging follower. |
636 | | // `commited_op_id` in this request may make a lagging follower aware of the |
637 | | // highest committed op index at the leader. We have a sanity check during tablet |
638 | | // bootstrap, in TabletBootstrap::PlaySegments(), that this tablet did not lose a |
639 | | // committed operation. Hence avoid sending a committed op id that is too large |
640 | | // to such a lagging follower. |
641 | | // If we send operations to it, then last know operation to this follower will be last sent |
642 | | // operation. If we don't send any operation, then last known operation will be preceding |
643 | | // operation. |
644 | | // We don't have to change committed_op_id when it is less than max_allowed_committed_op_id, |
645 | | // because it will have actual committed_op_id value and this operation is known to the |
646 | | // follower. |
647 | 28.8M | const auto max_allowed_committed_op_id = !request->ops().empty() |
648 | 28.8M | ? OpId::FromPB(request->ops().rbegin()->id())8.27M : preceding_id20.5M ; |
649 | 28.8M | if (max_allowed_committed_op_id.index < request->committed_op_id().index()) { |
650 | 2.06k | max_allowed_committed_op_id.ToPB(request->mutable_committed_op_id()); |
651 | 2.06k | } |
652 | | |
653 | 28.8M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
654 | 0 | if (request->ops_size() > 0) { |
655 | 0 | VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid |
656 | 0 | << ". Size: " << request->ops_size() |
657 | 0 | << ". From: " << request->ops(0).id().ShortDebugString() << ". To: " |
658 | 0 | << request->ops(request->ops_size() - 1).id().ShortDebugString(); |
659 | 0 | VLOG_WITH_PREFIX_UNLOCKED(3) << "Operations: " << yb::ToString(request->ops()); |
660 | 0 | } else { |
661 | 0 | VLOG_WITH_PREFIX_UNLOCKED(2) |
662 | 0 | << "Sending " << (is_new ? "new " : "") << "status only request to Peer: " << uuid |
663 | 0 | << ": " << request->ShortDebugString(); |
664 | 0 | } |
665 | 0 | } |
666 | | |
667 | 28.8M | return Status::OK(); |
668 | 28.8M | } |
669 | | |
670 | | Result<ReadOpsResult> PeerMessageQueue::ReadFromLogCache(int64_t after_index, |
671 | | int64_t to_index, |
672 | | size_t max_batch_size, |
673 | | const std::string& peer_uuid, |
674 | 25.4M | const CoarseTimePoint deadline) { |
675 | 25.4M | DCHECK_LT(FLAGS_consensus_max_batch_size_bytes + 1_KB, FLAGS_rpc_max_message_size); |
676 | | |
677 | | // We try to get the follower's next_index from our log. |
678 | | // Note this is not using "term" and needs to change |
679 | 25.4M | auto result = log_cache_.ReadOps(after_index, to_index, max_batch_size, deadline); |
680 | 25.4M | if (PREDICT_FALSE(!result.ok())) { |
681 | 0 | auto s = result.status(); |
682 | 0 | if (PREDICT_TRUE(s.IsNotFound())) { |
683 | 0 | return s; |
684 | 0 | } else if (s.IsIncomplete()) { |
685 | | // IsIncomplete() means that we tried to read beyond the head of the log (in the future). |
686 | | // KUDU-1078 points to a fix of this log spew issue that we've ported. This should not |
687 | | // happen under normal circumstances. |
688 | 0 | LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log " |
689 | 0 | << "while preparing peer request: " |
690 | 0 | << s.ToString() << ". Destination peer: " |
691 | 0 | << peer_uuid; |
692 | 0 | return s; |
693 | 0 | } else { |
694 | 0 | LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: " |
695 | 0 | << s.ToString() << ". Destination peer: " |
696 | 0 | << peer_uuid; |
697 | 0 | return s; |
698 | 0 | } |
699 | 0 | } |
700 | 25.4M | return result; |
701 | 25.4M | } |
702 | | |
703 | | // Read majority replicated messages from cache for CDC. |
704 | | // CDC producer will use this to get the messages to send in response to cdc::GetChanges RPC. |
705 | | Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForCDC( |
706 | | const yb::OpId& last_op_id, |
707 | | int64_t* repl_index, |
708 | 620 | const CoarseTimePoint deadline) { |
709 | | // The batch of messages read from cache. |
710 | | |
711 | 620 | int64_t to_index; |
712 | 620 | bool pending_messages = false; |
713 | 620 | { |
714 | 620 | LockGuard lock(queue_lock_); |
715 | | // Use committed_op_id because it's already been processed by the Transaction codepath. |
716 | 620 | to_index = queue_state_.committed_op_id.index; |
717 | | // Determine if there are pending operations in RAFT but not yet LogCache. |
718 | 620 | pending_messages = to_index != queue_state_.majority_replicated_op_id.index; |
719 | 620 | } |
720 | 620 | if (repl_index) { |
721 | 617 | *repl_index = to_index; |
722 | 617 | } |
723 | | |
724 | 620 | if (last_op_id.index >= to_index) { |
725 | | // Nothing to read. |
726 | 0 | return ReadOpsResult(); |
727 | 0 | } |
728 | | |
729 | | // If an empty OpID is only sent on the first read request, start at the earliest known entry. |
730 | 620 | int64_t after_op_index = last_op_id.empty() ? |
731 | 606 | max(log_cache_.earliest_op_index(), last_op_id.index) : |
732 | 620 | last_op_id.index14 ; |
733 | | |
734 | 620 | auto result = ReadFromLogCache( |
735 | 620 | after_op_index, to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, deadline); |
736 | 620 | if (PREDICT_FALSE(!result.ok()) && PREDICT_TRUE0 (result.status().IsNotFound())) { |
737 | 0 | LOG_WITH_PREFIX_UNLOCKED(INFO) << Format( |
738 | 0 | "The logs from index $0 have been garbage collected and cannot be read ($1)", |
739 | 0 | after_op_index, result.status()); |
740 | 0 | } |
741 | 620 | if (result.ok()) { |
742 | 620 | result->have_more_messages = HaveMoreMessages(result->have_more_messages.get() || |
743 | 620 | pending_messages); |
744 | 620 | } |
745 | 620 | return result; |
746 | 620 | } |
747 | | |
748 | | Status PeerMessageQueue::GetRemoteBootstrapRequestForPeer(const string& uuid, |
749 | 10.6k | StartRemoteBootstrapRequestPB* req) { |
750 | 10.6k | TrackedPeer* peer = nullptr; |
751 | 10.6k | { |
752 | 10.6k | LockGuard lock(queue_lock_); |
753 | 10.6k | DCHECK_EQ(queue_state_.state, State::kQueueOpen); |
754 | 10.6k | DCHECK_NE(uuid, local_peer_uuid_); |
755 | 10.6k | peer = FindPtrOrNull(peers_map_, uuid); |
756 | 10.6k | if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == Mode::NON_LEADER)) { |
757 | 0 | return STATUS(NotFound, "Peer not tracked or queue not in leader mode."); |
758 | 0 | } |
759 | 10.6k | } |
760 | | |
761 | 10.6k | if (PREDICT_FALSE(!peer->needs_remote_bootstrap)) { |
762 | 0 | return STATUS(IllegalState, "Peer does not need to remotely bootstrap", uuid); |
763 | 0 | } |
764 | | |
765 | 10.6k | if (peer->member_type == PeerMemberType::VOTER || peer->member_type == PeerMemberType::OBSERVER10.6k ) { |
766 | 5 | LOG(INFO) << "Remote bootstrapping peer " << uuid << " with type " |
767 | 5 | << PeerMemberType_Name(peer->member_type); |
768 | 5 | } |
769 | | |
770 | 10.6k | req->Clear(); |
771 | 10.6k | req->set_dest_uuid(uuid); |
772 | 10.6k | req->set_tablet_id(tablet_id_); |
773 | 10.6k | req->set_bootstrap_peer_uuid(local_peer_uuid_); |
774 | 10.6k | *req->mutable_source_private_addr() = local_peer_pb_.last_known_private_addr(); |
775 | 10.6k | *req->mutable_source_broadcast_addr() = local_peer_pb_.last_known_broadcast_addr(); |
776 | 10.6k | *req->mutable_source_cloud_info() = local_peer_pb_.cloud_info(); |
777 | 10.6k | req->set_caller_term(queue_state_.current_term); |
778 | 10.6k | peer->needs_remote_bootstrap = false; // Now reset the flag. |
779 | 10.6k | return Status::OK(); |
780 | 10.6k | } |
781 | | |
782 | 874 | void PeerMessageQueue::UpdateCDCConsumerOpId(const yb::OpId& op_id) { |
783 | 874 | std::lock_guard<rw_spinlock> l(cdc_consumer_lock_); |
784 | 874 | cdc_consumer_op_id_ = op_id; |
785 | 874 | cdc_consumer_op_id_last_updated_ = CoarseMonoClock::Now(); |
786 | 874 | } |
787 | | |
788 | 34.7M | yb::OpId PeerMessageQueue::GetCDCConsumerOpIdToEvict() { |
789 | 34.7M | std::shared_lock<rw_spinlock> l(cdc_consumer_lock_); |
790 | | // For log cache eviction, we only want to include CDC consumers that are actively polling. |
791 | | // If CDC consumer checkpoint has not been updated recently, we exclude it. |
792 | 34.7M | if (CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_ <= kCDCConsumerCheckpointInterval) { |
793 | 10.3k | return cdc_consumer_op_id_; |
794 | 34.7M | } else { |
795 | 34.7M | return yb::OpId::Max(); |
796 | 34.7M | } |
797 | 34.7M | } |
798 | | |
799 | 3.82M | yb::OpId PeerMessageQueue::GetCDCConsumerOpIdForIntentRemoval() { |
800 | 3.82M | std::shared_lock<rw_spinlock> l(cdc_consumer_lock_); |
801 | 3.82M | if (CoarseMonoClock::Now() - cdc_consumer_op_id_last_updated_ <= kCDCConsumerIntentRetention) { |
802 | 378k | return cdc_consumer_op_id_; |
803 | 3.44M | } else { |
804 | 3.44M | return yb::OpId::Max(); |
805 | 3.44M | } |
806 | 3.82M | } |
807 | | |
808 | 34.7M | void PeerMessageQueue::UpdateAllReplicatedOpId(OpId* result) { |
809 | 34.7M | OpId new_op_id = OpId::Max(); |
810 | | |
811 | 102M | for (const auto& peer : peers_map_) { |
812 | 102M | if (!peer.second->is_last_exchange_successful) { |
813 | 412k | return; |
814 | 412k | } |
815 | 102M | if (peer.second->last_received.index < new_op_id.index) { |
816 | 39.4M | new_op_id = peer.second->last_received; |
817 | 39.4M | } |
818 | 102M | } |
819 | | |
820 | 34.3M | CHECK_NE(OpId::Max(), new_op_id); |
821 | 34.3M | *result = new_op_id; |
822 | 34.3M | } |
823 | | |
824 | 69.4M | void PeerMessageQueue::UpdateAllAppliedOpId(OpId* result) { |
825 | 69.4M | OpId all_applied_op_id = OpId::Max(); |
826 | 205M | for (const auto& peer : peers_map_) { |
827 | 205M | if (!peer.second->is_last_exchange_successful) { |
828 | 809k | return; |
829 | 809k | } |
830 | 204M | all_applied_op_id = std::min(all_applied_op_id, peer.second->last_applied); |
831 | 204M | } |
832 | | |
833 | 68.6M | CHECK_NE(OpId::Max(), all_applied_op_id); |
834 | 68.6M | *result = all_applied_op_id; |
835 | 68.6M | } |
836 | | |
837 | 34.7M | void PeerMessageQueue::UpdateAllNonLaggingReplicatedOpId(int32_t threshold) { |
838 | 34.7M | OpId new_op_id = OpId::Max(); |
839 | | |
840 | 103M | for (const auto& peer : peers_map_) { |
841 | | // Ignore lagging follower. |
842 | 103M | if (peer.second->current_retransmissions >= threshold) { |
843 | 146k | continue; |
844 | 146k | } |
845 | 103M | if (peer.second->last_received.index < new_op_id.index) { |
846 | 39.8M | new_op_id = peer.second->last_received; |
847 | 39.8M | } |
848 | 103M | } |
849 | | |
850 | 34.7M | if (new_op_id == OpId::Max()) { |
851 | 0 | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Non lagging peer(s) not found."; |
852 | 0 | new_op_id = queue_state_.all_replicated_op_id; |
853 | 0 | } |
854 | | |
855 | 34.7M | if (queue_state_.all_nonlagging_replicated_op_id.index < new_op_id.index) { |
856 | 4.50M | queue_state_.all_nonlagging_replicated_op_id = new_op_id; |
857 | 4.50M | } |
858 | 34.7M | } |
859 | | |
860 | | HAS_MEMBER_FUNCTION(InfiniteWatermarkForLocalPeer); |
861 | | |
862 | | template <class Policy, bool HasMemberFunction_InfiniteWatermarkForLocalPeer> |
863 | | struct GetInfiniteWatermarkForLocalPeer; |
864 | | |
865 | | template <class Policy> |
866 | | struct GetInfiniteWatermarkForLocalPeer<Policy, true> { |
867 | 1.68M | static auto Apply() { |
868 | 1.68M | return Policy::InfiniteWatermarkForLocalPeer(); |
869 | 1.68M | } consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::LeaderLeaseExpirationWatermark()::Policy, true>::Apply() Line | Count | Source | 867 | 844k | static auto Apply() { | 868 | 844k | return Policy::InfiniteWatermarkForLocalPeer(); | 869 | 844k | } |
consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::HybridTimeLeaseExpirationWatermark()::Policy, true>::Apply() Line | Count | Source | 867 | 844k | static auto Apply() { | 868 | 844k | return Policy::InfiniteWatermarkForLocalPeer(); | 869 | 844k | } |
|
870 | | }; |
871 | | |
872 | | template <class Policy> |
873 | | struct GetInfiniteWatermarkForLocalPeer<Policy, false> { |
874 | | // Should not be invoked, but have to define to make compiler happy. |
875 | 0 | static typename Policy::result_type Apply() { |
876 | 0 | LOG(DFATAL) << "Invoked Apply when InfiniteWatermarkForLocalPeer is not defined"; |
877 | 0 | return typename Policy::result_type(); |
878 | 0 | } Unexecuted instantiation: consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::NumSSTFilesWatermark()::Policy, false>::Apply() Unexecuted instantiation: consensus_queue.cc:yb::consensus::GetInfiniteWatermarkForLocalPeer<yb::consensus::PeerMessageQueue::OpIdWatermark()::Policy, false>::Apply() |
879 | | }; |
880 | | |
881 | | template <class Policy> |
882 | 139M | typename Policy::result_type PeerMessageQueue::GetWatermark() { |
883 | 139M | DCHECK(queue_lock_.is_locked()); |
884 | 139M | const auto num_peers_required = queue_state_.majority_size_; |
885 | 139M | if (num_peers_required == kUninitializedMajoritySize) { |
886 | | // We don't even know the quorum majority size yet. |
887 | 0 | return Policy::NotEnoughPeersValue(); |
888 | 0 | } |
889 | 139M | CHECK_GE(num_peers_required, 0); |
890 | | |
891 | 139M | const ssize_t num_peers = peers_map_.size(); |
892 | 139M | if (num_peers < num_peers_required) { |
893 | 964 | return Policy::NotEnoughPeersValue(); |
894 | 964 | } |
895 | | |
896 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" |
897 | | // replicated value of the dimension that we are computing a watermark for. There is a difference |
898 | | // in logic between handling of OpIds vs. leader leases: |
899 | | // - For OpIds, the local peer might actually be less up-to-date than followers. |
900 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. |
901 | 139M | const bool local_peer_infinite_watermark = |
902 | 139M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; |
903 | | |
904 | 139M | if (num_peers_required == 1 && local_peer_infinite_watermark3.37M ) { |
905 | | // We give "infinite lease" to ourselves. |
906 | 1.68M | return GetInfiniteWatermarkForLocalPeer< |
907 | 1.68M | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); |
908 | 1.68M | } |
909 | | |
910 | 137M | constexpr size_t kMaxPracticalReplicationFactor = 5; |
911 | 137M | boost::container::small_vector< |
912 | 137M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; |
913 | 137M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); |
914 | | |
915 | 411M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { |
916 | 411M | const TrackedPeer &peer = *peer_map_entry.second; |
917 | 411M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_205M ) { |
918 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" |
919 | | // value of the watermark. |
920 | 67.8M | continue; |
921 | 67.8M | } |
922 | 344M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { |
923 | | // Only votes from VOTERs in the active config should be taken into consideration |
924 | 3.26M | continue; |
925 | 3.26M | } |
926 | 340M | if (peer.is_last_exchange_successful) { |
927 | 339M | watermarks.push_back(Policy::ExtractValue(peer)); |
928 | 339M | } |
929 | 340M | } |
930 | | |
931 | | // We always assume that local peer has most recent information. |
932 | 137M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; |
933 | | |
934 | 137M | if (num_responsive_peers < num_peers_required) { |
935 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) |
936 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) |
937 | 18.4E | << ", num_peers_required=" << num_peers_required |
938 | 18.4E | << ", num_responsive_peers=" << num_responsive_peers |
939 | 18.4E | << ", not enough responsive peers"; |
940 | | // There are not enough peers with which the last message exchange was successful. |
941 | 239k | return Policy::NotEnoughPeersValue(); |
942 | 239k | } |
943 | | |
944 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated |
945 | | // something to 3 of them and 4th is our local peer, there are two possibilities: |
946 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, |
947 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. |
948 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or |
949 | | // num_responsive_peers - num_peers_required. |
950 | | // |
951 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we |
952 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to |
953 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating |
954 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. |
955 | | |
956 | 137M | const size_t index_of_interest = num_responsive_peers - num_peers_required; |
957 | 137M | DCHECK_LT(index_of_interest, watermarks.size()); |
958 | | |
959 | 137M | auto nth = watermarks.begin() + index_of_interest; |
960 | 137M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); |
961 | 137M | VLOG_WITH_PREFIX_UNLOCKED14.0k (2) |
962 | 14.0k | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) |
963 | 14.0k | << ", num_peers_required=" << num_peers_required |
964 | 14.0k | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark |
965 | 14.0k | << ", watermark: " << yb::ToString(*nth); |
966 | | |
967 | 137M | return *nth; |
968 | 137M | } consensus_queue.cc:yb::consensus::PeerMessageQueue::LeaderLeaseExpirationWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::LeaderLeaseExpirationWatermark()::Policy>() Line | Count | Source | 882 | 34.7M | typename Policy::result_type PeerMessageQueue::GetWatermark() { | 883 | 34.7M | DCHECK(queue_lock_.is_locked()); | 884 | 34.7M | const auto num_peers_required = queue_state_.majority_size_; | 885 | 34.7M | if (num_peers_required == kUninitializedMajoritySize) { | 886 | | // We don't even know the quorum majority size yet. | 887 | 0 | return Policy::NotEnoughPeersValue(); | 888 | 0 | } | 889 | 34.7M | CHECK_GE(num_peers_required, 0); | 890 | | | 891 | 34.7M | const ssize_t num_peers = peers_map_.size(); | 892 | 34.7M | if (num_peers < num_peers_required) { | 893 | 241 | return Policy::NotEnoughPeersValue(); | 894 | 241 | } | 895 | | | 896 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" | 897 | | // replicated value of the dimension that we are computing a watermark for. There is a difference | 898 | | // in logic between handling of OpIds vs. leader leases: | 899 | | // - For OpIds, the local peer might actually be less up-to-date than followers. | 900 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. | 901 | 34.7M | const bool local_peer_infinite_watermark = | 902 | 34.7M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; | 903 | | | 904 | 34.7M | if (num_peers_required == 1 && local_peer_infinite_watermark844k ) { | 905 | | // We give "infinite lease" to ourselves. | 906 | 844k | return GetInfiniteWatermarkForLocalPeer< | 907 | 844k | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); | 908 | 844k | } | 909 | | | 910 | 33.9M | constexpr size_t kMaxPracticalReplicationFactor = 5; | 911 | 33.9M | boost::container::small_vector< | 912 | 33.9M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; | 913 | 33.9M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); | 914 | | | 915 | 102M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { | 916 | 102M | const TrackedPeer &peer = *peer_map_entry.second; | 917 | 102M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_102M ) { | 918 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" | 919 | | // value of the watermark. | 920 | 33.9M | continue; | 921 | 33.9M | } | 922 | 68.6M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { | 923 | | // Only votes from VOTERs in the active config should be taken into consideration | 924 | 812k | continue; | 925 | 812k | } | 926 | 67.8M | if (peer.is_last_exchange_successful) { | 927 | 67.4M | watermarks.push_back(Policy::ExtractValue(peer)); | 928 | 67.4M | } | 929 | 67.8M | } | 930 | | | 931 | | // We always assume that local peer has most recent information. | 932 | 33.9M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; | 933 | | | 934 | 33.9M | if (num_responsive_peers < num_peers_required) { | 935 | 59.9k | VLOG_WITH_PREFIX_UNLOCKED44 (2) | 936 | 44 | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 937 | 44 | << ", num_peers_required=" << num_peers_required | 938 | 44 | << ", num_responsive_peers=" << num_responsive_peers | 939 | 44 | << ", not enough responsive peers"; | 940 | | // There are not enough peers with which the last message exchange was successful. | 941 | 59.9k | return Policy::NotEnoughPeersValue(); | 942 | 59.9k | } | 943 | | | 944 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated | 945 | | // something to 3 of them and 4th is our local peer, there are two possibilities: | 946 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, | 947 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. | 948 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or | 949 | | // num_responsive_peers - num_peers_required. | 950 | | // | 951 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we | 952 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to | 953 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating | 954 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. | 955 | | | 956 | 33.8M | const size_t index_of_interest = num_responsive_peers - num_peers_required; | 957 | 33.8M | DCHECK_LT(index_of_interest, watermarks.size()); | 958 | | | 959 | 33.8M | auto nth = watermarks.begin() + index_of_interest; | 960 | 33.8M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); | 961 | 33.8M | VLOG_WITH_PREFIX_UNLOCKED17.0k (2) | 962 | 17.0k | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 963 | 17.0k | << ", num_peers_required=" << num_peers_required | 964 | 17.0k | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark | 965 | 17.0k | << ", watermark: " << yb::ToString(*nth); | 966 | | | 967 | 33.8M | return *nth; | 968 | 33.9M | } |
consensus_queue.cc:yb::consensus::PeerMessageQueue::HybridTimeLeaseExpirationWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::HybridTimeLeaseExpirationWatermark()::Policy>() Line | Count | Source | 882 | 34.7M | typename Policy::result_type PeerMessageQueue::GetWatermark() { | 883 | 34.7M | DCHECK(queue_lock_.is_locked()); | 884 | 34.7M | const auto num_peers_required = queue_state_.majority_size_; | 885 | 34.7M | if (num_peers_required == kUninitializedMajoritySize) { | 886 | | // We don't even know the quorum majority size yet. | 887 | 0 | return Policy::NotEnoughPeersValue(); | 888 | 0 | } | 889 | 34.7M | CHECK_GE(num_peers_required, 0); | 890 | | | 891 | 34.7M | const ssize_t num_peers = peers_map_.size(); | 892 | 34.7M | if (num_peers < num_peers_required) { | 893 | 241 | return Policy::NotEnoughPeersValue(); | 894 | 241 | } | 895 | | | 896 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" | 897 | | // replicated value of the dimension that we are computing a watermark for. There is a difference | 898 | | // in logic between handling of OpIds vs. leader leases: | 899 | | // - For OpIds, the local peer might actually be less up-to-date than followers. | 900 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. | 901 | 34.7M | const bool local_peer_infinite_watermark = | 902 | 34.7M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; | 903 | | | 904 | 34.7M | if (num_peers_required == 1 && local_peer_infinite_watermark844k ) { | 905 | | // We give "infinite lease" to ourselves. | 906 | 844k | return GetInfiniteWatermarkForLocalPeer< | 907 | 844k | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); | 908 | 844k | } | 909 | | | 910 | 33.9M | constexpr size_t kMaxPracticalReplicationFactor = 5; | 911 | 33.9M | boost::container::small_vector< | 912 | 33.9M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; | 913 | 33.9M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); | 914 | | | 915 | 102M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { | 916 | 102M | const TrackedPeer &peer = *peer_map_entry.second; | 917 | 102M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_102M ) { | 918 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" | 919 | | // value of the watermark. | 920 | 33.9M | continue; | 921 | 33.9M | } | 922 | 68.6M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { | 923 | | // Only votes from VOTERs in the active config should be taken into consideration | 924 | 812k | continue; | 925 | 812k | } | 926 | 67.8M | if (peer.is_last_exchange_successful) { | 927 | 67.4M | watermarks.push_back(Policy::ExtractValue(peer)); | 928 | 67.4M | } | 929 | 67.8M | } | 930 | | | 931 | | // We always assume that local peer has most recent information. | 932 | 33.9M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; | 933 | | | 934 | 33.9M | if (num_responsive_peers < num_peers_required) { | 935 | 59.9k | VLOG_WITH_PREFIX_UNLOCKED28 (2) | 936 | 28 | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 937 | 28 | << ", num_peers_required=" << num_peers_required | 938 | 28 | << ", num_responsive_peers=" << num_responsive_peers | 939 | 28 | << ", not enough responsive peers"; | 940 | | // There are not enough peers with which the last message exchange was successful. | 941 | 59.9k | return Policy::NotEnoughPeersValue(); | 942 | 59.9k | } | 943 | | | 944 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated | 945 | | // something to 3 of them and 4th is our local peer, there are two possibilities: | 946 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, | 947 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. | 948 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or | 949 | | // num_responsive_peers - num_peers_required. | 950 | | // | 951 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we | 952 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to | 953 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating | 954 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. | 955 | | | 956 | 33.8M | const size_t index_of_interest = num_responsive_peers - num_peers_required; | 957 | 33.8M | DCHECK_LT(index_of_interest, watermarks.size()); | 958 | | | 959 | 33.8M | auto nth = watermarks.begin() + index_of_interest; | 960 | 33.8M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); | 961 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) | 962 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 963 | 18.4E | << ", num_peers_required=" << num_peers_required | 964 | 18.4E | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark | 965 | 18.4E | << ", watermark: " << yb::ToString(*nth); | 966 | | | 967 | 33.8M | return *nth; | 968 | 33.9M | } |
consensus_queue.cc:yb::consensus::PeerMessageQueue::NumSSTFilesWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::NumSSTFilesWatermark()::Policy>() Line | Count | Source | 882 | 34.7M | typename Policy::result_type PeerMessageQueue::GetWatermark() { | 883 | 34.7M | DCHECK(queue_lock_.is_locked()); | 884 | 34.7M | const auto num_peers_required = queue_state_.majority_size_; | 885 | 34.7M | if (num_peers_required == kUninitializedMajoritySize) { | 886 | | // We don't even know the quorum majority size yet. | 887 | 0 | return Policy::NotEnoughPeersValue(); | 888 | 0 | } | 889 | 34.7M | CHECK_GE(num_peers_required, 0); | 890 | | | 891 | 34.7M | const ssize_t num_peers = peers_map_.size(); | 892 | 34.7M | if (num_peers < num_peers_required) { | 893 | 241 | return Policy::NotEnoughPeersValue(); | 894 | 241 | } | 895 | | | 896 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" | 897 | | // replicated value of the dimension that we are computing a watermark for. There is a difference | 898 | | // in logic between handling of OpIds vs. leader leases: | 899 | | // - For OpIds, the local peer might actually be less up-to-date than followers. | 900 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. | 901 | 34.7M | const bool local_peer_infinite_watermark = | 902 | 34.7M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; | 903 | | | 904 | 34.7M | if (num_peers_required == 1 && local_peer_infinite_watermark845k ) { | 905 | | // We give "infinite lease" to ourselves. | 906 | 0 | return GetInfiniteWatermarkForLocalPeer< | 907 | 0 | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); | 908 | 0 | } | 909 | | | 910 | 34.7M | constexpr size_t kMaxPracticalReplicationFactor = 5; | 911 | 34.7M | boost::container::small_vector< | 912 | 34.7M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; | 913 | 34.7M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); | 914 | | | 915 | 103M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { | 916 | 103M | const TrackedPeer &peer = *peer_map_entry.second; | 917 | 103M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_0 ) { | 918 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" | 919 | | // value of the watermark. | 920 | 0 | continue; | 921 | 0 | } | 922 | 103M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { | 923 | | // Only votes from VOTERs in the active config should be taken into consideration | 924 | 822k | continue; | 925 | 822k | } | 926 | 102M | if (peer.is_last_exchange_successful) { | 927 | 102M | watermarks.push_back(Policy::ExtractValue(peer)); | 928 | 102M | } | 929 | 102M | } | 930 | | | 931 | | // We always assume that local peer has most recent information. | 932 | 34.7M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; | 933 | | | 934 | 34.7M | if (num_responsive_peers < num_peers_required) { | 935 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) | 936 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 937 | 18.4E | << ", num_peers_required=" << num_peers_required | 938 | 18.4E | << ", num_responsive_peers=" << num_responsive_peers | 939 | 18.4E | << ", not enough responsive peers"; | 940 | | // There are not enough peers with which the last message exchange was successful. | 941 | 60.0k | return Policy::NotEnoughPeersValue(); | 942 | 60.0k | } | 943 | | | 944 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated | 945 | | // something to 3 of them and 4th is our local peer, there are two possibilities: | 946 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, | 947 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. | 948 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or | 949 | | // num_responsive_peers - num_peers_required. | 950 | | // | 951 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we | 952 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to | 953 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating | 954 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. | 955 | | | 956 | 34.6M | const size_t index_of_interest = num_responsive_peers - num_peers_required; | 957 | 34.6M | DCHECK_LT(index_of_interest, watermarks.size()); | 958 | | | 959 | 34.6M | auto nth = watermarks.begin() + index_of_interest; | 960 | 34.6M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); | 961 | 34.6M | VLOG_WITH_PREFIX_UNLOCKED30 (2) | 962 | 30 | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 963 | 30 | << ", num_peers_required=" << num_peers_required | 964 | 30 | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark | 965 | 30 | << ", watermark: " << yb::ToString(*nth); | 966 | | | 967 | 34.6M | return *nth; | 968 | 34.7M | } |
consensus_queue.cc:yb::consensus::PeerMessageQueue::OpIdWatermark()::Policy::result_type yb::consensus::PeerMessageQueue::GetWatermark<yb::consensus::PeerMessageQueue::OpIdWatermark()::Policy>() Line | Count | Source | 882 | 34.7M | typename Policy::result_type PeerMessageQueue::GetWatermark() { | 883 | 34.7M | DCHECK(queue_lock_.is_locked()); | 884 | 34.7M | const auto num_peers_required = queue_state_.majority_size_; | 885 | 34.7M | if (num_peers_required == kUninitializedMajoritySize) { | 886 | | // We don't even know the quorum majority size yet. | 887 | 0 | return Policy::NotEnoughPeersValue(); | 888 | 0 | } | 889 | 34.7M | CHECK_GE(num_peers_required, 0); | 890 | | | 891 | 34.7M | const ssize_t num_peers = peers_map_.size(); | 892 | 34.7M | if (num_peers < num_peers_required) { | 893 | 241 | return Policy::NotEnoughPeersValue(); | 894 | 241 | } | 895 | | | 896 | | // This flag indicates whether to implicitly assume that the local peer has an "infinite" | 897 | | // replicated value of the dimension that we are computing a watermark for. There is a difference | 898 | | // in logic between handling of OpIds vs. leader leases: | 899 | | // - For OpIds, the local peer might actually be less up-to-date than followers. | 900 | | // - For leader leases, we always assume that we've replicated an "infinite" lease to ourselves. | 901 | 34.7M | const bool local_peer_infinite_watermark = | 902 | 34.7M | HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value; | 903 | | | 904 | 34.7M | if (num_peers_required == 1 && local_peer_infinite_watermark844k ) { | 905 | | // We give "infinite lease" to ourselves. | 906 | 0 | return GetInfiniteWatermarkForLocalPeer< | 907 | 0 | Policy, HasMemberFunction_InfiniteWatermarkForLocalPeer<Policy>::value>::Apply(); | 908 | 0 | } | 909 | | | 910 | 34.7M | constexpr size_t kMaxPracticalReplicationFactor = 5; | 911 | 34.7M | boost::container::small_vector< | 912 | 34.7M | typename Policy::result_type, kMaxPracticalReplicationFactor> watermarks; | 913 | 34.7M | watermarks.reserve(num_peers - 1 + !local_peer_infinite_watermark); | 914 | | | 915 | 103M | for (const PeersMap::value_type &peer_map_entry : peers_map_) { | 916 | 103M | const TrackedPeer &peer = *peer_map_entry.second; | 917 | 103M | if (local_peer_infinite_watermark && peer.uuid == local_peer_uuid_0 ) { | 918 | | // Don't even include the local peer in the watermarks array. Assume it has an "infinite" | 919 | | // value of the watermark. | 920 | 0 | continue; | 921 | 0 | } | 922 | 103M | if (!IsRaftConfigVoter(peer.uuid, *queue_state_.active_config)) { | 923 | | // Only votes from VOTERs in the active config should be taken into consideration | 924 | 821k | continue; | 925 | 821k | } | 926 | 102M | if (peer.is_last_exchange_successful) { | 927 | 102M | watermarks.push_back(Policy::ExtractValue(peer)); | 928 | 102M | } | 929 | 102M | } | 930 | | | 931 | | // We always assume that local peer has most recent information. | 932 | 34.7M | const ssize_t num_responsive_peers = watermarks.size() + local_peer_infinite_watermark; | 933 | | | 934 | 34.7M | if (num_responsive_peers < num_peers_required) { | 935 | 18.4E | VLOG_WITH_PREFIX_UNLOCKED(2) | 936 | 18.4E | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 937 | 18.4E | << ", num_peers_required=" << num_peers_required | 938 | 18.4E | << ", num_responsive_peers=" << num_responsive_peers | 939 | 18.4E | << ", not enough responsive peers"; | 940 | | // There are not enough peers with which the last message exchange was successful. | 941 | 59.9k | return Policy::NotEnoughPeersValue(); | 942 | 59.9k | } | 943 | | | 944 | | // If there are 5 peers (and num_peers_required is 3), and we have successfully replicated | 945 | | // something to 3 of them and 4th is our local peer, there are two possibilities: | 946 | | // - If local_peer_infinite_watermark is false (for OpId): watermarks.size() is 4, | 947 | | // and we want an OpId value such that 3 or more peers have replicated that or greater value. | 948 | | // Then index_of_interest = 1, computed as watermarks.size() - num_peers_required, or | 949 | | // num_responsive_peers - num_peers_required. | 950 | | // | 951 | | // - If local_peer_infinite_watermark is true (for leader leases): watermarks.size() is 3, and we | 952 | | // are assuming that the local peer (leader) has replicated an infinitely high watermark to | 953 | | // itself. Then watermark.size() is 3 (because we skip the local peer when populating | 954 | | // watermarks), but num_responsive_peers is still 4, and the expression stays the same. | 955 | | | 956 | 34.6M | const size_t index_of_interest = num_responsive_peers - num_peers_required; | 957 | 34.6M | DCHECK_LT(index_of_interest, watermarks.size()); | 958 | | | 959 | 34.6M | auto nth = watermarks.begin() + index_of_interest; | 960 | 34.6M | std::nth_element(watermarks.begin(), nth, watermarks.end(), typename Policy::Comparator()); | 961 | 34.6M | VLOG_WITH_PREFIX_UNLOCKED6.48k (2) | 962 | 6.48k | << Policy::Name() << " watermarks by peer: " << ::yb::ToString(watermarks) | 963 | 6.48k | << ", num_peers_required=" << num_peers_required | 964 | 6.48k | << ", local_peer_infinite_watermark=" << local_peer_infinite_watermark | 965 | 6.48k | << ", watermark: " << yb::ToString(*nth); | 966 | | | 967 | 34.6M | return *nth; | 968 | 34.7M | } |
|
969 | | |
970 | 34.7M | CoarseTimePoint PeerMessageQueue::LeaderLeaseExpirationWatermark() { |
971 | 34.7M | struct Policy { |
972 | 34.7M | typedef CoarseTimePoint result_type; |
973 | | // Workaround for a gcc bug. That does not understand that Comparator is actually being used. |
974 | 34.7M | __attribute__((unused)) typedef std::less<result_type> Comparator; |
975 | | |
976 | 34.7M | static result_type NotEnoughPeersValue() { |
977 | 60.1k | return result_type::min(); |
978 | 60.1k | } |
979 | | |
980 | 34.7M | static result_type InfiniteWatermarkForLocalPeer() { |
981 | 844k | return result_type::max(); |
982 | 844k | } |
983 | | |
984 | 67.4M | static result_type ExtractValue(const TrackedPeer& peer) { |
985 | 67.4M | auto lease_exp = peer.leader_lease_expiration.last_received; |
986 | 67.4M | return lease_exp != CoarseTimePoint() ? lease_exp67.4M : CoarseTimePoint::min()38.3k ; |
987 | 67.4M | } |
988 | | |
989 | 34.7M | static const char* Name() { |
990 | 1 | return "Leader lease expiration"; |
991 | 1 | } |
992 | 34.7M | }; |
993 | | |
994 | 34.7M | return GetWatermark<Policy>(); |
995 | 34.7M | } |
996 | | |
997 | 34.7M | MicrosTime PeerMessageQueue::HybridTimeLeaseExpirationWatermark() { |
998 | 34.7M | struct Policy { |
999 | 34.7M | typedef MicrosTime result_type; |
1000 | | // Workaround for a gcc bug. That does not understand that Comparator is actually being used. |
1001 | 34.7M | __attribute__((unused)) typedef std::less<result_type> Comparator; |
1002 | | |
1003 | 34.7M | static result_type NotEnoughPeersValue() { |
1004 | 60.1k | return HybridTime::kMin.GetPhysicalValueMicros(); |
1005 | 60.1k | } |
1006 | | |
1007 | 34.7M | static result_type InfiniteWatermarkForLocalPeer() { |
1008 | 844k | return HybridTime::kMax.GetPhysicalValueMicros(); |
1009 | 844k | } |
1010 | | |
1011 | 67.4M | static result_type ExtractValue(const TrackedPeer& peer) { |
1012 | 67.4M | return peer.leader_ht_lease_expiration.last_received; |
1013 | 67.4M | } |
1014 | | |
1015 | 34.7M | static const char* Name() { |
1016 | 1 | return "Hybrid time leader lease expiration"; |
1017 | 1 | } |
1018 | 34.7M | }; |
1019 | | |
1020 | 34.7M | return GetWatermark<Policy>(); |
1021 | 34.7M | } |
1022 | | |
1023 | 34.7M | uint64_t PeerMessageQueue::NumSSTFilesWatermark() { |
1024 | 34.7M | struct Policy { |
1025 | 34.7M | typedef uint64_t result_type; |
1026 | | // Workaround for a gcc bug. That does not understand that Comparator is actually being used. |
1027 | 34.7M | __attribute__((unused)) typedef std::greater<result_type> Comparator; |
1028 | | |
1029 | 34.7M | static result_type NotEnoughPeersValue() { |
1030 | 60.3k | return 0; |
1031 | 60.3k | } |
1032 | | |
1033 | 102M | static result_type ExtractValue(const TrackedPeer& peer) { |
1034 | 102M | return peer.num_sst_files; |
1035 | 102M | } |
1036 | | |
1037 | 34.7M | static const char* Name() { |
1038 | 1 | return "Num SST files"; |
1039 | 1 | } |
1040 | 34.7M | }; |
1041 | | |
1042 | 34.7M | auto watermark = GetWatermark<Policy>(); |
1043 | 34.7M | return std::max(watermark, local_peer_->num_sst_files); |
1044 | 34.7M | } |
1045 | | |
1046 | 34.7M | OpId PeerMessageQueue::OpIdWatermark() { |
1047 | 34.7M | struct Policy { |
1048 | 34.7M | typedef OpId result_type; |
1049 | | |
1050 | 34.7M | static result_type NotEnoughPeersValue() { |
1051 | 60.3k | return OpId::Min(); |
1052 | 60.3k | } |
1053 | | |
1054 | 102M | static result_type ExtractValue(const TrackedPeer& peer) { |
1055 | 102M | return peer.last_received; |
1056 | 102M | } |
1057 | | |
1058 | 34.7M | struct Comparator { |
1059 | 73.8M | bool operator()(const OpId& lhs, const OpId& rhs) { |
1060 | 73.8M | return lhs.index < rhs.index; |
1061 | 73.8M | } |
1062 | 34.7M | }; |
1063 | | |
1064 | 34.7M | static const char* Name() { |
1065 | 1 | return "OpId"; |
1066 | 1 | } |
1067 | 34.7M | }; |
1068 | | |
1069 | 34.7M | return GetWatermark<Policy>(); |
1070 | 34.7M | } |
1071 | | |
1072 | 15.7k | void PeerMessageQueue::NotifyPeerIsResponsiveDespiteError(const std::string& peer_uuid) { |
1073 | 15.7k | LockGuard l(queue_lock_); |
1074 | 15.7k | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1075 | 15.7k | if (!peer) return0 ; |
1076 | 15.7k | peer->last_successful_communication_time = MonoTime::Now(); |
1077 | 15.7k | } |
1078 | | |
1079 | 23.2k | void PeerMessageQueue::RequestWasNotSent(const std::string& peer_uuid) { |
1080 | 23.2k | LockGuard scoped_lock(queue_lock_); |
1081 | 23.2k | DCHECK_NE(State::kQueueConstructed, queue_state_.state); |
1082 | | |
1083 | 23.2k | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1084 | 23.2k | if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) { |
1085 | 0 | LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked."; |
1086 | 0 | return; |
1087 | 0 | } |
1088 | | |
1089 | 23.2k | peer->ResetLastRequest(); |
1090 | 23.2k | } |
1091 | | |
1092 | | |
1093 | | bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid, |
1094 | 34.8M | const ConsensusResponsePB& response) { |
1095 | 34.8M | DCHECK(response.IsInitialized()) << "Error: Uninitialized: " |
1096 | 4.53k | << response.InitializationErrorString() << ". Response: " << response.ShortDebugString(); |
1097 | | |
1098 | 34.8M | MajorityReplicatedData majority_replicated; |
1099 | 34.8M | Mode mode_copy; |
1100 | 34.8M | bool result = false; |
1101 | 34.8M | { |
1102 | 34.8M | LockGuard scoped_lock(queue_lock_); |
1103 | 34.8M | DCHECK_NE(State::kQueueConstructed, queue_state_.state); |
1104 | | |
1105 | 34.8M | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1106 | 34.8M | if (PREDICT_FALSE(queue_state_.state != State::kQueueOpen || peer == nullptr)) { |
1107 | 3 | LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked, disregarding " |
1108 | 3 | "peer response. Response: " << response.ShortDebugString(); |
1109 | 3 | return false; |
1110 | 3 | } |
1111 | | |
1112 | | // Remotely bootstrap the peer if the tablet is not found or deleted. |
1113 | 34.8M | if (response.has_error()) { |
1114 | | // We only let special types of errors through to this point from the peer. |
1115 | 10.6k | CHECK_EQ(tserver::TabletServerErrorPB::TABLET_NOT_FOUND, response.error().code()) |
1116 | 0 | << response.ShortDebugString(); |
1117 | | |
1118 | 10.6k | peer->needs_remote_bootstrap = true; |
1119 | | // Since we received a response from the peer, we know it is alive. So we need to update |
1120 | | // peer->last_successful_communication_time, otherwise, we will remove this peer from the |
1121 | | // configuration if the remote bootstrap is not completed within |
1122 | | // FLAGS_follower_unavailable_considered_failed_sec seconds. |
1123 | 10.6k | peer->last_successful_communication_time = MonoTime::Now(); |
1124 | 10.6k | YB_LOG_WITH_PREFIX_UNLOCKED_EVERY_N_SECS485 (INFO, 30) |
1125 | 485 | << "Marked peer as needing remote bootstrap: " << peer->ToString(); |
1126 | 10.6k | return true; |
1127 | 10.6k | } |
1128 | | |
1129 | 34.8M | if (34.8M queue_state_.active_config34.8M ) { |
1130 | 34.8M | RaftPeerPB peer_pb; |
1131 | 34.8M | if (!GetRaftConfigMember(*queue_state_.active_config, peer_uuid, &peer_pb).ok()) { |
1132 | 0 | LOG(FATAL) << "Peer " << peer_uuid << " not in active config"; |
1133 | 0 | } |
1134 | 34.8M | peer->member_type = peer_pb.member_type(); |
1135 | 18.4E | } else { |
1136 | 18.4E | peer->member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE; |
1137 | 18.4E | } |
1138 | | |
1139 | | // Application level errors should be handled elsewhere |
1140 | 34.8M | DCHECK(!response.has_error()); |
1141 | | |
1142 | | // Take a snapshot of the current peer status. |
1143 | 34.8M | TrackedPeer previous = *peer; |
1144 | | |
1145 | | // Update the peer status based on the response. |
1146 | 34.8M | peer->is_new = false; |
1147 | 34.8M | peer->last_successful_communication_time = MonoTime::Now(); |
1148 | | |
1149 | 34.8M | peer->ResetLastRequest(); |
1150 | | |
1151 | 34.8M | if (response.has_status()) { |
1152 | 34.8M | const auto& status = response.status(); |
1153 | | // Sanity checks. Some of these can be eventually removed, but they are handy for now. |
1154 | 34.8M | DCHECK(status.IsInitialized()) << "Error: Uninitialized: " |
1155 | 141 | << response.InitializationErrorString() |
1156 | 141 | << ". Response: " << response.ShortDebugString(); |
1157 | | // The status must always have a last received op id and a last committed index. |
1158 | 34.8M | DCHECK(status.has_last_received()); |
1159 | 34.8M | DCHECK(status.has_last_received_current_leader()); |
1160 | 34.8M | DCHECK(status.has_last_committed_idx()); |
1161 | | |
1162 | 34.8M | peer->last_known_committed_idx = status.last_committed_idx(); |
1163 | 34.8M | peer->last_applied = OpId::FromPB(status.last_applied()); |
1164 | | |
1165 | | // If the reported last-received op for the replica is in our local log, then resume sending |
1166 | | // entries from that point onward. Otherwise, resume after the last op they received from us. |
1167 | | // If we've never successfully sent them anything, start after the last-committed op in their |
1168 | | // log, which is guaranteed by the Raft protocol to be a valid op. |
1169 | | |
1170 | 34.8M | bool peer_has_prefix_of_log = IsOpInLog(yb::OpId::FromPB(status.last_received())); |
1171 | 34.8M | if (peer_has_prefix_of_log34.8M ) { |
1172 | | // If the latest thing in their log is in our log, we are in sync. |
1173 | 34.8M | peer->last_received = OpId::FromPB(status.last_received()); |
1174 | 34.8M | peer->next_index = peer->last_received.index + 1; |
1175 | | |
1176 | 18.4E | } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) { |
1177 | | // Their log may have diverged from ours, however we are in the process of replicating our |
1178 | | // ops to them, so continue doing so. Eventually, we will cause the divergent entry in their |
1179 | | // log to be overwritten. |
1180 | 53 | peer->last_received = OpId::FromPB(status.last_received_current_leader()); |
1181 | 53 | peer->next_index = peer->last_received.index + 1; |
1182 | 18.4E | } else { |
1183 | | // The peer is divergent and they have not (successfully) received anything from us yet. |
1184 | | // Start sending from their last committed index. This logic differs from the Raft spec |
1185 | | // slightly because instead of stepping back one-by-one from the end until we no longer have |
1186 | | // an LMP error, we jump back to the last committed op indicated by the peer with the hope |
1187 | | // that doing so will result in a faster catch-up process. |
1188 | 18.4E | DCHECK_GE(peer->last_known_committed_idx, 0); |
1189 | 18.4E | peer->next_index = peer->last_known_committed_idx + 1; |
1190 | 18.4E | } |
1191 | | |
1192 | 34.8M | if (PREDICT_FALSE(status.has_error())) { |
1193 | 121k | peer->is_last_exchange_successful = false; |
1194 | 121k | switch (status.error().code()) { |
1195 | 120k | case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH: { |
1196 | 120k | DCHECK(status.has_last_received()); |
1197 | 120k | if (previous.is_new) { |
1198 | | // That's currently how we can detect that we able to connect to a peer. |
1199 | 120k | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString(); |
1200 | 120k | } else { |
1201 | 47 | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: " |
1202 | 47 | << peer->ToString(); |
1203 | 47 | CHECK(!FLAGS_TEST_disallow_lmp_failures); |
1204 | 47 | } |
1205 | 120k | return true; |
1206 | 0 | } |
1207 | 877 | case ConsensusErrorPB::INVALID_TERM: { |
1208 | 877 | CHECK(response.has_responder_term()); |
1209 | 877 | LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString() |
1210 | 877 | << ". Peer's new term: " << response.responder_term(); |
1211 | 877 | NotifyObserversOfTermChange(response.responder_term()); |
1212 | 877 | return false; |
1213 | 0 | } |
1214 | 0 | default: { |
1215 | 0 | LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: " |
1216 | 0 | << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: " |
1217 | 0 | << response.ShortDebugString(); |
1218 | 0 | } |
1219 | 121k | } |
1220 | 121k | } |
1221 | 34.8M | } |
1222 | | |
1223 | 34.7M | peer->is_last_exchange_successful = true; |
1224 | 34.7M | peer->num_sst_files = response.num_sst_files(); |
1225 | | |
1226 | 34.7M | if (response.has_responder_term()) { |
1227 | | // The peer must have responded with a term that is greater than or equal to the last known |
1228 | | // term for that peer. |
1229 | 25.4M | peer->CheckMonotonicTerms(response.responder_term()); |
1230 | | |
1231 | | // If the responder didn't send an error back that must mean that it has a term that is the |
1232 | | // same or lower than ours. |
1233 | 25.4M | CHECK_LE(response.responder_term(), queue_state_.current_term); |
1234 | 25.4M | } |
1235 | | |
1236 | 34.7M | if (PREDICT_FALSE(VLOG_IS_ON(2))) { |
1237 | 1 | VLOG_WITH_PREFIX_UNLOCKED0 (2) << "Received Response from Peer (" << peer->ToString() << "). " |
1238 | 0 | << "Response: " << response.ShortDebugString(); |
1239 | 1 | } |
1240 | | |
1241 | | // If our log has the next request for the peer or if the peer's committed index is lower than |
1242 | | // our own, set 'more_pending' to true. |
1243 | 34.7M | result = log_cache_.HasOpBeenWritten(peer->next_index) || |
1244 | 34.7M | (peer->last_known_committed_idx < queue_state_.committed_op_id.index)32.4M ; |
1245 | | |
1246 | 34.7M | mode_copy = queue_state_.mode; |
1247 | 34.7M | if (mode_copy == Mode::LEADER34.7M ) { |
1248 | 34.7M | auto new_majority_replicated_opid = OpIdWatermark(); |
1249 | 34.7M | if (new_majority_replicated_opid != OpId::Min()) { |
1250 | 34.6M | if (new_majority_replicated_opid.index == MaximumOpId().index()) { |
1251 | 0 | queue_state_.majority_replicated_op_id = local_peer_->last_received; |
1252 | 34.6M | } else { |
1253 | 34.6M | queue_state_.majority_replicated_op_id = new_majority_replicated_opid; |
1254 | 34.6M | } |
1255 | 34.6M | } |
1256 | | |
1257 | 34.7M | peer->leader_lease_expiration.OnReplyFromFollower(); |
1258 | 34.7M | peer->leader_ht_lease_expiration.OnReplyFromFollower(); |
1259 | | |
1260 | 34.7M | majority_replicated.op_id = queue_state_.majority_replicated_op_id; |
1261 | 34.7M | majority_replicated.leader_lease_expiration = LeaderLeaseExpirationWatermark(); |
1262 | 34.7M | majority_replicated.ht_lease_expiration = HybridTimeLeaseExpirationWatermark(); |
1263 | 34.7M | majority_replicated.num_sst_files = NumSSTFilesWatermark(); |
1264 | 34.7M | if (peer->last_received == queue_state_.last_applied_op_id) { |
1265 | 23.3M | majority_replicated.peer_got_all_ops = peer->uuid; |
1266 | 23.3M | } |
1267 | 34.7M | } |
1268 | | |
1269 | 34.7M | UpdateAllReplicatedOpId(&queue_state_.all_replicated_op_id); |
1270 | 34.7M | UpdateAllAppliedOpId(&queue_state_.all_applied_op_id); |
1271 | | |
1272 | 34.7M | auto evict_index = GetCDCConsumerOpIdToEvict().index; |
1273 | | |
1274 | 34.7M | int32_t lagging_follower_threshold = FLAGS_consensus_lagging_follower_threshold; |
1275 | 34.7M | if (lagging_follower_threshold > 0) { |
1276 | 34.7M | UpdateAllNonLaggingReplicatedOpId(lagging_follower_threshold); |
1277 | 34.7M | evict_index = std::min(evict_index, queue_state_.all_nonlagging_replicated_op_id.index); |
1278 | 34.7M | } else { |
1279 | 299 | evict_index = std::min(evict_index, queue_state_.all_replicated_op_id.index); |
1280 | 299 | } |
1281 | | |
1282 | 34.7M | log_cache_.EvictThroughOp(evict_index); |
1283 | | |
1284 | 34.7M | UpdateMetrics(); |
1285 | 34.7M | } |
1286 | | |
1287 | 34.7M | if (mode_copy == Mode::LEADER34.7M ) { |
1288 | 34.7M | NotifyObserversOfMajorityReplOpChange(majority_replicated); |
1289 | 34.7M | } |
1290 | | |
1291 | 34.7M | return result; |
1292 | 34.8M | } |
1293 | | |
1294 | 10 | PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(string uuid) { |
1295 | 10 | LockGuard scoped_lock(queue_lock_); |
1296 | 10 | TrackedPeer* tracked = FindOrDie(peers_map_, uuid); |
1297 | 10 | return *tracked; |
1298 | 10 | } |
1299 | | |
1300 | 13 | OpId PeerMessageQueue::TEST_GetAllReplicatedIndex() const { |
1301 | 13 | LockGuard lock(queue_lock_); |
1302 | 13 | return queue_state_.all_replicated_op_id; |
1303 | 13 | } |
1304 | | |
1305 | 65 | OpId PeerMessageQueue::GetAllAppliedOpId() const { |
1306 | 65 | LockGuard lock(queue_lock_); |
1307 | 65 | return queue_state_.all_applied_op_id; |
1308 | 65 | } |
1309 | | |
1310 | 5 | OpId PeerMessageQueue::TEST_GetCommittedIndex() const { |
1311 | 5 | LockGuard lock(queue_lock_); |
1312 | 5 | return queue_state_.committed_op_id; |
1313 | 5 | } |
1314 | | |
1315 | 11 | OpId PeerMessageQueue::TEST_GetMajorityReplicatedOpId() const { |
1316 | 11 | LockGuard lock(queue_lock_); |
1317 | 11 | return queue_state_.majority_replicated_op_id; |
1318 | 11 | } |
1319 | | |
1320 | 2 | OpId PeerMessageQueue::TEST_GetLastAppended() const { |
1321 | 2 | LockGuard lock(queue_lock_); |
1322 | 2 | return queue_state_.last_appended; |
1323 | 2 | } |
1324 | | |
1325 | 16 | OpId PeerMessageQueue::TEST_GetLastAppliedOpId() const { |
1326 | 16 | LockGuard lock(queue_lock_); |
1327 | 16 | return queue_state_.last_applied_op_id; |
1328 | 16 | } |
1329 | | |
1330 | 63.0M | void PeerMessageQueue::UpdateMetrics() { |
1331 | | // Since operations have consecutive indices we can update the metrics based on simple index math. |
1332 | 63.0M | metrics_.num_majority_done_ops->set_value( |
1333 | 63.0M | queue_state_.committed_op_id.index - queue_state_.all_replicated_op_id.index); |
1334 | 63.0M | metrics_.num_in_progress_ops->set_value( |
1335 | 63.0M | queue_state_.last_appended.index - queue_state_.committed_op_id.index); |
1336 | 63.0M | } |
1337 | | |
1338 | 38 | void PeerMessageQueue::DumpToHtml(std::ostream& out) const { |
1339 | 38 | using std::endl; |
1340 | | |
1341 | 38 | LockGuard lock(queue_lock_); |
1342 | 38 | out << "<h3>Watermarks</h3>" << endl; |
1343 | 38 | out << "<table>" << endl;; |
1344 | 38 | out << " <tr><th>Peer</th><th>Watermark</th></tr>" << endl; |
1345 | 101 | for (const PeersMap::value_type& entry : peers_map_) { |
1346 | 101 | out << Substitute(" <tr><td>$0</td><td>$1</td></tr>", |
1347 | 101 | EscapeForHtmlToString(entry.first), |
1348 | 101 | EscapeForHtmlToString(entry.second->ToString())) << endl; |
1349 | 101 | } |
1350 | 38 | out << "</table>" << endl; |
1351 | | |
1352 | 38 | log_cache_.DumpToHtml(out); |
1353 | 38 | } |
1354 | | |
1355 | 151k | void PeerMessageQueue::ClearUnlocked() { |
1356 | 151k | STLDeleteValues(&peers_map_); |
1357 | 151k | queue_state_.state = State::kQueueClosed; |
1358 | 151k | } |
1359 | | |
1360 | 151k | void PeerMessageQueue::Close() { |
1361 | 151k | if (installed_num_sst_files_changed_listener_) { |
1362 | 75.6k | context_->ListenNumSSTFilesChanged(std::function<void()>()); |
1363 | 75.6k | installed_num_sst_files_changed_listener_ = false; |
1364 | 75.6k | } |
1365 | 151k | raft_pool_observers_token_->Shutdown(); |
1366 | 151k | LockGuard lock(queue_lock_); |
1367 | 151k | ClearUnlocked(); |
1368 | 151k | } |
1369 | | |
1370 | 76 | string PeerMessageQueue::ToString() const { |
1371 | | // Even though metrics are thread-safe obtain the lock so that we get a "consistent" snapshot of |
1372 | | // the metrics. |
1373 | 76 | LockGuard lock(queue_lock_); |
1374 | 76 | return ToStringUnlocked(); |
1375 | 76 | } |
1376 | | |
1377 | 76 | string PeerMessageQueue::ToStringUnlocked() const { |
1378 | 76 | return Substitute("Consensus queue metrics:" |
1379 | 76 | "Only Majority Done Ops: $0, In Progress Ops: $1, Cache: $2", |
1380 | 76 | metrics_.num_majority_done_ops->value(), metrics_.num_in_progress_ops->value(), |
1381 | 76 | log_cache_.StatsString()); |
1382 | 76 | } |
1383 | | |
1384 | 62.0k | void PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) { |
1385 | 62.0k | LockGuard lock(queue_lock_); |
1386 | 62.0k | auto iter = std::find(observers_.begin(), observers_.end(), observer); |
1387 | 62.0k | if (iter == observers_.end()) { |
1388 | 62.0k | observers_.push_back(observer); |
1389 | 62.0k | } |
1390 | 62.0k | } |
1391 | | |
1392 | 161k | Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) { |
1393 | 161k | LockGuard lock(queue_lock_); |
1394 | 161k | auto iter = std::find(observers_.begin(), observers_.end(), observer); |
1395 | 161k | if (iter == observers_.end()) { |
1396 | 150k | return STATUS(NotFound, "Can't find observer."); |
1397 | 150k | } |
1398 | 10.9k | observers_.erase(iter); |
1399 | 10.9k | return Status::OK(); |
1400 | 161k | } |
1401 | | |
1402 | 580k | const char* PeerMessageQueue::ModeToStr(Mode mode) { |
1403 | 580k | switch (mode) { |
1404 | 258k | case Mode::LEADER: return "LEADER"; |
1405 | 322k | case Mode::NON_LEADER: return "NON_LEADER"; |
1406 | 580k | } |
1407 | 0 | FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::Mode, mode); |
1408 | 0 | } |
1409 | | |
1410 | 229k | const char* PeerMessageQueue::StateToStr(State state) { |
1411 | 229k | switch (state) { |
1412 | 0 | case State::kQueueConstructed: |
1413 | 0 | return "QUEUE_CONSTRUCTED"; |
1414 | 229k | case State::kQueueOpen: |
1415 | 229k | return "QUEUE_OPEN"; |
1416 | 0 | case State::kQueueClosed: |
1417 | 0 | return "QUEUE_CLOSED"; |
1418 | | |
1419 | 229k | } |
1420 | 0 | FATAL_INVALID_ENUM_VALUE(PeerMessageQueue::State, state); |
1421 | 0 | } |
1422 | | |
1423 | 34.8M | bool PeerMessageQueue::IsOpInLog(const yb::OpId& desired_op) const { |
1424 | 34.8M | auto result = log_cache_.LookupOpId(desired_op.index); |
1425 | 34.8M | if (PREDICT_TRUE34.8M (result.ok())) { |
1426 | 34.8M | return desired_op == *result; |
1427 | 34.8M | } |
1428 | 18.4E | if (PREDICT_TRUE(result.status().IsNotFound() || result.status().IsIncomplete())) { |
1429 | 44 | return false; |
1430 | 44 | } |
1431 | 18.4E | LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error while reading the log: " << result.status(); |
1432 | 18.4E | return false; // Unreachable; here to squelch GCC warning. |
1433 | 18.4E | } |
1434 | | |
1435 | | void PeerMessageQueue::NotifyObserversOfMajorityReplOpChange( |
1436 | 34.7M | const MajorityReplicatedData& majority_replicated_data) { |
1437 | 34.7M | WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( |
1438 | 34.7M | Bind(&PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask, |
1439 | 34.7M | Unretained(this), |
1440 | 34.7M | majority_replicated_data)), |
1441 | 34.7M | LogPrefixUnlocked() + "Unable to notify RaftConsensus of " |
1442 | 34.7M | "majority replicated op change."); |
1443 | 34.7M | } |
1444 | | |
1445 | | template <class Func> |
1446 | 3.26k | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) { |
1447 | 3.26k | WARN_NOT_OK( |
1448 | 3.26k | raft_pool_observers_token_->SubmitFunc( |
1449 | 3.26k | [this, func = std::move(func)] { |
1450 | 3.26k | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); |
1451 | 3.26k | std::vector<PeerMessageQueueObserver*> copy; |
1452 | 3.26k | { |
1453 | 3.26k | LockGuard lock(queue_lock_); |
1454 | 3.26k | copy = observers_; |
1455 | 3.26k | } |
1456 | | |
1457 | 3.26k | for (PeerMessageQueueObserver* observer : copy) { |
1458 | 3.26k | func(observer); |
1459 | 3.26k | } |
1460 | 3.26k | }), |
1461 | 3.26k | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title)); |
1462 | 3.26k | } consensus_queue.cc:void yb::consensus::PeerMessageQueue::NotifyObservers<yb::consensus::PeerMessageQueue::NumSSTFilesChanged()::$_0>(char const*, yb::consensus::PeerMessageQueue::NumSSTFilesChanged()::$_0&&) Line | Count | Source | 1446 | 2.06k | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) { | 1447 | 2.06k | WARN_NOT_OK( | 1448 | 2.06k | raft_pool_observers_token_->SubmitFunc( | 1449 | 2.06k | [this, func = std::move(func)] { | 1450 | 2.06k | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); | 1451 | 2.06k | std::vector<PeerMessageQueueObserver*> copy; | 1452 | 2.06k | { | 1453 | 2.06k | LockGuard lock(queue_lock_); | 1454 | 2.06k | copy = observers_; | 1455 | 2.06k | } | 1456 | | | 1457 | 2.06k | for (PeerMessageQueueObserver* observer : copy) { | 1458 | 2.06k | func(observer); | 1459 | 2.06k | } | 1460 | 2.06k | }), | 1461 | 2.06k | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title)); | 1462 | 2.06k | } |
consensus_queue.cc:void yb::consensus::PeerMessageQueue::NotifyObservers<yb::consensus::PeerMessageQueue::NotifyObserversOfTermChange(long long)::$_1>(char const*, yb::consensus::PeerMessageQueue::NotifyObserversOfTermChange(long long)::$_1&&) Line | Count | Source | 1446 | 874 | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) { | 1447 | 874 | WARN_NOT_OK( | 1448 | 874 | raft_pool_observers_token_->SubmitFunc( | 1449 | 874 | [this, func = std::move(func)] { | 1450 | 874 | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); | 1451 | 874 | std::vector<PeerMessageQueueObserver*> copy; | 1452 | 874 | { | 1453 | 874 | LockGuard lock(queue_lock_); | 1454 | 874 | copy = observers_; | 1455 | 874 | } | 1456 | | | 1457 | 874 | for (PeerMessageQueueObserver* observer : copy) { | 1458 | 874 | func(observer); | 1459 | 874 | } | 1460 | 874 | }), | 1461 | 874 | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title)); | 1462 | 874 | } |
consensus_queue.cc:void yb::consensus::PeerMessageQueue::NotifyObservers<yb::consensus::PeerMessageQueue::NotifyObserversOfFailedFollower(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, long long, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_2>(char const*, yb::consensus::PeerMessageQueue::NotifyObserversOfFailedFollower(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, long long, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)::$_2&&) Line | Count | Source | 1446 | 329 | void PeerMessageQueue::NotifyObservers(const char* title, Func&& func) { | 1447 | 329 | WARN_NOT_OK( | 1448 | 329 | raft_pool_observers_token_->SubmitFunc( | 1449 | 329 | [this, func = std::move(func)] { | 1450 | 329 | MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); | 1451 | 329 | std::vector<PeerMessageQueueObserver*> copy; | 1452 | 329 | { | 1453 | 329 | LockGuard lock(queue_lock_); | 1454 | 329 | copy = observers_; | 1455 | 329 | } | 1456 | | | 1457 | 329 | for (PeerMessageQueueObserver* observer : copy) { | 1458 | 329 | func(observer); | 1459 | 329 | } | 1460 | 329 | }), | 1461 | 329 | Format("$0Unable to notify observers for $1.", LogPrefixUnlocked(), title)); | 1462 | 329 | } |
|
1463 | | |
1464 | 874 | void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) { |
1465 | 874 | NotifyObservers("term change", [term](PeerMessageQueueObserver* observer) { |
1466 | 801 | observer->NotifyTermChange(term); |
1467 | 801 | }); |
1468 | 874 | } |
1469 | | |
1470 | | void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask( |
1471 | 34.7M | const MajorityReplicatedData& majority_replicated_data) { |
1472 | 34.7M | std::vector<PeerMessageQueueObserver*> copy; |
1473 | 34.7M | { |
1474 | 34.7M | LockGuard lock(queue_lock_); |
1475 | 34.7M | copy = observers_; |
1476 | 34.7M | } |
1477 | | |
1478 | | // TODO move commit index advancement here so that the queue is not dependent on consensus at all, |
1479 | | // but that requires a bit more work. |
1480 | 34.7M | OpId new_committed_op_id; |
1481 | 34.7M | OpId last_applied_op_id; |
1482 | 34.7M | for (PeerMessageQueueObserver* observer : copy) { |
1483 | 34.7M | observer->UpdateMajorityReplicated( |
1484 | 34.7M | majority_replicated_data, &new_committed_op_id, &last_applied_op_id); |
1485 | 34.7M | } |
1486 | | |
1487 | 34.7M | { |
1488 | 34.7M | LockGuard lock(queue_lock_); |
1489 | 34.7M | if (!new_committed_op_id.empty() && |
1490 | 34.7M | new_committed_op_id.index > queue_state_.committed_op_id.index34.6M ) { |
1491 | 4.69M | queue_state_.committed_op_id = new_committed_op_id; |
1492 | 4.69M | } |
1493 | 34.7M | queue_state_.last_applied_op_id.MakeAtLeast(last_applied_op_id); |
1494 | 34.7M | local_peer_->last_applied = queue_state_.last_applied_op_id; |
1495 | 34.7M | UpdateAllAppliedOpId(&queue_state_.all_applied_op_id); |
1496 | 34.7M | } |
1497 | 34.7M | } |
1498 | | |
1499 | | void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid, |
1500 | 95 | const string& reason) { |
1501 | 95 | int64_t current_term; |
1502 | 95 | { |
1503 | 95 | LockGuard lock(queue_lock_); |
1504 | 95 | current_term = queue_state_.current_term; |
1505 | 95 | } |
1506 | 95 | NotifyObserversOfFailedFollower(uuid, current_term, reason); |
1507 | 95 | } |
1508 | | |
1509 | | void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid, |
1510 | | int64_t term, |
1511 | 328 | const string& reason) { |
1512 | 330 | NotifyObservers("failed follower", [uuid, term, reason](PeerMessageQueueObserver* observer) { |
1513 | 330 | observer->NotifyFailedFollower(uuid, term, reason); |
1514 | 330 | }); |
1515 | 328 | } |
1516 | | |
1517 | 312k | bool PeerMessageQueue::PeerAcceptedOurLease(const std::string& uuid) const { |
1518 | 312k | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1519 | 312k | TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid); |
1520 | 312k | if (peer == nullptr) { |
1521 | 106k | return false; |
1522 | 106k | } |
1523 | | |
1524 | 206k | return peer->leader_lease_expiration.last_received != CoarseTimePoint(); |
1525 | 312k | } |
1526 | | |
1527 | 10.5k | bool PeerMessageQueue::CanPeerBecomeLeader(const std::string& peer_uuid) const { |
1528 | 10.5k | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1529 | 10.5k | TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid); |
1530 | 10.5k | if (peer == nullptr) { |
1531 | 0 | LOG(ERROR) << "Invalid peer UUID: " << peer_uuid; |
1532 | 0 | return false; |
1533 | 0 | } |
1534 | 10.5k | const bool peer_can_be_leader = peer->last_received >= queue_state_.majority_replicated_op_id; |
1535 | 10.5k | if (!peer_can_be_leader) { |
1536 | 166 | LOG(INFO) << Format( |
1537 | 166 | "Peer $0 cannot become Leader as it is not caught up: Majority OpId $1, Peer OpId $2", |
1538 | 166 | peer_uuid, queue_state_.majority_replicated_op_id, peer->last_received); |
1539 | 166 | } |
1540 | 10.5k | return peer_can_be_leader; |
1541 | 10.5k | } |
1542 | | |
1543 | 10.0k | OpId PeerMessageQueue::PeerLastReceivedOpId(const TabletServerId& uuid) const { |
1544 | 10.0k | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1545 | 10.0k | TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid); |
1546 | 10.0k | if (peer == nullptr) { |
1547 | 0 | LOG(ERROR) << "Invalid peer UUID: " << uuid; |
1548 | 0 | return OpId::Min(); |
1549 | 0 | } |
1550 | 10.0k | return peer->last_received; |
1551 | 10.0k | } |
1552 | | |
1553 | 591 | string PeerMessageQueue::GetUpToDatePeer() const { |
1554 | 591 | OpId highest_op_id = OpId::Min(); |
1555 | 591 | std::vector<std::string> candidates; |
1556 | | |
1557 | 591 | { |
1558 | 591 | std::lock_guard<simple_spinlock> lock(queue_lock_); |
1559 | 2.16k | for (const PeersMap::value_type& entry : peers_map_) { |
1560 | 2.16k | if (local_peer_uuid_ == entry.first) { |
1561 | 591 | continue; |
1562 | 591 | } |
1563 | 1.57k | if (highest_op_id > entry.second->last_received) { |
1564 | 63 | continue; |
1565 | 1.50k | } else if (highest_op_id == entry.second->last_received) { |
1566 | 913 | candidates.push_back(entry.first); |
1567 | 913 | } else { |
1568 | 596 | candidates = {entry.first}; |
1569 | 596 | highest_op_id = entry.second->last_received; |
1570 | 596 | } |
1571 | 1.57k | } |
1572 | 591 | } |
1573 | | |
1574 | 591 | if (candidates.empty()) { |
1575 | 0 | return string(); |
1576 | 0 | } |
1577 | 591 | size_t index = 0; |
1578 | 591 | if (candidates.size() > 1) { |
1579 | | // choose randomly among candidates at the same opid |
1580 | 504 | index = RandomUniformInt<size_t>(0, candidates.size() - 1); |
1581 | 504 | } |
1582 | 591 | return candidates[index]; |
1583 | 591 | } |
1584 | | |
1585 | 75.6k | PeerMessageQueue::~PeerMessageQueue() { |
1586 | 75.6k | Close(); |
1587 | 75.6k | } |
1588 | | |
1589 | 351k | string PeerMessageQueue::LogPrefixUnlocked() const { |
1590 | | // TODO: we should probably use an atomic here. We'll just annotate away the TSAN error for now, |
1591 | | // since the worst case is a slightly out-of-date log message, and not very likely. |
1592 | 351k | Mode mode = ANNOTATE_UNPROTECTED_READ(queue_state_.mode); |
1593 | 351k | return Substitute("T $0 P $1 [$2]: ", |
1594 | 351k | tablet_id_, |
1595 | 351k | local_peer_uuid_, |
1596 | 351k | ModeToStr(mode)); |
1597 | 351k | } |
1598 | | |
1599 | 229k | string PeerMessageQueue::QueueState::ToString() const { |
1600 | 229k | return Format( |
1601 | 229k | "All replicated op: $0, Majority replicated op: $1, Committed index: $2, Last applied: $3, " |
1602 | 229k | "Last appended: $4, Current term: $5, Majority size: $6, State: $7, Mode: $8$9", |
1603 | 229k | /* 0 */ all_replicated_op_id, |
1604 | 229k | /* 1 */ majority_replicated_op_id, |
1605 | 229k | /* 2 */ committed_op_id, |
1606 | 229k | /* 3 */ last_applied_op_id, |
1607 | 229k | /* 4 */ last_appended, |
1608 | 229k | /* 5 */ current_term, |
1609 | 229k | /* 6 */ majority_size_, |
1610 | 229k | /* 7 */ StateToStr(state), |
1611 | 229k | /* 8 */ ModeToStr(mode), |
1612 | 229k | /* 9 */ active_config ? ", active raft config: " + active_config->ShortDebugString()67.8k : ""161k ); |
1613 | 229k | } |
1614 | | |
1615 | 0 | size_t PeerMessageQueue::LogCacheSize() { |
1616 | 0 | return log_cache_.BytesUsed(); |
1617 | 0 | } |
1618 | | |
1619 | 0 | size_t PeerMessageQueue::EvictLogCache(size_t bytes_to_evict) { |
1620 | 0 | return log_cache_.EvictThroughOp(std::numeric_limits<int64_t>::max(), bytes_to_evict); |
1621 | 0 | } |
1622 | | |
1623 | 0 | Status PeerMessageQueue::FlushLogIndex() { |
1624 | 0 | return log_cache_.FlushIndex(); |
1625 | 0 | } |
1626 | | |
1627 | 13.0M | void PeerMessageQueue::TrackOperationsMemory(const OpIds& op_ids) { |
1628 | 13.0M | log_cache_.TrackOperationsMemory(op_ids); |
1629 | 13.0M | } |
1630 | | |
1631 | | Result<OpId> PeerMessageQueue::TEST_GetLastOpIdWithType( |
1632 | 3 | int64_t max_allowed_index, OperationType op_type) { |
1633 | 3 | return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type); |
1634 | 3 | } |
1635 | | |
1636 | 14.6k | Status ValidateFlags() { |
1637 | | // Normally we would have used |
1638 | | // DEFINE_validator(rpc_throttle_threshold_bytes, &RpcThrottleThresholdBytesValidator); |
1639 | | // right after defining the rpc_throttle_threshold_bytes flag. However, this leads to a segfault |
1640 | | // in the LTO-enabled build, presumably due to indeterminate order of static initialization. |
1641 | | // Instead, we invoke this function from master/tserver main() functions when static |
1642 | | // initialization is already finished. |
1643 | 14.6k | if (!RpcThrottleThresholdBytesValidator( |
1644 | 14.6k | "rpc_throttle_threshold_bytes", FLAGS_rpc_throttle_threshold_bytes)) { |
1645 | 0 | return STATUS(InvalidArgument, "Flag validation failed"); |
1646 | 0 | } |
1647 | | |
1648 | 14.6k | return Status::OK(); |
1649 | 14.6k | } |
1650 | | |
1651 | | } // namespace consensus |
1652 | | } // namespace yb |