YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_peers.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/consensus_peers.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <string>
38
#include <utility>
39
#include <vector>
40
41
#include <boost/optional.hpp>
42
#include <glog/logging.h>
43
44
#include "yb/common/wire_protocol.h"
45
46
#include "yb/consensus/consensus.h"
47
#include "yb/consensus/consensus.proxy.h"
48
#include "yb/consensus/consensus_meta.h"
49
#include "yb/consensus/consensus_queue.h"
50
#include "yb/consensus/replicate_msgs_holder.h"
51
#include "yb/consensus/multi_raft_batcher.h"
52
#include "yb/gutil/strings/substitute.h"
53
54
#include "yb/rpc/periodic.h"
55
#include "yb/rpc/rpc_controller.h"
56
57
#include "yb/tablet/tablet_error.h"
58
#include "yb/tserver/tserver_error.h"
59
60
#include "yb/util/backoff_waiter.h"
61
#include "yb/util/fault_injection.h"
62
#include "yb/util/flag_tags.h"
63
#include "yb/util/format.h"
64
#include "yb/util/logging.h"
65
#include "yb/util/monotime.h"
66
#include "yb/util/net/net_util.h"
67
#include "yb/util/scope_exit.h"
68
#include "yb/util/status_callback.h"
69
#include "yb/util/status_format.h"
70
#include "yb/util/threadpool.h"
71
#include "yb/util/tsan_util.h"
72
73
using namespace std::literals;
74
using namespace std::placeholders;
75
76
DEFINE_int32(consensus_rpc_timeout_ms, 3000,
77
             "Timeout used for all consensus internal RPC communications.");
78
TAG_FLAG(consensus_rpc_timeout_ms, advanced);
79
80
DEFINE_int32(max_wait_for_processresponse_before_closing_ms,
81
             yb::RegularBuildVsSanitizers(5000, 60000),
82
             "Maximum amount of time we will wait in Peer::Close() for Peer::ProcessResponse() to "
83
             "finish before returning proceding to close the Peer and return");
84
TAG_FLAG(max_wait_for_processresponse_before_closing_ms, advanced);
85
86
DECLARE_int32(raft_heartbeat_interval_ms);
87
88
DECLARE_bool(enable_multi_raft_heartbeat_batcher);
89
90
DEFINE_test_flag(double, fault_crash_on_leader_request_fraction, 0.0,
91
                 "Fraction of the time when the leader will crash just before sending an "
92
                 "UpdateConsensus RPC.");
93
94
DEFINE_test_flag(int32, delay_removing_peer_with_failed_tablet_secs, 0,
95
                 "If greater than 0, Peer::ProcessResponse will sleep after receiving a response "
96
                 "indicating that a tablet is in the FAILED state, and before marking this peer "
97
                 "as failed.");
98
99
// Allow for disabling remote bootstrap in unit tests where we want to test
100
// certain scenarios without triggering bootstrap of a remote peer.
101
DEFINE_test_flag(bool, enable_remote_bootstrap, true,
102
                 "Whether remote bootstrap will be initiated by the leader when it "
103
                 "detects that a follower is out of date or does not have a tablet "
104
                 "replica.");
