YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/transaction-ent-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <boost/container/stable_vector.hpp>
15
16
#include "yb/client/ql-dml-test-base.h"
17
#include "yb/client/session.h"
18
#include "yb/client/transaction.h"
19
#include "yb/client/transaction_manager.h"
20
21
#include "yb/gutil/casts.h"
22
23
#include "yb/server/hybrid_clock.h"
24
#include "yb/server/random_error_clock.h"
25
26
#include "yb/util/random_util.h"
27
#include "yb/util/thread.h"
28
29
#include "yb/yql/cql/ql/util/errcodes.h"
30
31
using namespace std::literals;
32
33
DECLARE_string(time_source);
34
DECLARE_int32(intents_flush_max_delay_ms);
35
DECLARE_bool(fail_on_out_of_range_clock_skew);
36
37
namespace yb {
38
39
// Test fixture that runs key/value table tests on a MiniCluster with
// server::RandomErrorClock selected as the time source.
class TransactionEntTest : public client::KeyValueTableTest<MiniCluster> {
 protected:
  virtual ~TransactionEntTest() = default;

  // Configures the clock related flags, starts the cluster through the base fixture and creates
  // a transactional table.
  void SetUp() override {
    FLAGS_fail_on_out_of_range_clock_skew = false;

    // The clock has to be registered and selected before the base fixture is set up.
    server::RandomErrorClock::Register();
    FLAGS_time_source = server::RandomErrorClock::kNtpName;
    KeyValueTableTest::SetUp();

    CreateTable(client::Transactional::kTrue);

    FLAGS_intents_flush_max_delay_ms = 250;

    HybridTime::TEST_SetPrettyToString(true);
  }

  // Creates a transaction manager backed by its own freshly created hybrid clock and registers
  // it in transaction_managers_. May be called concurrently from multiple threads.
  // The returned reference refers to the element stored in transaction_managers_.
  client::TransactionManager& CreateTransactionManager() {
    auto ntp_clock = server::RandomErrorClock::CreateNtpClock();
    server::ClockPtr hybrid_clock(new server::HybridClock(ntp_clock));
    EXPECT_OK(hybrid_clock->Init());

    // Registration is serialized, since several test threads create managers at the same time.
    std::lock_guard<std::mutex> guard(transaction_managers_mutex_);
    transaction_managers_.emplace_back(client_.get(), hybrid_clock, client::LocalTabletFilter());
    return transaction_managers_.back();
  }

