YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <gmock/gmock.h>
34
#include <gtest/gtest.h>
35
36
#include "yb/common/schema.h"
37
#include "yb/common/wire_protocol-test-util.h"
38
39
#include "yb/consensus/consensus-test-util.h"
40
#include "yb/consensus/consensus_types.h"
41
#include "yb/consensus/log.h"
42
#include "yb/consensus/peer_manager.h"
43
44
#include "yb/fs/fs_manager.h"
45
46
#include "yb/gutil/bind.h"
47
#include "yb/gutil/stl_util.h"
48
49
#include "yb/server/logical_clock.h"
50
51
#include "yb/util/async_util.h"
52
#include "yb/util/mem_tracker.h"
53
#include "yb/util/metrics.h"
54
#include "yb/util/status_log.h"
55
#include "yb/util/test_macros.h"
56
#include "yb/util/test_util.h"
57
58
DECLARE_bool(enable_leader_failure_detection);
59
DECLARE_bool(never_fsync);
60
61
METRIC_DECLARE_entity(table);
62
METRIC_DECLARE_entity(tablet);
63
64
using std::shared_ptr;
65
using std::string;
66
67
namespace yb {
68
namespace consensus {
69
70
using log::Log;
71
using log::LogOptions;
72
using ::testing::_;
73
using ::testing::AnyNumber;
74
using ::testing::AtLeast;
75
using ::testing::Eq;
76
using ::testing::InSequence;
77
using ::testing::Invoke;
78
using ::testing::Mock;
79
using ::testing::Property;
80
using ::testing::Return;
81
82
const char* kTestTable = "TestTable";
83
const char* kTestTablet = "TestTablet";
84
const char* kLocalPeerUuid = "peer-0";
85
86
// A simple map to collect the results of a sequence of transactions.
87
typedef std::map<OpIdPB, Status, OpIdCompareFunctor> StatusesMap;
88
89
class MockQueue : public PeerMessageQueue {
90
 public:
91
  explicit MockQueue(const scoped_refptr<MetricEntity>& tablet_metric_entity, log::Log* log,
92
                     const server::ClockPtr& clock,
93
                     std::unique_ptr<ThreadPoolToken> raft_pool_observers_token)
94
      : PeerMessageQueue(
95
          tablet_metric_entity, log, nullptr /* server_tracker */, nullptr /* parent_tracker */,
96
          FakeRaftPeerPB(kLocalPeerUuid), kTestTablet, clock, nullptr /* consensus_queue */,
97
6
          std::move(raft_pool_observers_token)) {}
98
99
  MOCK_METHOD1(Init, void(const OpId& locally_replicated_index));
100
  MOCK_METHOD4(SetLeaderMode, void(const OpId& committed_opid,
101
                                   int64_t current_term,
102
                                   const OpId& last_applied_op_id,
103
                                   const RaftConfigPB& active_config));
104
  MOCK_METHOD0(SetNonLeaderMode, void());
105
  Status AppendOperations(const ReplicateMsgs& msgs,
106
                          const yb::OpId& committed_op_id,
107
46
                          RestartSafeCoarseTimePoint time) override {
108
46
    return AppendOperationsMock(msgs, committed_op_id, time);
109
46
  }
110
  MOCK_METHOD3(AppendOperationsMock, Status(const ReplicateMsgs& msgs,
111
                                            const yb::OpId& committed_op_id,
112
                                            RestartSafeCoarseTimePoint time));
113
  MOCK_METHOD1(TrackPeer, void(const string&));
114
  MOCK_METHOD1(UntrackPeer, void(const string&));
115
  MOCK_METHOD6(RequestForPeer, Status(const std::string& uuid,
116
                                      ConsensusRequestPB* request,
117
                                      ReplicateMsgsHolder* msgs_holder,
118
                                      bool* needs_remote_bootstrap,
119
                                      PeerMemberType* member_type,
120
                                      bool* last_exchange_successful));
121
  MOCK_METHOD2(ResponseFromPeer, bool(const std::string& peer_uuid,
122
                                      const ConsensusResponsePB& response));
123
  MOCK_METHOD0(Close, void());
124
};
125
126
class MockPeerManager : public PeerManager {
127
 public:
128
6
  MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {}
129
  MOCK_METHOD1(UpdateRaftConfig, void(const consensus::RaftConfigPB& config));
130
  MOCK_METHOD1(SignalRequest, void(RequestTriggerMode trigger_mode));
131
  MOCK_METHOD0(Close, void());
132
};
133
134
class RaftConsensusSpy : public RaftConsensus {
135
 public:
136
  typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback;
137
138
  RaftConsensusSpy(const ConsensusOptions& options,
139
                   std::unique_ptr<ConsensusMetadata> cmeta,
140
                   std::unique_ptr<PeerProxyFactory> proxy_factory,
141
                   std::unique_ptr<PeerMessageQueue> queue,
142
                   std::unique_ptr<PeerManager> peer_manager,
143
                   std::unique_ptr<ThreadPoolToken> raft_pool_token,
144
                   const scoped_refptr<MetricEntity>& table_metric_entity,
145
                   const scoped_refptr<MetricEntity>& tablet_metric_entity,
146
                   const std::string& peer_uuid,
147
                   const scoped_refptr<server::Clock>& clock,
148
                   ConsensusContext* consensus_context,
149
                   const scoped_refptr<log::Log>& log,
150
                   const shared_ptr<MemTracker>& parent_mem_tracker,
151
                   const Callback<void(std::shared_ptr<consensus::StateChangeContext> context)>&
152
                     mark_dirty_clbk)
153
    : RaftConsensus(options,
154
                    std::move(cmeta),
155
                    std::move(proxy_factory),
156
                    std::move(queue),
157
                    std::move(peer_manager),
158
                    std::move(raft_pool_token),
159
                    table_metric_entity,
160
                    tablet_metric_entity,
161
                    peer_uuid,
162
                    clock,
163
                    consensus_context,
164
                    log,
165
                    parent_mem_tracker,
166
                    mark_dirty_clbk,
167
                    YQL_TABLE_TYPE,
168
6
                    nullptr /* retryable_requests */) {
169
    // These "aliases" allow us to count invocations and assert on them.
170
6
    ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_))
171
6
        .WillByDefault(Invoke(this,
172
6
              &RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete));
173
6
    ON_CALL(*this, NonTrackedRoundReplicationFinished(_, _, _))
174
6
        .WillByDefault(Invoke(this, &RaftConsensusSpy::NonTrackedRoundReplicationFinishedConcrete));
175
6
  }
176
177
  MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round));
