YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_mini-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <atomic>
15
#include <thread>
16
17
#include <boost/preprocessor/seq/for_each.hpp>
18
19
#include <gtest/gtest.h>
20
21
#include "yb/client/yb_table_name.h"
22
23
#include "yb/common/pgsql_error.h"
24
25
#include "yb/docdb/value_type.h"
26
27
#include "yb/integration-tests/mini_cluster.h"
28
29
#include "yb/master/catalog_entity_info.h"
30
#include "yb/master/catalog_manager_if.h"
31
#include "yb/master/mini_master.h"
32
#include "yb/master/sys_catalog_constants.h"
33
34
#include "yb/rocksdb/db.h"
35
36
#include "yb/server/skewed_clock.h"
37
38
#include "yb/tablet/tablet.h"
39
#include "yb/tablet/tablet_peer.h"
40
#include "yb/tablet/transaction_participant.h"
41
42
#include "yb/tools/tools_test_utils.h"
43
44
#include "yb/util/atomic.h"
45
#include "yb/util/random_util.h"
46
#include "yb/util/scope_exit.h"
47
#include "yb/util/status_log.h"
48
#include "yb/util/test_macros.h"
49
#include "yb/util/test_thread_holder.h"
50
#include "yb/util/test_util.h"
51
#include "yb/util/tsan_util.h"
52
53
#include "yb/yql/pggate/pggate_flags.h"
54
#include "yb/yql/pgwrapper/pg_mini_test_base.h"
55
56
using namespace std::literals;
57
58
DECLARE_bool(flush_rocksdb_on_shutdown);
59
DECLARE_bool(TEST_force_master_leader_resolution);
60
DECLARE_bool(TEST_timeout_non_leader_master_rpcs);
61
DECLARE_double(TEST_respond_write_failed_probability);
62
DECLARE_double(TEST_transaction_ignore_applying_probability);
63
DECLARE_int32(history_cutoff_propagation_interval_ms);
64
DECLARE_int32(timestamp_history_retention_interval_sec);
65
DECLARE_int32(txn_max_apply_batch_records);
66
DECLARE_int64(apply_intents_task_injected_delay_ms);
67
DECLARE_uint64(max_clock_skew_usec);
68
DECLARE_int64(db_write_buffer_size);
69
DECLARE_bool(rocksdb_use_logging_iterator);
70
DECLARE_bool(enable_automatic_tablet_splitting);
71
DECLARE_int32(yb_num_shards_per_tserver);
72
DECLARE_int64(tablet_split_low_phase_size_threshold_bytes);
73
DECLARE_int64(tablet_split_high_phase_size_threshold_bytes);
74
DECLARE_int64(tablet_split_low_phase_shard_count_per_node);
75
DECLARE_int64(tablet_split_high_phase_shard_count_per_node);
76
77
DECLARE_int32(heartbeat_interval_ms);
78
DECLARE_int32(tserver_heartbeat_metrics_interval_ms);
79
DECLARE_int32(TEST_txn_participant_inject_latency_on_apply_update_txn_ms);
80
81
DECLARE_int64(db_block_size_bytes);
82
DECLARE_int64(db_filter_block_size_bytes);
83
DECLARE_int64(db_index_block_size_bytes);
84
DECLARE_int64(tablet_force_split_threshold_bytes);
85
DECLARE_int64(TEST_inject_random_delay_on_txn_status_response_ms);
86
87
namespace yb {
88
namespace pgwrapper {
89
namespace {
90
91
template<IsolationLevel level>
92
class TxnHelper {
93
 public:
94
0
  static CHECKED_STATUS StartTxn(PGConn* connection) {
95
0
    return connection->StartTransaction(level);
96
0
  }
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE8StartTxnEPNS0_6PGConnE
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE8StartTxnEPNS0_6PGConnE
97
98
0
  static CHECKED_STATUS ExecuteInTxn(PGConn* connection, const std::string& query) {
99
0
    const auto guard = CreateTxnGuard(connection);
100
0
    return connection->Execute(query);
101
0
  }
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE
102
103
0
  static Result<PGResultPtr> FetchInTxn(PGConn* connection, const std::string& query) {
104
0
    const auto guard = CreateTxnGuard(connection);
105
0
    return connection->Fetch(query);
106
0
  }
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE
107
108
 private:
109
0
  static auto CreateTxnGuard(PGConn* connection) {
110
0
    EXPECT_OK(StartTxn(connection));
111
0
    return ScopeExit([connection]() {
112
      // Event in case some operations in transaction failed the COMMIT command
113
      // will complete successfully as ROLLBACK will be performed by postgres.
114
0
      EXPECT_OK(connection->Execute("COMMIT"));
115
0
    });
Unexecuted instantiation: pg_mini-test.cc:_ZZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE14CreateTxnGuardEPNS0_6PGConnEENKUlvE_clEv
Unexecuted instantiation: pg_mini-test.cc:_ZZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE14CreateTxnGuardEPNS0_6PGConnEENKUlvE_clEv
116
0
  }
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE2EE14CreateTxnGuardEPNS0_6PGConnE
Unexecuted instantiation: pg_mini-test.cc:_ZN2yb9pgwrapper12_GLOBAL__N_19TxnHelperILNS_14IsolationLevelE1EE14CreateTxnGuardEPNS0_6PGConnE
117
};
118
119
0
std::string RowMarkTypeToPgsqlString(const RowMarkType row_mark_type) {
120
0
  switch (row_mark_type) {
121
0
    case RowMarkType::ROW_MARK_EXCLUSIVE:
122
0
      return "UPDATE";
123
0
    case RowMarkType::ROW_MARK_NOKEYEXCLUSIVE:
124
0
      return "NO KEY UPDATE";
125
0
    case RowMarkType::ROW_MARK_SHARE:
126
0
      return "SHARE";
127
0
    case RowMarkType::ROW_MARK_KEYSHARE:
128
0
      return "KEY SHARE";
129
0
    default:
130
      // We shouldn't get here because other row lock types are disabled at the postgres level.
131
0
      LOG(DFATAL) << "Unsupported row lock of type " << RowMarkType_Name(row_mark_type);
132
0
      return "";
133
0
  }
134
0
}
135
136
// Write statement that TestSelectRowLock() issues from a separate connection while another
// transaction is open: kInsert adds a new row, kDelete removes an existing one.
YB_DEFINE_ENUM(TestStatement, (kInsert)(kDelete));
137
138
} // namespace
139
140
class PgMiniTest : public PgMiniTestBase {
141
 protected:
142
  // Have several threads doing updates and several threads doing large scans in parallel.
143
  // If deferrable is true, then the scans are in deferrable transactions, so no read restarts are
144
  // expected.
145
  // Otherwise, the scans are in transactions with snapshot isolation, but we still don't expect any
146
  // read restarts to be observed because they should be transparently handled on the postgres side.
147
  void TestReadRestart(bool deferrable = true);
148
149
  // Run interleaved INSERT/DELETE, SELECT with specified isolation level and row mark.
150
  // Possible isolation levels are SNAPSHOT_ISOLATION and SERIALIZABLE_ISOLATION.
151
  // Possible row marks are ROW_MARK_KEYSHARE, ROW_MARK_SHARE, ROW_MARK_NOKEYEXCLUSIVE, and
152
  // ROW_MARK_EXCLUSIVE.
153
  void TestSelectRowLock(IsolationLevel isolation, RowMarkType row_mark, TestStatement statement);
154
155
  void TestForeignKey(IsolationLevel isolation);
156
157
  void TestBigInsert(bool restart);
158
159
  void CreateTableAndInitialize(std::string table_name, int num_tablets);
160
161
  void DestroyTable(std::string table_name);
162
163
  void GetTableIDFromTableName(const std::string table_name, std::string* table_id);
164
165
  void StartReadWriteThreads(std::string table_name, TestThreadHolder *thread_holder);
166
167
  void TestConcurrentDeleteRowAndUpdateColumn(bool select_before_update);
168
169
0
  void FlushAndCompactTablets() {
170
0
    FLAGS_timestamp_history_retention_interval_sec = 0;
171
0
    FLAGS_history_cutoff_propagation_interval_ms = 1;
172
0
    ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kSync));
173
0
    const auto compaction_start = MonoTime::Now();
174
0
    ASSERT_OK(cluster_->CompactTablets());
175
0
    const auto compaction_finish = MonoTime::Now();
176
0
    const double compaction_elapsed_time_sec = (compaction_finish - compaction_start).ToSeconds();
177
0
    LOG(INFO) << "Compaction duration: " << compaction_elapsed_time_sec << " s";
178
0
  }
179
180
  void RunManyConcurrentReadersTest();
181
};
182
183
class PgMiniSingleTServerTest : public PgMiniTest {
184
 public:
185
0
  size_t NumTabletServers() override {
186
0
    return 1;
187
0
  }
188
};
189
190
class PgMiniMasterFailoverTest : public PgMiniTest {
191
 public:
192
0
  size_t NumMasters() override {
193
0
    return 3;
194
0
  }
195
};
196
197
// Try to change this to test follower reads.
198
0
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(FollowerReads)) {
199
0
  auto conn = ASSERT_RESULT(Connect());
200
0
  ASSERT_OK(conn.Execute("CREATE TABLE t2 (key int PRIMARY KEY, word TEXT, phrase TEXT)"));
201
0
  ASSERT_OK(conn.Execute("INSERT INTO t2 (key, word, phrase) VALUES (1, 'old', 'old is gold')"));
202
0
  ASSERT_OK(conn.Execute("INSERT INTO t2 (key, word, phrase) VALUES (2, 'NEW', 'NEW is fine')"));
203
204
0
  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)"));
205
0
  ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (1, 'old')"));
206
207
0
  ASSERT_OK(conn.Execute("SET yb_debug_log_docdb_requests = true"));
208
0
  ASSERT_OK(conn.Execute("SET yb_read_from_followers = true"));
209
210
  // Try to set a value < 2 * max_clock_skew (500ms) should fail.
211
0
  ASSERT_NOK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", 400)));
212
0
  ASSERT_NOK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", 999)));
213
  // Setting a value > 2 * max_clock_skew should work.
214
0
  ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", 1001)));
215
216
  // Setting staleness to what we require for the test.
217
  // Sleep and then perform an update, such that follower reads should see the old value.
218
  // But current reads will see the new/updated value.
219
0
  constexpr int32_t kStalenessMs = 4000;
220
0
  SleepFor(MonoDelta::FromMilliseconds(kStalenessMs));
221
0
  ASSERT_OK(conn.Execute("UPDATE t SET value = 'NEW' WHERE key = 1"));
222
0
  auto kUpdateTime = MonoTime::Now();
223
0
  ASSERT_OK(conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", kStalenessMs)));
224
225
  // Follower reads will not be enabled unless a transaction block is marked read-only.
226
0
  {
227
0
    ASSERT_OK(conn.Execute("BEGIN TRANSACTION"));
228
0
    auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
229
0
    ASSERT_EQ(value, "NEW");
230
0
    ASSERT_OK(conn.Execute("COMMIT"));
231
0
  }
232
233
  // Follower reads will be enabled for transaction block(s) marked read-only.
234
0
  {
235
0
    ASSERT_OK(conn.Execute("BEGIN TRANSACTION READ ONLY"));
236
0
    auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
237
0
    ASSERT_EQ(value, "old");
238
0
    ASSERT_OK(conn.Execute("COMMIT"));
239
0
  }
240
241
  // Follower reads will not be enabled unless the session or statement is marked read-only.
242
0
  auto value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
243
0
  ASSERT_EQ(value, "NEW");
244
245
0
  value = ASSERT_RESULT(
246
0
      conn.FetchValue<std::string>("SELECT phrase FROM t, t2 WHERE t.value = t2.word"));
247
0
  ASSERT_EQ(value, "NEW is fine");
248
249
  // Follower reads can be enabled for a single statement with a pg hint.
250
0
  value =
251
0
      ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
252
0
                                                 "SELECT value FROM t WHERE key = 1"));
253
0
  ASSERT_EQ(value, "old");
254
0
  value = ASSERT_RESULT(
255
0
      conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
256
0
                                   "SELECT phrase FROM t, t2 WHERE t.value = t2.word"));
257
0
  ASSERT_EQ(value, "old is gold");
258
259
  // pg_hint only applies for the specific statement used.
260
  // Statements following it should not enable follower reads if it is not marked read-only.
261
0
  value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
262
0
  ASSERT_EQ(value, "NEW");
263
264
  // pg_hint should also apply for prepared statements, if the hint is provided
265
  // at PREPARE stage.
266
0
  {
267
0
    ASSERT_OK(
268
0
        conn.Execute("PREPARE hinted_select_stmt (int) AS "
269
0
                     "/*+ Set(transaction_read_only on) */ "
270
0
                     "SELECT value FROM t WHERE key = $1"));
271
0
    value = ASSERT_RESULT(conn.FetchValue<std::string>("EXECUTE hinted_select_stmt (1)"));
272
0
    ASSERT_EQ(value, "old");
273
0
    value =
274
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
275
0
                                                   "EXECUTE hinted_select_stmt (1)"));
276
0
    ASSERT_EQ(value, "old");
277
0
    value =
278
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ "
279
0
                                                   "EXECUTE hinted_select_stmt (1)"));
280
0
    ASSERT_EQ(value, "old");
281
0
  }
282
  // Adding a pg_hint at the EXECUTE stage has no effect.
283
0
  {
284
0
    ASSERT_OK(
285
0
        conn.Execute("PREPARE select_stmt (int) AS "
286
0
                     "SELECT value FROM t WHERE key = $1"));
287
0
    value = ASSERT_RESULT(conn.FetchValue<std::string>("EXECUTE select_stmt (1)"));
288
0
    ASSERT_EQ(value, "NEW");
289
0
    value =
290
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
291
0
                                                   "EXECUTE select_stmt (1)"));
292
0
    ASSERT_EQ(value, "NEW");
293
0
  }
294
295
  // pg_hint with func()
296
  // The hint may be provided when the function is defined, or when the function is
297
  // called.
298
0
  {
299
0
    ASSERT_OK(
300
0
        conn.Execute("CREATE FUNCTION func() RETURNS text AS"
301
0
                     " $$ SELECT value FROM t WHERE key = 1 $$ LANGUAGE SQL"));
302
0
    value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT func()"));
303
0
    ASSERT_EQ(value, "NEW");
304
0
    value =
305
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ "
306
0
                                                   "SELECT func()"));
307
0
    ASSERT_EQ(value, "NEW");
308
0
    value =
309
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
310
0
                                                   "SELECT func()"));
311
0
    ASSERT_EQ(value, "old");
312
0
    ASSERT_OK(conn.Execute("DROP FUNCTION func()"));
313
0
  }
314
0
  {
315
0
    ASSERT_OK(
316
0
        conn.Execute("CREATE FUNCTION hinted_func() RETURNS text AS"
317
0
                     " $$ "
318
0
                     "/*+ Set(transaction_read_only on) */ "
319
0
                     "SELECT value FROM t WHERE key = 1"
320
0
                     " $$ LANGUAGE SQL"));
321
0
    value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT hinted_func()"));
322
0
    ASSERT_EQ(value, "old");
323
0
    value =
324
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ "
325
0
                                                   "SELECT hinted_func()"));
326
0
    ASSERT_EQ(value, "old");
327
0
    value =
328
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
329
0
                                                   "SELECT hinted_func()"));
330
0
    ASSERT_EQ(value, "old");
331
0
    ASSERT_OK(conn.Execute("DROP FUNCTION hinted_func()"));
332
0
  }
333
334
0
  ASSERT_OK(conn.Execute("SET default_transaction_read_only = true"));
335
  // Follower reads will be enabled since the session is marked read-only.
336
0
  value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
337
0
  ASSERT_EQ(value, "old");
338
339
  // pg_hint can only mark a session from read-write to read-only.
340
  // Marking a statement in a read-only session as read-write is not allowed.
341
  // Writes operations fail.
342
  // Read operations will be performed as if they are read-write, so will not use follower
343
  // reads.
