YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/ql-stress-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/bfql/gen_opcodes.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/error.h"
18
#include "yb/client/ql-dml-test-base.h"
19
#include "yb/client/rejection_score_source.h"
20
#include "yb/client/schema.h"
21
#include "yb/client/session.h"
22
#include "yb/client/table.h"
23
#include "yb/client/table_handle.h"
24
#include "yb/client/transaction.h"
25
#include "yb/client/transaction_manager.h"
26
#include "yb/client/yb_op.h"
27
28
#include "yb/common/ql_value.h"
29
#include "yb/common/schema.h"
30
31
#include "yb/consensus/log.h"
32
#include "yb/consensus/log_reader.h"
33
#include "yb/consensus/raft_consensus.h"
34
#include "yb/consensus/retryable_requests.h"
35
36
#include "yb/docdb/consensus_frontier.h"
37
#include "yb/docdb/doc_key.h"
38
#include "yb/docdb/docdb_rocksdb_util.h"
39
40
#include "yb/rocksdb/metadata.h"
41
#include "yb/rocksdb/utilities/checkpoint.h"
42
43
#include "yb/rpc/messenger.h"
44
45
#include "yb/server/hybrid_clock.h"
46
47
#include "yb/tablet/tablet.h"
48
#include "yb/tablet/tablet_metadata.h"
49
#include "yb/tablet/tablet_options.h"
50
#include "yb/tablet/tablet_peer.h"
51
52
#include "yb/tserver/mini_tablet_server.h"
53
#include "yb/tserver/tablet_server.h"
54
#include "yb/tserver/ts_tablet_manager.h"
55
56
#include "yb/util/debug-util.h"
57
#include "yb/util/format.h"
58
#include "yb/util/metrics.h"
59
#include "yb/util/random_util.h"
60
#include "yb/util/size_literals.h"
61
#include "yb/util/status_format.h"
62
#include "yb/util/status_log.h"
63
#include "yb/util/test_thread_holder.h"
64
#include "yb/util/tsan_util.h"
65
66
#include "yb/yql/cql/ql/util/statement_result.h"
67
68
DECLARE_bool(TEST_combine_batcher_errors);
69
DECLARE_bool(allow_preempting_compactions);
70
DECLARE_bool(detect_duplicates_for_retryable_requests);
71
DECLARE_bool(enable_ondisk_compression);
72
DECLARE_double(TEST_respond_write_failed_probability);
73
DECLARE_double(transaction_max_missed_heartbeat_periods);
74
DECLARE_int32(TEST_max_write_waiters);
75
DECLARE_int32(client_read_write_timeout_ms);
76
DECLARE_int32(log_cache_size_limit_mb);
77
DECLARE_int32(log_min_seconds_to_retain);
78
DECLARE_int32(raft_heartbeat_interval_ms);
79
DECLARE_int32(retryable_request_range_time_limit_secs);
80
DECLARE_int32(rocksdb_level0_file_num_compaction_trigger);
81
DECLARE_int32(rocksdb_level0_slowdown_writes_trigger);
82
DECLARE_int32(rocksdb_max_background_compactions);
83
DECLARE_int32(rocksdb_universal_compaction_min_merge_width);
84
DECLARE_int32(rocksdb_universal_compaction_size_ratio);
85
DECLARE_int64(db_write_buffer_size);
86
DECLARE_int64(remote_bootstrap_rate_limit_bytes_per_sec);
87
DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec);
88
DECLARE_int64(transaction_rpc_timeout_ms);
89
DECLARE_uint64(log_segment_size_bytes);
90
DECLARE_uint64(sst_files_hard_limit);
91
DECLARE_uint64(sst_files_soft_limit);
92
93
METRIC_DECLARE_counter(majority_sst_files_rejections);
94
95
using namespace std::literals;
96
97
using rocksdb::checkpoint::CreateCheckpoint;
98
using rocksdb::UserFrontierPtr;
99
using yb::tablet::TabletOptions;
100
using yb::docdb::InitRocksDBOptions;
101
102
DECLARE_bool(enable_ysql);
103
104
namespace yb {
105
namespace client {
106
107
namespace {

// Name of the value column used by the test tables in this file.
const std::string kValueColumn{"v"};

}
112
113
// Base fixture of the QL stress tests.
//
// SetUp() disables YSQL, runs QLDmlTestBase::SetUp(), creates table_ from the schema assembled
// by InitSchemaBuilder() + CompleteSchemaBuilder() and waits until WaitForTabletLeaders()
// succeeds. The row helpers come in two flavours: one taking an explicit TableHandle and one
// that forwards to it with table_.
class QLStressTest : public QLDmlTestBase<MiniCluster> {
 public:
  QLStressTest() {}

  void SetUp() override {
    // To prevent automatic creation of the transaction status table.
    SetAtomicFlag(false, &FLAGS_enable_ysql);

    ASSERT_NO_FATALS(QLDmlTestBase::SetUp());

    YBSchemaBuilder schema_builder;
    InitSchemaBuilder(&schema_builder);
    CompleteSchemaBuilder(&schema_builder);

    ASSERT_OK(table_.Create(kTableName, NumTablets(), client_.get(), &schema_builder));
    ASSERT_OK(WaitForTabletLeaders());
  }

  // Extension point invoked after InitSchemaBuilder(); does nothing by default.
  virtual void CompleteSchemaBuilder(YBSchemaBuilder* b) {}

  // Number of tablets passed to table_.Create() in SetUp().
  virtual int NumTablets() {
    return CalcNumTablets(3);
  }

  // Declares the columns of the test table: INT32 hash key "h" and a STRING value column.
  virtual void InitSchemaBuilder(YBSchemaBuilder* builder) {
    const auto key_column = builder->AddColumn("h");
    key_column->Type(INT32)->HashPrimaryKey()->NotNull();

    const auto value_column = builder->AddColumn(kValueColumn);
    value_column->Type(STRING);
  }

  // Waits (up to 10s scaled by kTimeMultiplier) until each tablet of table_ has a leader.
  // Returns the first failure reported by WaitUntilTabletHasLeader.
  CHECKED_STATUS WaitForTabletLeaders() {
    const MonoTime deadline = MonoTime::Now() + 10s * kTimeMultiplier;
    const auto tablet_ids = ListTabletIdsForTable(cluster_.get(), table_->id());
    for (const auto& tablet_id : tablet_ids) {
      RETURN_NOT_OK(WaitUntilTabletHasLeader(cluster_.get(), tablet_id, deadline));
    }
    return Status::OK();
  }

  // Builds an INSERT of (key, value) into `table` and applies it to `session` without flushing.
  // Returns the operation so that the caller can inspect its response after a flush.
  YBqlWriteOpPtr InsertRow(const YBSessionPtr& session,
                           const TableHandle& table,
                           int32_t key,
                           const std::string& value) {
    auto insert_op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
    auto* const request = insert_op->mutable_request();
    QLAddInt32HashValue(request, key);
    table.AddStringColumnValue(request, kValueColumn, value);
    session->Apply(insert_op);
    return insert_op;
  }

  // Inserts (key, value) into `table` and flushes the session.
  // Returns the flush error, or RemoteError when the response status is not YQL_STATUS_OK.
  CHECKED_STATUS WriteRow(const YBSessionPtr& session,
                          const TableHandle& table,
                          int32_t key,
                          const std::string& value) {
    const auto insert_op = InsertRow(session, table, key, value);
    RETURN_NOT_OK(session->Flush());

    const auto response_status = insert_op->response().status();
    if (response_status == QLResponsePB::YQL_STATUS_OK) {
      return Status::OK();
    }
    return STATUS_FORMAT(
        RemoteError, "Write failed: $0", QLResponsePB::QLStatus_Name(response_status));
  }