105
106
DECLARE_int32(TEST_log_change_config_every_n);
107
108
namespace yb {
109
namespace consensus {
110
111
using log::Log;
112
using log::LogEntryBatch;
113
using std::shared_ptr;
114
using rpc::Messenger;
115
using rpc::PeriodicTimer;
116
using rpc::RpcController;
117
using strings::Substitute;
118
119
Peer::Peer(
120
    const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, PeerProxyPtr proxy,
121
    PeerMessageQueue* queue, MultiRaftHeartbeatBatcherPtr multi_raft_batcher,
122
    ThreadPoolToken* raft_pool_token, Consensus* consensus, rpc::Messenger* messenger)
123
    : tablet_id_(std::move(tablet_id)),
124
      leader_uuid_(std::move(leader_uuid)),
125
      peer_pb_(peer_pb),
126
      proxy_(std::move(proxy)),
127
      queue_(queue),
128
      multi_raft_batcher_(std::move(multi_raft_batcher)),
129
      raft_pool_token_(raft_pool_token),
130
      consensus_(consensus),
131
123k
      messenger_(messenger) {}
132
133
1
void Peer::SetTermForTest(int term) {
134
1
  update_response_.set_responder_term(term);
135
1
}
136
137
123k
Status Peer::Init() {
138
123k
  std::lock_guard<simple_spinlock> lock(peer_lock_);
139
123k
  queue_->TrackPeer(peer_pb_.permanent_uuid());
140
  // Capture a weak_ptr reference into the functor so it can safely handle
141
  // outliving the peer.
142
123k
  std::weak_ptr<Peer> weak_peer = shared_from_this();
143
123k
  heartbeater_ = PeriodicTimer::Create(
144
123k
      messenger_,
145
11.0M
      [weak_peer]() {
146
11.0M
        if (auto p = weak_peer.lock()) {
147
11.0M
          Status s = p->SignalRequest(RequestTriggerMode::kAlwaysSend);
148
11.0M
        }
149
11.0M
      },
150
123k
      MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
151
123k
  heartbeater_->Start();
152
123k
  state_ = kPeerStarted;
153
123k
  return Status::OK();
154
123k
}
155
156
31.8M
Status Peer::SignalRequest(RequestTriggerMode trigger_mode) {
157
  // If the peer is currently sending, return Status::OK().
158
  // If there are new requests in the queue we'll get them on ProcessResponse().
159
31.8M
  auto performing_update_lock = LockPerformingUpdate(std::try_to_lock);
160
31.8M
  if (!performing_update_lock.owns_lock()) {
161
6.14M
    return Status::OK();
162
6.14M
  }
163
164
25.6M
  {
165
25.6M
    auto processing_lock = StartProcessingUnlocked();
166
25.6M
    if (!processing_lock.owns_lock()) {
167
4
      return STATUS(IllegalState, "Peer was closed.");
168
4
    }
169
170
    // For the first request sent by the peer, we send it even if the queue is empty, which it will
171
    // always appear to be for the first request, since this is the negotiation round.
172
25.6M
    if (PREDICT_FALSE(state_ == kPeerStarted)) {
173
123k
      trigger_mode = RequestTriggerMode::kAlwaysSend;
174
123k
      state_ = kPeerRunning;
175
123k
    }
176
25.6M
    DCHECK_EQ(state_, kPeerRunning);
177
178
    // If our last request generated an error, and this is not a normal heartbeat request (i.e.
179
    // we're not forcing a request even if the queue is empty, unlike we do during heartbeats),
180
    // then don't send the "per-RPC" request. Instead, we'll wait for the heartbeat.
181
    //
182
    // TODO: we could consider looking at the number of consecutive failed attempts, and instead of
183
    // ignoring the signal, ask the heartbeater to "expedite" the next heartbeat in order to achieve
184
    // something like exponential backoff after an error. As it is implemented today, any transient
185
    // error will result in a latency blip as long as the heartbeat period.
186
25.6M
    if (failed_attempts_ > 0 && trigger_mode == RequestTriggerMode::kNonEmptyOnly) {
187
81.2k
      return Status::OK();
188
81.2k
    }
189
190
25.5M
    using_thread_pool_.fetch_add(1, std::memory_order_acq_rel);
191
25.5M
  }
192
0
  auto status = raft_pool_token_->SubmitFunc(
193
25.5M
      std::bind(&Peer::SendNextRequest, shared_from_this(), trigger_mode));
194
25.5M
  using_thread_pool_.fetch_sub(1, std::memory_order_acq_rel);
195
25.6M
  if (status.ok()) {
196
25.6M
    performing_update_lock.release();
197
25.6M
  }
198
25.5M
  return status;
199
25.6M
}
200
201
28.8M
void Peer::SendNextRequest(RequestTriggerMode trigger_mode) {
202
28.8M
  auto retain_self = shared_from_this();
203
28.8M
  DCHECK(performing_update_mutex_.is_locked()) << "Cannot send request";
204
205
28.8M
  auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
206
28.8M
  auto processing_lock = StartProcessingUnlocked();
207
28.8M
  if (!processing_lock.owns_lock()) {
208
52
    return;
209
52
  }
210
  // Since there are a couple of return paths from this function, set up a cleanup in case we fill in
211
  // ops inside update_request_, but do not get to use them.
212
28.8M
  bool needs_cleanup = true;
213
28.8M
  ScopeExit([&needs_cleanup, this](){
214
28.8M
    if (needs_cleanup) {
215
      // Since we will not be using update_request_, we should cleanup the reserved ops.
216
28.8M
      CleanRequestOps(&update_request_);
217
28.8M
    }
218
28.8M
  });
219
220
  // The peer has no pending request nor is sending: send the request.
221
28.8M
  bool needs_remote_bootstrap = false;
222
28.8M
  bool last_exchange_successful = false;
223
28.8M
  PeerMemberType member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE;
224
28.8M
  int64_t commit_index_before = update_request_.has_committed_op_id() ?
225
28.6M
      update_request_.committed_op_id().index() : kMinimumOpIdIndex;
226
28.8M
  ReplicateMsgsHolder msgs_holder;
227
28.8M
  Status s = queue_->RequestForPeer(
228
28.8M
      peer_pb_.permanent_uuid(), &update_request_, &msgs_holder, &needs_remote_bootstrap,
229
28.8M
      &member_type, &last_exchange_successful);
230
28.8M
  int64_t commit_index_after = update_request_.has_committed_op_id() ?
231
28.8M
      update_request_.committed_op_id().index() : kMinimumOpIdIndex;
232
233
28.8M
  if (PREDICT_FALSE(!s.ok())) {
234
69
    LOG_WITH_PREFIX(INFO) << "Could not obtain request from queue for peer: " << s;
235
69
    return;
236
69
  }
237
238
28.8M
  if (PREDICT_FALSE(needs_remote_bootstrap)) {
239
10.7k
    Status status;
240
10.7k
    if (!FLAGS_TEST_enable_remote_bootstrap) {
241
60
      failed_attempts_++;
242
60
      status = STATUS(NotSupported, "remote bootstrap is disabled");
243
10.6k
    } else {
244
10.6k
      status = queue_->GetRemoteBootstrapRequestForPeer(peer_pb_.permanent_uuid(), &rb_request_);
245
10.6k
      if (!consensus_->split_parent_tablet_id().empty()) {
246
87
        rb_request_.set_split_parent_tablet_id(consensus_->split_parent_tablet_id());
247
87
      }
248
10.6k
    }
249
10.7k
    if (!status.ok()) {
250
60
      LOG_WITH_PREFIX(WARNING) << "Unable to generate remote bootstrap request for peer: "
251
60
                               << status;
252
60
      return;
253
60
    }
254
255
10.6k
    using_thread_pool_.fetch_add(1, std::memory_order_acq_rel);
256
10.6k
    s = SendRemoteBootstrapRequest();
257
10.6k
    using_thread_pool_.fetch_sub(1, std::memory_order_acq_rel);
258
10.6k
    if (s.ok()) {
259
10.6k
      performing_update_lock.release();
260
10.6k
    }
261
10.6k
    return;
262
10.7k
  }
263
264
  // If the peer doesn't need remote bootstrap, but it is a PRE_VOTER or PRE_OBSERVER in the config,
265
  // we need to promote it.
266
28.8M
  if (last_exchange_successful &&
267
28.8M
      (member_type == PeerMemberType::PRE_VOTER || member_type == PeerMemberType::PRE_OBSERVER)) {
268
2.91M
    if (PREDICT_TRUE(consensus_)) {
269
2.91M
      auto uuid = peer_pb_.permanent_uuid();
270
      // Remove these here, before we drop the locks.
271
2.91M
      needs_cleanup = false;
272
2.91M
      CleanRequestOps(&update_request_);
273
2.91M
      processing_lock.unlock();
274
2.91M
      performing_update_lock.unlock();
275
2.91M
      consensus::ChangeConfigRequestPB req;
276
2.91M
      consensus::ChangeConfigResponsePB resp;
277
278
2.91M
      req.set_tablet_id(tablet_id_);
279
2.91M
      req.set_type(consensus::CHANGE_ROLE);
280
2.91M
      RaftPeerPB *peer = req.mutable_server();
281
2.91M
      peer->set_permanent_uuid(peer_pb_.permanent_uuid());
282
283
2.91M
      boost::optional<tserver::TabletServerErrorPB::Code> error_code;
284
285
      // If another ChangeConfig is being processed, our request will be rejected.
286
2.91M
      YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
287
1.03k
          << "Sending ChangeConfig request to promote peer";
288
2.91M
      auto status = consensus_->ChangeConfig(req, &DoNothingStatusCB, &error_code);
289
2.91M
      if (PREDICT_FALSE(!status.ok())) {
290
2.91M
        YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
291
974
            << "Unable to change role for peer " << uuid << ": " << status;
292
        // Since we released the semaphore, we need to call SignalRequest again to send a message
293
2.91M
        status = SignalRequest(RequestTriggerMode::kAlwaysSend);
294
2.91M
        if (PREDICT_FALSE(!status.ok())) {
295
3
          LOG(WARNING) << "Unexpected error when trying to send request: "
296
3
                       << status;
297
3
        }
298
2.91M
      }
299
2.91M
      return;
300
2.91M
    }
301
2.91M
  }
302
303
25.9M
  if (update_request_.tablet_id().empty()) {
304
123k
    update_request_.set_tablet_id(tablet_id_);
305
123k
    update_request_.set_caller_uuid(leader_uuid_);
306
123k
    update_request_.set_dest_uuid(peer_pb_.permanent_uuid());
307
123k
  }
308
309
25.9M
  const bool req_is_heartbeat = update_request_.ops_size() == 0 &&
310
25.9M
                                commit_index_after <= commit_index_before;
311
312
  // If the queue is empty, check if we were told to send a status-only message (which is what
313
  // happens during heartbeats). If not, just return.
314
25.9M
  if (PREDICT_FALSE(req_is_heartbeat && trigger_mode == RequestTriggerMode::kNonEmptyOnly)) {
315
23.2k
    queue_->RequestWasNotSent(peer_pb_.permanent_uuid());
316
23.2k
    return;
317
23.2k
  }
318
319
  // If we're actually sending ops, there's no need to heartbeat for a while, so reset the heartbeater.
320
25.8M
  if (!req_is_heartbeat) {
321
15.0M
    heartbeater_->Snooze();
322
15.0M
  }
323
324
25.8M
  MAYBE_FAULT(FLAGS_TEST_fault_crash_on_leader_request_fraction);
325
326
  // We will clean up ops from the request in ProcessResponse, because otherwise there could be a race
327
  // condition when the rest of this function runs in parallel with ProcessResponse.
328
25.8M
  needs_cleanup = false;
329
25.8M
  msgs_holder.ReleaseOps();
330
331
  // Heartbeat batching allows for network layer savings by reducing CPU cycles
332
  // spent on computing state, context switching (sending/receiving RPCs)
333
  // and serializing/deserializing protobufs.
334
25.8M
  if (req_is_heartbeat && multi_raft_batcher_
335
25.8M
      && FLAGS_enable_multi_raft_heartbeat_batcher) {
336
0
    auto performing_heartbeat_lock = LockPerformingHeartbeat(std::try_to_lock);
337
0
    if (!performing_heartbeat_lock.owns_lock()) {
338
      // Outstanding heartbeat already in flight so don't schedule another.
339
0
      return;
340
0
    }
341
0
    heartbeat_request_.Swap(&update_request_);
342
0
    heartbeat_response_.Swap(&update_response_);
343
0
    cur_heartbeat_id_++;
344
0
    processing_lock.unlock();
345
0
    performing_update_lock.unlock();
346
0
    performing_heartbeat_lock.release();
347
0
    multi_raft_batcher_->AddRequestToBatch(&heartbeat_request_, &heartbeat_response_,
348
0
                                           std::bind(&Peer::ProcessHeartbeatResponse,
349
0
                                                     retain_self, _1));
350
0
    return;
351
0
  }
352
353
  // The minimum_viable_heartbeat_ represents the
354
  // heartbeat that is sent immediately following this op.
355
  // Any heartbeats which are outstanding are considered no longer viable.
356
  // It's simpler for us to drop the responses for these heartbeats
357
  // rather than attempt to ensure we process the responses of the outstanding heartbeat
358
  // and this new request in the same order they were received by the remote peer.
359
  // TODO: Remove batched but unsent heartbeats (in the respective MultiRaftBatcher) in this case
360
25.8M
  minimum_viable_heartbeat_ = cur_heartbeat_id_ + 1;
361
25.8M
  processing_lock.unlock();
362
25.8M
  performing_update_lock.release();
363
25.8M
  controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
364
25.8M
  proxy_->UpdateAsync(&update_request_, trigger_mode, &update_response_, &controller_,
365
25.8M
                      std::bind(&Peer::ProcessResponse, retain_self));
366
25.8M
}
367
368
80.4M
std::unique_lock<simple_spinlock> Peer::StartProcessingUnlocked() {
369
80.4M
  std::unique_lock<simple_spinlock> lock(peer_lock_);
370
371
80.4M
  if (state_ == kPeerClosed) {
372
3.68k
    lock.unlock();
373
3.68k
  }
374
375
80.4M
  return lock;
376
80.4M
}
377
378
bool Peer::ProcessResponseWithStatus(const Status& status,
379
25.8M
                                     ConsensusResponsePB* response) {
380
25.8M
  if (!status.ok()) {
381
276k
    if (status.IsRemoteError()) {
382
      // Most controller errors are caused by network issues or corner cases like shutdown and
383
      // failure to serialize a protobuf. Therefore, we generally consider these errors to indicate
384
      // an unreachable peer.  However, a RemoteError wraps some other error propagated from the
385
      // remote peer, so we know the remote is alive. Therefore, we will let the queue know that the
386
      // remote is responsive.
387
12
      queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
388
12
    }
389
276k
    ProcessResponseError(status);
390
276k
    return false;
391
276k
  }
392
393
25.5M
  if (response->has_propagated_hybrid_time()) {
394
25.5M
    queue_->clock()->Update(HybridTime(response->propagated_hybrid_time()));
395
25.5M
  }
396
397
  // We should try to evict a follower which returns a WRONG UUID error.
398
25.5M
  if (response->has_error() &&
399
25.5M
      response->error().code() == tserver::TabletServerErrorPB::WRONG_SERVER_UUID) {
400
95
    queue_->NotifyObserversOfFailedFollower(
401
95
        peer_pb_.permanent_uuid(),
402
95
        Substitute("Leader communication with peer $0 received error $1, will try to "
403
95
                   "evict peer", peer_pb_.permanent_uuid(),
404
95
                   response->error().ShortDebugString()));
405
95
    ProcessResponseError(StatusFromPB(response->error().status()));
406
95
    return false;
407
95
  }
408
409
25.5M
  auto s = ResponseStatus(*response);
410
25.5M
  if (!s.ok() &&
411
25.5M
      tserver::TabletServerError(s) == tserver::TabletServerErrorPB::TABLET_NOT_RUNNING &&
412
25.5M
      tablet::RaftGroupStateError(s) == tablet::RaftGroupStatePB::FAILED) {
413
1
    if (PREDICT_FALSE(FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs > 0)) {
414
1
      LOG(INFO) << "TEST: Sleeping for " << FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs
415
1
                << " seconds";
416
1
      SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs));
417
1
    }
418
1
    queue_->NotifyObserversOfFailedFollower(
419
1
        peer_pb_.permanent_uuid(),
420
1
        Format("Tablet in peer $0 is in FAILED state, will try to evict peer",
421
1
               peer_pb_.permanent_uuid()));
422
1
    ProcessResponseError(StatusFromPB(response->error().status()));
423
1
  }
424
425
  // Response should be either error or status.
426
18.4E
  LOG_IF(DFATAL, response->has_error() == response->has_status())
427
18.4E
    << "Invalid response: " << response->ShortDebugString();
428
429
  // Pass through errors we can respond to, like not found, since in that case
430
  // we will need to remotely bootstrap. TODO: Handle DELETED response once implemented.
431
25.5M
  if ((response->has_error() &&
432
25.5M
      response->error().code() != tserver::TabletServerErrorPB::TABLET_NOT_FOUND) ||
433
25.5M
      (response->status().has_error() &&
434
25.5M
          response->status().error().code() == consensus::ConsensusErrorPB::CANNOT_PREPARE)) {
435
    // Again, let the queue know that the remote is still responsive, since we will not be sending
436
    // this error response through to the queue.
437
7.14k
    queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
438
7.14k
    ProcessResponseError(StatusFromPB(response->error().status()));
439
7.14k
    return false;
440
7.14k
  }
441
442
25.5M
  failed_attempts_ = 0;
443
25.5M
  return queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), *response);