178
5
  Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) {
179
5
    return RaftConsensus::AppendNewRoundToQueueUnlocked(round);
180
5
  }
181
182
  MOCK_METHOD2(AppendNewRoundsToQueueUnlocked, Status(
183
      const ConsensusRounds& rounds, size_t* processed_rounds));
184
  Status AppendNewRoundsToQueueUnlockedConcrete(
185
26
      const ConsensusRounds& rounds, size_t* processed_rounds) {
186
26
    return RaftConsensus::AppendNewRoundsToQueueUnlocked(rounds, processed_rounds);
187
26
  }
188
189
  MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateMsgPtr& msg));
190
18
  Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateMsgPtr& msg) {
191
18
    return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg);
192
18
  }
193
194
  MOCK_METHOD3(NonTrackedRoundReplicationFinished, void(ConsensusRound* round,
195
                                                   const StdStatusCallback& client_cb,
196
                                                   const Status& status));
197
  void NonTrackedRoundReplicationFinishedConcrete(ConsensusRound* round,
198
                                             const StdStatusCallback& client_cb,
199
40
                                             const Status& status) {
200
40
    LOG(INFO) << "Round " << round->id() << " finished with status: " << status;
201
40
  }
202
203
 private:
204
  DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy);
205
};
206
207
473
void DoNothing(std::shared_ptr<consensus::StateChangeContext> context) {
208
473
}
209
210
class RaftConsensusTest : public YBTest {
211
 public:
212
  RaftConsensusTest()
213
      : clock_(server::LogicalClock::CreateStartingAt(HybridTime(0))),
214
        table_metric_entity_(
215
          METRIC_ENTITY_table.Instantiate(&metric_registry_, "raft-consensus-test-table")),
216
        tablet_metric_entity_(
217
          METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test-tablet")),
218
6
        schema_(GetSimpleTestSchema()) {
219
6
    FLAGS_enable_leader_failure_detection = false;
220
6
    options_.tablet_id = kTestTablet;
221
6
  }
222
223
6
  void SetUp() override {
224
6
    YBTest::SetUp();
225
226
6
    LogOptions options;
227
6
    string test_path = GetTestPath("test-peer-root");
228
229
    // TODO mock the Log too, since we're gonna mock the queue
230
    // monitors and pretty much everything else.
231
6
    fs_manager_.reset(new FsManager(env_.get(), test_path, "tserver_test"));
232
6
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
233
6
    ASSERT_OK(fs_manager_->Open());
234
6
    ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_));
235
6
    ASSERT_OK(Log::Open(LogOptions(),
236
6
                       kTestTablet,
237
6
                       fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
238
6
                       fs_manager_->uuid(),
239
6
                       schema_,
240
6
                       0, // schema_version
241
6
                       nullptr, // table_metric_entity
242
6
                       nullptr, // tablet_metric_entity
243
6
                       log_thread_pool_.get(),
244
6
                       log_thread_pool_.get(),
245
6
                       std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
246
6
                       &log_));
247
248
6
    log_->TEST_SetAllOpIdsSafe(true);
249
250
6
    ASSERT_OK(ThreadPoolBuilder("raft-pool").Build(&raft_pool_));
251
6
    std::unique_ptr<ThreadPoolToken> raft_pool_token =
252
6
        raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
253
6
    queue_ = new MockQueue(tablet_metric_entity_, log_.get(), clock_, std::move(raft_pool_token));
254
6
    peer_manager_ = new MockPeerManager;
255
6
    operation_factory_.reset(new MockOperationFactory);
256
257
6
    ON_CALL(*queue_, AppendOperationsMock(_, _, _))
258
6
        .WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog));