  // Builds a read of the value column for `key` in `table` and applies it to `session` without
  // flushing. Returns the operation.
  YBqlReadOpPtr SelectRow(const YBSessionPtr& session, const TableHandle& table, int32_t key) {
    auto read_op = table.NewReadOp();
    auto* const request = read_op->mutable_request();
    QLAddInt32HashValue(request, key);
    table.AddColumns({kValueColumn}, request);
    session->Apply(read_op);
    return read_op;
  }

  // Reads the value column for `key` from `table`.
  // Fails with the flush error, with RemoteError when the response status is not
  // YQL_STATUS_OK, or with NotFound when the result does not contain exactly one row.
  Result<QLValue> ReadRow(const YBSessionPtr& session, const TableHandle& table, int32_t key) {
    const auto read_op = SelectRow(session, table, key);
    RETURN_NOT_OK(session->Flush());

    const auto response_status = read_op->response().status();
    if (response_status != QLResponsePB::YQL_STATUS_OK) {
      return STATUS_FORMAT(
          RemoteError, "Read failed: $0", QLResponsePB::QLStatus_Name(response_status));
    }

    auto rowblock = ql::RowsResult(read_op.get()).GetRowBlock();
    if (rowblock->row_count() != 1) {
      return STATUS_FORMAT(NotFound, "Bad count for $0, count: $1", key, rowblock->row_count());
    }
    return rowblock->row(0).column(0);
  }

  // The overloads below operate on table_.

  YBqlWriteOpPtr InsertRow(const YBSessionPtr& session,
                           int32_t key,
                           const std::string& value) {
    return QLStressTest::InsertRow(session, table_, key, value);
  }

  CHECKED_STATUS WriteRow(const YBSessionPtr& session,
                          int32_t key,
                          const std::string& value) {
    return QLStressTest::WriteRow(session, table_, key, value);
  }

  YBqlReadOpPtr SelectRow(const YBSessionPtr& session, int32_t key) {
    return QLStressTest::SelectRow(session, table_, key);
  }

  Result<QLValue> ReadRow(const YBSessionPtr& session, int32_t key) {
    return QLStressTest::ReadRow(session, table_, key);
  }

  TransactionManager CreateTxnManager();

  void VerifyFlushedFrontiers();

  void TestRetryWrites(bool restarts);

  bool CheckRetryableRequestsCountsAndLeaders(size_t total_leaders, size_t* total_entries);

  void AddWriter(
      std::string value_prefix, std::atomic<int>* key, TestThreadHolder* thread_holder,
      const std::chrono::nanoseconds& sleep_duration = std::chrono::nanoseconds(),
      bool allow_failures = false, TransactionManager* txn_manager = nullptr,
      double transactional_write_probability = 0.0);

  void TestWriteRejection();

  // Table created in SetUp(); default target of the row helpers.
  TableHandle table_;

  // Suffix for the next checkpoint directory; incremented by VerifyFlushedFrontiers().
  int checkpoint_index_ = 0;
};
241
242
/*
243
 * Create a lot of tables and check that each of them are usable (can read/write to them).
244
 * Test enough rows/keys to ensure that most tablets will be hit.
245
 */
246
0
TEST_F(QLStressTest, LargeNumberOfTables) {
247
0
  int num_tables = NonTsanVsTsan(20, 10);
248
0
  int num_tablets_per_table = NonTsanVsTsan(3, 1);
249
0
  auto session = NewSession();
250
0
  for (int i = 0; i < num_tables; i++) {
251
0
    YBSchemaBuilder b;
252
0
    InitSchemaBuilder(&b);
253
0
    CompleteSchemaBuilder(&b);
254
0
    TableHandle table;
255
0
    client::YBTableName table_name(
256
0
        YQL_DATABASE_CQL, "my_keyspace", "ql_client_test_table_" + std::to_string(i));
257
0
    ASSERT_OK(table.Create(table_name, num_tablets_per_table, client_.get(), &b));
258
259
0
    int num_rows = num_tablets_per_table * 5;
260
0
    for (int key = i; key < i + num_rows; key++) {
261
0
      string value = "value_" + std::to_string(key);
262
0
      ASSERT_OK(WriteRow(session, table, key, value));
263
0
      auto read_value = ASSERT_RESULT(ReadRow(session, table, key));
264
0
      ASSERT_EQ(read_value.string_value(), value) << read_value.ToString();
265
0
    }
266
0
  }
267
0
}
268
269
bool QLStressTest::CheckRetryableRequestsCountsAndLeaders(
270
0
      size_t expected_leaders, size_t* total_entries) {
271
0
  size_t total_leaders = 0;
272
0
  *total_entries = 0;
273
0
  bool result = true;
274
0
  size_t replicated_limit = FLAGS_detect_duplicates_for_retryable_requests ? 1 : 0;
275
0
  auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
276
0
  for (const auto& peer : peers) {
277
0
    auto leader = peer->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER;
278
0
    if (!peer->tablet() || peer->tablet()->metadata()->table_id() != table_.table()->id()) {
279
0
      continue;
280
0
    }
281
0
    size_t tablet_entries = peer->tablet()->TEST_CountRegularDBRecords();
282
0
    auto raft_consensus = down_cast<consensus::RaftConsensus*>(peer->consensus());
283
0
    auto request_counts = raft_consensus->TEST_CountRetryableRequests();
284
0
    LOG(INFO) << "T " << peer->tablet()->tablet_id() << " P " << peer->permanent_uuid()
285
0
              << ", entries: " << tablet_entries
286
0
              << ", running: " << request_counts.running
287
0
              << ", replicated: " << request_counts.replicated
288
0
              << ", leader: " << leader
289
0
              << ", term: " << raft_consensus->LeaderTerm();
290
0
    if (leader) {
291
0
      *total_entries += tablet_entries;
292
0
      ++total_leaders;
293
0
    }
294
    // Last write request could be rejected as duplicate, so followers would not be able to
295
    // cleanup replicated requests.
296
0
    if (request_counts.running != 0 || (leader && request_counts.replicated > replicated_limit)) {
297
0
      result = false;
298
0
    }
299
0
  }
300
301
0
  if (total_leaders != expected_leaders) {
302
0
    LOG(INFO) << "Expected " << expected_leaders << " leaders, found " << total_leaders;
303
0
    return false;
304
0
  }
305
306
0
  if (result && FLAGS_detect_duplicates_for_retryable_requests) {
307
0
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
308
0
    for (const auto& peer : peers) {
309
0
      if (peer->tablet()->metadata()->table_id() != table_.table()->id()) {
310
0
        continue;
311
0
      }
312
0
      auto db = peer->tablet()->TEST_db();
313
0
      rocksdb::ReadOptions read_opts;
314
0
      read_opts.query_id = rocksdb::kDefaultQueryId;
315
0
      std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_opts));
316
0
      std::unordered_map<std::string, std::string> keys;
317
318
0
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
319
0
        Slice key = iter->key();
320
0
        EXPECT_OK(DocHybridTime::DecodeFromEnd(&key));
321
0
        auto emplace_result = keys.emplace(key.ToBuffer(), iter->key().ToBuffer());
322
0
        if (!emplace_result.second) {
323
0
          LOG(ERROR)
324
0
              << "Duplicate key: " << docdb::SubDocKey::DebugSliceToString(iter->key())
325
0
              << " vs " << docdb::SubDocKey::DebugSliceToString(emplace_result.first->second);
326
0
        }
327
0
      }
328
0
    }
329
0
  }
