YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/network_failure-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/error.h"
15
#include "yb/client/schema.h"
16
#include "yb/client/session.h"
17
#include "yb/client/table_handle.h"
18
#include "yb/client/yb_op.h"
19
#include "yb/client/yb_table_name.h"
20
21
#include "yb/integration-tests/mini_cluster.h"
22
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
23
24
#include "yb/master/master.h"
25
#include "yb/master/mini_master.h"
26
27
#include "yb/util/async_util.h"
28
#include "yb/util/metrics.h"
29
#include "yb/util/random_util.h"
30
#include "yb/util/test_thread_holder.h"
31
#include "yb/util/test_util.h"
32
#include "yb/util/tsan_util.h"
33
34
using namespace std::literals;
35
36
METRIC_DECLARE_histogram(handler_latency_yb_master_MasterClient_GetTabletLocations);
37
38
DECLARE_int64(meta_cache_lookup_throttling_max_delay_ms);
39
DECLARE_int64(meta_cache_lookup_throttling_step_ms);
40
41
namespace yb {
42
43
const std::string kKeyspaceName("ks");
44
const client::YBTableName kTableName(YQL_DATABASE_CQL, kKeyspaceName, "test");
45
const std::string kValueColumn("value");
46
constexpr int kNumTablets = 8;
47
48
// Test fixture that starts a MiniCluster with 4 tablet servers and 3 masters, creates a client,
// and creates the table kTableName (INT32 hash key "key", INT32 column kValueColumn) with
// kNumTablets tablets inside the keyspace kKeyspaceName.
class NetworkFailureTest : public MiniClusterTestWithClient<MiniCluster> {
 public:
  virtual ~NetworkFailureTest() = default;

  // Brings up the cluster and the client, then creates the keyspace and the test table.
  void SetUp() override {
    YBMiniClusterTestBase::SetUp();

    auto opts = MiniClusterOptions();
    opts.num_tablet_servers = 4;
    opts.num_masters = 3;
    cluster_.reset(new MiniCluster(opts));
    ASSERT_OK(cluster_->Start());

    ASSERT_OK(CreateClient());

    // Create a keyspace.
    ASSERT_OK(client_->CreateNamespace(kKeyspaceName));

    client::YBSchemaBuilder builder;
    builder.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey();
    // Use the shared constant so the schema and the writers that reference kValueColumn
    // cannot drift apart.
    builder.AddColumn(kValueColumn)->Type(INT32)->NotNull();

    ASSERT_OK(table_.Create(kTableName, kNumTablets, client_.get(), &builder));

    // Cluster verifier is unable to create thread in this setup.
    DontVerifyClusterBeforeNextTearDown();
  }

 protected:
  // Handle to the table created in SetUp().
  client::TableHandle table_;
};
79
80
0
// Returns the sum, over every master of `cluster`, of the total count recorded in that master's
// GetTabletLocations handler latency histogram.
int64_t CountLookups(MiniCluster* cluster) {
  int64_t total = 0;
  for (size_t idx = 0; idx != cluster->num_masters(); ++idx) {
    auto mini_master = cluster->mini_master(idx);
    const auto& entity = mini_master->master()->metric_entity();
    auto lookups_histogram = entity->FindOrCreateHistogram(
        &METRIC_handler_latency_yb_master_MasterClient_GetTabletLocations);
    total += lookups_histogram->TotalCount();
  }
  return total;
}
90
91
0
// Runs a background writer, breaks connectivity between the leader master and every other master,
// and checks that the number of GetTabletLocations calls seen by the masters grows by at most 100
// while the writer keeps running.
TEST_F(NetworkFailureTest, DisconnectMasterLeader) {
  // Meta cache lookup throttling settings used for this test.
  FLAGS_meta_cache_lookup_throttling_max_delay_ms = 10000;
  FLAGS_meta_cache_lookup_throttling_step_ms = 50;

  TestThreadHolder thread_holder;

  std::atomic<int> written(0);
  // Start from the current time so that the first progress report logs a real interval instead of
  // the distance from the clock's epoch.
  std::atomic<CoarseTimePoint> prev_report{CoarseMonoClock::now()};
  thread_holder.AddThreadFunctor([
      this, &written, &stop_flag = thread_holder.stop_flag(), &prev_report]() {
    auto session = client_->NewSession();
    // Pending flushes and the operations they carry, kept in submission order.
    std::deque<std::future<client::FlushStatus>> futures;
    std::deque<client::YBOperationPtr> ops;

    while (!stop_flag.load()) {
      // Drain every flush that has already completed, oldest first.
      while (!futures.empty() && IsReady(futures.front())) {
        ASSERT_OK(futures.front().get().status);
        ASSERT_TRUE(ops.front()->succeeded());
        futures.pop_front();
        ops.pop_front();

        auto new_written = ++written;
        // Periodic progress report.
        if (new_written % 4000 == 0) {
          auto now = CoarseMonoClock::now();
          auto old_value = prev_report.exchange(now);
          LOG(INFO) << "Written: " << new_written << ", time taken: " << MonoDelta(now - old_value);
        }
      }

      // Insert one row with a random key and a random value.
      int key = RandomUniformInt<int>(0, std::numeric_limits<int>::max() - 1);
      int value = RandomUniformInt<int>(0, std::numeric_limits<int>::max() - 1);
      auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
      auto* const req = op->mutable_request();
      QLAddInt32HashValue(req, key);
      table_.AddInt32ColumnValue(req, kValueColumn, value);
      session->Apply(op);
      futures.push_back(session->FlushFuture());
      ops.push_back(op);
    }
  });

  // Wait until the writer has completed an initial batch of writes before injecting the failure.
  while (written.load() <= RegularBuildVsSanitizers(5000, 500)) {
    std::this_thread::sleep_for(100ms);
  }

  auto old_lookups = CountLookups(cluster_.get());

  auto leader_master_idx = cluster_->LeaderMasterIdx();
  LOG(INFO) << "Old leader: " << leader_master_idx;
  // The index is signed (it is compared against ssize_t below); presumably a negative value means
  // that no leader was found - TODO confirm. Fail here rather than pass it to BreakConnectivity.
  ASSERT_GE(leader_master_idx, 0);
  for (size_t i = 0; i != cluster_->num_masters(); ++i) {
    if (implicit_cast<ssize_t>(i) != leader_master_idx) {
      ASSERT_OK(BreakConnectivity(cluster_.get(), leader_master_idx, i));
    }
  }

  // NOTE(review): by its name this waits up to 10s and then stops the writer thread - confirm
  // against TestThreadHolder.
  thread_holder.WaitAndStop(10s);

  auto new_lookups = CountLookups(cluster_.get());

  LOG(INFO) << "Lookups before: " << old_lookups << ", after: " << new_lookups;

  // The disconnected leader must not cause more than 100 extra lookups.
  ASSERT_LE(new_lookups, old_lookups + 100);
}
154
155
} // namespace yb