YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/consensus-test-util.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_CONSENSUS_CONSENSUS_TEST_UTIL_H_
34
#define YB_CONSENSUS_CONSENSUS_TEST_UTIL_H_
35
36
#include <map>
37
#include <memory>
38
#include <mutex>
39
#include <string>
40
#include <unordered_map>
41
#include <utility>
42
#include <vector>
43
44
#include <gmock/gmock.h>
45
46
#include "yb/common/hybrid_time.h"
47
#include "yb/common/wire_protocol.h"
48
#include "yb/consensus/consensus.h"
49
#include "yb/consensus/consensus_peers.h"
50
#include "yb/consensus/consensus_queue.h"
51
#include "yb/consensus/consensus_round.h"
52
#include "yb/consensus/opid_util.h"
53
#include "yb/consensus/raft_consensus.h"
54
#include "yb/consensus/test_consensus_context.h"
55
#include "yb/gutil/map-util.h"
56
#include "yb/gutil/strings/substitute.h"
57
#include "yb/rpc/messenger.h"
58
#include "yb/rpc/rpc_test_util.h"
59
#include "yb/server/clock.h"
60
#include "yb/util/countdown_latch.h"
61
#include "yb/util/locks.h"
62
#include "yb/util/status_log.h"
63
#include "yb/util/test_macros.h"
64
#include "yb/util/test_util.h"
65
#include "yb/util/threadpool.h"
66
67
using namespace std::literals;
68
69
52
#define TOKENPASTE(x, y) x ## y
70
52
#define TOKENPASTE2(x, y) TOKENPASTE(x, y)
71
72
#define ASSERT_OPID_EQ(left, right) \
73
30
  OpIdPB TOKENPASTE2(_left, __LINE__) = (left); \
74
30
  OpIdPB TOKENPASTE2(_right, __LINE__) = (right); \
75
26
  if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right, __LINE__))) \
76
0
    FAIL() << "Expected: " << TOKENPASTE2(_right, __LINE__).ShortDebugString() << "\n" \
77
0
           << "Value: " << TOKENPASTE2(_left, __LINE__).ShortDebugString() << "\n"
78
79
namespace yb {
80
namespace consensus {
81
82
using log::Log;
83
using rpc::Messenger;
84
using strings::Substitute;
85
86
constexpr int kTermDivisor = 7;
87
88
633
inline CoarseTimePoint CoarseBigDeadline() {
89
633
  return CoarseMonoClock::now() + 600s;
90
633
}
91
92
inline ReplicateMsgPtr CreateDummyReplicate(int64_t term,
93
                                            int64_t index,
94
                                            const HybridTime& hybrid_time,
95
952
                                            int64_t payload_size) {
96
952
  auto msg = std::make_shared<ReplicateMsg>();
97
952
  OpIdPB* id = msg->mutable_id();
98
952
  id->set_term(term);
99
952
  id->set_index(index);
100
101
952
  msg->set_op_type(NO_OP);
102
952
  msg->mutable_noop_request()->mutable_payload_for_tests()->resize(payload_size);
103
952
  msg->set_hybrid_time(hybrid_time.ToUint64());
104
952
  return msg;
105
952
}
106
107
// Returns RaftPeerPB with given UUID and obviously-fake hostname / port combo.
108
26
RaftPeerPB FakeRaftPeerPB(const std::string& uuid) {
109
26
  RaftPeerPB peer_pb;
110
26
  peer_pb.set_permanent_uuid(uuid);
111
26
  auto addr = peer_pb.mutable_last_known_private_addr()->Add();
112
26
  addr->set_host(Substitute("$0-fake-hostname", CURRENT_TEST_NAME()));
113
26
  addr->set_port(0);
114
26
  return peer_pb;
115
26
}
116
117
// Appends 'count' messages to 'queue' with different terms and indexes.
118
//
119
// An operation will only be considered done (TestOperationStatus::IsDone()
120
// will become true) once at least 'n_majority' peers have called
121
// TestOperationStatus::AckPeer().
122
static inline void AppendReplicateMessagesToQueue(
123
    PeerMessageQueue* queue,
124
    const scoped_refptr<server::Clock>& clock,
125
    int64_t first_index,
126
    int64_t count,
127
113
    int64_t payload_size = 0) {
128
129
857
  for (int64_t index = first_index; index < first_index + count; index++) {
130
744
    int64_t term = index / kTermDivisor;
131
744
    CHECK_OK(queue->TEST_AppendOperation(
132
744
          CreateDummyReplicate(term, index, clock->Now(), payload_size)));
133
744
  }
134
113
}
consensus_peers-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Line
Count
Source
127
105
    int64_t payload_size = 0) {
128
129
229
  for (int64_t index = first_index; index < first_index + count; index++) {
130
124
    int64_t term = index / kTermDivisor;
131
124
    CHECK_OK(queue->TEST_AppendOperation(
132
124
          CreateDummyReplicate(term, index, clock->Now(), payload_size)));
133
124
  }
134
105
}
consensus_queue-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Line
Count
Source
127
8
    int64_t payload_size = 0) {
128
129
628
  for (int64_t index = first_index; index < first_index + count; index++) {
130
620
    int64_t term = index / kTermDivisor;
131
620
    CHECK_OK(queue->TEST_AppendOperation(
132
620
          CreateDummyReplicate(term, index, clock->Now(), payload_size)));
133
620
  }
134
8
}
Unexecuted instantiation: leader_election-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Unexecuted instantiation: log_cache-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Unexecuted instantiation: raft_consensus-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Unexecuted instantiation: raft_consensus_quorum-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Unexecuted instantiation: replica_state-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
Unexecuted instantiation: tablet_bootstrap-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx
135
136
12
OpIdPB MakeOpIdPbForIndex(int index) {
137
12
  return MakeOpId(index / kTermDivisor, index);
138
12
}
139
140
13
OpId MakeOpIdForIndex(int index) {
141
13
  return OpId(index / kTermDivisor, index);
142
13
}
143
144
10
std::string OpIdStrForIndex(int index) {
145
10
  return OpIdToString(MakeOpIdPbForIndex(index));
146
10
}
147
148
// Builds a configuration of 'num' voters.
149
27
RaftConfigPB BuildRaftConfigPBForTests(int num) {
150
27
  RaftConfigPB raft_config;
151
100
  for (int i = 0; i < num; i++) {
152
73
    RaftPeerPB* peer_pb = raft_config.add_peers();
153
73
    peer_pb->set_member_type(PeerMemberType::VOTER);
154
73
    peer_pb->set_permanent_uuid(Substitute("peer-$0", i));
155
73
    HostPortPB* hp = peer_pb->mutable_last_known_private_addr()->Add();
156
73
    hp->set_host(Substitute("peer-$0.fake-domain-for-tests", i));
157
73
    hp->set_port(0);
158
73
  }
159
27
  return raft_config;
160
27
}
161
162
// Abstract base class to build PeerProxy implementations on top of for testing.
163
// Provides a single-threaded pool to run callbacks in and callback
164
// registration/running, along with an enum to identify the supported methods.
165
class TestPeerProxy : public PeerProxy {
166
 public:
167
  // Which PeerProxy method to invoke.
168
  enum Method {
169
    kUpdate,
170
    kRequestVote,
171
  };
172
173
1.75k
  explicit TestPeerProxy(ThreadPool* pool) : pool_(pool) {}
174
175
 protected:
176
  // Register the RPC callback in order to call later.
177
  // We currently only support one request of each method being in flight at a time.
178
1.01k
  virtual void RegisterCallback(Method method, const rpc::ResponseCallback& callback) {
179
1.01k
    std::lock_guard<simple_spinlock> lock(lock_);
180
1.01k
    InsertOrDie(&callbacks_, method, callback);
181
1.01k
  }
182
183
  // Answer the peer.
184
1.01k
  virtual void Respond(Method method) {
185
1.01k
    rpc::ResponseCallback callback;
186
1.01k
    {
187
1.01k
      std::lock_guard<simple_spinlock> lock(lock_);
188
1.01k
      callback = FindOrDie(callbacks_, method);
189
1.01k
      CHECK_EQ(1, callbacks_.erase(method));
190
      // Drop the lock before submitting to the pool, since the callback itself may
191
      // destroy this instance.
192
1.01k
    }
193
1.01k
    WARN_NOT_OK(pool_->SubmitFunc(callback), "Submit failed");
194
1.01k
  }
195
196
339
  virtual void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
197
339
    RegisterCallback(method, callback);
198
339
    Respond(method);
199
339
  }
