/Users/deen/code/yugabyte-db/src/yb/integration-tests/raft_consensus-itest.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <regex> |
34 | | #include <unordered_map> |
35 | | #include <unordered_set> |
36 | | |
37 | | #include <boost/optional.hpp> |
38 | | #include <glog/logging.h> |
39 | | #include <glog/stl_logging.h> |
40 | | #include <gtest/gtest.h> |
41 | | |
42 | | #include "yb/client/client.h" |
43 | | #include "yb/client/error.h" |
44 | | #include "yb/client/session.h" |
45 | | #include "yb/client/table_handle.h" |
46 | | #include "yb/client/yb_op.h" |
47 | | |
48 | | #include "yb/common/partition.h" |
49 | | #include "yb/common/ql_type.h" |
50 | | #include "yb/common/schema.h" |
51 | | #include "yb/common/wire_protocol-test-util.h" |
52 | | #include "yb/common/wire_protocol.h" |
53 | | |
54 | | #include "yb/consensus/consensus.pb.h" |
55 | | #include "yb/consensus/consensus.proxy.h" |
56 | | #include "yb/consensus/consensus_peers.h" |
57 | | #include "yb/consensus/consensus_types.pb.h" |
58 | | #include "yb/consensus/metadata.pb.h" |
59 | | #include "yb/consensus/opid_util.h" |
60 | | #include "yb/consensus/quorum_util.h" |
61 | | |
62 | | #include "yb/docdb/doc_key.h" |
63 | | #include "yb/docdb/value_type.h" |
64 | | |
65 | | #include "yb/gutil/map-util.h" |
66 | | #include "yb/gutil/strings/strcat.h" |
67 | | #include "yb/gutil/strings/util.h" |
68 | | |
69 | | #include "yb/integration-tests/cluster_verifier.h" |
70 | | #include "yb/integration-tests/external_mini_cluster.h" |
71 | | #include "yb/integration-tests/external_mini_cluster_fs_inspector.h" |
72 | | #include "yb/integration-tests/test_workload.h" |
73 | | #include "yb/integration-tests/ts_itest-base.h" |
74 | | |
75 | | #include "yb/master/master_client.proxy.h" |
76 | | #include "yb/master/master_cluster.proxy.h" |
77 | | |
78 | | #include "yb/rpc/messenger.h" |
79 | | #include "yb/rpc/proxy.h" |
80 | | #include "yb/rpc/rpc_controller.h" |
81 | | #include "yb/rpc/rpc_test_util.h" |
82 | | |
83 | | #include "yb/server/hybrid_clock.h" |
84 | | #include "yb/server/server_base.proxy.h" |
85 | | |
86 | | #include "yb/tserver/tserver_admin.proxy.h" |
87 | | #include "yb/tserver/tserver_service.proxy.h" |
88 | | |
89 | | #include "yb/util/oid_generator.h" |
90 | | #include "yb/util/opid.pb.h" |
91 | | #include "yb/util/scope_exit.h" |
92 | | #include "yb/util/size_literals.h" |
93 | | #include "yb/util/status_log.h" |
94 | | #include "yb/util/stopwatch.h" |
95 | | #include "yb/util/thread.h" |
96 | | #include "yb/util/tsan_util.h" |
97 | | |
98 | | using namespace std::literals; |
99 | | |
100 | | DEFINE_int32(num_client_threads, 8, |
101 | | "Number of client threads to launch"); |
102 | | DEFINE_int32(client_inserts_per_thread, 50, |
103 | | "Number of rows inserted by each client thread"); |
104 | | DEFINE_int32(client_num_batches_per_thread, 5, |
105 | | "In how many batches to group the rows, for each client"); |
106 | | DECLARE_int32(consensus_rpc_timeout_ms); |
107 | | DECLARE_int32(leader_lease_duration_ms); |
108 | | DECLARE_int32(ht_lease_duration_ms); |
109 | | DECLARE_int32(rpc_timeout); |
110 | | |
111 | | METRIC_DECLARE_entity(tablet); |
112 | | METRIC_DECLARE_counter(not_leader_rejections); |
113 | | METRIC_DECLARE_gauge_int64(raft_term); |
114 | | METRIC_DECLARE_counter(log_cache_disk_reads); |
115 | | METRIC_DECLARE_gauge_int64(log_cache_num_ops); |
116 | | |
117 | | namespace yb { |
118 | | namespace tserver { |
119 | | |
120 | | using std::unordered_map; |
121 | | using std::unordered_set; |
122 | | using std::vector; |
123 | | using std::shared_ptr; |
124 | | |
125 | | using client::YBSession; |
126 | | using client::YBTable; |
127 | | using client::YBTableName; |
128 | | using consensus::ConsensusRequestPB; |
129 | | using consensus::ConsensusResponsePB; |
130 | | using consensus::ConsensusServiceProxy; |
131 | | using consensus::MajoritySize; |
132 | | using consensus::MakeOpId; |
133 | | using consensus::PeerMemberType; |
134 | | using consensus::RaftPeerPB; |
135 | | using consensus::ReplicateMsg; |
136 | | using consensus::LeaderLeaseCheckMode; |
137 | | using docdb::KeyValuePairPB; |
138 | | using docdb::SubDocKey; |
139 | | using docdb::DocKey; |
140 | | using docdb::PrimitiveValue; |
141 | | using itest::AddServer; |
142 | | using itest::GetReplicaStatusAndCheckIfLeader; |
143 | | using itest::LeaderStepDown; |
144 | | using itest::TabletServerMap; |
145 | | using itest::TabletServerMapUnowned; |
146 | | using itest::TServerDetails; |
147 | | using itest::RemoveServer; |
148 | | using itest::StartElection; |
149 | | using itest::WaitUntilNumberOfAliveTServersEqual; |
150 | | using itest::WaitUntilLeader; |
151 | | using itest::WriteSimpleTestRow; |
152 | | using master::GetTabletLocationsRequestPB; |
153 | | using master::GetTabletLocationsResponsePB; |
154 | | using master::TabletLocationsPB; |
155 | | using rpc::RpcController; |
156 | | using server::SetFlagRequestPB; |
157 | | using server::SetFlagResponsePB; |
158 | | using server::HybridClock; |
159 | | using server::ClockPtr; |
160 | | using strings::Substitute; |
161 | | |
162 | | static const int kConsensusRpcTimeoutForTests = 50; |
163 | | |
164 | | static const int kTestRowKey = 1234; |
165 | | static const int kTestRowIntVal = 5678; |
166 | | |
167 | | // Integration test for the raft consensus implementation. |
168 | | // Uses the whole tablet server stack with ExternalMiniCluster. |
169 | | class RaftConsensusITest : public TabletServerIntegrationTestBase { |
170 | | public: |
171 | | RaftConsensusITest() |
172 | | : inserters_(FLAGS_num_client_threads), |
173 | 46 | clock_(new HybridClock()) { |
174 | 46 | CHECK_OK(clock_->Init()); |
175 | 46 | } |
176 | | |
177 | 46 | void SetUp() override { |
178 | 46 | TabletServerIntegrationTestBase::SetUp(); |
179 | 46 | FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests; |
180 | 46 | } |
181 | | |
182 | | void ScanReplica(TabletServerServiceProxy* replica_proxy, |
183 | 4 | vector<string>* results) { |
184 | | |
185 | 4 | ReadRequestPB req; |
186 | 4 | ReadResponsePB resp; |
187 | 4 | RpcController rpc; |
188 | 4 | rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings. |
189 | | |
190 | 4 | req.set_tablet_id(tablet_id_); |
191 | 4 | req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); |
192 | 4 | auto batch = req.add_ql_batch(); |
193 | 4 | batch->set_schema_version(0); |
194 | 4 | int id = kFirstColumnId; |
195 | 4 | auto rsrow = batch->mutable_rsrow_desc(); |
196 | 12 | for (const auto& col : schema_.columns()) { |
197 | 12 | batch->add_selected_exprs()->set_column_id(id); |
198 | 12 | batch->mutable_column_refs()->add_ids(id); |
199 | 12 | auto coldesc = rsrow->add_rscol_descs(); |
200 | 12 | coldesc->set_name(col.name()); |
201 | 12 | col.type()->ToQLTypePB(coldesc->mutable_ql_type()); |
202 | 12 | ++id; |
203 | 12 | } |
204 | | |
205 | | // Send the call. |
206 | 4 | { |
207 | 4 | SCOPED_TRACE(req.DebugString()); |
208 | 4 | ASSERT_OK(replica_proxy->Read(req, &resp, &rpc)); |
209 | 4 | SCOPED_TRACE(resp.DebugString()); |
210 | 4 | if (resp.has_error()) { |
211 | 0 | ASSERT_OK(StatusFromPB(resp.error().status())); |
212 | 0 | } |
213 | 4 | } |
214 | | |
215 | 4 | Schema schema(client::MakeColumnSchemasFromColDesc(rsrow->rscol_descs()), 0); |
216 | 4 | QLRowBlock result(schema); |
217 | 4 | Slice data = ASSERT_RESULT(rpc.GetSidecar(0)); |
218 | 4 | if (!data.empty()) { |
219 | 4 | ASSERT_OK(result.Deserialize(QLClient::YQL_CLIENT_CQL, &data)); |
220 | 4 | } |
221 | 10 | for (const auto& row : result.rows()) { |
222 | 10 | results->push_back(row.ToString()); |
223 | 10 | } |
224 | | |
225 | 4 | std::sort(results->begin(), results->end()); |
226 | 4 | } |
227 | | |
228 | | // Scan the given replica in a loop until the number of rows |
229 | | // is 'expected_count'. If it takes more than 10 seconds, |
230 | | // the test fails. |
231 | | void WaitForRowCount(TabletServerServiceProxy* replica_proxy, |
232 | | size_t expected_count, |
233 | 3 | vector<string>* results) { |
234 | 3 | LOG(INFO) << "Waiting for row count " << expected_count << "..."; |
235 | 3 | MonoTime start = MonoTime::Now(); |
236 | 3 | MonoTime deadline = MonoTime::Now(); |
237 | 3 | deadline.AddDelta(MonoDelta::FromSeconds(10)); |
238 | 3 | while (true) { |
239 | 3 | results->clear(); |
240 | 3 | ASSERT_NO_FATALS(ScanReplica(replica_proxy, results)); |
241 | 3 | if (results->size() == expected_count) { |
242 | 3 | return; |
243 | 3 | } |
244 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
245 | 0 | if (!MonoTime::Now().ComesBefore(deadline)) { |
246 | 0 | break; |
247 | 0 | } |
248 | 0 | } |
249 | 0 | MonoTime end = MonoTime::Now(); |
250 | 0 | LOG(WARNING) << "Didn't reach row count " << expected_count; |
251 | 0 | FAIL() << "Did not reach expected row count " << expected_count |
252 | 0 | << " after " << end.GetDeltaSince(start).ToString() |
253 | 0 | << ": rows: " << *results; |
254 | 3 | } |
255 | | |
256 | | // Add an Insert operation to the given consensus request. |
257 | | // The row to be inserted is generated based on the OpId. |
258 | | void AddOp(const OpIdPB& id, ConsensusRequestPB* req); |
259 | | |
260 | | string DumpToString(TServerDetails* leader, |
261 | | const vector<string>& leader_results, |
262 | | TServerDetails* replica, |
263 | 0 | const vector<string>& replica_results) { |
264 | 0 | string ret = strings::Substitute("Replica results did not match the leader's." |
265 | 0 | "\nLeader: $0\nReplica: $1. Results size " |
266 | 0 | "L: $2 R: $3", |
267 | 0 | leader->ToString(), |
268 | 0 | replica->ToString(), |
269 | 0 | leader_results.size(), |
270 | 0 | replica_results.size()); |
271 | 0 |
272 | 0 | StrAppend(&ret, "Leader Results: \n"); |
273 | 0 | for (const string& result : leader_results) { |
274 | 0 | StrAppend(&ret, result, "\n"); |
275 | 0 | } |
276 | 0 |
277 | 0 | StrAppend(&ret, "Replica Results: \n"); |
278 | 0 | for (const string& result : replica_results) { |
279 | 0 | StrAppend(&ret, result, "\n"); |
280 | 0 | } |
281 | 0 |
282 | 0 | return ret; |
283 | 0 | } |
284 | | |
285 | | void InsertTestRowsRemoteThread(int32_t first_row, |
286 | | int32_t count, |
287 | | int num_batches, |
288 | 21 | const vector<CountDownLatch*>& latches) { |
289 | 21 | client::TableHandle table; |
290 | 21 | ASSERT_OK(table.Open(kTableName, client_.get())); |
291 | | |
292 | 21 | shared_ptr<YBSession> session = client_->NewSession(); |
293 | 21 | session->SetTimeout(60s); |
294 | | |
295 | 122 | for (int i = 0; i < num_batches; i++) { |
296 | 101 | SCOPED_TRACE(Format("Batch: $0", i)); |
297 | | |
298 | 101 | auto first_row_in_batch = first_row + (i * count / num_batches); |
299 | 101 | auto last_row_in_batch = first_row_in_batch + count / num_batches; |
300 | | |
301 | 1.11k | for (int j = first_row_in_batch; j < last_row_in_batch; j++) { |
302 | 1.00k | auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
303 | 1.00k | auto* const req = op->mutable_request(); |
304 | 1.00k | QLAddInt32HashValue(req, j); |
305 | 1.00k | table.AddInt32ColumnValue(req, "int_val", j * 2); |
306 | 1.00k | table.AddStringColumnValue(req, "string_val", StringPrintf("hello %d", j)); |
307 | 1.00k | session->Apply(op); |
308 | 1.00k | } |
309 | | |
310 | | // We don't handle write idempotency yet (i.e. making sure that when a leader fails, |
311 | | // writes to it that were eventually committed by the new leader but unacknowledged to |
312 | | // the client are not retried), so some errors are expected. |
313 | | // That's OK as long as the errors are Status::AlreadyPresent(). |
314 | | |
315 | 101 | int inserted = last_row_in_batch - first_row_in_batch; |
316 | | |
317 | 101 | const auto flush_status = session->FlushAndGetOpsErrors(); |
318 | 101 | const auto& s = flush_status.status; |
319 | 101 | if (PREDICT_FALSE(!s.ok())) { |
320 | 0 | for (const auto& e : flush_status.errors) { |
321 | 0 | ASSERT_TRUE(e->status().IsAlreadyPresent()) << "Unexpected error: " << e->status(); |
322 | 0 | } |
323 | 0 | inserted -= flush_status.errors.size(); |
324 | 0 | } |
325 | | |
326 | 101 | for (CountDownLatch* latch : latches) { |
327 | 79 | latch->CountDown(inserted); |
328 | 79 | } |
329 | 101 | } |
330 | | |
331 | 21 | inserters_.CountDown(); |
332 | 21 | } |
333 | | |
334 | | // Brings chaos to an ExternalTabletServer by introducing random delays. Does this by |
335 | | // pausing the daemon for a random amount of time. |
336 | 3 | void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec) { |
337 | 31 | while (inserters_.count() > 0) { |
338 | | |
339 | | // Adjust the value obtained from the normalized gaussian distribution so that we steal |
340 | | // the lock for longer than the timeout a small percentage (~5%) of the time. |
341 | | // (The 95th percentile of a standard normal (0, 1) distribution is 1.64485.) |
342 | 28 | double sleep_time_usec = 1000 * |
343 | 28 | ((random_.Normal(0, 1) * timeout_msec) / 1.64485); |
344 | | |
345 | 28 | if (sleep_time_usec < 0) sleep_time_usec = 0; |
346 | | |
347 | | // Additionally only cause timeouts at all 50% of the time, otherwise sleep. |
348 | 28 | double val = (rand() * 1.0) / RAND_MAX; // NOLINT(runtime/threadsafe_fn) |
349 | 28 | if (val < 0.5) { |
350 | 14 | SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec)); |
351 | 14 | continue; |
352 | 14 | } |
353 | | |
354 | 14 | ASSERT_OK(tablet_server->Pause()); |
355 | 7 | LOG_IF(INFO, sleep_time_usec > 0.0) |
356 | 7 | << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid() |
357 | 7 | << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec..."; |
358 | 14 | SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec)); |
359 | 14 | ASSERT_OK(tablet_server->Resume()); |
360 | 14 | } |
361 | 3 | } |
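// --- A minimal standalone sketch (not part of the instrumented file above), added to
// illustrate the ~5% figure in the DelayInjectorThread comment: with X ~ N(0, 1),
// P(X > 1.64485) is roughly 0.05, so 1000 * (X * timeout_msec) / 1.64485 exceeds the
// timeout (timeout_msec * 1000 usec) about 5% of the time. The seed, timeout value, and
// trial count below are illustrative assumptions, not values taken from the test.
#include <iostream>
#include <random>

int main() {
  std::mt19937 rng(42);
  std::normal_distribution<double> normal(0.0, 1.0);
  const double timeout_msec = 50;   // Assumed; mirrors kConsensusRpcTimeoutForTests.
  const int trials = 100000;
  int exceeded = 0;
  for (int i = 0; i < trials; ++i) {
    // Same scaling as DelayInjectorThread above.
    double sleep_time_usec = 1000 * ((normal(rng) * timeout_msec) / 1.64485);
    if (sleep_time_usec > timeout_msec * 1000) {
      ++exceeded;
    }
  }
  // Prints a fraction close to 0.05.
  std::cout << "Fraction of sleeps exceeding the timeout: "
            << static_cast<double>(exceeded) / trials << std::endl;
  return 0;
}
// --- end of sketch ---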
362 | | |
363 | | // Thread which loops until '*finish' becomes true, trying to insert a row |
364 | | // on the given tablet server identified by 'replica_idx'. |
365 | | void StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish); |
366 | | |
367 | | // Stops the current leader of the configuration, runs a leader election, and then brings it back. |
368 | | // Before stopping the leader this pauses all follower nodes at regular intervals so that |
369 | | // we get an increased chance of operations being pending. |
370 | 3 | void StopOrKillLeaderAndElectNewOne() { |
371 | 3 | bool kill = rand() % 2 == 0; // NOLINT(runtime/threadsafe_fn) |
372 | | |
373 | 3 | TServerDetails* old_leader; |
374 | 3 | CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &old_leader)); |
375 | 3 | ExternalTabletServer* old_leader_ets = cluster_->tablet_server_by_uuid(old_leader->uuid()); |
376 | | |
377 | 3 | vector<TServerDetails*> followers; |
378 | 3 | GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
379 | | |
380 | 6 | for (TServerDetails* ts : followers) { |
381 | 6 | ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid()); |
382 | 6 | CHECK_OK(ets->Pause()); |
383 | 6 | SleepFor(MonoDelta::FromMilliseconds(100)); |
384 | 6 | } |
385 | | |
386 | | // Once all followers are paused, also pause or kill the current leader. Since we've waited |
387 | | // a bit, the old leader is likely to have operations that must be aborted. |
388 | 3 | if (kill) { |
389 | 2 | old_leader_ets->Shutdown(); |
390 | 1 | } else { |
391 | 1 | CHECK_OK(old_leader_ets->Pause()); |
392 | 1 | } |
393 | | |
394 | | // Resume the replicas. |
395 | 6 | for (TServerDetails* ts : followers) { |
396 | 6 | ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid()); |
397 | 6 | CHECK_OK(ets->Resume()); |
398 | 6 | } |
399 | | |
400 | | // Get the new leader. |
401 | 3 | TServerDetails* new_leader; |
402 | 3 | CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader)); |
403 | | |
404 | | // Bring the old leader back. |
405 | 3 | if (kill) { |
406 | 2 | CHECK_OK(old_leader_ets->Restart()); |
407 | | // Wait until we have the same number of followers. |
408 | 2 | auto initial_followers = followers.size(); |
409 | 2 | do { |
410 | 2 | GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
411 | 2 | } while (followers.size() < initial_followers); |
412 | 1 | } else { |
413 | 1 | CHECK_OK(old_leader_ets->Resume()); |
414 | 1 | } |
415 | 3 | } |
416 | | |
417 | | // Writes 'num_writes' operations to the current leader. Each of the operations |
418 | | // has a payload of around `size_bytes`. Causes a gtest failure on error. |
419 | | void WriteOpsToLeader(int num_writes, size_t size_bytes); |
420 | | |
421 | | // Check for and restart any TS that have crashed. |
422 | | // Returns the number of servers restarted. |
423 | | int RestartAnyCrashedTabletServers(); |
424 | | |
425 | | // Assert that no tablet servers have crashed. |
426 | | // Tablet servers that have been manually Shutdown() are allowed. |
427 | | void AssertNoTabletServersCrashed(); |
428 | | |
429 | | Result<int64_t> GetNumLogCacheOpsReadFromDisk(); |
430 | | |
431 | | // Ensure that a majority of servers is required for elections and writes. |
432 | | // This is done by pausing a majority and asserting that writes and elections fail, |
433 | | // then unpausing the majority and asserting that elections and writes succeed. |
434 | | // On failure, raises a gtest assertion. |
435 | | // Note: This test assumes all tablet servers listed in tablet_servers are voters. |
436 | | void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMapUnowned& tablet_servers, |
437 | | const string& leader_uuid); |
438 | | |
439 | | // Return the replicas of the specified 'tablet_id', as seen by the Master. |
440 | | Status GetTabletLocations(const string& tablet_id, const MonoDelta& timeout, |
441 | | master::TabletLocationsPB* tablet_locations); |
442 | | |
443 | | enum WaitForLeader { |
444 | | NO_WAIT_FOR_LEADER = 0, |
445 | | WAIT_FOR_LEADER = 1 |
446 | | }; |
447 | | |
448 | | // Wait for the specified number of replicas to be reported by the master for |
449 | | // the given tablet. Fails with an assertion if the timeout expires. |
450 | | void WaitForReplicasReportedToMaster(int num_replicas, const string& tablet_id, |
451 | | const MonoDelta& timeout, |
452 | | WaitForLeader wait_for_leader, |
453 | | bool* has_leader, |
454 | | master::TabletLocationsPB* tablet_locations); |
455 | | |
456 | | static const bool WITH_NOTIFICATION_LATENCY = true; |
457 | | static const bool WITHOUT_NOTIFICATION_LATENCY = false; |
458 | | void DoTestChurnyElections(bool with_latency); |
459 | | |
460 | | protected: |
461 | | // Flags needed for CauseFollowerToFallBehindLogGC() to work well. |
462 | | void AddFlagsForLogRolls(vector<string>* extra_tserver_flags); |
463 | | |
464 | | // Pause one of the followers and write enough data to the remaining replicas |
465 | | // to cause log GC, then resume the paused follower. On success, |
466 | | // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be |
467 | | // set to the term of the leader before un-pausing the follower, and |
468 | | // 'fell_behind_uuid' will be set to the UUID of the follower that was paused |
469 | | // and caused to fall behind. These can be used for verification purposes. |
470 | | // |
471 | | // Certain flags should be set. You can add the required flags with |
472 | | // AddFlagsForLogRolls() before starting the cluster. |
473 | | void CauseFollowerToFallBehindLogGC(string* leader_uuid, |
474 | | int64_t* orig_term, |
475 | | string* fell_behind_uuid); |
476 | | |
477 | | void TestAddRemoveServer(PeerMemberType member_type); |
478 | | void TestRemoveTserverFailsWhenServerInTransition(PeerMemberType member_type); |
479 | | void TestRemoveTserverInTransitionSucceeds(PeerMemberType member_type); |
480 | | |
481 | | std::vector<scoped_refptr<yb::Thread> > threads_; |
482 | | CountDownLatch inserters_; |
483 | | ClockPtr clock_; |
484 | | }; |
485 | | |
486 | 3 | void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) { |
487 | | // We configure a small log segment size so that we roll frequently, |
488 | | // configure a small cache size so that we evict data from the cache, and |
489 | | // retain as few segments as possible. We also turn off async segment |
490 | | // allocation -- this ensures that we roll many segments of logs (with async |
491 | | // allocation, it's possible that the preallocation is slow and we wouldn't |
492 | | // roll deterministically). |
493 | 3 | extra_tserver_flags->push_back("--log_cache_size_limit_mb=1"); |
494 | 3 | extra_tserver_flags->push_back("--log_segment_size_mb=1"); |
495 | 3 | extra_tserver_flags->push_back("--log_async_preallocate_segments=false"); |
496 | 3 | extra_tserver_flags->push_back("--log_min_segments_to_retain=1"); |
497 | 3 | extra_tserver_flags->push_back("--log_min_seconds_to_retain=0"); |
498 | 3 | extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100"); |
499 | 3 | extra_tserver_flags->push_back("--db_write_buffer_size=100000"); |
500 | 3 | } |
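// --- A hedged usage sketch (assumed, not an actual test in this file), added to show how
// AddFlagsForLogRolls() is meant to be combined with BuildAndStart() and
// CauseFollowerToFallBehindLogGC(), per the comments on those helpers above. The test name
// and the trailing assertion are illustrative only.
TEST_F(RaftConsensusITest, ExampleFollowerFallsBehindLogGCUsageSketch) {
  vector<string> ts_flags;
  AddFlagsForLogRolls(&ts_flags);  // Flags that make log rolling and GC happen quickly.
  ASSERT_NO_FATALS(BuildAndStart(ts_flags));

  string leader_uuid;
  int64_t orig_term;
  string fell_behind_uuid;
  ASSERT_NO_FATALS(
      CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &fell_behind_uuid));

  // 'fell_behind_uuid' now identifies the follower that was paused past log GC on the leader.
  // A real test would go on to verify recovery or eviction behavior, depending on the flags.
  ASSERT_NE(leader_uuid, fell_behind_uuid);
}
// --- end of sketch ---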
501 | | |
502 | | // Test that we can retrieve the permanent uuid of a server running |
503 | | // consensus service via RPC. |
504 | 1 | TEST_F(RaftConsensusITest, TestGetPermanentUuid) { |
505 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
506 | | |
507 | 1 | TServerDetails* leader = nullptr; |
508 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
509 | 1 | const string expected_uuid = leader->instance_id.permanent_uuid(); |
510 | | |
511 | 1 | rpc::MessengerBuilder builder("test builder"); |
512 | 1 | builder.set_num_reactors(1); |
513 | 1 | auto messenger = rpc::CreateAutoShutdownMessengerHolder(ASSERT_RESULT(builder.Build())); |
514 | 1 | rpc::ProxyCache proxy_cache(messenger.get()); |
515 | | |
516 | | // Set a decent timeout for allowing the masters to find each other. |
517 | 1 | const auto kTimeout = 30s; |
518 | 1 | std::vector<HostPort> endpoints; |
519 | 1 | for (const auto& hp : leader->registration->common().private_rpc_addresses()) { |
520 | 1 | endpoints.push_back(HostPortFromPB(hp)); |
521 | 1 | } |
522 | 1 | RaftPeerPB peer; |
523 | 1 | ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(&proxy_cache, kTimeout, endpoints, &peer)); |
524 | 1 | ASSERT_EQ(expected_uuid, peer.permanent_uuid()); |
525 | 1 | } |
526 | | |
527 | | // TODO allow the scan to define an operation id, fetch the last id |
528 | | // from the leader and then use that id to make the replica wait |
529 | | // until it is done. This will avoid the sleeps below. |
530 | 1 | TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) { |
531 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
532 | | |
533 | 1 | int num_iters = AllowSlowTests() ? 10 : 1; |
534 | | |
535 | 2 | for (int i = 0; i < num_iters; i++) { |
536 | 1 | ASSERT_NO_FATALS(InsertTestRowsRemoteThread( |
537 | 1 | i * FLAGS_client_inserts_per_thread, |
538 | 1 | FLAGS_client_inserts_per_thread, |
539 | 1 | FLAGS_client_num_batches_per_thread, |
540 | 1 | vector<CountDownLatch*>())); |
541 | 1 | } |
542 | 1 | ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters); |
543 | 1 | } |
544 | | |
545 | 1 | TEST_F(RaftConsensusITest, TestFailedOperation) { |
546 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
547 | | |
548 | | // Wait until we have a stable leader. |
549 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
550 | 1 | tablet_id_, 1)); |
551 | | |
552 | 1 | WriteResponsePB resp; |
553 | 1 | RpcController controller; |
554 | 1 | controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
555 | | |
556 | 1 | TServerDetails* leader = nullptr; |
557 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
558 | | |
559 | 1 | WriteRequestPB req; |
560 | 1 | req.set_tablet_id(tablet_id_); |
561 | 1 | req.add_ql_write_batch()->set_schema_version(123); |
562 | | |
563 | 1 | ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller)); |
564 | 2 | ASSERT_NE(QLResponsePB::YQL_STATUS_OK, resp.ql_response_batch(0).status()) |
565 | 2 | << "Response: " << resp.ShortDebugString(); |
566 | | |
567 | | // Add a proper row so that we can verify that all of the replicas continue |
568 | | // to process transactions after a failure. Additionally, this allows us to wait |
569 | | // for all of the replicas to finish processing transactions before shutting down, |
570 | | // avoiding a potential stall as we currently can't abort transactions (see KUDU-341). |
571 | | |
572 | 1 | req.Clear(); |
573 | 1 | req.set_tablet_id(tablet_id_); |
574 | 1 | AddTestRowInsert(0, 0, "original0", &req); |
575 | | |
576 | 1 | controller.Reset(); |
577 | 1 | controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
578 | | |
579 | 1 | ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller)); |
580 | 1 | SCOPED_TRACE(resp.ShortDebugString()); |
581 | 1 | ASSERT_FALSE(resp.has_error()); |
582 | | |
583 | 1 | ASSERT_ALL_REPLICAS_AGREE(1); |
584 | 1 | } |
585 | | |
586 | | // Inserts rows through consensus and also starts one delay injecting thread |
587 | | // that steals consensus peer locks for a while. This is meant to test that |
588 | | // even with timeouts and repeated requests consensus still works. |
589 | 1 | TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) { |
590 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
591 | | |
592 | 1 | if (500 == FLAGS_client_inserts_per_thread) { |
593 | 0 | if (AllowSlowTests()) { |
594 | 0 | FLAGS_client_inserts_per_thread = FLAGS_client_inserts_per_thread * 10; |
595 | 0 | FLAGS_client_num_batches_per_thread = FLAGS_client_num_batches_per_thread * 10; |
596 | 0 | } |
597 | 0 | } |
598 | | |
599 | 1 | int num_threads = FLAGS_num_client_threads; |
600 | 9 | for (int i = 0; i < num_threads; i++) { |
601 | 8 | scoped_refptr<yb::Thread> new_thread; |
602 | 8 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("ts-test$0", i), |
603 | 8 | &RaftConsensusITest::InsertTestRowsRemoteThread, |
604 | 8 | this, i * FLAGS_client_inserts_per_thread, |
605 | 8 | FLAGS_client_inserts_per_thread, |
606 | 8 | FLAGS_client_num_batches_per_thread, |
607 | 8 | vector<CountDownLatch*>(), |
608 | 8 | &new_thread)); |
609 | 8 | threads_.push_back(new_thread); |
610 | 8 | } |
611 | 4 | for (int i = 0; i < FLAGS_num_replicas; i++) { |
612 | 3 | scoped_refptr<yb::Thread> new_thread; |
613 | 3 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("chaos-test$0", i), |
614 | 3 | &RaftConsensusITest::DelayInjectorThread, |
615 | 3 | this, cluster_->tablet_server(i), |
616 | 3 | kConsensusRpcTimeoutForTests, |
617 | 3 | &new_thread)); |
618 | 3 | threads_.push_back(new_thread); |
619 | 3 | } |
620 | 11 | for (scoped_refptr<yb::Thread> thr : threads_) { |
621 | 11 | CHECK_OK(ThreadJoiner(thr.get()).Join()); |
622 | 11 | } |
623 | | |
624 | 1 | ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads); |
625 | 1 | } |
626 | | |
627 | 1 | TEST_F(RaftConsensusITest, TestReadOnNonLeader) { |
628 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
629 | | |
630 | | // Wait for the initial leader election to complete. |
631 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
632 | 1 | tablet_id_, 1)); |
633 | | |
634 | | // By default reads should be allowed only on the leader. |
635 | 1 | ReadRequestPB req; |
636 | 1 | ReadResponsePB resp; |
637 | 1 | RpcController rpc; |
638 | 1 | req.set_tablet_id(tablet_id_); |
639 | | |
640 | | // Perform a read on one of the followers. |
641 | 1 | vector<TServerDetails*> followers; |
642 | 1 | GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
643 | | |
644 | 2 | for (const auto& follower : followers) { |
645 | 2 | rpc.Reset(); |
646 | 2 | ASSERT_OK(follower->tserver_proxy->Read(req, &resp, &rpc)); |
647 | 2 | ASSERT_TRUE(resp.has_error()); |
648 | 2 | ASSERT_EQ(TabletServerErrorPB_Code_NOT_THE_LEADER, resp.error().code()); |
649 | 2 | } |
650 | 1 | } |
651 | | |
652 | 1 | TEST_F(RaftConsensusITest, TestInsertOnNonLeader) { |
653 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
654 | | |
655 | | // Wait for the initial leader election to complete. |
656 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, |
657 | 1 | tablet_id_, 1)); |
658 | | |
659 | | // Manually construct a write RPC to a replica and make sure it responds |
660 | | // with the correct error code. |
661 | 1 | WriteRequestPB req; |
662 | 1 | WriteResponsePB resp; |
663 | 1 | RpcController rpc; |
664 | 1 | req.set_tablet_id(tablet_id_); |
665 | 1 | AddTestRowInsert(kTestRowKey, kTestRowIntVal, "hello world via RPC", &req); |
666 | | |
667 | | // Get the leader. |
668 | 1 | vector<TServerDetails*> followers; |
669 | 1 | GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
670 | | |
671 | 1 | ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc)); |
672 | 1 | SCOPED_TRACE(resp.DebugString()); |
673 | 1 | ASSERT_TRUE(resp.has_error()); |
674 | 1 | Status s = StatusFromPB(resp.error().status()); |
675 | 1 | EXPECT_TRUE(s.IsIllegalState()); |
676 | 1 | ASSERT_STR_CONTAINS(s.message().ToBuffer(), "Not the leader"); |
677 | | // TODO: need to change the error code to be something like REPLICA_NOT_LEADER |
678 | | // so that the client can properly handle this case! plumbing this is a little difficult |
679 | | // so not addressing at the moment. |
680 | 1 | ASSERT_ALL_REPLICAS_AGREE(0); |
681 | 1 | } |
682 | | |
683 | 1 | TEST_F(RaftConsensusITest, TestRunLeaderElection) { |
684 | | // Reset consensus rpc timeout to the default value or the election might fail often. |
685 | 1 | FLAGS_consensus_rpc_timeout_ms = 1000; |
686 | | |
687 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
688 | | |
689 | 1 | int num_iters = AllowSlowTests() ? 10 : 1; |
690 | | |
691 | 1 | ASSERT_NO_FATALS(InsertTestRowsRemoteThread( |
692 | 1 | 0, |
693 | 1 | FLAGS_client_inserts_per_thread * num_iters, |
694 | 1 | FLAGS_client_num_batches_per_thread, |
695 | 1 | vector<CountDownLatch*>())); |
696 | | |
697 | 1 | ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters); |
698 | | |
699 | | // Select the last follower to be new leader. |
700 | 0 | vector<TServerDetails*> followers; |
701 | 0 | GetOnlyLiveFollowerReplicas(tablet_id_, &followers); |
702 | | |
703 | | // Now shutdown the current leader. |
704 | 0 | TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_)); |
705 | 0 | ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid()); |
706 | 0 | leader_ets->Shutdown(); |
707 | |
708 | 0 | TServerDetails* replica = followers.back(); |
709 | 0 | CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid()); |
710 | | |
711 | | // Make the new replica leader. |
712 | 0 | ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10))); |
713 | | |
714 | | // Insert a bunch more rows. |
715 | 0 | ASSERT_NO_FATALS(InsertTestRowsRemoteThread( |
716 | 0 | FLAGS_client_inserts_per_thread * num_iters, |
717 | 0 | FLAGS_client_inserts_per_thread * num_iters, |
718 | 0 | FLAGS_client_num_batches_per_thread, |
719 | 0 | vector<CountDownLatch*>())); |
720 | | |
721 | | // Restart the original replica and make sure they all agree. |
722 | 0 | ASSERT_OK(leader_ets->Restart()); |
723 | |
724 | 0 | ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters * 2); |
725 | 0 | } |
726 | | |
727 | 5 | void RaftConsensusITest::WriteOpsToLeader(int num_writes, size_t size_bytes) { |
728 | 5 | TServerDetails* leader = nullptr; |
729 | 5 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
730 | | |
731 | 5 | WriteRequestPB req; |
732 | 5 | WriteResponsePB resp; |
733 | 5 | RpcController rpc; |
734 | 5 | rpc.set_timeout(MonoDelta::FromMilliseconds(10000)); |
735 | 5 | int key = 0; |
736 | | |
737 | | // generate dummy payload. |
738 | 5 | string test_payload(size_bytes, '0'); |
739 | 4.03k | for (int i = 0; i < num_writes; i++) { |
740 | 4.02k | rpc.Reset(); |
741 | 4.02k | req.Clear(); |
742 | 4.02k | req.set_tablet_id(tablet_id_); |
743 | 4.02k | AddTestRowInsert(key, key, test_payload, &req); |
744 | 4.02k | key++; |
745 | 4.02k | ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc)); |
746 | | |
747 | 8.05k | ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
748 | 4.02k | } |
749 | 5 | } |
750 | | |
751 | | // Test that when a follower is stopped for a long time, the log cache |
752 | | // properly evicts operations, but still allows the follower to catch |
753 | | // up when it comes back. |
754 | 1 | TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) { |
755 | 1 | vector<string> extra_flags; |
756 | 1 | extra_flags.push_back("--log_cache_size_limit_mb=1"); |
757 | 1 | extra_flags.push_back("--rpc_throttle_threshold_bytes=-1"); |
758 | 1 | extra_flags.push_back("--consensus_max_batch_size_bytes=500000"); |
759 | 1 | ASSERT_NO_FATALS(BuildAndStart(extra_flags)); |
760 | 1 | TServerDetails* replica = (*tablet_replicas_.begin()).second; |
761 | 1 | ASSERT_TRUE(replica != nullptr); |
762 | 1 | ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
763 | | |
764 | | // Pause a replica. |
765 | 1 | ASSERT_OK(replica_ets->Pause()); |
766 | 1 | LOG(INFO)<< "Paused one of the replicas, starting to write."; |
767 | | |
768 | | // Insert 3MB worth of data. |
769 | 1 | const int kNumWrites = 25; |
770 | 1 | ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 128_KB)); |
771 | | |
772 | | // Now unpause the replica, the lagging replica should eventually catch back up. |
773 | 1 | ASSERT_OK(replica_ets->Resume()); |
774 | | |
775 | 1 | ASSERT_ALL_REPLICAS_AGREE(kNumWrites); |
776 | 1 | } |
777 | | |
778 | 3 | Result<int64_t> RaftConsensusITest::GetNumLogCacheOpsReadFromDisk() { |
779 | 3 | TServerDetails* leader = nullptr; |
780 | 3 | RETURN_NOT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
781 | 3 | return cluster_->tablet_server_by_uuid(leader->uuid())->GetInt64Metric( |
782 | 3 | &METRIC_ENTITY_tablet, |
783 | 3 | nullptr, |
784 | 3 | &METRIC_log_cache_disk_reads, |
785 | 3 | "value"); |
786 | 3 | } |
787 | | |
788 | | // Test that when a follower is stopped for a long time, the log cache |
789 | | // reads few ops from disk due to exponential backoff on number of ops |
790 | | // to replicate to an unresponsive follower. |
791 | 1 | TEST_F(RaftConsensusITest, TestCatchupOpsReadFromDisk) { |
792 | 1 | vector<string> extra_flags = { |
793 | 1 | "--log_cache_size_limit_mb=1"s, |
794 | 1 | "--enable_consensus_exponential_backoff=true"s |
795 | 1 | }; |
796 | 1 | ASSERT_NO_FATALS(BuildAndStart(extra_flags)); |
797 | 1 | TServerDetails* replica = tablet_replicas_.begin()->second; |
798 | 1 | ASSERT_TRUE(replica != nullptr); |
799 | 1 | ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
800 | | |
801 | | // Pause a replica. |
802 | 1 | ASSERT_OK(replica_ets->Pause()); |
803 | 1 | LOG(INFO) << "Paused replica " << replica->uuid() << ", starting to write."; |
804 | | |
805 | | // Insert 3MB worth of data. |
806 | 1 | const int kNumWrites = 1000; |
807 | 1 | ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB)); |
808 | | |
809 | | // Allow for unsuccessful replication attempts to lagging follower. |
810 | 1 | SleepFor(5s); |
811 | | |
812 | | // Confirm that in the steady state the leader is not reading any new ops from disk, since |
813 | | // the number of ops to send has backed off to 0 and it is sending only NOOPs. |
814 | 1 | auto ops_before = ASSERT_RESULT(GetNumLogCacheOpsReadFromDisk()); |
815 | 1 | SleepFor(10s); |
816 | 1 | auto ops_after = ASSERT_RESULT(GetNumLogCacheOpsReadFromDisk()); |
817 | 1 | EXPECT_EQ(ops_before, ops_after); |
818 | | |
819 | | // Now unpause the replica, the lagging replica should eventually catch back up. |
820 | 1 | ASSERT_OK(replica_ets->Resume()); |
821 | | |
822 | | // Wait for successful replication to resumed follower. |
823 | 1 | ASSERT_OK(WaitForServersToAgree(30s, tablet_servers_, tablet_id_, |
824 | 1 | kNumWrites)); |
825 | | |
826 | 1 | auto final_ops_read_from_disk = ASSERT_RESULT(GetNumLogCacheOpsReadFromDisk()); |
827 | 1 | LOG(INFO)<< "Ops read from disk: " << final_ops_read_from_disk; |
828 | | |
829 | | // NOTE: empirically determined threshold. |
830 | 1 | const int kOpsReadFromDiskThreshold = kNumWrites * 2; |
831 | 1 | ASSERT_LE(final_ops_read_from_disk, kOpsReadFromDiskThreshold); |
832 | 1 | } |
833 | | |
834 | | // Test that when a follower is paused so that it misses several writes, is resumed |
835 | | // briefly such that it only receives some of the entries from the leader but |
836 | | // becomes aware of the highest committed op id, it is able to bootstrap successfully |
837 | | // after a restart. |
838 | 1 | TEST_F(RaftConsensusITest, TestLaggingFollowerRestart) { |
839 | 1 | vector<string> extra_flags = { |
840 | 1 | "--consensus_inject_latency_ms_in_notifications=10"s, |
841 | 1 | "--rpc_throttle_threshold_bytes=-1"s, |
842 | 1 | "--consensus_max_batch_size_bytes=1024"s |
843 | 1 | }; |
844 | 1 | ASSERT_NO_FATALS(BuildAndStart(extra_flags)); |
845 | | |
846 | | // Find leader. |
847 | 1 | TServerDetails* leader = nullptr; |
848 | 1 | ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, 30s, &leader)); |
849 | 1 | CHECK_NOTNULL(leader); |
850 | | |
851 | | // Find a follower to pause, resume, and restart. |
852 | 1 | TServerDetails* replica = nullptr; |
853 | 1 | for (const auto& tablet_replica : tablet_replicas_) { |
854 | 1 | TServerDetails* ts_details = tablet_replica.second; |
855 | 1 | if (ts_details->uuid() != leader->uuid()) { |
856 | 1 | replica = ts_details; |
857 | 1 | break; |
858 | 1 | } |
859 | 1 | } |
860 | 1 | CHECK_NOTNULL(replica); |
861 | 1 | ASSERT_NE(replica->uuid(), leader->uuid()); |
862 | | |
863 | 1 | ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
864 | | |
865 | | // Pause a replica. |
866 | 1 | ASSERT_OK(replica_ets->Pause()); |
867 | 1 | LOG(INFO)<< "Paused replica " << replica->uuid() << ", starting to write."; |
868 | | |
869 | | // Insert 3MB worth of data. |
870 | 1 | const int kNumWrites = 1000; |
871 | 1 | ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB)); |
872 | | |
873 | | // Allow for paused replica to lag. |
874 | 1 | SleepFor(20s); |
875 | | |
876 | | // Now unpause the replica, the lagging replica should eventually catch back up. |
877 | 1 | ASSERT_OK(replica_ets->Resume()); |
878 | 1 | LOG(INFO)<< "Resumed replica " << replica->uuid() << "."; |
879 | | |
880 | | // Wait for resumed follower to get some but not all entries. |
881 | 1 | int64_t actual_minimum_index = consensus::kInvalidOpIdIndex; |
882 | 1 | ASSERT_OK(WaitUntilAllReplicasHaveOp(kNumWrites / 4, tablet_id_, |
883 | 1 | TServerDetailsVector(tablet_servers_), 10s, |
884 | 1 | &actual_minimum_index)); |
885 | 1 | LOG(INFO) << "Replica " << replica->uuid() << " received " << actual_minimum_index; |
886 | 1 | ASSERT_LE(actual_minimum_index, kNumWrites / 2); |
887 | | |
888 | 1 | replica_ets->Shutdown(); |
889 | 1 | LOG(INFO)<< "Shutdown replica " << replica->uuid() << "."; |
890 | | |
891 | 1 | CHECK_OK(replica_ets->Restart()); |
892 | 1 | LOG(INFO)<< "Restarted replica " << replica->uuid() << "."; |
893 | | |
894 | | // Wait for successful replication to restarted follower. |
895 | 1 | ASSERT_OK(WaitForServersToAgree(60s, tablet_servers_, tablet_id_, kNumWrites)); |
896 | 1 | } |
897 | | |
898 | | // Test that when a follower is shutdown so that it misses several writes, the |
899 | | // leader evicts almost all entries from log cache. |
900 | 1 | TEST_F(RaftConsensusITest, TestLaggingFollowerLogCacheEviction) { |
901 | 1 | const auto kHeartBeatInterval = 1s; |
902 | 1 | const int32_t kConsensusLaggingFollowerThreshold = 10; |
903 | | |
904 | 1 | vector<string> extra_flags = { |
905 | 1 | "--consensus_inject_latency_ms_in_notifications=10"s, |
906 | 1 | "--rpc_throttle_threshold_bytes=-1"s, |
907 | 1 | "--consensus_max_batch_size_bytes=1024"s, |
908 | 1 | Format("--consensus_lagging_follower_threshold=$0", kConsensusLaggingFollowerThreshold), |
909 | 1 | Format("--raft_heartbeat_interval_ms=$0", ToMilliseconds(kHeartBeatInterval)) |
910 | 1 | }; |
911 | 1 | ASSERT_NO_FATALS(BuildAndStart(extra_flags)); |
912 | | |
913 | | // Find leader. |
914 | 1 | TServerDetails* leader = nullptr; |
915 | 1 | ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, 30s, &leader)); |
916 | 1 | CHECK_NOTNULL(leader); |
917 | | |
918 | | // Find a follower to pause, resume, and restart. |
919 | 1 | TServerDetails* replica = nullptr; |
920 | 2 | for (const auto& tablet_replica : tablet_replicas_) { |
921 | 2 | TServerDetails* ts_details = tablet_replica.second; |
922 | 2 | if (ts_details->uuid() != leader->uuid()) { |
923 | 1 | replica = ts_details; |
924 | 1 | break; |
925 | 1 | } |
926 | 2 | } |
927 | 1 | CHECK_NOTNULL(replica); |
928 | 1 | ASSERT_NE(replica->uuid(), leader->uuid()); |
929 | | |
930 | 1 | ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
931 | | |
932 | | // Shutdown a replica. |
933 | 1 | replica_ets->Shutdown(); |
934 | 1 | LOG(INFO)<< "Shutdown replica " << replica->uuid() << "."; |
935 | | |
936 | | // Insert 3MB worth of data. |
937 | 1 | const int kNumWrites = 1000; |
938 | 1 | ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB)); |
939 | | |
940 | | // Allow ops to be retransmitted to the shut-down replica more than |
941 | | // consensus_lagging_follower_threshold times so that it can be marked as lagging. |
942 | | // Also use a multiplicative factor to reduce flakiness. |
943 | 1 | SleepFor(kHeartBeatInterval * kConsensusLaggingFollowerThreshold * 2); |
944 | | |
945 | | // Allow for shutdown replica to lag. |
946 | 1 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
947 | 1 | ASSERT_EQ(1, active_tablet_servers.erase(replica->uuid())); |
948 | 1 | ASSERT_OK(WaitForServersToAgree(60s, active_tablet_servers, tablet_id_, kNumWrites)); |
949 | | |
950 | | // Find log cache num ops at leader - value from non-leaders will be zero, |
951 | | // hence it is unnecessary to omit values from followers. |
952 | 1 | int64_t log_cache_num_ops = 0; |
953 | 3 | for (const auto& tablet_replica : tablet_replicas_) { |
954 | | // Skip shutdown replica. |
955 | 3 | if (tablet_replica.second->uuid() == replica->uuid()) { |
956 | 1 | continue; |
957 | 1 | } |
958 | | |
959 | 2 | log_cache_num_ops += ASSERT_RESULT( |
960 | 2 | cluster_->tablet_server_by_uuid(tablet_replica.second->uuid())->GetInt64Metric( |
961 | 2 | &METRIC_ENTITY_tablet, |
962 | 2 | nullptr, |
963 | 2 | &METRIC_log_cache_num_ops, |
964 | 2 | "value")); |
965 | 2 | } |
966 | 1 | LOG(INFO)<< "Num ops in log cache: " << log_cache_num_ops; |
967 | | |
968 | | // NOTE: empirically determined threshold. |
969 | 1 | const int kLogCacheNumOpsThreshold = kNumWrites / 10; |
970 | 1 | ASSERT_LE(log_cache_num_ops, kLogCacheNumOpsThreshold); |
971 | | |
972 | | // Now restart the replica shut down earlier; the lagging replica should eventually catch back up. |
973 | 1 | CHECK_OK(replica_ets->Restart()); |
974 | 1 | LOG(INFO)<< "Restarted replica " << replica->uuid() << "."; |
975 | | |
976 | | // Wait for successful replication to restarted follower. |
977 | 1 | ASSERT_OK(WaitForServersToAgree(60s, tablet_servers_, tablet_id_, kNumWrites)); |
978 | 1 | } |
979 | | |
980 | 1 | TEST_F(RaftConsensusITest, TestAddRemoveNonVoter) { |
981 | 1 | MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
982 | 1 | FLAGS_num_tablet_servers = 3; |
983 | 1 | vector<string> ts_flags = { |
984 | 1 | "--enable_leader_failure_detection=false"s, |
985 | 1 | "--TEST_inject_latency_before_change_role_secs=1"s, |
986 | 1 | "--follower_unavailable_considered_failed_sec=5"s, |
987 | 1 | }; |
988 | 1 | vector<string> master_flags = { |
989 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
990 | 1 | "--use_create_table_leader_hint=false"s, |
991 | 1 | }; |
992 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
993 | | |
994 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
995 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
996 | | |
997 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
998 | 1 | TServerDetails* leader_tserver = tservers[0]; |
999 | 1 | const string& leader_uuid = tservers[0]->uuid(); |
1000 | 1 | ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout)); |
1001 | 1 | int64_t min_index = 1; |
1002 | 1 | ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, min_index, &min_index)); |
1003 | 1 | ASSERT_OK(WaitUntilCommittedOpIdIndexIsAtLeast(&min_index, leader_tserver, tablet_id_, kTimeout)); |
1004 | | |
1005 | | // Kill the master, so we can change the config without interference. |
1006 | 1 | cluster_->master()->Shutdown(); |
1007 | | |
1008 | 1 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
1009 | | |
1010 | | // Do majority correctness check for 3 servers. |
1011 | 1 | ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
1012 | 1 | auto cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica( |
1013 | 1 | tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index; |
1014 | | |
1015 | | // Remove tablet server 2, so we can add it as an observer. |
1016 | 1 | vector<int> remove_list = { 2, 1 }; |
1017 | 1 | LOG(INFO) << "Remove: Going from 3 voter replicas to 2 voter replicas"; |
1018 | | |
1019 | 1 | TServerDetails* tserver_to_remove = tservers[2]; |
1020 | 1 | LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
1021 | 1 | ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, kTimeout)); |
1022 | 1 | ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
1023 | 1 | ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
1024 | | |
1025 | | // Do majority correctness check for each incremental decrease. |
1026 | 1 | ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
1027 | 1 | cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica( |
1028 | 1 | tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index; |
1029 | | |
1030 | | // Add the tablet server back as an observer. |
1031 | 1 | LOG(INFO) << "Add: Creating a new read replica"; |
1032 | | |
1033 | 1 | TServerDetails* tserver_to_add = tservers[2]; |
1034 | 1 | LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
1035 | | |
1036 | 1 | ASSERT_OK(DeleteTablet(tserver_to_add, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none, |
1037 | 1 | kTimeout)); |
1038 | | |
1039 | 1 | ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, PeerMemberType::PRE_OBSERVER, |
1040 | 1 | boost::none, kTimeout)); |
1041 | | |
1042 | 1 | consensus::ConsensusStatePB cstate; |
1043 | 1 | ASSERT_OK(itest::GetConsensusState(leader_tserver, tablet_id_, |
1044 | 1 | consensus::CONSENSUS_CONFIG_COMMITTED, kTimeout, &cstate)); |
1045 | | |
1046 | | // Verify that this tserver member type was set correctly. |
1047 | 3 | for (const auto& peer : cstate.config().peers()) { |
1048 | 3 | if (peer.permanent_uuid() == tserver_to_add->uuid()) { |
1049 | 1 | ASSERT_EQ(PeerMemberType::PRE_OBSERVER, peer.member_type()); |
1050 | 1 | } |
1051 | 3 | } |
1052 | | |
1053 | | // Wait until the ChangeConfig has finished and this tserver has been made a VOTER. |
1054 | 1 | ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), leader_tserver, |
1055 | 1 | tablet_id_, MonoDelta::FromSeconds(10))); |
1056 | 1 | ASSERT_OK(WaitUntilCommittedConfigMemberTypeIs(1, leader_tserver, tablet_id_, |
1057 | 1 | MonoDelta::FromSeconds(10), |
1058 | 1 | PeerMemberType::OBSERVER)); |
1059 | | |
1060 | 1 | ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
1061 | | |
1062 | | // Pause the read replica and assert that it gets removed from the configuration. |
1063 | 1 | ASSERT_OK(cluster_->tablet_server_by_uuid(tserver_to_add->uuid())->Pause()); |
1064 | 1 | ASSERT_OK(WaitUntilCommittedConfigMemberTypeIs(0, leader_tserver, tablet_id_, |
1065 | 1 | MonoDelta::FromSeconds(20), |
1066 | 1 | PeerMemberType::OBSERVER)); |
1067 | 1 | } |
1068 | | |
1069 | | void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid, |
1070 | | int64_t* orig_term, |
1071 | 3 | string* fell_behind_uuid) { |
1072 | 3 | MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
1073 | | // Wait for all of the replicas to have acknowledged the elected |
1074 | | // leader and logged the first NO_OP. |
1075 | 3 | ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
1076 | | |
1077 | | // Pause one server. This might be the leader, but pausing it will cause |
1078 | | // a leader election to happen. |
1079 | 3 | TServerDetails* replica = (*tablet_replicas_.begin()).second; |
1080 | 3 | ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid()); |
1081 | 3 | ASSERT_OK(replica_ets->Pause()); |
1082 | | |
1083 | | // Find a leader. In case we paused the leader above, this will wait until |
1084 | | // we have elected a new one. |
1085 | 3 | TServerDetails* leader = nullptr; |
1086 | 3 | while (true) { |
1087 | 3 | Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader); |
1088 | 3 | if (s.ok() && leader != nullptr && leader != replica) { |
1089 | 3 | break; |
1090 | 3 | } |
1091 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1092 | 0 | } |
1093 | 3 | *leader_uuid = leader->uuid(); |
1094 | 3 | int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid); |
1095 | | |
1096 | 3 | TestWorkload workload(cluster_.get()); |
1097 | 3 | workload.set_table_name(kTableName); |
1098 | 3 | workload.set_timeout_allowed(true); |
1099 | 3 | workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB. |
1100 | 3 | workload.set_write_batch_size(1); |
1101 | 3 | workload.set_num_write_threads(4); |
1102 | 3 | workload.Setup(); |
1103 | 3 | workload.Start(); |
1104 | | |
1105 | 3 | LOG(INFO) << "Waiting until we've written at least 4MB..."; |
1106 | 3 | while (workload.rows_inserted() < 8 * 4) { |
1107 | 0 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1108 | 0 | } |
1109 | 3 | workload.StopAndJoin(); |
1110 | | |
1111 | 3 | LOG(INFO) << "Waiting for log GC on " << leader->uuid(); |
1112 | | // Some WAL segments must exist, but wal segment 1 must not exist. |
1113 | | |
1114 | 3 | ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs( |
1115 | 3 | leader_index, tablet_id_, { "wal-" }, { "wal-000000001" })); |
1116 | | |
1117 | 0 | LOG(INFO) << "Log GC complete on " << leader->uuid(); |
1118 | | |
1119 | | // Then wait another couple of seconds to be sure that it has bothered to try |
1120 | | // to write to the paused peer. |
1121 | | // TODO: would be nice to be able to poll the leader with an RPC like |
1122 | | // GetLeaderStatus() which could tell us whether it has made any requests |
1123 | | // since the log GC. |
1124 | 0 | SleepFor(MonoDelta::FromSeconds(2)); |
1125 | | |
1126 | | // Make a note of whatever the current term of the cluster is, |
1127 | | // before we resume the follower. |
1128 | 0 | { |
1129 | 0 | OpId op_id = ASSERT_RESULT(GetLastOpIdForReplica( |
1130 | 0 | tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout)); |
1131 | 0 | *orig_term = op_id.term; |
1132 | 0 | LOG(INFO) << "Servers converged with original term " << *orig_term; |
1133 | 0 | } |
1134 | | |
1135 | | // Resume the follower. |
1136 | 0 | LOG(INFO) << "Resuming " << replica->uuid(); |
1137 | 0 | ASSERT_OK(replica_ets->Resume()); |
1138 | | |
1139 | | // Ensure that none of the tablet servers crashed. |
1140 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
1141 | | // Make sure it didn't crash. |
1142 | 0 | ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive()) |
1143 | 0 | << "Tablet server " << i << " crashed"; |
1144 | 0 | } |
1145 | 0 | *fell_behind_uuid = replica->uuid(); |
1146 | 0 | } |
1147 | | |
1148 | 2 | void RaftConsensusITest::TestAddRemoveServer(PeerMemberType member_type) { |
1149 | 2 | ASSERT_TRUE(member_type == PeerMemberType::PRE_VOTER || |
1150 | 2 | member_type == PeerMemberType::PRE_OBSERVER); |
1151 | | |
1152 | 2 | MonoDelta kTimeout = MonoDelta::FromSeconds(30); |
1153 | 2 | FLAGS_num_tablet_servers = 3; |
1154 | 2 | vector<string> ts_flags = { |
1155 | 2 | "--enable_leader_failure_detection=false"s, |
1156 | 2 | "--TEST_inject_latency_before_change_role_secs=1"s, |
1157 | 2 | }; |
1158 | 2 | vector<string> master_flags = { |
1159 | 2 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
1160 | 2 | "--use_create_table_leader_hint=false"s, |
1161 | 2 | }; |
1162 | 2 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
1163 | | |
1164 | 2 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
1165 | 2 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
1166 | | |
1167 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
1168 | 2 | TServerDetails* leader_tserver = tservers[0]; |
1169 | 2 | const string& leader_uuid = tservers[0]->uuid(); |
1170 | 2 | ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout)); |
1171 | 2 | ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
1172 | 2 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, kTimeout)); |
1173 | | |
1174 | | // Make sure the server rejects removal of itself from the configuration. |
1175 | 2 | Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, boost::none, kTimeout, NULL, |
1176 | 2 | false /* retry */); |
1177 | 4 | ASSERT_TRUE(s.IsInvalidArgument()) << "Should not be able to remove self from config: " |
1178 | 4 | << s.ToString(); |
1179 | | |
1180 | | // Insert the row that we will update throughout the test. |
1181 | 2 | ASSERT_OK(WriteSimpleTestRow( |
1182 | 2 | leader_tserver, tablet_id_, kTestRowKey, kTestRowIntVal, "initial insert", kTimeout)); |
1183 | | |
1184 | | // Kill the master, so we can change the config without interference. |
1185 | 2 | cluster_->master()->Shutdown(); |
1186 | | |
1187 | 2 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
1188 | | |
1189 | | // Do majority correctness check for 3 servers. |
1190 | 2 | ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid)); |
1191 | 2 | int64_t cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica( |
1192 | 2 | tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index; |
1193 | | |
1194 | | // Go from 3 tablet servers down to 1 in the configuration. |
1195 | 2 | vector<int> remove_list = { 2, 1 }; |
1196 | 4 | for (int to_remove_idx : remove_list) { |
1197 | 4 | auto num_servers = active_tablet_servers.size(); |
1198 | 4 | LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas"; |
1199 | | |
1200 | 4 | TServerDetails* tserver_to_remove = tservers[to_remove_idx]; |
1201 | 4 | LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
1202 | 4 | ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, kTimeout)); |
1203 | 4 | ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
1204 | 4 | ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
1205 | | |
1206 | | // Do majority correctness check for each incremental decrease. |
1207 | 4 | ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites( |
1208 | 4 | active_tablet_servers, leader_uuid)); |
1209 | 4 | cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica( |
1210 | 4 | tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index; |
1211 | 4 | } |
1212 | | |
1213 | 2 | int num_observers = 0; |
1214 | | // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration. |
1215 | 2 | vector<int> add_list = { 1, 2 }; |
1216 | 4 | for (int to_add_idx : add_list) { |
1217 | 4 | auto num_servers = active_tablet_servers.size(); |
1218 | 4 | if (PeerMemberType::PRE_VOTER == member_type) { |
1219 | 2 | LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas"; |
1220 | 2 | } else { |
1221 | 2 | LOG(INFO) << "Add: Going from " << num_observers<< " to " << num_observers + 1 |
1222 | 2 | << " observers"; |
1223 | 2 | } |
1224 | | |
1225 | 4 | TServerDetails* tserver_to_add = tservers[to_add_idx]; |
1226 | 4 | LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
1227 | | |
1228 | 4 | ASSERT_OK(DeleteTablet(tserver_to_add, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none, |
1229 | 4 | kTimeout)); |
1230 | | |
1231 | 4 | ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, member_type, boost::none, |
1232 | 4 | kTimeout)); |
1233 | | |
1234 | 4 | consensus::ConsensusStatePB cstate; |
1235 | 4 | ASSERT_OK(itest::GetConsensusState(leader_tserver, tablet_id_, |
1236 | 4 | consensus::CONSENSUS_CONFIG_COMMITTED, kTimeout, &cstate)); |
1237 | | |
1238 | | // Verify that this tserver member type was set correctly. |
1239 | 10 | for (const auto& peer : cstate.config().peers()) { |
1240 | 10 | if (peer.permanent_uuid() == tserver_to_add->uuid()) { |
1241 | 4 | ASSERT_EQ(member_type, peer.member_type()); |
1242 | 4 | LOG(INFO) << "tserver with uuid " << tserver_to_add->uuid() << " was added as a " |
1243 | 4 | << PeerMemberType_Name(peer.member_type()); |
1244 | 4 | } |
1245 | 10 | } |
1246 | | |
1247 | 4 | if (PeerMemberType::PRE_VOTER == member_type) { |
1248 | 2 | InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add); |
1249 | 2 | } else { |
1250 | 2 | num_observers++; |
1251 | 2 | } |
1252 | | |
1253 | | // Wait until the ChangeConfig has finished and this tserver has been made a VOTER (or an OBSERVER, for PRE_OBSERVER).
1254 | 4 | ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), leader_tserver, |
1255 | 4 | tablet_id_, MonoDelta::FromSeconds(10))); |
1256 | 4 | ASSERT_OK(WaitUntilCommittedConfigMemberTypeIs(num_observers, leader_tserver, tablet_id_, |
1257 | 4 | MonoDelta::FromSeconds(10), |
1258 | 4 | PeerMemberType::OBSERVER)); |
1259 | | |
1260 | 4 | ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index)); |
1261 | | |
1262 | | // Do majority correctness check for each incremental increase. |
1263 | 4 | ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites( |
1264 | 4 | active_tablet_servers, leader_uuid)); |
1265 | 4 | cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica( |
1266 | 4 | tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index; |
1267 | 4 | } |
1268 | 2 | } |
1269 | | |
1270 | 2 | void RaftConsensusITest::TestRemoveTserverFailsWhenServerInTransition(PeerMemberType member_type) { |
1271 | 2 | ASSERT_TRUE(member_type == PeerMemberType::PRE_VOTER || |
1272 | 2 | member_type == PeerMemberType::PRE_OBSERVER); |
1273 | | |
1274 | 2 | FLAGS_num_tablet_servers = 3; |
1275 | 2 | vector<string> ts_flags = { |
1276 | 2 | "--enable_leader_failure_detection=false"s, |
1277 | 2 | "--TEST_inject_latency_before_change_role_secs=10"s, |
1278 | 2 | }; |
1279 | 2 | vector<string> master_flags = { |
1280 | 2 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
1281 | 2 | "--use_create_table_leader_hint=false"s, |
1282 | 2 | }; |
1283 | 2 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
1284 | | |
1285 | 2 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
1286 | 2 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
1287 | | |
1288 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
1289 | 2 | TServerDetails* initial_leader = tservers[0]; |
1290 | 2 | const MonoDelta timeout = MonoDelta::FromSeconds(10); |
1291 | 2 | ASSERT_OK(StartElection(initial_leader, tablet_id_, timeout)); |
1292 | 2 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1)); |
1293 | 2 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, timeout)); |
1294 | | |
1295 | | // The server we will remove and then bring back. |
1296 | 2 | TServerDetails* tserver = tservers[2]; |
1297 | | |
1298 | | // Kill the master, so we can change the config without interference. |
1299 | 2 | cluster_->master()->Shutdown(); |
1300 | | |
1301 | | // Now remove server 2 from the configuration. |
1302 | 2 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
1303 | 2 | LOG(INFO) << "Removing tserver with uuid " << tserver->uuid(); |
1304 | 2 | ASSERT_OK(RemoveServer(initial_leader, tablet_id_, tserver, boost::none, |
1305 | 2 | MonoDelta::FromSeconds(10))); |
1306 | 2 | ASSERT_EQ(1, active_tablet_servers.erase(tserver->uuid())); |
1307 | 2 | int64_t cur_log_index = 2; |
1308 | 2 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
1309 | 2 | active_tablet_servers, tablet_id_, cur_log_index)); |
1310 | 2 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(cur_log_index, initial_leader, tablet_id_, timeout)); |
1311 | | |
1312 | 2 | ASSERT_OK(DeleteTablet(tserver, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none, |
1313 | 2 | MonoDelta::FromSeconds(30))); |
1314 | | |
1315 | | // Now add server 2 back as a learner to the peers. |
1316 | 2 | LOG(INFO) << "Adding back Peer " << tserver->uuid(); |
1317 | 2 | ASSERT_OK(AddServer(initial_leader, tablet_id_, tserver, member_type, boost::none, |
1318 | 2 | MonoDelta::FromSeconds(10))); |
1319 | | |
1320 | | // Only wait for TS 0 and 1 to agree that the new change config op (ADD_SERVER for server 2) |
1321 | | // has been replicated. |
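| | // Server 2 itself stays in PRE_VOTER/PRE_OBSERVER for now, since the role change is delayed
| | // by --TEST_inject_latency_before_change_role_secs=10 (set above); that pending transition is
| | // what makes the RemoveServer attempt below fail with "Leader is not ready for Config Change".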
1322 | 2 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60), |
1323 | 2 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
1324 | | |
1325 | | // Now try to remove server 1 from the configuration. This should fail. |
1326 | 2 | LOG(INFO) << "Removing tserver with uuid " << tservers[1]->uuid(); |
1327 | 2 | auto status = RemoveServer(initial_leader, tablet_id_, tservers[1], boost::none, |
1328 | 2 | MonoDelta::FromSeconds(10), NULL, false /* retry */); |
1329 | 2 | ASSERT_TRUE(status.IsIllegalState()); |
1330 | 2 | ASSERT_STR_CONTAINS(status.ToString(), "Leader is not ready for Config Change"); |
1331 | 2 | } |
1332 | | |
1333 | | // In TestRemoveTserverFailsWhenServerInTransition we verify that a REMOVE_SERVER operation
1334 | | // fails whenever the committed config contains a server in PRE_VOTER or PRE_OBSERVER mode.
1335 | | // In this test we verify that a REMOVE_SERVER operation succeeds whenever the committed
1336 | | // config contains a server in PRE_VOTER or PRE_OBSERVER mode and that server is the same
1337 | | // one we are trying to remove.
1338 | 2 | void RaftConsensusITest::TestRemoveTserverInTransitionSucceeds(PeerMemberType member_type) { |
1339 | 2 | ASSERT_TRUE(member_type == PeerMemberType::PRE_VOTER || |
1340 | 2 | member_type == PeerMemberType::PRE_OBSERVER); |
1341 | | |
1342 | 2 | FLAGS_num_tablet_servers = 3; |
1343 | 2 | vector<string> ts_flags = { |
1344 | 2 | "--enable_leader_failure_detection=false"s, |
1345 | 2 | "--TEST_skip_change_role"s, |
1346 | 2 | }; |
1347 | 2 | vector<string> master_flags = { |
1348 | 2 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
1349 | 2 | "--use_create_table_leader_hint=false"s, |
1350 | 2 | }; |
1351 | 2 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
1352 | | |
1353 | 2 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
1354 | 2 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
1355 | | |
1356 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
1357 | 2 | TServerDetails* initial_leader = tservers[0]; |
1358 | 2 | const MonoDelta timeout = MonoDelta::FromSeconds(10); |
1359 | 2 | ASSERT_OK(StartElection(initial_leader, tablet_id_, timeout)); |
1360 | 2 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1)); |
1361 | 2 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, timeout)); |
1362 | | |
1363 | | // The server we will remove and then bring back. |
1364 | 2 | TServerDetails* tserver = tservers[2]; |
1365 | | |
1366 | | // Kill the master, so we can change the config without interference. |
1367 | 2 | cluster_->master()->Shutdown(); |
1368 | | |
1369 | | // Now remove server 2 from the configuration. |
1370 | 2 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
1371 | 2 | LOG(INFO) << "Removing tserver with uuid " << tserver->uuid(); |
1372 | 2 | ASSERT_OK(RemoveServer(initial_leader, tablet_id_, tserver, boost::none, |
1373 | 2 | MonoDelta::FromSeconds(10))); |
1374 | 2 | ASSERT_EQ(1, active_tablet_servers.erase(tserver->uuid())); |
1375 | 2 | int64_t cur_log_index = 2; |
1376 | 2 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
1377 | 2 | active_tablet_servers, tablet_id_, cur_log_index)); |
1378 | 2 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(cur_log_index, initial_leader, tablet_id_, timeout)); |
1379 | | |
1380 | 2 | ASSERT_OK(DeleteTablet(tserver, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none, |
1381 | 2 | MonoDelta::FromSeconds(30))); |
1382 | | |
1383 | | // Now add server 2 back in PRE_VOTER or PRE_OBSERVER mode. This server will never transition to |
1384 | | // VOTER or OBSERVER because flag TEST_skip_change_role is set. |
1385 | 2 | LOG(INFO) << "Adding back Peer " << tserver->uuid(); |
1386 | 2 | ASSERT_OK(AddServer(initial_leader, tablet_id_, tserver, member_type, boost::none, |
1387 | 2 | MonoDelta::FromSeconds(10))); |
1388 | | |
1389 | | // Only wait for TS 0 and 1 to agree that the new change config op (ADD_SERVER for server 2) |
1390 | | // has been replicated. |
1391 | 2 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60), |
1392 | 2 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
1393 | | |
1394 | | // Now try to remove server 2 from the configuration. This should succeed. |
1395 | 2 | LOG(INFO) << "Removing tserver with uuid " << tservers[2]->uuid(); |
1396 | 2 | ASSERT_OK(RemoveServer(initial_leader, tablet_id_, tservers[2], boost::none, |
1397 | 2 | MonoDelta::FromSeconds(10))); |
1398 | 2 | } |
1399 | | |
1400 | | // Test that the leader doesn't crash if one of its followers has |
1401 | | // fallen behind so far that the logs necessary to catch it up |
1402 | | // have been GCed. |
1403 | | // |
1404 | | // In a real cluster, this will eventually cause the follower to be |
1405 | | // evicted/replaced. In any case, the leader should not crash. |
1406 | | // |
1407 | | // We also ensure that, when the leader stops writing to the follower, |
1408 | | // the follower won't disturb the other nodes when it attempts to elect |
1409 | | // itself. |
1410 | | // |
1411 | | // This is a regression test for KUDU-775 and KUDU-562. |
1412 | 1 | TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) { |
1413 | | // Disable follower eviction to maintain the original intent of this test. |
1414 | 1 | vector<string> extra_flags = { "--evict_failed_followers=false" }; |
1415 | 1 | AddFlagsForLogRolls(&extra_flags); // For CauseFollowerToFallBehindLogGC(). |
1416 | 1 | ASSERT_NO_FATALS(BuildAndStart(extra_flags, std::vector<std::string>())); |
1417 | | |
1418 | 1 | string leader_uuid; |
1419 | 1 | int64_t orig_term; |
1420 | 1 | string follower_uuid; |
1421 | 1 | ASSERT_NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); |
1422 | | |
1423 | | // Wait for remaining majority to agree. |
1424 | 0 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
1425 | 0 | ASSERT_EQ(3, active_tablet_servers.size()); |
1426 | 0 | ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
1427 | 0 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_, |
1428 | 0 | 1)); |
1429 | |
1430 | 0 | if (AllowSlowTests()) { |
1431 | | // Sleep long enough that the "abandoned" server's leader election interval |
1432 | | // will trigger several times. Then, verify that the term has not increased. |
1433 | | // This ensures that the other servers properly ignore the election requests |
1434 | | // from the abandoned node. |
1435 | | // TODO: would be nicer to use an RPC to check the current term of the |
1436 | | // abandoned replica, and wait until it has incremented a couple of times. |
1437 | 0 | SleepFor(MonoDelta::FromSeconds(5)); |
1438 | 0 | TServerDetails* leader = tablet_servers_[leader_uuid].get(); |
1439 | 0 | auto op_id = ASSERT_RESULT(GetLastOpIdForReplica( |
1440 | 0 | tablet_id_, leader, consensus::RECEIVED_OPID, MonoDelta::FromSeconds(10))); |
1441 | 0 | ASSERT_EQ(orig_term, op_id.term) |
1442 | 0 | << "expected the leader to have not advanced terms but has op " << op_id; |
1443 | 0 | } |
1444 | 0 | } |
1445 | | |
1446 | 9 | int RaftConsensusITest::RestartAnyCrashedTabletServers() { |
1447 | 9 | int restarted = 0; |
1448 | 36 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
1449 | 27 | if (!cluster_->tablet_server(i)->IsProcessAlive()) { |
1450 | 3 | LOG(INFO) << "TS " << i << " appears to have crashed. Restarting."; |
1451 | 3 | cluster_->tablet_server(i)->Shutdown(); |
1452 | 3 | CHECK_OK(cluster_->tablet_server(i)->Restart()); |
1453 | 3 | restarted++; |
1454 | 3 | } |
1455 | 27 | } |
1456 | 9 | return restarted; |
1457 | 9 | } |
1458 | | |
1459 | 60 | void RaftConsensusITest::AssertNoTabletServersCrashed() { |
1460 | 240 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
1461 | 180 | if (cluster_->tablet_server(i)->IsShutdown()) continue; |
1462 | | |
1463 | 360 | ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive()) |
1464 | 360 | << "Tablet server " << i << " crashed"; |
1465 | 180 | } |
1466 | 60 | } |
1467 | | |
1468 | | // This test starts several tablet servers, and configures them with |
1469 | | // fault injection so that the leaders frequently crash just before |
1470 | | // sending RPCs to followers. |
1471 | | // |
1472 | | // This can result in various scenarios where leaders crash right after |
1473 | | // being elected and never succeed in replicating their first operation. |
1474 | | // For example, KUDU-783 reproduces from this test approximately 5% of the |
1475 | | // time on a slow-test debug build. |
1476 | 1 | TEST_F(RaftConsensusITest, InsertWithCrashyNodes) { |
1477 | 1 | int kCrashesToCause = 3; |
1478 | 1 | vector<string> ts_flags, master_flags; |
1479 | 1 | if (AllowSlowTests()) { |
1480 | 0 | FLAGS_num_tablet_servers = 7; |
1481 | 0 | FLAGS_num_replicas = 7; |
1482 | 0 | kCrashesToCause = 15; |
1483 | 0 | master_flags.push_back("--replication_factor=7"); |
1484 | 0 | } |
1485 | | |
1486 | | // Crash 5% of the time just before sending an RPC. With 7 servers, |
1487 | | // this means we crash about 30% of the time before we've fully |
1488 | | // replicated the NO_OP at the start of the term. |
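| | // (With six followers to contact, the chance of at least one crash is roughly
| | // 1 - 0.95^6, i.e. about 0.26 per term start, before counting retries.)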
1489 | 1 | ts_flags.push_back("--TEST_fault_crash_on_leader_request_fraction=0.05"); |
1490 | | |
1491 | | // Inject latency to encourage the replicas to fall out of sync |
1492 | | // with each other. |
1493 | 1 | ts_flags.push_back("--log_inject_latency"); |
1494 | 1 | ts_flags.push_back("--log_inject_latency_ms_mean=30"); |
1495 | 1 | ts_flags.push_back("--log_inject_latency_ms_stddev=60"); |
1496 | | |
1497 | | // Make leader elections faster so we get through more cycles of |
1498 | | // leaders. |
1499 | 1 | ts_flags.push_back("--raft_heartbeat_interval_ms=100"); |
1500 | | |
1501 | | // Avoid preallocating segments since bootstrap is a little bit |
1502 | | // faster if it doesn't have to scan forward through the preallocated |
1503 | | // log area. |
1504 | 1 | ts_flags.push_back("--log_preallocate_segments=false"); |
1505 | | |
1506 | 1 | CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags); |
1507 | | |
1508 | 1 | TestWorkload workload(cluster_.get()); |
1509 | 1 | workload.set_timeout_allowed(true); |
1510 | 1 | workload.set_write_timeout_millis(1000); |
1511 | 1 | workload.set_num_write_threads(10); |
1512 | 1 | workload.set_write_batch_size(1); |
1513 | 1 | workload.set_sequential_write(true); |
1514 | 1 | workload.Setup(client::YBTableType::YQL_TABLE_TYPE); |
1515 | 1 | workload.Start(); |
1516 | | |
1517 | 1 | int num_crashes = 0; |
1518 | 10 | while (num_crashes < kCrashesToCause && |
1519 | 9 | workload.rows_inserted() < 100) { |
1520 | 9 | num_crashes += RestartAnyCrashedTabletServers(); |
1521 | 9 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1522 | 9 | } |
1523 | | |
1524 | 1 | workload.StopAndJoin(); |
1525 | | |
1526 | | // After we stop the writes, we can still get crashes because heartbeats could |
1527 | | // trigger the fault path. So, disable the faults and restart one more time. |
1528 | 4 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
1529 | 3 | ExternalTabletServer* ts = cluster_->tablet_server(i); |
1530 | 3 | vector<string>* flags = ts->mutable_flags(); |
1531 | 3 | bool removed_flag = false; |
1532 | 18 | for (auto it = flags->begin(); it != flags->end(); ++it) { |
1533 | 18 | if (HasPrefixString(*it, "--TEST_fault_crash")) { |
1534 | 3 | flags->erase(it); |
1535 | 3 | removed_flag = true; |
1536 | 3 | break; |
1537 | 3 | } |
1538 | 18 | } |
1539 | 6 | ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i |
1540 | 6 | << "\nFlags:\n" << *flags; |
1541 | 3 | ts->Shutdown(); |
1542 | 3 | CHECK_OK(ts->Restart()); |
1543 | 3 | } |
1544 | | |
1545 | | // Ensure that the replicas converge. |
1546 | | // We don't know exactly how many rows got inserted, since the writer |
1547 | | // probably saw many errors which left inserts in indeterminate state. |
1548 | | // But, we should have at least as many as we got confirmation for. |
1549 | 1 | ClusterVerifier cluster_verifier(cluster_.get()); |
1550 | 1 | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
1551 | 1 | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST, |
1552 | 1 | workload.rows_inserted())); |
1553 | 1 | } |
1554 | | |
1555 | | // This test sets all of the election timers to be very short, resulting |
1556 | | // in a lot of churn. We expect to make some progress and not diverge or |
1557 | | // crash, despite the frequent re-elections and races. |
1558 | 1 | TEST_F(RaftConsensusITest, TestChurnyElections) { |
1559 | 1 | DoTestChurnyElections(WITHOUT_NOTIFICATION_LATENCY); |
1560 | 1 | } |
1561 | | |
1562 | | // The same test, except inject artificial latency when propagating notifications |
1563 | | // from the queue back to consensus. This can reproduce bugs like KUDU-1078 which |
1564 | | // normally only appear under high load. TODO: Re-enable once we get to the |
1565 | | // bottom of KUDU-1078. |
1566 | 0 | TEST_F(RaftConsensusITest, DISABLED_TestChurnyElections_WithNotificationLatency) { |
1567 | 0 | DoTestChurnyElections(WITH_NOTIFICATION_LATENCY); |
1568 | 0 | } |
1569 | | |
1570 | 1 | void RaftConsensusITest::DoTestChurnyElections(bool with_latency) { |
1571 | 1 | vector<string> ts_flags, master_flags; |
1572 | | |
1573 | | // On TSAN builds, we need to be a little bit less churny in order to make |
1574 | | // any progress at all. |
1575 | 1 | ts_flags.push_back(Format("--raft_heartbeat_interval_ms=$0", NonTsanVsTsan(5, 25))); |
1576 | 1 | ts_flags.push_back("--never_fsync"); |
1577 | 1 | if (with_latency) { |
1578 | 0 | ts_flags.push_back("--consensus_inject_latency_ms_in_notifications=50"); |
1579 | 0 | } |
1580 | | |
1581 | 1 | CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags); |
1582 | | |
1583 | 1 | TestWorkload workload(cluster_.get()); |
1584 | 1 | workload.set_timeout_allowed(true); |
1585 | 1 | workload.set_write_timeout_millis(100); |
1586 | 1 | workload.set_num_write_threads(2); |
1587 | 1 | workload.set_write_batch_size(1); |
1588 | 1 | workload.set_sequential_write(true); |
1589 | 1 | workload.Setup(); |
1590 | 1 | workload.Start(); |
1591 | | |
1592 | | // Run for either a prescribed number of writes, or 30 seconds, |
1593 | | // whichever comes first. This prevents test timeouts on slower |
1594 | | // build machines, TSAN builds, etc. |
1595 | 1 | Stopwatch sw; |
1596 | 1 | sw.start(); |
1597 | 1 | const int kNumWrites = AllowSlowTests() ? 10000 : 1000; |
1598 | 61 | while (workload.rows_inserted() < kNumWrites && |
1599 | 60 | (sw.elapsed().wall_seconds() < 30 || |
1600 | | // If no rows are inserted, run a little longer. |
1601 | 60 | (workload.rows_inserted() == 0 && sw.elapsed().wall_seconds() < 90))) { |
1602 | 60 | SleepFor(MonoDelta::FromMilliseconds(10)); |
1603 | 60 | ASSERT_NO_FATALS(AssertNoTabletServersCrashed()); |
1604 | 60 | } |
1605 | 1 | workload.StopAndJoin(); |
1606 | 1 | LOG(INFO) << "rows_inserted=" << workload.rows_inserted(); |
1607 | 2 | ASSERT_GT(workload.rows_inserted(), 0) << "No rows inserted"; |
1608 | | |
1609 | | // Ensure that the replicas converge. |
1610 | | // We don't know exactly how many rows got inserted, since the writer |
1611 | | // probably saw many errors which left inserts in indeterminate state. |
1612 | | // But, we should have at least as many as we got confirmation for. |
1613 | 1 | ClusterVerifier cluster_verifier(cluster_.get()); |
1614 | 1 | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
1615 | 1 | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount( |
1616 | 1 | workload.table_name(), ClusterVerifier::AT_LEAST, workload.rows_inserted())); |
1617 | 0 | ASSERT_NO_FATALS(AssertNoTabletServersCrashed()); |
1618 | 0 | } |
1619 | | |
1620 | 1 | TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) { |
1621 | 1 | int kNumElections = FLAGS_num_replicas; |
1622 | 1 | vector<string> master_flags; |
1623 | | |
1624 | 1 | if (AllowSlowTests()) { |
1625 | 0 | FLAGS_num_tablet_servers = 7; |
1626 | 0 | FLAGS_num_replicas = 7; |
1627 | 0 | kNumElections = 3 * FLAGS_num_replicas; |
1628 | 0 | master_flags.push_back("--replication_factor=7"); |
1629 | 0 | } |
1630 | | |
1631 | | // Reset the consensus RPC timeout to the default value, or elections might fail often.
1632 | 1 | FLAGS_consensus_rpc_timeout_ms = 1000; |
1633 | | |
1634 | | // In slow-test mode, start a 7-node configuration (since we can't bring killed leaders
1635 | | // back, we start with a higher replica count so that we can kill more of them).
1636 | | |
1637 | 1 | ASSERT_NO_FATALS(BuildAndStart({}, master_flags)); |
1638 | | |
1639 | 1 | OverrideFlagForSlowTests( |
1640 | 1 | "client_inserts_per_thread", |
1641 | 1 | strings::Substitute("$0", (FLAGS_client_inserts_per_thread * 100))); |
1642 | 1 | OverrideFlagForSlowTests( |
1643 | 1 | "client_num_batches_per_thread", |
1644 | 1 | strings::Substitute("$0", (FLAGS_client_num_batches_per_thread * 100))); |
1645 | | |
1646 | 1 | int num_threads = FLAGS_num_client_threads; |
1647 | 1 | int64_t total_num_rows = num_threads * FLAGS_client_inserts_per_thread; |
1648 | | |
1649 | | // We create kNumElections - 1 latches so that we stop or kill a leader once per latch,
1650 | | // potentially hitting the same node more than once.
1651 | 1 | vector<CountDownLatch*> latches; |
1652 | 3 | for (int i = 1; i < kNumElections; i++) { |
1653 | 2 | latches.push_back(new CountDownLatch((i * total_num_rows) / kNumElections)); |
1654 | 2 | } |
1655 | | |
1656 | 9 | for (int i = 0; i < num_threads; i++) { |
1657 | 8 | scoped_refptr<yb::Thread> new_thread; |
1658 | 8 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("ts-test$0", i), |
1659 | 8 | &RaftConsensusITest::InsertTestRowsRemoteThread, |
1660 | 8 | this, i * FLAGS_client_inserts_per_thread, |
1661 | 8 | FLAGS_client_inserts_per_thread, |
1662 | 8 | FLAGS_client_num_batches_per_thread, |
1663 | 8 | latches, |
1664 | 8 | &new_thread)); |
1665 | 8 | threads_.push_back(new_thread); |
1666 | 8 | } |
1667 | | |
1668 | 2 | for (CountDownLatch* latch : latches) { |
1669 | 2 | latch->Wait(); |
1670 | 2 | StopOrKillLeaderAndElectNewOne(); |
1671 | 2 | } |
1672 | | |
1673 | 8 | for (scoped_refptr<yb::Thread> thr : threads_) { |
1674 | 8 | CHECK_OK(ThreadJoiner(thr.get()).Join()); |
1675 | 8 | } |
1676 | | |
1677 | 1 | ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads); |
1678 | 0 | STLDeleteElements(&latches); |
1679 | 0 | } |
1680 | | |
1681 | | // Test automatic leader election by killing leaders. |
1682 | 1 | TEST_F(RaftConsensusITest, TestAutomaticLeaderElection) { |
1683 | 1 | vector<string> master_flags; |
1684 | 1 | if (AllowSlowTests()) { |
1685 | 0 | FLAGS_num_tablet_servers = 5; |
1686 | 0 | FLAGS_num_replicas = 5; |
1687 | 0 | master_flags.push_back("--replication_factor=5"); |
1688 | 0 | } |
1689 | 1 | ASSERT_NO_FATALS(BuildAndStart({}, master_flags)); |
1690 | | |
1691 | 1 | TServerDetails* leader; |
1692 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
1693 | | |
1694 | 1 | unordered_set<TServerDetails*> killed_leaders; |
1695 | | |
1696 | 1 | const int kNumLeadersToKill = FLAGS_num_replicas / 2; |
1697 | 1 | const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1; |
1698 | | |
1699 | 3 | for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) { |
1700 | 2 | LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...", |
1701 | 2 | FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed); |
1702 | | |
1703 | 2 | ASSERT_NO_FATALS(InsertTestRowsRemoteThread( |
1704 | 2 | leaders_killed * FLAGS_client_inserts_per_thread, |
1705 | 2 | FLAGS_client_inserts_per_thread, |
1706 | 2 | FLAGS_client_num_batches_per_thread, |
1707 | 2 | vector<CountDownLatch*>())); |
1708 | | |
1709 | | // At this point, the writes are flushed but the commit index may not be |
1710 | | // propagated to all replicas. We kill the leader anyway. |
1711 | 2 | if (leaders_killed < kNumLeadersToKill) { |
1712 | 1 | LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "..."; |
1713 | 1 | cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown(); |
1714 | 1 | InsertOrDie(&killed_leaders, leader); |
1715 | | |
1716 | 1 | LOG(INFO) << "Waiting for new guy to be elected leader."; |
1717 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
1718 | 1 | } |
1719 | 2 | } |
1720 | | |
1721 | | // Restart every node that was killed, and wait for the nodes to converge. |
1722 | 1 | for (TServerDetails* killed_node : killed_leaders) { |
1723 | 1 | CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart()); |
1724 | 1 | } |
1725 | | // Verify the data on the remaining replicas. |
1726 | 1 | ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * kFinalNumReplicas); |
1727 | 1 | } |
1728 | | |
1729 | | // Single-replica leader election test. |
1730 | 1 | TEST_F(RaftConsensusITest, TestAutomaticLeaderElectionOneReplica) { |
1731 | 1 | FLAGS_num_tablet_servers = 1; |
1732 | 1 | FLAGS_num_replicas = 1; |
1733 | 1 | ASSERT_NO_FATALS(BuildAndStart({}, {"--replication_factor=1"})); |
1734 | | |
1735 | 1 | TServerDetails* leader; |
1736 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
1737 | 1 | } |
1738 | | |
1739 | 3 | void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) { |
1740 | 3 | vector<TServerDetails*> servers = TServerDetailsVector(tablet_servers_); |
1741 | | |
1742 | 3 | CHECK_LT(replica_idx, servers.size()); |
1743 | 3 | TServerDetails* ts = servers[replica_idx]; |
1744 | | |
1745 | | // Manually construct an RPC to our target replica. We expect most of the calls |
1746 | | // to fail, either with an "already present" error or because we are writing
1747 | | // to a follower. That's OK, though - what we care about for this test is |
1748 | | // just that the operations Apply() in the same order everywhere (even though |
1749 | | // in this case the result will just be an error). |
1750 | 3 | WriteRequestPB req; |
1751 | 3 | WriteResponsePB resp; |
1752 | 3 | RpcController rpc; |
1753 | 3 | req.set_tablet_id(tablet_id_); |
1754 | 3 | AddTestRowInsert(kTestRowKey, kTestRowIntVal, "hello world", &req); |
1755 | | |
1756 | 58.8k | while (!finish->Load()) { |
1757 | 58.8k | resp.Clear(); |
1758 | 58.8k | rpc.Reset(); |
1759 | 58.8k | rpc.set_timeout(MonoDelta::FromSeconds(10)); |
1760 | 58.8k | WARN_NOT_OK(ts->tserver_proxy->Write(req, &resp, &rpc), "Write failed"); |
1761 | 10 | VLOG(1) << "Response from server " << replica_idx << ": " |
1762 | 10 | << resp.ShortDebugString(); |
1763 | 58.8k | } |
1764 | 3 | } |
1765 | | |
1766 | | // Regression test for KUDU-597, an issue where we could mis-order operations on |
1767 | | // a machine if the following sequence occurred: |
1768 | | // 1) Replica is a FOLLOWER |
1769 | | // 2) A client request hits the machine |
1770 | | // 3) It receives some operations from the current leader |
1771 | | // 4) It gets elected LEADER |
1772 | | // In this scenario, it would incorrectly sequence the client request's PREPARE phase |
1773 | | // before the operations received in step (3), even though the correct behavior would be |
1774 | | // to either reject them or sequence them after those operations, because the operation |
1775 | | // index is higher. |
1776 | | // |
1777 | | // The test works by setting up three replicas and manually hammering them with write |
1778 | | // requests targeting a single row. If the bug exists, then OperationOrderVerifier |
1779 | | // will trigger an assertion because the prepare order and the op indexes will become |
1780 | | // misaligned. |
1781 | 1 | TEST_F(RaftConsensusITest, VerifyTransactionOrder) { |
1782 | 1 | FLAGS_num_tablet_servers = 3; |
1783 | 1 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
1784 | | |
1785 | 1 | AtomicBool finish(false); |
1786 | 4 | for (int i = 0; i < FLAGS_num_tablet_servers; i++) { |
1787 | 3 | scoped_refptr<yb::Thread> new_thread; |
1788 | 3 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("ts-test$0", i), |
1789 | 3 | &RaftConsensusITest::StubbornlyWriteSameRowThread, |
1790 | 3 | this, i, &finish, &new_thread)); |
1791 | 3 | threads_.push_back(new_thread); |
1792 | 3 | } |
1793 | | |
1794 | 1 | const int num_loops = AllowSlowTests() ? 10 : 1; |
1795 | 2 | for (int i = 0; i < num_loops; i++) { |
1796 | 1 | StopOrKillLeaderAndElectNewOne(); |
1797 | 1 | SleepFor(MonoDelta::FromSeconds(1)); |
1798 | 1 | ASSERT_OK(CheckTabletServersAreAlive(FLAGS_num_tablet_servers)); |
1799 | 1 | } |
1800 | | |
1801 | 1 | finish.Store(true); |
1802 | 3 | for (scoped_refptr<yb::Thread> thr : threads_) { |
1803 | 3 | CHECK_OK(ThreadJoiner(thr.get()).Join()); |
1804 | 3 | } |
1805 | 1 | } |
1806 | | |
1807 | 215 | void RaftConsensusITest::AddOp(const OpIdPB& id, ConsensusRequestPB* req) { |
1808 | 215 | ReplicateMsg* msg = req->add_ops(); |
1809 | 215 | msg->mutable_id()->CopyFrom(id); |
1810 | 215 | msg->set_hybrid_time(clock_->Now().ToUint64()); |
1811 | 215 | msg->set_op_type(consensus::WRITE_OP); |
1812 | 215 | auto* write_req = msg->mutable_write(); |
1813 | 215 | int32_t key = static_cast<int32_t>(id.index() * 10000 + id.term()); |
1814 | 215 | string str_val = Substitute("term: $0 index: $1", id.term(), id.index()); |
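| | // For example, OpId 2.4 (term 2, index 4) yields key 40002 and value "term: 2 index: 4",
| | // so every distinct OpId written through AddOp produces a distinct, recognizable row.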
1815 | 215 | AddKVToPB(key, key + 10, str_val, write_req->mutable_write_batch()); |
1816 | 215 | } |
1817 | | |
1818 | | // Regression test for KUDU-644: |
1819 | | // Triggers some complicated scenarios on the replica involving aborting and |
1820 | | // replacing transactions. |
1821 | 1 | TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { |
1822 | 1 | FLAGS_num_tablet_servers = 3; |
1823 | 1 | auto ts_flags = { |
1824 | 1 | "--enable_leader_failure_detection=false"s, |
1825 | 1 | "--max_wait_for_safe_time_ms=100"s, |
1826 | 1 | "--propagate_safe_time=false"s, |
1827 | | // Disable bounded stale reads because the |
1828 | | // update consensus requests generated by this |
1829 | | // test don't update the hybrid timestamp, so
1830 | | // follower reads will always be stale. |
1831 | 1 | "--max_stale_read_bound_time_ms=0"s |
1832 | 1 | }; |
1833 | 1 | auto master_flags = { |
1834 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
1835 | 1 | "--use_create_table_leader_hint=false"s, |
1836 | 1 | }; |
1837 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
1838 | | |
1839 | | // Kill all the servers but one. |
1840 | 1 | TServerDetails *replica_ts; |
1841 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
1842 | 1 | ASSERT_EQ(3, tservers.size()); |
1843 | | |
1844 | | // Elect server 2 as leader and wait for log index 1 to propagate to all servers. |
1845 | 1 | ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10))); |
1846 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1)); |
1847 | | |
1848 | 1 | replica_ts = tservers[0]; |
1849 | 1 | cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
1850 | 1 | cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
1851 | | |
1852 | 1 | LOG(INFO) << "================================== Cluster setup complete."; |
1853 | | |
1854 | | // Check that the 'term' metric is correctly exposed. |
1855 | 1 | { |
1856 | 1 | int64_t term_from_metric = ASSERT_RESULT( |
1857 | 1 | cluster_->tablet_server_by_uuid(replica_ts->uuid())->GetInt64Metric( |
1858 | 1 | &METRIC_ENTITY_tablet, |
1859 | 1 | nullptr, |
1860 | 1 | &METRIC_raft_term, |
1861 | 1 | "value")); |
1862 | 1 | ASSERT_EQ(term_from_metric, 1); |
1863 | 1 | } |
1864 | | |
1865 | 1 | ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get()); |
1866 | | |
1867 | 1 | ConsensusRequestPB req; |
1868 | 1 | ConsensusResponsePB resp; |
1869 | 1 | RpcController rpc; |
1870 | | |
1871 | 1 | LOG(INFO) << "Send a simple request with no ops."; |
1872 | 1 | req.set_tablet_id(tablet_id_); |
1873 | 1 | req.set_dest_uuid(replica_ts->uuid()); |
1874 | 1 | req.set_caller_uuid("fake_caller"); |
1875 | 1 | req.set_caller_term(2); |
1876 | 1 | req.mutable_committed_op_id()->CopyFrom(MakeOpId(1, 1)); |
1877 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); |
1878 | | |
1879 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1880 | 2 | ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
1881 | | |
1882 | 1 | LOG(INFO) << "Send some operations, but don't advance the commit index. They should not commit."; |
1883 | 1 | AddOp(MakeOpId(2, 2), &req); |
1884 | 1 | AddOp(MakeOpId(2, 3), &req); |
1885 | 1 | AddOp(MakeOpId(2, 4), &req); |
1886 | 1 | rpc.Reset(); |
1887 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1888 | 2 | ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
1889 | | |
1890 | 1 | LOG(INFO) << "We shouldn't read anything yet, because the ops should be pending."; |
1891 | 1 | { |
1892 | 1 | vector<string> results; |
1893 | 1 | ASSERT_NO_FATALS(ScanReplica(replica_ts->tserver_proxy.get(), &results)); |
1894 | 2 | ASSERT_EQ(0, results.size()) << results; |
1895 | 1 | } |
1896 | | |
1897 | 1 | LOG(INFO) << "Send op 2.6, but set preceding OpId to 2.4. " |
1898 | 1 | << "This is an invalid request, and the replica should reject it."; |
1899 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
1900 | 1 | req.clear_ops(); |
1901 | 1 | AddOp(MakeOpId(2, 6), &req); |
1902 | 1 | rpc.Reset(); |
1903 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1904 | 2 | ASSERT_TRUE(resp.has_error()) << resp.DebugString(); |
1905 | 1 | ASSERT_EQ(resp.error().status().message(), |
1906 | 1 | "New operation's index does not follow the previous op's index. " |
1907 | 1 | "Current: 2.6. Previous: 2.4"); |
1908 | | |
1909 | 1 | resp.Clear(); |
1910 | 1 | req.clear_ops(); |
1911 | 1 | LOG(INFO) << "Send ops 3.5 and 2.6, then commit up to index 6, the replica " |
1912 | 1 | << "should fail because of the out-of-order terms."; |
1913 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
1914 | 1 | AddOp(MakeOpId(3, 5), &req); |
1915 | 1 | AddOp(MakeOpId(2, 6), &req); |
1916 | 1 | rpc.Reset(); |
1917 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1918 | 2 | ASSERT_TRUE(resp.has_error()) << resp.DebugString(); |
1919 | 1 | ASSERT_EQ(resp.error().status().message(), |
1920 | 1 | "New operation's term is not >= than the previous op's term. " |
1921 | 1 | "Current: 2.6. Previous: 3.5"); |
1922 | | |
1923 | 1 | LOG(INFO) << "Regression test for KUDU-639"; |
1924 | | // If we send a valid request, but the |
1925 | | // current commit index is higher than the data we're sending, we shouldn't |
1926 | | // commit anything higher than the last op sent by the leader. |
1927 | | // |
1928 | | // To test, we re-send operation 2.3, with the correct preceding ID 2.2, |
1929 | | // but we set the committed index to 2.4. This should only commit |
1930 | | // 2.2 and 2.3. |
1931 | 1 | resp.Clear(); |
1932 | 1 | req.clear_ops(); |
1933 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2)); |
1934 | 1 | AddOp(MakeOpId(2, 3), &req); |
1935 | 1 | req.mutable_committed_op_id()->CopyFrom(MakeOpId(2, 4)); |
1936 | 1 | rpc.Reset(); |
1937 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1938 | 2 | ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
1939 | 1 | LOG(INFO) << "Verify only 2.2 and 2.3 are committed."; |
1940 | 1 | { |
1941 | 1 | vector<string> results; |
1942 | 1 | ASSERT_NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results)); |
1943 | 1 | ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2"); |
1944 | 1 | ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3"); |
1945 | 1 | } |
1946 | | |
1947 | 1 | resp.Clear(); |
1948 | 1 | req.clear_ops(); |
1949 | 1 | LOG(INFO) << "Now send some more ops, and commit the earlier ones."; |
1950 | 1 | req.mutable_committed_op_id()->CopyFrom(MakeOpId(2, 4)); |
1951 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
1952 | 1 | AddOp(MakeOpId(2, 5), &req); |
1953 | 1 | AddOp(MakeOpId(2, 6), &req); |
1954 | 1 | rpc.Reset(); |
1955 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1956 | 2 | ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
1957 | | |
1958 | 1 | LOG(INFO) << "Verify they are committed."; |
1959 | 1 | { |
1960 | 1 | vector<string> results; |
1961 | 1 | ASSERT_NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 3, &results)); |
1962 | 1 | ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2"); |
1963 | 1 | ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3"); |
1964 | 1 | ASSERT_STR_CONTAINS(results[2], "term: 2 index: 4"); |
1965 | 1 | } |
1966 | | |
1967 | 1 | resp.Clear(); |
1968 | 1 | req.clear_ops(); |
1969 | 1 | int leader_term = 2; |
1970 | 1 | const int kNumTerms = AllowSlowTests() ? 10000 : 100; |
1971 | 99 | while (leader_term < kNumTerms) { |
1972 | 98 | leader_term++; |
1973 | | // Now pretend to be a new leader (term 3) and replace the earlier ops |
1974 | | // without committing the new replacements. |
1975 | 98 | req.set_caller_term(leader_term); |
1976 | 98 | req.set_caller_uuid("new_leader"); |
1977 | 98 | req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); |
1978 | 98 | req.clear_ops(); |
1979 | 98 | AddOp(MakeOpId(leader_term, 5), &req); |
1980 | 98 | AddOp(MakeOpId(leader_term, 6), &req); |
1981 | 98 | rpc.Reset(); |
1982 | 98 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1983 | 196 | ASSERT_FALSE(resp.has_error()) << "Req: " << req.ShortDebugString() |
1984 | 196 | << " Resp: " << resp.DebugString(); |
1985 | 98 | } |
1986 | | |
1987 | 1 | LOG(INFO) << "Send an empty request from the newest term which should commit " |
1988 | 1 | << "the earlier ops."; |
1989 | 1 | { |
1990 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6)); |
1991 | 1 | req.mutable_committed_op_id()->CopyFrom(MakeOpId(leader_term, 6)); |
1992 | 1 | req.clear_ops(); |
1993 | 1 | rpc.Reset(); |
1994 | 1 | ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); |
1995 | 2 | ASSERT_FALSE(resp.has_error()) << resp.DebugString(); |
1996 | 1 | } |
1997 | | |
1998 | 1 | LOG(INFO) << "Verify the new rows are committed."; |
1999 | 1 | { |
2000 | 1 | vector<string> results; |
2001 | 1 | ASSERT_NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 5, &results)); |
2002 | 1 | SCOPED_TRACE(results); |
2003 | 1 | ASSERT_STR_CONTAINS(results[3], Substitute("term: $0 index: 5", leader_term)); |
2004 | 1 | ASSERT_STR_CONTAINS(results[4], Substitute("term: $0 index: 6", leader_term)); |
2005 | 1 | } |
2006 | 1 | } |
2007 | | |
2008 | 1 | TEST_F(RaftConsensusITest, TestLeaderStepDown) { |
2009 | 1 | FLAGS_num_tablet_servers = 3; |
2010 | | |
2011 | 1 | vector<string> ts_flags = { |
2012 | 1 | "--enable_leader_failure_detection=false"s, |
2013 | 1 | }; |
2014 | 1 | vector<string> master_flags = { |
2015 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
2016 | 1 | "--use_create_table_leader_hint=false"s, |
2017 | 1 | }; |
2018 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2019 | | |
2020 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
2021 | | |
2022 | | // Start with no leader. |
2023 | 1 | Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)); |
2024 | 2 | ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString(); |
2025 | | |
2026 | | // Become leader. |
2027 | 1 | ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
2028 | 1 | ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
2029 | 1 | ASSERT_OK(WriteSimpleTestRow( |
2030 | 1 | tservers[0], tablet_id_, kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10))); |
2031 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2)); |
2032 | | |
2033 | 1 | LOG(INFO) << "Stepping down leader " << tservers[0]; |
2034 | | // Step down and verify that leadership gracefully transitions to a new leader |
2035 | | // despite failure detection being disabled. |
2036 | 1 | bool allow_graceful_leader_transfer = true; |
2037 | 1 | ASSERT_OK(LeaderStepDown( |
2038 | 1 | tservers[0], tablet_id_, nullptr, MonoDelta::FromSeconds(10), |
2039 | 1 | !allow_graceful_leader_transfer)); |
2040 | 1 | TServerDetails* new_leader_ts = nullptr; |
2041 | 1 | ASSERT_OK( |
2042 | 1 | FindTabletLeader(tablet_servers_, tablet_id_, MonoDelta::FromSeconds(10), &new_leader_ts)); |
2043 | 1 | ASSERT_TRUE(new_leader_ts != nullptr); |
2044 | 1 | LOG(INFO) << "Identified new leader " << new_leader_ts; |
2045 | | |
2046 | | // Verify that further leader stepdowns and writes against the original leader do not succeed. |
2047 | 1 | TabletServerErrorPB error; |
2048 | 1 | s = LeaderStepDown( |
2049 | 1 | tservers[0], tablet_id_, nullptr, MonoDelta::FromSeconds(10), !allow_graceful_leader_transfer, |
2050 | 1 | &error); |
2051 | 2 | ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString(); |
2052 | 2 | ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << error.ShortDebugString(); |
2053 | | |
2054 | 1 | s = WriteSimpleTestRow( |
2055 | 1 | tservers[0], tablet_id_, kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)); |
2056 | 2 | ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: " |
2057 | 2 | << s.ToString(); |
2058 | | |
2059 | | // Stepping down a leader with graceful transition disabled should |
2060 | | // result in no leader being elected |
2061 | 1 | LOG(INFO) << "Stepping down leader " << new_leader_ts->uuid(); |
2062 | 1 | allow_graceful_leader_transfer = false; |
2063 | 1 | ASSERT_OK(LeaderStepDown( |
2064 | 1 | new_leader_ts, tablet_id_, nullptr, MonoDelta::FromSeconds(10), |
2065 | 1 | !allow_graceful_leader_transfer)); |
2066 | 1 | TServerDetails* final_leader_ts = nullptr; |
2067 | 1 | ASSERT_NOK( |
2068 | 1 | FindTabletLeader(tablet_servers_, tablet_id_, MonoDelta::FromSeconds(10), &final_leader_ts)); |
2069 | 1 | } |
2070 | | |
2071 | | // Test for #350: sets the consensus RPC timeout to be long, and freezes both followers before |
2072 | | // asking the leader to step down. Prior to fixing #350, the step-down process would block |
2073 | | // until the pending requests timed out. |
2074 | 1 | TEST_F(RaftConsensusITest, TestStepDownWithSlowFollower) { |
2075 | 1 | vector<string> ts_flags = { |
2076 | 1 | "--enable_leader_failure_detection=false", |
2077 | | // Bump up the RPC timeout, so that we can verify that the stepdown responds |
2078 | | // quickly even when an outbound request is hung. |
2079 | 1 | "--consensus_rpc_timeout_ms=15000", |
2080 | | // Send heartbeats more often so we can detect dead tservers faster.
2081 | 1 | "--raft_heartbeat_interval_ms=10", |
2082 | | // Set it high enough so that the election rpcs don't time out. |
2083 | 1 | "--leader_failure_max_missed_heartbeat_periods=100" |
2084 | 1 | }; |
2085 | 1 | vector<string> master_flags = { |
2086 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false", |
2087 | 1 | "--tserver_unresponsive_timeout_ms=5000"s, |
2088 | 1 | "--use_create_table_leader_hint=false"s, |
2089 | 1 | }; |
2090 | 1 | BuildAndStart(ts_flags, master_flags); |
2091 | | |
2092 | 1 | vector<TServerDetails *> tservers = TServerDetailsVector(tablet_servers_); |
2093 | 1 | ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
2094 | 1 | ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); |
2095 | | |
2096 | | // Stop both followers. |
2097 | 3 | for (int i = 1; i < 3; i++) { |
2098 | 2 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause()); |
2099 | 2 | } |
2100 | | |
2101 | | // Wait until the paused tservers have stopped heartbeating. |
2102 | 1 | ASSERT_OK(WaitUntilNumberOfAliveTServersEqual( |
2103 | 1 | 1, cluster_->GetMasterProxy<master::MasterClusterProxy>(), MonoDelta::FromSeconds(20))); |
2104 | | |
2105 | | // Step down should respond quickly despite the hung requests. |
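| | // (The 3-second deadline below is well under the 15-second consensus RPC timeout configured
| | // above, so this assertion would fail if the stepdown waited on the hung follower requests.)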
2106 | 1 | ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, nullptr, MonoDelta::FromSeconds(3))); |
2107 | 1 | } |
2108 | | |
2109 | | void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites( |
2110 | 12 | const TabletServerMapUnowned& tablet_servers, const string& leader_uuid) { |
2111 | | |
2112 | 12 | TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid); |
2113 | | |
2114 | | // Calculate number of servers to leave unpaused (minority). |
2115 | | // This math is a little unintuitive but works for cluster sizes including 2 and 1. |
2116 | | // Note: We assume all of these TSes are voters. |
2117 | 12 | auto config_size = tablet_servers.size(); |
2118 | 12 | auto minority_to_retain = MajoritySize(config_size) - 1; |
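| | // For example, with config_size = 3, MajoritySize(3) = 2 and minority_to_retain = 1, so two
| | // followers get paused below; with config_size = 2 we retain only the leader and pause the
| | // lone follower.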
2119 | | |
2120 | | // Only perform this part of the test if we have some servers to pause, else |
2121 | | // the failure assertions will throw. |
2122 | 12 | if (config_size > 1) { |
2123 | | // Pause enough replicas to prevent a majority. |
2124 | 8 | auto num_to_pause = config_size - minority_to_retain; |
2125 | 8 | LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size; |
2126 | 8 | vector<string> paused_uuids; |
2127 | 20 | for (const auto& entry : tablet_servers) { |
2128 | 20 | if (paused_uuids.size() == num_to_pause) { |
2129 | 8 | continue; |
2130 | 8 | } |
2131 | 12 | const string& replica_uuid = entry.first; |
2132 | 12 | if (replica_uuid == leader_uuid) { |
2133 | | // Always leave this one alone. |
2134 | 0 | continue; |
2135 | 0 | } |
2136 | 12 | ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid); |
2137 | 12 | ASSERT_OK(replica_ts->Pause()); |
2138 | 12 | paused_uuids.push_back(replica_uuid); |
2139 | 12 | } |
2140 | | |
2141 | | // Ensure writes timeout while only a minority is alive. |
2142 | 8 | Status s = WriteSimpleTestRow(initial_leader, tablet_id_, |
2143 | 8 | kTestRowKey, kTestRowIntVal, "foo", |
2144 | 8 | MonoDelta::FromMilliseconds(100)); |
2145 | 16 | ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
2146 | | |
2147 | | // Step down. |
2148 | 8 | ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, nullptr, MonoDelta::FromSeconds(10))); |
2149 | | |
2150 | | // Assert that elections time out without a live majority. |
2151 | | // We specify a very short timeout here to keep the tests fast. |
2152 | 8 | ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
2153 | 8 | s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100)); |
2154 | 16 | ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); |
2155 | 8 | LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString(); |
2156 | | |
2157 | | // Resume the paused servers. |
2158 | 8 | LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size; |
2159 | 12 | for (const string& replica_uuid : paused_uuids) { |
2160 | 12 | ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid); |
2161 | 12 | ASSERT_OK(replica_ts->Resume()); |
2162 | 12 | } |
2163 | 8 | } |
2164 | | |
2165 | 12 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1)); |
2166 | | |
2167 | | // Now an election should succeed. |
2168 | 12 | ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
2169 | 12 | ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
2170 | 12 | LOG(INFO) << "Successful election with full config of size " << config_size; |
2171 | | |
2172 | | // And a write should also succeed. |
2173 | 12 | ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, |
2174 | 12 | kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size), |
2175 | 12 | MonoDelta::FromSeconds(10))); |
2176 | 12 | } |
2177 | | |
2178 | | // Return the replicas of the specified 'tablet_id', as seen by the Master. |
2179 | | Status RaftConsensusITest::GetTabletLocations(const string& tablet_id, const MonoDelta& timeout, |
2180 | 3 | master::TabletLocationsPB* tablet_locations) { |
2181 | 3 | RpcController rpc; |
2182 | 3 | rpc.set_timeout(timeout); |
2183 | 3 | GetTabletLocationsRequestPB req; |
2184 | 3 | *req.add_tablet_ids() = tablet_id; |
2185 | 3 | GetTabletLocationsResponsePB resp; |
2186 | 3 | RETURN_NOT_OK(cluster_->GetMasterProxy<master::MasterClientProxy>().GetTabletLocations( |
2187 | 3 | req, &resp, &rpc)); |
2188 | 3 | if (resp.has_error()) { |
2189 | 0 | return StatusFromPB(resp.error().status()); |
2190 | 0 | } |
2191 | 3 | if (resp.errors_size() > 0) { |
2192 | 0 | CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString(); |
2193 | 0 | CHECK_EQ(tablet_id, resp.errors(0).tablet_id()) << resp.ShortDebugString(); |
2194 | 0 | return StatusFromPB(resp.errors(0).status()); |
2195 | 0 | } |
2196 | 0 | CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString(); |
2197 | 3 | *tablet_locations = resp.tablet_locations(0); |
2198 | 3 | return Status::OK(); |
2199 | 3 | } |
2200 | | |
2201 | | void RaftConsensusITest::WaitForReplicasReportedToMaster( |
2202 | | int num_replicas, const string& tablet_id, |
2203 | | const MonoDelta& timeout, |
2204 | | WaitForLeader wait_for_leader, |
2205 | | bool* has_leader, |
2206 | 3 | master::TabletLocationsPB* tablet_locations) { |
2207 | 3 | MonoTime deadline(MonoTime::Now()); |
2208 | 3 | deadline.AddDelta(timeout); |
2209 | 3 | while (true) { |
2210 | 3 | ASSERT_OK(GetTabletLocations(tablet_id, timeout, tablet_locations)); |
2211 | 3 | *has_leader = false; |
2212 | 3 | if (tablet_locations->replicas_size() == num_replicas) { |
2213 | 3 | for (const master::TabletLocationsPB_ReplicaPB& replica : |
2214 | 7 | tablet_locations->replicas()) { |
2215 | 7 | if (replica.role() == PeerRole::LEADER) { |
2216 | 3 | *has_leader = true; |
2217 | 3 | } |
2218 | 7 | } |
2219 | 3 | if (wait_for_leader == NO_WAIT_FOR_LEADER || |
2220 | 3 | (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) { |
2221 | 3 | break; |
2222 | 3 | } |
2223 | 0 | } |
2224 | 0 | if (deadline.ComesBefore(MonoTime::Now())) break; |
2225 | 0 | SleepFor(MonoDelta::FromMilliseconds(20)); |
2226 | 0 | } |
2227 | 6 | ASSERT_EQ(num_replicas, tablet_locations->replicas_size()) << tablet_locations->DebugString(); |
2228 | 3 | if (wait_for_leader == WAIT_FOR_LEADER) { |
2229 | 2 | ASSERT_TRUE(*has_leader) << tablet_locations->DebugString(); |
2230 | 1 | } |
2231 | 3 | } |
2232 | | |
2233 | | // Basic tests of adding and removing servers from a configuration. |
2234 | 1 | TEST_F(RaftConsensusITest, TestAddRemoveVoter) { |
2235 | 1 | TestAddRemoveServer(PeerMemberType::PRE_VOTER); |
2236 | 1 | } |
2237 | | |
2238 | 1 | TEST_F(RaftConsensusITest, TestAddRemoveObserver) { |
2239 | 1 | TestAddRemoveServer(PeerMemberType::PRE_OBSERVER); |
2240 | 1 | } |
2241 | | |
2242 | | // Regression test for KUDU-1169: a crash when a Config Change operation is replaced |
2243 | | // by a later leader. |
2244 | 1 | TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) { |
2245 | 1 | FLAGS_num_tablet_servers = 3; |
2246 | 1 | vector<string> ts_flags = { "--enable_leader_failure_detection=false"s }; |
2247 | 1 | vector<string> master_flags = { |
2248 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
2249 | 1 | "--use_create_table_leader_hint=false"s, |
2250 | 1 | }; |
2251 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2252 | | |
2253 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
2254 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
2255 | | |
2256 | 1 | TServerDetails* leader_tserver = tservers[0]; |
2257 | 1 | LOG(INFO) << "Elect server 0 (" << leader_tserver->uuid() |
2258 | 1 | << ") as leader and wait for log index 1 to propagate to all servers."; |
2259 | | |
2260 | 1 | auto original_followers = CreateTabletServerMapUnowned(tablet_servers_); |
2261 | 1 | ASSERT_EQ(1, original_followers.erase(leader_tserver->uuid())); |
2262 | | |
2263 | 1 | const MonoDelta timeout = MonoDelta::FromSeconds(10); |
2264 | 1 | ASSERT_OK(StartElection(leader_tserver, tablet_id_, timeout)); |
2265 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1)); |
2266 | 1 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, timeout)); |
2267 | | |
2268 | 1 | LOG(INFO) << "Shut down servers 1 and 2, so that server 0 can't replicate anything."; |
2269 | 1 | cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
2270 | 1 | cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
2271 | | |
2272 | 1 | LOG(INFO) << "Now try to replicate a ChangeConfig operation. This should get stuck and time out"; |
2273 | 1 | LOG(INFO) << "because the server can't replicate any operations."; |
2274 | 1 | TabletServerErrorPB::Code error_code; |
2275 | 1 | Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1], |
2276 | 1 | -1, MonoDelta::FromSeconds(1), |
2277 | 1 | &error_code, false /* retry */); |
2278 | 1 | ASSERT_TRUE(s.IsTimedOut()); |
2279 | | |
2280 | 1 | LOG(INFO) << "Pause the leader, and restart the other servers."; |
2281 | 1 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Pause()); |
2282 | 1 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Restart()); |
2283 | 1 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Restart()); |
2284 | | |
2285 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 1)); |
2286 | | |
2287 | 1 | LOG(INFO) << "Elect one of the other servers: " << tservers[1]->uuid(); |
2288 | 1 | ASSERT_OK(StartElection(tservers[1], tablet_id_, MonoDelta::FromSeconds(10))); |
2289 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 2)); |
2290 | | |
2291 | 1 | LOG(INFO) << "Resume the original leader. Its change-config operation will now be aborted " |
2292 | 1 | "since it was never replicated to the majority, and the new leader will have " |
2293 | 1 | "replaced the operation."; |
2294 | 1 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Resume()); |
2295 | | |
2296 | 1 | LOG(INFO) << "Insert some data and verify that it propagates to all servers."; |
2297 | 1 | ASSERT_NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1, vector<CountDownLatch*>())); |
2298 | 1 | ASSERT_ALL_REPLICAS_AGREE(10); |
2299 | 1 | } |
2300 | | |
2301 | | // Test the atomic CAS arguments to ChangeConfig() add server and remove server. |
2302 | 1 | TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) { |
2303 | 1 | FLAGS_num_tablet_servers = 3; |
2304 | 1 | vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
2305 | 1 | vector<string> master_flags = { |
2306 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
2307 | 1 | "--use_create_table_leader_hint=false"s, |
2308 | 1 | }; |
2309 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2310 | | |
2311 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
2312 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
2313 | | |
2314 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
2315 | 1 | TServerDetails* leader_tserver = tservers[0]; |
2316 | 1 | const MonoDelta timeout = MonoDelta::FromSeconds(10); |
2317 | 1 | ASSERT_OK(StartElection(leader_tserver, tablet_id_, timeout)); |
2318 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1)); |
2319 | 1 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, timeout)); |
2320 | 1 | int64_t cur_log_index = 1; |
2321 | | |
2322 | 1 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2323 | | |
2324 | 1 | TServerDetails* follower_ts = tservers[2]; |
2325 | | |
2326 | | // The initial committed config should have opid_index == -1. The server should
2327 | | // reject any config change request that specifies a different committed opid_index.
2328 | 1 | int64_t invalid_committed_opid_index = 7; |
2329 | 1 | TabletServerErrorPB::Code error_code; |
2330 | 1 | Status s = RemoveServer(leader_tserver, tablet_id_, follower_ts, |
2331 | 1 | invalid_committed_opid_index, MonoDelta::FromSeconds(10), |
2332 | 1 | &error_code, false /* retry */); |
2333 | 1 | ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code); |
2334 | 1 | ASSERT_STR_CONTAINS(s.ToString(), "of 7 but the committed config has opid_index of -1"); |
2335 | | |
2336 | | // Specifying the correct committed opid index should work. |
2337 | 1 | int64_t committed_opid_index = -1; |
2338 | 1 | ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, follower_ts, |
2339 | 1 | committed_opid_index, MonoDelta::FromSeconds(10))); |
2340 | | |
2341 | 1 | ASSERT_EQ(1, active_tablet_servers.erase(follower_ts->uuid())); |
2342 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
2343 | 1 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
2344 | | |
2345 | | // Now, add the server back. Again, specifying something other than the |
2346 | | // latest committed_opid_index should fail. |
2347 | 1 | invalid_committed_opid_index = -1; // The old one is no longer valid. |
2348 | 1 | s = AddServer(leader_tserver, tablet_id_, follower_ts, PeerMemberType::VOTER, |
2349 | 1 | invalid_committed_opid_index, MonoDelta::FromSeconds(10), |
2350 | 1 | &error_code, false /* retry */); |
2351 | 1 | ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code); |
2352 | 1 | ASSERT_STR_CONTAINS(s.ToString(), "of -1 but the committed config has opid_index of 2"); |
2353 | | |
2354 | | // Specifying the correct committed opid index should work. |
2355 | | // The previous config change op is the latest entry in the log. |
2356 | 1 | committed_opid_index = cur_log_index; |
2357 | 1 | ASSERT_OK(AddServer(leader_tserver, tablet_id_, follower_ts, PeerMemberType::PRE_VOTER, |
2358 | 1 | committed_opid_index, MonoDelta::FromSeconds(10))); |
2359 | | |
2360 | 1 | InsertOrDie(&active_tablet_servers, follower_ts->uuid(), follower_ts); |
2361 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
2362 | 1 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
2363 | 1 | } |
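 | | // A minimal sketch of the CAS pattern exercised above: read the committed config's
 | | // opid_index first, then pass it to the config change so that a concurrent change
 | | // fails with CAS_FAILED instead of silently clobbering the config.
 | | // ('current_committed_opid_index' is a hypothetical value obtained from the committed
 | | // config; the snippet is shown for illustration only.)
 | | //
 | | //   Status s = RemoveServer(leader_tserver, tablet_id_, follower_ts,
 | | //                           current_committed_opid_index, MonoDelta::FromSeconds(10),
 | | //                           &error_code, false /* retry */);
 | | //   if (!s.ok() && error_code == TabletServerErrorPB::CAS_FAILED) {
 | | //     // Someone else changed the config first; re-read the index and retry.
 | | //   }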
2364 | | |
2365 | | // Ensure that we can elect a server that is in the "pending" configuration. This is required by |
2366 | | // the Raft protocol. See Diego Ongaro's PhD thesis, section 4.1, where it states that "it is the |
2367 | | // caller's configuration that is used in reaching consensus, both for voting and for log |
2368 | | // replication". |
2369 | | // |
2370 | | // This test also covers the case where a node comes back from the dead to find a leader that was not in
2371 | | // its configuration when it died. That should also work, i.e. the revived node should accept |
2372 | | // writes from the new leader. |
2373 | 1 | TEST_F(RaftConsensusITest, TestElectPendingVoter) { |
2374 | | // Test plan: |
2375 | | // 1. Disable failure detection to avoid non-deterministic behavior. |
2376 | | // 2. Start with a configuration size of 5, all servers synced. |
2377 | | // 3. Remove one server from the configuration, wait until committed. |
2378 | | // 4. Pause the 3 remaining non-leaders (SIGSTOP). |
2379 | | // 5. Run a config change to add back the previously-removed server. |
2380 | | // Ensure that, while the op cannot be committed yet due to lack of a |
2381 | | // majority in the new config (only 2 out of 5 servers are alive), the op |
2382 | | // has been replicated to both the local leader and the new member. |
2383 | | // 6. Force the existing leader to step down. |
2384 | | // 7. Resume one of the paused nodes so that a majority (of the 5-node |
2385 | | // configuration, but not the original 4-node configuration) will be available. |
2386 | | // 8. Start a leader election on the new (pending) node. It should win. |
2387 | | // 9. Unpause the two remaining stopped nodes. |
2388 | | // 10. Wait for all nodes to sync to the new leader's log. |
2389 | 1 | FLAGS_num_tablet_servers = 5; |
2390 | 1 | FLAGS_num_replicas = 5; |
2391 | 1 | vector<string> ts_flags = { |
2392 | 1 | "--enable_leader_failure_detection=false"s, |
2393 | 1 | "--TEST_inject_latency_before_change_role_secs=10"s, |
2394 | 1 | }; |
2395 | 1 | vector<string> master_flags = { |
2396 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
2397 | 1 | "--replication_factor=5"s, |
2398 | 1 | "--use_create_table_leader_hint=false"s, |
2399 | 1 | }; |
2400 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2401 | | |
2402 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
2403 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
2404 | | |
2405 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
2406 | 1 | TServerDetails* initial_leader = tservers[0]; |
2407 | 1 | const MonoDelta timeout = MonoDelta::FromSeconds(10); |
2408 | 1 | ASSERT_OK(StartElection(initial_leader, tablet_id_, timeout)); |
2409 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1)); |
2410 | 0 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, timeout)); |
2411 | | |
2412 | | // The server we will remove and then bring back. |
2413 | 0 | TServerDetails* final_leader = tservers[4]; |
2414 | | |
2415 | | // Kill the master, so we can change the config without interference. |
2416 | 0 | cluster_->master()->Shutdown(); |
2417 | | |
2418 | | // Now remove server 4 from the configuration. |
2419 | 0 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2420 | 0 | LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid(); |
2421 | 0 | ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none, |
2422 | 0 | MonoDelta::FromSeconds(10))); |
2423 | 0 | ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid())); |
2424 | 0 | int64_t cur_log_index = 2; |
2425 | 0 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
2426 | 0 | active_tablet_servers, tablet_id_, cur_log_index)); |
2427 | 0 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(cur_log_index, initial_leader, tablet_id_, timeout)); |
2428 | |
2429 | 0 | ASSERT_OK(DeleteTablet(final_leader, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none, |
2430 | 0 | MonoDelta::FromSeconds(30))); |
2431 | | |
2432 | | // Now add server 4 back as a learner to the peers. |
2433 | 0 | LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout..."; |
2434 | 0 | ASSERT_OK(AddServer( |
2435 | 0 | initial_leader, tablet_id_, final_leader, PeerMemberType::PRE_VOTER, boost::none, |
2436 | 0 | MonoDelta::FromSeconds(10))); |
2437 | | |
2438 | | // Pause tablet servers 1 through 3, so they won't see the operation to change server 4 from |
2439 | | // learner to voter which will happen automatically once remote bootstrap for server 4 is |
2440 | | // completed. |
2441 | 0 | LOG(INFO) << "Pausing 3 replicas..."; |
2442 | 0 | for (int i = 1; i <= 3; i++) { |
2443 | 0 | ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid()); |
2444 | 0 | ASSERT_OK(replica_ts->Pause()); |
2445 | 0 | } |
2446 | | |
2447 | | // Reset to the unpaused servers. |
2448 | 0 | active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2449 | 0 | for (int i = 1; i <= 3; i++) { |
2450 | 0 | ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid())); |
2451 | 0 | } |
2452 | | |
2453 | | // Adding a server will cause two calls to ChangeConfig. One to add the server as a learner, |
2454 | | // and another one to change its role to voter. |
2455 | 0 | ++cur_log_index; |
2456 | | |
2457 | | // Only wait for TS 0 and 4 to agree that the new change config op (CHANGE_ROLE for server 4, |
2458 | | // which will be automatically sent by the leader once remote bootstrap has completed) has been |
2459 | | // replicated. |
2460 | 0 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60), |
2461 | 0 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
2462 | | |
2463 | | // Now that TS 4 is electable (and pending), have TS 0 step down. |
2464 | 0 | LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down..."; |
2465 | 0 | Status status = LeaderStepDown(initial_leader, tablet_id_, nullptr, MonoDelta::FromSeconds(10)); |
2466 | | // We allow IllegalState for now, since leader step-down does not succeed in this case when
2467 | | // the leader has a pending config. Peer 4 will still get elected, though, as it starts a
2468 | | // new term and has enough unpaused peers in the quorum to vote for it.
2469 | 0 | if (!status.IsIllegalState()) { |
2470 | 0 | ASSERT_OK(status); |
2471 | 0 | } else { |
2472 | 0 | LOG(INFO) << "Resuming Peer " << tservers[2]->uuid() << " as leader did not step down..."; |
2473 | 0 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume()); |
2474 | 0 | InsertOrDie(&active_tablet_servers, tservers[2]->uuid(), tservers[2]); |
2475 | 0 | } |
2476 | | // Resume TS 1 so we have a majority of 3 to elect a new leader. |
2477 | 0 | LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ..."; |
2478 | 0 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume()); |
2479 | 0 | InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]); |
2480 | | |
2481 | | // Now try to get TS 4 elected. It should succeed and push a NO_OP. |
2482 | 0 | LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ..."; |
2483 | 0 | ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10))); |
2484 | 0 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
2485 | 0 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
2486 | | |
2487 | | // Resume the remaining paused nodes. |
2488 | 0 | LOG(INFO) << "Resuming remaining nodes..."; |
2489 | 0 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume()); |
2490 | 0 | ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume()); |
2491 | 0 | active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2492 | | |
2493 | | // Wait until the leader is sure that the old leader's lease is over. |
2494 | 0 | ASSERT_OK(WaitUntilLeader(final_leader, tablet_id_, |
2495 | 0 | MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_leader_lease_duration_ms)))); |
2496 | | |
2497 | | // Do one last operation on the new leader: an insert. |
2498 | 0 | ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_, |
2499 | 0 | kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da", |
2500 | 0 | MonoDelta::FromSeconds(10))); |
2501 | | |
2502 | | // Wait for all servers to replicate everything up through the last write op. |
2503 | 0 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
2504 | 0 | active_tablet_servers, tablet_id_, ++cur_log_index)); |
2505 | 0 | } |
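 | | // Note on the majority arithmetic above: in the 5-node pending config a quorum needs
 | | // 3 voters. Once the leader (TS 0), the pending peer (TS 4) and the resumed TS 1 are
 | | // live, TS 4 can collect the 3 votes it needs even though TS 2 and TS 3 are still
 | | // paused, which is what step 8 of the test plan relies on.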
2506 | | |
2507 | | // Writes test rows in ascending order to a single tablet server. |
2508 | | // Essentially a poor-man's version of TestWorkload that only operates on a |
2509 | | // single tablet. Does not batch, does not tolerate timeouts, and does not |
2510 | | // interact with the Master. 'row_key' determines the row id and is incremented before
2511 | | // each write attempt; 'rows_inserted' is incremented only after a successful insert, so
2512 | | // as long as there is no crash it holds a correct count of inserted rows at the end of
2513 | | // the run.
2514 | | // Crashes on any failure other than a leader-lease error, so 'write_timeout' should be high.
2515 | | void DoWriteTestRows(const TServerDetails* leader_tserver, |
2516 | | const string& tablet_id, |
2517 | | const MonoDelta& write_timeout, |
2518 | | std::atomic<int32_t>* rows_inserted, |
2519 | | std::atomic<int32_t>* row_key, |
2520 | 8 | const std::atomic<bool>* finish) { |
2521 | 6.14k | while (!finish->load()) { |
2522 | 6.13k | int cur_row_key = ++*row_key; |
2523 | 6.13k | Status write_status = WriteSimpleTestRow( |
2524 | 6.13k | leader_tserver, tablet_id, cur_row_key, cur_row_key, |
2525 | 6.13k | Substitute("key=$0", cur_row_key), write_timeout); |
2526 | 6.13k | if (!write_status.IsLeaderHasNoLease() && |
2527 | 6.11k | !write_status.IsLeaderNotReadyToServe()) { |
2528 | | // Temporary failures to write because of not having a valid leader lease are OK. We don't |
2529 | | // increment the number of rows inserted in that case. |
2530 | 6.11k | CHECK_OK(write_status); |
2531 | 6.11k | ++*rows_inserted; |
2532 | 6.11k | } |
2533 | 6.13k | } |
2534 | 8 | } |
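 | | // A minimal sketch of the status classification DoWriteTestRows applies: only
 | | // leader-lease related failures are tolerated (and simply retried on the next loop
 | | // iteration); any other write failure crashes the writer via CHECK_OK.
 | | // (Hypothetical helper, shown for illustration only.)
 | | //
 | | //   bool IsTransientLeaseError(const Status& s) {
 | | //     return s.IsLeaderHasNoLease() || s.IsLeaderNotReadyToServe();
 | | //   }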
2535 | | |
2536 | | // Test that config change works while running a workload. |
2537 | 1 | TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) { |
2538 | 1 | FLAGS_num_tablet_servers = 3; |
2539 | 1 | vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
2540 | 1 | vector<string> master_flags = { |
2541 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
2542 | 1 | "--use_create_table_leader_hint=false"s, |
2543 | 1 | }; |
2544 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2545 | | |
2546 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
2547 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
2548 | | |
2549 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
2550 | 1 | TServerDetails* leader_tserver = tservers[0]; |
2551 | 1 | MonoDelta timeout = MonoDelta::FromSeconds(10); |
2552 | 1 | ASSERT_OK(StartElection(leader_tserver, tablet_id_, timeout)); |
2553 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1)); |
2554 | 1 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, timeout)); |
2555 | | |
2556 | 1 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2557 | | |
2558 | | // Start a write workload. |
2559 | 1 | LOG(INFO) << "Starting write workload..."; |
2560 | 1 | std::atomic<int32_t> rows_inserted(0); |
2561 | 1 | std::atomic<int32_t> row_key(0); |
2562 | 1 | { |
2563 | 1 | std::atomic<bool> finish(false); |
2564 | 1 | vector<scoped_refptr<Thread> > threads; |
2565 | 1 | auto se = ScopeExit([&threads, &finish] { |
2566 | 1 | LOG(INFO) << "Joining writer threads..."; |
2567 | 1 | finish = true; |
2568 | 8 | for (const scoped_refptr<Thread> &thread : threads) { |
2569 | 8 | ASSERT_OK(ThreadJoiner(thread.get()).Join()); |
2570 | 8 | } |
2571 | 1 | }); |
2572 | | |
2573 | 1 | int num_threads = FLAGS_num_client_threads; |
2574 | 9 | for (int i = 0; i < num_threads; i++) { |
2575 | 8 | scoped_refptr<Thread> thread; |
2576 | 8 | ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), Substitute("row-writer-$0", i), |
2577 | 8 | &DoWriteTestRows, |
2578 | 8 | leader_tserver, |
2579 | 8 | tablet_id_, |
2580 | 8 | MonoDelta::FromSeconds(10), |
2581 | 8 | &rows_inserted, |
2582 | 8 | &row_key, |
2583 | 8 | &finish, |
2584 | 8 | &thread)); |
2585 | 8 | threads.push_back(thread); |
2586 | 8 | } |
2587 | | |
2588 | 1 | LOG(INFO) << "Removing servers..."; |
2589 | | // Go from 3 tablet servers down to 1 in the configuration. |
2590 | 1 | vector<int> remove_list = {2, 1}; |
2591 | 2 | for (int to_remove_idx : remove_list) { |
2592 | 2 | auto num_servers = active_tablet_servers.size(); |
2593 | 2 | LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas"; |
2594 | | |
2595 | 2 | TServerDetails *tserver_to_remove = tservers[to_remove_idx]; |
2596 | 2 | LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid(); |
2597 | 2 | ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, |
2598 | 2 | MonoDelta::FromSeconds(10))); |
2599 | 2 | ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid())); |
2600 | 2 | ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
2601 | 2 | leader_tserver, tablet_id_, |
2602 | 2 | MonoDelta::FromSeconds(10))); |
2603 | 2 | } |
2604 | | |
2605 | 1 | LOG(INFO) << "Adding servers..."; |
2606 | | // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the |
2607 | | // configuration. |
2608 | 1 | vector<int> add_list = {1, 2}; |
2609 | 2 | for (int to_add_idx : add_list) { |
2610 | 2 | auto num_servers = active_tablet_servers.size(); |
2611 | 2 | LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas"; |
2612 | | |
2613 | 2 | TServerDetails *tserver_to_add = tservers[to_add_idx]; |
2614 | 2 | LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
2615 | 2 | ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, PeerMemberType::PRE_VOTER, |
2616 | 2 | boost::none, MonoDelta::FromSeconds(10))); |
2617 | 2 | InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add); |
2618 | 2 | ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), |
2619 | 2 | leader_tserver, tablet_id_, |
2620 | 2 | MonoDelta::FromSeconds(10))); |
2621 | 2 | } |
2622 | 1 | } |
2623 | | |
2624 | 1 | LOG(INFO) << "Waiting for replicas to agree... (rows_inserted=" << rows_inserted |
2625 | 1 | << ", unique row keys used: " << row_key << ")"; |
2626 | | // Wait for all servers to replicate everything up through the last write op. |
2627 | | // Since we don't batch, there should be at least one log entry per inserted row,
2628 | | // plus the initial leader's no-op, plus 2 for the removed servers, plus 2 for
2629 | | // the added servers, i.e. at least rows_inserted + 5 entries in total.
2630 | 1 | int min_log_index = rows_inserted + 5; |
2631 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), |
2632 | 1 | active_tablet_servers, tablet_id_, |
2633 | 1 | min_log_index)); |
2634 | | |
2635 | 1 | LOG(INFO) << "Number of rows inserted: " << rows_inserted.load(); |
2636 | 1 | ASSERT_ALL_REPLICAS_AGREE(rows_inserted.load()); |
2637 | 1 | } |
2638 | | |
2639 | 1 | TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) { |
2640 | 1 | MonoDelta timeout = MonoDelta::FromSeconds(30); |
2641 | 1 | FLAGS_num_tablet_servers = 3; |
2642 | 1 | FLAGS_num_replicas = 2; |
2643 | 1 | vector<string> ts_flags; |
2644 | 1 | vector<string> master_flags; |
2645 | 1 | master_flags.push_back("--replication_factor=2"); |
2646 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2647 | | |
2648 | 1 | LOG(INFO) << "Finding tablet leader and waiting for things to start..."; |
2649 | 1 | string tablet_id = tablet_replicas_.begin()->first; |
2650 | | |
2651 | | // Determine the list of tablet servers currently in the config. |
2652 | 1 | TabletServerMapUnowned active_tablet_servers; |
2653 | 1 | for (itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id); |
2654 | 3 | iter != tablet_replicas_.end(); ++iter) { |
2655 | 2 | InsertOrDie(&active_tablet_servers, iter->second->uuid(), iter->second); |
2656 | 2 | } |
2657 | | |
2658 | | // Determine the server to add to the config. |
2659 | 1 | string uuid_to_add; |
2660 | 3 | for (const TabletServerMap::value_type& entry : tablet_servers_) { |
2661 | 3 | if (!ContainsKey(active_tablet_servers, entry.second->uuid())) { |
2662 | 1 | uuid_to_add = entry.second->uuid(); |
2663 | 1 | } |
2664 | 3 | } |
2665 | 1 | ASSERT_FALSE(uuid_to_add.empty()); |
2666 | | |
2667 | | // Get a baseline config reported to the master. |
2668 | 1 | LOG(INFO) << "Waiting for Master to see the current replicas..."; |
2669 | 1 | master::TabletLocationsPB tablet_locations; |
2670 | 1 | bool has_leader; |
2671 | 1 | ASSERT_NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, WAIT_FOR_LEADER, |
2672 | 1 | &has_leader, &tablet_locations)); |
2673 | 1 | LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString(); |
2674 | | |
2675 | | // Wait for initial NO_OP to be committed by the leader. |
2676 | 1 | TServerDetails* leader_ts; |
2677 | 1 | ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id, timeout, &leader_ts)); |
2678 | 1 | int64_t expected_index = 0; |
2679 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id, 1, &expected_index)); |
2680 | 1 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(expected_index, leader_ts, tablet_id, timeout)); |
2681 | 1 | expected_index += 2; // Adding a new peer generates two ChangeConfig requests. |
2682 | | |
2683 | | // Change the config. |
2684 | 1 | TServerDetails* tserver_to_add = tablet_servers_[uuid_to_add].get(); |
2685 | 1 | LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid(); |
2686 | 1 | ASSERT_OK(AddServer(leader_ts, tablet_id_, tserver_to_add, PeerMemberType::PRE_VOTER, boost::none, |
2687 | 1 | timeout)); |
2688 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, expected_index)); |
2689 | 1 | ++expected_index; |
2690 | | |
2691 | | // Wait for the master to be notified of the config change. |
2692 | | // It should continue to have the same leader, even without waiting. |
2693 | 1 | LOG(INFO) << "Waiting for Master to see config change..."; |
2694 | 1 | ASSERT_NO_FATALS(WaitForReplicasReportedToMaster(3, tablet_id, timeout, NO_WAIT_FOR_LEADER, |
2695 | 1 | &has_leader, &tablet_locations)); |
2696 | 2 | ASSERT_TRUE(has_leader) << tablet_locations.DebugString(); |
2697 | 1 | LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString(); |
2698 | | |
2699 | | // Change the config again. |
2700 | 1 | LOG(INFO) << "Removing tserver with uuid " << tserver_to_add->uuid(); |
2701 | 1 | ASSERT_OK(RemoveServer(leader_ts, tablet_id_, tserver_to_add, boost::none, timeout)); |
2702 | 1 | active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2703 | 1 | ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_add->uuid())); |
2704 | 1 | ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, expected_index)); |
2705 | | |
2706 | | // Wait for the master to be notified of the removal. |
2707 | 1 | LOG(INFO) << "Waiting for Master to see config change..."; |
2708 | 1 | ASSERT_NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, NO_WAIT_FOR_LEADER, |
2709 | 1 | &has_leader, &tablet_locations)); |
2710 | 2 | ASSERT_TRUE(has_leader) << tablet_locations.DebugString(); |
2711 | 1 | LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString(); |
2712 | 1 | } |
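 | | // Index bookkeeping used above, as a sketch: adding a PRE_VOTER peer produces two
 | | // ChangeConfig entries (one to add it as a learner, then a CHANGE_ROLE to voter once
 | | // remote bootstrap completes), while a removal produces a single entry.
 | | //
 | | //   expected_index += 2;  // after AddServer(..., PeerMemberType::PRE_VOTER, ...)
 | | //   expected_index += 1;  // after RemoveServer(...)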
2713 | | |
2714 | | // Test that we can create (vivify) a new tablet via remote bootstrap. |
2715 | 1 | TEST_F(RaftConsensusITest, TestAutoCreateReplica) { |
2716 | 1 | FLAGS_num_tablet_servers = 3; |
2717 | 1 | FLAGS_num_replicas = 2; |
2718 | 1 | std::vector<std::string> ts_flags = { |
2719 | 1 | "--enable_leader_failure_detection=false", |
2720 | 1 | "--log_cache_size_limit_mb=1", |
2721 | 1 | "--log_segment_size_mb=1", |
2722 | 1 | "--log_async_preallocate_segments=false", |
2723 | 1 | "--maintenance_manager_polling_interval_ms=300", |
2724 | 1 | Format("--db_write_buffer_size=$0", 256_KB), |
2725 | 1 | "--remote_bootstrap_begin_session_timeout_ms=15000" |
2726 | 1 | }; |
2727 | 1 | std::vector<std::string> master_flags = { |
2728 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
2729 | 1 | "--replication_factor=2"s, |
2730 | 1 | "--use_create_table_leader_hint=false"s, |
2731 | 1 | }; |
2732 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2733 | | |
2734 | | // 50K is enough to cause flushes & log rolls. |
2735 | 1 | int num_rows_to_write = 50000; |
2736 | 1 | if (AllowSlowTests()) { |
2737 | 0 | num_rows_to_write = 150000; |
2738 | 0 | } |
2739 | 1 | num_rows_to_write = NonTsanVsTsan(num_rows_to_write, num_rows_to_write / 4); |
2740 | | |
2741 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
2742 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
2743 | | |
2744 | 1 | itest::TabletServerMapUnowned active_tablet_servers; |
2745 | 1 | auto iter = tablet_replicas_.find(tablet_id_); |
2746 | 1 | TServerDetails* leader = iter->second; |
2747 | 1 | TServerDetails* follower = (++iter)->second; |
2748 | 1 | InsertOrDie(&active_tablet_servers, leader->uuid(), leader); |
2749 | 1 | InsertOrDie(&active_tablet_servers, follower->uuid(), follower); |
2750 | | |
2751 | 1 | TServerDetails* new_node = nullptr; |
2752 | 1 | for (TServerDetails* ts : tservers) { |
2753 | 1 | if (!ContainsKey(active_tablet_servers, ts->uuid())) { |
2754 | 1 | new_node = ts; |
2755 | 1 | break; |
2756 | 1 | } |
2757 | 1 | } |
2758 | 1 | ASSERT_TRUE(new_node != nullptr); |
2759 | | |
2760 | | // Elect the leader (still only a consensus config size of 2). |
2761 | 1 | ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10))); |
2762 | 1 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, |
2763 | 1 | tablet_id_, 1)); |
2764 | | |
2765 | 1 | TestWorkload workload(cluster_.get()); |
2766 | 1 | workload.set_table_name(kTableName); |
2767 | 1 | workload.set_num_write_threads(10); |
2768 | 1 | workload.set_write_batch_size(100); |
2769 | 1 | workload.set_sequential_write(true); |
2770 | 1 | workload.Setup(); |
2771 | | |
2772 | 1 | LOG(INFO) << "Starting write workload..."; |
2773 | 1 | workload.Start(); |
2774 | | |
2775 | 1 | while (true) { |
2776 | 0 | auto rows_inserted = workload.rows_inserted(); |
2777 | 0 | if (rows_inserted >= num_rows_to_write) { |
2778 | 0 | break; |
2779 | 0 | } |
2780 | 0 | LOG(INFO) << "Only inserted " << rows_inserted << " rows so far, sleeping for 100ms"; |
2781 | 0 | SleepFor(MonoDelta::FromMilliseconds(100)); |
2782 | 0 | } |
2783 | | |
2784 | 1 | LOG(INFO) << "Adding tserver with uuid " << new_node->uuid() << " as VOTER..."; |
2785 | 1 | ASSERT_OK(AddServer(leader, tablet_id_, new_node, PeerMemberType::PRE_VOTER, boost::none, |
2786 | 1 | MonoDelta::FromSeconds(10))); |
2787 | 0 | InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node); |
2788 | 0 | ASSERT_OK(WaitUntilCommittedConfigNumVotersIs( |
2789 | 0 | active_tablet_servers.size(), leader, tablet_id_, |
2790 | 0 | MonoDelta::FromSeconds(NonTsanVsTsan(20, 60)))); |
2791 | |
2792 | 0 | workload.StopAndJoin(); |
2793 | 0 | auto num_batches = workload.batches_completed(); |
2794 | |
2795 | 0 | LOG(INFO) << "Waiting for replicas to agree..."; |
2796 | | // Wait for all servers to replicate everything up through the last write op. |
2797 | | // The workload writes in batches, so there should be at least one log entry per
2798 | | // completed batch, plus the initial leader's no-op, plus 1 for
2799 | | // the added replica, for a total of at least num_batches + 2.
2800 | 0 | auto min_log_index = num_batches + 2; |
2801 | 0 | ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(120), |
2802 | 0 | active_tablet_servers, tablet_id_, |
2803 | 0 | min_log_index)); |
2804 | |
2805 | 0 | auto rows_inserted = workload.rows_inserted(); |
2806 | 0 | LOG(INFO) << "Number of rows inserted: " << rows_inserted; |
2807 | 0 | ASSERT_ALL_REPLICAS_AGREE(rows_inserted); |
2808 | 0 | } |
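 | | // Note: adding the empty PRE_VOTER above is what triggers remote bootstrap; once the new
 | | // node has caught up, the leader automatically promotes it with a CHANGE_ROLE, which is
 | | // why WaitUntilCommittedConfigNumVotersIs() expects all three peers as voters.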
2809 | | |
2810 | 1 | TEST_F(RaftConsensusITest, TestMemoryRemainsConstantDespiteTwoDeadFollowers) { |
2811 | 1 | const int64_t kMinRejections = 100; |
2812 | 1 | const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60); |
2813 | | |
2814 | | // Start the cluster with a low per-tablet transaction memory limit, so that |
2815 | | // the test can complete faster. |
2816 | 1 | std::vector<std::string> flags = {
2817 | 1 | "--tablet_operation_memory_limit_mb=2"s |
2818 | 1 | }; |
2819 | | |
2820 | 1 | ASSERT_NO_FATALS(BuildAndStart(flags)); |
2821 | | |
2822 | | // Kill both followers. |
2823 | 1 | TServerDetails* details; |
2824 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &details)); |
2825 | 1 | int num_shutdown = 0; |
2826 | 1 | ssize_t leader_ts_idx = -1; |
2827 | 4 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
2828 | 3 | ExternalTabletServer* ts = cluster_->tablet_server(i); |
2829 | 3 | if (ts->instance_id().permanent_uuid() != details->uuid()) { |
2830 | 2 | ts->Shutdown(); |
2831 | 2 | num_shutdown++; |
2832 | 1 | } else { |
2833 | 1 | leader_ts_idx = i; |
2834 | 1 | } |
2835 | 3 | } |
2836 | 1 | ASSERT_EQ(2, num_shutdown); |
2837 | 1 | ASSERT_NE(-1, leader_ts_idx); |
2838 | | |
2839 | | // Because the majority of the cluster is dead and because of this workload's |
2840 | | // timeout behavior, more and more wedged transactions will accumulate in the |
2841 | | // leader. To prevent memory usage from skyrocketing, the leader will |
2842 | | // eventually reject new transactions. That's what we're testing for here. |
2843 | 1 | TestWorkload workload(cluster_.get()); |
2844 | 1 | workload.set_table_name(kTableName); |
2845 | 1 | workload.set_timeout_allowed(true); |
2846 | 1 | workload.set_write_timeout_millis(50); |
2847 | 1 | workload.Setup(); |
2848 | 1 | workload.Start(); |
2849 | | |
2850 | | // Run until the former leader has rejected several transactions. |
2851 | 1 | MonoTime deadline = MonoTime::Now(); |
2852 | 1 | deadline.AddDelta(kMaxWaitTime); |
2853 | 1 | while (true) { |
2854 | 0 | int64_t num_rejections = ASSERT_RESULT(cluster_->tablet_server(leader_ts_idx)->GetInt64Metric( |
2855 | 0 | &METRIC_ENTITY_tablet, |
2856 | 0 | nullptr, |
2857 | 0 | &METRIC_not_leader_rejections, |
2858 | 0 | "value")); |
2859 | 0 | if (num_rejections >= kMinRejections) { |
2860 | 0 | break; |
2861 | 0 | } else if (deadline.ComesBefore(MonoTime::Now())) { |
2862 | 0 | FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired"; |
2863 | 0 | } |
2864 | 0 | SleepFor(MonoDelta::FromMilliseconds(200)); |
2865 | 0 | } |
2866 | 1 | } |
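 | | // The loop above is the usual deadline-bounded metric wait: read the counter, stop
 | | // once it crosses the threshold, fail once the deadline passes, otherwise sleep and
 | | // retry. Condensed sketch, where 'read_rejections()' stands in for the
 | | // GetInt64Metric() call:
 | | //
 | | //   while (read_rejections() < kMinRejections) {
 | | //     if (deadline.ComesBefore(MonoTime::Now())) FAIL() << "deadline expired";
 | | //     SleepFor(MonoDelta::FromMilliseconds(200));
 | | //   }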
2867 | | |
2868 | 0 | static void EnableLogLatency(server::GenericServiceProxy* proxy) { |
2869 | 0 | typedef unordered_map<string, string> FlagMap; |
2870 | 0 | FlagMap flags; |
2871 | 0 | InsertOrDie(&flags, "log_inject_latency", "true"); |
2872 | 0 | InsertOrDie(&flags, "log_inject_latency_ms_mean", "1000"); |
2873 | 0 | for (const FlagMap::value_type& e : flags) { |
2874 | 0 | SetFlagRequestPB req; |
2875 | 0 | SetFlagResponsePB resp; |
2876 | 0 | RpcController rpc; |
2877 | 0 | req.set_flag(e.first); |
2878 | 0 | req.set_value(e.second); |
2879 | 0 | ASSERT_OK(proxy->SetFlag(req, &resp, &rpc)); |
2880 | 0 | } |
2881 | 0 | } |
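 | | // EnableLogLatency goes through the GenericService SetFlag RPC because it only holds a
 | | // proxy to the target server. Tests that have an ExternalTabletServer* in hand get the
 | | // same effect through the cluster helper used elsewhere in this file, e.g. (sketch):
 | | //
 | | //   ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "true"));
 | | //   ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_mean", "1000"));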
2882 | | |
2883 | | // Check if hinted replica becomes leader. |
2884 | 1 | TEST_F(RaftConsensusITest, HintedLeader) { |
2885 | 1 | vector<string> master_flags = { |
2886 | 1 | "--use_create_table_leader_hint=true"s, |
2887 | 1 | "--TEST_create_table_leader_hint_min_lexicographic=true"s, |
2888 | 1 | }; |
2889 | | |
2890 | 1 | ASSERT_NO_FATALS(BuildAndStart({}, master_flags)); |
2891 | | |
2892 | 1 | std::string init_leader_hint = ""; |
2893 | 3 | for (const TabletServerMap::value_type& entry : tablet_servers_) { |
2894 | 3 | if (init_leader_hint == "" || init_leader_hint > entry.first) { |
2895 | 1 | init_leader_hint = entry.first; |
2896 | 1 | } |
2897 | 3 | } |
2898 | | |
2899 | 1 | LOG(INFO) << "hinted replica = " << init_leader_hint; |
2900 | | |
2901 | | // Check that hinted replica became leader. |
2902 | 1 | TServerDetails* leader; |
2903 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
2904 | 1 | ASSERT_EQ(init_leader_hint, leader->uuid()); |
2905 | | |
2906 | | // Insert 3MB worth of data. |
2907 | 1 | const int kNumWrites = 1000; |
2908 | 1 | ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB)); |
2909 | 1 | ASSERT_OK(WaitForServersToAgree(30s, tablet_servers_, tablet_id_, |
2910 | 1 | kNumWrites)); |
2911 | | |
2912 | | // Check that no leadership change occurred. |
2913 | 1 | auto op_id = ASSERT_RESULT(GetLastOpIdForReplica( |
2914 | 1 | tablet_id_, leader, consensus::RECEIVED_OPID, 10s)); |
2915 | 1 | ASSERT_EQ(op_id.term, 1); |
2916 | 1 | } |
2917 | | |
2918 | | // Run a regular workload with a leader that's writing to its WAL slowly. |
2919 | 1 | TEST_F(RaftConsensusITest, TestSlowLeader) { |
2920 | 1 | if (!AllowSlowTests()) return; |
2921 | 0 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
2922 | |
2923 | 0 | TServerDetails* leader; |
2924 | 0 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
2925 | 0 | ASSERT_NO_FATALS(EnableLogLatency(leader->generic_proxy.get())); |
2926 | |
2927 | 0 | TestWorkload workload(cluster_.get()); |
2928 | 0 | workload.set_table_name(kTableName); |
2929 | 0 | workload.Setup(); |
2930 | 0 | workload.Start(); |
2931 | 0 | SleepFor(MonoDelta::FromSeconds(60)); |
2932 | 0 | } |
2933 | | |
2934 | | // Run a regular workload with one follower that's writing to its WAL slowly. |
2935 | 1 | TEST_F(RaftConsensusITest, TestSlowFollower) { |
2936 | 1 | if (!AllowSlowTests()) return; |
2937 | 0 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
2938 | |
2939 | 0 | TServerDetails* leader; |
2940 | 0 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader)); |
2941 | 0 | int num_reconfigured = 0; |
2942 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
2943 | 0 | ExternalTabletServer* ts = cluster_->tablet_server(i); |
2944 | 0 | if (ts->instance_id().permanent_uuid() != leader->uuid()) { |
2945 | 0 | TServerDetails* follower; |
2946 | 0 | follower = GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid()); |
2947 | 0 | ASSERT_TRUE(follower); |
2948 | 0 | ASSERT_NO_FATALS(EnableLogLatency(follower->generic_proxy.get())); |
2949 | 0 | num_reconfigured++; |
2950 | 0 | break; |
2951 | 0 | } |
2952 | 0 | } |
2953 | 0 | ASSERT_EQ(1, num_reconfigured); |
2954 | |
2955 | 0 | TestWorkload workload(cluster_.get()); |
2956 | 0 | workload.set_table_name(kTableName); |
2957 | 0 | workload.Setup(); |
2958 | 0 | workload.Start(); |
2959 | 0 | SleepFor(MonoDelta::FromSeconds(60)); |
2960 | 0 | } |
2961 | | |
2962 | | // Run a special workload that constantly updates a single row on a cluster |
2963 | | // where every replica is writing to its WAL slowly. |
2964 | 1 | TEST_F(RaftConsensusITest, TestHammerOneRow) { |
2965 | 1 | if (!AllowSlowTests()) return; |
2966 | 0 | ASSERT_NO_FATALS(BuildAndStart(vector<string>())); |
2967 | |
2968 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
2969 | 0 | ExternalTabletServer* ts = cluster_->tablet_server(i); |
2970 | 0 | TServerDetails* follower; |
2971 | 0 | follower = GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid()); |
2972 | 0 | ASSERT_TRUE(follower); |
2973 | 0 | ASSERT_NO_FATALS(EnableLogLatency(follower->generic_proxy.get())); |
2974 | 0 | } |
2975 | |
2976 | 0 | TestWorkload workload(cluster_.get()); |
2977 | 0 | workload.set_table_name(kTableName); |
2978 | 0 | workload.set_pathological_one_row_enabled(true); |
2979 | 0 | workload.set_num_write_threads(20); |
2980 | 0 | workload.Setup(); |
2981 | 0 | workload.Start(); |
2982 | 0 | SleepFor(MonoDelta::FromSeconds(60)); |
2983 | 0 | } |
2984 | | |
2985 | | // Test that followers that fall behind the leader's log GC threshold are |
2986 | | // evicted from the config. |
2987 | 1 | TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) { |
2988 | 1 | vector<string> ts_flags; |
2989 | 1 | AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC(). |
2990 | 1 | vector<string> master_flags; |
2991 | 1 | LOG(INFO) << __func__ << ": starting the cluster"; |
2992 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
2993 | | |
2994 | 1 | MonoDelta timeout = MonoDelta::FromSeconds(30); |
2995 | 1 | auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_); |
2996 | 1 | ASSERT_EQ(3, active_tablet_servers.size()); |
2997 | | |
2998 | 1 | string leader_uuid; |
2999 | 1 | int64_t orig_term; |
3000 | 1 | string follower_uuid; |
3001 | 1 | LOG(INFO) << __func__ << ": causing followers to fall behind"; |
3002 | 1 | ASSERT_NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); |
3003 | | |
3004 | 0 | LOG(INFO) << __func__ << ": waiting for 2 voters in the committed config"; |
3005 | | // Wait for the abandoned follower to be evicted. |
3006 | 0 | ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, |
3007 | 0 | tablet_servers_[leader_uuid].get(), |
3008 | 0 | tablet_id_, |
3009 | 0 | timeout)); |
3010 | 0 | ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid)); |
3011 | 0 | LOG(INFO) << __func__ << ": waiting for servers to agree"; |
3012 | 0 | ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2)); |
3013 | 0 | } |
3014 | | |
3015 | | // Test that when a follower falls behind the leader's log GC threshold and is evicted
3016 | | // from the config, the master causes it to be remotely bootstrapped back in.
3017 | 1 | TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) { |
3018 | 1 | vector<string> extra_flags; |
3019 | 1 | AddFlagsForLogRolls(&extra_flags); // For CauseFollowerToFallBehindLogGC(). |
3020 | 1 | ASSERT_NO_FATALS(BuildAndStart(extra_flags, {"--enable_load_balancing=true"})); |
3021 | | |
3022 | 1 | MonoDelta timeout = MonoDelta::FromSeconds(30); |
3023 | | |
3024 | 1 | string leader_uuid; |
3025 | 1 | int64_t orig_term; |
3026 | 1 | string follower_uuid; |
3027 | 1 | ASSERT_NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); |
3028 | | |
3029 | | // The follower will be evicted. Now wait for the master to cause it to be remotely bootstrapped. |
3030 | 0 | ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 2)); |
3031 | |
3032 | 0 | ClusterVerifier cluster_verifier(cluster_.get()); |
3033 | 0 | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
3034 | 0 | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(kTableName, ClusterVerifier::AT_LEAST, 1)); |
3035 | 0 | } |
3036 | | |
3037 | | // Test that a ChangeConfig() request is rejected unless the leader has replicated one of its own |
3038 | | // log entries during the current term. This is required for correctness of Raft config change. For |
3039 | | // details, see https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E |
3040 | 1 | TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) { |
3041 | 1 | vector<string> ts_flags = { "--enable_leader_failure_detection=false" }; |
3042 | 1 | vector<string> master_flags = { |
3043 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
3044 | 1 | "--use_create_table_leader_hint=false"s, |
3045 | 1 | }; |
3046 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
3047 | | |
3048 | 1 | MonoDelta timeout = MonoDelta::FromSeconds(30); |
3049 | | |
3050 | 1 | int kLeaderIndex = 0; |
3051 | 1 | TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(kLeaderIndex)->uuid()].get(); |
3052 | | |
3053 | | // Prevent followers from accepting UpdateConsensus requests from the leader, even though they |
3054 | | // will vote. This will allow us to get the distributed system into a state where there is a valid |
3055 | | // leader (based on winning an election) but that leader will be unable to commit any entries from |
3056 | | // its own term, making it illegal to accept ChangeConfig() requests. |
3057 | 3 | for (int i = 1; i <= 2; i++) { |
3058 | 2 | ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), |
3059 | 2 | "TEST_follower_reject_update_consensus_requests", "true")); |
3060 | 2 | } |
3061 | | |
3062 | | // Elect the leader. |
3063 | 1 | ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout)); |
3064 | | |
3065 | | // We don't need to wait until the leader obtains a lease here. In fact, that will never happen, |
3066 | | // because the followers are rejecting UpdateConsensus, and the leader needs to majority-replicate |
3067 | | // a lease expiration that is in the future in order to establish a leader lease. |
3068 | 1 | ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout, LeaderLeaseCheckMode::DONT_NEED_LEASE)); |
3069 | | // Now attempt to do a config change. It should be rejected because there have not been any ops |
3070 | | // (notably the initial NO_OP) from the leader's term that have been committed yet. |
3071 | 1 | Status s = itest::RemoveServer(leader_ts, |
3072 | 1 | tablet_id_, |
3073 | 1 | tablet_servers_[cluster_->tablet_server(1)->uuid()].get(), |
3074 | 1 | boost::none, |
3075 | 1 | timeout, |
3076 | 1 | nullptr /* error_code */, |
3077 | 1 | false /* retry */); |
3078 | 2 | ASSERT_TRUE(s.IsLeaderNotReadyToServe()) << s; |
3079 | 1 | ASSERT_STR_CONTAINS(s.message().ToBuffer(), |
3080 | 1 | "Leader not yet replicated NoOp to be ready to serve requests"); |
3081 | 1 | } |
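 | | // For contrast, the happy path: once the followers accept UpdateConsensus again and the
 | | // leader has committed the NO_OP from its own term, the same config change is expected
 | | // to succeed (sketch only, not executed by this test):
 | | //
 | | //   ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, timeout));
 | | //   ASSERT_OK(itest::RemoveServer(
 | | //       leader_ts, tablet_id_, tablet_servers_[cluster_->tablet_server(1)->uuid()].get(),
 | | //       boost::none, timeout));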
3082 | | |
3083 | 1 | TEST_F(RaftConsensusITest, TestChangeConfigBasedOnJepsen) { |
3084 | 1 | FLAGS_num_tablet_servers = 4; |
3085 | 1 | vector<string> ts_flags = { "--enable_leader_failure_detection=false", |
3086 | 1 | "--TEST_log_change_config_every_n=3000" }; |
3087 | 1 | vector<string> master_flags = { |
3088 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
3089 | 1 | "--use_create_table_leader_hint=false"s, |
3090 | 1 | }; |
3091 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
3092 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
3093 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
3094 | | |
3095 | 1 | int leader_idx = -1; |
3096 | 1 | int new_node_idx = -1; |
3097 | 5 | for (int i = 0; i < 4; i++) { |
3098 | 4 | auto& uuid = cluster_->tablet_server(i)->uuid(); |
3099 | 4 | if (nullptr == GetReplicaWithUuidOrNull(tablet_id_, uuid)) { |
3100 | 1 | new_node_idx = i; |
3101 | 1 | continue; |
3102 | 3 | } else if (leader_idx == -1) { |
3103 | 1 | leader_idx = i; |
3104 | 1 | continue; |
3105 | 1 | } |
3106 | 4 | } |
3107 | | |
3108 | 0 | CHECK_NE(new_node_idx, -1) << "Could not find the node not having tablet_id_"; |
3109 | | |
3110 | 1 | TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(leader_idx)->uuid()].get(); |
3111 | 1 | MonoDelta timeout = MonoDelta::FromSeconds(30); |
3112 | 1 | ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout)); |
3113 | | |
3114 | 1 | ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout, LeaderLeaseCheckMode::NEED_LEASE)); |
3115 | | |
3116 | 1 | vector<TServerDetails*> tservers_list(4); |
3117 | 1 | int ts_idx = 0; |
3118 | 1 | tservers_list[ts_idx++] = tablet_servers_[cluster_->tablet_server(leader_idx)->uuid()].get(); |
3119 | 5 | for (int i = 0; i < 4; i++) { |
3120 | 4 | if (i == leader_idx || i == new_node_idx) { |
3121 | 2 | continue; |
3122 | 2 | } |
3123 | 2 | ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), |
3124 | 2 | "TEST_follower_reject_update_consensus_requests", "true")); |
3125 | 2 | tservers_list[ts_idx++] = tablet_servers_[cluster_->tablet_server(i)->uuid()].get(); |
3126 | 2 | } |
3127 | 1 | tservers_list[ts_idx++] = tablet_servers_[cluster_->tablet_server(new_node_idx)->uuid()].get(); |
3128 | | |
3129 | | // Now attempt to do a config change. It should be rejected because there have not been any ops |
3130 | | // (notably the initial NO_OP) from the leader's term that have been committed yet. |
3131 | 1 | Status s = itest::AddServer(leader_ts, |
3132 | 1 | tablet_id_, |
3133 | 1 | tablet_servers_[cluster_->tablet_server(new_node_idx)->uuid()].get(), |
3134 | 1 | PeerMemberType::PRE_VOTER, |
3135 | 1 | boost::none, |
3136 | 1 | timeout, |
3137 | 1 | nullptr /* error_code */, |
3138 | 1 | false /* retry */); |
3139 | 1 | LOG(INFO) << "Got status " << yb::ToString(s); |
3140 | 1 | const double kSleepDelaySec = 50; |
3141 | 1 | SleepFor(MonoDelta::FromSeconds(kSleepDelaySec)); |
3142 | 1 | LOG(INFO) << "Done Sleeping"; |
3143 | | |
3144 | 1 | auto committed_op_ids = ASSERT_RESULT(GetLastOpIdForEachReplica( |
3145 | 1 | tablet_id_, tservers_list, consensus::OpIdType::COMMITTED_OPID, timeout)); |
3146 | 1 | auto received_op_ids = ASSERT_RESULT(GetLastOpIdForEachReplica( |
3147 | 1 | tablet_id_, tservers_list, consensus::OpIdType::RECEIVED_OPID, timeout)); |
3148 | | |
3149 | 5 | for (int i = 0; i < 4; i++) {
3150 | 4 | LOG(INFO) << "i = " << i << " Peer " << tservers_list[i]->uuid() |
3151 | 4 | << " Committed op id " << yb::ToString(committed_op_ids[i]) |
3152 | 4 | << " Last received op id " << yb::ToString(received_op_ids[i]); |
3153 | 4 | } |
3154 | | |
3155 | 1 | const OpId kLeaderCommittedOpId = committed_op_ids[0]; |
3156 | 1 | int num_voters_who_received_committed_op_id = 0; |
3157 | 4 | for (int i = 0; i < 3; i++) {
3158 | 3 | if (kLeaderCommittedOpId <= received_op_ids[i]) { |
3159 | 3 | num_voters_who_received_committed_op_id++; |
3160 | 3 | } |
3161 | 3 | } |
3162 | 0 | CHECK_GE(num_voters_who_received_committed_op_id, 2) |
3163 | 0 | << "At least 2 voters should have received the op id"; |
3164 | 1 | } |
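 | | // Note on the final check above: with three voters, an op id can only be committed once
 | | // a majority (2 of 3) has received it, so the leader's committed op id must show up in
 | | // at least two of the voters' RECEIVED_OPID values; that is what the CHECK_GE asserts.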
3165 | | |
3166 | | // Test that if for some reason none of the transactions can be prepared, it will come back as
3167 | | // an error in UpdateConsensus(). |
3168 | 1 | TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) { |
3169 | 1 | const int kNumOps = 10; |
3170 | | |
3171 | 1 | vector<string> ts_flags = { |
3172 | 1 | "--enable_leader_failure_detection=false", |
3173 | 1 | }; |
3174 | 1 | vector<string> master_flags = { |
3175 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
3176 | 1 | "--use_create_table_leader_hint=false"s, |
3177 | 1 | }; |
3178 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
3179 | | |
3180 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
3181 | 1 | ASSERT_EQ(3, tservers.size()); |
3182 | | |
3183 | | // Shutdown the other servers so they don't get chatty. |
3184 | 1 | cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown(); |
3185 | 1 | cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown(); |
3186 | | |
3187 | | // Configure the first server to fail all on prepare. |
3188 | 1 | TServerDetails *replica_ts = tservers[0]; |
3189 | 1 | ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(replica_ts->uuid()), |
3190 | 1 | "TEST_follower_fail_all_prepare", "true")); |
3191 | | |
3192 | | // Pretend to be the leader and send a request that should return an error. |
3193 | 1 | ConsensusRequestPB req; |
3194 | 1 | ConsensusResponsePB resp; |
3195 | 1 | RpcController rpc; |
3196 | 1 | req.set_dest_uuid(replica_ts->uuid()); |
3197 | 1 | req.set_tablet_id(tablet_id_); |
3198 | 1 | req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); |
3199 | 1 | req.set_caller_term(0); |
3200 | 1 | req.mutable_committed_op_id()->CopyFrom(MakeOpId(0, 0)); |
3201 | 1 | req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0)); |
3202 | 11 | for (int i = 0; i < kNumOps; i++) { |
3203 | 10 | AddOp(MakeOpId(0, 1 + i), &req); |
3204 | 10 | } |
3205 | | |
3206 | 1 | ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc)); |
3207 | 1 | LOG(INFO) << resp.ShortDebugString(); |
3208 | 1 | ASSERT_TRUE(resp.status().has_error()); |
3209 | 1 | ASSERT_EQ(consensus::ConsensusErrorPB::CANNOT_PREPARE, resp.status().error().code()); |
3210 | 1 | ASSERT_STR_CONTAINS(resp.ShortDebugString(), "Could not prepare a single operation"); |
3211 | 1 | } |
3212 | | |
3213 | 1 | TEST_F(RaftConsensusITest, TestRemoveTserverFailsWhenVoterInTransition) { |
3214 | 1 | TestRemoveTserverFailsWhenServerInTransition(PeerMemberType::PRE_VOTER); |
3215 | 1 | } |
3216 | | |
3217 | 1 | TEST_F(RaftConsensusITest, TestRemoveTserverFailsWhenObserverInTransition) { |
3218 | 1 | TestRemoveTserverFailsWhenServerInTransition(PeerMemberType::PRE_OBSERVER); |
3219 | 1 | } |
3220 | | |
3221 | 1 | TEST_F(RaftConsensusITest, TestRemovePreObserverServerSucceeds) { |
3222 | 1 | TestRemoveTserverInTransitionSucceeds(PeerMemberType::PRE_OBSERVER);
3223 | 1 | } |
3224 | | |
3225 | 1 | TEST_F(RaftConsensusITest, TestRemovePreVoterServerSucceeds) { |
3226 | 1 | TestRemoveTserverInTransitionSucceeds(PeerMemberType::PRE_VOTER);
3227 | 1 | } |
3228 | | |
3229 | | // A test scenario to verify that a disruptive server doesn't start needless |
3230 | | // elections when it takes a long time to replicate Raft transactions
3231 | | // from leader to follower replicas (e.g., due to slowness in WAL IO ops). |
3232 | 1 | TEST_F(RaftConsensusITest, DisruptiveServerAndSlowWAL) { |
3233 | 1 | const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
3234 | | // Shorten the heartbeat interval for faster test run time. |
3235 | 1 | const auto kHeartbeatIntervalMs = 200; |
3236 | 1 | const auto kMaxMissedHeartbeatPeriods = 3; |
3237 | 1 | const vector<string> ts_flags { |
3238 | 1 | Substitute("--raft_heartbeat_interval_ms=$0", kHeartbeatIntervalMs), |
3239 | 1 | Substitute("--leader_failure_max_missed_heartbeat_periods=$0", |
3240 | 1 | kMaxMissedHeartbeatPeriods), |
3241 | 1 | }; |
3242 | 1 | NO_FATALS(BuildAndStart(ts_flags)); |
3243 | | |
3244 | | // Sanity check: this scenario assumes there are 3 tablet servers. The test |
3245 | | // table is created with RF=FLAGS_num_replicas. |
3246 | 1 | ASSERT_EQ(3, FLAGS_num_replicas); |
3247 | 1 | ASSERT_EQ(3, tablet_servers_.size()); |
3248 | | |
3249 | | // A convenience array to access each tablet server as TServerDetails. |
3250 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
3251 | 1 | ASSERT_EQ(cluster_->num_tablet_servers(), tservers.size()); |
3252 | | |
3253 | | // The leadership might fluctuate before shutting down the third tablet |
3254 | | // server, so ASSERT_EVENTUALLY() below is for those rare cases. |
3255 | | // |
3256 | | // However, after one of the tablet servers is shutdown, the leadership should |
3257 | | // not fluctuate because: |
3258 | | // 1) only two voters out of three are alive |
3259 | | // 2) current leader should not be disturbed by any rogue votes -- that's |
3260 | | // the whole premise of this test scenario |
3261 | | // |
3262 | | // So, for this scenario the leadership can fluctuate only if Raft heartbeats sent
3263 | | // from leader to follower replicas are significantly delayed or dropped.
3264 | | // However, minicluster components send heartbeats via the loopback interface, |
3265 | | // so no real network layer that might significantly delay heartbeats |
3266 | | // is involved. Also, the consensus RPC queues should not overflow |
3267 | | // in this scenario because the number of consensus RPCs is relatively low. |
3268 | 1 | TServerDetails* leader_tserver = nullptr; |
3269 | 1 | TServerDetails* other_tserver = nullptr; |
3270 | 1 | TServerDetails* shutdown_tserver = nullptr; |
3271 | 1 | ASSERT_EVENTUALLY([&] { |
3272 | | // This is a clean-up in case of retry. |
3273 | 1 | if (shutdown_tserver) { |
3274 | 1 | auto* ts = cluster_->tablet_server_by_uuid(shutdown_tserver->uuid()); |
3275 | 1 | if (ts->IsShutdown()) { |
3276 | 1 | ASSERT_OK(ts->Restart()); |
3277 | 1 | } |
3278 | 1 | } |
3279 | 1 | for (size_t idx = 0; idx < cluster_->num_tablet_servers(); ++idx) { |
3280 | 1 | auto* ts = cluster_->tablet_server(idx); |
3281 | 1 | ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "false")); |
3282 | 1 | } |
3283 | 1 | leader_tserver = nullptr; |
3284 | 1 | other_tserver = nullptr; |
3285 | 1 | shutdown_tserver = nullptr; |
3286 | | |
3287 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader_tserver)); |
3288 | 1 | ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_, |
3289 | 1 | 0 /* key */, 0 /* int_val */, "" /* string_val */, kTimeout)); |
3290 | 1 | auto op_id = ASSERT_RESULT(GetLastOpIdForReplica( |
3291 | 1 | tablet_id_, leader_tserver, consensus::COMMITTED_OPID, kTimeout)); |
3292 | 1 | ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, op_id.index)); |
3293 | | // Shutdown one tablet server that doesn't host the leader replica of the |
3294 | | // target tablet and inject WAL latency to others. |
3295 | 1 | for (const auto& server : tservers) { |
3296 | 1 | auto* ts = cluster_->tablet_server_by_uuid(server->uuid()); |
3297 | 1 | const bool is_leader = server->uuid() == leader_tserver->uuid(); |
3298 | 1 | if (!is_leader && !shutdown_tserver) { |
3299 | 1 | shutdown_tserver = server; |
3300 | 1 | continue; |
3301 | 1 | } |
3302 | 1 | if (!is_leader && !other_tserver) { |
3303 | 1 | other_tserver = server; |
3304 | 1 | } |
3305 | | // For this scenario it's important to reserve some inactivity intervals |
3306 | | // for the follower between processing Raft messages from the leader. |
3307 | | // If a vote request arrives while replica is busy with processing |
3308 | | // Raft message from leader, it is rejected with 'busy' status before |
3309 | | // evaluating the vote withholding interval. |
3310 | 1 | const auto mult = is_leader ? 2 : 1; |
3311 | 1 | const auto latency_ms = mult * kHeartbeatIntervalMs * kMaxMissedHeartbeatPeriods; |
3312 | 1 | ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_mean", |
3313 | 1 | std::to_string(latency_ms))); |
3314 | 1 | ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_stddev", "0")); |
3315 | 1 | ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "true")); |
3316 | 1 | } |
3317 | | |
3318 | | // Shutdown the third tablet server. |
3319 | 1 | cluster_->tablet_server_by_uuid(shutdown_tserver->uuid())->Shutdown(); |
3320 | | |
3321 | | // Sanity check: make sure the leadership hasn't changed since the leader |
3322 | | // has been determined. |
3323 | 1 | TServerDetails* current_leader; |
3324 | 1 | ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, ¤t_leader)); |
3325 | 1 | ASSERT_EQ(cluster_->tablet_server_index_by_uuid(leader_tserver->uuid()), |
3326 | 1 | cluster_->tablet_server_index_by_uuid(current_leader->uuid())); |
3327 | 1 | }); |
3328 | | |
3329 | | // Get the Raft term from the established leader. |
3330 | 1 | consensus::ConsensusStatePB cstate; |
3331 | 1 | ASSERT_OK(GetConsensusState(leader_tserver, tablet_id_, |
3332 | 1 | consensus::CONSENSUS_CONFIG_COMMITTED, kTimeout, &cstate)); |
3333 | | |
3334 | 1 | TestWorkload workload(cluster_.get()); |
3335 | 1 | workload.set_table_name(kTableName); |
3336 | 1 | workload.set_timeout_allowed(true); |
3337 | 1 | workload.set_num_write_threads(1); |
3338 | 1 | workload.set_write_batch_size(1); |
3339 | | // Make a 'space' for the artificial vote requests (see below) to arrive |
3340 | | // while there is no activity on RaftConsensus::Update(). |
3341 | 1 | workload.set_write_interval_millis(kHeartbeatIntervalMs); |
3342 | 1 | workload.Setup(); |
3343 | 1 | workload.Start(); |
3344 | | |
3345 | | // Issue rogue vote requests, imitating a disruptive tablet replica. |
3346 | 1 | const auto& shutdown_server_uuid = shutdown_tserver->uuid(); |
3347 | 1 | const auto next_term = cstate.current_term() + 1; |
3348 | 1 | const auto targets = { leader_tserver, other_tserver }; |
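     | | // Each iteration sends RequestVote RPCs on behalf of the shut-down |
     | | // server's UUID with a term higher than the leader's; every request is |
     | | // expected to be rejected because a healthy leader is active. |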
3349 | 1 | for (auto i = 0; i < 100; ++i) { |
3350 | 0 | SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs / 4)); |
3351 | 0 | for (const auto* ts : targets) { |
3352 | 0 | auto s = RequestVote(ts, tablet_id_, shutdown_server_uuid, |
3353 | 0 | next_term, MakeOpId(next_term + i, 0), |
3354 | 0 | /*ignore_live_leader=*/ false, |
3355 | 0 | /*is_pre_election=*/ false, |
3356 | 0 | kTimeout); |
3357 | | // Neither the leader nor the follower replica should grant a 'yes' vote, |
3358 | | // since the healthy leader is alive and well, still sending Raft |
3359 | | // transactions to be replicated. |
3360 | 0 | ASSERT_TRUE(s.IsInvalidArgument() || s.IsServiceUnavailable()) |
3361 | 0 | << s.ToString(); |
3362 | 0 | std::regex pattern( |
3363 | 0 | "(" |
3364 | 0 | "because replica is either leader or " |
3365 | 0 | "believes a valid leader to be alive" |
3366 | 0 | "|" |
3367 | 0 | "because replica is already servicing an update " |
3368 | 0 | "from a current leader or another vote" |
3369 | 0 | ")"); |
3370 | 0 | ASSERT_TRUE(std::regex_search(s.ToString(), pattern)); |
3371 | 0 | } |
3372 | 0 | } |
3373 | 1 | } |
3374 | | |
3375 | | // Checks that a not-yet-committed split operation is correctly aborted after a leader change, |
3376 | | // and that a new split op id is then successfully set on all replicas after a retry. |
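     | | // Outline: pause UpdateConsensus on the followers so the SPLIT_OP stays |
     | | // unreplicated, force a leader change to get it aborted, then retry the |
     | | // split against the new leader and verify the new op id propagates. |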
3377 | 1 | TEST_F(RaftConsensusITest, SplitOpId) { |
3378 | 1 | RpcController rpc; |
3379 | 1 | const auto kTimeout = 60s * kTimeMultiplier; |
3380 | | |
3381 | 1 | FLAGS_num_tablet_servers = 3; |
3382 | 1 | FLAGS_num_replicas = 3; |
3383 | 1 | vector<string> ts_flags = { |
3384 | 1 | "--enable_leader_failure_detection=false"s, |
3385 | 1 | }; |
3386 | 1 | vector<string> master_flags = { |
3387 | 1 | "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s, |
3388 | 1 | "--use_create_table_leader_hint=false"s, |
3389 | 1 | }; |
3390 | 1 | ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags)); |
3391 | | |
3392 | 1 | vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_); |
3393 | 1 | ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size()); |
3394 | | |
3395 | | // Elect server 0 as leader and wait for log index 1 to propagate to all servers. |
3396 | 1 | TServerDetails* initial_leader = tservers[0]; |
3397 | 1 | ASSERT_OK(StartElection(initial_leader, tablet_id_, kTimeout)); |
3398 | 1 | ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1)); |
3399 | 1 | ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, kTimeout)); |
3400 | | |
3401 | 1 | LOG(INFO) << "Initial leader: " << initial_leader->uuid(); |
3402 | | |
3403 | 2 | auto pause_update_consensus = [&](ExternalTabletServer* tserver, bool value) { |
3404 | 2 | return cluster_->SetFlag( |
3405 | 2 | tserver, "TEST_follower_pause_update_consensus_requests", AsString(value)); |
3406 | 2 | }; |
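     | | // The TEST_follower_pause_update_consensus_requests flag pauses a |
     | | // follower's processing of incoming UpdateConsensus requests, so entries |
     | | // appended on the leader are not replicated to it while the flag is set. |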
3407 | | |
3408 | 3 | for (auto* tserver : cluster_->tserver_daemons()) { |
3409 | 3 | if (tserver->uuid() != initial_leader->uuid()) { |
3410 | 2 | ASSERT_OK(pause_update_consensus(tserver, true)); |
3411 | 2 | } |
3412 | 3 | } |
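     | | // Only the followers are paused: the leader can still append the SPLIT_OP |
     | | // to its own log, but cannot replicate (and hence commit) it. |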
3413 | | |
3414 | | // Add SPLIT_OP to the leader. |
3415 | 1 | tablet::SplitTabletRequestPB req; |
3416 | 1 | req.set_tablet_id(tablet_id_); |
3417 | 1 | req.set_new_tablet1_id(GenerateObjectId()); |
3418 | 1 | req.set_new_tablet2_id(GenerateObjectId()); |
3419 | 1 | { |
3420 | 1 | const auto min_hash_code = std::numeric_limits<docdb::DocKeyHash>::min(); |
3421 | 1 | const auto max_hash_code = std::numeric_limits<docdb::DocKeyHash>::max(); |
3422 | 1 | const auto split_hash_code = (max_hash_code - min_hash_code) / 2 + min_hash_code; |
3423 | 1 | const auto partition_key = PartitionSchema::EncodeMultiColumnHashValue(split_hash_code); |
3424 | 1 | docdb::KeyBytes encoded_doc_key; |
3425 | 1 | docdb::DocKeyEncoderAfterTableIdStep(&encoded_doc_key).Hash( |
3426 | 1 | split_hash_code, std::vector<docdb::PrimitiveValue>()); |
3427 | 1 | req.set_split_encoded_key(encoded_doc_key.ToStringBuffer()); |
3428 | 1 | req.set_split_partition_key(partition_key); |
3429 | 1 | } |
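     | | // The split key corresponds to the midpoint of the tablet's hash |
     | | // partition range. |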
3430 | 1 | req.set_dest_uuid(initial_leader->uuid()); |
3431 | 1 | tserver::SplitTabletResponsePB resp; |
3432 | | |
3433 | 1 | LOG(INFO) << "Sending Split RPC to the initial tablet leader"; |
3434 | 1 | CountDownLatch split_latch(1); |
3435 | 1 | initial_leader->tserver_admin_proxy->SplitTabletAsync(req, &resp, &rpc, [&split_latch, &resp]() { |
3436 | 1 | LOG(INFO) << "Split RPC response: " << AsString(resp); |
3437 | 1 | split_latch.CountDown(); |
3438 | 1 | }); |
3439 | | |
3440 | 1 | std::vector<OpIdPB> split_op_id_pbs; |
3441 | 1 | std::vector<OpId> split_op_ids; |
3442 | 2 | auto get_split_op_ids = [&]() -> Status { |
3443 | 1 | split_op_ids = VERIFY_RESULT(GetLastOpIdForEachReplica( |
3444 | 1 | tablet_id_, tservers, consensus::OpIdType::RECEIVED_OPID, kTimeout, |
3445 | 1 | consensus::OperationType::SPLIT_OP)); |
3446 | 1 | return Status::OK(); |
3447 | 2 | }; |
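     | | // A default-constructed OpId() entry means the corresponding replica has |
     | | // not received a SPLIT_OP yet; the waits below rely on this. |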
3448 | | |
3449 | 1 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
3450 | 1 | RETURN_NOT_OK(get_split_op_ids()); |
3451 | 1 | return split_op_ids[0].term > 0; |
3452 | 1 | }, kTimeout, "Waiting for the initial leader to add SPLIT_OP to Raft log")); |
3453 | 0 | LOG(INFO) << "split_op_ids: " << AsString(split_op_ids); |
3454 | 0 | ASSERT_EQ(split_op_ids[1], OpId()); |
3455 | 0 | ASSERT_EQ(split_op_ids[2], OpId()); |
3456 | 0 | const auto split_op_id_first_try = split_op_ids[0]; |
3457 | |
3458 | 0 | LOG(INFO) << "Stepping down initial tablet leader"; |
3459 | 0 | ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, nullptr, kTimeout)); |
3460 | |
3461 | 0 | TServerDetails* new_leader; |
3462 | 0 | ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, kTimeout, &new_leader)); |
3463 | 0 | LOG(INFO) << "New leader: " << new_leader->uuid(); |
3464 | |
3465 | 0 | ASSERT_OK(LoggedWaitFor( |
3466 | 0 | [&]() -> Result<bool> { |
3467 | 0 | RETURN_NOT_OK(get_split_op_ids()); |
3468 | 0 | for (const auto& split_op_id : split_op_ids) { |
3469 | 0 | if (split_op_id != OpId()) { |
3470 | 0 | return false; |
3471 | 0 | } |
3472 | 0 | } |
3473 | 0 | return true; |
3474 | 0 | }, |
3475 | 0 | kTimeout, "Waiting for SPLIT_OP to be aborted and split_op_id to be reset on all replicas")); |
3476 | 0 | split_latch.Wait(); |
3477 | |
3478 | 0 | LOG(INFO) << "Pause update consensus on the old leader"; |
3479 | 0 | for (auto* tserver : cluster_->tserver_daemons()) { |
3480 | 0 | if (tserver->uuid() == initial_leader->uuid()) { |
3481 | 0 | ASSERT_OK(pause_update_consensus(tserver, true)); |
3482 | 0 | } |
3483 | 0 | } |
3484 | |
3485 | 0 | LOG(INFO) << "Sending Split RPC to the new tablet leader"; |
3486 | 0 | rpc.Reset(); |
3487 | 0 | split_latch.Reset(1); |
3488 | 0 | req.set_dest_uuid(new_leader->uuid()); |
3489 | 0 | new_leader->tserver_admin_proxy->SplitTabletAsync(req, &resp, &rpc, [&split_latch, &resp]() { |
3490 | 0 | LOG(INFO) << "Split RPC response: " << AsString(resp); |
3491 | 0 | split_latch.CountDown(); |
3492 | 0 | }); |
3493 | |
3494 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
3495 | 0 | RETURN_NOT_OK(get_split_op_ids()); |
3496 | 0 | for (size_t i = 0; i < tservers.size(); ++i) { |
3497 | 0 | if (tservers[i]->uuid() == new_leader->uuid() && split_op_ids[i].index > 0) { |
3498 | 0 | return true; |
3499 | 0 | } |
3500 | 0 | } |
3501 | 0 | return false; |
3502 | 0 | }, kTimeout, "Waiting for the new leader to add SPLIT_OP to Raft log")); |
3503 | 0 | LOG(INFO) << "split_op_ids: " << AsString(split_op_ids); |
3504 | | |
3505 | | // Make sure the followers do not have split_op_id set yet. |
3506 | 0 | for (size_t i = 0; i < tservers.size(); ++i) { |
3507 | 0 | if (tservers[i]->uuid() != new_leader->uuid()) { |
3508 | 0 | ASSERT_EQ(split_op_ids[i], OpId()); |
3509 | 0 | } |
3510 | 0 | } |
3511 | |
3512 | 0 | LOG(INFO) << "Resume update consensus on all replicas"; |
3513 | 0 | for (auto* tserver : cluster_->tserver_daemons()) { |
3514 | 0 | ASSERT_OK(pause_update_consensus(tserver, false)); |
3515 | 0 | } |
3516 | |
3517 | 0 | ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> { |
3518 | 0 | RETURN_NOT_OK(get_split_op_ids()); |
3519 | 0 | for (auto& split_op_id : split_op_ids) { |
3520 | 0 | if (split_op_id == OpId()) { |
3521 | 0 | return false; |
3522 | 0 | } |
3523 | 0 | } |
3524 | 0 | return true; |
3525 | 0 | }, kTimeout, "Waiting for all replicas to add SPLIT_OP to Raft log")); |
3526 | 0 | LOG(INFO) << "split_op_ids: " << AsString(split_op_ids); |
3527 | |
3528 | 0 | for (auto& split_op_id : split_op_ids) { |
3529 | 0 | ASSERT_EQ(split_op_id, split_op_ids[0]); |
3530 | 0 | } |
3531 | 0 | ASSERT_GT(split_op_ids[0].term, split_op_id_first_try.term); |
3532 | 0 | ASSERT_GT(split_op_ids[0].index, split_op_id_first_try.index); |
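     | | // The retried SPLIT_OP is a new Raft operation: both its term and index |
     | | // are strictly greater than those of the aborted first attempt. |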
3533 | |
3534 | 0 | split_latch.Wait(); |
3535 | 0 | } |
3536 | | |
3537 | | } // namespace tserver |
3538 | | } // namespace yb |