/Users/deen/code/yugabyte-db/src/yb/consensus/consensus-test-util.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_CONSENSUS_CONSENSUS_TEST_UTIL_H_ |
34 | | #define YB_CONSENSUS_CONSENSUS_TEST_UTIL_H_ |
35 | | |
36 | | #include <map> |
37 | | #include <memory> |
38 | | #include <mutex> |
39 | | #include <string> |
40 | | #include <unordered_map> |
41 | | #include <utility> |
42 | | #include <vector> |
43 | | |
44 | | #include <gmock/gmock.h> |
45 | | |
46 | | #include "yb/common/hybrid_time.h" |
47 | | #include "yb/common/wire_protocol.h" |
48 | | #include "yb/consensus/consensus.h" |
49 | | #include "yb/consensus/consensus_peers.h" |
50 | | #include "yb/consensus/consensus_queue.h" |
51 | | #include "yb/consensus/consensus_round.h" |
52 | | #include "yb/consensus/opid_util.h" |
53 | | #include "yb/consensus/raft_consensus.h" |
54 | | #include "yb/consensus/test_consensus_context.h" |
55 | | #include "yb/gutil/map-util.h" |
56 | | #include "yb/gutil/strings/substitute.h" |
57 | | #include "yb/rpc/messenger.h" |
58 | | #include "yb/rpc/rpc_test_util.h" |
59 | | #include "yb/server/clock.h" |
60 | | #include "yb/util/countdown_latch.h" |
61 | | #include "yb/util/locks.h" |
62 | | #include "yb/util/status_log.h" |
63 | | #include "yb/util/test_macros.h" |
64 | | #include "yb/util/test_util.h" |
65 | | #include "yb/util/threadpool.h" |
66 | | |
67 | | using namespace std::literals; |
68 | | |
// Two-level token pasting: the extra indirection lets arguments such as __LINE__ be
// macro-expanded before they are concatenated.
#define TOKENPASTE(a, b) a ## b
#define TOKENPASTE2(a, b) TOKENPASTE(a, b)

// Compares two OpIdPB expressions with consensus::OpIdEquals and invokes FAIL() with both
// values' ShortDebugString() when they differ. Each argument is evaluated once into a local
// whose name is suffixed with the current line number.
//
// NOTE: the expansion is several statements (two declarations followed by an 'if'), so it
// must be used as a standalone statement inside a braced scope, and at most once per line.
#define ASSERT_OPID_EQ(actual, expected) \
  OpIdPB TOKENPASTE2(_left, __LINE__) = (actual); \
  OpIdPB TOKENPASTE2(_right, __LINE__) = (expected); \
  if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right, __LINE__))) \
    FAIL() << "Expected: " << TOKENPASTE2(_right, __LINE__).ShortDebugString() << "\n" \
           << "Value: " << TOKENPASTE2(_left, __LINE__).ShortDebugString() << "\n"
