YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/snapshot-txn-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <memory>
16
#include <mutex>
17
#include <string>
18
#include <utility>
19
#include <vector>
20
21
#include <boost/optional/optional_fwd.hpp>
22
23
#include "yb/client/session.h"
24
#include "yb/client/table.h"
25
#include "yb/client/transaction.h"
26
#include "yb/client/transaction_pool.h"
27
#include "yb/client/txn-test-base.h"
28
#include "yb/client/yb_op.h"
29
30
#include "yb/common/entity_ids_types.h"
31
#include "yb/common/ql_value.h"
32
33
#include "yb/consensus/consensus.h"
34
#include "yb/consensus/consensus.pb.h"
35
36
#include "yb/docdb/consensus_frontier.h"
37
38
#include "yb/gutil/casts.h"
39
40
#include "yb/rocksdb/db.h"
41
42
#include "yb/tablet/tablet.h"
43
#include "yb/tablet/tablet_peer.h"
44
#include "yb/tablet/transaction_participant.h"
45
46
#include "yb/tserver/mini_tablet_server.h"
47
#include "yb/tserver/tablet_server.h"
48
49
#include "yb/util/debug/long_operation_tracker.h"
50
#include "yb/util/enums.h"
51
#include "yb/util/lockfree.h"
52
#include "yb/util/opid.h"
53
#include "yb/util/random_util.h"
54
#include "yb/util/result.h"
55
#include "yb/util/scope_exit.h"
56
#include "yb/util/test_thread_holder.h"
57
#include "yb/util/tsan_util.h"
58
59
#include "yb/yql/cql/ql/util/errcodes.h"
60
#include "yb/yql/cql/ql/util/statement_result.h"
61
62
using namespace std::literals;
63
64
DECLARE_bool(TEST_disallow_lmp_failures);
65
DECLARE_bool(fail_on_out_of_range_clock_skew);
66
DECLARE_bool(ycql_consistent_transactional_paging);
67
DECLARE_int32(TEST_inject_load_transaction_delay_ms);
68
DECLARE_int32(TEST_inject_mvcc_delay_add_leader_pending_ms);
69
DECLARE_int32(TEST_inject_status_resolver_delay_ms);
70
DECLARE_int32(log_min_seconds_to_retain);
71
DECLARE_int32(txn_max_apply_batch_records);
72
DECLARE_int64(transaction_rpc_timeout_ms);
73
DECLARE_uint64(max_clock_skew_usec);
74
DECLARE_uint64(max_transactions_in_status_request);
75
DECLARE_uint64(clock_skew_force_crash_bound_usec);
76
77
extern double TEST_delay_create_transaction_probability;
78
79
namespace yb {
80
namespace client {
81
82
// Switches that tune the SnapshotTxnTest::TestBankAccounts workload.
YB_DEFINE_ENUM(BankAccountsOption,
               (kTimeStrobe)        // Perform time strobe during test.
               (kStepDown)          // Perform leader step downs during test.
               (kTimeJump)          // Perform time jumps during test.
               (kNetworkPartition)  // Partition network during test.
               (kNoSelectRead))     // Don't use select-read for updating balance, i.e. use only
                                    // "update set balance = balance + delta".

// Set of BankAccountsOption values passed to TestBankAccounts.
using BankAccountsOptions = EnumBitSet<BankAccountsOption>;
90
91
class SnapshotTxnTest
92
    : public TransactionCustomLogSegmentSizeTest<0, TransactionTestBase<MiniCluster>> {
93
 protected:
94
15
  void SetUp() override {
95
15
    SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
96
15
    TransactionTestBase::SetUp();
97
15
  }
98
99
  void TestBankAccounts(BankAccountsOptions options, CoarseDuration duration,
100
                        int minimal_updates_per_second, double select_update_probability = 0.5);
101
  void TestBankAccountsThread(
102
     int accounts, double select_update_probability, std::atomic<bool>* stop,
103
     std::atomic<int64_t>* updates, TransactionPool* pool);
104
  void TestRemoteBootstrap();
105
  void TestMultiWriteWithRestart();
106
};
107
108
0
bool TransactionalFailure(const Status& status) {
109
0
  return status.IsTryAgain() || status.IsExpired() || status.IsNotFound() || status.IsTimedOut();
110
0
}
111
112
void SnapshotTxnTest::TestBankAccountsThread(
113
    int accounts, double select_update_probability, std::atomic<bool>* stop,
114
0
    std::atomic<int64_t>* updates, TransactionPool* pool) {
115
0
  auto session = CreateSession();
116
0
  YBTransactionPtr txn;
117
0
  int32_t key1 = 0, key2 = 0;
118
0
  while (!stop->load(std::memory_order_acquire)) {
119
0
    if (!txn) {
120
0
      key1 = RandomUniformInt(1, accounts);
121
0
      key2 = RandomUniformInt(1, accounts - 1);
122
0
      if (key2 >= key1) {
123
0
        ++key2;
124
0
      }
125
0
      txn = ASSERT_RESULT(pool->TakeAndInit(GetIsolationLevel()));
126
0
    }
127
0
    session->SetTransaction(txn);
128
0
    int transfer = RandomUniformInt(1, 250);
129
130
0
    auto txn_id = txn->id();
131
0
    LOG(INFO) << txn_id << " transferring (" << key1 << ") => (" << key2 << "), delta: "
132
0
              << transfer;
133
134
0
    Status status;
135
0
    if (RandomActWithProbability(select_update_probability)) {
136
0
      auto result = SelectRow(session, key1);
137
0
      int32_t balance1 = -1;
138
0
      if (result.ok()) {
139
0
        balance1 = *result;
140
0
        result = SelectRow(session, key2);
141
0
      }
142
0
      if (!result.ok()) {
143
0
        if (txn->IsRestartRequired()) {
144
0
          ASSERT_TRUE(result.status().IsQLError()) << result;
145
0
          auto txn_result = pool->TakeRestarted(txn);
146
0
          if (!txn_result.ok()) {
147
0
            ASSERT_TRUE(txn_result.status().IsIllegalState()) << txn_result.status();
148
0
            txn = nullptr;
149
0
          } else {
150
0
            txn = *txn_result;
151
0
          }
152
0
          continue;
153
0
        }
154
0
        if (result.status().IsTimedOut() || result.status().IsQLError()) {
155
0
          txn = nullptr;
156
0
          continue;
157
0
        }
158
0
        ASSERT_TRUE(result.ok())
159
0
            << Format("$0, TXN: $0, key1: $1, key2: $2", result.status(), txn->id(), key1, key2);
160
0
      }
161
0
      auto balance2 = *result;
162
0
      status = ResultToStatus(WriteRow(session, key1, balance1 - transfer));
163
0
      if (status.ok()) {
164
0
        status = ResultToStatus(WriteRow(session, key2, balance2 + transfer));
165
0
      }
166
0
    } else {
167
0
      status = ResultToStatus(kv_table_test::Increment(
168
0
          &table_, session, key1, -transfer, Flush::kTrue));
169
0
      if (status.ok()) {
170
0
        status = ResultToStatus(kv_table_test::Increment(
171
0
            &table_, session, key2, transfer, Flush::kTrue));
172
0
      }
173
0
    }
174
175
0
    if (status.ok()) {
176
0
      status = txn->CommitFuture().get();
177
0
    }
178
0
    txn = nullptr;
179
0
    if (status.ok()) {
180
0
      LOG(INFO) << txn_id << " transferred (" << key1 << ") => (" << key2 << "), delta: "
181
0
                << transfer;
182
0
      updates->fetch_add(1);
183
0
    } else {
184
0
      ASSERT_TRUE(
185
0
          status.IsTryAgain() || status.IsExpired() || status.IsNotFound() || status.IsTimedOut() ||
186
0
          ql::QLError(status) == ql::ErrorCode::RESTART_REQUIRED) << status;
187
0
    }
188
0
  }
189
0
}
190
191
0
// Starts a thread that performs a random walk of every tablet server's clock skew until `stop`
// is set. Servers are visited in turn, one every 100ms. Each visit shifts that server's
// accumulated delta by 2^|k| ms, where k = RandomUniformInt(-10, 10) and the step is negative
// for k < 0. The result is clamped to [-max_clock_skew_usec / 2, max_clock_skew_usec / 2].
std::thread RandomClockSkewWalkThread(MiniCluster* cluster, std::atomic<bool>* stop) {
  return std::thread([cluster, stop] {
    const server::SkewedClock::DeltaTime upperbound =
        std::chrono::microseconds(GetAtomicFlag(&FLAGS_max_clock_skew_usec)) / 2;
    const auto lowerbound = -upperbound;
    // Accumulated per-server deltas. They must survive across passes for the skew to actually
    // walk; re-creating the vector on every pass would reset each delta to zero before the next
    // step is applied.
    std::vector<server::SkewedClock::DeltaTime> time_deltas;
    while (!stop->load(std::memory_order_acquire)) {
      auto num_servers = cluster->num_tablet_servers();
      if (time_deltas.size() < num_servers) {
        // New entries are value-initialized, i.e. start at zero skew.
        time_deltas.resize(num_servers);
      }

      for (size_t i = 0; i != num_servers; ++i) {
        auto* tserver = cluster->mini_tablet_server(i)->server();
        auto* hybrid_clock = down_cast<server::HybridClock*>(tserver->clock());
        auto skewed_clock =
            std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock());
        auto shift = RandomUniformInt(-10, 10);
        std::chrono::milliseconds change(1 << std::abs(shift));
        if (shift < 0) {
          change = -change;
        }

        time_deltas[i] += change;
        time_deltas[i] = std::max(std::min(time_deltas[i], upperbound), lowerbound);
        LOG(INFO) << "Set delta " << i << ": " << time_deltas[i].count();
        skewed_clock->SetDelta(time_deltas[i]);

        std::this_thread::sleep_for(100ms);
      }
    }
  });
}
222
223
0
// Starts a thread that "strobes" tablet server clocks until `stop` is set.
// Passes alternate between two modes:
//  - even passes set every server's skew to 2^(12 + |k|) microseconds, with
//    k = RandomUniformInt(-16, 16) and a negative skew for k < 0;
//  - odd passes reset every server's skew to zero.
// Each per-server update is followed by a 15ms sleep.
std::thread StrobeThread(MiniCluster* cluster, std::atomic<bool>* stop) {
  // When strobe time is enabled we greatly change time delta for a short amount of time,
  // then change it back to 0.
  return std::thread([cluster, stop] {
    int iteration = 0;
    while (!stop->load(std::memory_order_acquire)) {
      for (size_t i = 0; i != cluster->num_tablet_servers(); ++i) {
        auto* tserver = cluster->mini_tablet_server(i)->server();
        auto* hybrid_clock = down_cast<server::HybridClock*>(tserver->clock());
        auto skewed_clock =
            std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock());
        server::SkewedClock::DeltaTime time_delta;
        if (iteration & 1) {
          time_delta = server::SkewedClock::DeltaTime();
        } else {
          auto shift = RandomUniformInt(-16, 16);
          time_delta = std::chrono::microseconds(1 << (12 + std::abs(shift)));
          if (shift < 0) {
            time_delta = -time_delta;
          }
        }
        skewed_clock->SetDelta(time_delta);
        std::this_thread::sleep_for(15ms);
      }
      // Alternate between strobing and resetting. Without this increment `iteration & 1` would
      // always be 0 and the clocks would never be put back to zero skew.
      ++iteration;
    }
  });
}
250
251
void SnapshotTxnTest::TestBankAccounts(
252
    BankAccountsOptions options, CoarseDuration duration, int minimal_updates_per_second,
253
0
    double select_update_probability) {
254
0
  TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
255
0
  const int kAccounts = 20;
256
0
  const int kThreads = 5;
257
0
  const int kInitialAmount = 10000;
258
259
0
  {
260
0
    auto txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
261
0
    LOG(INFO) << "Initial write transaction: " << txn->id();
262
0
    auto init_session = CreateSession(txn);
263
0
    for (int i = 1; i <= kAccounts; ++i) {
264
0
      ASSERT_OK(WriteRow(init_session, i, kInitialAmount));
265
0
    }
266
0
    ASSERT_OK(txn->CommitFuture().get());
267
0
  }
268
269
0
  TestThreadHolder threads;
270
0
  if (options.Test(BankAccountsOption::kTimeStrobe)) {
271
0
    threads.AddThread(StrobeThread(cluster_.get(), &threads.stop_flag()));
272
0
  }
273
274
0
  if (options.Test(BankAccountsOption::kNetworkPartition)) {
275
0
    threads.AddThreadFunctor([cluster = cluster_.get(), &stop = threads.stop_flag()]() {
276
0
      auto num_tservers = cluster->num_tablet_servers();
277
0
      while (!stop.load(std::memory_order_acquire)) {
278
0
        auto partitioned = RandomUniformInt<size_t>(0, num_tservers - 1);
279
0
        for (auto connectivity : {Connectivity::kOff, Connectivity::kOn}) {
280
0
          for (size_t i = 0; i != num_tservers; ++i) {
281
0
            if (i == partitioned) {
282
0
              continue;
283
0
            }
284
0
            ASSERT_OK(SetupConnectivity(cluster, i, partitioned, connectivity));
285
0
          }
286
0
          std::this_thread::sleep_for(connectivity == Connectivity::kOff ? 10s : 30s);
287
0
        }
288
0
      }
289
0
    });
290
0
  }
291
292
0
  std::atomic<int64_t> updates(0);
293
0
  auto se = ScopeExit(
294
0
      [&threads, &updates, duration, minimal_updates_per_second] {
295
0
    threads.Stop();
296
297
0
    LOG(INFO) << "Total updates: " << updates.load(std::memory_order_acquire);
298
0
    ASSERT_GT(updates.load(std::memory_order_acquire),
299
0
              minimal_updates_per_second * duration / 1s);
300
0
  });
301
302
0
  for (int i = 0; i != kThreads; ++i) {
303
0
    threads.AddThreadFunctor(std::bind(
304
0
        &SnapshotTxnTest::TestBankAccountsThread, this, kAccounts, select_update_probability,
305
0
        &threads.stop_flag(), &updates, &pool));
306
0
  }
307
308
0
  auto end_time = CoarseMonoClock::now() + duration;
309
310
0
  if (options.Test(BankAccountsOption::kTimeJump)) {
311
0
    auto* tserver = cluster_->mini_tablet_server(0)->server();
312
0
    auto* hybrid_clock = down_cast<server::HybridClock*>(tserver->clock());
313
0
    auto skewed_clock =
314
0
        std::static_pointer_cast<server::SkewedClock>(hybrid_clock->physical_clock());
315
0
    auto old_delta = skewed_clock->SetDelta(duration);
316
0
    std::this_thread::sleep_for(1s);
317
0
    skewed_clock->SetDelta(old_delta);
318
0
  }
319
320
0
  auto session = CreateSession();
321
0
  YBTransactionPtr txn;
322
0
  while (CoarseMonoClock::now() < end_time &&
323
0
         !threads.stop_flag().load(std::memory_order_acquire)) {
324
0
    if (!txn) {
325
0
      txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
326
0
    }
327
0
    auto txn_id = txn->id();
328
0
    session->SetTransaction(txn);
329
0
    auto rows = SelectAllRows(session);
330
0
    if (!rows.ok()) {
331
0
      if (txn->IsRestartRequired()) {
332
0
        auto txn_result = pool.TakeRestarted(txn);
333
0
        if (!txn_result.ok()) {
334
0
          ASSERT_TRUE(txn_result.status().IsIllegalState()) << txn_result.status();
335
0
          txn = nullptr;
336
0
        } else {
337
0
          txn = *txn_result;
338
0
        }
339
0
      } else {
340
0
        txn = nullptr;
341
0
      }
342
0
      continue;
343
0
    }
344
0
    txn = nullptr;
345
0
    int sum_balance = 0;
346
0
    for (const auto& pair : *rows) {
347
0
      sum_balance += pair.second;
348
0
    }
349
0
    LOG(INFO) << txn_id << ", read done, values: " << AsString(*rows);
350
0
    ASSERT_EQ(sum_balance, kAccounts * kInitialAmount);
351
352
0
    if (options.Test(BankAccountsOption::kStepDown)) {
353
0
      StepDownRandomTablet(cluster_.get());
354
0
    }
355
0
  }
356
0
}
357
358
0
// Plain bank-accounts workload for 30s, without any fault injection.
TEST_F(SnapshotTxnTest, BankAccounts) {
  FLAGS_TEST_disallow_lmp_failures = true;
  constexpr auto kMinUpdatesPerSecond = RegularBuildVsSanitizers(10, 1);
  TestBankAccounts(BankAccountsOptions{}, 30s, kMinUpdatesPerSecond);
}
362
363
0
// Bank-accounts workload for 150s while random tablet servers get network-partitioned.
TEST_F(SnapshotTxnTest, BankAccountsPartitioned) {
  const BankAccountsOptions options{BankAccountsOption::kNetworkPartition};
  constexpr auto kMinUpdatesPerSecond = RegularBuildVsSanitizers(10, 1);
  TestBankAccounts(options, 150s, kMinUpdatesPerSecond);
}
368
369
0
// Bank-accounts workload for 300s while tablet server clocks are strobed.
TEST_F(SnapshotTxnTest, BankAccountsWithTimeStrobe) {
  // Relax clock-skew checks; presumably they would otherwise fail or crash on the large strobe
  // skews.
  FLAGS_fail_on_out_of_range_clock_skew = false;
  FLAGS_clock_skew_force_crash_bound_usec = 0;

  const BankAccountsOptions options{BankAccountsOption::kTimeStrobe};
  constexpr auto kMinUpdatesPerSecond = RegularBuildVsSanitizers(10, 1);
  TestBankAccounts(options, 300s, kMinUpdatesPerSecond);
}
377
378
0
// Bank-accounts workload for 30s with a clock jump on one tablet server and leader step downs.
TEST_F(SnapshotTxnTest, BankAccountsWithTimeJump) {
  FLAGS_fail_on_out_of_range_clock_skew = false;

  const BankAccountsOptions options{
      BankAccountsOption::kTimeJump, BankAccountsOption::kStepDown};
  constexpr auto kMinUpdatesPerSecond = RegularBuildVsSanitizers(3, 1);
  TestBankAccounts(options, 30s, kMinUpdatesPerSecond);
}
385
386
0
// Bank-accounts workload for 30s with transaction creation delayed half of the time and a short
// transaction RPC timeout; transfers use blind increments only.
TEST_F(SnapshotTxnTest, BankAccountsDelayCreate) {
  FLAGS_transaction_rpc_timeout_ms = 500 * kTimeMultiplier;
  TEST_delay_create_transaction_probability = 0.5;

  constexpr auto kMinUpdatesPerSecond = RegularBuildVsSanitizers(10, 1);
  constexpr double kSelectUpdateProbability = 0.0;
  TestBankAccounts(BankAccountsOptions{}, 30s, kMinUpdatesPerSecond, kSelectUpdateProbability);
}
393
394
0
// Bank-accounts workload for 30s with an injected 20ms delay in MVCC AddLeaderPending.
TEST_F(SnapshotTxnTest, BankAccountsDelayAddLeaderPending) {
  FLAGS_TEST_disallow_lmp_failures = true;
  FLAGS_TEST_inject_mvcc_delay_add_leader_pending_ms = 20;

  constexpr auto kMinUpdatesPerSecond = RegularBuildVsSanitizers(5, 1);
  TestBankAccounts(BankAccountsOptions{}, 30s, kMinUpdatesPerSecond);
}
399
400
struct PagingReadCounts {
401
  int good = 0;
402
  int failed = 0;
403
  int inconsistent = 0;
404
  int timed_out = 0;
405
406
0
  std::string ToString() const {
407
0
    return Format("{ good: $0 failed: $1 inconsistent: $2 timed_out: $3 }",
408
0
                  good, failed, inconsistent, timed_out);
409
0
  }
410
411
0
  PagingReadCounts& operator+=(const PagingReadCounts& rhs) {
412
0
    good += rhs.good;
413
0
    failed += rhs.failed;
414
0
    inconsistent += rhs.inconsistent;
415
0
    timed_out += rhs.timed_out;
416
0
    return *this;
417
0
  }
418
};
419
420
class SingleTabletSnapshotTxnTest : public SnapshotTxnTest {
421
 protected:
422
0
  int NumTablets() {
423
0
    return 1;
424
0
  }
425
426
  Result<PagingReadCounts> TestPaging();
427
};
428
429
// Test reading from a transactional table using paging.
430
// Writes values in one thread, and reads them using paging in another thread.
431
//
432
// Clock skew is randomized, so we expect failures because of that.
433
// When ycql_consistent_transactional_paging is true we expect read restart failures.
434
// And we expect missing values when ycql_consistent_transactional_paging is false.
435
0
Result<PagingReadCounts> SingleTabletSnapshotTxnTest::TestPaging() {
  constexpr int kReadThreads = 4;
  constexpr int kWriteThreads = 4;

  // Writer with index j writes keys starting with j * kWriterMul + 1
  constexpr int kWriterMul = 100000;

  // Number of keys each writer has committed so far.
  std::array<std::atomic<int32_t>, kWriteThreads> last_written_values;
  for (auto& value : last_written_values) {
    value.store(0, std::memory_order_release);
  }

  TestThreadHolder thread_holder;

  for (int j = 0; j != kWriteThreads; ++j) {
    thread_holder.AddThreadFunctor(
        [this, j, &stop = thread_holder.stop_flag(), &last_written = last_written_values[j]] {
      auto session = CreateSession();
      int i = 1;
      int base = j * kWriterMul;
      while (!stop.load(std::memory_order_acquire)) {
        auto txn = CreateTransaction2();
        session->SetTransaction(txn);
        ASSERT_OK(WriteRow(session, base + i, -(base + i)));
        auto commit_status = txn->CommitFuture().get();
        if (!commit_status.ok()) {
          // That could happen because of time jumps.
          ASSERT_TRUE(commit_status.IsExpired()) << commit_status;
          continue;
        }
        last_written.store(i, std::memory_order_release);
        ++i;
      }
    });
  }

  thread_holder.AddThread(RandomClockSkewWalkThread(cluster_.get(), &thread_holder.stop_flag()));

  std::vector<PagingReadCounts> per_thread_counts(kReadThreads);

  for (int i = 0; i != kReadThreads; ++i) {
    thread_holder.AddThreadFunctor([
        this, &stop = thread_holder.stop_flag(), &last_written_values,
        &counts = per_thread_counts[i]] {
      auto session = CreateSession(nullptr /* transaction */, clock_);
      while (!stop.load(std::memory_order_acquire)) {
        std::vector<int32_t> keys;
        QLPagingStatePB paging_state;
        // Snapshot of the writers' progress taken before the read starts.
        std::array<int32_t, kWriteThreads> written_value;
        int32_t total_values = 0;
        for (int j = 0; j != kWriteThreads; ++j) {
          written_value[j] = last_written_values[j].load(std::memory_order_acquire);
          total_values += written_value[j];
        }
        bool failed = false;
        session->SetReadPoint(client::Restart::kFalse);
        session->SetForceConsistentRead(ForceConsistentRead::kFalse);

        // Read the whole table page by page; the limit makes paging necessary.
        for (;;) {
          const YBqlReadOpPtr op = table_.NewReadOp();
          auto* const req = op->mutable_request();
          table_.AddColumns(table_.AllColumnNames(), req);
          req->set_limit(total_values / 2 + 10);
          req->set_return_paging_state(true);
          if (paging_state.has_table_id()) {
            if (paging_state.has_read_time()) {
              ReadHybridTime read_time = ReadHybridTime::FromPB(paging_state.read_time());
              if (read_time) {
                session->SetReadPoint(read_time);
              }
            }
            session->SetForceConsistentRead(ForceConsistentRead::kTrue);
            *req->mutable_paging_state() = std::move(paging_state);
          }
          auto flush_status = session->ApplyAndFlush(op);

          if (!flush_status.ok() || !op->succeeded()) {
            if (flush_status.IsTimedOut()) {
              ++counts.timed_out;
            } else {
              ++counts.failed;
            }
            failed = true;
            break;
          }

          auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
          for (const auto& row : rowblock->rows()) {
            auto key = row.column(0).int32_value();
            ASSERT_EQ(key, -row.column(1).int32_value());
            keys.push_back(key);
          }
          if (!op->response().has_paging_state()) {
            break;
          }
          paging_state = op->response().paging_state();
        }

        if (failed) {
          continue;
        }

        std::sort(keys.begin(), keys.end());

        // Check that there are no duplicates.
        ASSERT_TRUE(std::unique(keys.begin(), keys.end()) == keys.end());

        bool good = true;
        size_t idx = 0;
        for (int j = 0; j != kWriteThreads; ++j) {
          // If current writer did not write anything, then check is done.
          if (written_value[j] == 0) {
            continue;
          }

          // Writer with index j writes the following keys:
          // j * kWriterMul + 1, j * kWriterMul + 2, ..., j * kWriterMul + written_value[j]
          int32_t base = j * kWriterMul;
          // Find first key related to the current writer.
          while (idx < keys.size() && keys[idx] < base) {
            ++idx;
          }
          // Since keys are sorted and contain no duplicates we only need to check the first and
          // last entry of the interval for the current writer.
          // The bounds check must come first: when fewer keys than expected were returned, idx
          // may already equal keys.size(), and keys[idx] would be an out-of-bounds read.
          // written_value[j] >= 1 here, so last_idx >= idx.
          size_t last_idx = idx + written_value[j] - 1;
          if (last_idx >= keys.size() || keys[idx] != base + 1 ||
              keys[last_idx] != base + written_value[j]) {
            LOG(INFO) << "Inconsistency, written values: " << yb::ToString(written_value)
                      << ", keys: " << yb::ToString(keys);
            good = false;
            break;
          }
          idx = last_idx + 1;
        }
        if (good) {
          ++counts.good;
        } else {
          ++counts.inconsistent;
        }
      }
    });
  }

  thread_holder.WaitAndStop(120s);

  int32_t total_values = 0;
  for (auto& value : last_written_values) {
    total_values += value.load(std::memory_order_acquire);
  }

  EXPECT_GE(total_values, RegularBuildVsSanitizers(1000, 100));

  PagingReadCounts counts;

  for (const auto& entry : per_thread_counts) {
    counts += entry;
  }

  LOG(INFO) << "Read counts: " << counts.ToString();
  return counts;
}
596
597
// Minimum count of read outcomes that the paging tests below expect.
constexpr int kExpectedMinCount = RegularBuildVsSanitizers(20, 1);
598
599
0
// With consistent transactional paging, reads may fail but must never be inconsistent.
TEST_F_EX(SnapshotTxnTest, Paging, SingleTabletSnapshotTxnTest) {
  FLAGS_ycql_consistent_transactional_paging = true;

  const auto counts = ASSERT_RESULT(TestPaging());

  EXPECT_GE(counts.good, kExpectedMinCount);
  EXPECT_GE(counts.failed, kExpectedMinCount);
  EXPECT_EQ(counts.inconsistent, 0);
}
608
609
0
// Without consistent transactional paging, reads never fail but may be inconsistent.
TEST_F_EX(SnapshotTxnTest, InconsistentPaging, SingleTabletSnapshotTxnTest) {
  FLAGS_ycql_consistent_transactional_paging = false;

  const auto counts = ASSERT_RESULT(TestPaging());

  EXPECT_GE(counts.good, kExpectedMinCount);
  // Catching an inconsistency needs a high operation rate, so it is only required outside
  // sanitizer builds.
  const bool expect_inconsistency = !IsSanitizer();
  if (expect_inconsistency) {
    EXPECT_GE(counts.inconsistent, 1);
  }
  EXPECT_EQ(counts.failed, 0);
}
621
622
0
// Increments a single key in kIterations sequential transactions. Every block of kBlockSize
// transactions must finish in at most twice the time the first block took.
TEST_F(SnapshotTxnTest, HotRow) {
  constexpr int kBlockSize = RegularBuildVsSanitizers(1000, 100);
  constexpr int kNumBlocks = 10;
  constexpr int kIterations = kBlockSize * kNumBlocks;
  constexpr int kKey = 42;

  MonoDelta first_block_time;
  TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
  auto session = CreateSession();
  MonoTime block_start = MonoTime::Now();
  for (int iteration = 1; iteration <= kIterations; ++iteration) {
    auto txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
    session->SetTransaction(txn);

    ASSERT_OK(kv_table_test::Increment(&table_, session, kKey));
    ASSERT_OK(session->Flush());
    ASSERT_OK(txn->CommitFuture().get());

    if (iteration % kBlockSize != 0) {
      continue;
    }

    const auto block_end = MonoTime::Now();
    const auto elapsed = block_end - block_start;
    block_start = block_end;

    LOG(INFO) << "Written: " << iteration << " for " << elapsed;
    if (!first_block_time) {
      first_block_time = elapsed;
    } else {
      ASSERT_LE(elapsed, first_block_time * 2);
    }
  }
}
653
654
struct KeyToCheck {
655
  int value;
656
  TransactionId txn_id;
657
  KeyToCheck* next = nullptr;
658
659
0
  explicit KeyToCheck(int value_, const TransactionId& txn_id_) : value(value_), txn_id(txn_id_) {}
660
661
0
  friend void SetNext(KeyToCheck* key_to_check, KeyToCheck* next) {
662
0
    key_to_check->next = next;
663
0
  }
664
665
0
  friend KeyToCheck* GetNext(KeyToCheck* key_to_check) {
666
0
    return key_to_check->next;
667
0
  }
668
};
669
670
0
bool IntermittentTxnFailure(const Status& status) {
671
0
  static const std::vector<std::string> kAllowedMessages = {
672
0
    "Commit of expired transaction"s,
673
0
    "Leader does not have a valid lease"s,
674
0
    "Network error"s,
675
0
    "Not the leader"s,
676
0
    "Service is shutting down"s,
677
0
    "Timed out"s,
678
0
    "Transaction aborted"s,
679
0
    "expired or aborted by a conflict"s,
680
0
    "Transaction metadata missing"s,
681
0
    "Unknown transaction, could be recently aborted"s,
682
0
    "Transaction was recently aborted"s,
683
0
  };
684
0
  auto msg = status.ToString();
685
0
  for (const auto& allowed : kAllowedMessages) {
686
0
    if (msg.find(allowed) != std::string::npos) {
687
0
      return true;
688
0
    }
689
0
  }
690
691
0
  return false;
692
0
}
693
694
// Concurrently execute multiple transactions, each of which writes the same key multiple times.
695
// Meanwhile, tablet servers are restarted in parallel.
696
// This test checks that the transaction participant state is correctly restored after restart.
697
0
void SnapshotTxnTest::TestMultiWriteWithRestart() {
698
0
  constexpr int kNumWritesPerKey = 10;
699
700
0
  FLAGS_TEST_inject_load_transaction_delay_ms = 25;
701
702
0
  TestThreadHolder thread_holder;
703
704
0
  thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
705
0
    auto se = ScopeExit([] {
706
0
      LOG(INFO) << "Restarts done";
707
0
    });
708
0
    int ts_idx_to_restart = 0;
709
0
    while (!stop.load(std::memory_order_acquire)) {
710
0
      std::this_thread::sleep_for(5s);
711
0
      ts_idx_to_restart = (ts_idx_to_restart + 1) % cluster_->num_tablet_servers();
712
0
      LongOperationTracker long_operation_tracker("Restart", 20s);
713
0
      ASSERT_OK(cluster_->mini_tablet_server(ts_idx_to_restart)->Restart());
714
0
    }
715
0
  });