330
331
0
  return result;
332
0
}
333
334
0
// Returns a TransactionManager for client_ that uses a freshly initialized HybridClock.
// A clock initialization failure is reported through EXPECT_OK, the manager is still returned.
TransactionManager QLStressTest::CreateTxnManager() {
  server::ClockPtr hybrid_clock(new server::HybridClock(WallClock()));
  EXPECT_OK(hybrid_clock->Init());
  return TransactionManager(client_.get(), hybrid_clock, client::LocalTabletFilter());
}
339
340
0
void QLStressTest::TestRetryWrites(bool restarts) {
341
0
  const size_t kConcurrentWrites = 5;
342
  // Used only when table is transactional.
343
0
  const double kTransactionalWriteProbability = 0.5;
344
345
0
  SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability);
346
347
0
  const bool transactional = table_.table()->schema().table_properties().is_transactional();
348
0
  boost::optional<TransactionManager> txn_manager;
349
0
  if (transactional) {
350
0
    txn_manager = CreateTxnManager();
351
0
  }
352
353
0
  TestThreadHolder thread_holder;
354
0
  std::atomic<int32_t> key_source(0);
355
0
  for (int i = 0; i != kConcurrentWrites; ++i) {
356
0
    thread_holder.AddThreadFunctor(
357
0
        [this, &key_source, &stop_requested = thread_holder.stop_flag(),
358
0
         &txn_manager, kTransactionalWriteProbability] {
359
0
      auto session = NewSession();
360
0
      while (!stop_requested.load(std::memory_order_acquire)) {
361
0
        int32_t key = key_source.fetch_add(1, std::memory_order_acq_rel);
362
0
        YBTransactionPtr txn;
363
0
        if (txn_manager &&
364
0
            RandomActWithProbability(kTransactionalWriteProbability)) {
365
0
          txn = std::make_shared<YBTransaction>(txn_manager.get_ptr());
366
0
          ASSERT_OK(txn->Init(IsolationLevel::SNAPSHOT_ISOLATION));
367
0
          session->SetTransaction(txn);
368
0
        } else {
369
0
          session->SetTransaction(nullptr);
370
0
        }
371
372
0
        auto op = InsertRow(session, key, Format("value_$0", key));
373
0
        auto flush_status = session->FlushAndGetOpsErrors();
374
0
        const auto& status = flush_status.status;
375
0
        if (status.ok()) {
376
0
          ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);
377
378
0
          if (txn) {
379
0
            auto commit_status = txn->CommitFuture().get();
380
0
            if (!commit_status.ok()) {
381
0
              LOG(INFO) << "Commit failed, key: " << key << ", txn: " << txn->id()
382
0
                        << ", commit failed: " << commit_status;
383
0
              ASSERT_TRUE(commit_status.IsExpired());
384
0
            }
385
0
          }
386
0
          continue;
387
0
        }
388
0
        ASSERT_TRUE(status.IsIOError()) << "Status: " << AsString(status);
389
0
        ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_RUNTIME_ERROR);
390
0
        ASSERT_EQ(op->response().error_message(), "Duplicate request");
391
0
      }
392
0
    });
393
0
  }
394
395
0
  if (restarts) {
396
0
    thread_holder.AddThread(RestartsThread(cluster_.get(), 5s, &thread_holder.stop_flag()));
397
0
  }
398
399
0
  thread_holder.WaitAndStop(restarts ? 60s : 15s);
400
401
0
  int written_keys = key_source.load(std::memory_order_acquire);
402
0
  auto session = NewSession();
403
0
  for (int key = 0; key != written_keys; ++key) {
404
0
    auto value = ASSERT_RESULT(ReadRow(session, key));
405
0
    ASSERT_EQ(value.string_value(), Format("value_$0", key));
406
0
  }
407
408
0
  size_t total_entries = 0;
409
0
  size_t expected_leaders = table_.table()->GetPartitionCount();
410
0
  ASSERT_OK(WaitFor(
411
0
      std::bind(&QLStressTest::CheckRetryableRequestsCountsAndLeaders, this,
412
0
                expected_leaders, &total_entries),
413
0
      15s, "Retryable requests cleanup and leader wait"));
414
415
  // We have 2 entries per row.
416
0
  if (FLAGS_detect_duplicates_for_retryable_requests) {
417
0
    ASSERT_EQ(total_entries, written_keys * 2);
418
0
  } else {
419
    // If duplicate request tracking is disabled, then total_entries should be greater than
420
    // written keys, otherwise test does not work.
421
0
    ASSERT_GT(total_entries, written_keys * 2);
422
0
  }
423
424
0
  ASSERT_GE(written_keys, RegularBuildVsSanitizers(100, 40));
425
0
}
426
427
0
// Retried writes with duplicate detection enabled and without restarts.
TEST_F(QLStressTest, RetryWrites) {
  constexpr bool kWithRestarts = false;
  FLAGS_detect_duplicates_for_retryable_requests = true;
  TestRetryWrites(kWithRestarts);
}
431
432
0
// Retried writes with duplicate detection enabled while the restarts thread is running.
TEST_F(QLStressTest, RetryWritesWithRestarts) {
  constexpr bool kWithRestarts = true;
  FLAGS_detect_duplicates_for_retryable_requests = true;
  TestRetryWrites(kWithRestarts);
}
436
437
0
// Installs table properties with the transactional flag set on `builder`.
void SetTransactional(YBSchemaBuilder* builder) {
  TableProperties transactional_properties;
  transactional_properties.SetTransactional(true);
  builder->SetTableProperties(transactional_properties);
}
442
443
class QLTransactionalStressTest : public QLStressTest {
444
 public:
445
2
  void SetUp() override {
446
2
    FLAGS_transaction_rpc_timeout_ms =
447
2
        std::chrono::duration_cast<std::chrono::milliseconds>(1min).count();
448
2
    FLAGS_transaction_max_missed_heartbeat_periods = 1000000;
449
2
    FLAGS_retryable_request_range_time_limit_secs = 600;
450
2
    ASSERT_NO_FATALS(QLStressTest::SetUp());
451
2
  }
452
453
0
  void CompleteSchemaBuilder(YBSchemaBuilder* builder) override {
454
0
    SetTransactional(builder);
455
0
  }
456
};
457
458
0
// Retried writes against a transactional table, duplicate detection enabled, no restarts.
TEST_F_EX(QLStressTest, RetryTransactionalWrites, QLTransactionalStressTest) {
  constexpr bool kWithRestarts = false;
  FLAGS_detect_duplicates_for_retryable_requests = true;
  TestRetryWrites(kWithRestarts);
}
462
463
0
// Retried writes against a transactional table, duplicate detection enabled, with restarts.
TEST_F_EX(QLStressTest, RetryTransactionalWritesWithRestarts, QLTransactionalStressTest) {
  constexpr bool kWithRestarts = true;
  FLAGS_detect_duplicates_for_retryable_requests = true;
  TestRetryWrites(kWithRestarts);
}
467
468
0
// Retried writes with duplicate detection switched off and without restarts.
TEST_F(QLStressTest, RetryWritesDisabled) {
  constexpr bool kWithRestarts = false;
  FLAGS_detect_duplicates_for_retryable_requests = false;
  TestRetryWrites(kWithRestarts);
}
472
473
class QLStressTestIntValue : public QLStressTest {
474
 private:
475
0
  void InitSchemaBuilder(YBSchemaBuilder* builder) override {
476
0
    builder->AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull();
477
0
    builder->AddColumn(kValueColumn)->Type(INT64);
478
0
  }
479
};
480
481
// This test does 100 concurrent increments of the same row.
482
// It is expected that resulting value will be equal to 100.
483
0
TEST_F_EX(QLStressTest, Increment, QLStressTestIntValue) {
484
0
  const auto kIncrements = 100;
485
0
  const auto kKey = 1;
486
487
0
  auto session = NewSession();
488
0
  {
489
0
    auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
490
0
    auto* const req = op->mutable_request();
491
0
    QLAddInt32HashValue(req, kKey);
492
0
    table_.AddInt64ColumnValue(req, kValueColumn, 0);
493
0
    ASSERT_OK(session->ApplyAndFlush(op));
494
0
    ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);
495
0
  }