78 | | |
79 | | namespace yb { |
80 | | namespace consensus { |
81 | | |
82 | | using log::Log; |
83 | | using rpc::Messenger; |
84 | | using strings::Substitute; |
85 | | |
// Divisor the helpers in this file use to derive an operation's term from its index
// (term = index / kTermDivisor).
constexpr int kTermDivisor{7};
87 | | |
88 | 633 | inline CoarseTimePoint CoarseBigDeadline() { |
89 | 633 | return CoarseMonoClock::now() + 600s; |
90 | 633 | } |
91 | | |
92 | | inline ReplicateMsgPtr CreateDummyReplicate(int64_t term, |
93 | | int64_t index, |
94 | | const HybridTime& hybrid_time, |
95 | 952 | int64_t payload_size) { |
96 | 952 | auto msg = std::make_shared<ReplicateMsg>(); |
97 | 952 | OpIdPB* id = msg->mutable_id(); |
98 | 952 | id->set_term(term); |
99 | 952 | id->set_index(index); |
100 | | |
101 | 952 | msg->set_op_type(NO_OP); |
102 | 952 | msg->mutable_noop_request()->mutable_payload_for_tests()->resize(payload_size); |
103 | 952 | msg->set_hybrid_time(hybrid_time.ToUint64()); |
104 | 952 | return msg; |
105 | 952 | } |
106 | | |
107 | | // Returns RaftPeerPB with given UUID and obviously-fake hostname / port combo. |
108 | 26 | RaftPeerPB FakeRaftPeerPB(const std::string& uuid) { |
109 | 26 | RaftPeerPB peer_pb; |
110 | 26 | peer_pb.set_permanent_uuid(uuid); |
111 | 26 | auto addr = peer_pb.mutable_last_known_private_addr()->Add(); |
112 | 26 | addr->set_host(Substitute("$0-fake-hostname", CURRENT_TEST_NAME())); |
113 | 26 | addr->set_port(0); |
114 | 26 | return peer_pb; |
115 | 26 | } |
116 | | |
117 | | // Appends 'count' messages to 'queue' with different terms and indexes. |
118 | | // |
119 | | // An operation will only be considered done (TestOperationStatus::IsDone() |
120 | | // will become true) once at least 'n_majority' peers have called |
121 | | // TestOperationStatus::AckPeer(). |
122 | | static inline void AppendReplicateMessagesToQueue( |
123 | | PeerMessageQueue* queue, |
124 | | const scoped_refptr<server::Clock>& clock, |
125 | | int64_t first_index, |
126 | | int64_t count, |
127 | 113 | int64_t payload_size = 0) { |
128 | | |
129 | 857 | for (int64_t index = first_index; index < first_index + count; index++) { |
130 | 744 | int64_t term = index / kTermDivisor; |
131 | 744 | CHECK_OK(queue->TEST_AppendOperation( |
132 | 744 | CreateDummyReplicate(term, index, clock->Now(), payload_size))); |
133 | 744 | } |
134 | 113 | } consensus_peers-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Line | Count | Source | 127 | 105 | int64_t payload_size = 0) { | 128 | | | 129 | 229 | for (int64_t index = first_index; index < first_index + count; index++) { | 130 | 124 | int64_t term = index / kTermDivisor; | 131 | 124 | CHECK_OK(queue->TEST_AppendOperation( | 132 | 124 | CreateDummyReplicate(term, index, clock->Now(), payload_size))); | 133 | 124 | } | 134 | 105 | } |
consensus_queue-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Line | Count | Source | 127 | 8 | int64_t payload_size = 0) { | 128 | | | 129 | 628 | for (int64_t index = first_index; index < first_index + count; index++) { | 130 | 620 | int64_t term = index / kTermDivisor; | 131 | 620 | CHECK_OK(queue->TEST_AppendOperation( | 132 | 620 | CreateDummyReplicate(term, index, clock->Now(), payload_size))); | 133 | 620 | } | 134 | 8 | } |
Unexecuted instantiation: leader_election-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Unexecuted instantiation: log_cache-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Unexecuted instantiation: raft_consensus-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Unexecuted instantiation: raft_consensus_quorum-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Unexecuted instantiation: replica_state-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx Unexecuted instantiation: tablet_bootstrap-test.cc:_ZN2yb9consensusL30AppendReplicateMessagesToQueueEPNS0_16PeerMessageQueueERK13scoped_refptrINS_6server5ClockEExxx |
135 | | |
136 | 12 | OpIdPB MakeOpIdPbForIndex(int index) { |
137 | 12 | return MakeOpId(index / kTermDivisor, index); |
138 | 12 | } |
139 | | |
140 | 13 | OpId MakeOpIdForIndex(int index) { |
141 | 13 | return OpId(index / kTermDivisor, index); |
142 | 13 | } |
143 | | |
144 | 10 | std::string OpIdStrForIndex(int index) { |
145 | 10 | return OpIdToString(MakeOpIdPbForIndex(index)); |
146 | 10 | } |
147 | | |
148 | | // Builds a configuration of 'num' voters. |
149 | 27 | RaftConfigPB BuildRaftConfigPBForTests(int num) { |
150 | 27 | RaftConfigPB raft_config; |
151 | 100 | for (int i = 0; i < num; i++) { |
152 | 73 | RaftPeerPB* peer_pb = raft_config.add_peers(); |
153 | 73 | peer_pb->set_member_type(PeerMemberType::VOTER); |
154 | 73 | peer_pb->set_permanent_uuid(Substitute("peer-$0", i)); |
155 | 73 | HostPortPB* hp = peer_pb->mutable_last_known_private_addr()->Add(); |
156 | 73 | hp->set_host(Substitute("peer-$0.fake-domain-for-tests", i)); |
157 | 73 | hp->set_port(0); |
158 | 73 | } |
159 | 27 | return raft_config; |
160 | 27 | } |
161 | | |
162 | | // Abstract base class to build PeerProxy implementations on top of for testing. |
163 | | // Provides a single-threaded pool to run callbacks in and callback |
164 | | // registration/running, along with an enum to identify the supported methods. |
165 | | class TestPeerProxy : public PeerProxy { |
166 | | public: |
167 | | // Which PeerProxy method to invoke. |
168 | | enum Method { |
169 | | kUpdate, |
170 | | kRequestVote, |
171 | | }; |
172 | | |
173 | 1.75k | explicit TestPeerProxy(ThreadPool* pool) : pool_(pool) {} |
174 | | |
175 | | protected: |
176 | | // Register the RPC callback in order to call later. |
177 | | // We currently only support one request of each method being in flight at a time. |
178 | 1.01k | virtual void RegisterCallback(Method method, const rpc::ResponseCallback& callback) { |
179 | 1.01k | std::lock_guard<simple_spinlock> lock(lock_); |
180 | 1.01k | InsertOrDie(&callbacks_, method, callback); |
181 | 1.01k | } |
182 | | |
183 | | // Answer the peer. |
184 | 1.01k | virtual void Respond(Method method) { |
185 | 1.01k | rpc::ResponseCallback callback; |
186 | 1.01k | { |
187 | 1.01k | std::lock_guard<simple_spinlock> lock(lock_); |
188 | 1.01k | callback = FindOrDie(callbacks_, method); |
189 | 1.01k | CHECK_EQ(1, callbacks_.erase(method)); |
190 | | // Drop the lock before submitting to the pool, since the callback itself may |
191 | | // destroy this instance. |
192 | 1.01k | } |
193 | 1.01k | WARN_NOT_OK(pool_->SubmitFunc(callback), "Submit failed"); |
194 | 1.01k | } |
195 | | |
196 | 339 | virtual void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) { |
197 | 339 | RegisterCallback(method, callback); |
198 | 339 | Respond(method); |
199 | 339 | } |
200 | | |
201 | | mutable simple_spinlock lock_; |
202 | | ThreadPool* pool_; |
203 | | std::map<Method, rpc::ResponseCallback> callbacks_; // Protected by lock_. |
204 | | }; |
205 | | |
206 | | template <typename ProxyType> |
207 | | class DelayablePeerProxy : public TestPeerProxy { |
208 | | public: |
209 | | // Add delayability of RPC responses to the delegated impl. |
210 | | // This class takes ownership of 'proxy'. |
211 | | explicit DelayablePeerProxy(ThreadPool* pool, ProxyType* proxy) |
212 | | : TestPeerProxy(pool), |
213 | | proxy_(CHECK_NOTNULL(proxy)), |
214 | | delay_response_(false), |
215 | 17 | latch_(1) { |
216 | 17 | } _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEEC2EPNS_10ThreadPoolEPS2_ Line | Count | Source | 215 | 5 | latch_(1) { | 216 | 5 | } |
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEEC2EPNS_10ThreadPoolEPS2_ Line | Count | Source | 215 | 12 | latch_(1) { | 216 | 12 | } |
|
217 | | |
218 | | // Delay the answer to the next response to this remote |
219 | | // peer. The response callback will only be called on Respond(). |
220 | 6 | virtual void DelayResponse() { |
221 | 6 | std::lock_guard<simple_spinlock> l(lock_); |
222 | 6 | delay_response_ = true; |
223 | 6 | latch_.Reset(1); // Reset for the next time. |
224 | 6 | } _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE13DelayResponseEv Line | Count | Source | 220 | 2 | virtual void DelayResponse() { | 221 | 2 | std::lock_guard<simple_spinlock> l(lock_); | 222 | 2 | delay_response_ = true; | 223 | 2 | latch_.Reset(1); // Reset for the next time. | 224 | 2 | } |
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE13DelayResponseEv Line | Count | Source | 220 | 4 | virtual void DelayResponse() { | 221 | 4 | std::lock_guard<simple_spinlock> l(lock_); | 222 | 4 | delay_response_ = true; | 223 | 4 | latch_.Reset(1); // Reset for the next time. | 224 | 4 | } |
|
225 | | |
226 | 23 | virtual void RespondUnlessDelayed(Method method) { |
227 | 23 | { |
228 | 23 | std::lock_guard<simple_spinlock> l(lock_); |
229 | 23 | if (delay_response_) { |
230 | 6 | latch_.CountDown(); |
231 | 6 | delay_response_ = false; |
232 | 6 | return; |
233 | 6 | } |
234 | 17 | } |
235 | 17 | TestPeerProxy::Respond(method); |
236 | 17 | } _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE20RespondUnlessDelayedENS0_13TestPeerProxy6MethodE Line | Count | Source | 226 | 11 | virtual void RespondUnlessDelayed(Method method) { | 227 | 11 | { | 228 | 11 | std::lock_guard<simple_spinlock> l(lock_); | 229 | 11 | if (delay_response_) { | 230 | 2 | latch_.CountDown(); | 231 | 2 | delay_response_ = false; | 232 | 2 | return; | 233 | 2 | } | 234 | 9 | } | 235 | 9 | TestPeerProxy::Respond(method); | 236 | 9 | } |
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE20RespondUnlessDelayedENS0_13TestPeerProxy6MethodE Line | Count | Source | 226 | 12 | virtual void RespondUnlessDelayed(Method method) { | 227 | 12 | { | 228 | 12 | std::lock_guard<simple_spinlock> l(lock_); | 229 | 12 | if (delay_response_) { | 230 | 4 | latch_.CountDown(); | 231 | 4 | delay_response_ = false; | 232 | 4 | return; | 233 | 4 | } | 234 | 8 | } | 235 | 8 | TestPeerProxy::Respond(method); | 236 | 8 | } |
|
237 | | |
238 | 6 | virtual void Respond(Method method) override { |
239 | 6 | latch_.Wait(); // Wait until strictly after peer would have responded. |
240 | 6 | return TestPeerProxy::Respond(method); |
241 | 6 | } _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE7RespondENS0_13TestPeerProxy6MethodE Line | Count | Source | 238 | 2 | virtual void Respond(Method method) override { | 239 | 2 | latch_.Wait(); // Wait until strictly after peer would have responded. | 240 | 2 | return TestPeerProxy::Respond(method); | 241 | 2 | } |
_ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE7RespondENS0_13TestPeerProxy6MethodE Line | Count | Source | 238 | 4 | virtual void Respond(Method method) override { | 239 | 4 | latch_.Wait(); // Wait until strictly after peer would have responded. | 240 | 4 | return TestPeerProxy::Respond(method); | 241 | 4 | } |
|
242 | | |
243 | | virtual void UpdateAsync(const ConsensusRequestPB* request, |
244 | | RequestTriggerMode trigger_mode, |
245 | | ConsensusResponsePB* response, |
246 | | rpc::RpcController* controller, |
247 | 11 | const rpc::ResponseCallback& callback) override { |
248 | 11 | RegisterCallback(kUpdate, callback); |
249 | 11 | return proxy_->UpdateAsync( |
250 | 11 | request, trigger_mode, response, controller, |
251 | 11 | std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kUpdate)); |
252 | 11 | } _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE11UpdateAsyncEPKNS0_18ConsensusRequestPBENS0_18RequestTriggerModeEPNS0_19ConsensusResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE Line | Count | Source | 247 | 11 | const rpc::ResponseCallback& callback) override { | 248 | 11 | RegisterCallback(kUpdate, callback); | 249 | 11 | return proxy_->UpdateAsync( | 250 | 11 | request, trigger_mode, response, controller, | 251 | 11 | std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kUpdate)); | 252 | 11 | } |
Unexecuted instantiation: _ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE11UpdateAsyncEPKNS0_18ConsensusRequestPBENS0_18RequestTriggerModeEPNS0_19ConsensusResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE |
253 | | |
254 | | virtual void RequestConsensusVoteAsync(const VoteRequestPB* request, |
255 | | VoteResponsePB* response, |
256 | | rpc::RpcController* controller, |
257 | 12 | const rpc::ResponseCallback& callback) override { |
258 | 12 | RegisterCallback(kRequestVote, callback); |
259 | 12 | return proxy_->RequestConsensusVoteAsync( |
260 | 12 | request, response, controller, |
261 | 12 | std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kRequestVote)); |
262 | 12 | } Unexecuted instantiation: _ZN2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE25RequestConsensusVoteAsyncEPKNS0_13VoteRequestPBEPNS0_14VoteResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE _ZN2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE25RequestConsensusVoteAsyncEPKNS0_13VoteRequestPBEPNS0_14VoteResponsePBEPNS_3rpc13RpcControllerERKNSt3__18functionIFvvEEE Line | Count | Source | 257 | 12 | const rpc::ResponseCallback& callback) override { | 258 | 12 | RegisterCallback(kRequestVote, callback); | 259 | 12 | return proxy_->RequestConsensusVoteAsync( | 260 | 12 | request, response, controller, | 261 | 12 | std::bind(&DelayablePeerProxy::RespondUnlessDelayed, this, kRequestVote)); | 262 | 12 | } |
|
263 | | |
264 | 16 | ProxyType* proxy() const { |
265 | 16 | return proxy_.get(); |
266 | 16 | } _ZNK2yb9consensus18DelayablePeerProxyINS0_17NoOpTestPeerProxyEE5proxyEv Line | Count | Source | 264 | 4 | ProxyType* proxy() const { | 265 | 4 | return proxy_.get(); | 266 | 4 | } |
_ZNK2yb9consensus18DelayablePeerProxyINS0_15MockedPeerProxyEE5proxyEv Line | Count | Source | 264 | 12 | ProxyType* proxy() const { | 265 | 12 | return proxy_.get(); | 266 | 12 | } |
|
267 | | |
268 | | protected: |
269 | | std::unique_ptr<ProxyType> const proxy_; |
270 | | bool delay_response_; // Protected by lock_. |
271 | | CountDownLatch latch_; |
272 | | }; |
273 | | |
274 | | // Allows complete mocking of a peer's responses. |
275 | | // You set the response, it will respond with that. |
276 | | class MockedPeerProxy : public TestPeerProxy { |
277 | | public: |
278 | | explicit MockedPeerProxy(ThreadPool* pool) |
279 | 14 | : TestPeerProxy(pool) { |
280 | 14 | } |
281 | | |
282 | 3 | virtual void set_update_response(const ConsensusResponsePB& update_response) { |
283 | 0 | CHECK(update_response.IsInitialized()) << update_response.ShortDebugString(); |
284 | 3 | { |
285 | 3 | std::lock_guard<simple_spinlock> l(lock_); |
286 | 3 | update_response_ = update_response; |
287 | 3 | } |
288 | 3 | } |
289 | | |
290 | 12 | virtual void set_vote_response(const VoteResponsePB& vote_response) { |
291 | 12 | { |
292 | 12 | std::lock_guard<simple_spinlock> l(lock_); |
293 | 12 | vote_response_ = vote_response; |
294 | 12 | } |
295 | 12 | } |
296 | | |
297 | | virtual void UpdateAsync(const ConsensusRequestPB* request, |
298 | | RequestTriggerMode trigger_mode, |
299 | | ConsensusResponsePB* response, |
300 | | rpc::RpcController* controller, |
301 | 2 | const rpc::ResponseCallback& callback) override { |
302 | 2 | { |
303 | 2 | std::lock_guard<simple_spinlock> l(lock_); |
304 | 2 | switch (trigger_mode) { |
305 | 1 | case RequestTriggerMode::kNonEmptyOnly: non_empty_only_update_count_++; break; |
306 | 1 | case RequestTriggerMode::kAlwaysSend: forced_update_count_++; break; |
307 | 2 | } |
308 | 2 | update_count_++; |
309 | 2 | *response = update_response_; |
310 | 2 | } |
311 | 2 | return RegisterCallbackAndRespond(kUpdate, callback); |
312 | 2 | } |
313 | | |
314 | | virtual void RequestConsensusVoteAsync(const VoteRequestPB* request, |
315 | | VoteResponsePB* response, |
316 | | rpc::RpcController* controller, |
317 | 12 | const rpc::ResponseCallback& callback) override { |
318 | 12 | *response = vote_response_; |
319 | 12 | return RegisterCallbackAndRespond(kRequestVote, callback); |
320 | 12 | } |
321 | | |
322 | | // Return the number of times that UpdateAsync() has been called. |
323 | 3 | int update_count() const { |
324 | 3 | std::lock_guard<simple_spinlock> l(lock_); |
325 | 3 | return update_count_; |
326 | 3 | } |
327 | | |
328 | | // Return the number of times that UpdateAsync() has been for requestes triggered with |
329 | | // RequestTriggerMode::kNonEmptyOnly. |
330 | 0 | int non_empty_only_update_count() const { |
331 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
332 | 0 | return non_empty_only_update_count_; |
333 | 0 | } |
334 | | |
335 | | // Return the number of times that UpdateAsync() has been for requestes triggered with |
336 | | // RequestTriggerMode::kAlwaysSend. |
337 | 0 | int forced_update_count() const { |
338 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
339 | 0 | return forced_update_count_; |
340 | 0 | } |
341 | | |
342 | | protected: |
343 | | int update_count_ = 0; |
344 | | int forced_update_count_ = 0; |
345 | | int non_empty_only_update_count_ = 0; |
346 | | |
347 | | ConsensusResponsePB update_response_; |
348 | | VoteResponsePB vote_response_; |
349 | | }; |
350 | | |
351 | | // Allows to test peers by emulating a noop remote endpoint that just replies |
352 | | // that the messages were received/replicated/committed. |
353 | | class NoOpTestPeerProxy : public TestPeerProxy { |
354 | | public: |
355 | | |
356 | | explicit NoOpTestPeerProxy(ThreadPool* pool, const consensus::RaftPeerPB& peer_pb) |
357 | 1.68k | : TestPeerProxy(pool), peer_pb_(peer_pb) { |
358 | 1.68k | last_received_.CopyFrom(MinimumOpId()); |
359 | 1.68k | } |
360 | | |
361 | | virtual void UpdateAsync(const ConsensusRequestPB* request, |
362 | | RequestTriggerMode trigger_mode, |
363 | | ConsensusResponsePB* response, |
364 | | rpc::RpcController* controller, |
365 | 11 | const rpc::ResponseCallback& callback) override { |
366 | | |
367 | 11 | response->Clear(); |
368 | 11 | { |
369 | 11 | std::lock_guard<simple_spinlock> lock(lock_); |
370 | 11 | if (OpIdLessThan(last_received_, request->preceding_id())) { |
371 | 5 | ConsensusErrorPB* error = response->mutable_status()->mutable_error(); |
372 | 5 | error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH); |
373 | 5 | StatusToPB(STATUS(IllegalState, ""), error->mutable_status()); |
374 | 6 | } else if (request->ops_size() > 0) { |
375 | 5 | last_received_.CopyFrom(request->ops(request->ops_size() - 1).id()); |
376 | 5 | } |
377 | | |
378 | 11 | response->set_responder_uuid(peer_pb_.permanent_uuid()); |
379 | 11 | response->set_responder_term(request->caller_term()); |
380 | 11 | response->mutable_status()->mutable_last_received()->CopyFrom(last_received_); |
381 | 11 | response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(last_received_); |
382 | | // We set the last committed index to be the same index as the last received. While |
383 | | // this is unlikely to happen in a real situation, its not technically incorrect and |
384 | | // avoids having to come up with some other index that it still correct. |
385 | 11 | response->mutable_status()->set_last_committed_idx(last_received_.index()); |
386 | 11 | } |
387 | 11 | return RegisterCallbackAndRespond(kUpdate, callback); |
388 | 11 | } |
389 | | |
390 | | virtual void RequestConsensusVoteAsync(const VoteRequestPB* request, |
391 | | VoteResponsePB* response, |
392 | | rpc::RpcController* controller, |
393 | 314 | const rpc::ResponseCallback& callback) override { |
394 | 314 | { |
395 | 314 | std::lock_guard<simple_spinlock> lock(lock_); |
396 | 314 | response->set_responder_uuid(peer_pb_.permanent_uuid()); |
397 | 314 | response->set_responder_term(request->candidate_term()); |
398 | 314 | response->set_vote_granted(true); |
399 | 314 | } |
400 | 314 | return RegisterCallbackAndRespond(kRequestVote, callback); |
401 | 314 | } |
402 | | |
403 | 4 | const OpIdPB& last_received() { |
404 | 4 | std::lock_guard<simple_spinlock> lock(lock_); |
405 | 4 | return last_received_; |
406 | 4 | } |
407 | | |
408 | | private: |
409 | | const consensus::RaftPeerPB peer_pb_; |
410 | | ConsensusStatusPB last_status_; // Protected by lock_. |
411 | | OpIdPB last_received_; // Protected by lock_. |
412 | | }; |
413 | | |
414 | | class NoOpTestPeerProxyFactory : public PeerProxyFactory { |
415 | | public: |
416 | 0 | NoOpTestPeerProxyFactory() { |
417 | 0 | CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_)); |
418 | 0 | messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build()); |
419 | 0 | } |
420 | | |
421 | 0 | PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) override { |
422 | 0 | return std::make_unique<NoOpTestPeerProxy>(pool_.get(), peer_pb); |
423 | 0 | } |
424 | | |
425 | 0 | Messenger* messenger() const override { |
426 | 0 | return messenger_.get(); |
427 | 0 | } |
428 | | |
429 | | std::unique_ptr<ThreadPool> pool_; |
430 | | std::unique_ptr<rpc::Messenger> messenger_; |
431 | | }; |
432 | | |
433 | | typedef std::unordered_map<std::string, std::shared_ptr<RaftConsensus> > TestPeerMap; |
434 | | |
435 | | // Thread-safe manager for list of peers being used in tests. |
436 | | class TestPeerMapManager { |
437 | | public: |
438 | 7 | explicit TestPeerMapManager(const RaftConfigPB& config) : config_(config) {} |
439 | | |
440 | 23 | void AddPeer(const std::string& peer_uuid, const std::shared_ptr<RaftConsensus>& peer) { |
441 | 23 | std::lock_guard<simple_spinlock> lock(lock_); |
442 | 23 | InsertOrDie(&peers_, peer_uuid, peer); |
443 | 23 | } |
444 | | |
445 | 456 | CHECKED_STATUS GetPeerByIdx(int idx, std::shared_ptr<RaftConsensus>* peer_out) const { |
446 | 456 | CHECK_LT(idx, config_.peers_size()); |
447 | 456 | return GetPeerByUuid(config_.peers(idx).permanent_uuid(), peer_out); |
448 | 456 | } |
449 | | |
450 | | CHECKED_STATUS GetPeerByUuid(const std::string& peer_uuid, |
451 | 1.10k | std::shared_ptr<RaftConsensus>* peer_out) const { |
452 | 1.10k | std::lock_guard<simple_spinlock> lock(lock_); |
453 | 1.10k | if (!FindCopy(peers_, peer_uuid, peer_out)) { |
454 | 19 | return STATUS(NotFound, "Other consensus instance was destroyed"); |
455 | 19 | } |
456 | 1.08k | return Status::OK(); |
457 | 1.08k | } |
458 | | |
459 | 2 | void RemovePeer(const std::string& peer_uuid) { |
460 | 2 | std::lock_guard<simple_spinlock> lock(lock_); |
461 | 2 | peers_.erase(peer_uuid); |
462 | 2 | } |
463 | | |
464 | 16 | TestPeerMap GetPeerMapCopy() const { |
465 | 16 | std::lock_guard<simple_spinlock> lock(lock_); |
466 | 16 | return peers_; |
467 | 16 | } |
468 | | |
469 | 7 | void Clear() { |
470 | | // We create a copy of the peers before we clear 'peers_' so that there's |
471 | | // still a reference to each peer. If we reduce the reference count to 0 under |
472 | | // the lock we might get a deadlock as on shutdown consensus indirectly |
473 | | // destroys the test proxies which in turn reach into this class. |
474 | 7 | TestPeerMap copy = peers_; |
475 | 7 | { |
476 | 7 | std::lock_guard<simple_spinlock> lock(lock_); |
477 | 7 | peers_.clear(); |
478 | 7 | } |
479 | | |
480 | 7 | } |
481 | | |
482 | | private: |
483 | | const RaftConfigPB config_; |
484 | | TestPeerMap peers_; |
485 | | mutable simple_spinlock lock_; |
486 | | }; |
487 | | |
488 | | |
489 | | // Allows to test remote peers by emulating an RPC. |
490 | | // Both the "remote" peer's RPC call and the caller peer's response are executed |
491 | | // asynchronously in a ThreadPool. |
492 | | class LocalTestPeerProxy : public TestPeerProxy { |
493 | | public: |
494 | | LocalTestPeerProxy(std::string peer_uuid, ThreadPool* pool, |
495 | | TestPeerMapManager* peers) |
496 | | : TestPeerProxy(pool), |
497 | | peer_uuid_(std::move(peer_uuid)), |
498 | | peers_(peers), |
499 | 40 | miss_comm_(false) {} |
500 | | |
501 | | void UpdateAsync(const ConsensusRequestPB* request, |
502 | | RequestTriggerMode trigger_mode, |
503 | | ConsensusResponsePB* response, |
504 | | rpc::RpcController* controller, |
505 | 634 | const rpc::ResponseCallback& callback) override { |
506 | 634 | RegisterCallback(kUpdate, callback); |
507 | 634 | CHECK_OK(pool_->SubmitFunc( |
508 | 634 | std::bind(&LocalTestPeerProxy::SendUpdateRequest, this, *request, response))); |
509 | 634 | } |
510 | | |
511 | | void RequestConsensusVoteAsync(const VoteRequestPB* request, |
512 | | VoteResponsePB* response, |
513 | | rpc::RpcController* controller, |
514 | 16 | const rpc::ResponseCallback& callback) override { |
515 | 16 | RegisterCallback(kRequestVote, callback); |
516 | 16 | WARN_NOT_OK( |
517 | 16 | pool_->SubmitFunc(std::bind(&LocalTestPeerProxy::SendVoteRequest, this, request, response)), |
518 | 16 | "Submit failed"); |
519 | 16 | } |
520 | | |
521 | | template<class Response> |
522 | 122 | void SetResponseError(const Status& status, Response* response) { |
523 | 122 | tserver::TabletServerErrorPB* error = response->mutable_error(); |
524 | 122 | error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR); |
525 | 122 | StatusToPB(status, error->mutable_status()); |
526 | 122 | ClearStatus(response); |
527 | 122 | } _ZN2yb9consensus18LocalTestPeerProxy16SetResponseErrorINS0_19ConsensusResponsePBEEEvRKNS_6StatusEPT_ Line | Count | Source | 522 | 116 | void SetResponseError(const Status& status, Response* response) { | 523 | 116 | tserver::TabletServerErrorPB* error = response->mutable_error(); | 524 | 116 | error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR); | 525 | 116 | StatusToPB(status, error->mutable_status()); | 526 | 116 | ClearStatus(response); | 527 | 116 | } |
_ZN2yb9consensus18LocalTestPeerProxy16SetResponseErrorINS0_14VoteResponsePBEEEvRKNS_6StatusEPT_ Line | Count | Source | 522 | 6 | void SetResponseError(const Status& status, Response* response) { | 523 | 6 | tserver::TabletServerErrorPB* error = response->mutable_error(); | 524 | 6 | error->set_code(tserver::TabletServerErrorPB::UNKNOWN_ERROR); | 525 | 6 | StatusToPB(status, error->mutable_status()); | 526 | 6 | ClearStatus(response); | 527 | 6 | } |
|
528 | | |
529 | 6 | void ClearStatus(VoteResponsePB* response) { |
530 | 6 | } |
531 | | |
532 | 116 | void ClearStatus(ConsensusResponsePB* response) { |
533 | 116 | response->clear_status(); |
534 | 116 | } |
535 | | |
536 | | template<class Request, class Response> |
537 | | void RespondOrMissResponse(Request* request, |
538 | | const Response& response_temp, |
539 | | Response* final_response, |
540 | 650 | Method method) { |
541 | 650 | bool miss_comm_copy; |
542 | 650 | { |
543 | 650 | std::lock_guard<simple_spinlock> lock(lock_); |
544 | 650 | miss_comm_copy = miss_comm_; |
545 | 650 | miss_comm_ = false; |
546 | 650 | } |
547 | 650 | if (PREDICT_FALSE(miss_comm_copy)) { |
548 | 0 | VLOG(2) << this << ": injecting fault on " << request->ShortDebugString(); |
549 | 103 | SetResponseError(STATUS(IOError, "Artificial error caused by communication " |
550 | 103 | "failure injection."), final_response); |
551 | 547 | } else { |
552 | 547 | final_response->CopyFrom(response_temp); |
553 | 547 | } |
554 | 650 | Respond(method); |
555 | 650 | } _ZN2yb9consensus18LocalTestPeerProxy21RespondOrMissResponseINS0_18ConsensusRequestPBENS0_19ConsensusResponsePBEEEvPT_RKT0_PS7_NS0_13TestPeerProxy6MethodE Line | Count | Source | 540 | 634 | Method method) { | 541 | 634 | bool miss_comm_copy; | 542 | 634 | { | 543 | 634 | std::lock_guard<simple_spinlock> lock(lock_); | 544 | 634 | miss_comm_copy = miss_comm_; | 545 | 634 | miss_comm_ = false; | 546 | 634 | } | 547 | 634 | if (PREDICT_FALSE(miss_comm_copy)) { | 548 | 0 | VLOG(2) << this << ": injecting fault on " << request->ShortDebugString(); | 549 | 103 | SetResponseError(STATUS(IOError, "Artificial error caused by communication " | 550 | 103 | "failure injection."), final_response); | 551 | 531 | } else { | 552 | 531 | final_response->CopyFrom(response_temp); | 553 | 531 | } | 554 | 634 | Respond(method); | 555 | 634 | } |
_ZN2yb9consensus18LocalTestPeerProxy21RespondOrMissResponseIKNS0_13VoteRequestPBENS0_14VoteResponsePBEEEvPT_RKT0_PS8_NS0_13TestPeerProxy6MethodE Line | Count | Source | 540 | 16 | Method method) { | 541 | 16 | bool miss_comm_copy; | 542 | 16 | { | 543 | 16 | std::lock_guard<simple_spinlock> lock(lock_); | 544 | 16 | miss_comm_copy = miss_comm_; | 545 | 16 | miss_comm_ = false; | 546 | 16 | } | 547 | 16 | if (PREDICT_FALSE(miss_comm_copy)) { | 548 | 0 | VLOG(2) << this << ": injecting fault on " << request->ShortDebugString(); | 549 | 0 | SetResponseError(STATUS(IOError, "Artificial error caused by communication " | 550 | 0 | "failure injection."), final_response); | 551 | 16 | } else { | 552 | 16 | final_response->CopyFrom(response_temp); | 553 | 16 | } | 554 | 16 | Respond(method); | 555 | 16 | } |
|
556 | | |
557 | | void SendUpdateRequest(ConsensusRequestPB request, |
558 | 634 | ConsensusResponsePB* response) { |
559 | | // Give the other peer a clean response object to write to. |
560 | 634 | ConsensusResponsePB other_peer_resp; |
561 | 634 | std::shared_ptr<RaftConsensus> peer; |
562 | 634 | Status s = peers_->GetPeerByUuid(peer_uuid_, &peer); |
563 | | |
564 | 634 | if (s.ok()) { |
565 | 621 | s = peer->Update(&request, &other_peer_resp, CoarseBigDeadline()); |
566 | 621 | if (s.ok() && !other_peer_resp.has_error()) { |
567 | 620 | CHECK(other_peer_resp.has_status()); |
568 | 620 | CHECK(other_peer_resp.status().IsInitialized()); |
569 | 620 | } |
570 | 621 | } |
571 | 634 | if (!s.ok()) { |
572 | 13 | LOG(WARNING) << "Could not Update replica with request: " |
573 | 13 | << request.ShortDebugString() |
574 | 13 | << " Status: " << s.ToString(); |
575 | 13 | SetResponseError(s, &other_peer_resp); |
576 | 13 | } |
577 | | |
578 | 634 | response->CopyFrom(other_peer_resp); |
579 | 634 | RespondOrMissResponse(&request, other_peer_resp, response, kUpdate); |
580 | 634 | } |
581 | | |
582 | | |
583 | | |
584 | | void SendVoteRequest(const VoteRequestPB* request, |
585 | 16 | VoteResponsePB* response) { |
586 | | |
587 | | // Copy the request and the response for the other peer so that ownership |
588 | | // remains as close to the dist. impl. as possible. |
589 | 16 | VoteRequestPB other_peer_req; |
590 | 16 | other_peer_req.CopyFrom(*request); |
591 | 16 | VoteResponsePB other_peer_resp; |
592 | 16 | other_peer_resp.CopyFrom(*response); |
593 | | |
594 | 16 | std::shared_ptr<RaftConsensus> peer; |
595 | 16 | Status s = peers_->GetPeerByUuid(peer_uuid_, &peer); |
596 | | |
597 | 16 | if (s.ok()) { |
598 | 10 | s = peer->RequestVote(&other_peer_req, &other_peer_resp); |
599 | 10 | } |
600 | 16 | if (!s.ok()) { |
601 | 6 | LOG(WARNING) << "Could not RequestVote from replica with request: " |
602 | 6 | << other_peer_req.ShortDebugString() |
603 | 6 | << " Status: " << s.ToString(); |
604 | 6 | SetResponseError(s, &other_peer_resp); |
605 | 6 | } |
606 | | |
607 | 16 | response->CopyFrom(other_peer_resp); |
608 | 16 | RespondOrMissResponse(request, other_peer_resp, response, kRequestVote); |
609 | 16 | } |
610 | | |
611 | 104 | void InjectCommFaultLeaderSide() { |
612 | 0 | VLOG(2) << this << ": injecting fault next time"; |
613 | 104 | std::lock_guard<simple_spinlock> lock(lock_); |
614 | 104 | miss_comm_ = true; |
615 | 104 | } |
616 | | |
617 | 156 | const std::string& GetTarget() const { |
618 | 156 | return peer_uuid_; |
619 | 156 | } |
620 | | |
621 | | private: |
622 | | const std::string peer_uuid_; |
623 | | TestPeerMapManager* const peers_; |
624 | | bool miss_comm_; |
625 | | }; |
626 | | |
627 | | class LocalTestPeerProxyFactory : public PeerProxyFactory { |
628 | | public: |
629 | | explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers) |
630 | 29 | : peers_(peers) { |
631 | 29 | CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_)); |
632 | 29 | messenger_ = rpc::CreateAutoShutdownMessengerHolder( |
633 | 29 | CHECK_RESULT(rpc::MessengerBuilder("test").Build())); |
634 | 29 | } |
635 | | |
636 | 40 | PeerProxyPtr NewProxy(const consensus::RaftPeerPB& peer_pb) override { |
637 | 40 | auto new_proxy = std::make_unique<LocalTestPeerProxy>( |
638 | 40 | peer_pb.permanent_uuid(), pool_.get(), peers_); |
639 | 40 | proxies_.push_back(new_proxy.get()); |
640 | 40 | return new_proxy; |
641 | 40 | } |
642 | | |
643 | 104 | virtual const vector<LocalTestPeerProxy*>& GetProxies() { |
644 | 104 | return proxies_; |
645 | 104 | } |
646 | | |
647 | 81 | rpc::Messenger* messenger() const override { |
648 | 81 | return messenger_.get(); |
649 | 81 | } |
650 | | |
651 | | private: |
652 | | std::unique_ptr<ThreadPool> pool_; |
653 | | rpc::AutoShutdownMessengerHolder messenger_; |
654 | | TestPeerMapManager* const peers_; |
655 | | // NOTE: There is no need to delete this on the dctor because proxies are externally managed |
656 | | vector<LocalTestPeerProxy*> proxies_; |
657 | | }; |
658 | | |
659 | | // A simple implementation of the transaction driver. |
660 | | // This is usually implemented by OperationDriver but here we |
661 | | // keep the implementation to the minimally required to have consensus |
662 | | // work. |
663 | | class TestDriver : public ConsensusRoundCallback { |
664 | | public: |
665 | | TestDriver(ThreadPool* pool, const scoped_refptr<ConsensusRound>& round) |
666 | 0 | : round_(round), pool_(pool) { |
667 | 0 | } |
668 | | |
669 | 0 | void SetRound(const scoped_refptr<ConsensusRound>& round) { |
670 | 0 | round_ = round; |
671 | 0 | } |
672 | | |
673 | | // Does nothing but enqueue the Apply |
674 | | void ReplicationFinished( |
675 | 0 | const Status& status, int64_t leader_term, OpIds* applied_op_ids) override { |
676 | 0 | if (status.IsAborted()) { |
677 | 0 | Cleanup(); |
678 | 0 | return; |
679 | 0 | } |
680 | 0 | CHECK_OK(status); |
681 | 0 | CHECK_OK(pool_->SubmitFunc(std::bind(&TestDriver::Apply, this))); |
682 | 0 | } Unexecuted instantiation: _ZN2yb9consensus10TestDriver19ReplicationFinishedERKNS_6StatusExPNSt3__16vectorINS_4OpIdENS5_9allocatorIS7_EEEE Unexecuted instantiation: _ZN2yb9consensus10TestDriver19ReplicationFinishedERKNS_6StatusExPNSt3__16vectorINS_4OpIdENS5_9allocatorIS7_EEEE |
683 | | |
684 | 0 | void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override {} |
685 | | |
686 | | // Called in all modes to delete the transaction and, transitively, the consensus |
687 | | // round. |
688 | 0 | void Cleanup() { |
689 | 0 | delete this; |
690 | 0 | } |
691 | | |
692 | | scoped_refptr<ConsensusRound> round_; |
693 | | |
694 | | private: |
695 | | // The commit message has the exact same type of the replicate message, but |
696 | | // no content. |
697 | 0 | void Apply() {} |
698 | | |
699 | 0 | void CommitCallback(const Status& s) { |
700 | 0 | CHECK_OK(s); |
701 | 0 | Cleanup(); |
702 | 0 | } |
703 | | |
704 | | ThreadPool* pool_; |
705 | | }; |
706 | | |
707 | | // Fake ReplicaOperationFactory that allows for instantiating and unit |
708 | | // testing RaftConsensusState. Does not actually support running transactions. |
709 | | class MockOperationFactory : public TestConsensusContext { |
710 | | public: |
711 | | CHECKED_STATUS StartReplicaOperation( |
712 | 0 | const scoped_refptr<ConsensusRound>& round, HybridTime propagated_hybrid_time) override { |
713 | 0 | return StartReplicaOperationMock(round.get()); |
714 | 0 | } |
715 | | |
716 | | MOCK_METHOD1(StartReplicaOperationMock, Status(ConsensusRound* round)); |
717 | | }; |
718 | | |
719 | | // A transaction factory for tests, usually this is implemented by TabletPeer. |
720 | | class TestOperationFactory : public TestConsensusContext { |
721 | | public: |
722 | 23 | TestOperationFactory() { |
723 | 23 | CHECK_OK(ThreadPoolBuilder("test-operation-factory").set_max_threads(1).Build(&pool_)); |
724 | 23 | } |
725 | | |
726 | 23 | void SetConsensus(Consensus* consensus) { |
727 | 23 | consensus_ = consensus; |
728 | 23 | } |
729 | | |
730 | | CHECKED_STATUS StartReplicaOperation( |
731 | 0 | const scoped_refptr<ConsensusRound>& round, HybridTime propagated_hybrid_time) override { |
732 | 0 | auto txn = new TestDriver(pool_.get(), round); |
733 | 0 | txn->round_->SetCallback(txn); |
734 | 0 | return Status::OK(); |
735 | 0 | } |
736 | | |
737 | 0 | void ReplicateAsync(ConsensusRound* round) { |
738 | 0 | CHECK_OK(consensus_->TEST_Replicate(round)); |
739 | 0 | } |
740 | | |
741 | 40 | void WaitDone() { |
742 | 40 | pool_->Wait(); |
743 | 40 | } |
744 | | |
745 | 23 | void ShutDown() { |
746 | 23 | WaitDone(); |
747 | 23 | pool_->Shutdown(); |
748 | 23 | } |
749 | | |
750 | 23 | ~TestOperationFactory() { |
751 | 23 | ShutDown(); |
752 | 23 | } |
753 | | |
754 | | private: |
755 | | std::unique_ptr<ThreadPool> pool_; |
756 | | Consensus* consensus_ = nullptr; |
757 | | }; |
758 | | |
759 | | // Consensus fault hooks impl. that simply counts the number of calls to |
760 | | // each method. |
761 | | // Allows passing another hook instance so that we can use both. |
762 | | // If non-null, the passed hook instance will be called first for all methods. |
763 | | class CounterHooks : public Consensus::ConsensusFaultHooks { |
764 | | public: |
765 | | explicit CounterHooks( |
766 | | std::shared_ptr<Consensus::ConsensusFaultHooks> current_hook) |
767 | | : current_hook_(std::move(current_hook)), |
768 | | pre_start_calls_(0), |
769 | | post_start_calls_(0), |
770 | | pre_config_change_calls_(0), |
771 | | post_config_change_calls_(0), |
772 | | pre_replicate_calls_(0), |
773 | | post_replicate_calls_(0), |
774 | | pre_update_calls_(0), |
775 | | post_update_calls_(0), |
776 | | pre_shutdown_calls_(0), |
777 | 2 | post_shutdown_calls_(0) {} |
778 | | |
779 | 2 | virtual CHECKED_STATUS PreStart() override { |
780 | 2 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreStart()); |
781 | 2 | std::lock_guard<simple_spinlock> lock(lock_); |
782 | 2 | pre_start_calls_++; |
783 | 2 | return Status::OK(); |
784 | 2 | } |
785 | | |
786 | 2 | virtual CHECKED_STATUS PostStart() override { |
787 | 2 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostStart()); |
788 | 2 | std::lock_guard<simple_spinlock> lock(lock_); |
789 | 2 | post_start_calls_++; |
790 | 2 | return Status::OK(); |
791 | 2 | } |
792 | | |
793 | 0 | virtual CHECKED_STATUS PreConfigChange() override { |
794 | 0 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreConfigChange()); |
795 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
796 | 0 | pre_config_change_calls_++; |
797 | 0 | return Status::OK(); |
798 | 0 | } Unexecuted instantiation: _ZN2yb9consensus12CounterHooks15PreConfigChangeEv Unexecuted instantiation: _ZN2yb9consensus12CounterHooks15PreConfigChangeEv |
799 | | |
800 | 0 | virtual CHECKED_STATUS PostConfigChange() override { |
801 | 0 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostConfigChange()); |
802 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
803 | 0 | post_config_change_calls_++; |
804 | 0 | return Status::OK(); |
805 | 0 | } Unexecuted instantiation: _ZN2yb9consensus12CounterHooks16PostConfigChangeEv Unexecuted instantiation: _ZN2yb9consensus12CounterHooks16PostConfigChangeEv |
806 | | |
807 | 0 | virtual CHECKED_STATUS PreReplicate() override { |
808 | 0 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreReplicate()); |
809 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
810 | 0 | pre_replicate_calls_++; |
811 | 0 | return Status::OK(); |
812 | 0 | } Unexecuted instantiation: _ZN2yb9consensus12CounterHooks12PreReplicateEv Unexecuted instantiation: _ZN2yb9consensus12CounterHooks12PreReplicateEv |
813 | | |
814 | 0 | virtual CHECKED_STATUS PostReplicate() override { |
815 | 0 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostReplicate()); |
816 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
817 | 0 | post_replicate_calls_++; |
818 | 0 | return Status::OK(); |
819 | 0 | } Unexecuted instantiation: _ZN2yb9consensus12CounterHooks13PostReplicateEv Unexecuted instantiation: _ZN2yb9consensus12CounterHooks13PostReplicateEv |
820 | | |
821 | 14 | virtual CHECKED_STATUS PreUpdate() override { |
822 | 14 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreUpdate()); |
823 | 14 | std::lock_guard<simple_spinlock> lock(lock_); |
824 | 14 | pre_update_calls_++; |
825 | 14 | return Status::OK(); |
826 | 14 | } |
827 | | |
828 | 14 | virtual CHECKED_STATUS PostUpdate() override { |
829 | 14 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostUpdate()); |
830 | 14 | std::lock_guard<simple_spinlock> lock(lock_); |
831 | 14 | post_update_calls_++; |
832 | 14 | return Status::OK(); |
833 | 14 | } |
834 | | |
835 | 2 | virtual CHECKED_STATUS PreShutdown() override { |
836 | 2 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PreShutdown()); |
837 | 2 | std::lock_guard<simple_spinlock> lock(lock_); |
838 | 2 | pre_shutdown_calls_++; |
839 | 2 | return Status::OK(); |
840 | 2 | } |
841 | | |
842 | 2 | virtual CHECKED_STATUS PostShutdown() override { |
843 | 2 | if (current_hook_.get()) RETURN_NOT_OK(current_hook_->PostShutdown()); |
844 | 2 | std::lock_guard<simple_spinlock> lock(lock_); |
845 | 2 | post_shutdown_calls_++; |
846 | 2 | return Status::OK(); |
847 | 2 | } |
848 | | |
849 | 0 | int num_pre_start_calls() { |
850 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
851 | 0 | return pre_start_calls_; |
852 | 0 | } |
853 | | |
854 | 0 | int num_post_start_calls() { |
855 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
856 | 0 | return post_start_calls_; |
857 | 0 | } |
858 | | |
859 | 0 | int num_pre_config_change_calls() { |
860 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
861 | 0 | return pre_config_change_calls_; |
862 | 0 | } |
863 | | |
864 | 0 | int num_post_config_change_calls() { |
865 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
866 | 0 | return post_config_change_calls_; |
867 | 0 | } |
868 | | |
869 | 0 | int num_pre_replicate_calls() { |
870 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
871 | 0 | return pre_replicate_calls_; |
872 | 0 | } |
873 | | |
874 | 0 | int num_post_replicate_calls() { |
875 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
876 | 0 | return post_replicate_calls_; |
877 | 0 | } |
878 | | |
879 | 4 | int num_pre_update_calls() { |
880 | 4 | std::lock_guard<simple_spinlock> lock(lock_); |
881 | 4 | return pre_update_calls_; |
882 | 4 | } |
883 | | |
884 | 0 | int num_post_update_calls() { |
885 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
886 | 0 | return post_update_calls_; |
887 | 0 | } |
888 | | |
889 | 0 | int num_pre_shutdown_calls() { |
890 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
891 | 0 | return pre_shutdown_calls_; |
892 | 0 | } |
893 | | |
894 | 0 | int num_post_shutdown_calls() { |
895 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
896 | 0 | return post_shutdown_calls_; |
897 | 0 | } |
898 | | |
899 | | private: |
900 | | std::shared_ptr<Consensus::ConsensusFaultHooks> current_hook_; |
901 | | int pre_start_calls_; |
902 | | int post_start_calls_; |
903 | | int pre_config_change_calls_; |
904 | | int post_config_change_calls_; |
905 | | int pre_replicate_calls_; |
906 | | int post_replicate_calls_; |
907 | | int pre_update_calls_; |
908 | | int post_update_calls_; |
909 | | int pre_shutdown_calls_; |
910 | | int post_shutdown_calls_; |
911 | | |
912 | | // Lock that protects updates to the counters. |
913 | | mutable simple_spinlock lock_; |
914 | | }; |
915 | | |
916 | | class TestRaftConsensusQueueIface : public PeerMessageQueueObserver { |
917 | | public: |
918 | 71 | bool IsMajorityReplicated(int64_t index) { |
919 | 71 | std::lock_guard<simple_spinlock> lock(lock_); |
920 | 71 | return majority_replicated_op_id_.index >= index; |
921 | 71 | } |
922 | | |
923 | 0 | OpId majority_replicated_op_id() { |
924 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
925 | 0 | return majority_replicated_op_id_; |
926 | 0 | } |
927 | | |
928 | 13 | void WaitForMajorityReplicatedIndex(int64_t index, MonoDelta timeout = MonoDelta(30s)) { |
929 | 13 | ASSERT_OK(WaitFor( |
930 | 13 | [&]() { return IsMajorityReplicated(index); }, |
931 | 13 | timeout, Format("waiting for index $0 to be replicated", index))); |
932 | 13 | } |
933 | | |
934 | | protected: |
935 | | void UpdateMajorityReplicated( |
936 | | const MajorityReplicatedData& data, OpId* committed_index, |
937 | 740 | OpId* last_applied_op_id) override { |
938 | 740 | std::lock_guard<simple_spinlock> lock(lock_); |
939 | 740 | majority_replicated_op_id_ = data.op_id; |
940 | 740 | *committed_index = data.op_id; |
941 | 740 | *last_applied_op_id = data.op_id; |
942 | 740 | } |
943 | 0 | void NotifyTermChange(int64_t term) override {} |
944 | | void NotifyFailedFollower(const std::string& uuid, |
945 | | int64_t term, |
946 | 0 | const std::string& reason) override {} |
947 | 0 | void MajorityReplicatedNumSSTFilesChanged(uint64_t) override {} |
948 | | |
949 | | private: |
950 | | mutable simple_spinlock lock_; |
951 | | OpId majority_replicated_op_id_; |
952 | | }; |
953 | | |
954 | | } // namespace consensus |
955 | | } // namespace yb |
956 | | |
957 | | #endif /* YB_CONSENSUS_CONSENSUS_TEST_UTIL_H_ */ |