YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_peers.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_CONSENSUS_CONSENSUS_PEERS_H_
34
#define YB_CONSENSUS_CONSENSUS_PEERS_H_
35
36
#include <stdint.h>
37
38
#include <atomic>
39
#include <cstdint>
40
#include <cstdlib>
41
#include <memory>
42
#include <string>
43
#include <type_traits>
44
#include <vector>
45
46
#include <boost/version.hpp>
47
#include <gflags/gflags_declare.h>
48
49
#include "yb/consensus/consensus_fwd.h"
50
#include "yb/consensus/consensus.pb.h"
51
#include "yb/consensus/consensus_util.h"
52
#include "yb/consensus/metadata.pb.h"
53
54
#include "yb/gutil/integral_types.h"
55
56
#include "yb/rpc/rpc_controller.h"
57
58
#include "yb/util/status_fwd.h"
59
#include "yb/util/atomic.h"
60
#include "yb/util/countdown_latch.h"
61
#include "yb/util/locks.h"
62
#include "yb/util/net/net_util.h"
63
#include "yb/util/result.h"
64
#include "yb/util/semaphore.h"
65
#include "yb/util/shared_lock.h"
66
67
namespace yb {
68
class HostPort;
69
class ThreadPoolToken;
70
71
namespace rpc {
72
class Messenger;
73
class PeriodicTimer;
74
}
75
76
namespace consensus {
77
78
// A peer in consensus (local or remote).
79
//
80
// Leaders use peers to update the local Log and remote replicas.
81
//
82
// Peers are owned by the consensus implementation and do not keep state aside from whether there
83
// are requests pending or if requests are being processed.
84
//
85
// There are two external actions that trigger a state change:
86
//
87
// SignalRequest(): Called by the consensus implementation, notifies that the queue contains
88
// messages to be processed. This function takes a parameter that allows sending requests only if
89
// the queue is not empty, or force-sending a request even if it is empty.
90
//
91
// ProcessResponse(): Called when a response from a peer is received.
92
//
93
// The following state diagrams describe what happens when a state changing method is called.
94
//
95
//                        +
96
//                        |
97
//       SignalRequest()  |
98
//                        |
99
//                        |
100
//                        v
101
//              +------------------+
102
//       +------+    processing ?  +-----+
103
//       |      +------------------+     |
104
//       |                               |
105
//       | Yes                           | No
106
//       |                               |
107
//       v                               v
108
//     return                      ProcessNextRequest()
109
//                                 processing = true
110
//                                 - get reqs. from queue
111
//                                 - update peer async
112
//                                 return
113
//
114
//                         +
115
//                         |
116
//      ProcessResponse()  |
117
//      processing = false |
118
//                         v
119
//               +------------------+
120
//        +------+   more pending?  +-----+
121
//        |      +------------------+     |
122
//        |                               |
123
//        | Yes                           | No
124
//        |                               |
125
//        v                               v
126
//  SignalRequest()                    return
127
//
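// As an editorial illustration of the two trigger modes described in the diagram above (not part
// of the original header): a hypothetical caller-side helper. The RequestTriggerMode enumerators
// kNonEmptyOnly and kAlwaysSend are assumed; NotifySinglePeer is an invented name.
//
//   // Hypothetical caller-side helper (illustrative only).
//   void NotifySinglePeer(const PeerPtr& peer, RequestTriggerMode trigger_mode) {
//     Status s = peer->SignalRequest(trigger_mode);
//     if (!s.ok()) {
//       LOG(WARNING) << "Failed to signal peer " << peer->peer_pb().permanent_uuid() << ": " << s;
//     }
//   }
//
//   // After appending messages to the queue: send only if there is something to send.
//   NotifySinglePeer(peer, RequestTriggerMode::kNonEmptyOnly);
//   // Force a request even when the queue is empty (e.g. to promptly assert leadership).
//   NotifySinglePeer(peer, RequestTriggerMode::kAlwaysSend);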
128
class Peer;
129
typedef std::shared_ptr<Peer> PeerPtr;
130
131
class Peer : public std::enable_shared_from_this<Peer> {
132
 public:
133
  Peer(const RaftPeerPB& peer, std::string tablet_id, std::string leader_uuid,
134
       PeerProxyPtr proxy, PeerMessageQueue* queue, MultiRaftHeartbeatBatcherPtr multi_raft_batcher,
135
       ThreadPoolToken* raft_pool_token, Consensus* consensus, rpc::Messenger* messenger);
136
137
  // Initializes a peer and gets its status.
138
  CHECKED_STATUS Init();
139
140
  // Signals that this peer has a new request to replicate/store.
141
  CHECKED_STATUS SignalRequest(RequestTriggerMode trigger_mode);
142
143
23.4k
  const RaftPeerPB& peer_pb() const { return peer_pb_; }
144
145
  // Returns the PeerProxy if this is a remote peer or NULL if it
146
  // isn't. Used for tests to fiddle with the proxy and emulate remote
147
  // behavior.
148
  PeerProxy* GetPeerProxyForTests();
149
150
  // Stop sending requests and periodic heartbeats.
151
  //
152
  // This does not block waiting on any current outstanding requests to finish.
153
  // However, when they do finish, the results will be disregarded, so this
154
  // is safe to call at any point.
155
  //
156
  // This method must be called before the Peer's associated ThreadPoolToken
157
  // is destructed. Once this method returns, it is safe to destruct
158
  // the ThreadPoolToken.
159
  void Close();
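  // Editorial sketch of the shutdown ordering stated above, assuming a hypothetical owner that
  // holds the peer and its ThreadPoolToken; the type and member names are illustrative only.
  //
  //   void OwnerOfPeerSketch::Shutdown() {
  //     if (peer_) {
  //       peer_->Close();          // Stop heartbeats; results of in-flight requests are discarded.
  //       peer_.reset();
  //     }
  //     raft_pool_token_.reset();  // Destroying the ThreadPoolToken is safe only after Close().
  //   }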
160
161
  void SetTermForTest(int term);
162
163
  ~Peer();
164
165
  // Creates a new remote peer and makes the queue track it.
166
  //
167
  // Requests to this peer (which may end up doing IO to read non-cached log entries) are assembled
168
  // on 'raft_pool_token'.  Response handling may also involve IO related to log-entry lookups
169
  // and is also done on 'raft_pool_token'.
170
  template <class... Args>
171
69.6k
  static Result<PeerPtr> NewRemotePeer(Args&&... args) {
172
69.6k
    auto new_peer = std::make_shared<Peer>(std::forward<Args>(args)...);
173
69.6k
    RETURN_NOT_OK(new_peer->Init());
174
69.6k
    return Result<PeerPtr>(std::move(new_peer));
175
69.6k
  }
_ZN2yb9consensus4Peer13NewRemotePeerIJRNS0_10RaftPeerPBERPKcS7_NSt3__110unique_ptrINS0_9PeerProxyENS8_14default_deleteISA_EEEEPNS0_16PeerMessageQueueEDnPNS_15ThreadPoolTokenEDnPNS_3rpc9MessengerEEEENS_6ResultINS8_10shared_ptrIS1_EEEEDpOT_
Line
Count
Source
171
5
  static Result<PeerPtr> NewRemotePeer(Args&&... args) {
172
5
    auto new_peer = std::make_shared<Peer>(std::forward<Args>(args)...);
173
5
    RETURN_NOT_OK(new_peer->Init());
174
5
    return Result<PeerPtr>(std::move(new_peer));
175
5
  }
_ZN2yb9consensus4Peer13NewRemotePeerIJNS0_10RaftPeerPBERPKcS6_NSt3__110unique_ptrINS0_9PeerProxyENS7_14default_deleteIS9_EEEEPNS0_16PeerMessageQueueEDnPNS_15ThreadPoolTokenEDnPNS_3rpc9MessengerEEEENS_6ResultINS7_10shared_ptrIS1_EEEEDpOT_
Line
Count
Source
171
2
  static Result<PeerPtr> NewRemotePeer(Args&&... args) {
172
2
    auto new_peer = std::make_shared<Peer>(std::forward<Args>(args)...);
173
2
    RETURN_NOT_OK(new_peer->Init());
174
2
    return Result<PeerPtr>(std::move(new_peer));
175
2
  }
_ZN2yb9consensus4Peer13NewRemotePeerIJRKNS0_10RaftPeerPBERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEESE_NS6_10unique_ptrINS0_9PeerProxyENS6_14default_deleteISG_EEEERPNS0_16PeerMessageQueueERNS6_10shared_ptrINS0_25MultiRaftHeartbeatBatcherEEERPNS_15ThreadPoolTokenERPNS0_9ConsensusEPNS_3rpc9MessengerEEEENS_6ResultINSN_IS1_EEEEDpOT_
Line
Count
Source
171
69.5k
  static Result<PeerPtr> NewRemotePeer(Args&&... args) {
172
69.5k
    auto new_peer = std::make_shared<Peer>(std::forward<Args>(args)...);
173
69.5k
    RETURN_NOT_OK(new_peer->Init());
174
69.5k
    return Result<PeerPtr>(std::move(new_peer));
175
69.5k
  }
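  // A hedged usage sketch for NewRemotePeer(). The arguments mirror the Peer constructor declared
  // above; proxy_factory_, queue_, multi_raft_batcher_, raft_pool_token_, consensus_, messenger_
  // and peers_ are assumed members of a hypothetical peer-manager-style caller, not names from
  // this header.
  //
  //   Status PeerManagerSketch::CreateRemotePeer(const RaftPeerPB& peer_pb) {
  //     PeerPtr remote_peer = VERIFY_RESULT(Peer::NewRemotePeer(
  //         peer_pb, tablet_id_, leader_uuid_, proxy_factory_->NewProxy(peer_pb), queue_,
  //         multi_raft_batcher_, raft_pool_token_, consensus_, messenger_));
  //     peers_[peer_pb.permanent_uuid()] = std::move(remote_peer);  // Track the new peer.
  //     return Status::OK();
  //   }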
176
177
0
  uint64_t failed_attempts() {
178
0
    std::lock_guard<simple_spinlock> l(peer_lock_);
179
0
    return failed_attempts_;
180
0
  }
181
182
 private:
183
  void SendNextRequest(RequestTriggerMode trigger_mode);
184
185
  // Signals that a response was received from the peer. This method does response handling that
186
  // requires IO or may block.
187
  void ProcessResponse();
188
189
  // Signals that a heartbeat response was received from the peer.
190
  void ProcessHeartbeatResponse(const Status& status);
191
192
  // Returns true if there are more pending ops to process, false otherwise.
193
  bool ProcessResponseWithStatus(const Status& status,
194
                                 ConsensusResponsePB* response);
195
196
  // Fetch the desired remote bootstrap request from the queue and send it to the peer. The callback
197
  // goes to ProcessRemoteBootstrapResponse().
198
  //
199
  // Returns a bad Status if remote bootstrap is disabled, or if the request cannot be generated for
200
  // some reason.
201
  CHECKED_STATUS SendRemoteBootstrapRequest();
202
203
  // Handle RPC callback from initiating remote bootstrap.
204
  void ProcessRemoteBootstrapResponse();
205
206
  // Signals there was an error sending the request to the peer.
207
  void ProcessResponseError(const Status& status);
208
209
  // Returns a lock on peer_lock_. If the peer is closed, the returned lock is not held and the
  // calling function should return.
210
  std::unique_lock<simple_spinlock> StartProcessingUnlocked();
211
212
  template <class LockType>
213
38.5M
  std::unique_lock<AtomicTryMutex> LockPerformingUpdate(LockType type) {
214
38.5M
    return std::unique_lock<AtomicTryMutex>(performing_update_mutex_, type);
215
38.5M
  }
_ZN2yb9consensus4Peer20LockPerformingUpdateINSt3__113try_to_lock_tEEENS3_11unique_lockINS_14AtomicTryMutexEEET_
Line
Count
Source
213
14.8M
  std::unique_lock<AtomicTryMutex> LockPerformingUpdate(LockType type) {
214
14.8M
    return std::unique_lock<AtomicTryMutex>(performing_update_mutex_, type);
215
14.8M
  }
_ZN2yb9consensus4Peer20LockPerformingUpdateINSt3__112adopt_lock_tEEENS3_11unique_lockINS_14AtomicTryMutexEEET_
Line
Count
Source
213
23.7M
  std::unique_lock<AtomicTryMutex> LockPerformingUpdate(LockType type) {
214
23.7M
    return std::unique_lock<AtomicTryMutex>(performing_update_mutex_, type);
215
23.7M
  }
216
217
  template <class LockType>
218
0
  std::unique_lock<AtomicTryMutex> LockPerformingHeartbeat(LockType type) {
219
0
    return std::unique_lock<AtomicTryMutex>(performing_heartbeat_mutex_, type);
220
0
  }
Unexecuted instantiation: _ZN2yb9consensus4Peer23LockPerformingHeartbeatINSt3__113try_to_lock_tEEENS3_11unique_lockINS_14AtomicTryMutexEEET_
Unexecuted instantiation: _ZN2yb9consensus4Peer23LockPerformingHeartbeatINSt3__112adopt_lock_tEEENS3_11unique_lockINS_14AtomicTryMutexEEET_
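  // Editorial sketch of how LockPerformingUpdate() supports the single-outstanding-request
  // guarantee described for performing_update_mutex_ below. The member function names ending in
  // 'Sketch' are hypothetical, and the control flow is inferred from this header's comments
  // rather than copied from the implementation.
  //
  //   void Peer::SendNextRequestSketch(RequestTriggerMode trigger_mode) {
  //     // Try to take the mutex; if another update is already in flight, do nothing.
  //     auto performing_update_lock = LockPerformingUpdate(std::try_to_lock);
  //     if (!performing_update_lock.owns_lock()) {
  //       return;
  //     }
  //     // ... assemble update_request_ and start the asynchronous RPC ...
  //     performing_update_lock.release();  // Keep the mutex held until the response arrives.
  //   }
  //
  //   void Peer::ProcessResponseSketch() {
  //     // Re-adopt the mutex taken above so it is released on every exit path of the handler.
  //     auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
  //     // ... handle update_response_ ...
  //   }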
221
222
  // Simple wrapper to clean up ops from the request.
223
  void CleanRequestOps(ConsensusRequestPB* request);
224
225
  std::string LogPrefix() const;
226
227
0
  const std::string& tablet_id() const { return tablet_id_; }
228
229
  const std::string tablet_id_;
230
  const std::string leader_uuid_;
231
232
  const RaftPeerPB peer_pb_;
233
234
  PeerProxyPtr proxy_;
235
236
  PeerMessageQueue* queue_;
237
  uint64_t failed_attempts_ = 0;
238
239
  // The latest consensus update request and response.
240
  ConsensusRequestPB update_request_;
241
  ConsensusResponsePB update_response_;
242
243
  // Latest heartbeat request and response
244
  ConsensusRequestPB heartbeat_request_;
245
  ConsensusResponsePB heartbeat_response_;
246
247
  // Each time a heartbeat request is sent this value is incremented.
248
  int64_t cur_heartbeat_id_ = 0;
249
  // Indicates the last valid heartbeat id that was sent.
250
  // Each time an operation (non-heartbeat) is sent this value is updated.
251
  // Upon receiving the response for an outstanding heartbeat, if
252
  // cur_heartbeat_id_ < minimum_viable_heartbeat_ then the heartbeat is invalid
253
  // since a more recent op was sent, so we won't process its response.
254
  int64_t minimum_viable_heartbeat_ = 0;
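  // As an editorial sketch only, the invalidation rule stated in the comment above could be
  // expressed as a hypothetical helper (not a member of the real class):
  //
  //   bool OutstandingHeartbeatStillViableSketch() const {
  //     // A heartbeat is stale if a more recent (non-heartbeat) op was sent after it.
  //     return cur_heartbeat_id_ >= minimum_viable_heartbeat_;
  //   }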
255
256
  // The latest remote bootstrap request and response.
257
  StartRemoteBootstrapRequestPB rb_request_;
258
  StartRemoteBootstrapResponsePB rb_response_;
259
260
  rpc::RpcController controller_;
261
262
  // Held if there is an outstanding request.  This is used in order to ensure that we only have a
263
  // single request outstanding at a time, and to wait for the outstanding requests at Close().
264
  AtomicTryMutex performing_update_mutex_;
265
266
  // Held if there is an outstanding heartbeat request.
267
  // This is used in order to ensure that we only have a
268
  // single heartbeat request outstanding at a time.
269
  AtomicTryMutex performing_heartbeat_mutex_;
270
271
  // Heartbeater for remote peer implementations.  This will send status-only requests to the remote
272
  // peers whenever we go more than 'FLAGS_raft_heartbeat_interval_ms' without sending actual data.
273
  std::shared_ptr<rpc::PeriodicTimer> heartbeater_;
274
275
  // Batcher that batches the heartbeat requests sent by each consensus peer
276
  // at a per-tserver level.
277
  MultiRaftHeartbeatBatcherPtr multi_raft_batcher_;
278
279
  // Thread pool used to construct requests to this peer.
280
  ThreadPoolToken* raft_pool_token_;
281
282
  enum State {
283
    kPeerCreated,
284
    kPeerStarted,
285
    kPeerRunning,
286
    kPeerClosed
287
  };
288
289
  // Lock that protects Peer state changes, initialization, etc.  Must not try to acquire sem_ while
290
  // holding peer_lock_.
291
  mutable simple_spinlock peer_lock_;
292
  State state_ = kPeerCreated;
293
  Consensus* consensus_ = nullptr;
294
  rpc::Messenger* messenger_ = nullptr;
295
  std::atomic<int> using_thread_pool_{0};
296
};
297
298
// A proxy to another peer. Usually a thin wrapper around an rpc proxy but can be replaced for
299
// tests.
300
class PeerProxy {
301
 public:
302
303
  // Sends a request, asynchronously, to a remote peer.
304
  virtual void UpdateAsync(const ConsensusRequestPB* request,
305
                           RequestTriggerMode trigger_mode,
306
                           ConsensusResponsePB* response,
307
                           rpc::RpcController* controller,
308
                           const rpc::ResponseCallback& callback) = 0;
309
310
  // Sends a RequestConsensusVote to a remote peer.
311
  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
312
                                         VoteResponsePB* response,
313
                                         rpc::RpcController* controller,
314
                                         const rpc::ResponseCallback& callback) = 0;
315
316
  // Instructs a peer to begin a remote bootstrap session.
317
  virtual void StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* request,
318
                                    StartRemoteBootstrapResponsePB* response,
319
                                    rpc::RpcController* controller,
320
0
                                    const rpc::ResponseCallback& callback) {
321
0
    LOG(DFATAL) << "Not implemented";
322
0
  }
323
324
  // Sends a RunLeaderElection request to a peer.
325
  virtual void RunLeaderElectionAsync(const RunLeaderElectionRequestPB* request,
326
                                      RunLeaderElectionResponsePB* response,
327
                                      rpc::RpcController* controller,
328
0
                                      const rpc::ResponseCallback& callback) {
329
0
    LOG(DFATAL) << "Not implemented";
330
0
  }
331
332
  virtual void LeaderElectionLostAsync(const LeaderElectionLostRequestPB* request,
333
                                       LeaderElectionLostResponsePB* response,
334
                                       rpc::RpcController* controller,
335
0
                                       const rpc::ResponseCallback& callback) {
336
0
    LOG(DFATAL) << "Not implemented";
337
0
  }
338
339
195k
  virtual ~PeerProxy() {}
340
};
341
342
typedef std::unique_ptr<PeerProxy> PeerProxyPtr;
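Because the class comment notes that PeerProxy can be replaced for tests, here is a hedged sketch of a minimal test double; NoOpPeerProxy is an invented name and does not exist in the tree:

class NoOpPeerProxy : public PeerProxy {
 public:
  void UpdateAsync(const ConsensusRequestPB* request,
                   RequestTriggerMode trigger_mode,
                   ConsensusResponsePB* response,
                   rpc::RpcController* controller,
                   const rpc::ResponseCallback& callback) override {
    // Invoke the callback immediately without touching 'response'; enough for tests that only
    // exercise the call path.
    callback();
  }

  void RequestConsensusVoteAsync(const VoteRequestPB* request,
                                 VoteResponsePB* response,
                                 rpc::RpcController* controller,
                                 const rpc::ResponseCallback& callback) override {
    callback();
  }
};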
343
344
// A peer proxy factory. Usually just obtains peers through the rpc implementation but can be
345
// replaced for tests.
346
class PeerProxyFactory {
347
 public:
348
  virtual PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) = 0;
349
350
47.8k
  virtual ~PeerProxyFactory() {}
351
352
0
  virtual rpc::Messenger* messenger() const {
353
0
    return nullptr;
354
0
  }
355
};
356
357
// PeerProxy implementation that does RPC calls
358
class RpcPeerProxy : public PeerProxy {
359
 public:
360
  RpcPeerProxy(HostPort hostport, ConsensusServiceProxyPtr consensus_proxy);
361
362
  virtual void UpdateAsync(const ConsensusRequestPB* request,
363
                           RequestTriggerMode trigger_mode,
364
                           ConsensusResponsePB* response,
365
                           rpc::RpcController* controller,
366
                           const rpc::ResponseCallback& callback) override;
367
368
  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
369
                                         VoteResponsePB* response,
370
                                         rpc::RpcController* controller,
371
                                         const rpc::ResponseCallback& callback) override;
372
373
  virtual void StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* request,
374
                                    StartRemoteBootstrapResponsePB* response,
375
                                    rpc::RpcController* controller,
376
                                    const rpc::ResponseCallback& callback) override;
377
378
  virtual void RunLeaderElectionAsync(const RunLeaderElectionRequestPB* request,
379
                                      RunLeaderElectionResponsePB* response,
380
                                      rpc::RpcController* controller,
381
                                      const rpc::ResponseCallback& callback) override;
382
383
  virtual void LeaderElectionLostAsync(const LeaderElectionLostRequestPB* request,
384
                                       LeaderElectionLostResponsePB* response,
385
                                       rpc::RpcController* controller,
386
                                       const rpc::ResponseCallback& callback) override;
387
388
  virtual ~RpcPeerProxy();
389
390
 private:
391
  HostPort hostport_;
392
  ConsensusServiceProxyPtr consensus_proxy_;
393
};
394
395
// PeerProxyFactory implementation that generates RpcPeerProxy instances
396
class RpcPeerProxyFactory : public PeerProxyFactory {
397
 public:
398
  RpcPeerProxyFactory(rpc::Messenger* messenger, rpc::ProxyCache* proxy_cache, CloudInfoPB from);
399
400
  PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) override;
401
402
  virtual ~RpcPeerProxyFactory();
403
404
  rpc::Messenger* messenger() const override;
405
406
 private:
407
  rpc::Messenger* messenger_ = nullptr;
408
  rpc::ProxyCache* const proxy_cache_;
409
  const CloudInfoPB from_;
410
};
411
412
// Query the consensus service at the last known host/port specified in 'remote_peer' and set
413
// the 'permanent_uuid' field based on the response.
414
Status SetPermanentUuidForRemotePeer(
415
    rpc::ProxyCache* proxy_cache,
416
    std::chrono::steady_clock::duration timeout,
417
    const std::vector<HostPort>& endpoints,
418
    RaftPeerPB* remote_peer);
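A hedged usage sketch for the declaration above; the address, port, and 10-second timeout are arbitrary, and ResolvePeerUuidExample is an invented wrapper:

Status ResolvePeerUuidExample(rpc::ProxyCache* proxy_cache) {
  std::vector<HostPort> endpoints = {HostPort("10.0.0.12", 9100)};
  RaftPeerPB remote_peer;  // On success, permanent_uuid will be filled in from the response.
  RETURN_NOT_OK(SetPermanentUuidForRemotePeer(
      proxy_cache, std::chrono::seconds(10), endpoints, &remote_peer));
  LOG(INFO) << "Resolved peer uuid: " << remote_peer.permanent_uuid();
  return Status::OK();
}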
419
420
}  // namespace consensus
421
}  // namespace yb
422
423
#endif /* YB_CONSENSUS_CONSENSUS_PEERS_H_ */