200
201
  mutable simple_spinlock lock_;
202
  ThreadPool* pool_;
203
  std::map<Method, rpc::ResponseCallback> callbacks_; // Protected by lock_.
204
};
205
206
template <typename ProxyType>
207
class DelayablePeerProxy : public TestPeerProxy {
208
 public:
209
  // Add delayability of RPC responses to the delegated impl.
210
  // This class takes ownership of 'proxy'.
211
  explicit DelayablePeerProxy(ThreadPool* pool, ProxyType* proxy)
212
    : TestPeerProxy(pool),
213
      proxy_(CHECK_NOTNULL(proxy)),
214
      delay_response_(false),
215
17
      latch_(1) {
216
17
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEEC2EPNS_10ThreadPoolEPS2_
Line
Count
Source
215
5
      latch_(1) {
216
5
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEEC2EPNS_10ThreadPoolEPS2_
Line
Count
Source
215
12
      latch_(1) {
216
12
  }
217
218
  // Delay the answer to the next response to this remote
219
  // peer. The response callback will only be called on Respond().
220
6
  virtual void DelayResponse() {
221
6
    std::lock_guard<simple_spinlock> l(lock_);
222
6
    delay_response_ = true;
223
6
    latch_.Reset(1); // Reset for the next time.
224
6
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE13DelayResponseEv
Line
Count
Source
220
2
  virtual void DelayResponse() {
221
2
    std::lock_guard<simple_spinlock> l(lock_);
222
2
    delay_response_ = true;
223
2
    latch_.Reset(1); // Reset for the next time.
224
2
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE13DelayResponseEv
Line
Count
Source
220
4
  virtual void DelayResponse() {
221
4
    std::lock_guard<simple_spinlock> l(lock_);
222
4
    delay_response_ = true;
223
4
    latch_.Reset(1); // Reset for the next time.
224
4
  }
225
226
23
  virtual void RespondUnlessDelayed(Method method) {
227
23
    {
228
23
      std::lock_guard<simple_spinlock> l(lock_);
229
23
      if (delay_response_) {
230
6
        latch_.CountDown();
231
6
        delay_response_ = false;
232
6
        return;
233
6
      }
234
17
    }
235
17
    TestPeerProxy::Respond(method);
236
17
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE20RespondUnlessDelayedENS0_13TestPeerProxy6MethodE
Line
Count
Source
226
11
  virtual void RespondUnlessDelayed(Method method) {
227
11
    {
228
11
      std::lock_guard<simple_spinlock> l(lock_);
229
11
      if (delay_response_) {
230
2
        latch_.CountDown();
231
2
        delay_response_ = false;
232
2
        return;
233
2
      }
234
9
    }
235
9
    TestPeerProxy::Respond(method);
236
9
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE20RespondUnlessDelayedENS0_13TestPeerProxy6MethodE
Line
Count
Source
226
12
  virtual void RespondUnlessDelayed(Method method) {
227
12
    {
228
12
      std::lock_guard<simple_spinlock> l(lock_);
229
12
      if (delay_response_) {
230
4
        latch_.CountDown();
231
4
        delay_response_ = false;
232
4
        return;
233
4
      }
234
8
    }
235
8
    TestPeerProxy::Respond(method);
236
8
  }
237
238
6
  virtual void Respond(Method method) override {
239
6
    latch_.Wait();   // Wait until strictly after peer would have responded.
240
6
    return TestPeerProxy::Respond(method);
241
6
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE7RespondENS0_13TestPeerProxy6MethodE
Line
Count
Source
238
2
  virtual void Respond(Method method) override {
239
2
    latch_.Wait();   // Wait until strictly after peer would have responded.
240
2
    return TestPeerProxy::Respond(method);
241
2
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE7RespondENS0_13TestPeerProxy6MethodE
Line
Count
Source
238
4
  virtual void Respond(Method method) override {
239
4
    latch_.Wait();   // Wait until strictly after peer would have responded.
240
4
    return TestPeerProxy::Respond(method);
241
4
  }
242
243
  virtual void UpdateAsync(const ConsensusRequestPB* request,
244
                           RequestTriggerMode trigger_mode,
245
                           ConsensusResponsePB* response,
246
                           rpc::RpcController* controller,
247
11
                           const rpc::ResponseCallback& callback) override {
248
11
    RegisterCallback(kUpdate, callback);
249
11
    return proxy_->UpdateAsync(
250
11
        request, trigger_mode, response, controller,
251
11
        std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kUpdate));
252
11
  }
_ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE11UpdateAsyncEPKNS0_18ConsensusRequestPBENS0_18RequestTriggerModeEPNS0_19ConsensusResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE
Line
Count
Source
247
11
                           const rpc::ResponseCallback& callback) override {
248
11
    RegisterCallback(kUpdate, callback);
249
11
    return proxy_->UpdateAsync(
250
11
        request, trigger_mode, response, controller,
251
11
        std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kUpdate));
252
11
  }
Unexecuted instantiation: _ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE11UpdateAsyncEPKNS0_18ConsensusRequestPBENS0_18RequestTriggerModeEPNS0_19ConsensusResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE
253
254
  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
255
                                         VoteResponsePB* response,
256
                                         rpc::RpcController* controller,
257
12
                                         const rpc::ResponseCallback& callback) override {
258
12
    RegisterCallback(kRequestVote, callback);
259
12
    return proxy_->RequestConsensusVoteAsync(
260
12
        request, response, controller,
261
12
        std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kRequestVote));
262
12
  }
Unexecuted instantiation: _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE25RequestConsensusVoteAsyncEPKNS0_13VoteRequestPBEPNS0_14VoteResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE25RequestConsensusVoteAsyncEPKNS0_13VoteRequestPBEPNS0_14VoteResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE
Line
Count
Source
257
12
                                         const rpc::ResponseCallback& callback) override {
258
12
    RegisterCallback(kRequestVote, callback);
259
12
    return proxy_->RequestConsensusVoteAsync(
260
12
        request, response, controller,
261
12
        std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kRequestVote));
262
12
  }
263
264
16
  ProxyType* proxy() const {
265
16
    return proxy_.get();
266
16
  }
_ZNK2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE5proxyEv
Line
Count
Source
264
4
  ProxyType* proxy() const {
265
4
    return proxy_.get();
266
4
  }
_ZNK2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE5proxyEv
Line
Count
Source
264
12
  ProxyType* proxy() const {
265
12
    return proxy_.get();
266
12
  }
267
268
 protected:
269
  std::unique_ptr<ProxyType> const proxy_;
270
  bool delay_response_; // Protected by lock_.
271
  CountDownLatch latch_;
272
};
273
274
// Allows complete mocking of a peer's responses.
275
// You set the response, it will respond with that.
276
class MockedPeerProxy : public TestPeerProxy {
277
 public:
278
  explicit MockedPeerProxy(ThreadPool* pool)
279
14
      : TestPeerProxy(pool) {
280
14
  }
281
282
3
  virtual void set_update_response(const ConsensusResponsePB& update_response) {
283
0
    CHECK(update_response.IsInitialized()) << update_response.ShortDebugString();
284
3
    {
285
3
      std::lock_guard<simple_spinlock> l(lock_);
286
3
      update_response_ = update_response;
287
3
    }
288
3
  }
289
290
12
  virtual void set_vote_response(const VoteResponsePB& vote_response) {
291
12
    {
292
12
      std::lock_guard<simple_spinlock> l(lock_);
293
12
      vote_response_ = vote_response;
294
12
    }
295
12
  }
296
297
  virtual void UpdateAsync(const ConsensusRequestPB* request,
298
                           RequestTriggerMode trigger_mode,
299
                           ConsensusResponsePB* response,
300
                           rpc::RpcController* controller,
301
2
                           const rpc::ResponseCallback& callback) override {
302
2
    {
303
2
      std::lock_guard<simple_spinlock> l(lock_);
304
2
      switch (trigger_mode) {
305
1
        case RequestTriggerMode::kNonEmptyOnly: non_empty_only_update_count_++; break;
306
1
        case RequestTriggerMode::kAlwaysSend: forced_update_count_++; break;
307
2
      }
308
2
      update_count_++;
309
2
      *response = update_response_;
310
2
    }
311
2
    return RegisterCallbackAndRespond(kUpdate, callback);
312
2
  }
313
314
  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
315
                                         VoteResponsePB* response,
316
                                         rpc::RpcController* controller,
317
12
                                         const rpc::ResponseCallback& callback) override {
318
12
    *response = vote_response_;
319
12
    return RegisterCallbackAndRespond(kRequestVote, callback);
320
12
  }
321
322
  // Return the number of times that UpdateAsync() has been called.
323
3
  int update_count() const {
324
3
    std::lock_guard<simple_spinlock> l(lock_);
325
3
    return update_count_;
326
3
  }
327
328
  // Return the number of times that UpdateAsync() has been for requestes triggered with
329
  // RequestTriggerMode::kNonEmptyOnly.
330
0
  int non_empty_only_update_count() const {
331
0
    std::lock_guard<simple_spinlock> l(lock_);
332
0
    return non_empty_only_update_count_;
333
0
  }
334
335
  // Return the number of times that UpdateAsync() has been for requestes triggered with
336
  // RequestTriggerMode::kAlwaysSend.
337
0
  int forced_update_count() const {
338
0
    std::lock_guard<simple_spinlock> l(lock_);
339
0
    return forced_update_count_;
340
0
  }
341
342
 protected:
343
  int update_count_ = 0;
344
  int forced_update_count_ = 0;
345
  int non_empty_only_update_count_ = 0;
346
347
  ConsensusResponsePB update_response_;
348
  VoteResponsePB vote_response_;
349
};
350
351
// Allows to test peers by emulating a noop remote endpoint that just replies
352
// that the messages were received/replicated/committed.
353
class NoOpTestPeerProxy : public TestPeerProxy {
354
 public:
355
356
  explicit NoOpTestPeerProxy(ThreadPool* pool, const consensus::RaftPeerPB& peer_pb)
357
1.68k
    : TestPeerProxy(pool), peer_pb_(peer_pb) {
358
1.68k
    last_received_.CopyFrom(MinimumOpId());
359
1.68k
  }
360
361
  virtual void UpdateAsync(const ConsensusRequestPB* request,
362
                           RequestTriggerMode trigger_mode,
363
                           ConsensusResponsePB* response,
364
                           rpc::RpcController* controller,
365
11
                           const rpc::ResponseCallback& callback) override {
366
367
11
    response->Clear();
368
11
    {
369
11
      std::lock_guard<simple_spinlock> lock(lock_);
370
11
      if (OpIdLessThan(last_received_, request->preceding_id())) {
371
5
        ConsensusErrorPB* error = response->mutable_status()->mutable_error();
372
5
        error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
373
5
        StatusToPB(STATUS(IllegalState, ""), error->mutable_status());
374
6
      } else if (request->ops_size() > 0) {
375
5
        last_received_.CopyFrom(request->ops(request->ops_size() - 1).id());
376
5
      }
377
378
11
      response->set_responder_uuid(peer_pb_.permanent_uuid());
379
11
      response->set_responder_term(request->caller_term());
380
11
      response->mutable_status()->mutable_last_received()->CopyFrom(last_received_);
381
11
      response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(last_received_);
382
      // We set the last committed index to be the same index as the last received. While
383
      // this is unlikely to happen in a real situation, its not technically incorrect and
384
      // avoids having to come up with some other index that it still correct.
385
11
      response->mutable_status()->set_last_committed_idx(last_received_.index());
386
11
    }
387
11
    return RegisterCallbackAndRespond(kUpdate, callback);
388
11
  }
389
390
  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
391
                                         VoteResponsePB* response,
392
                                         rpc::RpcController* controller,
393
314
                                         const rpc::ResponseCallback& callback) override {
394
314
    {
395
314
      std::lock_guard<simple_spinlock> lock(lock_);
396
314
      response->set_responder_uuid(peer_pb_.permanent_uuid());
397
314
      response->set_responder_term(request->candidate_term());
398
314
      response->set_vote_granted(true);
399
314
    }
400
314
    return RegisterCallbackAndRespond(kRequestVote, callback);
401
314
  }