  // Protects transaction_managers_.
  std::mutex transaction_managers_mutex_;
  // Owns every manager handed out by CreateTransactionManager().
  boost::container::stable_vector<client::TransactionManager> transaction_managers_;
};
70
71
72
0
// Runs concurrent writers and readers, each with its own transaction manager (and therefore its
// own clock), and checks that a snapshot isolation read never observes a value older than the
// last value known to be committed when the read transaction was started.
//
// Phases:
//   1. Every key is initialized to 0.
//   2. One writer thread per key keeps incrementing the value of its key in a transaction.
//   3. Once every key has at least one committed increment, reader threads are started.
//   4. After 10 seconds all threads are stopped and the totals are verified.
TEST_F(TransactionEntTest, RandomErrorClock) {
  static constexpr size_t kNumReaders = 3;
  static constexpr size_t kNumKeys = 5;

  // State shared between the main thread, writers and readers.
  struct RandomErrorClockShare {
    // Set to true to ask all threads to finish.
    std::atomic<bool> stopped{false};
    // values[key] is the last value the writer of this key has successfully committed.
    std::array<std::atomic<int>, kNumKeys> values;
    // Total number of successful reads performed by the readers.
    std::atomic<size_t> reads{0};
  };

  // Raises the stop flag when the owning thread function is left. A failed ASSERT_* returns from
  // the thread function, so without this a writer that fails before its first commit would leave
  // its key at 0 and the main thread would wait for it forever.
  struct StopOnExit {
    explicit StopOnExit(std::atomic<bool>* flag) : flag_(flag) {}
    ~StopOnExit() { flag_->store(true, std::memory_order_release); }

    std::atomic<bool>* flag_;
  };

  std::vector<std::thread> threads;
  RandomErrorClockShare share;
  for (int32_t key = 0; key != narrow_cast<int32_t>(share.values.size()); ++key) {
    share.values[key].store(0, std::memory_order_release);
    ASSERT_OK(WriteRow(CreateSession(), key, 0));
  }

  // Writers: thread number N owns key N and is the only one who modifies it.
  while (threads.size() < share.values.size()) {
    threads.emplace_back([this, &share, key = narrow_cast<int>(threads.size())] {
      CDSAttacher attacher;
      StopOnExit stop_on_exit(&share.stopped);
      auto& transaction_manager = CreateTransactionManager();
      auto session = CreateSession();
      while (!share.stopped.load(std::memory_order_acquire)) {
        auto transaction = std::make_shared<client::YBTransaction>(&transaction_manager);
        ASSERT_OK(transaction->Init(IsolationLevel::SNAPSHOT_ISOLATION));
        session->SetTransaction(transaction);
        auto value = share.values[key].load(std::memory_order_acquire);
        auto new_value = value + 1;
        auto write_result = WriteRow(session, key, new_value);
        if (!write_result.ok()) {
          // Failed writes are just retried with a new transaction.
          continue;
        }
        auto status = transaction->CommitFuture().get();
        if (status.ok()) {
          // Publish the value only after it has been committed.
          share.values[key].store(new_value, std::memory_order_release);
        } else {
          ASSERT_TRUE(status.IsTryAgain() || status.IsExpired()) << "Bad status: " << status;
        }
      }
    });
  }

  // Wait until every key has at least one committed increment, or until a writer has exited.
  for (auto& value : share.values) {
    while (value.load(std::memory_order_acquire) == 0 &&
           !share.stopped.load(std::memory_order_acquire)) {
      std::this_thread::sleep_for(10ms);
    }
  }

  if (share.stopped.load(std::memory_order_acquire)) {
    // Nobody but an exiting writer could set the flag at this point. Threads have to be joined
    // before leaving the test body, since destroying a joinable std::thread terminates the
    // process.
    for (auto& thread : threads) {
      thread.join();
    }
    FAIL() << "A writer thread exited prematurely";
  }

  // Readers: pick a random key, remember the last committed value and check that the value read
  // in a transaction started after that is not older.
  while (threads.size() < share.values.size() + kNumReaders) {
    threads.emplace_back([this, &share] {
      CDSAttacher attacher;
      // We need separate transaction manager for each thread to have different clocks for different
      // threads.
      auto& transaction_manager = CreateTransactionManager();
      auto session = CreateSession();
      client::YBTransactionPtr transaction;
      int32_t key = 0;
      int32_t value = 0;
      while (!share.stopped.load(std::memory_order_acquire)) {
        if (!transaction) {
          key = RandomUniformInt<int32_t>(0, narrow_cast<int32_t>(share.values.size() - 1));
          value = share.values[key].load(std::memory_order_acquire);
          transaction = std::make_shared<client::YBTransaction>(&transaction_manager);
          ASSERT_OK(transaction->Init(IsolationLevel::SNAPSHOT_ISOLATION));
        }

        session->SetTransaction(transaction);
        auto read_value = SelectRow(session, key);
        if (read_value.ok()) {
          ASSERT_GE(*read_value, value) << "Key: " << key;
          share.reads.fetch_add(1, std::memory_order_release);
          transaction->Abort();
          transaction = nullptr;
        } else {
          // The only acceptable read failure is a restart request. In this case the same key and
          // expected value are checked again using the restarted transaction.
          ASSERT_EQ(ql::ErrorCode::RESTART_REQUIRED, ql::GetErrorCode(read_value.status()))
              << "Bad value: " << read_value;
          transaction = ASSERT_RESULT(transaction->CreateRestartedTransaction());
        }
      }
    });
  }

  std::this_thread::sleep_for(10s);

  share.stopped.store(true, std::memory_order_release);

  for (auto& thread : threads) {
    thread.join();
  }

  EXPECT_GT(share.reads.load(std::memory_order_acquire), 0U);

  for (size_t key = 0; key != share.values.size(); ++key) {
    LOG(INFO) << key << " => " << share.values[key].load(std::memory_order_acquire);
    EXPECT_GT(share.values[key].load(std::memory_order_acquire), 0) << "Key: " << key;
  }
}
169
170
} // namespace yb