YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/ql-transaction-test.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/client/error.h"
17
#include "yb/client/schema.h"
18
#include "yb/client/session.h"
19
#include "yb/client/table.h"
20
#include "yb/client/table_alterer.h"
21
#include "yb/client/transaction.h"
22
#include "yb/client/transaction_rpc.h"
23
#include "yb/client/txn-test-base.h"
24
#include "yb/client/yb_op.h"
25
26
#include "yb/common/ql_value.h"
27
28
#include "yb/consensus/consensus.h"
29
#include "yb/consensus/log.h"
30
31
#include "yb/rocksdb/db.h"
32
33
#include "yb/rpc/rpc.h"
34
35
#include "yb/tablet/tablet.h"
36
#include "yb/tablet/tablet_bootstrap_if.h"
37
#include "yb/tablet/tablet_peer.h"
38
#include "yb/tablet/transaction_coordinator.h"
39
40
#include "yb/tserver/mini_tablet_server.h"
41
#include "yb/tserver/tablet_server.h"
42
#include "yb/tserver/ts_tablet_manager.h"
43
#include "yb/tserver/tserver_service.pb.h"
44
45
#include "yb/util/async_util.h"
46
#include "yb/util/random_util.h"
47
#include "yb/util/scope_exit.h"
48
#include "yb/util/size_literals.h"
49
#include "yb/util/test_thread_holder.h"
50
#include "yb/util/tsan_util.h"
51
52
#include "yb/yql/cql/ql/util/errcodes.h"
53
#include "yb/yql/cql/ql/util/statement_result.h"
54
55
using namespace std::literals;
56
57
using yb::tablet::GetTransactionTimeout;
58
using yb::tablet::TabletPeer;
59
60
DECLARE_bool(TEST_disable_proactive_txn_cleanup_on_abort);
61
DECLARE_bool(TEST_fail_in_apply_if_no_metadata);
62
DECLARE_bool(TEST_master_fail_transactional_tablet_lookups);
63
DECLARE_bool(TEST_transaction_allow_rerequest_status);
64
DECLARE_bool(delete_intents_sst_files);
65
DECLARE_bool(enable_load_balancing);
66
DECLARE_bool(fail_on_out_of_range_clock_skew);
67
DECLARE_bool(flush_rocksdb_on_shutdown);
68
DECLARE_bool(rocksdb_disable_compactions);
69
DECLARE_int32(TEST_delay_init_tablet_peer_ms);
70
DECLARE_int32(log_min_seconds_to_retain);
71
DECLARE_int32(remote_bootstrap_max_chunk_size);
72
DECLARE_int64(transaction_rpc_timeout_ms);
73
DECLARE_uint64(TEST_transaction_delay_status_reply_usec_in_tests);
74
DECLARE_uint64(aborted_intent_cleanup_ms);
75
DECLARE_uint64(max_clock_skew_usec);
76
DECLARE_uint64(transaction_heartbeat_usec);
77
78
namespace yb {
79
namespace client {
80
81
struct WriteConflictsOptions {
82
  bool do_restarts = false;
83
  size_t active_transactions = 50;
84
  int total_keys = 5;
85
  bool non_txn_writes = false;
86
};
87
88
class QLTransactionTest : public TransactionTestBase<MiniCluster> {
89
 protected:
90
44
  void SetUp() override {
91
44
    SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
92
44
    TransactionTestBase::SetUp();
93
44
  }
94
95
  // We write data with first transaction then try to read it another one.
96
  // If commit is true, then first transaction is committed and second should be restarted.
97
  // Otherwise second transaction would see pending intents from first one and should not restart.
98
  void TestReadRestart(bool commit = true);
99
100
  void TestWriteConflicts(const WriteConflictsOptions& options);
101
102
  void TestReadOnlyTablets(IsolationLevel isolation_level,
103
                           bool perform_write,
104
                           bool written_intents_expected);
105
106
0
  CHECKED_STATUS WaitTransactionsCleaned() {
107
0
    return WaitFor(
108
0
      [this] { return !HasTransactions(); }, kTransactionApplyTime, "Transactions cleaned");
109
0
  }
110
111
0
  CHECKED_STATUS WaitIntentsCleaned() {
112
0
    return WaitFor(
113
0
      [this] { return CountIntents(cluster_.get()) == 0; }, kIntentsCleanupTime, "Intents cleaned");
114
0
  }
115
};
116
117
typedef TransactionCustomLogSegmentSizeTest<0, QLTransactionTest>
118
    QLTransactionBigLogSegmentSizeTest;
119
120
0
TEST_F(QLTransactionTest, Simple) {
121
0
  ASSERT_NO_FATALS(WriteData());
122
0
  ASSERT_NO_FATALS(VerifyData());
123
0
  ASSERT_OK(cluster_->RestartSync());
124
0
  AssertNoRunningTransactions();
125
0
}
126
127
0
TEST_F(QLTransactionTest, LookupTabletFailure) {
128
0
  FLAGS_TEST_master_fail_transactional_tablet_lookups = true;
129
130
0
  auto txn = CreateTransaction();
131
0
  auto result = WriteRow(CreateSession(txn), 0 /* key */, 1 /* value */);
132
133
0
  ASSERT_TRUE(!result.ok() && result.status().IsTimedOut()) << "Result: " << AsString(result);
134
0
}
135
136
0
TEST_F(QLTransactionTest, ReadWithTimeInFuture) {
137
0
  FLAGS_fail_on_out_of_range_clock_skew = false;
138
139
0
  WriteData();
140
0
  server::SkewedClockDeltaChanger delta_changer(100ms, skewed_clock_);
141
0
  for (size_t i = 0; i != 100; ++i) {
142
0
    auto transaction = CreateTransaction2();
143
0
    auto session = CreateSession(transaction);
144
0
    VerifyRows(session);
145
0
  }
146
0
  ASSERT_OK(cluster_->RestartSync());
147
0
  AssertNoRunningTransactions();
148
0
}
149
150
TEST_F(QLTransactionTest, WriteSameKey) {
151
  ASSERT_NO_FATALS(WriteDataWithRepetition());
152
  std::this_thread::sleep_for(1s); // Wait some time for intents to apply.
153
  ASSERT_NO_FATALS(VerifyData());
154
  ASSERT_OK(cluster_->RestartSync());
155
}
156
157
0
TEST_F(QLTransactionTest, WriteSameKeyWithIntents) {
158
0
  DisableApplyingIntents();
159
160
0
  ASSERT_NO_FATALS(WriteDataWithRepetition());
161
0
  ASSERT_NO_FATALS(VerifyData());
162
0
  ASSERT_OK(cluster_->RestartSync());
163
0
}
164
165
// Commit flags says whether we should commit write txn during this test.
166
0
void QLTransactionTest::TestReadRestart(bool commit) {
167
0
  SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);
168
169
0
  {
170
0
    auto write_txn = CreateTransaction();
171
0
    ASSERT_OK(WriteRows(CreateSession(write_txn)));
172
0
    if (commit) {
173
0
      ASSERT_OK(write_txn->CommitFuture().get());
174
0
    }
175
0
    auto se = ScopeExit([write_txn, commit] {
176
0
      if (!commit) {
177
0
        write_txn->Abort();
178
0
      }
179
0
    });
180
181
0
    server::SkewedClockDeltaChanger delta_changer(-100ms, skewed_clock_);
182
183
0
    auto txn1 = CreateTransaction2(SetReadTime::kTrue);
184
0
    auto se2 = ScopeExit([txn1, commit] {
185
0
      if (!commit) {
186
0
        txn1->Abort();
187
0
      }
188
0
    });
189
0
    auto session = CreateSession(txn1);
190
0
    if (commit) {
191
0
      for (size_t r = 0; r != kNumRows; ++r) {
192
0
        auto row = SelectRow(session, KeyForTransactionAndIndex(0, r));
193
0
        ASSERT_NOK(row);
194
0
        ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(row.status()))
195
0
                      << "Bad row: " << row;
196
0
      }
197
0
      auto txn2 = ASSERT_RESULT(txn1->CreateRestartedTransaction());
198
0
      auto se = ScopeExit([txn2] {
199
0
        txn2->Abort();
200
0
      });
201
0
      session->SetTransaction(txn2);
202
0
      VerifyRows(session);
203
0
      VerifyData();
204
0
    } else {
205
0
      for (size_t r = 0; r != kNumRows; ++r) {
206
0
        auto row = SelectRow(session, KeyForTransactionAndIndex(0, r));
207
0
        ASSERT_TRUE(!row.ok() && row.status().IsNotFound()) << "Bad row: " << row;
208
0
      }
209
0
    }
210
0
  }
211
212
0
  ASSERT_OK(cluster_->RestartSync());
213
0
}
214
215
0
TEST_F(QLTransactionTest, ReadRestart) {
216
0
  TestReadRestart();
217
0
  AssertNoRunningTransactions();
218
0
}
219
220
0
TEST_F(QLTransactionTest, ReadRestartWithIntents) {
221
0
  DisableApplyingIntents();
222
0
  TestReadRestart();
223
0
}
224
225
0
TEST_F(QLTransactionTest, ReadRestartWithPendingIntents) {
226
0
  FLAGS_TEST_transaction_allow_rerequest_status = false;
227
0
  DisableApplyingIntents();
228
0
  TestReadRestart(false /* commit */);
229
0
}
230
231
// Non transactional restart happens in server, so we just checking that we read correct values.
232
// Skewed clocks are used because there could be case when applied intents or commit transaction
233
// has time greater than max safetime to read, that causes restart.
234
0
TEST_F(QLTransactionTest, ReadRestartNonTransactional) {
235
0
  const auto kClockSkew = 500ms;
236
237
0
  SetAtomicFlag(1000000ULL, &FLAGS_max_clock_skew_usec);
238
0
  DisableTransactionTimeout();
239
240
0
  auto delta_changers = SkewClocks(cluster_.get(), kClockSkew);
241
0
  constexpr size_t kTotalTransactions = 10;
242
243
0
  for (size_t i = 0; i != kTotalTransactions; ++i) {
244
0
    SCOPED_TRACE(Format("Transaction $0", i));
245
0
    auto txn = CreateTransaction();
246
0
    ASSERT_OK(WriteRows(CreateSession(txn), i));
247
0
    ASSERT_OK(txn->CommitFuture().get());
248
0
    ASSERT_NO_FATALS(VerifyRows(CreateSession(), i));
249
250
    // We propagate hybrid time, so when commit and read finishes, all servers has about the same
251
    // physical component. We are waiting double skew, until time on servers became skewed again.
252
0
    std::this_thread::sleep_for(kClockSkew * 2);
253
0
  }
254
255
0
  cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back.
256
0
  cluster_.reset();
257
0
}
258
259
0
TEST_F(QLTransactionTest, WriteRestart) {
260
0
  SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);