402
403
4
  const OpIdPB& last_received() {
404
4
    std::lock_guard<simple_spinlock> lock(lock_);
405
4
    return last_received_;
406
4
  }
407
408
 private:
409
  const consensus::RaftPeerPB peer_pb_;
410
  ConsensusStatusPB last_status_; // Protected by lock_.
411
  OpIdPB last_received_;            // Protected by lock_.
412
};
413
414
class NoOpTestPeerProxyFactory : public PeerProxyFactory {
415
 public:
416
0
  NoOpTestPeerProxyFactory() {
417
0
    CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
418
0
    messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build());
419
0
  }
420
421
0
  PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) override {
422
0
    return std::make_unique<NoOpTestPeerProxy>(pool_.get(), peer_pb);
423
0
  }
424
425
0
  Messenger* messenger() const override {
426
0
    return messenger_.get();
427
0
  }
428
429
  std::unique_ptr<ThreadPool> pool_;
430
  std::unique_ptr<rpc::Messenger> messenger_;
431
};
432
433
typedef std::unordered_map<std::string, std::shared_ptr<RaftConsensus> > TestPeerMap;
434
435
// Thread-safe manager for list of peers being used in tests.
436
class TestPeerMapManager {
437
 public:
438
7
  explicit TestPeerMapManager(const RaftConfigPB& config) : config_(config) {}
439
440
23
  void AddPeer(const std::string& peer_uuid, const std::shared_ptr<RaftConsensus>& peer) {
441
23
    std::lock_guard<simple_spinlock> lock(lock_);
442
23
    InsertOrDie(&peers_, peer_uuid, peer);
443
23
  }
444
445
456
  CHECKED_STATUS GetPeerByIdx(int idx, std::shared_ptr<RaftConsensus>* peer_out) const {
446
456
    CHECK_LT(idx, config_.peers_size());
447
456
    return GetPeerByUuid(config_.peers(idx).permanent_uuid(), peer_out);
448
456
  }
449
450
  CHECKED_STATUS GetPeerByUuid(const std::string& peer_uuid,
451
1.10k
                       std::shared_ptr<RaftConsensus>* peer_out) const {
452
1.10k
    std::lock_guard<simple_spinlock> lock(lock_);
453
1.10k
    if (!FindCopy(peers_, peer_uuid, peer_out)) {
454
19
      return STATUS(NotFound, "Other consensus instance was destroyed");
455
19
    }
456
1.08k
    return Status::OK();
457
1.08k
  }
458
459
2
  void RemovePeer(const std::string& peer_uuid) {
460
2
    std::lock_guard<simple_spinlock> lock(lock_);
461
2
    peers_.erase(peer_uuid);
462
2
  }
463
464
16
  TestPeerMap GetPeerMapCopy() const {
465
16
    std::lock_guard<simple_spinlock> lock(lock_);
466
16
    return peers_;
467
16
  }
468
469
7
  void Clear() {
470
    // We create a copy of the peers before we clear 'peers_' so that there's
471
    // still a reference to each peer. If we reduce the reference count to 0 under
472
    // the lock we might get a deadlock as on shutdown consensus indirectly
473
    // destroys the test proxies which in turn reach into this class.
474
7
    TestPeerMap copy = peers_;
475
7
    {
476
7
      std::lock_guard<simple_spinlock> lock(lock_);
477
7
      peers_.clear();
478
7
    }
479
480
7
  }
481
482
 private:
483
  const RaftConfigPB config_;
484
  TestPeerMap peers_;
485
  mutable simple_spinlock lock_;
486
};
487
488
489
// Allows to test remote peers by emulating an RPC.
490
// Both the "remote" peer's RPC call and the caller peer's response are executed
491
// asynchronously in a ThreadPool.
492
class LocalTestPeerProxy : public TestPeerProxy {
493
 public:
494
  LocalTestPeerProxy(std::string peer_uuid, ThreadPool* pool,
495
                     TestPeerMapManager* peers)
496
      : TestPeerProxy(pool),
497
        peer_uuid_(std::move(peer_uuid)),
498
        peers_(peers),
499
40
        miss_comm_(false) {}
500
501
  void UpdateAsync(const ConsensusRequestPB* request,
502
                   RequestTriggerMode trigger_mode,
503
                   ConsensusResponsePB* response,
504
                   rpc::RpcController* controller,
505
634
                   const rpc::ResponseCallback& callback) override {
506
634
    RegisterCallback(kUpdate, callback);
507
634
    CHECK_OK(pool_->SubmitFunc(
508
634
        std::bind(&LocalTestPeerProxy::SendUpdateRequest, this, *request, response)));
509
634
  }
510
511
  void RequestConsensusVoteAsync(const VoteRequestPB* request,
512
                                 VoteResponsePB* response,
513
                                 rpc::RpcController* controller,
514
16
                                 const rpc::ResponseCallback& callback) override {
515
16
    RegisterCallback(kRequestVote, callback);
516
16
    WARN_NOT_OK(
517
16
        pool_->SubmitFunc(std::bind(&LocalTestPeerProxy::SendVoteRequest, this, request, response)),
518
16
        "Submit failed");
519
16
  }
520
521
  template<class Response>
522
122
  void SetResponseError(const Status& status, Response* response) {
523
122
    tserver::TabletServerErrorPB* error = response->mutable_error();
524
122
    error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
525
122
    StatusToPB(status, error->mutable_status());
526
122
    ClearStatus(response);
527
122
  }
_ZN2yb9consensus18LocalTestPeerProxy16SetResponseErrorINS0_19ConsensusResponsePBEEEvRKNS_6StatusEPT_
Line
Count
Source
522
116
  void SetResponseError(const Status& status, Response* response) {
523
116
    tserver::TabletServerErrorPB* error = response->mutable_error();
524
116
    error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
525
116
    StatusToPB(status, error->mutable_status());
526
116
    ClearStatus(response);
527
116
  }
_ZN2yb9consensus18LocalTestPeerProxy16SetResponseErrorINS0_14VoteResponsePBEEEvRKNS_6StatusEPT_
Line
Count
Source
522
6
  void SetResponseError(const Status& status, Response* response) {
523
6
    tserver::TabletServerErrorPB* error = response->mutable_error();
524
6
    error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR);
525
6
    StatusToPB(status, error->mutable_status());
526
6
    ClearStatus(response);
527
6
  }