444
25.5M
}
445
446
25.9M
void Peer::ProcessResponse() {
447
25.9M
  DCHECK(performing_update_mutex_.is_locked()) << "Got a response when nothing was pending.";
448
25.9M
  auto status = controller_.status();
449
25.9M
  if (status.ok()) {
450
25.6M
    status = controller_.thread_pool_failure();
451
25.6M
  }
452
25.9M
  controller_.Reset();
453
25.9M
  CleanRequestOps(&update_request_);
454
455
25.9M
  auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
456
25.9M
  auto processing_lock = StartProcessingUnlocked();
457
25.9M
  if (!processing_lock.owns_lock()) {
458
1.73k
    return;
459
1.73k
  }
460
25.9M
  bool more_pending = ProcessResponseWithStatus(status, &update_response_);
461
462
25.9M
  if (more_pending) {
463
3.24M
    processing_lock.unlock();
464
3.24M
    performing_update_lock.release();
465
3.24M
    SendNextRequest(RequestTriggerMode::kAlwaysSend);
466
3.24M
  }
467
25.9M
}
468
469
0
void Peer::ProcessHeartbeatResponse(const Status& status) {
470
0
  DCHECK(performing_heartbeat_mutex_.is_locked()) << "Got a heartbeat when nothing was pending.";
471
0
  DCHECK(heartbeat_request_.ops_size() == 0) << "Got a heartbeat with a non-zero number of ops.";
472
473
0
  auto performing_heartbeat_lock = LockPerformingHeartbeat(std::adopt_lock);
474
0
  auto processing_lock = StartProcessingUnlocked();
475
0
  if (!processing_lock.owns_lock()) {
476
0
    return;
477
0
  }
478
479
0
  if (cur_heartbeat_id_ < minimum_viable_heartbeat_) {
480
    // If we receive a response from a heartbeat that was sent before a valid op
481
    // then we should discard it as the op is more recent and the heartbeat should not
482
    // be modifying any state.
483
    // TODO: Add a metric to track the frequency of this
484
0
    return;
485
0
  }
486
0
  bool more_pending = ProcessResponseWithStatus(status, &heartbeat_response_);
487
488
0
  if (more_pending) {
489
0
    auto performing_update_lock = LockPerformingUpdate(std::try_to_lock);
490
0
    if (!performing_update_lock.owns_lock()) {
491
0
      return;
492
0
    }
493
0
    performing_heartbeat_lock.unlock();
494
0
    processing_lock.unlock();
495
0
    performing_update_lock.release();
496
0
    SendNextRequest(RequestTriggerMode::kAlwaysSend);
497
0
  }
498
0
}
499
500
10.6k
Status Peer::SendRemoteBootstrapRequest() {
501
10.6k
  YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 30) << "Sending request to remotely bootstrap";
