YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_peers.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/consensus_peers.h"
34
35
#include <algorithm>
36
#include <mutex>
37
#include <string>
38
#include <utility>
39
#include <vector>
40
41
#include <boost/optional.hpp>
42
#include <glog/logging.h>
43
44
#include "yb/common/wire_protocol.h"
45
46
#include "yb/consensus/consensus.h"
47
#include "yb/consensus/consensus.proxy.h"
48
#include "yb/consensus/consensus_meta.h"
49
#include "yb/consensus/consensus_queue.h"
50
#include "yb/consensus/replicate_msgs_holder.h"
51
#include "yb/consensus/multi_raft_batcher.h"
52
#include "yb/gutil/strings/substitute.h"
53
54
#include "yb/rpc/periodic.h"
55
#include "yb/rpc/rpc_controller.h"
56
57
#include "yb/tablet/tablet_error.h"
58
#include "yb/tserver/tserver_error.h"
59
60
#include "yb/util/backoff_waiter.h"
61
#include "yb/util/fault_injection.h"
62
#include "yb/util/flag_tags.h"
63
#include "yb/util/format.h"
64
#include "yb/util/logging.h"
65
#include "yb/util/monotime.h"
66
#include "yb/util/net/net_util.h"
67
#include "yb/util/scope_exit.h"
68
#include "yb/util/status_callback.h"
69
#include "yb/util/status_format.h"
70
#include "yb/util/threadpool.h"
71
#include "yb/util/tsan_util.h"
72
73
using namespace std::literals;
74
using namespace std::placeholders;
75
76
DEFINE_int32(consensus_rpc_timeout_ms, 3000,
77
             "Timeout used for all consensus internal RPC communications.");
78
TAG_FLAG(consensus_rpc_timeout_ms, advanced);
79
80
DEFINE_int32(max_wait_for_processresponse_before_closing_ms,
81
             yb::RegularBuildVsSanitizers(5000, 60000),
82
             "Maximum amount of time we will wait in Peer::Close() for Peer::ProcessResponse() to "
83
             "finish before returning proceding to close the Peer and return");
84
TAG_FLAG(max_wait_for_processresponse_before_closing_ms, advanced);
85
86
DECLARE_int32(raft_heartbeat_interval_ms);
87
88
DECLARE_bool(enable_multi_raft_heartbeat_batcher);
89
90
DEFINE_test_flag(double, fault_crash_on_leader_request_fraction, 0.0,
91
                 "Fraction of the time when the leader will crash just before sending an "
92
                 "UpdateConsensus RPC.");
93
94
DEFINE_test_flag(int32, delay_removing_peer_with_failed_tablet_secs, 0,
95
                 "If greater than 0, Peer::ProcessResponse will sleep after receiving a response "
96
                 "indicating that a tablet is in the FAILED state, and before marking this peer "
97
                 "as failed.");
98
99
// Allow for disabling remote bootstrap in unit tests where we want to test
100
// certain scenarios without triggering bootstrap of a remote peer.
101
DEFINE_test_flag(bool, enable_remote_bootstrap, true,
102
                 "Whether remote bootstrap will be initiated by the leader when it "
103
                 "detects that a follower is out of date or does not have a tablet "
104
                 "replica.");
