/Users/deen/code/yugabyte-db/src/yb/integration-tests/network_failure-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/error.h" |
15 | | #include "yb/client/schema.h" |
16 | | #include "yb/client/session.h" |
17 | | #include "yb/client/table_handle.h" |
18 | | #include "yb/client/yb_op.h" |
19 | | #include "yb/client/yb_table_name.h" |
20 | | |
21 | | #include "yb/integration-tests/mini_cluster.h" |
22 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
23 | | |
24 | | #include "yb/master/master.h" |
25 | | #include "yb/master/mini_master.h" |
26 | | |
27 | | #include "yb/util/async_util.h" |
28 | | #include "yb/util/metrics.h" |
29 | | #include "yb/util/random_util.h" |
30 | | #include "yb/util/test_thread_holder.h" |
31 | | #include "yb/util/test_util.h" |
32 | | #include "yb/util/tsan_util.h" |
33 | | |
34 | | using namespace std::literals; |
35 | | |
36 | | METRIC_DECLARE_histogram(handler_latency_yb_master_MasterClient_GetTabletLocations); |
37 | | |
38 | | DECLARE_int64(meta_cache_lookup_throttling_max_delay_ms); |
39 | | DECLARE_int64(meta_cache_lookup_throttling_step_ms); |
40 | | |
41 | | namespace yb { |
42 | | |
43 | | const std::string kKeyspaceName("ks"); |
44 | | const client::YBTableName kTableName(YQL_DATABASE_CQL, kKeyspaceName, "test"); |
45 | | const std::string kValueColumn("value"); |
46 | | constexpr int kNumTablets = 8; |
47 | | |
48 | | class NetworkFailureTest : public MiniClusterTestWithClient<MiniCluster> { |
49 | | public: |
50 | 0 | virtual ~NetworkFailureTest() = default; |
51 | | |
52 | 1 | void SetUp() override { |
53 | 1 | YBMiniClusterTestBase::SetUp(); |
54 | | |
55 | 1 | auto opts = MiniClusterOptions(); |
56 | 1 | opts.num_tablet_servers = 4; |
57 | 1 | opts.num_masters = 3; |
58 | 1 | cluster_.reset(new MiniCluster(opts)); |
59 | 1 | ASSERT_OK(cluster_->Start()); |
60 | | |
61 | 0 | ASSERT_OK(CreateClient()); |
62 | | |
63 | | // Create a keyspace; |
64 | 0 | ASSERT_OK(client_->CreateNamespace(kKeyspaceName)); |
65 | |
|
66 | 0 | client::YBSchemaBuilder builder; |
67 | 0 | builder.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey(); |
68 | 0 | builder.AddColumn("value")->Type(INT32)->NotNull(); |
69 | |
|
70 | 0 | ASSERT_OK(table_.Create(kTableName, kNumTablets, client_.get(), &builder)); |
71 | | |
72 | | // Cluster verifier is unable to create thread in this setup. |
73 | 0 | DontVerifyClusterBeforeNextTearDown(); |
74 | 0 | } |
75 | | |
76 | | protected: |
77 | | client::TableHandle table_; |
78 | | }; |
79 | | |
80 | 0 | int64_t CountLookups(MiniCluster* cluster) { |
81 | 0 | int64_t result = 0; |
82 | 0 | for (size_t i = 0; i != cluster->num_masters(); ++i) { |
83 | 0 | auto new_leader_master = cluster->mini_master(i); |
84 | 0 | auto histogram = new_leader_master->master()->metric_entity()->FindOrCreateHistogram( |
85 | 0 | &METRIC_handler_latency_yb_master_MasterClient_GetTabletLocations); |
86 | 0 | result += histogram->TotalCount(); |
87 | 0 | } |
88 | 0 | return result; |
89 | 0 | } |
90 | | |
91 | 0 | TEST_F(NetworkFailureTest, DisconnectMasterLeader) { |
92 | 0 | FLAGS_meta_cache_lookup_throttling_max_delay_ms = 10000; |
93 | 0 | FLAGS_meta_cache_lookup_throttling_step_ms = 50; |
94 | |
|
95 | 0 | TestThreadHolder thread_holder; |
96 | |
|
97 | 0 | std::atomic<int> written(0); |
98 | 0 | std::atomic<CoarseTimePoint> prev_report{CoarseTimePoint()}; |
99 | 0 | thread_holder.AddThreadFunctor([ |
100 | 0 | this, &written, &stop_flag = thread_holder.stop_flag(), &prev_report]() { |
101 | 0 | auto session = client_->NewSession(); |
102 | 0 | std::deque<std::future<client::FlushStatus>> futures; |
103 | 0 | std::deque<client::YBOperationPtr> ops; |
104 | |
|
105 | 0 | while (!stop_flag.load()) { |
106 | 0 | while (!futures.empty() && IsReady(futures.front())) { |
107 | 0 | ASSERT_OK(futures.front().get().status); |
108 | 0 | ASSERT_TRUE(ops.front()->succeeded()); |
109 | 0 | futures.pop_front(); |
110 | 0 | ops.pop_front(); |
111 | |
|
112 | 0 | auto new_written = ++written; |
113 | 0 | if (new_written % 4000 == 0) { |
114 | 0 | auto now = CoarseMonoClock::now(); |
115 | 0 | auto old_value = prev_report.exchange(now); |
116 | 0 | LOG(INFO) << "Written: " << new_written << ", time taken: " << MonoDelta(now - old_value); |
117 | 0 | } |
118 | 0 | } |
119 | |
|
120 | 0 | int key = RandomUniformInt<int>(0, std::numeric_limits<int>::max() - 1); |
121 | 0 | int value = RandomUniformInt<int>(0, std::numeric_limits<int>::max() - 1); |
122 | 0 | auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT); |
123 | 0 | auto* const req = op->mutable_request(); |
124 | 0 | QLAddInt32HashValue(req, key); |
125 | 0 | table_.AddInt32ColumnValue(req, kValueColumn, value); |
126 | 0 | session->Apply(op); |
127 | 0 | futures.push_back(session->FlushFuture()); |
128 | 0 | ops.push_back(op); |
129 | 0 | } |
130 | 0 | }); |
131 | |
|
132 | 0 | while (written.load() <= RegularBuildVsSanitizers(5000, 500)) { |
133 | 0 | std::this_thread::sleep_for(100ms); |
134 | 0 | } |
135 | |
|
136 | 0 | auto old_lookups = CountLookups(cluster_.get()); |
137 | |
|
138 | 0 | auto leader_master_idx = cluster_->LeaderMasterIdx(); |
139 | 0 | LOG(INFO) << "Old leader: " << leader_master_idx; |
140 | 0 | for (size_t i = 0; i != cluster_->num_masters(); ++i) { |
141 | 0 | if (implicit_cast<ssize_t>(i) != leader_master_idx) { |
142 | 0 | ASSERT_OK(BreakConnectivity(cluster_.get(), leader_master_idx, i)); |
143 | 0 | } |
144 | 0 | } |
145 | |
|
146 | 0 | thread_holder.WaitAndStop(10s); |
147 | |
|
148 | 0 | auto new_lookups = CountLookups(cluster_.get()); |
149 | |
|
150 | 0 | LOG(INFO) << "Lookups before: " << old_lookups << ", after: " << new_lookups; |
151 | |
|
152 | 0 | ASSERT_LE(new_lookups, old_lookups + 100); |
153 | 0 | } |
154 | | |
155 | | } // namespace yb |