502
10.6k
  controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolNormal);
503
10.6k
  return raft_pool_token_->SubmitFunc([retain_self = shared_from_this()]() {
504
10.6k
    retain_self->proxy_->StartRemoteBootstrap(
505
10.6k
      &retain_self->rb_request_, &retain_self->rb_response_, &retain_self->controller_,
506
10.6k
      std::bind(&Peer::ProcessRemoteBootstrapResponse, retain_self));
507
10.6k
  });
508
10.6k
}
509
510
10.6k
void Peer::ProcessRemoteBootstrapResponse() {
511
10.6k
  Status status = controller_.status();
512
10.6k
  controller_.Reset();
513
514
10.6k
  auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
515
10.6k
  auto processing_lock = StartProcessingUnlocked();
516
10.6k
  if (!processing_lock.owns_lock()) {
517
1.88k
    return;
518
1.88k
  }
519
520
8.73k
  if (!status.ok()) {
521
117
    LOG_WITH_PREFIX(WARNING) << "Unable to begin remote bootstrap on peer: " << status;
522
117
    return;
523
117
  }
524
525
8.61k
  if (rb_response_.has_error()) {
526
8.61k
    const auto error_code = rb_response_.error().code();
527
8.61k
    if (
528
8.61k
        error_code == tserver::TabletServerErrorPB::ALREADY_IN_PROGRESS ||
529
8.61k
        error_code == tserver::TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE) {
530
8.61k
      queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
531
8.61k
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 30)
532
57
        << ":::Unable to begin remote bootstrap on peer: " << rb_response_.ShortDebugString();
533
8.61k
    } else {
534
0
      LOG_WITH_PREFIX(WARNING) << "Unable to begin remote bootstrap on peer: "
535
0
                               << rb_response_.ShortDebugString();
536
0
    }
537
8.61k
  }