105
106
DECLARE_int32(TEST_log_change_config_every_n);
107
108
namespace yb {
109
namespace consensus {
110
111
using log::Log;
112
using log::LogEntryBatch;
113
using std::shared_ptr;
114
using rpc::Messenger;
115
using rpc::PeriodicTimer;
116
using rpc::RpcController;
117
using strings::Substitute;
118
119
Peer::Peer(
120
    const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, PeerProxyPtr proxy,
121
    PeerMessageQueue* queue, MultiRaftHeartbeatBatcherPtr multi_raft_batcher,
122
    ThreadPoolToken* raft_pool_token, Consensus* consensus, rpc::Messenger* messenger)
123
    : tablet_id_(std::move(tablet_id)),
124
      leader_uuid_(std::move(leader_uuid)),
125
      peer_pb_(peer_pb),
126
      proxy_(std::move(proxy)),
127
      queue_(queue),
128
      multi_raft_batcher_(std::move(multi_raft_batcher)),
129
      raft_pool_token_(raft_pool_token),
130
      consensus_(consensus),
131
69.5k
      messenger_(messenger) {}
132
133
1
void Peer::SetTermForTest(int term) {
134
1
  update_response_.set_responder_term(term);
135
1
}
136
137
69.5k
Status Peer::Init() {
138
69.5k
  std::lock_guard<simple_spinlock> lock(peer_lock_);
139
69.5k
  queue_->TrackPeer(peer_pb_.permanent_uuid());
140
  // Capture a weak_ptr reference into the functor so it can safely handle
141
  // outliving the peer.
142
69.5k
  std::weak_ptr<Peer> weak_peer = shared_from_this();
143
69.5k
  heartbeater_ = PeriodicTimer::Create(
144
69.5k
      messenger_,
145
2.04M
      [weak_peer]() {
146
2.04M
        if (auto p = weak_peer.lock()) {
147
2.04M
          Status s = p->SignalRequest(RequestTriggerMode::kAlwaysSend);
148
2.04M
        }
149
2.04M
      },
150
69.5k
      MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
151
69.5k
  heartbeater_->Start();
152
69.5k
  state_ = kPeerStarted;
153
69.5k
  return Status::OK();
154
69.5k
}
155
156
14.7M
Status Peer::SignalRequest(RequestTriggerMode trigger_mode) {
157
  // If the peer is currently sending, return Status::OK().
158
  // If there are new requests in the queue we'll get them on ProcessResponse().
159
14.7M
  auto performing_update_lock = LockPerformingUpdate(std::try_to_lock);
160
14.7M
  if (!performing_update_lock.owns_lock()) {
161
3.12M
    return Status::OK();
162
3.12M
  }
163
164
11.6M
  {
165
11.6M
    auto processing_lock = StartProcessingUnlocked();
166
11.6M
    if (!processing_lock.owns_lock()) {
167
2
      return STATUS(IllegalState, "Peer was closed.");
168
2
    }
169
170
    // For the first request sent by the peer, we send it even if the queue is empty, which it will
171
    // always appear to be for the first request, since this is the negotiation round.
172
11.6M
    if (PREDICT_FALSE(state_ == kPeerStarted)) {
173
69.5k
      trigger_mode = RequestTriggerMode::kAlwaysSend;
174
69.5k
      state_ = kPeerRunning;
175
69.5k
    }
176
11.6M
    DCHECK_EQ(state_, kPeerRunning);
177
178
    // If our last request generated an error, and this is not a normal heartbeat request (i.e.
179
    // we're not forcing a request even if the queue is empty, unlike we do during heartbeats),
180
    // then don't send the "per-RPC" request. Instead, we'll wait for the heartbeat.
181
    //
182
    // TODO: we could consider looking at the number of consecutive failed attempts, and instead of
183
    // ignoring the signal, ask the heartbeater to "expedite" the next heartbeat in order to achieve
184
    // something like exponential backoff after an error. As it is implemented today, any transient
185
    // error will result in a latency blip as long as the heartbeat period.
186
11.6M
    if (failed_attempts_ > 0 && trigger_mode == RequestTriggerMode::kNonEmptyOnly) {
187
68.8k
      return Status::OK();
188
68.8k
    }
189
190
11.5M
    using_thread_pool_.fetch_add(1, std::memory_order_acq_rel);
191
11.5M
  }
192
11.5M
  auto status = raft_pool_token_->SubmitFunc(
193
11.5M
      std::bind(&Peer::SendNextRequest, shared_from_this(), trigger_mode));
194
11.5M
  using_thread_pool_.fetch_sub(1, std::memory_order_acq_rel);
195
11.6M
  if (status.ok()) {
196
11.6M
    performing_update_lock.release();
197
11.6M
  }
198
11.5M
  return status;
199
11.5M
}
200
201
13.3M
void Peer::SendNextRequest(RequestTriggerMode trigger_mode) {
202
13.3M
  auto retain_self = shared_from_this();
203
2.47k
  DCHECK(performing_update_mutex_.is_locked()) << "Cannot send request";
204
205
13.3M
  auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
206
13.3M
  auto processing_lock = StartProcessingUnlocked();
207
13.3M
  if (!processing_lock.owns_lock()) {
208
24
    return;
209
24
  }
210
  // Since there are a couple of return paths from this function, set up a cleanup in case we fill in
211
  // ops inside update_request_, but do not get to use them.
212
13.3M
  bool needs_cleanup = true;
213
13.3M
  ScopeExit([&needs_cleanup, this](){
214
13.3M
    if (needs_cleanup) {
215
      // Since we will not be using update_request_, we should cleanup the reserved ops.
216
13.3M
      CleanRequestOps(&update_request_);
217
13.3M
    }
218
13.3M
  });
219
220
  // The peer has no pending request nor is sending: send the request.
221
13.3M
  bool needs_remote_bootstrap = false;
222
13.3M
  bool last_exchange_successful = false;
223
13.3M
  PeerMemberType member_type = PeerMemberType::UNKNOWN_MEMBER_TYPE;
224
13.3M
  int64_t commit_index_before = update_request_.has_committed_op_id() ?
225
13.3M
      update_request_.committed_op_id().index() : kMinimumOpIdIndex;
226
13.3M
  ReplicateMsgsHolder msgs_holder;
227
13.3M
  Status s = queue_->RequestForPeer(
228
13.3M
      peer_pb_.permanent_uuid(), &update_request_, &msgs_holder, &needs_remote_bootstrap,
229
13.3M
      &member_type, &last_exchange_successful);
230
13.3M
  int64_t commit_index_after = update_request_.has_committed_op_id() ?
231
13.3M
      update_request_.committed_op_id().index() : kMinimumOpIdIndex;
232
233
13.3M
  if (PREDICT_FALSE(!s.ok())) {
234
20
    LOG_WITH_PREFIX(INFO) << "Could not obtain request from queue for peer: " << s;
235
20
    return;
236
20
  }
237
238
13.3M
  if (PREDICT_FALSE(needs_remote_bootstrap)) {
239
4.98k
    Status status;
240
4.98k
    if (!FLAGS_TEST_enable_remote_bootstrap) {
241
73
      failed_attempts_++;
242
73
      status = STATUS(NotSupported, "remote bootstrap is disabled");
243
4.91k
    } else {
244
4.91k
      status = queue_->GetRemoteBootstrapRequestForPeer(peer_pb_.permanent_uuid(), &rb_request_);
245
4.91k
      if (!consensus_->split_parent_tablet_id().empty()) {
246
83
        rb_request_.set_split_parent_tablet_id(consensus_->split_parent_tablet_id());
247
83
      }
248
4.91k
    }
249
4.98k
    if (!status.ok()) {
250
73
      LOG_WITH_PREFIX(WARNING) << "Unable to generate remote bootstrap request for peer: "
251
73
                               << status;
252
73
      return;
253
73
    }
254
255
4.91k
    using_thread_pool_.fetch_add(1, std::memory_order_acq_rel);
256
4.91k
    s = SendRemoteBootstrapRequest();
257
4.91k
    using_thread_pool_.fetch_sub(1, std::memory_order_acq_rel);
258
4.91k
    if (s.ok()) {
259
4.91k
      performing_update_lock.release();
260
4.91k
    }
261
4.91k
    return;
262
4.91k
  }
263
264
  // If the peer doesn't need remote bootstrap, but it is a PRE_VOTER or PRE_OBSERVER in the config,
265
  // we need to promote it.
266
13.3M
  if (last_exchange_successful &&
267
13.2M
      (member_type == PeerMemberType::PRE_VOTER || member_type == PeerMemberType::PRE_OBSERVER)) {
268
3.04M
    if (PREDICT_TRUE(consensus_)) {
269
3.04M
      auto uuid = peer_pb_.permanent_uuid();
270
      // Remove these here, before we drop the locks.
271
3.04M
      needs_cleanup = false;
272
3.04M
      CleanRequestOps(&update_request_);
273
3.04M
      processing_lock.unlock();
274
3.04M
      performing_update_lock.unlock();
275
3.04M
      consensus::ChangeConfigRequestPB req;
276
3.04M
      consensus::ChangeConfigResponsePB resp;
277
278
3.04M
      req.set_tablet_id(tablet_id_);
279
3.04M
      req.set_type(consensus::CHANGE_ROLE);
280
3.04M
      RaftPeerPB *peer = req.mutable_server();
281
3.04M
      peer->set_permanent_uuid(peer_pb_.permanent_uuid());
282
283
3.04M
      boost::optional<tserver::TabletServerErrorPB::Code> error_code;
284
285
      // If another ChangeConfig is being processed, our request will be rejected.
286
3.04M
      YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
287
1.04k
          << "Sending ChangeConfig request to promote peer";
288
3.04M
      auto status = consensus_->ChangeConfig(req, &DoNothingStatusCB, &error_code);
289
3.04M
      if (PREDICT_FALSE(!status.ok())) {
290
3.04M
        YB_LOG_EVERY_N(INFO, FLAGS_TEST_log_change_config_every_n)
291
1.01k
            << "Unable to change role for peer " << uuid << ": " << status;
292
        // Since we released the semaphore, we need to call SignalRequest again to send a message
293
3.04M
        status = SignalRequest(RequestTriggerMode::kAlwaysSend);
294
3.04M
        if (PREDICT_FALSE(!status.ok())) {
295
2
          LOG(WARNING) << "Unexpected error when trying to send request: "
296
2
                       << status;
297
2
        }
298
3.04M
      }
299
3.04M
      return;
300
3.04M
    }
301
10.3M
  }
302
303
10.3M
  if (update_request_.tablet_id().empty()) {
304
69.5k
    update_request_.set_tablet_id(tablet_id_);
305
69.5k
    update_request_.set_caller_uuid(leader_uuid_);
306
69.5k
    update_request_.set_dest_uuid(peer_pb_.permanent_uuid());
307
69.5k
  }
308
309
10.3M
  const bool req_is_heartbeat = update_request_.ops_size() == 0 &&
310
5.81M
                                commit_index_after <= commit_index_before;
311
312
  // If the queue is empty, check if we were told to send a status-only message (which is what
313
  // happens during heartbeats). If not, just return.
314
10.3M
  if (PREDICT_FALSE(req_is_heartbeat && trigger_mode == RequestTriggerMode::kNonEmptyOnly)) {
315
13.2k
    queue_->RequestWasNotSent(peer_pb_.permanent_uuid());
316
13.2k
    return;
317
13.2k
  }
318
319
  // If we're actually sending ops there's no need to heartbeat for a while, reset the heartbeater.
320
10.3M
  if (!req_is_heartbeat) {
321
8.22M
    heartbeater_->Snooze();
322
8.22M
  }
323
324
10.3M
  MAYBE_FAULT(FLAGS_TEST_fault_crash_on_leader_request_fraction);
325
326
  // We will clean up ops from the request in ProcessResponse, because otherwise there could be a race
327
  // condition when the rest of this function runs in parallel with ProcessResponse.
328
10.3M
  needs_cleanup = false;
329
10.3M
  msgs_holder.ReleaseOps();
330
331
  // Heartbeat batching allows for network layer savings by reducing CPU cycles
332
  // spent on computing state, context switching (sending/receiving RPC's)
333
  // and serializing/deserializing protobufs.
334
10.3M
  if (req_is_heartbeat && multi_raft_batcher_
335
0
      && FLAGS_enable_multi_raft_heartbeat_batcher) {
336
0
    auto performing_heartbeat_lock = LockPerformingHeartbeat(std::try_to_lock);
337
0
    if (!performing_heartbeat_lock.owns_lock()) {
338
      // Outstanding heartbeat already in flight so don't schedule another.
339
0
      return;
340
0
    }
341
0
    heartbeat_request_.Swap(&update_request_);
342
0
    heartbeat_response_.Swap(&update_response_);
343
0
    cur_heartbeat_id_++;
344
0
    processing_lock.unlock();
345
0
    performing_update_lock.unlock();
346
0
    performing_heartbeat_lock.release();
347
0
    multi_raft_batcher_->AddRequestToBatch(&heartbeat_request_, &heartbeat_response_,
348
0
                                           std::bind(&Peer::ProcessHeartbeatResponse,
349
0
                                                     retain_self, _1));
350
0
    return;
351
0
  }
352
353
  // The minimum_viable_heartbeat_ represents the
354
  // heartbeat that is sent immediately following this op.
355
  // Any heartbeats which are outstanding are considered no longer viable.
356
  // It's simpler for us to drop the responses for these heartbeats
357
  // rather than attempt to ensure we process the responses of the outstanding heartbeat
358
  // and this new request in the same order they were received by the remote peer.
359
  // TODO: Remove batched but unsent heartbeats (in the respective MultiRaftBatcher) in this case
360
10.3M
  minimum_viable_heartbeat_ = cur_heartbeat_id_ + 1;
361
10.3M
  processing_lock.unlock();
362
10.3M
  performing_update_lock.release();
363
10.3M
  controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
364
10.3M
  proxy_->UpdateAsync(&update_request_, trigger_mode, &update_response_, &controller_,
365
10.3M
                      std::bind(&Peer::ProcessResponse, retain_self));
366
10.3M
}
367
368
35.3M
std::unique_lock<simple_spinlock> Peer::StartProcessingUnlocked() {
369
35.3M
  std::unique_lock<simple_spinlock> lock(peer_lock_);
370
371
35.3M
  if (state_ == kPeerClosed) {
372
1.88k
    lock.unlock();
373
1.88k
  }
374
375
35.3M
  return lock;
376
35.3M
}
377
378
bool Peer::ProcessResponseWithStatus(const Status& status,
379
10.3M
                                     ConsensusResponsePB* response) {
380
10.3M
  if (!status.ok()) {
381
18.5k
    if (status.IsRemoteError()) {
382
      // Most controller errors are caused by network issues or corner cases like shutdown and
383
      // failure to serialize a protobuf. Therefore, we generally consider these errors to indicate
384
      // an unreachable peer.  However, a RemoteError wraps some other error propagated from the
385
      // remote peer, so we know the remote is alive. Therefore, we will let the queue know that the
386
      // remote is responsive.
387
3
      queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
388
3
    }
389
18.5k
    ProcessResponseError(status);
390
18.5k
    return false;
391
18.5k
  }
392
393
10.3M
  if (response->has_propagated_hybrid_time()) {
394
10.2M
    queue_->clock()->Update(HybridTime(response->propagated_hybrid_time()));
395
10.2M
  }
396
397
  // We should try to evict a follower which returns a WRONG UUID error.
398
10.3M
  if (response->has_error() &&
399
12.0k
      response->error().code() == tserver::TabletServerErrorPB::WRONG_SERVER_UUID) {
400
12
    queue_->NotifyObserversOfFailedFollower(
401
12
        peer_pb_.permanent_uuid(),
402
12
        Substitute("Leader communication with peer $0 received error $1, will try to "
403
12
                   "evict peer", peer_pb_.permanent_uuid(),
404
12
                   response->error().ShortDebugString()));
405
12
    ProcessResponseError(StatusFromPB(response->error().status()));
406
12
    return false;
407
12
  }
408
409
10.3M
  auto s = ResponseStatus(*response);
410
10.3M
  if (!s.ok() &&
411
12.0k
      tserver::TabletServerError(s) == tserver::TabletServerErrorPB::TABLET_NOT_RUNNING &&
412
6.54k
      tablet::RaftGroupStateError(s) == tablet::RaftGroupStatePB::FAILED) {
413
4
    if (PREDICT_FALSE(FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs > 0)) {
414
0
      LOG(INFO) << "TEST: Sleeping for " << FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs
415
0
                << " seconds";
416
0
      SleepFor(MonoDelta::FromSeconds(FLAGS_TEST_delay_removing_peer_with_failed_tablet_secs));
417
0
    }
418
4
    queue_->NotifyObserversOfFailedFollower(
419
4
        peer_pb_.permanent_uuid(),
420
4
        Format("Tablet in peer $0 is in FAILED state, will try to evict peer",
421
4
               peer_pb_.permanent_uuid()));
422
4
    ProcessResponseError(StatusFromPB(response->error().status()));
423
4
  }
424
425
  // Response should be either error or status.
426
18.4E
  LOG_IF(DFATAL, response->has_error() == response->has_status())
427
18.4E
    << "Invalid response: " << response->ShortDebugString();
428
429
  // Pass through errors we can respond to, like not found, since in that case
430
  // we will need to remotely bootstrap. TODO: Handle DELETED response once implemented.
431
10.3M
  if ((response->has_error() &&
432
12.0k
      response->error().code() != tserver::TabletServerErrorPB::TABLET_NOT_FOUND) ||
433
10.2M
      (response->status().has_error() &&
434
68.0k
          response->status().error().code() == consensus::ConsensusErrorPB::CANNOT_PREPARE)) {
435
    // Again, let the queue know that the remote is still responsive, since we will not be sending
436
    // this error response through to the queue.
437
7.15k
    queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
438
7.15k
    ProcessResponseError(StatusFromPB(response->error().status()));
439
7.15k
    return false;
440
7.15k
  }
441
442
10.2M
  failed_attempts_ = 0;
443
10.2M
  return queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), *response);
444
10.2M
}
445
446
10.3M
void Peer::ProcessResponse() {
447
122
  DCHECK(performing_update_mutex_.is_locked()) << "Got a response when nothing was pending.";
448
10.3M
  auto status = controller_.status();
449
10.3M
  if (status.ok()) {
450
10.3M
    status = controller_.thread_pool_failure();
451
10.3M
  }
452
10.3M
  controller_.Reset();
453
10.3M
  CleanRequestOps(&update_request_);
454
455
10.3M
  auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
456
10.3M
  auto processing_lock = StartProcessingUnlocked();
457
10.3M
  if (!processing_lock.owns_lock()) {
458
941
    return;
459
941
  }
460
10.3M
  bool more_pending = ProcessResponseWithStatus(status, &update_response_);
461
462
10.3M
  if (more_pending) {
463
1.78M
    processing_lock.unlock();
464
1.78M
    performing_update_lock.release();
465
1.78M
    SendNextRequest(RequestTriggerMode::kAlwaysSend);
466
1.78M
  }
467
10.3M
}
468
469
0
void Peer::ProcessHeartbeatResponse(const Status& status) {
470
0
  DCHECK(performing_heartbeat_mutex_.is_locked()) << "Got a heartbeat when nothing was pending.";
471
0
  DCHECK(heartbeat_request_.ops_size() == 0) << "Got a heartbeat with a non-zero number of ops.";
472
473
0
  auto performing_heartbeat_lock = LockPerformingHeartbeat(std::adopt_lock);
474
0
  auto processing_lock = StartProcessingUnlocked();
475
0
  if (!processing_lock.owns_lock()) {
476
0
    return;
477
0
  }
478
479
0
  if (cur_heartbeat_id_ < minimum_viable_heartbeat_) {
480
    // If we receive a response from a heartbeat that was sent before a valid op
481
    // then we should discard it as the op is more recent and the heartbeat should not
482
    // be modifying any state.
483
    // TODO: Add a metric to track the frequency of this
484
0
    return;
485
0
  }
486
0
  bool more_pending = ProcessResponseWithStatus(status, &heartbeat_response_);
487
488
0
  if (more_pending) {
489
0
    auto performing_update_lock = LockPerformingUpdate(std::try_to_lock);
490
0
    if (!performing_update_lock.owns_lock()) {
491
0
      return;
492
0
    }
493
0
    performing_heartbeat_lock.unlock();
494
0
    processing_lock.unlock();
495
0
    performing_update_lock.release();
496
0
    SendNextRequest(RequestTriggerMode::kAlwaysSend);
497
0
  }
498
0
}
499
500
4.91k
Status Peer::SendRemoteBootstrapRequest() {
501
270
  YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 30) << "Sending request to remotely bootstrap";
502
4.91k
  controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolNormal);