496
497
0
  std::vector<YBqlWriteOpPtr> write_ops;
498
0
  std::vector<std::shared_future<FlushStatus>> futures;
499
500
0
  auto value_column_id = table_.ColumnId(kValueColumn);
501
0
  for (int i = 0; i != kIncrements; ++i) {
502
0
    auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE);
503
0
    auto* const req = op->mutable_request();
504
0
    QLAddInt32HashValue(req, kKey);
505
0
    req->mutable_column_refs()->add_ids(value_column_id);
506
0
    auto* column_value = req->add_column_values();
507
0
    column_value->set_column_id(value_column_id);
508
0
    auto* bfcall = column_value->mutable_expr()->mutable_bfcall();
509
0
    bfcall->set_opcode(to_underlying(bfql::BFOpcode::OPCODE_AddI64I64_80));
510
0
    bfcall->add_operands()->set_column_id(value_column_id);
511
0
    bfcall->add_operands()->mutable_value()->set_int64_value(1);
512
0
    write_ops.push_back(op);
513
0
  }
514
515
0
  for (const auto& op : write_ops) {
516
0
    session->Apply(op);
517
0
    futures.push_back(session->FlushFuture());
518
0
  }
519
520
0
  for (size_t i = 0; i != write_ops.size(); ++i) {
521
0
    ASSERT_OK(futures[i].get().status);
522
0
    ASSERT_EQ(write_ops[i]->response().status(), QLResponsePB::YQL_STATUS_OK);
523
0
  }
524
525
0
  auto value = ASSERT_RESULT(ReadRow(session, kKey));
526
0
  ASSERT_EQ(value.int64_value(), kIncrements) << value.ToString();
527
0
}
528
529
class QLStressTestSingleTablet : public QLStressTest {
530
 private:
531
0
  int NumTablets() override {
532
0
    return 1;
533
0
  }
534
};
535
536
// This test has the following scenario:
537
// Add some operations to the old leader, but don't add to other nodes.
538
// Switch leadership to a new leader, but don't accept updates from new leader by old leader.
539
// Also don't replicate no op by the new leader.
540
// Switch leadership back to the old leader.
541
// New leader should successfully accept old operations from old leader.
542
0
TEST_F_EX(QLStressTest, ShortTimeLeaderDoesNotReplicateNoOp, QLStressTestSingleTablet) {
543
0
  auto session = NewSession();
544
0
  ASSERT_OK(WriteRow(session, 0, "value0"));
545
546
0
  auto leaders = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
547
0
  ASSERT_EQ(1, leaders.size());
548
0
  auto old_leader = leaders[0];
549
550
0
  auto followers = ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders);
551
0
  ASSERT_EQ(2, followers.size());
552
0
  tablet::TabletPeerPtr temp_leader = followers[0];
553
0
  tablet::TabletPeerPtr always_follower = followers[1];
554
555
0
  ASSERT_OK(WaitFor([old_leader, always_follower]() -> Result<bool> {
556
0
    auto leader_op_id = old_leader->consensus()->GetLastReceivedOpId();
557
0
    auto follower_op_id = always_follower->consensus()->GetLastReceivedOpId();
558
0
    return follower_op_id == leader_op_id;
559
0
  }, 5s, "Follower catch up"));
560
561
0
  for (const auto& follower : followers) {
562
0
    down_cast<consensus::RaftConsensus*>(follower->consensus())->TEST_RejectMode(
563
0
        consensus::RejectMode::kAll);
564
0
  }
565
566
0
  InsertRow(session, 1, "value1");
567
0
  auto flush_future = session->FlushFuture();
568
569
0
  InsertRow(session, 2, "value2");
570
0
  auto flush_future2 = session->FlushFuture();
571
572
  // Give leader some time to receive operation.
573
  // TODO wait for specific event.
574
0
  std::this_thread::sleep_for(1s);
575
576
0
  LOG(INFO) << "Step down old leader " << old_leader->permanent_uuid()
577
0
            << " in favor of " << temp_leader->permanent_uuid();
578
579
0
  ASSERT_OK(StepDown(old_leader, temp_leader->permanent_uuid(), ForceStepDown::kFalse));
580
581
0
  down_cast<consensus::RaftConsensus*>(old_leader->consensus())->TEST_RejectMode(
582
0
      consensus::RejectMode::kAll);
583
0
  down_cast<consensus::RaftConsensus*>(temp_leader->consensus())->TEST_RejectMode(
584
0
      consensus::RejectMode::kNone);
585
0
  down_cast<consensus::RaftConsensus*>(always_follower->consensus())->TEST_RejectMode(
586
0
      consensus::RejectMode::kNonEmpty);
587
588
0
  ASSERT_OK(WaitForLeaderOfSingleTablet(
589
0
      cluster_.get(), temp_leader, 20s, "Waiting for new leader"));
590
591
  // Give new leader some time to request lease.
592
  // TODO wait for specific event.
593
0
  std::this_thread::sleep_for(3s);
594
0
  auto temp_leader_safe_time = ASSERT_RESULT(
595
0
      temp_leader->tablet()->SafeTime(tablet::RequireLease::kTrue));
596
0
  LOG(INFO) << "Safe time: " << temp_leader_safe_time;
597
598
0
  LOG(INFO) << "Transferring leadership from " << temp_leader->permanent_uuid()
599
0
            << " back to " << old_leader->permanent_uuid();
600
0
  ASSERT_OK(StepDown(temp_leader, old_leader->permanent_uuid(), ForceStepDown::kTrue));
601
602
0
  ASSERT_OK(WaitForLeaderOfSingleTablet(
603
0
      cluster_.get(), old_leader, 20s, "Waiting old leader to restore leadership"));
604
605
0
  down_cast<consensus::RaftConsensus*>(always_follower->consensus())->TEST_RejectMode(
606
0
      consensus::RejectMode::kNone);
607
608
0
  ASSERT_OK(WriteRow(session, 3, "value3"));
609
610
0
  ASSERT_OK(flush_future.get().status);
611
0
  ASSERT_OK(flush_future2.get().status);