528
529
6
  void ClearStatus(VoteResponsePB* response) {
530
6
  }
531
532
116
  void ClearStatus(ConsensusResponsePB* response) {
533
116
    response->clear_status();
534
116
  }
535
536
  template<class Request, class Response>
537
  void RespondOrMissResponse(Request* request,
538
                             const Response& response_temp,
539
                             Response* final_response,
540
650
                             Method method) {
541
650
    bool miss_comm_copy;
542
650
    {
543
650
      std::lock_guard<simple_spinlock> lock(lock_);
544
650
      miss_comm_copy = miss_comm_;
545
650
      miss_comm_ = false;
546
650
    }
547
650
    if (PREDICT_FALSE(miss_comm_copy)) {
548
0
      VLOG(2) << this << ": injecting fault on " << request->ShortDebugString();
549
103
      SetResponseError(STATUS(IOError, "Artificial error caused by communication "
550
103
          "failure injection."), final_response);
551
547
    } else {
552
547
      final_response->CopyFrom(response_temp);
553
547
    }
554
650
    Respond(method);
555
650
  }
_ZN2yb9consensus18LocalTestPeerProxy21RespondOrMissResponseINS0_18ConsensusRequestPBENS0_19ConsensusResponsePBEEEvPT_RKT0_PS7_NS0_13TestPeerProxy6MethodE
Line
Count
Source
540
634
                             Method method) {
541
634
    bool miss_comm_copy;
542
634
    {
543
634
      std::lock_guard<simple_spinlock> lock(lock_);
544
634
      miss_comm_copy = miss_comm_;
545
634
      miss_comm_ = false;
546
634
    }
547
634
    if (PREDICT_FALSE(miss_comm_copy)) {
548
0
      VLOG(2) << this << ": injecting fault on " << request->ShortDebugString();
549
103
      SetResponseError(STATUS(IOError, "Artificial error caused by communication "
550
103
          "failure injection."), final_response);
551
531
    } else {
552
531
      final_response->CopyFrom(response_temp);
553
531
    }
554
634
    Respond(method);
555
634
  }