503
4.91k
  return raft_pool_token_->SubmitFunc([retain_self = shared_from_this()]() {
504
4.91k
    retain_self->proxy_->StartRemoteBootstrap(
505
4.91k
      &retain_self->rb_request_, &retain_self->rb_response_, &retain_self->controller_,
506
4.91k
      std::bind(&Peer::ProcessRemoteBootstrapResponse, retain_self));
507
4.91k
  });
508
4.91k
}
509
510
4.89k
void Peer::ProcessRemoteBootstrapResponse() {
511
4.89k
  Status status = controller_.status();
512
4.89k
  controller_.Reset();
513
514
4.89k
  auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
515
4.89k
  auto processing_lock = StartProcessingUnlocked();
516
4.89k
  if (!processing_lock.owns_lock()) {
517
917
    return;
518
917
  }
519
520
3.97k
  if (!status.ok()) {
521
73
    LOG_WITH_PREFIX(WARNING) << "Unable to begin remote bootstrap on peer: " << status;
522
73
    return;
523
73
  }
524
525
3.90k
  if (rb_response_.has_error()) {
526
3.89k
    const auto error_code = rb_response_.error().code();
527
3.89k
    if (
528
3.89k
        error_code == tserver::TabletServerErrorPB::ALREADY_IN_PROGRESS ||
529
3.89k
        error_code == tserver::TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE) {
530
3.89k
      queue_->NotifyPeerIsResponsiveDespiteError(peer_pb_.permanent_uuid());
531
41
      YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 30)
532
41
        << ":::Unable to begin remote bootstrap on peer: " << rb_response_.ShortDebugString();
533
1
    } else {
534
1
      LOG_WITH_PREFIX(WARNING) << "Unable to begin remote bootstrap on peer: "
535
1
                               << rb_response_.ShortDebugString();
536
1
    }
537
3.89k
  }