344
0
  {
345
0
    auto status = conn.Execute(
346
0
        "/*+ Set(transaction_read_only off) */ "
347
0
        "UPDATE t SET value = 'NEWER' WHERE key = 1");
348
0
    ASSERT_EQ(PgsqlError(status), YBPgErrorCode::YB_PG_READ_ONLY_SQL_TRANSACTION) << status;
349
0
    ASSERT_STR_CONTAINS(status.ToString(), "cannot execute UPDATE in a read-only transaction");
350
351
0
    auto value =
352
0
        ASSERT_RESULT(conn.FetchValue<std::string>("/*+ Set(transaction_read_only off) */ "
353
0
                                                   "SELECT value FROM t WHERE key = 1"));
354
0
    ASSERT_EQ(value, "old");
355
0
  }
356
357
  // After sufficient time has passed, even "follower reads" should see the newer value.
358
0
  SleepFor(kUpdateTime + MonoDelta::FromMilliseconds(kStalenessMs) - MonoTime::Now());
359
360
0
  ASSERT_OK(conn.Execute("SET default_transaction_read_only = false"));
361
0
  value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
362
0
  ASSERT_EQ(value, "NEW");
363
364
0
  ASSERT_OK(conn.Execute("SET default_transaction_read_only = true"));
365
0
  value = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
366
0
  ASSERT_EQ(value, "NEW");
367
0
  value = ASSERT_RESULT(
368
0
      conn.FetchValue<std::string>("/*+ Set(transaction_read_only on) */ "
369
0
                                   "SELECT phrase FROM t, t2 WHERE t.value = t2.word"));
370
0
  ASSERT_EQ(value, "NEW is fine");
371
0
}
372
373
0
// Updates the two value columns of one row at different times. It then checks that follower
// reads issued with different staleness values observe the row as of the corresponding moment.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(MultiColFollowerReads)) {
  constexpr int32_t kSleepTimeMs = 1200 * kTimeMultiplier;
  // Margin subtracted from a computed staleness so the follower read point lands after, not
  // before, the reference write.
  constexpr int32_t kOpDurationMs = 10;
  const char* const kFollowerReadQuery =
      "/*+ Set(transaction_read_only on) */ SELECT * FROM t WHERE k = 1";

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (k int PRIMARY KEY, c1 TEXT, c2 TEXT)"));
  ASSERT_OK(conn.Execute("SET yb_debug_log_docdb_requests = true"));
  ASSERT_OK(conn.Execute("SET yb_read_from_followers = true"));

  // Applies a new yb_follower_read_staleness_ms to the session.
  const auto set_staleness_ms = [&conn](auto staleness_ms) {
    return conn.Execute(Format("SET yb_follower_read_staleness_ms = $0", staleness_ms));
  };

  // Timeline: insert, then update c1, then update c2, each separated by kSleepTimeMs.
  ASSERT_OK(conn.Execute("INSERT INTO t (k, c1, c2) VALUES (1, 'old', 'old')"));
  const auto insert_time = MonoTime::Now();
  SleepFor(MonoDelta::FromMilliseconds(kSleepTimeMs));

  ASSERT_OK(conn.Execute("UPDATE t SET c1 = 'NEW' WHERE k = 1"));
  const auto c1_update_time = MonoTime::Now();
  SleepFor(MonoDelta::FromMilliseconds(kSleepTimeMs));

  ASSERT_OK(conn.Execute("UPDATE t SET c2 = 'NEW' WHERE k = 1"));
  const auto c2_update_time = MonoTime::Now();

  // A read-write statement sees both updates.
  auto result = ASSERT_RESULT(
      conn.Fetch("/*+ Set(transaction_read_only off) */ SELECT * FROM t WHERE k = 1"));
  ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0)));
  ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 1)));
  ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 2)));

  // Reading just after the insert: neither column has been updated yet.
  ASSERT_OK(set_staleness_ms((MonoTime::Now() - insert_time).ToMilliseconds() - kOpDurationMs));
  result = ASSERT_RESULT(conn.Fetch(kFollowerReadQuery));
  ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0)));
  ASSERT_EQ("old", ASSERT_RESULT(GetString(result.get(), 0, 1)));
  ASSERT_EQ("old", ASSERT_RESULT(GetString(result.get(), 0, 2)));

  // Reading just after the c1 update: only c1 carries the new value.
  ASSERT_OK(
      set_staleness_ms((MonoTime::Now() - c1_update_time).ToMilliseconds() - kOpDurationMs));
  result = ASSERT_RESULT(conn.Fetch(kFollowerReadQuery));
  ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0)));
  ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 1)));
  ASSERT_EQ("old", ASSERT_RESULT(GetString(result.get(), 0, 2)));

  SleepFor(MonoDelta::FromMilliseconds(kSleepTimeMs));

  // Reading at the c2 update time: both columns carry the new value.
  ASSERT_OK(set_staleness_ms((MonoTime::Now() - c2_update_time).ToMilliseconds()));
  result = ASSERT_RESULT(conn.Fetch(kFollowerReadQuery));
  ASSERT_EQ(1, ASSERT_RESULT(GetInt32(result.get(), 0, 0)));
  ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 1)));
  ASSERT_EQ("NEW", ASSERT_RESULT(GetString(result.get(), 0, 2)));
}
431
432
0
// Smoke test: a row inserted through YSQL can be read back by its primary key.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(Simple)) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)"));
  ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (1, 'hello')"));

  const auto fetched =
      ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
  ASSERT_EQ(fetched, "hello");
}
441
442
0
// A SELECT ... FOR SHARE issued without opening a transaction block must fail with an error
// saying row-marked reads need a transaction.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(RowLockWithoutTransaction)) {
  auto conn = ASSERT_RESULT(Connect());

  const auto lock_status = conn.Execute(
      "SELECT tmplinline FROM pg_catalog.pg_pltemplate WHERE tmplname !~ tmplhandler FOR SHARE");
  ASSERT_NOK(lock_status);
  ASSERT_STR_CONTAINS(lock_status.message().ToBuffer(),
                      "Read request with row mark types must be part of a transaction");
}
451
452
0
// Inserts kKeys rows while FLAGS_TEST_respond_write_failed_probability is 0.25 (presumably making
// tablet servers report some successful writes as failed so clients retry). It then reads the
// keys back in order and checks that a real duplicate insert is still rejected.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(WriteRetry)) {
  constexpr int kKeys = 100;
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)"));

  SetAtomicFlag(0.25, &FLAGS_TEST_respond_write_failed_probability);

  LOG(INFO) << "Insert " << kKeys << " keys";
  for (int k = 0; k < kKeys; ++k) {
    const auto insert_status = conn.ExecuteFormat("INSERT INTO t (key) VALUES ($0)", k);
    // Besides success, a retried write may surface as a unique violation or a duplicate request.
    const bool acceptable =
        insert_status.ok() ||
        PgsqlError(insert_status) == YBPgErrorCode::YB_PG_UNIQUE_VIOLATION ||
        insert_status.ToString().find("Already present: Duplicate request") != std::string::npos;
    ASSERT_TRUE(acceptable) << insert_status;
  }

  SetAtomicFlag(0, &FLAGS_TEST_respond_write_failed_probability);

  auto rows = ASSERT_RESULT(conn.FetchMatrix("SELECT * FROM t ORDER BY key", kKeys, 1));
  for (int row = 0; row < kKeys; ++row) {
    ASSERT_EQ(ASSERT_RESULT(GetInt32(rows.get(), row, 0)), row);
  }

  LOG(INFO) << "Insert duplicate key";
  const auto dup_status = conn.Execute("INSERT INTO t (key) VALUES (1)");
  ASSERT_EQ(PgsqlError(dup_status), YBPgErrorCode::YB_PG_UNIQUE_VIOLATION) << dup_status;
  ASSERT_STR_CONTAINS(dup_status.ToString(), "duplicate key value violates unique constraint");
}
481
482
0
// A data-modifying CTE (WITH ... UPDATE) combined with an outer UPDATE on the same key must
// execute without error.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(With)) {
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY, v int)"));

  const std::string cte_update =
      "WITH test2 AS (UPDATE test SET v = 2 WHERE k = 1) "
      "UPDATE test SET v = 3 WHERE k = 1";
  ASSERT_OK(conn.Execute(cte_update));
}
491
492
0
void PgMiniTest::TestReadRestart(const bool deferrable) {
493
0
  constexpr CoarseDuration kWaitTime = 60s;
494
0
  constexpr int kKeys = 100;
495
0
  constexpr int kNumReadThreads = 8;
496
0
  constexpr int kNumUpdateThreads = 8;
497
0
  constexpr int kRequiredNumReads = 500;
498
0
  constexpr std::chrono::milliseconds kClockSkew = -100ms;
499
0
  std::atomic<int> num_read_restarts(0);
500
0
  std::atomic<int> num_read_successes(0);
501
0
  TestThreadHolder thread_holder;
502
503
  // Set up table
504
0
  auto setup_conn = ASSERT_RESULT(Connect());
505
0
  ASSERT_OK(setup_conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value INT)"));
506
0
  for (int key = 0; key != kKeys; ++key) {
507
0
    ASSERT_OK(setup_conn.Execute(Format("INSERT INTO t (key, value) VALUES ($0, 0)", key)));
508
0
  }
509
510
  // Introduce clock skew
511
0
  auto delta_changers = SkewClocks(cluster_.get(), kClockSkew);
512
513
  // Start read threads
514
0
  for (int i = 0; i < kNumReadThreads; ++i) {
515
0
    thread_holder.AddThreadFunctor([this, deferrable, &num_read_restarts, &num_read_successes,
516
0
                                    &stop = thread_holder.stop_flag()] {
517
0
      auto read_conn = ASSERT_RESULT(Connect());
518
0
      while (!stop.load(std::memory_order_acquire)) {
519
0
        if (deferrable) {
520
0
          ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, "
521
0
                                      "DEFERRABLE"));
522
0
        } else {
523
0
          ASSERT_OK(read_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"));
524
0
        }
525
0
        auto result = read_conn.FetchMatrix("SELECT * FROM t", kKeys, 2);
526
0
        if (!result.ok()) {
527
0
          ASSERT_TRUE(result.status().IsNetworkError()) << result.status();
528
0
          ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)
529
0
              << result.status();
530
0
          ASSERT_STR_CONTAINS(result.status().ToString(), "Restart read");
531
0
          ++num_read_restarts;
532
0
          ASSERT_OK(read_conn.Execute("ABORT"));
533
0
          break;
534
0
        } else {
535
0
          ASSERT_OK(read_conn.Execute("COMMIT"));
536
0
          ++num_read_successes;
537
0
        }
538
0
      }
539
0
    });
540
0
  }
541
542
  // Start update threads
543
0
  for (int i = 0; i < kNumUpdateThreads; ++i) {
544
0
    thread_holder.AddThreadFunctor([this, i, &stop = thread_holder.stop_flag()] {
545
0
      auto update_conn = ASSERT_RESULT(Connect());
546
0
      while (!stop.load(std::memory_order_acquire)) {
547
0
        for (int key = i; key < kKeys; key += kNumUpdateThreads) {
548
0
          ASSERT_OK(update_conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"));
549
0
          ASSERT_OK(update_conn.Execute(
550
0
              Format("UPDATE t SET value = value + 1 WHERE key = $0", key)));
551
0
          ASSERT_OK(update_conn.Execute("COMMIT"));
552
0
        }
553
0
      }
554
0
    });
555
0
  }
556
557
  // Stop threads after a while
558
0
  thread_holder.WaitAndStop(kWaitTime);
559
560
  // Count successful reads
561
0
  int num_reads = (num_read_restarts.load(std::memory_order_acquire)
562
0
                   + num_read_successes.load(std::memory_order_acquire));
563
0
  LOG(INFO) << "Successful reads: " << num_read_successes.load(std::memory_order_acquire) << "/"
564
0
      << num_reads;
565
0
  ASSERT_EQ(num_read_restarts.load(std::memory_order_acquire), 0);
566
0
  ASSERT_GT(num_read_successes.load(std::memory_order_acquire), kRequiredNumReads);
567
0
}
568
569
class PgMiniLargeClockSkewTest : public PgMiniTest {
570
 public:
571
0
  void SetUp() override {
572
0
    SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);
573
0
    PgMiniTestBase::SetUp();
574
0
  }
