/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_queue-test.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <gtest/gtest.h> |
34 | | |
35 | | #include "yb/common/schema.h" |
36 | | #include "yb/common/wire_protocol-test-util.h" |
37 | | |
38 | | #include "yb/consensus/consensus-test-util.h" |
39 | | #include "yb/consensus/consensus.pb.h" |
40 | | #include "yb/consensus/consensus_queue.h" |
41 | | #include "yb/consensus/log-test-base.h" |
42 | | #include "yb/consensus/log_anchor_registry.h" |
43 | | #include "yb/consensus/log_reader.h" |
44 | | #include "yb/consensus/log_util.h" |
45 | | #include "yb/consensus/replicate_msgs_holder.h" |
46 | | |
47 | | #include "yb/fs/fs_manager.h" |
48 | | |
49 | | #include "yb/server/hybrid_clock.h" |
50 | | |
51 | | #include "yb/util/metrics.h" |
52 | | #include "yb/util/test_macros.h" |
53 | | #include "yb/util/test_util.h" |
54 | | #include "yb/util/threadpool.h" |
55 | | |
56 | | DECLARE_bool(enable_data_block_fsync); |
57 | | DECLARE_uint64(consensus_max_batch_size_bytes); |
58 | | |
59 | | METRIC_DECLARE_entity(tablet); |
60 | | |
61 | | namespace yb { |
62 | | namespace consensus { |
63 | | |
64 | | static const char* kLeaderUuid = "peer-0"; |
65 | | static const char* kPeerUuid = "peer-1"; |
66 | | static const char* kTestTable = "test-table"; |
67 | | static const char* kTestTablet = "test-tablet"; |
68 | | |
69 | | constexpr int kNumMessages = 100; |
70 | | |
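 | | // Test fixture that stands up a real FsManager, a WAL-backed log, and a hybrid |
 | | // clock, and builds a PeerMessageQueue on top of them so that queue behavior can |
 | | // be exercised against an actual log. |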
71 | | class ConsensusQueueTest : public YBTest { |
72 | | public: |
73 | | ConsensusQueueTest() |
74 | | : schema_(GetSimpleTestSchema()), |
75 | | metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "queue-test")), |
76 | 10 | registry_(new log::LogAnchorRegistry) { |
77 | 10 | FLAGS_enable_data_block_fsync = false; // Keep unit tests fast. |
78 | 10 | } |
79 | | |
80 | 10 | void SetUp() override { |
81 | 10 | YBTest::SetUp(); |
82 | 10 | fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); |
83 | 10 | ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); |
84 | 10 | ASSERT_OK(fs_manager_->Open()); |
85 | 10 | ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_)); |
86 | 10 | ASSERT_OK(log::Log::Open(log::LogOptions(), |
87 | 10 | kTestTablet, |
88 | 10 | fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), |
89 | 10 | fs_manager_->uuid(), |
90 | 10 | schema_, |
91 | 10 | 0, // schema_version |
92 | 10 | nullptr, |
93 | 10 | nullptr, |
94 | 10 | log_thread_pool_.get(), |
95 | 10 | log_thread_pool_.get(), |
96 | 10 | std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index |
97 | 10 | &log_)); |
98 | 10 | clock_.reset(new server::HybridClock()); |
99 | 10 | ASSERT_OK(clock_->Init()); |
100 | | |
101 | 10 | ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); |
102 | 10 | CloseAndReopenQueue(); |
103 | 10 | } |
104 | | |
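 | | // Destroys any existing queue and creates a fresh PeerMessageQueue over the same |
 | | // log, re-registering the test observer. |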
105 | 12 | void CloseAndReopenQueue() { |
106 | | // Blow away the memtrackers before creating the new queue. |
107 | 12 | queue_.reset(); |
108 | 12 | auto token = raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL); |
109 | 12 | queue_.reset(new PeerMessageQueue(metric_entity_, |
110 | 12 | log_.get(), |
111 | 12 | nullptr /* server_tracker */, |
112 | 12 | nullptr /* parent_tracker */, |
113 | 12 | FakeRaftPeerPB(kLeaderUuid), |
114 | 12 | kTestTablet, |
115 | 12 | clock_, |
116 | 12 | nullptr /* consensus_context */, |
117 | 12 | std::move(token))); |
118 | 12 | consensus_.reset(new TestRaftConsensusQueueIface()); |
119 | 12 | queue_->RegisterObserver(consensus_.get()); |
120 | 12 | } |
121 | | |
122 | 10 | void TearDown() override { |
123 | 10 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
124 | 10 | queue_->Close(); |
125 | 10 | } |
126 | | |
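 | | // Appends a single dummy replicate message with the given term, index, and |
 | | // payload size directly to the queue. |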
127 | 23 | Status AppendReplicateMsg(int term, int index, int payload_size) { |
128 | 23 | return queue_->TEST_AppendOperation(CreateDummyReplicate( |
129 | 23 | term, index, clock_->Now(), payload_size)); |
130 | 23 | } |
131 | | |
132 | | // Updates the peer's watermark in the queue so that it matches |
133 | | // the operation we want, since the queue always assumes that a |
134 | | // newly tracked peer starts off caught up to the last operation |
135 | | // in the queue. |
136 | | bool UpdatePeerWatermarkToOp(ConsensusRequestPB* request, |
137 | | ConsensusResponsePB* response, |
138 | | const OpIdPB& last_received, |
139 | | const OpIdPB& last_received_current_leader, |
140 | 5 | int64_t last_committed_idx) { |
141 | 5 | queue_->TrackPeer(kPeerUuid); |
142 | 5 | response->set_responder_uuid(kPeerUuid); |
143 | | |
144 | | // Ask for a request. The queue assumes the peer is up-to-date so this should contain no |
145 | | // operations. |
146 | 5 | ReplicateMsgsHolder refs; |
147 | 5 | bool needs_remote_bootstrap; |
148 | 5 | EXPECT_OK(queue_->RequestForPeer(kPeerUuid, request, &refs, &needs_remote_bootstrap)); |
149 | 5 | EXPECT_FALSE(needs_remote_bootstrap); |
150 | 5 | EXPECT_EQ(request->ops_size(), 0); |
151 | | |
152 | | // Refuse saying that the log matching property check failed and |
153 | | // that our last operation is actually 'last_received'. |
154 | 5 | RefuseWithLogPropertyMismatch(response, last_received, last_received_current_leader); |
155 | 5 | response->mutable_status()->set_last_committed_idx(last_committed_idx); |
156 | 5 | bool result = queue_->ResponseFromPeer(response->responder_uuid(), *response); |
157 | 5 | request->Clear(); |
158 | 5 | response->mutable_status()->Clear(); |
159 | 5 | return result; |
160 | 5 | } |
161 | | |
162 | | // Like the above but uses the last received index as the committed index. |
163 | | bool UpdatePeerWatermarkToOp(ConsensusRequestPB* request, |
164 | | ConsensusResponsePB* response, |
165 | | const OpIdPB& last_received, |
166 | 4 | const OpIdPB& last_received_current_leader) { |
167 | 4 | return UpdatePeerWatermarkToOp(request, response, last_received, |
168 | 4 | last_received_current_leader, |
169 | 4 | last_received.index()); |
170 | 4 | } |
171 | | |
172 | | void RefuseWithLogPropertyMismatch(ConsensusResponsePB* response, |
173 | | const OpIdPB& last_received, |
174 | 5 | const OpIdPB& last_received_current_leader) { |
175 | 5 | ConsensusStatusPB* status = response->mutable_status(); |
176 | 5 | status->mutable_last_received()->CopyFrom(last_received); |
177 | 5 | status->mutable_last_received_current_leader()->CopyFrom(last_received_current_leader); |
178 | 5 | ConsensusErrorPB* error = status->mutable_error(); |
179 | 5 | error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH); |
180 | 5 | StatusToPB(STATUS(IllegalState, "LMP failed."), error->mutable_status()); |
181 | 5 | } |
182 | | |
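 | | // Polls (sleeping 10ms between checks) until the local leader peer's last |
 | | // received index reaches 'index'. |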
183 | 6 | void WaitForLocalPeerToAckIndex(int64_t index) { |
184 | 11 | while (true) { |
185 | 11 | PeerMessageQueue::TrackedPeer leader = queue_->GetTrackedPeerForTests(kLeaderUuid); |
186 | 11 | if (leader.last_received.index >= index) { |
187 | 6 | break; |
188 | 6 | } |
189 | 5 | SleepFor(MonoDelta::FromMilliseconds(10)); |
190 | 5 | } |
191 | 6 | } |
192 | | |
193 | | // Sets the last received op on the response, as well as the last committed index. |
194 | | void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response, |
195 | | const OpId& last_received, |
196 | | const OpId& last_received_current_leader, |
197 | 22 | int64_t last_committed_idx) { |
198 | 22 | last_received.ToPB(response->mutable_status()->mutable_last_received()); |
199 | 22 | last_received_current_leader.ToPB( |
200 | 22 | response->mutable_status()->mutable_last_received_current_leader()); |
201 | 22 | response->mutable_status()->set_last_committed_idx(last_committed_idx); |
202 | 22 | } |
203 | | |
204 | | // Like the above but uses the same last_received for current term. |
205 | | void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response, |
206 | | const OpId& last_received, |
207 | 19 | int64_t last_committed_idx) { |
208 | 19 | SetLastReceivedAndLastCommitted(response, last_received, last_received, last_committed_idx); |
209 | 19 | } |
210 | | |
211 | | // Like the above but just sets the last committed index to have the same index |
212 | | // as the last received op. |
213 | | void SetLastReceivedAndLastCommitted(ConsensusResponsePB* response, |
214 | 16 | const OpId& last_received) { |
215 | 16 | SetLastReceivedAndLastCommitted(response, last_received, last_received.index); |
216 | 16 | } |
217 | | |
218 | | protected: |
219 | | std::unique_ptr<TestRaftConsensusQueueIface> consensus_; |
220 | | const Schema schema_; |
221 | | std::unique_ptr<FsManager> fs_manager_; |
222 | | MetricRegistry metric_registry_; |
223 | | scoped_refptr<MetricEntity> metric_entity_; |
224 | | std::unique_ptr<ThreadPool> log_thread_pool_; |
225 | | scoped_refptr<log::Log> log_; |
226 | | std::unique_ptr<ThreadPool> raft_pool_; |
227 | | std::unique_ptr<PeerMessageQueue> queue_; |
228 | | scoped_refptr<log::LogAnchorRegistry> registry_; |
229 | | scoped_refptr<server::Clock> clock_; |
230 | | }; |
231 | | |
232 | | // Tests that the queue is able to track a peer when it starts tracking a peer |
233 | | // after the initial message in the queue. In particular this creates a queue |
234 | | // with several messages and then starts to track a peer whose watermark |
235 | | // falls in the middle of the current messages in the queue. |
236 | 1 | TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) { |
237 | 1 | queue_->Init(OpId::Min()); |
238 | 1 | queue_->SetLeaderMode( |
239 | 1 | OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(2)); |
240 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100); |
241 | | |
242 | 1 | ConsensusRequestPB request; |
243 | 1 | ConsensusResponsePB response; |
244 | 1 | response.set_responder_uuid(kPeerUuid); |
245 | | |
246 | | // Peer already has some messages, last one being index (kNumMessages / 2) |
247 | 1 | OpIdPB last_received = MakeOpIdPbForIndex(kNumMessages / 2); |
248 | 1 | OpIdPB last_received_current_leader = MinimumOpId(); |
249 | | |
250 | 1 | ASSERT_TRUE(UpdatePeerWatermarkToOp( |
251 | 1 | &request, &response, last_received, last_received_current_leader)); |
252 | | |
253 | | // Getting a new request should get all operations after index (kNumMessages / 2). |
254 | 1 | ReplicateMsgsHolder refs; |
255 | 1 | bool needs_remote_bootstrap; |
256 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
257 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
258 | 1 | ASSERT_EQ(kNumMessages / 2, request.ops_size()); |
259 | | |
260 | 1 | SetLastReceivedAndLastCommitted( |
261 | 1 | &response, OpId::FromPB(request.ops((kNumMessages / 2) - 1).id())); |
262 | 2 | ASSERT_FALSE(queue_->ResponseFromPeer(response.responder_uuid(), response)) |
263 | 2 | << "Queue still had requests pending"; |
264 | | |
265 | | // If we ask for a new request, it should come back empty. |
266 | 1 | refs.Reset(); |
267 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
268 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
269 | 1 | ASSERT_EQ(0, request.ops_size()); |
270 | 1 | } |
271 | | |
272 | | // Tests that the peer gets the messages in pages, with the size of a page being |
273 | | // 'consensus_max_batch_size_bytes'. |
274 | 1 | TEST_F(ConsensusQueueTest, TestGetPagedMessages) { |
275 | 1 | queue_->Init(OpId::Min()); |
276 | 1 | queue_->SetLeaderMode( |
277 | 1 | OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(2)); |
278 | | |
279 | 1 | const int kOpsPerRequest = 9; |
280 | 1 | int32_t page_size_estimate = 0; |
281 | 1 | { |
282 | | // Helper to estimate request size so that we can set the max batch size appropriately. |
283 | 1 | ConsensusRequestPB page_size_estimator; |
284 | 1 | page_size_estimator.set_caller_term(14); |
285 | 1 | OpIdPB* committed_index = page_size_estimator.mutable_committed_op_id(); |
286 | 1 | OpIdPB* preceding_id = page_size_estimator.mutable_preceding_id(); |
287 | | |
288 | | // The actual leader lease duration does not matter here; we just want it to be set. |
289 | 1 | page_size_estimator.set_leader_lease_duration_ms(kDefaultLeaderLeaseDurationMs); |
290 | 1 | page_size_estimator.set_ht_lease_expiration(1000); |
291 | 1 | const HybridTime ht = clock_->Now(); |
292 | 1 | page_size_estimator.set_propagated_hybrid_time(ht.ToUint64()); |
293 | 1 | committed_index->CopyFrom(MinimumOpId()); |
294 | 1 | preceding_id->CopyFrom(MinimumOpId()); |
295 | | |
296 | | // We're going to add 100 messages to the queue so we make each page fetch 9 of those, |
297 | | // for a total of 12 pages. The last page should have a single op. |
298 | 1 | ReplicateMsgs replicates; |
299 | 10 | for (int i = 0; i < kOpsPerRequest; i++) { |
300 | 9 | replicates.push_back(CreateDummyReplicate( |
301 | 9 | 0 /* term */, 0 /* index */, ht, 0 /* payload_size */)); |
302 | 9 | page_size_estimator.mutable_ops()->AddAllocated(replicates.back().get()); |
303 | 9 | } |
304 | | |
305 | 1 | page_size_estimate = page_size_estimator.ByteSize(); |
306 | 1 | LOG(INFO) << "page_size_estimate=" << page_size_estimate; |
307 | 1 | page_size_estimator.mutable_ops()->ExtractSubrange(0, |
308 | 1 | page_size_estimator.ops_size(), |
309 | 1 | /* elements */ nullptr); |
310 | 1 | } |
311 | | |
312 | | // Save the current flag state. |
313 | 1 | google::FlagSaver saver; |
314 | 1 | FLAGS_consensus_max_batch_size_bytes = page_size_estimate; |
315 | | |
316 | 1 | ConsensusRequestPB request; |
317 | 1 | ConsensusResponsePB response; |
318 | 1 | response.set_responder_uuid(kPeerUuid); |
319 | | |
320 | 1 | ASSERT_TRUE(UpdatePeerWatermarkToOp(&request, &response, MinimumOpId(), MinimumOpId())); |
321 | | |
322 | | // Append the messages after the peer is tracked. Otherwise the ops might |
323 | | // get evicted from the cache immediately and the requests below would |
324 | | // result in async log reads instead of cache hits. |
325 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100); |
326 | | |
327 | 1 | OpIdPB last; |
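 | | // Fetch 11 full pages of kOpsPerRequest ops each; the final request below should |
 | | // contain the single remaining op. |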
328 | 12 | for (int i = 0; i < 11; i++) { |
329 | 0 | VLOG(1) << "Making request " << i; |
330 | 11 | ReplicateMsgsHolder refs; |
331 | 11 | bool needs_remote_bootstrap; |
332 | 11 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
333 | | |
334 | 11 | ASSERT_FALSE(needs_remote_bootstrap); |
335 | 11 | LOG(INFO) << "Number of ops in request: " << request.ops_size(); |
336 | 11 | ASSERT_EQ(kOpsPerRequest, request.ops_size()); |
337 | 11 | last = request.ops(request.ops_size() - 1).id(); |
338 | 11 | SetLastReceivedAndLastCommitted(&response, OpId::FromPB(last)); |
339 | 0 | VLOG(1) << "Faking received up through " << last; |
340 | 11 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
341 | 11 | } |
342 | 1 | ReplicateMsgsHolder refs; |
343 | 1 | bool needs_remote_bootstrap; |
344 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
345 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
346 | 1 | ASSERT_EQ(1, request.ops_size()); |
347 | 1 | last = request.ops(request.ops_size() - 1).id(); |
348 | 1 | SetLastReceivedAndLastCommitted(&response, OpId::FromPB(last)); |
349 | 1 | ASSERT_FALSE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
350 | 1 | } |
351 | | |
352 | 1 | TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) { |
353 | 1 | queue_->Init(OpId::Min()); |
354 | 1 | queue_->SetLeaderMode( |
355 | 1 | OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(3)); |
356 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, kNumMessages); |
357 | | |
358 | | // Wait for the local peer to append all messages |
359 | 1 | WaitForLocalPeerToAckIndex(kNumMessages); |
360 | | |
361 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), OpId::Min()); |
362 | | // Since we're still only tracking the local peer, this should have moved the |
363 | | // all-replicated watermark to the last op appended to the local log. |
364 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), MakeOpIdForIndex(kNumMessages)); |
365 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min()); |
366 | 1 | ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min()); |
367 | | |
368 | | // Start to track the peer after the queue has some messages in it |
369 | | // at a point that is halfway through the current messages in the queue. |
370 | 1 | OpIdPB first_msg = MakeOpIdPbForIndex(kNumMessages / 2); |
371 | | |
372 | 1 | ConsensusRequestPB request; |
373 | 1 | ConsensusResponsePB response; |
374 | 1 | response.set_responder_uuid(kPeerUuid); |
375 | | |
376 | 1 | ASSERT_TRUE(UpdatePeerWatermarkToOp(&request, &response, first_msg, MinimumOpId())); |
377 | | |
378 | | // Tracking a new peer should have moved the all-replicated watermark back. |
379 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), OpId::Min()); |
380 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), OpId::Min()); |
381 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min()); |
382 | 1 | ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min()); |
383 | | |
384 | 1 | ReplicateMsgsHolder refs; |
385 | 1 | bool needs_remote_bootstrap; |
386 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
387 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
388 | 1 | ASSERT_EQ(kNumMessages / 2, request.ops_size()); |
389 | | |
390 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 101, kNumMessages); |
391 | | |
392 | 1 | SetLastReceivedAndLastCommitted( |
393 | 1 | &response, OpId::FromPB(request.ops((kNumMessages / 2) - 1).id())); |
394 | 1 | response.set_responder_term(28); |
395 | | |
396 | 2 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)) |
397 | 2 | << "Queue didn't have anymore requests pending"; |
398 | | |
399 | 1 | auto expected_op_id = MakeOpIdForIndex(kNumMessages); |
400 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_op_id); |
401 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_op_id); |
402 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index); |
403 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id); |
404 | | |
405 | | // If we ask for a new request, it should come back with the rest of the messages. |
406 | 1 | refs.Reset(); |
407 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
408 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
409 | 1 | ASSERT_EQ(kNumMessages, request.ops_size()); |
410 | | |
411 | 1 | OpId expected = OpId::FromPB(request.ops(kNumMessages - 1).id()); |
412 | | |
413 | 1 | SetLastReceivedAndLastCommitted(&response, expected); |
414 | 1 | response.set_responder_term(expected.term); |
415 | 2 | ASSERT_FALSE(queue_->ResponseFromPeer(response.responder_uuid(), response)) |
416 | 2 | << "Queue didn't have anymore requests pending"; |
417 | | |
418 | 1 | WaitForLocalPeerToAckIndex(expected.index); |
419 | | |
420 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected); |
421 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected); |
422 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected.index); |
423 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected); |
424 | 1 | } |
425 | | |
426 | 1 | TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) { |
427 | 1 | queue_->Init(OpId::Min()); |
428 | 1 | queue_->SetLeaderMode( |
429 | 1 | OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(5)); |
430 | | // Track 4 additional peers (in addition to the local peer) |
431 | 1 | queue_->TrackPeer("peer-1"); |
432 | 1 | queue_->TrackPeer("peer-2"); |
433 | 1 | queue_->TrackPeer("peer-3"); |
434 | 1 | queue_->TrackPeer("peer-4"); |
435 | | |
436 | | // Append 10 messages to the queue; the config has 5 voters, so the majority size is 3. |
437 | | // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue. |
438 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10); |
439 | 1 | WaitForLocalPeerToAckIndex(10); |
440 | | |
441 | | // Since only the local log might have ACKed at this point, |
442 | | // the committed_index should be MinimumOpId(). |
443 | 1 | queue_->raft_pool_observers_token_->Wait(); |
444 | 1 | ASSERT_EQ(queue_->TEST_GetCommittedIndex(), OpId::Min()); |
445 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min()); |
446 | 1 | ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min()); |
447 | | |
448 | | // NOTE: We don't need to get operations from the queue. The queue |
449 | | // only cares about what the peer reported as received, not what was sent. |
450 | 1 | ConsensusResponsePB response; |
451 | 1 | response.set_responder_term(1); |
452 | | |
453 | 1 | OpId last_sent = MakeOpIdForIndex(5); |
454 | | |
455 | | // Ack the first five operations for peer-1 |
456 | 1 | response.set_responder_uuid("peer-1"); |
457 | 1 | SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index()); |
458 | | |
459 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
460 | | |
461 | | // Committed index should be the same |
462 | 1 | queue_->raft_pool_observers_token_->Wait(); |
463 | 1 | ASSERT_EQ(queue_->TEST_GetCommittedIndex(), OpId::Min()); |
464 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), OpId::Min()); |
465 | 1 | ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min()); |
466 | | |
467 | | // Ack the first five operations for peer-2 |
468 | 1 | response.set_responder_uuid("peer-2"); |
469 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
470 | | |
471 | | // A majority has now replicated up to 0.5. |
472 | 1 | queue_->raft_pool_observers_token_->Wait(); |
473 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), MakeOpIdForIndex(5)); |
474 | | |
475 | 1 | string up_to_date_peer = queue_->GetUpToDatePeer(); |
476 | 1 | ASSERT_TRUE((up_to_date_peer == "peer-2") || (up_to_date_peer == "peer-1")); |
477 | | |
478 | | // Ack all operations for peer-3 |
479 | 1 | response.set_responder_uuid("peer-3"); |
480 | 1 | last_sent = MakeOpIdForIndex(10); |
481 | 1 | SetLastReceivedAndLastCommitted(&response, last_sent, OpId::Min().index); |
482 | | |
483 | | // The committed index moved, so ResponseFromPeer() should return true so that |
484 | | // the peer is notified. |
485 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
486 | | |
487 | 1 | up_to_date_peer.clear(); |
488 | 1 | up_to_date_peer = queue_->GetUpToDatePeer(); |
489 | 1 | ASSERT_EQ(up_to_date_peer, "peer-3"); |
490 | | |
491 | | // Majority replicated watermark should be the same |
492 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), MakeOpIdForIndex(5)); |
493 | | |
494 | | // Ack the remaining operations for peer-4 |
495 | 1 | response.set_responder_uuid("peer-4"); |
496 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
497 | | |
498 | 1 | up_to_date_peer.clear(); |
499 | 1 | up_to_date_peer = queue_->GetUpToDatePeer(); |
500 | 1 | ASSERT_TRUE((up_to_date_peer == "peer-3") || (up_to_date_peer == "peer-4")); |
501 | | |
502 | | // Now that a majority of peers have replicated an operation in the queue's |
503 | | // term the committed index should advance. |
504 | 1 | queue_->raft_pool_observers_token_->Wait(); |
505 | 1 | const auto expected_op_id = MakeOpIdForIndex(10); |
506 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_op_id); |
507 | 1 | ASSERT_EQ(queue_->TEST_GetCommittedIndex(), expected_op_id); |
508 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index); |
509 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id); |
510 | 1 | } |
511 | | |
512 | | // In this test we append a sequence of operations to a log |
513 | | // and then start tracking a peer whose first required operation |
514 | | // is before the first operation in the queue. |
515 | 1 | TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) { |
516 | 1 | OpIdPB opid = MakeOpId(1, 1); |
517 | | |
518 | 101 | for (int i = 1; i <= 100; i++) { |
519 | 100 | ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid)); |
520 | | // Roll the log every 10 ops |
521 | 100 | if (i % 10 == 0) { |
522 | 10 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
523 | 10 | } |
524 | 100 | } |
525 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
526 | | |
527 | 1 | OpIdPB queues_last_op = opid; |
528 | 1 | queues_last_op.set_index(queues_last_op.index() - 1); |
529 | | |
530 | | // Now reset the queue so that we can pass a new committed index, |
531 | | // the last operation in the log. |
532 | 1 | CloseAndReopenQueue(); |
533 | | |
534 | 1 | OpId committed_index(1, 100); |
535 | 1 | const auto last_applied_op = committed_index; |
536 | 1 | queue_->Init(committed_index); |
537 | 1 | queue_->SetLeaderMode( |
538 | 1 | committed_index, committed_index.term, last_applied_op, BuildRaftConfigPBForTests(3)); |
539 | | |
540 | 1 | ConsensusRequestPB request; |
541 | 1 | ConsensusResponsePB response; |
542 | 1 | response.set_responder_uuid(kPeerUuid); |
543 | | |
544 | | // The peer will actually be behind the first operation in the queue, |
545 | | // in this case by about 50 operations. |
546 | 1 | OpIdPB peers_last_op; |
547 | 1 | peers_last_op.set_term(1); |
548 | 1 | peers_last_op.set_index(50); |
549 | | |
550 | | // Now we start tracking the peer; this negotiation round should let |
551 | | // the queue know how far along the peer is. |
552 | | // The queue should reply that there are more messages for the peer. |
553 | 1 | ASSERT_TRUE(UpdatePeerWatermarkToOp( |
554 | 1 | &request, &response, peers_last_op, MinimumOpId())); |
555 | | |
556 | | // When we get another request for the peer the queue should load |
557 | | // the missing operations. |
558 | 1 | ReplicateMsgsHolder refs; |
559 | 1 | bool needs_remote_bootstrap; |
560 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
561 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
562 | 1 | ASSERT_EQ(request.ops_size(), 50); |
563 | 1 | } |
564 | | |
565 | | // This tests that the queue is able to handle operation overwriting, i.e. when a |
566 | | // newly tracked peer reports the last received operations as some operation that |
567 | | // doesn't exist in the leader's log. In particular it tests the case where a |
568 | | // new leader starts at term 2 with only a part of the operations of the previous |
569 | | // leader having been committed. |
570 | 1 | TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) { |
571 | 1 | OpIdPB opid = MakeOpId(1, 1); |
572 | | // Append 10 messages in term 1 to the log. |
573 | 11 | for (int i = 1; i <= 10; i++) { |
574 | 10 | ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid)); |
575 | | // Roll the log every 3 ops |
576 | 10 | if (i % 3 == 0) { |
577 | 3 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
578 | 3 | } |
579 | 10 | } |
580 | | |
581 | 1 | opid = MakeOpId(2, 11); |
582 | | // Now append 10 more messages in term 2. |
583 | 11 | for (int i = 11; i <= 20; i++) { |
584 | 10 | ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid)); |
585 | | // Roll the log every 3 ops |
586 | 10 | if (i % 3 == 0) { |
587 | 3 | ASSERT_OK(log_->AllocateSegmentAndRollOver()); |
588 | 3 | } |
589 | 10 | } |
590 | | |
591 | | |
592 | | // Now reset the queue so that we can pass a new committed index, |
593 | | // op 2.15. |
594 | 1 | CloseAndReopenQueue(); |
595 | | |
596 | 1 | const OpId committed_op_id(2, 15); |
597 | 1 | queue_->Init(OpId(2, 20)); |
598 | 1 | queue_->SetLeaderMode( |
599 | 1 | committed_op_id, committed_op_id.term, committed_op_id, |
600 | 1 | BuildRaftConfigPBForTests(3)); |
601 | | |
602 | | // Now get a request for a simulated old leader, which contains more operations |
603 | | // in term 1 than the new leader has. |
604 | | // The queue should realize that the old leader's last received doesn't exist |
605 | | // and send it operations starting at the old leader's committed index. |
606 | 1 | ConsensusRequestPB request; |
607 | 1 | ConsensusResponsePB response; |
608 | 1 | response.set_responder_uuid(kPeerUuid); |
609 | | |
610 | 1 | queue_->TrackPeer(kPeerUuid); |
611 | | |
612 | | // Ask for a request. The queue assumes the peer is up-to-date so |
613 | | // this should contain no operations. |
614 | 1 | ReplicateMsgsHolder refs; |
615 | 1 | bool needs_remote_bootstrap; |
616 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
617 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
618 | 1 | ASSERT_EQ(request.ops_size(), 0); |
619 | 1 | ASSERT_OPID_EQ(request.preceding_id(), MakeOpId(2, 20)); |
620 | 1 | ASSERT_EQ(OpId::FromPB(request.committed_op_id()), committed_op_id); |
621 | | |
622 | | // The old leader was still in term 1 but it increased its term with our request. |
623 | 1 | response.set_responder_term(2); |
624 | | |
625 | | // We emulate that the old leader had 25 total operations in Term 1 (15 more than we knew about) |
626 | | // which were never committed, and that its last known committed index was 5. |
627 | 1 | ConsensusStatusPB* status = response.mutable_status(); |
628 | 1 | status->mutable_last_received()->CopyFrom(MakeOpId(1, 25)); |
629 | 1 | status->mutable_last_received_current_leader()->CopyFrom(MinimumOpId()); |
630 | 1 | status->set_last_committed_idx(5); |
631 | 1 | ConsensusErrorPB* error = status->mutable_error(); |
632 | 1 | error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH); |
633 | 1 | StatusToPB(STATUS(IllegalState, "LMP failed."), error->mutable_status()); |
634 | | |
635 | | // The queue should reply that there are more operations pending. |
636 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
637 | 1 | request.Clear(); |
638 | | |
639 | | // We're waiting on two nodes. The all-replicated watermark should be |
640 | | // 0.0 since we haven't had a successful exchange with the 'remote' peer. |
641 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), OpId::Min()); |
642 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), committed_op_id); |
643 | 1 | ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min()); |
644 | | |
645 | | // Test that even when a correct peer responds (meaning we actually get to execute |
646 | | // watermark advancement) we still have the same all-replicated watermark. |
647 | 1 | auto replicate = CreateDummyReplicate(2, 21, clock_->Now(), 0); |
648 | 1 | ASSERT_OK(queue_->TEST_AppendOperation(replicate)); |
649 | 1 | WaitForLocalPeerToAckIndex(21); |
650 | | |
651 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), OpId::Min()); |
652 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), committed_op_id); |
653 | 1 | ASSERT_EQ(queue_->GetAllAppliedOpId(), OpId::Min()); |
654 | | |
655 | | // Generate another request for the remote peer, which should include |
656 | | // all of the ops since the peer's last-known committed index. |
657 | 1 | refs.Reset(); |
658 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
659 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
660 | 1 | ASSERT_OPID_EQ(MakeOpId(1, 5), request.preceding_id()); |
661 | 1 | ASSERT_EQ(16, request.ops_size()); |
662 | | |
663 | | // Now when we respond the watermarks should advance. |
664 | 1 | response.mutable_status()->clear_error(); |
665 | 1 | SetLastReceivedAndLastCommitted(&response, OpId(2, 21), 5); |
666 | 1 | queue_->ResponseFromPeer(response.responder_uuid(), response); |
667 | | |
668 | | // Now the watermark should have advanced. |
669 | 1 | const auto expected_op_id = OpId(2, 21); |
670 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_op_id); |
671 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index); |
672 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id); |
673 | 1 | } |
674 | | |
675 | | // Test for a bug where we wouldn't move any watermark back, when overwriting |
676 | | // operations, which would cause a check failure on the write immediately |
677 | | // following the overwriting write. |
678 | 1 | TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) { |
679 | 1 | RestartSafeCoarseMonoClock restart_safe_coarse_mono_clock; |
680 | 1 | queue_->Init(OpId::Min()); |
681 | 1 | queue_->SetNonLeaderMode(); |
682 | | // Append a bunch of messages. |
683 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10); |
684 | 1 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
685 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppended(), OpId(1, 10)); |
686 | | // Now rewrite some of the operations and wait for the log to append. |
687 | 1 | ASSERT_OK(queue_->AppendOperations( |
688 | 1 | { CreateDummyReplicate(2, 5, clock_->Now(), 0) }, yb::OpId() /* committed_op_id */, |
689 | 1 | restart_safe_coarse_mono_clock.Now())); |
690 | | |
691 | | // Wait for the operation to be in the log. |
692 | 1 | log_->WaitForSafeOpIdToApply(yb::OpId(2, 5)); |
693 | | |
694 | | // Without the fix the following append would trigger a check failure |
695 | | // in log cache. |
696 | 1 | ASSERT_OK(queue_->AppendOperations( |
697 | 1 | { CreateDummyReplicate(2, 6, clock_->Now(), 0) }, yb::OpId() /* committed_op_id */, |
698 | 1 | restart_safe_coarse_mono_clock.Now())); |
699 | | |
700 | | // Wait for the operation to be in the log. |
701 | 1 | log_->WaitForSafeOpIdToApply(yb::OpId(2, 6)); |
702 | | |
703 | | // Now the last appended should have moved backward. |
704 | 1 | ASSERT_OK(WaitFor( |
705 | 1 | [this]() { return queue_->TEST_GetLastAppended() == OpId(2, 6); }, |
706 | 1 | 1s, "AllReplicatedOpIdForTests", 10ms)); |
707 | 1 | } |
708 | | |
709 | | // Tests that we're advancing the watermarks properly and only when the peer |
710 | | // has a prefix of our log. This also tests for a specific bug that we had. Here's |
711 | | // the scenario: |
712 | | // Peer would report: |
713 | | // - last received 75.49 |
714 | | // - last committed 72.31 |
715 | | // |
716 | | // Queue has messages: |
717 | | // 72.31-72.45 |
718 | | // 73.46-73.51 |
719 | | // 76.52-76.53 |
720 | | // |
721 | | // The queue has more messages than the peer, but the peer has messages |
722 | | // that the queue doesn't and which will be overwritten. |
723 | | // |
724 | | // In the first round of negotiation the peer would report LMP mismatch. |
725 | | // In the second round the queue would try to send it messages starting at 75.49 |
726 | | // but since that message didn't exist in the queue's log it would instead send |
727 | | // messages starting at 72.31. However, because the batches were big it was only |
728 | | // able to send a few messages (e.g. up to 72.40). |
729 | | // |
730 | | // Since in this last exchange everything went ok (the peer still doesn't know |
731 | | // that messages will be overwritten later), the queue would mark the exchange |
732 | | // as successful and the peer's last received would be taken into account when |
733 | | // calculating watermarks, which was incorrect. |
734 | 1 | TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog) { |
735 | 1 | FLAGS_consensus_max_batch_size_bytes = 1024 * 10; |
736 | | |
737 | 1 | queue_->Init(OpId(72, 30)); |
738 | 1 | queue_->SetLeaderMode(OpId(72, 31), 76, OpId(72, 31), BuildRaftConfigPBForTests(3)); |
739 | | |
740 | 1 | ConsensusRequestPB request; |
741 | 1 | ConsensusResponsePB response; |
742 | | |
743 | | // We expect the majority replicated watermark to start at the committed index. |
744 | 1 | OpId expected_majority_replicated(72, 31); |
745 | | // We expect the all replicated watermark to be reset when we track a new peer. |
746 | 1 | OpId expected_all_replicated = OpId::Min(); |
747 | 1 | auto expected_last_applied = expected_majority_replicated; |
748 | | |
749 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated); |
750 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated); |
751 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied); |
752 | | |
753 | 1 | ASSERT_TRUE(UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31)); |
754 | | |
755 | 24 | for (int i = 31; i <= 53; i++) { |
756 | 23 | if (i <= 45) { |
757 | 15 | ASSERT_OK(AppendReplicateMsg(72, i, 1024)); |
758 | 15 | continue; |
759 | 8 | } |
760 | 8 | if (i <= 51) { |
761 | 6 | ASSERT_OK(AppendReplicateMsg(73, i, 1024)); |
762 | 6 | continue; |
763 | 2 | } |
764 | 2 | ASSERT_OK(AppendReplicateMsg(76, i, 1024)); |
765 | 2 | } |
766 | | |
767 | 1 | WaitForLocalPeerToAckIndex(53); |
768 | | |
769 | | // When we get operations for this peer we should get them starting immediately after |
770 | | // the committed index, for a total of 9 operations. |
771 | 1 | ReplicateMsgsHolder refs; |
772 | 1 | bool needs_remote_bootstrap; |
773 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
774 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
775 | 1 | ASSERT_EQ(request.ops_size(), 9); |
776 | 1 | ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(72, 32)); |
777 | 1 | const OpIdPB* last_op = &request.ops(request.ops_size() - 1).id(); |
778 | | |
779 | | // When the peer acks that it received an operation that is not in our current |
780 | | // term, it gets ignored in terms of watermark advancement. |
781 | 1 | SetLastReceivedAndLastCommitted(&response, OpId(75, 49), OpId::FromPB(*last_op), 31); |
782 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
783 | | |
784 | | // We've sent (and received an ack for) up to 72.40 from the remote peer. |
785 | 1 | expected_majority_replicated = OpId(72, 40); |
786 | 1 | expected_all_replicated = expected_majority_replicated; |
787 | 1 | expected_last_applied = expected_majority_replicated; |
788 | | |
789 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated); |
790 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated); |
791 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_last_applied.index); |
792 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied); |
793 | | |
794 | | // Another request for this peer should get another page of messages. Still not |
795 | | // on the queue's term (and thus without advancing watermarks). |
796 | 1 | refs.Reset(); |
797 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
798 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
799 | 1 | ASSERT_EQ(request.ops_size(), 9); |
800 | 1 | ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(72, 41)); |
801 | 1 | last_op = &request.ops(request.ops_size() - 1).id(); |
802 | | |
803 | 1 | SetLastReceivedAndLastCommitted(&response, OpId(75, 49), OpId::FromPB(*last_op), 31); |
804 | 1 | queue_->ResponseFromPeer(response.responder_uuid(), response); |
805 | | |
806 | | // We've now sent (and received an ack for) up to 73.49. |
807 | 1 | expected_majority_replicated = OpId(73, 49); |
808 | 1 | expected_all_replicated = expected_majority_replicated; |
809 | 1 | expected_last_applied = expected_majority_replicated; |
810 | | |
811 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated); |
812 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated); |
813 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_last_applied.index); |
814 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied); |
815 | | |
816 | | // The last page of the request should overwrite the peer's operations, and the |
817 | | // response should finally advance the watermarks. |
818 | 1 | refs.Reset(); |
819 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
820 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
821 | 1 | ASSERT_EQ(request.ops_size(), 4); |
822 | 1 | ASSERT_OPID_EQ(request.ops(0).id(), MakeOpId(73, 50)); |
823 | | |
824 | | // We're done, both watermarks should be at the end. |
825 | 1 | expected_majority_replicated = OpId(76, 53); |
826 | 1 | expected_all_replicated = expected_majority_replicated; |
827 | 1 | expected_last_applied = expected_majority_replicated; |
828 | | |
829 | 1 | SetLastReceivedAndLastCommitted(&response, expected_majority_replicated, |
830 | 1 | expected_majority_replicated, 31); |
831 | 1 | queue_->ResponseFromPeer(response.responder_uuid(), response); |
832 | | |
833 | 1 | ASSERT_EQ(queue_->TEST_GetMajorityReplicatedOpId(), expected_majority_replicated); |
834 | 1 | ASSERT_EQ(queue_->TEST_GetAllReplicatedIndex(), expected_all_replicated); |
835 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_last_applied.index); |
836 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_last_applied); |
837 | | |
838 | 1 | request.mutable_ops()->ExtractSubrange(0, request.ops().size(), nullptr); |
839 | 1 | } |
840 | | |
841 | | // Test that remote bootstrap is triggered when a "tablet not found" error occurs. |
842 | 1 | TEST_F(ConsensusQueueTest, TestTriggerRemoteBootstrapIfTabletNotFound) { |
843 | 1 | queue_->Init(OpId::Min()); |
844 | 1 | queue_->SetLeaderMode( |
845 | 1 | OpId::Min(), OpId::Min().term, OpId::Min(), BuildRaftConfigPBForTests(3)); |
846 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100); |
847 | | |
848 | 1 | ConsensusRequestPB request; |
849 | 1 | ConsensusResponsePB response; |
850 | 1 | response.set_responder_uuid(kPeerUuid); |
851 | 1 | queue_->TrackPeer(kPeerUuid); |
852 | | |
853 | | // Create request for new peer. |
854 | 1 | ReplicateMsgsHolder refs; |
855 | 1 | bool needs_remote_bootstrap; |
856 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
857 | 1 | ASSERT_FALSE(needs_remote_bootstrap); |
858 | | |
859 | | // Peer responds with tablet not found. |
860 | 1 | response.mutable_error()->set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND); |
861 | 1 | StatusToPB(STATUS(NotFound, "No such tablet"), response.mutable_error()->mutable_status()); |
862 | | |
863 | | // If the peer needs remote bootstrap, ResponseFromPeer() should return true. |
864 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(kPeerUuid, response)); |
865 | | |
866 | | // On the next request, we should find out that the queue wants us to remotely bootstrap. |
867 | 1 | request.Clear(); |
868 | 1 | refs.Reset(); |
869 | 1 | ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_remote_bootstrap)); |
870 | 1 | ASSERT_TRUE(needs_remote_bootstrap); |
871 | | |
872 | 1 | StartRemoteBootstrapRequestPB rb_req; |
873 | 1 | ASSERT_OK(queue_->GetRemoteBootstrapRequestForPeer(kPeerUuid, &rb_req)); |
874 | | |
875 | 2 | ASSERT_TRUE(rb_req.IsInitialized()) << rb_req.ShortDebugString(); |
876 | 1 | ASSERT_EQ(kTestTablet, rb_req.tablet_id()); |
877 | 1 | ASSERT_EQ(kLeaderUuid, rb_req.bootstrap_peer_uuid()); |
878 | 1 | ASSERT_EQ(FakeRaftPeerPB(kLeaderUuid).last_known_private_addr()[0].ShortDebugString(), |
879 | 1 | rb_req.source_private_addr()[0].ShortDebugString()); |
880 | 1 | } |
881 | | |
882 | | // Tests that ReadReplicatedMessagesForCDC() only reads messages until the last known |
883 | | // committed index. |
884 | 1 | TEST_F(ConsensusQueueTest, TestReadReplicatedMessagesForCDC) { |
885 | 1 | auto start_op_id = MakeOpIdForIndex(3); // Starting after the normal first index. |
886 | 1 | queue_->Init(start_op_id); |
887 | 1 | queue_->SetLeaderMode( |
888 | 1 | start_op_id, start_op_id.term, start_op_id, BuildRaftConfigPBForTests(2)); |
889 | 1 | queue_->TrackPeer(kPeerUuid); |
890 | | |
891 | 1 | AppendReplicateMessagesToQueue(queue_.get(), clock_, start_op_id.index, kNumMessages); |
892 | | |
893 | | // Wait for the local peer to append all messages. |
894 | 1 | WaitForLocalPeerToAckIndex(kNumMessages); |
895 | | |
896 | | // Since only the local log might have ACKed at this point, |
897 | | // the committed_index should still be start_op_id. |
898 | 1 | queue_->raft_pool_observers_token_->Wait(); |
899 | 1 | ASSERT_EQ(queue_->TEST_GetCommittedIndex(), start_op_id); |
900 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), start_op_id); |
901 | | |
902 | 1 | ConsensusResponsePB response; |
903 | 1 | response.set_responder_uuid(kPeerUuid); |
904 | | |
905 | 1 | int last_committed_index = kNumMessages - 20; |
906 | | // Ack last_committed_index messages. |
907 | 1 | SetLastReceivedAndLastCommitted(&response, MakeOpIdForIndex(last_committed_index)); |
908 | 1 | ASSERT_TRUE(queue_->ResponseFromPeer(response.responder_uuid(), response)); |
909 | 1 | queue_->raft_pool_observers_token_->Wait(); |
910 | 1 | const auto expected_op_id = MakeOpIdForIndex(last_committed_index); |
911 | 1 | ASSERT_EQ(queue_->TEST_GetCommittedIndex(), expected_op_id); |
912 | 1 | consensus_->WaitForMajorityReplicatedIndex(expected_op_id.index); |
913 | 1 | ASSERT_EQ(queue_->TEST_GetLastAppliedOpId(), expected_op_id); |
914 | | |
915 | | // Read from the start_op_id |
916 | 1 | auto read_result = ASSERT_RESULT(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(3))); |
917 | 1 | ASSERT_EQ(last_committed_index - start_op_id.index, read_result.messages.size()); |
918 | | |
919 | | // Start reading from 0.0 and ensure that we get the first known OpID. |
920 | 1 | read_result = ASSERT_RESULT(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(0))); |
921 | 1 | ASSERT_EQ(last_committed_index - start_op_id.index, read_result.messages.size()); |
922 | | |
923 | | // Read from some index > 0 |
924 | 1 | int start = 10; |
925 | 1 | read_result = ASSERT_RESULT(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(start))); |
926 | 1 | ASSERT_EQ(last_committed_index - start, read_result.messages.size()); |
927 | 1 | } |
928 | | |
929 | | } // namespace consensus |
930 | | } // namespace yb |