716
717
0
  MPSCQueue<KeyToCheck> keys_to_check;
718
0
  TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
719
0
  std::atomic<int> key(0);
720
0
  std::atomic<int> good_keys(0);
721
0
  for (int i = 0; i != 25; ++i) {
722
0
    thread_holder.AddThreadFunctor(
723
0
        [this, &stop = thread_holder.stop_flag(), &pool, &key, &keys_to_check, &good_keys] {
724
0
      auto se = ScopeExit([] {
725
0
        LOG(INFO) << "Write done";
726
0
      });
727
0
      auto session = CreateSession();
728
0
      while (!stop.load(std::memory_order_acquire)) {
729
0
        int k = key.fetch_add(1, std::memory_order_acq_rel);
730
0
        auto txn = ASSERT_RESULT(pool.TakeAndInit(GetIsolationLevel()));
731
0
        session->SetTransaction(txn);
732
0
        bool good = true;
733
0
        for (int j = 1; j <= kNumWritesPerKey; ++j) {
734
0
          if (j > 1) {
735
0
            std::this_thread::sleep_for(100ms);
736
0
          }
737
0
          auto write_result = WriteRow(session, k, j);
738
0
          if (!write_result.ok()) {
739
0
            ASSERT_TRUE(IntermittentTxnFailure(write_result.status())) << write_result.status();
740
0
            good = false;
741
0
            break;
742
0
          }
743
0
        }
744
0
        if (!good) {
745
0
          continue;
746
0
        }
747
0
        auto commit_status = txn->CommitFuture().get();
748
0
        if (!commit_status.ok()) {
749
0
          ASSERT_TRUE(IntermittentTxnFailure(commit_status)) << commit_status;
750
0
        } else {
751
0
          keys_to_check.Push(new KeyToCheck(k, txn->id()));
752
0
          good_keys.fetch_add(1, std::memory_order_acq_rel);
753
0
        }
754
0
      }
755
0
    });
756
0
  }