_ZN2yb9consensus18LocalTestPeerProxy21RespondOrMissResponseIKNS0_13VoteRequestPBENS0_14VoteResponsePBEEEvPT_RKT0_PS8_NS0_13TestPeerProxy6MethodE
Line
Count
Source
540
16
                             Method method) {
541
16
    bool miss_comm_copy;
542
16
    {
543
16
      std::lock_guard<simple_spinlock> lock(lock_);
544
16
      miss_comm_copy = miss_comm_;
545
16
      miss_comm_ = false;
546
16
    }
547
16
    if (PREDICT_FALSE(miss_comm_copy)) {
548
0
      VLOG(2) << this << ": injecting fault on " << request->ShortDebugString();
549
0
      SetResponseError(STATUS(IOError, "Artificial error caused by communication "
550
0
          "failure injection."), final_response);
551
16
    } else {
552
16
      final_response->CopyFrom(response_temp);
553
16
    }
554
16
    Respond(method);
555
16
  }
556
557
  void SendUpdateRequest(ConsensusRequestPB request,
558
634
                         ConsensusResponsePB* response) {
559
    // Give the other peer a clean response object to write to.
560
634
    ConsensusResponsePB other_peer_resp;
561
634
    std::shared_ptr<RaftConsensus> peer;
562
634
    Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
563
564
634
    if (s.ok()) {
565
621
      s = peer->Update(&request, &other_peer_resp, CoarseBigDeadline());
566
621
      if (s.ok() && !other_peer_resp.has_error()) {
567
620
        CHECK(other_peer_resp.has_status());
568
620
        CHECK(other_peer_resp.status().IsInitialized());
569
620
      }
570
621
    }
571
634
    if (!s.ok()) {
572
13
      LOG(WARNING) << "Could not Update replica with request: "
573
13
                   << request.ShortDebugString()
574
13
                   << " Status: " << s.ToString();
575
13
      SetResponseError(s, &other_peer_resp);
576
13
    }
577
578
634
    response->CopyFrom(other_peer_resp);
579
634
    RespondOrMissResponse(&request, other_peer_resp, response, kUpdate);
580
634
  }
