YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus_quorum-test.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <gtest/gtest.h>

#include "yb/common/schema.h"
#include "yb/common/wire_protocol-test-util.h"

#include "yb/consensus/consensus-test-util.h"
#include "yb/consensus/log.h"
#include "yb/consensus/log_index.h"
#include "yb/consensus/log_reader.h"
#include "yb/consensus/log_util.h"
#include "yb/consensus/opid_util.h"
#include "yb/consensus/peer_manager.h"
#include "yb/consensus/quorum_util.h"
#include "yb/consensus/raft_consensus.h"
#include "yb/consensus/replica_state.h"

#include "yb/gutil/bind.h"
#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/strcat.h"
#include "yb/gutil/strings/substitute.h"

#include "yb/rpc/messenger.h"

#include "yb/server/logical_clock.h"

#include "yb/util/mem_tracker.h"
#include "yb/util/metrics.h"
#include "yb/util/status_log.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_util.h"
#include "yb/util/threadpool.h"

DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_bool(enable_leader_failure_detection);

METRIC_DECLARE_entity(table);
METRIC_DECLARE_entity(tablet);

#define REPLICATE_SEQUENCE_OF_MESSAGES(...) \
  ASSERT_NO_FATALS(ReplicateSequenceOfMessages(__VA_ARGS__))

using std::shared_ptr;
using std::unique_ptr;

namespace yb {

namespace consensus {

using log::Log;
using log::LogEntryPB;
using log::LogOptions;
using log::LogReader;
using rpc::RpcContext;
using strings::Substitute;
using strings::SubstituteAndAppend;

const char* kTestTable = "TestTable";
const char* kTestTablet = "TestTablet";

void DoNothing(std::shared_ptr<consensus::StateChangeContext> context) {
}

// Test suite for tests that focus on multiple peer interaction, but
// without integrating with other components, such as transactions.
class RaftConsensusQuorumTest : public YBTest {
 public:
  RaftConsensusQuorumTest()
    : clock_(server::LogicalClock::CreateStartingAt(HybridTime(0))),
      table_metric_entity_(
          METRIC_ENTITY_table.Instantiate(&metric_registry_, "raft-test-table")),
      tablet_metric_entity_(
          METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test-tablet")),
      schema_(GetSimpleTestSchema()) {
    options_.tablet_id = kTestTablet;
    FLAGS_enable_leader_failure_detection = false;
  }

  // Builds an initial configuration of 'num' elements.
  // All of the peers start as followers.
  void BuildInitialRaftConfigPB(int num) {
    config_ = BuildRaftConfigPBForTests(num);
    config_.set_opid_index(kInvalidOpIdIndex);
    peers_.reset(new TestPeerMapManager(config_));
  }

  Status BuildFsManagersAndLogs() {
    // Build the fsmanagers and logs
    for (int i = 0; i < config_.peers_size(); i++) {
      shared_ptr<MemTracker> parent_mem_tracker =
          MemTracker::CreateTracker(Substitute("peer-$0", i));
      parent_mem_trackers_.push_back(parent_mem_tracker);
      string test_path = GetTestPath(Substitute("peer-$0-root", i));
      FsManagerOpts opts;
      opts.parent_mem_tracker = parent_mem_tracker;
      opts.wal_paths = { test_path };
      opts.data_paths = { test_path };
      opts.server_type = "tserver_test";
      std::unique_ptr<FsManager> fs_manager(new FsManager(env_.get(), opts));
      RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
      RETURN_NOT_OK(fs_manager->Open());

      scoped_refptr<Log> log;
      RETURN_NOT_OK(Log::Open(LogOptions(),
                              kTestTablet,
                              fs_manager->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
                              fs_manager->uuid(),
                              schema_,
                              0, // schema_version
                              nullptr, // table_metric_entity
                              nullptr, // tablet_metric_entity
                              log_thread_pool_.get(),
                              log_thread_pool_.get(),
                              std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
                              &log));
      logs_.push_back(log.get());
      fs_managers_.push_back(fs_manager.release());
    }
    return Status::OK();
  }

  void BuildPeers() {
    vector<LocalTestPeerProxyFactory*> proxy_factories;
    for (int i = 0; i < config_.peers_size(); i++) {
      auto proxy_factory = std::make_unique<LocalTestPeerProxyFactory>(peers_.get());
      proxy_factories.push_back(proxy_factory.get());

      auto operation_factory = new TestOperationFactory();

      string peer_uuid = Substitute("peer-$0", i);

      std::unique_ptr<ConsensusMetadata> cmeta;
      ASSERT_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_,
                                         kMinimumTerm, &cmeta));

      RaftPeerPB local_peer_pb;
      ASSERT_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb));
      auto queue = std::make_unique<PeerMessageQueue>(
          tablet_metric_entity_,
          logs_[i],
          MemTracker::FindOrCreateTracker(peer_uuid),
          MemTracker::FindOrCreateTracker(peer_uuid),
          local_peer_pb,
          kTestTablet,
          clock_,
          nullptr /* consensus_context */,
          raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));