612
0
}
613
614
namespace {
615
616
0
void VerifyFlushedFrontier(const UserFrontierPtr& frontier, OpId* op_id) {
617
0
  ASSERT_TRUE(frontier);
618
0
  if (frontier) {
619
0
    *op_id = down_cast<docdb::ConsensusFrontier*>(frontier.get())->op_id();
620
0
    ASSERT_GT(op_id->term, 0);
621
0
    ASSERT_GT(op_id->index, 0);
622
0
  }
623
0
}
624
625
}  // anonymous namespace
626
0
void QLStressTest::VerifyFlushedFrontiers() {
627
0
  for (const auto& mini_tserver : cluster_->mini_tablet_servers()) {
628
0
    auto peers = mini_tserver->server()->tablet_manager()->GetTabletPeers();
629
0
    for (const auto& peer : peers) {
630
0
      rocksdb::DB* db = peer->tablet()->TEST_db();
631
0
      OpId op_id;
632
0
      ASSERT_NO_FATALS(VerifyFlushedFrontier(db->GetFlushedFrontier(), &op_id));
633
634
      // Also check that if we checkpoint this DB and open the checkpoint separately, the
635
      // flushed frontier non-zero as well.
636
0
      std::string checkpoint_dir;
637
0
      ASSERT_OK(Env::Default()->GetTestDirectory(&checkpoint_dir));
638
0
      checkpoint_dir += Format("/checkpoint_$0", checkpoint_index_);
639
0
      checkpoint_index_++;
640
641
0
      ASSERT_OK(CreateCheckpoint(db, checkpoint_dir));
642
643
0
      rocksdb::Options options;
644
0
      auto tablet_options = TabletOptions();
645
0
      tablet_options.rocksdb_env = db->GetEnv();
646
0
      InitRocksDBOptions(&options, "", nullptr, tablet_options);
647
0
      std::unique_ptr<rocksdb::DB> checkpoint_db;
648
0
      rocksdb::DB* checkpoint_db_raw_ptr = nullptr;
649
650
0
      options.create_if_missing = false;
651
0
      ASSERT_OK(rocksdb::DB::Open(options, checkpoint_dir, &checkpoint_db_raw_ptr));
652
0
      checkpoint_db.reset(checkpoint_db_raw_ptr);
653
0
      OpId checkpoint_op_id;
654
0
      ASSERT_NO_FATALS(
655
0
          VerifyFlushedFrontier(checkpoint_db->GetFlushedFrontier(), &checkpoint_op_id));
656
0
      ASSERT_OK(Env::Default()->DeleteRecursively(checkpoint_dir));
657
658
0
      ASSERT_LE(op_id, checkpoint_op_id);
659
0
    }
660
0
  }
661
0
}
662
663
0
// Keeps a writer running for 60 seconds while the tablets are repeatedly flushed and compacted.
// The flushed frontiers are verified after every flush and after every compaction attempt.
// A failed compaction is only logged. At least 5 iterations must have been completed.
TEST_F_EX(QLStressTest, FlushCompact, QLStressTestSingleTablet) {
  // Explicitly start from zero: before C++20 a default constructed std::atomic holds an
  // indeterminate value.
  std::atomic<int> key(0);

  TestThreadHolder thread_holder;

  AddWriter("value_", &key, &thread_holder);

  auto start_time = MonoTime::Now();
  const auto kTimeout = MonoDelta::FromSeconds(60);
  int num_iter = 0;
  while (MonoTime::Now() - start_time < kTimeout) {
    ++num_iter;
    std::this_thread::sleep_for(1s);
    ASSERT_OK(cluster_->FlushTablets());
    ASSERT_NO_FATALS(VerifyFlushedFrontiers());
    std::this_thread::sleep_for(1s);
    auto compact_status = cluster_->CompactTablets();
    LOG_IF(INFO, !compact_status.ok()) << "Compaction failed: " << compact_status;
    ASSERT_NO_FATALS(VerifyFlushedFrontiers());
  }
  ASSERT_GE(num_iter, 5);
}
685
686
// The scenario of this test is the following:
687
// We do writes in background.
688
// Isolate leader for 10 seconds.
689
// Restore connectivity.
690
// Check that old leader was able to catch up after the partition is healed.
691
0
TEST_F_EX(QLStressTest, OldLeaderCatchUpAfterNetworkPartition, QLStressTestSingleTablet) {
692
0
  FLAGS_TEST_combine_batcher_errors = true;
693
694
0
  tablet::TabletPeer* leader_peer = nullptr;
695
0
  std::atomic<int> key(0);
696
0
  {
697
0
    TestThreadHolder thread_holder;
698
699
0
    AddWriter("value_", &key, &thread_holder);
700
701
0
    std::this_thread::sleep_for(5s * yb::kTimeMultiplier);
702
703
0
    tserver::MiniTabletServer* leader = nullptr;
704
0
    for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
705
0
      auto current = cluster_->mini_tablet_server(i);
706
0
      auto peers = current->server()->tablet_manager()->GetTabletPeers();
707
0
      ASSERT_EQ(peers.size(), 1);
708
0
      if (peers.front()->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) {
709
0
        leader = current;
710
0
        leader_peer = peers.front().get();
711
0
        break;
712
0
      }
713
0
    }
714
715
0
    ASSERT_NE(leader, nullptr);
716
717
0
    auto pre_isolate_op_id = leader_peer->GetLatestLogEntryOpId();
718
0
    LOG(INFO) << "Isolate, last op id: " << pre_isolate_op_id << ", key: " << key;
719
0
    ASSERT_GE(pre_isolate_op_id.term, 1);
720
0
    ASSERT_GT(pre_isolate_op_id.index, key);
721
0
    leader->Isolate();
722
0
    std::this_thread::sleep_for(10s * yb::kTimeMultiplier);
723
724
0
    auto pre_restore_op_id = leader_peer->GetLatestLogEntryOpId();
725
0
    LOG(INFO) << "Restore, last op id: " << pre_restore_op_id << ", key: " << key;
726
0
    ASSERT_EQ(pre_restore_op_id.term, pre_isolate_op_id.term);
727
0
    ASSERT_GE(pre_restore_op_id.index, pre_isolate_op_id.index);
728
0
    ASSERT_LE(pre_restore_op_id.index, pre_isolate_op_id.index + 10);
729
0
    ASSERT_OK(leader->Reconnect());
730
731
0
    thread_holder.WaitAndStop(5s * yb::kTimeMultiplier);
732
0
  }
733
734
0
  ASSERT_OK(WaitFor([leader_peer, &key] {
735
0
    return leader_peer->GetLatestLogEntryOpId().index > key;
736
0
  }, 5s, "Old leader has enough operations"));
737
738
0
  auto finish_op_id = leader_peer->GetLatestLogEntryOpId();
739
0
  LOG(INFO) << "Finish, last op id: " << finish_op_id << ", key: " << key;
740
0
  ASSERT_GT(finish_op_id.term, 1);
741
0
  ASSERT_GT(finish_op_id.index, key);