575
};
576
577
// Scans in SERIALIZABLE READ ONLY DEFERRABLE transactions must never surface a read restart.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSerializableDeferrable),
          PgMiniLargeClockSkewTest) {
  constexpr bool kDeferrable = true;
  TestReadRestart(kDeferrable);
}
581
582
// Scans in snapshot-isolation (REPEATABLE READ) transactions must not surface a read restart
// either; postgres is expected to retry them transparently.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSnapshot),
          PgMiniLargeClockSkewTest) {
  constexpr bool kDeferrable = false;
  TestReadRestart(kDeferrable);
}
586
587
void PgMiniTest::TestSelectRowLock(
588
0
    IsolationLevel isolation, RowMarkType row_mark, TestStatement statement) {
589
0
  const std::string isolation_str = (
590
0
      isolation == IsolationLevel::SNAPSHOT_ISOLATION ? "REPEATABLE READ" : "SERIALIZABLE");
591
0
  const std::string row_mark_str = RowMarkTypeToPgsqlString(row_mark);
592
0
  constexpr auto kSleepTime = 1s;
593
0
  constexpr int kKeys = 3;
594
0
  PGConn read_conn = ASSERT_RESULT(Connect());
595
0
  PGConn misc_conn = ASSERT_RESULT(Connect());
596
0
  PGConn write_conn = ASSERT_RESULT(Connect());
597
598
  // Set up table
599
0
  ASSERT_OK(misc_conn.Execute("CREATE TABLE t (i INT PRIMARY KEY, j INT)"));
600
  // TODO: remove this when issue #2857 is fixed.
601
0
  std::this_thread::sleep_for(kSleepTime);
602
0
  for (int i = 0; i < kKeys; ++i) {
603
0
    ASSERT_OK(misc_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", i));
604
0
  }
605
606
0
  ASSERT_OK(read_conn.ExecuteFormat("BEGIN TRANSACTION ISOLATION LEVEL $0", isolation_str));
607
0
  ASSERT_OK(read_conn.FetchFormat("SELECT * FROM t WHERE i = $0", -1));
608
609
  // Sleep to ensure that read done in txn doesn't face kReadRestart after INSERT (a sleep will
610
  // ensure sufficient gap between write time and read point - more than clock skew).
611
0
  std::this_thread::sleep_for(kSleepTime);
612
613
0
  if (statement == TestStatement::kInsert) {
614
0
    ASSERT_OK(write_conn.ExecuteFormat("INSERT INTO t (i, j) VALUES ($0, $0)", kKeys));
615
0
  } else {
616
0
    ASSERT_OK(write_conn.ExecuteFormat(
617
0
        "DELETE FROM t WHERE i = $0", RandomUniformInt(0, kKeys - 1)));
618
0
  }
619
0
  auto result = read_conn.FetchFormat("SELECT * FROM t FOR $0", row_mark_str);
620
0
  if (isolation == IsolationLevel::SNAPSHOT_ISOLATION && statement == TestStatement::kDelete) {
621
0
    ASSERT_NOK(result);
622
0
    ASSERT_TRUE(result.status().IsNetworkError()) << result.status();
623
0
    ASSERT_EQ(PgsqlError(result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)
624
0
        << result.status();
625
0
    ASSERT_STR_CONTAINS(result.status().ToString(),
626
0
                        "could not serialize access due to concurrent update");
627
0
    ASSERT_OK(read_conn.Execute("ABORT"));
628
0
  } else {
629
0
    ASSERT_OK(result);
630
    // NOTE: vanilla PostgreSQL expects kKeys rows, but kKeys +/- 1 rows are expected for Yugabyte.
631
0
    auto expected_keys = kKeys;
632
0
    if (isolation != IsolationLevel::SNAPSHOT_ISOLATION) {
633
0
      expected_keys += statement == TestStatement::kInsert ? 1 : -1;
634
0
    }
635
0
    ASSERT_EQ(PQntuples(result.get().get()), expected_keys);
636
0
    ASSERT_OK(read_conn.Execute("COMMIT"));
637
0
  }
638
0
  ASSERT_OK(read_conn.Execute("COMMIT"));
639
0
}
640
641
0
// REPEATABLE READ reader takes FOR UPDATE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForUpdate)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_EXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
645
646
0
// SERIALIZABLE reader takes FOR UPDATE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForUpdate)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_EXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
651
652
0
// REPEATABLE READ reader takes FOR NO KEY UPDATE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForNoKeyUpdate)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_NOKEYEXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
657
658
0
// SERIALIZABLE reader takes FOR NO KEY UPDATE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForNoKeyUpdate)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_NOKEYEXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
663
664
0
// REPEATABLE READ reader takes FOR SHARE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForShare)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_SHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
668
669
0
// SERIALIZABLE reader takes FOR SHARE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForShare)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_SHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
673
674
0
// REPEATABLE READ reader takes FOR KEY SHARE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotInsertForKeyShare)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_KEYSHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
678
679
0
// SERIALIZABLE reader takes FOR KEY SHARE locks after a concurrent INSERT.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableInsertForKeyShare)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_KEYSHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kInsert);
}
684
685
0
// REPEATABLE READ reader takes FOR UPDATE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForUpdate)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_EXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
689
690
0
// SERIALIZABLE reader takes FOR UPDATE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForUpdate)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_EXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
695
696
0
// REPEATABLE READ reader takes FOR NO KEY UPDATE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForNoKeyUpdate)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_NOKEYEXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
701
702
0
// SERIALIZABLE reader takes FOR NO KEY UPDATE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForNoKeyUpdate)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_NOKEYEXCLUSIVE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
707
708
0
// REPEATABLE READ reader takes FOR SHARE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForShare)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_SHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
712
713
0
// SERIALIZABLE reader takes FOR SHARE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForShare)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_SHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
717
718
0
// REPEATABLE READ reader takes FOR KEY SHARE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SnapshotDeleteForKeyShare)) {
  const auto isolation = IsolationLevel::SNAPSHOT_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_KEYSHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
722
723
0
// SERIALIZABLE reader takes FOR KEY SHARE locks after a concurrent DELETE.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(SerializableDeleteForKeyShare)) {
  const auto isolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  const auto row_mark = RowMarkType::ROW_MARK_KEYSHARE;
  TestSelectRowLock(isolation, row_mark, TestStatement::kDelete);
}
728
729
0
// A SERIALIZABLE READ ONLY transaction (in either clause order) must not conflict with a concurrent
// SERIALIZABLE writer, and SHOW must still report the requested settings. Switching such a
// transaction to READ WRITE must make it take part in conflict detection again.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadOnly)) {
  PGConn reader = ASSERT_RESULT(Connect());
  PGConn setup = ASSERT_RESULT(Connect());
  PGConn writer = ASSERT_RESULT(Connect());

  // Runs a single-value SHOW statement on `conn` and expects the value to equal `expected`.
  // Failures inside the lambda are propagated through ASSERT_NO_FATALS at each call site.
  auto expect_setting = [](PGConn* conn, const std::string& show_stmt,
                           const std::string& expected) {
    auto fetched = conn->Fetch(show_stmt);
    ASSERT_TRUE(fetched.ok()) << fetched.status();
    const std::string actual = ASSERT_RESULT(GetString(fetched.get().get(), 0, 0));
    ASSERT_EQ(actual, expected);
  };

  // Set up table
  ASSERT_OK(setup.Execute("CREATE TABLE t (i INT)"));
  ASSERT_OK(setup.Execute("INSERT INTO t (i) VALUES (0)"));

  // "SERIALIZABLE, READ ONLY" should use snapshot isolation: the reader's SELECT does not clash
  // with the writer's uncommitted UPDATE, and both commits succeed.
  ASSERT_OK(reader.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY"));
  ASSERT_OK(writer.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE"));
  ASSERT_OK(writer.Execute("UPDATE t SET i = i + 1"));
  ASSERT_OK(reader.Fetch("SELECT * FROM t"));
  ASSERT_OK(reader.Execute("COMMIT"));
  ASSERT_OK(writer.Execute("COMMIT"));

  // Same with the clauses in the opposite order ("READ ONLY, ISOLATION LEVEL SERIALIZABLE") and
  // with the read happening before the write.
  ASSERT_OK(reader.Execute("BEGIN TRANSACTION READ ONLY, ISOLATION LEVEL SERIALIZABLE"));
  ASSERT_OK(writer.Execute("BEGIN TRANSACTION READ WRITE, ISOLATION LEVEL SERIALIZABLE"));
  ASSERT_OK(reader.Fetch("SELECT * FROM t"));
  ASSERT_OK(writer.Execute("UPDATE t SET i = i + 1"));
  ASSERT_OK(reader.Execute("COMMIT"));
  ASSERT_OK(writer.Execute("COMMIT"));

  // A READ ONLY transaction still reports "serializable" as its isolation level.
  ASSERT_OK(reader.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY"));
  ASSERT_NO_FATALS(expect_setting(&reader, "SHOW transaction_isolation", "serializable"));
  ASSERT_OK(reader.Execute("COMMIT"));

  // Switching READ WRITE -> READ ONLY keeps "serializable" and turns transaction_read_only on.
  ASSERT_OK(writer.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE"));
  ASSERT_OK(writer.Execute("SET TRANSACTION READ ONLY"));
  ASSERT_NO_FATALS(expect_setting(&writer, "SHOW transaction_isolation", "serializable"));
  ASSERT_NO_FATALS(expect_setting(&writer, "SHOW transaction_read_only", "on"));
  ASSERT_OK(writer.Execute("COMMIT"));

  // Switching READ ONLY -> READ WRITE should no longer use snapshot isolation, so the reader and
  // the writer conflict.
  ASSERT_OK(reader.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY"));
  ASSERT_OK(writer.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE"));
  ASSERT_OK(reader.Execute("SET TRANSACTION READ WRITE"));
  ASSERT_OK(writer.Execute("UPDATE t SET i = i + 1"));
  // Which side loses is timing-dependent: either the reader's SELECT fails right away, or it
  // succeeds and the writer's COMMIT fails with a serialization error instead.
  auto read_result = reader.Fetch("SELECT * FROM t");
  if (!read_result.ok()) {
    ASSERT_TRUE(read_result.status().IsNetworkError()) << read_result.status();
    ASSERT_EQ(PgsqlError(read_result.status()), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)
        << read_result.status();
    ASSERT_STR_CONTAINS(read_result.status().ToString(),
                        "Conflicts with higher priority transaction");
    return;
  }

  ASSERT_OK(reader.Execute("COMMIT"));
  Status writer_commit = writer.Execute("COMMIT");
  ASSERT_NOK(writer_commit);
  ASSERT_TRUE(writer_commit.IsNetworkError()) << writer_commit;
  ASSERT_EQ(PgsqlError(writer_commit), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE)
      << writer_commit;
}
796
797
0
void AssertAborted(const Status& status) {
798
0
  ASSERT_NOK(status);
799
0
  ASSERT_STR_CONTAINS(status.ToString(), "aborted");
800
0
}
801
802
0
// A SERIALIZABLE transaction reads a table, another connection modifies the rows that read
// covered, and the transaction's second read must fail with an "aborted" error. This is checked
// once with a concurrent INSERT into an empty table and once with a concurrent DELETE of its only
// row.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SelectModifySelect)) {
  {
    // Case 1: concurrent INSERT into a freshly created (empty) table without a primary key.
    auto reader = ASSERT_RESULT(Connect());
    auto writer = ASSERT_RESULT(Connect());

    ASSERT_OK(reader.Execute("CREATE TABLE t (i INT)"));
    ASSERT_OK(reader.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"));
    ASSERT_RESULT(reader.FetchMatrix("SELECT * FROM t", 0, 1));
    ASSERT_OK(writer.Execute("INSERT INTO t VALUES (1)"));
    const auto second_read = ResultToStatus(reader.Fetch("SELECT * FROM t"));
    ASSERT_NO_FATALS(AssertAborted(second_read));
  }
  {
    // Case 2: concurrent DELETE of the single row of a table with a primary key.
    auto reader = ASSERT_RESULT(Connect());
    auto writer = ASSERT_RESULT(Connect());

    ASSERT_OK(reader.Execute("CREATE TABLE t2 (i INT PRIMARY KEY)"));
    ASSERT_OK(reader.Execute("INSERT INTO t2 VALUES (1)"));

    ASSERT_OK(reader.Execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"));
    ASSERT_RESULT(reader.FetchMatrix("SELECT * FROM t2", 1, 1));
    ASSERT_OK(writer.Execute("DELETE FROM t2 WHERE i = 1"));
    const auto second_read = ResultToStatus(reader.Fetch("SELECT * FROM t2"));
    ASSERT_NO_FATALS(AssertAborted(second_read));
  }
}
826
827
class PgMiniSmallWriteBufferTest : public PgMiniTest {
828
 public:
829
0
  void SetUp() override {
830
0
    FLAGS_db_write_buffer_size = 256_KB;
831
0
    PgMiniTest::SetUp();
832
0
  }
833
};
834
835
0
// Loads kTotalBatches * kBatchSize rows with binary COPY while another thread keeps restarting the
// cluster (every 5s). It then restarts the cluster once more and waits until the number of intents
// reported by CountIntents() drops to 5000 or fewer.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BulkCopyWithRestart), PgMiniSmallWriteBufferTest) {
  const std::string kTableName = "key_value";
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0 (key INTEGER NOT NULL PRIMARY KEY, value VARCHAR)",
      kTableName));

  TestThreadHolder thread_holder;
  constexpr int kTotalBatches = RegularBuildVsSanitizers(50, 5);
  constexpr int kBatchSize = 1000;
  constexpr int kValueSize = 128;
  constexpr int kTotalRows = kTotalBatches * kBatchSize;

  // Last key written so far; also used as the source of new keys.
  std::atomic<int> last_key(0);

  thread_holder.AddThreadFunctor(
      [this, &kTableName, &stop = thread_holder.stop_flag(), &last_key] {
    // Raises the shared stop flag when this thread exits, which also stops the restart thread.
    SetFlagOnExit set_flag(&stop);
    auto copy_conn = ASSERT_RESULT(Connect());

    auto log_total = ScopeExit([&last_key] {
      LOG(INFO) << "Total keys: " << last_key;
    });

    while (!stop.load(std::memory_order_acquire) && last_key < kTotalRows) {
      ASSERT_OK(copy_conn.CopyBegin(Format("COPY $0 FROM STDIN WITH BINARY", kTableName)));
      for (int row = 0; row != kBatchSize; ++row) {
        copy_conn.CopyStartRow(2);
        copy_conn.CopyPutInt32(++last_key);
        copy_conn.CopyPutString(RandomHumanReadableString(kValueSize));
      }

      ASSERT_OK(copy_conn.CopyEnd());
    }
  });

  thread_holder.AddThread(RestartsThread(cluster_.get(), 5s, &thread_holder.stop_flag()));

  // Normally returns early: the copy thread raises the stop flag once every batch is loaded.
  thread_holder.WaitAndStop(120s);

  ASSERT_EQ(last_key.load(std::memory_order_relaxed), kTotalRows);

  LOG(INFO) << "Restarting cluster";
  ASSERT_OK(RestartCluster());

  ASSERT_OK(WaitFor([this, &conn, &last_key, &kTableName] {
    const auto intents_count = CountIntents(cluster_.get());
    LOG(INFO) << "Intents count: " << intents_count;

    if (intents_count > 5000) {
      // Only transactions that were completely aborted/applied before the last replication get
      // cleaned up, so intents of the final transactions may linger. Writing one more row lets
      // that cleanup proceed.
      EXPECT_OK(conn.ExecuteFormat(
          "INSERT INTO $0 VALUES ($1, '$2')", kTableName, ++last_key,
          RandomHumanReadableString(kValueSize)));
      return false;
    }
    return true;
  }, 10s * kTimeMultiplier, "Intents cleanup", 200ms));
}
897
898
0
// Creates a parent table and a child table with an ON DELETE CASCADE foreign key. It inserts
// kRows child rows, each in its own transaction at `isolation_level`, and then waits up to 15s
// until CountIntents() reports zero.
void PgMiniTest::TestForeignKey(IsolationLevel isolation_level) {
  const std::string kDataTable = "data";
  const std::string kReferenceTable = "reference";
  constexpr int kRows = 10;
  // The only parent row; every child row points at it.
  constexpr int kReferenceId = 1;
  auto conn = ASSERT_RESULT(Connect());

  // Parent table.
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0 (id int NOT NULL, name VARCHAR, PRIMARY KEY (id))", kReferenceTable));
  // Child table keyed by (ref_id, data_id).
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0 (ref_id INTEGER, data_id INTEGER, name VARCHAR, "
      "PRIMARY KEY (ref_id, data_id))", kDataTable));
  // Link child.ref_id to parent.id.
  ASSERT_OK(conn.ExecuteFormat(
      "ALTER TABLE $0 ADD CONSTRAINT fk FOREIGN KEY(ref_id) REFERENCES $1(id) "
      "ON DELETE CASCADE", kDataTable, kReferenceTable));

  ASSERT_OK(conn.ExecuteFormat(
      "INSERT INTO $0 VALUES ($1, 'reference_$1')", kReferenceTable, kReferenceId));

  for (int data_id = 1; data_id <= kRows; ++data_id) {
    ASSERT_OK(conn.StartTransaction(isolation_level));
    ASSERT_OK(conn.ExecuteFormat(
        "INSERT INTO $0 VALUES ($1, $2, 'data_$2')", kDataTable, kReferenceId, data_id));
    ASSERT_OK(conn.CommitTransaction());
  }

  ASSERT_OK(WaitFor(
      [this] { return CountIntents(cluster_.get()) == 0; }, 15s, "Intents cleanup"));
}
930
931
0
// Foreign-key insert / intents-cleanup scenario with SERIALIZABLE transactions.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ForeignKeySerializable)) {
  const auto level = IsolationLevel::SERIALIZABLE_ISOLATION;
  TestForeignKey(level);
}
934
935
0
// Foreign-key insert / intents-cleanup scenario with SNAPSHOT transactions.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ForeignKeySnapshot)) {
  const auto level = IsolationLevel::SNAPSHOT_ISOLATION;
  TestForeignKey(level);
}
938
939
class PgMiniTestNoTxnRetry : public PgMiniTest {
940
 protected:
941
0
  void BeforePgProcessStart() override {
942
0
    FLAGS_ysql_sleep_before_retry_on_txn_conflict = false;
943
0
  }
944
};
945
946
template<IsolationLevel level>
947
class PgMiniTestTxnHelper : public PgMiniTestNoTxnRetry {
948
 protected:
949
950
  // Check possibility of updating column in case row is referenced by foreign key from another txn.
951
0
  void TestReferencedTableUpdate() {
952
0
    auto conn = ASSERT_RESULT(Connect());
953
0
    ASSERT_OK(conn.Execute("CREATE TABLE pktable (k INT PRIMARY KEY, v INT)"));
954
0
    ASSERT_OK(conn.Execute("CREATE TABLE fktable (k INT PRIMARY KEY, fk_p INT, v INT, "
955
0
                           "FOREIGN KEY(fk_p) REFERENCES pktable(k))"));
956
0
    ASSERT_OK(conn.Execute("INSERT INTO pktable VALUES(1, 2)"));
957
0
    auto extra_conn = ASSERT_RESULT(Connect());
958
0
    ASSERT_OK(StartTxn(&extra_conn));
959
0
    ASSERT_OK(extra_conn.Execute("INSERT INTO fktable VALUES(1, 1, 2)"));
960
0
    ASSERT_OK(conn.Execute("UPDATE pktable SET v = 20 WHERE k = 1"));
961
    // extra_conn created strong read intent on (1, liveness) due to foreign key check.
962
    // As a result weak read intent is created for (1).
963
    // conn UPDATE created strong write intent on (1, v).
964
    // As a result weak write intent is created for (1).
965
    // Weak read + weak write on (1) has no conflicts.
966
0
    ASSERT_OK(extra_conn.Execute("COMMIT"));
967
0
    auto res = ASSERT_RESULT(
968
0
        conn.template FetchValue<int64_t>("SELECT COUNT(*) FROM pktable WHERE v = 20"));
969
0
    ASSERT_EQ(res, 1);
970
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE25TestReferencedTableUpdateEv
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE25TestReferencedTableUpdateEv
971
972
  // Check that `FOR KEY SHARE` prevents rows from being deleted even in case not all key
973
  // components are specified (for SERIALIZABLE isolation level). For REPEATABLE READ, only rows
974
  // returned to the user are locked.
975
0
  void TestRowKeyShareLock(const std::string& cur_name = "") {
976
0
    auto conn = ASSERT_RESULT(SetHighPriTxn(Connect()));
977
0
    auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect()));
978
979
0
    ASSERT_OK(conn.Execute(
980
0
        "CREATE TABLE t (h INT, r1 INT, r2 INT, v INT, PRIMARY KEY(h, r1, r2))"));
981
0
    ASSERT_OK(conn.Execute(
982
0
        "INSERT INTO t VALUES (1, 2, 3, 4), (1, 2, 30, 40), (1, 3, 4, 5), (10, 2, 3, 4)"));
983
984
    // Transaction 1.
985
    // For SERIALIZABLE level:
986
    //   SELECT FOR KEY SHARE locks the sub doc key prefix (1, 2) as not all key components are
987
    //   specified. This also means that no new rows with this matching prefix can be inserted.
988
    // For REPEATABLE READ level:
989
    //   Only tuples returned by the SELECT statement are locked.
990
0
    ASSERT_OK(StartTxn(&conn));
991
0
    RowLock(&conn, "SELECT * FROM t WHERE h = 1 AND r1 = 2 FOR KEY SHARE", cur_name);
992
993
0
    ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3"));
994
0
    ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 30"));
995
996
    // Doc key (1, 3, 4) is not locked in both isolation levels.
997
0
    ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 3 AND r2 = 4"));
998
999
    // New rows with prefix that matches (1, 2) can't be inserted in SERIALIZABLE level.
1000
0
    if (level == IsolationLevel::SERIALIZABLE_ISOLATION) {
1001
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)"));
1002
      // Doc key (1, 2, 2) doesn't exist. But still it conflicts beause of a kStrongRead intent
1003
      // taken on (1, 2) as part of the "fetching" the row when taking for key share locks (which
1004
      // only requires kWeakRead).
1005
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1006
0
    } else {
1007
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)"));
1008
      // Delete the row again