      unique_ptr<ThreadPoolToken> pool_token(
          raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT));

      auto peer_manager = std::make_unique<PeerManager>(
          options_.tablet_id,
          config_.peers(i).permanent_uuid(),
          proxy_factory.get(),
          queue.get(),
          pool_token.get(),
          nullptr);

      shared_ptr<RaftConsensus> peer(new RaftConsensus(
          options_,
          std::move(cmeta),
          std::move(proxy_factory),
          std::move(queue),
          std::move(peer_manager),
          std::move(pool_token),
          table_metric_entity_,
          tablet_metric_entity_,
          config_.peers(i).permanent_uuid(),
          clock_,
          operation_factory,
          logs_[i],
          parent_mem_trackers_[i],
          Bind(&DoNothing),
          DEFAULT_TABLE_TYPE,
          nullptr /* retryable_requests */));

      operation_factory->SetConsensus(peer.get());
      operation_factories_.emplace_back(operation_factory);
      peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
    }
  }

  Status StartPeers() {
    ConsensusBootstrapInfo boot_info;

    TestPeerMap all_peers = peers_->GetPeerMapCopy();
    for (const TestPeerMap::value_type& entry : all_peers) {
      RETURN_NOT_OK(entry.second->Start(boot_info));
    }
    return Status::OK();
  }

  Status BuildConfig(int num) {
    RETURN_NOT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
    RETURN_NOT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_));
    BuildInitialRaftConfigPB(num);
    RETURN_NOT_OK(BuildFsManagersAndLogs());
    BuildPeers();
    return Status::OK();
  }

  Status BuildAndStartConfig(int num) {
    RETURN_NOT_OK(BuildConfig(num));
    RETURN_NOT_OK(StartPeers());

    // Automatically elect the last node in the list.
    const int kLeaderIdx = num - 1;
    shared_ptr<RaftConsensus> leader;
    RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
    RETURN_NOT_OK(leader->EmulateElection());
    RETURN_NOT_OK(leader->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
    return Status::OK();
  }

  LocalTestPeerProxy* GetLeaderProxyToPeer(int peer_idx, int leader_idx) {
    shared_ptr<RaftConsensus> follower;
    CHECK_OK(peers_->GetPeerByIdx(peer_idx, &follower));
    shared_ptr<RaftConsensus> leader;
    CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
    for (LocalTestPeerProxy* proxy : down_cast<LocalTestPeerProxyFactory*>(
        leader->peer_proxy_factory_.get())->GetProxies()) {
      if (proxy->GetTarget() == follower->peer_uuid()) {
        return proxy;
      }
    }
    CHECK(false) << "Proxy not found";
    return nullptr;
  }

  Status AppendDummyMessage(int peer_idx,
                            scoped_refptr<ConsensusRound>* round) {
    auto msg = std::make_shared<ReplicateMsg>();
    msg->set_op_type(NO_OP);
    msg->mutable_noop_request();
    msg->set_hybrid_time(clock_->Now().ToUint64());

    shared_ptr<RaftConsensus> peer;
    CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));

    // Use a latch in place of a Transaction callback.
    auto sync = std::make_unique<Synchronizer>();
    *round = make_scoped_refptr<ConsensusRound>(peer.get(), std::move(msg));
    (**round).SetCallback(MakeNonTrackedRoundCallback(
        round->get(),
        [sync = sync.get()](const Status& status) {
      sync->StatusCB(status);
    }));
    (**round).BindToTerm(peer->LeaderTerm());
    InsertOrDie(&syncs_, round->get(), sync.release());
    RETURN_NOT_OK_PREPEND(peer->TEST_Replicate(round->get()),
                          Substitute("Unable to replicate to peer $0", peer_idx));
    return Status::OK();
  }

  Status WaitForReplicate(ConsensusRound* round) {
    return FindOrDie(syncs_, round)->Wait();
  }

  Status TimedWaitForReplicate(ConsensusRound* round, const MonoDelta& delta) {
    return FindOrDie(syncs_, round)->WaitFor(delta);
  }

  void WaitForReplicateIfNotAlreadyPresent(const OpIdPB& to_wait_for, int peer_idx) {
    shared_ptr<RaftConsensus> peer;
    ASSERT_OK(peers_->GetPeerByIdx(peer_idx, &peer));
    ReplicaState* state = peer->GetReplicaStateForTests();
    while (true) {
      {
        auto lock = state->LockForRead();
        if (state->GetLastReceivedOpIdUnlocked().index >= to_wait_for.index()) {
          return;
        }
      }
      SleepFor(MonoDelta::FromMilliseconds(1));
    }
  }

  // Waits for an operation to be (database) committed in the replica at index
  // 'peer_idx'. If the operation was already committed this returns immediately.
  void WaitForCommitIfNotAlreadyPresent(const OpIdPB& to_wait_for,
                                        int peer_idx,
                                        int leader_idx) {
    MonoDelta timeout(MonoDelta::FromSeconds(10));
    MonoTime start(MonoTime::Now());

    shared_ptr<RaftConsensus> peer;
    ASSERT_OK(peers_->GetPeerByIdx(peer_idx, &peer));
    ReplicaState* state = peer->GetReplicaStateForTests();

    int backoff_exp = 0;
    const int kMaxBackoffExp = 8;
    OpIdPB committed_op_id;
    while (true) {
      {
        auto lock = state->LockForRead();
        state->GetCommittedOpIdUnlocked().ToPB(&committed_op_id);
        if (OpIdCompare(committed_op_id, to_wait_for) >= 0) {
          return;
        }
      }
      MonoDelta elapsed = MonoTime::Now().GetDeltaSince(start);
      if (elapsed.MoreThan(timeout)) {
        break;
      }
      SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
      backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
    }

    LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of "
               << "op " << to_wait_for << " on replica. Last committed op on replica: "
               << committed_op_id << ". Dumping state and quitting.";
    vector<string> lines;
    shared_ptr<RaftConsensus> leader;
    ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader));
    for (const string& line : lines) {
      LOG(ERROR) << line;
    }

    // Gather the replica and leader operations for printing
    log::LogEntries replica_ops = GatherLogEntries(peer_idx, logs_[peer_idx]);
    log::LogEntries leader_ops = GatherLogEntries(leader_idx, logs_[leader_idx]);
    SCOPED_TRACE(PrintOnError(replica_ops, Substitute("local peer ($0)", peer->peer_uuid())));
    SCOPED_TRACE(PrintOnError(leader_ops, Substitute("leader (peer-$0)", leader_idx)));
    FAIL() << "Replica did not commit.";
  }

  // Used in ReplicateSequenceOfMessages() to specify whether
  // we should wait for all replicas to have replicated the
  // sequence or just a majority.
  enum ReplicateWaitMode {
    WAIT_FOR_ALL_REPLICAS,
    WAIT_FOR_MAJORITY
  };

  // Used in ReplicateSequenceOfMessages() to specify whether
  // we should also commit the messages in the sequence
  enum CommitMode {
    DONT_COMMIT,
    COMMIT_ONE_BY_ONE
  };

  // Replicates a sequence of messages to the peer passed as leader.
  // Optionally waits for the messages to be replicated to followers.
  // 'last_op_id' is set to the id of the last replicated operation.
  // The operations are only committed if 'commit_one_by_one' is true.
  void ReplicateSequenceOfMessages(int seq_size,
                                   int leader_idx,
                                   ReplicateWaitMode wait_mode,
                                   CommitMode commit_mode,
                                   OpIdPB* last_op_id,
                                   vector<scoped_refptr<ConsensusRound> >* rounds) {
    for (int i = 0; i < seq_size; i++) {
      scoped_refptr<ConsensusRound> round;
      ASSERT_OK(AppendDummyMessage(leader_idx, &round));
      ASSERT_OK(WaitForReplicate(round.get()));
      round->id().ToPB(last_op_id);
      rounds->push_back(round);
    }

    if (wait_mode == WAIT_FOR_ALL_REPLICAS) {
      shared_ptr<RaftConsensus> leader;
      ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader));

      TestPeerMap all_peers = peers_->GetPeerMapCopy();
      int i = 0;
      for (const TestPeerMap::value_type& entry : all_peers) {
        if (entry.second->peer_uuid() != leader->peer_uuid()) {
          WaitForReplicateIfNotAlreadyPresent(*last_op_id, i);
        }
        i++;
      }
    }
  }

  log::LogEntries GatherLogEntries(int idx, const scoped_refptr<Log>& log) {
    EXPECT_OK(log->WaitUntilAllFlushed());
    EXPECT_OK(log->Close());
    std::unique_ptr<LogReader> log_reader;
    EXPECT_OK(log::LogReader::Open(fs_managers_[idx]->env(),
                                   scoped_refptr<log::LogIndex>(),
                                   "Log reader: ",
                                   fs_managers_[idx]->GetFirstTabletWalDirOrDie(kTestTable,
                                                                                kTestTablet),
                                   table_metric_entity_.get(),
                                   tablet_metric_entity_.get(),
                                   &log_reader));
    log::LogEntries ret;
    log::SegmentSequence segments;
    EXPECT_OK(log_reader->GetSegmentsSnapshot(&segments));

    for (const log::SegmentSequence::value_type& entry : segments) {
      auto result = entry->ReadEntries();
      EXPECT_OK(result.status);
      for (auto& e : result.entries) {
        ret.push_back(std::move(e));
      }
    }

    return ret;
  }

  // Verifies that the replica's log match the leader's. This deletes the
  // peers (so we're sure that no further writes occur) and closes the logs
  // so it must be the very last thing to run, in a test.
  void VerifyLogs(int leader_idx, int first_replica_idx, int last_replica_idx) {
    // Wait for in-flight transactions to be done. We're destroying the
    // peers next and leader transactions won't be able to commit anymore.
    for (const auto& factory : operation_factories_) {
      factory->WaitDone();
    }

    // Shut down all the peers.
    TestPeerMap all_peers = peers_->GetPeerMapCopy();
    for (const TestPeerMap::value_type& entry : all_peers) {
      entry.second->Shutdown();
    }

    log::LogEntries leader_entries = GatherLogEntries(leader_idx, logs_[leader_idx]);
    shared_ptr<RaftConsensus> leader;
    ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader));

    for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) {
      log::LogEntries replica_entries = GatherLogEntries(replica_idx, logs_[replica_idx]);

      shared_ptr<RaftConsensus> replica;
      ASSERT_OK(peers_->GetPeerByIdx(replica_idx, &replica));
      VerifyReplica(leader_entries,
                    replica_entries,
                    leader->peer_uuid(),
                    replica->peer_uuid());
    }
  }

  std::vector<OpIdPB> ExtractReplicateIds(const log::LogEntries& entries) {
    std::vector<OpIdPB> result;
    result.reserve(entries.size() / 2);
    for (const auto& entry : entries) {
      if (entry->has_replicate()) {
        result.push_back(entry->replicate().id());
      }
    }
    return result;
  }

  void VerifyReplicateOrderMatches(const log::LogEntries& leader_entries,
                                   const log::LogEntries& replica_entries) {
    auto leader_ids = ExtractReplicateIds(leader_entries);
    auto replica_ids = ExtractReplicateIds(replica_entries);
    ASSERT_EQ(leader_ids.size(), replica_ids.size());
    for (size_t i = 0; i < leader_ids.size(); i++) {
      ASSERT_EQ(leader_ids[i].ShortDebugString(),
                replica_ids[i].ShortDebugString());
    }
  }

  void VerifyNoCommitsBeforeReplicates(const log::LogEntries& entries) {
    std::unordered_set<OpIdPB, OpIdHashFunctor, OpIdEqualsFunctor> replication_ops;

    for (const auto& entry : entries) {
      if (entry->has_replicate()) {
        ASSERT_TRUE(InsertIfNotPresent(&replication_ops, entry->replicate().id()))
          << "REPLICATE op id showed up twice: " << entry->ShortDebugString();
      }
    }
  }

  void VerifyReplica(const log::LogEntries& leader_entries,
                     const log::LogEntries& replica_entries,
                     const string& leader_name,
                     const string& replica_name) {
    SCOPED_TRACE(PrintOnError(leader_entries, Substitute("Leader: $0", leader_name)));
    SCOPED_TRACE(PrintOnError(replica_entries, Substitute("Replica: $0", replica_name)));

    // Check that the REPLICATE messages come in the same order on both nodes.
    VerifyReplicateOrderMatches(leader_entries, replica_entries);

    // Check that no COMMIT precedes its related REPLICATE on both the replica
    // and leader.
    VerifyNoCommitsBeforeReplicates(replica_entries);
    VerifyNoCommitsBeforeReplicates(leader_entries);
  }

  string PrintOnError(const log::LogEntries& replica_entries,
                      const string& replica_id) {
    string ret = "";
    SubstituteAndAppend(&ret, "$1 log entries for replica $0:\n",
                        replica_id, replica_entries.size());
    for (const auto& replica_entry : replica_entries) {
      StrAppend(&ret, "Replica log entry: ", replica_entry->ShortDebugString(), "\n");
    }
    return ret;
  }

  // Read the ConsensusMetadata for the given peer from disk.
  std::unique_ptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) {
    string peer_uuid = Substitute("peer-$0", peer_index);
    std::unique_ptr<ConsensusMetadata> cmeta;
    CHECK_OK(ConsensusMetadata::Load(fs_managers_[peer_index], kTestTablet, peer_uuid, &cmeta));
    return cmeta;
  }

  // Assert that the durable term == term and that the peer that got the vote == voted_for.
  void AssertDurableTermAndVote(int peer_index, int64_t term, const std::string& voted_for) {
    auto cmeta = ReadConsensusMetadataFromDisk(peer_index);
    ASSERT_EQ(term, cmeta->current_term());
    ASSERT_EQ(voted_for, cmeta->voted_for());
  }

  // Assert that the durable term == term and that the peer has not yet voted.
  void AssertDurableTermWithoutVote(int peer_index, int64_t term) {
    auto cmeta = ReadConsensusMetadataFromDisk(peer_index);
    ASSERT_EQ(term, cmeta->current_term());
    ASSERT_FALSE(cmeta->has_voted_for());
  }

  ~RaftConsensusQuorumTest() {
    peers_->Clear();
    operation_factories_.clear();
    // We need to clear the logs before deleting the fs_managers_ or we'll
    // get a SIGSEGV when closing the logs.
    logs_.clear();
    STLDeleteElements(&fs_managers_);
    STLDeleteValues(&syncs_);
  }

 protected:
  ConsensusOptions options_;
  RaftConfigPB config_;
  OpIdPB initial_id_;
  vector<shared_ptr<MemTracker> > parent_mem_trackers_;
  vector<FsManager*> fs_managers_;
  vector<scoped_refptr<Log> > logs_;
  unique_ptr<ThreadPool> raft_pool_;
  unique_ptr<ThreadPool> log_thread_pool_;
  std::unique_ptr<TestPeerMapManager> peers_;
  std::vector<std::unique_ptr<TestOperationFactory>> operation_factories_;
  scoped_refptr<server::Clock> clock_;
  MetricRegistry metric_registry_;
  scoped_refptr<MetricEntity> table_metric_entity_;
  scoped_refptr<MetricEntity> tablet_metric_entity_;
  const Schema schema_;
  std::unordered_map<ConsensusRound*, Synchronizer*> syncs_;
};

TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
  // Constants with the indexes of peers with certain roles,
  // since peers don't change roles in this test.
  const int kFollower0Idx = 0;
  const int kFollower1Idx = 1;
  const int kLeaderIdx = 2;

  ASSERT_OK(BuildAndStartConfig(3));

  OpIdPB last_replicate;
  vector<scoped_refptr<ConsensusRound> > rounds;
  {
    // lock one of the replicas down by obtaining the state lock
    // and never letting it go.
    shared_ptr<RaftConsensus> follower0;
    ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));

    ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
    auto lock = follower0_rs->LockForRead();

    // If the locked replica would stop consensus we would hang here
    // as we wait for operations to be replicated to a majority.
    ASSERT_NO_FATALS(ReplicateSequenceOfMessages(
                              10,
                              kLeaderIdx,
                              WAIT_FOR_MAJORITY,
                              COMMIT_ONE_BY_ONE,
                              &last_replicate,
                              &rounds));

    // Follower 1 should be fine (Were we to wait for follower0's replicate
    // this would hang here). We know he must have replicated but make sure
    // by calling Wait().
    WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx);
    WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower1Idx, kLeaderIdx);
  }

  // After we let the lock go the remaining follower should get up-to-date
  WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx);
  WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower0Idx, kLeaderIdx);
  VerifyLogs(2, 0, 1);
}

TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
  // Constants with the indexes of peers with certain roles,
  // since peers don't change roles in this test.
  const int kFollower0Idx = 0;
  const int kFollower1Idx = 1;
  const int kLeaderIdx = 2;

  ASSERT_OK(BuildAndStartConfig(3));

  OpIdPB last_op_id;

  scoped_refptr<ConsensusRound> round;
  {
    // lock two of the replicas down by obtaining the state locks
    // and never letting them go.
    shared_ptr<RaftConsensus> follower0;
    ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
    ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
    auto lock0 = follower0_rs->LockForRead();

    shared_ptr<RaftConsensus> follower1;
    ASSERT_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
    ReplicaState* follower1_rs = follower1->GetReplicaStateForTests();
    auto lock1 = follower1_rs->LockForRead();

    // Append a single message to the queue
    ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
    round->id().ToPB(&last_op_id);
    // This should timeout.
    Status status = TimedWaitForReplicate(round.get(), MonoDelta::FromMilliseconds(500));
    ASSERT_TRUE(status.IsTimedOut());
  }

  // After we release the locks the operation should replicate to all replicas
  // and we commit.
  ASSERT_OK(WaitForReplicate(round.get()));

  // Assert that everything was ok
  WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
  WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
  VerifyLogs(2, 0, 1);
}