261
262
0
  const std::string kExtraColumn = "v2";
263
0
  std::unique_ptr<YBTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
264
0
  table_alterer->AddColumn(kExtraColumn)->Type(DataType::INT32);
265
0
  ASSERT_OK(table_alterer->Alter());
266
267
0
  ASSERT_OK(table_.Open(kTableName, client_.get())); // Reopen to update schema version.
268
269
0
  WriteData();
270
271
0
  server::SkewedClockDeltaChanger delta_changer(-100ms, skewed_clock_);
272
0
  auto txn1 = CreateTransaction2(SetReadTime::kTrue);
273
0
  YBTransactionPtr txn2;
274
0
  auto session = CreateSession(txn1);
275
0
  for (bool retry : {false, true}) {
276
0
    for (size_t r = 0; r != kNumRows; ++r) {
277
0
      const auto op = table_.NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE);
278
0
      auto* const req = op->mutable_request();
279
0
      auto key = KeyForTransactionAndIndex(0, r);
280
0
      auto old_value = ValueForTransactionAndIndex(0, r, WriteOpType::INSERT);
281
0
      auto value = ValueForTransactionAndIndex(0, r, WriteOpType::UPDATE);
282
0
      QLAddInt32HashValue(req, key);
283
0
      table_.AddInt32ColumnValue(req, kExtraColumn, value);
284
0
      auto cond = req->mutable_where_expr()->mutable_condition();
285
0
      table_.SetInt32Condition(cond, kValueColumn, QLOperator::QL_OP_EQUAL, old_value);
286
0
      req->mutable_column_refs()->add_ids(table_.ColumnId(kValueColumn));
287
0
      LOG(INFO) << "Updating value";
288
0
      auto status = session->ApplyAndFlush(op);
289
0
      ASSERT_OK(status);
290
0
      if (!retry) {
291
0
        ASSERT_EQ(QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR, op->response().status());
292
0
      } else {
293
0
        ASSERT_EQ(QLResponsePB::YQL_STATUS_OK, op->response().status());
294
0
      }
295
0
    }
296
0
    if (!retry) {
297
0
      txn2 = ASSERT_RESULT(txn1->CreateRestartedTransaction());
298
0
      session->SetTransaction(txn2);
299
0
    }
300
0
  }
301
0
  txn2->CommitFuture().wait();
302
0
  VerifyData();
303
0
  VerifyData(1, WriteOpType::UPDATE, kExtraColumn);
304
305
0
  ASSERT_OK(cluster_->RestartSync());
306
0
  AssertNoRunningTransactions();
307
0
}
308
309
// Check that we could write to transaction that were restarted.
310
0
TEST_F(QLTransactionTest, WriteAfterReadRestart) {
311
0
  const auto kClockDelta = 100ms;
312
0
  SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);
313
314
0
  auto write_txn = CreateTransaction();
315
0
  ASSERT_OK(WriteRows(CreateSession(write_txn)));
316
0
  ASSERT_OK(write_txn->CommitFuture().get());
317
318
0
  server::SkewedClockDeltaChanger delta_changer(-kClockDelta, skewed_clock_);
319
320
0
  auto txn1 = CreateTransaction2(SetReadTime::kTrue);
321
0
  auto session = CreateSession(txn1);
322
0
  for (size_t r = 0; r != kNumRows; ++r) {
323
0
    auto row = SelectRow(session, KeyForTransactionAndIndex(0, r));
324
0
    ASSERT_NOK(row);
325
0
    ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(row.status()))
326
0
                  << "Bad row: " << row;
327
0
  }
328
0
  {
329
    // To reset clock back.
330
0
    auto temp_delta_changed = std::move(delta_changer);
331
0
  }
332
0
  auto txn2 = ASSERT_RESULT(txn1->CreateRestartedTransaction());
333
0
  session->SetTransaction(txn2);
334
0
  VerifyRows(session);
335
0
  for (size_t r = 0; r != kNumRows; ++r) {
336
0
    auto result = WriteRow(
337
0
        session, KeyForTransactionAndIndex(0, r),
338
0
        ValueForTransactionAndIndex(0, r, WriteOpType::UPDATE), WriteOpType::UPDATE);
339
0
    ASSERT_OK(result);
340
0
  }
341
342
0
  txn2->Abort();
343
344
0
  VerifyData();
345
0
}
346
347
0
TEST_F(QLTransactionTest, Child) {
348
0
  auto txn = CreateTransaction();
349
0
  TransactionManager manager2(client_.get(), clock_, client::LocalTabletFilter());
350
0
  auto data_pb = txn->PrepareChildFuture(ForceConsistentRead::kFalse).get();
351
0
  ASSERT_OK(data_pb);
352
0
  auto data = ChildTransactionData::FromPB(*data_pb);
353
0
  ASSERT_OK(data);
354
0
  auto txn2 = std::make_shared<YBTransaction>(&manager2, std::move(*data));
355
356
0
  ASSERT_OK(WriteRows(CreateSession(txn2), 0));
357
0
  auto result = txn2->FinishChild();
358
0
  ASSERT_OK(result);
359
0
  ASSERT_OK(txn->ApplyChildResult(*result));
360
361
0
  ASSERT_OK(txn->CommitFuture().get());
362
363
0
  ASSERT_NO_FATALS(VerifyData());
364
0
  ASSERT_OK(cluster_->RestartSync());
365
0
  AssertNoRunningTransactions();
366
0
}
367
368
0
TEST_F(QLTransactionTest, ChildReadRestart) {
369
0
  SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);
370
371
0
  {
372
0
    auto write_txn = CreateTransaction();
373
0
    ASSERT_OK(WriteRows(CreateSession(write_txn)));
374
0
    ASSERT_OK(write_txn->CommitFuture().get());
375
0
  }
376
377
0
  server::SkewedClockDeltaChanger delta_changer(-100ms, skewed_clock_);
378
0
  auto parent_txn = CreateTransaction2(SetReadTime::kTrue);
379
380
0
  auto data_pb = parent_txn->PrepareChildFuture(ForceConsistentRead::kFalse).get();
381
0
  ASSERT_OK(data_pb);
382
0
  auto data = ChildTransactionData::FromPB(*data_pb);
383
0
  ASSERT_OK(data);
384
385
0
  server::ClockPtr clock3(new server::HybridClock(skewed_clock_));
386
0
  ASSERT_OK(clock3->Init());
387
0
  TransactionManager manager3(client_.get(), clock3, client::LocalTabletFilter());
388
0
  auto child_txn = std::make_shared<YBTransaction>(&manager3, std::move(*data));
389
390
0
  auto session = CreateSession(child_txn);
391
0
  for (size_t r = 0; r != kNumRows; ++r) {
392
0
    auto row = SelectRow(session, KeyForTransactionAndIndex(0, r));
393
0
    ASSERT_NOK(row);
394
0
    ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(row.status()))
395
0
                  << "Bad row: " << row;
396
0
  }
397
398
0
  auto result = child_txn->FinishChild();
399
0
  ASSERT_OK(result);
400
0
  ASSERT_OK(parent_txn->ApplyChildResult(*result));
401
402
0
  auto master2_txn = ASSERT_RESULT(parent_txn->CreateRestartedTransaction());
403
0
  session->SetTransaction(master2_txn);
404
0
  for (size_t r = 0; r != kNumRows; ++r) {
405
0
    auto row = SelectRow(session, KeyForTransactionAndIndex(0, r));
406
0
    ASSERT_OK(row);
407
0
    ASSERT_EQ(ValueForTransactionAndIndex(0, r, WriteOpType::INSERT), *row);
408
0
  }
409
0
  ASSERT_NO_FATALS(VerifyData());
410
411
0
  ASSERT_OK(cluster_->RestartSync());
412
0
  AssertNoRunningTransactions();