757
758
0
  thread_holder.AddThreadFunctor(
759
0
      [this, &stop = thread_holder.stop_flag(), &keys_to_check, kNumWritesPerKey] {
760
0
    auto se = ScopeExit([] {
761
0
      LOG(INFO) << "Read done";
762
0
    });
763
0
    auto session = CreateSession();
764
0
    for (;;) {
765
0
      std::unique_ptr<KeyToCheck> key(keys_to_check.Pop());
766
0
      if (key == nullptr) {
767
0
        if (stop.load(std::memory_order_acquire)) {
768
0
          break;
769
0
        }
770
0
        std::this_thread::sleep_for(10ms);
771
0
        continue;
772
0
      }
773
0
      SCOPED_TRACE(Format("Reading $0, written with: $1", key->value, key->txn_id));
774
0
      YBqlReadOpPtr op;
775
0
      for (;;) {
776
0
        op = ReadRow(session, key->value);
777
0
        auto flush_result = session->Flush();
778
0
        if (flush_result.ok()) {
779
0
          if (op->succeeded()) {
780
0
            break;
781
0
          }
782
0
          if (op->response().error_message().find("timed out after") == std::string::npos) {
783
0
            ASSERT_TRUE(op->succeeded()) << "Read failed: " << op->response().ShortDebugString();
784
0
          }
785
0
        }
786
0
      }
787
0
      auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
788
0
      ASSERT_EQ(rowblock->row_count(), 1);
789
0
      const auto& first_column = rowblock->row(0).column(0);
790
0
      ASSERT_EQ(InternalType::kInt32Value, first_column.type());
791
0
      ASSERT_EQ(first_column.int32_value(), kNumWritesPerKey);
792
0
    }
793
0
  });