742
0
}
743
744
0
// Delays consensus updates on one follower by 20 seconds while a writer inserts values with a
// 100 KB prefix every 100ms for 30 seconds. Afterwards the peak consumption of the
// "Inbound RPC" memory tracker of every tablet server must not exceed 150 MB.
TEST_F_EX(QLStressTest, SlowUpdateConsensus, QLStressTestSingleTablet) {
  std::atomic<int> key(0);

  auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders);
  ASSERT_EQ(peers.size(), 2);

  down_cast<consensus::RaftConsensus*>(peers[0]->consensus())->TEST_DelayUpdate(20s);

  TestThreadHolder thread_holder;
  AddWriter(std::string(100_KB, 'X'), &key, &thread_holder, 100ms);

  thread_holder.WaitAndStop(30s);

  down_cast<consensus::RaftConsensus*>(peers[0]->consensus())->TEST_DelayUpdate(0s);

  int64_t max_peak_consumption = 0;
  // Server trackers are looked up by the 1-based names "server 1" .. "server N".
  for (size_t i = 1; i <= cluster_->num_tablet_servers(); ++i) {
    auto server_tracker = MemTracker::FindTracker(Format("server $0", i));
    auto call_tracker = MemTracker::FindTracker("Call", server_tracker);
    auto inbound_rpc_tracker = MemTracker::FindTracker("Inbound RPC", call_tracker);
    // Fail the test instead of dereferencing a null pointer when the tracker is missing.
    ASSERT_TRUE(inbound_rpc_tracker != nullptr)
        << "Inbound RPC tracker not found for server " << i;
    max_peak_consumption = std::max(max_peak_consumption, inbound_rpc_tracker->peak_consumption());
  }
  LOG(INFO) << "Peak consumption: " << max_peak_consumption;
  ASSERT_LE(max_peak_consumption, 150_MB);
}
769
770
template <int kSoftLimit, int kHardLimit>
771
class QLStressTestDelayWrite : public QLStressTestSingleTablet {
772
 public:
773
2
  void SetUp() override {
774
2
    FLAGS_db_write_buffer_size = 1_KB;
775
2
    FLAGS_sst_files_soft_limit = kSoftLimit;
776
2
    FLAGS_sst_files_hard_limit = kHardLimit;
777
2
    FLAGS_rocksdb_level0_file_num_compaction_trigger = 6;
778
2
    FLAGS_rocksdb_universal_compaction_min_merge_width = 2;
779
2
    FLAGS_rocksdb_universal_compaction_size_ratio = 1000;
780
2
    QLStressTestSingleTablet::SetUp();
781
2
  }
_ZN2yb6client22QLStressTestDelayWriteILi4ELi10EE5SetUpEv
Line
Count
Source
773
1
  void SetUp() override {
774
1
    FLAGS_db_write_buffer_size = 1_KB;
775
1
    FLAGS_sst_files_soft_limit = kSoftLimit;
776
1
    FLAGS_sst_files_hard_limit = kHardLimit;
777
1
    FLAGS_rocksdb_level0_file_num_compaction_trigger = 6;
778
1
    FLAGS_rocksdb_universal_compaction_min_merge_width = 2;
779
1
    FLAGS_rocksdb_universal_compaction_size_ratio = 1000;
780
1
    QLStressTestSingleTablet::SetUp();
781
1
  }
_ZN2yb6client22QLStressTestDelayWriteILi6ELi6EE5SetUpEv
Line
Count
Source
773
1
  void SetUp() override {
774
1
    FLAGS_db_write_buffer_size = 1_KB;
775
1
    FLAGS_sst_files_soft_limit = kSoftLimit;
776
1
    FLAGS_sst_files_hard_limit = kHardLimit;
777
1
    FLAGS_rocksdb_level0_file_num_compaction_trigger = 6;
778
1
    FLAGS_rocksdb_universal_compaction_min_merge_width = 2;
779
1
    FLAGS_rocksdb_universal_compaction_size_ratio = 1000;
780
1
    QLStressTestSingleTablet::SetUp();
781
1
  }
782
783
0
  client::YBSessionPtr NewSession() override {
784
0
    auto result = QLStressTestSingleTablet::NewSession();
785
0
    result->SetTimeout(5s);
786
0
    return result;
787
0
  }
Unexecuted instantiation: _ZN2yb6client22QLStressTestDelayWriteILi4ELi10EE10NewSessionEv
Unexecuted instantiation: _ZN2yb6client22QLStressTestDelayWriteILi6ELi6EE10NewSessionEv
788
};
789
790
void QLStressTest::AddWriter(
791
    std::string value_prefix, std::atomic<int>* key, TestThreadHolder* thread_holder,
792
    const std::chrono::nanoseconds& sleep_duration,
793
0
    bool allow_failures, TransactionManager* txn_manager, double transactional_write_probability) {
794
0
  thread_holder->AddThreadFunctor([this, &stop = thread_holder->stop_flag(), key, sleep_duration,
795
0
                                   value_prefix = std::move(value_prefix), allow_failures,
796
0
                                   txn_manager, transactional_write_probability] {
797
0
    auto session = NewSession();
798
0
    session->SetRejectionScoreSource(std::make_shared<RejectionScoreSource>());
799
0
    ASSERT_TRUE(txn_manager || transactional_write_probability == 0.0);
800
801
0
    while (!stop.load(std::memory_order_acquire)) {
802
0
      YBTransactionPtr txn;
803
0
      if (txn_manager && RandomActWithProbability(transactional_write_probability)) {
804
0
        txn = std::make_shared<YBTransaction>(txn_manager);
805
0
        ASSERT_OK(txn->Init(IsolationLevel::SNAPSHOT_ISOLATION));
806
0
        session->SetTransaction(txn);
807
0
      } else {
808
0
        session->SetTransaction(nullptr);
809
0
      }
810
0
      auto new_key = *key + 1;
811
0
      auto status = WriteRow(session, new_key, value_prefix + std::to_string(new_key));
812
0
      if (status.ok() && txn) {
813
0
        status = txn->CommitFuture().get();
814
0
      }
815
816
0
      if (!allow_failures) {
817
0
        ASSERT_OK(status);
818
0
      } else if (!status.ok()) {
819
0
        LOG(WARNING) << "Write failed: " << status;
820
0
      }
821
0
      if (status.ok()) {
822
0
        ++*key;
823
0
      }
824
0
      if (sleep_duration.count() > 0) {
825
0
        std::this_thread::sleep_for(sleep_duration);
826
0
      }
827
0
    }
828
0
  });
829
0
}
830
831
0
void QLStressTest::TestWriteRejection() {
832
0
  constexpr int kWriters = IsDebug() ? 10 : 20;
833
0
  constexpr int kKeyBase = 10000;
834
835
0
  std::array<std::atomic<int>, kWriters> keys;
836
837
0
  const std::string value_prefix = std::string(1_KB, 'X');
838
839
0
  TestThreadHolder thread_holder;
840
0
  for (int i = 0; i != kWriters; ++i) {
841
0
    keys[i] = i * kKeyBase;
842
0
    AddWriter(value_prefix, &keys[i], &thread_holder, 0s, true /* allow_failures */);
843
0
  }
844
845
0
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag(), &keys, &value_prefix] {
846
0
    auto session = NewSession();
847
0
    while (!stop.load(std::memory_order_acquire)) {
848
0
      int idx = RandomUniformInt(0, kWriters - 1);
849
0
      auto had_key = keys[idx].load(std::memory_order_acquire);
850
0
      if (had_key == kKeyBase * idx) {
851
0
        std::this_thread::sleep_for(50ms);
852
0
        continue;
853
0
      }
854
0
      auto value = ASSERT_RESULT(ReadRow(session, had_key)).string_value();
855
0
      ASSERT_EQ(value, value_prefix + std::to_string(had_key));
856
0
    }
857
0
  });
858
859
0
  int last_keys_written = 0;
860
0
  int first_keys_written_after_rejections_started_to_appear = -1;
861
0
  auto last_keys_written_update_time = CoarseMonoClock::now();
862
0
  uint64_t last_rejections = 0;
863
0
  bool has_writes_after_rejections = false;
