/Users/deen/code/yugabyte-db/src/yb/consensus/raft_consensus-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <gmock/gmock.h> |
34 | | #include <gtest/gtest.h> |
35 | | |
36 | | #include "yb/common/schema.h" |
37 | | #include "yb/common/wire_protocol-test-util.h" |
38 | | |
39 | | #include "yb/consensus/consensus-test-util.h" |
40 | | #include "yb/consensus/consensus_types.h" |
41 | | #include "yb/consensus/log.h" |
42 | | #include "yb/consensus/peer_manager.h" |
43 | | |
44 | | #include "yb/fs/fs_manager.h" |
45 | | |
46 | | #include "yb/gutil/bind.h" |
47 | | #include "yb/gutil/stl_util.h" |
48 | | |
49 | | #include "yb/server/logical_clock.h" |
50 | | |
51 | | #include "yb/util/async_util.h" |
52 | | #include "yb/util/mem_tracker.h" |
53 | | #include "yb/util/metrics.h" |
54 | | #include "yb/util/status_log.h" |
55 | | #include "yb/util/test_macros.h" |
56 | | #include "yb/util/test_util.h" |
57 | | |
58 | | DECLARE_bool(enable_leader_failure_detection); |
59 | | DECLARE_bool(never_fsync); |
60 | | |
61 | | METRIC_DECLARE_entity(table); |
62 | | METRIC_DECLARE_entity(tablet); |
63 | | |
64 | | using std::shared_ptr; |
65 | | using std::string; |
66 | | |
67 | | namespace yb { |
68 | | namespace consensus { |
69 | | |
70 | | using log::Log; |
71 | | using log::LogOptions; |
72 | | using ::testing::_; |
73 | | using ::testing::AnyNumber; |
74 | | using ::testing::AtLeast; |
75 | | using ::testing::Eq; |
76 | | using ::testing::InSequence; |
77 | | using ::testing::Invoke; |
78 | | using ::testing::Mock; |
79 | | using ::testing::Property; |
80 | | using ::testing::Return; |
81 | | |
82 | | const char* kTestTable = "TestTable"; |
83 | | const char* kTestTablet = "TestTablet"; |
84 | | const char* kLocalPeerUuid = "peer-0"; |
85 | | |
86 | | // A simple map to collect the results of a sequence of transactions. |
87 | | typedef std::map<OpIdPB, Status, OpIdCompareFunctor> StatusesMap; |
88 | | |
89 | | class MockQueue : public PeerMessageQueue { |
90 | | public: |
91 | | explicit MockQueue(const scoped_refptr<MetricEntity>& tablet_metric_entity, log::Log* log, |
92 | | const server::ClockPtr& clock, |
93 | | std::unique_ptr<ThreadPoolToken> raft_pool_observers_token) |
94 | | : PeerMessageQueue( |
95 | | tablet_metric_entity, log, nullptr /* server_tracker */, nullptr /* parent_tracker */, |
96 | | FakeRaftPeerPB(kLocalPeerUuid), kTestTablet, clock, nullptr /* consensus_queue */, |
97 | 6 | std::move(raft_pool_observers_token)) {} |
98 | | |
99 | | MOCK_METHOD1(Init, void(const OpId& locally_replicated_index)); |
100 | | MOCK_METHOD4(SetLeaderMode, void(const OpId& committed_opid, |
101 | | int64_t current_term, |
102 | | const OpId& last_applied_op_id, |
103 | | const RaftConfigPB& active_config)); |
104 | | MOCK_METHOD0(SetNonLeaderMode, void()); |
105 | | Status AppendOperations(const ReplicateMsgs& msgs, |
106 | | const yb::OpId& committed_op_id, |
107 | 46 | RestartSafeCoarseTimePoint time) override { |
108 | 46 | return AppendOperationsMock(msgs, committed_op_id, time); |
109 | 46 | } |
110 | | MOCK_METHOD3(AppendOperationsMock, Status(const ReplicateMsgs& msgs, |
111 | | const yb::OpId& committed_op_id, |
112 | | RestartSafeCoarseTimePoint time)); |
113 | | MOCK_METHOD1(TrackPeer, void(const string&)); |
114 | | MOCK_METHOD1(UntrackPeer, void(const string&)); |
115 | | MOCK_METHOD6(RequestForPeer, Status(const std::string& uuid, |
116 | | ConsensusRequestPB* request, |
117 | | ReplicateMsgsHolder* msgs_holder, |
118 | | bool* needs_remote_bootstrap, |
119 | | PeerMemberType* member_type, |
120 | | bool* last_exchange_successful)); |
121 | | MOCK_METHOD2(ResponseFromPeer, bool(const std::string& peer_uuid, |
122 | | const ConsensusResponsePB& response)); |
123 | | MOCK_METHOD0(Close, void()); |
124 | | }; |
125 | | |
126 | | class MockPeerManager : public PeerManager { |
127 | | public: |
128 | 6 | MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {} |
129 | | MOCK_METHOD1(UpdateRaftConfig, void(const consensus::RaftConfigPB& config)); |
130 | | MOCK_METHOD1(SignalRequest, void(RequestTriggerMode trigger_mode)); |
131 | | MOCK_METHOD0(Close, void()); |
132 | | }; |
133 | | |
134 | | class RaftConsensusSpy : public RaftConsensus { |
135 | | public: |
136 | | typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback; |
137 | | |
138 | | RaftConsensusSpy(const ConsensusOptions& options, |
139 | | std::unique_ptr<ConsensusMetadata> cmeta, |
140 | | std::unique_ptr<PeerProxyFactory> proxy_factory, |
141 | | std::unique_ptr<PeerMessageQueue> queue, |
142 | | std::unique_ptr<PeerManager> peer_manager, |
143 | | std::unique_ptr<ThreadPoolToken> raft_pool_token, |
144 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
145 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
146 | | const std::string& peer_uuid, |
147 | | const scoped_refptr<server::Clock>& clock, |
148 | | ConsensusContext* consensus_context, |
149 | | const scoped_refptr<log::Log>& log, |
150 | | const shared_ptr<MemTracker>& parent_mem_tracker, |
151 | | const Callback<void(std::shared_ptr<consensus::StateChangeContext> context)>& |
152 | | mark_dirty_clbk) |
153 | | : RaftConsensus(options, |
154 | | std::move(cmeta), |
155 | | std::move(proxy_factory), |
156 | | std::move(queue), |
157 | | std::move(peer_manager), |
158 | | std::move(raft_pool_token), |
159 | | table_metric_entity, |
160 | | tablet_metric_entity, |
161 | | peer_uuid, |
162 | | clock, |
163 | | consensus_context, |
164 | | log, |
165 | | parent_mem_tracker, |
166 | | mark_dirty_clbk, |
167 | | YQL_TABLE_TYPE, |
168 | 6 | nullptr /* retryable_requests */) { |
169 | | // These "aliases" allow us to count invocations and assert on them. |
170 | 6 | ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_)) |
171 | 6 | .WillByDefault(Invoke(this, |
172 | 6 | &RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete)); |
173 | 6 | ON_CALL(*this, NonTrackedRoundReplicationFinished(_, _, _)) |
174 | 6 | .WillByDefault(Invoke(this, &RaftConsensusSpy::NonTrackedRoundReplicationFinishedConcrete)); |
175 | 6 | } |
176 | | |
177 | | MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round)); |
178 | 5 | Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) { |
179 | 5 | return RaftConsensus::AppendNewRoundToQueueUnlocked(round); |
180 | 5 | } |
181 | | |
182 | | MOCK_METHOD2(AppendNewRoundsToQueueUnlocked, Status( |
183 | | const ConsensusRounds& rounds, size_t* processed_rounds)); |
184 | | Status AppendNewRoundsToQueueUnlockedConcrete( |
185 | 26 | const ConsensusRounds& rounds, size_t* processed_rounds) { |
186 | 26 | return RaftConsensus::AppendNewRoundsToQueueUnlocked(rounds, processed_rounds); |
187 | 26 | } |
188 | | |
189 | | MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateMsgPtr& msg)); |
190 | 18 | Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateMsgPtr& msg) { |
191 | 18 | return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg); |
192 | 18 | } |
193 | | |
194 | | MOCK_METHOD3(NonTrackedRoundReplicationFinished, void(ConsensusRound* round, |
195 | | const StdStatusCallback& client_cb, |
196 | | const Status& status)); |
197 | | void NonTrackedRoundReplicationFinishedConcrete(ConsensusRound* round, |
198 | | const StdStatusCallback& client_cb, |
199 | 40 | const Status& status) { |
200 | 40 | LOG(INFO) << "Round " << round->id() << " finished with status: " << status; |
201 | 40 | } |
202 | | |
203 | | private: |
204 | | DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy); |
205 | | }; |
206 | | |
207 | 473 | void DoNothing(std::shared_ptr<consensus::StateChangeContext> context) { |
208 | 473 | } |
209 | | |
210 | | class RaftConsensusTest : public YBTest { |
211 | | public: |
212 | | RaftConsensusTest() |
213 | | : clock_(server::LogicalClock::CreateStartingAt(HybridTime(0))), |
214 | | table_metric_entity_( |
215 | | METRIC_ENTITY_table.Instantiate(&metric_registry_, "raft-consensus-test-table")), |
216 | | tablet_metric_entity_( |
217 | | METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test-tablet")), |
218 | 6 | schema_(GetSimpleTestSchema()) { |
219 | 6 | FLAGS_enable_leader_failure_detection = false; |
220 | 6 | options_.tablet_id = kTestTablet; |
221 | 6 | } |
222 | | |
223 | 6 | void SetUp() override { |
224 | 6 | YBTest::SetUp(); |
225 | | |
226 | 6 | LogOptions options; |
227 | 6 | string test_path = GetTestPath("test-peer-root"); |
228 | | |
229 | | // TODO mock the Log too, since we're gonna mock the queue |
230 | | // monitors and pretty much everything else. |
231 | 6 | fs_manager_.reset(new FsManager(env_.get(), test_path, "tserver_test")); |
232 | 6 | ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); |
233 | 6 | ASSERT_OK(fs_manager_->Open()); |
234 | 6 | ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_)); |
235 | 6 | ASSERT_OK(Log::Open(LogOptions(), |
236 | 6 | kTestTablet, |
237 | 6 | fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), |
238 | 6 | fs_manager_->uuid(), |
239 | 6 | schema_, |
240 | 6 | 0, // schema_version |
241 | 6 | nullptr, // table_metric_entity |
242 | 6 | nullptr, // tablet_metric_entity |
243 | 6 | log_thread_pool_.get(), |
244 | 6 | log_thread_pool_.get(), |
245 | 6 | std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index |
246 | 6 | &log_)); |
247 | | |
248 | 6 | log_->TEST_SetAllOpIdsSafe(true); |
249 | | |
250 | 6 | ASSERT_OK(ThreadPoolBuilder("raft-pool").Build(&raft_pool_)); |
251 | 6 | std::unique_ptr<ThreadPoolToken> raft_pool_token = |
252 | 6 | raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); |
253 | 6 | queue_ = new MockQueue(tablet_metric_entity_, log_.get(), clock_, std::move(raft_pool_token)); |
254 | 6 | peer_manager_ = new MockPeerManager; |
255 | 6 | operation_factory_.reset(new MockOperationFactory); |
256 | | |
257 | 6 | ON_CALL(*queue_, AppendOperationsMock(_, _, _)) |
258 | 6 | .WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog)); |
259 | 6 | } |
260 | | |
261 | 6 | void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) { |
262 | 6 | config_ = BuildRaftConfigPBForTests(num_peers); |
263 | 6 | config_.set_opid_index(kInvalidOpIdIndex); |
264 | | |
265 | 6 | auto proxy_factory = std::make_unique<LocalTestPeerProxyFactory>(nullptr); |
266 | | |
267 | 6 | string peer_uuid = config_.peers(num_peers - 1).permanent_uuid(); |
268 | | |
269 | 6 | std::unique_ptr<ConsensusMetadata> cmeta; |
270 | 6 | ASSERT_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid, |
271 | 6 | config_, initial_term, &cmeta)); |
272 | | |
273 | 6 | std::unique_ptr<ThreadPoolToken> raft_pool_token = |
274 | 6 | raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); |
275 | | |
276 | 6 | consensus_.reset(new RaftConsensusSpy(options_, |
277 | 6 | std::move(cmeta), |
278 | 6 | std::move(proxy_factory), |
279 | 6 | std::unique_ptr<PeerMessageQueue>(queue_), |
280 | 6 | std::unique_ptr<PeerManager>(peer_manager_), |
281 | 6 | std::move(raft_pool_token), |
282 | 6 | table_metric_entity_, |
283 | 6 | tablet_metric_entity_, |
284 | 6 | peer_uuid, |
285 | 6 | clock_, |
286 | 6 | operation_factory_.get(), |
287 | 6 | log_.get(), |
288 | 6 | MemTracker::GetRootTracker(), |
289 | 6 | Bind(&DoNothing))); |
290 | | |
291 | 6 | ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
292 | 6 | .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound)); |
293 | 6 | ON_CALL(*consensus_.get(), AppendNewRoundsToQueueUnlocked(_, _)) |
294 | 6 | .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRounds)); |
295 | 6 | } |
296 | | |
297 | | Status AppendToLog(const ReplicateMsgs& msgs, |
298 | | const yb::OpId& committed_op_id, |
299 | 18 | RestartSafeCoarseTimePoint time) { |
300 | 18 | return log_->AsyncAppendReplicates(msgs, committed_op_id, time, |
301 | 18 | Bind(LogAppendCallback)); |
302 | 18 | } |
303 | | |
304 | 18 | static void LogAppendCallback(const Status& s) { |
305 | 18 | ASSERT_OK(s); |
306 | 18 | } |
307 | | |
308 | 5 | Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) { |
309 | 5 | return consensus_->AppendNewRoundToQueueUnlockedConcrete(round); |
310 | 5 | } |
311 | | |
312 | 26 | Status MockAppendNewRounds(const ConsensusRounds& rounds, size_t* processed_rounds) { |
313 | 26 | for (const auto& round : rounds) { |
314 | 26 | rounds_.push_back(round); |
315 | 26 | } |
316 | 26 | RETURN_NOT_OK(consensus_->AppendNewRoundsToQueueUnlockedConcrete(rounds, processed_rounds)); |
317 | 26 | for (const auto& round : rounds) { |
318 | 26 | LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: " |
319 | 26 | << round->replicate_msg()->ShortDebugString(); |
320 | 26 | } |
321 | 26 | return Status::OK(); |
322 | 26 | } |
323 | | |
324 | 3 | void SetUpGeneralExpectations() { |
325 | 3 | EXPECT_CALL(*peer_manager_, SignalRequest(_)) |
326 | 3 | .Times(AnyNumber()); |
327 | 3 | EXPECT_CALL(*peer_manager_, Close()) |
328 | 3 | .Times(AtLeast(1)); |
329 | 3 | EXPECT_CALL(*queue_, Close()) |
330 | 3 | .Times(1); |
331 | 3 | EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
332 | 3 | .Times(AnyNumber()); |
333 | 3 | } |
334 | | |
335 | | // Create a ConsensusRequestPB suitable to send to a peer. |
336 | | ConsensusRequestPB MakeConsensusRequest(int64_t caller_term, |
337 | | const string& caller_uuid, |
338 | | const OpIdPB& preceding_opid); |
339 | | |
340 | | // Add a single no-op with the given OpId to a ConsensusRequestPB. |
341 | | void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpIdPB& noop_opid); |
342 | | |
343 | 21 | scoped_refptr<ConsensusRound> AppendNoOpRound() { |
344 | 21 | auto replicate_ptr = std::make_shared<ReplicateMsg>(); |
345 | 21 | replicate_ptr->set_op_type(NO_OP); |
346 | 21 | replicate_ptr->set_hybrid_time(clock_->Now().ToUint64()); |
347 | 21 | scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(), |
348 | 21 | std::move(replicate_ptr))); |
349 | 21 | round->SetCallback(MakeNonTrackedRoundCallback( |
350 | 21 | round.get(), |
351 | 21 | std::bind(&RaftConsensusSpy::NonTrackedRoundReplicationFinished, |
352 | 21 | consensus_.get(), round.get(), &DoNothingStatusCB, std::placeholders::_1))); |
353 | 21 | round->BindToTerm(consensus_->TEST_LeaderTerm()); |
354 | | |
355 | 21 | CHECK_OK(consensus_->TEST_Replicate(round)); |
356 | 21 | LOG(INFO) << "Appended NO_OP round with opid " << round->id(); |
357 | 21 | return round; |
358 | 21 | } |
359 | | |
360 | 1 | void DumpRounds() { |
361 | 1 | LOG(INFO) << "Dumping rounds..."; |
362 | 3 | for (const scoped_refptr<ConsensusRound>& round : rounds_) { |
363 | 3 | LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: " |
364 | 3 | << round->replicate_msg()->ShortDebugString(); |
365 | 3 | } |
366 | 1 | } |
367 | | |
368 | | protected: |
369 | | std::unique_ptr<ThreadPool> raft_pool_; |
370 | | ConsensusOptions options_; |
371 | | RaftConfigPB config_; |
372 | | OpIdPB initial_id_; |
373 | | std::unique_ptr<FsManager> fs_manager_; |
374 | | std::unique_ptr<ThreadPool> log_thread_pool_; |
375 | | scoped_refptr<Log> log_; |
376 | | std::unique_ptr<PeerProxyFactory> proxy_factory_; |
377 | | scoped_refptr<server::Clock> clock_; |
378 | | MetricRegistry metric_registry_; |
379 | | scoped_refptr<MetricEntity> table_metric_entity_; |
380 | | scoped_refptr<MetricEntity> tablet_metric_entity_; |
381 | | const Schema schema_; |
382 | | shared_ptr<RaftConsensusSpy> consensus_; |
383 | | |
384 | | vector<scoped_refptr<ConsensusRound> > rounds_; |
385 | | |
386 | | // Mocks. |
387 | | // NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before |
388 | | // the test is. |
389 | | MockQueue* queue_; |
390 | | MockPeerManager* peer_manager_; |
391 | | std::unique_ptr<MockOperationFactory> operation_factory_; |
392 | | }; |
393 | | |
394 | | ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term, |
395 | | const string& caller_uuid, |
396 | 5 | const OpIdPB& preceding_opid) { |
397 | 5 | ConsensusRequestPB request; |
398 | 5 | request.set_caller_term(caller_term); |
399 | 5 | request.set_caller_uuid(caller_uuid); |
400 | 5 | request.set_tablet_id(kTestTablet); |
401 | 5 | *request.mutable_preceding_id() = preceding_opid; |
402 | 5 | return request; |
403 | 5 | } |
404 | | |
405 | | void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request, |
406 | 4 | const OpIdPB& noop_opid) { |
407 | 4 | ReplicateMsg* noop_msg = request->add_ops(); |
408 | 4 | *noop_msg->mutable_id() = noop_opid; |
409 | 4 | noop_msg->set_op_type(NO_OP); |
410 | 4 | noop_msg->set_hybrid_time(clock_->Now().ToUint64()); |
411 | 4 | noop_msg->mutable_noop_request(); |
412 | 4 | } |
413 | | |
414 | | // Tests that the committed index moves along with the majority replicated |
415 | | // index when the terms are the same. |
416 | 1 | TEST_F(RaftConsensusTest, TestCommittedIndexWhenInSameTerm) { |
417 | 1 | SetUpConsensus(); |
418 | 1 | SetUpGeneralExpectations(); |
419 | 1 | EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
420 | 1 | .Times(1); |
421 | 1 | EXPECT_CALL(*queue_, Init(_)) |
422 | 1 | .Times(1); |
423 | 1 | EXPECT_CALL(*queue_, SetLeaderMode(_, _, _, _)) |
424 | 1 | .Times(1); |
425 | 1 | EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
426 | 1 | .Times(1); |
427 | 1 | EXPECT_CALL(*consensus_.get(), AppendNewRoundsToQueueUnlocked(_, _)) |
428 | 1 | .Times(11); |
429 | 1 | EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _)) |
430 | 1 | .Times(22).WillRepeatedly(Return(Status::OK())); |
431 | | |
432 | 1 | ConsensusBootstrapInfo info; |
433 | 1 | ASSERT_OK(consensus_->Start(info)); |
434 | 1 | ASSERT_OK(consensus_->EmulateElection()); |
435 | | |
436 | | // Commit the first noop round, created on EmulateElection(); |
437 | 1 | OpId committed_index; |
438 | 1 | OpId last_applied_op_id; |
439 | 1 | consensus_->TEST_UpdateMajorityReplicated( |
440 | 1 | rounds_[0]->id(), &committed_index, &last_applied_op_id); |
441 | 1 | ASSERT_EQ(rounds_[0]->id(), committed_index); |
442 | 1 | ASSERT_EQ(last_applied_op_id, rounds_[0]->id()); |
443 | | |
444 | | // Append 10 rounds |
445 | 11 | for (int i = 0; i < 10; i++) { |
446 | 10 | scoped_refptr<ConsensusRound> round = AppendNoOpRound(); |
447 | | // queue reports majority replicated index in the leader's term |
448 | | // committed index should move accordingly. |
449 | 10 | consensus_->TEST_UpdateMajorityReplicated( |
450 | 10 | round->id(), &committed_index, &last_applied_op_id); |
451 | 10 | ASSERT_EQ(last_applied_op_id, round->id()); |
452 | 10 | } |
453 | 1 | } |
454 | | |
455 | | // Tests that, when terms change, the commit index only advances when the majority |
456 | | // replicated index is in the current term. |
457 | 1 | TEST_F(RaftConsensusTest, TestCommittedIndexWhenTermsChange) { |
458 | 1 | SetUpConsensus(); |
459 | 1 | SetUpGeneralExpectations(); |
460 | 1 | EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
461 | 1 | .Times(2); |
462 | 1 | EXPECT_CALL(*queue_, Init(_)) |
463 | 1 | .Times(1); |
464 | 1 | EXPECT_CALL(*queue_, SetLeaderMode(_, _, _, _)) |
465 | 1 | .Times(2); |
466 | 1 | EXPECT_CALL(*consensus_.get(), AppendNewRoundsToQueueUnlocked(_, _)) |
467 | 1 | .Times(3); |
468 | 1 | EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
469 | 1 | .Times(2); |
470 | 1 | EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _)) |
471 | 1 | .Times(5).WillRepeatedly(Return(Status::OK()));; |
472 | | |
473 | 1 | ConsensusBootstrapInfo info; |
474 | 1 | ASSERT_OK(consensus_->Start(info)); |
475 | 1 | ASSERT_OK(consensus_->EmulateElection()); |
476 | | |
477 | 1 | OpId committed_index; |
478 | 1 | OpId last_applied_op_id; |
479 | 1 | consensus_->TEST_UpdateMajorityReplicated( |
480 | 1 | rounds_[0]->id(), &committed_index, &last_applied_op_id); |
481 | 1 | ASSERT_EQ(rounds_[0]->id(), committed_index); |
482 | 1 | ASSERT_EQ(last_applied_op_id, rounds_[0]->id()); |
483 | | |
484 | | // Append another round in the current term (besides the original config round). |
485 | 1 | scoped_refptr<ConsensusRound> round = AppendNoOpRound(); |
486 | | |
487 | | // Now emulate an election, the same guy will be leader but the term |
488 | | // will change. |
489 | 1 | ASSERT_OK(consensus_->EmulateElection()); |
490 | | |
491 | | // Now tell consensus that 'round' has been majority replicated, this _shouldn't_ |
492 | | // advance the committed index, since that belongs to a previous term. |
493 | 1 | OpId new_committed_index; |
494 | 1 | OpId new_last_applied_op_id; |
495 | 1 | consensus_->TEST_UpdateMajorityReplicated( |
496 | 1 | round->id(), &new_committed_index, &new_last_applied_op_id); |
497 | 1 | ASSERT_EQ(committed_index, new_committed_index); |
498 | 1 | ASSERT_EQ(last_applied_op_id, new_last_applied_op_id); |
499 | | |
500 | 1 | const scoped_refptr<ConsensusRound>& last_config_round = rounds_[2]; |
501 | | |
502 | | // Now notify that the last change config was committed, this should advance the |
503 | | // commit index to the id of the last change config. |
504 | 1 | consensus_->TEST_UpdateMajorityReplicated( |
505 | 1 | last_config_round->id(), &committed_index, &last_applied_op_id); |
506 | | |
507 | 1 | DumpRounds(); |
508 | 1 | ASSERT_EQ(last_config_round->id(), committed_index); |
509 | 1 | ASSERT_EQ(last_applied_op_id, last_config_round->id()); |
510 | 1 | } |
511 | | |
512 | | // Asserts that a ConsensusRound has an OpId set in its ReplicateMsg. |
513 | 11 | MATCHER(HasOpId, "") { return !arg->id().empty(); } |
514 | | |
515 | | // These matchers assert that a Status object is of a certain type. |
516 | 20 | MATCHER(IsOk, "") { return arg.ok(); } |
517 | 6 | MATCHER(IsAborted, "") { return arg.IsAborted(); } |
518 | | |
519 | | // Tests that consensus is able to handle pending operations. It tests this in two ways: |
520 | | // - It tests that consensus does the right thing with pending transactions from the WAL. |
521 | | // - It tests that when a follower gets promoted to leader it does the right thing |
522 | | // with the pending operations. |
523 | 1 | TEST_F(RaftConsensusTest, TestPendingOperations) { |
524 | 1 | SetUpConsensus(10); |
525 | | |
526 | | // Emulate a stateful system by having a bunch of operations in flight when consensus starts. |
527 | | // Specifically we emulate we're on term 10, with 10 operations that have not been committed yet. |
528 | 1 | ConsensusBootstrapInfo info; |
529 | 1 | info.last_id.set_term(10); |
530 | 11 | for (int i = 0; i < 10; i++) { |
531 | 10 | auto replicate = std::make_shared<ReplicateMsg>(); |
532 | 10 | replicate->set_op_type(NO_OP); |
533 | 10 | info.last_id.set_index(100 + i); |
534 | 10 | replicate->mutable_id()->CopyFrom(info.last_id); |
535 | 10 | info.orphaned_replicates.push_back(replicate); |
536 | 10 | } |
537 | | |
538 | 1 | info.last_committed_id.set_term(10); |
539 | 1 | info.last_committed_id.set_index(99); |
540 | | |
541 | 1 | { |
542 | 1 | InSequence dummy; |
543 | | // On start we expect 10 NO_OPs to be enqueued. |
544 | 1 | EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_)) |
545 | 1 | .Times(10); |
546 | | |
547 | | // Queue gets initted when the peer starts. |
548 | 1 | EXPECT_CALL(*queue_, Init(_)) |
549 | 1 | .Times(1); |
550 | 1 | } |
551 | 1 | ASSERT_OK(consensus_->Start(info)); |
552 | | |
553 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_)); |
554 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(operation_factory_.get())); |
555 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_)); |
556 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get())); |
557 | | |
558 | | // Now we test what this peer does with the pending operations once it's elected leader. |
559 | 1 | { |
560 | 1 | InSequence dummy; |
561 | | // Peer manager gets updated with the new set of peers to send stuff to. |
562 | 1 | EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
563 | 1 | .Times(1); |
564 | | // The no-op should be appended to the queue. |
565 | 1 | EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
566 | 1 | .Times(1); |
567 | | // One more op will be appended for the election. |
568 | 1 | EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _)) |
569 | 1 | .Times(1).WillRepeatedly(Return(Status::OK()));; |
570 | 1 | } |
571 | | |
572 | | // Emulate an election, this will make this peer become leader and trigger the |
573 | | // above set expectations. |
574 | 1 | ASSERT_OK(consensus_->EmulateElection()); |
575 | | |
576 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_)); |
577 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(operation_factory_.get())); |
578 | 1 | ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_)); |
579 | | |
580 | | // Commit the 10 no-ops from the previous term, along with the one pushed to |
581 | | // assert leadership. |
582 | 1 | EXPECT_CALL(*consensus_.get(), NonTrackedRoundReplicationFinished(HasOpId(), _, IsOk())) |
583 | 1 | .Times(11); |
584 | 1 | EXPECT_CALL(*peer_manager_, SignalRequest(_)) |
585 | 1 | .Times(AnyNumber()); |
586 | | // In the end peer manager and the queue get closed. |
587 | 1 | EXPECT_CALL(*peer_manager_, Close()) |
588 | 1 | .Times(AtLeast(1)); |
589 | 1 | EXPECT_CALL(*queue_, Close()) |
590 | 1 | .Times(1); |
591 | | |
592 | | // Now tell consensus all original orphaned replicates were majority replicated. |
593 | | // This should not advance the committed index because we haven't replicated |
594 | | // anything in the current term. |
595 | 1 | OpId committed_index; |
596 | 1 | OpId last_applied_op_id; |
597 | 1 | consensus_->TEST_UpdateMajorityReplicated( |
598 | 1 | OpId::FromPB(info.orphaned_replicates.back()->id()), &committed_index, &last_applied_op_id); |
599 | | // Should still be the last committed in the wal. |
600 | 1 | ASSERT_EQ(committed_index, OpId::FromPB(info.last_committed_id)); |
601 | 1 | ASSERT_EQ(last_applied_op_id, OpId::FromPB(info.last_committed_id)); |
602 | | |
603 | | // Now mark the last operation (the no-op round) as committed. |
604 | | // This should advance the committed index, since that round in on our current term, |
605 | | // and we should be able to commit all previous rounds. |
606 | 1 | OpId cc_round_id = OpId::FromPB(info.orphaned_replicates.back()->id()); |
607 | 1 | cc_round_id.term = 11; |
608 | | |
609 | | // +1 here because index is incremented during emulated election. |
610 | 1 | ++cc_round_id.index; |
611 | 1 | consensus_->TEST_UpdateMajorityReplicated(cc_round_id, &committed_index, &last_applied_op_id); |
612 | 1 | ASSERT_EQ(committed_index, cc_round_id); |
613 | 1 | ASSERT_EQ(last_applied_op_id, cc_round_id); |
614 | 1 | } |
615 | | |
616 | 84 | MATCHER_P2(RoundHasOpId, term, index, "") { |
617 | 84 | LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << arg->id(); |
618 | 84 | return arg->id().term == term && arg->id().index == index; |
619 | 84 | } |
620 | | |
621 | | // Tests the case where a leader is elected and pushed a sequence of |
622 | | // operations of which some never get committed. Eventually a new leader in a higher |
623 | | // term pushes operations that overwrite some of the original indexes. |
624 | 1 | TEST_F(RaftConsensusTest, TestAbortOperations) { |
625 | 1 | SetUpConsensus(1, 2); |
626 | | |
627 | 1 | EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) |
628 | 1 | .Times(AnyNumber()); |
629 | | |
630 | 1 | EXPECT_CALL(*peer_manager_, SignalRequest(_)) |
631 | 1 | .Times(AnyNumber()); |
632 | 1 | EXPECT_CALL(*peer_manager_, Close()) |
633 | 1 | .Times(AtLeast(1)); |
634 | 1 | EXPECT_CALL(*queue_, Close()) |
635 | 1 | .Times(1); |
636 | 1 | EXPECT_CALL(*queue_, Init(_)) |
637 | 1 | .Times(1); |
638 | 1 | EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) |
639 | 1 | .Times(1); |
640 | | |
641 | | // We'll append to the queue 12 times, the initial noop txn + 10 initial ops while leader |
642 | | // and the new leader's update, when we're overwriting operations. |
643 | 1 | EXPECT_CALL(*queue_, AppendOperationsMock(_, _, _)) |
644 | 1 | .Times(13); |
645 | | |
646 | | // .. but those will be overwritten later by another |
647 | | // leader, which will push and commit 5 ops. |
648 | | // Only these five should start as replica rounds. |
649 | 1 | EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_)) |
650 | 1 | .Times(4); |
651 | | |
652 | 1 | ConsensusBootstrapInfo info; |
653 | 1 | ASSERT_OK(consensus_->Start(info)); |
654 | 1 | ASSERT_OK(consensus_->EmulateElection()); |
655 | | |
656 | | // Append 10 rounds: 2.2 - 2.11 |
657 | 11 | for (int i = 0; i < 10; i++) { |
658 | 10 | AppendNoOpRound(); |
659 | 10 | } |
660 | | |
661 | | // Expectations for what gets committed and what gets aborted: |
662 | | // (note: the aborts may be triggered before the commits) |
663 | | // 5 OK's for the 2.1-2.5 ops. |
664 | | // 6 Aborts for the 2.6-2.11 ops. |
665 | | // 1 OK for the 3.6 op. |
666 | 6 | for (int index = 1; index < 6; index++) { |
667 | 5 | EXPECT_CALL(*consensus_.get(), |
668 | 5 | NonTrackedRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1); |
669 | 5 | } |
670 | 7 | for (int index = 6; index < 12; index++) { |
671 | 6 | EXPECT_CALL(*consensus_.get(), |
672 | 6 | NonTrackedRoundReplicationFinished( |
673 | 6 | RoundHasOpId(2, index), _, IsAborted())).Times(1); |
674 | 6 | } |
675 | 1 | EXPECT_CALL(*consensus_.get(), |
676 | 1 | NonTrackedRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1); |
677 | | |
678 | | // Nothing's committed so far, so now just send an Update() message |
679 | | // emulating another guy got elected leader and is overwriting a suffix |
680 | | // of the previous messages. |
681 | | // In particular this request has: |
682 | | // - Op 2.5 from the previous leader's term |
683 | | // - Ops 3.6-3.9 from the new leader's term |
684 | | // - A new committed index of 3.6 |
685 | 1 | ConsensusRequestPB request; |
686 | 1 | request.set_caller_term(3); |
687 | 1 | const string PEER_0_UUID = "peer-0"; |
688 | 1 | request.set_caller_uuid(PEER_0_UUID); |
689 | 1 | request.set_tablet_id(kTestTablet); |
690 | 1 | request.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
691 | | |
692 | 1 | ReplicateMsg* replicate = request.add_ops(); |
693 | 1 | replicate->mutable_id()->CopyFrom(MakeOpId(2, 5)); |
694 | 1 | replicate->set_op_type(NO_OP); |
695 | | |
696 | 1 | ReplicateMsg* noop_msg = request.add_ops(); |
697 | 1 | noop_msg->mutable_id()->CopyFrom(MakeOpId(3, 6)); |
698 | 1 | noop_msg->set_op_type(NO_OP); |
699 | 1 | noop_msg->set_hybrid_time(clock_->Now().ToUint64()); |
700 | 1 | noop_msg->mutable_noop_request(); |
701 | | |
702 | | // Overwrite another 3 of the original rounds for a total of 4 overwrites. |
703 | 4 | for (int i = 7; i < 10; i++) { |
704 | 3 | ReplicateMsg* replicate = request.add_ops(); |
705 | 3 | replicate->mutable_id()->CopyFrom(MakeOpId(3, i)); |
706 | 3 | replicate->set_op_type(NO_OP); |
707 | 3 | replicate->set_hybrid_time(clock_->Now().ToUint64()); |
708 | 3 | } |
709 | | |
710 | 1 | request.mutable_committed_op_id()->CopyFrom(MakeOpId(3, 6)); |
711 | | |
712 | 1 | ConsensusResponsePB response; |
713 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
714 | 1 | ASSERT_FALSE(response.has_error()); |
715 | | |
716 | 1 | ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get())); |
717 | | |
718 | | // Now we expect to commit ops 3.7 - 3.9. |
719 | 4 | for (int index = 7; index < 10; index++) { |
720 | 3 | EXPECT_CALL(*consensus_.get(), |
721 | 3 | NonTrackedRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1); |
722 | 3 | } |
723 | | |
724 | 1 | request.mutable_ops()->Clear(); |
725 | 1 | request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9)); |
726 | 1 | request.mutable_committed_op_id()->CopyFrom(MakeOpId(3, 9)); |
727 | | |
728 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
729 | 1 | ASSERT_FALSE(response.has_error()); |
730 | 1 | } |
731 | | |
732 | 1 | TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) { |
733 | 1 | SetUpConsensus(); |
734 | 1 | OpIdPB opid; |
735 | 1 | consensus_->GetLastReceivedOpId().ToPB(&opid); |
736 | 1 | ASSERT_TRUE(opid.IsInitialized()); |
737 | 1 | ASSERT_OPID_EQ(opid, MinimumOpId()); |
738 | 1 | } |
739 | | |
740 | | // Ensure that followers reset their "last_received_current_leader" |
741 | | // ConsensusStatusPB field when a new term is encountered. This is a |
742 | | // correctness test for the logic on the follower side that allows the |
743 | | // leader-side queue to determine which op to send next in various scenarios. |
744 | 1 | TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) { |
745 | 1 | SetUpConsensus(kMinimumTerm, 3); |
746 | 1 | SetUpGeneralExpectations(); |
747 | 1 | ConsensusBootstrapInfo info; |
748 | 1 | ASSERT_OK(consensus_->Start(info)); |
749 | | |
750 | 1 | ConsensusRequestPB request; |
751 | 1 | ConsensusResponsePB response; |
752 | 1 | int64_t caller_term = 0; |
753 | 1 | int64_t log_index = 0; |
754 | | |
755 | 1 | caller_term = 1; |
756 | 1 | string caller_uuid = config_.peers(0).permanent_uuid(); |
757 | 1 | OpIdPB preceding_opid = MinimumOpId(); |
758 | | |
759 | | // Heartbeat. This will cause the term to increment on the follower. |
760 | 1 | request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
761 | 1 | response.Clear(); |
762 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
763 | 2 | ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
764 | 1 | ASSERT_EQ(caller_term, response.responder_term()); |
765 | 1 | ASSERT_OPID_EQ(response.status().last_received(), MinimumOpId()); |
766 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); |
767 | | |
768 | | // Replicate a no-op. |
769 | 1 | OpIdPB noop_opid = MakeOpId(caller_term, ++log_index); |
770 | 1 | AddNoOpToConsensusRequest(&request, noop_opid); |
771 | 1 | response.Clear(); |
772 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
773 | 2 | ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
774 | 1 | ASSERT_OPID_EQ(response.status().last_received(), noop_opid); |
775 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); |
776 | | |
777 | | // New leader heartbeat. Term increase to 2. |
778 | | // Expect current term replicated to be nothing (MinimumOpId) but log |
779 | | // replicated to be everything sent so far. |
780 | 1 | caller_term = 2; |
781 | 1 | caller_uuid = config_.peers(1).permanent_uuid(); |
782 | 1 | preceding_opid = noop_opid; |
783 | 1 | request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
784 | 1 | response.Clear(); |
785 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
786 | 2 | ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
787 | 1 | ASSERT_EQ(caller_term, response.responder_term()); |
788 | 1 | ASSERT_OPID_EQ(response.status().last_received(), preceding_opid); |
789 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); |
790 | | |
791 | | // Append a no-op. |
792 | 1 | noop_opid = MakeOpId(caller_term, ++log_index); |
793 | 1 | AddNoOpToConsensusRequest(&request, noop_opid); |
794 | 1 | response.Clear(); |
795 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
796 | 2 | ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
797 | 1 | ASSERT_OPID_EQ(response.status().last_received(), noop_opid); |
798 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); |
799 | | |
800 | | // New leader heartbeat. The term should rev but we should get an LMP mismatch. |
801 | 1 | caller_term = 3; |
802 | 1 | caller_uuid = config_.peers(0).permanent_uuid(); |
803 | 1 | preceding_opid = MakeOpId(caller_term, log_index + 1); // Not replicated yet. |
804 | 1 | request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
805 | 1 | response.Clear(); |
806 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
807 | 1 | ASSERT_EQ(caller_term, response.responder_term()); |
808 | 1 | ASSERT_OPID_EQ(response.status().last_received(), noop_opid); // Not preceding this time. |
809 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); |
810 | 2 | ASSERT_TRUE(response.status().has_error()) << response.ShortDebugString(); |
811 | 1 | ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, response.status().error().code()); |
812 | | |
813 | | // Decrement preceding and append a no-op. |
814 | 1 | preceding_opid = MakeOpId(2, log_index); |
815 | 1 | noop_opid = MakeOpId(caller_term, ++log_index); |
816 | 1 | request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
817 | 1 | AddNoOpToConsensusRequest(&request, noop_opid); |
818 | 1 | response.Clear(); |
819 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
820 | 2 | ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
821 | 1 | ASSERT_OPID_EQ(response.status().last_received(), noop_opid) << response.ShortDebugString(); |
822 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid) |
823 | 0 | << response.ShortDebugString(); |
824 | | |
825 | | // Happy case. New leader with new no-op to append right off the bat. |
826 | | // Response should be OK with all last_received* fields equal to the new no-op. |
827 | 1 | caller_term = 4; |
828 | 1 | caller_uuid = config_.peers(1).permanent_uuid(); |
829 | 1 | preceding_opid = noop_opid; |
830 | 1 | noop_opid = MakeOpId(caller_term, ++log_index); |
831 | 1 | request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); |
832 | 1 | AddNoOpToConsensusRequest(&request, noop_opid); |
833 | 1 | response.Clear(); |
834 | 1 | ASSERT_OK(consensus_->Update(&request, &response, CoarseBigDeadline())); |
835 | 2 | ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); |
836 | 1 | ASSERT_EQ(caller_term, response.responder_term()); |
837 | 1 | ASSERT_OPID_EQ(response.status().last_received(), noop_opid); |
838 | 1 | ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); |
839 | 1 | } |
840 | | |
841 | | } // namespace consensus |
842 | | } // namespace yb |