YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tools/ysck_remote-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <gtest/gtest.h>
34
35
#include "yb/client/client.h"
36
#include "yb/client/schema.h"
37
#include "yb/client/session.h"
38
#include "yb/client/table.h"
39
#include "yb/client/table_creator.h"
40
#include "yb/client/yb_op.h"
41
42
#include "yb/gutil/strings/substitute.h"
43
44
#include "yb/integration-tests/mini_cluster.h"
45
46
#include "yb/tools/data_gen_util.h"
47
#include "yb/tools/ysck_remote.h"
48
49
#include "yb/util/monotime.h"
50
#include "yb/util/promise.h"
51
#include "yb/util/random.h"
52
#include "yb/util/status_log.h"
53
#include "yb/util/test_util.h"
54
#include "yb/util/thread.h"
55
56
using namespace std::literals;
57
58
DECLARE_int32(heartbeat_interval_ms);
59
60
namespace yb {
61
namespace tools {
62
63
using client::YBColumnSchema;
64
using client::YBSchemaBuilder;
65
using client::YBSession;
66
using client::YBTable;
67
using client::YBTableCreator;
68
using client::YBTableName;
69
using std::shared_ptr;
70
using std::static_pointer_cast;
71
using std::string;
72
using std::vector;
73
using strings::Substitute;
74
75
// Fully qualified name (CQL database type, keyspace "my_keyspace") of the table that
// RemoteYsckTest::SetUp() creates and that the row-writing helpers open.
static const YBTableName kTableName(YQL_DATABASE_CQL, "my_keyspace", "ysck-test-table");
76
77
class RemoteYsckTest : public YBTest {
78
 public:
79
  RemoteYsckTest()
80
0
    : random_(SeedRandom()) {
81
0
    YBSchemaBuilder b;
82
0
    b.AddColumn("key")->Type(INT32)->NotNull()->HashPrimaryKey();
83
0
    b.AddColumn("int_val")->Type(INT32)->NotNull();
84
0
    CHECK_OK(b.Build(&schema_));
85
0
  }
86
87
0
  void SetUp() override {
88
0
    YBTest::SetUp();
89
90
    // Speed up testing, saves about 700ms per TEST_F.
91
0
    FLAGS_heartbeat_interval_ms = 10;
92
93
0
    MiniClusterOptions opts;
94
0
    opts.num_tablet_servers = 3;
95
0
    mini_cluster_.reset(new MiniCluster(opts));
96
0
    ASSERT_OK(mini_cluster_->Start());
97
98
0
    master_rpc_addr_ = ASSERT_RESULT(mini_cluster_->GetLeaderMasterBoundRpcAddr());
99
100
    // Connect to the cluster.
101
0
    client_ = ASSERT_RESULT(mini_cluster_->CreateClient());
102
103
    // Create one table.
104
0
    ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(),
105
0
                                                  kTableName.namespace_type()));
106
0
    std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
107
0
    ASSERT_OK(table_creator->table_name(kTableName)
108
0
                     .schema(&schema_)
109
0
                     .num_tablets(3)
110
0
                     .Create());
111
    // Make sure we can open the table.
112
0
    ASSERT_OK(client_->OpenTable(kTableName, &client_table_));
113
114
0
    ASSERT_OK(RemoteYsckMaster::Build(HostPort(master_rpc_addr_), &master_));
115
0
    cluster_.reset(new YsckCluster(master_));
116
0
    ysck_.reset(new Ysck(cluster_));
117
0
  }
118
119
0
  void TearDown() override {
120
0
    client_.reset();
121
0
    if (mini_cluster_) {
122
0
      mini_cluster_->Shutdown();
123
0
      mini_cluster_.reset();
124
0
    }
125
0
    YBTest::TearDown();
126
0
  }
127
128
  // Writes rows to the table until the continue_writing flag is set to false.
129
  //
130
  // Public for use with std::bind.
131
  void GenerateRowWritesLoop(CountDownLatch* started_writing,
132
                             const AtomicBool& continue_writing,
133
0
                             Promise<Status>* promise) {
134
0
    shared_ptr<YBTable> table;
135
0
    Status status = client_->OpenTable(kTableName, &table);
136
0
    if (!status.ok()) {
137
0
      promise->Set(status);
138
0
      return;
139
0
    }
140
0
    shared_ptr<YBSession> session(client_->NewSession());
141
0
    session->SetTimeout(10s);
142
143
0
    for (uint64_t i = 0; continue_writing.Load(); i++) {
144
0
      std::shared_ptr<client::YBqlWriteOp> insert(table->NewQLInsert());
145
0
      GenerateDataForRow(table->schema(), i, &random_, insert->mutable_request());
146
0
      status = session->ApplyAndFlush(insert);
147
0
      if (!status.ok()) {
148
0
        promise->Set(status);
149
0
        return;
150
0
      }
151
0
      started_writing->CountDown(1);
152
0
    }
153
0
    promise->Set(Status::OK());
154
0
  }
155
156
 protected:
157
  // Generate a set of split rows for tablets used in this test.
158
0
  Status GenerateRowWrites(uint64_t num_rows) {
159
0
    shared_ptr<YBTable> table;
160
0
    RETURN_NOT_OK(client_->OpenTable(kTableName, &table));
161
0
    shared_ptr<YBSession> session(client_->NewSession());
162
0
    session->SetTimeout(10s);
163
0
    for (uint64_t i = 0; i < num_rows; i++) {
164
0
      VLOG(1) << "Generating write for row id " << i;
165
0
      std::shared_ptr<client::YBqlWriteOp> insert(table->NewQLInsert());
166
0
      GenerateDataForRow(table->schema(), i, &random_, insert->mutable_request());
167
0
      session->Apply(insert);
168
169
0
      if (i > 0 && i % 1000 == 0) {
170
0
        RETURN_NOT_OK(session->Flush());
171
0
      }
172
0
    }
173
0
    RETURN_NOT_OK(session->Flush());
174
0
    return Status::OK();
175
0
  }
176
177
  std::shared_ptr<Ysck> ysck_;
178
  std::unique_ptr<client::YBClient> client_;
179
180
 private:
181
  HostPort master_rpc_addr_;
182
  std::shared_ptr<MiniCluster> mini_cluster_;
183
  client::YBSchema schema_;
184
  shared_ptr<client::YBTable> client_table_;
185
  std::shared_ptr<YsckMaster> master_;
186
  std::shared_ptr<YsckCluster> cluster_;
187
  Random random_;
188
};
189
190
0
// The master behind the fixture's Ysck instance must report as running.
TEST_F(RemoteYsckTest, TestMasterOk) {
  const Status master_status = ysck_->CheckMasterRunning();
  ASSERT_OK(master_status);
}
193
194
0
// After table and tablet info has been fetched, the tablet-server check must pass.
TEST_F(RemoteYsckTest, TestTabletServersOk) {
  LOG(INFO) << "Fetching table and tablet info...";
  const auto fetch_status = ysck_->FetchTableAndTabletInfo();
  ASSERT_OK(fetch_status);

  LOG(INFO) << "Checking tablet servers are running...";
  const auto servers_status = ysck_->CheckTabletServersRunning();
  ASSERT_OK(servers_status);
}
200
201
0
// Polls the table consistency check every 10ms for up to 30 seconds and requires the
// last observed result to be OK. A failure to fetch table/tablet info is fatal at once.
TEST_F(RemoteYsckTest, TestTableConsistency) {
  MonoTime give_up_at = MonoTime::Now();
  give_up_at.AddDelta(MonoDelta::FromSeconds(30));

  Status consistency;
  for (MonoTime now = MonoTime::Now(); now.ComesBefore(give_up_at); now = MonoTime::Now()) {
    ASSERT_OK(ysck_->FetchTableAndTabletInfo());
    consistency = ysck_->CheckTablesConsistency();
    if (consistency.ok()) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  ASSERT_OK(consistency);
}
215
216
0
// Writes 100 rows, then retries the checksum scan (1 second timeout per attempt, second
// ChecksumOptions argument 16) every 10ms for up to 30 seconds until it succeeds.
TEST_F(RemoteYsckTest, TestChecksum) {
  const uint64_t row_count = 100;
  LOG(INFO) << "Generating row writes...";
  ASSERT_OK(GenerateRowWrites(row_count));

  MonoTime give_up_at = MonoTime::Now();
  give_up_at.AddDelta(MonoDelta::FromSeconds(30));

  Status checksum_status;
  for (MonoTime now = MonoTime::Now(); now.ComesBefore(give_up_at); now = MonoTime::Now()) {
    ASSERT_OK(ysck_->FetchTableAndTabletInfo());
    checksum_status = ysck_->ChecksumData(vector<string>(),
                                          vector<string>(),
                                          ChecksumOptions(MonoDelta::FromSeconds(1), 16));
    if (checksum_status.ok()) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  ASSERT_OK(checksum_status);
}
236
237
0
// Writes 10000 rows and then runs a checksum scan with a zero timeout, which is
// expected to come back as TimedOut.
TEST_F(RemoteYsckTest, TestChecksumTimeout) {
  const uint64_t row_count = 10000;
  LOG(INFO) << "Generating row writes...";
  ASSERT_OK(GenerateRowWrites(row_count));
  ASSERT_OK(ysck_->FetchTableAndTabletInfo());

  // Use an impossibly low timeout value of zero!
  const Status checksum_status =
      ysck_->ChecksumData(vector<string>(),
                          vector<string>(),
                          ChecksumOptions(MonoDelta::FromNanoseconds(0), 16));
  const bool timed_out = checksum_status.IsTimedOut();
  ASSERT_TRUE(timed_out) << "Expected TimedOut Status, got: " << checksum_status.ToString();
}
248
249
0
// Runs checksum scans while a background thread keeps inserting rows, retrying every
// 10ms for up to 30 seconds until one scan succeeds.
//
// The writer thread uses started_writing, continue_writing and promise, all of which
// live on this function's stack. A failing ASSERT_* returns from the test body
// immediately, so every status is collected first, the writer is stopped and joined,
// and only then are the assertions evaluated.
TEST_F(RemoteYsckTest, TestChecksumSnapshot) {
  CountDownLatch started_writing(1);
  AtomicBool continue_writing(true);
  Promise<Status> promise;
  scoped_refptr<Thread> writer_thread;

  CHECK_OK(
      Thread::Create("RemoteYsckTest",
                     "TestChecksumSnapshot",
                     &RemoteYsckTest::GenerateRowWritesLoop,
                     this,
                     &started_writing,
                     std::cref(continue_writing),
                     &promise,
                     &writer_thread));
  // Wait until the writer has flushed at least one row.
  CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30)));

  // Only used in the diagnostic message below.
  uint64_t ts = client_->GetLatestObservedHybridTime();
  MonoTime start(MonoTime::Now());
  MonoTime deadline = start;
  deadline.AddDelta(MonoDelta::FromSeconds(30));

  Status fetch_status;
  Status s;
  // TODO: We need to loop here because safe time is not yet implemented.
  // Remove this loop when that is done. See KUDU-1056.
  while (true) {
    // A fetch failure is not retried; it ends the loop and fails the test below.
    fetch_status = ysck_->FetchTableAndTabletInfo();
    if (!fetch_status.ok()) break;
    // Assign to the outer `s` (do not redeclare it) so the result is visible after the
    // loop.
    s = ysck_->ChecksumData(vector<string>(), vector<string>(),
                            ChecksumOptions(MonoDelta::FromSeconds(10), 16));
    if (s.ok()) break;
    if (deadline.ComesBefore(MonoTime::Now())) break;
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  if (fetch_status.ok() && !s.ok()) {
    LOG(WARNING) << Substitute("Timed out after $0 waiting for ysck to become consistent on TS $1. "
                               "Status: $2",
                               MonoTime::Now().GetDeltaSince(start).ToString(),
                               ts, s.ToString());
  }

  // Stop the writer and wait for it to finish before any assertion can leave this
  // scope; promise.Get() blocks until the writer has published its final status.
  continue_writing.Store(false);
  const Status writer_status = promise.Get();
  writer_thread->Join();

  ASSERT_OK(fetch_status);
  EXPECT_OK(s);
  ASSERT_OK(writer_status);
}
292
293
// Test that followers & leader wait until safe time to respond to a snapshot
294
// scan at current hybrid_time. TODO: Safe time not yet implemented. See KUDU-1056.
295
0
TEST_F(RemoteYsckTest, DISABLED_TestChecksumSnapshotCurrentHybridTime) {
296
0
  CountDownLatch started_writing(1);
297
0
  AtomicBool continue_writing(true);
298
0
  Promise<Status> promise;
299
0
  scoped_refptr<Thread> writer_thread;
300
301
0
  CHECK_OK(
302
0
      Thread::Create("RemoteYsckTest",
303
0
                     "TestChecksumSnapshot",
304
0
                     &RemoteYsckTest::GenerateRowWritesLoop,
305
0
                     this,
306
0
                     &started_writing,
307
0
                     std::cref(continue_writing),
308
0
                     &promise,
309
0
                     &writer_thread));
310
0
  CHECK(started_writing.WaitFor(MonoDelta::FromSeconds(30)));
311
312
0
  ASSERT_OK(ysck_->FetchTableAndTabletInfo());
313
0
  ASSERT_OK(ysck_->ChecksumData(vector<string>(), vector<string>(),
314
0
                                ChecksumOptions(MonoDelta::FromSeconds(10), 16)));
315
0
  continue_writing.Store(false);
316
0
  ASSERT_OK(promise.Get());
317
0
  writer_thread->Join();
318
0
}
319
320
}  // namespace tools
321
}  // namespace yb