413
0
}
414
415
0
TEST_F(QLTransactionTest, InsertUpdate) {
416
0
  DisableApplyingIntents();
417
0
  WriteData(); // Add data
418
0
  WriteData(); // Update data
419
0
  VerifyData();
420
0
  ASSERT_OK(cluster_->RestartSync());
421
0
}
422
423
0
TEST_F(QLTransactionTest, Cleanup) {
424
0
  WriteData();
425
0
  VerifyData();
426
427
  // Wait transaction apply. Otherwise count could be non zero.
428
0
  ASSERT_OK(WaitTransactionsCleaned());
429
0
  VerifyData();
430
0
  ASSERT_OK(cluster_->RestartSync());
431
0
  AssertNoRunningTransactions();
432
0
}
433
434
0
TEST_F(QLTransactionTest, Heartbeat) {
435
0
  auto txn = CreateTransaction();
436
0
  auto session = CreateSession(txn);
437
0
  ASSERT_OK(WriteRows(session));
438
0
  std::this_thread::sleep_for(GetTransactionTimeout() * 2);
439
0
  ASSERT_OK(txn->CommitFuture().get());
440
0
  VerifyData();
441
0
  AssertNoRunningTransactions();
442
0
}
443
444
0
TEST_F(QLTransactionTest, Expire) {
445
0
  SetDisableHeartbeatInTests(true);
446
0
  auto txn = CreateTransaction();
447
0
  auto session = CreateSession(txn);
448
0
  ASSERT_OK(WriteRows(session));
449
0
  std::this_thread::sleep_for(GetTransactionTimeout() * 2);
450
0
  auto commit_status = txn->CommitFuture().get();
451
0
  ASSERT_TRUE(commit_status.IsExpired()) << "Bad status: " << commit_status;
452
0
  std::this_thread::sleep_for(std::chrono::microseconds(FLAGS_transaction_heartbeat_usec * 2));
453
0
  ASSERT_OK(cluster_->CleanTabletLogs());
454
0
  ASSERT_FALSE(HasTransactions());
455
0
}
456
457
0
TEST_F(QLTransactionTest, PreserveLogs) {
458
0
  FLAGS_transaction_rpc_timeout_ms = 60000;
459
0
  SetDisableHeartbeatInTests(true);
460
0
  DisableTransactionTimeout();
461
0
  std::vector<std::shared_ptr<YBTransaction>> transactions;
462
0
  constexpr size_t kTransactions = 20;
463
0
  for (size_t i = 0; i != kTransactions; ++i) {
464
0
    auto txn = CreateTransaction();
465
0
    auto session = CreateSession(txn);
466
0
    ASSERT_OK(WriteRows(session, i));
467
0
    transactions.push_back(std::move(txn));
468
0
    std::this_thread::sleep_for(100ms);
469
0
  }
470
0
  LOG(INFO) << "Request clean";
471
0
  ASSERT_OK(cluster_->CleanTabletLogs());
472
0
  ASSERT_OK(cluster_->RestartSync());
473
0
  CountDownLatch latch(kTransactions);
474
0
  for (auto& transaction : transactions) {
475
0
    transaction->Commit([&latch](const Status& status) {
476
0
      EXPECT_OK(status);
477
0
      latch.CountDown();
478
0
    });
479
0
  }
480
0
  latch.Wait();
481
0
  VerifyData(kTransactions);
482
0
  AssertNoRunningTransactions();
483
0
  auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
484
0
  uint64_t max_active_segment_sequence_number = 0;
485
0
  for (const auto& peer : peers) {
486
0
    if (peer->table_type() != TableType::TRANSACTION_STATUS_TABLE_TYPE) {
487
0
      continue;
488
0
    }
489
0
    auto current_active_segment_sequence_number = peer->log()->active_segment_sequence_number();
490
0
    LOG(INFO) << peer->LogPrefix() << "active segment: "
491
0
              << current_active_segment_sequence_number;
492
0
    max_active_segment_sequence_number = std::max(
493
0
        max_active_segment_sequence_number, current_active_segment_sequence_number);
494
0
  }
495
496
  // Ensure that we had enough log segments, otherwise this test is pretty useless.
497
0
  ASSERT_GE(max_active_segment_sequence_number, kTransactions / 4);
498
0
}
499
500
0
TEST_F(QLTransactionTest, ResendApplying) {
501
0
  DisableApplyingIntents();
502
0
  WriteData();
503
0
  std::this_thread::sleep_for(5s); // Transaction should not be applied here.
504
0
  ASSERT_TRUE(HasTransactions());
505
506
0
  SetIgnoreApplyingProbability(0.0);
507
508
0
  ASSERT_OK(WaitTransactionsCleaned());
509
0
  VerifyData();
510
0
  ASSERT_OK(cluster_->RestartSync());
511
0
  AssertNoRunningTransactions();
512
0
}
513
514
0
TEST_F(QLTransactionTest, ConflictResolution) {
515
0
  constexpr int kTotalTransactions = 5;
516
0
  constexpr int kNumRows = 10;
517
0
  std::vector<YBTransactionPtr> transactions;
518
0
  std::vector<YBSessionPtr> sessions;
519
0
  std::vector<std::vector<YBqlWriteOpPtr>> write_ops(kTotalTransactions);
520
521
0
  CountDownLatch latch(kTotalTransactions);
522
0
  for (int i = 0; i != kTotalTransactions; ++i) {
523
0
    transactions.push_back(CreateTransaction());
524
0
    auto session = CreateSession(transactions.back());
525
0
    sessions.push_back(session);
526
0
    for (int r = 0; r != kNumRows; ++r) {
527
0
      write_ops[i].push_back(ASSERT_RESULT(WriteRow(
528
0
          sessions.back(), r, i, WriteOpType::INSERT, Flush::kFalse)));
529
0
    }
530
0
    session->FlushAsync([&latch](FlushStatus* flush_status) { latch.CountDown(); });
531
0
  }
532
0
  latch.Wait();
533
534
0
  latch.Reset(transactions.size());
535
0
  std::atomic<size_t> successes(0);
536
0
  std::atomic<size_t> failures(0);
537
538
0
  for (size_t i = 0; i != kTotalTransactions; ++i) {
539
0
    bool success = true;
540
0
    for (auto& op : write_ops[i]) {
541
0
      if (!op->succeeded()) {
542
0
        success = false;
543
0
        break;
544
0
      }
545
0
    }
546
0
    if (!success) {
547
0
      failures.fetch_add(1, std::memory_order_release);
548
0
      latch.CountDown(1);
549
0
      continue;
550
0
    }
551
0
    transactions[i]->Commit([&latch, &successes, &failures](const Status& status) {
552
0
      if (status.ok()) {
553
0
        successes.fetch_add(1, std::memory_order_release);
554
0
      } else {
555
0
        failures.fetch_add(1, std::memory_order_release);
556
0
      }
557
0
      latch.CountDown(1);
558
0
    });
559
0
  }
560
561
0
  latch.Wait();
562
0
  LOG(INFO) << "Committed, successes: " << successes.load() << ", failures: " << failures.load();
563
564
0
  ASSERT_GE(successes.load(std::memory_order_acquire), 1);
565
0
  ASSERT_GE(failures.load(std::memory_order_acquire), 1);
566
567
0
  auto session = CreateSession();
568
0
  std::vector<int32_t> values;
569
0
  for (int r = 0; r != kNumRows; ++r) {
570
0
    auto row = SelectRow(session, r);
571
0
    ASSERT_OK(row);
572
0
    values.push_back(*row);
573
0
  }
574
0
  for (const auto& value : values) {
575
0
    ASSERT_EQ(values.front(), value) << "Values: " << yb::ToString(values);
576
0
  }
577
0
}
578
579
0
TEST_F(QLTransactionTest, SimpleWriteConflict) {
580
0
  auto transaction = CreateTransaction();
581
0
  ASSERT_OK(WriteRows(CreateSession(transaction)));
582
0
  ASSERT_OK(WriteRows(CreateSession()));
583
584
0
  ASSERT_NOK(transaction->CommitFuture().get());
585
0
}
586
587
void QLTransactionTest::TestReadOnlyTablets(IsolationLevel isolation_level,
588
                                            bool perform_write,
589
0
                                            bool written_intents_expected) {
590
0
  SetIsolationLevel(isolation_level);
591
592
0
  YBTransactionPtr txn = CreateTransaction();
593
0
  YBSessionPtr session = CreateSession(txn);
594
595
0
  ReadRow(session, 0 /* key */);
596
0
  if (perform_write) {
597
0
    ASSERT_OK(WriteRow(session, 1 /* key */, 1 /* value */, WriteOpType::INSERT, Flush::kFalse));
598
0
  }
599
0
  ASSERT_OK(session->Flush());
600
601
  // Verify intents were written if expected.
602
0
  if (written_intents_expected) {
603
0
    ASSERT_GT(CountIntents(cluster_.get()), 0);
604
0
  } else {
605
0
    ASSERT_EQ(CountIntents(cluster_.get()), 0);
606
0
  }
607
608
  // Commit and verify transaction and intents were applied/cleaned up.
609
0
  ASSERT_OK(txn->CommitFuture().get());
610
0
  ASSERT_OK(WaitTransactionsCleaned());
611
0
  ASSERT_OK(WaitIntentsCleaned());
612
0
}
613
614
0
TEST_F(QLTransactionTest, ReadOnlyTablets) {
615
0
  FLAGS_TEST_fail_in_apply_if_no_metadata = true;
616
617
  // In snapshot isolation, tablets only read from will not have metadata written, so applying
618
  // intents on this tablet would cause the test to fail.
619
0
  TestReadOnlyTablets(IsolationLevel::SNAPSHOT_ISOLATION,
620
0
                      false /* perform_write */,
621
0
                      false /* written_intents_expected */);
622
623
  // Writes always write intents, so metadata should be written and intents should be applied
624
  // on this tablet.
625
0
  TestReadOnlyTablets(IsolationLevel::SNAPSHOT_ISOLATION,
626
0
                      true /* perform_write */,
627
0
                      true /* written_intents_expected */);
628
629
  // In serializable isolation, reads write intents, so metadata should be written and intents
630
  // should be applied on this tablet.
631
0
  TestReadOnlyTablets(IsolationLevel::SERIALIZABLE_ISOLATION,
632
0
                      false /* perform_write */,
633
0
                      true /* written_intents_expected */);
634
0
}
635
636
0
void QLTransactionTest::TestWriteConflicts(const WriteConflictsOptions& options) {
637
0
  struct ActiveTransaction {
638
0
    YBTransactionPtr transaction;
639
0
    YBSessionPtr session;
640
0
    std::future<FlushStatus> flush_future;
641
0
    std::future<Status> commit_future;
642
643
0
    std::string ToString() const {
644
0
      ANNOTATE_IGNORE_READS_BEGIN();
645
0
      auto str = transaction ? transaction->ToString() : "no-txn";
646
0
      ANNOTATE_IGNORE_READS_END();
647
0
      return str;
648
0
    }
649
0
  };
650
651
0
  constexpr auto kTestTime = 60s;
652
0
  std::vector<ActiveTransaction> active_transactions;
653
654
0
  auto stop = std::chrono::steady_clock::now() + kTestTime;
655
656
0
  std::thread restart_thread;
657
658
0
  if (options.do_restarts) {
659
0
    restart_thread = std::thread([this, stop] {
660
0
        CDSAttacher attacher;
661
0
        int it = 0;
662
0
        while (std::chrono::steady_clock::now() < stop) {
663
0
          std::this_thread::sleep_for(5s);
664
0
          ASSERT_OK(cluster_->mini_tablet_server(++it % cluster_->num_tablet_servers())->Restart());
665
0
        }
666
0
    });
667
0
  }
668
669
0
  int value = 0;
670
0
  size_t tries = 0;
671
0
  size_t written = 0;
672
0
  size_t flushed = 0;
673
0
  for (;;) {
674
0
    auto expired = std::chrono::steady_clock::now() >= stop;
675
0
    if (expired) {
676
0
      if (active_transactions.empty()) {
677
0
        break;
678
0
      }
679
0
      LOG(INFO) << "Time expired, remaining transactions: " << active_transactions.size();
680
0
      for (const auto& txn : active_transactions) {
681
0
        LOG(INFO) << "TXN: " << txn.ToString() << ", "
682
0
                  << (!txn.commit_future.valid() ? "Flushing" : "Committing");
683
0
      }
684
0
    }
685
0
    while (!expired && active_transactions.size() < options.active_transactions) {
686
0
      auto key = RandomUniformInt<int>(1, options.total_keys);
687
0
      ActiveTransaction active_txn;
688
0
      if (!options.non_txn_writes || RandomUniformBool()) {
689
0
        active_txn.transaction = CreateTransaction();
690
0
      }
691
0
      active_txn.session = CreateSession(active_txn.transaction);
692
0
      const auto op = table_.NewInsertOp();
693
0
      auto* const req = op->mutable_request();
694
0
      QLAddInt32HashValue(req, key);
695
0
      const auto val = ++value;
696
0
      table_.AddInt32ColumnValue(req, kValueColumn, val);
697
0
      LOG(INFO) << "TXN: " << active_txn.ToString() << " write " << key << " = " << val;
698
0
      active_txn.session->Apply(op);
699
0
      active_txn.flush_future = active_txn.session->FlushFuture();
700
701
0
      ++tries;
702
0
      active_transactions.push_back(std::move(active_txn));
703
0
    }
704
705
0
    auto w = active_transactions.begin();
706
0
    for (auto i = active_transactions.begin(); i != active_transactions.end(); ++i) {
707
0
      if (!i->commit_future.valid()) {
708
0
        if (IsReady(i->flush_future)) {
709
0
          auto flush_status = i->flush_future.get().status;
710
0
          if (!flush_status.ok()) {
711
0
            LOG(INFO) << "TXN: " << i->ToString() << ", flush failed: " << flush_status;
712
0
            continue;
713
0
          }
714
0
          ++flushed;
715
0
          LOG(INFO) << "TXN: " << i->ToString() << ", flushed";
716
0
          if (!i->transaction) {
717
0
            ++written;
718
0
            continue;
719
0
          }
720
0
          i->commit_future = i->transaction->CommitFuture();
721
0
        }
722
0
      } else if (IsReady(i->commit_future)) {
723
0
        auto commit_status = i->commit_future.get();
724
0
        if (!commit_status.ok()) {
725
0
          LOG(INFO) << "TXN: " << i->ToString() << ", commit failed: " << commit_status;
726
0
          continue;
727
0
        }
728
0
        LOG(INFO) << "TXN: " << i->ToString() << ", committed";
729
0
        ++written;
730
0
        continue;
731
0
      }
732
733
0
      if (w != i) {
734
0
        *w = std::move(*i);
735
0
      }
736
0
      ++w;
737
0
    }
738
0
    active_transactions.erase(w, active_transactions.end());
739
740
0
    std::this_thread::sleep_for(expired ? 1s : 100ms);
741
0
  }
742
743
0
  if (options.do_restarts) {
744
0
    restart_thread.join();
745
0
  }
746
747
0
  LOG(INFO) << "Written: " << written << ", flushed: " << flushed << ", tries: " << tries;
748
749
0
  ASSERT_GE(written, options.total_keys);
750
0
  ASSERT_GT(flushed, written);
751
0
  ASSERT_GT(flushed, options.active_transactions);
752
0
  ASSERT_GT(tries, flushed);
753
0
}
754
755
0
TEST_F_EX(QLTransactionTest, WriteConflicts, QLTransactionBigLogSegmentSizeTest) {
756
0
  WriteConflictsOptions options = {
757
0
    .do_restarts = false,
758
0
  };
759
0
  TestWriteConflicts(options);
760
0
}
761
762
0
TEST_F_EX(QLTransactionTest, WriteConflictsWithRestarts, QLTransactionBigLogSegmentSizeTest) {
763
0
  WriteConflictsOptions options = {
764
0
    .do_restarts = true,
765
0
  };
766
0
  TestWriteConflicts(options);
767
0
}
768
769
0
TEST_F_EX(QLTransactionTest, MixedWriteConflicts, QLTransactionBigLogSegmentSizeTest) {
770
0
  WriteConflictsOptions options = {
771
0
    .do_restarts = false,
772
0
    .active_transactions = 3,
773
0
    .total_keys = 1,
774
0
    .non_txn_writes = true,
775
0
  };
776
0
  TestWriteConflicts(options);
777
0
}
778
779
0
TEST_F(QLTransactionTest, ResolveIntentsWriteReadUpdateRead) {
780
0
  DisableApplyingIntents();
781
782
0
  WriteData();
783
0
  VerifyData();
784
785
0
  WriteData(WriteOpType::UPDATE);
786
0
  VerifyData(1, WriteOpType::UPDATE);
787
788
0
  ASSERT_OK(cluster_->RestartSync());
789
0
}
790
791
0
TEST_F(QLTransactionTest, ResolveIntentsWriteReadWithinTransactionAndRollback) {
792
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
793
0
  DisableApplyingIntents();
794
795
  // Write { 1 -> 1, 2 -> 2 }.
796
0
  {
797
0
    auto session = CreateSession();
798
0
    ASSERT_OK(WriteRow(session, 1, 1));
799
0
    ASSERT_OK(WriteRow(session, 2, 2));
800
0
  }
801
802
0
  {
803
    // Start T1.
804
0
    auto txn = CreateTransaction();
805
0
    auto session = CreateSession(txn);
806
807
    // T1: Update { 1 -> 11, 2 -> 12 }.
808
0
    ASSERT_OK(UpdateRow(session, 1, 11));
809
0
    ASSERT_OK(UpdateRow(session, 2, 12));
810
811
    // T1: Should read { 1 -> 11, 2 -> 12 }.
812
0
    VERIFY_ROW(session, 1, 11);
813
0
    VERIFY_ROW(session, 2, 12);
814
815
    // Need to wait transaction to be replicated to all tablet replicas, otherwise direct intents
816
    // cleanup could not happen.
817
0
    ASSERT_OK(WaitFor([this] {
818
0
      return CountRunningTransactions() == 6;
819
0
    }, 10s, "Wait transactions replicated to all tablet replicas"));
820
821
0
    txn->Abort();
822
0
  }
823
824
0
  ASSERT_OK(WaitTransactionsCleaned());
825
826
  // Should read { 1 -> 1, 2 -> 2 }, since T1 has been aborted.
827
0
  {
828
0
    auto session = CreateSession();
829
0
    VERIFY_ROW(session, 1, 1);
830
0
    VERIFY_ROW(session, 2, 2);
831
0
  }
832
833
0
  ASSERT_OK(WaitIntentsCleaned());
834
835
0
  ASSERT_OK(cluster_->RestartSync());
836
0
}
837
838
0
TEST_F(QLTransactionTest, CheckCompactionAbortCleanup) {
839
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
840
0
  FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true;
841
0
  FLAGS_aborted_intent_cleanup_ms = 1000; // 1 sec
842
843
  // Write { 1 -> 1, 2 -> 2 }.
844
0
  {
845
0
    auto session = CreateSession();
846
0
    ASSERT_OK(WriteRow(session, 1, 1));
847
0
    ASSERT_OK(WriteRow(session, 2, 2));
848
0
  }
849
850
0
  {
851
    // Start T1.
852
0
    auto txn = CreateTransaction();
853
0
    auto session = CreateSession(txn);
854
855
    // T1: Update { 1 -> 11, 2 -> 12 }.
856
0
    ASSERT_OK(UpdateRow(session, 1, 11));
857
0
    ASSERT_OK(UpdateRow(session, 2, 12));
858
859
    // T1: Should read { 1 -> 11, 2 -> 12 }.
860
0
    VERIFY_ROW(session, 1, 11);
861
0
    VERIFY_ROW(session, 2, 12);
862
863
0
    txn->Abort();
864
0
  }
865
866
0
  ASSERT_OK(WaitTransactionsCleaned());
867
868
0
  std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms));