// If some communication error happens the leader will resend the request to the
// peers. This tests that the peers handle repeated requests.
TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
  // Constants with the indexes of peers with certain roles,
  // since peers don't change roles in this test.
  const int kFollower0Idx = 0;
  const int kFollower1Idx = 1;
  const int kLeaderIdx = 2;

  ASSERT_OK(BuildAndStartConfig(3));

  OpIdPB last_op_id;

  // Append a dummy message, with faults injected on the first attempt
  // to send the message.
  scoped_refptr<ConsensusRound> round;
  GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
  GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
  ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));

  // We should successfully replicate it due to retries.
  ASSERT_OK(WaitForReplicate(round.get()));

  GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
  GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();

  // The commit should eventually reach both followers as well.
  round->id().ToPB(&last_op_id);
  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);

  // Append a sequence of messages, and keep injecting errors into the
  // replica proxies.
  vector<scoped_refptr<ConsensusRound> > rounds;
  for (int i = 0; i < 100; i++) {
    scoped_refptr<ConsensusRound> round;
    ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
    ConsensusRound* round_ptr = round.get();
    round->id().ToPB(&last_op_id);
    rounds.push_back(round);

    // inject comm faults
    if (i % 2 == 0) {
      GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
    } else {
      GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide();
    }

    ASSERT_OK(WaitForReplicate(round_ptr));
  }

  // Assert last operation was correctly replicated and committed.
  WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
  WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);

  // See comment at the end of TestFollowersReplicateAndCommitMessage
  // for an explanation on this waiting sequence.
  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
  VerifyLogs(2, 0, 1);
}

