YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/kv_table-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <cmath>
16
#include <cstdlib>
17
#include <future>
18
19
#include <gflags/gflags.h>
20
#include <glog/logging.h>
21
22
#include "yb/client/callbacks.h"
23
#include "yb/client/client.h"
24
#include "yb/client/table.h"
25
#include "yb/client/tablet_server.h"
26
27
#include "yb/gutil/strings/split.h"
28
#include "yb/gutil/strings/substitute.h"
29
30
#include "yb/integration-tests/cluster_verifier.h"
31
#include "yb/integration-tests/load_generator.h"
32
#include "yb/integration-tests/mini_cluster.h"
33
#include "yb/integration-tests/yb_table_test_base.h"
34
35
#include "yb/master/mini_master.h"
36
37
#include "yb/tserver/mini_tablet_server.h"
38
39
#include "yb/util/size_literals.h"
40
#include "yb/util/test_macros.h"
41
#include "yb/util/test_thread_holder.h"
42
#include "yb/util/test_util.h"
43
44
using namespace std::literals;
45
46
using std::string;
47
using std::vector;
48
using std::unique_ptr;
49
50
using yb::client::YBValue;
51
52
using std::shared_ptr;
53
54
DECLARE_int32(log_cache_size_limit_mb);
55
DECLARE_int32(global_log_cache_size_limit_mb);
56
57
namespace yb {
58
namespace integration_tests {
59
60
using client::YBClient;
61
using client::YBClientBuilder;
62
using client::YBColumnSchema;
63
using client::YBSchema;
64
using client::YBSchemaBuilder;
65
using client::YBSession;
66
using client::YBStatusMemberCallback;
67
using client::YBTable;
68
using client::YBTableCreator;
69
using strings::Split;
70
71
class KVTableTest : public YBTableTestBase {
72
 protected:
73
74
10
  bool use_external_mini_cluster() override { return false; }
75
76
 protected:
77
78
0
  void PutSampleKeysValues() {
79
0
    PutKeyValue("key123", "value123");
80
0
    PutKeyValue("key200", "value200");
81
0
    PutKeyValue("key300", "value300");
82
0
  }
83
84
  void CheckSampleKeysValues() {
85
    auto result_kvs = GetScanResults(client::TableRange(table_));
86
87
    ASSERT_EQ(3, result_kvs.size());
88
    ASSERT_EQ("key123", result_kvs.front().first);
89
    ASSERT_EQ("value123", result_kvs.front().second);
90
    ASSERT_EQ("key200", result_kvs[1].first);
91
    ASSERT_EQ("value200", result_kvs[1].second);
92
    ASSERT_EQ("key300", result_kvs[2].first);
93
    ASSERT_EQ("value300", result_kvs[2].second);
94
  }
95
96
};
97
98
// Writes the sample rows, reads them back, and then verifies the cluster and the exact row count.
TEST_F(KVTableTest, SimpleKVTableTest) {
  // Round-trip the sample data through the table.
  ASSERT_NO_FATALS(PutSampleKeysValues());
  ASSERT_NO_FATALS(CheckSampleKeysValues());

  // Cluster-level checks: the table must contain exactly the three sample rows.
  ClusterVerifier verifier(mini_cluster());
  ASSERT_NO_FATALS(verifier.CheckCluster());
  ASSERT_NO_FATALS(verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 3));
}
105
106
// Scans the table with an equality filter on column "k" and expects exactly the matching row.
TEST_F(KVTableTest, PointQuery) {
  ASSERT_NO_FATALS(PutSampleKeysValues());

  // Restrict the scan to the row whose "k" column equals "key200".
  client::TableIteratorOptions scan_options;
  scan_options.filter = client::FilterEqual("key200"s, "k"s);

  auto matching_rows = GetScanResults(client::TableRange(table_, scan_options));
  ASSERT_EQ(1, matching_rows.size());
  ASSERT_EQ("key200", matching_rows.front().first);
  ASSERT_EQ("value200", matching_rows.front().second);
}
116
117
// Repeatedly fills the table, drops it, re-creates and re-opens it, verifying after every round
// that the cluster is consistent and the fresh table is empty.
TEST_F(KVTableTest, Eng135MetricsTest) {
  constexpr int kNumRounds = 10;

  ClusterVerifier verifier(mini_cluster());
  for (int round = 0; round != kNumRounds; ++round) {
    // Populate and verify the current incarnation of the table.
    ASSERT_NO_FATALS(PutSampleKeysValues());
    ASSERT_NO_FATALS(CheckSampleKeysValues());

    // Replace the table with a brand-new one.
    ASSERT_NO_FATALS(DeleteTable());
    ASSERT_NO_FATALS(CreateTable());
    ASSERT_NO_FATALS(OpenTable());

    // The re-created table must not contain any rows.
    ASSERT_NO_FATALS(verifier.CheckCluster());
    ASSERT_NO_FATALS(verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 0));
  }
}
129
130
// Runs a multi-threaded writer and a multi-threaded reader concurrently against the table and
// checks that neither reports errors and that the final row count matches the number of rows.
TEST_F(KVTableTest, LoadTest) {
  // Workload parameters.
  const int rows = 5000;
  const int start_key = 0;
  const int writer_threads = 4;
  const int reader_threads = 4;
  const int value_size_bytes = 16;
  const int max_write_errors = 0;
  const int max_read_errors = 0;

  // Shared flag handed to both the writer and the reader.
  std::atomic_bool stop_requested(false);

  // Reads and writes go through two separate clients.
  auto client_for_writes = CreateYBClient();
  auto client_for_reads = CreateYBClient();
  yb::load_generator::YBSessionFactory write_sessions(client_for_writes.get(), &table_);
  yb::load_generator::YBSessionFactory read_sessions(client_for_reads.get(), &table_);

  yb::load_generator::MultiThreadedWriter writer(
      rows, start_key, writer_threads, &write_sessions, &stop_requested, value_size_bytes,
      max_write_errors);
  yb::load_generator::MultiThreadedReader reader(
      rows, reader_threads, &read_sessions, writer.InsertionPoint(), writer.InsertedKeys(),
      writer.FailedKeys(), &stop_requested, value_size_bytes, max_read_errors);

  writer.Start();
  // Because the writer uses its own client, the reader has to be given the write client's id.
  reader.set_client_id(write_sessions.ClientId());
  reader.Start();

  writer.WaitForCompletion();
  LOG(INFO) << "Writing complete";

  // The reader does not stop by itself, so let it run for a couple of seconds after the writer
  // has finished and then stop it explicitly.
  SleepFor(MonoDelta::FromSeconds(2));
  reader.Stop();
  reader.WaitForCompletion();
  LOG(INFO) << "Reading complete";

  ASSERT_EQ(0, writer.num_write_errors());
  ASSERT_EQ(0, reader.num_read_errors());
  ASSERT_GE(writer.num_writes(), rows);
  // This assumes reads are at least as fast as writes.
  ASSERT_GE(reader.num_reads(), rows);

  ClusterVerifier verifier(mini_cluster());
  ASSERT_NO_FATALS(verifier.CheckCluster());
  ASSERT_NO_FATALS(verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, rows));
}
177
178
// Verifies that the sample rows survive a cluster restart and that the table accepts writes
// afterwards. The sample rows are written twice under the same keys, so the expected row count
// stays at three.
TEST_F(KVTableTest, Restart) {
  ASSERT_NO_FATALS(PutSampleKeysValues());
  // Check we've written the data successfully.
  ASSERT_NO_FATALS(CheckSampleKeysValues());
  ASSERT_NO_FATALS(RestartCluster());

  LOG(INFO) << "Checking entries written before the cluster restart";
  ASSERT_NO_FATALS(CheckSampleKeysValues());
  ClusterVerifier cluster_verifier(mini_cluster());
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 3));

  LOG(INFO) << "Issuing additional write operations";
  ASSERT_NO_FATALS(PutSampleKeysValues());
  ASSERT_NO_FATALS(CheckSampleKeysValues());

  // Wait until all tablet servers come up. The wait is bounded so that a tablet server that never
  // shows up again fails the test with a clear message instead of hanging it forever.
  constexpr int kPollIntervalMs = 100;
  constexpr int kMaxPolls = 600;  // 600 polls * 100 ms = 60 seconds.
  std::vector<client::YBTabletServer> tablet_servers;
  for (int poll = 0;; ++poll) {
    tablet_servers = ASSERT_RESULT(client_->ListTabletServers());
    if (tablet_servers.size() == num_tablet_servers()) {
      break;
    }
    ASSERT_LT(poll, kMaxPolls)
        << "Timed out waiting for all tablet servers to come up, currently listed: "
        << tablet_servers.size();
    SleepFor(MonoDelta::FromMilliseconds(kPollIntervalMs));
  }

  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, 3));
}
207
208
class KVTableSingleTabletTest : public KVTableTest {
209
 public:
210
0
  int num_tablets() override {
211
0
    return 1;
212
0
  }
213
214
1
  void SetUp() override {
215
1
    FLAGS_global_log_cache_size_limit_mb = 1;
216
1
    FLAGS_log_cache_size_limit_mb = 1;
217
1
    KVTableTest::SetUp();
218
1
  }
219
};
220
221
// Write big values with small log cache.
222
// And restart one tserver.
223
//
224
// So we expect that some operations would be unloaded to disk and loaded back
225
// after this tservers joined raft group again.
226
//
227
// Also check that we track such operations.
228
0
TEST_F_EX(KVTableTest, BigValues, KVTableSingleTabletTest) {
229
0
  std::atomic_bool stop_requested_flag(false);
230
0
  SetFlagOnExit set_flag_on_exit(&stop_requested_flag);
231
0
  int rows = 100;
232
0
  int start_key = 0;
233
0
  int writer_threads = 4;
234
0
  int value_size_bytes = 32_KB;
235
0
  int max_write_errors = 0;
236
237
  // Create two separate clients for read and writes.
238
0
  auto write_client = CreateYBClient();
239
0
  yb::load_generator::YBSessionFactory write_session_factory(write_client.get(), &table_);
240
241
0
  yb::load_generator::MultiThreadedWriter writer(rows, start_key, writer_threads,
242
0
                                                 &write_session_factory, &stop_requested_flag,
243
0
                                                 value_size_bytes, max_write_errors);
244
245
0
  writer.Start();
246
0
  mini_cluster_->mini_tablet_server(1)->Shutdown();
247
0
  auto start_writes = writer.num_writes();
248
0
  while (writer.num_writes() - start_writes < 50) {
249
0
    std::this_thread::sleep_for(100ms);
250
0
  }
251
0
  ASSERT_OK(mini_cluster_->mini_tablet_server(1)->Start());
252
253
0
  ASSERT_OK(WaitFor([] {
254
0
    std::vector<MemTrackerData> trackers;
255
0
    trackers.clear();
256
0
    CollectMemTrackerData(MemTracker::GetRootTracker(), 0, &trackers);
257
0
    bool found = false;
258
0
    for (const auto& data : trackers) {
259
0
      if (data.tracker->id() == "OperationsFromDisk" && data.tracker->peak_consumption()) {
260
0
        LOG(INFO) << "Tracker: " << data.tracker->ToString() << ", peak consumption: "
261
0
                  << data.tracker->peak_consumption();
262
0
        found = true;
263
0
      }
264
0
    }
265
0
    return found;
266
0
  }, 15s, "Load operations from disk"));
267
268
0
  writer.WaitForCompletion();
269
0
}
270
271
}  // namespace integration_tests
272
}  // namespace yb