869
0
  ASSERT_OK(cluster_->CompactTablets());
870
871
  // Should read { 1 -> 1, 2 -> 2 }, since T1 has been aborted.
872
0
  {
873
0
    auto session = CreateSession();
874
0
    VERIFY_ROW(session, 1, 1);
875
0
    VERIFY_ROW(session, 2, 2);
876
0
  }
877
878
0
  ASSERT_OK(WaitIntentsCleaned());
879
880
0
  ASSERT_OK(cluster_->RestartSync());
881
0
}
882
883
class QLTransactionTestWithDisabledCompactions : public QLTransactionTest {
884
 public:
885
1
  void SetUp() override {
886
1
    FLAGS_rocksdb_disable_compactions = true;
887
1
    QLTransactionTest::SetUp();
888
1
  }
889
};
890
891
0
TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDisabledCompactions) {
892
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
893
0
  FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true;
894
0
  FLAGS_aborted_intent_cleanup_ms = 1000; // 1 sec
895
0
  FLAGS_delete_intents_sst_files = false;
896
897
0
#ifndef NDEBUG
898
0
  constexpr int kTransactions = 10;
899
#else
900
  constexpr int kTransactions = 20;
901
#endif
902
  // Empirically determined constant.
903
0
  constexpr int kBytesPerRow = 75;
904
0
  constexpr int kRequiredCompactedBytes = kTransactions * kNumRows * kBytesPerRow;
905
906
0
  LOG(INFO) << "Write values";
907
908
0
  for (int i = 0; i != kTransactions; ++i) {
909
0
    SCOPED_TRACE(Format("Transaction $0", i));
910
0
    auto txn = CreateTransaction();
911
0
    auto session = CreateSession(txn);
912
0
    for (int row = 0; row != kNumRows; ++row) {
913
0
      ASSERT_OK(WriteRow(session, i * kNumRows + row, row));
914
0
    }
915
0
    ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kAsync));
916
917
    // Need some time for flush to be initiated.
918
0
    std::this_thread::sleep_for(100ms);
919
920
0
    txn->Abort();
921
0
  }
