YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/full_stack-insert-scan-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <cmath>
34
#include <cstdlib>
35
#include <memory>
36
#include <string>
37
#include <vector>
38
39
#include <gflags/gflags.h>
40
#include <glog/logging.h>
41
42
#include "yb/gutil/casts.h"
43
44
#include "yb/client/callbacks.h"
45
#include "yb/client/client-test-util.h"
46
#include "yb/client/client.h"
47
#include "yb/client/error.h"
48
#include "yb/client/schema.h"
49
#include "yb/client/session.h"
50
#include "yb/client/table_handle.h"
51
#include "yb/client/yb_op.h"
52
#include "yb/client/yb_table_name.h"
53
54
#include "yb/gutil/ref_counted.h"
55
#include "yb/gutil/strings/split.h"
56
#include "yb/gutil/strings/substitute.h"
57
58
#include "yb/integration-tests/mini_cluster.h"
59
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
60
61
#include "yb/master/mini_master.h"
62
63
#include "yb/tablet/maintenance_manager.h"
64
#include "yb/tablet/tablet.h"
65
#include "yb/tablet/tablet_peer.h"
66
67
#include "yb/tserver/mini_tablet_server.h"
68
#include "yb/tserver/tablet_server.h"
69
#include "yb/tserver/ts_tablet_manager.h"
70
71
#include "yb/util/async_util.h"
72
#include "yb/util/countdown_latch.h"
73
#include "yb/util/errno.h"
74
#include "yb/util/random.h"
75
#include "yb/util/random_util.h"
76
#include "yb/util/status.h"
77
#include "yb/util/status_log.h"
78
#include "yb/util/stopwatch.h"
79
#include "yb/util/subprocess.h"
80
#include "yb/util/test_macros.h"
81
#include "yb/util/test_util.h"
82
#include "yb/util/thread.h"
83
84
using namespace std::literals;
85
86
// Test size parameters
87
// A value of -1 selects the test's built-in default (see DefaultFlag()).
DEFINE_int32(concurrent_inserts, -1, "Number of inserting clients to launch");
DEFINE_int32(inserts_per_client, -1,
             "Number of rows inserted by each inserter client");
DEFINE_int32(rows_per_batch, -1, "Number of rows per client batch");

// Perf-related flags.
DEFINE_bool(perf_record_scan, false, "Call \"perf record --call-graph\" "
            "for the duration of the scan, disabled by default");
// Note the trailing space in "during ": adjacent literals are concatenated.
DEFINE_bool(perf_stat_scan, false, "Print \"perf stat\" results during "
            "scan to stdout, disabled by default");
DEFINE_bool(perf_fp_flag, false, "Only applicable with --perf_record_scan,"
            " provides argument \"fp\" to the --call-graph flag");
DECLARE_bool(enable_maintenance_manager);
100
101
using std::string;
102
using std::vector;
103
using std::shared_ptr;
104
105
namespace yb {
106
namespace tablet {
107
108
using client::YBClient;
109
using client::YBClientBuilder;
110
using client::YBColumnSchema;
111
using client::YBSchema;
112
using client::YBSchemaBuilder;
113
using client::YBSession;
114
using client::YBStatusMemberCallback;
115
using client::YBTable;
116
using client::YBTableCreator;
117
using client::YBTableName;
118
using strings::Split;
119
using strings::Substitute;
120
121
namespace {

// Timeout applied to every insert session created by CreateNewClient().
constexpr auto kSessionTimeout = 60s;

}  // namespace
126
127
// End-to-end stress test: several clients insert rows concurrently into a
// CQL table hosted on a MiniCluster, then the table is scanned with several
// column projections and the returned row count is checked.
class FullStackInsertScanTest : public YBMiniClusterTestBase<MiniCluster> {
 protected:
  // Each size parameter comes from its flag; a flag left at -1 selects the
  // "fast" default, or the "slow" default when AllowSlowTests() is true.
  FullStackInsertScanTest()
      : kNumInsertClients(DefaultFlag(FLAGS_concurrent_inserts, 3, 10)),
        kNumInsertsPerClient(DefaultFlag(FLAGS_inserts_per_client, 500, 50000)),
        kNumRows(kNumInsertClients * kNumInsertsPerClient),
        kFlushEveryN(DefaultFlag(FLAGS_rows_per_batch, 125, 5000)),
        random_(SeedRandom()),
        // Clamp to zero so a negative --concurrent_inserts is reported by the
        // ASSERT_GE in CreateTable() instead of a huge allocation here.
        sessions_(kNumInsertClients > 0 ? kNumInsertClients : 0) {
    tables_.reserve(sessions_.size());
  }