538
8.61k
}
539
540
283k
void Peer::ProcessResponseError(const Status& status) {
541
283k
  DCHECK(performing_update_mutex_.is_locked() || performing_heartbeat_mutex_.is_locked());
542
283k
  failed_attempts_++;
543
283k
  YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 5) << "Couldn't send request. "
544
24.0k
      << " Status: " << status.ToString() << ". Retrying in the next heartbeat period."
545
24.0k
      << " Already tried " << failed_attempts_ << " times. State: " << state_;
546
283k
}
547
548
100k
string Peer::LogPrefix() const {
549
100k
  return Format("T $0 P $1 -> Peer $2 ($3, $4): ",
550
100k
                tablet_id_, leader_uuid_, peer_pb_.permanent_uuid(),
551
100k
                peer_pb_.last_known_private_addr(), peer_pb_.last_known_broadcast_addr());
552
100k
}
553
554
75.4k
void Peer::Close() {
555
75.4k
  if (heartbeater_) {
556
75.4k
    heartbeater_->Stop();
557
75.4k
  }
558
559
  // If the peer is already closed return.
560
75.4k
  {
561
75.4k
    std::lock_guard<simple_spinlock> processing_lock(peer_lock_);
562
75.4k
    if (using_thread_pool_.load(std::memory_order_acquire) > 0) {
563
14
      auto deadline = std::chrono::steady_clock::now() +
564
14
                      FLAGS_max_wait_for_processresponse_before_closing_ms * 1ms;
565
14
      BackoffWaiter waiter(deadline, 100ms);
566
133
      while (using_thread_pool_.load(std::memory_order_acquire) > 0) {
567
120
        if (!waiter.Wait()) {
568
1
          LOG_WITH_PREFIX(DFATAL)
569
1
              << "Timed out waiting for ThreadPoolToken::SubmitFunc() to finish. "
570
1
              << "Number of pending calls: " << using_thread_pool_.load(std::memory_order_acquire);
571
1
          break;
572
1
        }
573
120
      }
574
14
    }
575
75.4k
    if (state_ == kPeerClosed) {
576
0
      return;
577
0
    }
578
18.4E
    DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state: " << state_;
579
75.4k
    state_ = kPeerClosed;
580
75.4k
    LOG_WITH_PREFIX(INFO) << "Closing peer";
581
75.4k
  }
582
583
0
  auto retain_self = shared_from_this();
584
585
75.4k
  queue_->UntrackPeer(peer_pb_.permanent_uuid());
586
75.4k
}
587
588
75.4k
Peer::~Peer() {
589
75.4k
  std::lock_guard<simple_spinlock> processing_lock(peer_lock_);
590
75.4k
  CHECK_EQ(state_, kPeerClosed) << "Peer cannot be implicitly closed";
591
75.4k
}
592
593
57.6M
void Peer::CleanRequestOps(ConsensusRequestPB* request) {
594
57.6M
  request->mutable_ops()->ExtractSubrange(0, request->ops().size(), nullptr /* elements */);
595
57.6M
}
596
597
RpcPeerProxy::RpcPeerProxy(HostPort hostport, ConsensusServiceProxyPtr consensus_proxy)
598
1.58M
    : hostport_(std::move(hostport)), consensus_proxy_(std::move(consensus_proxy)) {
599
1.58M
}
600
601
void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
602
                               RequestTriggerMode trigger_mode,