// In this test we test the ability of the leader to send heartbeats
// to replicas by simply pushing nothing after the configuration round
// and still expecting for the replicas Update() hooks to be called.
TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
  // Constants with the indexes of peers with certain roles,
  // since peers don't change roles in this test.
  const int kFollower0Idx = 0;
  const int kFollower1Idx = 1;
  const int kLeaderIdx = 2;

  ASSERT_OK(BuildConfig(3));

  shared_ptr<RaftConsensus> follower0;
  ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
  shared_ptr<RaftConsensus> follower1;
  ASSERT_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));

  shared_ptr<CounterHooks> counter_hook_rpl0(
      new CounterHooks(follower0->GetFaultHooks()));
  shared_ptr<CounterHooks> counter_hook_rpl1(
      new CounterHooks(follower1->GetFaultHooks()));

  // Replace the default fault hooks on the replicas with counter hooks
  // before we start the configuration.
  follower0->SetFaultHooks(counter_hook_rpl0);
  follower1->SetFaultHooks(counter_hook_rpl1);

  ASSERT_OK(StartPeers());

  shared_ptr<RaftConsensus> leader;
  ASSERT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
  ASSERT_OK(leader->EmulateElection());

  // Wait for the config round to get committed and count the number
  // of update calls, calls after that will be heartbeats.
  OpIdPB config_round;
  config_round.set_term(1);
  config_round.set_index(1);
  WaitForCommitIfNotAlreadyPresent(config_round, kFollower0Idx, kLeaderIdx);
  WaitForCommitIfNotAlreadyPresent(config_round, kFollower1Idx, kLeaderIdx);

  int repl0_init_count = counter_hook_rpl0->num_pre_update_calls();
  int repl1_init_count = counter_hook_rpl1->num_pre_update_calls();

  // Now wait for about 4 times the heartbeat period the counters
  // should have increased between 3 to 8 times.
  //
  // Why the variance? Heartbeat timing is jittered such that the period
  // between heartbeats can be anywhere from half the interval to the full interval.
  SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4));

  int repl0_final_count = counter_hook_rpl0->num_pre_update_calls();
  int repl1_final_count = counter_hook_rpl1->num_pre_update_calls();

  ASSERT_GE(repl0_final_count - repl0_init_count, 3);
  ASSERT_LE(repl0_final_count - repl0_init_count, 8);
  ASSERT_GE(repl1_final_count - repl1_init_count, 3);
  ASSERT_LE(repl1_final_count - repl1_init_count, 8);

  VerifyLogs(2, 0, 1);
}