922
923
0
  ASSERT_OK(WaitTransactionsCleaned());
924
925
0
  LOG(INFO) << "Shutdown cluster";
926
0
  cluster_->Shutdown();
927
928
0
  std::this_thread::sleep_for(FLAGS_aborted_intent_cleanup_ms * 1ms);
929
930
0
  FLAGS_TEST_delay_init_tablet_peer_ms = 100;
931
0
  FLAGS_rocksdb_disable_compactions = false;
932
933
0
  LOG(INFO) << "Start cluster";
934
0
  ASSERT_OK(cluster_->StartSync());
935
936
0
  ASSERT_OK(WaitFor([cluster = cluster_.get()] {
937
0
    auto peers = ListTabletPeers(cluster, ListPeersFilter::kAll);
938
0
    int64_t bytes = 0;
939
0
    for (const auto& peer : peers) {
940
0
      if (peer->tablet()) {
941
0
        bytes +=
942
0
            peer->tablet()->intentsdb_statistics()->getTickerCount(rocksdb::COMPACT_READ_BYTES);
943
0
      }
944
0
    }
945
0
    LOG(INFO) << "Compact read bytes: " << bytes;
946
947
0
    return bytes >= kRequiredCompactedBytes;
948
0
  }, 10s, "Enough compactions happen"));
949
0
}
950
951
0
TEST_F(QLTransactionTest, ResolveIntentsWriteReadBeforeAndAfterCommit) {
952
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
953
0
  DisableApplyingIntents();
954
955
  // Write { 1 -> 1, 2 -> 2 }.
956
0
  {
957
0
    auto session = CreateSession();
958
0
    ASSERT_OK(WriteRow(session, 1, 1));
959
0
    ASSERT_OK(WriteRow(session, 2, 2));
960
0
  }
961
962
  // Start T1.
963
0
  auto txn1 = CreateTransaction();
964
0
  auto session1 = CreateSession(txn1);
965
966
  // T1: Update { 1 -> 11, 2 -> 12 }.
967
0
  ASSERT_OK(UpdateRow(session1, 1, 11));
968
0
  ASSERT_OK(UpdateRow(session1, 2, 12));
969
970
  // Start T2.
971
0
  auto txn2 = CreateTransaction();
972
0
  auto session2 = CreateSession(txn2);
973
974
  // T2: Should read { 1 -> 1, 2 -> 2 }.
975
0
  VERIFY_ROW(session2, 1, 1);
976
0
  VERIFY_ROW(session2, 2, 2);
977
978
  // T1: Commit
979
0
  CommitAndResetSync(&txn1);
980
981
  // T2: Should still read { 1 -> 1, 2 -> 2 }, because it should read at the time of its start.
982
0
  VERIFY_ROW(session2, 1, 1);
983
0
  VERIFY_ROW(session2, 2, 2);
984
985
  // Simple read should get { 1 -> 11, 2 -> 12 }, since T1 has been already committed.
986
0
  {
987
0
    auto session = CreateSession();
988
0
    VERIFY_ROW(session, 1, 11);
989
0
    VERIFY_ROW(session, 2, 12);
990
0
  }
991
992
0
  ASSERT_NO_FATALS(CommitAndResetSync(&txn2));
993
994
0
  ASSERT_OK(cluster_->RestartSync());
995
0
}
996
997
0
TEST_F(QLTransactionTest, ResolveIntentsCheckConsistency) {
998
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
999
0
  DisableApplyingIntents();
1000
1001
  // Write { 1 -> 1, 2 -> 2 }.
1002
0
  {
1003
0
    auto session = CreateSession();
1004
0
    ASSERT_OK(WriteRow(session, 1, 1));
1005
0
    ASSERT_OK(WriteRow(session, 2, 2));
1006
0
  }
1007
1008
  // Start T1.
1009
0
  auto txn1 = CreateTransaction();
1010
1011
  // T1: Update { 1 -> 11, 2 -> 12 }.
1012
0
  {
1013
0
    auto session = CreateSession(txn1);
1014
0
    ASSERT_OK(UpdateRow(session, 1, 11));
1015
0
    ASSERT_OK(UpdateRow(session, 2, 12));
1016
0
  }
1017
1018
  // T1: Request commit.
1019
0
  CountDownLatch commit_latch(1);
1020
0
  txn1->Commit([&commit_latch](const Status& status) {
1021
0
    ASSERT_OK(status);
1022
0
    commit_latch.CountDown(1);
1023
0
  });
1024
1025
  // Start T2.
1026
0
  auto txn2 = CreateTransaction(SetReadTime::kTrue);
1027
1028
  // T2: Should read { 1 -> 1, 2 -> 2 } even in case T1 is committed between reading k1 and k2.
1029
0
  {
1030
0
    auto session = CreateSession(txn2);
1031
0
    VERIFY_ROW(session, 1, 1);
1032
0
    commit_latch.Wait();
1033
0
    VERIFY_ROW(session, 2, 2);
1034
0
  }
1035
1036
  // Simple read should get { 1 -> 11, 2 -> 12 }, since T1 has been already committed.
1037
0
  {
1038
0
    auto session = CreateSession();
1039
0
    VERIFY_ROW(session, 1, 11);
1040
0
    VERIFY_ROW(session, 2, 12);
1041
0
  }
1042
1043
0
  CommitAndResetSync(&txn2);
1044
1045
0
  ASSERT_OK(cluster_->RestartSync());
1046
0
}
1047
1048
// This test launches a write thread that writes an increasing value to a key using a transaction.
1049
// Then it launches multiple read threads, each of them tries to read this key and
1050
// verifies that its value is at least the value that was written before the read was started.
1051
//
1052
// It is done for multiple keys sequentially. So those keys are located on different tablets
1053
// and tablet servers, and we test different cases of clock skew.
1054
0
TEST_F_EX(QLTransactionTest, CorrectStatusRequestBatching, QLTransactionBigLogSegmentSizeTest) {
1055
0
  const auto kClockSkew = 100ms;
1056
0
  constexpr auto kMinWrites = RegularBuildVsSanitizers(25, 1);
1057
0
  constexpr auto kMinReads = 10;
1058
0
  constexpr size_t kConcurrentReads = RegularBuildVsSanitizers<size_t>(20, 5);
1059
1060
0
  FLAGS_TEST_transaction_delay_status_reply_usec_in_tests = 200000;
1061
0
  SetAtomicFlag(std::chrono::microseconds(kClockSkew).count() * 3, &FLAGS_max_clock_skew_usec);
1062
1063
0
  auto delta_changers = SkewClocks(cluster_.get(), kClockSkew);
1064
1065
0
  for (int32_t key = 0; key != 10; ++key) {
1066
0
    std::atomic<bool> stop(false);
1067
0
    std::atomic<int32_t> value(0);
1068
1069
0
    std::thread write_thread([this, key, &stop, &value] {
1070
0
      CDSAttacher attacher;
1071
0
      auto session = CreateSession();
1072
0
      while (!stop) {
1073
0
        auto txn = CreateTransaction();
1074
0
        session->SetTransaction(txn);
1075
0
        auto write_result = WriteRow(session, key, value + 1);
1076
0
        if (write_result.ok()) {
1077
0
          auto status = txn->CommitFuture().get();
1078
0
          if (status.ok()) {
1079
0
            ++value;
1080
0
          }
1081
0
        }
1082
0
      }
1083
0
    });
1084
1085
0
    std::vector<std::thread> read_threads;
1086
0
    std::array<std::atomic<size_t>, kConcurrentReads> reads;
1087
0
    for (auto& read : reads) {
1088
0
      read.store(0);
1089
0
    }
1090
1091
0
    for (size_t i = 0; i != kConcurrentReads; ++i) {
1092
0
      read_threads.emplace_back([this, key, &stop, &value, &read = reads[i]] {
1093
0
        CDSAttacher attacher;
1094
0
        auto session = CreateSession();
1095
0
        StopOnFailure stop_on_failure(&stop);
1096
0
        while (!stop) {
1097
0
          auto value_before_start = value.load();
1098
0
          YBqlReadOpPtr op = ReadRow(session, key);
1099
0
          ASSERT_OK(session->Flush());
1100
0
          ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK)
1101
0
                        << op->response().ShortDebugString();
1102
0
          auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
1103
0
          int32_t current_value;
1104
0
          if (rowblock->row_count() == 0) {
1105
0
            current_value = 0;
1106
0
          } else {
1107
0
            current_value = rowblock->row(0).column(0).int32_value();
1108
0
          }
1109
0
          ASSERT_GE(current_value, value_before_start);
1110
0
          ++read;
1111
0
        }
1112
0
        stop_on_failure.Success();
1113
0
      });
1114
0
    }
1115
1116
0
    WaitStopped(10s, &stop);
1117
1118
    // Already failed
1119
0
    bool failed = stop.exchange(true);
1120
0
    write_thread.join();
1121
1122
0
    for (auto& thread : read_threads) {
1123
0
      thread.join();
1124
0
    }
1125
1126
0
    if (failed) {
1127
0
      break;
1128
0
    }
1129
1130
0
    LOG(INFO) << "Writes: " << value.load() << ", reads: " << yb::ToString(reads);
1131
1132
0
    EXPECT_GE(value.load(), kMinWrites);
1133
0
    for (auto& read : reads) {
1134
0
      EXPECT_GE(read.load(), kMinReads);
1135
0
    }
1136
0
  }
1137
1138
0
  cluster_->Shutdown(); // Need to shutdown cluster before resetting clock back.
1139
0
  cluster_.reset();
