YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/seal-txn-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/error.h"
15
#include "yb/client/session.h"
16
#include "yb/client/transaction.h"
17
#include "yb/client/txn-test-base.h"
18
19
#include "yb/tablet/tablet.h"
20
#include "yb/tablet/tablet_peer.h"
21
#include "yb/tablet/transaction_participant.h"
22
23
#include "yb/util/bitmap.h"
24
#include "yb/util/tsan_util.h"
25
26
using namespace std::literals;
27
28
DECLARE_bool(enable_load_balancing);
29
DECLARE_bool(enable_transaction_sealing);
30
DECLARE_bool(TEST_fail_on_replicated_batch_idx_set_in_txn_record);
31
DECLARE_double(transaction_max_missed_heartbeat_periods);
32
DECLARE_int32(TEST_write_rejection_percentage);
33
DECLARE_int64(transaction_rpc_timeout_ms);
34
35
namespace yb {
36
namespace client {
37
38
class SealTxnTest : public TransactionTestBase<MiniCluster> {
39
 protected:
40
6
  void SetUp() override {
41
6
    FLAGS_enable_transaction_sealing = true;
42
43
6
    SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
44
6
    TransactionTestBase::SetUp();
45
6
  }
46
47
  void TestNumBatches(bool restart);
48
};
49
50
// Writes some data as part of transaction and check that batches are correcly tracked by
51
// transaction participant.
52
0
void SealTxnTest::TestNumBatches(bool restart) {
53
0
  auto txn = CreateTransaction();
54
0
  auto session = CreateSession(txn);
55
56
0
  size_t prev_num_non_empty = 0;
57
0
  for (auto op_type : {WriteOpType::INSERT, WriteOpType::UPDATE}) {
58
0
    ASSERT_OK(WriteRows(session, /* transaction= */ 0, op_type, Flush::kFalse));
59
0
    ASSERT_OK(session->Flush());
60
61
0
    size_t num_non_empty = 0;
62
0
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
63
0
    for (const auto& peer : peers) {
64
0
      auto txn_participant = peer->tablet()->transaction_participant();
65
0
      if (!txn_participant) {
66
0
        continue;
67
0
      }
68
0
      auto replicated_batch_idx_set = txn_participant->TEST_TransactionReplicatedBatches(txn->id());
69
0
      LOG(INFO) << peer->tablet_id() << ": " << replicated_batch_idx_set.ToString();
70
0
      if (replicated_batch_idx_set.CountSet() != 0) {
71
0
        ++num_non_empty;
72
0
        ASSERT_EQ(replicated_batch_idx_set.ToString(),
73
0
                  op_type == WriteOpType::INSERT ? "[0]" : "[0, 1]");
74
0
      }
75
0
    }
76
77
0
    if (op_type == WriteOpType::INSERT) {
78
0
      ASSERT_GT(num_non_empty, 0);
79
0
      if (restart) {
80
0
        ASSERT_OK(cluster_->RestartSync());
81
0
      }
82
0
    } else {
83
0
      ASSERT_EQ(num_non_empty, prev_num_non_empty);
84
0
    }
85
0
    prev_num_non_empty = num_non_empty;
86
0
  }
87
0
}
88
89
0
TEST_F(SealTxnTest, NumBatches) {
90
0
  TestNumBatches(/* restart= */ false);
91
0
}
92
93
0
TEST_F(SealTxnTest, NumBatchesWithRestart) {
94
  // Restarting whole cluster could result in expired transaction, that is not expected by the test.
95
  // Increase transaction timeout to avoid such kind of failures.
96
0
  FLAGS_transaction_max_missed_heartbeat_periods = 50;
97
0
  TestNumBatches(/* restart= */ true);
98
0
}
99
100
0
TEST_F(SealTxnTest, NumBatchesWithRejection) {
101
0
  FLAGS_TEST_write_rejection_percentage = 75;
102
0
  TestNumBatches(/* restart= */ false);
103
0
}
104
105
// Check that we could disable writing information about the number of batches,
106
// since it is required for backward compatibility.
107
0
TEST_F(SealTxnTest, NumBatchesDisable) {
108
0
  DisableTransactionTimeout();
109
  // Should be enough for the restarted servers to be back online
110
0
  FLAGS_transaction_rpc_timeout_ms = 20000 * kTimeMultiplier;
111
0
  FLAGS_enable_transaction_sealing = false;
112
0
  FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record = true;
113
114
0
  auto txn = CreateTransaction();
115
0
  auto session = CreateSession(txn);
116
0
  ASSERT_OK(WriteRows(session));
117
0
  ASSERT_OK(cluster_->RestartSync());
118
0
  ASSERT_OK(txn->CommitFuture().get());
119
0
}
120
121
0
TEST_F(SealTxnTest, Simple) {
122
0
  auto txn = CreateTransaction();
123
0
  auto session = CreateSession(txn);
124
0
  ASSERT_OK(WriteRows(session, /* transaction = */ 0, WriteOpType::INSERT, Flush::kFalse));
125
0
  auto flush_future = session->FlushFuture();
126
0
  auto commit_future = txn->CommitFuture(CoarseTimePoint(), SealOnly::kTrue);
127
0
  ASSERT_OK(flush_future.get().status);
128
0
  LOG(INFO) << "Flushed: " << txn->id();
129
0
  ASSERT_OK(commit_future.get());
130
0
  LOG(INFO) << "Committed: " << txn->id();
131
0
  ASSERT_NO_FATALS(VerifyData());
132
0
  ASSERT_OK(cluster_->RestartSync());
133
0
  AssertNoRunningTransactions();
134
0
}
135
136
0
TEST_F(SealTxnTest, Update) {
137
0
  auto txn = CreateTransaction();
138
0
  auto session = CreateSession(txn);
139
0
  LOG(INFO) << "Inserting rows";
140
0
  ASSERT_OK(WriteRows(session, /* transaction = */ 0));
141
0
  LOG(INFO) << "Updating rows";
142
0
  ASSERT_OK(WriteRows(session, /* transaction = */ 0, WriteOpType::UPDATE, Flush::kFalse));
143
0
  auto flush_future = session->FlushFuture();
144
0
  auto commit_future = txn->CommitFuture(CoarseTimePoint(), SealOnly::kTrue);
145
0
  ASSERT_OK(flush_future.get().status);
146
0
  LOG(INFO) << "Flushed: " << txn->id();
147
0
  ASSERT_OK(commit_future.get());
148
0
  LOG(INFO) << "Committed: " << txn->id();
149
0
  ASSERT_NO_FATALS(VerifyData(1, WriteOpType::UPDATE));
150
0
  ASSERT_OK(cluster_->RestartSync());
151
0
  AssertNoRunningTransactions();
152
0
}
153
154
} // namespace client
155
} // namespace yb