581
582
583
584
  void SendVoteRequest(const VoteRequestPB* request,
585
16
                       VoteResponsePB* response) {
586
587
    // Copy the request and the response for the other peer so that ownership
588
    // remains as close to the dist. impl. as possible.
589
16
    VoteRequestPB other_peer_req;
590
16
    other_peer_req.CopyFrom(*request);
591
16
    VoteResponsePB other_peer_resp;
592
16
    other_peer_resp.CopyFrom(*response);
593
594
16
    std::shared_ptr<RaftConsensus> peer;
595
16
    Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
596
597
16
    if (s.ok()) {
598
10
      s = peer->RequestVote(&other_peer_req, &other_peer_resp);
599
10
    }
600
16
    if (!s.ok()) {
601
6
      LOG(WARNING) << "Could not RequestVote from replica with request: "
602
6
                   << other_peer_req.ShortDebugString()
603
6
                   << " Status: " << s.ToString();
604
6
      SetResponseError(s, &other_peer_resp);
605
6
    }
606
607
16
    response->CopyFrom(other_peer_resp);
608
16
    RespondOrMissResponse(request, other_peer_resp, response, kRequestVote);
609
16
  }
610
611
104
  void InjectCommFaultLeaderSide() {
612
0
    VLOG(2) << this << ": injecting fault next time";
613
104
    std::lock_guard<simple_spinlock> lock(lock_);
614
104
    miss_comm_ = true;
615
104
  }
616
617
156
  const std::string& GetTarget() const {
618
156
    return peer_uuid_;
619
156
  }
620
621
 private:
622
  const std::string peer_uuid_;
623
  TestPeerMapManager* const peers_;
624
  bool miss_comm_;
625
};
626
627
class LocalTestPeerProxyFactory : public PeerProxyFactory {
628
 public:
629
  explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers)
630
29
    : peers_(peers) {
631
29
    CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
632
29
    messenger_ = rpc::CreateAutoShutdownMessengerHolder(
633
29
        CHECK_RESULT(rpc::MessengerBuilder("test").Build()));
634
29
  }
635
636
40
  PeerProxyPtr NewProxy(const consensus::RaftPeerPB& peer_pb) override {
637
40
    auto new_proxy = std::make_unique<LocalTestPeerProxy>(
638
40
        peer_pb.permanent_uuid(), pool_.get(), peers_);
639
40
    proxies_.push_back(new_proxy.get());
640
40
    return new_proxy;
641
40
  }
642
643
104
  virtual const vector<LocalTestPeerProxy*>& GetProxies() {
644
104
    return proxies_;
645
104
  }
646
647
81
  rpc::Messenger* messenger() const override {
648
81
    return messenger_.get();
649
81
  }
650
651
 private:
652
  std::unique_ptr<ThreadPool> pool_;
653
  rpc::AutoShutdownMessengerHolder messenger_;
654
  TestPeerMapManager* const peers_;
655
    // NOTE: There is no need to delete this on the dctor because proxies are externally managed
656
  vector<LocalTestPeerProxy*> proxies_;