1009
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 100"));
1010
1011
      // Doc key (1, 2, 2) doesn't exist.
1012
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1013
0
    }
1014
1015
0
    ASSERT_OK(conn.Execute("COMMIT"));
1016
1017
    // Transaction 2.
1018
    // For SERIALIZABLE level:
1019
    //   SELECT FOR KEY SHARE locks the sub doc key prefix () as no prefix of the pk is specified.
1020
    // For REPEATABLE READ level:
1021
    //   Only tuples returned by the SELECT statement are locked.
1022
0
    ASSERT_OK(StartTxn(&conn));
1023
0
    RowLock(&conn, "SELECT * FROM t WHERE r2 = 2 FOR KEY SHARE", cur_name);
1024
1025
0
    if (level == IsolationLevel::SERIALIZABLE_ISOLATION) {
1026
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3"));
1027
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 10 AND r1 = 2 AND r2 = 3"));
1028
      // Doc key (1, 2, 2) doesn't exist.
1029
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1030
0
    } else {
1031
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3"));
1032
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 10 AND r1 = 2 AND r2 = 3"));
1033
      // Re-add the rows
1034
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 3, 4), (10, 2, 3, 4)"));
1035
1036
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)"));
1037
      // Delete the row again
1038
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 100"));
1039
1040
      // Doc key (1, 2, 2) doesn't exist.
1041
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1042
0
    }
1043
1044
0
    ASSERT_OK(conn.Execute("COMMIT"));
1045
1046
    // Transaction 3.
1047
    // For SERIALIZABLE level:
1048
    //   SELECT FOR KEY SHARE locks the sub doc key prefix (1) as not all key components are
1049
    //   specified.
1050
    // For REPEATABLE READ level:
1051
    //   Only tuples returned by the SELECT statement are locked.
1052
0
    ASSERT_OK(StartTxn(&conn));
1053
0
    RowLock(&conn, "SELECT * FROM t WHERE h = 1 AND r2 = 2 FOR KEY SHARE", cur_name);
1054
1055
    // Doc key (10, 2, 3) is not locked.
1056
0
    ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 10 AND r1 = 2 AND r2 = 3"));
1057
1058
0
    if (level == IsolationLevel::SERIALIZABLE_ISOLATION) {
1059
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3"));
1060
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)"));
1061
1062
      // Doc key (1, 2, 2) doesn't exist.
1063
0
      ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1064
0
    } else {
1065
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3"));
1066
      // Re-add deleted row
1067
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 3, 4)"));
1068
1069
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "INSERT INTO t VALUES (1, 2, 100, 100)"));
1070
      // Delete the row again
1071
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 100"));
1072
1073
      // Doc key (1, 2, 2) doesn't exist.
1074
0
      ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1075
0
    }
1076
1077
0
    ASSERT_OK(conn.Execute("COMMIT"));
1078
1079
    // Transaction 4.
1080
    // SELECT FOR KEY SHARE locks one specific row with doc key (1, 2, 3) only.
1081
0
    ASSERT_OK(StartTxn(&conn));
1082
0
    RowLock(&conn, "SELECT * FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3 FOR KEY SHARE", cur_name);
1083
1084
0
    ASSERT_NOK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 3"));
1085
0
    ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 30"));
1086
1087
    // Doc key (1, 2, 2) doesn't exist.
1088
0
    ASSERT_OK(ExecuteInTxn(&extra_conn, "DELETE FROM t WHERE h = 1 AND r1 = 2 AND r2 = 2"));
1089
1090
0
    ASSERT_OK(conn.Execute("COMMIT"));
1091
1092
0
    auto res = ASSERT_RESULT(conn.template FetchValue<int64_t>("SELECT COUNT(*) FROM t"));
1093
0
    ASSERT_EQ(res, 1);
1094
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE19TestRowKeyShareLockERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE19TestRowKeyShareLockERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE
1095
1096
  // Check conflicts according to the following matrix (X - conflict, O - no conflict):
1097
  //                   | FOR KEY SHARE | FOR SHARE | FOR NO KEY UPDATE | FOR UPDATE
1098
  // ------------------+---------------+-----------+-------------------+-----------
1099
  // FOR KEY SHARE     |       O       |     O     |         O         |     X
1100
  // FOR SHARE         |       O       |     O     |         X         |     X
1101
  // FOR NO KEY UPDATE |       O       |     X     |         X         |     X
1102
  // FOR UPDATE        |       X       |     X     |         X         |     X
1103
0
  void TestRowLockConflictMatrix(const std::string& cur_name = "") {
1104
0
    auto conn = ASSERT_RESULT(SetHighPriTxn(Connect()));
1105
0
    auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect()));
1106
1107
0
    ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v INT)"));
1108
0
    ASSERT_OK(conn.Execute("INSERT INTO t VALUES (1, 1)"));
1109
1110
    // Transaction 1.
1111
0
    ASSERT_OK(StartTxn(&conn));
1112
0
    RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE", cur_name);
1113
1114
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE"));
1115
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE"));
1116
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE"));
1117
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE"));
1118
1119
0
    ASSERT_OK(conn.Execute("COMMIT"));
1120
1121
    // Transaction 2.
1122
0
    ASSERT_OK(StartTxn(&conn));
1123
0
    RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE", cur_name);
1124
1125
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE"));
1126
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE"));
1127
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE"));
1128
0
    ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE"));
1129
1130
0
    ASSERT_OK(conn.Execute("COMMIT"));
1131
1132
    // Transaction 3.
1133
0
    ASSERT_OK(StartTxn(&conn));
1134
0
    RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR SHARE", cur_name);
1135
1136
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE"));
1137
0
    ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE"));
1138
0
    ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE"));
1139
0
    ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE"));
1140
1141
0
    ASSERT_OK(conn.Execute("COMMIT"));
1142
1143
    // Transaction 4.
1144
0
    ASSERT_OK(StartTxn(&conn));
1145
0
    RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE", cur_name);
1146
1147
0
    ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE"));
1148
0
    ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE"));
1149
0
    ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE"));
1150
1151
0
    ASSERT_OK(conn.Execute("COMMIT"));
1152
1153
    // Transaction 5.
1154
    // Check FOR KEY SHARE + FOR UPDATE conflict separately
1155
    // as FOR KEY SHARE uses regular and FOR UPDATE uses high txn priority.
1156
0
    ASSERT_OK(StartTxn(&conn));
1157
0
    RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE", cur_name);
1158
1159
0
    ASSERT_OK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE"));
1160
1161
0
    ASSERT_NOK(conn.Execute("COMMIT"));