603
                               ConsensusResponsePB* response,
604
                               rpc::RpcController* controller,
605
25.8M
                               const rpc::ResponseCallback& callback) {
606
25.8M
  controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
607
25.8M
  consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback);
608
25.8M
}
609
610
void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
611
                                             VoteResponsePB* response,
612
                                             rpc::RpcController* controller,
613
1.45M
                                             const rpc::ResponseCallback& callback) {
614
1.45M
  consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback);
615
1.45M
}
616
617
void RpcPeerProxy::RunLeaderElectionAsync(const RunLeaderElectionRequestPB* request,
618
                                          RunLeaderElectionResponsePB* response,
619
                                          rpc::RpcController* controller,
620
10.0k
                                          const rpc::ResponseCallback& callback) {
621
10.0k
  controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
622
10.0k
  consensus_proxy_->RunLeaderElectionAsync(*request, response, controller, callback);
623
10.0k
}
624
625
void RpcPeerProxy::LeaderElectionLostAsync(const LeaderElectionLostRequestPB* request,
626
                                           LeaderElectionLostResponsePB* response,
627
                                           rpc::RpcController* controller,
628
77
                                           const rpc::ResponseCallback& callback) {
629
77
  consensus_proxy_->LeaderElectionLostAsync(*request, response, controller, callback);
630
77
}
631
632
void RpcPeerProxy::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* request,
633
                                        StartRemoteBootstrapResponsePB* response,