1140
0
}
1141
1142
struct TransactionState {
1143
  YBTransactionPtr transaction;
1144
  std::shared_future<Result<TransactionMetadata>> metadata_future;
1145
  std::future<Status> commit_future;
1146
  std::future<Result<tserver::GetTransactionStatusResponsePB>> status_future;
1147
  TransactionMetadata metadata;
1148
  HybridTime status_time = HybridTime::kMin;
1149
  TransactionStatus last_status = TransactionStatus::PENDING;
1150
1151
  void CheckStatus() {
1152
    ASSERT_TRUE(status_future.valid());
1153
    ASSERT_EQ(status_future.wait_for(NonTsanVsTsan(3s, 10s)), std::future_status::ready);
1154
    auto resp = status_future.get();
1155
    ASSERT_OK(resp);
1156
1157
    ASSERT_EQ(1, resp->status().size());
1158
    ASSERT_EQ(1, resp->status_hybrid_time().size());
1159
1160
    if (resp->status(0) == TransactionStatus::ABORTED) {
1161
      ASSERT_TRUE(commit_future.valid());
1162
      transaction = nullptr;
1163
      return;
1164
    }
1165
1166
    auto new_time = HybridTime(resp->status_hybrid_time()[0]);
1167
    if (last_status == TransactionStatus::PENDING) {
1168
      if (resp->status(0) == TransactionStatus::PENDING) {
1169
        ASSERT_GE(new_time, status_time);
1170
      } else {
1171
        ASSERT_EQ(TransactionStatus::COMMITTED, resp->status(0));
1172
        ASSERT_GT(new_time, status_time);
1173
      }
1174
    } else {
1175
      ASSERT_EQ(last_status, TransactionStatus::COMMITTED);
1176
      ASSERT_EQ(resp->status(0), TransactionStatus::COMMITTED)
1177
          << "Bad transaction status: " << TransactionStatus_Name(resp->status(0));
1178
      ASSERT_EQ(status_time, new_time);
1179
    }
1180
    status_time = new_time;
1181
    last_status = resp->status(0);
1182
  }
1183
};
1184
1185
// Test transaction status evolution.
1186
// The following should happen:
1187
// If both previous and new transaction state are PENDING, then the new time of status is >= the
1188
// old time of status.
1189
// Previous - PENDING, new - COMMITTED, new_time > old_time.
1190
// Previous - COMMITTED, new - COMMITTED, new_time == old_time.
1191
// All other cases are invalid.
1192
0
TEST_F(QLTransactionTest, StatusEvolution) {
1193
  // We don't care about exact probability of create/commit operations.
1194
  // The create rate just needs to be higher than the commit rate.
1195
0
  const int kTransactionCreateChance = 10;
1196
0
  const int kTransactionCommitChance = 20;
1197
0
  size_t transactions_to_create = 10;
1198
0
  size_t active_transactions = 0;
1199
0
  std::vector<TransactionState> states;
1200
0
  rpc::Rpcs rpcs;
1201
0
  states.reserve(transactions_to_create);
1202
1203
0
  while (transactions_to_create || active_transactions) {
1204
0
    if (transactions_to_create &&
1205
0
        (!active_transactions || RandomWithChance(kTransactionCreateChance))) {
1206
0
      LOG(INFO) << "Create transaction";
1207
0
      auto txn = CreateTransaction();
1208
0
      {
1209
0
        auto session = CreateSession(txn);
1210
        // Insert using different keys to avoid conflicts.
1211
0
        int idx = narrow_cast<int>(states.size());
1212
0
        ASSERT_OK(WriteRow(session, idx, idx));
1213
0
      }
1214
0
      states.push_back({ txn, txn->GetMetadata() });
1215
0
      ++active_transactions;
1216
0
      --transactions_to_create;
1217
0
    }
1218
0
    if (active_transactions && RandomWithChance(kTransactionCommitChance)) {
1219
0
      LOG(INFO) << "Destroy transaction";
1220
0
      size_t idx = RandomUniformInt<size_t>(1, active_transactions);
1221
0
      for (auto& state : states) {
1222
0
        if (!state.transaction) {
1223
0
          continue;
1224
0
        }
1225
0
        if (!--idx) {
1226
0
          state.commit_future = state.transaction->CommitFuture();
1227
0
          break;
1228
0
        }
1229
0
      }
1230
0
    }
1231
1232
0
    for (auto& state : states) {
1233
0
      if (!state.transaction) {
1234
0
        continue;
1235
0
      }
1236
0
      if (state.metadata.isolation == IsolationLevel::NON_TRANSACTIONAL) {
1237
0
        if (!IsReady(state.metadata_future)) {
1238
0
          continue;
1239
0
        }
1240
0
        state.metadata = ASSERT_RESULT(Copy(state.metadata_future.get()));
1241
0
      }
1242
0
      tserver::GetTransactionStatusRequestPB req;
1243
0
      req.set_tablet_id(state.metadata.status_tablet);
1244
0
      req.add_transaction_id()->assign(
1245
0
          pointer_cast<const char*>(state.metadata.transaction_id.data()),
1246
0
          state.metadata.transaction_id.size());
1247
0
      state.status_future = rpc::WrapRpcFuture<tserver::GetTransactionStatusResponsePB>(
1248
0
          GetTransactionStatus, &rpcs)(
1249
0
              TransactionRpcDeadline(), nullptr /* tablet */, client_.get(), &req);
1250
0
    }
1251
0
    for (auto& state : states) {
1252
0
      if (!state.transaction) {
1253
0
        continue;
1254
0
      }
1255
0
      state.CheckStatus();
1256
0
      if (!state.transaction) {
1257
0
        --active_transactions;
1258
0
      }
1259
0
    }
1260
0
  }
1261
1262
0
  for (auto& state : states) {
1263
0
    ASSERT_EQ(state.commit_future.wait_for(NonTsanVsTsan(3s, 15s)), std::future_status::ready);
1264
0
  }
1265
0
}
1266
1267
// Writing multiple keys concurrently, each key is increasing by 1 at each step.
1268
// At the same time concurrently execute several transactions that read all those keys.
1269
// Suppose two transactions have read values t1_i and t2_i respectively.
1270
// And t1_j > t2_j for some j, then we expect that t1_i >= t2_i for all i.
1271
//
1272
// Suppose we have 2 transactions, both reading k1 (from tablet1) and k2 (from tablet2).
1273
// ht1 - read time of first transaction, and ht2 - read time of second transaction.
1274
// Suppose ht1 <= ht2 for simplicity.
1275
// Old value of k1 is v1before, and after ht_k1 it has v1after.
1276
// Old value of k2 is v2before, and after ht_k2 it has v2after.
1277
// ht_k1 <= ht1, ht_k2 <= ht1.
1278
//
1279
// Suppose following sequence of read requests:
1280
// 1) The read request for the first transaction arrives at tablet1 when it has safe read
1281
//    time < ht1. But it is already replicating k1 (with ht_k1). Then it would read v1before for k1.
1282
// 2) The read request for the second transaction arrives at tablet2 when it has safe read
1283
//    time < ht2. But it is already replicating k2 (with ht_k2). So it reads v2before for k2.
1284
// 3) The remaining read requests arrive after the appropriate operations have replicated.
1285
//    So we get v2after in the first transaction and v1after for the second.
1286
// The read result for the first transaction is (v1before, v2after), for the second it is
1287
// (v1after, v2before).
1288
//
1289
// Such a read is inconsistent.
1290
//
1291
// This test addresses this issue.
1292
0
TEST_F_EX(QLTransactionTest, WaitRead, QLTransactionBigLogSegmentSizeTest) {
1293
0
  constexpr int kWriteThreads = 10;
1294
0
  constexpr size_t kCycles = 100;
1295
0
  constexpr size_t kConcurrentReads = 4;
1296
1297
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
1298
1299
0
  std::atomic<bool> stop(false);
1300
0
  std::vector<std::thread> threads;
1301
1302
0
  for (int i = 0; i != kWriteThreads; ++i) {
1303
0
    threads.emplace_back([this, i, &stop] {
1304
0
      CDSAttacher attacher;
1305
0
      auto session = CreateSession();
1306
0
      int32_t value = 0;
1307
0
      while (!stop) {
1308
0
        ASSERT_OK(WriteRow(session, i, ++value));
1309
0
      }
1310
0
    });
1311
0
  }
1312
1313
0
  CountDownLatch latch(kConcurrentReads);
1314
1315
0
  std::vector<std::vector<YBqlReadOpPtr>> reads(kConcurrentReads);
1316
0
  std::vector<std::shared_future<Status>> futures(kConcurrentReads);
1317
  // values[i] contains values read by i-th transaction.
1318
0
  std::vector<std::vector<int32_t>> values(kConcurrentReads);
1319
1320
0
  for (size_t i = 0; i != kCycles; ++i) {
1321
0
    latch.Reset(kConcurrentReads);
1322
0
    for (size_t j = 0; j != kConcurrentReads; ++j) {
1323
0
      values[j].clear();
1324
0
      auto session = CreateSession(CreateTransaction());
1325
0
      for (int key = 0; key != kWriteThreads; ++key) {
1326
0
        reads[j].push_back(ReadRow(session, key));
1327
0
      }
1328
0
      session->FlushAsync([&latch](FlushStatus* flush_status) {
1329
0
        ASSERT_OK(flush_status->status);
1330
0
        latch.CountDown();
1331
0
      });
1332
0
    }
1333
0
    latch.Wait();
1334
0
    for (size_t j = 0; j != kConcurrentReads; ++j) {
1335
0
      values[j].clear();
1336
0
      for (auto& op : reads[j]) {
1337
0
        ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK)
1338
0
            << op->response().ShortDebugString();
1339
0
        auto rowblock = yb::ql::RowsResult(op.get()).GetRowBlock();
1340
0
        if (rowblock->row_count() == 1) {
1341
0
          values[j].push_back(rowblock->row(0).column(0).int32_value());
1342
0
        } else {
1343
0
          values[j].push_back(0);
1344
0
        }
1345
0
      }
1346
0
    }
1347
0
    std::sort(values.begin(), values.end());
1348
0
    for (size_t j = 1; j != kConcurrentReads; ++j) {
1349
0
      for (size_t k = 0; k != values[j].size(); ++k) {
1350
0
        ASSERT_GE(values[j][k], values[j - 1][k]);
1351
0
      }
1352
0
    }
1353
0
  }
1354
1355
0
  stop = true;
1356
0
  for (auto& thread : threads) {
1357
0
    thread.join();
1358
0
  }