259
6
  }
260
261
6
  void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) {
262
6
    config_ = BuildRaftConfigPBForTests(num_peers);
263
6
    config_.set_opid_index(kInvalidOpIdIndex);
264
265
6
    auto proxy_factory = std::make_unique<LocalTestPeerProxyFactory>(nullptr);
266
267
6
    string peer_uuid = config_.peers(num_peers - 1).permanent_uuid();
268
269
6
    std::unique_ptr<ConsensusMetadata> cmeta;
270
6
    ASSERT_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid,
271
6
                                       config_, initial_term, &cmeta));
272
273
6
    std::unique_ptr<ThreadPoolToken> raft_pool_token =
274
6
        raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
275
276
6
    consensus_.reset(new RaftConsensusSpy(options_,
277
6
                                          std::move(cmeta),
278
6
                                          std::move(proxy_factory),
279
6
                                          std::unique_ptr<PeerMessageQueue>(queue_),
280
6
                                          std::unique_ptr<PeerManager>(peer_manager_),
281
6
                                          std::move(raft_pool_token),
282
6
                                          table_metric_entity_,
283
6
                                          tablet_metric_entity_,
284
6
                                          peer_uuid,
285
6
                                          clock_,
286
6
                                          operation_factory_.get(),
287
6
                                          log_.get(),
288
6
                                          MemTracker::GetRootTracker(),
289
6
                                          Bind(&DoNothing)));
290
291
6
    ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
292
6
        .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound));
293
6
    ON_CALL(*consensus_.get(), AppendNewRoundsToQueueUnlocked(_, _))
294
6
        .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRounds));
295
6
  }
296
297
  Status AppendToLog(const ReplicateMsgs& msgs,
298
                     const yb::OpId& committed_op_id,
299
18
                     RestartSafeCoarseTimePoint time) {
300
18
    return log_->AsyncAppendReplicates(msgs, committed_op_id, time,
301
18
                                       Bind(LogAppendCallback));
302
18
  }
303
304
18
  static void LogAppendCallback(const Status& s) {
305
18
    ASSERT_OK(s);
306
18
  }
307
308
5
  Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) {
309
5
    return consensus_->AppendNewRoundToQueueUnlockedConcrete(round);
310
5
  }
311
312
26
  Status MockAppendNewRounds(const ConsensusRounds& rounds, size_t* processed_rounds) {
313
26
    for (const auto& round : rounds) {
314
26
      rounds_.push_back(round);
315
26
    }
316
26
    RETURN_NOT_OK(consensus_->AppendNewRoundsToQueueUnlockedConcrete(rounds, processed_rounds));
317
26
    for (const auto& round : rounds) {
318
26
      LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: "
319
26
                << round->replicate_msg()->ShortDebugString();
320
26
    }
321
26
    return Status::OK();
322
26
  }