538
3.90k
}
539
540
25.7k
void Peer::ProcessResponseError(const Status& status) {
541
25.7k
  DCHECK(performing_update_mutex_.is_locked() || performing_heartbeat_mutex_.is_locked());
542
25.7k
  failed_attempts_++;
543
3.06k
  YB_LOG_WITH_PREFIX_EVERY_N_SECS(WARNING, 5) << "Couldn't send request. "
544
3.06k
      << " Status: " << status.ToString() << ". Retrying in the next heartbeat period."
545
3.06k
      << " Already tried " << failed_attempts_ << " times. State: " << state_;
546
25.7k
}
547
548
46.8k
string Peer::LogPrefix() const {
549
46.8k
  return Format("T $0 P $1 -> Peer $2 ($3, $4): ",
550
46.8k
                tablet_id_, leader_uuid_, peer_pb_.permanent_uuid(),
551
46.8k
                peer_pb_.last_known_private_addr(), peer_pb_.last_known_broadcast_addr());
552
46.8k
}
553
554
43.2k
void Peer::Close() {
555
43.2k
  if (heartbeater_) {
556
43.2k
    heartbeater_->Stop();
557
43.2k
  }
558
559
  // If the peer is already closed, return.
560
43.2k
  {
561
43.2k
    std::lock_guard<simple_spinlock> processing_lock(peer_lock_);
562
43.2k
    if (using_thread_pool_.load(std::memory_order_acquire) > 0) {
563
5
      auto deadline = std::chrono::steady_clock::now() +
564
5
                      FLAGS_max_wait_for_processresponse_before_closing_ms * 1ms;
565
5
      BackoffWaiter waiter(deadline, 100ms);
566
10
      while (using_thread_pool_.load(std::memory_order_acquire) > 0) {
567
5
        if (!waiter.Wait()) {
568
0
          LOG_WITH_PREFIX(DFATAL)
569
0
              << "Timed out waiting for ThreadPoolToken::SubmitFunc() to finish. "
570
0
              << "Number of pending calls: " << using_thread_pool_.load(std::memory_order_acquire);
571
0
          break;
572
0
        }
573
5
      }
574
5
    }
575
43.2k
    if (state_ == kPeerClosed) {
576
0
      return;
577
0
    }
578
1
    DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state: " << state_;
579
43.2k
    state_ = kPeerClosed;
580
43.2k
    LOG_WITH_PREFIX(INFO) << "Closing peer";
581
43.2k
  }
582
583
43.2k
  auto retain_self = shared_from_this();
584
585
43.2k
  queue_->UntrackPeer(peer_pb_.permanent_uuid());
586
43.2k
}
587
588
43.2k
Peer::~Peer() {
589
43.2k
  std::lock_guard<simple_spinlock> processing_lock(peer_lock_);
590
0
  CHECK_EQ(state_, kPeerClosed) << "Peer cannot be implicitly closed";
591
43.2k
}
592
593
26.7M
void Peer::CleanRequestOps(ConsensusRequestPB* request) {
594
26.7M
  request->mutable_ops()->ExtractSubrange(0, request->ops().size(), nullptr /* elements */);
595
26.7M
}
596
597
RpcPeerProxy::RpcPeerProxy(HostPort hostport, ConsensusServiceProxyPtr consensus_proxy)
598
220k
    : hostport_(std::move(hostport)), consensus_proxy_(std::move(consensus_proxy)) {
599
220k
}
600
601
void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
602
                               RequestTriggerMode trigger_mode,