1162
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE25TestRowLockConflictMatrixERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE25TestRowLockConflictMatrixERKNSt3__112basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEE
1163
1164
0
  void RowLock(PGConn* connection, const std::string& query, const std::string& cur_name) {
1165
0
    std::string lock_stmt = query;
1166
0
    if (!cur_name.empty()) {
1167
0
      const std::string declare_stmt = Format("DECLARE $0 CURSOR FOR $1", cur_name, query);
1168
0
      ASSERT_OK(connection->Execute(declare_stmt));
1169
1170
0
      lock_stmt = Format("FETCH ALL $0", cur_name);
1171
0
    }
1172
0
    ASSERT_RESULT(connection->Fetch(lock_stmt));
1173
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE7RowLockEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEESE_
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE7RowLockEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEESE_
1174
1175
0
  void TestInOperatorLock() {
1176
0
    auto conn = ASSERT_RESULT(Connect());
1177
0
    ASSERT_OK(conn.Execute("SET yb_transaction_priority_lower_bound = 0.9"));
1178
0
    ASSERT_OK(conn.Execute(
1179
0
        "CREATE TABLE t (h INT, r1 INT, r2 INT, PRIMARY KEY(h, r1 ASC, r2 ASC))"));
1180
0
    ASSERT_OK(conn.Execute(
1181
0
        "INSERT INTO t VALUES (1, 11, 1),(1, 12, 1),(1, 13, 1),(2, 11, 2),(2, 12, 2),(2, 13, 2)"));
1182
0
    ASSERT_OK(StartTxn(&conn));
1183
0
    auto res = ASSERT_RESULT(conn.Fetch(
1184
0
        "SELECT * FROM t WHERE h = 1 AND r1 IN (11, 12) AND r2 = 1 FOR KEY SHARE"));
1185
1186
0
    auto extra_conn = ASSERT_RESULT(Connect());
1187
0
    ASSERT_OK(extra_conn.Execute("SET yb_transaction_priority_upper_bound = 0.1"));
1188
0
    ASSERT_OK(extra_conn.Execute("BEGIN"));
1189
0
    ASSERT_NOK(extra_conn.Execute("DELETE FROM t WHERE h = 1 AND r1 = 11 AND r2 = 1"));
1190
0
    ASSERT_OK(extra_conn.Execute("ROLLBACK"));
1191
0
    ASSERT_OK(extra_conn.Execute("BEGIN"));
1192
0
    ASSERT_OK(extra_conn.Execute("DELETE FROM t WHERE h = 1 AND r1 = 13 AND r2 = 1"));
1193
0
    ASSERT_OK(extra_conn.Execute("COMMIT"));
1194
1195
0
    ASSERT_OK(conn.Execute("COMMIT;"));
1196
1197
0
    ASSERT_OK(conn.Execute("BEGIN;"));
1198
0
    res = ASSERT_RESULT(conn.Fetch(
1199
0
        "SELECT * FROM t WHERE h IN (1, 2) AND r1 = 11 FOR KEY SHARE"));
1200
1201
0
    ASSERT_OK(extra_conn.Execute("BEGIN"));
1202
0
    ASSERT_NOK(extra_conn.Execute("DELETE FROM t WHERE h = 1 AND r1 = 11 AND r2 = 1"));
1203
0
    ASSERT_OK(extra_conn.Execute("ROLLBACK"));
1204
0
    ASSERT_OK(extra_conn.Execute("BEGIN"));
1205
0
    ASSERT_NOK(extra_conn.Execute("DELETE FROM t WHERE h = 2 AND r1 = 11 AND r2 = 2"));
1206
0
    ASSERT_OK(extra_conn.Execute("ROLLBACK"));
1207
0
    ASSERT_OK(extra_conn.Execute("BEGIN"));
1208
0
    ASSERT_OK(extra_conn.Execute("DELETE FROM t WHERE h = 2 AND r1 = 12 AND r2 = 2"));
1209
0
    ASSERT_OK(extra_conn.Execute("COMMIT"));
1210
1211
0
    ASSERT_OK(conn.Execute("COMMIT;"));
1212
0
    const auto count = ASSERT_RESULT(conn.template FetchValue<int64_t>("SELECT COUNT(*) FROM t"));
1213
0
    ASSERT_EQ(4, count);
1214
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE18TestInOperatorLockEv
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE18TestInOperatorLockEv
1215
1216
0
  static Result<PGConn> SetHighPriTxn(Result<PGConn> connection) {
1217
0
    return Execute(std::move(connection), "SET yb_transaction_priority_lower_bound=0.5");
1218
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE13SetHighPriTxnENS_6ResultINS0_6PGConnEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE13SetHighPriTxnENS_6ResultINS0_6PGConnEEE
1219
1220
0
  static Result<PGConn> SetLowPriTxn(Result<PGConn> connection) {
1221
0
    return Execute(std::move(connection), "SET yb_transaction_priority_upper_bound=0.4");
1222
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE12SetLowPriTxnENS_6ResultINS0_6PGConnEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE12SetLowPriTxnENS_6ResultINS0_6PGConnEEE
1223
1224
0
  static Result<PGConn> Execute(Result<PGConn> connection, const std::string& query) {
1225
0
    if (connection.ok()) {
1226
0
      RETURN_NOT_OK((*connection).Execute(query));
1227
0
    }
1228
0
    return connection;
1229
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE7ExecuteENS_6ResultINS0_6PGConnEEERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE7ExecuteENS_6ResultINS0_6PGConnEEERKNSt3__112basic_stringIcNS7_11char_traitsIcEENS7_9allocatorIcEEEE
1230
1231
0
  static CHECKED_STATUS StartTxn(PGConn* connection) {
1232
0
    return TxnHelper<level>::StartTxn(connection);
1233
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE8StartTxnEPNS0_6PGConnE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE8StartTxnEPNS0_6PGConnE
1234
1235
0
  static CHECKED_STATUS ExecuteInTxn(PGConn* connection, const std::string& query) {
1236
0
    return TxnHelper<level>::ExecuteInTxn(connection, query);
1237
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE12ExecuteInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE
1238
1239
0
  static Result<PGResultPtr> FetchInTxn(PGConn* connection, const std::string& query) {
1240
0
    return TxnHelper<level>::FetchInTxn(connection, query);
1241
0
  }
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE2EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb9pgwrapper19PgMiniTestTxnHelperILNS_14IsolationLevelE1EE10FetchInTxnEPNS0_6PGConnERKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEE
1242
};
1243
1244
class PgMiniTestTxnHelperSerializable
1245
    : public PgMiniTestTxnHelper<IsolationLevel::SERIALIZABLE_ISOLATION> {
1246
 protected:
1247
  // Check two SERIALIZABLE txns has no conflict in case of updating same column in same row.
1248
0
  void TestSameColumnUpdate() {
1249
0
    auto conn = ASSERT_RESULT(SetHighPriTxn(Connect()));
1250
0
    auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect()));
1251
1252
0
    ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v1 INT, v2 INT)"));
1253
0
    ASSERT_OK(conn.Execute("INSERT INTO t VALUES(1, 2, 3)"));
1254
1255
0
    ASSERT_OK(StartTxn(&conn));
1256
0
    ASSERT_OK(conn.Execute("UPDATE t SET v1 = 20 WHERE k = 1"));
1257
1258
0
    ASSERT_OK(ExecuteInTxn(&extra_conn, "UPDATE t SET v1 = 40 WHERE k = 1"));
1259
1260
0
    ASSERT_OK(conn.Execute("COMMIT"));
1261
1262
0
    auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t WHERE v1 = 20"));
1263
0
    ASSERT_EQ(res, 1);
1264
1265
0
    ASSERT_OK(StartTxn(&conn));
1266
    // Next statement will lock whole row for updates due to expression
1267
0
    ASSERT_OK(conn.Execute("UPDATE t SET v2 = v2 * 2 WHERE k = 1"));
1268
1269
0
    ASSERT_NOK(ExecuteInTxn(&extra_conn, "UPDATE t SET v2 = 10 WHERE k = 1"));
1270
0
    ASSERT_NOK(ExecuteInTxn(&extra_conn, "UPDATE t SET v1 = 10 WHERE k = 1"));
1271
1272
0
    ASSERT_OK(conn.Execute("COMMIT"));
1273
1274
0
    res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t WHERE v2 = 6"));
1275
0
    ASSERT_EQ(res, 1);
1276
0
  }
1277
};
1278
1279
class PgMiniTestTxnHelperSnapshot
1280
    : public PgMiniTestTxnHelper<IsolationLevel::SNAPSHOT_ISOLATION> {
1281
 protected:
1282
  // Check two SNAPSHOT txns has a conflict in case of updating same column in same row.
1283
0
  void TestSameColumnUpdate() {
1284
0
    auto conn = ASSERT_RESULT(SetHighPriTxn(Connect()));
1285
0
    auto extra_conn = ASSERT_RESULT(SetLowPriTxn(Connect()));
1286
1287
0
    ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v INT)"));
1288
0
    ASSERT_OK(conn.Execute("INSERT INTO t VALUES(1, 2)"));
1289
1290
0
    ASSERT_OK(StartTxn(&conn));
1291
0
    ASSERT_OK(conn.Execute("UPDATE t SET v = 20 WHERE k = 1"));
1292
1293
0
    ASSERT_NOK(ExecuteInTxn(&extra_conn, "UPDATE t SET v = 40 WHERE k = 1"));
1294
1295
0
    ASSERT_OK(conn.Execute("COMMIT"));
1296
1297
0
    const auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t WHERE v = 20"));
1298
0
    ASSERT_EQ(res, 1);
1299
0
  }
1300
};
1301
1302
// Runs the shared referenced-table update scenario under SERIALIZABLE isolation.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(ReferencedTableUpdateSerializable),
          PgMiniTestTxnHelperSerializable) {
  this->TestReferencedTableUpdate();
}
1307
1308
// Runs the shared referenced-table update scenario under SNAPSHOT isolation.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(ReferencedTableUpdateSnapshot),
          PgMiniTestTxnHelperSnapshot) {
  this->TestReferencedTableUpdate();
}
1313
1314
// Runs the shared row KEY SHARE lock scenario under SERIALIZABLE isolation.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(RowKeyShareLockSerializable),
          PgMiniTestTxnHelperSerializable) {
  this->TestRowKeyShareLock();
}
1319
1320
// Runs the shared row KEY SHARE lock scenario under SNAPSHOT isolation.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(RowKeyShareLockSnapshot),
          PgMiniTestTxnHelperSnapshot) {
  this->TestRowKeyShareLock();
}
1325
1326
// Runs the shared row-lock conflict matrix scenario under SERIALIZABLE isolation.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(RowLockConflictMatrixSerializable),
          PgMiniTestTxnHelperSerializable) {
  this->TestRowLockConflictMatrix();
}
1331
1332
// Runs the shared row-lock conflict matrix scenario under SNAPSHOT isolation.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(RowLockConflictMatrixSnapshot),
          PgMiniTestTxnHelperSnapshot) {
  this->TestRowLockConflictMatrix();
}
1337
1338
// Row KEY SHARE lock scenario under SERIALIZABLE isolation, passing the cursor name "cur_name".
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(CursorRowKeyShareLockSerializable),
          PgMiniTestTxnHelperSerializable) {
  this->TestRowKeyShareLock("cur_name");
}
1343
1344
// Row KEY SHARE lock scenario under SNAPSHOT isolation, passing the cursor name "cur_name".
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(CursorRowKeyShareLockSnapshot),
          PgMiniTestTxnHelperSnapshot) {
  this->TestRowKeyShareLock("cur_name");
}
1349
1350
// Row-lock conflict matrix under SERIALIZABLE isolation, passing the cursor name "cur_name".
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(CursorRowLockConflictMatrixSerializable),
          PgMiniTestTxnHelperSerializable) {
  this->TestRowLockConflictMatrix("cur_name");
}
1355
1356
// Row-lock conflict matrix under SNAPSHOT isolation, passing the cursor name "cur_name".
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(CursorRowLockConflictMatrixSnapshot),
          PgMiniTestTxnHelperSnapshot) {
  this->TestRowLockConflictMatrix("cur_name");
}
1361
1362
// Same-column concurrent update under SERIALIZABLE isolation (expected: no conflict).
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(SameColumnUpdateSerializable),
          PgMiniTestTxnHelperSerializable) {
  this->TestSameColumnUpdate();
}
1367
1368
// Same-column concurrent update under SNAPSHOT isolation (expected: conflict).
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(SameColumnUpdateSnapshot),
          PgMiniTestTxnHelperSnapshot) {
  this->TestSameColumnUpdate();
}
1373
1374
// ------------------------------------------------------------------------------------------------
1375
// A test performing manual transaction control on system tables.
1376
// ------------------------------------------------------------------------------------------------
1377
1378
0
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SystemTableTxnTest), PgMiniTestNoTxnRetry) {
  // Two REPEATABLE READ transactions race to insert the same key into a system table.
  // Non-DDL transactions on system tables are enabled first. In every round exactly one of
  // the two transactions must end up committed.
  //
  // postgres=# \d pg_ts_dict;
  //
  //              Table "pg_catalog.pg_ts_dict"
  //      Column     | Type | Collation | Nullable | Default
  // ----------------+------+-----------+----------+---------
  //  dictname       | name |           | not null |
  //  dictnamespace  | oid  |           | not null |
  //  dictowner      | oid  |           | not null |
  //  dicttemplate   | oid  |           | not null |
  //  dictinitoption | text |           |          |
  // Indexes:
  //     "pg_ts_dict_oid_index" PRIMARY KEY, lsm (oid)
  //     "pg_ts_dict_dictname_index" UNIQUE, lsm (dictname, dictnamespace)

  auto conn1 = ASSERT_RESULT(Connect());
  auto conn2 = ASSERT_RESULT(Connect());
  for (auto* conn : {&conn1, &conn2}) {
    ASSERT_OK(conn->Execute("SET yb_non_ddl_txn_for_sys_tables_allowed=1"));
  }

  // These names are stringified by EXPR_VALUE_FOR_LOG below, so they are part of the log output.
  size_t commit1_fail_count = 0;
  size_t commit2_fail_count = 0;
  size_t insert2_fail_count = 0;

  constexpr auto kStartTxnStatementStr = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
  const int iterations = 48;
  for (int i = 1; i <= iterations; ++i) {
    // Every round uses a fresh (dictname, dictnamespace) pair, which is the unique index key.
    const std::string dictname = Format("contendedkey$0", i);
    const int dictnamespace = i;
    ASSERT_OK(conn1.Execute(kStartTxnStatementStr));
    ASSERT_OK(conn2.Execute(kStartTxnStatementStr));

    // conn1 inserts first and must always succeed; conn2's insert of the same key may fail.
    ASSERT_OK(conn1.Execute(
        Format("INSERT INTO pg_ts_dict VALUES ('$0', $1, 1, 2, 'b')", dictname, dictnamespace)));
    const Status insert_status2 = conn2.Execute(
        Format("INSERT INTO pg_ts_dict VALUES ('$0', $1, 3, 4, 'c')", dictname, dictnamespace));
    if (!insert_status2.ok()) {
      LOG(INFO) << "MUST BE A CONFLICT: Insert failed: " << insert_status2;
      ++insert2_fail_count;
    }

    // Commit both transactions, in a random order.
    Status commit_status1;
    Status commit_status2;
    if (RandomUniformBool()) {
      commit_status1 = conn1.Execute("COMMIT");
      commit_status2 = conn2.Execute("COMMIT");
    } else {
      commit_status2 = conn2.Execute("COMMIT");
      commit_status1 = conn1.Execute("COMMIT");
    }
    commit1_fail_count += commit_status1.ok() ? 0 : 1;
    commit2_fail_count += commit_status2.ok() ? 0 : 1;

    const auto get_commit_statuses_str = [&commit_status1, &commit_status2] {
      return Format("commit_status1=$0, commit_status2=$1", commit_status1, commit_status2);
    };

    const bool succeeded1 = commit_status1.ok();
    const bool succeeded2 = insert_status2.ok() && commit_status2.ok();

    ASSERT_TRUE(!succeeded1 || !succeeded2)
        << "Both transactions can't commit. " << get_commit_statuses_str();
    ASSERT_TRUE(succeeded1 || succeeded2)
        << "We expect one of the two transactions to succeed. " << get_commit_statuses_str();

    // Roll back on any connection whose COMMIT failed before reusing it.
    if (!commit_status1.ok()) {
      ASSERT_OK(conn1.Execute("ROLLBACK"));
    }
    if (!commit_status2.ok()) {
      ASSERT_OK(conn2.Execute("ROLLBACK"));
    }

    // Randomly swap roles so that both physical connections act as the first inserter.
    if (RandomUniformBool()) {
      std::swap(conn1, conn2);
    }
  }

  LOG(INFO) << "Test stats: "
            << EXPR_VALUE_FOR_LOG(commit1_fail_count) << ", "
            << EXPR_VALUE_FOR_LOG(insert2_fail_count) << ", "
            << EXPR_VALUE_FOR_LOG(commit2_fail_count);
  ASSERT_GE(commit1_fail_count, iterations / 4);
  ASSERT_GE(insert2_fail_count, iterations / 4);
  ASSERT_EQ(commit2_fail_count, 0);
}
1469
1470
0
// Checks how CREATE DATABASE and DROP DATABASE change the number of table ids recorded on the
// sys catalog tablet. CREATE must raise the count, DROP must bring it back to the initial
// value, and that value must survive a cluster restart.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBUpdateSysTablet)) {
  const std::string kDatabaseName = "testdb";
  PGConn conn = ASSERT_RESULT(Connect());
  // Table id counts: initially, after CREATE, after DROP, and after restarting the cluster.
  std::array<int, 4> num_tables;

  // Reads the number of table ids stored in the given tablet's metadata.
  const auto count_tablet_tables = [](const auto& tablet) {
    auto tablet_lock = tablet->LockForWrite();
    return tablet_lock->pb.table_ids_size();
  };

  auto* catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
  auto sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId));
  num_tables[0] = count_tablet_tables(sys_tablet);

  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName));
  num_tables[1] = count_tablet_tables(sys_tablet);

  ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName));
  num_tables[2] = count_tablet_tables(sys_tablet);

  // Restart and re-fetch the sys catalog tablet to check that its table_ids were persisted.
  ASSERT_OK(RestartCluster());
  catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
  sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId));
  num_tables[3] = count_tablet_tables(sys_tablet);

  ASSERT_LT(num_tables[0], num_tables[1]);
  ASSERT_EQ(num_tables[0], num_tables[2]);
  ASSERT_EQ(num_tables[0], num_tables[3]);
}
1503
1504
0
// Checks that after DROP DATABASE the catalog manager finishes deleting the database's tables,
// and that no deletion is still pending after a cluster restart.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBMarkDeleted)) {
  const std::string kDatabaseName = "testdb";
  constexpr auto kSleepTime = 500ms;
  constexpr int kMaxNumSleeps = 20;
  auto* catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
  PGConn conn = ASSERT_RESULT(Connect());

  ASSERT_FALSE(catalog_manager->AreTablesDeleting());
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName));
  ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName));

  // System tables go through a "deleting" phase first. Poll until none are left deleting,
  // sleeping at most kMaxNumSleeps times.
  for (int num_sleeps = 0;
       catalog_manager->AreTablesDeleting() && num_sleeps++ != kMaxNumSleeps;) {
    LOG(INFO) << "Tables are deleting...";
    std::this_thread::sleep_for(kSleepTime);
  }
  ASSERT_FALSE(catalog_manager->AreTablesDeleting()) << "Tables should have finished deleting";

  // Restart to check that the deletions were persisted. The catalog manager pointer taken
  // before the restart is stale afterwards, so look it up again via RestartCluster's new leader.
  ASSERT_OK(RestartCluster());
  catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
  ASSERT_FALSE(catalog_manager->AreTablesDeleting());
}
1527
1528
0
// Drops a database that contains user tables (one of them non-empty). Checks three things:
// every table finishes deleting, nothing is left deleting after a restart, and the sys
// catalog tablet's table id count returns to its initial value.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DropDBWithTables)) {
  const std::string kDatabaseName = "testdb";
  const std::string kTablePrefix = "testt";
  constexpr auto kSleepTime = 500ms;
  constexpr int kMaxNumSleeps = 20;
  auto* catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
  PGConn conn = ASSERT_RESULT(Connect());
  auto sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId));

  // Reads the number of table ids stored in the given tablet's metadata.
  const auto count_tablet_tables = [](const auto& tablet) {
    auto tablet_lock = tablet->LockForWrite();
    return tablet_lock->pb.table_ids_size();
  };

  const int num_tables_before = count_tablet_tables(sys_tablet);
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName));
  {
    // Populate the new database: ten tables, one of which receives a few rows.
    PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName));
    for (int i = 0; i < 10; ++i) {
      ASSERT_OK(conn_new.ExecuteFormat("CREATE TABLE $0$1 (i int)", kTablePrefix, i));
    }
    ASSERT_OK(conn_new.ExecuteFormat("INSERT INTO $0$1 (i) VALUES (1), (2), (3)", kTablePrefix, 5));
  }
  ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName));

  // User and system tables go through a "deleting" phase first. Poll until none are left
  // deleting, sleeping at most kMaxNumSleeps times.
  for (int num_sleeps = 0;
       catalog_manager->AreTablesDeleting() && num_sleeps++ != kMaxNumSleeps;) {
    LOG(INFO) << "Tables are deleting...";
    std::this_thread::sleep_for(kSleepTime);
  }
  ASSERT_FALSE(catalog_manager->AreTablesDeleting()) << "Tables should have finished deleting";

  // Restart to check that the deletions were persisted; refresh the stale pointers first.
  ASSERT_OK(RestartCluster());
  catalog_manager = &ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
  sys_tablet = ASSERT_RESULT(catalog_manager->GetTabletInfo(master::kSysCatalogTabletId));
  ASSERT_FALSE(catalog_manager->AreTablesDeleting());
  const int num_tables_after = count_tablet_tables(sys_tablet);
  ASSERT_EQ(num_tables_before, num_tables_after);
}
1569
1570
// Creates a colocated database and drops its only table, leaving the database without user
// tables. Then fails over to a new master leader and checks that the colocated database can
// still be used to create a table.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(DropAllTablesInColocatedDB),
          PgMiniMasterFailoverTest) {
  const std::string kDatabaseName = "testdb";
  // Create a colocated DB, create some tables, delete all of them.
  {
    PGConn conn = ASSERT_RESULT(Connect());
    ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 with colocated=true", kDatabaseName));
    {
      PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName));
      ASSERT_OK(conn_new.Execute("CREATE TABLE foo (i int)"));
      ASSERT_OK(conn_new.Execute("DROP TABLE foo"));
    }
  }
  // Failover to a new master.
  LOG(INFO) << "Failover to new Master";
  auto old_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
  // Shut down exactly the master recorded above instead of re-querying the leader. A second
  // lookup could return a different master, and then the ASSERT_NE below would not be checking
  // against the master that was actually stopped.
  old_master->Shutdown();
  auto new_master = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
  ASSERT_NE(nullptr, new_master);
  ASSERT_NE(old_master, new_master);
  // Wait for all the TabletServers to report in, so we can run CREATE TABLE with working replicas.
  ASSERT_OK(cluster_->WaitForAllTabletServers());
  // Ensure we can still access the colocated DB after the failover.
  {
    PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName));
    ASSERT_OK(conn_new.Execute("CREATE TABLE foo (i int)"));
  }
}
1598
1599
1600
0
// Inserts kRows rows carrying large random text values, then times a full-table
// COUNT(DISTINCT(value)) query and checks that every value was counted as distinct.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigSelect)) {
  constexpr size_t kRows = 400;
  constexpr size_t kValueSize = RegularBuildVsSanitizers(256_KB, 4_KB);

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)"));

  for (size_t key = 0; key < kRows; ++key) {
    ASSERT_OK(conn.ExecuteFormat(
        "INSERT INTO t VALUES ($0, '$1')", key, RandomHumanReadableString(kValueSize)));
  }

  auto start = MonoTime::Now();
  const auto res = ASSERT_RESULT(
      conn.FetchValue<int64_t>("SELECT COUNT(DISTINCT(value)) FROM t"));
  const auto elapsed = MonoTime::Now() - start;
  LOG(INFO) << "Time: " << elapsed;
  ASSERT_EQ(res, kRows);
}
1619
1620
0
// Times a single INSERT ... SELECT generate_series statement that writes kRows rows on a
// one-tserver cluster.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ManyRowsInsert), PgMiniSingleTServerTest) {
  constexpr int kRows = 100000;
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)"));

  auto start = MonoTime::Now();
  ASSERT_OK(conn.ExecuteFormat("INSERT INTO t SELECT generate_series(1, $0)", kRows));
  const auto elapsed = MonoTime::Now() - start;
  LOG(INFO) << "Time: " << elapsed;
}
1631
1632
0
// Restarts every master, with the first one configured not to pass master addresses. Then
// checks that a table can be created within 15 seconds, retrying until the DDL succeeds.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(MoveMaster)) {
  ShutdownAllMasters(cluster_.get());
  cluster_->mini_master(0)->set_pass_master_addresses(false);
  ASSERT_OK(StartAllMasters(cluster_.get()));

  auto conn = ASSERT_RESULT(Connect());

  // One attempt to create the table; failures are logged and retried by WaitFor.
  const auto try_create_table = [&conn] {
    auto status = conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)");
    WARN_NOT_OK(status, "Failed to create table");
    return status.ok();
  };
  ASSERT_OK(WaitFor(try_create_table, 15s, "Create table"));
}
1645
1646
class PgMiniBigPrefetchTest : public PgMiniSingleTServerTest {
1647
 protected:
1648
0
  void SetUp() override {
1649
0
    FLAGS_ysql_prefetch_limit = 20000000;
1650
0
    PgMiniTest::SetUp();
1651
0
  }
1652
1653
0
  void Run(int rows, int block_size, int reads, bool compact = false) {
1654
0
    auto conn = ASSERT_RESULT(Connect());
1655
1656
0
    ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY) SPLIT INTO 1 TABLETS"));
1657
0
    auto last_row = 0;
1658
0
    while (last_row < rows) {
1659
0
      auto first_row = last_row + 1;
1660
0
      last_row = std::min(rows, last_row + block_size);
1661
0
      ASSERT_OK(conn.ExecuteFormat(
1662
0
          "INSERT INTO t SELECT generate_series($0, $1)", first_row, last_row));
1663
0
    }
1664
1665
0
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
1666
0
    for (const auto& peer : peers) {
1667
0
      auto tp = peer->tablet()->transaction_participant();
1668
0
      if (tp) {
1669
0
        LOG(INFO) << peer->LogPrefix() << "Intents: " << tp->TEST_CountIntents().first;
1670
0
      }
1671
0
    }
1672
1673
0
    if (compact) {
1674
0
      FlushAndCompactTablets();
1675
0
    }
1676
1677
0
    LOG(INFO) << "Perform read";
1678
1679
0
    if (VLOG_IS_ON(4)) {
1680
0
      google::SetVLOGLevel("intent_aware_iterator", 4);
1681
0
      google::SetVLOGLevel("docdb_rocksdb_util", 4);
1682
0
      google::SetVLOGLevel("docdb", 4);
1683
0
    }
1684
1685
0
    for (int i = 0; i != reads; ++i) {
1686
0
      auto start = MonoTime::Now();
1687
0
      auto fetched_rows = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT count(*) FROM t"));
1688
0
      auto finish = MonoTime::Now();
1689
0
      ASSERT_EQ(rows, fetched_rows);
1690
0
      LOG(INFO) << i << ") Full Time: " << finish - start;
1691
0
    }
1692
0
  }