323
324
3
  void SetUpGeneralExpectations() {
325
3
    EXPECT_CALL(*peer_manager_, SignalRequest(_))
326
3
        .Times(AnyNumber());
327
3
    EXPECT_CALL(*peer_manager_, Close())
328
3
        .Times(AtLeast(1));
329
3
    EXPECT_CALL(*queue_, Close())
330
3
        .Times(1);
331
3
    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
332
3
        .Times(AnyNumber());
333
3
  }
334
335
  // Create a ConsensusRequestPB suitable to send to a peer.
336
  ConsensusRequestPB MakeConsensusRequest(int64_t caller_term,
337
                                          const string& caller_uuid,
338
                                          const OpIdPB& preceding_opid);
339
340
  // Add a single no-op with the given OpId to a ConsensusRequestPB.
341
  void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpIdPB& noop_opid);
342
343
21
  scoped_refptr<ConsensusRound> AppendNoOpRound() {
344
21
    auto replicate_ptr = std::make_shared<ReplicateMsg>();
345
21
    replicate_ptr->set_op_type(NO_OP);
346
21
    replicate_ptr->set_hybrid_time(clock_->Now().ToUint64());
347
21
    scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(),
348
21
                                                           std::move(replicate_ptr)));
349
21
    round->SetCallback(MakeNonTrackedRoundCallback(
350
21
        round.get(),
351
21
        std::bind(&RaftConsensusSpy::NonTrackedRoundReplicationFinished,
352
21
                  consensus_.get(), round.get(), &DoNothingStatusCB, std::placeholders::_1)));
353
21
    round->BindToTerm(consensus_->TEST_LeaderTerm());
354
355
21
    CHECK_OK(consensus_->TEST_Replicate(round));
356
21
    LOG(INFO) << "Appended NO_OP round with opid " << round->id();
357
21
    return round;
358
21
  }
359
360
1
  void DumpRounds() {
361
1
    LOG(INFO) << "Dumping rounds...";
362
3
    for (const scoped_refptr<ConsensusRound>& round : rounds_) {
363
3
      LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: "
364
3
                << round->replicate_msg()->ShortDebugString();
365
3
    }
366
1
  }
367
368
 protected:
369
  std::unique_ptr<ThreadPool> raft_pool_;
370
  ConsensusOptions options_;
371
  RaftConfigPB config_;
372
  OpIdPB initial_id_;
373
  std::unique_ptr<FsManager> fs_manager_;
374
  std::unique_ptr<ThreadPool> log_thread_pool_;
375
  scoped_refptr<Log> log_;
376
  std::unique_ptr<PeerProxyFactory> proxy_factory_;
377
  scoped_refptr<server::Clock> clock_;
378
  MetricRegistry metric_registry_;
379
  scoped_refptr<MetricEntity> table_metric_entity_;
380
  scoped_refptr<MetricEntity> tablet_metric_entity_;
381
  const Schema schema_;
382
  shared_ptr<RaftConsensusSpy> consensus_;
383
384
  vector<scoped_refptr<ConsensusRound> > rounds_;
385
386
  // Mocks.
387
  // NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before
388
  // the test is.
389
  MockQueue* queue_;
390
  MockPeerManager* peer_manager_;
391
  std::unique_ptr<MockOperationFactory> operation_factory_;