  const int kNumInsertClients;
  const int kNumInsertsPerClient;
  // Total number of rows expected in the table after all inserters finish.
  const int kNumRows;

  void SetUp() override {
    YBMiniClusterTestBase::SetUp();
  }

  // Starts the cluster and client, then creates the test table.
  void CreateTable();

  void DoTearDown() override {
    // Drop the client before shutting down the cluster it talks to.
    client_.reset();
    if (cluster_) {
      cluster_->Shutdown();
    }
    YBMiniClusterTestBase::DoTearDown();
  }

  // Runs kNumInsertClients threads, each inserting its own key range.
  void DoConcurrentClientInserts();
  // Runs timed scans with several projections and checks the row counts.
  void DoTestScans();
  // Synchronously flushes every tablet on every tablet server.
  void FlushToDisk();

 private:
  // Returns `flag` when it was set explicitly (i.e. != -1); otherwise `slow`
  // when slow tests are allowed and `fast` when they are not.
  int DefaultFlag(int flag, int fast, int slow) {
    if (flag != -1) return flag;
    if (AllowSlowTests()) return slow;
    return fast;
  }

  // Starts a MiniCluster with default MiniClusterOptions (presumably a single
  // tablet server — confirm against MiniClusterOptions) and builds client_
  // against its master with a 30 second default RPC timeout.
  void InitCluster() {
    cluster_.reset(new MiniCluster(MiniClusterOptions()));
    ASSERT_OK(cluster_->Start());
    YBClientBuilder builder;
    builder.add_master_server_addr(
        cluster_->mini_master()->bound_rpc_addr_str());
    builder.default_rpc_timeout(MonoDelta::FromSeconds(30));
    client_ = ASSERT_RESULT(builder.Build());
  }

  // Ensures tables_ holds an open handle to kTableName for every index up to
  // and including `id`, and stores a fresh session (timeout kSessionTimeout)
  // in sessions_[id].
  void CreateNewClient(int id) {
    while (tables_.size() <= implicit_cast<size_t>(id)) {
      auto table = std::make_unique<client::TableHandle>();
      ASSERT_OK(table->Open(kTableName, client_.get()));
      tables_.push_back(std::move(table));
    }
    std::shared_ptr<YBSession> session = client_->NewSession();
    session->SetTimeout(kSessionTimeout);
    sessions_[id] = session;
  }

  // Inserts the rows whose keys belong to client `id`, after waiting on
  // `start_latch`. `seed` seeds the per-client random generator.
  void InsertRows(CountDownLatch* start_latch, int id, uint32_t seed);

  // Scans reader_table_ projecting the columns `cols`, logs the scan time
  // under the label `msg`, and asserts that all kNumRows rows were returned.
  void ScanProjection(const vector<string>& cols, const string& msg);

  // Names of every column of schema_, in schema order.
  vector<string> AllColumnNames() const;

  static const YBTableName kTableName;
  // Inserters start a session flush whenever key % kFlushEveryN == 0.
  const int kFlushEveryN;

  Random random_;