1693
};
1694
1695
0
// Reads a large single-tablet table (loaded in 1000-row INSERTs) three times, without
// compaction, using the raised prefetch limit of PgMiniBigPrefetchTest.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigRead), PgMiniBigPrefetchTest) {
  constexpr int kReads = 3;
  constexpr int kBlockSize = 1000;
  constexpr int kRows = RegularBuildVsDebugVsSanitizers(1000000, 100000, 10000);

  Run(kRows, kBlockSize, kReads);
}
1702
1703
0
// Same as BigRead, but the tablets are flushed and compacted before the reads.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigReadWithCompaction), PgMiniBigPrefetchTest) {
  constexpr int kReads = 3;
  constexpr int kBlockSize = 1000;
  constexpr int kRows = RegularBuildVsDebugVsSanitizers(1000000, 100000, 10000);

  Run(kRows, kBlockSize, kReads, /* compact= */ true);
}
1710
1711
0
// A single read of a ten-row table written with one INSERT statement.
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SmallRead), PgMiniBigPrefetchTest) {
  constexpr int kReads = 1;
  constexpr int kRows = 10;
  constexpr int kBlockSize = kRows;

  Run(kRows, kBlockSize, kReads);
}
1718
1719
0
// Creates a table inside an explicit SERIALIZABLE transaction with
// FLAGS_TEST_transaction_ignore_applying_probability set to 1.0 and master leader resolution
// forced. Then restarts all masters and checks, over the pre-restart connection, that the
// table is readable and empty.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(DDLWithRestart)) {
  SetAtomicFlag(1.0, &FLAGS_TEST_transaction_ignore_applying_probability);
  FLAGS_TEST_force_master_leader_resolution = true;

  auto conn = ASSERT_RESULT(Connect());

  // DDL wrapped in an explicit transaction.
  ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
  ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY)"));
  ASSERT_OK(conn.CommitTransaction());

  ShutdownAllMasters(cluster_.get());
  LOG(INFO) << "Start masters";
  ASSERT_OK(StartAllMasters(cluster_.get()));

  auto res = ASSERT_RESULT(conn.FetchValue<int64_t>("SELECT COUNT(*) FROM t"));
  ASSERT_EQ(res, 0);
}
1737
1738
class PgMiniRocksDbIteratorLoggingTest : public PgMiniSingleTServerTest {
1739
 public:
1740
  struct IteratorLoggingTestConfig {
1741
    int num_non_pk_columns;
1742
    int num_rows;
1743
    int num_overwrites;
1744
    int first_row_to_scan;
1745
    int last_row_to_scan;
1746
  };
1747
1748
0
  void RunIteratorLoggingTest(const IteratorLoggingTestConfig& config) {
1749
0
    auto conn = ASSERT_RESULT(Connect());
1750
1751
0
    std::string non_pk_columns_schema;
1752
0
    std::string non_pk_column_names;
1753
0
    for (int i = 0; i < config.num_non_pk_columns; ++i) {
1754
0
      non_pk_columns_schema += Format(", $0 TEXT", GetNonPkColName(i));
1755
0
      non_pk_column_names += Format(", $0", GetNonPkColName(i));
1756
0
    }
1757
0
    ASSERT_OK(conn.ExecuteFormat("CREATE TABLE t (pk TEXT, PRIMARY KEY (pk ASC)$0)",
1758
0
                                 non_pk_columns_schema));
1759
    // Delete and overwrite every row multiple times.
1760
0
    for (int overwrite_index = 0; overwrite_index < config.num_overwrites; ++overwrite_index) {
1761
0
      for (int row_index = 0; row_index < config.num_rows; ++row_index) {
1762
0
        string non_pk_values;
1763
0
        for (int non_pk_col_index = 0;
1764
0
             non_pk_col_index < config.num_non_pk_columns;
1765
0
             ++non_pk_col_index) {
1766
0
          non_pk_values += Format(", '$0'", GetNonPkColValue(
1767
0
              non_pk_col_index, row_index, overwrite_index));
1768
0
        }
1769
1770
0
        const auto pk_value = GetPkForRow(row_index);
1771
0
        ASSERT_OK(conn.ExecuteFormat(
1772
0
            "INSERT INTO t(pk$0) VALUES('$1'$2)", non_pk_column_names, pk_value, non_pk_values));
1773
0
        if (overwrite_index != config.num_overwrites - 1) {
1774
0
          ASSERT_OK(conn.ExecuteFormat("DELETE FROM t WHERE pk = '$0'", pk_value));
1775
0
        }
1776
0
      }
1777
0
    }
1778
0
    const auto first_pk_to_scan = GetPkForRow(config.first_row_to_scan);
1779
0
    const auto last_pk_to_scan = GetPkForRow(config.last_row_to_scan);
1780
0
    auto count_stmt_str = Format(
1781
0
        "SELECT COUNT(*) FROM t WHERE pk >= '$0' AND pk <= '$1'",
1782
0
        first_pk_to_scan,
1783
0
        last_pk_to_scan);
1784
    // Do the same scan twice, and only turn on iterator logging on the second scan.
1785
    // This way we won't be logging system table operations needed to fetch PostgreSQL metadata.
1786
0
    for (bool is_warmup : {true, false}) {
1787
0
      if (!is_warmup) {
1788
0
        SetAtomicFlag(true, &FLAGS_rocksdb_use_logging_iterator);
1789
0
      }
1790
0
      auto count_result = ASSERT_RESULT(conn.Fetch(count_stmt_str));
1791
0
      ASSERT_EQ(PQntuples(count_result.get()), 1);
1792
1793
0
      auto actual_num_rows = ASSERT_RESULT(GetInt64(count_result.get(), 0, 0));
1794
0
      const int expected_num_rows = config.last_row_to_scan - config.first_row_to_scan + 1;
1795
0
      ASSERT_EQ(expected_num_rows, actual_num_rows);
1796
0
    }
1797
0
    SetAtomicFlag(false, &FLAGS_rocksdb_use_logging_iterator);
1798
0
  }
1799
1800
 private:
1801
0
  std::string GetNonPkColName(int non_pk_col_index) {
1802
0
    return Format("non_pk_col$0", non_pk_col_index);
1803
0
  }
1804
1805
0
  std::string GetPkForRow(int row_index) {
1806
0
    return Format("PrimaryKeyForRow$0", row_index);
1807
0
  }
1808
1809
0
  std::string GetNonPkColValue(int non_pk_col_index, int row_index, int overwrite_index) {
1810
0
    return Format("NonPkCol$0ValueForRow$1Overwrite$2",
1811
0
                  non_pk_col_index, row_index, overwrite_index);
1812
0
  }
1813
};
1814
1815
// Iterator logging over a table that has only a primary key column.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(IteratorLogPkOnly), PgMiniRocksDbIteratorLoggingTest) {
  IteratorLoggingTestConfig config{};
  config.num_non_pk_columns = 0;
  config.num_rows = 5;
  config.num_overwrites = 100;
  config.first_row_to_scan = 1;  // 0-based
  config.last_row_to_scan = 3;
  RunIteratorLoggingTest(config);
}
1825
1826
// Iterator logging over a table with two TEXT columns besides the primary key.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(IteratorLogTwoNonPkCols), PgMiniRocksDbIteratorLoggingTest) {
  IteratorLoggingTestConfig config{};
  config.num_non_pk_columns = 2;
  config.num_rows = 5;
  config.num_overwrites = 100;
  config.first_row_to_scan = 1;  // 0-based
  config.last_row_to_scan = 3;
  RunIteratorLoggingTest(config);
}
1836
1837
// ------------------------------------------------------------------------------------------------
1838
// Backward scan on an index
1839
// ------------------------------------------------------------------------------------------------
1840
1841
class PgMiniBackwardIndexScanTest : public PgMiniSingleTServerTest {
1842
 protected:
1843
0
  void BackwardIndexScanTest(bool uncommitted_intents) {
1844
0
    auto conn = ASSERT_RESULT(Connect());
1845
1846
0
    ASSERT_OK(conn.Execute(R"#(
1847
0
        create table events_backwardscan (
1848
0
1849
0
          log       text not null,
1850
0
          src       text not null,
1851
0
          inserted  timestamp(3) without time zone not null,
1852
0
          created   timestamp(3) without time zone not null,
1853
0
          data      jsonb not null,
1854
0
1855
0
          primary key (log, src, created)
1856
0
        );
1857
0
      )#"));
1858
0
    ASSERT_OK(conn.Execute("create index on events_backwardscan (inserted asc);"));
1859
1860
0
    for (int day = 1; day <= 31; ++day) {
1861
0
      ASSERT_OK(conn.ExecuteFormat(R"#(
1862
0
          insert into events_backwardscan
1863
0
1864
0
          select
1865
0
            'log',
1866
0
            'src',
1867
0
            t,
1868
0
            t,
1869
0
            '{}'
1870
0
1871
0
          from generate_series(
1872
0
            timestamp '2020-01-$0 00:00:00',
1873
0
            timestamp '2020-01-$0 23:59:59',
1874
0
            interval  '1 minute'
1875
0
          )
1876
0
1877
0
          as t(day);
1878
0
      )#", day));
1879
0
    }
1880
1881
0
    boost::optional<PGConn> uncommitted_intents_conn;
1882
0
    if (uncommitted_intents) {
1883
0
      uncommitted_intents_conn = ASSERT_RESULT(Connect());
1884
0
      ASSERT_OK(uncommitted_intents_conn->Execute("BEGIN"));
1885
0
      auto ts = "1970-01-01 00:00:00";
1886
0
      ASSERT_OK(uncommitted_intents_conn->ExecuteFormat(
1887
0
          "insert into events_backwardscan values ('log', 'src', '$0', '$0', '{}')", ts, ts));
1888
0
    }
1889
1890
0
    auto count = ASSERT_RESULT(
1891
0
        conn.FetchValue<int64_t>("SELECT COUNT(*) FROM events_backwardscan"));
1892
0
    LOG(INFO) << "Total rows inserted: " << count;
1893
1894
0
    auto select_result = ASSERT_RESULT(conn.Fetch(
1895
0
        "select * from events_backwardscan order by inserted desc limit 100"
1896
0
    ));
1897
0
    ASSERT_EQ(PQntuples(select_result.get()), 100);
1898
1899
0
    if (uncommitted_intents) {
1900
0
      ASSERT_OK(uncommitted_intents_conn->Execute("ROLLBACK"));
1901
0
    }
1902
0
  }
1903
};
1904
1905
// Backward index scan with no concurrent uncommitted writes.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(BackwardIndexScanNoIntents),
          PgMiniBackwardIndexScanTest) {
  constexpr bool kUncommittedIntents = false;
  BackwardIndexScanTest(kUncommittedIntents);
}
1910
1911
// Backward index scan while another connection holds an uncommitted insert.
TEST_F_EX(PgMiniTest,
          YB_DISABLE_TEST_IN_TSAN(BackwardIndexScanWithIntents),
          PgMiniBackwardIndexScanTest) {
  constexpr bool kUncommittedIntents = true;
  BackwardIndexScanTest(kUncommittedIntents);
}
1916
1917
0
// Creates a database with FLAGS_flush_rocksdb_on_shutdown disabled, then checks that the
// cluster restarts successfully.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(CreateDatabase)) {
  FLAGS_flush_rocksdb_on_shutdown = false;