392
};
393
394
// Builds a request for kTestTablet from 'caller_uuid' at 'caller_term', positioned right after
// 'preceding_opid'. No ops are attached.
ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term,
                                                           const string& caller_uuid,
                                                           const OpIdPB& preceding_opid) {
  ConsensusRequestPB req;
  req.set_tablet_id(kTestTablet);
  req.set_caller_uuid(caller_uuid);
  req.set_caller_term(caller_term);
  req.mutable_preceding_id()->CopyFrom(preceding_opid);
  return req;
}
404
405
void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request,
406
4
                                                  const OpIdPB& noop_opid) {
407
4
  ReplicateMsg* noop_msg = request->add_ops();
408
4
  *noop_msg->mutable_id() = noop_opid;
409
4
  noop_msg->set_op_type(NO_OP);
410
4
  noop_msg->set_hybrid_time(clock_->Now().ToUint64());
411
4
  noop_msg->mutable_noop_request();
412
4
}
413
414
// Tests that the committed index moves along with the majority replicated
415
// index when the terms are the same.
416
1
TEST_F(RaftConsensusTest, TestCommittedIndexWhenInSameTerm) {
  SetUpConsensus();
  SetUpGeneralExpectations();
  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
      .Times(1);
  EXPECT_CALL(*queue_, Init(_))
      .Times(1);
  EXPECT_CALL(*queue_, SetLeaderMode(_, _, _, _))
      .Times(1);
  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
      .Times(1);
  EXPECT_CALL(*consensus_.get(), AppendNewRoundsToQueueUnlocked(_, _))
      .Times(11);
  EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _))
      .Times(22).WillRepeatedly(Return(Status::OK()));

  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));
  ASSERT_OK(consensus_->EmulateElection());

  // Commit the first noop round, created on EmulateElection();
  OpId committed_index;
  OpId last_applied_op_id;
  consensus_->TEST_UpdateMajorityReplicated(
      rounds_[0]->id(), &committed_index, &last_applied_op_id);
  ASSERT_EQ(rounds_[0]->id(), committed_index);
  ASSERT_EQ(last_applied_op_id, rounds_[0]->id());

  // Append 10 rounds
  for (int i = 0; i < 10; i++) {
    scoped_refptr<ConsensusRound> round = AppendNoOpRound();
    // queue reports majority replicated index in the leader's term
    // committed index should move accordingly.
    consensus_->TEST_UpdateMajorityReplicated(
        round->id(), &committed_index, &last_applied_op_id);
    // Previously only the applied OpId was checked; assert the committed index too, which is
    // what this test is about.
    ASSERT_EQ(round->id(), committed_index);
    ASSERT_EQ(last_applied_op_id, round->id());
  }
}
454
455
// Tests that, when terms change, the commit index only advances when the majority
456
// replicated index is in the current term.
457
1
TEST_F(RaftConsensusTest, TestCommittedIndexWhenTermsChange) {
  SetUpConsensus();
  SetUpGeneralExpectations();
  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
      .Times(2);
  EXPECT_CALL(*queue_, Init(_))
      .Times(1);
  EXPECT_CALL(*queue_, SetLeaderMode(_, _, _, _))
      .Times(2);
  EXPECT_CALL(*consensus_.get(), AppendNewRoundsToQueueUnlocked(_, _))
      .Times(3);
  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
      .Times(2);
  EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _))
      .Times(5).WillRepeatedly(Return(Status::OK()));

  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));
  ASSERT_OK(consensus_->EmulateElection());

  OpId committed_index;
  OpId last_applied_op_id;
  consensus_->TEST_UpdateMajorityReplicated(
      rounds_[0]->id(), &committed_index, &last_applied_op_id);
  ASSERT_EQ(rounds_[0]->id(), committed_index);
  ASSERT_EQ(last_applied_op_id, rounds_[0]->id());

  // Append another round in the current term (besides the original config round).
  scoped_refptr<ConsensusRound> round = AppendNoOpRound();

  // Now emulate an election, the same guy will be leader but the term
  // will change.
  ASSERT_OK(consensus_->EmulateElection());

  // Now tell consensus that 'round' has been majority replicated, this _shouldn't_
  // advance the committed index, since that belongs to a previous term.
  OpId new_committed_index;
  OpId new_last_applied_op_id;
  consensus_->TEST_UpdateMajorityReplicated(
      round->id(), &new_committed_index, &new_last_applied_op_id);
  ASSERT_EQ(committed_index, new_committed_index);
  ASSERT_EQ(last_applied_op_id, new_last_applied_op_id);

  const scoped_refptr<ConsensusRound>& last_config_round = rounds_[2];

  // Now notify that the last change config was committed, this should advance the
  // commit index to the id of the last change config.
  consensus_->TEST_UpdateMajorityReplicated(
      last_config_round->id(), &committed_index, &last_applied_op_id);

  DumpRounds();
  ASSERT_EQ(last_config_round->id(), committed_index);
  ASSERT_EQ(last_applied_op_id, last_config_round->id());
}
511
512
// Asserts that a ConsensusRound has an OpId set in its ReplicateMsg.
513
11
MATCHER(HasOpId, "") {
  const bool id_assigned = !arg->id().empty();
  return id_assigned;
}