657
};
658
659
// A simple implementation of the transaction driver.
660
// This is usually implemented by OperationDriver but here we
661
// keep the implementation to the minimally required to have consensus
662
// work.
663
class TestDriver : public ConsensusRoundCallback {
664
 public:
665
  TestDriver(ThreadPool* pool, const scoped_refptr<ConsensusRound>& round)
666
0
      : round_(round), pool_(pool) {
667
0
  }
668
669
0
  void SetRound(const scoped_refptr<ConsensusRound>& round) {
670
0
    round_ = round;
671
0
  }
672
673
  // Does nothing but enqueue the Apply
674
  void ReplicationFinished(
675
0
      const Status& status, int64_t leader_term, OpIds* applied_op_ids) override {
676
0
    if (status.IsAborted()) {
677
0
      Cleanup();
678
0
      return;
679
0
    }
680
0
    CHECK_OK(status);
681
0
    CHECK_OK(pool_->SubmitFunc(std::bind(&TestDriver::Apply, this)));
682
0
  }
Unexecuted instantiation: _ZN2yb9consensus10TestDriver19ReplicationFinishedERKNS_6StatusExPNSt3__16vectorINS_4OpIdENS5_9allocatorIS7_EEEE
Unexecuted instantiation: _ZN2yb9consensus10TestDriver19ReplicationFinishedERKNS_6StatusExPNSt3__16vectorINS_4OpIdENS5_9allocatorIS7_EEEE
683
684
0
  void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override {}
685
686
  // Called in all modes to delete the transaction and, transitively, the consensus
687
  // round.
688
0
  void Cleanup() {
689
0
    delete this;
690
0
  }
691
692
  scoped_refptr<ConsensusRound> round_;
693
694
 private:
695
  // The commit message has the exact same type of the replicate message, but
696
  // no content.
697
0
  void Apply() {}
698
699
0
  void CommitCallback(const Status& s) {
700
0
    CHECK_OK(s);
701
0
    Cleanup();
702
0
  }
703
704
  ThreadPool* pool_;
705
};
706
707
// Fake ReplicaOperationFactory that allows for instantiating and unit
708
// testing RaftConsensusState. Does not actually support running transactions.
709
class MockOperationFactory : public TestConsensusContext {
710
 public:
711
  CHECKED_STATUS StartReplicaOperation(
712
0
      const scoped_refptr<ConsensusRound>& round, HybridTime propagated_hybrid_time) override {
713
0
    return StartReplicaOperationMock(round.get());
714
0
  }
715
716
  MOCK_METHOD1(StartReplicaOperationMock, Status(ConsensusRound* round));
717
};
718
719
// A transaction factory for tests, usually this is implemented by TabletPeer.
720
class TestOperationFactory : public TestConsensusContext {
721
 public:
722
23
  TestOperationFactory() {
723
23
    CHECK_OK(ThreadPoolBuilder("test-operation-factory").set_max_threads(1).Build(&pool_));
724
23
  }
725
726
23
  void SetConsensus(Consensus* consensus) {
727
23
    consensus_ = consensus;
728
23
  }
729
730
  CHECKED_STATUS StartReplicaOperation(
731
0
      const scoped_refptr<ConsensusRound>& round, HybridTime propagated_hybrid_time) override {
732
0
    auto txn = new TestDriver(pool_.get(), round);
733
0
    txn->round_->SetCallback(txn);
734
0
    return Status::OK();
735
0
  }
736
737
0
  void ReplicateAsync(ConsensusRound* round) {
738
0
    CHECK_OK(consensus_->TEST_Replicate(round));
739
0
  }
740
741
40
  void WaitDone() {
742
40
    pool_->Wait();
743
40
  }
744
745
23
  void ShutDown() {
746
23
    WaitDone();
747
23
    pool_->Shutdown();
748
23
  }
749
750
23
  ~TestOperationFactory() {
751
23
    ShutDown();
752
23
  }
753
754
 private:
755
  std::unique_ptr<ThreadPool> pool_;
756
  Consensus* consensus_ = nullptr;
757
};
758
759
// Consensus fault hooks impl. that simply counts the number of calls to
760
// each method.
761
// Allows passing another hook instance so that we can use both.
762
// If non-null, the passed hook instance will be called first for all methods.
763
class CounterHooks : public Consensus::ConsensusFaultHooks {
764
 public:
765
  explicit CounterHooks(
766
      std::shared_ptr<Consensus::ConsensusFaultHooks> current_hook)
767
      : current_hook_(std::move(current_hook)),
768
        pre_start_calls_(0),
769
        post_start_calls_(0),
770
        pre_config_change_calls_(0),
771
        post_config_change_calls_(0),
772
        pre_replicate_calls_(0),
773
        post_replicate_calls_(0),
774
        pre_update_calls_(0),
775
        post_update_calls_(0),
776
        pre_shutdown_calls_(0),
777
2
        post_shutdown_calls_(0) {}
778
779
2
  virtual CHECKED_STATUS PreStart() override {
780
2
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreStart());
781
2
    std::lock_guard<simple_spinlock> lock(lock_);
782
2
    pre_start_calls_++;
783
2
    return Status::OK();
784
2
  }
785
786
2
  virtual CHECKED_STATUS PostStart() override {
787
2
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostStart());
788
2
    std::lock_guard<simple_spinlock> lock(lock_);
789
2
    post_start_calls_++;
790
2
    return Status::OK();
791
2
  }
792
793
0
  virtual CHECKED_STATUS PreConfigChange() override {
794
0
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreConfigChange());
795
0
    std::lock_guard<simple_spinlock> lock(lock_);
796
0
    pre_config_change_calls_++;
797
0
    return Status::OK();
798
0
  }
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks15PreConfigChangeEv
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks15PreConfigChangeEv
799
800
0
  virtual CHECKED_STATUS PostConfigChange() override {
801
0
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostConfigChange());
802
0
    std::lock_guard<simple_spinlock> lock(lock_);
803
0
    post_config_change_calls_++;
804
0
    return Status::OK();
805
0
  }
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks16PostConfigChangeEv
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks16PostConfigChangeEv
806
807
0
  virtual CHECKED_STATUS PreReplicate() override {
808
0
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreReplicate());
809
0
    std::lock_guard<simple_spinlock> lock(lock_);
810
0
    pre_replicate_calls_++;
811
0
    return Status::OK();