603
                               ConsensusResponsePB* response,
604
                               rpc::RpcController* controller,
605
10.3M
                               const rpc::ResponseCallback& callback) {
606
10.3M
  controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
607
10.3M
  consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback);
608
10.3M
}
609
610
void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
611
                                             VoteResponsePB* response,
612
                                             rpc::RpcController* controller,
613
145k
                                             const rpc::ResponseCallback& callback) {
614
145k
  consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback);
615
145k
}
616
617
void RpcPeerProxy::RunLeaderElectionAsync(const RunLeaderElectionRequestPB* request,
618
                                          RunLeaderElectionResponsePB* response,
619
                                          rpc::RpcController* controller,
620
4.85k
                                          const rpc::ResponseCallback& callback) {
621
4.85k
  controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
622
4.85k
  consensus_proxy_->RunLeaderElectionAsync(*request, response, controller, callback);
623
4.85k
}
624
625
void RpcPeerProxy::LeaderElectionLostAsync(const LeaderElectionLostRequestPB* request,
626
                                           LeaderElectionLostResponsePB* response,
627
                                           rpc::RpcController* controller,
628
44
                                           const rpc::ResponseCallback& callback) {
629
44
  consensus_proxy_->LeaderElectionLostAsync(*request, response, controller, callback);
630
44
}
631
632
void RpcPeerProxy::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* request,
633
                                        StartRemoteBootstrapResponsePB* response,