// Status matchers: IsOk matches an OK Status, IsAborted matches an Aborted one.
MATCHER(IsOk, "") {
  return arg.ok();
}
MATCHER(IsAborted, "") {
  return arg.IsAborted();
}
518
519
// Tests that consensus is able to handle pending operations. It tests this in two ways:
520
// - It tests that consensus does the right thing with pending transactions from the WAL.
521
// - It tests that when a follower gets promoted to leader it does the right thing
522
//   with the pending operations.
523
1
TEST_F(RaftConsensusTest, TestPendingOperations) {
  SetUpConsensus(10);

  // Emulate a stateful system by having a bunch of operations in flight when consensus starts.
  // Specifically we emulate we're on term 10, with 10 operations that have not been committed yet.
  ConsensusBootstrapInfo info;
  info.last_id.set_term(10);
  for (int i = 0; i < 10; i++) {
    auto replicate = std::make_shared<ReplicateMsg>();
    replicate->set_op_type(NO_OP);
    info.last_id.set_index(100 + i);
    replicate->mutable_id()->CopyFrom(info.last_id);
    info.orphaned_replicates.push_back(replicate);
  }

  info.last_committed_id.set_term(10);
  info.last_committed_id.set_index(99);

  {
    InSequence dummy;
    // On start we expect 10 NO_OPs to be enqueued.
    EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
        .Times(10);

    // Queue gets initted when the peer starts.
    EXPECT_CALL(*queue_, Init(_))
        .Times(1);
  }
  ASSERT_OK(consensus_->Start(info));

  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(operation_factory_.get()));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get()));

  // Now we test what this peer does with the pending operations once it's elected leader.
  {
    InSequence dummy;
    // Peer manager gets updated with the new set of peers to send stuff to.
    EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
        .Times(1);
    // The no-op should be appended to the queue.
    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
        .Times(1);
    // One more op will be appended for the election.
    EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _))
        .Times(1).WillRepeatedly(Return(Status::OK()));
  }

  // Emulate an election, this will make this peer become leader and trigger the
  // above set expectations.
  ASSERT_OK(consensus_->EmulateElection());

  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(operation_factory_.get()));
  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));

  // Commit the 10 no-ops from the previous term, along with the one pushed to
  // assert leadership.
  EXPECT_CALL(*consensus_.get(), NonTrackedRoundReplicationFinished(HasOpId(), _, IsOk()))
      .Times(11);
  EXPECT_CALL(*peer_manager_, SignalRequest(_))
      .Times(AnyNumber());
  // In the end peer manager and the queue get closed.
  EXPECT_CALL(*peer_manager_, Close())
      .Times(AtLeast(1));
  EXPECT_CALL(*queue_, Close())
      .Times(1);

  // Now tell consensus all original orphaned replicates were majority replicated.
  // This should not advance the committed index because we haven't replicated
  // anything in the current term.
  OpId committed_index;
  OpId last_applied_op_id;
  consensus_->TEST_UpdateMajorityReplicated(
      OpId::FromPB(info.orphaned_replicates.back()->id()), &committed_index, &last_applied_op_id);
  // Should still be the last committed in the wal.
  ASSERT_EQ(committed_index, OpId::FromPB(info.last_committed_id));
  ASSERT_EQ(last_applied_op_id, OpId::FromPB(info.last_committed_id));

  // Now mark the last operation (the no-op round) as committed.
  // This should advance the committed index, since that round is in our current term,
  // and we should be able to commit all previous rounds.
  OpId cc_round_id = OpId::FromPB(info.orphaned_replicates.back()->id());
  cc_round_id.term = 11;

  // +1 here because index is incremented during emulated election.
  ++cc_round_id.index;
  consensus_->TEST_UpdateMajorityReplicated(cc_round_id, &committed_index, &last_applied_op_id);
  ASSERT_EQ(committed_index, cc_round_id);
  ASSERT_EQ(last_applied_op_id, cc_round_id);
}
615
616
84
// Matches a ConsensusRound pointer whose OpId equals (term, index); logs both for debugging.
MATCHER_P2(RoundHasOpId, term, index, "") {
  const auto& actual = arg->id();
  LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << actual;
  return actual.term == term && actual.index == index;
}
620
621
// Tests the case where a leader is elected and pushed a sequence of
622
// operations of which some never get committed. Eventually a new leader in a higher
623
// term pushes operations that overwrite some of the original indexes.
624
1
TEST_F(RaftConsensusTest, TestAbortOperations) {
  SetUpConsensus(1, 2);

  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
      .Times(AnyNumber());

  EXPECT_CALL(*peer_manager_, SignalRequest(_))
      .Times(AnyNumber());
  EXPECT_CALL(*peer_manager_, Close())
      .Times(AtLeast(1));
  EXPECT_CALL(*queue_, Close())
      .Times(1);
  EXPECT_CALL(*queue_, Init(_))
      .Times(1);
  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
      .Times(1);

  // We'll append to the queue 13 times: the initial no-op round from EmulateElection(), the
  // 10 rounds appended while leader, and the appends made while handling the new leader's two
  // Update() calls below (presumably one each — NOTE(review): confirm).
  EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _))
      .Times(13);

  // ... but some of those will be overwritten later by another leader, which sends ops 2.5 and
  // 3.6-3.9 and eventually commits up to 3.9. Op 2.5 matches what we already have, so only the
  // four new ops (3.6-3.9) should start as replica rounds.
  EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
      .Times(4);

  ConsensusBootstrapInfo info;
  ASSERT_OK(consensus_->Start(info));
  ASSERT_OK(consensus_->EmulateElection());

  // Append 10 rounds: 2.2 - 2.11
  for (int i = 0; i < 10; i++) {
    AppendNoOpRound();
  }

  // Expectations for what gets committed and what gets aborted:
  // (note: the aborts may be triggered before the commits)
  // 5 OK's for the 2.1-2.5 ops.
  // 6 Aborts for the 2.6-2.11 ops.
  // 1 OK for the 3.6 op.
  for (int index = 1; index < 6; index++) {
    EXPECT_CALL(*consensus_.get(),
                NonTrackedRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1);
  }
  for (int index = 6; index < 12; index++) {
    EXPECT_CALL(*consensus_.get(),
                NonTrackedRoundReplicationFinished(
                    RoundHasOpId(2, index), _, IsAborted())).Times(1);
  }
  EXPECT_CALL(*consensus_.get(),
              NonTrackedRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1);

  // Nothing's committed so far, so now just send an Update() message
  // emulating another guy got elected leader and is overwriting a suffix
  // of the previous messages.
  // In particular this request has:
  // - Op 2.5 from the previous leader's term
  // - Ops 3.6-3.9 from the new leader's term
  // - A new committed index of 3.6
  ConsensusRequestPB request;
  request.set_caller_term(3);
  const string kPeer0Uuid = "peer-0";
  request.set_caller_uuid(kPeer0Uuid);
  request.set_tablet_id(kTestTablet);
  request.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));

  ReplicateMsg* replicate = request.add_ops();
  replicate->mutable_id()->CopyFrom(MakeOpId(2, 5));
  replicate->set_op_type(NO_OP);

  ReplicateMsg* noop_msg = request.add_ops();
  noop_msg->mutable_id()->CopyFrom(MakeOpId(3, 6));
  noop_msg->set_op_type(NO_OP);
  noop_msg->set_hybrid_time(clock_->Now().ToUint64());
  noop_msg->mutable_noop_request();

  // Overwrite another 3 of the original rounds for a total of 4 overwrites.
  for (int i = 7; i < 10; i++) {
    // Distinct name so the loop does not shadow the 'replicate' declared above.
    ReplicateMsg* overwrite_msg = request.add_ops();
    overwrite_msg->mutable_id()->CopyFrom(MakeOpId(3, i));
    overwrite_msg->set_op_type(NO_OP);
    overwrite_msg->set_hybrid_time(clock_->Now().ToUint64());
  }

  request.mutable_committed_op_id()->CopyFrom(MakeOpId(3, 6));

  ConsensusResponsePB response;
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
  ASSERT_FALSE(response.has_error());

  ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get()));

  // Now we expect to commit ops 3.7 - 3.9.
  for (int index = 7; index < 10; index++) {
    EXPECT_CALL(*consensus_.get(),
                NonTrackedRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1);
  }

  request.mutable_ops()->Clear();
  request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9));
  request.mutable_committed_op_id()->CopyFrom(MakeOpId(3, 9));

  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
  ASSERT_FALSE(response.has_error());
}
731
732
1
TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) {
  SetUpConsensus();

  // Even before Start(), the last received OpId should serialize to a fully initialized
  // protobuf equal to MinimumOpId().
  OpIdPB last_received;
  consensus_->GetLastReceivedOpId().ToPB(&last_received);
  ASSERT_TRUE(last_received.IsInitialized());
  ASSERT_OPID_EQ(last_received, MinimumOpId());
}
739
740
// Ensure that followers reset their "last_received_current_leader"
741
// ConsensusStatusPB field when a new term is encountered. This is a
742
// correctness test for the logic on the follower side that allows the
743
// leader-side queue to determine which op to send next in various scenarios.
744
1
TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) {
745
1
  SetUpConsensus(kMinimumTerm, 3);
746
1
  SetUpGeneralExpectations();
747
1
  ConsensusBootstrapInfo info;
748
1
  ASSERT_OK(consensus_->Start(info));
749
750
1
  ConsensusRequestPB request;
751
1
  ConsensusResponsePB response;
752
1
  int64_t caller_term = 0;
753
1
  int64_t log_index = 0;
754
755
1
  caller_term = 1;
756
1
  string caller_uuid = config_.peers(0).permanent_uuid();
757
1
  OpIdPB preceding_opid = MinimumOpId();
758
759
  // Heartbeat. This will cause the term to increment on the follower.
760
1
  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
761
1
  response.Clear();
762
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
763
2
  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
764
1
  ASSERT_EQ(caller_term, response.responder_term());
765
1
  ASSERT_OPID_EQ(response.status().last_received(), MinimumOpId());
766
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
767
768
  // Replicate a no-op.
769
1
  OpIdPB noop_opid = MakeOpId(caller_term, ++log_index);
770
1
  AddNoOpToConsensusRequest(&request, noop_opid);
771
1
  response.Clear();
772
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
773
2
  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
774
1
  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
775
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(),  noop_opid);
776
777
  // New leader heartbeat. Term increase to 2.