1359
0
}
1360
1361
0
TEST_F(QLTransactionTest, InsertDelete) {
1362
0
  DisableApplyingIntents();
1363
1364
0
  auto txn = CreateTransaction();
1365
0
  auto session = CreateSession(txn);
1366
0
  ASSERT_OK(WriteRow(session, 1 /* key */, 10 /* value */, WriteOpType::INSERT));
1367
0
  ASSERT_OK(DeleteRow(session, 1 /* key */));
1368
0
  ASSERT_OK(txn->CommitFuture().get());
1369
1370
0
  session = CreateSession();
1371
0
  auto row = SelectRow(session, 1 /* key */);
1372
0
  ASSERT_FALSE(row.ok()) << "Row: " << row;
1373
0
}
1374
1375
0
TEST_F(QLTransactionTest, InsertDeleteWithClusterRestart) {
1376
0
  DisableApplyingIntents();
1377
0
  DisableTransactionTimeout();
1378
0
  constexpr int kKeys = 100;
1379
1380
0
  for (int i = 0; i != kKeys; ++i) {
1381
0
    ASSERT_OK(WriteRow(CreateSession(), i /* key */, i * 2 /* value */, WriteOpType::INSERT));
1382
0
  }
1383
1384
0
  auto txn = CreateTransaction();
1385
0
  auto session = CreateSession(txn);
1386
0
  for (int i = 0; i != kKeys; ++i) {
1387
0
    SCOPED_TRACE(Format("Key: $0", i));
1388
0
    ASSERT_OK(WriteRow(session, i /* key */, i * 3 /* value */, WriteOpType::UPDATE));
1389
0
  }
1390
1391
0
  std::this_thread::sleep_for(1s); // Wait some time for intents to populate.
1392
0
  ASSERT_OK(cluster_->RestartSync());
1393
1394
0
  for (int i = 0; i != kKeys; ++i) {
1395
0
    SCOPED_TRACE(Format("Key: $0", i));
1396
0
    ASSERT_OK(DeleteRow(session, i /* key */));
1397
0
  }
1398
0
  ASSERT_OK(txn->CommitFuture().get());
1399
1400
0
  session = CreateSession();
1401
0
  for (int i = 0; i != kKeys; ++i) {
1402
0
    SCOPED_TRACE(Format("Key: $0", i));
1403
0
    auto row = SelectRow(session, 1 /* key */);
1404
0
    ASSERT_FALSE(row.ok()) << "Row: " << row;
1405
0
  }
1406
0
}
1407
1408
0
TEST_F_EX(QLTransactionTest, ChangeLeader, QLTransactionBigLogSegmentSizeTest) {
1409
0
  constexpr size_t kThreads = 2;
1410
0
  constexpr auto kTestTime = 5s;
1411
1412
0
  DisableTransactionTimeout();
1413
0
  FLAGS_transaction_rpc_timeout_ms = MonoDelta(1min).ToMilliseconds();
1414
1415
0
  std::vector<std::thread> threads;
1416
0
  std::atomic<bool> stopped{false};
1417
0
  std::atomic<int> successes{0};
1418
0
  std::atomic<int> expirations{0};
1419
0
  for (size_t i = 0; i != kThreads; ++i) {
1420
0
    threads.emplace_back([this, i, &stopped, &successes, &expirations] {
1421
0
      CDSAttacher attacher;
1422
0
      size_t idx = i;
1423
0
      while (!stopped) {
1424
0
        auto txn = CreateTransaction();
1425
0
        ASSERT_OK(WriteRows(CreateSession(txn), idx, WriteOpType::INSERT));
1426
0
        auto status = txn->CommitFuture().get();
1427
0
        if (status.ok()) {
1428
0
          ++successes;
1429
0
        } else {
1430
          // We allow expiration on commit, because it means that the commit succeeded after leader
1431
          // change. And we just did not receive response. But rate of such cases should be small.
1432
0
          ASSERT_TRUE(status.IsExpired()) << status;
1433
0
          ++expirations;
1434
0
        }
1435
0
        idx += kThreads;
1436
0
      }
1437
0
    });
1438
0
  }
1439
1440
0
  auto test_finish = std::chrono::steady_clock::now() + kTestTime;
1441
0
  while (std::chrono::steady_clock::now() < test_finish) {
1442
0
    for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
1443
0
      auto peers = cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletPeers();
1444
0
      for (const auto& peer : peers) {
1445
0
        if (peer->consensus() &&
1446
0
            peer->consensus()->GetLeaderStatus() !=
1447
0
                consensus::LeaderStatus::NOT_LEADER &&
1448
0
            peer->tablet()->transaction_coordinator() &&
1449
0
            peer->tablet()->transaction_coordinator()->test_count_transactions()) {
1450
0
          consensus::LeaderStepDownRequestPB req;
1451
0
          req.set_tablet_id(peer->tablet_id());
1452
0
          consensus::LeaderStepDownResponsePB resp;
1453
0
          ASSERT_OK(peer->consensus()->StepDown(&req, &resp));
1454
0
        }
1455
0
      }
1456
0
    }
1457
0
    std::this_thread::sleep_for(3s);
1458
0
  }
1459
0
  stopped = true;
1460
1461
0
  for (auto& thread : threads) {
1462
0
    thread.join();
1463
0
  }
1464
1465
  // Allow expirations to be 5% of successful commits.
1466
0
  ASSERT_LE(expirations.load() * 100, successes * 5);
1467
0
}
1468
1469
class RemoteBootstrapTest : public QLTransactionTest {
1470
 protected:
1471
1
  void SetUp() override {
1472
1
    FLAGS_remote_bootstrap_max_chunk_size = 1_KB;
1473
1
    FLAGS_log_min_seconds_to_retain = 1;
1474
1
    QLTransactionTest::SetUp();
1475
1
  }
1476
};
1477
1478
// Check that we do correct remote bootstrap for intents db.
1479
// Workflow is the following:
1480
// Shutdown TServer with index 0.
1481
// Write some data to two remaining servers.
1482
// Flush data and clean logs.
1483
// Restart cluster.
1484
// Verify that all tablets at all tservers are up and running.
1485
// Verify that all tservers have same amount of running tablets.
1486
// During test tear down cluster verifier will check that all servers have same data.
1487
0
TEST_F_EX(QLTransactionTest, RemoteBootstrap, RemoteBootstrapTest) {
1488
0
  constexpr size_t kNumWrites = 10;
1489
0
  constexpr size_t kTransactionalWrites = 8;
1490
0
  constexpr size_t kNumRows = 30;
1491
1492
0
  DisableTransactionTimeout();
1493
0
  DisableApplyingIntents();
1494
1495
0
  cluster_->mini_tablet_server(0)->Shutdown();
1496
1497
0
  for (size_t i = 0; i != kNumWrites; ++i) {
1498
0
    auto transaction = i < kTransactionalWrites ? CreateTransaction() : nullptr;
1499
0
    auto session = CreateSession(transaction);
1500
0
    for (size_t r = 0; r != kNumRows; ++r) {
1501
0
      ASSERT_OK(WriteRow(
1502
0
          session,
1503
0
          KeyForTransactionAndIndex(i, r),
1504
0
          ValueForTransactionAndIndex(i, r, WriteOpType::INSERT)));
1505
0
    }
1506
0
    if (transaction) {
1507
0
      ASSERT_OK(transaction->CommitFuture().get());
1508
0
    }
1509
0
  }
1510
1511
0
  VerifyData(kNumWrites);
1512
1513
  // Wait until all tablets are done writing to db.
1514
0
  std::this_thread::sleep_for(5s);
1515
1516
0
  LOG(INFO) << "Flushing";
1517
0
  ASSERT_OK(cluster_->FlushTablets());
1518
1519
0
  LOG(INFO) << "Clean logs";
1520
0
  ASSERT_OK(cluster_->CleanTabletLogs());
1521
1522
  // Wait for logs cleanup.
1523
0
  std::this_thread::sleep_for(5s * kTimeMultiplier);
1524
1525
  // Shutdown to reset cached logs.
1526
0
  for (size_t i = 1; i != cluster_->num_tablet_servers(); ++i) {
1527
0
    cluster_->mini_tablet_server(i)->Shutdown();
1528
0
  }
1529
1530
  // Start all servers. Cluster verifier should check that all tablets are synchronized.
1531
0
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
1532
0
    ASSERT_OK(cluster_->mini_tablet_server(i)->Start());
1533
0
  }
1534
1535
0
  ASSERT_OK(WaitFor([this] { return CheckAllTabletsRunning(); }, 20s * kTimeMultiplier,
1536
0
                    "All tablets running"));
1537
0
}
1538
1539
0
TEST_F(QLTransactionTest, FlushIntents) {
1540
0
  FLAGS_flush_rocksdb_on_shutdown = false;
1541
1542
0
  WriteData();
1543
0
  ASSERT_OK(WriteRows(CreateSession(), 1));
1544
1545
0
  VerifyData(2);
1546
1547
0
  ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, tablet::FlushFlags::kIntents));
1548
0
  cluster_->Shutdown();
1549
0
  ASSERT_OK(cluster_->StartSync());
1550
1551
0
  VerifyData(2);
