YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_txn-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/pgwrapper/pg_mini_test_base.h"
15
16
#include "yb/util/test_macros.h"
17
#include "yb/util/test_thread_holder.h"
18
19
using namespace std::literals;
20
21
// Declares the boolean flag FLAGS_TEST_fail_in_apply_if_no_metadata (its definition is not in this
// file); the EmptyUpdate test below sets it to true.
DECLARE_bool(TEST_fail_in_apply_if_no_metadata);
22
23
namespace yb {
24
namespace pgwrapper {
25
26
// Fixture for the transaction tests in this file. It adds no members or overrides of its own;
// everything the tests use (e.g. Connect()) is inherited from PgMiniTestBase.
class PgTxnTest : public PgMiniTestBase {
27
28
};
29
30
0
// Runs an UPDATE that matches no rows inside a snapshot isolation transaction and checks that the
// transaction still commits successfully while TEST_fail_in_apply_if_no_metadata is enabled.
TEST_F(PgTxnTest, YB_DISABLE_TEST_IN_SANITIZERS(EmptyUpdate)) {
31
0
  // NOTE(review): judging by its name, this flag makes applying a transaction fail when its
  // metadata is missing -- confirm against the flag definition.
  FLAGS_TEST_fail_in_apply_if_no_metadata = true;
32
33
0
  auto conn = ASSERT_RESULT(Connect());
34
35
0
  // The table is created and never populated, so the UPDATE below has nothing to modify.
  ASSERT_OK(conn.Execute("CREATE TABLE test (key TEXT, value TEXT, PRIMARY KEY((key) HASH))"));
36
0
  ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
37
0
  ASSERT_OK(conn.Execute("UPDATE test SET value = 'a' WHERE key = 'b'"));
38
0
  // Committing the transaction that wrote nothing must succeed.
  ASSERT_OK(conn.CommitTransaction());
39
0
}
40
41
// Variant of PgTxnTest that reports a single tablet server to the base fixture.
// NOTE(review): "RF1" presumably stands for replication factor 1 -- confirm how PgMiniTestBase
// uses NumTabletServers().
class PgTxnRF1Test : public PgTxnTest {
42
 public:
43
0
  // Overrides the base class value; always returns 1.
  size_t NumTabletServers() override {
44
0
    return 1;
45
0
  }
46
};
47
48
0
// Using the PgTxnRF1Test fixture (one tablet server), checks that a previously inserted row can be
// read inside a SERIALIZABLE, READ ONLY, DEFERRABLE transaction and that the transaction commits.
TEST_F_EX(PgTxnTest, YB_DISABLE_TEST_IN_TSAN(SelectRF1ReadOnlyDeferred), PgTxnRF1Test) {
49
0
  auto conn = ASSERT_RESULT(Connect());
50
51
0
  // Set up a single-row table outside of the transaction under test.
  ASSERT_OK(conn.Execute("CREATE TABLE test (key INT)"));
52
0
  ASSERT_OK(conn.Execute("INSERT INTO test VALUES (1)"));
53
0
  // The transaction is opened with raw SQL because the isolation/read-only/deferrable options are
  // given in the BEGIN statement itself.
  ASSERT_OK(conn.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"));
54
0
  auto res = ASSERT_RESULT(conn.FetchValue<int32_t>("SELECT * FROM test"));
55
0
  // The only row inserted above has key 1.
  ASSERT_EQ(res, 1);
56
0
  ASSERT_OK(conn.Execute("COMMIT"));
57
0
}
58
59
0
// Checks that a serializable read on one connection fails while another connection holds an
// uncommitted serializable insert of a matching row, and that the inserting transaction can then
// commit.
TEST_F(PgTxnTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadWriteConflicts)) {
60
0
  auto conn1 = ASSERT_RESULT(Connect());
61
0
  auto conn2 = ASSERT_RESULT(Connect());
62
0
  // Used as the priority lower bound for conn1 and as the priority upper bound for conn2.
  // NOTE(review): presumably this guarantees conn1's transaction never has lower priority than
  // conn2's, so conn2 is the one that loses the conflict -- confirm against the YSQL docs.
  constexpr double kPriorityBound = 0.5;
63
64
0
  ASSERT_OK(conn1.ExecuteFormat("SET yb_transaction_priority_lower_bound = $0", kPriorityBound));
65
0
  ASSERT_OK(conn1.Execute("CREATE TABLE test (key INT, value INT, PRIMARY KEY((key) HASH))"));
66
0
  // The SELECT issued by conn2 filters on `value`, the column covered by this index.
  ASSERT_OK(conn1.Execute("CREATE INDEX idx ON test (value)"));
67
0
  // conn1: write row (1, 1) in a serializable transaction and leave it uncommitted.
  ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
68
0
  ASSERT_OK(conn1.Execute("INSERT INTO test VALUES (1, 1)"));
69
0
  ASSERT_OK(conn2.ExecuteFormat("SET yb_transaction_priority_upper_bound = $0", kPriorityBound));
70
0
  // conn2: a serializable read of value = 1 is expected to fail while conn1's insert is pending.
  ASSERT_OK(conn2.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
71
0
  ASSERT_NOK(conn2.Fetch("SELECT key FROM test WHERE value = 1"));
72
0
  // conn1's transaction must still be able to commit. conn2's transaction is not explicitly
  // finished by this test.
  ASSERT_OK(conn1.Execute("COMMIT"));
73
0
}
74
75
// Concurrently inserts increasing values from several writer threads while, in parallel, reader
76
// threads repeatedly read a window of the most recently allocated values.
77
// Checks that the reads could be serialized: for any two reads compared, the values seen in the
// part of the window they share must form nested sets (one is a subset of the other).
78
0
TEST_F(PgTxnTest, YB_DISABLE_TEST_IN_TSAN(ReadRecentSet)) {
79
0
  auto conn = ASSERT_RESULT(Connect());
80
0
  // Number of writer threads.
  constexpr int kWriters = 16;
81
0
  // Number of reader threads.
  constexpr int kReaders = 16;
82
0
  // Number of consecutive values each read asks for; also the number of meaningful bits in a
  // read's mask.
  constexpr int kReadLength = 32;
83
84
0
  ASSERT_OK(conn.Execute(
85
0
      "CREATE TABLE test (key INT, value INT, PRIMARY KEY((key) HASH)) SPLIT INTO 1 TABLETS"));
86
0
  ASSERT_OK(conn.Execute(
87
0
      "CREATE INDEX idx ON test (value) SPLIT INTO 2 TABLETS"));
88
0
  TestThreadHolder thread_holder;
89
0
  // Next value to insert; shared by all writers, and read by readers to position their window.
  std::atomic<int> value(0);
90
0
  for (int i = 0; i != kWriters; ++i) {
91
0
    thread_holder.AddThreadFunctor(
92
0
        [this, &stop = thread_holder.stop_flag(), &value] {
93
0
      // Each writer uses its own connection.
      auto connection = ASSERT_RESULT(Connect());
94
0
      while (!stop.load()) {
95
0
        // Claim a unique value; it is used as both key and value of the inserted row.
        int cur = value.fetch_add(1);
96
0
        ASSERT_OK(connection.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
97
0
        auto status = connection.ExecuteFormat("INSERT INTO test VALUES ($0, $0)", cur);
98
0
        if (status.ok()) {
99
0
          status = connection.CommitTransaction();
100
0
        }
101
0
        // A failed insert or a failed commit is tolerated: roll back and move on to the next value.
        if (!status.ok()) {
102
0
          ASSERT_OK(connection.RollbackTransaction());
103
0
        }
104
0
      }
105
0
    });
106
0
  }
107
0
  // Guards `reads`.
  std::mutex reads_mutex;
108
0
  // Result of one successful read: the first value of the requested window and a bitmask of the
  // values that were returned (bit i set means value read_min + i was present).
  struct Read {
109
0
    int read_min;
110
0
    uint64_t mask;
111
112
0
    // Renders the values encoded by `mask` relative to `read_min` as a string, in increasing order.
    static std::string ValuesToString(int read_min, uint64_t mask) {
113
0
      auto v = read_min;
114
0
      auto m = mask;
115
0
      std::vector<int> values;
116
0
      // Walk the mask from the lowest bit up; `v` tracks the value the current lowest bit stands for.
      while (m) {
117
0
        if (m & 1ULL) {
118
0
          values.push_back(v);
119
0
        }
120
0
        ++v;
121
0
        m >>= 1ULL;
122
0
      }
123
124
0
      return AsString(values);
125
0
    }
126
127
0
    // Describes the requested range [read_min, read_min + kReadLength - 1] and the values found.
    std::string ToString() const {
128
0
      return Format("{ read range: $0-$1 values: $2 }",
129
0
                    read_min, read_min + kReadLength - 1, ValuesToString(read_min, mask));
130
0
    }
131
0
  };
132
0
  // Recent successful reads, shared by all readers; access only while holding reads_mutex.
  std::vector<Read> reads;
133
0
  for (int i = 0; i != kReaders; ++i) {
134
0
    thread_holder.AddThreadFunctor(
135
0
        [this, &stop = thread_holder.stop_flag(), &value, &reads, &reads_mutex] {
136
0
      // Each reader uses its own connection.
      auto connection = ASSERT_RESULT(Connect());
137
0
      // Holds the comma-separated list of values for the IN clause.
      char str_buffer[0x200];
138
0
      while (!stop.load()) {
139
0
        // Window of kReadLength values ending just before the next value to be allocated,
        // clamped so that it never starts below 0.
        const auto read_min = std::max(value.load() - kReadLength, 0);
140
0
        char* p = str_buffer;
141
0
        for (auto v = read_min; v != read_min + kReadLength; ++v) {
142
0
          if (p != str_buffer) {
143
0
            *p++ = ',';
144
0
          }
145
0
          // NOTE(review): relies on FastInt64ToBufferLeft returning the position just past the
          // written digits and leaving the buffer NUL-terminated, since str_buffer is passed
          // below as a C string -- confirm against the helper's documentation.
          p = FastInt64ToBufferLeft(v, p);
146
0
        }
147
0
        ASSERT_OK(connection.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
148
0
        auto res = connection.FetchFormat("SELECT value FROM test WHERE value in ($0)", str_buffer);
149
0
        // A failed read is tolerated: roll back and try again with a fresh window.
        if (!res.ok()) {
150
0
          ASSERT_OK(connection.RollbackTransaction());
151
0
          continue;
152
0
        }
153
0
        auto status = connection.CommitTransaction();
154
0
        // Only reads whose transaction committed are recorded and checked.
        if (!status.ok()) {
155
0
          ASSERT_OK(connection.RollbackTransaction());
156
0
          continue;
157
0
        }
158
0
        // Encode the returned values as bits relative to read_min.
        uint64_t mask = 0;
159
0
        for (int j = 0, count = PQntuples(res->get()); j != count; ++j) {
160
0
          mask |= 1ULL << (ASSERT_RESULT(GetInt32(res->get(), j, 0)) - read_min);
161
0
        }
162
0
        // Held until the end of this loop iteration, covering both the check and the push_back.
        std::lock_guard<std::mutex> lock(reads_mutex);
163
0
        Read new_read{read_min, mask};
164
0
        // Single pass over the stored reads: the predicate both prunes reads that are too old and,
        // as a side effect, checks the new read against every read that is kept.
        reads.erase(std::remove_if(reads.begin(), reads.end(),
165
0
            [&new_read, &stop](const auto& old_read) {
166
0
          int read_min_delta = new_read.read_min - old_read.read_min;
167
          // Existing read is too old, remove it.
          // The bound of 64 also keeps the shift amounts below within the width of uint64_t.
168
0
          if (read_min_delta >= 64) {
169
0
            return true;
170
0
          }
171
          // New read is too old, cannot check it.
172
0
          if (read_min_delta <= -64) {
173
0
            return false;
174
0
          }
175
0
          // Mask with the low kReadLength bits set.
          constexpr auto kFullMask = (1ULL << kReadLength) - 1ULL;
176
          // Extract only numbers that belong to both reads.
          // lmask comes from the new read and rmask from the old read; both are aligned so that
          // bit 0 stands for the larger of the two read_min values.
177
0
          uint64_t lmask, rmask;
178
0
          if (read_min_delta >= 0) {
179
0
            lmask = new_read.mask & (kFullMask >> read_min_delta);
180
0
            rmask = (old_read.mask >> read_min_delta) & kFullMask;
181
0
          } else {
182
0
            lmask = (new_read.mask >> -read_min_delta) & kFullMask;
183
0
            rmask = old_read.mask & (kFullMask >> -read_min_delta);
184
0
          }
185
          // Check that one set is a subset of the other.
186
          // I.e. only one set is allowed to have elements that are not contained in the other set.
          // If one mask is a subset of the other, their union equals the superset, which is also
          // the numerically larger of the two masks.
187
0
          if ((lmask | rmask) != std::max(lmask, rmask)) {
188
0
            // Base value that bit 0 of lmask/rmask stands for.
            const auto read = std::max(old_read.read_min, new_read.read_min);
189
0
            // Report both reads plus the values seen only by the old read (R1-R2) and only by the
            // new read (R2-R1).
            ADD_FAILURE() << "R1: " << old_read.ToString() << "\nR2: " << new_read.ToString()
190
0
                          << "\nR1-R2: " << Read::ValuesToString(read, rmask ^ (lmask & rmask))
191
0
                          << ", R2-R1: " << Read::ValuesToString(read, lmask ^ (lmask & rmask));
192
0
            // Ask all worker threads to finish after the first detected violation.
            stop.store(true);
193
0
          }
194
0
          return false;
195
0
        }), reads.end());
196
0
        reads.push_back(new_read);
197
0
      }
198
0
    });
199
0
  }
200
201
0
  // NOTE(review): presumably lets the worker threads run for up to 30 seconds (or until the stop
  // flag is set) and then joins them -- confirm against TestThreadHolder::WaitAndStop.
  thread_holder.WaitAndStop(30s);
202
0
}
203
204
} // namespace pgwrapper
205
} // namespace yb