778
  // Expect current term replicated to be nothing (MinimumOpId) but log
779
  // replicated to be everything sent so far.
780
1
  caller_term = 2;
781
1
  caller_uuid = config_.peers(1).permanent_uuid();
782
1
  preceding_opid = noop_opid;
783
1
  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
784
1
  response.Clear();
785
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
786
2
  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
787
1
  ASSERT_EQ(caller_term, response.responder_term());
788
1
  ASSERT_OPID_EQ(response.status().last_received(), preceding_opid);
789
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
790
791
  // Append a no-op.
792
1
  noop_opid = MakeOpId(caller_term, ++log_index);
793
1
  AddNoOpToConsensusRequest(&request, noop_opid);
794
1
  response.Clear();
795
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
796
2
  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
797
1
  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
798
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid);
799
800
  // New leader heartbeat. The term should rev but we should get an LMP mismatch.
801
1
  caller_term = 3;
802
1
  caller_uuid = config_.peers(0).permanent_uuid();
803
1
  preceding_opid = MakeOpId(caller_term, log_index + 1); // Not replicated yet.
804
1
  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
805
1
  response.Clear();
806
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
807
1
  ASSERT_EQ(caller_term, response.responder_term());
808
1
  ASSERT_OPID_EQ(response.status().last_received(), noop_opid); // Not preceding this time.