// After creating the initial configuration, this test writes a small sequence
// of messages to the initial leader. It then shuts down the current
// leader, makes another peer become leader and writes a sequence of
// messages to it. The new leader and the follower should agree on the
// sequence of messages.
TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
  const int kInitialNumPeers = 5;
  ASSERT_OK(BuildAndStartConfig(kInitialNumPeers));

  OpIdPB last_op_id;
  vector<scoped_refptr<ConsensusRound> > rounds;

  // Loop twice, successively shutting down the previous leader.
  for (int current_config_size = kInitialNumPeers;
       current_config_size >= kInitialNumPeers - 1;
       current_config_size--) {
    REPLICATE_SEQUENCE_OF_MESSAGES(10,
                                   current_config_size - 1, // The index of the leader.
                                   WAIT_FOR_ALL_REPLICAS,
                                   COMMIT_ONE_BY_ONE,
                                   &last_op_id,
                                   &rounds);

    // Make sure the last operation is committed everywhere
    for (int i = 0; i < current_config_size - 1; i++) {
      WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 1);
    }

    // Now shutdown the current leader.
    LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1);
    shared_ptr<RaftConsensus> current_leader;
    ASSERT_OK(peers_->GetPeerByIdx(current_config_size - 1, &current_leader));
    current_leader->Shutdown();
    peers_->RemovePeer(current_leader->peer_uuid());

    // ... and make the peer before it become leader.
    shared_ptr<RaftConsensus> new_leader;
    ASSERT_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader));

    // This will force an election in which we expect to make the last
    // non-shutdown peer in the list become leader.
    LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
    ASSERT_OK(new_leader->StartElection({ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE}));
    ASSERT_OK(new_leader->WaitUntilLeaderForTests(MonoDelta::FromSeconds(15)));
    LOG(INFO) << "Election won";

    // ... replicating a set of messages to the new leader should now be possible.
    REPLICATE_SEQUENCE_OF_MESSAGES(10,
                                   current_config_size - 2, // The index of the new leader.
                                   WAIT_FOR_MAJORITY,
                                   COMMIT_ONE_BY_ONE,
                                   &last_op_id,
                                   &rounds);

    // Make sure the last operation is committed everywhere
    for (int i = 0; i < current_config_size - 2; i++) {
      WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 2);
    }
  }
  // We can only verify the logs of the peers that were not killed, due to the
  // old leaders being out-of-date now.
  VerifyLogs(2, 0, 1);
}

TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
  ASSERT_OK(BuildAndStartConfig(3));

  OpIdPB last_op_id;
  vector<scoped_refptr<ConsensusRound> > rounds;
  REPLICATE_SEQUENCE_OF_MESSAGES(10,
                                 2, // The index of the initial leader.
                                 WAIT_FOR_ALL_REPLICAS,
                                 COMMIT_ONE_BY_ONE,
                                 &last_op_id,
                                 &rounds);

  // Make sure the last operation is committed everywhere
  WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
  WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);

  // Now replicas should only accept operations with
  // 'last_id' as the preceding id.
  ConsensusRequestPB req;
  ConsensusResponsePB resp;

  shared_ptr<RaftConsensus> leader;
  ASSERT_OK(peers_->GetPeerByIdx(2, &leader));

  shared_ptr<RaftConsensus> follower;
  ASSERT_OK(peers_->GetPeerByIdx(0, &follower));

  req.set_caller_uuid(leader->peer_uuid());
  req.set_caller_term(last_op_id.term());
  req.mutable_preceding_id()->CopyFrom(last_op_id);
  req.mutable_committed_op_id()->CopyFrom(last_op_id);

  ReplicateMsg* replicate = req.add_ops();
  replicate->set_hybrid_time(clock_->Now().ToUint64());
  OpIdPB* id = replicate->mutable_id();
  id->set_term(last_op_id.term());
  id->set_index(last_op_id.index() + 1);
  // Make a copy of the OpId to be TSAN friendly.
  auto req_copy = req;
  auto id_copy = req_copy.mutable_ops(0)->mutable_id();
  replicate->set_op_type(NO_OP);

  // Appending this message to peer0 should work and update
  // its 'last_received' to 'id'.
  ASSERT_OK(follower->Update(&req, &resp, CoarseBigDeadline()));
  ASSERT_TRUE(OpIdEquals(resp.status().last_received(), *id));

  // Now skip one message in the same term. The replica should
  // complain with the right error message.
  req_copy.mutable_preceding_id()->set_index(id_copy->index() + 1);
  id_copy->set_index(id_copy->index() + 2);
  // Appending this message to peer0 should return a Status::OK
  // but should contain an error referring to the log matching property.
  ASSERT_OK(follower->Update(&req_copy, &resp, CoarseBigDeadline()));
  ASSERT_TRUE(resp.has_status());
  ASSERT_TRUE(resp.status().has_error());
  ASSERT_EQ(resp.status().error().code(), ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
  ASSERT_STR_CONTAINS(resp.status().error().status().message(),
                      "Log matching property violated");
}

