YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/update_scan_delta_compact-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <memory>
34
#include <string>
35
#include <vector>
36
37
#include <boost/range/iterator_range.hpp>
38
39
#include "yb/client/callbacks.h"
40
#include "yb/client/client.h"
41
#include "yb/client/error.h"
42
#include "yb/client/schema.h"
43
#include "yb/client/session.h"
44
#include "yb/client/table_handle.h"
45
#include "yb/client/yb_op.h"
46
#include "yb/client/yb_table_name.h"
47
48
#include "yb/gutil/strings/strcat.h"
49
50
#include "yb/integration-tests/mini_cluster.h"
51
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
52
53
#include "yb/tserver/mini_tablet_server.h"
54
55
#include "yb/util/countdown_latch.h"
56
#include "yb/util/curl_util.h"
57
#include "yb/util/monotime.h"
58
#include "yb/util/status_log.h"
59
#include "yb/util/stopwatch.h"
60
#include "yb/util/test_macros.h"
61
#include "yb/util/test_util.h"
62
#include "yb/util/thread.h"
63
64
using namespace std::literals;
65
66
// Flags declared here but defined elsewhere; TestAll assigns both of them before the cluster
// is created.
DECLARE_int32(log_segment_size_mb);
67
DECLARE_int32(maintenance_manager_polling_interval_ms);
68
// Test-local knob; TestAll copies its value into FLAGS_log_segment_size_mb.
DEFINE_int32(mbs_for_flushes_and_rolls, 1, "How many MBs are needed to flush and roll");
69
// Number of keys written by InsertBaseData and rewritten on every pass of UpdateRows.
DEFINE_int32(row_count, 2000, "How many rows will be used in this test for the base data");
70
// Length of the sleep in RunThreads while the worker threads are running.
DEFINE_int32(seconds_to_run, 4,
71
             "How long this test runs for, after inserting the base data, in seconds");
72
73
namespace yb {
74
namespace tablet {
75
76
using client::YBClient;
77
using client::YBClientBuilder;
78
using client::YBColumnSchema;
79
using client::YBSchema;
80
using client::YBSchemaBuilder;
81
using client::YBSession;
82
using client::YBStatusCallback;
83
using client::YBStatusMemberCallback;
84
using client::YBTable;
85
using client::YBTableCreator;
86
using client::YBTableName;
87
using std::shared_ptr;
88
89
// This integration test tries to trigger all the update-related bits while also serving as a
90
// foundation for benchmarking. It first inserts 'row_count' rows and then starts two threads,
91
// one that continuously updates all the rows sequentially and one that scans them all, until
92
// it's been running for 'seconds_to_run'. It doesn't test for correctness, unless something
93
// FATALs.
94
// Fixture owning the mini cluster client, the table handle and the three-column schema
// (INT64 hash key "key", STRING "string", INT64 "int64") shared by all workloads of this test.
class UpdateScanDeltaCompactionTest : public YBMiniClusterTestBase<MiniCluster> {
 protected:
  // Builds the schema eagerly; a schema that fails to build aborts the process via CHECK_OK.
  UpdateScanDeltaCompactionTest() {
    YBSchemaBuilder builder;
    builder.AddColumn("key")->Type(INT64)->NotNull()->HashPrimaryKey();
    builder.AddColumn("string")->Type(STRING)->NotNull();
    builder.AddColumn("int64")->Type(INT64)->NotNull();
    CHECK_OK(builder.Build(&schema_));
  }

  // Turns on pretty hybrid-time printing before delegating to the base fixture.
  void SetUp() override {
    HybridTime::TEST_SetPrettyToString(true);
    YBMiniClusterTestBase::SetUp();
  }

  // Brings up the cluster and client, then creates the namespace and the test table.
  void CreateTable() {
    ASSERT_NO_FATALS(InitCluster());
    ASSERT_OK(client_->CreateNamespaceIfNotExists(
        kTableName.namespace_name(), kTableName.namespace_type()));
    ASSERT_OK(table_.Create(kTableName, CalcNumTablets(1), schema_, client_.get()));
  }

  // Shuts the cluster down (if one was started) before the base-class teardown runs.
  void DoTearDown() override {
    if (cluster_) {
      cluster_->Shutdown();
    }
    YBMiniClusterTestBase::DoTearDown();
  }

  // Inserts row_count rows sequentially.
  void InsertBaseData();

  // Starts the update and scan threads then stops them after seconds_to_run.
  void RunThreads();

 private:
  static const YBTableName kTableName;

  // Starts a mini-cluster built from default MiniClusterOptions and connects a client to it.
  void InitCluster() {
    cluster_.reset(new MiniCluster(MiniClusterOptions()));
    ASSERT_OK(cluster_->Start());
    client_ = ASSERT_RESULT(cluster_->CreateClient());
  }

