/Users/deen/code/yugabyte-db/src/yb/integration-tests/kv_table-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <atomic> |
15 | | #include <cmath> |
16 | | #include <cstdlib> |
17 | | #include <future> |
18 | | |
19 | | #include <gflags/gflags.h> |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include "yb/client/callbacks.h" |
23 | | #include "yb/client/client.h" |
24 | | #include "yb/client/table.h" |
25 | | #include "yb/client/tablet_server.h" |
26 | | |
27 | | #include "yb/gutil/strings/split.h" |
28 | | #include "yb/gutil/strings/substitute.h" |
29 | | |
30 | | #include "yb/integration-tests/cluster_verifier.h" |
31 | | #include "yb/integration-tests/load_generator.h" |
32 | | #include "yb/integration-tests/mini_cluster.h" |
33 | | #include "yb/integration-tests/yb_table_test_base.h" |
34 | | |
35 | | #include "yb/master/mini_master.h" |
36 | | |
37 | | #include "yb/tserver/mini_tablet_server.h" |
38 | | |
39 | | #include "yb/util/size_literals.h" |
40 | | #include "yb/util/test_macros.h" |
41 | | #include "yb/util/test_thread_holder.h" |
42 | | #include "yb/util/test_util.h" |
43 | | |
44 | | using namespace std::literals; |
45 | | |
46 | | using std::string; |
47 | | using std::vector; |
48 | | using std::unique_ptr; |
49 | | |
50 | | using yb::client::YBValue; |
51 | | |
52 | | using std::shared_ptr; |
53 | | |
54 | | DECLARE_int32(log_cache_size_limit_mb); |
55 | | DECLARE_int32(global_log_cache_size_limit_mb); |
56 | | |
57 | | namespace yb { |
58 | | namespace integration_tests { |
59 | | |
60 | | using client::YBClient; |
61 | | using client::YBClientBuilder; |
62 | | using client::YBColumnSchema; |
63 | | using client::YBSchema; |
64 | | using client::YBSchemaBuilder; |
65 | | using client::YBSession; |
66 | | using client::YBStatusMemberCallback; |
67 | | using client::YBTable; |
68 | | using client::YBTableCreator; |
69 | | using strings::Split; |
70 | | |
71 | | class KVTableTest : public YBTableTestBase { |
72 | | protected: |
73 | | |
74 | 10 | bool use_external_mini_cluster() override { return false; } |
75 | | |
76 | | protected: |
77 | | |
78 | 0 | void PutSampleKeysValues() { |
79 | 0 | PutKeyValue("key123", "value123"); |
80 | 0 | PutKeyValue("key200", "value200"); |
81 | 0 | PutKeyValue("key300", "value300"); |
82 | 0 | } |
83 | | |
84 | | void CheckSampleKeysValues() { |
85 | | auto result_kvs = GetScanResults(client::TableRange(table_)); |
86 | | |
87 | | ASSERT_EQ(3, result_kvs.size()); |
88 | | ASSERT_EQ("key123", result_kvs.front().first); |
89 | | ASSERT_EQ("value123", result_kvs.front().second); |
90 | | ASSERT_EQ("key200", result_kvs[1].first); |
91 | | ASSERT_EQ("value200", result_kvs[1].second); |
92 | | ASSERT_EQ("key300", result_kvs[2].first); |
93 | | ASSERT_EQ("value300", result_kvs[2].second); |
94 | | } |
95 | | |
96 | | }; |
97 | | |
98 | | TEST_F(KVTableTest, SimpleKVTableTest) { |
99 | | ASSERT_NO_FATALS(PutSampleKeysValues()); |
100 | | ASSERT_NO_FATALS(CheckSampleKeysValues()); |
101 | | ClusterVerifier cluster_verifier(mini_cluster()); |
102 | | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
103 | | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 3)); |
104 | | } |
105 | | |
106 | | TEST_F(KVTableTest, PointQuery) { |
107 | | ASSERT_NO_FATALS(PutSampleKeysValues()); |
108 | | |
109 | | client::TableIteratorOptions options; |
110 | | options.filter = client::FilterEqual("key200"s, "k"s); |
111 | | auto result_kvs = GetScanResults(client::TableRange(table_, options)); |
112 | | ASSERT_EQ(1, result_kvs.size()); |
113 | | ASSERT_EQ("key200", result_kvs.front().first); |
114 | | ASSERT_EQ("value200", result_kvs.front().second); |
115 | | } |
116 | | |
117 | | TEST_F(KVTableTest, Eng135MetricsTest) { |
118 | | ClusterVerifier cluster_verifier(mini_cluster()); |
119 | | for (int idx = 0; idx < 10; ++idx) { |
120 | | ASSERT_NO_FATALS(PutSampleKeysValues()); |
121 | | ASSERT_NO_FATALS(CheckSampleKeysValues()); |
122 | | ASSERT_NO_FATALS(DeleteTable()); |
123 | | ASSERT_NO_FATALS(CreateTable()); |
124 | | ASSERT_NO_FATALS(OpenTable()); |
125 | | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
126 | | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 0)); |
127 | | } |
128 | | } |
129 | | |
130 | | TEST_F(KVTableTest, LoadTest) { |
131 | | std::atomic_bool stop_requested_flag(false); |
132 | | int rows = 5000; |
133 | | int start_key = 0; |
134 | | int writer_threads = 4; |
135 | | int reader_threads = 4; |
136 | | int value_size_bytes = 16; |
137 | | int max_write_errors = 0; |
138 | | int max_read_errors = 0; |
139 | | |
140 | | // Create two separate clients for read and writes. |
141 | | auto write_client = CreateYBClient(); |
142 | | auto read_client = CreateYBClient(); |
143 | | yb::load_generator::YBSessionFactory write_session_factory(write_client.get(), &table_); |
144 | | yb::load_generator::YBSessionFactory read_session_factory(read_client.get(), &table_); |
145 | | |
146 | | yb::load_generator::MultiThreadedWriter writer(rows, start_key, writer_threads, |
147 | | &write_session_factory, &stop_requested_flag, |
148 | | value_size_bytes, max_write_errors); |
149 | | yb::load_generator::MultiThreadedReader reader(rows, reader_threads, &read_session_factory, |
150 | | writer.InsertionPoint(), writer.InsertedKeys(), |
151 | | writer.FailedKeys(), &stop_requested_flag, |
152 | | value_size_bytes, max_read_errors); |
153 | | |
154 | | writer.Start(); |
155 | | // Having separate write requires adding in write client id to the reader. |
156 | | reader.set_client_id(write_session_factory.ClientId()); |
157 | | reader.Start(); |
158 | | writer.WaitForCompletion(); |
159 | | LOG(INFO) << "Writing complete"; |
160 | | |
161 | | // The reader will not stop on its own, so we stop it after a couple of seconds after the writer |
162 | | // stops. |
163 | | SleepFor(MonoDelta::FromSeconds(2)); |
164 | | reader.Stop(); |
165 | | reader.WaitForCompletion(); |
166 | | LOG(INFO) << "Reading complete"; |
167 | | |
168 | | ASSERT_EQ(0, writer.num_write_errors()); |
169 | | ASSERT_EQ(0, reader.num_read_errors()); |
170 | | ASSERT_GE(writer.num_writes(), rows); |
171 | | ASSERT_GE(reader.num_reads(), rows); // assuming reads are at least as fast as writes |
172 | | |
173 | | ClusterVerifier cluster_verifier(mini_cluster()); |
174 | | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
175 | | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, rows)); |
176 | | } |
177 | | |
178 | | TEST_F(KVTableTest, Restart) { |
179 | | ASSERT_NO_FATALS(PutSampleKeysValues()); |
180 | | // Check we've written the data successfully. |
181 | | ASSERT_NO_FATALS(CheckSampleKeysValues()); |
182 | | ASSERT_NO_FATALS(RestartCluster()); |
183 | | |
184 | | LOG(INFO) << "Checking entries written before the cluster restart"; |
185 | | ASSERT_NO_FATALS(CheckSampleKeysValues()); |
186 | | ClusterVerifier cluster_verifier(mini_cluster()); |
187 | | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
188 | | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 3)); |
189 | | |
190 | | LOG(INFO) << "Issuing additional write operations"; |
191 | | ASSERT_NO_FATALS(PutSampleKeysValues()); |
192 | | ASSERT_NO_FATALS(CheckSampleKeysValues()); |
193 | | |
194 | | // Wait until all tablet servers come up. |
195 | | std::vector<client::YBTabletServer> tablet_servers; |
196 | | do { |
197 | | tablet_servers = ASSERT_RESULT(client_->ListTabletServers()); |
198 | | if (tablet_servers.size() == num_tablet_servers()) { |
199 | | break; |
200 | | } |
201 | | SleepFor(MonoDelta::FromMilliseconds(100)); |
202 | | } while (true); |
203 | | |
204 | | ASSERT_NO_FATALS(cluster_verifier.CheckCluster()); |
205 | | ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 3)); |
206 | | } |
207 | | |
208 | | class KVTableSingleTabletTest : public KVTableTest { |
209 | | public: |
210 | 0 | int num_tablets() override { |
211 | 0 | return 1; |
212 | 0 | } |
213 | | |
214 | 1 | void SetUp() override { |
215 | 1 | FLAGS_global_log_cache_size_limit_mb = 1; |
216 | 1 | FLAGS_log_cache_size_limit_mb = 1; |
217 | 1 | KVTableTest::SetUp(); |
218 | 1 | } |
219 | | }; |
220 | | |
221 | | // Write big values with small log cache. |
222 | | // And restart one tserver. |
223 | | // |
224 | | // So we expect that some operations would be unloaded to disk and loaded back |
225 | | // after this tservers joined raft group again. |
226 | | // |
227 | | // Also check that we track such operations. |
228 | 0 | TEST_F_EX(KVTableTest, BigValues, KVTableSingleTabletTest) { |
229 | 0 | std::atomic_bool stop_requested_flag(false); |
230 | 0 | SetFlagOnExit set_flag_on_exit(&stop_requested_flag); |
231 | 0 | int rows = 100; |
232 | 0 | int start_key = 0; |
233 | 0 | int writer_threads = 4; |
234 | 0 | int value_size_bytes = 32_KB; |
235 | 0 | int max_write_errors = 0; |
236 | | |
237 | | // Create two separate clients for read and writes. |
238 | 0 | auto write_client = CreateYBClient(); |
239 | 0 | yb::load_generator::YBSessionFactory write_session_factory(write_client.get(), &table_); |
240 | |
|
241 | 0 | yb::load_generator::MultiThreadedWriter writer(rows, start_key, writer_threads, |
242 | 0 | &write_session_factory, &stop_requested_flag, |
243 | 0 | value_size_bytes, max_write_errors); |
244 | |
|
245 | 0 | writer.Start(); |
246 | 0 | mini_cluster_->mini_tablet_server(1)->Shutdown(); |
247 | 0 | auto start_writes = writer.num_writes(); |
248 | 0 | while (writer.num_writes() - start_writes < 50) { |
249 | 0 | std::this_thread::sleep_for(100ms); |
250 | 0 | } |
251 | 0 | ASSERT_OK(mini_cluster_->mini_tablet_server(1)->Start()); |
252 | |
|
253 | 0 | ASSERT_OK(WaitFor([] { |
254 | 0 | std::vector<MemTrackerData> trackers; |
255 | 0 | trackers.clear(); |
256 | 0 | CollectMemTrackerData(MemTracker::GetRootTracker(), 0, &trackers); |
257 | 0 | bool found = false; |
258 | 0 | for (const auto& data : trackers) { |
259 | 0 | if (data.tracker->id() == "OperationsFromDisk" && data.tracker->peak_consumption()) { |
260 | 0 | LOG(INFO) << "Tracker: " << data.tracker->ToString() << ", peak consumption: " |
261 | 0 | << data.tracker->peak_consumption(); |
262 | 0 | found = true; |
263 | 0 | } |
264 | 0 | } |
265 | 0 | return found; |
266 | 0 | }, 15s, "Load operations from disk")); |
267 | |
|
268 | 0 | writer.WaitForCompletion(); |
269 | 0 | } |
270 | | |
271 | | } // namespace integration_tests |
272 | | } // namespace yb |