812
0
  }
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks12PreReplicateEv
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks12PreReplicateEv
813
814
0
  virtual CHECKED_STATUS PostReplicate() override {
815
0
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostReplicate());
816
0
    std::lock_guard<simple_spinlock> lock(lock_);
817
0
    post_replicate_calls_++;
818
0
    return Status::OK();
819
0
  }
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks13PostReplicateEv
Unexecuted instantiation: _ZN2yb9consensus12CounterHooks13PostReplicateEv
820
821
14
  virtual CHECKED_STATUS PreUpdate() override {
822
14
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreUpdate());
823
14
    std::lock_guard<simple_spinlock> lock(lock_);
824
14
    pre_update_calls_++;
825
14
    return Status::OK();
826
14
  }
827
828
14
  virtual CHECKED_STATUS PostUpdate() override {
829
14
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostUpdate());
830
14
    std::lock_guard<simple_spinlock> lock(lock_);
831
14
    post_update_calls_++;
832
14
    return Status::OK();
833
14
  }
834
835
2
  virtual CHECKED_STATUS PreShutdown() override {
836
2
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreShutdown());
837
2
    std::lock_guard<simple_spinlock> lock(lock_);
838
2
    pre_shutdown_calls_++;
839
2
    return Status::OK();
840
2
  }
841
842
2
  virtual CHECKED_STATUS PostShutdown() override {
843
2
    if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostShutdown());
844
2
    std::lock_guard<simple_spinlock> lock(lock_);
845
2
    post_shutdown_calls_++;
846
2
    return Status::OK();
847
2
  }
848
849
0
  int num_pre_start_calls() {
850
0
    std::lock_guard<simple_spinlock> lock(lock_);
851
0
    return pre_start_calls_;
852
0
  }
853
854
0
  int num_post_start_calls() {
855
0
    std::lock_guard<simple_spinlock> lock(lock_);
856
0
    return post_start_calls_;
857
0
  }
858
859
0
  int num_pre_config_change_calls() {
860
0
    std::lock_guard<simple_spinlock> lock(lock_);
861
0
    return pre_config_change_calls_;
862
0
  }
863
864
0
  int num_post_config_change_calls() {
865
0
    std::lock_guard<simple_spinlock> lock(lock_);
866
0
    return post_config_change_calls_;
867
0
  }
868
869
0
  int num_pre_replicate_calls() {
870
0
    std::lock_guard<simple_spinlock> lock(lock_);
871
0
    return pre_replicate_calls_;
872
0
  }
873
874
0
  int num_post_replicate_calls() {
875
0
    std::lock_guard<simple_spinlock> lock(lock_);
876
0
    return post_replicate_calls_;
877
0
  }
878
879
4
  int num_pre_update_calls() {
880
4
    std::lock_guard<simple_spinlock> lock(lock_);
881
4
    return pre_update_calls_;
882
4
  }
883
884
0
  int num_post_update_calls() {
885
0
    std::lock_guard<simple_spinlock> lock(lock_);
886
0
    return post_update_calls_;
887
0
  }
888
889
0
  int num_pre_shutdown_calls() {
890
0
    std::lock_guard<simple_spinlock> lock(lock_);
891
0
    return pre_shutdown_calls_;
892
0
  }
893
894
0
  int num_post_shutdown_calls() {
895
0
    std::lock_guard<simple_spinlock> lock(lock_);
896
0
    return post_shutdown_calls_;
897
0
  }
898
899
 private:
900
  std::shared_ptr<Consensus::ConsensusFaultHooks> current_hook_;
901
  int pre_start_calls_;
902
  int post_start_calls_;
903
  int pre_config_change_calls_;
904
  int post_config_change_calls_;
905
  int pre_replicate_calls_;
906
  int post_replicate_calls_;
907
  int pre_update_calls_;
908
  int post_update_calls_;
909
  int pre_shutdown_calls_;
910
  int post_shutdown_calls_;
911
912
  // Lock that protects updates to the counters.
913
  mutable simple_spinlock lock_;
914
};
915
916
class TestRaftConsensusQueueIface : public PeerMessageQueueObserver {
917
 public:
918
71
  bool IsMajorityReplicated(int64_t index) {
919
71
    std::lock_guard<simple_spinlock> lock(lock_);
920
71
    return majority_replicated_op_id_.index >= index;
921
71
  }
922
923
0
  OpId majority_replicated_op_id() {
924
0
    std::lock_guard<simple_spinlock> lock(lock_);
925
0
    return majority_replicated_op_id_;
926
0
  }
927
928
13
  void WaitForMajorityReplicatedIndex(int64_t index, MonoDelta timeout = MonoDelta(30s)) {
929
13
    ASSERT_OK(WaitFor(
930
13
        [&]() { return IsMajorityReplicated(index); },
931
13
        timeout, Format("waiting for index $0 to be replicated", index)));
932
13
  }
933
934
 protected:
935
  void UpdateMajorityReplicated(
936
      const MajorityReplicatedData& data, OpId* committed_index,
937
740
      OpId* last_applied_op_id) override {
938
740
    std::lock_guard<simple_spinlock> lock(lock_);
939
740
    majority_replicated_op_id_ = data.op_id;
940
740
    *committed_index = data.op_id;
941
740
    *last_applied_op_id = data.op_id;
942
740
  }
943
0
  void NotifyTermChange(int64_t term) override {}
944
  void NotifyFailedFollower(const std::string& uuid,
945
                            int64_t term,
946
0
                            const std::string& reason) override {}
947
0
  void MajorityReplicatedNumSSTFilesChanged(uint64_t) override {}
948
949
 private:
950
  mutable simple_spinlock lock_;
951
  OpId majority_replicated_op_id_;
952
};
953
954
}  // namespace consensus
955
}  // namespace yb
956
957
#endif /* YB_CONSENSUS_CONSENSUS_TEST_UTIL_H_ */