  // Returns a new client session configured with a 30 second timeout.
  shared_ptr<YBSession> CreateSession() {
    auto session = client_->NewSession();
    // The timeout was raised from 5 sec to 30 sec in an attempt to make this test less flaky.
    session->SetTimeout(30s);
    return session;
  }

  // Continuously updates the existing data until 'stop_latch' drops to 0.
  void UpdateRows(CountDownLatch* stop_latch);

  // Continuously scans the data until 'stop_latch' drops to 0.
  void ScanRows(CountDownLatch* stop_latch) const;

  // Continuously fetch various web pages on the TS.
  void CurlWebPages(CountDownLatch* stop_latch) const;

  // Sets the passed values on the row.
  // TODO randomize the string column.
  void MakeRow(int64_t key, int64_t val, QLWriteRequestPB* req) const;

  // If 'key' is a multiple of kSessionBatchSize, it uses 'flush_future' to wait for the previous
  // batch to finish and then flushes the current one.
  Status WaitForLastBatchAndFlush(int64_t key,
                                  std::future<client::FlushStatus>* flush_future,
                                  shared_ptr<YBSession> session);

  YBSchema schema_;
  client::TableHandle table_;
  std::unique_ptr<YBClient> client_;
};
169
170
// Fully qualified name (CQL database type, keyspace, table) of the table this test creates.
const YBTableName UpdateScanDeltaCompactionTest::kTableName(
    YQL_DATABASE_CQL, "my_keyspace", "update-scan-delta-compact-tbl");

// Keys that are a multiple of this value trigger a flush in WaitForLastBatchAndFlush.
constexpr int kSessionBatchSize = 1000;
173
174
1
// Tunes the flags, creates the table, loads the base rows and then runs the concurrent
// update / scan / curl workload.
TEST_F(UpdateScanDeltaCompactionTest, TestAll) {
  // Heavier workload parameters; presumably only applied when slow tests are enabled.
  OverrideFlagForSlowTests("seconds_to_run", "100");
  OverrideFlagForSlowTests("row_count", "1000000");
  OverrideFlagForSlowTests("mbs_for_flushes_and_rolls", "8");
  // Setting this high enough that we see the effects of flushes and compactions.
  OverrideFlagForSlowTests("maintenance_manager_polling_interval_ms", "2000");

  FLAGS_log_segment_size_mb = FLAGS_mbs_for_flushes_and_rolls;

  const bool quick_run = !AllowSlowTests();
  if (quick_run) {
    // Make it run more often since it's not a long test.
    FLAGS_maintenance_manager_polling_interval_ms = 50;
  }

  ASSERT_NO_FATALS(CreateTable());
  ASSERT_NO_FATALS(InsertBaseData());
  ASSERT_NO_FATALS(RunThreads());
}
190
191
0
void UpdateScanDeltaCompactionTest::InsertBaseData() {
192
0
  shared_ptr<YBSession> session = CreateSession();
193
0
  std::promise<client::FlushStatus> promise;
194
0
  auto flush_future = promise.get_future();
195
0
  promise.set_value(client::FlushStatus());
196
197
0
  LOG_TIMING(INFO, "Insert") {
198
0
    for (int64_t key = 0; key < FLAGS_row_count; key++) {
199
0
      auto insert = table_.NewInsertOp();
200
0
      MakeRow(key, 0, insert->mutable_request());
201
0
      session->Apply(insert);
202
0
      ASSERT_OK(WaitForLastBatchAndFlush(key, &flush_future, session));
203
0
    }
204
0
    ASSERT_OK(WaitForLastBatchAndFlush(kSessionBatchSize, &flush_future, session));
205
0
    if (flush_future.valid()) {
206
0
      ASSERT_OK(flush_future.get().status);
207
0
    }
208
0
  }
209
0
}
210
211
0
void UpdateScanDeltaCompactionTest::RunThreads() {
212
0
  vector<scoped_refptr<Thread> > threads;
213
214
0
  CountDownLatch stop_latch(1);
215
216
0
  {
217
0
    scoped_refptr<Thread> t;
218
0
    ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(),
219
0
                             StrCat(CURRENT_TEST_CASE_NAME(), "-update"),
220
0
                             &UpdateScanDeltaCompactionTest::UpdateRows, this,
221
0
                             &stop_latch, &t));
222
0
    threads.push_back(t);
223
0
  }
224
225
0
  {
226
0
    scoped_refptr<Thread> t;
227
0
    ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(),
228
0
                             StrCat(CURRENT_TEST_CASE_NAME(), "-scan"),
229
0
                             &UpdateScanDeltaCompactionTest::ScanRows, this,
230
0
                             &stop_latch, &t));
231
0
    threads.push_back(t);
