YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/raft_consensus-itest.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <regex>
34
#include <unordered_map>
35
#include <unordered_set>
36
37
#include <boost/optional.hpp>
38
#include <glog/logging.h>
39
#include <glog/stl_logging.h>
40
#include <gtest/gtest.h>
41
42
#include "yb/client/client.h"
43
#include "yb/client/error.h"
44
#include "yb/client/session.h"
45
#include "yb/client/table_handle.h"
46
#include "yb/client/yb_op.h"
47
48
#include "yb/common/partition.h"
49
#include "yb/common/ql_type.h"
50
#include "yb/common/schema.h"
51
#include "yb/common/wire_protocol-test-util.h"
52
#include "yb/common/wire_protocol.h"
53
54
#include "yb/consensus/consensus.pb.h"
55
#include "yb/consensus/consensus.proxy.h"
56
#include "yb/consensus/consensus_peers.h"
57
#include "yb/consensus/consensus_types.pb.h"
58
#include "yb/consensus/metadata.pb.h"
59
#include "yb/consensus/opid_util.h"
60
#include "yb/consensus/quorum_util.h"
61
62
#include "yb/docdb/doc_key.h"
63
#include "yb/docdb/value_type.h"
64
65
#include "yb/gutil/map-util.h"
66
#include "yb/gutil/strings/strcat.h"
67
#include "yb/gutil/strings/util.h"
68
69
#include "yb/integration-tests/cluster_verifier.h"
70
#include "yb/integration-tests/external_mini_cluster.h"
71
#include "yb/integration-tests/external_mini_cluster_fs_inspector.h"
72
#include "yb/integration-tests/test_workload.h"
73
#include "yb/integration-tests/ts_itest-base.h"
74
75
#include "yb/master/master_client.proxy.h"
76
#include "yb/master/master_cluster.proxy.h"
77
78
#include "yb/rpc/messenger.h"
79
#include "yb/rpc/proxy.h"
80
#include "yb/rpc/rpc_controller.h"
81
#include "yb/rpc/rpc_test_util.h"
82
83
#include "yb/server/hybrid_clock.h"
84
#include "yb/server/server_base.proxy.h"
85
86
#include "yb/tserver/tserver_admin.proxy.h"
87
#include "yb/tserver/tserver_service.proxy.h"
88
89
#include "yb/util/oid_generator.h"
90
#include "yb/util/opid.pb.h"
91
#include "yb/util/scope_exit.h"
92
#include "yb/util/size_literals.h"
93
#include "yb/util/status_log.h"
94
#include "yb/util/stopwatch.h"
95
#include "yb/util/thread.h"
96
#include "yb/util/tsan_util.h"
97
98
using namespace std::literals;
99
100
DEFINE_int32(num_client_threads, 8,
101
             "Number of client threads to launch");
102
DEFINE_int32(client_inserts_per_thread, 50,
103
             "Number of rows inserted by each client thread");
104
DEFINE_int32(client_num_batches_per_thread, 5,
105
             "In how many batches to group the rows, for each client");
106
DECLARE_int32(consensus_rpc_timeout_ms);
107
DECLARE_int32(leader_lease_duration_ms);
108
DECLARE_int32(ht_lease_duration_ms);
109
DECLARE_int32(rpc_timeout);
110
111
METRIC_DECLARE_entity(tablet);
112
METRIC_DECLARE_counter(not_leader_rejections);
113
METRIC_DECLARE_gauge_int64(raft_term);
114
METRIC_DECLARE_counter(log_cache_disk_reads);
115
METRIC_DECLARE_gauge_int64(log_cache_num_ops);
116
117
namespace yb {
118
namespace tserver {
119
120
using std::unordered_map;
121
using std::unordered_set;
122
using std::vector;
123
using std::shared_ptr;
124
125
using client::YBSession;
126
using client::YBTable;
127
using client::YBTableName;
128
using consensus::ConsensusRequestPB;
129
using consensus::ConsensusResponsePB;
130
using consensus::ConsensusServiceProxy;
131
using consensus::MajoritySize;
132
using consensus::MakeOpId;
133
using consensus::PeerMemberType;
134
using consensus::RaftPeerPB;
135
using consensus::ReplicateMsg;
136
using consensus::LeaderLeaseCheckMode;
137
using docdb::KeyValuePairPB;
138
using docdb::SubDocKey;
139
using docdb::DocKey;
140
using docdb::PrimitiveValue;
141
using itest::AddServer;
142
using itest::GetReplicaStatusAndCheckIfLeader;
143
using itest::LeaderStepDown;
144
using itest::TabletServerMap;
145
using itest::TabletServerMapUnowned;
146
using itest::TServerDetails;
147
using itest::RemoveServer;
148
using itest::StartElection;
149
using itest::WaitUntilNumberOfAliveTServersEqual;
150
using itest::WaitUntilLeader;
151
using itest::WriteSimpleTestRow;
152
using master::GetTabletLocationsRequestPB;
153
using master::GetTabletLocationsResponsePB;
154
using master::TabletLocationsPB;
155
using rpc::RpcController;
156
using server::SetFlagRequestPB;
157
using server::SetFlagResponsePB;
158
using server::HybridClock;
159
using server::ClockPtr;
160
using strings::Substitute;
161
162
static const int kConsensusRpcTimeoutForTests = 50;
163
164
static const int kTestRowKey = 1234;
165
static const int kTestRowIntVal = 5678;
166
167
// Integration test for the raft consensus implementation.
168
// Uses the whole tablet server stack with ExternalMiniCluster.
169
class RaftConsensusITest : public TabletServerIntegrationTestBase {
170
 public:
171
  RaftConsensusITest()
172
      : inserters_(FLAGS_num_client_threads),
173
46
        clock_(new HybridClock()) {
174
46
    CHECK_OK(clock_->Init());
175
46
  }
176
177
46
  void SetUp() override {
178
46
    TabletServerIntegrationTestBase::SetUp();
179
46
    FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
180
46
  }
181
182
  void ScanReplica(TabletServerServiceProxy* replica_proxy,
183
4
                   vector<string>* results) {
184
185
4
    ReadRequestPB req;
186
4
    ReadResponsePB resp;
187
4
    RpcController rpc;
188
4
    rpc.set_timeout(MonoDelta::FromSeconds(10));  // Squelch warnings.
189
190
4
    req.set_tablet_id(tablet_id_);
191
4
    req.set_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX);
192
4
    auto batch = req.add_ql_batch();
193
4
    batch->set_schema_version(0);
194
4
    int id = kFirstColumnId;
195
4
    auto rsrow = batch->mutable_rsrow_desc();
196
12
    for (const auto& col : schema_.columns()) {
197
12
      batch->add_selected_exprs()->set_column_id(id);
198
12
      batch->mutable_column_refs()->add_ids(id);
199
12
      auto coldesc = rsrow->add_rscol_descs();
200
12
      coldesc->set_name(col.name());
201
12
      col.type()->ToQLTypePB(coldesc->mutable_ql_type());
202
12
      ++id;
203
12
    }
204
205
    // Send the call.
206
4
    {
207
4
      SCOPED_TRACE(req.DebugString());
208
4
      ASSERT_OK(replica_proxy->Read(req, &resp, &rpc));
209
4
      SCOPED_TRACE(resp.DebugString());
210
4
      if (resp.has_error()) {
211
0
        ASSERT_OK(StatusFromPB(resp.error().status()));
212
0
      }
213
4
    }
214
215
4
    Schema schema(client::MakeColumnSchemasFromColDesc(rsrow->rscol_descs()), 0);
216
4
    QLRowBlock result(schema);
217
4
    Slice data = ASSERT_RESULT(rpc.GetSidecar(0));
218
4
    if (!data.empty()) {
219
4
      ASSERT_OK(result.Deserialize(QLClient::YQL_CLIENT_CQL, &data));
220
4
    }
221
10
    for (const auto& row : result.rows()) {
222
10
      results->push_back(row.ToString());
223
10
    }
224
225
4
    std::sort(results->begin(), results->end());
226
4
  }
227
228
  // Scan the given replica in a loop until the number of rows
229
  // is 'expected_count'. If it takes more than 10 seconds, the
230
  // test fails.
231
  void WaitForRowCount(TabletServerServiceProxy* replica_proxy,
232
                       size_t expected_count,
233
3
                       vector<string>* results) {
234
3
    LOG(INFO) << "Waiting for row count " << expected_count << "...";
235
3
    MonoTime start = MonoTime::Now();
236
3
    MonoTime deadline = MonoTime::Now();
237
3
    deadline.AddDelta(MonoDelta::FromSeconds(10));
238
3
    while (true) {
239
3
      results->clear();
240
3
      ASSERT_NO_FATALS(ScanReplica(replica_proxy, results));
241
3
      if (results->size() == expected_count) {
242
3
        return;
243
3
      }
244
0
      SleepFor(MonoDelta::FromMilliseconds(10));
245
0
      if (!MonoTime::Now().ComesBefore(deadline)) {
246
0
        break;
247
0
      }
248
0
    }
249
0
    MonoTime end = MonoTime::Now();
250
0
    LOG(WARNING) << "Didn't reach row count " << expected_count;
251
0
    FAIL() << "Did not reach expected row count " << expected_count
252
0
           << " after " << end.GetDeltaSince(start).ToString()
253
0
           << ": rows: " << *results;
254
3
  }
255
256
  // Add an Insert operation to the given consensus request.
257
  // The row to be inserted is generated based on the OpId.
258
  void AddOp(const OpIdPB& id, ConsensusRequestPB* req);
259
260
  string DumpToString(TServerDetails* leader,
261
                      const vector<string>& leader_results,
262
                      TServerDetails* replica,
263
0
                      const vector<string>& replica_results) {
264
0
    string ret = strings::Substitute("Replica results did not match the leaders."
265
0
                                     "\nLeader: $0\nReplica: $1. Results size "
266
0
                                     "L: $2 R: $3",
267
0
                                     leader->ToString(),
268
0
                                     replica->ToString(),
269
0
                                     leader_results.size(),
270
0
                                     replica_results.size());
271
0
272
0
    StrAppend(&ret, "Leader Results: \n");
273
0
    for (const string& result : leader_results) {
274
0
      StrAppend(&ret, result, "\n");
275
0
    }
276
0
277
0
    StrAppend(&ret, "Replica Results: \n");
278
0
    for (const string& result : replica_results) {
279
0
      StrAppend(&ret, result, "\n");
280
0
    }
281
0
282
0
    return ret;
283
0
  }
284
285
  void InsertTestRowsRemoteThread(int32_t first_row,
286
                                  int32_t count,
287
                                  int num_batches,
288
21
                                  const vector<CountDownLatch*>& latches) {
289
21
    client::TableHandle table;
290
21
    ASSERT_OK(table.Open(kTableName, client_.get()));
291
292
21
    shared_ptr<YBSession> session = client_->NewSession();
293
21
    session->SetTimeout(60s);
294
295
122
    for (int i = 0; i < num_batches; i++) {
296
101
      SCOPED_TRACE(Format("Batch: $0", i));
297
298
101
      auto first_row_in_batch = first_row + (i * count / num_batches);
299
101
      auto last_row_in_batch = first_row_in_batch + count / num_batches;
300
301
1.11k
      for (int j = first_row_in_batch; j < last_row_in_batch; j++) {
302
1.00k
        auto op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
303
1.00k
        auto* const req = op->mutable_request();
304
1.00k
        QLAddInt32HashValue(req, j);
305
1.00k
        table.AddInt32ColumnValue(req, "int_val", j * 2);
306
1.00k
        table.AddStringColumnValue(req, "string_val", StringPrintf("hello %d", j));
307
1.00k
        session->Apply(op);
308
1.00k
      }
309
310
      // We don't handle write idempotency yet (i.e. making sure that when a leader fails,
311
      // writes to it that were eventually committed by the new leader but not acked to the
312
      // client are not retried), so some errors are expected.
313
      // It's OK as long as the errors are Status::AlreadyPresent().
314
315
101
      int inserted = last_row_in_batch - first_row_in_batch;
316
317
101
      const auto flush_status = session->FlushAndGetOpsErrors();
318
101
      const auto& s = flush_status.status;
319
101
      if (PREDICT_FALSE(!s.ok())) {
320
0
        for (const auto& e : flush_status.errors) {
321
0
          ASSERT_TRUE(e->status().IsAlreadyPresent()) << "Unexpected error: " << e->status();
322
0
        }
323
0
        inserted -= flush_status.errors.size();
324
0
      }
325
326
101
      for (CountDownLatch* latch : latches) {
327
79
        latch->CountDown(inserted);
328
79
      }
329
101
    }
330
331
21
    inserters_.CountDown();
332
21
  }
333
334
  // Brings chaos to an ExternalTabletServer by introducing random delays. Does this by
335
  // pausing the daemon for a random amount of time.
336
3
  void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec) {
337
31
    while (inserters_.count() > 0) {
338
339
      // Scale the value drawn from the standard normal distribution so that we steal the lock
340
      // for longer than the timeout a small (~5%) percentage of the time.
341
      // (The 95th percentile of a standard normal (0,1) distribution is 1.64485.)
342
28
      double sleep_time_usec = 1000 *
343
28
          ((random_.Normal(0, 1) * timeout_msec) / 1.64485);
344
345
28
      if (sleep_time_usec < 0) sleep_time_usec = 0;
346
347
      // Additionally, only cause timeouts at all 50% of the time; otherwise, just sleep.
348
28
      double val = (rand() * 1.0) / RAND_MAX;  // NOLINT(runtime/threadsafe_fn)
349
28
      if (val < 0.5) {
350
14
        SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
351
14
        continue;
352
14
      }
353
354
14
      ASSERT_OK(tablet_server->Pause());
355
7
      LOG_IF(INFO, sleep_time_usec > 0.0)
356
7
          << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid()
357
7
          << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec...";
358
14
      SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
359
14
      ASSERT_OK(tablet_server->Resume());
360
14
    }
361
3
  }
362
363
  // Thread which loops until '*finish' becomes true, trying to insert a row
364
  // on the given tablet server identified by 'replica_idx'.
365
  void StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish);
366
367
  // Stops the current leader of the configuration, runs a leader election, and then brings it back.
368
  // Before stopping the leader this pauses all follower nodes at regular intervals so that
369
  // we get an increased chance of operations being pending.
370
3
  void StopOrKillLeaderAndElectNewOne() {
371
3
    bool kill = rand() % 2 == 0;  // NOLINT(runtime/threadsafe_fn)
372
373
3
    TServerDetails* old_leader;
374
3
    CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &old_leader));
375
3
    ExternalTabletServer* old_leader_ets = cluster_->tablet_server_by_uuid(old_leader->uuid());
376
377
3
    vector<TServerDetails*> followers;
378
3
    GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
379
380
6
    for (TServerDetails* ts : followers) {
381
6
      ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
382
6
      CHECK_OK(ets->Pause());
383
6
      SleepFor(MonoDelta::FromMilliseconds(100));
384
6
    }
385
386
    // When all followers are paused, also pause or kill the current leader. Since we've waited a bit,
387
    // the old leader is likely to have pending operations that must be aborted.
388
3
    if (kill) {
389
2
      old_leader_ets->Shutdown();
390
1
    } else {
391
1
      CHECK_OK(old_leader_ets->Pause());
392
1
    }
393
394
    // Resume the replicas.
395
6
    for (TServerDetails* ts : followers) {
396
6
      ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
397
6
      CHECK_OK(ets->Resume());
398
6
    }
399
400
    // Get the new leader.
401
3
    TServerDetails* new_leader;
402
3
    CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader));
403
404
    // Bring the old leader back.
405
3
    if (kill) {
406
2
      CHECK_OK(old_leader_ets->Restart());
407
      // Wait until we have the same number of followers.
408
2
      auto initial_followers = followers.size();
409
2
      do {
410
2
        GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
411
2
      } while (followers.size() < initial_followers);
412
1
    } else {
413
1
      CHECK_OK(old_leader_ets->Resume());
414
1
    }
415
3
  }
416
417
  // Writes 'num_writes' operations to the current leader. Each of the operations
418
  // has a payload of around `size_bytes`. Causes a gtest failure on error.
419
  void WriteOpsToLeader(int num_writes, size_t size_bytes);
420
421
  // Check for and restart any TS that have crashed.
422
  // Returns the number of servers restarted.
423
  int RestartAnyCrashedTabletServers();
424
425
  // Assert that no tablet servers have crashed.
426
  // Tablet servers that have been manually Shutdown() are allowed.
427
  void AssertNoTabletServersCrashed();
428
429
  Result<int64_t> GetNumLogCacheOpsReadFromDisk();
430
431
  // Ensure that a majority of servers is required for elections and writes.
432
  // This is done by pausing a majority and asserting that writes and elections fail,
433
  // then unpausing the majority and asserting that elections and writes succeed.
434
  // If it fails, throws a gtest assertion.
435
  // Note: This test assumes all tablet servers listed in tablet_servers are voters.
436
  void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMapUnowned& tablet_servers,
437
                                                   const string& leader_uuid);
438
439
  // Return the replicas of the specified 'tablet_id', as seen by the Master.
440
  Status GetTabletLocations(const string& tablet_id, const MonoDelta& timeout,
441
                            master::TabletLocationsPB* tablet_locations);
442
443
  enum WaitForLeader {
444
    NO_WAIT_FOR_LEADER = 0,
445
    WAIT_FOR_LEADER = 1
446
  };
447
448
  // Wait for the specified number of replicas to be reported by the master for
449
  // the given tablet. Fails with an assertion if the timeout expires.
450
  void WaitForReplicasReportedToMaster(int num_replicas, const string& tablet_id,
451
                                       const MonoDelta& timeout,
452
                                       WaitForLeader wait_for_leader,
453
                                       bool* has_leader,
454
                                       master::TabletLocationsPB* tablet_locations);
455
456
  static const bool WITH_NOTIFICATION_LATENCY = true;
457
  static const bool WITHOUT_NOTIFICATION_LATENCY = false;
458
  void DoTestChurnyElections(bool with_latency);
459
460
 protected:
461
  // Flags needed for CauseFollowerToFallBehindLogGC() to work well.
462
  void AddFlagsForLogRolls(vector<string>* extra_tserver_flags);
463
464
  // Pause one of the followers and write enough data to the remaining replicas
465
  // to cause log GC, then resume the paused follower. On success,
466
  // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be
467
  // set to the term of the leader before un-pausing the follower, and
468
  // 'fell_behind_uuid' will be set to the UUID of the follower that was paused
469
  // and caused to fall behind. These can be used for verification purposes.
470
  //
471
  // Certain flags should be set. You can add the required flags with
472
  // AddFlagsForLogRolls() before starting the cluster.
473
  void CauseFollowerToFallBehindLogGC(string* leader_uuid,
474
                                      int64_t* orig_term,
475
                                      string* fell_behind_uuid);
476
477
  void TestAddRemoveServer(PeerMemberType member_type);
478
  void TestRemoveTserverFailsWhenServerInTransition(PeerMemberType member_type);
479
  void TestRemoveTserverInTransitionSucceeds(PeerMemberType member_type);
480
481
  std::vector<scoped_refptr<yb::Thread> > threads_;
482
  CountDownLatch inserters_;
483
  ClockPtr clock_;
484
};
485
486
3
void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) {
487
  // We configure a small log segment size so that we roll frequently,
488
  // configure a small cache size so that we evict data from the cache, and
489
  // retain as few segments as possible. We also turn off async segment
490
  // allocation -- this ensures that we roll many segments of logs (with async
491
  // allocation, it's possible that the preallocation is slow and we wouldn't
492
  // roll deterministically).
493
3
  extra_tserver_flags->push_back("--log_cache_size_limit_mb=1");
494
3
  extra_tserver_flags->push_back("--log_segment_size_mb=1");
495
3
  extra_tserver_flags->push_back("--log_async_preallocate_segments=false");
496
3
  extra_tserver_flags->push_back("--log_min_segments_to_retain=1");
497
3
  extra_tserver_flags->push_back("--log_min_seconds_to_retain=0");
498
3
  extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100");
499
3
  extra_tserver_flags->push_back("--db_write_buffer_size=100000");
500
3
}
501
502
// Test that we can retrieve the permanent uuid of a server running
503
// the consensus service via RPC.
504
1
TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
505
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
506
507
1
  TServerDetails* leader = nullptr;
508
1
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
509
1
  const string expected_uuid = leader->instance_id.permanent_uuid();
510
511
1
  rpc::MessengerBuilder builder("test builder");
512
1
  builder.set_num_reactors(1);
513
1
  auto messenger = rpc::CreateAutoShutdownMessengerHolder(ASSERT_RESULT(builder.Build()));
514
1
  rpc::ProxyCache proxy_cache(messenger.get());
515
516
  // Set a decent timeout for allowing the masters to find each other.
517
1
  const auto kTimeout = 30s;
518
1
  std::vector<HostPort> endpoints;
519
1
  for (const auto& hp : leader->registration->common().private_rpc_addresses()) {
520
1
    endpoints.push_back(HostPortFromPB(hp));
521
1
  }
522
1
  RaftPeerPB peer;
523
1
  ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(&proxy_cache, kTimeout, endpoints, &peer));
524
1
  ASSERT_EQ(expected_uuid, peer.permanent_uuid());
525
1
}
526
527
// TODO allow the scan to define an operation id, fetch the last id
528
// from the leader and then use that id to make the replica wait
529
// until it is done. This will avoid the sleeps below.
530
1
TEST_F(RaftConsensusITest, TestInsertAndMutateThroughConsensus) {
531
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
532
533
1
  int num_iters = AllowSlowTests() ? 10 : 1;
534
535
2
  for (int i = 0; i < num_iters; i++) {
536
1
    ASSERT_NO_FATALS(InsertTestRowsRemoteThread(
537
1
        i * FLAGS_client_inserts_per_thread,
538
1
        FLAGS_client_inserts_per_thread,
539
1
        FLAGS_client_num_batches_per_thread,
540
1
        vector<CountDownLatch*>()));
541
1
  }
542
1
  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters);
543
1
}
544
545
1
TEST_F(RaftConsensusITest, TestFailedOperation) {
546
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
547
548
  // Wait until we have a stable leader.
549
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
550
1
                                  tablet_id_, 1));
551
552
1
  WriteResponsePB resp;
553
1
  RpcController controller;
554
1
  controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
555
556
1
  TServerDetails* leader = nullptr;
557
1
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
558
559
1
  WriteRequestPB req;
560
1
  req.set_tablet_id(tablet_id_);
561
1
  req.add_ql_write_batch()->set_schema_version(123);
562
563
1
  ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller));
564
2
  ASSERT_NE(QLResponsePB::YQL_STATUS_OK, resp.ql_response_batch(0).status())
565
2
      << "Response: " << resp.ShortDebugString();
566
567
  // Add a proper row so that we can verify that all of the replicas continue
568
  // to process transactions after a failure. Additionally, this allows us to wait
569
  // for all of the replicas to finish processing transactions before shutting down,
570
  // avoiding a potential stall as we currently can't abort transactions (see KUDU-341).
571
572
1
  req.Clear();
573
1
  req.set_tablet_id(tablet_id_);
574
1
  AddTestRowInsert(0, 0, "original0", &req);
575
576
1
  controller.Reset();
577
1
  controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
578
579
1
  ASSERT_OK(DCHECK_NOTNULL(leader->tserver_proxy.get())->Write(req, &resp, &controller));
580
1
  SCOPED_TRACE(resp.ShortDebugString());
581
1
  ASSERT_FALSE(resp.has_error());
582
583
1
  ASSERT_ALL_REPLICAS_AGREE(1);
584
1
}
585
586
// Inserts rows through consensus and also starts one delay injecting thread
587
// that steals consensus peer locks for a while. This is meant to test that
588
// even with timeouts and repeated requests consensus still works.
589
1
TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
590
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
591
592
1
  if (500 == FLAGS_client_inserts_per_thread) {
593
0
    if (AllowSlowTests()) {
594
0
      FLAGS_client_inserts_per_thread = FLAGS_client_inserts_per_thread * 10;
595
0
      FLAGS_client_num_batches_per_thread = FLAGS_client_num_batches_per_thread * 10;
596
0
    }
597
0
  }
598
599
1
  int num_threads = FLAGS_num_client_threads;
600
9
  for (int i = 0; i < num_threads; i++) {
601
8
    scoped_refptr<yb::Thread> new_thread;
602
8
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("ts-test$0", i),
603
8
                                  &RaftConsensusITest::InsertTestRowsRemoteThread,
604
8
                                  this, i * FLAGS_client_inserts_per_thread,
605
8
                                  FLAGS_client_inserts_per_thread,
606
8
                                  FLAGS_client_num_batches_per_thread,
607
8
                                  vector<CountDownLatch*>(),
608
8
                                  &new_thread));
609
8
    threads_.push_back(new_thread);
610
8
  }
611
4
  for (int i = 0; i < FLAGS_num_replicas; i++) {
612
3
    scoped_refptr<yb::Thread> new_thread;
613
3
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("chaos-test$0", i),
614
3
                                  &RaftConsensusITest::DelayInjectorThread,
615
3
                                  this, cluster_->tablet_server(i),
616
3
                                  kConsensusRpcTimeoutForTests,
617
3
                                  &new_thread));
618
3
    threads_.push_back(new_thread);
619
3
  }
620
11
  for (scoped_refptr<yb::Thread> thr : threads_) {
621
11
    CHECK_OK(ThreadJoiner(thr.get()).Join());
622
11
  }
623
624
1
  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads);
625
1
}
626
627
1
TEST_F(RaftConsensusITest, TestReadOnNonLeader) {
628
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
629
630
  // Wait for the initial leader election to complete.
631
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
632
1
                                  tablet_id_, 1));
633
634
  // By default reads should be allowed only on the leader.
635
1
  ReadRequestPB req;
636
1
  ReadResponsePB resp;
637
1
  RpcController rpc;
638
1
  req.set_tablet_id(tablet_id_);
639
640
  // Perform a read on each of the followers.
641
1
  vector<TServerDetails*> followers;
642
1
  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
643
644
2
  for (const auto& follower : followers) {
645
2
    rpc.Reset();
646
2
    ASSERT_OK(follower->tserver_proxy->Read(req, &resp, &rpc));
647
2
    ASSERT_TRUE(resp.has_error());
648
2
    ASSERT_EQ(TabletServerErrorPB_Code_NOT_THE_LEADER, resp.error().code());
649
2
  }
650
1
}
651
652
1
TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
653
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
654
655
  // Wait for the initial leader election to complete.
656
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
657
1
                                  tablet_id_, 1));
658
659
  // Manually construct a write RPC to a replica and make sure it responds
660
  // with the correct error code.
661
1
  WriteRequestPB req;
662
1
  WriteResponsePB resp;
663
1
  RpcController rpc;
664
1
  req.set_tablet_id(tablet_id_);
665
1
  AddTestRowInsert(kTestRowKey, kTestRowIntVal, "hello world via RPC", &req);
666
667
  // Get the followers.
668
1
  vector<TServerDetails*> followers;
669
1
  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
670
671
1
  ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc));
672
1
  SCOPED_TRACE(resp.DebugString());
673
1
  ASSERT_TRUE(resp.has_error());
674
1
  Status s = StatusFromPB(resp.error().status());
675
1
  EXPECT_TRUE(s.IsIllegalState());
676
1
  ASSERT_STR_CONTAINS(s.message().ToBuffer(), "Not the leader");
677
  // TODO: need to change the error code to be something like REPLICA_NOT_LEADER
678
  // so that the client can properly handle this case! Plumbing this is a little difficult,
679
  // so we're not addressing it at the moment.
680
1
  ASSERT_ALL_REPLICAS_AGREE(0);
681
1
}
682
683
1
TEST_F(RaftConsensusITest, TestRunLeaderElection) {
684
  // Reset consensus rpc timeout to the default value or the election might fail often.
685
1
  FLAGS_consensus_rpc_timeout_ms = 1000;
686
687
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
688
689
1
  int num_iters = AllowSlowTests() ? 10 : 1;
690
691
1
  ASSERT_NO_FATALS(InsertTestRowsRemoteThread(
692
1
      0,
693
1
      FLAGS_client_inserts_per_thread * num_iters,
694
1
      FLAGS_client_num_batches_per_thread,
695
1
      vector<CountDownLatch*>()));
696
697
1
  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters);
698
699
  // Select the last follower to be the new leader.
700
0
  vector<TServerDetails*> followers;
701
0
  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
702
703
  // Now shutdown the current leader.
704
0
  TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_));
705
0
  ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
706
0
  leader_ets->Shutdown();
707
708
0
  TServerDetails* replica = followers.back();
709
0
  CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid());
710
711
  // Make the new replica leader.
712
0
  ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10)));
713
714
  // Insert a bunch more rows.
715
0
  ASSERT_NO_FATALS(InsertTestRowsRemoteThread(
716
0
      FLAGS_client_inserts_per_thread * num_iters,
717
0
      FLAGS_client_inserts_per_thread * num_iters,
718
0
      FLAGS_client_num_batches_per_thread,
719
0
      vector<CountDownLatch*>()));
720
721
  // Restart the original replica and make sure they all agree.
722
0
  ASSERT_OK(leader_ets->Restart());
723
724
0
  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * num_iters * 2);
725
0
}
726
727
5
void RaftConsensusITest::WriteOpsToLeader(int num_writes, size_t size_bytes) {
728
5
  TServerDetails* leader = nullptr;
729
5
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
730
731
5
  WriteRequestPB req;
732
5
  WriteResponsePB resp;
733
5
  RpcController rpc;
734
5
  rpc.set_timeout(MonoDelta::FromMilliseconds(10000));
735
5
  int key = 0;
736
737
  // generate dummy payload.
738
5
  string test_payload(size_bytes, '0');
739
4.03k
  for (int i = 0; i < num_writes; i++) {
740
4.02k
    rpc.Reset();
741
4.02k
    req.Clear();
742
4.02k
    req.set_tablet_id(tablet_id_);
743
4.02k
    AddTestRowInsert(key, key, test_payload, &req);
744
4.02k
    key++;
745
4.02k
    ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc));
746
747
8.05k
    ASSERT_FALSE(resp.has_error()) << resp.DebugString();
748
4.02k
  }
749
5
}
750
751
// Test that when a follower is stopped for a long time, the log cache
752
// properly evicts operations, but still allows the follower to catch
753
// up when it comes back.
754
1
TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
755
1
  vector<string> extra_flags;
756
1
  extra_flags.push_back("--log_cache_size_limit_mb=1");
757
1
  extra_flags.push_back("--rpc_throttle_threshold_bytes=-1");
758
1
  extra_flags.push_back("--consensus_max_batch_size_bytes=500000");
759
1
  ASSERT_NO_FATALS(BuildAndStart(extra_flags));
760
1
  TServerDetails* replica = (*tablet_replicas_.begin()).second;
761
1
  ASSERT_TRUE(replica != nullptr);
762
1
  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
763
764
  // Pause a replica.
765
1
  ASSERT_OK(replica_ets->Pause());
766
1
  LOG(INFO)<< "Paused one of the replicas, starting to write.";
767
768
  // Insert 3MB worth of data.
769
1
  const int kNumWrites = 25;
770
1
  ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 128_KB));
771
772
  // Now unpause the replica; the lagging replica should eventually catch back up.
773
1
  ASSERT_OK(replica_ets->Resume());
774
775
1
  ASSERT_ALL_REPLICAS_AGREE(kNumWrites);
776
1
}
777
778
3
Result<int64_t> RaftConsensusITest::GetNumLogCacheOpsReadFromDisk() {
779
3
  TServerDetails* leader = nullptr;
780
3
  RETURN_NOT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
781
3
  return cluster_->tablet_server_by_uuid(leader->uuid())->GetInt64Metric(
782
3
      &METRIC_ENTITY_tablet,
783
3
      nullptr,
784
3
      &METRIC_log_cache_disk_reads,
785
3
      "value");
786
3
}
787
788
// Test that when a follower is stopped for a long time, the log cache
789
// reads few ops from disk, due to exponential backoff on the number of ops
790
// to replicate to an unresponsive follower.
791
1
TEST_F(RaftConsensusITest, TestCatchupOpsReadFromDisk) {
792
1
  vector<string> extra_flags = {
793
1
      "--log_cache_size_limit_mb=1"s,
794
1
      "--enable_consensus_exponential_backoff=true"s
795
1
  };
796
1
  ASSERT_NO_FATALS(BuildAndStart(extra_flags));
797
1
  TServerDetails* replica = tablet_replicas_.begin()->second;
798
1
  ASSERT_TRUE(replica != nullptr);
799
1
  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
800
801
  // Pause a replica.
802
1
  ASSERT_OK(replica_ets->Pause());
803
1
  LOG(INFO) << "Paused replica " << replica->uuid() << ", starting to write.";
804
805
  // Insert 3MB worth of data.
806
1
  const int kNumWrites = 1000;
807
1
  ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB));
808
809
  // Allow time for unsuccessful replication attempts to the lagging follower.
810
1
  SleepFor(5s);
811
812
  // Confirm that in the steady state the leader is not reading any new ops from disk, since the
813
  // exponential backoff has reduced the number of ops to send to 0 and it is sending only NOOPs.
814
1
  auto ops_before = ASSERT_RESULT(GetNumLogCacheOpsReadFromDisk());
815
1
  SleepFor(10s);
816
1
  auto ops_after = ASSERT_RESULT(GetNumLogCacheOpsReadFromDisk());
817
1
  EXPECT_EQ(ops_before, ops_after);
818
819
  // Now unpause the replica; the lagging replica should eventually catch back up.
820
1
  ASSERT_OK(replica_ets->Resume());
821
822
  // Wait for successful replication to resumed follower.
823
1
  ASSERT_OK(WaitForServersToAgree(30s, tablet_servers_, tablet_id_,
824
1
                                  kNumWrites));
825
826
1
  auto final_ops_read_from_disk = ASSERT_RESULT(GetNumLogCacheOpsReadFromDisk());
827
1
  LOG(INFO)<< "Ops read from disk: " << final_ops_read_from_disk;
828
829
  // NOTE: empirically determined threshold.
830
1
  const int kOpsReadFromDiskThreshold = kNumWrites * 2;
831
1
  ASSERT_LE(final_ops_read_from_disk, kOpsReadFromDiskThreshold);
832
1
}
833
834
// Test that when a follower is paused so that it misses several writes, then resumed
835
// briefly such that it only receives some of the entries from the leader but
836
// becomes aware of the highest committed op id, it is able to bootstrap successfully
837
// after a restart.
838
1
TEST_F(RaftConsensusITest, TestLaggingFollowerRestart) {
839
1
  vector<string> extra_flags = {
840
1
      "--consensus_inject_latency_ms_in_notifications=10"s,
841
1
      "--rpc_throttle_threshold_bytes=-1"s,
842
1
      "--consensus_max_batch_size_bytes=1024"s
843
1
  };
844
1
  ASSERT_NO_FATALS(BuildAndStart(extra_flags));
845
846
  // Find leader.
847
1
  TServerDetails* leader = nullptr;
848
1
  ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, 30s, &leader));
849
1
  CHECK_NOTNULL(leader);
850
851
  // Find a follower to pause, resume, and restart.
852
1
  TServerDetails* replica = nullptr;
853
1
  for (const auto& tablet_replica : tablet_replicas_) {
854
1
      TServerDetails* ts_details = tablet_replica.second;
855
1
      if (ts_details->uuid() != leader->uuid()) {
856
1
        replica = ts_details;
857
1
        break;
858
1
      }
859
1
  }
860
1
  CHECK_NOTNULL(replica);
861
1
  ASSERT_NE(replica->uuid(), leader->uuid());
862
863
1
  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
864
865
  // Pause a replica.
866
1
  ASSERT_OK(replica_ets->Pause());
867
1
  LOG(INFO)<< "Paused replica " << replica->uuid() << ", starting to write.";
868
869
  // Insert 3MB worth of data.
870
1
  const int kNumWrites = 1000;
871
1
  ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB));
872
873
  // Allow the paused replica to fall behind.
874
1
  SleepFor(20s);
875
876
  // Now unpause the replica; the lagging replica should eventually catch back up.
877
1
  ASSERT_OK(replica_ets->Resume());
878
1
  LOG(INFO)<< "Resumed replica " << replica->uuid() << ".";
879
880
  // Wait for resumed follower to get some but not all entries.
881
1
  int64_t actual_minimum_index = consensus::kInvalidOpIdIndex;
882
1
  ASSERT_OK(WaitUntilAllReplicasHaveOp(kNumWrites / 4, tablet_id_,
883
1
                                       TServerDetailsVector(tablet_servers_), 10s,
884
1
                                       &actual_minimum_index));
885
1
  LOG(INFO) << "Replica " << replica->uuid() << " received " << actual_minimum_index;
886
1
  ASSERT_LE(actual_minimum_index, kNumWrites / 2);
887
888
1
  replica_ets->Shutdown();
889
1
  LOG(INFO)<< "Shutdown replica " << replica->uuid() << ".";
890
891
1
  CHECK_OK(replica_ets->Restart());
892
1
  LOG(INFO)<< "Restarted replica " << replica->uuid() << ".";
893
894
  // Wait for successful replication to restarted follower.
895
1
  ASSERT_OK(WaitForServersToAgree(60s, tablet_servers_, tablet_id_, kNumWrites));
896
1
}
897
898
// Test that when a follower is shut down so that it misses several writes, the
899
// leader evicts almost all entries from its log cache.
900
1
TEST_F(RaftConsensusITest, TestLaggingFollowerLogCacheEviction) {
901
1
  const auto kHeartBeatInterval = 1s;
902
1
  const int32_t kConsensusLaggingFollowerThreshold = 10;
903
904
1
  vector<string> extra_flags = {
905
1
      "--consensus_inject_latency_ms_in_notifications=10"s,
906
1
      "--rpc_throttle_threshold_bytes=-1"s,
907
1
      "--consensus_max_batch_size_bytes=1024"s,
908
1
      Format("--consensus_lagging_follower_threshold=$0", kConsensusLaggingFollowerThreshold),
909
1
      Format("--raft_heartbeat_interval_ms=$0", ToMilliseconds(kHeartBeatInterval))
910
1
  };
911
1
  ASSERT_NO_FATALS(BuildAndStart(extra_flags));
912
913
  // Find leader.
914
1
  TServerDetails* leader = nullptr;
915
1
  ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, 30s, &leader));
916
1
  CHECK_NOTNULL(leader);
917
918
  // Find a follower to shut down and restart.
919
1
  TServerDetails* replica = nullptr;
920
2
  for (const auto& tablet_replica : tablet_replicas_) {
921
2
    TServerDetails* ts_details = tablet_replica.second;
922
2
    if (ts_details->uuid() != leader->uuid()) {
923
1
      replica = ts_details;
924
1
      break;
925
1
    }
926
2
  }
927
1
  CHECK_NOTNULL(replica);
928
1
  ASSERT_NE(replica->uuid(), leader->uuid());
929
930
1
  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
931
932
  // Shutdown a replica.
933
1
  replica_ets->Shutdown();
934
1
  LOG(INFO)<< "Shutdown replica " << replica->uuid() << ".";
935
936
  // Insert 3MB worth of data.
937
1
  const int kNumWrites = 1000;
938
1
  ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB));
939
940
  // Allow the leader to attempt retransmission to the shut-down replica more than
941
  // consensus_lagging_follower_threshold times so that it can be marked as lagging.
942
  // Also use a multiplicative factor to reduce flakiness.
943
1
  SleepFor(kHeartBeatInterval * kConsensusLaggingFollowerThreshold * 2);
944
945
  // Allow the shut-down replica to fall behind.
946
1
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
947
1
  ASSERT_EQ(1, active_tablet_servers.erase(replica->uuid()));
948
1
  ASSERT_OK(WaitForServersToAgree(60s, active_tablet_servers, tablet_id_, kNumWrites));
949
950
  // Find log cache num ops at the leader - the value from non-leaders will be zero,
951
  // hence it is unnecessary to omit values from followers.
952
1
  int64_t log_cache_num_ops = 0;
953
3
  for (const auto& tablet_replica : tablet_replicas_) {
954
    // Skip shutdown replica.
955
3
    if (tablet_replica.second->uuid() == replica->uuid()) {
956
1
      continue;
957
1
    }
958
959
2
    log_cache_num_ops += ASSERT_RESULT(
960
2
        cluster_->tablet_server_by_uuid(tablet_replica.second->uuid())->GetInt64Metric(
961
2
            &METRIC_ENTITY_tablet,
962
2
            nullptr,
963
2
            &METRIC_log_cache_num_ops,
964
2
            "value"));
965
2
  }
966
1
  LOG(INFO)<< "Num ops in log cache: " << log_cache_num_ops;
967
968
  // NOTE: empirically determined threshold.
969
1
  const int kLogCacheNumOpsThreshold = kNumWrites / 10;
970
1
  ASSERT_LE(log_cache_num_ops, kLogCacheNumOpsThreshold);
971
972
  // Now restart the replica that was shut down earlier; the lagging replica should eventually catch back up.
973
1
  CHECK_OK(replica_ets->Restart());
974
1
  LOG(INFO)<< "Restarted replica " << replica->uuid() << ".";
975
976
  // Wait for successful replication to restarted follower.
977
1
  ASSERT_OK(WaitForServersToAgree(60s, tablet_servers_, tablet_id_, kNumWrites));
978
1
}
979
980
1
TEST_F(RaftConsensusITest, TestAddRemoveNonVoter) {
981
1
  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
982
1
  FLAGS_num_tablet_servers = 3;
983
1
  vector<string> ts_flags = {
984
1
    "--enable_leader_failure_detection=false"s,
985
1
    "--TEST_inject_latency_before_change_role_secs=1"s,
986
1
    "--follower_unavailable_considered_failed_sec=5"s,
987
1
  };
988
1
  vector<string> master_flags = {
989
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
990
1
    "--use_create_table_leader_hint=false"s,
991
1
  };
992
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
993
994
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
995
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
996
997
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
998
1
  TServerDetails* leader_tserver = tservers[0];
999
1
  const string& leader_uuid = tservers[0]->uuid();
1000
1
  ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout));
1001
1
  int64_t min_index = 1;
1002
1
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, min_index, &min_index));
1003
1
  ASSERT_OK(WaitUntilCommittedOpIdIndexIsAtLeast(&min_index, leader_tserver, tablet_id_, kTimeout));
1004
1005
  // Kill the master, so we can change the config without interference.
1006
1
  cluster_->master()->Shutdown();
1007
1008
1
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
1009
1010
  // Do majority correctness check for 3 servers.
1011
1
  ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
1012
1
  auto cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica(
1013
1
      tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index;
1014
1015
  // Remove tablet server 2, so we can add it as an observer.
1016
1
  vector<int> remove_list = { 2, 1 };
1017
1
  LOG(INFO) << "Remove: Going from 3 voter replicas to 2 voter replicas";
1018
1019
1
  TServerDetails* tserver_to_remove = tservers[2];
1020
1
  LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid();
1021
1
  ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, kTimeout));
1022
1
  ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid()));
1023
1
  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
1024
1025
  // Do majority correctness check for each incremental decrease.
1026
1
  ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
1027
1
  cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica(
1028
1
      tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index;
1029
1030
  // Add the tablet server back as an observer.
1031
1
  LOG(INFO) << "Add: Creating a new read replica";
1032
1033
1
  TServerDetails* tserver_to_add = tservers[2];
1034
1
  LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
1035
1036
1
  ASSERT_OK(DeleteTablet(tserver_to_add, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none,
1037
1
                         kTimeout));
1038
1039
1
  ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, PeerMemberType::PRE_OBSERVER,
1040
1
                      boost::none, kTimeout));
1041
1042
1
  consensus::ConsensusStatePB cstate;
1043
1
  ASSERT_OK(itest::GetConsensusState(leader_tserver, tablet_id_,
1044
1
                                     consensus::CONSENSUS_CONFIG_COMMITTED, kTimeout, &cstate));
1045
1046
  // Verify that this tserver member type was set correctly.
1047
3
  for (const auto& peer : cstate.config().peers()) {
1048
3
    if (peer.permanent_uuid() == tserver_to_add->uuid()) {
1049
1
      ASSERT_EQ(PeerMemberType::PRE_OBSERVER, peer.member_type());
1050
1
    }
1051
3
  }
1052
1053
  // Wait until the ChangeConfig has finished and this tserver has been made an OBSERVER.
1054
1
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), leader_tserver,
1055
1
                                                tablet_id_, MonoDelta::FromSeconds(10)));
1056
1
  ASSERT_OK(WaitUntilCommittedConfigMemberTypeIs(1, leader_tserver, tablet_id_,
1057
1
                                                 MonoDelta::FromSeconds(10),
1058
1
                                                 PeerMemberType::OBSERVER));
1059
1060
1
  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
1061
1062
  // Pause the read replica and assert that it gets removed from the configuration.
1063
1
  ASSERT_OK(cluster_->tablet_server_by_uuid(tserver_to_add->uuid())->Pause());
1064
1
  ASSERT_OK(WaitUntilCommittedConfigMemberTypeIs(0, leader_tserver, tablet_id_,
1065
1
                                                 MonoDelta::FromSeconds(20),
1066
1
                                                 PeerMemberType::OBSERVER));
1067
1
}
1068
1069
void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid,
1070
                                                        int64_t* orig_term,
1071
3
                                                        string* fell_behind_uuid) {
1072
3
  MonoDelta kTimeout = MonoDelta::FromSeconds(10);
1073
  // Wait for all of the replicas to have acknowledged the elected
1074
  // leader and logged the first NO_OP.
1075
3
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
1076
1077
  // Pause one server. This might be the leader, but pausing it will cause
1078
  // a leader election to happen.
1079
3
  TServerDetails* replica = (*tablet_replicas_.begin()).second;
1080
3
  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
1081
3
  ASSERT_OK(replica_ets->Pause());
1082
1083
  // Find a leader. In case we paused the leader above, this will wait until
1084
  // we have elected a new one.
1085
3
  TServerDetails* leader = nullptr;
1086
3
  while (true) {
1087
3
    Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader);
1088
3
    if (s.ok() && leader != nullptr && leader != replica) {
1089
3
      break;
1090
3
    }
1091
0
    SleepFor(MonoDelta::FromMilliseconds(10));
1092
0
  }
1093
3
  *leader_uuid = leader->uuid();
1094
3
  int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid);
1095
1096
3
  TestWorkload workload(cluster_.get());
1097
3
  workload.set_table_name(kTableName);
1098
3
  workload.set_timeout_allowed(true);
1099
3
  workload.set_payload_bytes(128 * 1024);  // Write ops of size 128KB.
1100
3
  workload.set_write_batch_size(1);
1101
3
  workload.set_num_write_threads(4);
1102
3
  workload.Setup();
1103
3
  workload.Start();
1104
1105
3
  LOG(INFO) << "Waiting until we've written at least 4MB...";
1106
3
  while (workload.rows_inserted() < 8 * 4) {
1107
0
    SleepFor(MonoDelta::FromMilliseconds(10));
1108
0
  }
1109
3
  workload.StopAndJoin();
1110
1111
3
  LOG(INFO) << "Waiting for log GC on " << leader->uuid();
1112
  // Some WAL segments must exist, but wal segment 1 must not exist.
1113
1114
3
  ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs(
1115
3
      leader_index, tablet_id_, { "wal-" }, { "wal-000000001" }));
1116
1117
0
  LOG(INFO) << "Log GC complete on " << leader->uuid();
1118
1119
  // Then wait another couple of seconds to be sure that it has bothered to try
1120
  // to write to the paused peer.
1121
  // TODO: would be nice to be able to poll the leader with an RPC like
1122
  // GetLeaderStatus() which could tell us whether it has made any requests
1123
  // since the log GC.
1124
0
  SleepFor(MonoDelta::FromSeconds(2));
1125
1126
  // Make a note of whatever the current term of the cluster is,
1127
  // before we resume the follower.
1128
0
  {
1129
0
    OpId op_id = ASSERT_RESULT(GetLastOpIdForReplica(
1130
0
        tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout));
1131
0
    *orig_term = op_id.term;
1132
0
    LOG(INFO) << "Servers converged with original term " << *orig_term;
1133
0
  }
1134
1135
  // Resume the follower.
1136
0
  LOG(INFO) << "Resuming " << replica->uuid();
1137
0
  ASSERT_OK(replica_ets->Resume());
1138
1139
  // Ensure that none of the tablet servers crashed.
1140
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
1141
    // Make sure it didn't crash.
1142
0
    ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
1143
0
      << "Tablet server " << i << " crashed";
1144
0
  }
1145
0
  *fell_behind_uuid = replica->uuid();
1146
0
}
1147
1148
2
void RaftConsensusITest::TestAddRemoveServer(PeerMemberType member_type) {
1149
2
  ASSERT_TRUE(member_type == PeerMemberType::PRE_VOTER ||
1150
2
              member_type == PeerMemberType::PRE_OBSERVER);
1151
1152
2
  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
1153
2
  FLAGS_num_tablet_servers = 3;
1154
2
  vector<string> ts_flags = {
1155
2
    "--enable_leader_failure_detection=false"s,
1156
2
    "--TEST_inject_latency_before_change_role_secs=1"s,
1157
2
  };
1158
2
  vector<string> master_flags = {
1159
2
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
1160
2
    "--use_create_table_leader_hint=false"s,
1161
2
  };
1162
2
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
1163
1164
2
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
1165
2
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
1166
1167
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
1168
2
  TServerDetails* leader_tserver = tservers[0];
1169
2
  const string& leader_uuid = tservers[0]->uuid();
1170
2
  ASSERT_OK(StartElection(leader_tserver, tablet_id_, kTimeout));
1171
2
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
1172
2
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, kTimeout));
1173
1174
  // Make sure the server rejects removal of itself from the configuration.
1175
2
  Status s = RemoveServer(leader_tserver, tablet_id_, leader_tserver, boost::none, kTimeout, NULL,
1176
2
                          false /* retry */);
1177
4
  ASSERT_TRUE(s.IsInvalidArgument()) << "Should not be able to remove self from config: "
1178
4
                                     << s.ToString();
1179
1180
  // Insert the row that we will update throughout the test.
1181
2
  ASSERT_OK(WriteSimpleTestRow(
1182
2
      leader_tserver, tablet_id_, kTestRowKey, kTestRowIntVal, "initial insert", kTimeout));
1183
1184
  // Kill the master, so we can change the config without interference.
1185
2
  cluster_->master()->Shutdown();
1186
1187
2
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
1188
1189
  // Do majority correctness check for 3 servers.
1190
2
  ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(active_tablet_servers, leader_uuid));
1191
2
  int64_t cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica(
1192
2
      tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index;
1193
1194
  // Go from 3 tablet servers down to 1 in the configuration.
1195
2
  vector<int> remove_list = { 2, 1 };
1196
4
  for (int to_remove_idx : remove_list) {
1197
4
    auto num_servers = active_tablet_servers.size();
1198
4
    LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas";
1199
1200
4
    TServerDetails* tserver_to_remove = tservers[to_remove_idx];
1201
4
    LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid();
1202
4
    ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none, kTimeout));
1203
4
    ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid()));
1204
4
    ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
1205
1206
    // Do majority correctness check for each incremental decrease.
1207
4
    ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(
1208
4
        active_tablet_servers, leader_uuid));
1209
4
    cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica(
1210
4
        tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index;
1211
4
  }
1212
1213
2
  int num_observers = 0;
1214
  // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the configuration.
1215
2
  vector<int> add_list = { 1, 2 };
1216
4
  for (int to_add_idx : add_list) {
1217
4
    auto num_servers = active_tablet_servers.size();
1218
4
    if (PeerMemberType::PRE_VOTER == member_type) {
1219
2
      LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas";
1220
2
    } else {
1221
2
      LOG(INFO) << "Add: Going from " << num_observers<< " to " << num_observers + 1
1222
2
                << " observers";
1223
2
    }
1224
1225
4
    TServerDetails* tserver_to_add = tservers[to_add_idx];
1226
4
    LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
1227
1228
4
    ASSERT_OK(DeleteTablet(tserver_to_add, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none,
1229
4
                           kTimeout));
1230
1231
4
    ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, member_type, boost::none,
1232
4
                        kTimeout));
1233
1234
4
    consensus::ConsensusStatePB cstate;
1235
4
    ASSERT_OK(itest::GetConsensusState(leader_tserver, tablet_id_,
1236
4
                                       consensus::CONSENSUS_CONFIG_COMMITTED, kTimeout, &cstate));
1237
1238
    // Verify that this tserver member type was set correctly.
1239
10
    for (const auto& peer : cstate.config().peers()) {
1240
10
      if (peer.permanent_uuid() == tserver_to_add->uuid()) {
1241
4
        ASSERT_EQ(member_type, peer.member_type());
1242
4
        LOG(INFO) << "tserver with uuid " << tserver_to_add->uuid() << " was added as a "
1243
4
                  << PeerMemberType_Name(peer.member_type());
1244
4
      }
1245
10
    }
1246
1247
4
    if (PeerMemberType::PRE_VOTER == member_type) {
1248
2
      InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add);
1249
2
    } else {
1250
2
      num_observers++;
1251
2
    }
1252
1253
    // Wait until the ChangeConfig has finished and this tserver has been made a VOTER or OBSERVER.
1254
4
    ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(), leader_tserver,
1255
4
                                                    tablet_id_, MonoDelta::FromSeconds(10)));
1256
4
    ASSERT_OK(WaitUntilCommittedConfigMemberTypeIs(num_observers, leader_tserver, tablet_id_,
1257
4
                                                   MonoDelta::FromSeconds(10),
1258
4
                                                   PeerMemberType::OBSERVER));
1259
1260
4
    ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, ++cur_log_index));
1261
1262
    // Do majority correctness check for each incremental increase.
1263
4
    ASSERT_NO_FATALS(AssertMajorityRequiredForElectionsAndWrites(
1264
4
        active_tablet_servers, leader_uuid));
1265
4
    cur_log_index = ASSERT_RESULT(GetLastOpIdForReplica(
1266
4
        tablet_id_, leader_tserver, consensus::RECEIVED_OPID, kTimeout)).index;
1267
4
  }
1268
2
}
1269
1270
2
void RaftConsensusITest::TestRemoveTserverFailsWhenServerInTransition(PeerMemberType member_type) {
1271
2
  ASSERT_TRUE(member_type == PeerMemberType::PRE_VOTER ||
1272
2
              member_type == PeerMemberType::PRE_OBSERVER);
1273
1274
2
  FLAGS_num_tablet_servers = 3;
1275
2
  vector<string> ts_flags = {
1276
2
    "--enable_leader_failure_detection=false"s,
1277
2
    "--TEST_inject_latency_before_change_role_secs=10"s,
1278
2
  };
1279
2
  vector<string> master_flags = {
1280
2
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
1281
2
    "--use_create_table_leader_hint=false"s,
1282
2
  };
1283
2
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
1284
1285
2
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
1286
2
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
1287
1288
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
1289
2
  TServerDetails* initial_leader = tservers[0];
1290
2
  const MonoDelta timeout = MonoDelta::FromSeconds(10);
1291
2
  ASSERT_OK(StartElection(initial_leader, tablet_id_, timeout));
1292
2
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
1293
2
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, timeout));
1294
1295
  // The server we will remove and then bring back.
1296
2
  TServerDetails* tserver = tservers[2];
1297
1298
  // Kill the master, so we can change the config without interference.
1299
2
  cluster_->master()->Shutdown();
1300
1301
  // Now remove server 2 from the configuration.
1302
2
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
1303
2
  LOG(INFO) << "Removing tserver with uuid " << tserver->uuid();
1304
2
  ASSERT_OK(RemoveServer(initial_leader, tablet_id_, tserver, boost::none,
1305
2
                         MonoDelta::FromSeconds(10)));
1306
2
  ASSERT_EQ(1, active_tablet_servers.erase(tserver->uuid()));
1307
2
  int64_t cur_log_index = 2;
1308
2
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
1309
2
                                  active_tablet_servers, tablet_id_, cur_log_index));
1310
2
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(cur_log_index, initial_leader, tablet_id_, timeout));
1311
1312
2
  ASSERT_OK(DeleteTablet(tserver, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none,
1313
2
                         MonoDelta::FromSeconds(30)));
1314
1315
  // Now add server 2 back as a learner to the peers.
1316
2
  LOG(INFO) << "Adding back Peer " << tserver->uuid();
1317
2
  ASSERT_OK(AddServer(initial_leader, tablet_id_, tserver, member_type, boost::none,
1318
2
                      MonoDelta::FromSeconds(10)));
1319
1320
  // Only wait for TS 0 and 1 to agree that the new change config op (ADD_SERVER for server 2)
1321
  // has been replicated.
1322
2
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60),
1323
2
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
1324
1325
  // Now try to remove server 1 from the configuration. This should fail.
1326
2
  LOG(INFO) << "Removing tserver with uuid " << tservers[1]->uuid();
1327
2
  auto status = RemoveServer(initial_leader, tablet_id_, tservers[1], boost::none,
1328
2
                             MonoDelta::FromSeconds(10), NULL, false /* retry */);
1329
2
  ASSERT_TRUE(status.IsIllegalState());
1330
2
  ASSERT_STR_CONTAINS(status.ToString(), "Leader is not ready for Config Change");
1331
2
}
1332
1333
// TestRemoveTserverFailsWhenServerInTransition verifies that a REMOVE_SERVER request
1334
// fails whenever the committed config contains a server in PRE_VOTER or PRE_OBSERVER
1335
// mode. This test verifies that a REMOVE_SERVER request succeeds whenever the committed
1336
// config contains a server in PRE_VOTER or PRE_OBSERVER mode and that server is the one
1337
// we are trying to remove.
1338
2
void RaftConsensusITest::TestRemoveTserverInTransitionSucceeds(PeerMemberType member_type) {
1339
2
  ASSERT_TRUE(member_type == PeerMemberType::PRE_VOTER ||
1340
2
              member_type == PeerMemberType::PRE_OBSERVER);
1341
1342
2
  FLAGS_num_tablet_servers = 3;
1343
2
  vector<string> ts_flags = {
1344
2
    "--enable_leader_failure_detection=false"s,
1345
2
    "--TEST_skip_change_role"s,
1346
2
  };
1347
2
  vector<string> master_flags = {
1348
2
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
1349
2
    "--use_create_table_leader_hint=false"s,
1350
2
  };
1351
2
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
1352
1353
2
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
1354
2
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
1355
1356
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
1357
2
  TServerDetails* initial_leader = tservers[0];
1358
2
  const MonoDelta timeout = MonoDelta::FromSeconds(10);
1359
2
  ASSERT_OK(StartElection(initial_leader, tablet_id_, timeout));
1360
2
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
1361
2
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, timeout));
1362
1363
  // The server we will remove and then bring back.
1364
2
  TServerDetails* tserver = tservers[2];
1365
1366
  // Kill the master, so we can change the config without interference.
1367
2
  cluster_->master()->Shutdown();
1368
1369
  // Now remove server 2 from the configuration.
1370
2
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
1371
2
  LOG(INFO) << "Removing tserver with uuid " << tserver->uuid();
1372
2
  ASSERT_OK(RemoveServer(initial_leader, tablet_id_, tserver, boost::none,
1373
2
                         MonoDelta::FromSeconds(10)));
1374
2
  ASSERT_EQ(1, active_tablet_servers.erase(tserver->uuid()));
1375
2
  int64_t cur_log_index = 2;
1376
2
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
1377
2
                                  active_tablet_servers, tablet_id_, cur_log_index));
1378
2
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(cur_log_index, initial_leader, tablet_id_, timeout));
1379
1380
2
  ASSERT_OK(DeleteTablet(tserver, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none,
1381
2
                         MonoDelta::FromSeconds(30)));
1382
1383
  // Now add server 2 back in PRE_VOTER or PRE_OBSERVER mode. This server will never transition to
1384
  // VOTER or OBSERVER because flag TEST_skip_change_role is set.
1385
2
  LOG(INFO) << "Adding back Peer " << tserver->uuid();
1386
2
  ASSERT_OK(AddServer(initial_leader, tablet_id_, tserver, member_type, boost::none,
1387
2
                      MonoDelta::FromSeconds(10)));
1388
1389
  // Only wait for TS 0 and 1 to agree that the new change config op (ADD_SERVER for server 2)
1390
  // has been replicated.
1391
2
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60),
1392
2
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
1393
1394
  // Now try to remove server 2 from the configuration. This should succeed.
1395
2
  LOG(INFO) << "Removing tserver with uuid " << tservers[2]->uuid();
1396
2
  ASSERT_OK(RemoveServer(initial_leader, tablet_id_, tservers[2], boost::none,
1397
2
                         MonoDelta::FromSeconds(10)));
1398
2
}
1399
1400
// Test that the leader doesn't crash if one of its followers has
1401
// fallen behind so far that the logs necessary to catch it up
1402
// have been GCed.
1403
//
1404
// In a real cluster, this will eventually cause the follower to be
1405
// evicted/replaced. In any case, the leader should not crash.
1406
//
1407
// We also ensure that, when the leader stops writing to the follower,
1408
// the follower won't disturb the other nodes when it attempts to elect
1409
// itself.
1410
//
1411
// This is a regression test for KUDU-775 and KUDU-562.
1412
1
TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
1413
  // Disable follower eviction to maintain the original intent of this test.
1414
1
  vector<string> extra_flags = { "--evict_failed_followers=false" };
1415
1
  AddFlagsForLogRolls(&extra_flags);  // For CauseFollowerToFallBehindLogGC().
1416
1
  ASSERT_NO_FATALS(BuildAndStart(extra_flags, std::vector<std::string>()));
1417
1418
1
  string leader_uuid;
1419
1
  int64_t orig_term;
1420
1
  string follower_uuid;
1421
1
  ASSERT_NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
1422
1423
  // Wait for remaining majority to agree.
1424
0
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
1425
0
  ASSERT_EQ(3, active_tablet_servers.size());
1426
0
  ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
1427
0
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers, tablet_id_,
1428
0
                                  1));
1429
1430
0
  if (AllowSlowTests()) {
1431
    // Sleep long enough that the "abandoned" server's leader election interval
1432
    // will trigger several times. Then, verify that the term has not increased.
1433
    // This ensures that the other servers properly ignore the election requests
1434
    // from the abandoned node.
1435
    // TODO: would be nicer to use an RPC to check the current term of the
1436
    // abandoned replica, and wait until it has incremented a couple of times.
1437
0
    SleepFor(MonoDelta::FromSeconds(5));
1438
0
    TServerDetails* leader = tablet_servers_[leader_uuid].get();
1439
0
    auto op_id = ASSERT_RESULT(GetLastOpIdForReplica(
1440
0
        tablet_id_, leader, consensus::RECEIVED_OPID, MonoDelta::FromSeconds(10)));
1441
0
    ASSERT_EQ(orig_term, op_id.term)
1442
0
      << "expected the leader to have not advanced terms but has op " << op_id;
1443
0
  }
1444
0
}
1445
1446
9
int RaftConsensusITest::RestartAnyCrashedTabletServers() {
1447
9
  int restarted = 0;
1448
36
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
1449
27
    if (!cluster_->tablet_server(i)->IsProcessAlive()) {
1450
3
      LOG(INFO) << "TS " << i << " appears to have crashed. Restarting.";
1451
3
      cluster_->tablet_server(i)->Shutdown();
1452
3
      CHECK_OK(cluster_->tablet_server(i)->Restart());
1453
3
      restarted++;
1454
3
    }
1455
27
  }
1456
9
  return restarted;
1457
9
}
1458
1459
60
void RaftConsensusITest::AssertNoTabletServersCrashed() {
1460
240
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
1461
180
    if (cluster_->tablet_server(i)->IsShutdown()) continue;
1462
1463
360
    ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
1464
360
      << "Tablet server " << i << " crashed";
1465
180
  }
1466
60
}
1467
1468
// This test starts several tablet servers, and configures them with
1469
// fault injection so that the leaders frequently crash just before
1470
// sending RPCs to followers.
1471
//
1472
// This can result in various scenarios where leaders crash right after
1473
// being elected and never succeed in replicating their first operation.
1474
// For example, KUDU-783 reproduces from this test approximately 5% of the
1475
// time on a slow-test debug build.
1476
1
TEST_F(RaftConsensusITest, InsertWithCrashyNodes) {
1477
1
  int kCrashesToCause = 3;
1478
1
  vector<string> ts_flags, master_flags;
1479
1
  if (AllowSlowTests()) {
1480
0
    FLAGS_num_tablet_servers = 7;
1481
0
    FLAGS_num_replicas = 7;
1482
0
    kCrashesToCause = 15;
1483
0
    master_flags.push_back("--replication_factor=7");
1484
0
  }
1485
1486
  // Crash 5% of the time just before sending an RPC. With 7 servers,
1487
  // this means we crash about 30% of the time before we've fully
1488
  // replicated the NO_OP at the start of the term.
1489
1
  ts_flags.push_back("--TEST_fault_crash_on_leader_request_fraction=0.05");
1490
1491
  // Inject latency to encourage the replicas to fall out of sync
1492
  // with each other.
1493
1
  ts_flags.push_back("--log_inject_latency");
1494
1
  ts_flags.push_back("--log_inject_latency_ms_mean=30");
1495
1
  ts_flags.push_back("--log_inject_latency_ms_stddev=60");
1496
1497
  // Make leader elections faster so we get through more cycles of
1498
  // leaders.
1499
1
  ts_flags.push_back("--raft_heartbeat_interval_ms=100");
1500
1501
  // Avoid preallocating segments since bootstrap is a little bit
1502
  // faster if it doesn't have to scan forward through the preallocated
1503
  // log area.
1504
1
  ts_flags.push_back("--log_preallocate_segments=false");
1505
1506
1
  CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags);
1507
1508
1
  TestWorkload workload(cluster_.get());
1509
1
  workload.set_timeout_allowed(true);
1510
1
  workload.set_write_timeout_millis(1000);
1511
1
  workload.set_num_write_threads(10);
1512
1
  workload.set_write_batch_size(1);
1513
1
  workload.set_sequential_write(true);
1514
1
  workload.Setup(client::YBTableType::YQL_TABLE_TYPE);
1515
1
  workload.Start();
1516
1517
1
  int num_crashes = 0;
1518
10
  while (num_crashes < kCrashesToCause &&
1519
9
         workload.rows_inserted() < 100) {
1520
9
    num_crashes += RestartAnyCrashedTabletServers();
1521
9
    SleepFor(MonoDelta::FromMilliseconds(10));
1522
9
  }
1523
1524
1
  workload.StopAndJoin();
1525
1526
  // After we stop the writes, we can still get crashes because heartbeats could
1527
  // trigger the fault path. So, disable the faults and restart one more time.
1528
4
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
1529
3
    ExternalTabletServer* ts = cluster_->tablet_server(i);
1530
3
    vector<string>* flags = ts->mutable_flags();
1531
3
    bool removed_flag = false;
1532
18
    for (auto it = flags->begin(); it != flags->end(); ++it) {
1533
18
      if (HasPrefixString(*it, "--TEST_fault_crash")) {
1534
3
        flags->erase(it);
1535
3
        removed_flag = true;
1536
3
        break;
1537
3
      }
1538
18
    }
1539
6
    ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
1540
6
                              << "\nFlags:\n" << *flags;
1541
3
    ts->Shutdown();
1542
3
    CHECK_OK(ts->Restart());
1543
3
  }
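
The flag-removal loop above erases only the first matching flag; the same cleanup could also be written with the standard erase/remove_if idiom. A standalone sketch operating on a plain vector (not the test's ExternalTabletServer), removing every flag with the given prefix:

#include <algorithm>
#include <string>
#include <vector>

// Removes every flag that starts with 'prefix'; returns true if any were removed.
bool RemoveFlagsWithPrefix(std::vector<std::string>* flags, const std::string& prefix) {
  auto new_end = std::remove_if(flags->begin(), flags->end(),
      [&prefix](const std::string& flag) { return flag.rfind(prefix, 0) == 0; });
  bool removed = new_end != flags->end();
  flags->erase(new_end, flags->end());
  return removed;
}

int main() {
  std::vector<std::string> flags = {
    "--log_inject_latency",
    "--TEST_fault_crash_on_leader_request_fraction=0.05",
  };
  return RemoveFlagsWithPrefix(&flags, "--TEST_fault_crash") ? 0 : 1;
}
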
1544
1545
  // Ensure that the replicas converge.
1546
  // We don't know exactly how many rows got inserted, since the writer
1547
  // probably saw many errors which left inserts in indeterminate state.
1548
  // But, we should have at least as many as we got confirmation for.
1549
1
  ClusterVerifier cluster_verifier(cluster_.get());
1550
1
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
1551
1
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
1552
1
                            workload.rows_inserted()));
1553
1
}
1554
1555
// This test sets all of the election timers to be very short, resulting
1556
// in a lot of churn. We expect to make some progress and not diverge or
1557
// crash, despite the frequent re-elections and races.
1558
1
TEST_F(RaftConsensusITest, TestChurnyElections) {
1559
1
  DoTestChurnyElections(WITHOUT_NOTIFICATION_LATENCY);
1560
1
}
1561
1562
// The same test, except inject artificial latency when propagating notifications
1563
// from the queue back to consensus. This can reproduce bugs like KUDU-1078 which
1564
// normally only appear under high load. TODO: Re-enable once we get to the
1565
// bottom of KUDU-1078.
1566
0
TEST_F(RaftConsensusITest, DISABLED_TestChurnyElections_WithNotificationLatency) {
1567
0
  DoTestChurnyElections(WITH_NOTIFICATION_LATENCY);
1568
0
}
1569
1570
1
void RaftConsensusITest::DoTestChurnyElections(bool with_latency) {
1571
1
  vector<string> ts_flags, master_flags;
1572
1573
  // On TSAN builds, we need to be a little bit less churny in order to make
1574
  // any progress at all.
1575
1
  ts_flags.push_back(Format("--raft_heartbeat_interval_ms=$0", NonTsanVsTsan(5, 25)));
1576
1
  ts_flags.push_back("--never_fsync");
1577
1
  if (with_latency) {
1578
0
    ts_flags.push_back("--consensus_inject_latency_ms_in_notifications=50");
1579
0
  }
1580
1581
1
  CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags);
1582
1583
1
  TestWorkload workload(cluster_.get());
1584
1
  workload.set_timeout_allowed(true);
1585
1
  workload.set_write_timeout_millis(100);
1586
1
  workload.set_num_write_threads(2);
1587
1
  workload.set_write_batch_size(1);
1588
1
  workload.set_sequential_write(true);
1589
1
  workload.Setup();
1590
1
  workload.Start();
1591
1592
  // Run for either a prescribed number of writes, or 30 seconds,
1593
  // whichever comes first. This prevents test timeouts on slower
1594
  // build machines, TSAN builds, etc.
1595
1
  Stopwatch sw;
1596
1
  sw.start();
1597
1
  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
1598
61
  while (workload.rows_inserted() < kNumWrites &&
1599
60
         (sw.elapsed().wall_seconds() < 30 ||
1600
          // If no rows are inserted, run a little longer.
1601
60
          (workload.rows_inserted() == 0 && sw.elapsed().wall_seconds() < 90))) {
1602
60
    SleepFor(MonoDelta::FromMilliseconds(10));
1603
60
    ASSERT_NO_FATALS(AssertNoTabletServersCrashed());
1604
60
  }
1605
1
  workload.StopAndJoin();
1606
1
  LOG(INFO) << "rows_inserted=" << workload.rows_inserted();
1607
2
  ASSERT_GT(workload.rows_inserted(), 0) << "No rows inserted";
1608
1609
  // Ensure that the replicas converge.
1610
  // We don't know exactly how many rows got inserted, since the writer
1611
  // probably saw many errors which left inserts in indeterminate state.
1612
  // But, we should have at least as many as we got confirmation for.
1613
1
  ClusterVerifier cluster_verifier(cluster_.get());
1614
1
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
1615
1
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(
1616
1
      workload.table_name(), ClusterVerifier::AT_LEAST, workload.rows_inserted()));
1617
0
  ASSERT_NO_FATALS(AssertNoTabletServersCrashed());
1618
0
}
1619
1620
1
TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
1621
1
  int kNumElections = FLAGS_num_replicas;
1622
1
  vector<string> master_flags;
1623
1624
1
  if (AllowSlowTests()) {
1625
0
    FLAGS_num_tablet_servers = 7;
1626
0
    FLAGS_num_replicas = 7;
1627
0
    kNumElections = 3 * FLAGS_num_replicas;
1628
0
    master_flags.push_back("--replication_factor=7");
1629
0
  }
1630
1631
  // Reset consensus rpc timeout to the default value or the election might fail often.
1632
1
  FLAGS_consensus_rpc_timeout_ms = 1000;
1633
1634
  // Under slow tests, start a 7-node cluster (since we can't bring killed leaders back, we start
1635
  // with a higher replica count so that we can kill more leaders).
1636
1637
1
  ASSERT_NO_FATALS(BuildAndStart({}, master_flags));
1638
1639
1
  OverrideFlagForSlowTests(
1640
1
      "client_inserts_per_thread",
1641
1
      strings::Substitute("$0", (FLAGS_client_inserts_per_thread * 100)));
1642
1
  OverrideFlagForSlowTests(
1643
1
      "client_num_batches_per_thread",
1644
1
      strings::Substitute("$0", (FLAGS_client_num_batches_per_thread * 100)));
1645
1646
1
  int num_threads = FLAGS_num_client_threads;
1647
1
  int64_t total_num_rows = num_threads * FLAGS_client_inserts_per_thread;
1648
1649
  // We create kNumElections - 1 latches; under slow tests kNumElections is 3 * FLAGS_num_replicas,
1650
  // so the same node can end up being killed more than once.
1651
1
  vector<CountDownLatch*> latches;
1652
3
  for (int i = 1; i < kNumElections; i++) {
1653
2
    latches.push_back(new CountDownLatch((i * total_num_rows)  / kNumElections));
1654
2
  }
1655
1656
9
  for (int i = 0; i < num_threads; i++) {
1657
8
    scoped_refptr<yb::Thread> new_thread;
1658
8
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("ts-test$0", i),
1659
8
                                  &RaftConsensusITest::InsertTestRowsRemoteThread,
1660
8
                                  this, i * FLAGS_client_inserts_per_thread,
1661
8
                                  FLAGS_client_inserts_per_thread,
1662
8
                                  FLAGS_client_num_batches_per_thread,
1663
8
                                  latches,
1664
8
                                  &new_thread));
1665
8
    threads_.push_back(new_thread);
1666
8
  }
1667
1668
2
  for (CountDownLatch* latch : latches) {
1669
2
    latch->Wait();
1670
2
    StopOrKillLeaderAndElectNewOne();
1671
2
  }
1672
1673
8
  for (scoped_refptr<yb::Thread> thr : threads_) {
1674
8
    CHECK_OK(ThreadJoiner(thr.get()).Join());
1675
8
  }
1676
1677
1
  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads);
1678
0
  STLDeleteElements(&latches);
1679
0
}
1680
1681
// Test automatic leader election by killing leaders.
1682
1
TEST_F(RaftConsensusITest, TestAutomaticLeaderElection) {
1683
1
  vector<string> master_flags;
1684
1
  if (AllowSlowTests()) {
1685
0
    FLAGS_num_tablet_servers = 5;
1686
0
    FLAGS_num_replicas = 5;
1687
0
    master_flags.push_back("--replication_factor=5");
1688
0
  }
1689
1
  ASSERT_NO_FATALS(BuildAndStart({}, master_flags));
1690
1691
1
  TServerDetails* leader;
1692
1
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
1693
1694
1
  unordered_set<TServerDetails*> killed_leaders;
1695
1696
1
  const int kNumLeadersToKill = FLAGS_num_replicas / 2;
1697
1
  const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1;
1698
1699
3
  for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) {
1700
2
    LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...",
1701
2
                            FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed);
1702
1703
2
    ASSERT_NO_FATALS(InsertTestRowsRemoteThread(
1704
2
        leaders_killed * FLAGS_client_inserts_per_thread,
1705
2
        FLAGS_client_inserts_per_thread,
1706
2
        FLAGS_client_num_batches_per_thread,
1707
2
        vector<CountDownLatch*>()));
1708
1709
    // At this point, the writes are flushed but the commit index may not be
1710
    // propagated to all replicas. We kill the leader anyway.
1711
2
    if (leaders_killed < kNumLeadersToKill) {
1712
1
      LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "...";
1713
1
      cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown();
1714
1
      InsertOrDie(&killed_leaders, leader);
1715
1716
1
      LOG(INFO) << "Waiting for new guy to be elected leader.";
1717
1
      ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
1718
1
    }
1719
2
  }
1720
1721
  // Restart every node that was killed, and wait for the nodes to converge.
1722
1
  for (TServerDetails* killed_node : killed_leaders) {
1723
1
    CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart());
1724
1
  }
1725
  // Verify the data on the remaining replicas.
1726
1
  ASSERT_ALL_REPLICAS_AGREE(FLAGS_client_inserts_per_thread * kFinalNumReplicas);
1727
1
}
1728
1729
// Single-replica leader election test.
1730
1
TEST_F(RaftConsensusITest, TestAutomaticLeaderElectionOneReplica) {
1731
1
  FLAGS_num_tablet_servers = 1;
1732
1
  FLAGS_num_replicas = 1;
1733
1
  ASSERT_NO_FATALS(BuildAndStart({}, {"--replication_factor=1"}));
1734
1735
1
  TServerDetails* leader;
1736
1
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
1737
1
}
1738
1739
3
void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) {
1740
3
  vector<TServerDetails*> servers = TServerDetailsVector(tablet_servers_);
1741
1742
3
  CHECK_LT(replica_idx, servers.size());
1743
3
  TServerDetails* ts = servers[replica_idx];
1744
1745
  // Manually construct an RPC to our target replica. We expect most of the calls
1746
  // to fail either with an "already present" or an error because we are writing
1747
  // to a follower. That's OK, though - what we care about for this test is
1748
  // just that the operations Apply() in the same order everywhere (even though
1749
  // in this case the result will just be an error).
1750
3
  WriteRequestPB req;
1751
3
  WriteResponsePB resp;
1752
3
  RpcController rpc;
1753
3
  req.set_tablet_id(tablet_id_);
1754
3
  AddTestRowInsert(kTestRowKey, kTestRowIntVal, "hello world", &req);
1755
1756
58.8k
  while (!finish->Load()) {
1757
58.8k
    resp.Clear();
1758
58.8k
    rpc.Reset();
1759
58.8k
    rpc.set_timeout(MonoDelta::FromSeconds(10));
1760
58.8k
    WARN_NOT_OK(ts->tserver_proxy->Write(req, &resp, &rpc), "Write failed");
1761
10
    VLOG(1) << "Response from server " << replica_idx << ": "
1762
10
            << resp.ShortDebugString();
1763
58.8k
  }
1764
3
}
1765
1766
// Regression test for KUDU-597, an issue where we could mis-order operations on
1767
// a machine if the following sequence occurred:
1768
//  1) Replica is a FOLLOWER
1769
//  2) A client request hits the machine
1770
//  3) It receives some operations from the current leader
1771
//  4) It gets elected LEADER
1772
// In this scenario, it would incorrectly sequence the client request's PREPARE phase
1773
// before the operations received in step (3), even though the correct behavior would be
1774
// to either reject them or sequence them after those operations, because the operation
1775
// index is higher.
1776
//
1777
// The test works by setting up three replicas and manually hammering them with write
1778
// requests targeting a single row. If the bug exists, then OperationOrderVerifier
1779
// will trigger an assertion because the prepare order and the op indexes will become
1780
// misaligned.
1781
1
TEST_F(RaftConsensusITest, VerifyTransactionOrder) {
1782
1
  FLAGS_num_tablet_servers = 3;
1783
1
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
1784
1785
1
  AtomicBool finish(false);
1786
4
  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
1787
3
    scoped_refptr<yb::Thread> new_thread;
1788
3
    CHECK_OK(yb::Thread::Create("test", strings::Substitute("ts-test$0", i),
1789
3
                                  &RaftConsensusITest::StubbornlyWriteSameRowThread,
1790
3
                                  this, i, &finish, &new_thread));
1791
3
    threads_.push_back(new_thread);
1792
3
  }
1793
1794
1
  const int num_loops = AllowSlowTests() ? 10 : 1;
1795
2
  for (int i = 0; i < num_loops; i++) {
1796
1
    StopOrKillLeaderAndElectNewOne();
1797
1
    SleepFor(MonoDelta::FromSeconds(1));
1798
1
    ASSERT_OK(CheckTabletServersAreAlive(FLAGS_num_tablet_servers));
1799
1
  }
1800
1801
1
  finish.Store(true);
1802
3
  for (scoped_refptr<yb::Thread> thr : threads_) {
1803
3
    CHECK_OK(ThreadJoiner(thr.get()).Join());
1804
3
  }
1805
1
}
1806
1807
215
void RaftConsensusITest::AddOp(const OpIdPB& id, ConsensusRequestPB* req) {
1808
215
  ReplicateMsg* msg = req->add_ops();
1809
215
  msg->mutable_id()->CopyFrom(id);
1810
215
  msg->set_hybrid_time(clock_->Now().ToUint64());
1811
215
  msg->set_op_type(consensus::WRITE_OP);
1812
215
  auto* write_req = msg->mutable_write();
1813
215
  int32_t key = static_cast<int32_t>(id.index() * 10000 + id.term());
1814
215
  string str_val = Substitute("term: $0 index: $1", id.term(), id.index());
1815
215
  AddKVToPB(key, key + 10, str_val, write_req->mutable_write_batch());
1816
215
}
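
A minimal sketch of the key/value encoding used by AddOp above, showing that each (term, index) pair lands on its own row key, so a term-3 replacement of op 5 writes a different key than the original term-2 op (EncodeKey is an illustrative local name, not part of the test):

#include <cstdint>
#include <cstdio>

// Mirrors the arithmetic in AddOp: the row key packs the op's index and term.
int32_t EncodeKey(int64_t term, int64_t index) {
  return static_cast<int32_t>(index * 10000 + term);
}

int main() {
  std::printf("op 2.5 -> key=%d value=\"term: 2 index: 5\"\n", EncodeKey(2, 5));  // 50002
  std::printf("op 3.5 -> key=%d value=\"term: 3 index: 5\"\n", EncodeKey(3, 5));  // 50003
  return 0;
}
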
1817
1818
// Regression test for KUDU-644:
1819
// Triggers some complicated scenarios on the replica involving aborting and
1820
// replacing transactions.
1821
1
TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
1822
1
  FLAGS_num_tablet_servers = 3;
1823
1
  auto ts_flags = {
1824
1
    "--enable_leader_failure_detection=false"s,
1825
1
    "--max_wait_for_safe_time_ms=100"s,
1826
1
    "--propagate_safe_time=false"s,
1827
    // Disable bounded stale reads because the
1828
    // update consensus requests generated by this
1829
    // test don't update the hybrid timestamp, so
1830
    // follower reads will always be stale.
1831
1
    "--max_stale_read_bound_time_ms=0"s
1832
1
  };
1833
1
  auto master_flags = {
1834
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
1835
1
    "--use_create_table_leader_hint=false"s,
1836
1
  };
1837
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
1838
1839
  // Kill all the servers but one.
1840
1
  TServerDetails *replica_ts;
1841
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
1842
1
  ASSERT_EQ(3, tservers.size());
1843
1844
  // Elect server 2 as leader and wait for log index 1 to propagate to all servers.
1845
1
  ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
1846
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
1847
1848
1
  replica_ts = tservers[0];
1849
1
  cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
1850
1
  cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
1851
1852
1
  LOG(INFO) << "================================== Cluster setup complete.";
1853
1854
  // Check that the 'term' metric is correctly exposed.
1855
1
  {
1856
1
    int64_t term_from_metric = ASSERT_RESULT(
1857
1
        cluster_->tablet_server_by_uuid(replica_ts->uuid())->GetInt64Metric(
1858
1
            &METRIC_ENTITY_tablet,
1859
1
            nullptr,
1860
1
            &METRIC_raft_term,
1861
1
            "value"));
1862
1
    ASSERT_EQ(term_from_metric, 1);
1863
1
  }
1864
1865
1
  ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get());
1866
1867
1
  ConsensusRequestPB req;
1868
1
  ConsensusResponsePB resp;
1869
1
  RpcController rpc;
1870
1871
1
  LOG(INFO) << "Send a simple request with no ops.";
1872
1
  req.set_tablet_id(tablet_id_);
1873
1
  req.set_dest_uuid(replica_ts->uuid());
1874
1
  req.set_caller_uuid("fake_caller");
1875
1
  req.set_caller_term(2);
1876
1
  req.mutable_committed_op_id()->CopyFrom(MakeOpId(1, 1));
1877
1
  req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
1878
1879
1
  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1880
2
  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
1881
1882
1
  LOG(INFO) << "Send some operations, but don't advance the commit index. They should not commit.";
1883
1
  AddOp(MakeOpId(2, 2), &req);
1884
1
  AddOp(MakeOpId(2, 3), &req);
1885
1
  AddOp(MakeOpId(2, 4), &req);
1886
1
  rpc.Reset();
1887
1
  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1888
2
  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
1889
1890
1
  LOG(INFO) << "We shouldn't read anything yet, because the ops should be pending.";
1891
1
  {
1892
1
    vector<string> results;
1893
1
    ASSERT_NO_FATALS(ScanReplica(replica_ts->tserver_proxy.get(), &results));
1894
2
    ASSERT_EQ(0, results.size()) << results;
1895
1
  }
1896
1897
1
  LOG(INFO) << "Send op 2.6, but set preceding OpId to 2.4. "
1898
1
            << "This is an invalid request, and the replica should reject it.";
1899
1
  req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
1900
1
  req.clear_ops();
1901
1
  AddOp(MakeOpId(2, 6), &req);
1902
1
  rpc.Reset();
1903
1
  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1904
2
  ASSERT_TRUE(resp.has_error()) << resp.DebugString();
1905
1
  ASSERT_EQ(resp.error().status().message(),
1906
1
            "New operation's index does not follow the previous op's index. "
1907
1
            "Current: 2.6. Previous: 2.4");
1908
1909
1
  resp.Clear();
1910
1
  req.clear_ops();
1911
1
  LOG(INFO) << "Send ops 3.5 and 2.6, then commit up to index 6, the replica "
1912
1
            << "should fail because of the out-of-order terms.";
1913
1
  req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
1914
1
  AddOp(MakeOpId(3, 5), &req);
1915
1
  AddOp(MakeOpId(2, 6), &req);
1916
1
  rpc.Reset();
1917
1
  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1918
2
  ASSERT_TRUE(resp.has_error()) << resp.DebugString();
1919
1
  ASSERT_EQ(resp.error().status().message(),
1920
1
            "New operation's term is not >= than the previous op's term. "
1921
1
            "Current: 2.6. Previous: 3.5");
1922
1923
1
  LOG(INFO) << "Regression test for KUDU-639";
1924
  // If we send a valid request, but the
1925
  // current commit index is higher than the data we're sending, we shouldn't
1926
  // commit anything higher than the last op sent by the leader.
1927
  //
1928
  // To test, we re-send operation 2.3, with the correct preceding ID 2.2,
1929
  // but we set the committed index to 2.4. This should only commit
1930
  // 2.2 and 2.3.
1931
1
  resp.Clear();
1932
1
  req.clear_ops();
1933
1
  req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2));
1934
1
  AddOp(MakeOpId(2, 3), &req);
1935
1
  req.mutable_committed_op_id()->CopyFrom(MakeOpId(2, 4));
1936
1
  rpc.Reset();
1937
1
  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1938
2
  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
1939
1
  LOG(INFO) << "Verify only 2.2 and 2.3 are committed.";
1940
1
  {
1941
1
    vector<string> results;
1942
1
    ASSERT_NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results));
1943
1
    ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2");
1944
1
    ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3");
1945
1
  }
1946
1947
1
  resp.Clear();
1948
1
  req.clear_ops();
1949
1
  LOG(INFO) << "Now send some more ops, and commit the earlier ones.";
1950
1
  req.mutable_committed_op_id()->CopyFrom(MakeOpId(2, 4));
1951
1
  req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
1952
1
  AddOp(MakeOpId(2, 5), &req);
1953
1
  AddOp(MakeOpId(2, 6), &req);
1954
1
  rpc.Reset();
1955
1
  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1956
2
  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
1957
1958
1
  LOG(INFO) << "Verify they are committed.";
1959
1
  {
1960
1
    vector<string> results;
1961
1
    ASSERT_NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 3, &results));
1962
1
    ASSERT_STR_CONTAINS(results[0], "term: 2 index: 2");
1963
1
    ASSERT_STR_CONTAINS(results[1], "term: 2 index: 3");
1964
1
    ASSERT_STR_CONTAINS(results[2], "term: 2 index: 4");
1965
1
  }
1966
1967
1
  resp.Clear();
1968
1
  req.clear_ops();
1969
1
  int leader_term = 2;
1970
1
  const int kNumTerms = AllowSlowTests() ? 10000 : 100;
1971
99
  while (leader_term < kNumTerms) {
1972
98
    leader_term++;
1973
    // Now pretend to be a new leader (term 3) and replace the earlier ops
1974
    // without committing the new replacements.
1975
98
    req.set_caller_term(leader_term);
1976
98
    req.set_caller_uuid("new_leader");
1977
98
    req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
1978
98
    req.clear_ops();
1979
98
    AddOp(MakeOpId(leader_term, 5), &req);
1980
98
    AddOp(MakeOpId(leader_term, 6), &req);
1981
98
    rpc.Reset();
1982
98
    ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1983
196
    ASSERT_FALSE(resp.has_error()) << "Req: " << req.ShortDebugString()
1984
196
        << " Resp: " << resp.DebugString();
1985
98
  }
1986
1987
1
  LOG(INFO) << "Send an empty request from the newest term which should commit "
1988
1
            << "the earlier ops.";
1989
1
  {
1990
1
    req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6));
1991
1
    req.mutable_committed_op_id()->CopyFrom(MakeOpId(leader_term, 6));
1992
1
    req.clear_ops();
1993
1
    rpc.Reset();
1994
1
    ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
1995
2
    ASSERT_FALSE(resp.has_error()) << resp.DebugString();
1996
1
  }
1997
1998
1
  LOG(INFO) << "Verify the new rows are committed.";
1999
1
  {
2000
1
    vector<string> results;
2001
1
    ASSERT_NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 5, &results));
2002
1
    SCOPED_TRACE(results);
2003
1
    ASSERT_STR_CONTAINS(results[3], Substitute("term: $0 index: 5", leader_term));
2004
1
    ASSERT_STR_CONTAINS(results[4], Substitute("term: $0 index: 6", leader_term));
2005
1
  }
2006
1
}
2007
2008
1
TEST_F(RaftConsensusITest, TestLeaderStepDown) {
2009
1
  FLAGS_num_tablet_servers = 3;
2010
2011
1
  vector<string> ts_flags = {
2012
1
    "--enable_leader_failure_detection=false"s,
2013
1
  };
2014
1
  vector<string> master_flags = {
2015
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
2016
1
    "--use_create_table_leader_hint=false"s,
2017
1
  };
2018
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2019
2020
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
2021
2022
  // Start with no leader.
2023
1
  Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10));
2024
2
  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString();
2025
2026
  // Become leader.
2027
1
  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
2028
1
  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
2029
1
  ASSERT_OK(WriteSimpleTestRow(
2030
1
      tservers[0], tablet_id_, kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)));
2031
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2));
2032
2033
1
  LOG(INFO) << "Stepping down leader " << tservers[0];
2034
  // Step down and verify that leadership gracefully transitions to a new leader
2035
  // despite failure detection being disabled.
2036
1
  bool allow_graceful_leader_transfer = true;
2037
1
  ASSERT_OK(LeaderStepDown(
2038
1
      tservers[0], tablet_id_, nullptr, MonoDelta::FromSeconds(10),
2039
1
      !allow_graceful_leader_transfer));
2040
1
  TServerDetails* new_leader_ts = nullptr;
2041
1
  ASSERT_OK(
2042
1
      FindTabletLeader(tablet_servers_, tablet_id_, MonoDelta::FromSeconds(10), &new_leader_ts));
2043
1
  ASSERT_TRUE(new_leader_ts != nullptr);
2044
1
  LOG(INFO) << "Identified new leader " << new_leader_ts;
2045
2046
  // Verify that further leader stepdowns and writes against the original leader do not succeed.
2047
1
  TabletServerErrorPB error;
2048
1
  s = LeaderStepDown(
2049
1
      tservers[0], tablet_id_, nullptr, MonoDelta::FromSeconds(10), !allow_graceful_leader_transfer,
2050
1
      &error);
2051
2
  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString();
2052
2
  ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << error.ShortDebugString();
2053
2054
1
  s = WriteSimpleTestRow(
2055
1
      tservers[0], tablet_id_, kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10));
2056
2
  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: "
2057
2
                                  << s.ToString();
2058
2059
  // Stepping down a leader with graceful transition disabled should
2060
  // result in no leader being elected.
2061
1
  LOG(INFO) << "Stepping down leader " << new_leader_ts->uuid();
2062
1
  allow_graceful_leader_transfer = false;
2063
1
  ASSERT_OK(LeaderStepDown(
2064
1
      new_leader_ts, tablet_id_, nullptr, MonoDelta::FromSeconds(10),
2065
1
      !allow_graceful_leader_transfer));
2066
1
  TServerDetails* final_leader_ts = nullptr;
2067
1
  ASSERT_NOK(
2068
1
      FindTabletLeader(tablet_servers_, tablet_id_, MonoDelta::FromSeconds(10), &final_leader_ts));
2069
1
}
2070
2071
// Test for #350: sets the consensus RPC timeout to be long, and freezes both followers before
2072
// asking the leader to step down. Prior to fixing #350, the step-down process would block
2073
// until the pending requests timed out.
2074
1
TEST_F(RaftConsensusITest, TestStepDownWithSlowFollower) {
2075
1
  vector<string> ts_flags = {
2076
1
    "--enable_leader_failure_detection=false",
2077
    // Bump up the RPC timeout, so that we can verify that the stepdown responds
2078
    // quickly even when an outbound request is hung.
2079
1
    "--consensus_rpc_timeout_ms=15000",
2080
    // Make heartbeats more frequent so we can detect dead tservers faster.
2081
1
    "--raft_heartbeat_interval_ms=10",
2082
    // Set it high enough that the election RPCs don't time out.
2083
1
    "--leader_failure_max_missed_heartbeat_periods=100"
2084
1
  };
2085
1
  vector<string> master_flags = {
2086
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
2087
1
    "--tserver_unresponsive_timeout_ms=5000"s,
2088
1
    "--use_create_table_leader_hint=false"s,
2089
1
  };
2090
1
  BuildAndStart(ts_flags, master_flags);
2091
2092
1
  vector<TServerDetails *> tservers = TServerDetailsVector(tablet_servers_);
2093
1
  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
2094
1
  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
2095
2096
  // Stop both followers.
2097
3
  for (int i = 1; i < 3; i++) {
2098
2
    ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause());
2099
2
  }
2100
2101
  // Wait until the paused tservers have stopped heartbeating.
2102
1
  ASSERT_OK(WaitUntilNumberOfAliveTServersEqual(
2103
1
      1, cluster_->GetMasterProxy<master::MasterClusterProxy>(), MonoDelta::FromSeconds(20)));
2104
2105
  // Step down should respond quickly despite the hung requests.
2106
1
  ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, nullptr, MonoDelta::FromSeconds(3)));
2107
1
}
2108
2109
void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
2110
12
    const TabletServerMapUnowned& tablet_servers, const string& leader_uuid) {
2111
2112
12
  TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid);
2113
2114
  // Calculate number of servers to leave unpaused (minority).
2115
  // This math is a little unintuitive but works for cluster sizes including 2 and 1.
2116
  // Note: We assume all of these TSes are voters.
2117
12
  auto config_size = tablet_servers.size();
2118
12
  auto minority_to_retain = MajoritySize(config_size) - 1;
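
To make the pause arithmetic above concrete, here is a small standalone sketch; MajoritySize is a local stand-in for the helper the test calls (assumed to return n/2 + 1), not YugabyteDB's implementation.

#include <cstddef>
#include <initializer_list>
#include <iostream>

// Stand-in for the majority-size helper: the smallest number of voters that
// forms a strict majority of 'num_voters' (assumes every server is a voter).
size_t MajoritySize(size_t num_voters) { return num_voters / 2 + 1; }

int main() {
  for (size_t config_size : {1, 2, 3, 4, 5}) {
    size_t minority_to_retain = MajoritySize(config_size) - 1;
    size_t num_to_pause = config_size - minority_to_retain;
    // For config_size=3 this retains 1 server (the leader) and pauses 2, so
    // only a minority stays responsive and writes/elections must time out.
    std::cout << "config=" << config_size << " retain=" << minority_to_retain
              << " pause=" << num_to_pause << "\n";
  }
  return 0;
}
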
2119
2120
  // Only perform this part of the test if we have some servers to pause, else
2121
  // the failure assertions will throw.
2122
12
  if (config_size > 1) {
2123
    // Pause enough replicas to prevent a majority.
2124
8
    auto num_to_pause = config_size - minority_to_retain;
2125
8
    LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size;
2126
8
    vector<string> paused_uuids;
2127
20
    for (const auto& entry : tablet_servers) {
2128
20
      if (paused_uuids.size() == num_to_pause) {
2129
8
        continue;
2130
8
      }
2131
12
      const string& replica_uuid = entry.first;
2132
12
      if (replica_uuid == leader_uuid) {
2133
        // Always leave this one alone.
2134
0
        continue;
2135
0
      }
2136
12
      ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
2137
12
      ASSERT_OK(replica_ts->Pause());
2138
12
      paused_uuids.push_back(replica_uuid);
2139
12
    }
2140
2141
    // Ensure writes timeout while only a minority is alive.
2142
8
    Status s = WriteSimpleTestRow(initial_leader, tablet_id_,
2143
8
                                  kTestRowKey, kTestRowIntVal, "foo",
2144
8
                                  MonoDelta::FromMilliseconds(100));
2145
16
    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
2146
2147
    // Step down.
2148
8
    ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, nullptr, MonoDelta::FromSeconds(10)));
2149
2150
    // Assert that elections time out without a live majority.
2151
    // We specify a very short timeout here to keep the tests fast.
2152
8
    ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
2153
8
    s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
2154
16
    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
2155
8
    LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
2156
2157
    // Resume the paused servers.
2158
8
    LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size;
2159
12
    for (const string& replica_uuid : paused_uuids) {
2160
12
      ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
2161
12
      ASSERT_OK(replica_ts->Resume());
2162
12
    }
2163
8
  }
2164
2165
12
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1));
2166
2167
  // Now an election should succeed.
2168
12
  ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
2169
12
  ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
2170
12
  LOG(INFO) << "Successful election with full config of size " << config_size;
2171
2172
  // And a write should also succeed.
2173
12
  ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_,
2174
12
                               kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size),
2175
12
                               MonoDelta::FromSeconds(10)));
2176
12
}
2177
2178
// Return the replicas of the specified 'tablet_id', as seen by the Master.
2179
Status RaftConsensusITest::GetTabletLocations(const string& tablet_id, const MonoDelta& timeout,
2180
3
                                              master::TabletLocationsPB* tablet_locations) {
2181
3
  RpcController rpc;
2182
3
  rpc.set_timeout(timeout);
2183
3
  GetTabletLocationsRequestPB req;
2184
3
  *req.add_tablet_ids() = tablet_id;
2185
3
  GetTabletLocationsResponsePB resp;
2186
3
  RETURN_NOT_OK(cluster_->GetMasterProxy<master::MasterClientProxy>().GetTabletLocations(
2187
3
      req, &resp, &rpc));
2188
3
  if (resp.has_error()) {
2189
0
    return StatusFromPB(resp.error().status());
2190
0
  }
2191
3
  if (resp.errors_size() > 0) {
2192
0
    CHECK_EQ(1, resp.errors_size()) << resp.ShortDebugString();
2193
0
    CHECK_EQ(tablet_id, resp.errors(0).tablet_id()) << resp.ShortDebugString();
2194
0
    return StatusFromPB(resp.errors(0).status());
2195
0
  }
2196
0
  CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString();
2197
3
  *tablet_locations = resp.tablet_locations(0);
2198
3
  return Status::OK();
2199
3
}
2200
2201
void RaftConsensusITest::WaitForReplicasReportedToMaster(
2202
    int num_replicas, const string& tablet_id,
2203
    const MonoDelta& timeout,
2204
    WaitForLeader wait_for_leader,
2205
    bool* has_leader,
2206
3
    master::TabletLocationsPB* tablet_locations) {
2207
3
  MonoTime deadline(MonoTime::Now());
2208
3
  deadline.AddDelta(timeout);
2209
3
  while (true) {
2210
3
    ASSERT_OK(GetTabletLocations(tablet_id, timeout, tablet_locations));
2211
3
    *has_leader = false;
2212
3
    if (tablet_locations->replicas_size() == num_replicas) {
2213
3
      for (const master::TabletLocationsPB_ReplicaPB& replica :
2214
7
                    tablet_locations->replicas()) {
2215
7
        if (replica.role() == PeerRole::LEADER) {
2216
3
          *has_leader = true;
2217
3
        }
2218
7
      }
2219
3
      if (wait_for_leader == NO_WAIT_FOR_LEADER ||
2220
3
          (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) {
2221
3
        break;
2222
3
      }
2223
0
    }
2224
0
    if (deadline.ComesBefore(MonoTime::Now())) break;
2225
0
    SleepFor(MonoDelta::FromMilliseconds(20));
2226
0
  }
2227
6
  ASSERT_EQ(num_replicas, tablet_locations->replicas_size()) << tablet_locations->DebugString();
2228
3
  if (wait_for_leader == WAIT_FOR_LEADER) {
2229
2
    ASSERT_TRUE(*has_leader) << tablet_locations->DebugString();
2230
1
  }
2231
3
}
2232
2233
// Basic tests of adding and removing servers from a configuration.
2234
1
TEST_F(RaftConsensusITest, TestAddRemoveVoter) {
2235
1
  TestAddRemoveServer(PeerMemberType::PRE_VOTER);
2236
1
}
2237
2238
1
TEST_F(RaftConsensusITest, TestAddRemoveObserver) {
2239
1
  TestAddRemoveServer(PeerMemberType::PRE_OBSERVER);
2240
1
}
2241
2242
// Regression test for KUDU-1169: a crash when a Config Change operation is replaced
2243
// by a later leader.
2244
1
TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
2245
1
  FLAGS_num_tablet_servers = 3;
2246
1
  vector<string> ts_flags = { "--enable_leader_failure_detection=false"s };
2247
1
  vector<string> master_flags = {
2248
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
2249
1
    "--use_create_table_leader_hint=false"s,
2250
1
  };
2251
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2252
2253
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
2254
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
2255
2256
1
  TServerDetails* leader_tserver = tservers[0];
2257
1
  LOG(INFO) << "Elect server 0 (" << leader_tserver->uuid()
2258
1
            << ") as leader and wait for log index 1 to propagate to all servers.";
2259
2260
1
  auto original_followers = CreateTabletServerMapUnowned(tablet_servers_);
2261
1
  ASSERT_EQ(1, original_followers.erase(leader_tserver->uuid()));
2262
2263
1
  const MonoDelta timeout = MonoDelta::FromSeconds(10);
2264
1
  ASSERT_OK(StartElection(leader_tserver, tablet_id_, timeout));
2265
1
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
2266
1
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, timeout));
2267
2268
1
  LOG(INFO) << "Shut down servers 1 and 2, so that server 0 can't replicate anything.";
2269
1
  cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
2270
1
  cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
2271
2272
1
  LOG(INFO) << "Now try to replicate a ChangeConfig operation. This should get stuck and time out";
2273
1
  LOG(INFO) << "because the server can't replicate any operations.";
2274
1
  TabletServerErrorPB::Code error_code;
2275
1
  Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1],
2276
1
                          -1, MonoDelta::FromSeconds(1),
2277
1
                          &error_code, false /* retry */);
2278
1
  ASSERT_TRUE(s.IsTimedOut());
2279
2280
1
  LOG(INFO) << "Pause the leader, and restart the other servers.";
2281
1
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Pause());
2282
1
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Restart());
2283
1
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Restart());
2284
2285
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 1));
2286
2287
1
  LOG(INFO) << "Elect one of the other servers: " << tservers[1]->uuid();
2288
1
  ASSERT_OK(StartElection(tservers[1], tablet_id_, MonoDelta::FromSeconds(10)));
2289
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), original_followers, tablet_id_, 2));
2290
2291
1
  LOG(INFO) << "Resume the original leader. Its change-config operation will now be aborted "
2292
1
               "since it was never replicated to the majority, and the new leader will have "
2293
1
               "replaced the operation.";
2294
1
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[0]->uuid())->Resume());
2295
2296
1
  LOG(INFO) << "Insert some data and verify that it propagates to all servers.";
2297
1
  ASSERT_NO_FATALS(InsertTestRowsRemoteThread(0, 10, 1, vector<CountDownLatch*>()));
2298
1
  ASSERT_ALL_REPLICAS_AGREE(10);
2299
1
}
2300
2301
// Test the atomic CAS arguments to ChangeConfig() add server and remove server.
2302
1
TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) {
2303
1
  FLAGS_num_tablet_servers = 3;
2304
1
  vector<string> ts_flags = { "--enable_leader_failure_detection=false" };
2305
1
  vector<string> master_flags = {
2306
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
2307
1
    "--use_create_table_leader_hint=false"s,
2308
1
  };
2309
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2310
2311
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
2312
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
2313
2314
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
2315
1
  TServerDetails* leader_tserver = tservers[0];
2316
1
  const MonoDelta timeout = MonoDelta::FromSeconds(10);
2317
1
  ASSERT_OK(StartElection(leader_tserver, tablet_id_, timeout));
2318
1
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
2319
1
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, timeout));
2320
1
  int64_t cur_log_index = 1;
2321
2322
1
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2323
2324
1
  TServerDetails* follower_ts = tservers[2];
2325
2326
  // Initial committed config should have opid_index == -1.
2327
  // Server should reject request to change config from opid other than this.
2328
1
  int64_t invalid_committed_opid_index = 7;
2329
1
  TabletServerErrorPB::Code error_code;
2330
1
  Status s = RemoveServer(leader_tserver, tablet_id_, follower_ts,
2331
1
                          invalid_committed_opid_index, MonoDelta::FromSeconds(10),
2332
1
                          &error_code, false /* retry */);
2333
1
  ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code);
2334
1
  ASSERT_STR_CONTAINS(s.ToString(), "of 7 but the committed config has opid_index of -1");
2335
2336
  // Specifying the correct committed opid index should work.
2337
1
  int64_t committed_opid_index = -1;
2338
1
  ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, follower_ts,
2339
1
                         committed_opid_index, MonoDelta::FromSeconds(10)));
2340
2341
1
  ASSERT_EQ(1, active_tablet_servers.erase(follower_ts->uuid()));
2342
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
2343
1
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
2344
2345
  // Now, add the server back. Again, specifying something other than the
2346
  // latest committed_opid_index should fail.
2347
1
  invalid_committed_opid_index = -1;  // The old one is no longer valid.
2348
1
  s = AddServer(leader_tserver, tablet_id_, follower_ts, PeerMemberType::VOTER,
2349
1
                invalid_committed_opid_index, MonoDelta::FromSeconds(10),
2350
1
                &error_code, false /* retry */);
2351
1
  ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code);
2352
1
  ASSERT_STR_CONTAINS(s.ToString(), "of -1 but the committed config has opid_index of 2");
2353
2354
  // Specifying the correct committed opid index should work.
2355
  // The previous config change op is the latest entry in the log.
2356
1
  committed_opid_index = cur_log_index;
2357
1
  ASSERT_OK(AddServer(leader_tserver, tablet_id_, follower_ts, PeerMemberType::PRE_VOTER,
2358
1
                      committed_opid_index, MonoDelta::FromSeconds(10)));
2359
2360
1
  InsertOrDie(&active_tablet_servers, follower_ts->uuid(), follower_ts);
2361
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
2362
1
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
2363
1
}
2364
2365
// Ensure that we can elect a server that is in the "pending" configuration.  This is required by
2366
// the Raft protocol. See Diego Ongaro's PhD thesis, section 4.1, where it states that "it is the
2367
// caller's configuration that is used in reaching consensus, both for voting and for log
2368
// replication".
2369
//
2370
// This test also tests the case where a node comes back from the dead to a leader that was not in
2371
// its configuration when it died. That should also work, i.e.  the revived node should accept
2372
// writes from the new leader.
2373
1
TEST_F(RaftConsensusITest, TestElectPendingVoter) {
2374
  // Test plan:
2375
  //  1. Disable failure detection to avoid non-deterministic behavior.
2376
  //  2. Start with a configuration size of 5, all servers synced.
2377
  //  3. Remove one server from the configuration, wait until committed.
2378
  //  4. Pause the 3 remaining non-leaders (SIGSTOP).
2379
  //  5. Run a config change to add back the previously-removed server.
2380
  //     Ensure that, while the op cannot be committed yet due to lack of a
2381
  //     majority in the new config (only 2 out of 5 servers are alive), the op
2382
  //     has been replicated to both the local leader and the new member.
2383
  //  6. Force the existing leader to step down.
2384
  //  7. Resume one of the paused nodes so that a majority (of the 5-node
2385
  //     configuration, but not the original 4-node configuration) will be available.
2386
  //  8. Start a leader election on the new (pending) node. It should win.
2387
  //  9. Unpause the two remaining stopped nodes.
2388
  // 10. Wait for all nodes to sync to the new leader's log.
2389
1
  FLAGS_num_tablet_servers = 5;
2390
1
  FLAGS_num_replicas = 5;
2391
1
  vector<string> ts_flags = {
2392
1
    "--enable_leader_failure_detection=false"s,
2393
1
    "--TEST_inject_latency_before_change_role_secs=10"s,
2394
1
  };
2395
1
  vector<string> master_flags = {
2396
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
2397
1
    "--replication_factor=5"s,
2398
1
    "--use_create_table_leader_hint=false"s,
2399
1
  };
2400
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2401
2402
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
2403
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
2404
2405
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
2406
1
  TServerDetails* initial_leader = tservers[0];
2407
1
  const MonoDelta timeout = MonoDelta::FromSeconds(10);
2408
1
  ASSERT_OK(StartElection(initial_leader, tablet_id_, timeout));
2409
1
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
2410
0
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, timeout));
2411
2412
  // The server we will remove and then bring back.
2413
0
  TServerDetails* final_leader = tservers[4];
2414
2415
  // Kill the master, so we can change the config without interference.
2416
0
  cluster_->master()->Shutdown();
2417
2418
  // Now remove server 4 from the configuration.
2419
0
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2420
0
  LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid();
2421
0
  ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none,
2422
0
                         MonoDelta::FromSeconds(10)));
2423
0
  ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid()));
2424
0
  int64_t cur_log_index = 2;
2425
0
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
2426
0
                                  active_tablet_servers, tablet_id_, cur_log_index));
2427
0
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(cur_log_index, initial_leader, tablet_id_, timeout));
2428
2429
0
  ASSERT_OK(DeleteTablet(final_leader, tablet_id_, tablet::TABLET_DATA_TOMBSTONED, boost::none,
2430
0
                         MonoDelta::FromSeconds(30)));
2431
2432
  // Now add server 4 back as a learner to the peers.
2433
0
  LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout...";
2434
0
  ASSERT_OK(AddServer(
2435
0
      initial_leader, tablet_id_, final_leader, PeerMemberType::PRE_VOTER, boost::none,
2436
0
      MonoDelta::FromSeconds(10)));
2437
2438
  // Pause tablet servers 1 through 3, so they won't see the operation to change server 4 from
2439
  // learner to voter, which will happen automatically once remote bootstrap for server 4 is
2440
  // completed.
2441
0
  LOG(INFO) << "Pausing 3 replicas...";
2442
0
  for (int i = 1; i <= 3; i++) {
2443
0
    ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid());
2444
0
    ASSERT_OK(replica_ts->Pause());
2445
0
  }
2446
2447
  // Reset to the unpaused servers.
2448
0
  active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2449
0
  for (int i = 1; i <= 3; i++) {
2450
0
    ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid()));
2451
0
  }
2452
2453
  // Adding a server will cause two calls to ChangeConfig. One to add the server as a learner,
2454
  // and another one to change its role to voter.
2455
0
  ++cur_log_index;
2456
2457
  // Only wait for TS 0 and 4 to agree that the new change config op (CHANGE_ROLE for server 4,
2458
  // which will be automatically sent by the leader once remote bootstrap has completed) has been
2459
  // replicated.
2460
0
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60),
2461
0
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
2462
2463
  // Now that TS 4 is electable (and pending), have TS 0 step down.
2464
0
  LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down...";
2465
0
  Status status = LeaderStepDown(initial_leader, tablet_id_, nullptr, MonoDelta::FromSeconds(10));
2466
  // We allow IllegalState here because leader step down does not succeed while the leader
2467
  // has a pending config. Peer 4 will still get elected, though, because it starts a new term
2468
  // and has enough other quorum members available to vote for it.
2469
0
  if (!status.IsIllegalState()) {
2470
0
    ASSERT_OK(status);
2471
0
  } else {
2472
0
    LOG(INFO) << "Resuming Peer " << tservers[2]->uuid() << " as leader did not step down...";
2473
0
    ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume());
2474
0
    InsertOrDie(&active_tablet_servers, tservers[2]->uuid(), tservers[2]);
2475
0
  }
2476
  // Resume TS 1 so we have a majority of 3 to elect a new leader.
2477
0
  LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ...";
2478
0
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume());
2479
0
  InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]);
2480
2481
  // Now try to get TS 4 elected. It should succeed and push a NO_OP.
2482
0
  LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ...";
2483
0
  ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10)));
2484
0
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
2485
0
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
2486
2487
  // Resume the remaining paused nodes.
2488
0
  LOG(INFO) << "Resuming remaining nodes...";
2489
0
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume());
2490
0
  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume());
2491
0
  active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2492
2493
  // Wait until the leader is sure that the old leader's lease is over.
2494
0
  ASSERT_OK(WaitUntilLeader(final_leader, tablet_id_,
2495
0
      MonoDelta::FromMilliseconds(GetAtomicFlag(&FLAGS_leader_lease_duration_ms))));
2496
2497
  // Do one last operation on the new leader: an insert.
2498
0
  ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_,
2499
0
                               kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da",
2500
0
                               MonoDelta::FromSeconds(10)));
2501
2502
  // Wait for all servers to replicate everything up through the last write op.
2503
0
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
2504
0
                                  active_tablet_servers, tablet_id_, ++cur_log_index));
2505
0
}
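
// Illustrative sketch (not part of the original tests): the SIGSTOP/SIGCONT steps in the plan
// above can be bundled so that paused replicas are always resumed, even if an assertion fails
// mid-test. It reuses ExternalTabletServer::Pause()/Resume() and ScopeExit(), both used elsewhere
// in this file; 'PauseTabletServersGuard' is a hypothetical helper name.
auto PauseTabletServersGuard(ExternalMiniCluster* cluster, const std::vector<std::string>& uuids) {
  for (const auto& uuid : uuids) {
    CHECK_OK(cluster->tablet_server_by_uuid(uuid)->Pause());  // Sends SIGSTOP to the daemon.
  }
  // Resume all paused daemons when the returned guard goes out of scope.
  return ScopeExit([cluster, uuids] {
    for (const auto& uuid : uuids) {
      CHECK_OK(cluster->tablet_server_by_uuid(uuid)->Resume());  // Sends SIGCONT to the daemon.
    }
  });
}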
2506
2507
// Writes test rows in ascending order to a single tablet server.
2508
// Essentially a poor-man's version of TestWorkload that only operates on a
2509
// single tablet. Does not batch, does not tolerate timeouts, and does not
2510
// interact with the Master. 'row_key' is used to determine the row id and is
2511
// incremented prior to each write; 'rows_inserted' is incremented after each
2512
// successful insert. Since any unexpected write failure results in a crash,
2513
// 'rows_inserted' will have a correct count at the end of the run.
2514
// Crashes on any failure, so 'write_timeout' should be high.
2515
void DoWriteTestRows(const TServerDetails* leader_tserver,
2516
                     const string& tablet_id,
2517
                     const MonoDelta& write_timeout,
2518
                     std::atomic<int32_t>* rows_inserted,
2519
                     std::atomic<int32_t>* row_key,
2520
8
                     const std::atomic<bool>* finish) {
2521
6.14k
  while (!finish->load()) {
2522
6.13k
    int cur_row_key = ++*row_key;
2523
6.13k
    Status write_status = WriteSimpleTestRow(
2524
6.13k
        leader_tserver, tablet_id, cur_row_key, cur_row_key,
2525
6.13k
        Substitute("key=$0", cur_row_key), write_timeout);
2526
6.13k
    if (!write_status.IsLeaderHasNoLease() &&
2527
6.11k
        !write_status.IsLeaderNotReadyToServe()) {
2528
      // Temporary failures to write because of not having a valid leader lease are OK. We don't
2529
      // increment the number of rows inserted in that case.
2530
6.11k
      CHECK_OK(write_status);
2531
6.11k
      ++*rows_inserted;
2532
6.11k
    }
2533
6.13k
  }
2534
8
}
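
// Illustrative sketch (not part of the original tests): DoWriteTestRows() can also be driven by
// plain std::thread when the yb::Thread/ThreadJoiner machinery used in the next test is not
// needed. Assumes <thread> is available; the thread count, run time, and the helper name
// 'RunWritersExample' are arbitrary choices for illustration.
void RunWritersExample(TServerDetails* leader_tserver, const std::string& tablet_id) {
  std::atomic<int32_t> rows_inserted(0);
  std::atomic<int32_t> row_key(0);
  std::atomic<bool> finish(false);
  std::vector<std::thread> writers;
  for (int i = 0; i < 4; ++i) {
    writers.emplace_back(DoWriteTestRows, leader_tserver, tablet_id, MonoDelta::FromSeconds(10),
                         &rows_inserted, &row_key, &finish);
  }
  SleepFor(MonoDelta::FromSeconds(5));  // Let the writers make some progress.
  finish = true;                        // Ask the writers to stop.
  for (auto& writer : writers) {
    writer.join();
  }
  LOG(INFO) << "Rows inserted: " << rows_inserted.load();
}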
2535
2536
// Test that config change works while running a workload.
2537
1
TEST_F(RaftConsensusITest, TestConfigChangeUnderLoad) {
2538
1
  FLAGS_num_tablet_servers = 3;
2539
1
  vector<string> ts_flags = { "--enable_leader_failure_detection=false" };
2540
1
  vector<string> master_flags = {
2541
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
2542
1
    "--use_create_table_leader_hint=false"s,
2543
1
  };
2544
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2545
2546
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
2547
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
2548
2549
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
2550
1
  TServerDetails* leader_tserver = tservers[0];
2551
1
  MonoDelta timeout = MonoDelta::FromSeconds(10);
2552
1
  ASSERT_OK(StartElection(leader_tserver, tablet_id_, timeout));
2553
1
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 1));
2554
1
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_tserver, tablet_id_, timeout));
2555
2556
1
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2557
2558
  // Start a write workload.
2559
1
  LOG(INFO) << "Starting write workload...";
2560
1
  std::atomic<int32_t> rows_inserted(0);
2561
1
  std::atomic<int32_t> row_key(0);
2562
1
  {
2563
1
    std::atomic<bool> finish(false);
2564
1
    vector<scoped_refptr<Thread> > threads;
2565
1
    auto se = ScopeExit([&threads, &finish] {
2566
1
      LOG(INFO) << "Joining writer threads...";
2567
1
      finish = true;
2568
8
      for (const scoped_refptr<Thread> &thread : threads) {
2569
8
        ASSERT_OK(ThreadJoiner(thread.get()).Join());
2570
8
      }
2571
1
    });
2572
2573
1
    int num_threads = FLAGS_num_client_threads;
2574
9
    for (int i = 0; i < num_threads; i++) {
2575
8
      scoped_refptr<Thread> thread;
2576
8
      ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(), Substitute("row-writer-$0", i),
2577
8
          &DoWriteTestRows,
2578
8
          leader_tserver,
2579
8
          tablet_id_,
2580
8
          MonoDelta::FromSeconds(10),
2581
8
          &rows_inserted,
2582
8
          &row_key,
2583
8
          &finish,
2584
8
          &thread));
2585
8
      threads.push_back(thread);
2586
8
    }
2587
2588
1
    LOG(INFO) << "Removing servers...";
2589
    // Go from 3 tablet servers down to 1 in the configuration.
2590
1
    vector<int> remove_list = {2, 1};
2591
2
    for (int to_remove_idx : remove_list) {
2592
2
      auto num_servers = active_tablet_servers.size();
2593
2
      LOG(INFO) << "Remove: Going from " << num_servers << " to " << num_servers - 1 << " replicas";
2594
2595
2
      TServerDetails *tserver_to_remove = tservers[to_remove_idx];
2596
2
      LOG(INFO) << "Removing tserver with uuid " << tserver_to_remove->uuid();
2597
2
      ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tserver_to_remove, boost::none,
2598
2
          MonoDelta::FromSeconds(10)));
2599
2
      ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_remove->uuid()));
2600
2
      ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
2601
2
          leader_tserver, tablet_id_,
2602
2
          MonoDelta::FromSeconds(10)));
2603
2
    }
2604
2605
1
    LOG(INFO) << "Adding servers...";
2606
    // Add the tablet servers back, in reverse order, going from 1 to 3 servers in the
2607
    // configuration.
2608
1
    vector<int> add_list = {1, 2};
2609
2
    for (int to_add_idx : add_list) {
2610
2
      auto num_servers = active_tablet_servers.size();
2611
2
      LOG(INFO) << "Add: Going from " << num_servers << " to " << num_servers + 1 << " replicas";
2612
2613
2
      TServerDetails *tserver_to_add = tservers[to_add_idx];
2614
2
      LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
2615
2
      ASSERT_OK(AddServer(leader_tserver, tablet_id_, tserver_to_add, PeerMemberType::PRE_VOTER,
2616
2
          boost::none, MonoDelta::FromSeconds(10)));
2617
2
      InsertOrDie(&active_tablet_servers, tserver_to_add->uuid(), tserver_to_add);
2618
2
      ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(active_tablet_servers.size(),
2619
2
          leader_tserver, tablet_id_,
2620
2
          MonoDelta::FromSeconds(10)));
2621
2
    }
2622
1
  }
2623
2624
1
  LOG(INFO) << "Waiting for replicas to agree... (rows_inserted=" << rows_inserted
2625
1
            << ", unique row keys used: " << row_key << ")";
2626
  // Wait for all servers to replicate everything up through the last write op.
2627
  // Since we don't batch, there should be at least # rows inserted log entries,
2628
  // plus the initial leader's no-op, plus 2 for the removed servers, plus 2 for
2629
  // the added servers, for a total of at least rows_inserted + 5.
2630
1
  int min_log_index = rows_inserted + 5;
2631
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
2632
1
                                  active_tablet_servers, tablet_id_,
2633
1
                                  min_log_index));
2634
2635
1
  LOG(INFO) << "Number of rows inserted: " << rows_inserted.load();
2636
1
  ASSERT_ALL_REPLICAS_AGREE(rows_inserted.load());
2637
1
}
2638
2639
1
TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
2640
1
  MonoDelta timeout = MonoDelta::FromSeconds(30);
2641
1
  FLAGS_num_tablet_servers = 3;
2642
1
  FLAGS_num_replicas = 2;
2643
1
  vector<string> ts_flags;
2644
1
  vector<string> master_flags;
2645
1
  master_flags.push_back("--replication_factor=2");
2646
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2647
2648
1
  LOG(INFO) << "Finding tablet leader and waiting for things to start...";
2649
1
  string tablet_id = tablet_replicas_.begin()->first;
2650
2651
  // Determine the list of tablet servers currently in the config.
2652
1
  TabletServerMapUnowned active_tablet_servers;
2653
1
  for (itest::TabletReplicaMap::const_iterator iter = tablet_replicas_.find(tablet_id);
2654
3
       iter != tablet_replicas_.end(); ++iter) {
2655
2
    InsertOrDie(&active_tablet_servers, iter->second->uuid(), iter->second);
2656
2
  }
2657
2658
  // Determine the server to add to the config.
2659
1
  string uuid_to_add;
2660
3
  for (const TabletServerMap::value_type& entry : tablet_servers_) {
2661
3
    if (!ContainsKey(active_tablet_servers, entry.second->uuid())) {
2662
1
      uuid_to_add = entry.second->uuid();
2663
1
    }
2664
3
  }
2665
1
  ASSERT_FALSE(uuid_to_add.empty());
2666
2667
  // Get a baseline config reported to the master.
2668
1
  LOG(INFO) << "Waiting for Master to see the current replicas...";
2669
1
  master::TabletLocationsPB tablet_locations;
2670
1
  bool has_leader;
2671
1
  ASSERT_NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, WAIT_FOR_LEADER,
2672
1
                                            &has_leader, &tablet_locations));
2673
1
  LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString();
2674
2675
  // Wait for initial NO_OP to be committed by the leader.
2676
1
  TServerDetails* leader_ts;
2677
1
  ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id, timeout, &leader_ts));
2678
1
  int64_t expected_index = 0;
2679
1
  ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id, 1, &expected_index));
2680
1
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(expected_index, leader_ts, tablet_id, timeout));
2681
1
  expected_index += 2; // Adding a new peer generates two ChangeConfig requests.
2682
2683
  // Change the config.
2684
1
  TServerDetails* tserver_to_add = tablet_servers_[uuid_to_add].get();
2685
1
  LOG(INFO) << "Adding tserver with uuid " << tserver_to_add->uuid();
2686
1
  ASSERT_OK(AddServer(leader_ts, tablet_id_, tserver_to_add, PeerMemberType::PRE_VOTER, boost::none,
2687
1
                      timeout));
2688
1
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, expected_index));
2689
1
  ++expected_index;
2690
2691
  // Wait for the master to be notified of the config change.
2692
  // It should continue to have the same leader, even without waiting.
2693
1
  LOG(INFO) << "Waiting for Master to see config change...";
2694
1
  ASSERT_NO_FATALS(WaitForReplicasReportedToMaster(3, tablet_id, timeout, NO_WAIT_FOR_LEADER,
2695
1
                                            &has_leader, &tablet_locations));
2696
2
  ASSERT_TRUE(has_leader) << tablet_locations.DebugString();
2697
1
  LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString();
2698
2699
  // Change the config again.
2700
1
  LOG(INFO) << "Removing tserver with uuid " << tserver_to_add->uuid();
2701
1
  ASSERT_OK(RemoveServer(leader_ts, tablet_id_, tserver_to_add, boost::none, timeout));
2702
1
  active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2703
1
  ASSERT_EQ(1, active_tablet_servers.erase(tserver_to_add->uuid()));
2704
1
  ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, expected_index));
2705
2706
  // Wait for the master to be notified of the removal.
2707
1
  LOG(INFO) << "Waiting for Master to see config change...";
2708
1
  ASSERT_NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, NO_WAIT_FOR_LEADER,
2709
1
                                            &has_leader, &tablet_locations));
2710
2
  ASSERT_TRUE(has_leader) << tablet_locations.DebugString();
2711
1
  LOG(INFO) << "Tablet locations:\n" << tablet_locations.DebugString();
2712
1
}
2713
2714
// Test that we can create (vivify) a new tablet via remote bootstrap.
2715
1
TEST_F(RaftConsensusITest, TestAutoCreateReplica) {
2716
1
  FLAGS_num_tablet_servers = 3;
2717
1
  FLAGS_num_replicas = 2;
2718
1
  std::vector<std::string> ts_flags = {
2719
1
    "--enable_leader_failure_detection=false",
2720
1
    "--log_cache_size_limit_mb=1",
2721
1
    "--log_segment_size_mb=1",
2722
1
    "--log_async_preallocate_segments=false",
2723
1
    "--maintenance_manager_polling_interval_ms=300",
2724
1
    Format("--db_write_buffer_size=$0", 256_KB),
2725
1
    "--remote_bootstrap_begin_session_timeout_ms=15000"
2726
1
  };
2727
1
  std::vector<std::string> master_flags = {
2728
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
2729
1
    "--replication_factor=2"s,
2730
1
    "--use_create_table_leader_hint=false"s,
2731
1
  };
2732
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2733
2734
  // 50K is enough to cause flushes & log rolls.
2735
1
  int num_rows_to_write = 50000;
2736
1
  if (AllowSlowTests()) {
2737
0
    num_rows_to_write = 150000;
2738
0
  }
2739
1
  num_rows_to_write = NonTsanVsTsan(num_rows_to_write, num_rows_to_write / 4);
2740
2741
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
2742
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
2743
2744
1
  itest::TabletServerMapUnowned active_tablet_servers;
2745
1
  auto iter = tablet_replicas_.find(tablet_id_);
2746
1
  TServerDetails* leader = iter->second;
2747
1
  TServerDetails* follower = (++iter)->second;
2748
1
  InsertOrDie(&active_tablet_servers, leader->uuid(), leader);
2749
1
  InsertOrDie(&active_tablet_servers, follower->uuid(), follower);
2750
2751
1
  TServerDetails* new_node = nullptr;
2752
1
  for (TServerDetails* ts : tservers) {
2753
1
    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
2754
1
      new_node = ts;
2755
1
      break;
2756
1
    }
2757
1
  }
2758
1
  ASSERT_TRUE(new_node != nullptr);
2759
2760
  // Elect the leader (still only a consensus config size of 2).
2761
1
  ASSERT_OK(StartElection(leader, tablet_id_, MonoDelta::FromSeconds(10)));
2762
1
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers,
2763
1
                                  tablet_id_, 1));
2764
2765
1
  TestWorkload workload(cluster_.get());
2766
1
  workload.set_table_name(kTableName);
2767
1
  workload.set_num_write_threads(10);
2768
1
  workload.set_write_batch_size(100);
2769
1
  workload.set_sequential_write(true);
2770
1
  workload.Setup();
2771
2772
1
  LOG(INFO) << "Starting write workload...";
2773
1
  workload.Start();
2774
2775
1
  while (true) {
2776
0
    auto rows_inserted = workload.rows_inserted();
2777
0
    if (rows_inserted >= num_rows_to_write) {
2778
0
      break;
2779
0
    }
2780
0
    LOG(INFO) << "Only inserted " << rows_inserted << " rows so far, sleeping for 100ms";
2781
0
    SleepFor(MonoDelta::FromMilliseconds(100));
2782
0
  }
2783
2784
1
  LOG(INFO) << "Adding tserver with uuid " << new_node->uuid() << " as VOTER...";
2785
1
  ASSERT_OK(AddServer(leader, tablet_id_, new_node, PeerMemberType::PRE_VOTER, boost::none,
2786
1
                      MonoDelta::FromSeconds(10)));
2787
0
  InsertOrDie(&active_tablet_servers, new_node->uuid(), new_node);
2788
0
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(
2789
0
      active_tablet_servers.size(), leader, tablet_id_,
2790
0
      MonoDelta::FromSeconds(NonTsanVsTsan(20, 60))));
2791
2792
0
  workload.StopAndJoin();
2793
0
  auto num_batches = workload.batches_completed();
2794
2795
0
  LOG(INFO) << "Waiting for replicas to agree...";
2796
  // Wait for all servers to replicate everything up through the last write op.
2797
  // Since this workload batches writes, each completed batch produces at least one log entry;
2798
  // add the initial leader's no-op plus one entry for
2799
  // the added replica, for a total of at least num_batches + 2.
2800
0
  auto min_log_index = num_batches + 2;
2801
0
  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(120),
2802
0
                                  active_tablet_servers, tablet_id_,
2803
0
                                  min_log_index));
2804
2805
0
  auto rows_inserted = workload.rows_inserted();
2806
0
  LOG(INFO) << "Number of rows inserted: " << rows_inserted;
2807
0
  ASSERT_ALL_REPLICAS_AGREE(rows_inserted);
2808
0
}
2809
2810
1
TEST_F(RaftConsensusITest, TestMemoryRemainsConstantDespiteTwoDeadFollowers) {
2811
1
  const int64_t kMinRejections = 100;
2812
1
  const MonoDelta kMaxWaitTime = MonoDelta::FromSeconds(60);
2813
2814
  // Start the cluster with a low per-tablet transaction memory limit, so that
2815
  // the test can complete faster.
2816
1
  std::vector<std::string> flags = {
2817
1
      "--tablet_operation_memory_limit_mb=2"s
2818
1
  };
2819
2820
1
  ASSERT_NO_FATALS(BuildAndStart(flags));
2821
2822
  // Kill both followers.
2823
1
  TServerDetails* details;
2824
1
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &details));
2825
1
  int num_shutdown = 0;
2826
1
  ssize_t leader_ts_idx = -1;
2827
4
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
2828
3
    ExternalTabletServer* ts = cluster_->tablet_server(i);
2829
3
    if (ts->instance_id().permanent_uuid() != details->uuid()) {
2830
2
      ts->Shutdown();
2831
2
      num_shutdown++;
2832
1
    } else {
2833
1
      leader_ts_idx = i;
2834
1
    }
2835
3
  }
2836
1
  ASSERT_EQ(2, num_shutdown);
2837
1
  ASSERT_NE(-1, leader_ts_idx);
2838
2839
  // Because the majority of the cluster is dead and because of this workload's
2840
  // timeout behavior, more and more wedged transactions will accumulate in the
2841
  // leader. To prevent memory usage from skyrocketing, the leader will
2842
  // eventually reject new transactions. That's what we're testing for here.
2843
1
  TestWorkload workload(cluster_.get());
2844
1
  workload.set_table_name(kTableName);
2845
1
  workload.set_timeout_allowed(true);
2846
1
  workload.set_write_timeout_millis(50);
2847
1
  workload.Setup();
2848
1
  workload.Start();
2849
2850
  // Run until the former leader has rejected several transactions.
2851
1
  MonoTime deadline = MonoTime::Now();
2852
1
  deadline.AddDelta(kMaxWaitTime);
2853
1
  while (true) {
2854
0
    int64_t num_rejections = ASSERT_RESULT(cluster_->tablet_server(leader_ts_idx)->GetInt64Metric(
2855
0
        &METRIC_ENTITY_tablet,
2856
0
        nullptr,
2857
0
        &METRIC_not_leader_rejections,
2858
0
        "value"));
2859
0
    if (num_rejections >= kMinRejections) {
2860
0
      break;
2861
0
    } else if (deadline.ComesBefore(MonoTime::Now())) {
2862
0
      FAIL() << "Ran for " << kMaxWaitTime.ToString() << ", deadline expired";
2863
0
    }
2864
0
    SleepFor(MonoDelta::FromMilliseconds(200));
2865
0
  }
2866
1
}
2867
2868
0
static void EnableLogLatency(server::GenericServiceProxy* proxy) {
2869
0
  typedef unordered_map<string, string> FlagMap;
2870
0
  FlagMap flags;
2871
0
  InsertOrDie(&flags, "log_inject_latency", "true");
2872
0
  InsertOrDie(&flags, "log_inject_latency_ms_mean", "1000");
2873
0
  for (const FlagMap::value_type& e : flags) {
2874
0
    SetFlagRequestPB req;
2875
0
    SetFlagResponsePB resp;
2876
0
    RpcController rpc;
2877
0
    req.set_flag(e.first);
2878
0
    req.set_value(e.second);
2879
0
    ASSERT_OK(proxy->SetFlag(req, &resp, &rpc));
2880
0
  }
2881
0
}
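
// Illustrative sketch (not part of the original tests): the same SetFlag RPC used by
// EnableLogLatency() above can turn the injected latency back off once a scenario no longer
// needs it; only calls already present in this file are used, and the helper name is hypothetical.
static void DisableLogLatency(server::GenericServiceProxy* proxy) {
  SetFlagRequestPB req;
  SetFlagResponsePB resp;
  RpcController rpc;
  req.set_flag("log_inject_latency");
  req.set_value("false");
  ASSERT_OK(proxy->SetFlag(req, &resp, &rpc));
}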
2882
2883
// Check if hinted replica becomes leader.
2884
1
TEST_F(RaftConsensusITest, HintedLeader) {
2885
1
  vector<string> master_flags = {
2886
1
      "--use_create_table_leader_hint=true"s,
2887
1
      "--TEST_create_table_leader_hint_min_lexicographic=true"s,
2888
1
  };
2889
2890
1
  ASSERT_NO_FATALS(BuildAndStart({}, master_flags));
2891
2892
1
  std::string init_leader_hint = "";
2893
3
  for (const TabletServerMap::value_type& entry : tablet_servers_) {
2894
3
    if (init_leader_hint == "" || init_leader_hint > entry.first) {
2895
1
      init_leader_hint = entry.first;
2896
1
    }
2897
3
  }
2898
2899
1
  LOG(INFO) << "hinted replica = " << init_leader_hint;
2900
2901
  // Check that hinted replica became leader.
2902
1
  TServerDetails* leader;
2903
1
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
2904
1
  ASSERT_EQ(init_leader_hint, leader->uuid());
2905
2906
  // Insert 3MB worth of data.
2907
1
  const int kNumWrites = 1000;
2908
1
  ASSERT_NO_FATALS(WriteOpsToLeader(kNumWrites, 3_KB));
2909
1
  ASSERT_OK(WaitForServersToAgree(30s, tablet_servers_, tablet_id_,
2910
1
                                  kNumWrites));
2911
2912
  // Check that no leadership change occurred.
2913
1
  auto op_id = ASSERT_RESULT(GetLastOpIdForReplica(
2914
1
      tablet_id_, leader, consensus::RECEIVED_OPID, 10s));
2915
1
  ASSERT_EQ(op_id.term, 1);
2916
1
}
2917
2918
// Run a regular workload with a leader that's writing to its WAL slowly.
2919
1
TEST_F(RaftConsensusITest, TestSlowLeader) {
2920
1
  if (!AllowSlowTests()) return;
2921
0
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
2922
2923
0
  TServerDetails* leader;
2924
0
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
2925
0
  ASSERT_NO_FATALS(EnableLogLatency(leader->generic_proxy.get()));
2926
2927
0
  TestWorkload workload(cluster_.get());
2928
0
  workload.set_table_name(kTableName);
2929
0
  workload.Setup();
2930
0
  workload.Start();
2931
0
  SleepFor(MonoDelta::FromSeconds(60));
2932
0
}
2933
2934
// Run a regular workload with one follower that's writing to its WAL slowly.
2935
1
TEST_F(RaftConsensusITest, TestSlowFollower) {
2936
1
  if (!AllowSlowTests()) return;
2937
0
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
2938
2939
0
  TServerDetails* leader;
2940
0
  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
2941
0
  int num_reconfigured = 0;
2942
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
2943
0
    ExternalTabletServer* ts = cluster_->tablet_server(i);
2944
0
    if (ts->instance_id().permanent_uuid() != leader->uuid()) {
2945
0
      TServerDetails* follower;
2946
0
      follower = GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid());
2947
0
      ASSERT_TRUE(follower);
2948
0
      ASSERT_NO_FATALS(EnableLogLatency(follower->generic_proxy.get()));
2949
0
      num_reconfigured++;
2950
0
      break;
2951
0
    }
2952
0
  }
2953
0
  ASSERT_EQ(1, num_reconfigured);
2954
2955
0
  TestWorkload workload(cluster_.get());
2956
0
  workload.set_table_name(kTableName);
2957
0
  workload.Setup();
2958
0
  workload.Start();
2959
0
  SleepFor(MonoDelta::FromSeconds(60));
2960
0
}
2961
2962
// Run a special workload that constantly updates a single row on a cluster
2963
// where every replica is writing to its WAL slowly.
2964
1
TEST_F(RaftConsensusITest, TestHammerOneRow) {
2965
1
  if (!AllowSlowTests()) return;
2966
0
  ASSERT_NO_FATALS(BuildAndStart(vector<string>()));
2967
2968
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) {
2969
0
    ExternalTabletServer* ts = cluster_->tablet_server(i);
2970
0
    TServerDetails* follower;
2971
0
    follower = GetReplicaWithUuidOrNull(tablet_id_, ts->instance_id().permanent_uuid());
2972
0
    ASSERT_TRUE(follower);
2973
0
    ASSERT_NO_FATALS(EnableLogLatency(follower->generic_proxy.get()));
2974
0
  }
2975
2976
0
  TestWorkload workload(cluster_.get());
2977
0
  workload.set_table_name(kTableName);
2978
0
  workload.set_pathological_one_row_enabled(true);
2979
0
  workload.set_num_write_threads(20);
2980
0
  workload.Setup();
2981
0
  workload.Start();
2982
0
  SleepFor(MonoDelta::FromSeconds(60));
2983
0
}
2984
2985
// Test that followers that fall behind the leader's log GC threshold are
2986
// evicted from the config.
2987
1
TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) {
2988
1
  vector<string> ts_flags;
2989
1
  AddFlagsForLogRolls(&ts_flags);  // For CauseFollowerToFallBehindLogGC().
2990
1
  vector<string> master_flags;
2991
1
  LOG(INFO) << __func__ << ": starting the cluster";
2992
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
2993
2994
1
  MonoDelta timeout = MonoDelta::FromSeconds(30);
2995
1
  auto active_tablet_servers = CreateTabletServerMapUnowned(tablet_servers_);
2996
1
  ASSERT_EQ(3, active_tablet_servers.size());
2997
2998
1
  string leader_uuid;
2999
1
  int64_t orig_term;
3000
1
  string follower_uuid;
3001
1
  LOG(INFO) << __func__ << ": causing followers to fall behind";
3002
1
  ASSERT_NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
3003
3004
0
  LOG(INFO) << __func__ << ": waiting for 2 voters in the committed config";
3005
  // Wait for the abandoned follower to be evicted.
3006
0
  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2,
3007
0
                                                tablet_servers_[leader_uuid].get(),
3008
0
                                                tablet_id_,
3009
0
                                                timeout));
3010
0
  ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
3011
0
  LOG(INFO) << __func__ << ": waiting for servers to agree";
3012
0
  ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2));
3013
0
}
3014
3015
// Test that after a follower that has fallen behind the leader's log GC threshold is evicted,
3016
// the master has it remotely bootstrapped back into the config.
3017
1
TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) {
3018
1
  vector<string> extra_flags;
3019
1
  AddFlagsForLogRolls(&extra_flags);  // For CauseFollowerToFallBehindLogGC().
3020
1
  ASSERT_NO_FATALS(BuildAndStart(extra_flags, {"--enable_load_balancing=true"}));
3021
3022
1
  MonoDelta timeout = MonoDelta::FromSeconds(30);
3023
3024
1
  string leader_uuid;
3025
1
  int64_t orig_term;
3026
1
  string follower_uuid;
3027
1
  ASSERT_NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
3028
3029
  // The follower will be evicted. Now wait for the master to cause it to be remotely bootstrapped.
3030
0
  ASSERT_OK(WaitForServersToAgree(timeout, tablet_servers_, tablet_id_, 2));
3031
3032
0
  ClusterVerifier cluster_verifier(cluster_.get());
3033
0
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
3034
0
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(kTableName, ClusterVerifier::AT_LEAST, 1));
3035
0
}
3036
3037
// Test that a ChangeConfig() request is rejected unless the leader has replicated one of its own
3038
// log entries during the current term.  This is required for correctness of Raft config change. For
3039
// details, see https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
3040
1
TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) {
3041
1
  vector<string> ts_flags = { "--enable_leader_failure_detection=false" };
3042
1
  vector<string> master_flags = {
3043
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
3044
1
    "--use_create_table_leader_hint=false"s,
3045
1
  };
3046
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
3047
3048
1
  MonoDelta timeout = MonoDelta::FromSeconds(30);
3049
3050
1
  int kLeaderIndex = 0;
3051
1
  TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(kLeaderIndex)->uuid()].get();
3052
3053
  // Prevent followers from accepting UpdateConsensus requests from the leader, even though they
3054
  // will vote. This will allow us to get the distributed system into a state where there is a valid
3055
  // leader (based on winning an election) but that leader will be unable to commit any entries from
3056
  // its own term, making it illegal to accept ChangeConfig() requests.
3057
3
  for (int i = 1; i <= 2; i++) {
3058
2
    ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i),
3059
2
              "TEST_follower_reject_update_consensus_requests", "true"));
3060
2
  }
3061
3062
  // Elect the leader.
3063
1
  ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout));
3064
3065
  // We don't need to wait until the leader obtains a lease here. In fact, that will never happen,
3066
  // because the followers are rejecting UpdateConsensus, and the leader needs to majority-replicate
3067
  // a lease expiration that is in the future in order to establish a leader lease.
3068
1
  ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout, LeaderLeaseCheckMode::DONT_NEED_LEASE));
3069
  // Now attempt to do a config change. It should be rejected because there have not been any ops
3070
  // (notably the initial NO_OP) from the leader's term that have been committed yet.
3071
1
  Status s = itest::RemoveServer(leader_ts,
3072
1
                                 tablet_id_,
3073
1
                                 tablet_servers_[cluster_->tablet_server(1)->uuid()].get(),
3074
1
                                 boost::none,
3075
1
                                 timeout,
3076
1
                                 nullptr /* error_code */,
3077
1
                                 false /* retry */);
3078
2
  ASSERT_TRUE(s.IsLeaderNotReadyToServe()) << s;
3079
1
  ASSERT_STR_CONTAINS(s.message().ToBuffer(),
3080
1
                      "Leader not yet replicated NoOp to be ready to serve requests");
3081
1
}
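
// Illustrative sketch (not part of the original tests): a caller that wants its ChangeConfig()
// request to be accepted can first wait until the leader has committed an entry from its own term
// (the NO_OP it appends on winning the election). This reuses GetConsensusState() and
// GetLastOpIdForReplica() as called elsewhere in this file; the polling loop itself and the
// helper name are assumptions for illustration.
Status WaitForLeaderToCommitInOwnTerm(TServerDetails* leader,
                                      const std::string& tablet_id,
                                      const MonoDelta& timeout) {
  MonoTime deadline = MonoTime::Now();
  deadline.AddDelta(timeout);
  while (MonoTime::Now().ComesBefore(deadline)) {
    consensus::ConsensusStatePB cstate;
    RETURN_NOT_OK(GetConsensusState(
        leader, tablet_id, consensus::CONSENSUS_CONFIG_COMMITTED, timeout, &cstate));
    auto op_id = VERIFY_RESULT(GetLastOpIdForReplica(
        tablet_id, leader, consensus::COMMITTED_OPID, timeout));
    if (op_id.term == cstate.current_term()) {
      // An entry from the current term is committed, so config changes can now be accepted.
      return Status::OK();
    }
    SleepFor(MonoDelta::FromMilliseconds(100));
  }
  return STATUS(TimedOut, "Leader did not commit an entry in its own term");
}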
3082
3083
1
TEST_F(RaftConsensusITest, TestChangeConfigBasedOnJepsen) {
3084
1
  FLAGS_num_tablet_servers = 4;
3085
1
  vector<string> ts_flags = { "--enable_leader_failure_detection=false",
3086
1
                              "--TEST_log_change_config_every_n=3000" };
3087
1
  vector<string> master_flags = {
3088
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
3089
1
    "--use_create_table_leader_hint=false"s,
3090
1
  };
3091
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
3092
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
3093
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
3094
3095
1
  int leader_idx = -1;
3096
1
  int new_node_idx = -1;
3097
5
  for (int i = 0; i < 4; i++) {
3098
4
    auto& uuid = cluster_->tablet_server(i)->uuid();
3099
4
    if (nullptr == GetReplicaWithUuidOrNull(tablet_id_, uuid)) {
3100
1
      new_node_idx = i;
3101
1
      continue;
3102
3
    } else if (leader_idx == -1) {
3103
1
      leader_idx = i;
3104
1
      continue;
3105
1
    }
3106
4
  }
3107
3108
0
  CHECK_NE(new_node_idx, -1) << "Could not find the node not having tablet_id_";
3109
3110
1
  TServerDetails* leader_ts = tablet_servers_[cluster_->tablet_server(leader_idx)->uuid()].get();
3111
1
  MonoDelta timeout = MonoDelta::FromSeconds(30);
3112
1
  ASSERT_OK(StartElection(leader_ts, tablet_id_, timeout));
3113
3114
1
  ASSERT_OK(WaitUntilLeader(leader_ts, tablet_id_, timeout, LeaderLeaseCheckMode::NEED_LEASE));
3115
3116
1
  vector<TServerDetails*> tservers_list(4);
3117
1
  int ts_idx = 0;
3118
1
  tservers_list[ts_idx++] = tablet_servers_[cluster_->tablet_server(leader_idx)->uuid()].get();
3119
5
  for (int i = 0; i < 4; i++) {
3120
4
    if (i == leader_idx || i == new_node_idx) {
3121
2
      continue;
3122
2
    }
3123
2
    ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i),
3124
2
                                "TEST_follower_reject_update_consensus_requests", "true"));
3125
2
    tservers_list[ts_idx++] = tablet_servers_[cluster_->tablet_server(i)->uuid()].get();
3126
2
  }
3127
1
  tservers_list[ts_idx++] = tablet_servers_[cluster_->tablet_server(new_node_idx)->uuid()].get();
3128
3129
  // Now attempt to do a config change. It should be rejected because there have not been any ops
3130
  // (notably the initial NO_OP) from the leader's term that have been committed yet.
3131
1
  Status s = itest::AddServer(leader_ts,
3132
1
                              tablet_id_,
3133
1
                              tablet_servers_[cluster_->tablet_server(new_node_idx)->uuid()].get(),
3134
1
                              PeerMemberType::PRE_VOTER,
3135
1
                              boost::none,
3136
1
                              timeout,
3137
1
                              nullptr /* error_code */,
3138
1
                              false /* retry */);
3139
1
  LOG(INFO) << "Got status " << yb::ToString(s);
3140
1
  const double kSleepDelaySec = 50;
3141
1
  SleepFor(MonoDelta::FromSeconds(kSleepDelaySec));
3142
1
  LOG(INFO) << "Done Sleeping";
3143
3144
1
  auto committed_op_ids = ASSERT_RESULT(GetLastOpIdForEachReplica(
3145
1
      tablet_id_, tservers_list, consensus::OpIdType::COMMITTED_OPID, timeout));
3146
1
  auto received_op_ids = ASSERT_RESULT(GetLastOpIdForEachReplica(
3147
1
      tablet_id_, tservers_list, consensus::OpIdType::RECEIVED_OPID, timeout));
3148
3149
5
  for (int i = 0; i < 4; i++) {
3150
4
    LOG(INFO) << "i = " << i << " Peer " << tservers_list[i]->uuid()
3151
4
              << " Committed op id " << yb::ToString(committed_op_ids[i])
3152
4
              << " Last received op id " << yb::ToString(received_op_ids[i]);
3153
4
  }
3154
3155
1
  const OpId kLeaderCommittedOpId = committed_op_ids[0];
3156
1
  int num_voters_who_received_committed_op_id = 0;
3157
4
  for (int i = 0; i < 3; i++) {
3158
3
    if (kLeaderCommittedOpId <= received_op_ids[i]) {
3159
3
      num_voters_who_received_committed_op_id++;
3160
3
    }
3161
3
  }
3162
0
  CHECK_GE(num_voters_who_received_committed_op_id, 2)
3163
0
      << "At least 2 voters should have received the op id";
3164
1
}
3165
3166
// Test that if for some reason none of the transactions can be prepared, the failure comes back as
3167
// an error in UpdateConsensus().
3168
1
TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
3169
1
  const int kNumOps = 10;
3170
3171
1
  vector<string> ts_flags = {
3172
1
    "--enable_leader_failure_detection=false",
3173
1
  };
3174
1
  vector<string> master_flags = {
3175
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
3176
1
    "--use_create_table_leader_hint=false"s,
3177
1
  };
3178
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
3179
3180
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
3181
1
  ASSERT_EQ(3, tservers.size());
3182
3183
  // Shut down the other servers so they don't get chatty.
3184
1
  cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
3185
1
  cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
3186
3187
  // Configure the first server to fail all on prepare.
3188
1
  TServerDetails *replica_ts = tservers[0];
3189
1
  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(replica_ts->uuid()),
3190
1
                "TEST_follower_fail_all_prepare", "true"));
3191
3192
  // Pretend to be the leader and send a request that should return an error.
3193
1
  ConsensusRequestPB req;
3194
1
  ConsensusResponsePB resp;
3195
1
  RpcController rpc;
3196
1
  req.set_dest_uuid(replica_ts->uuid());
3197
1
  req.set_tablet_id(tablet_id_);
3198
1
  req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
3199
1
  req.set_caller_term(0);
3200
1
  req.mutable_committed_op_id()->CopyFrom(MakeOpId(0, 0));
3201
1
  req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0));
3202
11
  for (int i = 0; i < kNumOps; i++) {
3203
10
    AddOp(MakeOpId(0, 1 + i), &req);
3204
10
  }
3205
3206
1
  ASSERT_OK(replica_ts->consensus_proxy->UpdateConsensus(req, &resp, &rpc));
3207
1
  LOG(INFO) << resp.ShortDebugString();
3208
1
  ASSERT_TRUE(resp.status().has_error());
3209
1
  ASSERT_EQ(consensus::ConsensusErrorPB::CANNOT_PREPARE, resp.status().error().code());
3210
1
  ASSERT_STR_CONTAINS(resp.ShortDebugString(), "Could not prepare a single operation");
3211
1
}
3212
3213
1
TEST_F(RaftConsensusITest, TestRemoveTserverFailsWhenVoterInTransition) {
3214
1
  TestRemoveTserverFailsWhenServerInTransition(PeerMemberType::PRE_VOTER);
3215
1
}
3216
3217
1
TEST_F(RaftConsensusITest, TestRemoveTserverFailsWhenObserverInTransition) {
3218
1
  TestRemoveTserverFailsWhenServerInTransition(PeerMemberType::PRE_OBSERVER);
3219
1
}
3220
3221
1
TEST_F(RaftConsensusITest, TestRemovePreObserverServerSucceeds) {
3222
1
  TestRemoveTserverInTransitionSucceeds(PeerMemberType::PRE_OBSERVER);
3223
1
}
3224
3225
1
TEST_F(RaftConsensusITest, TestRemovePreVoterServerSucceeds) {
3226
1
  TestRemoveTserverInTransitionSucceeds(PeerMemberType::PRE_VOTER);
3227
1
}
3228
3229
// A test scenario to verify that a disruptive server doesn't start needless
3230
// elections when it takes a long time to replicate Raft transactions
3231
// from leader to follower replicas (e.g., due to slowness in WAL IO ops).
3232
1
TEST_F(RaftConsensusITest, DisruptiveServerAndSlowWAL) {
3233
1
  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
3234
  // Shorten the heartbeat interval for faster test run time.
3235
1
  const auto kHeartbeatIntervalMs = 200;
3236
1
  const auto kMaxMissedHeartbeatPeriods = 3;
3237
1
  const vector<string> ts_flags {
3238
1
    Substitute("--raft_heartbeat_interval_ms=$0", kHeartbeatIntervalMs),
3239
1
    Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
3240
1
               kMaxMissedHeartbeatPeriods),
3241
1
  };
3242
1
  NO_FATALS(BuildAndStart(ts_flags));
3243
3244
  // Sanity check: this scenario assumes there are 3 tablet servers. The test
3245
  // table is created with RF=FLAGS_num_replicas.
3246
1
  ASSERT_EQ(3, FLAGS_num_replicas);
3247
1
  ASSERT_EQ(3, tablet_servers_.size());
3248
3249
  // A convenience array to access each tablet server as TServerDetails.
3250
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
3251
1
  ASSERT_EQ(cluster_->num_tablet_servers(), tservers.size());
3252
3253
  // The leadership might fluctuate before shutting down the third tablet
3254
  // server, so ASSERT_EVENTUALLY() below is for those rare cases.
3255
  //
3256
  // However, after one of the tablet servers is shut down, the leadership should
3257
  // not fluctuate because:
3258
  //   1) only two voters out of three are alive
3259
  //   2) current leader should not be disturbed by any rogue votes -- that's
3260
  //      the whole premise of this test scenario
3261
  //
3262
  // So, in this scenario the leadership can fluctuate only if Raft heartbeats sent
3263
  // from the leader to follower replicas are significantly delayed or dropped.
3264
  // However, minicluster components send heartbeats via the loopback interface,
3265
  // so no real network layer that might significantly delay heartbeats
3266
  // is involved. Also, the consensus RPC queues should not overflow
3267
  // in this scenario because the number of consensus RPCs is relatively low.
3268
1
  TServerDetails* leader_tserver = nullptr;
3269
1
  TServerDetails* other_tserver = nullptr;
3270
1
  TServerDetails* shutdown_tserver = nullptr;
3271
1
  ASSERT_EVENTUALLY([&] {
3272
    // This is a clean-up in case of retry.
3273
1
    if (shutdown_tserver) {
3274
1
      auto* ts = cluster_->tablet_server_by_uuid(shutdown_tserver->uuid());
3275
1
      if (ts->IsShutdown()) {
3276
1
        ASSERT_OK(ts->Restart());
3277
1
      }
3278
1
    }
3279
1
    for (size_t idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
3280
1
      auto* ts = cluster_->tablet_server(idx);
3281
1
      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "false"));
3282
1
    }
3283
1
    leader_tserver = nullptr;
3284
1
    other_tserver = nullptr;
3285
1
    shutdown_tserver = nullptr;
3286
3287
1
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader_tserver));
3288
1
    ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_,
3289
1
                                 0 /* key */, 0 /* int_val */, "" /* string_val */, kTimeout));
3290
1
    auto op_id = ASSERT_RESULT(GetLastOpIdForReplica(
3291
1
        tablet_id_, leader_tserver, consensus::COMMITTED_OPID, kTimeout));
3292
1
    ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, op_id.index));
3293
    // Shutdown one tablet server that doesn't host the leader replica of the
3294
    // target tablet and inject WAL latency to others.
3295
1
    for (const auto& server : tservers) {
3296
1
      auto* ts = cluster_->tablet_server_by_uuid(server->uuid());
3297
1
      const bool is_leader = server->uuid() == leader_tserver->uuid();
3298
1
      if (!is_leader && !shutdown_tserver) {
3299
1
        shutdown_tserver = server;
3300
1
        continue;
3301
1
      }
3302
1
      if (!is_leader && !other_tserver) {
3303
1
        other_tserver = server;
3304
1
      }
3305
      // For this scenario it's important to reserve some inactivity intervals
3306
      // for the follower between processing Raft messages from the leader.
3307
      // If a vote request arrives while the replica is busy processing a
3308
      // Raft message from the leader, it is rejected with a 'busy' status before
3309
      // the vote withholding interval is evaluated.
3310
1
      const auto mult = is_leader ? 2 : 1;
3311
1
      const auto latency_ms = mult * kHeartbeatIntervalMs * kMaxMissedHeartbeatPeriods;
3312
1
      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_mean",
3313
1
                                  std::to_string(latency_ms)));
3314
1
      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_stddev", "0"));
3315
1
      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "true"));
3316
1
    }
3317
3318
    // Shut down the third tablet server.
3319
1
    cluster_->tablet_server_by_uuid(shutdown_tserver->uuid())->Shutdown();
3320
3321
    // Sanity check: make sure the leadership hasn't changed since the leader
3322
    // has been determined.
3323
1
    TServerDetails* current_leader;
3324
1
    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &current_leader));
3325
1
    ASSERT_EQ(cluster_->tablet_server_index_by_uuid(leader_tserver->uuid()),
3326
1
              cluster_->tablet_server_index_by_uuid(current_leader->uuid()));
3327
1
  });
3328
3329
  // Get the Raft term from the established leader.
3330
1
  consensus::ConsensusStatePB cstate;
3331
1
  ASSERT_OK(GetConsensusState(leader_tserver, tablet_id_,
3332
1
                              consensus::CONSENSUS_CONFIG_COMMITTED, kTimeout, &cstate));
3333
3334
1
  TestWorkload workload(cluster_.get());
3335
1
  workload.set_table_name(kTableName);
3336
1
  workload.set_timeout_allowed(true);
3337
1
  workload.set_num_write_threads(1);
3338
1
  workload.set_write_batch_size(1);
3339
  // Make a 'space' for the artificial vote requests (see below) to arrive
3340
  // while there is no activity on RaftConsensus::Update().
3341
1
  workload.set_write_interval_millis(kHeartbeatIntervalMs);
3342
1
  workload.Setup();
3343
1
  workload.Start();
3344
3345
  // Issue rogue vote requests, imitating a disruptive tablet replica.
3346
1
  const auto& shutdown_server_uuid = shutdown_tserver->uuid();
3347
1
  const auto next_term = cstate.current_term() + 1;
3348
1
  const auto targets = { leader_tserver, other_tserver };
3349
1
  for (auto i = 0; i < 100; ++i) {
3350
0
    SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs / 4));
3351
0
    for (const auto* ts : targets) {
3352
0
      auto s = RequestVote(ts, tablet_id_, shutdown_server_uuid,
3353
0
                           next_term, MakeOpId(next_term + i, 0),
3354
0
                           /*ignore_live_leader=*/ false,
3355
0
                           /*is_pre_election=*/ false,
3356
0
                           kTimeout);
3357
      // Neither leader nor follower replica should grant 'yes' vote
3358
      // since the healthy leader is there and doing well, sending Raft
3359
      // transactions to be replicated.
3360
0
      ASSERT_TRUE(s.IsInvalidArgument() || s.IsServiceUnavailable())
3361
0
          << s.ToString();
3362
0
      std::regex pattern(
3363
0
          "("
3364
0
              "because replica is either leader or "
3365
0
              "believes a valid leader to be alive"
3366
0
          "|"
3367
0
              "because replica is already servicing an update "
3368
0
              "from a current leader or another vote"
3369
0
          ")");
3370
0
      ASSERT_TRUE(std::regex_search(s.ToString(), pattern));
3371
0
    }
3372
0
  }
3373
1
}
3374
3375
// Checks that a not-yet-committed split operation is correctly aborted after a leader change, and
3376
// that a new split op id is then successfully set on all replicas after a retry.
3377
1
TEST_F(RaftConsensusITest, SplitOpId) {
3378
1
  RpcController rpc;
3379
1
  const auto kTimeout = 60s * kTimeMultiplier;
3380
3381
1
  FLAGS_num_tablet_servers = 3;
3382
1
  FLAGS_num_replicas = 3;
3383
1
  vector<string> ts_flags = {
3384
1
    "--enable_leader_failure_detection=false"s,
3385
1
  };
3386
1
  vector<string> master_flags = {
3387
1
    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"s,
3388
1
    "--use_create_table_leader_hint=false"s,
3389
1
  };
3390
1
  ASSERT_NO_FATALS(BuildAndStart(ts_flags, master_flags));
3391
3392
1
  vector<TServerDetails*> tservers = TServerDetailsVector(tablet_servers_);
3393
1
  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
3394
3395
  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
3396
1
  TServerDetails* initial_leader = tservers[0];
3397
1
  ASSERT_OK(StartElection(initial_leader, tablet_id_, kTimeout));
3398
1
  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
3399
1
  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_, kTimeout));
3400
3401
1
  LOG(INFO) << "Initial leader: " << initial_leader->uuid();
3402
3403
2
  auto pause_update_consensus = [&](ExternalTabletServer* tserver, bool value) {
3404
2
    return cluster_->SetFlag(
3405
2
        tserver, "TEST_follower_pause_update_consensus_requests", AsString(value));
3406
2
  };
3407
3408
3
  for (auto* tserver : cluster_->tserver_daemons()) {
3409
3
    if (tserver->uuid() != initial_leader->uuid()) {
3410
2
      ASSERT_OK(pause_update_consensus(tserver, true));
3411
2
    }
3412
3
  }
3413
3414
  // Add SPLIT_OP to the leader.
3415
1
  tablet::SplitTabletRequestPB req;
3416
1
  req.set_tablet_id(tablet_id_);
3417
1
  req.set_new_tablet1_id(GenerateObjectId());
3418
1
  req.set_new_tablet2_id(GenerateObjectId());
3419
1
  {
3420
1
    const auto min_hash_code = std::numeric_limits<docdb::DocKeyHash>::min();
3421
1
    const auto max_hash_code = std::numeric_limits<docdb::DocKeyHash>::max();
3422
1
    const auto split_hash_code = (max_hash_code - min_hash_code) / 2 + min_hash_code;
3423
1
    const auto partition_key = PartitionSchema::EncodeMultiColumnHashValue(split_hash_code);
3424
1
    docdb::KeyBytes encoded_doc_key;
3425
1
    docdb::DocKeyEncoderAfterTableIdStep(&encoded_doc_key).Hash(
3426
1
        split_hash_code, std::vector<docdb::PrimitiveValue>());
3427
1
    req.set_split_encoded_key(encoded_doc_key.ToStringBuffer());
3428
1
    req.set_split_partition_key(partition_key);
3429
1
  }
3430
1
  req.set_dest_uuid(initial_leader->uuid());
3431
1
  tserver::SplitTabletResponsePB resp;
3432
3433
1
  LOG(INFO) << "Sending Split RPC to the initial tablet leader";
3434
1
  CountDownLatch split_latch(1);
3435
1
  initial_leader->tserver_admin_proxy->SplitTabletAsync(req, &resp, &rpc, [&split_latch, &resp]() {
3436
1
    LOG(INFO) << "Split RPC response: " << AsString(resp);
3437
1
    split_latch.CountDown();
3438
1
  });
3439
3440
1
  std::vector<OpIdPB> split_op_id_pbs;
3441
1
  std::vector<OpId> split_op_ids;
3442
2
  auto get_split_op_ids = [&]() -> Status {
3443
1
    split_op_ids = VERIFY_RESULT(GetLastOpIdForEachReplica(
3444
1
        tablet_id_, tservers, consensus::OpIdType::RECEIVED_OPID, kTimeout,
3445
1
        consensus::OperationType::SPLIT_OP));
3446
1
    return Status::OK();
3447
2
  };
3448
3449
1
  ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
3450
1
    RETURN_NOT_OK(get_split_op_ids());
3451
1
    return split_op_ids[0].term > 0;
3452
1
  }, kTimeout, "Waiting for the initial leader to add SPLIT_OP to Raft log"));
3453
0
  LOG(INFO) << "split_op_ids: " << AsString(split_op_ids);
3454
0
  ASSERT_EQ(split_op_ids[1], OpId());
3455
0
  ASSERT_EQ(split_op_ids[2], OpId());
3456
0
  const auto split_op_id_first_try = split_op_ids[0];
3457
3458
0
  LOG(INFO) << "Stepping down initial tablet leader";
3459
0
  ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, nullptr, kTimeout));
3460
3461
0
  TServerDetails* new_leader;
3462
0
  ASSERT_OK(FindTabletLeader(tablet_servers_, tablet_id_, kTimeout, &new_leader));
3463
0
  LOG(INFO) << "New leader: " << new_leader->uuid();
3464
3465
0
  ASSERT_OK(LoggedWaitFor(
3466
0
      [&]() -> Result<bool> {
3467
0
        RETURN_NOT_OK(get_split_op_ids());
3468
0
        for (const auto& split_op_id : split_op_ids) {
3469
0
          if (split_op_id != OpId()) {
3470
0
            return false;
3471
0
          }
3472
0
        }
3473
0
        return true;
3474
0
      },
3475
0
      kTimeout, "Waiting for SPLIT_OP to be aborted and split_op_id to be reset on all replicas"));
3476
0
  split_latch.Wait();
3477
3478
0
  LOG(INFO) << "Pause update consensus on the old leader";
3479
0
  for (auto* tserver : cluster_->tserver_daemons()) {
3480
0
    if (tserver->uuid() == initial_leader->uuid()) {
3481
0
      ASSERT_OK(pause_update_consensus(tserver, true));
3482
0
    }
3483
0
  }
3484
3485
0
  LOG(INFO) << "Sending Split RPC to the new tablet leader";
3486
0
  rpc.Reset();
3487
0
  split_latch.Reset(1);
3488
0
  req.set_dest_uuid(new_leader->uuid());
3489
0
  new_leader->tserver_admin_proxy->SplitTabletAsync(req, &resp, &rpc, [&split_latch, &resp]() {
3490
0
    LOG(INFO) << "Split RPC response: " << AsString(resp);
3491
0
    split_latch.CountDown();
3492
0
  });
3493
3494
0
  ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
3495
0
    RETURN_NOT_OK(get_split_op_ids());
3496
0
    for (size_t i = 0; i < tservers.size(); ++i) {
3497
0
      if (tservers[i]->uuid() == new_leader->uuid() && split_op_ids[i].index > 0) {
3498
0
        return true;
3499
0
      }
3500
0
    }
3501
0
    return false;
3502
0
  }, kTimeout, "Waiting for the new leader to add SPLIT_OP to Raft log"));
3503
0
  LOG(INFO) << "split_op_ids: " << AsString(split_op_ids);
3504
3505
  // Make sure followers have split_op_id not yet set.
3506
0
  for (size_t i = 0; i < tservers.size(); ++i) {
3507
0
    if (tservers[i]->uuid() != new_leader->uuid()) {
3508
0
      ASSERT_EQ(split_op_ids[i], OpId());
3509
0
    }
3510
0
  }
3511
3512
0
  LOG(INFO) << "Resume update consensus on all replicas";
3513
0
  for (auto* tserver : cluster_->tserver_daemons()) {
3514
0
    ASSERT_OK(pause_update_consensus(tserver, false));
3515
0
  }
3516
3517
0
  ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
3518
0
    RETURN_NOT_OK(get_split_op_ids());
3519
0
    for (auto& split_op_id : split_op_ids) {
3520
0
      if (split_op_id == OpId()) {
3521
0
        return false;
3522
0
      }
3523
0
    }
3524
0
    return true;
3525
0
  }, kTimeout, "Waiting for all replicas to add SPLIT_OP to Raft log"));
3526
0
  LOG(INFO) << "split_op_ids: " << AsString(split_op_ids);
3527
3528
0
  for (auto& split_op_id : split_op_ids) {
3529
0
    ASSERT_EQ(split_op_id, split_op_ids[0]);
3530
0
  }
3531
0
  ASSERT_GT(split_op_ids[0].term, split_op_id_first_try.term);
3532
0
  ASSERT_GT(split_op_ids[0].index, split_op_id_first_try.index);
3533
3534
0
  split_latch.Wait();
3535
0
}
3536
3537
}  // namespace tserver
3538
}  // namespace yb