YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/master-partitioned-test.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include "yb/gutil/casts.h"

#include "yb/client/client.h"
#include "yb/client/table.h"
#include "yb/client/table_creator.h"

#include "yb/common/wire_protocol.h"

#include "yb/consensus/consensus.h"

#include "yb/fs/fs_manager.h"

#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/master/catalog_manager_if.h"
#include "yb/master/master.h"
#include "yb/master/master_cluster.proxy.h"
#include "yb/master/mini_master.h"

#include "yb/rpc/messenger.h"
#include "yb/rpc/rpc_controller.h"

#include "yb/tablet/tablet_peer.h"

#include "yb/util/atomic.h"
#include "yb/util/status_log.h"
#include "yb/util/test_util.h"
#include "yb/util/tsan_util.h"

using yb::client::YBClient;
using yb::client::YBClientBuilder;
using yb::client::YBSchema;
using yb::client::YBTableCreator;
using yb::client::YBTableName;
using yb::itest::CreateTabletServerMap;
using yb::itest::TabletServerMap;
using yb::rpc::Messenger;
using yb::rpc::MessengerBuilder;
using yb::rpc::RpcController;

DECLARE_int32(heartbeat_interval_ms);
DECLARE_bool(log_preallocate_segments);
DECLARE_bool(TEST_log_consider_all_ops_safe);
DECLARE_bool(TEST_enable_remote_bootstrap);
DECLARE_bool(use_preelection);
DECLARE_int32(leader_failure_exp_backoff_max_delta_ms);
DECLARE_int32(tserver_unresponsive_timeout_ms);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_int32(TEST_slowdown_master_async_rpc_tasks_by_ms);
DECLARE_int32(unresponsive_ts_rpc_timeout_ms);

DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");

using std::string;
using std::vector;
using std::thread;
using std::unique_ptr;
using strings::Substitute;

namespace yb {

class MasterPartitionedTest : public YBMiniClusterTestBase<MiniCluster> {
 public:
  MasterPartitionedTest() {}

  void SetUp() override {
    // Make heartbeats faster to speed up the test runtime.
    FLAGS_heartbeat_interval_ms = kTimeMultiplier * 10;
    FLAGS_raft_heartbeat_interval_ms = kTimeMultiplier * 200;
    FLAGS_unresponsive_ts_rpc_timeout_ms = 10000;  // 10 sec.

    FLAGS_leader_failure_exp_backoff_max_delta_ms = 5000;
    FLAGS_TEST_slowdown_master_async_rpc_tasks_by_ms = 100;

    FLAGS_TEST_log_consider_all_ops_safe = true;
    FLAGS_num_test_tablets = RegularBuildVsSanitizers(60, 10);

    YBMiniClusterTestBase::SetUp();
    MiniClusterOptions opts;
    opts.num_tablet_servers = num_tservers_;
    opts.num_masters = 3;
    cluster_.reset(new MiniCluster(opts));
    ASSERT_OK(cluster_->Start());

    ASSERT_OK(cluster_->WaitForTabletServerCount(opts.num_tablet_servers));
    client_ = ASSERT_RESULT(YBClientBuilder()
        .add_master_server_addr(cluster_->mini_master(0)->bound_rpc_addr_str())
        .add_master_server_addr(cluster_->mini_master(1)->bound_rpc_addr_str())
        .add_master_server_addr(cluster_->mini_master(2)->bound_rpc_addr_str())
        .Build());
  }

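  // Prevent master `from_idx` from sending RPCs to master `to_idx` by breaking the messenger's
  // connectivity to both the private and the public test address of the destination master.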
  Status BreakMasterConnectivityTo(size_t from_idx, size_t to_idx) {
    master::MiniMaster* src_master = cluster_->mini_master(from_idx);
    IpAddress src = VERIFY_RESULT(HostToAddress(src_master->bound_rpc_addr().host()));
    // TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed.
    auto dst_prv = CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kTrue)));
    auto dst_pub =
        CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kFalse)));
    LOG(INFO) << "Breaking connectivities from master " << from_idx << " to " << to_idx << " i.e. "
              << src << " to " << dst_prv << " and " << dst_pub;
    src_master->messenger().BreakConnectivityTo(dst_prv);
    src_master->messenger().BreakConnectivityTo(dst_pub);
    return Status::OK();
  }

  Status RestoreMasterConnectivityTo(size_t from_idx, size_t to_idx) {
    master::MiniMaster* src_master = cluster_->mini_master(from_idx);
    IpAddress src = VERIFY_RESULT(HostToAddress(src_master->bound_rpc_addr().host()));
    // TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed.
    auto dst_prv = CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kTrue)));
    auto dst_pub =
        CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kFalse)));
    LOG(INFO) << "Restoring connectivities from master " << from_idx << " to " << to_idx << " i.e. "
              << src << " to " << dst_prv << " and " << dst_pub;
    src_master->messenger().RestoreConnectivityTo(dst_prv);
    src_master->messenger().RestoreConnectivityTo(dst_pub);
    return Status::OK();
  }

  void DoTearDown() override {
    client_.reset();
    SetAtomicFlag(0, &FLAGS_TEST_slowdown_master_async_rpc_tasks_by_ms);
    SleepFor(MonoDelta::FromMilliseconds(1000));
    cluster_->Shutdown();
  }

  void CreateTable(const YBTableName& table_name, int num_tablets);

 protected:
  std::unique_ptr<YBClient> client_;
  int32_t num_tservers_ = 5;
};

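// Create a Redis-type table with `num_tablets` tablets, creating the table's namespace first if
// it does not exist yet. Does not wait for tablet creation to complete.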
void MasterPartitionedTest::CreateTable(const YBTableName& table_name, int num_tablets) {
  ASSERT_OK(client_->CreateNamespaceIfNotExists(table_name.namespace_name(),
                                                table_name.namespace_type()));
  std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
  master::ReplicationInfoPB replication_info;
  replication_info.mutable_live_replicas()->set_num_replicas(3);
  ASSERT_OK(table_creator->table_name(table_name)
                .table_type(client::YBTableType::REDIS_TABLE_TYPE)
                .num_tablets(num_tablets)
                .wait(false)
                .Create());
}

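// Return the last OpId received by the given master's consensus instance; used below to check
// that master 0 has caught up with the current leader before connectivity is broken.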
OpId LastReceivedOpId(master::MiniMaster* master) {
  auto consensus = master->tablet_peer()->consensus();
  return consensus->GetLastReceivedOpId();
}

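// Repeatedly force the master leader (master 0) to step down while asynchronous master tasks are
// still in flight (slowed down via FLAGS_TEST_slowdown_master_async_rpc_tasks_by_ms): with
// pre-elections disabled, breaking the 0 -> 2 link lets master 2 bump its term, which forces
// master 0 to step down and get re-elected.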
TEST_F(MasterPartitionedTest, CauseMasterLeaderStepdownWithTasksInProgress) {
  const auto kTimeout = 60s;

  DontVerifyClusterBeforeNextTearDown();

  auto master_0_is_leader = [this]() {
    auto l = cluster_->GetLeaderMiniMaster();
    return l.ok() && (*l)->permanent_uuid() == cluster_->mini_master(0)->permanent_uuid();
  };

  // Break connectivity so that:
  //   master 0 can make outgoing RPCs to 1 and 2,
  //   but 1 and 2 cannot make outgoing RPCs.
  // This should result in master 0 becoming the leader.
  //   Network topology:  1 <-- 0 --> 2
  std::vector<std::pair<int, int>> break_connectivity = {{1, 0}, {1, 2}, {2, 1}, {2, 0}};
  bool connectivity_broken = false;

  ASSERT_OK(WaitFor(
      [this, &master_0_is_leader, &break_connectivity, &connectivity_broken]() -> Result<bool> {
    auto leader_mini_master = cluster_->GetLeaderMiniMaster();
    if (!leader_mini_master.ok()) {
      return false;
    }
    if (LastReceivedOpId(*leader_mini_master) != LastReceivedOpId(cluster_->mini_master(0))) {
      if (connectivity_broken) {
        for (const auto& p : break_connectivity) {
          RETURN_NOT_OK(RestoreMasterConnectivityTo(p.first, p.second));
        }
        connectivity_broken = false;
      }
      return false;
    }

    if (!connectivity_broken) {
      for (const auto& p : break_connectivity) {
        RETURN_NOT_OK(BreakMasterConnectivityTo(p.first, p.second));
      }
      connectivity_broken = true;
    }

    LOG(INFO) << "Master 0: " << cluster_->mini_master(0)->permanent_uuid();

    if (master_0_is_leader()) {
      return true;
    }

    return false;
  }, kTimeout, "Master 0 is leader"));

  ASSERT_OK(WaitFor(
      [this]() { return cluster_->WaitForTabletServerCount(num_tservers_).ok(); },
      kTimeout,
      "Wait for master 0 to hear from all tservers"));

  YBTableName table_name(YQL_DATABASE_REDIS, "my_keyspace", "test_table");
  ASSERT_NO_FATALS(CreateTable(table_name, FLAGS_num_test_tablets));
  LOG(INFO) << "Created table successfully!";

  constexpr int kNumLoops = 3;
  for (int i = 0; i < kNumLoops; i++) {
    LOG(INFO) << "Iteration " << i;

    // This test was added during Jepsen/CQL testing before preelections
    // were implemented. Enabling preelections will prevent us from getting
    // into the case that we want to test -- where the master leader has to
    // step down because it sees that another master has moved on to a higher
    // term.
    FLAGS_use_preelection = false;

    consensus::ConsensusStatePB cpb;
    ASSERT_OK(cluster_->mini_master(0)->catalog_manager().GetCurrentConfig(&cpb));
    const auto initial_term = cpb.current_term();

    // Master 0 cannot send updates to master 2. This will cause master 2 to increase its term,
    // and cause the leader (master 0) to step down and get re-elected.
    ASSERT_OK(BreakMasterConnectivityTo(0, 2));
    ASSERT_OK(WaitFor(
        [this, initial_term]() {
          consensus::ConsensusStatePB cpb;
          return cluster_->mini_master(2)->catalog_manager().GetCurrentConfig(&cpb).ok() &&
                 cpb.current_term() > initial_term;
        },
        kTimeout,
        "Wait for master 2 to do elections and increase the term"));

    ASSERT_OK(RestoreMasterConnectivityTo(0, 2));

    ASSERT_OK(cluster_->mini_master(2)->catalog_manager().GetCurrentConfig(&cpb));
    const auto new_term = cpb.current_term();
    ASSERT_OK(WaitFor(
        [this, new_term]() {
          consensus::ConsensusStatePB cpb;
          return cluster_->mini_master(0)->catalog_manager().GetCurrentConfig(&cpb).ok() &&
                 cpb.current_term() > new_term;
        },
        kTimeout,
        "Wait for master 0 to update its term"));

    FLAGS_use_preelection = true;

    ASSERT_OK(WaitFor(
        master_0_is_leader, kTimeout, "Wait for master 0 to become the leader again"));
  }
}

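// Partition the current master leader away from the other two masters, wait for the remaining
// cohort to elect a new leader at a higher term, and then verify that the old leader fails a
// ListTabletServers RPC with NOT_THE_LEADER / LEADER_HAS_NO_LEASE once its leader lease is gone.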
TEST_F(MasterPartitionedTest, VerifyOldLeaderStepsDown) {
  // Partition away the old master leader from the cluster.
  auto old_leader_idx = cluster_->LeaderMasterIdx();
  LOG(INFO) << "Old leader master: " << old_leader_idx;

  ssize_t new_cohort_peer1 = -1, new_cohort_peer2 = -1;
  for (size_t i = 0; i < cluster_->num_masters(); i++) {
    if (implicit_cast<ssize_t>(i) == old_leader_idx) {
      continue;
    }
    if (new_cohort_peer1 == -1) {
      new_cohort_peer1 = i;
    } else if (new_cohort_peer2 == -1) {
      new_cohort_peer2 = i;
    }
    LOG(INFO) << "Breaking connectivity between " << i << " and " << old_leader_idx;
    ASSERT_OK(BreakMasterConnectivityTo(old_leader_idx, i));
    ASSERT_OK(BreakMasterConnectivityTo(i, old_leader_idx));
  }

  LOG(INFO) << "Introduced a network split. Cohort#1 masters: " << old_leader_idx
            << ", cohort#2 masters: " << new_cohort_peer1 << ", " << new_cohort_peer2;

  // Wait for a master leader in the new cohort.
  ASSERT_OK(WaitFor(
    [&]() -> Result<bool> {
      // Get the config of the old leader.
      consensus::ConsensusStatePB cbp, cbp1, cbp2;
      RETURN_NOT_OK(
          cluster_->mini_master(old_leader_idx)->catalog_manager().GetCurrentConfig(&cbp));

      // Get the configs of the two masters in the new cohort.
      RETURN_NOT_OK(
          cluster_->mini_master(new_cohort_peer1)->catalog_manager().GetCurrentConfig(&cbp1));

      RETURN_NOT_OK(
          cluster_->mini_master(new_cohort_peer2)->catalog_manager().GetCurrentConfig(&cbp2));

      // Term number of the new cohort's config should increase.
      // Leader should not be the same as the old leader.
      return cbp1.current_term() == cbp2.current_term() &&
             cbp1.current_term() > cbp.current_term() &&
             cbp1.has_leader_uuid() && cbp1.leader_uuid() != cbp.leader_uuid() &&
             cbp2.has_leader_uuid() && cbp2.leader_uuid() == cbp1.leader_uuid();
    },
    100s,
    "Waiting for a leader on the new cohort."
  ));

  // Get the index of the new leader.
  string uuid1 = cluster_->mini_master(new_cohort_peer1)->master()->fs_manager()->uuid();
  string uuid2 = cluster_->mini_master(new_cohort_peer2)->master()->fs_manager()->uuid();

  consensus::ConsensusStatePB cbp1;
  ASSERT_OK(cluster_->mini_master(new_cohort_peer1)
                    ->master()
                    ->catalog_manager()
                    ->GetCurrentConfig(&cbp1));

  ssize_t new_leader_idx = -1;
  if (cbp1.leader_uuid() == uuid1) {
    new_leader_idx = new_cohort_peer1;
  } else if (cbp1.leader_uuid() == uuid2) {
    new_leader_idx = new_cohort_peer2;
  }
  LOG(INFO) << "Leader of the new cohort " << new_leader_idx;

  // Wait until the new leader is caught up and holds a valid leader lease.
  ASSERT_OK(cluster_->mini_master(new_leader_idx)->catalog_manager().WaitUntilCaughtUpAsLeader(
      MonoDelta::FromSeconds(100)));

  // Now perform an RPC that involves a SHARED_LEADER_LOCK and confirm that it fails.
  yb::master::Master* m = cluster_->mini_master(old_leader_idx)->master();
  master::MasterClusterProxy proxy(&m->proxy_cache(), m->rpc_server()->GetRpcHostPort()[0]);

  RpcController controller;
  controller.Reset();
  master::ListTabletServersRequestPB req;
  master::ListTabletServersResponsePB resp;

  ASSERT_OK(proxy.ListTabletServers(req, &resp, &controller));
  ASSERT_TRUE(resp.has_error());
  ASSERT_EQ(resp.error().code(), master::MasterErrorPB::NOT_THE_LEADER);
  ASSERT_EQ(resp.error().status().code(), AppStatusPB::LEADER_HAS_NO_LEASE);

  // Restore connectivity.
  ASSERT_OK(RestoreMasterConnectivityTo(old_leader_idx, new_cohort_peer1));
  ASSERT_OK(RestoreMasterConnectivityTo(old_leader_idx, new_cohort_peer2));
  ASSERT_OK(RestoreMasterConnectivityTo(new_cohort_peer1, old_leader_idx));
  ASSERT_OK(RestoreMasterConnectivityTo(new_cohort_peer2, old_leader_idx));
}

}  // namespace yb