/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus_quorum-test.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <gtest/gtest.h> |
34 | | |
35 | | #include "yb/common/schema.h" |
36 | | #include "yb/common/wire_protocol-test-util.h" |
37 | | |
38 | | #include "yb/consensus/consensus-test-util.h" |
39 | | #include "yb/consensus/log.h" |
40 | | #include "yb/consensus/log_index.h" |
41 | | #include "yb/consensus/log_reader.h" |
42 | | #include "yb/consensus/log_util.h" |
43 | | #include "yb/consensus/opid_util.h" |
44 | | #include "yb/consensus/peer_manager.h" |
45 | | #include "yb/consensus/quorum_util.h" |
46 | | #include "yb/consensus/raft_consensus.h" |
47 | | #include "yb/consensus/replica_state.h" |
48 | | |
49 | | #include "yb/gutil/bind.h" |
50 | | #include "yb/gutil/stl_util.h" |
51 | | #include "yb/gutil/strings/strcat.h" |
52 | | #include "yb/gutil/strings/substitute.h" |
53 | | |
54 | | #include "yb/rpc/messenger.h" |
55 | | |
56 | | #include "yb/server/logical_clock.h" |
57 | | |
58 | | #include "yb/util/mem_tracker.h" |
59 | | #include "yb/util/metrics.h" |
60 | | #include "yb/util/status_log.h" |
61 | | #include "yb/util/test_macros.h" |
62 | | #include "yb/util/test_util.h" |
63 | | #include "yb/util/threadpool.h" |
64 | | |
65 | | DECLARE_int32(raft_heartbeat_interval_ms); |
66 | | DECLARE_bool(enable_leader_failure_detection); |
67 | | |
68 | | METRIC_DECLARE_entity(table); |
69 | | METRIC_DECLARE_entity(tablet); |
70 | | |
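| | // Typical invocation (a sketch mirroring the tests below):
| | //   OpIdPB last_op_id;
| | //   vector<scoped_refptr<ConsensusRound> > rounds;
| | //   REPLICATE_SEQUENCE_OF_MESSAGES(
| | //       10, kLeaderIdx, WAIT_FOR_ALL_REPLICAS, COMMIT_ONE_BY_ONE,
| | //       &last_op_id, &rounds);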
71 | | #define REPLICATE_SEQUENCE_OF_MESSAGES(...) \ |
72 | 6 | ASSERT_NO_FATALS(ReplicateSequenceOfMessages(__VA_ARGS__)) |
73 | | |
74 | | using std::shared_ptr; |
75 | | using std::unique_ptr; |
76 | | |
77 | | namespace yb { |
78 | | |
79 | | namespace consensus { |
80 | | |
81 | | using log::Log; |
82 | | using log::LogEntryPB; |
83 | | using log::LogOptions; |
84 | | using log::LogReader; |
85 | | using rpc::RpcContext; |
86 | | using strings::Substitute; |
87 | | using strings::SubstituteAndAppend; |
88 | | |
89 | | const char* kTestTable = "TestTable"; |
90 | | const char* kTestTablet = "TestTablet"; |
91 | | |
92 | 473 | void DoNothing(std::shared_ptr<consensus::StateChangeContext> context) { |
93 | 473 | } |
94 | | |
95 | | // Test suite for tests that focus on interaction between multiple peers,
96 | | // without integrating with other components, such as transactions.
97 | | class RaftConsensusQuorumTest : public YBTest { |
98 | | public: |
99 | | RaftConsensusQuorumTest() |
100 | | : clock_(server::LogicalClock::CreateStartingAt(HybridTime(0))), |
101 | | table_metric_entity_( |
102 | | METRIC_ENTITY_table.Instantiate(&metric_registry_, "raft-test-table")), |
103 | | tablet_metric_entity_( |
104 | | METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test-tablet")), |
105 | 7 | schema_(GetSimpleTestSchema()) { |
106 | 7 | options_.tablet_id = kTestTablet; |
107 | 7 | FLAGS_enable_leader_failure_detection = false; |
108 | 7 | } |
109 | | |
110 | | // Builds an initial configuration of 'num' elements. |
111 | | // All of the peers start as followers. |
112 | 7 | void BuildInitialRaftConfigPB(int num) { |
113 | 7 | config_ = BuildRaftConfigPBForTests(num); |
114 | 7 | config_.set_opid_index(kInvalidOpIdIndex); |
115 | 7 | peers_.reset(new TestPeerMapManager(config_)); |
116 | 7 | } |
117 | | |
118 | 7 | Status BuildFsManagersAndLogs() { |
119 | | // Build the FsManagers and logs.
120 | 30 | for (int i = 0; i < config_.peers_size(); i++) { |
121 | 23 | shared_ptr<MemTracker> parent_mem_tracker = |
122 | 23 | MemTracker::CreateTracker(Substitute("peer-$0", i)); |
123 | 23 | parent_mem_trackers_.push_back(parent_mem_tracker); |
124 | 23 | string test_path = GetTestPath(Substitute("peer-$0-root", i)); |
125 | 23 | FsManagerOpts opts; |
126 | 23 | opts.parent_mem_tracker = parent_mem_tracker; |
127 | 23 | opts.wal_paths = { test_path }; |
128 | 23 | opts.data_paths = { test_path }; |
129 | 23 | opts.server_type = "tserver_test"; |
130 | 23 | std::unique_ptr<FsManager> fs_manager(new FsManager(env_.get(), opts)); |
131 | 23 | RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout()); |
132 | 23 | RETURN_NOT_OK(fs_manager->Open()); |
133 | | |
134 | 23 | scoped_refptr<Log> log; |
135 | 23 | RETURN_NOT_OK(Log::Open(LogOptions(), |
136 | 23 | kTestTablet, |
137 | 23 | fs_manager->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), |
138 | 23 | fs_manager->uuid(), |
139 | 23 | schema_, |
140 | 23 | 0, // schema_version |
141 | 23 | nullptr, // table_metric_entity |
142 | 23 | nullptr, // tablet_metric_entity |
143 | 23 | log_thread_pool_.get(), |
144 | 23 | log_thread_pool_.get(), |
145 | 23 | std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index |
146 | 23 | &log)); |
147 | 23 | logs_.push_back(log.get()); |
148 | 23 | fs_managers_.push_back(fs_manager.release()); |
149 | 23 | } |
150 | 7 | return Status::OK(); |
151 | 7 | } |
152 | | |
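| | // For each peer in the config: creates its ConsensusMetadata, message queue,
| | // and peer manager, builds a RaftConsensus instance around them, and
| | // registers it with peers_. Peers are built but not started.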
153 | 7 | void BuildPeers() { |
154 | 7 | vector<LocalTestPeerProxyFactory*> proxy_factories; |
155 | 30 | for (int i = 0; i < config_.peers_size(); i++) { |
156 | 23 | auto proxy_factory = std::make_unique<LocalTestPeerProxyFactory>(peers_.get()); |
157 | 23 | proxy_factories.push_back(proxy_factory.get()); |
158 | | |
159 | 23 | auto operation_factory = new TestOperationFactory(); |
160 | | |
161 | 23 | string peer_uuid = Substitute("peer-$0", i); |
162 | | |
163 | 23 | std::unique_ptr<ConsensusMetadata> cmeta; |
164 | 23 | ASSERT_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_, |
165 | 23 | kMinimumTerm, &cmeta)); |
166 | | |
167 | 23 | RaftPeerPB local_peer_pb; |
168 | 23 | ASSERT_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb)); |
169 | 23 | auto queue = std::make_unique<PeerMessageQueue>( |
170 | 23 | tablet_metric_entity_, |
171 | 23 | logs_[i], |
172 | 23 | MemTracker::FindOrCreateTracker(peer_uuid), |
173 | 23 | MemTracker::FindOrCreateTracker(peer_uuid), |
174 | 23 | local_peer_pb, |
175 | 23 | kTestTablet, |
176 | 23 | clock_, |
177 | 23 | nullptr /* consensus_context */, |
178 | 23 | raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)); |
179 | | |
180 | 23 | unique_ptr<ThreadPoolToken> pool_token( |
181 | 23 | raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT)); |
182 | | |
183 | 23 | auto peer_manager = std::make_unique<PeerManager>( |
184 | 23 | options_.tablet_id, |
185 | 23 | config_.peers(i).permanent_uuid(), |
186 | 23 | proxy_factory.get(), |
187 | 23 | queue.get(), |
188 | 23 | pool_token.get(), |
189 | 23 | nullptr); |
190 | | |
191 | 23 | shared_ptr<RaftConsensus> peer(new RaftConsensus( |
192 | 23 | options_, |
193 | 23 | std::move(cmeta), |
194 | 23 | std::move(proxy_factory), |
195 | 23 | std::move(queue), |
196 | 23 | std::move(peer_manager), |
197 | 23 | std::move(pool_token), |
198 | 23 | table_metric_entity_, |
199 | 23 | tablet_metric_entity_, |
200 | 23 | config_.peers(i).permanent_uuid(), |
201 | 23 | clock_, |
202 | 23 | operation_factory, |
203 | 23 | logs_[i], |
204 | 23 | parent_mem_trackers_[i], |
205 | 23 | Bind(&DoNothing), |
206 | 23 | DEFAULT_TABLE_TYPE, |
207 | 23 | nullptr /* retryable_requests */)); |
208 | | |
209 | 23 | operation_factory->SetConsensus(peer.get()); |
210 | 23 | operation_factories_.emplace_back(operation_factory); |
211 | 23 | peers_->AddPeer(config_.peers(i).permanent_uuid(), peer); |
212 | 23 | } |
213 | 7 | } |
214 | | |
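| | // Starts every registered peer with empty bootstrap info.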
215 | 7 | Status StartPeers() { |
216 | 7 | ConsensusBootstrapInfo boot_info; |
217 | | |
218 | 7 | TestPeerMap all_peers = peers_->GetPeerMapCopy(); |
219 | 23 | for (const TestPeerMap::value_type& entry : all_peers) { |
220 | 23 | RETURN_NOT_OK(entry.second->Start(boot_info)); |
221 | 23 | } |
222 | 7 | return Status::OK(); |
223 | 7 | } |
224 | | |
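| | // Builds the thread pools, Raft config, fs managers, logs, and peers,
| | // but does not start them.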
225 | 7 | Status BuildConfig(int num) { |
226 | 7 | RETURN_NOT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); |
227 | 7 | RETURN_NOT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_)); |
228 | 7 | BuildInitialRaftConfigPB(num); |
229 | 7 | RETURN_NOT_OK(BuildFsManagersAndLogs()); |
230 | 7 | BuildPeers(); |
231 | 7 | return Status::OK(); |
232 | 7 | } |
233 | | |
234 | 6 | Status BuildAndStartConfig(int num) { |
235 | 6 | RETURN_NOT_OK(BuildConfig(num)); |
236 | 6 | RETURN_NOT_OK(StartPeers()); |
237 | | |
238 | | // Automatically elect the last node in the list. |
239 | 6 | const int kLeaderIdx = num - 1; |
240 | 6 | shared_ptr<RaftConsensus> leader; |
241 | 6 | RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader)); |
242 | 6 | RETURN_NOT_OK(leader->EmulateElection()); |
243 | 6 | RETURN_NOT_OK(leader->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10))); |
244 | 6 | return Status::OK(); |
245 | 6 | } |
246 | | |
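| | // Returns the proxy that the leader at 'leader_idx' uses to talk to the
| | // peer at 'peer_idx'.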
247 | 104 | LocalTestPeerProxy* GetLeaderProxyToPeer(int peer_idx, int leader_idx) { |
248 | 104 | shared_ptr<RaftConsensus> follower; |
249 | 104 | CHECK_OK(peers_->GetPeerByIdx(peer_idx, &follower)); |
250 | 104 | shared_ptr<RaftConsensus> leader; |
251 | 104 | CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); |
252 | 104 | for (LocalTestPeerProxy* proxy : down_cast<LocalTestPeerProxyFactory*>( |
253 | 156 | leader->peer_proxy_factory_.get())->GetProxies()) { |
254 | 156 | if (proxy->GetTarget() == follower->peer_uuid()) { |
255 | 104 | return proxy; |
256 | 104 | } |
257 | 156 | } |
258 | 0 | CHECK(false) << "Proxy not found"; |
259 | 0 | return nullptr; |
260 | 104 | } |
261 | | |
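| | // Appends a NO_OP message through the peer at 'peer_idx' and returns the
| | // resulting round. A Synchronizer is stored in syncs_ so that tests can
| | // later wait for the round to finish replicating.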
262 | | Status AppendDummyMessage(int peer_idx, |
263 | 172 | scoped_refptr<ConsensusRound>* round) { |
264 | 172 | auto msg = std::make_shared<ReplicateMsg>(); |
265 | 172 | msg->set_op_type(NO_OP); |
266 | 172 | msg->mutable_noop_request(); |
267 | 172 | msg->set_hybrid_time(clock_->Now().ToUint64()); |
268 | | |
269 | 172 | shared_ptr<RaftConsensus> peer; |
270 | 172 | CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); |
271 | | |
272 | | // Use a latch in place of a Transaction callback. |
273 | 172 | auto sync = std::make_unique<Synchronizer>(); |
274 | 172 | *round = make_scoped_refptr<ConsensusRound>(peer.get(), std::move(msg)); |
275 | 172 | (**round).SetCallback(MakeNonTrackedRoundCallback( |
276 | 172 | round->get(), |
277 | 172 | [sync = sync.get()](const Status& status) { |
278 | 172 | sync->StatusCB(status); |
279 | 172 | })); |
280 | 172 | (**round).BindToTerm(peer->LeaderTerm()); |
281 | 172 | InsertOrDie(&syncs_, round->get(), sync.release()); |
282 | 172 | RETURN_NOT_OK_PREPEND(peer->TEST_Replicate(round->get()), |
283 | 172 | Substitute("Unable to replicate to peer $0", peer_idx)); |
284 | 172 | return Status::OK(); |
285 | 172 | } |
286 | | |
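| | // Blocks until the replication callback for 'round' fires.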
287 | 172 | Status WaitForReplicate(ConsensusRound* round) { |
288 | 172 | return FindOrDie(syncs_, round)->Wait(); |
289 | 172 | } |
290 | | |
291 | 1 | Status TimedWaitForReplicate(ConsensusRound* round, const MonoDelta& delta) { |
292 | 1 | return FindOrDie(syncs_, round)->WaitFor(delta); |
293 | 1 | } |
294 | | |
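| | // Polls until the replica at 'peer_idx' has received an operation with
| | // index >= 'to_wait_for'. Returns immediately if it is already present.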
295 | 17 | void WaitForReplicateIfNotAlreadyPresent(const OpIdPB& to_wait_for, int peer_idx) { |
296 | 17 | shared_ptr<RaftConsensus> peer; |
297 | 17 | ASSERT_OK(peers_->GetPeerByIdx(peer_idx, &peer)); |
298 | 17 | ReplicaState* state = peer->GetReplicaStateForTests(); |
299 | 343 | while (true) { |
300 | 343 | { |
301 | 343 | auto lock = state->LockForRead(); |
302 | 343 | if (state->GetLastReceivedOpIdUnlocked().index >= to_wait_for.index()) { |
303 | 17 | return; |
304 | 17 | } |
305 | 326 | } |
306 | 326 | SleepFor(MonoDelta::FromMilliseconds(1)); |
307 | 326 | } |
308 | 17 | } |
309 | | |
310 | | // Waits for an operation to be (database) committed in the replica at index |
311 | | // 'peer_idx'. If the operation was already committed, this returns immediately.
312 | | void WaitForCommitIfNotAlreadyPresent(const OpIdPB& to_wait_for, |
313 | | int peer_idx, |
314 | 26 | int leader_idx) { |
315 | 26 | MonoDelta timeout(MonoDelta::FromSeconds(10)); |
316 | 26 | MonoTime start(MonoTime::Now()); |
317 | | |
318 | 26 | shared_ptr<RaftConsensus> peer; |
319 | 26 | ASSERT_OK(peers_->GetPeerByIdx(peer_idx, &peer)); |
320 | 26 | ReplicaState* state = peer->GetReplicaStateForTests(); |
321 | | |
322 | 26 | int backoff_exp = 0; |
323 | 26 | const int kMaxBackoffExp = 8; |
324 | 26 | OpIdPB committed_op_id; |
325 | 51 | while (true) { |
326 | 51 | { |
327 | 51 | auto lock = state->LockForRead(); |
328 | 51 | state->GetCommittedOpIdUnlocked().ToPB(&committed_op_id); |
329 | 51 | if (OpIdCompare(committed_op_id, to_wait_for) >= 0) { |
330 | 26 | return; |
331 | 26 | } |
332 | 25 | } |
333 | 25 | MonoDelta elapsed = MonoTime::Now().GetDeltaSince(start); |
334 | 25 | if (elapsed.MoreThan(timeout)) { |
335 | 0 | break; |
336 | 0 | } |
337 | 25 | SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp)); |
338 | 25 | backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp); |
339 | 25 | } |
340 | | |
341 | 0 | LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of " |
342 | 0 | << "op " << to_wait_for << " on replica. Last committed op on replica: " |
343 | 0 | << committed_op_id << ". Dumping state and quitting."; |
344 | 0 | vector<string> lines; |
345 | 0 | shared_ptr<RaftConsensus> leader; |
346 | 0 | ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader)); |
347 | 0 | for (const string& line : lines) { |
348 | 0 | LOG(ERROR) << line; |
349 | 0 | } |
350 | | |
351 | | // Gather the replica and leader operations for printing |
352 | 0 | log::LogEntries replica_ops = GatherLogEntries(peer_idx, logs_[peer_idx]); |
353 | 0 | log::LogEntries leader_ops = GatherLogEntries(leader_idx, logs_[leader_idx]); |
354 | 0 | SCOPED_TRACE(PrintOnError(replica_ops, Substitute("local peer ($0)", peer->peer_uuid()))); |
355 | 0 | SCOPED_TRACE(PrintOnError(leader_ops, Substitute("leader (peer-$0)", leader_idx))); |
356 | 0 | FAIL() << "Replica did not commit."; |
357 | 0 | } |
358 | | |
359 | | // Used in ReplicateSequenceOfMessages() to specify whether |
360 | | // we should wait for all replicas to have replicated the |
361 | | // sequence or just a majority. |
362 | | enum ReplicateWaitMode { |
363 | | WAIT_FOR_ALL_REPLICAS, |
364 | | WAIT_FOR_MAJORITY |
365 | | }; |
366 | | |
367 | | // Used in ReplicateSequenceOfMessages() to specify whether |
368 | | // we should also commit the messages in the sequence.
369 | | enum CommitMode { |
370 | | DONT_COMMIT, |
371 | | COMMIT_ONE_BY_ONE |
372 | | }; |
373 | | |
374 | | // Replicates a sequence of messages to the peer passed as leader. |
375 | | // Optionally waits for the messages to be replicated to followers. |
376 | | // 'last_op_id' is set to the id of the last replicated operation. |
377 | | // The operations are only committed if 'commit_mode' is COMMIT_ONE_BY_ONE.
378 | | void ReplicateSequenceOfMessages(int seq_size, |
379 | | int leader_idx, |
380 | | ReplicateWaitMode wait_mode, |
381 | | CommitMode commit_mode, |
382 | | OpIdPB* last_op_id, |
383 | 7 | vector<scoped_refptr<ConsensusRound> >* rounds) { |
384 | 77 | for (int i = 0; i < seq_size; i++) { |
385 | 70 | scoped_refptr<ConsensusRound> round; |
386 | 70 | ASSERT_OK(AppendDummyMessage(leader_idx, &round)); |
387 | 70 | ASSERT_OK(WaitForReplicate(round.get())); |
388 | 70 | round->id().ToPB(last_op_id); |
389 | 70 | rounds->push_back(round); |
390 | 70 | } |
391 | | |
392 | 7 | if (wait_mode == WAIT_FOR_ALL_REPLICAS) { |
393 | 4 | shared_ptr<RaftConsensus> leader; |
394 | 4 | ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader)); |
395 | | |
396 | 4 | TestPeerMap all_peers = peers_->GetPeerMapCopy(); |
397 | 4 | int i = 0; |
398 | 15 | for (const TestPeerMap::value_type& entry : all_peers) { |
399 | 15 | if (entry.second->peer_uuid() != leader->peer_uuid()) { |
400 | 11 | WaitForReplicateIfNotAlreadyPresent(*last_op_id, i); |
401 | 11 | } |
402 | 15 | i++; |
403 | 15 | } |
404 | 4 | } |
405 | 7 | } |
406 | | |
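| | // Flushes and closes the log at 'idx', then reads back all entries from
| | // its segments with a LogReader.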
407 | 10 | log::LogEntries GatherLogEntries(int idx, const scoped_refptr<Log>& log) { |
408 | 10 | EXPECT_OK(log->WaitUntilAllFlushed()); |
409 | 10 | EXPECT_OK(log->Close()); |
410 | 10 | std::unique_ptr<LogReader> log_reader; |
411 | 10 | EXPECT_OK(log::LogReader::Open(fs_managers_[idx]->env(), |
412 | 10 | scoped_refptr<log::LogIndex>(), |
413 | 10 | "Log reader: ", |
414 | 10 | fs_managers_[idx]->GetFirstTabletWalDirOrDie(kTestTable, |
415 | 10 | kTestTablet), |
416 | 10 | table_metric_entity_.get(), |
417 | 10 | tablet_metric_entity_.get(), |
418 | 10 | &log_reader)); |
419 | 10 | log::LogEntries ret; |
420 | 10 | log::SegmentSequence segments; |
421 | 10 | EXPECT_OK(log_reader->GetSegmentsSnapshot(&segments)); |
422 | | |
423 | 10 | for (const log::SegmentSequence::value_type& entry : segments) { |
424 | 10 | auto result = entry->ReadEntries(); |
425 | 10 | EXPECT_OK(result.status); |
426 | 318 | for (auto& e : result.entries) { |
427 | 318 | ret.push_back(std::move(e)); |
428 | 318 | } |
429 | 10 | } |
430 | | |
431 | 10 | return ret; |
432 | 10 | } |
433 | | |
434 | | // Verifies that the replicas' logs match the leader's. This deletes the
435 | | // peers (so we're sure that no further writes occur) and closes the logs,
436 | | // so it must be the very last thing to run in a test.
437 | 5 | void VerifyLogs(int leader_idx, int first_replica_idx, int last_replica_idx) { |
438 | | // Wait for in-flight transactions to be done. We're destroying the |
439 | | // peers next, and leader transactions won't be able to commit anymore.
440 | 17 | for (const auto& factory : operation_factories_) { |
441 | 17 | factory->WaitDone(); |
442 | 17 | } |
443 | | |
444 | | // Shut down all the peers. |
445 | 5 | TestPeerMap all_peers = peers_->GetPeerMapCopy(); |
446 | 15 | for (const TestPeerMap::value_type& entry : all_peers) { |
447 | 15 | entry.second->Shutdown(); |
448 | 15 | } |
449 | | |
450 | 5 | log::LogEntries leader_entries = GatherLogEntries(leader_idx, logs_[leader_idx]); |
451 | 5 | shared_ptr<RaftConsensus> leader; |
452 | 5 | ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader)); |
453 | | |
454 | 10 | for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) { |
455 | 5 | log::LogEntries replica_entries = GatherLogEntries(replica_idx, logs_[replica_idx]); |
456 | | |
457 | 5 | shared_ptr<RaftConsensus> replica; |
458 | 5 | ASSERT_OK(peers_->GetPeerByIdx(replica_idx, &replica)); |
459 | 5 | VerifyReplica(leader_entries, |
460 | 5 | replica_entries, |
461 | 5 | leader->peer_uuid(), |
462 | 5 | replica->peer_uuid()); |
463 | 5 | } |
464 | 5 | } |
465 | | |
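| | // Collects the op ids of all REPLICATE entries, in log order.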
466 | 10 | std::vector<OpIdPB> ExtractReplicateIds(const log::LogEntries& entries) { |
467 | 10 | std::vector<OpIdPB> result; |
468 | 10 | result.reserve(entries.size() / 2); |
469 | 318 | for (const auto& entry : entries) { |
470 | 318 | if (entry->has_replicate()) { |
471 | 318 | result.push_back(entry->replicate().id()); |
472 | 318 | } |
473 | 318 | } |
474 | 10 | return result; |
475 | 10 | } |
476 | | |
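| | // Asserts that both logs contain the same REPLICATE op ids in the same order.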
477 | | void VerifyReplicateOrderMatches(const log::LogEntries& leader_entries, |
478 | 5 | const log::LogEntries& replica_entries) { |
479 | 5 | auto leader_ids = ExtractReplicateIds(leader_entries); |
480 | 5 | auto replica_ids = ExtractReplicateIds(replica_entries); |
481 | 5 | ASSERT_EQ(leader_ids.size(), replica_ids.size()); |
482 | 164 | for (size_t i = 0; i < leader_ids.size(); i++) { |
483 | 159 | ASSERT_EQ(leader_ids[i].ShortDebugString(), |
484 | 159 | replica_ids[i].ShortDebugString()); |
485 | 159 | } |
486 | 5 | } |
487 | | |
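| | // Despite its name, this currently only asserts that no REPLICATE op id
| | // shows up twice in the log.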
488 | 10 | void VerifyNoCommitsBeforeReplicates(const log::LogEntries& entries) { |
489 | 10 | std::unordered_set<OpIdPB, OpIdHashFunctor, OpIdEqualsFunctor> replication_ops; |
490 | | |
491 | 318 | for (const auto& entry : entries) { |
492 | 318 | if (entry->has_replicate()) { |
493 | 636 | ASSERT_TRUE(InsertIfNotPresent(&replication_ops, entry->replicate().id())) |
494 | 636 | << "REPLICATE op id showed up twice: " << entry->ShortDebugString(); |
495 | 318 | } |
496 | 318 | } |
497 | 10 | } |
498 | | |
499 | | void VerifyReplica(const log::LogEntries& leader_entries, |
500 | | const log::LogEntries& replica_entries, |
501 | | const string& leader_name, |
502 | 5 | const string& replica_name) { |
503 | 5 | SCOPED_TRACE(PrintOnError(leader_entries, Substitute("Leader: $0", leader_name))); |
504 | 5 | SCOPED_TRACE(PrintOnError(replica_entries, Substitute("Replica: $0", replica_name))); |
505 | | |
506 | | // Check that the REPLICATE messages come in the same order on both nodes. |
507 | 5 | VerifyReplicateOrderMatches(leader_entries, replica_entries); |
508 | | |
509 | | // Check that no COMMIT precedes its related REPLICATE on both the replica |
510 | | // and leader. |
511 | 5 | VerifyNoCommitsBeforeReplicates(replica_entries); |
512 | 5 | VerifyNoCommitsBeforeReplicates(leader_entries); |
513 | 5 | } |
514 | | |
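| | // Renders 'replica_entries' as a human-readable string for use in
| | // SCOPED_TRACE output when a verification fails.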
515 | | string PrintOnError(const log::LogEntries& replica_entries, |
516 | 10 | const string& replica_id) { |
517 | 10 | string ret = ""; |
518 | 10 | SubstituteAndAppend(&ret, "$1 log entries for replica $0:\n", |
519 | 10 | replica_id, replica_entries.size()); |
520 | 318 | for (const auto& replica_entry : replica_entries) { |
521 | 318 | StrAppend(&ret, "Replica log entry: ", replica_entry->ShortDebugString(), "\n"); |
522 | 318 | } |
523 | 10 | return ret; |
524 | 10 | } |
525 | | |
526 | | // Read the ConsensusMetadata for the given peer from disk. |
527 | 5 | std::unique_ptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) { |
528 | 5 | string peer_uuid = Substitute("peer-$0", peer_index); |
529 | 5 | std::unique_ptr<ConsensusMetadata> cmeta; |
530 | 5 | CHECK_OK(ConsensusMetadata::Load(fs_managers_[peer_index], kTestTablet, peer_uuid, &cmeta)); |
531 | 5 | return cmeta; |
532 | 5 | } |
533 | | |
534 | | // Assert that the durable term == term and that the peer that got the vote == voted_for. |
535 | 4 | void AssertDurableTermAndVote(int peer_index, int64_t term, const std::string& voted_for) { |
536 | 4 | auto cmeta = ReadConsensusMetadataFromDisk(peer_index); |
537 | 4 | ASSERT_EQ(term, cmeta->current_term()); |
538 | 4 | ASSERT_EQ(voted_for, cmeta->voted_for()); |
539 | 4 | } |
540 | | |
541 | | // Assert that the durable term == term and that the peer has not yet voted. |
542 | 1 | void AssertDurableTermWithoutVote(int peer_index, int64_t term) { |
543 | 1 | auto cmeta = ReadConsensusMetadataFromDisk(peer_index); |
544 | 1 | ASSERT_EQ(term, cmeta->current_term()); |
545 | 1 | ASSERT_FALSE(cmeta->has_voted_for()); |
546 | 1 | } |
547 | | |
548 | 7 | ~RaftConsensusQuorumTest() { |
549 | 7 | peers_->Clear(); |
550 | 7 | operation_factories_.clear(); |
551 | | // We need to clear the logs before deleting the fs_managers_, or we'll
552 | | // get a SIGSEGV when closing the logs. |
553 | 7 | logs_.clear(); |
554 | 7 | STLDeleteElements(&fs_managers_); |
555 | 7 | STLDeleteValues(&syncs_); |
556 | 7 | } |
557 | | |
558 | | protected: |
559 | | ConsensusOptions options_; |
560 | | RaftConfigPB config_; |
561 | | OpIdPB initial_id_; |
562 | | vector<shared_ptr<MemTracker> > parent_mem_trackers_; |
563 | | vector<FsManager*> fs_managers_; |
564 | | vector<scoped_refptr<Log> > logs_; |
565 | | unique_ptr<ThreadPool> raft_pool_; |
566 | | unique_ptr<ThreadPool> log_thread_pool_; |
567 | | std::unique_ptr<TestPeerMapManager> peers_; |
568 | | std::vector<std::unique_ptr<TestOperationFactory>> operation_factories_; |
569 | | scoped_refptr<server::Clock> clock_; |
570 | | MetricRegistry metric_registry_; |
571 | | scoped_refptr<MetricEntity> table_metric_entity_; |
572 | | scoped_refptr<MetricEntity> tablet_metric_entity_; |
573 | | const Schema schema_; |
574 | | std::unordered_map<ConsensusRound*, Synchronizer*> syncs_; |
575 | | }; |
576 | | |
577 | 1 | TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) { |
578 | | // Constants with the indexes of peers with certain roles, |
579 | | // since peers don't change roles in this test. |
580 | 1 | const int kFollower0Idx = 0; |
581 | 1 | const int kFollower1Idx = 1; |
582 | 1 | const int kLeaderIdx = 2; |
583 | | |
584 | 1 | ASSERT_OK(BuildAndStartConfig(3)); |
585 | | |
586 | 1 | OpIdPB last_replicate; |
587 | 1 | vector<scoped_refptr<ConsensusRound> > rounds; |
588 | 1 | { |
589 | | // Lock one of the replicas down by obtaining the state lock
590 | | // and never letting it go. |
591 | 1 | shared_ptr<RaftConsensus> follower0; |
592 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); |
593 | | |
594 | 1 | ReplicaState* follower0_rs = follower0->GetReplicaStateForTests(); |
595 | 1 | auto lock = follower0_rs->LockForRead(); |
596 | | |
597 | | // If the locked replica stopped consensus, we would hang here
598 | | // as we wait for operations to be replicated to a majority.
599 | 1 | ASSERT_NO_FATALS(ReplicateSequenceOfMessages( |
600 | 1 | 10, |
601 | 1 | kLeaderIdx, |
602 | 1 | WAIT_FOR_MAJORITY, |
603 | 1 | COMMIT_ONE_BY_ONE, |
604 | 1 | &last_replicate, |
605 | 1 | &rounds)); |
606 | | |
607 | | // Follower 1 should be fine (were we to wait for follower0's replicate,
608 | | // this would hang here). We know it must have replicated, but make sure
609 | | // by calling Wait().
610 | 1 | WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx); |
611 | 1 | WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower1Idx, kLeaderIdx); |
612 | 1 | } |
613 | | |
614 | | // After we let the lock go, the remaining follower should catch up.
615 | 1 | WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx); |
616 | 1 | WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower0Idx, kLeaderIdx); |
617 | 1 | VerifyLogs(2, 0, 1); |
618 | 1 | } |
619 | | |
620 | 1 | TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { |
621 | | // Constants with the indexes of peers with certain roles, |
622 | | // since peers don't change roles in this test. |
623 | 1 | const int kFollower0Idx = 0; |
624 | 1 | const int kFollower1Idx = 1; |
625 | 1 | const int kLeaderIdx = 2; |
626 | | |
627 | 1 | ASSERT_OK(BuildAndStartConfig(3)); |
628 | | |
629 | 1 | OpIdPB last_op_id; |
630 | | |
631 | 1 | scoped_refptr<ConsensusRound> round; |
632 | 1 | { |
633 | | // Lock two of the replicas down by obtaining the state locks
634 | | // and never letting them go. |
635 | 1 | shared_ptr<RaftConsensus> follower0; |
636 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); |
637 | 1 | ReplicaState* follower0_rs = follower0->GetReplicaStateForTests(); |
638 | 1 | auto lock0 = follower0_rs->LockForRead(); |
639 | | |
640 | 1 | shared_ptr<RaftConsensus> follower1; |
641 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); |
642 | 1 | ReplicaState* follower1_rs = follower1->GetReplicaStateForTests(); |
643 | 1 | auto lock1 = follower1_rs->LockForRead(); |
644 | | |
645 | | // Append a single message to the queue.
646 | 1 | ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round)); |
647 | 1 | round->id().ToPB(&last_op_id); |
648 | | // This should time out.
649 | 1 | Status status = TimedWaitForReplicate(round.get(), MonoDelta::FromMilliseconds(500)); |
650 | 1 | ASSERT_TRUE(status.IsTimedOut()); |
651 | 1 | } |
652 | | |
653 | | // After we release the locks, the operation should replicate to all replicas
654 | | // and be committed.
655 | 1 | ASSERT_OK(WaitForReplicate(round.get())); |
656 | | |
657 | | // Assert that everything was OK.
658 | 1 | WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx); |
659 | 1 | WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx); |
660 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); |
661 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); |
662 | 1 | VerifyLogs(2, 0, 1); |
663 | 1 | } |
664 | | |
665 | | // If a communication error happens, the leader will resend the request to the
666 | | // peers. This tests that the peers handle repeated requests. |
667 | 1 | TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) { |
668 | | // Constants with the indexes of peers with certain roles, |
669 | | // since peers don't change roles in this test. |
670 | 1 | const int kFollower0Idx = 0; |
671 | 1 | const int kFollower1Idx = 1; |
672 | 1 | const int kLeaderIdx = 2; |
673 | | |
674 | 1 | ASSERT_OK(BuildAndStartConfig(3)); |
675 | | |
676 | 1 | OpIdPB last_op_id; |
677 | | |
678 | | // Append a dummy message, with faults injected on the first attempt |
679 | | // to send the message. |
680 | 1 | scoped_refptr<ConsensusRound> round; |
681 | 1 | GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide(); |
682 | 1 | GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide(); |
683 | 1 | ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round)); |
684 | | |
685 | | // We should successfully replicate it due to retries. |
686 | 1 | ASSERT_OK(WaitForReplicate(round.get())); |
687 | | |
688 | 1 | GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide(); |
689 | 1 | GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide(); |
690 | | |
691 | | // The commit should eventually reach both followers as well. |
692 | 1 | round->id().ToPB(&last_op_id); |
693 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); |
694 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); |
695 | | |
696 | | // Append a sequence of messages, and keep injecting errors into the |
697 | | // replica proxies. |
698 | 1 | vector<scoped_refptr<ConsensusRound> > rounds; |
699 | 101 | for (int i = 0; i < 100; i++) { |
700 | 100 | scoped_refptr<ConsensusRound> round; |
701 | 100 | ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round)); |
702 | 100 | ConsensusRound* round_ptr = round.get(); |
703 | 100 | round->id().ToPB(&last_op_id); |
704 | 100 | rounds.push_back(round); |
705 | | |
706 | | // Inject comm faults, alternating between the two followers.
707 | 100 | if (i % 2 == 0) { |
708 | 50 | GetLeaderProxyToPeer(kFollower0Idx, kLeaderIdx)->InjectCommFaultLeaderSide(); |
709 | 50 | } else { |
710 | 50 | GetLeaderProxyToPeer(kFollower1Idx, kLeaderIdx)->InjectCommFaultLeaderSide(); |
711 | 50 | } |
712 | | |
713 | 100 | ASSERT_OK(WaitForReplicate(round_ptr)); |
714 | 100 | } |
715 | | |
716 | | // Assert last operation was correctly replicated and committed. |
717 | 1 | WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx); |
718 | 1 | WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx); |
719 | | |
720 | | // See comment at the end of TestFollowersReplicateAndCommitMessage |
721 | | // for an explanation on this waiting sequence. |
722 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); |
723 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); |
724 | 1 | VerifyLogs(2, 0, 1); |
725 | 1 | } |
726 | | |
727 | | // This test checks the leader's ability to send heartbeats to replicas
728 | | // by simply pushing nothing after the configuration round and still
729 | | // expecting the replicas' Update() hooks to be called.
730 | 1 | TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) { |
731 | | // Constants with the indexes of peers with certain roles, |
732 | | // since peers don't change roles in this test. |
733 | 1 | const int kFollower0Idx = 0; |
734 | 1 | const int kFollower1Idx = 1; |
735 | 1 | const int kLeaderIdx = 2; |
736 | | |
737 | 1 | ASSERT_OK(BuildConfig(3)); |
738 | | |
739 | 1 | shared_ptr<RaftConsensus> follower0; |
740 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); |
741 | 1 | shared_ptr<RaftConsensus> follower1; |
742 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); |
743 | | |
744 | 1 | shared_ptr<CounterHooks> counter_hook_rpl0( |
745 | 1 | new CounterHooks(follower0->GetFaultHooks())); |
746 | 1 | shared_ptr<CounterHooks> counter_hook_rpl1( |
747 | 1 | new CounterHooks(follower1->GetFaultHooks())); |
748 | | |
749 | | // Replace the default fault hooks on the replicas with counter hooks |
750 | | // before we start the configuration. |
751 | 1 | follower0->SetFaultHooks(counter_hook_rpl0); |
752 | 1 | follower1->SetFaultHooks(counter_hook_rpl1); |
753 | | |
754 | 1 | ASSERT_OK(StartPeers()); |
755 | | |
756 | 1 | shared_ptr<RaftConsensus> leader; |
757 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader)); |
758 | 1 | ASSERT_OK(leader->EmulateElection()); |
759 | | |
760 | | // Wait for the config round to get committed and count the number |
761 | | // of update calls; calls after that will be heartbeats.
762 | 1 | OpIdPB config_round; |
763 | 1 | config_round.set_term(1); |
764 | 1 | config_round.set_index(1); |
765 | 1 | WaitForCommitIfNotAlreadyPresent(config_round, kFollower0Idx, kLeaderIdx); |
766 | 1 | WaitForCommitIfNotAlreadyPresent(config_round, kFollower1Idx, kLeaderIdx); |
767 | | |
768 | 1 | int repl0_init_count = counter_hook_rpl0->num_pre_update_calls(); |
769 | 1 | int repl1_init_count = counter_hook_rpl1->num_pre_update_calls(); |
770 | | |
771 | | // Now wait for about 4 times the heartbeat period; the counters
772 | | // should have increased between 3 and 8 times.
773 | | // |
774 | | // Why the variance? Heartbeat timing is jittered such that the period |
775 | | // between heartbeats can be anywhere from half the interval to the full interval. |
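| | // (Waiting 4 intervals with a jittered period in [interval/2, interval]
| | // gives between 4 and 8 heartbeats; the lower bound of 3 leaves room for
| | // scheduling slop.)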
776 | 1 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4)); |
777 | | |
778 | 1 | int repl0_final_count = counter_hook_rpl0->num_pre_update_calls(); |
779 | 1 | int repl1_final_count = counter_hook_rpl1->num_pre_update_calls(); |
780 | | |
781 | 1 | ASSERT_GE(repl0_final_count - repl0_init_count, 3); |
782 | 1 | ASSERT_LE(repl0_final_count - repl0_init_count, 8); |
783 | 1 | ASSERT_GE(repl1_final_count - repl1_init_count, 3); |
784 | 1 | ASSERT_LE(repl1_final_count - repl1_init_count, 8); |
785 | | |
786 | 1 | VerifyLogs(2, 0, 1); |
787 | 1 | } |
788 | | |
789 | | // After creating the initial configuration, this test writes a small sequence |
790 | | // of messages to the initial leader. It then shuts down the current |
791 | | // leader, makes another peer become leader, and writes a sequence of
792 | | // messages to it. The new leader and the follower should agree on the |
793 | | // sequence of messages. |
794 | 1 | TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { |
795 | 1 | const int kInitialNumPeers = 5; |
796 | 1 | ASSERT_OK(BuildAndStartConfig(kInitialNumPeers)); |
797 | | |
798 | 1 | OpIdPB last_op_id; |
799 | 1 | vector<scoped_refptr<ConsensusRound> > rounds; |
800 | | |
801 | | // Loop twice, successively shutting down the previous leader. |
802 | 1 | for (int current_config_size = kInitialNumPeers; |
803 | 3 | current_config_size >= kInitialNumPeers - 1; |
804 | 2 | current_config_size--) { |
805 | 2 | REPLICATE_SEQUENCE_OF_MESSAGES(10, |
806 | 2 | current_config_size - 1, // The index of the leader. |
807 | 2 | WAIT_FOR_ALL_REPLICAS, |
808 | 2 | COMMIT_ONE_BY_ONE, |
809 | 2 | &last_op_id, |
810 | 2 | &rounds); |
811 | | |
812 | | // Make sure the last operation is committed everywhere |
813 | 9 | for (int i = 0; i < current_config_size - 1; i++) { |
814 | 7 | WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 1); |
815 | 7 | } |
816 | | |
817 | | // Now shutdown the current leader. |
818 | 2 | LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1); |
819 | 2 | shared_ptr<RaftConsensus> current_leader; |
820 | 2 | ASSERT_OK(peers_->GetPeerByIdx(current_config_size - 1, ¤t_leader)); |
821 | 2 | current_leader->Shutdown(); |
822 | 2 | peers_->RemovePeer(current_leader->peer_uuid()); |
823 | | |
824 | | // ... and make the peer before it become leader. |
825 | 2 | shared_ptr<RaftConsensus> new_leader; |
826 | 2 | ASSERT_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader)); |
827 | | |
828 | | // This will force an election in which we expect to make the last |
829 | | // non-shutdown peer in the list become leader. |
830 | 2 | LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1); |
831 | 2 | ASSERT_OK(new_leader->StartElection({ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE})); |
832 | 2 | ASSERT_OK(new_leader->WaitUntilLeaderForTests(MonoDelta::FromSeconds(15))); |
833 | 2 | LOG(INFO) << "Election won"; |
834 | | |
835 | | // ... replicating a set of messages to the new leader should now be possible. |
836 | 2 | REPLICATE_SEQUENCE_OF_MESSAGES(10, |
837 | 2 | current_config_size - 2, // The index of the new leader. |
838 | 2 | WAIT_FOR_MAJORITY, |
839 | 2 | COMMIT_ONE_BY_ONE, |
840 | 2 | &last_op_id, |
841 | 2 | &rounds); |
842 | | |
843 | | // Make sure the last operation is committed everywhere |
844 | 7 | for (int i = 0; i < current_config_size - 2; i++) { |
845 | 5 | WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 2); |
846 | 5 | } |
847 | 2 | } |
848 | | // We can only verify the logs of the peers that were not killed, due to the |
849 | | // old leaders being out-of-date now. |
850 | 1 | VerifyLogs(2, 0, 1); |
851 | 1 | } |
852 | | |
853 | 1 | TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) { |
854 | 1 | ASSERT_OK(BuildAndStartConfig(3)); |
855 | | |
856 | 1 | OpIdPB last_op_id; |
857 | 1 | vector<scoped_refptr<ConsensusRound> > rounds; |
858 | 1 | REPLICATE_SEQUENCE_OF_MESSAGES(10, |
859 | 1 | 2, // The index of the initial leader. |
860 | 1 | WAIT_FOR_ALL_REPLICAS, |
861 | 1 | COMMIT_ONE_BY_ONE, |
862 | 1 | &last_op_id, |
863 | 1 | &rounds); |
864 | | |
865 | | // Make sure the last operation is committed everywhere |
866 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2); |
867 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2); |
868 | | |
869 | | // Now replicas should only accept operations with |
870 | | // 'last_op_id' as the preceding id.
871 | 1 | ConsensusRequestPB req; |
872 | 1 | ConsensusResponsePB resp; |
873 | | |
874 | 1 | shared_ptr<RaftConsensus> leader; |
875 | 1 | ASSERT_OK(peers_->GetPeerByIdx(2, &leader)); |
876 | | |
877 | 1 | shared_ptr<RaftConsensus> follower; |
878 | 1 | ASSERT_OK(peers_->GetPeerByIdx(0, &follower)); |
879 | | |
880 | 1 | req.set_caller_uuid(leader->peer_uuid()); |
881 | 1 | req.set_caller_term(last_op_id.term()); |
882 | 1 | req.mutable_preceding_id()->CopyFrom(last_op_id); |
883 | 1 | req.mutable_committed_op_id()->CopyFrom(last_op_id); |
884 | | |
885 | 1 | ReplicateMsg* replicate = req.add_ops(); |
886 | 1 | replicate->set_hybrid_time(clock_->Now().ToUint64()); |
887 | 1 | OpIdPB* id = replicate->mutable_id(); |
888 | 1 | id->set_term(last_op_id.term()); |
889 | 1 | id->set_index(last_op_id.index() + 1); |
890 | | // Make a copy of the OpId to be TSAN friendly. |
891 | 1 | auto req_copy = req; |
892 | 1 | auto id_copy = req_copy.mutable_ops(0)->mutable_id(); |
893 | 1 | replicate->set_op_type(NO_OP); |
894 | | |
895 | | // Appending this message to peer0 should work and update |
896 | | // its 'last_received' to 'id'. |
897 | 1 | ASSERT_OK(follower->Update(&req, &resp, CoarseBigDeadline())); |
898 | 1 | ASSERT_TRUE(OpIdEquals(resp.status().last_received(), *id)); |
899 | | |
900 | | // Now skip one message in the same term. The replica should |
901 | | // complain with the right error message. |
902 | 1 | req_copy.mutable_preceding_id()->set_index(id_copy->index() + 1); |
903 | 1 | id_copy->set_index(id_copy->index() + 2); |
904 | | // Appending this message to peer0 should return Status::OK,
905 | | // but the response should contain an error referring to the log matching property.
906 | 1 | ASSERT_OK(follower->Update(&req_copy, &resp, CoarseBigDeadline())); |
907 | 1 | ASSERT_TRUE(resp.has_status()); |
908 | 1 | ASSERT_TRUE(resp.status().has_error()); |
909 | 1 | ASSERT_EQ(resp.status().error().code(), ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH); |
910 | 1 | ASSERT_STR_CONTAINS(resp.status().error().status().message(), |
911 | 1 | "Log matching property violated"); |
912 | 1 | } |
913 | | |
914 | | // Test that RequestVote performs according to "spec". |
915 | 1 | TEST_F(RaftConsensusQuorumTest, TestRequestVote) { |
916 | 1 | ASSERT_OK(BuildAndStartConfig(3)); |
917 | | |
918 | 1 | OpIdPB last_op_id; |
919 | 1 | vector<scoped_refptr<ConsensusRound> > rounds; |
920 | 1 | REPLICATE_SEQUENCE_OF_MESSAGES(10, |
921 | 1 | 2, // The index of the initial leader. |
922 | 1 | WAIT_FOR_ALL_REPLICAS, |
923 | 1 | COMMIT_ONE_BY_ONE, |
924 | 1 | &last_op_id, |
925 | 1 | &rounds); |
926 | | |
927 | | // Make sure the last operation is committed everywhere |
928 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2); |
929 | 1 | WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2); |
930 | | |
931 | | // Ensure last-logged OpId is > (0,0). |
932 | 1 | ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id)); |
933 | | |
934 | 1 | const int kPeerIndex = 1; |
935 | 1 | shared_ptr<RaftConsensus> peer; |
936 | 1 | ASSERT_OK(peers_->GetPeerByIdx(kPeerIndex, &peer)); |
937 | | |
938 | 1 | VoteRequestPB request; |
939 | 1 | request.set_tablet_id(kTestTablet); |
940 | 1 | request.mutable_candidate_status()->mutable_last_received()->CopyFrom(last_op_id); |
941 | | |
942 | | // Test that the replica won't vote since it has recently heard from |
943 | | // a valid leader. |
944 | 1 | VoteResponsePB response; |
945 | 1 | request.set_candidate_uuid("peer-0"); |
946 | 1 | request.set_candidate_term(last_op_id.term() + 1); |
947 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
948 | 1 | ASSERT_FALSE(response.vote_granted()); |
949 | 1 | ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code()); |
950 | | |
951 | | // Test that replicas only vote yes for a single peer per term. |
952 | | |
953 | | // Indicate that replicas should vote even if they think another leader is alive. |
954 | | // This will allow the rest of the requests in the test to go through. |
955 | 1 | request.set_ignore_live_leader(true); |
956 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
957 | 1 | ASSERT_TRUE(response.vote_granted()); |
958 | 1 | ASSERT_EQ(last_op_id.term() + 1, response.responder_term()); |
959 | 1 | ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0")); |
960 | | |
961 | | // Ensure we get same response for same term and same UUID. |
962 | 1 | response.Clear(); |
963 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
964 | 1 | ASSERT_TRUE(response.vote_granted()); |
965 | | |
966 | | // Ensure we get a "no" for a different candidate UUID for that term. |
967 | 1 | response.Clear(); |
968 | 1 | request.set_candidate_uuid("peer-2"); |
969 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
970 | 1 | ASSERT_FALSE(response.vote_granted()); |
971 | 1 | ASSERT_TRUE(response.has_consensus_error()); |
972 | 1 | ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code()); |
973 | 1 | ASSERT_EQ(last_op_id.term() + 1, response.responder_term()); |
974 | 1 | ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0")); |
975 | | |
976 | | // |
977 | | // Test that replicas refuse votes for an old term. |
978 | | // |
979 | | |
980 | | // Increase the term of our candidate, which will cause the voter replica to |
981 | | // increase its own term to match. |
982 | 1 | request.set_candidate_uuid("peer-0"); |
983 | 1 | request.set_candidate_term(last_op_id.term() + 2); |
984 | 1 | response.Clear(); |
985 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
986 | 1 | ASSERT_TRUE(response.vote_granted()); |
987 | 1 | ASSERT_EQ(last_op_id.term() + 2, response.responder_term()); |
988 | 1 | ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0")); |
989 | | |
990 | | // Now try the old term. |
991 | | // Note: Use the peer that "won" the election on the previous term (peer-0),
992 | | // although in practice the impl does not store historical vote data. |
993 | 1 | request.set_candidate_term(last_op_id.term() + 1); |
994 | 1 | response.Clear(); |
995 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
996 | 1 | ASSERT_FALSE(response.vote_granted()); |
997 | 1 | ASSERT_TRUE(response.has_consensus_error()); |
998 | 1 | ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code()); |
999 | 1 | ASSERT_EQ(last_op_id.term() + 2, response.responder_term()); |
1000 | 1 | ASSERT_NO_FATALS(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0")); |
1001 | | |
1002 | | // |
1003 | | // Ensure replicas vote no for an old op index. |
1004 | | // |
1005 | | |
1006 | 1 | request.set_candidate_uuid("peer-0"); |
1007 | 1 | request.set_candidate_term(last_op_id.term() + 3); |
1008 | 1 | request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId()); |
1009 | 1 | response.Clear(); |
1010 | 1 | ASSERT_OK(peer->RequestVote(&request, &response)); |
1011 | 1 | ASSERT_FALSE(response.vote_granted()); |
1012 | 1 | ASSERT_TRUE(response.has_consensus_error()); |
1013 | 1 | ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code()); |
1014 | 1 | ASSERT_EQ(last_op_id.term() + 3, response.responder_term()); |
1015 | 1 | ASSERT_NO_FATALS(AssertDurableTermWithoutVote(kPeerIndex, last_op_id.term() + 3)); |
1016 | | |
1017 | | // Send a "heartbeat" to the peer. It should be rejected. |
1018 | 1 | ConsensusRequestPB req; |
1019 | 1 | req.set_caller_term(last_op_id.term()); |
1020 | 1 | req.set_caller_uuid("peer-0"); |
1021 | 1 | req.mutable_committed_op_id()->CopyFrom(last_op_id); |
1022 | 1 | ConsensusResponsePB res; |
1023 | 1 | Status s = peer->Update(&req, &res, CoarseBigDeadline()); |
1024 | 1 | ASSERT_EQ(last_op_id.term() + 3, res.responder_term()); |
1025 | 1 | ASSERT_TRUE(res.status().has_error()); |
1026 | 1 | ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, res.status().error().code()); |
1027 | 1 | LOG(INFO) << "Follower rejected old heartbeat, as expected: " << res.ShortDebugString(); |
1028 | 1 | } |
1029 | | |
1030 | | } // namespace consensus |
1031 | | } // namespace yb |