/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_peers.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/consensus_peers.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <mutex> |
37 | | #include <string> |
38 | | #include <utility> |
39 | | #include <vector> |
40 | | |
41 | | #include <boost/optional.hpp> |
42 | | #include <glog/logging.h> |
43 | | |
44 | | #include "yb/common/wire_protocol.h" |
45 | | |
46 | | #include "yb/consensus/consensus.h" |
47 | | #include "yb/consensus/consensus.proxy.h" |
48 | | #include "yb/consensus/consensus_meta.h" |
49 | | #include "yb/consensus/consensus_queue.h" |
50 | | #include "yb/consensus/replicate_msgs_holder.h" |
51 | | #include "yb/consensus/multi_raft_batcher.h" |
52 | | #include "yb/gutil/strings/substitute.h" |
53 | | |
54 | | #include "yb/rpc/periodic.h" |
55 | | #include "yb/rpc/rpc_controller.h" |
56 | | |
57 | | #include "yb/tablet/tablet_error.h" |
58 | | #include "yb/tserver/tserver_error.h" |
59 | | |
60 | | #include "yb/util/backoff_waiter.h" |
61 | | #include "yb/util/fault_injection.h" |
62 | | #include "yb/util/flag_tags.h" |
63 | | #include "yb/util/format.h" |
64 | | #include "yb/util/logging.h" |
65 | | #include "yb/util/monotime.h" |
66 | | #include "yb/util/net/net_util.h" |
67 | | #include "yb/util/scope_exit.h" |
68 | | #include "yb/util/status_callback.h" |
69 | | #include "yb/util/status_format.h" |
70 | | #include "yb/util/threadpool.h" |
71 | | #include "yb/util/tsan_util.h" |
72 | | |
73 | | using namespace std::literals; |
74 | | using namespace std::placeholders; |
75 | | |
76 | | DEFINE_int32(consensus_rpc_timeout_ms, 3000, |
77 | | "Timeout used for all consensus internal RPC communications."); |
78 | | TAG_FLAG(consensus_rpc_timeout_ms, advanced); |
79 | | |
80 | | DEFINE_int32(max_wait_for_processresponse_before_closing_ms, |
81 | | yb::RegularBuildVsSanitizers(5000, 60000), |
82 | | "Maximum amount of time we will wait in Peer::Close() for Peer::ProcessResponse() to " |
83 | | "finish before returning proceding to close the Peer and return"); |
84 | | TAG_FLAG(max_wait_for_processresponse_before_closing_ms, advanced); |
85 | | |
86 | | DECLARE_int32(raft_heartbeat_interval_ms); |
87 | | |
88 | | DECLARE_bool(enable_multi_raft_heartbeat_batcher); |
89 | | |
90 | | DEFINE_test_flag(double, fault_crash_on_leader_request_fraction, 0.0, |
91 | | "Fraction of the time when the leader will crash just before sending an " |
92 | | "UpdateConsensus RPC."); |
93 | | |
94 | | DEFINE_test_flag(int32, delay_removing_peer_with_failed_tablet_secs, 0, |
95 | | "If greater than 0, Peer::ProcessResponse will sleep after receiving a response " |
96 | | "indicating that a tablet is in the FAILED state, and before marking this peer " |
97 | | "as failed."); |
98 | | |
99 | | // Allow for disabling remote bootstrap in unit tests where we want to test |
100 | | // certain scenarios without triggering bootstrap of a remote peer. |
101 | | DEFINE_test_flag(bool, enable_remote_bootstrap, true, |
102 | | "Whether remote bootstrap will be initiated by the leader when it " |
103 | | "detects that a follower is out of date or does not have a tablet " |
104 | | "replica."); |
105 | | |
106 | | DECLARE_int32(TEST_log_change_config_every_n); |
107 | | |
108 | | namespace yb { |
109 | | namespace consensus { |
110 | | |
111 | | using log::Log; |
112 | | using log::LogEntryBatch; |
113 | | using std::shared_ptr; |
114 | | using rpc::Messenger; |
115 | | using rpc::PeriodicTimer; |
116 | | using rpc::RpcController; |
117 | | using strings::Substitute; |
118 | | |
119 | | Peer::Peer( |
120 | | const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, PeerProxyPtr proxy, |
121 | | PeerMessageQueue* queue, MultiRaftHeartbeatBatcherPtr multi_raft_batcher, |
122 | | ThreadPoolToken* raft_pool_token, Consensus* consensus, rpc::Messenger* messenger) |
123 | | : tablet_id_(std::move(tablet_id)), |
124 | | leader_uuid_(std::move(leader_uuid)), |
125 | | peer_pb_(peer_pb), |
126 | | proxy_(std::move(proxy)), |
127 | | queue_(queue), |
128 | | multi_raft_batcher_(std::move(multi_raft_batcher)), |
129 | | raft_pool_token_(raft_pool_token), |
130 | | consensus_(consensus), |
131 | 123k | messenger_(messenger) {} |
132 | | |
133 | 1 | void Peer::SetTermForTest(int term) { |
134 | 1 | update_response_.set_responder_term(term); |
135 | 1 | } |
136 | | |
137 | 123k | Status Peer::Init() { |
138 | 123k | std::lock_guard<simple_spinlock> lock(peer_lock_); |
139 | 123k | queue_->TrackPeer(peer_pb_.permanent_uuid()); |
140 | | // Capture a weak_ptr reference into the functor so it can safely handle |
141 | | // outliving the peer. |
142 | 123k | std::weak_ptr<Peer> weak_peer = shared_from_this(); |
143 | 123k | heartbeater_ = PeriodicTimer::Create( |
144 | 123k | messenger_, |
145 | 11.0M | [weak_peer]() { |
146 | 11.0M | if (auto p = weak_peer.lock()) {
147 | 11.0M | Status s = p->SignalRequest(RequestTriggerMode::kAlwaysSend); |
148 | 11.0M | } |
149 | 11.0M | }, |
150 | 123k | MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms)); |
151 | 123k | heartbeater_->Start(); |
152 | 123k | state_ = kPeerStarted; |
153 | 123k | return Status::OK(); |
154 | 123k | } |
155 | | |
156 | 31.8M | Status Peer::SignalRequest(RequestTriggerMode trigger_mode) { |
157 | | // If the peer is currently sending, return Status::OK(). |
158 | | // If there are new requests in the queue we'll get them on ProcessResponse(). |
159 | 31.8M | auto performing_update_lock = LockPerformingUpdate(std::try_to_lock); |
160 | 31.8M | if (!performing_update_lock.owns_lock()) { |
161 | 6.14M | return Status::OK(); |
162 | 6.14M | } |
163 | | |
164 | 25.6M | { |
165 | 25.6M | auto processing_lock = StartProcessingUnlocked(); |
166 | 25.6M | if (!processing_lock.owns_lock()) { |
167 | 4 | return STATUS(IllegalState, "Peer was closed."); |
168 | 4 | } |
169 | | |
170 | | // For the first request sent by the peer, we send it even if the queue is empty, which it will |
171 | | // always appear to be for the first request, since this is the negotiation round. |
172 | 25.6M | if (PREDICT_FALSE(state_ == kPeerStarted)) { |
173 | 123k | trigger_mode = RequestTriggerMode::kAlwaysSend; |
174 | 123k | state_ = kPeerRunning; |
175 | 123k | } |
176 | 25.6M | DCHECK_EQ(state_, kPeerRunning); |
177 | | |
178 | | // If our last request generated an error, and this is not a normal heartbeat request (i.e. |
179 | | // we're not forcing a request even if the queue is empty, unlike we do during heartbeats), |
180 | | // then don't send the "per-RPC" request. Instead, we'll wait for the heartbeat. |
181 | | // |
182 | | // TODO: we could consider looking at the number of consecutive failed attempts, and instead of |
183 | | // ignoring the signal, ask the heartbeater to "expedite" the next heartbeat in order to achieve |
184 | | // something like exponential backoff after an error. As it is implemented today, any transient |
185 | | // error will result in a latency blip as long as the heartbeat period. |
186 | 25.6M | if (failed_attempts_ > 0 && trigger_mode == RequestTriggerMode::kNonEmptyOnly) {
187 | 81.2k | return Status::OK(); |
188 | 81.2k | } |
189 | | |
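 | | // using_thread_pool_ counts in-flight submissions to the Raft thread pool; Peer::Close()
 | | // waits for it to drop back to zero before closing the peer.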
190 | 25.5M | using_thread_pool_.fetch_add(1, std::memory_order_acq_rel); |
191 | 25.5M | } |
192 | 0 | auto status = raft_pool_token_->SubmitFunc( |
193 | 25.5M | std::bind(&Peer::SendNextRequest, shared_from_this(), trigger_mode)); |
194 | 25.5M | using_thread_pool_.fetch_sub(1, std::memory_order_acq_rel); |
195 | 25.6M | if (status.ok()) {
196 | 25.6M | performing_update_lock.release(); |
197 | 25.6M | } |
198 | 25.5M | return status; |
199 | 25.6M | } |
200 | | |
201 | 28.8M | void Peer::SendNextRequest(RequestTriggerMode trigger_mode) { |
202 | 28.8M | auto retain_self = shared_from_this(); |
203 | 28.8M | DCHECK(performing_update_mutex_.is_locked()) << "Cannot send request";
204 | | |
205 | 28.8M | auto performing_update_lock = LockPerformingUpdate(std::adopt_lock); |
206 | 28.8M | auto processing_lock = StartProcessingUnlocked(); |
207 | 28.8M | if (!processing_lock.owns_lock()) { |
208 | 52 | return; |
209 | 52 | } |
210 | | // Since there are a couple of return paths from this function, set up a cleanup in case we fill
211 | | // in ops inside update_request_ but do not get to use them.
212 | 28.8M | bool needs_cleanup = true; |
213 | 28.8M | ScopeExit([&needs_cleanup, this](){ |
214 | 28.8M | if (needs_cleanup) { |
215 | | // Since we will not be using update_request_, we should cleanup the reserved ops. |
216 | 28.8M | CleanRequestOps(&update_request_); |
217 | 28.8M | } |
218 | 28.8M | }); |
219 | | |
220 | | // The peer has no pending request nor is sending: send the request. |
221 | 28.8M | bool needs_remote_bootstrap = false; |
222 | 28.8M | bool last_exchange_successful = false; |
223 | 28.8M | PeerMemberType member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE; |
224 | 28.8M | int64_t commit_index_before = update_request_.has_committed_op_id() ? |
225 | 28.6M | update_request_.committed_op_id().index() : kMinimumOpIdIndex;
226 | 28.8M | ReplicateMsgsHolder msgs_holder; |
227 | 28.8M | Status s = queue_->RequestForPeer( |
228 | 28.8M | peer_pb_.permanent_uuid(), &update_request_, &msgs_holder, &needs_remote_bootstrap, |
229 | 28.8M | &member_type, &last_exchange_successful); |
230 | 28.8M | int64_t commit_index_after = update_request_.has_committed_op_id() ? |
231 | 28.8M | update_request_.committed_op_id().index() : kMinimumOpIdIndex;
232 | | |
233 | 28.8M | if (PREDICT_FALSE(!s.ok())) { |
234 | 69 | LOG_WITH_PREFIX(INFO) << "Could not obtain request from queue for peer: " << s; |
235 | 69 | return; |
236 | 69 | } |
237 | | |
238 | 28.8M | if (PREDICT_FALSE(needs_remote_bootstrap)) { |
239 | 10.7k | Status status; |
240 | 10.7k | if (!FLAGS_TEST_enable_remote_bootstrap) { |
241 | 60 | failed_attempts_++; |
242 | 60 | status = STATUS(NotSupported, "remote bootstrap is disabled"); |
243 | 10.6k | } else { |
244 | 10.6k | status = queue_->GetRemoteBootstrapRequestForPeer(peer_pb_.permanent_uuid(), &rb_request_); |
245 | 10.6k | if (!consensus_->split_parent_tablet_id().empty()) { |
246 | 87 | rb_request_.set_split_parent_tablet_id(consensus_->split_parent_tablet_id()); |
247 | 87 | } |
248 | 10.6k | } |
249 | 10.7k | if (!status.ok()) { |
250 | 60 | LOG_WITH_PREFIX(WARNING) << "Unable to generate remote bootstrap request for peer: " |
251 | 60 | << status; |
252 | 60 | return; |
253 | 60 | } |
254 | | |
255 | 10.6k | using_thread_pool_.fetch_add(1, std::memory_order_acq_rel); |
256 | 10.6k | s = SendRemoteBootstrapRequest(); |
257 | 10.6k | using_thread_pool_.fetch_sub(1, std::memory_order_acq_rel); |
258 | 10.6k | if (s.ok()) { |
259 | 10.6k | performing_update_lock.release(); |
260 | 10.6k | } |
261 | 10.6k | return; |
262 | 10.7k | } |
263 | | |
264 | | // If the peer doesn't need remote bootstrap, but it is a PRE_VOTER or PRE_OBSERVER in the config, |
265 | | // we need to promote it. |
266 | 28.8M | if (last_exchange_successful && |
267 | 28.8M | (member_type == PeerMemberType::PRE_VOTER || member_type == PeerMemberType::PRE_OBSERVER)) {
268 | 2.91M | if (PREDICT_TRUE(consensus_)) { |
269 | 2.91M | auto uuid = peer_pb_.permanent_uuid(); |
270 | | // Remove these here, before we drop the locks. |
271 | 2.91M | needs_cleanup = false; |
272 | 2.91M | CleanRequestOps(&update_request_); |
273 | 2.91M | processing_lock.unlock(); |
274 | 2.91M | performing_update_lock.unlock(); |
275 | 2.91M | consensus::ChangeConfigRequestPB req; |
276 | 2.91M | consensus::ChangeConfigResponsePB resp; |
277 | | |
278 | 2.91M | req.set_tablet_id(tablet_id_); |
279 | 2.91M | req.set_type(consensus::CHANGE_ROLE); |
280 | 2.91M | RaftPeerPB *peer = req.mutable_server(); |
281 | 2.91M | peer->set_permanent_uuid(peer_pb_.permanent_uuid()); |
282 | | |
283 | 2.91M | boost::optional<tserver::TabletServerErrorPB::Code> error_code; |
284 | | |
285 | | // If another ChangeConfig is being processed, our request will be rejected. |
286 | 2.91M | YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n) |
287 | 1.03k | << "Sending ChangeConfig request to promote peer"; |
288 | 2.91M | auto status = consensus_->ChangeConfig(req, &DoNothingStatusCB, &error_code); |
289 | 2.91M | if (PREDICT_FALSE(!status.ok())) { |
290 | 2.91M | YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n) |
291 | 974 | << "Unable to change role for peer " << uuid << ": " << status; |
292 | | // Since we released the semaphore, we need to call SignalRequest again to send a message |
293 | 2.91M | status = SignalRequest(RequestTriggerMode::kAlwaysSend); |
294 | 2.91M | if (PREDICT_FALSE(!status.ok())) { |
295 | 3 | LOG(WARNING) << "Unexpected error when trying to send request: " |
296 | 3 | << status; |
297 | 3 | } |
298 | 2.91M | } |
299 | 2.91M | return; |
300 | 2.91M | } |
301 | 2.91M | } |
302 | | |
303 | 25.9M | if (update_request_.tablet_id().empty()) { |
304 | 123k | update_request_.set_tablet_id(tablet_id_); |
305 | 123k | update_request_.set_caller_uuid(leader_uuid_); |
306 | 123k | update_request_.set_dest_uuid(peer_pb_.permanent_uuid()); |
307 | 123k | } |
308 | | |
309 | 25.9M | const bool req_is_heartbeat = update_request_.ops_size() == 0 && |
310 | 25.9M | commit_index_after <= commit_index_before;
311 | | |
312 | | // If the queue is empty, check if we were told to send a status-only message (which is what |
313 | | // happens during heartbeats). If not, just return. |
314 | 25.9M | if (PREDICT_FALSE(req_is_heartbeat && trigger_mode == RequestTriggerMode::kNonEmptyOnly)) { |
315 | 23.2k | queue_->RequestWasNotSent(peer_pb_.permanent_uuid()); |
316 | 23.2k | return; |
317 | 23.2k | } |
318 | | |
319 | | // If we're actually sending ops there's no need to heartbeat for a while, reset the heartbeater. |
320 | 25.8M | if (!req_is_heartbeat) { |
321 | 15.0M | heartbeater_->Snooze(); |
322 | 15.0M | } |
323 | | |
324 | 25.8M | MAYBE_FAULT(FLAGS_TEST_fault_crash_on_leader_request_fraction); |
325 | | |
326 | | // We will clean up the ops from the request in ProcessResponse, because otherwise there could be
327 | | // a race condition while the rest of this function runs in parallel with ProcessResponse.
328 | 25.8M | needs_cleanup = false; |
329 | 25.8M | msgs_holder.ReleaseOps(); |
330 | | |
331 | | // Heartbeat batching allows for network layer savings by reducing CPU cycles |
332 | | // spent on computing state, context switching (sending/receiving RPCs)
333 | | // and serializing/deserializing protobufs. |
334 | 25.8M | if (req_is_heartbeat && multi_raft_batcher_
335 | 25.8M | && FLAGS_enable_multi_raft_heartbeat_batcher) {
336 | 0 | auto performing_heartbeat_lock = LockPerformingHeartbeat(std::try_to_lock); |
337 | 0 | if (!performing_heartbeat_lock.owns_lock()) { |
338 | | // Outstanding heartbeat already in flight so don't schedule another. |
339 | 0 | return; |
340 | 0 | } |
341 | 0 | heartbeat_request_.Swap(&update_request_); |
342 | 0 | heartbeat_response_.Swap(&update_response_); |
343 | 0 | cur_heartbeat_id_++; |
344 | 0 | processing_lock.unlock(); |
345 | 0 | performing_update_lock.unlock(); |
346 | 0 | performing_heartbeat_lock.release(); |
347 | 0 | multi_raft_batcher_->AddRequestToBatch(&heartbeat_request_, &heartbeat_response_, |
348 | 0 | std::bind(&Peer::ProcessHeartbeatResponse, |
349 | 0 | retain_self, _1)); |
350 | 0 | return; |
351 | 0 | } |
352 | | |
353 | | // The minimum_viable_heartbeat_ represents the |
354 | | // heartbeat that is sent immediately following this op. |
355 | | // Any heartbeats which are outstanding are considered no longer viable. |
356 | | // It's simpler for us to drop the responses for these heartbeats |
357 | | // rather than attempt to ensure we process the responses of the outstanding heartbeat |
358 | | // and this new request in the same order they were received by the remote peer. |
359 | | // TODO: Remove batched but unsent heartbeats (in the respective MultiRaftBatcher) in this case |
360 | 25.8M | minimum_viable_heartbeat_ = cur_heartbeat_id_ + 1; |
361 | 25.8M | processing_lock.unlock(); |
362 | 25.8M | performing_update_lock.release(); |
363 | 25.8M | controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh); |
364 | 25.8M | proxy_->UpdateAsync(&update_request_, trigger_mode, &update_response_, &controller_, |
365 | 25.8M | std::bind(&Peer::ProcessResponse, retain_self)); |
366 | 25.8M | } |
367 | | |
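 | | // Acquires peer_lock_ and returns the guard; if the peer has already been closed, the guard
 | | // is returned unlocked so callers can detect this via owns_lock() and bail out.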
368 | 80.4M | std::unique_lock<simple_spinlock> Peer::StartProcessingUnlocked() { |
369 | 80.4M | std::unique_lock<simple_spinlock> lock(peer_lock_); |
370 | | |
371 | 80.4M | if (state_ == kPeerClosed) { |
372 | 3.68k | lock.unlock(); |
373 | 3.68k | } |
374 | | |
375 | 80.4M | return lock; |
376 | 80.4M | } |
377 | | |
378 | | bool Peer::ProcessResponseWithStatus(const Status& status, |
379 | 25.8M | ConsensusResponsePB* response) { |
380 | 25.8M | if (!status.ok()) { |
381 | 276k | if (status.IsRemoteError()) { |
382 | | // Most controller errors are caused by network issues or corner cases like shutdown and |
383 | | // failure to serialize a protobuf. Therefore, we generally consider these errors to indicate |
384 | | // an unreachable peer. However, a RemoteError wraps some other error propagated from the |
385 | | // remote peer, so we know the remote is alive. Therefore, we will let the queue know that the |
386 | | // remote is responsive. |
387 | 12 | queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid()); |
388 | 12 | } |
389 | 276k | ProcessResponseError(status); |
390 | 276k | return false; |
391 | 276k | } |
392 | | |
393 | 25.5M | if (response->has_propagated_hybrid_time()) { |
394 | 25.5M | queue_->clock()->Update(HybridTime(response->propagated_hybrid_time())); |
395 | 25.5M | } |
396 | | |
397 | | // We should try to evict a follower which returns a WRONG UUID error. |
398 | 25.5M | if (response->has_error() && |
399 | 25.5M | response->error().code() == tserver::TabletServerErrorPB::WRONG_SERVER_UUID) {
400 | 95 | queue_->NotifyObserversOfFailedFollower( |
401 | 95 | peer_pb_.permanent_uuid(), |
402 | 95 | Substitute("Leader communication with peer $0 received error $1, will try to " |
403 | 95 | "evict peer", peer_pb_.permanent_uuid(), |
404 | 95 | response->error().ShortDebugString())); |
405 | 95 | ProcessResponseError(StatusFromPB(response->error().status())); |
406 | 95 | return false; |
407 | 95 | } |
408 | | |
409 | 25.5M | auto s = ResponseStatus(*response); |
410 | 25.5M | if (!s.ok() && |
411 | 25.5M | tserver::TabletServerError(s) == tserver::TabletServerErrorPB::TABLET_NOT_RUNNING &&
412 | 25.5M | tablet::RaftGroupStateError(s) == tablet::RaftGroupStatePB::FAILED) {
413 | 1 | if (PREDICT_FALSE(FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs > 0)) { |
414 | 1 | LOG(INFO) << "TEST: Sleeping for " << FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs |
415 | 1 | << " seconds"; |
416 | 1 | SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs)); |
417 | 1 | } |
418 | 1 | queue_->NotifyObserversOfFailedFollower( |
419 | 1 | peer_pb_.permanent_uuid(), |
420 | 1 | Format("Tablet in peer $0 is in FAILED state, will try to evict peer", |
421 | 1 | peer_pb_.permanent_uuid())); |
422 | 1 | ProcessResponseError(StatusFromPB(response->error().status())); |
423 | 1 | } |
424 | | |
425 | | // Response should be either error or status. |
426 | 18.4E | LOG_IF(DFATAL, response->has_error() == response->has_status()) |
427 | 18.4E | << "Invalid response: " << response->ShortDebugString(); |
428 | | |
429 | | // Pass through errors we can respond to, like not found, since in that case |
430 | | // we will need to remotely bootstrap. TODO: Handle DELETED response once implemented. |
431 | 25.5M | if ((response->has_error() && |
432 | 25.5M | response->error().code() != tserver::TabletServerErrorPB::TABLET_NOT_FOUND) ||
433 | 25.5M | (response->status().has_error() &&
434 | 25.5M | response->status().error().code() == consensus::ConsensusErrorPB::CANNOT_PREPARE)) {
435 | | // Again, let the queue know that the remote is still responsive, since we will not be sending |
436 | | // this error response through to the queue. |
437 | 7.14k | queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid()); |
438 | 7.14k | ProcessResponseError(StatusFromPB(response->error().status())); |
439 | 7.14k | return false; |
440 | 7.14k | } |
441 | | |
442 | 25.5M | failed_attempts_ = 0; |
443 | 25.5M | return queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), *response); |
444 | 25.5M | } |
445 | | |
446 | 25.9M | void Peer::ProcessResponse() { |
447 | 25.9M | DCHECK(performing_update_mutex_.is_locked()) << "Got a response when nothing was pending.";
448 | 25.9M | auto status = controller_.status(); |
449 | 25.9M | if (status.ok()) { |
450 | 25.6M | status = controller_.thread_pool_failure(); |
451 | 25.6M | } |
452 | 25.9M | controller_.Reset(); |
453 | 25.9M | CleanRequestOps(&update_request_); |
454 | | |
455 | 25.9M | auto performing_update_lock = LockPerformingUpdate(std::adopt_lock); |
456 | 25.9M | auto processing_lock = StartProcessingUnlocked(); |
457 | 25.9M | if (!processing_lock.owns_lock()) { |
458 | 1.73k | return; |
459 | 1.73k | } |
460 | 25.9M | bool more_pending = ProcessResponseWithStatus(status, &update_response_); |
461 | | |
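 | | // If the queue still has operations pending for this peer, keep the update mutex held (it is
 | | // released from this guard and re-adopted inside SendNextRequest) and send the next batch now.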
462 | 25.9M | if (more_pending) { |
463 | 3.24M | processing_lock.unlock(); |
464 | 3.24M | performing_update_lock.release(); |
465 | 3.24M | SendNextRequest(RequestTriggerMode::kAlwaysSend); |
466 | 3.24M | } |
467 | 25.9M | } |
468 | | |
469 | 0 | void Peer::ProcessHeartbeatResponse(const Status& status) { |
470 | 0 | DCHECK(performing_heartbeat_mutex_.is_locked()) << "Got a heartbeat when nothing was pending."; |
471 | 0 | DCHECK(heartbeat_request_.ops_size() == 0) << "Got a heartbeat with a non-zero number of ops."; |
472 | |
473 | 0 | auto performing_heartbeat_lock = LockPerformingHeartbeat(std::adopt_lock); |
474 | 0 | auto processing_lock = StartProcessingUnlocked(); |
475 | 0 | if (!processing_lock.owns_lock()) { |
476 | 0 | return; |
477 | 0 | } |
478 | | |
479 | 0 | if (cur_heartbeat_id_ < minimum_viable_heartbeat_) { |
480 | | // If we receive a response from a heartbeat that was sent before a valid op |
481 | | // then we should discard it as the op is more recent and the heartbeat should not |
482 | | // be modifying any state. |
483 | | // TODO: Add a metric to track the frequency of this |
484 | 0 | return; |
485 | 0 | } |
486 | 0 | bool more_pending = ProcessResponseWithStatus(status, &heartbeat_response_); |
487 | |
488 | 0 | if (more_pending) { |
489 | 0 | auto performing_update_lock = LockPerformingUpdate(std::try_to_lock); |
490 | 0 | if (!performing_update_lock.owns_lock()) { |
491 | 0 | return; |
492 | 0 | } |
493 | 0 | performing_heartbeat_lock.unlock(); |
494 | 0 | processing_lock.unlock(); |
495 | 0 | performing_update_lock.release(); |
496 | 0 | SendNextRequest(RequestTriggerMode::kAlwaysSend); |
497 | 0 | } |
498 | 0 | } |
499 | | |
500 | 10.6k | Status Peer::SendRemoteBootstrapRequest() { |
501 | 10.6k | YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 30) << "Sending request to remotely bootstrap";
502 | 10.6k | controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolNormal); |
503 | 10.6k | return raft_pool_token_->SubmitFunc([retain_self = shared_from_this()]() { |
504 | 10.6k | retain_self->proxy_->StartRemoteBootstrap( |
505 | 10.6k | &retain_self->rb_request_, &retain_self->rb_response_, &retain_self->controller_, |
506 | 10.6k | std::bind(&Peer::ProcessRemoteBootstrapResponse, retain_self)); |
507 | 10.6k | }); |
508 | 10.6k | } |
509 | | |
510 | 10.6k | void Peer::ProcessRemoteBootstrapResponse() { |
511 | 10.6k | Status status = controller_.status(); |
512 | 10.6k | controller_.Reset(); |
513 | | |
514 | 10.6k | auto performing_update_lock = LockPerformingUpdate(std::adopt_lock); |
515 | 10.6k | auto processing_lock = StartProcessingUnlocked(); |
516 | 10.6k | if (!processing_lock.owns_lock()) { |
517 | 1.88k | return; |
518 | 1.88k | } |
519 | | |
520 | 8.73k | if (!status.ok()) { |
521 | 117 | LOG_WITH_PREFIX(WARNING) << "Unable to begin remote bootstrap on peer: " << status; |
522 | 117 | return; |
523 | 117 | } |
524 | | |
525 | 8.61k | if (rb_response_.has_error()) { |
526 | 8.61k | const auto error_code = rb_response_.error().code(); |
527 | 8.61k | if ( |
528 | 8.61k | error_code == tserver::TabletServerErrorPB::ALREADY_IN_PROGRESS || |
529 | 8.61k | error_code == tserver::TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE) {
530 | 8.61k | queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid()); |
531 | 8.61k | YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 30)
532 | 57 | << ":::Unable to begin remote bootstrap on peer: " << rb_response_.ShortDebugString(); |
533 | 8.61k | } else { |
534 | 0 | LOG_WITH_PREFIX(WARNING) << "Unable to begin remote bootstrap on peer: " |
535 | 0 | << rb_response_.ShortDebugString(); |
536 | 0 | } |
537 | 8.61k | } |
538 | 8.61k | } |
539 | | |
540 | 283k | void Peer::ProcessResponseError(const Status& status) { |
541 | 283k | DCHECK(performing_update_mutex_.is_locked() || performing_heartbeat_mutex_.is_locked()); |
542 | 283k | failed_attempts_++; |
543 | 283k | YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 5) << "Couldn't send request. "
544 | 24.0k | << " Status: " << status.ToString() << ". Retrying in the next heartbeat period." |
545 | 24.0k | << " Already tried " << failed_attempts_ << " times. State: " << state_; |
546 | 283k | } |
547 | | |
548 | 100k | string Peer::LogPrefix() const { |
549 | 100k | return Format("T $0 P $1 -> Peer $2 ($3, $4): ", |
550 | 100k | tablet_id_, leader_uuid_, peer_pb_.permanent_uuid(), |
551 | 100k | peer_pb_.last_known_private_addr(), peer_pb_.last_known_broadcast_addr()); |
552 | 100k | } |
553 | | |
554 | 75.4k | void Peer::Close() { |
555 | 75.4k | if (heartbeater_) { |
556 | 75.4k | heartbeater_->Stop(); |
557 | 75.4k | } |
558 | | |
559 | | // If the peer is already closed return. |
560 | 75.4k | { |
561 | 75.4k | std::lock_guard<simple_spinlock> processing_lock(peer_lock_); |
562 | 75.4k | if (using_thread_pool_.load(std::memory_order_acquire) > 0) { |
563 | 14 | auto deadline = std::chrono::steady_clock::now() + |
564 | 14 | FLAGS_max_wait_for_processresponse_before_closing_ms * 1ms; |
565 | 14 | BackoffWaiter waiter(deadline, 100ms); |
566 | 133 | while (using_thread_pool_.load(std::memory_order_acquire) > 0) { |
567 | 120 | if (!waiter.Wait()) { |
568 | 1 | LOG_WITH_PREFIX(DFATAL) |
569 | 1 | << "Timed out waiting for ThreadPoolToken::SubmitFunc() to finish. " |
570 | 1 | << "Number of pending calls: " << using_thread_pool_.load(std::memory_order_acquire); |
571 | 1 | break; |
572 | 1 | } |
573 | 120 | } |
574 | 14 | } |
575 | 75.4k | if (state_ == kPeerClosed) { |
576 | 0 | return; |
577 | 0 | } |
578 | 18.4E | DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state: " << state_; |
579 | 75.4k | state_ = kPeerClosed; |
580 | 75.4k | LOG_WITH_PREFIX(INFO) << "Closing peer"; |
581 | 75.4k | } |
582 | | |
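 | | // Hold a self-reference so this Peer stays alive for the remainder of Close().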
583 | 0 | auto retain_self = shared_from_this(); |
584 | | |
585 | 75.4k | queue_->UntrackPeer(peer_pb_.permanent_uuid()); |
586 | 75.4k | } |
587 | | |
588 | 75.4k | Peer::~Peer() { |
589 | 75.4k | std::lock_guard<simple_spinlock> processing_lock(peer_lock_); |
590 | 75.4k | CHECK_EQ(state_, kPeerClosed) << "Peer cannot be implicitly closed";
591 | 75.4k | } |
592 | | |
593 | 57.6M | void Peer::CleanRequestOps(ConsensusRequestPB* request) { |
594 | 57.6M | request->mutable_ops()->ExtractSubrange(0, request->ops().size(), nullptr /* elements */); |
595 | 57.6M | } |
596 | | |
597 | | RpcPeerProxy::RpcPeerProxy(HostPort hostport, ConsensusServiceProxyPtr consensus_proxy) |
598 | 1.58M | : hostport_(std::move(hostport)), consensus_proxy_(std::move(consensus_proxy)) { |
599 | 1.58M | } |
600 | | |
601 | | void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request, |
602 | | RequestTriggerMode trigger_mode, |
603 | | ConsensusResponsePB* response, |
604 | | rpc::RpcController* controller, |
605 | 25.8M | const rpc::ResponseCallback& callback) { |
606 | 25.8M | controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms)); |
607 | 25.8M | consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback); |
608 | 25.8M | } |
609 | | |
610 | | void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request, |
611 | | VoteResponsePB* response, |
612 | | rpc::RpcController* controller, |
613 | 1.45M | const rpc::ResponseCallback& callback) { |
614 | 1.45M | consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback); |
615 | 1.45M | } |
616 | | |
617 | | void RpcPeerProxy::RunLeaderElectionAsync(const RunLeaderElectionRequestPB* request, |
618 | | RunLeaderElectionResponsePB* response, |
619 | | rpc::RpcController* controller, |
620 | 10.0k | const rpc::ResponseCallback& callback) { |
621 | 10.0k | controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms)); |
622 | 10.0k | consensus_proxy_->RunLeaderElectionAsync(*request, response, controller, callback); |
623 | 10.0k | } |
624 | | |
625 | | void RpcPeerProxy::LeaderElectionLostAsync(const LeaderElectionLostRequestPB* request, |
626 | | LeaderElectionLostResponsePB* response, |
627 | | rpc::RpcController* controller, |
628 | 77 | const rpc::ResponseCallback& callback) { |
629 | 77 | consensus_proxy_->LeaderElectionLostAsync(*request, response, controller, callback); |
630 | 77 | } |
631 | | |
632 | | void RpcPeerProxy::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* request, |
633 | | StartRemoteBootstrapResponsePB* response, |
634 | | rpc::RpcController* controller, |
635 | 10.6k | const rpc::ResponseCallback& callback) { |
636 | 10.6k | consensus_proxy_->StartRemoteBootstrapAsync(*request, response, controller, callback); |
637 | 10.6k | } |
638 | | |
639 | 1.54M | RpcPeerProxy::~RpcPeerProxy() {} |
640 | | |
641 | | RpcPeerProxyFactory::RpcPeerProxyFactory( |
642 | | Messenger* messenger, rpc::ProxyCache* proxy_cache, CloudInfoPB from) |
643 | 150k | : messenger_(messenger), proxy_cache_(proxy_cache), from_(std::move(from)) {} |
644 | | |
645 | 1.58M | PeerProxyPtr RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb) { |
646 | 1.58M | auto hostport = HostPortFromPB(DesiredHostPort(peer_pb, from_)); |
647 | 1.58M | auto proxy = std::make_unique<ConsensusServiceProxy>(proxy_cache_, hostport); |
648 | 1.58M | return std::make_unique<RpcPeerProxy>(std::move(hostport), std::move(proxy)); |
649 | 1.58M | } |
650 | | |
651 | 75.6k | RpcPeerProxyFactory::~RpcPeerProxyFactory() {} |
652 | | |
653 | 423k | rpc::Messenger* RpcPeerProxyFactory::messenger() const { return messenger_; } |
654 | | |
655 | | struct GetNodeInstanceRequest { |
656 | | GetNodeInstanceRequestPB req; |
657 | | GetNodeInstanceResponsePB resp; |
658 | | rpc::RpcController controller; |
659 | | ConsensusServiceProxy proxy; |
660 | | |
661 | | GetNodeInstanceRequest(rpc::ProxyCache* proxy_cache, const HostPort& hostport) |
662 | 22.0k | : proxy(proxy_cache, hostport) {} |
663 | | }; |
664 | | |
665 | | Status SetPermanentUuidForRemotePeer( |
666 | | rpc::ProxyCache* proxy_cache, |
667 | | std::chrono::steady_clock::duration timeout, |
668 | | const std::vector<HostPort>& endpoints, |
669 | 20.8k | RaftPeerPB* remote_peer) { |
670 | | |
671 | 20.8k | DCHECK(!remote_peer->has_permanent_uuid()); |
672 | 20.8k | auto deadline = std::chrono::steady_clock::now() + timeout; |
673 | | |
674 | 20.8k | std::vector<GetNodeInstanceRequest> requests; |
675 | 20.8k | requests.reserve(endpoints.size()); |
676 | 22.0k | for (const auto& hp : endpoints) { |
677 | 22.0k | requests.emplace_back(proxy_cache, hp); |
678 | 22.0k | } |
679 | | |
680 | 20.8k | CountDownLatch latch(requests.size()); |
681 | 20.8k | const auto kMaxWait = 10s; |
682 | 20.8k | BackoffWaiter waiter(deadline, kMaxWait); |
683 | 41.4k | for (;;) { |
684 | 41.4k | latch.Reset(requests.size()); |
685 | 41.4k | std::atomic<GetNodeInstanceRequest*> last_reply{nullptr}; |
686 | 42.6k | for (auto& request : requests) { |
687 | 42.6k | request.controller.Reset(); |
688 | 42.6k | request.controller.set_timeout(kMaxWait); |
689 | 42.6k | VLOG(2) << "Getting uuid from remote peer. Request: " << request.req.ShortDebugString();
690 | | |
691 | 42.6k | request.proxy.GetNodeInstanceAsync( |
692 | 42.6k | request.req, &request.resp, &request.controller, |
693 | 42.6k | [&latch, &request, &last_reply] { |
694 | 42.6k | if (!request.controller.status().IsTimedOut()) { |
695 | 42.6k | last_reply.store(&request, std::memory_order_release); |
696 | 42.6k | } |
697 | 42.6k | latch.CountDown(); |
698 | 42.6k | }); |
699 | 42.6k | } |
700 | | |
701 | 41.4k | latch.Wait(); |
702 | | |
703 | 41.5k | for (auto& request : requests) { |
704 | 41.5k | auto status = request.controller.status(); |
705 | 41.5k | if (status.ok()) { |
706 | 20.8k | remote_peer->set_permanent_uuid(request.resp.node_instance().permanent_uuid()); |
707 | 20.8k | remote_peer->set_member_type(PeerMemberType::VOTER); |
708 | 20.8k | if (request.resp.has_registration()) { |
709 | 20.8k | CopyRegistration(request.resp.registration(), remote_peer); |
710 | 20.8k | } else { |
711 | | // Required for backward compatibility. |
712 | 0 | HostPortsToPBs(endpoints, remote_peer->mutable_last_known_private_addr()); |
713 | 0 | } |
714 | 20.8k | return Status::OK(); |
715 | 20.8k | } |
716 | 41.5k | } |
717 | | |
718 | 20.6k | auto* last_reply_value = last_reply.load(std::memory_order_acquire); |
719 | 20.6k | if (last_reply_value == nullptr) { |
720 | 0 | last_reply_value = &requests.front(); |
721 | 0 | } |
722 | | |
723 | 20.6k | LOG(WARNING) << "Error getting permanent uuid from config peer " << yb::ToString(endpoints) |
724 | 20.6k | << ": " << last_reply_value->controller.status(); |
725 | | |
726 | 20.6k | if (last_reply_value->controller.status().IsAborted()) { |
727 | 4 | return last_reply_value->controller.status(); |
728 | 4 | } |
729 | | |
730 | 20.6k | if (!waiter.Wait()) { |
731 | 0 | return STATUS_FORMAT( |
732 | 0 | TimedOut, "Getting permanent uuid from $0 timed out after $1: $2", |
733 | 0 | endpoints, timeout, last_reply_value->controller.status()); |
734 | 0 | } |
735 | | |
736 | 20.6k | LOG(INFO) << "Retrying to get permanent uuid for remote peer: " |
737 | 20.6k | << yb::ToString(endpoints) << " attempt: " << waiter.attempt(); |
738 | 20.6k | } |
739 | 20.8k | } |
740 | | |
741 | | } // namespace consensus |
742 | | } // namespace yb |