809
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
810
2
  ASSERT_TRUE(response.status().has_error()) << response.ShortDebugString();
811
1
  ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, response.status().error().code());
812
813
  // Decrement preceding and append a no-op.
814
1
  preceding_opid = MakeOpId(2, log_index);
815
1
  noop_opid = MakeOpId(caller_term, ++log_index);
816
1
  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
817
1
  AddNoOpToConsensusRequest(&request, noop_opid);
818
1
  response.Clear();
819
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
820
2
  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
821
1
  ASSERT_OPID_EQ(response.status().last_received(), noop_opid) << response.ShortDebugString();
822
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid)
823
0
      << response.ShortDebugString();
824
825
  // Happy case. New leader with new no-op to append right off the bat.
826
  // Response should be OK with all last_received* fields equal to the new no-op.
827
1
  caller_term = 4;
828
1
  caller_uuid = config_.peers(1).permanent_uuid();
829
1
  preceding_opid = noop_opid;
830
1
  noop_opid = MakeOpId(caller_term, ++log_index);
831
1
  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
832
1
  AddNoOpToConsensusRequest(&request, noop_opid);
833
1
  response.Clear();
834
1
  ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline()));
835
2
  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
836
1
  ASSERT_EQ(caller_term, response.responder_term());
837
1
  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
838
1
  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid);
839
1
}
840
841
}  // namespace consensus
842
}  // namespace yb