  const std::string kDatabaseName = "testdb";
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName));

  ASSERT_OK(RestartCluster());
}
1924
1925
0
void PgMiniTest::TestBigInsert(bool restart) {
1926
0
  constexpr int64_t kNumRows = RegularBuildVsSanitizers(100000, 10000);
1927
0
  FLAGS_txn_max_apply_batch_records = kNumRows / 10;
1928
1929
0
  auto conn = ASSERT_RESULT(Connect());
1930
0
  ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY) SPLIT INTO 1 TABLETS"));
1931
0
  ASSERT_OK(conn.Execute("INSERT INTO t VALUES (0)"));
1932
1933
0
  TestThreadHolder thread_holder;
1934
1935
0
  std::atomic<int> post_insert_reads{0};
1936
0
  std::atomic<bool> restarted{false};
1937
0
  thread_holder.AddThreadFunctor(
1938
0
      [this, &stop = thread_holder.stop_flag(), &post_insert_reads, &restarted] {
1939
0
    auto connection = ASSERT_RESULT(Connect());
1940
0
    while (!stop.load(std::memory_order_acquire)) {
1941
0
      auto res = connection.FetchValue<int64_t>("SELECT SUM(a) FROM t");
1942
0
      if (!res.ok()) {
1943
0
        auto msg = res.status().message().ToBuffer();
1944
0
        ASSERT_TRUE(msg.find("server closed the connection unexpectedly") != std::string::npos)
1945
0
            << res.status();
1946
0
        while (!restarted.load() && !stop.load()) {
1947
0
          std::this_thread::sleep_for(10ms);
1948
0
        }
1949
0
        std::this_thread::sleep_for(1s);
1950
0
        LOG(INFO) << "Establishing new connection";
1951
0
        connection = ASSERT_RESULT(Connect());
1952
0
        restarted = false;
1953
0
        continue;
1954
0
      }
1955
1956
      // We should see zero or full sum only.
1957
0
      if (*res) {
1958
0
        ASSERT_EQ(*res, kNumRows * (kNumRows + 1) / 2);
1959
0
        ++post_insert_reads;
1960
0
      }
1961
0
    }
1962
0
  });
1963
1964
0
  ASSERT_OK(conn.ExecuteFormat(
1965
0
      "INSERT INTO t SELECT generate_series(1, $0)", kNumRows));
1966
1967
0
  if (restart) {
1968
0
    LOG(INFO) << "Restart cluster";
1969
0
    ASSERT_OK(RestartCluster());
1970
0
    restarted = true;
1971
0
  }
1972
1973
0
  ASSERT_OK(WaitFor([this, &post_insert_reads] {
1974
0
    auto intents_count = CountIntents(cluster_.get());
1975
0
    LOG(INFO) << "Intents count: " << intents_count;
1976
1977
0
    return intents_count == 0 && post_insert_reads.load(std::memory_order_acquire) > 0;
1978
0
  }, 60s * kTimeMultiplier, "Intents cleanup", 200ms));
1979
1980
0
  thread_holder.Stop();
1981
1982
0
  FlushAndCompactTablets();
1983
1984
0
  auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
1985
0
  for (const auto& peer : peers) {
1986
0
    auto db = peer->tablet()->TEST_db();
1987
0
    if (!db) {
1988
0
      continue;
1989
0
    }
1990
0
    rocksdb::ReadOptions read_opts;
1991
0
    read_opts.query_id = rocksdb::kDefaultQueryId;
1992
0
    std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_opts));
1993
1994
0
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1995
0
      Slice key = iter->key();
1996
0
      ASSERT_FALSE(key.TryConsumeByte(docdb::ValueTypeAsChar::kTransactionApplyState))
1997
0
          << "Key: " << iter->key().ToDebugString() << ", value: " << iter->value().ToDebugString();
1998
0
    }
1999
0
  }
2000
0
}
2001
2002
0
// Big insert with concurrent readers and no cluster restart.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsert)) {
  constexpr bool kRestart = false;
  TestBigInsert(kRestart);
}
2005
2006
0
// Big insert followed by a cluster restart, with an injected 200 ms delay in the
// apply-intents task.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsertWithRestart)) {
  FLAGS_apply_intents_task_injected_delay_ms = 200;
  constexpr bool kRestart = true;
  TestBigInsert(kRestart);
}
2010
2011
0
// Drops a table right after a large batched INSERT into it. The apply-intents task is
// delayed by 200 ms, so the drop presumably races with applying that insert.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsertWithDropTable)) {
  constexpr int kNumRows = 10000;
  FLAGS_txn_max_apply_batch_records = kNumRows / 10;
  FLAGS_apply_intents_task_injected_delay_ms = 200;

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t(id int) SPLIT INTO 1 TABLETS"));
  ASSERT_OK(conn.ExecuteFormat("INSERT INTO t SELECT generate_series(1, $0)", kNumRows));
  ASSERT_OK(conn.Execute("DROP TABLE t"));
}
2021
2022
0
void PgMiniTest::TestConcurrentDeleteRowAndUpdateColumn(bool select_before_update) {
2023
0
  auto conn1 = ASSERT_RESULT(Connect());
2024
0
  auto conn2 = ASSERT_RESULT(Connect());
2025
0
  ASSERT_OK(conn1.Execute("CREATE TABLE t (i INT PRIMARY KEY, j INT)"));
2026
0
  ASSERT_OK(conn1.Execute("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)"));
2027
0
  ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2028
0
  if (select_before_update) {
2029
0
    ASSERT_OK(conn1.Fetch("SELECT * FROM t"));
2030
0
  }
2031
0
  ASSERT_OK(conn2.Execute("DELETE FROM t WHERE i = 2"));
2032
0
  auto status = conn1.Execute("UPDATE t SET j = 21 WHERE i = 2");
2033
0
  if (select_before_update) {
2034
0
    ASSERT_NOK(status);
2035
0
    ASSERT_STR_CONTAINS(status.message().ToBuffer(), "Value write after transaction start");
2036
0
    return;
2037
0
  }
2038
0
  ASSERT_OK(status);
2039
0
  ASSERT_OK(conn1.CommitTransaction());
2040
0
  auto result = ASSERT_RESULT(conn1.FetchMatrix("SELECT * FROM t ORDER BY i", 2, 2));
2041
0
  auto value = ASSERT_RESULT(GetInt32(result.get(), 0, 0));
2042
0
  ASSERT_EQ(value, 1);
2043
0
  value = ASSERT_RESULT(GetInt32(result.get(), 0, 1));
2044
0
  ASSERT_EQ(value, 10);
2045
0
  value = ASSERT_RESULT(GetInt32(result.get(), 1, 0));
2046
0
  ASSERT_EQ(value, 3);
2047
0
  value = ASSERT_RESULT(GetInt32(result.get(), 1, 1));
2048
0
  ASSERT_EQ(value, 30);
2049
0
}
2050
2051
0
// Concurrent DELETE + UPDATE of the same row when the transaction has not read the table first.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDeleteRowAndUpdateColumn)) {
  constexpr bool kSelectBeforeUpdate = false;
  TestConcurrentDeleteRowAndUpdateColumn(kSelectBeforeUpdate);
}
2054
2055
0
// Concurrent DELETE + UPDATE of the same row when the transaction reads the table first.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentDeleteRowAndUpdateColumnWithSelect)) {
  constexpr bool kSelectBeforeUpdate = true;
  TestConcurrentDeleteRowAndUpdateColumn(kSelectBeforeUpdate);
}
2058
2059
// Test that we don't restart the second read on the same table if intents were written
// after the first read. GH #6972.
2061
0
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(NoRestartSecondRead)) {
  // Use a huge allowed clock skew so the concurrent update below happens within
  // max_clock_skew_usec of the transaction start (asserted explicitly below).
  FLAGS_max_clock_skew_usec = 1000000000LL * kTimeMultiplier;
  auto reader_conn = ASSERT_RESULT(Connect());
  auto writer_conn = ASSERT_RESULT(Connect());
  ASSERT_OK(reader_conn.Execute(
      "CREATE TABLE t (a int PRIMARY KEY, b int) SPLIT INTO 1 TABLETS"));
  ASSERT_OK(reader_conn.Execute("INSERT INTO t VALUES (1, 1), (2, 1), (3, 1)"));

  const auto start_time = MonoTime::Now();
  ASSERT_OK(reader_conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));

  LOG(INFO) << "Select1";
  const auto first_value =
      ASSERT_RESULT(reader_conn.FetchValue<int32_t>("SELECT b FROM t WHERE a = 1"));
  ASSERT_EQ(first_value, 1);

  LOG(INFO) << "Update";
  ASSERT_OK(writer_conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
  ASSERT_OK(writer_conn.Execute("UPDATE t SET b = 2 WHERE a = 2"));
  ASSERT_OK(writer_conn.CommitTransaction());
  const auto update_time = MonoTime::Now();
  ASSERT_LE(update_time, start_time + FLAGS_max_clock_skew_usec * 1us);

  LOG(INFO) << "Select2";
  const auto second_value =
      ASSERT_RESULT(reader_conn.FetchValue<int32_t>("SELECT b FROM t WHERE a = 2"));
  // The open transaction still sees the value from before the concurrent update.
  ASSERT_EQ(second_value, 1);
  ASSERT_OK(reader_conn.CommitTransaction());
}
2083
2084
// Runs the shared IN-operator locking scenario with the PgMiniTestTxnHelperSnapshot fixture
// (presumably snapshot isolation).
TEST_F_EX(
    PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SnapshotInOperatorLock),
    PgMiniTestTxnHelperSnapshot) {
  TestInOperatorLock();
}
2089
2090
// Runs the shared IN-operator locking scenario with the PgMiniTestTxnHelperSerializable fixture
// (presumably serializable isolation).
TEST_F_EX(
    PgMiniTest, YB_DISABLE_TEST_IN_TSAN(SerializableInOperatorLock),
    PgMiniTestTxnHelperSerializable) {
  TestInOperatorLock();
}
2095
2096
// ------------------------------------------------------------------------------------------------
2097
// Tablet Splitting Tests
2098
// ------------------------------------------------------------------------------------------------
2099
2100
class PgMiniTestAutoScanNextPartitions : public PgMiniTest {
2101
 public:
2102
0
  void SetUp() override {
2103
0
    FLAGS_TEST_index_read_multiple_partitions = true;
2104
0
    PgMiniTest::SetUp();
2105
0
  }
2106
};
2107
2108
// Verifies that a secondary-index read whose ybctid batch spans several tablets returns every
// matching row.
TEST_F_EX(
    PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(AutoScanNextPartitions),
    PgMiniTestAutoScanNextPartitions) {
  constexpr int kNumRows = 100;
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (k INT PRIMARY KEY, v1 INT, v2 INT) "
                         "SPLIT INTO 6 TABLETS"));
  ASSERT_OK(conn.Execute("CREATE INDEX ON t(v1, v2)"));

  // Insert kNumRows rows that all have v1 = 1, so every row matches the index lookup below.
  for (int i = 0; i < kNumRows; ++i) {
    ASSERT_OK(conn.ExecuteFormat("INSERT INTO t (k, v1, v2) VALUES ($0, $1, $2)", i, 1, i));
  }

  // Secondary index read from the table.
  // While performing a secondary index read on ybctids, the pggate layer batches requests that
  // belong to the same tablet. However, if the tablet is split after batching, the batched
  // request must be executed across both sub-tablets. This test creates that scenario.
  //
  // FLAGS_TEST_index_read_multiple_partitions is a test flag that, when set, makes index scans
  // of ybctids span multiple tablets. Specifically, we scan for the rows with v1 = 1 (described
  // by the original author as residing in tablets 0 and 1) and check that the number of rows
  // returned matches the number inserted.
  auto res = ASSERT_RESULT(conn.Fetch("SELECT k FROM t WHERE v1 = 1"));
  ASSERT_EQ(PQntuples(res.get()), kNumRows);
}
2136
2137
class PgMiniTabletSplitTest : public PgMiniTest {
2138
 public:
2139
0
  void SetUp() override {
2140
0
    FLAGS_yb_num_shards_per_tserver = 1;
2141
0
    FLAGS_tablet_split_low_phase_size_threshold_bytes = 0;
2142
0
    FLAGS_tablet_split_high_phase_size_threshold_bytes = 0;
2143
0
    FLAGS_tablet_split_low_phase_shard_count_per_node = 0;
2144
0
    FLAGS_tablet_split_high_phase_shard_count_per_node = 0;
2145
0
    FLAGS_tablet_force_split_threshold_bytes = 30_KB;
2146
0
    FLAGS_db_write_buffer_size = FLAGS_tablet_force_split_threshold_bytes / 4;
2147
0
    FLAGS_db_block_size_bytes = 2_KB;
2148
0
    FLAGS_db_filter_block_size_bytes = 2_KB;
2149
0
    FLAGS_db_index_block_size_bytes = 2_KB;
2150
0
    FLAGS_heartbeat_interval_ms = 1000;
2151
0
    FLAGS_tserver_heartbeat_metrics_interval_ms = 1000;
2152
0
    FLAGS_TEST_inject_delay_between_prepare_ybctid_execute_batch_ybctid_ms = 4000;
2153
0
    FLAGS_ysql_prefetch_limit = 32;
2154
0
    PgMiniTest::SetUp();
2155
0
  }
2156
};
2157
2158
0
void PgMiniTest::CreateTableAndInitialize(std::string table_name, int num_tablets) {
2159
0
  auto conn = ASSERT_RESULT(Connect());
2160
2161
0
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = false;
2162
0
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (h1 int, h2 int, r int, i int, "
2163
0
                               "PRIMARY KEY ((h1, h2) HASH, r ASC)) "
2164
0
                               "SPLIT INTO $1 TABLETS", table_name, num_tablets));
2165
2166
0
  ASSERT_OK(conn.ExecuteFormat("CREATE INDEX $0_idx "
2167
0
                               "ON $1(i HASH, r ASC)", table_name, table_name));
2168
2169
0
  ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 SELECT i, i, i, 1 FROM "
2170
0
                               "(SELECT generate_series(1, 500) i) t", table_name));
2171
0
}
2172
2173
0
void PgMiniTest::DestroyTable(std::string table_name) {
2174
0
  auto conn = ASSERT_RESULT(Connect());
2175
0
  ASSERT_OK(conn.ExecuteFormat("DROP TABLE $0", table_name));
2176
0
}
2177
2178
0
void PgMiniTest::GetTableIDFromTableName(const std::string table_name, std::string* table_id) {
2179
  // Get YBClient handler and tablet ID. Using this we can get the number of tablets before starting
2180
  // the test and before the test ends. With this we can ensure that tablet splitting has occurred.
2181
0
  auto client = ASSERT_RESULT(cluster_->CreateClient());
2182
0
  const auto tables = ASSERT_RESULT(client->ListTables());
2183
0
  for (const auto& table : tables) {
2184
0
    if (table.has_table() && table.table_name() == "update_pk_complex_two_hash_one_range_keys") {
2185
0
      table_id->assign(table.table_id());
2186
0
      break;
2187
0
    }
2188
0
  }
2189
0
}
2190
2191
void PgMiniTest::StartReadWriteThreads(const std::string table_name,
2192
0
    TestThreadHolder *thread_holder) {
2193
  // Writer thread that does parallel writes into table
2194
0
  thread_holder->AddThread([this, table_name] {
2195
0
    auto conn = ASSERT_RESULT(Connect());
2196
0
    for (int i = 501; i < 2000; i++) {
2197
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1, $2, $3, $4)",
2198
0
                                   table_name, i, i, i, 1));
2199
0
    }
2200
0
  });
2201
2202
  // Index read from the table
2203
0
  thread_holder->AddThread([this, &stop = thread_holder->stop_flag(), table_name] {
2204
0
    auto conn = ASSERT_RESULT(Connect());
2205
0
    do {
2206
0
      auto result = ASSERT_RESULT(conn.FetchFormat("SELECT * FROM  $0 WHERE i = 1 order by r",
2207
0
                                                   table_name));
2208
0
      std::vector<int> sort_check;
2209
0
      for(int x = 0; x < PQntuples(result.get()); x++) {
2210
0
        auto value = ASSERT_RESULT(GetInt32(result.get(), x, 2));
2211
0
        sort_check.push_back(value);
2212
0
      }
2213
0
      ASSERT_TRUE(std::is_sorted(sort_check.begin(), sort_check.end()));
2214
0
    }  while (!stop.load(std::memory_order_acquire));
2215
0
  });
