YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/integration-tests/flush_under_load-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <cmath>
16
#include <cstdlib>
17
#include <future>
18
19
#include <gflags/gflags.h>
20
#include <glog/logging.h>
21
22
#include "yb/client/callbacks.h"
23
#include "yb/client/client.h"
24
#include "yb/client/table.h"
25
26
#include "yb/gutil/strings/split.h"
27
#include "yb/gutil/strings/substitute.h"
28
29
#include "yb/integration-tests/cluster_verifier.h"
30
#include "yb/integration-tests/load_generator.h"
31
#include "yb/integration-tests/mini_cluster.h"
32
#include "yb/integration-tests/yb_table_test_base.h"
33
34
#include "yb/master/mini_master.h"
35
36
#include "yb/tserver/mini_tablet_server.h"
37
#include "yb/tserver/tablet_server.h"
38
39
#include "yb/util/test_macros.h"
40
#include "yb/util/test_util.h"
41
42
using namespace std::literals;
43
44
using std::string;
45
using std::vector;
46
using std::unique_ptr;
47
48
using yb::client::YBValue;
49
50
using std::shared_ptr;
51
52
namespace yb {
53
namespace integration_tests {
54
55
using client::YBClient;
56
using client::YBClientBuilder;
57
using client::YBColumnSchema;
58
using client::YBSchema;
59
using client::YBSchemaBuilder;
60
using client::YBSession;
61
using client::YBStatusMemberCallback;
62
using client::YBTable;
63
using client::YBTableCreator;
64
using strings::Split;
65
66
// Test fixture for the flush-under-load test. It overrides three YBTableTestBase hooks:
// use_external_mini_cluster() returns false, and num_tablets() / num_tablet_servers() return 1.
// NOTE(review): presumably this makes the base class bring up the in-process MiniCluster that
// LoadTest drives through mini_cluster_ -- confirm against YBTableTestBase.
class FlushUnderLoadTest : public YBTableTestBase {
67
 protected:
68
69
2
  bool use_external_mini_cluster() override { return false; }
70
71
0
  int num_tablets() override { return 1; }
72
1
  size_t num_tablet_servers() override { return 1; }
73
};
74
75
0
// Starts a multi-threaded writer and reader on table_ and, for as long as the writer is running,
// repeatedly pauses the workload, switches memtables and flushes tablets on every mini tablet
// server, then resumes. Afterwards it asserts zero write/read errors, at least `rows` writes, and
// that the cluster verifier finds exactly `rows` rows in the table.
// NOTE(review): a failing ASSERT_OK inside the flush loop presumably returns from the test body
// while the writer and reader are still started -- confirm their destructors stop the threads.
TEST_F(FlushUnderLoadTest, LoadTest) {
76
0
  std::atomic_bool stop_requested_flag(false);
77
0
  int rows = 5000;
78
0
  int start_key = 0;
79
0
  int writer_threads = 2;
80
0
  int reader_threads = 2;
81
0
  int value_size_bytes = 16;
82
0
  int max_write_errors = 0;
83
0
  int max_read_errors = 0;
84
85
  // Create two separate clients for reads and writes.
86
0
  auto write_client = CreateYBClient();
87
0
  auto read_client = CreateYBClient();
88
0
  yb::load_generator::YBSessionFactory write_session_factory(write_client.get(), &table_);
89
0
  yb::load_generator::YBSessionFactory read_session_factory(read_client.get(), &table_);
90
91
0
  yb::load_generator::MultiThreadedWriter writer(rows, start_key, writer_threads,
92
0
                                                 &write_session_factory, &stop_requested_flag,
93
0
                                                 value_size_bytes, max_write_errors);
94
0
  std::atomic<bool> pause_flag{false};  // Toggled by the loop below to pause/resume the workload.
95
0
  writer.set_pause_flag(&pause_flag);
96
0
  yb::load_generator::MultiThreadedReader reader(rows, reader_threads, &read_session_factory,
97
0
                                                 writer.InsertionPoint(), writer.InsertedKeys(),
98
0
                                                 writer.FailedKeys(), &stop_requested_flag,
99
0
                                                 value_size_bytes, max_read_errors);
100
101
0
  writer.Start();
102
  // Writes go through a separate client, so the reader must be given the write client's id.
103
0
  reader.set_client_id(write_session_factory.ClientId());
104
0
  reader.Start();
105
106
0
  while (writer.IsRunning()) {  // One pause / flush / resume cycle per iteration.
107
0
    LOG(INFO) << "Pausing the workload";
108
0
    pause_flag = true;
109
0
    std::this_thread::sleep_for(500ms);
110
0
    LOG(INFO) << "Flushing all tablets";
111
0
    for (int i = 0; i < 3; ++i) {  // Three switch-and-flush rounds per pause.
112
      // In many cases there will be no records and we'll create an empty immutable memtable,
113
      // which is what we're trying to achieve.
114
0
      LOG(INFO) << "Switching memtables for all tablet servers";
115
0
      ASSERT_OK(mini_cluster_->SwitchMemtables());
116
117
      // Flush tablets on every mini tablet server, then wait before the next round.
118
0
      for (const auto& mini_ts : mini_cluster_->mini_tablet_servers()) {
119
0
        LOG(INFO) << "Flushing tablets for tablet server " << mini_ts->server()->permanent_uuid();
120
0
        ASSERT_OK(mini_ts->FlushTablets());
121
0
      }
122
0
      std::this_thread::sleep_for(500ms);
123
0
    }
124
0
    LOG(INFO) << "Resuming the workload";
125
0
    pause_flag = false;
126
0
    std::this_thread::sleep_for(1s);  // Let the workload run for a second before pausing again.
127
0
  }
128
0
  writer.WaitForCompletion();
129
0
  LOG(INFO) << "Writing complete";
130
131
  // The reader will not stop on its own, so we stop it a couple of seconds after the writer
132
  // has finished.
133
0
  SleepFor(MonoDelta::FromSeconds(2));
134
0
  reader.Stop();
135
0
  reader.WaitForCompletion();
136
0
  LOG(INFO) << "Reading complete";
137
138
0
  ASSERT_EQ(0, writer.num_write_errors());
139
0
  ASSERT_EQ(0, reader.num_read_errors());
140
0
  ASSERT_GE(writer.num_writes(), rows);
141
142
  // Verify the cluster and check that the table holds exactly `rows` rows.
0
  ClusterVerifier cluster_verifier(mini_cluster());
143
0
  ASSERT_NO_FATALS(cluster_verifier.CheckCluster());
144
0
  ASSERT_NO_FATALS(cluster_verifier.CheckRowCount(table_->name(), ClusterVerifier::EXACTLY, rows));
145
0
}
146
147
}  // namespace integration_tests
148
}  // namespace yb