/Users/deen/code/yugabyte-db/src/yb/consensus/consensus_peers-test.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <gtest/gtest.h> |
34 | | |
35 | | #include "yb/common/schema.h" |
36 | | #include "yb/common/wire_protocol-test-util.h" |
37 | | |
38 | | #include "yb/consensus/consensus-test-util.h" |
39 | | #include "yb/consensus/log.h" |
40 | | #include "yb/consensus/log_util.h" |
41 | | #include "yb/consensus/opid_util.h" |
42 | | |
43 | | #include "yb/fs/fs_manager.h" |
44 | | |
45 | | #include "yb/rpc/messenger.h" |
46 | | |
47 | | #include "yb/server/hybrid_clock.h" |
48 | | |
49 | | #include "yb/util/logging.h" |
50 | | #include "yb/util/metrics.h" |
51 | | #include "yb/util/opid.h" |
52 | | #include "yb/util/scope_exit.h" |
53 | | #include "yb/util/status_log.h" |
54 | | #include "yb/util/test_macros.h" |
55 | | #include "yb/util/test_util.h" |
56 | | #include "yb/util/threadpool.h" |
57 | | |
58 | | using namespace std::chrono_literals; |
59 | | |
60 | | METRIC_DECLARE_entity(tablet); |
61 | | |
62 | | namespace yb { |
63 | | namespace consensus { |
64 | | |
65 | | using log::Log; |
66 | | using log::LogOptions; |
67 | | using log::LogAnchorRegistry; |
68 | | using rpc::Messenger; |
69 | | using rpc::MessengerBuilder; |
70 | | using std::shared_ptr; |
71 | | using std::unique_ptr; |
72 | | |
73 | | const char* kTableId = "test-peers-table"; |
74 | | const char* kTabletId = "test-peers-tablet"; |
75 | | const char* kLeaderUuid = "peer-0"; |
76 | | const char* kFollowerUuid = "peer-1"; |
77 | | |
78 | | class ConsensusPeersTest : public YBTest { |
79 | | public: |
80 | | ConsensusPeersTest() |
81 | | : metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "peer-test")), |
82 | 5 | schema_(GetSimpleTestSchema()) { |
83 | 5 | } |
84 | | |
85 | 5 | void SetUp() override { |
86 | 5 | YBTest::SetUp(); |
87 | 5 | MessengerBuilder bld("test"); |
88 | 5 | messenger_ = ASSERT_RESULT(bld.Build()); |
89 | 5 | ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_)); |
90 | 5 | raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); |
91 | 5 | ASSERT_OK(ThreadPoolBuilder("log").Build(&log_thread_pool_)); |
92 | 5 | fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); |
93 | | |
94 | 5 | ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); |
95 | 5 | ASSERT_OK(fs_manager_->Open()); |
96 | 5 | ASSERT_OK(Log::Open(options_, |
97 | 5 | kTabletId, |
98 | 5 | fs_manager_->GetFirstTabletWalDirOrDie(kTableId, kTabletId), |
99 | 5 | fs_manager_->uuid(), |
100 | 5 | schema_, |
101 | 5 | 0, // schema_version |
102 | 5 | nullptr, // table_metric_entity |
103 | 5 | nullptr, // tablet_metric_entity |
104 | 5 | log_thread_pool_.get(), |
105 | 5 | log_thread_pool_.get(), |
106 | 5 | std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index |
107 | 5 | &log_)); |
108 | 5 | clock_.reset(new server::HybridClock()); |
109 | 5 | ASSERT_OK(clock_->Init()); |
110 | | |
111 | 5 | consensus_.reset(new TestRaftConsensusQueueIface()); |
112 | 5 | message_queue_.reset(new PeerMessageQueue( |
113 | 5 | metric_entity_, |
114 | 5 | log_.get(), |
115 | 5 | nullptr /* server_tracker */, |
116 | 5 | nullptr /* parent_tracker */, |
117 | 5 | FakeRaftPeerPB(kLeaderUuid), |
118 | 5 | kTabletId, |
119 | 5 | clock_, |
120 | 5 | nullptr /* consensus_context */, |
121 | 5 | raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL))); |
122 | 5 | message_queue_->RegisterObserver(consensus_.get()); |
123 | | |
124 | 5 | message_queue_->Init(OpId::Min()); |
125 | 5 | message_queue_->SetLeaderMode(OpId::Min(), |
126 | 5 | OpId().term, |
127 | 5 | OpId::Min(), |
128 | 5 | BuildRaftConfigPBForTests(3)); |
129 | 5 | } |
130 | | |
131 | 5 | void TearDown() override { |
132 | 5 | ASSERT_OK(log_->WaitUntilAllFlushed()); |
133 | 5 | log_thread_pool_->Shutdown(); |
134 | 5 | raft_pool_->Shutdown(); |
135 | 5 | messenger_->Shutdown(); |
136 | 5 | } |
137 | | |
138 | | DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer( |
139 | | const string& peer_name, |
140 | 5 | std::shared_ptr<Peer>* peer) { |
141 | 5 | RaftPeerPB peer_pb; |
142 | 5 | peer_pb.set_permanent_uuid(peer_name); |
143 | 5 | auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>( |
144 | 5 | raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb)); |
145 | 5 | *peer = CHECK_RESULT(Peer::NewRemotePeer( |
146 | 5 | peer_pb, kTabletId, kLeaderUuid, PeerProxyPtr(proxy_ptr), message_queue_.get(), |
147 | 5 | nullptr /* multi raft batcher */, raft_pool_token_.get(), |
148 | 5 | nullptr /* consensus */, messenger_.get())); |
149 | 5 | return proxy_ptr; |
150 | 5 | } |
151 | | |
152 | 1 | void CheckLastLogEntry(int64_t term, int64_t index) { |
153 | 1 | ASSERT_EQ(log_->GetLatestEntryOpId(), OpId(term, index)); |
154 | 1 | } |
155 | | |
156 | | void CheckLastRemoteEntry( |
157 | 2 | DelayablePeerProxy<NoOpTestPeerProxy>* proxy, int64_t term, int64_t index) { |
158 | 2 | ASSERT_EQ(OpId::FromPB(proxy->proxy()->last_received()), OpId(term, index)); |
159 | 2 | } |
160 | | |
161 | | protected: |
162 | | unique_ptr<ThreadPool> raft_pool_; |
163 | | std::unique_ptr<TestRaftConsensusQueueIface> consensus_; |
164 | | MetricRegistry metric_registry_; |
165 | | scoped_refptr<MetricEntity> metric_entity_; |
166 | | std::unique_ptr<FsManager> fs_manager_; |
167 | | unique_ptr<ThreadPool> log_thread_pool_; |
168 | | scoped_refptr<Log> log_; |
169 | | std::unique_ptr<PeerMessageQueue> message_queue_; |
170 | | const Schema schema_; |
171 | | LogOptions options_; |
172 | | unique_ptr<ThreadPoolToken> raft_pool_token_; |
173 | | scoped_refptr<server::Clock> clock_; |
174 | | std::unique_ptr<Messenger> messenger_; |
175 | | }; |
176 | | |
177 | | // Tests that a remote peer is correctly built and tracked |
178 | | // by the message queue. |
179 | | // After the operations are considered done the proxy (which |
180 | | // simulates the other endpoint) should reflect the replicated |
181 | | // messages. |
182 | 1 | TEST_F(ConsensusPeersTest, TestRemotePeer) { |
183 | | // We use a majority size of 2 since we make one fake remote peer |
184 | | // in addition to our real local log. |
185 | | |
186 | 1 | std::shared_ptr<Peer> remote_peer; |
187 | 1 | auto se = ScopeExit([&remote_peer] { |
188 | | // This guarantees that the Peer object doesn't get destroyed if there is a pending request. |
189 | 1 | remote_peer->Close(); |
190 | 1 | }); |
191 | | |
192 | 1 | DelayablePeerProxy<NoOpTestPeerProxy>* proxy = NewRemotePeer(kFollowerUuid, &remote_peer); |
193 | | |
194 | | // Append a bunch of messages to the queue. |
195 | 1 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 20); |
196 | | |
197 | | // The above append ends up appending messages in term 2, so we update the peer's term to match. |
198 | 1 | remote_peer->SetTermForTest(2); |
199 | | |
200 | | // Signal the peer that there are requests pending. |
201 | 1 | ASSERT_OK(remote_peer->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
202 | | |
203 | | // Now wait on the status of the last operation. This will complete once the peer has logged all |
204 | | // requests. |
205 | 1 | consensus_->WaitForMajorityReplicatedIndex(20); |
206 | | // Verify that the replicated watermark corresponds to the last replicated message. |
207 | 1 | CheckLastRemoteEntry(proxy, 2, 20); |
208 | 1 | } |
209 | | |
210 | 1 | TEST_F(ConsensusPeersTest, TestLocalAppendAndRemotePeerDelay) { |
211 | | // Create a set of remote peers. |
212 | 1 | std::shared_ptr<Peer> remote_peer1; |
213 | 1 | NewRemotePeer("peer-1", &remote_peer1); |
214 | | |
215 | 1 | std::shared_ptr<Peer> remote_peer2; |
216 | 1 | DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy = |
217 | 1 | NewRemotePeer("peer-2", &remote_peer2); |
218 | | |
219 | 1 | auto se = ScopeExit([&remote_peer1, &remote_peer2] { |
220 | | // This guarantees that the Peer objects don't get destroyed if there is a pending request. |
221 | 1 | remote_peer1->Close(); |
222 | 1 | remote_peer2->Close(); |
223 | 1 | }); |
224 | | |
225 | | // Slow down the local log append and delay the response from the second remote peer. |
226 | 1 | const auto kAppendDelayTime = 1s; |
227 | 1 | log_->TEST_SetSleepDuration(kAppendDelayTime); |
228 | 1 | remote_peer2_proxy->DelayResponse(); |
229 | 1 | auto se2 = ScopeExit([this, &remote_peer2_proxy] { |
230 | 1 | log_->TEST_SetSleepDuration(0s); |
231 | 1 | remote_peer2_proxy->Respond(TestPeerProxy::kUpdate); |
232 | 1 | }); |
233 | | |
234 | | // Append one message to the queue. |
235 | 1 | const auto start_time = MonoTime::Now(); |
236 | 1 | OpIdPB first = MakeOpId(0, 1); |
237 | 1 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, first.index(), 1); |
238 | | |
239 | 1 | ASSERT_OK(remote_peer1->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
240 | 1 | ASSERT_OK(remote_peer2->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
241 | | |
242 | | // Replication has to wait for the slowed-down local append, because peer-2's response is delayed. |
243 | 1 | consensus_->WaitForMajorityReplicatedIndex(first.index()); |
244 | 1 | const auto elapsed_time = MonoTime::Now() - start_time; |
245 | 1 | LOG(INFO) << "Replication elapsed time: " << elapsed_time; |
246 | | // Replication should take at least as much time as it takes the local peer to append, because |
247 | | // there is only one remote peer that is responding. |
248 | 1 | ASSERT_GE(elapsed_time, kAppendDelayTime); |
249 | 1 | } |
250 | | |
251 | 1 | TEST_F(ConsensusPeersTest, TestRemotePeers) { |
252 | | // Create a set of remote peers. |
253 | 1 | std::shared_ptr<Peer> remote_peer1; |
254 | 1 | DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer1_proxy = |
255 | 1 | NewRemotePeer("peer-1", &remote_peer1); |
256 | | |
257 | 1 | std::shared_ptr<Peer> remote_peer2; |
258 | 1 | DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy = |
259 | 1 | NewRemotePeer("peer-2", &remote_peer2); |
260 | | |
261 | 1 | auto se = ScopeExit([&remote_peer1, &remote_peer2] { |
262 | | // This guarantees that the Peer objects don't get destroyed if there is a pending request. |
263 | 1 | remote_peer1->Close(); |
264 | 1 | remote_peer2->Close(); |
265 | 1 | }); |
266 | | |
267 | | // Delay the response from the second remote peer. |
268 | 1 | remote_peer2_proxy->DelayResponse(); |
269 | | |
270 | | // Append one message to the queue. |
271 | 1 | OpId first(0, 1); |
272 | | |
273 | 1 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, first.index, 1); |
274 | | |
275 | 1 | ASSERT_OK(remote_peer1->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
276 | 1 | ASSERT_OK(remote_peer2->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
277 | | |
278 | | // Now wait for the message to be replicated; this should succeed since |
279 | | // majority = 2 and only one peer was delayed. The majority is made up |
280 | | // of remote_peer1 and the local log. |
281 | 1 | consensus_->WaitForMajorityReplicatedIndex(first.index); |
282 | | |
283 | 1 | SCOPED_TRACE(Format( |
284 | 1 | "Written to log locally: $0, Received by peer1: {$1}, by peer2: {$2}", |
285 | 1 | log_->GetLatestEntryOpId(), |
286 | 1 | remote_peer1_proxy->proxy()->last_received(), |
287 | 1 | remote_peer2_proxy->proxy()->last_received())); |
288 | | |
289 | 1 | CheckLastLogEntry(first.term, first.index); |
290 | 1 | CheckLastRemoteEntry(remote_peer1_proxy, first.term, first.index); |
291 | | |
292 | 1 | remote_peer2_proxy->Respond(TestPeerProxy::kUpdate); |
293 | | // Wait until all peers have replicated the message, otherwise |
294 | | // when we add the next one remote_peer2 might find the next message |
295 | | // in the queue and will replicate it, which is not what we want. |
296 | 2 | while (message_queue_->TEST_GetAllReplicatedIndex() != first) { |
297 | 1 | std::this_thread::sleep_for(1ms); |
298 | 1 | } |
299 | | |
300 | | // Now append another message to the queue. |
301 | 1 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 2, 1); |
302 | | |
303 | | // We should not see it replicated, even after 10ms, |
304 | | // since only the local peer replicates the message. |
305 | 1 | SleepFor(MonoDelta::FromMilliseconds(10)); |
306 | 1 | ASSERT_FALSE(consensus_->IsMajorityReplicated(2)); |
307 | | |
308 | | // Signal one of the two remote peers. |
309 | 1 | ASSERT_OK(remote_peer1->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
310 | | // We should now be able to wait for it to replicate, since two peers (a majority) |
311 | | // have replicated the message. |
312 | 1 | consensus_->WaitForMajorityReplicatedIndex(2); |
313 | 1 | } |
314 | | |
315 | | // Regression test for KUDU-699: even if a peer isn't making progress, |
316 | | // and thus always has data pending, we should be able to close the peer. |
317 | 1 | TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) { |
318 | 1 | auto mock_proxy = new MockedPeerProxy(raft_pool_.get()); |
319 | 1 | auto peer = ASSERT_RESULT(Peer::NewRemotePeer( |
320 | 1 | FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, PeerProxyPtr(mock_proxy), |
321 | 1 | message_queue_.get(), nullptr /* multi raft batcher */, |
322 | 1 | raft_pool_token_.get(), nullptr /* consensus */, |
323 | 1 | messenger_.get())); |
324 | | |
325 | | // Make the peer respond without making any progress -- it always returns |
326 | | // that it has only replicated op 0.0. When we see the response, we always |
327 | | // decide that more data is pending, and we want to send another request. |
328 | 1 | ConsensusResponsePB peer_resp; |
329 | 1 | peer_resp.set_responder_uuid(kFollowerUuid); |
330 | 1 | peer_resp.set_responder_term(0); |
331 | 1 | peer_resp.mutable_status()->mutable_last_received()->CopyFrom( |
332 | 1 | MakeOpId(0, 0)); |
333 | 1 | peer_resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom( |
334 | 1 | MakeOpId(0, 0)); |
335 | 1 | peer_resp.mutable_status()->set_last_committed_idx(0); |
336 | | |
337 | 1 | mock_proxy->set_update_response(peer_resp); |
338 | | |
339 | | // Add an op to the queue and start sending requests to the peer. |
340 | 1 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1); |
341 | 1 | ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kAlwaysSend)); |
342 | | |
343 | | // We should be able to close the peer even though it has more data pending. |
344 | 1 | peer->Close(); |
345 | 1 | } |
346 | | |
347 | 1 | TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) { |
348 | 1 | auto mock_proxy = new MockedPeerProxy(raft_pool_.get()); |
349 | 1 | auto peer = ASSERT_RESULT(Peer::NewRemotePeer( |
350 | 1 | FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, PeerProxyPtr(mock_proxy), |
351 | 1 | message_queue_.get(), nullptr /* multi raft batcher */, raft_pool_token_.get(), |
352 | 1 | nullptr /* consensus */, |
353 | 1 | messenger_.get())); |
354 | | |
355 | 1 | auto se = ScopeExit([&peer] { |
356 | | // This guarantees that the Peer object doesn't get destroyed if there is a pending request. |
357 | 1 | peer->Close(); |
358 | 1 | }); |
359 | | |
360 | | // Initial response has to be successful -- otherwise we'll consider the peer "new" and only send |
361 | | // heartbeat RPCs. |
362 | 1 | ConsensusResponsePB initial_resp; |
363 | 1 | initial_resp.set_responder_uuid(kFollowerUuid); |
364 | 1 | initial_resp.set_responder_term(0); |
365 | 1 | initial_resp.mutable_status()->mutable_last_received()->CopyFrom( |
366 | 1 | MakeOpId(1, 1)); |
367 | 1 | initial_resp.mutable_status()->mutable_last_received_current_leader()->CopyFrom( |
368 | 1 | MakeOpId(1, 1)); |
369 | 1 | initial_resp.mutable_status()->set_last_committed_idx(0); |
370 | 1 | mock_proxy->set_update_response(initial_resp); |
371 | | |
372 | 1 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, 1, 1); |
373 | 1 | LOG(INFO) << "Initial SignalRequest"; |
374 | 1 | ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kAlwaysSend)); |
375 | 1 | LOG(INFO) << "Initial SignalRequest done"; |
376 | | |
377 | | // Now wait for the message to be replicated. This should succeed since the local (leader) peer |
378 | | // always acks and the follower also acked this time. |
379 | 1 | consensus_->WaitForMajorityReplicatedIndex(1); |
380 | 1 | LOG(INFO) << "Message replicated, setting error response"; |
381 | | |
382 | | // Set up the peer to respond with an error. |
383 | 1 | ConsensusResponsePB error_resp; |
384 | 1 | error_resp.mutable_error()->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR); |
385 | 1 | StatusToPB(STATUS(NotFound, "fake error"), error_resp.mutable_error()->mutable_status()); |
386 | 1 | mock_proxy->set_update_response(error_resp); |
387 | | |
388 | | // Up to this point we might have sent a lot of updates: we get the response from the fake peer |
389 | | // that it accepted our entry, we consider it replicated, and we are trying to tell the fake peer |
390 | | // about that, but it replies with the same canned response without bumping up the committed |
391 | | // index. We can keep spinning in this loop for a few hundred times until we replace the response |
392 | | // with an error. At the end of the test we should just consider the UpdateAsync calls made after |
393 | | // this point. |
394 | 1 | int initial_update_count = mock_proxy->update_count(); |
395 | | |
396 | | // Add a bunch of messages to the queue. |
397 | 100 | for (int i = 2; i <= 100; i++) { |
398 | 99 | AppendReplicateMessagesToQueue(message_queue_.get(), clock_, i, /* count */ 1); |
399 | 99 | ASSERT_OK(peer->SignalRequest(RequestTriggerMode::kNonEmptyOnly)); |
400 | | // Sleep for a longer time during the first iteration so we have a higher chance of handling |
401 | | // the response and incrementing failed_attempts_. |
402 | 99 | std::this_thread::sleep_for(i == 2 ? 100ms : 2ms); |
403 | 99 | } |
404 | | |
405 | 1 | LOG(INFO) << EXPR_VALUE_FOR_LOG(mock_proxy->update_count()); |
406 | 1 | LOG(INFO) << EXPR_VALUE_FOR_LOG(initial_update_count); |
407 | | // Check that we didn't attempt to send one UpdateConsensus call per |
408 | | // Write. 100 writes might have taken a second or two, though, so it's |
409 | | // OK to have called UpdateConsensus() a few times due to regularly |
410 | | // scheduled heartbeats. |
411 | 1 | ASSERT_LT(mock_proxy->update_count() - initial_update_count, 5); |
412 | 1 | } |
413 | | |
414 | | } // namespace consensus |
415 | | } // namespace yb |