232
0
  }
233
234
0
  {
235
0
    scoped_refptr<Thread> t;
236
0
    ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(),
237
0
                             StrCat(CURRENT_TEST_CASE_NAME(), "-curl"),
238
0
                             &UpdateScanDeltaCompactionTest::CurlWebPages, this,
239
0
                             &stop_latch, &t));
240
0
    threads.push_back(t);
241
0
  }
242
243
0
  SleepFor(MonoDelta::FromSeconds(FLAGS_seconds_to_run * 1.0));
244
0
  stop_latch.CountDown();
245
246
0
  for (const scoped_refptr<Thread>& thread : threads) {
247
0
    ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
248
0
  }
249
0
}
250
251
0
// Repeatedly rewrites keys [0, FLAGS_row_count) until 'stop_latch' reaches zero. The value
// written on each pass is the 1-based pass number. Any flush failure aborts via CHECK_OK.
void UpdateScanDeltaCompactionTest::UpdateRows(CountDownLatch* stop_latch) {
  auto session = CreateSession();

  int64_t pass = 1;
  while (stop_latch->count() > 0) {
    LOG_TIMING(INFO, "Update") {
      // Already-satisfied future so that the first batch boundary (key 0) has a result to
      // wait on.
      std::promise<client::FlushStatus> seed;
      std::future<client::FlushStatus> pending_flush = seed.get_future();
      seed.set_value(client::FlushStatus());

      int64_t key = 0;
      while (key < FLAGS_row_count && stop_latch->count() > 0) {
        auto update = table_.NewUpdateOp();
        MakeRow(key, pass, update->mutable_request());
        session->Apply(update);
        CHECK_OK(WaitForLastBatchAndFlush(key, &pending_flush, session));
        ++key;
      }

      // kSessionBatchSize always hits the batch boundary, forcing a final flush for this pass.
      CHECK_OK(WaitForLastBatchAndFlush(kSessionBatchSize, &pending_flush, session));
      if (pending_flush.valid()) {
        CHECK_OK(pending_flush.get().status);
      }
    }
    ++pass;
  }
}
272
273
0
// Keeps scanning the table, logging the time of each scan, until 'stop_latch' reaches zero.
void UpdateScanDeltaCompactionTest::ScanRows(CountDownLatch* stop_latch) const {
  while (true) {
    const bool keep_scanning = stop_latch->count() > 0;
    if (!keep_scanning) {
      return;
    }
    LOG_TIMING(INFO, "Scan") {
      // The computed size is discarded; presumably measuring the range is what iterates
      // over the rows -- TODO confirm against client::TableRange.
      boost::size(client::TableRange(table_));
    }
  }
}
280
281
0
// Fetches the "/scans" and "/transactions" pages of tablet server 0 in a loop until
// 'stop_latch' reaches zero. Failed fetches are ignored; a successful fetch must return a
// non-empty body.
void UpdateScanDeltaCompactionTest::CurlWebPages(CountDownLatch* stop_latch) const {
  const std::string base_url =
      yb::ToString(cluster_->mini_tablet_server(0)->bound_http_addr());
  const std::vector<std::string> urls = {base_url + "/scans", base_url + "/transactions"};

  EasyCurl curl;
  faststring response;
  while (stop_latch->count() > 0) {
    for (const auto& url : urls) {
      VLOG(1) << "Curling URL " << url;
      const Status fetch_status = curl.FetchURL(url, &response);
      if (!fetch_status.ok()) {
        continue;
      }
      CHECK_GT(response.length(), 0);
    }
  }
}
299
300
void UpdateScanDeltaCompactionTest::MakeRow(int64_t key,
301
                                            int64_t val,
302
0
                                            QLWriteRequestPB* req) const {
303
0
  QLAddInt64HashValue(req, key);
304
0
  table_.AddStringColumnValue(req, "string", "TODO random string");
305
0
  table_.AddInt64ColumnValue(req, "int64", val);
306
0
}
307
308
// When 'key' is a multiple of kSessionBatchSize: waits for the flush held in 'flush_future',
// returns its status if that is an error, and otherwise replaces 'flush_future' with a newly
// started flush of 'session'. For any other key this does nothing and returns OK.
Status UpdateScanDeltaCompactionTest::WaitForLastBatchAndFlush(
    int64_t key, std::future<client::FlushStatus>* flush_future, shared_ptr<YBSession> session) {
  const bool at_batch_boundary = (key % kSessionBatchSize) == 0;
  if (!at_batch_boundary) {
    return Status::OK();
  }

  // The previous batch must complete successfully before the next flush is started.
  RETURN_NOT_OK(flush_future->get().status);
  *flush_future = session->FlushFuture();
  return Status::OK();
}
316
317
}  // namespace tablet
318
}  // namespace yb