2216
0
}
2217
2218
TEST_F_EX(
    PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(TabletSplitSecondaryIndexYSQL),
    PgMiniTabletSplitTest) {
  const std::string table_name = "update_pk_complex_two_hash_one_range_keys";

  /*
   * The writer thread writes into the table continuously, while the index read thread does
   * secondary index lookups. During each lookup an artificial delay is injected, specified by
   * FLAGS_TEST_inject_delay_between_prepare_ybctid_execute_batch_ybctid_ms (set by the
   * PgMiniTabletSplitTest fixture). Tablets may split during those delays into two different
   * partitions.
   *
   * The purpose of this test is to verify that, while a secondary index read request is being
   * executed, the results from both sub-tablets are represented. Without the fix in the pggate
   * layer, only one half of the results would be obtained. The test also checks that at least
   * one split actually happened.
   *
   * The scenario runs twice:
   *  - on a table created with one tablet, whose bounds are empty to empty;
   *  - on a table created with three tablets, whose bounds are
   *    tablet 1 -- empty to A
   *    tablet 2 -- A to B
   *    tablet 3 -- B to empty.
   *
   * Each run uses its own TestThreadHolder. A holder's stop flag stays set after WaitAndStop(),
   * so a reused holder would make the reader thread exit after a single lookup.
   */
  for (const int num_tablets : {1, 3}) {
    ASSERT_NO_FATAL_FAILURE(CreateTableAndInitialize(table_name, num_tablets));

    std::string table_id;
    ASSERT_NO_FATAL_FAILURE(GetTableIDFromTableName(table_name, &table_id));
    ASSERT_FALSE(table_id.empty()) << "Table not found: " << table_name;
    const auto start_num_tablets =
        ListTableActiveTabletLeadersPeers(cluster_.get(), table_id).size();
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_automatic_tablet_splitting) = true;

    TestThreadHolder thread_holder;
    ASSERT_NO_FATAL_FAILURE(StartReadWriteThreads(table_name, &thread_holder));
    thread_holder.WaitAndStop(200s);

    const auto end_num_tablets =
        ListTableActiveTabletLeadersPeers(cluster_.get(), table_id).size();
    ASSERT_GT(end_num_tablets, start_num_tablets)
        << "Table created with " << num_tablets << " tablet(s) was not split";
    ASSERT_NO_FATAL_FAILURE(DestroyTable(table_name));
  }
}
2272
2273
0
void PgMiniTest::RunManyConcurrentReadersTest() {
2274
0
  constexpr int kNumConcurrentRead = 8;
2275
0
  constexpr int kMinNumNonEmptyReads = 10;
2276
0
  const std::string kTableName = "savepoints";
2277
0
  TestThreadHolder thread_holder;
2278
2279
0
  std::atomic<int32_t> next_write_start{0};
2280
0
  std::atomic<int32_t> num_non_empty_reads{0};
2281
0
  CountDownLatch reader_latch(0);
2282
0
  CountDownLatch writer_latch(1);
2283
0
  std::atomic<bool> writer_thread_is_stopped{false};
2284
0
  CountDownLatch reader_threads_are_stopped(kNumConcurrentRead);
2285
2286
0
  {
2287
0
    auto conn = ASSERT_RESULT(Connect());
2288
0
    ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (a int)", kTableName));
2289
0
  }
2290
2291
0
  thread_holder.AddThreadFunctor([
2292
0
      &stop = thread_holder.stop_flag(), &next_write_start, &reader_latch, &writer_latch,
2293
0
      &writer_thread_is_stopped, kTableName, this] {
2294
0
    auto conn = ASSERT_RESULT(Connect());
2295
0
    while (!stop.load(std::memory_order_acquire)) {
2296
0
      auto write_start = (next_write_start += 5);
2297
0
      ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2298
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start));
2299
0
      ASSERT_OK(conn.Execute("SAVEPOINT one"));
2300
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 1));
2301
0
      ASSERT_OK(conn.Execute("SAVEPOINT two"));
2302
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 2));
2303
0
      ASSERT_OK(conn.Execute("ROLLBACK TO SAVEPOINT one"));
2304
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 3));
2305
0
      ASSERT_OK(conn.Execute("ROLLBACK TO SAVEPOINT one"));
2306
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ($1)", kTableName, write_start + 4));
2307
2308
      // Start concurrent reader threads
2309
0
      reader_latch.Reset(kNumConcurrentRead * 5);
2310
0
      writer_latch.CountDown();
2311
2312
      // Commit while reader threads are running
2313
0
      ASSERT_OK(conn.CommitTransaction());
2314
2315
      // Allow reader threads to complete and halt.
2316
0
      ASSERT_TRUE(reader_latch.WaitFor(5s * kTimeMultiplier));
2317
0
      writer_latch.Reset(1);
2318
0
    }
2319
0
    writer_thread_is_stopped = true;
2320
0
  });
2321
2322
0
  for (int reader_idx = 0; reader_idx < kNumConcurrentRead; ++reader_idx) {
2323
0
    thread_holder.AddThreadFunctor([
2324
0
        &stop = thread_holder.stop_flag(), &next_write_start, &num_non_empty_reads,
2325
0
        &reader_latch, &writer_latch, &reader_threads_are_stopped, kTableName, this] {
2326
0
      auto conn = ASSERT_RESULT(Connect());
2327
0
      while (!stop.load(std::memory_order_acquire)) {
2328
0
        ASSERT_TRUE(writer_latch.WaitFor(10s * kTimeMultiplier));
2329
2330
0
        auto read_start = next_write_start.load();
2331
0
        auto read_end = read_start + 4;
2332
0
        auto fetch_query = strings::Substitute(
2333
0
            "SELECT * FROM $0 WHERE a BETWEEN $1 AND $2 ORDER BY a ASC",
2334
0
            kTableName, read_start, read_end);
2335
2336
0
        auto res = ASSERT_RESULT(conn.Fetch(fetch_query));
2337
0
        auto fetched_rows = PQntuples(res.get());
2338
0
        if (fetched_rows != 0) {
2339
0
          num_non_empty_reads++;
2340
0
          if (fetched_rows != 2) {
2341
0
            LOG(INFO)
2342
0
                << "Expected to fetch (" << read_start << ") and (" << read_end << "). "
2343
0
                << "Instead, got the following results:";
2344
0
            for (int i = 0; i < fetched_rows; ++i) {
2345
0
              auto fetched_val = CHECK_RESULT(GetInt32(res.get(), i, 0));
2346
0
              LOG(INFO) << "Result " << i << " - " << fetched_val;
2347
0
            }
2348
0
          }
2349
0
          EXPECT_EQ(fetched_rows, 2);
2350
0
          auto first_fetched_val = ASSERT_RESULT(GetInt32(res.get(), 0, 0));
2351
0
          EXPECT_EQ(read_start, first_fetched_val);
2352
0
          auto second_fetched_val = ASSERT_RESULT(GetInt32(res.get(), 1, 0));
2353
0
          EXPECT_EQ(read_start + 4, second_fetched_val);
2354
0
        }
2355
0
        reader_latch.CountDown(1);
2356
0
      }
2357
0
      reader_threads_are_stopped.CountDown(1);
2358
0
    });
2359
0
  }
2360
2361
0
  std::this_thread::sleep_for(60s);
2362
0
  thread_holder.stop_flag().store(true, std::memory_order_release);
2363
0
  while (!writer_thread_is_stopped.load(std::memory_order_acquire) ||
2364
0
          reader_threads_are_stopped.count() != 0) {
2365
0
    reader_latch.Reset(0);
2366
0
    writer_latch.Reset(0);
2367
0
    std::this_thread::sleep_for(10ms * kTimeMultiplier);
2368
0
  }
2369
0
  thread_holder.Stop();
2370
0
  EXPECT_GE(num_non_empty_reads, kMinNumNonEmptyReads);
2371
0
}
2372
2373
0
// Inserts kNumRows rows in a single serializable transaction. Every kRowNumModToAbort-th insert
// is rolled back through a savepoint. The cluster is then restarted, with intent application
// slowed by an injected 200 ms delay. Once all intents are gone, exactly the non-rolled-back
// rows must be visible.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(BigInsertWithAbortedIntentsAndRestart)) {
  FLAGS_apply_intents_task_injected_delay_ms = 200;

  constexpr int64_t kRowNumModToAbort = 7;
  constexpr int64_t kNumBatches = 10;
  constexpr int64_t kNumRows = RegularBuildVsSanitizers(10000, 1000);
  FLAGS_txn_max_apply_batch_records = kNumRows / kNumBatches;

  // True for rows whose insert is rolled back to a savepoint.
  const auto is_rolled_back = [](int64_t row_num) { return row_num % kRowNumModToAbort == 0; };

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (a int PRIMARY KEY) SPLIT INTO 1 TABLETS"));

  ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
  for (int64_t row_num = 0; row_num < kNumRows; ++row_num) {
    const bool roll_back = is_rolled_back(row_num);
    if (roll_back) {
      ASSERT_OK(conn.Execute("SAVEPOINT A"));
    }
    ASSERT_OK(conn.ExecuteFormat("INSERT INTO t VALUES ($0)", row_num));
    if (roll_back) {
      ASSERT_OK(conn.Execute("ROLLBACK TO A"));
    }
  }
  ASSERT_OK(conn.CommitTransaction());

  LOG(INFO) << "Restart cluster";
  ASSERT_OK(RestartCluster());
  conn = ASSERT_RESULT(Connect());

  ASSERT_OK(WaitFor([this] {
    const auto intents_count = CountIntents(cluster_.get());
    LOG(INFO) << "Intents count: " << intents_count;
    return intents_count == 0;
  }, 60s * kTimeMultiplier, "Intents cleanup", 200ms));

  for (int64_t row_num = 0; row_num < kNumRows; ++row_num) {
    auto res = ASSERT_RESULT(conn.FetchFormat("SELECT * FROM t WHERE a = $0", row_num));
    if (is_rolled_back(row_num)) {
      EXPECT_NOT_OK(GetInt32(res.get(), 0, 0)) << "Did not expect to find value for: " << row_num;
      continue;
    }
    int64_t value = EXPECT_RESULT(GetInt32(res.get(), 0, 0));
    EXPECT_EQ(value, row_num) << "Expected to find " << row_num << ", found " << value << ".";
  }
}
2421
2422
// Runs RunManyConcurrentReadersTest with a 10 second injected delay in the apply-intents task.
TEST_F(
    PgMiniTest,
    YB_DISABLE_TEST_IN_SANITIZERS(TestConcurrentReadersMaskAbortedIntentsWithApplyDelay)) {
  ASSERT_OK(cluster_->WaitForAllTabletServers());
  // Extra pause after all tablet servers are up (presumably to let the cluster settle).
  std::this_thread::sleep_for(std::chrono::seconds(10));
  FLAGS_apply_intents_task_injected_delay_ms = 10000;
  RunManyConcurrentReadersTest();
}
2430
2431
// Runs RunManyConcurrentReadersTest with random delays of up to 30 ms (per the flag name)
// injected into transaction status responses.
TEST_F(
    PgMiniTest,
    YB_DISABLE_TEST_IN_SANITIZERS(TestConcurrentReadersMaskAbortedIntentsWithResponseDelay)) {
  ASSERT_OK(cluster_->WaitForAllTabletServers());
  // Extra pause after all tablet servers are up (presumably to let the cluster settle).
  std::this_thread::sleep_for(std::chrono::seconds(10));
  FLAGS_TEST_inject_random_delay_on_txn_status_response_ms = 30;
  RunManyConcurrentReadersTest();
}
2439
2440
// Runs RunManyConcurrentReadersTest with 30 ms of injected latency when the transaction
// participant applies a transaction update.
TEST_F(
    PgMiniTest,
    YB_DISABLE_TEST_IN_SANITIZERS(TestConcurrentReadersMaskAbortedIntentsWithUpdateDelay)) {
  ASSERT_OK(cluster_->WaitForAllTabletServers());
  // Extra pause after all tablet servers are up (presumably to let the cluster settle).
  std::this_thread::sleep_for(std::chrono::seconds(10));
  FLAGS_TEST_txn_participant_inject_latency_on_apply_update_txn_ms = 30;
  RunManyConcurrentReadersTest();
}
2448
2449
0
// Checks that a read made inside a rolled-back savepoint still conflicts with a concurrent write.
// One SERIALIZABLE transaction reads row a = 90 inside savepoint A and then rolls back to A.
// A second SERIALIZABLE transaction then updates that row. Exactly one of the two operations,
// the first transaction's commit or the second transaction's update, is expected to fail.
TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(TestSerializableStrongReadLockNotAborted)) {
  auto setup_conn = ASSERT_RESULT(Connect());
  ASSERT_OK(setup_conn.Execute(
      "CREATE TABLE t (a int PRIMARY KEY, b int) SPLIT INTO 1 TABLETS"));
  for (int key = 0; key < 100; ++key) {
    ASSERT_OK(setup_conn.ExecuteFormat("INSERT INTO t VALUES ($0, $0)", key));
  }

  auto reader_conn = ASSERT_RESULT(Connect());
  ASSERT_OK(reader_conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
  ASSERT_OK(reader_conn.Execute("SAVEPOINT A"));
  auto read_result =
      ASSERT_RESULT(reader_conn.FetchFormat("SELECT b FROM t WHERE a = $0", 90));
  ASSERT_OK(reader_conn.Execute("ROLLBACK TO A"));

  auto writer_conn = ASSERT_RESULT(Connect());
  ASSERT_OK(writer_conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
  auto update_status = writer_conn.ExecuteFormat("UPDATE t SET b = $0 WHERE a = $1", 1000, 90);

  auto commit_status = reader_conn.CommitTransaction();

  const bool commit_ok = commit_status.ok();
  const bool update_ok = update_status.ok();
  EXPECT_TRUE(commit_ok != update_ok)
      << "Expected exactly one of commit of first transaction or update of second transaction to "
      << "fail.\n"
      << "Commit status: " << commit_status << ".\n"
      << "Update status: " << update_status << ".\n";
}
2474
2475
// Use a special mode in which non-leader masters time out all RPCs.
// Then step down the master leader and perform a backup.
2477
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS_OR_MAC(NonRespondingMaster),
          PgMiniMasterFailoverTest) {
  FLAGS_TEST_timeout_non_leader_master_rpcs = true;
  tools::TmpDirProvider tmp_dir;

  // Create database "test" containing one table.
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE DATABASE test"));
  conn = ASSERT_RESULT(ConnectToDB("test"));
  ASSERT_OK(conn.Execute("CREATE TABLE t (i INT)"));

  // Force the current master leader to step down, then wait for a different master to lead.
  auto old_leader = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer();
  LOG(INFO) << "Old leader: " << old_leader->permanent_uuid();
  ASSERT_OK(StepDown(old_leader, /* new_leader_uuid */ std::string(), ForceStepDown::kTrue));
  ASSERT_OK(WaitFor([this, old_leader]() -> Result<bool> {
    auto current = VERIFY_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer();
    const bool leader_changed = current->permanent_uuid() != old_leader->permanent_uuid();
    if (leader_changed) {
      LOG(INFO) << "New leader: " << current->permanent_uuid();
    }
    return leader_changed;
  }, 10s, "Wait leader change"));

  // Back up the "test" database while non-leader masters time out every RPC.
  ASSERT_OK(tools::RunBackupCommand(
      pg_host_port(), cluster_->GetMasterAddresses(), cluster_->GetTserverHTTPAddresses(),
      *tmp_dir, {"--backup_location", tmp_dir / "backup", "--no_upload", "--keyspace", "ysql.test",
       "create"}));
}
2504
2505
} // namespace pgwrapper
2506
} // namespace yb