864
0
  for (;;) {
865
0
    std::this_thread::sleep_for(1s);
866
0
    int keys_written = 0;
867
0
    for (int i = 0; i != kWriters; ++i) {
868
0
      keys_written += keys[i].load() - kKeyBase * i;
869
0
    }
870
0
    LOG(INFO) << "Total keys written: " << keys_written;
871
0
    if (keys_written == last_keys_written) {
872
0
      ASSERT_LE(CoarseMonoClock::now() - last_keys_written_update_time, 20s);
873
0
      continue;
874
0
    }
875
0
    if (last_rejections != 0) {
876
0
      if (first_keys_written_after_rejections_started_to_appear < 0) {
877
0
        first_keys_written_after_rejections_started_to_appear = keys_written;
878
0
      } else if (keys_written > first_keys_written_after_rejections_started_to_appear) {
879
0
        has_writes_after_rejections = true;
880
0
      }
881
0
    }
882
0
    last_keys_written = keys_written;
883
0
    last_keys_written_update_time = CoarseMonoClock::now();
884
885
0
    uint64_t total_rejections = 0;
886
0
    for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
887
0
      int64_t rejections = 0;
888
0
      auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers();
889
0
      for (const auto& peer : peers) {
890
0
        auto counter = METRIC_majority_sst_files_rejections.Instantiate(
891
0
            peer->tablet()->GetTabletMetricsEntity());
892
0
        rejections += counter->value();
893
0
      }
894
0
      total_rejections += rejections;
895
0
    }
896
0
    LOG(INFO) << "Total rejections: " << total_rejections;
897
0
    last_rejections = total_rejections;
898
899
0
    if (keys_written >= RegularBuildVsSanitizers(1000, 100) &&
900
0
        (IsSanitizer() || total_rejections >= 10) &&
901
0
        has_writes_after_rejections) {
902
0
      break;
903
0
    }
904
0
  }
905
906
0
  thread_holder.Stop();
907
908
0
  ASSERT_OK(WaitFor([cluster = cluster_.get()] {
909
0
    auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
910
0
    OpId first_op_id;
911
0
    for (const auto& peer : peers) {
912
0
      if (!peer->consensus()) {
913
0
        return false;
914
0
      }
915
0
      auto current = peer->consensus()->GetLastCommittedOpId();
916
0
      if (!first_op_id) {
917
0
        first_op_id = current;
918
0
      } else if (current != first_op_id) {
919
0
        return false;
920
0
      }
921
0
    }
922
0
    return true;
923
0
  }, 30s, "Waiting tablets to sync up"));