634
                                        rpc::RpcController* controller,
635
10.6k
                                        const rpc::ResponseCallback& callback) {
636
10.6k
  consensus_proxy_->StartRemoteBootstrapAsync(*request, response, controller, callback);
637
10.6k
}
638
639
1.54M
RpcPeerProxy::~RpcPeerProxy() {}
640
641
RpcPeerProxyFactory::RpcPeerProxyFactory(
642
    Messenger* messenger, rpc::ProxyCache* proxy_cache, CloudInfoPB from)
643
150k
    : messenger_(messenger), proxy_cache_(proxy_cache), from_(std::move(from)) {}
644
645
1.58M
PeerProxyPtr RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb) {
646
1.58M
  auto hostport = HostPortFromPB(DesiredHostPort(peer_pb, from_));
647
1.58M
  auto proxy = std::make_unique<ConsensusServiceProxy>(proxy_cache_, hostport);
648
1.58M
  return std::make_unique<RpcPeerProxy>(std::move(hostport), std::move(proxy));
649
1.58M
}
650
651
75.6k
RpcPeerProxyFactory::~RpcPeerProxyFactory() {}
652
653
423k
rpc::Messenger* RpcPeerProxyFactory::messenger() const { return messenger_; }
654
655
struct GetNodeInstanceRequest {
656
  GetNodeInstanceRequestPB req;
657
  GetNodeInstanceResponsePB resp;
658
  rpc::RpcController controller;
659
  ConsensusServiceProxy proxy;
660
661
  GetNodeInstanceRequest(rpc::ProxyCache* proxy_cache, const HostPort& hostport)
662
22.0k
      : proxy(proxy_cache, hostport) {}
663
};
664
665
Status SetPermanentUuidForRemotePeer(
666
    rpc::ProxyCache* proxy_cache,
667
    std::chrono::steady_clock::duration timeout,
668
    const std::vector<HostPort>& endpoints,
669
20.8k
    RaftPeerPB* remote_peer) {
670
671
20.8k
  DCHECK(!remote_peer->has_permanent_uuid());
672
20.8k
  auto deadline = std::chrono::steady_clock::now() + timeout;
673
674
20.8k
  std::vector<GetNodeInstanceRequest> requests;
675
20.8k
  requests.reserve(endpoints.size());
676
22.0k
  for (const auto& hp : endpoints) {
677
22.0k
    requests.emplace_back(proxy_cache, hp);
678
22.0k
  }
679
680
20.8k
  CountDownLatch latch(requests.size());
681
20.8k
  const auto kMaxWait = 10s;
682
20.8k
  BackoffWaiter waiter(deadline, kMaxWait);
683
41.4k
  for (;;) {
684
41.4k
    latch.Reset(requests.size());
685
41.4k
    std::atomic<GetNodeInstanceRequest*> last_reply{nullptr};
686
42.6k
    for (auto& request : requests) {
687
42.6k
      request.controller.Reset();
688
42.6k
      request.controller.set_timeout(kMaxWait);
689
42.6k
      VLOG(2) << "Getting uuid from remote peer. Request: " << request.req.ShortDebugString();
690
691
42.6k
      request.proxy.GetNodeInstanceAsync(
692
42.6k
          request.req, &request.resp, &request.controller,
693
42.6k
          [&latch, &request, &last_reply] {
694
42.6k
        if (!request.controller.status().IsTimedOut()) {
695
42.6k
          last_reply.store(&request, std::memory_order_release);
696
42.6k
        }
697
42.6k
        latch.CountDown();
698
42.6k
      });
699
42.6k
    }
700
701
41.4k
    latch.Wait();
702
703
41.5k
    for (auto& request : requests) {
704
41.5k
      auto status = request.controller.status();
705
41.5k
      if (status.ok()) {
706
20.8k
        remote_peer->set_permanent_uuid(request.resp.node_instance().permanent_uuid());
707
20.8k
        remote_peer->set_member_type(PeerMemberType::VOTER);
708
20.8k
        if (request.resp.has_registration()) {
709
20.8k
          CopyRegistration(request.resp.registration(), remote_peer);
710
20.8k
        } else {
711
          // Required for backward compatibility.
712
0
          HostPortsToPBs(endpoints, remote_peer->mutable_last_known_private_addr());
713
0
        }
714
20.8k
        return Status::OK();
715
20.8k
      }
716
41.5k
    }
717
718
20.6k
    auto* last_reply_value = last_reply.load(std::memory_order_acquire);
719
20.6k
    if (last_reply_value == nullptr) {
720
0
      last_reply_value = &requests.front();
721
0
    }
722
723
20.6k
    LOG(WARNING) << "Error getting permanent uuid from config peer " << yb::ToString(endpoints)
724
20.6k
                 << ": " << last_reply_value->controller.status();
725
726
20.6k
    if (last_reply_value->controller.status().IsAborted()) {
727
4
      return last_reply_value->controller.status();
728
4
    }
729
730
20.6k
    if (!waiter.Wait()) {
731
0
      return STATUS_FORMAT(
732
0
          TimedOut, "Getting permanent uuid from $0 timed out after $1: $2",
733
0
          endpoints, timeout, last_reply_value->controller.status());
734
0
    }
735
736
20.6k
    LOG(INFO) << "Retrying to get permanent uuid for remote peer: "
737
20.6k
              << yb::ToString(endpoints) << " attempt: " << waiter.attempt();
738
20.6k
  }
739
20.8k
}
740
741
}  // namespace consensus
742
}  // namespace yb