  YBSchema schema_;
  std::unique_ptr<YBClient> client_;
  // Handle used to create the table and to run the verification scans.
  client::TableHandle reader_table_;
  // Concurrent client insertion state, indexed by client id.
  vector<std::shared_ptr<YBSession>> sessions_;
  std::vector<std::unique_ptr<client::TableHandle>> tables_;
};
213
214
namespace {
215
216
0
std::unique_ptr<Subprocess> MakePerfStat() {
217
0
  if (!FLAGS_perf_stat_scan) return std::unique_ptr<Subprocess>();
218
  // No output flag for perf-stat 2.x, just print to output
219
0
  string cmd = Substitute("perf stat --pid=$0", getpid());
220
0
  LOG(INFO) << "Calling: \"" << cmd << "\"";
221
0
  return std::unique_ptr<Subprocess>(new Subprocess("perf", Split(cmd, " ")));
222
0
}
223
224
0
std::unique_ptr<Subprocess> MakePerfRecord() {
225
0
  if (!FLAGS_perf_record_scan) return std::unique_ptr<Subprocess>();
226
0
  string cmd = Substitute("perf record --pid=$0 --call-graph", getpid());
227
0
  if (FLAGS_perf_fp_flag) cmd += " fp";
228
0
  LOG(INFO) << "Calling: \"" << cmd << "\"";
229
0
  return std::unique_ptr<Subprocess>(new Subprocess("perf", Split(cmd, " ")));
230
0
}
231
232
0
void InterruptNotNull(std::unique_ptr<Subprocess> sub) {
233
0
  if (!sub) return;
234
0
  ASSERT_OK(sub->Kill(SIGINT));
235
0
  int exit_status = 0;
236
0
  ASSERT_OK(sub->Wait(&exit_status));
237
0
  if (!exit_status) {
238
0
    LOG(WARNING) << "Subprocess returned " << exit_status
239
0
                 << ": " << ErrnoToString(exit_status);
240
0
  }
241
0
}
242
243
// If key is approximately at an even multiple of 1/10 of the way between
244
// start and end, then a % completion update is printed to LOG(INFO)
245
// Assumes that end - start + 1 fits into an int
246
void ReportTenthDone(int64_t key, int64_t start, int64_t end,
247
0
                     int id, int numids) {
248
0
  auto done = key - start + 1;
249
0
  auto total = end - start + 1;
250
0
  if (total < 10) return;
251
0
  if (done % (total / 10) == 0) {
252
0
    auto percent = done * 100 / total;
253
0
    LOG(INFO) << "Insertion thread " << id << " of "
254
0
              << numids << " is "<< percent << "% done.";
255
0
  }
256
0
}
257
258
0
void ReportAllDone(int id, int numids) {
259
0
  LOG(INFO) << "Insertion thread " << id << " of  "
260
0
            << numids << " is 100% done.";
261
0
}
262
263
} // anonymous namespace
264
265
// CQL table created by CreateTable() and opened by every insert client in
// CreateNewClient(); lives in the "my_keyspace" namespace.
const YBTableName FullStackInsertScanTest::kTableName(
266
    YQL_DATABASE_CQL, "my_keyspace", "full-stack-mrs-test-tbl");
267
268
1
// Concurrent inserts followed by projection scans, with the maintenance
// manager disabled and no explicit flush before scanning.
TEST_F(FullStackInsertScanTest, MRSOnlyStressTest) {
269
1
  // NOTE(review): this global flag change is not undone in this test body;
  // confirm the fixture restores flags between tests.
  FLAGS_enable_maintenance_manager = false;
270
1
  ASSERT_NO_FATALS(CreateTable());
271
0
  ASSERT_NO_FATALS(DoConcurrentClientInserts());
272
0
  ASSERT_NO_FATALS(DoTestScans());
273
0
}
274
275
1
// Concurrent inserts, then a synchronous flush of every tablet
// (FlushToDisk()), then the same projection scans.
TEST_F(FullStackInsertScanTest, WithDiskStressTest) {
276
1
  ASSERT_NO_FATALS(CreateTable());
277
0
  ASSERT_NO_FATALS(DoConcurrentClientInserts());
278
0
  ASSERT_NO_FATALS(FlushToDisk());
279
0
  ASSERT_NO_FATALS(DoTestScans());
280
0
}
281
282
0
// Prepares a session/table handle per insert client, starts one thread per
// client running InsertRows(), releases them together through `start_latch`,
// and joins them all inside a LOG_TIMING section.
// NOTE(review): if CreateNewClient() or Thread::Create() fails part-way, the
// early return leaves already-started threads blocked on `start_latch`, a
// local that is then destroyed; consider releasing the latch and joining on
// every exit path.
void FullStackInsertScanTest::DoConcurrentClientInserts() {
283
0
  vector<scoped_refptr<Thread> > threads(kNumInsertClients);
284
0
  // One count per created thread plus one for this thread, so no inserter
  // proceeds until every thread exists and the timed section has begun.
  CountDownLatch start_latch(kNumInsertClients + 1);
285
0
  for (int i = 0; i < kNumInsertClients; ++i) {
286
0
    ASSERT_NO_FATALS(CreateNewClient(i));
287
0
    ASSERT_OK(Thread::Create(CURRENT_TEST_NAME(),
288
0
                             StrCat(CURRENT_TEST_CASE_NAME(), "-id", i),
289
0
                             &FullStackInsertScanTest::InsertRows, this,
290
0
                             &start_latch, i, random_.Next(), &threads[i]));
291
0
    start_latch.CountDown();
292
0
  }
293
0
  LOG_TIMING(INFO,
294
0
             strings::Substitute("concurrent inserts ($0 rows, $1 threads)",
295
0
                                 kNumRows, kNumInsertClients)) {
296
0
    // Final count-down releases all inserters; the timing covers the inserts
    // and the joins.
    start_latch.CountDown();
297
0
    for (const scoped_refptr<Thread>& thread : threads) {
298
0
      ASSERT_OK(ThreadJoiner(thread.get()).warn_every(15s).Join());
299
0
    }
300
0
  }
301
0
}
302
303
namespace {
304
305
// Number of int32 columns, and also of int64 columns, filled by RandomRow().
constexpr int kNumIntCols{4};
// Bounds used by RandomRow() for the length of the random string_val value.
constexpr int kRandomStrMinLength{16};
constexpr int kRandomStrMaxLength{31};
308
309
0
const std::vector<string>& StringColumnNames() {
  // Names of the string columns of the test schema, built on first use and
  // kept for the rest of the process.
  static std::vector<string> names{"string_val"};
  return names;
}
313
314
0
const std::vector<string>& Int32ColumnNames() {
  // Names of the int32 columns of the test schema, built on first use.
  static std::vector<string> names{
      "int32_val1", "int32_val2", "int32_val3", "int32_val4",
  };
  return names;
}
322
323
0
const std::vector<string>& Int64ColumnNames() {
  // Names of the int64 value columns of the test schema, built on first use.
  static std::vector<string> names{
      "int64_val1", "int64_val2", "int64_val3", "int64_val4",
  };
  return names;
}
331
332
// Fills in the fields for a row as defined by the Schema below
333
// name: (key,      string_val, int32_val$, int64_val$)
334
// type: (int64_t,  string,     int32_t x4, int64_t x4)
335
// The first int32 gets the id and the first int64 gets the thread
336
// id. The key is assigned to "key," and the other fields are random.
337
void RandomRow(Random* rng, QLWriteRequestPB* req, char* buf, int64_t key, int id,
338
0
               client::TableHandle* table) {
339
0
  QLAddInt64HashValue(req, key);
340
0
  int len = kRandomStrMinLength + rng->Uniform(kRandomStrMaxLength - kRandomStrMinLength + 1);
341
0
  RandomString(buf, len, rng);
342
0
  buf[len] = '\0';
343
0
  table->AddStringColumnValue(req, StringColumnNames()[0], buf);
344
0
  for (int i = 0; i < kNumIntCols; ++i) {
345
0
    table->AddInt32ColumnValue(req, Int32ColumnNames()[i], rng->Next32());
346
0
    table->AddInt64ColumnValue(req, Int64ColumnNames()[i], rng->Next64());
347
0
  }
348
0
}
349
350
} // namespace
351
352
2
void FullStackInsertScanTest::CreateTable() {
353
2
  ASSERT_GE(kNumInsertClients, 0);
354
2
  ASSERT_GE(kNumInsertsPerClient, 0);
355
2
  ASSERT_NO_FATALS(InitCluster());
356
357
0
  ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(),
358
0
                                                kTableName.namespace_type()));
359
360
0
  YBSchemaBuilder b;
361
0
  b.AddColumn("key")->Type(INT64)->NotNull()->HashPrimaryKey();
362
0
  b.AddColumn("string_val")->Type(STRING)->NotNull();
363
0
  for (auto& col : Int32ColumnNames()) {
364
0
    b.AddColumn(col)->Type(INT32)->NotNull();
365
0
  }
366
0
  for (auto& col : Int64ColumnNames()) {
367
0
    b.AddColumn(col)->Type(INT64)->NotNull();
368
0
  }
369
0
  ASSERT_OK(reader_table_.Create(kTableName, CalcNumTablets(1), client_.get(), &b));
370
0
  schema_ = reader_table_.schema();
371
0
}
372
373
0
void FullStackInsertScanTest::DoTestScans() {
374
0
  LOG(INFO) << "Doing test scans on table of " << kNumRows << " rows.";
375
376
0
  auto stat = MakePerfStat();
377
0
  auto record = MakePerfRecord();
378
0
  if (stat) {
379
0
    CHECK_OK(stat->Start());
380
0
  }
381
0
  if (record) {
382
0
    CHECK_OK(record->Start());
383
0
  }
384
0
  ASSERT_NO_FATALS(ScanProjection(vector<string>(), "empty projection, 0 col"));
385
0
  ASSERT_NO_FATALS(ScanProjection({ "key" }, "key scan, 1 col"));
386
0
  ASSERT_NO_FATALS(ScanProjection(AllColumnNames(), "full schema scan, 10 col"));
387
0
  ASSERT_NO_FATALS(ScanProjection(StringColumnNames(), "String projection, 1 col"));
388
0
  ASSERT_NO_FATALS(ScanProjection(Int32ColumnNames(), "Int32 projection, 4 col"));
389
0
  ASSERT_NO_FATALS(ScanProjection(Int64ColumnNames(), "Int64 projection, 4 col"));
390
391
0
  ASSERT_NO_FATALS(InterruptNotNull(std::move(record)));
392
0
  ASSERT_NO_FATALS(InterruptNotNull(std::move(stat)));
393
0
}
394
395
0
void FullStackInsertScanTest::FlushToDisk() {
396
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
397
0
    tserver::TabletServer* ts = cluster_->mini_tablet_server(i)->server();
398
0
    ts->maintenance_manager()->Shutdown();
399
0
    auto peers = ts->tablet_manager()->GetTabletPeers();
400
0
    for (const std::shared_ptr<TabletPeer>& peer : peers) {
401
0
      Tablet* tablet = peer->tablet();
402
0
      ASSERT_OK(tablet->Flush(tablet::FlushMode::kSync));
403
0
    }
404
0
  }