1552
0
}
1553
1554
// This test checks that read restart never happens during the first read request to a single table.
1555
0
TEST_F_EX(QLTransactionTest, PickReadTimeAtServer, QLTransactionBigLogSegmentSizeTest) {
1556
0
  constexpr int kKeys = 10;
1557
0
  constexpr int kThreads = 5;
1558
1559
0
  std::atomic<bool> stop(false);
1560
0
  std::vector<std::thread> threads;
1561
0
  while (threads.size() != kThreads) {
1562
0
    threads.emplace_back([this, &stop] {
1563
0
      CDSAttacher attacher;
1564
0
      StopOnFailure stop_on_failure(&stop);
1565
0
      while (!stop.load(std::memory_order_acquire)) {
1566
0
        auto txn = CreateTransaction();
1567
0
        auto session = CreateSession(txn);
1568
0
        auto key = RandomUniformInt(1, kKeys);
1569
0
        auto value_result = SelectRow(session, key);
1570
0
        int value;
1571
0
        if (value_result.ok()) {
1572
0
          value = *value_result;
1573
0
        } else {
1574
0
          ASSERT_TRUE(value_result.status().IsNotFound()) << value_result.status();
1575
0
          value = 0;
1576
0
        }
1577
0
        auto status = ResultToStatus(WriteRow(session, key, value));
1578
0
        if (status.ok()) {
1579
0
          status = txn->CommitFuture().get();
1580
0
        }
1581
        // Write or commit could fail because of conflict during write or transaction conflict
1582
        // during commit.
1583
0
        ASSERT_TRUE(status.ok() || status.IsTryAgain() || status.IsExpired()) << status;
1584
0
      }
1585
0
      stop_on_failure.Success();
1586
0
    });
1587
0
  }
1588
1589
0
  WaitStopped(30s, &stop);
1590
1591
0
  stop.store(true, std::memory_order_release);
1592
1593
0
  for (auto& thread : threads) {
1594
0
    thread.join();
1595
0
  }
1596
0
}
1597
1598
// Test that we can init a transaction after it was originally created.
1599
0
TEST_F(QLTransactionTest, DelayedInit) {
1600
0
  SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
1601
1602
0
  auto txn1 = std::make_shared<YBTransaction>(transaction_manager_.get_ptr());
1603
0
  auto txn2 = std::make_shared<YBTransaction>(transaction_manager_.get_ptr());
1604
1605
0
  auto write_session = CreateSession();
1606
0
  ASSERT_OK(WriteRow(write_session, 0, 0));
1607
1608
0
  ConsistentReadPoint read_point(transaction_manager_->clock());
1609
0
  read_point.SetCurrentReadTime();
1610
1611
0
  ASSERT_OK(WriteRow(write_session, 1, 1));
1612
1613
0
  ASSERT_OK(txn1->Init(IsolationLevel::SNAPSHOT_ISOLATION, read_point.GetReadTime()));
1614
  // To check delayed init we specify read time here.
1615
0
  ASSERT_OK(txn2->Init(
1616
0
      IsolationLevel::SNAPSHOT_ISOLATION,
1617
0
      ReadHybridTime::FromHybridTimeRange(transaction_manager_->clock()->NowRange())));
1618
1619
0
  ASSERT_OK(WriteRow(write_session, 2, 2));
1620
1621
0
  {
1622
0
    auto read_session = CreateSession(txn1);
1623
0
    auto row0 = ASSERT_RESULT(SelectRow(read_session, 0));
1624
0
    ASSERT_EQ(0, row0);
1625
0
    auto row1 = SelectRow(read_session, 1);
1626
0
    ASSERT_TRUE(!row1.ok() && row1.status().IsNotFound()) << row1;
1627
0
    auto row2 = SelectRow(read_session, 2);
1628
0
    ASSERT_TRUE(!row2.ok() && row2.status().IsNotFound()) << row2;
1629
0
  }
1630
1631
0
  {
1632
0
    auto read_session = CreateSession(txn2);
1633
0
    auto row0 = ASSERT_RESULT(SelectRow(read_session, 0));
1634
0
    ASSERT_EQ(0, row0);
1635
0
    auto row1 = ASSERT_RESULT(SelectRow(read_session, 1));
1636
0
    ASSERT_EQ(1, row1);
1637
0
    auto row2 = SelectRow(read_session, 2);
1638
0
    ASSERT_TRUE(!row2.ok() && row2.status().IsNotFound()) << row2;
1639
0
  }
1640
0
}
1641
1642
class QLTransactionTestSingleTablet :
1643
    public TransactionCustomLogSegmentSizeTest<4_KB, QLTransactionTest> {
1644
 public:
1645
0
  int NumTablets() override {
1646
0
    return 1;
1647
0
  }
1648
};
1649
1650
0
TEST_F_EX(QLTransactionTest, DeleteFlushedIntents, QLTransactionTestSingleTablet) {
1651
0
  constexpr int kNumWrites = 10;
1652
1653
0
  auto session = CreateSession();
1654
0
  for (size_t idx = 0; idx != kNumWrites; ++idx) {
1655
0
    auto txn = CreateTransaction();
1656
0
    session->SetTransaction(txn);
1657
0
    ASSERT_OK(WriteRows(session, idx, WriteOpType::INSERT));
1658
0
    ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, tablet::FlushFlags::kIntents));
1659
0
    ASSERT_OK(txn->CommitFuture().get());
1660
0
  }
1661
1662
0
  ASSERT_OK(WaitFor([this] {
1663
0
    if (CountIntents(cluster_.get()) != 0) {
1664
0
      return false;
1665
0
    }
1666
1667
0
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
1668
0
    size_t total_sst_files = 0;
1669
0
    for (auto& peer : peers) {
1670
0
      auto intents_db = peer->tablet()->TEST_intents_db();
1671
0
      if (!intents_db) {
1672
0
        continue;
1673
0
      }
1674
0
      std::vector<rocksdb::LiveFileMetaData> files;
1675
0
      intents_db->GetLiveFilesMetaData(&files);
1676
0
      LOG(INFO) << "T " << peer->tablet_id() << " P " << peer->permanent_uuid() << ": files: "
1677
0
                << AsString(files);
1678
0
      total_sst_files += files.size();
1679
0
    }
1680
1681
0
    return total_sst_files == 0;
1682
0
  }, 15s, "Intents and files are removed"));
1683
0
}
1684
1685
// Test performs transactional writes to get flushed intents.
1686
// Then performs non transactional writes and checks that log size stabilizes, meaning
1687
// log gc is working.
1688
0
TEST_F_EX(QLTransactionTest, GCLogsAfterTransactionalWritesStop, QLTransactionTestSingleTablet) {
1689
  // An amount of time during which we require log size to be stable.
1690
0
  const MonoDelta kStableTimePeriod = 10s;
1691
0
  const MonoDelta kTimeout = 30s + kStableTimePeriod;
1692
1693
0
  LOG(INFO) << "Perform transactional writes, to get non empty intents db";
1694
0
  TestThreadHolder thread_holder;
1695
0
  std::atomic<bool> use_transaction(true);
1696
  // This thread first does transactional writes and then switches to doing non-transactional
1697
  // writes.
1698
0
  thread_holder.AddThreadFunctor([this, &use_transaction, &stop = thread_holder.stop_flag()] {
1699
0
    SetFlagOnExit set_flag_on_exit(&stop);
1700
0
    auto session = CreateSession();
1701
0
    int txn_idx = 0;
1702
0
    while (!stop.load(std::memory_order_acquire)) {
1703
0
      YBTransactionPtr write_txn = use_transaction.load(std::memory_order_acquire)
1704
0
          ? CreateTransaction() : nullptr;
1705
0
      session->SetTransaction(write_txn);
1706
0
      ASSERT_OK(WriteRows(session, txn_idx++));
1707
0
      if (write_txn) {
1708
0
        ASSERT_OK(write_txn->CommitFuture().get());
1709
0
      }
1710
0
    }
1711
0
  });
1712
  // Waiting for some intent SSTables to be flushed.
1713
0
  bool has_flushed_intents_db = false;
1714
0
  while (!has_flushed_intents_db && !thread_holder.stop_flag().load(std::memory_order_acquire)) {
1715
0
    ASSERT_OK(cluster_->FlushTablets());
1716
0
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
1717
0
    for (const auto& peer : peers) {
1718
0
      auto* tablet = peer->tablet();
1719
0
      if (!tablet) {
1720
0
        continue;
1721
0
      }
1722
0
      auto persistent_op_id = ASSERT_RESULT(tablet->MaxPersistentOpId());
1723
0
      if (persistent_op_id.intents.index > 10) {
1724
0
        has_flushed_intents_db = true;
1725
0
        break;
1726
0
      }
1727
0
    }
1728
0
    std::this_thread::sleep_for(10ms);
1729
0
  }
1730
1731
  // We are expecting the log size to stay bounded, which means the maximum log size we've ever
1732
  // seen for any tablet should stabilize. That would indicate that the bug with unbounded log
1733
  // growth (https://github.com/YugaByte/yugabyte-db/issues/2221) is not happening.
1734
0
  LOG(INFO) << "Perform non transactional writes";
1735
1736
0
  use_transaction.store(false, std::memory_order_release);
1737
0
  uint64_t max_log_size = 0;
1738
0
  auto last_log_size_increment = CoarseMonoClock::now();
1739
0
  auto deadline = last_log_size_increment + kTimeout;
1740
0
  while (!thread_holder.stop_flag().load(std::memory_order_acquire)) {
1741
0
    auto now = CoarseMonoClock::now();
1742
0
    ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync, tablet::FlushFlags::kRegular));
1743
0
    ASSERT_OK(cluster_->CleanTabletLogs());
1744
0
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
1745
0
    for (const auto& peer : peers) {
1746
0
      auto* log = peer->log();
1747
0
      if (!log) {
1748
0
        continue;
1749
0
      }
1750
0
      uint64_t current_log_size = log->OnDiskSize();
1751
0
      if (current_log_size > max_log_size) {
1752
0
        LOG(INFO) << Format("T $1 P $0: Log size increased: $2", peer->permanent_uuid(),
1753
0
                            peer->tablet_id(), current_log_size);
1754
0
        last_log_size_increment = now;
1755
0
        max_log_size = current_log_size;
1756
0
      }
1757
0
    }
1758
0
    if (now - last_log_size_increment > kStableTimePeriod) {
1759
0
      break;
1760
0
    } else {
1761
0
      ASSERT_LE(last_log_size_increment + kStableTimePeriod, deadline)
1762
0
          << "Log size would not stabilize in " << kTimeout;
1763
0
    }
1764
0
    std::this_thread::sleep_for(100ms);
1765
0
  }
1766
1767
0
  thread_holder.Stop();
1768
0
}
1769
1770
0
// Writes data after calling DisableApplyingIntents(), deletes the table, then sets the
// ignore-applying probability to 0.0 and waits until HasTransactions() reports false.
TEST_F(QLTransactionTest, DeleteTableDuringWrite) {
  DisableApplyingIntents();
  ASSERT_NO_FATALS(WriteData());

  ASSERT_OK(client_->DeleteTable(table_.table()->id()));
  SetIgnoreApplyingProbability(0.0);

  // Timeout is scaled by kTimeMultiplier.
  const auto cleanup_timeout = 10s * kTimeMultiplier;
  const auto no_transactions_left = [this] {
    return !HasTransactions();
  };
  ASSERT_OK(WaitFor(
      no_transactions_left, cleanup_timeout, "Cleanup transactions from coordinator"));
}
1779
1780
} // namespace client
1781
} // namespace yb