794
795
0
  LOG(INFO) << "Running";
796
0
  thread_holder.WaitAndStop(60s);
797
798
0
  LOG(INFO) << "Stopped";
799
800
0
  for (;;) {
801
0
    std::unique_ptr<KeyToCheck> key(keys_to_check.Pop());
802
0
    if (key == nullptr) {
803
0
      break;
804
0
    }
805
0
  }
806
807
0
  ASSERT_GE(good_keys.load(std::memory_order_relaxed), key.load(std::memory_order_relaxed) * 0.7);
808
809
0
  LOG(INFO) << "Done";
810
0
}
811
812
0
// Multi-write transactions with concurrent tablet server restarts, default apply batching.
TEST_F(SnapshotTxnTest, MultiWriteWithRestart) {
  this->TestMultiWriteWithRestart();
}
815
816
0
// Same as MultiWriteWithRestart, but with txn_max_apply_batch_records lowered to 3.
TEST_F(SnapshotTxnTest, MultiWriteWithRestartAndLongApply) {
  constexpr int kMaxApplyBatchRecords = 3;
  FLAGS_txn_max_apply_batch_records = kMaxApplyBatchRecords;
  this->TestMultiWriteWithRestart();
}
820
821
// Test fixture: SnapshotTxnTest wrapped by TransactionCustomLogSegmentSizeTest with 128 as the
// log segment size argument (units are defined by that template -- confirm). Presumably the
// small segment size lets log cleanup drop segments quickly, so a restarted tablet server has to
// be remote bootstrapped. Used by RemoteBootstrapOnStart and TruncateDuringShutdown.
using RemoteBootstrapOnStartBase = TransactionCustomLogSegmentSizeTest<128, SnapshotTxnTest>;
822
823
0
void SnapshotTxnTest::TestRemoteBootstrap() {
824
0
  constexpr int kTransactionsCount = RegularBuildVsSanitizers(100, 10);
825
0
  FLAGS_log_min_seconds_to_retain = 1;
826
0
  DisableTransactionTimeout();
827
828
0
  for (int iteration = 0; iteration != 4; ++iteration) {
829
0
    DisableApplyingIntents();
830
831
0
    TestThreadHolder thread_holder;
832
833
0
    std::atomic<int> transactions(0);
834
835
0
    thread_holder.AddThreadFunctor(
836
0
        [this, &stop = thread_holder.stop_flag(), &transactions] {
837
0
      auto session = CreateSession();
838
0
      for (int transaction_idx = 0; !stop.load(std::memory_order_acquire); ++transaction_idx) {
839
0
        auto txn = CreateTransaction();
840
0
        session->SetTransaction(txn);
841
0
        if (WriteRows(session, transaction_idx).ok() && txn->CommitFuture().get().ok()) {
842
0
          transactions.fetch_add(1);
843
0
        }
844
0
      }
845
0
    });
846
847
0
    ASSERT_OK(thread_holder.WaitCondition([&transactions] {
848
0
      return transactions.load(std::memory_order_acquire) >= kTransactionsCount;
849
0
    }));
850
851
0
    cluster_->mini_tablet_server(0)->Shutdown();
852
853
0
    SetIgnoreApplyingProbability(0.0);
854
855
0
    std::this_thread::sleep_for(FLAGS_log_min_seconds_to_retain * 1s);
856
857
0
    auto start_transactions = transactions.load(std::memory_order_acquire);
858
0
    ASSERT_OK(thread_holder.WaitCondition([&transactions, start_transactions] {
859
0
      return transactions.load(std::memory_order_acquire) >=
860
0
             start_transactions + kTransactionsCount;
861
0
    }));
862
863
0
    thread_holder.Stop();
864
865
0
    LOG(INFO) << "Flushing";
866
0
    ASSERT_OK(cluster_->FlushTablets());
867
868
0
    LOG(INFO) << "Clean logs";
869
0
    ASSERT_OK(cluster_->CleanTabletLogs());
870
871
    // Shutdown to reset cached logs.
872
0
    for (size_t i = 1; i != cluster_->num_tablet_servers(); ++i) {
873
0
      cluster_->mini_tablet_server(i)->Shutdown();
874
0
    }
875
876
    // Start all servers. Cluster verifier should check that all tablets are synchronized.
877
0
    for (auto i = cluster_->num_tablet_servers(); i > 0;) {
878
0
      --i;
879
0
      ASSERT_OK(cluster_->mini_tablet_server(i)->Start());
880
0
    }
881
882
0
    ASSERT_OK(WaitFor([this] { return CheckAllTabletsRunning(); }, 20s * kTimeMultiplier,
883
0
                      "All tablets running"));
884
0
  }
885
0
}
886
887
0
// Runs the TestRemoteBootstrap() scenario on the RemoteBootstrapOnStartBase fixture
// (custom log segment size).
TEST_F_EX(SnapshotTxnTest, RemoteBootstrapOnStart, RemoteBootstrapOnStartBase) {
  TestRemoteBootstrap();
}
890
891
0
// Sequence of the test:
//   1. With intent application disabled, a background writer commits transactions until
//      kTransactionsCount have succeeded.
//   2. Tablet server 0 is shut down and the writer is stopped.
//   3. The table is truncated while server 0 is offline.
//   4. Server 0 is restarted, and the test waits until every tablet reports running.
// FLAGS_TEST_inject_load_transaction_delay_ms = 50 presumably slows transaction loading on
// startup, so the restart has to cope with a table that was truncated while it was down
// (NOTE(review): flag semantics inferred from its name -- confirm).
TEST_F_EX(SnapshotTxnTest, TruncateDuringShutdown, RemoteBootstrapOnStartBase) {
  FLAGS_TEST_inject_load_transaction_delay_ms = 50;

  constexpr int kTransactionsCount = RegularBuildVsSanitizers(20, 5);
  FLAGS_log_min_seconds_to_retain = 1;
  DisableTransactionTimeout();

  DisableApplyingIntents();

  TestThreadHolder thread_holder;

  std::atomic<int> transactions(0);

  // Background writer: keeps committing transactions until the holder is stopped. It counts
  // only the transactions whose rows were written and whose commit succeeded.
  thread_holder.AddThreadFunctor(
      [this, &stop = thread_holder.stop_flag(), &transactions] {
    auto session = CreateSession();
    for (int transaction_idx = 0; !stop.load(std::memory_order_acquire); ++transaction_idx) {
      auto txn = CreateTransaction();
      session->SetTransaction(txn);
      if (WriteRows(session, transaction_idx).ok() && txn->CommitFuture().get().ok()) {
        transactions.fetch_add(1);
      }
    }
  });

  // Wait with a deadline instead of an unbounded sleep loop. If the writer can never commit
  // enough transactions, the test now fails with a clear message instead of hanging until the
  // harness kills it.
  ASSERT_OK(WaitFor(
      [&transactions] {
        return transactions.load(std::memory_order_acquire) >= kTransactionsCount;
      },
      60s * kTimeMultiplier, "Initial transactions committed"));

  cluster_->mini_tablet_server(0)->Shutdown();

  thread_holder.Stop();

  ASSERT_OK(client_->TruncateTable(table_.table()->id()));

  ASSERT_OK(cluster_->mini_tablet_server(0)->Start());

  ASSERT_OK(WaitFor([this] { return CheckAllTabletsRunning(); }, 20s * kTimeMultiplier,
                    "All tablets running"));
}
931
932
0
// Each of kRounds rounds does the following:
//   - commits a single-row transaction taken from a TransactionPool;
//   - finds the single LEADER_AND_READY peer of the user table;
//   - calls ResolveIntents on it up to the peer clock's current time;
//   - flushes that tablet synchronously.
// It then requires that some live SST file has its smallest/largest ConsensusFrontier hybrid
// times strictly inside (previous round's time, time taken after ResolveIntents).
// SetIgnoreApplyingProbability(0.5) presumably makes roughly half of the intent applies be
// skipped, so the data must reach the SST file through ResolveIntents
// (NOTE(review): confirm).
TEST_F_EX(SnapshotTxnTest, ResolveIntents, SingleTabletSnapshotTxnTest) {
  SetIgnoreApplyingProbability(0.5);

  TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
  auto session = CreateSession();
  auto prev_ht = clock_->Now();
  constexpr int kRounds = 4;
  for (int round = 0; round < kRounds; ++round) {
    auto txn = ASSERT_RESULT(pool.TakeAndInit(isolation_level_));
    session->SetTransaction(txn);
    ASSERT_OK(WriteRow(session, round, -round));
    ASSERT_OK(txn->CommitFuture().get());

    // Keep only ready leaders, and skip transaction status tablets.
    auto is_ready_user_leader = [](const std::shared_ptr<tablet::TabletPeer>& peer) {
      return peer->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE &&
             peer->consensus()->GetLeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
    };
    auto peers = ListTabletPeers(cluster_.get(), is_ready_user_leader);
    ASSERT_EQ(peers.size(), 1);
    const auto& peer = peers[0];
    auto tablet = peer->tablet();
    ASSERT_OK(tablet->transaction_participant()->ResolveIntents(
        peer->clock().Now(), CoarseTimePoint::max()));
    auto current_ht = clock_->Now();
    ASSERT_OK(tablet->Flush(tablet::FlushMode::kSync));

    auto files = tablet->TEST_db()->GetLiveFilesMetaData();
    bool found = false;
    for (const auto& meta : files) {
      const auto& smallest = down_cast<docdb::ConsensusFrontier&>(*meta.smallest.user_frontier);
      const auto& largest = down_cast<docdb::ConsensusFrontier&>(*meta.largest.user_frontier);
      if (smallest.hybrid_time() > prev_ht && largest.hybrid_time() < current_ht) {
        found = true;
        break;
      }
    }

    ASSERT_TRUE(found) << "Cannot find SST file that fits into " << prev_ht << " - " << current_ht
                       << " range, files: " << AsString(files);

    prev_ht = current_ht;
  }
}
977
978
0
// Sequence of the test:
//   1. With intent application disabled, write kTransactions rows via WriteData(INSERT, i).
//   2. Shut down tablet server 0.
//   3. Request table deletion without waiting for it to finish.
//   4. Restart server 0.
// The injected status-resolver delay (150 ms scaled by kTimeMultiplier, per the flag name)
// presumably stretches the window in which the restarted server is still loading/resolving
// transactions for a table that is being deleted (NOTE(review): confirm flag semantics).
TEST_F(SnapshotTxnTest, DeleteOnLoad) {
  constexpr int kTransactions = 400;

  FLAGS_TEST_inject_status_resolver_delay_ms = 150 * kTimeMultiplier;

  DisableApplyingIntents();

  for (int i = 0; i != kTransactions; ++i) {
    WriteData(WriteOpType::INSERT, i);
  }

  cluster_->mini_tablet_server(0)->Shutdown();

  ASSERT_OK(client_->DeleteTable(table_.table()->name(), /* wait= */ false));

  // Give the delete table request time to replicate to the nodes that are still alive.
  std::this_thread::sleep_for(1s * kTimeMultiplier);

  ASSERT_OK(cluster_->mini_tablet_server(0)->Start());
}
1000
1001
} // namespace client
1002
} // namespace yb