405
0
}
406
407
void FullStackInsertScanTest::InsertRows(CountDownLatch* start_latch, int id,
408
0
                                         uint32_t seed) {
409
0
  Random rng(seed + id);
410
411
0
  start_latch->Wait();
412
  // Retrieve id's session and table
413
0
  std::shared_ptr<YBSession> session = sessions_[id];
414
0
  client::TableHandle* table = tables_[id].get();
415
  // Identify start and end of keyrange id is responsible for
416
0
  int64_t start = kNumInsertsPerClient * id;
417
0
  int64_t end = start + kNumInsertsPerClient;
418
  // Printed id value is in the range 1..kNumInsertClients inclusive
419
0
  ++id;
420
  // Prime the future as if it was running a batch (for for-loop code)
421
0
  std::promise<client::FlushStatus> promise;
422
0
  auto flush_status_future = promise.get_future();
423
0
  promise.set_value(client::FlushStatus());
424
  // Maintain buffer for random string generation
425
0
  char randstr[kRandomStrMaxLength + 1];
426
  // Insert in the id's key range
427
0
  for (int64_t key = start; key < end; ++key) {
428
0
    auto op = table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
429
0
    RandomRow(&rng, op->mutable_request(), randstr, key, id, table);
430
0
    session->Apply(op);
431
432
    // Report updates or flush every so often, using the synchronizer to always
433
    // start filling up the next batch while previous one is sent out.
434
0
    if (key % kFlushEveryN == 0 || key == end - 1) {
435
0
      auto flush_status = flush_status_future.get();
436
0
      if (!flush_status.status.ok()) {
437
0
        LogSessionErrorsAndDie(flush_status);
438
0
      }
439
0
      flush_status_future = session->FlushFuture();
440
0
    }
441
0
    ReportTenthDone(key, start, end, id, kNumInsertClients);
442
0
  }
443
0
  auto flush_status = flush_status_future.get();
444
0
  if (!flush_status.status.ok()) {
445
0
    LogSessionErrorsAndDie(flush_status);
446
0
  }
447
0
  ReportAllDone(id, kNumInsertClients);
448
0
  FlushSessionOrDie(session);
449
0
}
450
451
void FullStackInsertScanTest::ScanProjection(const vector<string>& cols,
452
0
                                             const string& msg) {
453
0
  client::TableIteratorOptions options;
454
0
  options.columns = cols;
455
0
  size_t nrows;
456
0
  LOG_TIMING(INFO, msg) {
457
0
    nrows = boost::size(client::TableRange(reader_table_));
458
0
  }
459
0
  ASSERT_EQ(nrows, kNumRows);
460
0
}
461
462
0
// Returns the name of every column of schema_, in schema order.
vector<string> FullStackInsertScanTest::AllColumnNames() const {
  vector<string> names;
  names.reserve(schema_.num_columns());
  for (size_t idx = 0; idx != schema_.num_columns(); ++idx) {
    names.push_back(schema_.Column(idx).name());
  }
  return names;
}
469
470
}  // namespace tablet
471
}  // namespace yb