// Test that RequestVote performs according to "spec".
TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
  ASSERT_OK(BuildAndStartConfig(3));

  OpIdPB last_op_id;
  vector<scoped_refptr<ConsensusRound> > rounds;
  REPLICATE_SEQUENCE_OF_MESSAGES(10,
                                 2, // The index of the initial leader.
                                 WAIT_FOR_ALL_REPLICAS,
                                 COMMIT_ONE_BY_ONE,
                                 &last_op_id,
                                 &rounds);

  // Make sure the last operation is committed everywhere
  WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
  WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);

  // Ensure last-logged OpId is > (0,0).
  ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id));

  const int kPeerIndex = 1;
  shared_ptr<RaftConsensus> peer;
  ASSERT_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));

  VoteRequestPB request;
  request.set_tablet_id(kTestTablet);
  request.mutable_candidate_status()->mutable_last_received()->CopyFrom(last_op_id);

  // Test that the replica won't vote since it has recently heard from
  // a valid leader.
  VoteResponsePB response;
  request.set_candidate_uuid("peer-0");
  request.set_candidate_term(last_op_id.term() + 1);
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_FALSE(response.vote_granted());
  ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code());

  // Test that replicas only vote yes for a single peer per term.

  // Indicate that replicas should vote even if they think another leader is alive.
  // This will allow the rest of the requests in the test to go through.
  request.set_ignore_live_leader(true);
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_TRUE(response.vote_granted());
  ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
  ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0"));

  // Ensure we get same response for same term and same UUID.
  response.Clear();
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_TRUE(response.vote_granted());

  // Ensure we get a "no" for a different candidate UUID for that term.
  response.Clear();
  request.set_candidate_uuid("peer-2");
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_FALSE(response.vote_granted());
  ASSERT_TRUE(response.has_consensus_error());
  ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
  ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
  ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0"));

  //
  // Test that replicas refuse votes for an old term.
  //

  // Increase the term of our candidate, which will cause the voter replica to
  // increase its own term to match.
  request.set_candidate_uuid("peer-0");
  request.set_candidate_term(last_op_id.term() + 2);
  response.Clear();
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_TRUE(response.vote_granted());
  ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
  ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));

  // Now try the old term.
  // Note: Use the peer who "won" the election on the previous term (peer-0),
  // although in practice the impl does not store historical vote data.
  request.set_candidate_term(last_op_id.term() + 1);
  response.Clear();
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_FALSE(response.vote_granted());
  ASSERT_TRUE(response.has_consensus_error());
  ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
  ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
  ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));

  //
  // Ensure replicas vote no for an old op index.
  //

  request.set_candidate_uuid("peer-0");
  request.set_candidate_term(last_op_id.term() + 3);
  request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
  response.Clear();
  ASSERT_OK(peer->RequestVote(&request, &response));
  ASSERT_FALSE(response.vote_granted());
  ASSERT_TRUE(response.has_consensus_error());
  ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code());
  ASSERT_EQ(last_op_id.term() + 3, response.responder_term());
  ASSERT_NO_FATALS(AssertDurableTermWithoutVote(kPeerIndex, last_op_id.term() + 3));

  // Send a "heartbeat" to the peer. It should be rejected.
  ConsensusRequestPB req;
  req.set_caller_term(last_op_id.term());
  req.set_caller_uuid("peer-0");
  req.mutable_committed_op_id()->CopyFrom(last_op_id);
  ConsensusResponsePB res;
  Status s = peer->Update(&req, &res, CoarseBigDeadline());
  ASSERT_EQ(last_op_id.term() + 3, res.responder_term());
  ASSERT_TRUE(res.status().has_error());
  ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, res.status().error().code());
  LOG(INFO) << "Follower rejected old heartbeat, as expected: " << res.ShortDebugString();
}

}  // namespace consensus
}  // namespace yb