924
0
}
925
926
typedef QLStressTestDelayWrite<4, 10> QLStressTestDelayWrite_4_10;
927
928
0
TEST_F_EX(QLStressTest, DelayWrite, QLStressTestDelayWrite_4_10) {
929
0
  TestWriteRejection();
930
0
}
931
932
// Soft limit == hard limit to test write stop and recover after it.
933
typedef QLStressTestDelayWrite<6, 6> QLStressTestDelayWrite_6_6;
934
935
0
TEST_F_EX(QLStressTest, WriteStop, QLStressTestDelayWrite_6_6) {
936
0
  TestWriteRejection();
937
0
}
938
939
class QLStressTestLongRemoteBootstrap : public QLStressTestSingleTablet {
940
 public:
941
1
  void SetUp() override {
942
1
    FLAGS_log_cache_size_limit_mb = 1;
943
1
    FLAGS_log_segment_size_bytes = 96_KB;
944
1
    QLStressTestSingleTablet::SetUp();
945
1
  }
946
};
947
948
0
// Shuts down one tablet server, keeps writing until the leader has garbage collected its early
// log entries, then restarts the server (which is then expected to need a remote bootstrap) and
// checks that all replicas catch up while tablets are continuously flushed and logs cleaned.
TEST_F_EX(QLStressTest, LongRemoteBootstrap, QLStressTestLongRemoteBootstrap) {
  FLAGS_log_min_seconds_to_retain = 1;
  FLAGS_remote_bootstrap_rate_limit_bytes_per_sec = 1_MB;

  cluster_->mini_tablet_server(0)->Shutdown();

  ASSERT_OK(WaitFor([this] {
    auto leader_peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    if (!leader_peers.empty()) {
      LOG(INFO) << "Tablet id: " << leader_peers.front()->tablet_id();
      return true;
    }
    return false;
  }, 30s, "Leader elected"));

  std::atomic<int> key(0);

  TestThreadHolder thread_holder;
  constexpr size_t kValueSize = 32_KB;
  AddWriter(std::string(kValueSize, 'X'), &key, &thread_holder, 100ms);

  // Give the writer some time, so there are logs to clean up.
  std::this_thread::sleep_for(20s);

  ASSERT_OK(WaitFor([this]() -> Result<bool> {
    RETURN_NOT_OK(cluster_->CleanTabletLogs());
    auto leader_peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    if (leader_peers.empty()) {
      return false;
    }

    const auto& leader_peer = leader_peers.front();
    RETURN_NOT_OK(leader_peer->tablet()->Flush(tablet::FlushMode::kSync));
    RETURN_NOT_OK(leader_peer->RunLogGC());

    // The first log should have been garbage collected, so that remote bootstrap is required.
    // This is detected by a failure to read the early replicates from the log.
    consensus::ReplicateMsgs replicates;
    int64_t starting_op_segment_seq_num;
    yb::SchemaPB schema;
    uint32_t schema_version;
    const auto read_status = leader_peer->log()->GetLogReader()->ReadReplicatesInRange(
        100, 101, 0, &replicates, &starting_op_segment_seq_num, &schema, &schema_version);
    return !read_status.ok();
  }, 30s, "Logs cleaned"));

  LOG(INFO) << "Bring replica back, keys written: " << key.load(std::memory_order_acquire);
  ASSERT_OK(cluster_->mini_tablet_server(0)->Start());

  // Keep flushing tablets and cleaning logs in background while the replica catches up.
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
    while (!stop.load(std::memory_order_acquire)) {
      ASSERT_OK(cluster_->FlushTablets());
      ASSERT_OK(cluster_->CleanTabletLogs());
      std::this_thread::sleep_for(100ms);
    }
  });

  ASSERT_OK(WaitAllReplicasHaveIndex(cluster_.get(), key.load(std::memory_order_acquire), 40s));
  LOG(INFO) << "All replicas ready";

  ASSERT_OK(WaitFor([this] {
    auto follower_peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders);
    LOG(INFO) << "Num followers: " << follower_peers.size();
    // Every follower is inspected and logged, even after a non empty cache was found.
    bool all_caches_empty = true;
    for (const auto& follower : follower_peers) {
      auto log_cache_size = follower->raft_consensus()->LogCacheSize();
      LOG(INFO) << "T " << follower->tablet_id() << " P " << follower->permanent_uuid()
                << ", log cache size: " << log_cache_size;
      all_caches_empty = all_caches_empty && (log_cache_size == 0);
    }
    return all_caches_empty;
  }, 5s, "All followers cleanup cache"));

  // Let the writer add some more values and check that the replicas are still keeping up.
  std::this_thread::sleep_for(5s);
  ASSERT_OK(WaitAllReplicasHaveIndex(cluster_.get(), key.load(std::memory_order_acquire), 1s));
}
1023
1024
class QLStressDynamicCompactionPriorityTest : public QLStressTest {
1025
 public:
1026
1
  void SetUp() override {
1027
1
    FLAGS_allow_preempting_compactions = true;
1028
1
    FLAGS_db_write_buffer_size = 16_KB;
1029
1
    FLAGS_enable_ondisk_compression = false;
1030
1
    FLAGS_rocksdb_max_background_compactions = 1;
1031
1
    FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec = 160_KB;
1032
1
    QLStressTest::SetUp();
1033
1
  }
1034
1035
0
  int NumTablets() override {
1036
0
    return 1;
1037
0
  }
1038
1039
0
  void InitSchemaBuilder(YBSchemaBuilder* builder) override {
1040
0
    builder->AddColumn("h")->Type(INT32)->HashPrimaryKey()->NotNull();
1041
0
    builder->AddColumn(kValueColumn)->Type(STRING);
1042
0
  }
1043
};
1044
1045
0
// Creates a second table with the same schema, writes write-buffer sized values to randomly
// alternating tables for 60 seconds and then checks that deleting the first table takes at most
// 10 seconds.
TEST_F_EX(QLStressTest, DynamicCompactionPriority, QLStressDynamicCompactionPriorityTest) {
  YBSchemaBuilder b;
  InitSchemaBuilder(&b);
  CompleteSchemaBuilder(&b);

  TableHandle table2;
  ASSERT_OK(table2.Create(YBTableName(kTableName.namespace_type(),
                                      kTableName.namespace_name(),
                                      kTableName.table_name() + "_2"),
                          NumTablets(), client_.get(), &b));

  TestThreadHolder thread_holder;
  thread_holder.AddThreadFunctor([this, &table2, &stop = thread_holder.stop_flag()] {
    auto session = NewSession();
    int key = 1;
    std::string value(FLAGS_db_write_buffer_size, 'X');
    int left_writes_to_current_table = 0;
    TableHandle* table = nullptr;
    while (!stop.load(std::memory_order_acquire)) {
      // Pick a random table for a random-length run of writes; the upper bound of the run length
      // grows with the key.
      if (left_writes_to_current_table == 0) {
        table = RandomUniformBool() ? &table_ : &table2;
        left_writes_to_current_table = RandomUniformInt(1, std::max(1, key / 5));
      } else {
        --left_writes_to_current_table;
      }
      const auto op = table->NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
      auto* const req = op->mutable_request();
      QLAddInt32HashValue(req, key);
      // Fill the column through the handle of the table this op was created for, instead of
      // always going through table_ (which only works while both tables share column ids).
      table->AddStringColumnValue(req, kValueColumn, value);
      session->Apply(op);
      ASSERT_OK(session->Flush());
      ASSERT_OK(CheckOp(op.get()));
      std::this_thread::sleep_for(100ms);
      ++key;
    }
  });

  thread_holder.WaitAndStop(60s);

  // Measure how long the table deletion takes.
  auto delete_start = CoarseMonoClock::now();
  ASSERT_OK(client_->DeleteTable(table_->id(), true));
  MonoDelta delete_time(CoarseMonoClock::now() - delete_start);
  LOG(INFO) << "Delete time: " << delete_time;
  ASSERT_LE(delete_time, 10s);
}
1090
1091
class QLStressTestTransactionalSingleTablet : public QLStressTestSingleTablet {
1092
0
  void CompleteSchemaBuilder(YBSchemaBuilder* builder) override {
1093
0
    SetTransactional(builder);
1094
0
  }
1095
};
1096
1097
// Verify that we don't have too many write waiters.
1098
// Uses FLAGS_TEST_max_write_waiters to fail debug check when there are too many waiters.
1099
0
TEST_F_EX(QLStressTest, RemoveIntentsDuringWrite, QLStressTestTransactionalSingleTablet) {
1100
0
  FLAGS_TEST_max_write_waiters = 5;
1101
1102
0
  constexpr int kWriters = 10;
1103
0
  constexpr int kKeyBase = 10000;
1104
1105
0
  std::array<std::atomic<int>, kWriters> keys;
1106
1107
0
  auto txn_manager = CreateTxnManager();
1108
0
  TestThreadHolder thread_holder;
1109
0
  for (int i = 0; i != kWriters; ++i) {
1110
0
    keys[i] = i * kKeyBase;
1111
0
    AddWriter(
1112
0
        "value_", &keys[i], &thread_holder, 0s, false /* allow_failures */, &txn_manager, 1.0);
1113
0
  }
1114
1115
0
  thread_holder.WaitAndStop(3s);
1116
0
}
1117
1118
0
// Queues writes to the current leader, isolates it from the other tablet servers, writes some
// keys while it is isolated, and then reconnects it while it temporarily rejects non empty
// consensus updates. Finally waits for all initially queued writes to complete.
TEST_F_EX(QLStressTest, SyncOldLeader, QLStressTestSingleTablet) {
  FLAGS_raft_heartbeat_interval_ms = 100 * kTimeMultiplier;
  constexpr int kOldLeaderWriteKeys = 100;
  // Should be less than the amount of pending operations at the old leader,
  // so it is much smaller than the number of keys written to the old leader.
  constexpr int kNewLeaderWriteKeys = 10;

  TestThreadHolder thread_holder;

  client_->messenger()->TEST_SetOutboundIpBase(ASSERT_RESULT(HostToAddress("127.0.0.1")));

  auto session = NewSession();
  // A successful write guarantees that we have a leader.
  ASSERT_OK(WriteRow(session, 0, "value"));

  session->SetTimeout(10s);
  std::vector<std::future<FlushStatus>> pending_flushes;
  int key = 1;
  for (; key <= kOldLeaderWriteKeys; ++key) {
    InsertRow(session, key, std::to_string(key));
    pending_flushes.push_back(session->FlushFuture());
  }

  auto old_leader = ASSERT_RESULT(ServerWithLeaders(cluster_.get()));
  LOG(INFO) << "Isolate old leader: "
            << cluster_->mini_tablet_server(old_leader)->server()->permanent_uuid();
  for (size_t ts_idx = 0; ts_idx != cluster_->num_tablet_servers(); ++ts_idx) {
    if (ts_idx == old_leader) {
      continue;
    }
    ASSERT_OK(SetupConnectivity(cluster_.get(), ts_idx, old_leader, Connectivity::kOff));
  }

  // Keep writing until the required number of writes succeeded while the old leader is isolated.
  int written_to_new_leader = 0;
  while (written_to_new_leader < kNewLeaderWriteKeys) {
    ++key;
    auto write_status = WriteRow(session, key, std::to_string(key));
    if (!write_status.ok()) {
      // Some writes could fail while operations are still being sent to the old leader.
      LOG(INFO) << "Write " << key << " failed: " << write_status;
    } else {
      ++written_to_new_leader;
    }
  }

  auto old_leader_peers = cluster_->GetTabletPeers(old_leader);
  // Reject all non empty update consensus requests, to activate consensus exponential backoff
  // and get into the situation where the leader sends an empty request.
  for (const auto& peer : old_leader_peers) {
    peer->raft_consensus()->TEST_RejectMode(consensus::RejectMode::kNonEmpty);
  }

  for (size_t ts_idx = 0; ts_idx != cluster_->num_tablet_servers(); ++ts_idx) {
    if (ts_idx == old_leader) {
      continue;
    }
    ASSERT_OK(SetupConnectivity(cluster_.get(), ts_idx, old_leader, Connectivity::kOn));
  }

  // Wait until the old leader receives an update consensus request with empty ops.
  std::this_thread::sleep_for(5s * kTimeMultiplier);

  for (const auto& peer : old_leader_peers) {
    peer->raft_consensus()->TEST_RejectMode(consensus::RejectMode::kNone);
  }

  // Wait for all initially queued writes to complete; failures are only logged.
  for (auto& pending_flush : pending_flushes) {
    WARN_NOT_OK(pending_flush.get().status, "Write failed");
  }

  thread_holder.Stop();
}
1189
1190
} // namespace client
1191
} // namespace yb