634
                                        rpc::RpcController* controller,
635
4.91k
                                        const rpc::ResponseCallback& callback) {
636
4.91k
  consensus_proxy_->StartRemoteBootstrapAsync(*request, response, controller, callback);
637
4.91k
}
638
639
193k
RpcPeerProxy::~RpcPeerProxy() {}
640
641
RpcPeerProxyFactory::RpcPeerProxyFactory(
642
    Messenger* messenger, rpc::ProxyCache* proxy_cache, CloudInfoPB from)
643
88.7k
    : messenger_(messenger), proxy_cache_(proxy_cache), from_(std::move(from)) {}
644
645
220k
PeerProxyPtr RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb) {
646
220k
  auto hostport = HostPortFromPB(DesiredHostPort(peer_pb, from_));
647
220k
  auto proxy = std::make_unique<ConsensusServiceProxy>(proxy_cache_, hostport);
648
220k
  return std::make_unique<RpcPeerProxy>(std::move(hostport), std::move(proxy));
649
220k
}
650
651
47.7k
RpcPeerProxyFactory::~RpcPeerProxyFactory() {}
652
653
247k
rpc::Messenger* RpcPeerProxyFactory::messenger() const { return messenger_; }
654
655
struct GetNodeInstanceRequest {
656
  GetNodeInstanceRequestPB req;
657
  GetNodeInstanceResponsePB resp;
658
  rpc::RpcController controller;
659
  ConsensusServiceProxy proxy;
660
661
  GetNodeInstanceRequest(rpc::ProxyCache* proxy_cache, const HostPort& hostport)
662
15.2k
      : proxy(proxy_cache, hostport) {}
663
};
664
665
Status SetPermanentUuidForRemotePeer(
666
    rpc::ProxyCache* proxy_cache,
667
    std::chrono::steady_clock::duration timeout,
668
    const std::vector<HostPort>& endpoints,
669
14.6k
    RaftPeerPB* remote_peer) {
670
671
14.6k
  DCHECK(!remote_peer->has_permanent_uuid());
672
14.6k
  auto deadline = std::chrono::steady_clock::now() + timeout;
673
674
14.6k
  std::vector<GetNodeInstanceRequest> requests;
675
14.6k
  requests.reserve(endpoints.size());
676
15.2k
  for (const auto& hp : endpoints) {
677
15.2k
    requests.emplace_back(proxy_cache, hp);
678
15.2k
  }
679
680
14.6k
  CountDownLatch latch(requests.size());
681
14.6k
  const auto kMaxWait = 10s;
682
14.6k
  BackoffWaiter waiter(deadline, kMaxWait);
683
32.8k
  for (;;) {
684
32.8k
    latch.Reset(requests.size());
685
32.8k
    std::atomic<GetNodeInstanceRequest*> last_reply{nullptr};
686
33.4k
    for (auto& request : requests) {
687
33.4k
      request.controller.Reset();
688
33.4k
      request.controller.set_timeout(kMaxWait);
689
0
      VLOG(2) << "Getting uuid from remote peer. Request: " << request.req.ShortDebugString();
690
691
33.4k
      request.proxy.GetNodeInstanceAsync(
692
33.4k
          request.req, &request.resp, &request.controller,
693
33.4k
          [&latch, &request, &last_reply] {
694
33.4k
        if (!request.controller.status().IsTimedOut()) {
695
33.4k
          last_reply.store(&request, std::memory_order_release);
696
33.4k
        }
697
33.4k
        latch.CountDown();
698
33.4k
      });
699
33.4k
    }
700
701
32.8k
    latch.Wait();
702
703
32.9k
    for (auto& request : requests) {
704
32.9k
      auto status = request.controller.status();
705
32.9k
      if (status.ok()) {
706
14.5k
        remote_peer->set_permanent_uuid(request.resp.node_instance().permanent_uuid());
707
14.5k
        remote_peer->set_member_type(PeerMemberType::VOTER);
708
14.5k
        if (request.resp.has_registration()) {
709
14.5k
          CopyRegistration(request.resp.registration(), remote_peer);
710
0
        } else {
711
          // Required for backward compatibility.
712
0
          HostPortsToPBs(endpoints, remote_peer->mutable_last_known_private_addr());
713
0
        }
714
14.5k
        return Status::OK();
715
14.5k
      }
716
32.9k
    }
717
718
18.2k
    auto* last_reply_value = last_reply.load(std::memory_order_acquire);
719
18.2k
    if (last_reply_value == nullptr) {
720
0
      last_reply_value = &requests.front();
721
0
    }
722
723
18.2k
    LOG(WARNING) << "Error getting permanent uuid from config peer " << yb::ToString(endpoints)
724
18.2k
                 << ": " << last_reply_value->controller.status();
725
726
18.2k
    if (last_reply_value->controller.status().IsAborted()) {
727
10
      return last_reply_value->controller.status();
728
10
    }
729
730
18.2k
    if (!waiter.Wait()) {
731
0
      return STATUS_FORMAT(
732
0
          TimedOut, "Getting permanent uuid from $0 timed out after $1: $2",
733
0
          endpoints, timeout, last_reply_value->controller.status());
734
0
    }
735
736
18.2k
    LOG(INFO) << "Retrying to get permanent uuid for remote peer: "
737
18.2k
              << yb::ToString(endpoints) << " attempt: " << waiter.attempt();
738
18.2k
  }
739
14.6k
}
740
741
}  // namespace consensus
742
}  // namespace yb