YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_libpq-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include <signal.h>
14
15
#include <fstream>
16
#include <thread>
17
18
#include "yb/client/client_fwd.h"
19
#include "yb/client/table_info.h"
20
#include "yb/client/yb_table_name.h"
21
22
#include "yb/common/common.pb.h"
23
#include "yb/common/pgsql_error.h"
24
#include "yb/common/schema.h"
25
26
#include "yb/master/master_client.pb.h"
27
#include "yb/master/master_defaults.h"
28
29
#include "yb/util/async_util.h"
30
#include "yb/util/barrier.h"
31
#include "yb/util/metrics.h"
32
#include "yb/util/monotime.h"
33
#include "yb/util/path_util.h"
34
#include "yb/util/random_util.h"
35
#include "yb/util/scope_exit.h"
36
#include "yb/util/status_log.h"
37
#include "yb/util/test_thread_holder.h"
38
#include "yb/util/tsan_util.h"
39
40
#include "yb/yql/pgwrapper/libpq_test_base.h"
41
#include "yb/yql/pgwrapper/libpq_utils.h"
42
43
using namespace std::literals;
44
45
DECLARE_int64(external_mini_cluster_max_log_bytes);
46
47
METRIC_DECLARE_entity(tablet);
48
METRIC_DECLARE_counter(transaction_not_found);
49
50
METRIC_DECLARE_entity(server);
51
METRIC_DECLARE_counter(rpc_inbound_calls_created);
52
53
namespace yb {
54
namespace pgwrapper {
55
56
class PgLibPqTest : public LibPqTestBase {
57
 protected:
58
  void TestUriAuth();
59
60
  void TestMultiBankAccount(IsolationLevel isolation);
61
62
  void DoIncrement(int key, int num_increments, IsolationLevel isolation);
63
64
  void TestParallelCounter(IsolationLevel isolation);
65
66
  void TestConcurrentCounter(IsolationLevel isolation);
67
68
  void TestOnConflict(bool kill_master, const MonoDelta& duration);
69
70
  void TestCacheRefreshRetry(const bool is_retry_disabled);
71
72
  const std::vector<std::string> names{
73
    "uppercase:P",
74
    "space: ",
75
    "symbol:#",
76
    "single_quote:'",
77
    "double_quote:\"",
78
    "backslash:\\",
79
    "mixed:P #'\"\\",
80
  };
81
};
82
83
0
// Smoke test: creates a table, inserts a single row and reads it back through libpq.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(Simple)) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT, value TEXT)"));
  ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (1, 'hello')"));

  auto fetched = ASSERT_RESULT(conn.Fetch("SELECT * FROM t"));
  auto* rows = fetched.get();

  // Exactly one row with two columns must come back.
  ASSERT_EQ(1, PQntuples(rows));
  ASSERT_EQ(2, PQnfields(rows));

  const auto fetched_key = ASSERT_RESULT(GetInt32(rows, 0, 0));
  ASSERT_EQ(fetched_key, 1);
  const auto fetched_value = ASSERT_RESULT(GetString(rows, 0, 1));
  ASSERT_EQ(fetched_value, "hello");
}
104
105
// Test libpq connection to various database names.
106
0
// Creates a database for every entry of `names` and connects to each of them.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(DatabaseNames)) {
  auto admin_conn = ASSERT_RESULT(Connect());

  for (const auto& name : names) {
    // The identifier has to be escaped for the SQL statement, while the raw name is passed to
    // the connection helper.
    const auto escaped_name = PqEscapeIdentifier(name);
    ASSERT_OK(admin_conn.ExecuteFormat("CREATE DATABASE $0", escaped_name));
    ASSERT_OK(ConnectToDB(name));
  }
}
114
115
// Test libpq connection to various user names.
116
0
// Creates a user for every entry of `names` and connects as each of them.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(UserNames)) {
  auto admin_conn = ASSERT_RESULT(Connect());

  for (const auto& name : names) {
    // The identifier has to be escaped for the SQL statement, while the raw name is passed to
    // the connection helper.
    const auto escaped_name = PqEscapeIdentifier(name);
    ASSERT_OK(admin_conn.ExecuteFormat("CREATE USER $0", escaped_name));
    ASSERT_OK(ConnectToDBAsUser("" /* db_name */, name));
  }
}
124
125
// Test libpq connection using URI connection string.
126
0
// Connects with URI-style connection strings and verifies the resulting session settings.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(Uri)) {
  const std::string& host = pg_ts->bind_host();
  const uint16_t port = pg_ts->pgsql_rpc_port();

  // Only user, host and port are supplied.
  {
    const auto uri = Format("postgres://yugabyte@$0:$1", host, port);
    LOG(INFO) << "Connecting using string: " << uri;
    auto conn = ASSERT_RESULT(ConnectUsingString(uri));

    // Each query must return the expected text in the first column of the first row.
    struct Expectation {
      const char* query;
      std::string expected;
    };
    const Expectation expectations[] = {
      {"select current_database()", "yugabyte"},
      {"select current_user", "yugabyte"},
      {"show listen_addresses", host},
      {"show port", std::to_string(port)},
    };
    for (const auto& expectation : expectations) {
      auto res = ASSERT_RESULT(conn.Fetch(expectation.query));
      const auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0));
      ASSERT_EQ(answer, expectation.expected);
    }
  }

  // Supply database name.
  {
    const auto uri = Format("postgres://yugabyte@$0:$1/template1", host, port);
    LOG(INFO) << "Connecting using string: " << uri;
    auto conn = ASSERT_RESULT(ConnectUsingString(uri));

    auto res = ASSERT_RESULT(conn.Fetch("select current_database()"));
    const auto answer = ASSERT_RESULT(GetString(res.get(), 0, 0));
    ASSERT_EQ(answer, "template1");
  }

  // Supply an incorrect password.  Since HBA config gives the yugabyte user trust access, postgres
  // won't request a password, our client won't send this password, and the authentication should
  // succeed.
  {
    const auto uri = Format("postgres://yugabyte:monkey123@$0:$1", host, port);
    LOG(INFO) << "Connecting using string: " << uri;
    ASSERT_OK(ConnectUsingString(uri));
  }
}
174
175
0
void PgLibPqTest::TestUriAuth() {
176
0
  const std::string& host = pg_ts->bind_host();
177
0
  const uint16_t port = pg_ts->pgsql_rpc_port();
178
  // Don't supply password.
179
0
  {
180
0
    const std::string& conn_str = Format("postgres://yugabyte@$0:$1", host, port);
181
0
    LOG(INFO) << "Connecting using string: " << conn_str;
182
0
    Result<PGConn> result = ConnectUsingString(
183
0
        conn_str,
184
0
        CoarseMonoClock::Now() + 2s /* deadline */);
185
0
    ASSERT_NOK(result);
186
0
    ASSERT_TRUE(result.status().IsNetworkError());
187
0
    ASSERT_TRUE(result.status().message().ToBuffer().find("Connect failed") != std::string::npos)
188
0
        << result.status();
189
0
  }
190
  // Supply an incorrect password.
191
0
  {
192
0
    const std::string& conn_str = Format("postgres://yugabyte:monkey123@$0:$1", host, port);
193
0
    LOG(INFO) << "Connecting using string: " << conn_str;
194
0
    Result<PGConn> result = ConnectUsingString(
195
0
        conn_str,
196
0
        CoarseMonoClock::Now() + 2s /* deadline */);
197
0
    ASSERT_NOK(result);
198
0
    ASSERT_TRUE(result.status().IsNetworkError());
199
0
    ASSERT_TRUE(result.status().message().ToBuffer().find("Connect failed") != std::string::npos)
200
0
        << result.status();
201
0
  }
202
  // Supply the correct password.
203
0
  {
204
0
    const std::string& conn_str = Format("postgres://yugabyte:yugabyte@$0:$1", host, port);
205
0
    LOG(INFO) << "Connecting using string: " << conn_str;
206
0
    ASSERT_OK(ConnectUsingString(conn_str));
207
0
  }
208
0
}
209
210
// Enable authentication using password.  This scheme requests the plain password.  You may still
211
// use SSL for encryption on the wire.
212
class PgLibPqTestAuthPassword : public PgLibPqTest {
213
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
214
0
    options->extra_tserver_flags.push_back("--ysql_hba_conf_csv=host all all samehost password");
215
0
  }
216
};
217
218
0
// Runs the TestUriAuth() scenario on the PgLibPqTestAuthPassword fixture.
TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(UriPassword), PgLibPqTestAuthPassword) {
219
0
  TestUriAuth();
220
0
}
221
222
// Enable authentication using md5.  This scheme is a challenge and response, so the plain password
223
// isn't sent.
224
class PgLibPqTestAuthMd5 : public PgLibPqTest {
225
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
226
0
    options->extra_tserver_flags.push_back("--ysql_hba_conf_csv=host all all samehost md5");
227
0
  }
228
};
229
230
0
// Runs the TestUriAuth() scenario on the PgLibPqTestAuthMd5 fixture.
TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(UriMd5), PgLibPqTestAuthMd5) {
231
0
  TestUriAuth();
232
0
}
233
234
// Test that repeats example from this article:
235
// https://blogs.msdn.microsoft.com/craigfr/2007/05/16/serializable-vs-snapshot-isolation-level/
236
//
237
// Multiple rows with values 0 and 1 are stored in table.
238
// Two concurrent transactions fetch all rows from the table and do the following.
239
// First transaction changes value of all rows with value 0 to 1.
240
// Second transaction changes value of all rows with value 1 to 0.
241
// As outcome we should have rows with the same value.
242
//
243
// The described procedure is repeated multiple times to increase probability of catching a bug,
244
// w/o running test multiple times.
245
0
// Fills table t with randomly colored rows, then lets one serializable transaction per color
// repaint every row to its own color.  Whenever at least one transaction commits, all rows must
// end up with the same color.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SerializableColoring)) {
  constexpr auto kKeys = RegularBuildVsSanitizers(10, 20);
  constexpr auto kColors = 2;
  constexpr auto kIterations = 20;

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, color INT)"));

  // An iteration is only counted when at least one of its transactions committed, so the loop
  // may run more than kIterations times.
  int counted_iterations = 0;
  for (int iteration = 0; counted_iterations < kIterations; ++iteration) {
    const auto title = Format("Iteration: $0", iteration);
    SCOPED_TRACE(title);
    LOG(INFO) << title;

    const auto cleanup_status = conn.Execute("DELETE FROM t");
    if (!cleanup_status.ok()) {
      ASSERT_TRUE(HasTryAgain(cleanup_status)) << cleanup_status;
      continue;
    }
    for (int key = 0; key != kKeys; ++key) {
      const int32_t initial_color = RandomUniformInt(0, kColors - 1);
      ASSERT_OK(conn.ExecuteFormat(
          "INSERT INTO t (key, color) VALUES ($0, $1)", key, initial_color));
    }

    std::atomic<int> committed{0};
    std::vector<std::thread> painters;
    for (int32_t target_color = 0; target_color != kColors; ++target_color) {
      painters.emplace_back([this, target_color, kKeys, &committed] {
        auto txn_conn = ASSERT_RESULT(Connect());

        ASSERT_OK(txn_conn.Execute("BEGIN"));
        ASSERT_OK(txn_conn.Execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"));

        auto snapshot = txn_conn.Fetch("SELECT * FROM t");
        if (!snapshot.ok()) {
          ASSERT_TRUE(HasTryAgain(snapshot.status())) << snapshot.status();
          return;
        }
        ASSERT_EQ(2, PQnfields(snapshot->get()));

        const auto num_rows = PQntuples(snapshot->get());
        ASSERT_EQ(kKeys, num_rows);
        for (int row = 0; row != num_rows; ++row) {
          const auto row_color = ASSERT_RESULT(GetInt32(snapshot->get(), row, 1));
          if (row_color == target_color) {
            continue;
          }

          const auto row_key = ASSERT_RESULT(GetInt32(snapshot->get(), row, 0));
          const auto update_status = txn_conn.ExecuteFormat(
              "UPDATE t SET color = $1 WHERE key = $0", row_key, target_color);
          if (update_status.ok()) {
            continue;
          }
          // Missing metadata means that transaction was aborted and cleaned.
          const auto update_msg = update_status.message().ToBuffer();
          ASSERT_TRUE(HasTryAgain(update_status) ||
                      update_msg.find("Missing metadata") != std::string::npos) << update_status;
          break;
        }

        const auto commit_status = txn_conn.Execute("COMMIT");
        if (!commit_status.ok()) {
          const auto commit_msg = commit_status.message().ToBuffer();
          ASSERT_TRUE(commit_msg.find("Operation expired") != std::string::npos) << commit_status;
          return;
        }

        ++committed;
      });
    }

    for (auto& painter : painters) {
      painter.join();
    }

    // Nothing was committed, so the table content proves nothing; try again.
    if (committed == 0) {
      continue;
    }

    auto table = ASSERT_RESULT(conn.Fetch("SELECT * FROM t"));
    ASSERT_EQ(2, PQnfields(table.get()));

    const auto num_rows = PQntuples(table.get());
    ASSERT_EQ(kKeys, num_rows);

    std::vector<int32_t> zeroes, ones;
    for (int row = 0; row != num_rows; ++row) {
      const auto row_key = ASSERT_RESULT(GetInt32(table.get(), row, 0));
      const auto row_color = ASSERT_RESULT(GetInt32(table.get(), row, 1));
      (row_color == 0 ? zeroes : ones).push_back(row_key);
    }

    std::sort(ones.begin(), ones.end());
    std::sort(zeroes.begin(), zeroes.end());

    LOG(INFO) << "Zeroes: " << yb::ToString(zeroes) << ", ones: " << yb::ToString(ones);
    ASSERT_TRUE(zeroes.empty() || ones.empty());

    ++counted_iterations;
  }
}
354
355
0
// For every key a serializable reader and a serializable writer of that key race to commit.
// Exactly one of them has to win, and over a whole try both sides are expected to win at least
// a quarter of the races.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SerializableReadWriteConflict)) {
  const auto kKeys = RegularBuildVsSanitizers(20, 5);
  const auto kNumTries = RegularBuildVsSanitizers(4, 1);
  auto tries = 1;
  for (; tries <= kNumTries; ++tries) {
    auto setup_conn = ASSERT_RESULT(Connect());
    ASSERT_OK(setup_conn.Execute("DROP TABLE IF EXISTS t"));
    ASSERT_OK(setup_conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)"));

    size_t reads_won = 0, writes_won = 0;
    for (int key = 0; key != kKeys; ++key) {
      auto read_conn = ASSERT_RESULT(Connect());
      ASSERT_OK(read_conn.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE"));
      auto read_result = read_conn.FetchFormat("SELECT * FROM t WHERE key = $0", key);
      auto read_status = ResultToStatus(read_result);

      auto write_conn = ASSERT_RESULT(Connect());
      ASSERT_OK(write_conn.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE"));
      auto write_status = write_conn.ExecuteFormat("INSERT INTO t (key) VALUES ($0)", key);

      // Commits the transaction of `txn_conn` unless its statement has already failed; the
      // outcome replaces `*txn_status`.
      const auto commit_if_ok = [](auto* txn_conn, auto* txn_status) {
        if (txn_status->ok()) {
          *txn_status = txn_conn->Execute("COMMIT");
        }
      };

      // Both commits are issued from separate threads so that they can run concurrently.
      std::thread read_commit_thread(commit_if_ok, &read_conn, &read_status);
      std::thread write_commit_thread(commit_if_ok, &write_conn, &write_status);

      read_commit_thread.join();
      write_commit_thread.join();

      LOG(INFO) << "Read: " << read_status << ", write: " << write_status;

      if (read_status.ok()) {
        ASSERT_NOK(write_status);
        ++reads_won;
      } else {
        ASSERT_OK(write_status);
        ++writes_won;
      }
    }

    LOG(INFO) << "Reads won: " << reads_won << ", writes won: " << writes_won
              << " (" << tries << "/" << kNumTries << ")";
    // always pass for TSAN, we're just looking for memory issues
    if (RegularBuildVsSanitizers(false, true)) {
      break;
    }
    // break (succeed) if we hit 25% on our "coin toss" transaction conflict above
    if (reads_won >= kKeys / 4 && writes_won >= kKeys / 4) {
      break;
    }
    // otherwise, retry and see if this is consistent behavior
  }
  // Leaving the loop without a break means that every try was too one-sided.
  ASSERT_LE(tries, kNumTries);
}
415
416
0
// A background thread keeps inserting increasing keys while the main thread, for 30 seconds,
// reads the most recently committed key inside a transaction and expects to find it.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ReadRestart)) {
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)"));

  std::atomic<bool> stop(false);
  std::atomic<int> last_written(0);

  std::thread write_thread([this, &stop, &last_written] {
    auto write_conn = ASSERT_RESULT(Connect());
    // The key only advances after a successful commit; a failed write is retried with the same
    // key.
    for (int write_key = 1; !stop.load(std::memory_order_acquire);) {
      SCOPED_TRACE(Format("Writing: $0", write_key));

      ASSERT_OK(write_conn.Execute("BEGIN"));
      auto status = write_conn.ExecuteFormat("INSERT INTO t (key) VALUES ($0)", write_key);
      if (status.ok()) {
        status = write_conn.Execute("COMMIT");
      }
      if (!status.ok()) {
        LOG(INFO) << "Write " << write_key << " failed: " << status;
        continue;
      }
      last_written.store(write_key, std::memory_order_release);
      ++write_key;
    }
  });

  // Stops and joins the writer on every exit path, including failed assertions below.
  auto se = ScopeExit([&stop, &write_thread] {
    stop.store(true, std::memory_order_release);
    write_thread.join();
  });

  const auto deadline = CoarseMonoClock::now() + 30s;

  while (CoarseMonoClock::now() < deadline) {
    const int read_key = last_written.load(std::memory_order_acquire);
    if (read_key == 0) {
      // Nothing has been committed yet.
      std::this_thread::sleep_for(100ms);
      continue;
    }

    SCOPED_TRACE(Format("Reading: $0", read_key));

    ASSERT_OK(conn.Execute("BEGIN"));

    auto row = ASSERT_RESULT(conn.FetchFormat("SELECT * FROM t WHERE key = $0", read_key));
    ASSERT_EQ(1, PQnfields(row.get()));
    ASSERT_EQ(1, PQntuples(row.get()));

    const auto fetched_key = ASSERT_RESULT(GetInt32(row.get(), 0, 0));
    ASSERT_EQ(fetched_key, read_key);

    ASSERT_OK(conn.Execute("ROLLBACK"));
  }

  ASSERT_GE(last_written.load(std::memory_order_acquire), 100);
}
476
477
// Concurrently insert records into tables with foreign key relationship while truncating both.
478
0
// Runs writer threads inserting into t1 and into t2 (which references t1) concurrently with
// threads truncating both tables, for 30 seconds.  No statement result is verified.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentInsertTruncateForeignKey)) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t2"));
  ASSERT_OK(conn.Execute("DROP TABLE IF EXISTS t1"));
  ASSERT_OK(conn.Execute("CREATE TABLE t1 (k int primary key, v int)"));
  ASSERT_OK(conn.Execute(
        "CREATE TABLE t2 (k int primary key, t1_k int, FOREIGN KEY (t1_k) REFERENCES t1 (k))"));

  const int kMaxKeys = 1 << 20;

  constexpr auto kWriteThreads = 4;
  constexpr auto kTruncateThreads = 2;

  TestThreadHolder thread_holder;
  for (int i = 0; i != kWriteThreads; ++i) {
    thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
      auto write_conn = ASSERT_RESULT(Connect());
      while (!stop.load(std::memory_order_acquire)) {
        int t1_k = RandomUniformInt(0, kMaxKeys - 1);
        int t1_v = RandomUniformInt(0, kMaxKeys - 1);
        // The statuses of both inserts are deliberately not checked.
        // NOTE(review): presumably failures caused by random key collisions or a concurrent
        // TRUNCATE are acceptable here - confirm.
        auto status = write_conn.ExecuteFormat("INSERT INTO t1 VALUES ($0, $1)", t1_k, t1_v);
        int t2_k = RandomUniformInt(0, kMaxKeys - 1);
        status = write_conn.ExecuteFormat("INSERT INTO t2 VALUES ($0, $1)", t2_k, t1_k);
      }
    });
  }

  for (int i = 0; i != kTruncateThreads; ++i) {
    thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
      auto truncate_conn = ASSERT_RESULT(Connect());
      while (!stop.load(std::memory_order_acquire)) {
        // The truncate status is deliberately not checked either.
        auto status = truncate_conn.Execute("TRUNCATE TABLE t1, t2 CASCADE");
        std::this_thread::sleep_for(100ms);
      }
    });
  }

  thread_holder.WaitAndStop(30s);
}
520
521
// Concurrently insert records to table with index.
522
0
// Inserts rows into a table with a secondary index from several threads for 30 seconds; every
// insert has to succeed.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentIndexInsert)) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute(
      "CREATE TABLE IF NOT EXISTS users(id text, ename text, age int, PRIMARY KEY(id))"));

  ASSERT_OK(conn.Execute(
      "CREATE INDEX IF NOT EXISTS name_idx ON users(ename)"));

  constexpr auto kWriteThreads = 4;

  std::atomic<bool> stop(false);
  std::vector<std::thread> write_threads;

  // Installed before any thread is started, so that already running threads are stopped and
  // joined even when starting a later thread throws.  Destroying a joinable std::thread would
  // call std::terminate.
  auto se = ScopeExit([&stop, &write_threads] {
    stop.store(true, std::memory_order_release);
    for (auto& thread : write_threads) {
      thread.join();
    }
  });

  while (write_threads.size() != kWriteThreads) {
    write_threads.emplace_back([this, &stop] {
      auto write_conn = ASSERT_RESULT(Connect());
      // The hashed thread id becomes part of the primary key, which keeps the ids generated by
      // different threads apart.
      auto this_thread_id = std::this_thread::get_id();
      auto tid = std::hash<decltype(this_thread_id)>()(this_thread_id);
      int idx = 0;
      while (!stop.load(std::memory_order_acquire)) {
        ASSERT_OK(write_conn.ExecuteFormat(
            "INSERT INTO users (id, ename, age) VALUES ('user-$0-$1', 'name-$1', $2)",
            tid, idx, 20 + (idx % 50)));
        ++idx;
      }
    });
  }

  std::this_thread::sleep_for(30s);
}
560
561
// Reads the balance of accounts 1..accounts (tables account_1 .. account_N) inside a single
// transaction with the given isolation level and returns their sum.
//
// The transaction is rolled back when building the sum fails and committed otherwise; a failed
// COMMIT is returned to the caller without an additional rollback.  `counter` is not used.
Result<int64_t> ReadSumBalance(
    PGConn* conn, int accounts, IsolationLevel isolation,
    std::atomic<int>* counter) {
  RETURN_NOT_OK(conn->StartTransaction(isolation));
  bool rollback_on_exit = true;
  auto rollback_guard = ScopeExit([conn, &rollback_on_exit] {
    if (rollback_on_exit) {
      EXPECT_OK(conn->Execute("ROLLBACK"));
    }
  });

  // One SELECT per account table, glued together with UNION.
  std::string query;
  for (int account = 1; account <= accounts; ++account) {
    if (account > 1) {
      query += " UNION ";
    }
    query += Format("SELECT balance, id FROM account_$0 WHERE id = $0", account);
  }

  auto rows = VERIFY_RESULT(conn->FetchMatrix(query, accounts, 2));
  int64_t total = 0;
  for (int row = 0; row != accounts; ++row) {
    total += VERIFY_RESULT(GetValue<int64_t>(rows.get(), row, 0));
  }

  rollback_on_exit = false;
  RETURN_NOT_OK(conn->Execute("COMMIT"));
  return total;
}
590
591
0
// Bank account scenario: every account lives in its own table.  Writer threads move random
// amounts between two different accounts inside a transaction, while a reader thread sums all
// balances and expects the total to stay constant.  The test passes once enough reads and
// writes succeeded, a final read returns the expected total and the transaction_not_found
// metric summed over all tablets stays small.
void PgLibPqTest::TestMultiBankAccount(IsolationLevel isolation) {
  constexpr int kAccounts = RegularBuildVsSanitizers(20, 10);
  constexpr int64_t kInitialBalance = 100;

#ifndef NDEBUG
  const auto kTimeout = 180s;
  constexpr int kThreads = RegularBuildVsSanitizers(12, 5);
#else
  const auto kTimeout = 60s;
  constexpr int kThreads = 5;
#endif

  PGConn conn = ASSERT_RESULT(Connect());
  // Writer connections are created up front; each writer thread gets its own one.
  std::vector<PGConn> thread_connections;
  for (int i = 0; i < kThreads; ++i) {
    thread_connections.push_back(ASSERT_RESULT(Connect()));
  }

  for (int i = 1; i <= kAccounts; ++i) {
    ASSERT_OK(conn.ExecuteFormat(
        "CREATE TABLE account_$0 (id int, balance bigint, PRIMARY KEY(id))", i));
    ASSERT_OK(conn.ExecuteFormat(
        "INSERT INTO account_$0 (id, balance) VALUES ($0, $1)", i, kInitialBalance));
  }

  std::atomic<int> writes(0);
  std::atomic<int> reads(0);

  constexpr auto kRequiredReads = RegularBuildVsSanitizers(5, 2);
  constexpr auto kRequiredWrites = RegularBuildVsSanitizers(1000, 500);

  std::atomic<int> counter(100000);
  TestThreadHolder thread_holder;
  for (int i = 1; i <= kThreads; ++i) {
    // `isolation` is captured by value, like in the reader below, so that the thread does not
    // hold a reference to this function's parameter.
    thread_holder.AddThreadFunctor(
        [&conn = thread_connections[i - 1], &writes, isolation,
         &stop_flag = thread_holder.stop_flag()]() {
      while (!stop_flag.load(std::memory_order_acquire)) {
        // Pick two different accounts: `to` is drawn from one value less and shifted past
        // `from`.
        int from = RandomUniformInt(1, kAccounts);
        int to = RandomUniformInt(1, kAccounts - 1);
        if (to >= from) {
          ++to;
        }
        int64_t amount = RandomUniformInt(1, 10);
        ASSERT_OK(conn.StartTransaction(isolation));
        auto status = conn.ExecuteFormat(
              "UPDATE account_$0 SET balance = balance - $1 WHERE id = $0", from, amount);
        if (status.ok()) {
          status = conn.ExecuteFormat(
              "UPDATE account_$0 SET balance = balance + $1 WHERE id = $0", to, amount);
        }
        if (status.ok()) {
          status = conn.Execute("COMMIT;");
        } else {
          ASSERT_OK(conn.Execute("ROLLBACK;"));
        }
        if (!status.ok()) {
          // Only transactional failures are tolerated.
          ASSERT_TRUE(TransactionalFailure(status)) << status;
        } else {
          LOG(INFO) << "Updated: " << from << " => " << to << " by " << amount;
          ++writes;
        }
      }
    });
  }

  thread_holder.AddThreadFunctor(
      [this, &counter, &reads, &writes, isolation, &stop_flag = thread_holder.stop_flag()]() {
    // Sets the stop flag when this thread exits for any reason.
    SetFlagOnExit set_flag_on_exit(&stop_flag);
    auto connection = ASSERT_RESULT(Connect());
    auto failures_in_row = 0;
    while (!stop_flag.load(std::memory_order_acquire)) {
      if (isolation == IsolationLevel::SERIALIZABLE_ISOLATION) {
        // While reads lag behind writes (relative to the required ratio), the priority lower
        // bound is raised after every consecutive failed read.
        auto lower_bound = reads.load() * kRequiredWrites < writes.load() * kRequiredReads
            ? 1.0 - 1.0 / (1ULL << failures_in_row) : 0.0;
        ASSERT_OK(connection.ExecuteFormat(
            "SET yb_transaction_priority_lower_bound = $0", lower_bound));
      }
      auto sum = ReadSumBalance(&connection, kAccounts, isolation, &counter);
      if (!sum.ok()) {
        // Do not overflow long when doing bitshift above.
        failures_in_row = std::min(failures_in_row + 1, 63);
        ASSERT_TRUE(TransactionalFailure(sum.status())) << sum.status();
      } else {
        failures_in_row = 0;
        ASSERT_EQ(*sum, kAccounts * kInitialBalance);
        ++reads;
      }
    }
  });

  auto wait_status = WaitFor([&reads, &writes, &stop = thread_holder.stop_flag()] {
    return stop.load() || (writes.load() >= kRequiredWrites && reads.load() >= kRequiredReads);
  }, kTimeout, Format("At least $0 reads and $1 writes", kRequiredReads, kRequiredWrites));

  LOG(INFO) << "Writes: " << writes.load() << ", reads: " << reads.load();

  ASSERT_OK(wait_status);

  thread_holder.Stop();

  // Final read after all threads were stopped; transactional failures are retried.
  ASSERT_OK(WaitFor([&conn, isolation, &counter]() -> Result<bool> {
    auto sum = ReadSumBalance(&conn, kAccounts, isolation, &counter);
    if (!sum.ok()) {
      if (!TransactionalFailure(sum.status())) {
        return sum.status();
      }
      return false;
    }
    EXPECT_EQ(*sum, kAccounts * kInitialBalance);
    return true;
  }, 10s, "Final read"));

  // The metric is read as a 64-bit value, so it is accumulated in a 64-bit total.
  int64_t total_not_found = 0;
  for (auto* tserver : cluster_->tserver_daemons()) {
    auto tablets = ASSERT_RESULT(cluster_->GetTabletIds(tserver));
    for (const auto& tablet : tablets) {
      auto result = tserver->GetInt64Metric(
          &METRIC_ENTITY_tablet, tablet.c_str(), &METRIC_transaction_not_found, "value");
      if (result.ok()) {
        total_not_found += *result;
      } else {
        // A tablet for which the metric cannot be found is skipped.
        ASSERT_TRUE(result.status().IsNotFound()) << result.status();
      }
    }
  }

  LOG(INFO) << "Total not found: " << total_not_found;
  // Check that total not found is not too big.
  ASSERT_LE(total_not_found, 200);
}
722
723
class PgLibPqSmallClockSkewTest : public PgLibPqTest {
724
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
725
    // Use small clock skew, to decrease number of read restarts.
726
0
    options->extra_tserver_flags.push_back("--max_clock_skew_usec=5000");
727
0
  }
728
};
729
730
// Runs TestMultiBankAccount() with snapshot isolation on the PgLibPqSmallClockSkewTest fixture.
TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(MultiBankAccountSnapshot),
731
0
          PgLibPqSmallClockSkewTest) {
732
0
  TestMultiBankAccount(IsolationLevel::SNAPSHOT_ISOLATION);
733
0
}
734
735
0
// Runs TestMultiBankAccount() with serializable isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(MultiBankAccountSerializable)) {
736
0
  TestMultiBankAccount(IsolationLevel::SERIALIZABLE_ISOLATION);
737
0
}
738
739
0
void PgLibPqTest::DoIncrement(int key, int num_increments, IsolationLevel isolation) {
740
0
  auto conn = ASSERT_RESULT(Connect());
741
742
  // Perform increments
743
0
  int succeeded_incs = 0;
744
0
  while (succeeded_incs < num_increments) {
745
0
    ASSERT_OK(conn.StartTransaction(isolation));
746
0
    bool committed = false;
747
0
    auto exec_status = conn.ExecuteFormat("UPDATE t SET value = value + 1 WHERE key = $0", key);
748
0
    if (exec_status.ok()) {
749
0
      auto commit_status = conn.Execute("COMMIT");
750
0
      if (commit_status.ok()) {
751
0
        succeeded_incs++;
752
0
        committed = true;
753
0
      }
754
0
    }
755
0
    if (!committed) {
756
0
      ASSERT_OK(conn.Execute("ROLLBACK"));
757
0
    }
758
0
  }
759
0
}
760
761
0
// Creates one counter row per thread, lets every thread increment its own row kIncrements times
// and verifies that each counter ends up with exactly that value.
void PgLibPqTest::TestParallelCounter(IsolationLevel isolation) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT, value INT)"));

  const auto kThreads = RegularBuildVsSanitizers(3, 2);
  const auto kIncrements = RegularBuildVsSanitizers(100, 20);

  // Make a counter for each thread and have each thread increment it
  std::vector<std::thread> threads;
  // A failing assertion in the loop below returns from this function while threads started in
  // earlier iterations are still running.  Destroying a joinable std::thread calls
  // std::terminate, so make sure everything that was started gets joined on every exit path.
  auto join_guard = ScopeExit([&threads] {
    for (auto& thread : threads) {
      if (thread.joinable()) {
        thread.join();
      }
    }
  });

  while (threads.size() != kThreads) {
    int key = narrow_cast<int>(threads.size());
    ASSERT_OK(conn.ExecuteFormat("INSERT INTO t (key, value) VALUES ($0, 0)", key));

    threads.emplace_back([this, key, isolation] {
      DoIncrement(key, kIncrements, isolation);
    });
  }

  // Wait for completion
  for (auto& thread : threads) {
    thread.join();
  }

  // Check each counter
  for (int i = 0; i < kThreads; i++) {
    auto res = ASSERT_RESULT(conn.FetchFormat("SELECT value FROM t WHERE key = $0", i));

    auto row_val = ASSERT_RESULT(GetInt32(res.get(), 0, 0));
    ASSERT_EQ(row_val, kIncrements);
  }
}
793
794
0
// Runs TestParallelCounter() with serializable isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestParallelCounterSerializable)) {
795
0
  TestParallelCounter(IsolationLevel::SERIALIZABLE_ISOLATION);
796
0
}
797
798
0
// Runs TestParallelCounter() with snapshot isolation.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestParallelCounterRepeatableRead)) {
799
0
  TestParallelCounter(IsolationLevel::SNAPSHOT_ISOLATION);
800
0
}
801
802
0
// Creates a single counter row (key 0), lets kThreads workers run DoIncrement() against that same
// row with kIncrements as the requested number of increments, and verifies that the final value
// equals kThreads * kIncrements.
void PgLibPqTest::TestConcurrentCounter(IsolationLevel isolation) {
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT, value INT)"));
  ASSERT_OK(conn.Execute("INSERT INTO t (key, value) VALUES (0, 0)"));

  const auto kThreads = RegularBuildVsSanitizers(3, 2);
  const auto kIncrements = RegularBuildVsSanitizers(100, 20);

  // All workers hammer the one counter that was created above.
  std::vector<std::thread> workers;
  for (int started = 0; started < kThreads; ++started) {
    workers.emplace_back([this, isolation] {
      DoIncrement(0, kIncrements, isolation);
    });
  }

  // Block until every worker is done.
  for (auto& worker : workers) {
    worker.join();
  }

  // The counter must reflect every increment exactly once.
  auto fetched = ASSERT_RESULT(conn.Fetch("SELECT value FROM t WHERE key = 0"));
  auto counter_value = ASSERT_RESULT(GetInt32(fetched.get(), 0, 0));
  ASSERT_EQ(counter_value, kThreads * kIncrements);
}
831
832
0
// Runs the shared-counter scenario with serializable transactions.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestConcurrentCounterSerializable)) {
  constexpr auto kIsolation = IsolationLevel::SERIALIZABLE_ISOLATION;
  TestConcurrentCounter(kIsolation);
}
835
836
0
// Runs the shared-counter scenario with snapshot-isolation transactions.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestConcurrentCounterRepeatableRead)) {
  constexpr auto kIsolation = IsolationLevel::SNAPSHOT_ISOLATION;
  TestConcurrentCounter(kIsolation);
}
839
840
0
// Each of kThreads workers randomly either inserts a new row (a = thread * 1000000 + key, b = key)
// or reads back, through a `WHERE b = ...` lookup, the most recent key that some worker has
// published as written. The read must return a row whose `a % 1000000` equals that key.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(SecondaryIndexInsertSelect)) {
  constexpr int kThreads = 4;

  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.Execute("CREATE TABLE t (a INT PRIMARY KEY, b INT)"));
  ASSERT_OK(conn.Execute("CREATE INDEX ON t (b, a)"));

  TestThreadHolder holder;
  // written[n] is the number of rows worker n has finished inserting.
  std::array<std::atomic<int>, kThreads> written;
  for (auto& counter : written) {
    counter.store(0, std::memory_order_release);
  }

  for (int thread_idx = 0; thread_idx != kThreads; ++thread_idx) {
    holder.AddThread([this, thread_idx, &stop = holder.stop_flag(), &written] {
      auto connection = ASSERT_RESULT(Connect());
      int next_key = 0;

      while (!stop.load(std::memory_order_acquire)) {
        const bool do_insert = RandomUniformBool();
        if (!do_insert) {
          // Reader path: pick any worker and look up the last key it reported as written.
          const int writer_index = RandomUniformInt(0, kThreads - 1);
          const int num_written = written[writer_index].load(std::memory_order_acquire);
          if (num_written != 0) {
            const int read_key = num_written - 1;
            const int read_a = ASSERT_RESULT(connection.FetchValue<int32_t>(
                Format("SELECT a FROM t WHERE b = $0 LIMIT 1", read_key)));
            ASSERT_EQ(read_a % 1000000, read_key);
          }
          continue;
        }

        // Writer path: insert the next row and only then publish the new count.
        const int a = thread_idx * 1000000 + next_key;
        const int b = next_key;
        ASSERT_OK(connection.ExecuteFormat("INSERT INTO t (a, b) VALUES ($0, $1)", a, b));
        written[thread_idx].store(++next_key, std::memory_order_release);
      }
    });
  }

  holder.WaitAndStop(60s);
}
883
884
0
// Asserts that `SELECT * FROM test` on the given connection returns exactly expected_num_rows
// rows.
void AssertRows(PGConn *conn, int expected_num_rows) {
  auto all_rows = ASSERT_RESULT(conn->Fetch("SELECT * FROM test"));
  const auto actual_num_rows = PQntuples(all_rows.get());
  ASSERT_EQ(actual_num_rows, expected_num_rows);
}
888
889
0
// Inside one transaction: insert a row, delete it, insert it again. The row count visible to the
// transaction is checked after every step and once more after COMMIT.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(InTxnDelete)) {
  auto conn = ASSERT_RESULT(Connect());
  PGConn* const conn_ptr = &conn;

  ASSERT_OK(conn.Execute("CREATE TABLE test (pk int PRIMARY KEY)"));

  ASSERT_OK(conn.Execute("BEGIN"));

  // Insert: the row is visible inside the transaction.
  ASSERT_OK(conn.Execute("INSERT INTO test VALUES (1)"));
  ASSERT_NO_FATALS(AssertRows(conn_ptr, 1));

  // Delete: the row disappears again.
  ASSERT_OK(conn.Execute("DELETE FROM test"));
  ASSERT_NO_FATALS(AssertRows(conn_ptr, 0));

  // Re-insert the same primary key.
  ASSERT_OK(conn.Execute("INSERT INTO test VALUES (1)"));
  ASSERT_NO_FATALS(AssertRows(conn_ptr, 1));

  ASSERT_OK(conn.Execute("COMMIT"));

  // The re-inserted row is still there after the commit.
  ASSERT_NO_FATALS(AssertRows(conn_ptr, 1));
}
904
905
namespace {
906
907
// Lists the YQL_DATABASE_PGSQL namespaces through the client and returns the id of the first one
// whose name equals namespace_name, or a NotFound status if none matches.
Result<string> GetNamespaceIdByNamespaceName(
    client::YBClient* client, const string& namespace_name) {
  const auto namespaces = VERIFY_RESULT(client->ListNamespaces(YQL_DATABASE_PGSQL));
  const auto match = std::find_if(
      std::begin(namespaces), std::end(namespaces),
      [&namespace_name](const auto& candidate) { return candidate.name() == namespace_name; });
  if (match == std::end(namespaces)) {
    return STATUS(NotFound, "The namespace does not exist");
  }
  return match->id();
}
917
918
// Lists all tables through the client and returns the table id of the first one that matches
// both namespace_name and table_name, or a NotFound status if none matches.
Result<string> GetTableIdByTableName(
    client::YBClient* client, const string& namespace_name, const string& table_name) {
  const auto tables = VERIFY_RESULT(client->ListTables());
  const auto match = std::find_if(
      std::begin(tables), std::end(tables),
      [&namespace_name, &table_name](const auto& candidate) {
        return candidate.namespace_name() == namespace_name &&
               candidate.table_name() == table_name;
      });
  if (match == std::end(tables)) {
    return STATUS(NotFound, "The table does not exist");
  }
  return match->table_id();
}
928
929
} // namespace
930
931
0
// Creates a table whose columns are declared as (r2, r1, h, v2, v1) with primary key (h, r1, r2)
// and checks that the schema fetched through the client lists them as h, r1, r2, v2, v1.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CompoundKeyColumnOrder)) {
  const string namespace_name = "yugabyte";
  const string table_name = "test";

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0 (r2 int, r1 int, h int, v2 int, v1 int, primary key (h, r1, r2))",
      table_name));

  auto client = ASSERT_RESULT(cluster_->CreateClient());
  const std::string table_id =
      ASSERT_RESULT(GetTableIdByTableName(client.get(), namespace_name, table_name));

  // Fetch the schema; the call reports completion through the synchronizer callback.
  auto table_info = std::make_shared<client::YBTableInfo>();
  {
    Synchronizer sync;
    ASSERT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback()));
    ASSERT_OK(sync.Wait());
  }

  const std::array<string, 5> expected_column_names{"h", "r1", "r2", "v2", "v1"};
  const auto& actual_columns = table_info->schema.columns();
  ASSERT_EQ(expected_column_names.size(), actual_columns.size());

  size_t position = 0;
  for (const auto& expected_name : expected_column_names) {
    ASSERT_EQ(actual_columns[position].name(), expected_name);
    ++position;
  }
}
956
957
0
// Loads kNumBatches * kBatchSize rows into a freshly created table using binary COPY, one COPY
// command per batch, and then checks that COUNT(*) reports exactly that many rows.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(BulkCopy)) {
  const std::string kTableName = "customer";
  auto conn = ASSERT_RESULT(Connect());
  // The table name is substituted from kTableName so that CREATE, COPY and SELECT below cannot
  // drift apart.
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0 ( CUSTKEY     INTEGER NOT NULL PRIMARY KEY,\n"
      "                        NAME        VARCHAR(25) NOT NULL,\n"
      "                        ADDRESS     VARCHAR(40) NOT NULL,\n"
      "                        NATIONKEY   INTEGER NOT NULL,\n"
      "                        PHONE       CHAR(15) NOT NULL,\n"
      "                        MKTSEGMENT  CHAR(10) NOT NULL,\n"
      "                        COMMENT     VARCHAR(117) NOT NULL);",
      kTableName));

  constexpr int kNumBatches = 10;
  constexpr int kBatchSize = 1000;

  int customer_key = 0;
  for (int i = 0; i != kNumBatches; ++i) {
    ASSERT_OK(conn.CopyBegin(Format("COPY $0 FROM STDIN WITH BINARY", kTableName)));
    for (int j = 0; j != kBatchSize; ++j) {
      // One row = 7 fields, in table column order.
      conn.CopyStartRow(7);
      conn.CopyPutInt32(++customer_key);
      conn.CopyPutString(Format("Name $0 $1", i, j));
      conn.CopyPutString(Format("Address $0 $1", i, j));
      conn.CopyPutInt32(i);
      // 15-digit and 10-digit strings for the CHAR(15) and CHAR(10) columns.
      conn.CopyPutString(std::to_string(999999876543210 + customer_key));
      conn.CopyPutString(std::to_string(9876543210 + customer_key));
      conn.CopyPutString(Format("Comment $0 $1", i, j));
    }

    ASSERT_OK(conn.CopyEnd());
  }

  LOG(INFO) << "Finished copy";
  // Retry the count while it fails with "Snapshot too old"; any other error fails the test.
  for (;;) {
    auto result = conn.FetchFormat("SELECT COUNT(*) FROM $0", kTableName);
    if (result.ok()) {
      LogResult(result->get());
      auto count = ASSERT_RESULT(GetInt64(result->get(), 0, 0));
      LOG(INFO) << "Total count: " << count;
      ASSERT_EQ(count, kNumBatches * kBatchSize);
      break;
    } else {
      auto message = result.status().ToString();
      // The previous spelling "Snaphost too old" could never match the real status text, which
      // turned every retryable failure into a test failure.
      ASSERT_TRUE(message.find("Snapshot too old") != std::string::npos) << result.status();
    }
  }
}
1005
1006
0
// Renames a table, a column and the containing database through SQL, then checks through the
// client that only the new names exist and that the table schema shows the renamed column.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CatalogManagerMapsTest)) {
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE DATABASE test_db"));
  {
    // Scoped so that the connection to test_db is gone before the database is renamed.
    auto test_conn = ASSERT_RESULT(ConnectToDB("test_db"));
    ASSERT_OK(test_conn.Execute("CREATE TABLE foo (a int PRIMARY KEY)"));
    ASSERT_OK(test_conn.Execute("ALTER TABLE foo RENAME TO bar"));
    ASSERT_OK(test_conn.Execute("ALTER TABLE bar RENAME COLUMN a to b"));
  }
  ASSERT_OK(conn.Execute("ALTER DATABASE test_db RENAME TO test_db_renamed"));

  auto client = ASSERT_RESULT(cluster_->CreateClient());

  // Table: only the new name is known, and only under the new database name.
  ASSERT_TRUE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, "test_db_renamed", "bar"))));
  ASSERT_FALSE(ASSERT_RESULT(
      client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, "test_db_renamed", "foo"))));

  // Database: only the new name is known.
  ASSERT_TRUE(ASSERT_RESULT(client->NamespaceExists("test_db_renamed", YQL_DATABASE_PGSQL)));
  ASSERT_FALSE(ASSERT_RESULT(client->NamespaceExists("test_db", YQL_DATABASE_PGSQL)));

  // Column: the schema has a single column carrying the new name.
  const std::string table_id =
      ASSERT_RESULT(GetTableIdByTableName(client.get(), "test_db_renamed", "bar"));
  auto table_info = std::make_shared<client::YBTableInfo>();
  {
    Synchronizer sync;
    ASSERT_OK(client->GetTableSchemaById(table_id, table_info, sync.AsStatusCallback()));
    ASSERT_OK(sync.Wait());
  }
  const auto& schema = table_info->schema;
  ASSERT_EQ(schema.num_columns(), 1);
  ASSERT_EQ(schema.Column(0).name(), "b");
}
1043
1044
0
// Issues a CREATE TABLE with an inet column referencing an int primary key, logs the resulting
// status without asserting on it, and then verifies that pg_class has no row for that table.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TestSystemTableRollback)) {
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE pktable (ptest1 int PRIMARY KEY);"));

  const Status create_status =
      conn.Execute("CREATE TABLE fktable (ftest1 inet REFERENCES pktable);");
  LOG(INFO) << "Status of second table creation: " << create_status;

  auto catalog_rows =
      ASSERT_RESULT(conn.Fetch("SELECT * FROM pg_class WHERE relname='fktable'"));
  ASSERT_EQ(0, PQntuples(catalog_rows.get()));
}
1052
1053
namespace {
1054
1055
// Resolves database_name to its namespace id and polls, for up to `timeout`, until the table
// `<namespace id> + master::kColocatedParentTableIdSuffix` reports exactly one tablet. Returns
// that tablet's locations. NotFound from the lookup means "keep polling"; any other error aborts
// the wait and is returned.
Result<master::TabletLocationsPB> GetColocatedTabletLocations(
    client::YBClient* client,
    std::string database_name,
    MonoDelta timeout) {
  const string namespace_id =
      VERIFY_RESULT(GetNamespaceIdByNamespaceName(client, database_name));

  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;

  const auto parent_tablet_ready = [&]() -> Result<bool> {
    const Status lookup_status = client->GetTabletsFromTableId(
        namespace_id + master::kColocatedParentTableIdSuffix,
        0 /* max_tablets */,
        &tablets);
    if (lookup_status.ok()) {
      return tablets.size() == 1;
    }
    if (lookup_status.IsNotFound()) {
      return false;
    }
    return lookup_status;
  };

  // Get TabletLocations for the colocated tablet.
  RETURN_NOT_OK(WaitFor(parent_tablet_ready, timeout, "wait for colocated parent tablet"));

  return tablets[0];
}
1084
1085
0
// Builds the id of a tablegroup's parent table by appending
// master::kTablegroupParentTableIdSuffix to tablegroup_id.
// Returned as a non-const value: a const-qualified by-value return would stop callers from
// moving out of the temporary.
TableId GetTableGroupTableId(const std::string& tablegroup_id) {
  return tablegroup_id + master::kTablegroupParentTableIdSuffix;
}
1088
1089
// Returns NotFound if the client reports that the tablegroup does not exist in database_name.
// Otherwise polls, for up to `timeout`, until the tablegroup's parent table reports exactly one
// tablet and returns that tablet's locations. NotFound from the lookup means "keep polling"; any
// other error aborts the wait and is returned.
Result<master::TabletLocationsPB> GetTablegroupTabletLocations(
    client::YBClient* client,
    std::string database_name,
    std::string tablegroup_id,
    MonoDelta timeout) {
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;

  const bool tablegroup_exists =
      VERIFY_RESULT(client->TablegroupExists(database_name, tablegroup_id));
  if (!tablegroup_exists) {
    return STATUS(NotFound, "tablegroup does not exist");
  }

  const auto parent_tablet_ready = [&]() -> Result<bool> {
    const Status lookup_status = client->GetTabletsFromTableId(
        GetTableGroupTableId(tablegroup_id),
        0 /* max_tablets */,
        &tablets);
    if (lookup_status.ok()) {
      return tablets.size() == 1;
    }
    if (lookup_status.IsNotFound()) {
      return false;
    }
    return lookup_status;
  };

  // Get TabletLocations for the tablegroup tablet.
  RETURN_NOT_OK(WaitFor(parent_tablet_ready, timeout, "wait for tablegroup parent tablet"));

  return tablets[0];
}
1121
1122
} // namespace
1123
1124
0
// Exercises tablet placement in a colocated database: tables and indexes share the database's
// parent tablet unless they opt out with `colocated = false`, dropping an opted-out table removes
// its index's tablets, and dropping the database removes the parent tablet.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TableColocation)) {
  auto client = ASSERT_RESULT(cluster_->CreateClient());
  const string kDatabaseName = "test_db";
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_bar_index;

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 WITH colocated = true", kDatabaseName));
  conn = ASSERT_RESULT(ConnectToDB(kDatabaseName));

  // A parent table with one tablet should be created when the database is created.
  const auto colocated_tablet_locations = ASSERT_RESULT(GetColocatedTabletLocations(
      client.get(),
      kDatabaseName,
      30s));
  const auto colocated_tablet_id = colocated_tablet_locations.tablet_id();
  const auto colocated_table = ASSERT_RESULT(client->OpenTable(
      colocated_tablet_locations.table_id()));

  // Create a range partition table, the table should share the tablet with the parent table.
  ASSERT_OK(conn.Execute("CREATE TABLE foo (a INT, PRIMARY KEY (a ASC))"));
  auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), colocated_tablet_id);

  // Create a colocated index table.
  ASSERT_OK(conn.Execute("CREATE INDEX foo_index1 ON foo (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo_index1"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), colocated_tablet_id);

  // Create a hash partition table and opt out of using the parent tablet.
  ASSERT_OK(conn.Execute("CREATE TABLE bar (a INT) WITH (colocated = false)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  for (auto& tablet : tablets) {
    ASSERT_NE(tablet.tablet_id(), colocated_tablet_id);
  }

  // Create an index on the non-colocated table. The index should follow the table and opt out of
  // colocation.
  ASSERT_OK(conn.Execute("CREATE INDEX bar_index ON bar (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar_index"));
  const auto table_bar_index = ASSERT_RESULT(client->OpenTable(table_id));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  for (auto& tablet : tablets) {
    ASSERT_NE(tablet.tablet_id(), colocated_tablet_id);
  }
  tablets_bar_index.Swap(&tablets);

  // Create a range partition table without specifying primary key.
  ASSERT_OK(conn.Execute("CREATE TABLE baz (a INT)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "baz"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), colocated_tablet_id);

  // Create another table and index.
  ASSERT_OK(conn.Execute("CREATE TABLE qux (a INT, PRIMARY KEY (a ASC)) WITH (colocated = true)"));
  ASSERT_OK(conn.Execute("CREATE INDEX qux_index ON qux (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "qux_index"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));

  // Drop a table in the parent tablet.
  ASSERT_OK(conn.Execute("DROP TABLE qux"));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "qux"))));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "qux_index"))));

  // Drop a table that is opted out.
  ASSERT_OK(conn.Execute("DROP TABLE bar"));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar"))));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar_index"))));

  // State shared with the asynchronous LookupTabletById callbacks below. The callbacks are not
  // assumed to run on this thread, so everything they touch is atomic. (A std::vector<bool> is
  // unsuitable here: it packs its elements into shared words.)
  // rpc_calls counts lookups whose callback has not run yet, across both wait loops.
  std::atomic<int> rpc_calls{0};
  std::vector<std::atomic<bool>> tablet_founds(tablets_bar_index.size());
  for (auto& found : tablet_founds) {
    found.store(true);
  }

  // The tablets for bar_index should be deleted.
  ASSERT_OK(WaitFor(
      [&] {
        for (int i = 0; i < tablets_bar_index.size(); ++i) {
          rpc_calls.fetch_add(1);
          client->LookupTabletById(
              tablets_bar_index[i].tablet_id(),
              table_bar_index,
              master::IncludeInactive::kFalse,
              CoarseMonoClock::Now() + 30s,
              [&, i](const Result<client::internal::RemoteTabletPtr>& result) {
                tablet_founds[i].store(result.ok());
                // Decrement last: once the count reaches zero nothing touches the locals anymore.
                rpc_calls.fetch_sub(1);
              },
              client::UseCache::kFalse);
        }
        return std::all_of(
            tablet_founds.cbegin(),
            tablet_founds.cend(),
            [](const std::atomic<bool>& tablet_found) {
              return !tablet_found.load();
            });
      },
      30s, "Drop table opted out of colocation"));

  // Drop the database.
  conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName));
  ASSERT_FALSE(ASSERT_RESULT(client->NamespaceExists(kDatabaseName, YQL_DATABASE_PGSQL)));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo"))));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo_index1"))));

  // The colocation tablet should be deleted.
  std::atomic<bool> tablet_found{true};
  ASSERT_OK(WaitFor(
      [&] {
        rpc_calls.fetch_add(1);
        client->LookupTabletById(
            colocated_tablet_id,
            colocated_table,
            master::IncludeInactive::kFalse,
            CoarseMonoClock::Now() + 30s,
            [&](const Result<client::internal::RemoteTabletPtr>& result) {
              tablet_found.store(result.ok());
              rpc_calls.fetch_sub(1);
            },
            client::UseCache::kFalse);
        return !tablet_found.load();
      },
      30s, "Drop colocated database"));
  // To prevent an "AddressSanitizer: stack-use-after-scope", do not return from this function until
  // all callbacks are done. This covers the bar_index lookups issued earlier as well.
  ASSERT_OK(WaitFor(
      [&rpc_calls] {
        const int outstanding = rpc_calls.load();
        LOG(INFO) << "Waiting for " << outstanding << " RPCs to run callbacks";
        return outstanding == 0;
      },
      30s, "Drop colocated database (wait for RPCs to finish)"));
}
1264
1265
// Test for ensuring that transaction conflicts work as expected for colocated tables.
1266
// Related to https://github.com/yugabyte/yugabyte-db/issues/3251.
1267
0
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(TxnConflictsForColocatedTables)) {
1268
0
  auto conn = ASSERT_RESULT(Connect());
1269
0
  ASSERT_OK(conn.Execute("CREATE DATABASE test_db WITH colocated = true"));
1270
1271
0
  auto conn1 = ASSERT_RESULT(ConnectToDB("test_db"));
1272
0
  auto conn2 = ASSERT_RESULT(ConnectToDB("test_db"));
1273
1274
0
  ASSERT_OK(conn1.Execute("CREATE TABLE t (a INT, PRIMARY KEY (a ASC))"));
1275
0
  ASSERT_OK(conn1.Execute("INSERT INTO t(a) VALUES(1)"));
1276
1277
  // From conn1, select the row in UPDATE row lock mode. From conn2, delete the row.
1278
  // Ensure that conn1's transaction will detect a conflict at the time of commit.
1279
0
  ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
1280
0
  auto res = ASSERT_RESULT(conn1.Fetch("SELECT * FROM t FOR UPDATE"));
1281
0
  ASSERT_EQ(PQntuples(res.get()), 1);
1282
1283
0
  auto status = conn2.Execute("DELETE FROM t WHERE a = 1");
1284
0
  ASSERT_FALSE(status.ok());
1285
0
  ASSERT_EQ(PgsqlError(status), YBPgErrorCode::YB_PG_T_R_SERIALIZATION_FAILURE) << status;
1286
0
  ASSERT_STR_CONTAINS(status.ToString(), "Conflicts with higher priority transaction");
1287
1288
0
  ASSERT_OK(conn1.CommitTransaction());
1289
1290
  // Ensure that reads to separate tables in a colocated database do not conflict.
1291
0
  ASSERT_OK(conn1.Execute("CREATE TABLE t2 (a INT, PRIMARY KEY (a ASC))"));
1292
0
  ASSERT_OK(conn1.Execute("INSERT INTO t2(a) VALUES(1)"));
1293
1294
0
  ASSERT_OK(conn1.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
1295
0
  ASSERT_OK(conn2.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
1296
1297
0
  res = ASSERT_RESULT(conn1.Fetch("SELECT * FROM t FOR UPDATE"));
1298
0
  ASSERT_EQ(PQntuples(res.get()), 1);
1299
0
  res = ASSERT_RESULT(conn2.Fetch("SELECT * FROM t2 FOR UPDATE"));
1300
0
  ASSERT_EQ(PQntuples(res.get()), 1);
1301
1302
0
  ASSERT_OK(conn1.CommitTransaction());
1303
0
  ASSERT_OK(conn2.CommitTransaction());
1304
0
}
1305
1306
// Ensure tablet bootstrap doesn't crash when replaying change metadata operations
1307
// for a deleted colocated table. This is a regression test for #6096.
1308
0
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ReplayDeletedTableInColocatedDB)) {
1309
0
  const std::string kDatabaseName = "testdb";
1310
0
  constexpr int kTimeoutSecs = 30;
1311
0
  auto client = ASSERT_RESULT(cluster_->CreateClient());
1312
1313
0
  PGConn conn = ASSERT_RESULT(Connect());
1314
0
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 with colocated=true", kDatabaseName));
1315
1316
0
  PGConn conn_new = ASSERT_RESULT(ConnectToDB(kDatabaseName));
1317
0
  ASSERT_OK(conn_new.Execute("CREATE TABLE foo (i int)"));
1318
0
  ASSERT_OK(conn_new.Execute("INSERT INTO foo VALUES (10)"));
1319
1320
  // Flush tablets; requests from here on will be replayed from the WAL during bootstrap.
1321
0
  auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo"));
1322
0
  ASSERT_OK(client->FlushTables(
1323
0
      {table_id},
1324
0
      false /* add_indexes */,
1325
0
      kTimeoutSecs,
1326
0
      false /* is_compaction */));
1327
1328
  // ALTER requires foo's table id to be in the TS raft metadata
1329
0
  ASSERT_OK(conn_new.Execute("ALTER TABLE foo ADD c char"));
1330
0
  ASSERT_OK(conn_new.Execute("ALTER TABLE foo RENAME COLUMN c to d"));
1331
  // but DROP will remove foo's table id from the TS raft metadata
1332
0
  ASSERT_OK(conn_new.Execute("DROP TABLE foo"));
1333
0
  ASSERT_OK(conn_new.Execute("CREATE TABLE bar (c char)"));
1334
1335
  // Restart a TS that serves this tablet so we do a local bootstrap and replay WAL files.
1336
  // Ensure we don't crash here due to missing table info in metadata when replaying the ALTER.
1337
0
  ASSERT_NO_FATALS(cluster_->tablet_server(0)->Shutdown());
1338
1339
0
  LOG(INFO) << "Start tserver";
1340
0
  ASSERT_OK(cluster_->tablet_server(0)->Restart());
1341
0
  ASSERT_OK(cluster_->WaitForTabletsRunning(cluster_->tablet_server(0),
1342
0
      MonoDelta::FromSeconds(60)));
1343
1344
  // Ensure the rest of the WAL replayed successfully.
1345
0
  PGConn conn_after = ASSERT_RESULT(ConnectToDB(kDatabaseName));
1346
0
  auto res = ASSERT_RESULT(conn_after.FetchValue<int64_t>("SELECT COUNT(*) FROM bar"));
1347
0
  ASSERT_EQ(res, 0);
1348
0
}
1349
1350
class PgLibPqTablegroupTest : public PgLibPqTest {
1351
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
1352
    // Enable tablegroup beta feature
1353
0
    options->extra_tserver_flags.push_back("--ysql_beta_feature_tablegroup=true");
1354
0
    options->extra_master_flags.push_back("--ysql_beta_feature_tablegroup=true");
1355
0
  }
1356
};
1357
1358
namespace {
1359
1360
struct TableGroupInfo {
1361
  int oid;
1362
  std::string id;
1363
  TabletId tablet_id;
1364
  std::shared_ptr<client::YBTable> table;
1365
};
1366
1367
// Collects the identifiers of tablegroup `group_name` in database `database_name`.
//
// The database and tablegroup oids are queried through `conn`; the tablegroup id, the id of its
// tablet and its table handle are then obtained through `client`. Each step goes through
// VERIFY_RESULT, so the first failing step ends the function early.
Result<TableGroupInfo> SelectTableGroup(
    client::YBClient* client, PGConn* conn, const std::string& database_name,
    const std::string& group_name) {
  // Oid of the database that owns the tablegroup.
  auto database_row = VERIFY_RESULT(
      conn->FetchFormat("SELECT oid FROM pg_database WHERE datname=\'$0\'", database_name));
  const int database_oid = VERIFY_RESULT(GetInt32(database_row.get(), 0, 0));

  // Oid of the tablegroup itself.
  auto tablegroup_row = VERIFY_RESULT(
      conn->FetchFormat("SELECT oid FROM pg_yb_tablegroup WHERE grpname=\'$0\'", group_name));

  TableGroupInfo info;
  info.oid = VERIFY_RESULT(GetInt32(tablegroup_row.get(), 0, 0));
  info.id = GetPgsqlTablegroupId(database_oid, info.oid);

  // Look up the tablegroup's tablet, waiting up to 30 seconds for the locations.
  const auto locations = VERIFY_RESULT(
      GetTablegroupTabletLocations(client, database_name, info.id, 30s));
  info.tablet_id = locations.tablet_id();

  info.table = VERIFY_RESULT(client->OpenTable(GetTableGroupTableId(info.id)));
  return info;
}
1388
1389
} // namespace
1390
1391
// End-to-end check of tablegroups: tables and indexes created in a tablegroup share the
// tablegroup's tablet, tables created outside of it do not, and dropping tables, tablegroups and
// the database removes the corresponding tablets.
TEST_F_EX(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(ColocatedTablegroups),
          PgLibPqTablegroupTest) {
  auto client = ASSERT_RESULT(cluster_->CreateClient());
  const std::string kDatabaseName = "tgroup_test_db";
  const std::string kTablegroupName = "test_tgroup";
  const std::string kTablegroupAltName = "test_alt_tgroup";
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_bar_index;

  // Number of LookupTabletById callbacks that were issued but have not run yet. The callbacks
  // write to locals of this function after the call that issued them has returned, so the counter
  // and every flag they touch are atomics, and the test waits for the counter to reach zero before
  // returning.
  // NOTE(review): an ASSERT_* failure still returns early without waiting for pending callbacks.
  std::atomic<int> pending_lookups{0};

  // Issues one uncached tablet lookup and records in `*tablet_found` whether it succeeded.
  // `tablet_found` must stay alive until pending_lookups drops back to zero.
  const auto lookup_tablet = [&client, &pending_lookups](
      const TabletId& tablet_id, const auto& table, std::atomic<bool>* tablet_found) {
    ++pending_lookups;
    client->LookupTabletById(
        tablet_id,
        table,
        master::IncludeInactive::kFalse,
        CoarseMonoClock::Now() + 30s,
        [&pending_lookups, tablet_found](
            const Result<client::internal::RemoteTabletPtr>& result) {
          tablet_found->store(result.ok());
          // Keep this as the last statement: once the counter drops, the test is free to return
          // and destroy the objects this callback refers to.
          --pending_lookups;
        },
        client::UseCache::kFalse);
  };

  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0", kDatabaseName));
  conn = ASSERT_RESULT(ConnectToDB(kDatabaseName));
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", kTablegroupName));

  // A parent table with one tablet should be created when the tablegroup is created.
  const auto tablegroup = ASSERT_RESULT(SelectTableGroup(
      client.get(), &conn, kDatabaseName, kTablegroupName));

  // Create a range partition table, the table should share the tablet with the parent table.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE foo (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0",
                               kTablegroupName));
  auto table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id);

  // Create an index table that uses the tablegroup by default.
  ASSERT_OK(conn.Execute("CREATE INDEX foo_index1 ON foo (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "foo_index1"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id);

  // Create a hash partition table and don't use tablegroup.
  ASSERT_OK(conn.Execute("CREATE TABLE bar (a INT)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  for (auto& tablet : tablets) {
    ASSERT_NE(tablet.tablet_id(), tablegroup.tablet_id);
  }

  // Create an index on the table not in a tablegroup. The index should follow the table
  // and opt out of the tablegroup.
  ASSERT_OK(conn.Execute("CREATE INDEX bar_index ON bar (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "bar_index"));
  const auto table_bar_index = ASSERT_RESULT(client->OpenTable(table_id));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  for (auto& tablet : tablets) {
    ASSERT_NE(tablet.tablet_id(), tablegroup.tablet_id);
  }
  // Remember bar_index's tablets so their deletion can be verified after DROP TABLE bar.
  tablets_bar_index.Swap(&tablets);

  // Create a range partition table without specifying primary key.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE baz (a INT) TABLEGROUP $0", kTablegroupName));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "baz"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id);

  // Create another table and index.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE qux (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0",
                               kTablegroupName));
  ASSERT_OK(conn.Execute("CREATE INDEX qux_index ON qux (a)"));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "qux"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id);
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "qux_index"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup.tablet_id);

  // Now create a second tablegroup.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", kTablegroupAltName));

  // A parent table with one tablet should be created when the tablegroup is created.
  auto tablegroup_alt = ASSERT_RESULT(SelectTableGroup(
      client.get(), &conn, kDatabaseName, kTablegroupAltName));

  // Create another range partition table - should be part of the second tablegroup
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE quuz (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0",
                               kTablegroupAltName));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "quuz"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup_alt.tablet_id);

  // Drop a table in the parent tablet.
  ASSERT_OK(conn.Execute("DROP TABLE quuz"));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "quuz"))));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "quuz_index"))));

  // Drop a table that is opted out.
  ASSERT_OK(conn.Execute("DROP TABLE bar"));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar"))));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "bar_index"))));

  // The tablets for bar_index should be deleted. Every tablet starts out as "found" and is
  // flipped by the lookup callbacks; the wait ends once no tablet is reported as found.
  std::vector<std::atomic<bool>> bar_index_tablet_found(tablets_bar_index.size());
  for (auto& tablet_found : bar_index_tablet_found) {
    tablet_found = true;
  }
  ASSERT_OK(WaitFor(
      [&] {
        for (int i = 0; i < tablets_bar_index.size(); ++i) {
          lookup_tablet(
              tablets_bar_index[i].tablet_id(), table_bar_index, &bar_index_tablet_found[i]);
        }
        return std::all_of(
            bar_index_tablet_found.cbegin(),
            bar_index_tablet_found.cend(),
            [](const std::atomic<bool>& tablet_found) {
              return !tablet_found.load();
            });
      },
      30s, "Drop table did not use tablegroups"));

  // Drop a tablegroup.
  ASSERT_OK(conn.ExecuteFormat("DROP TABLEGROUP $0", kTablegroupAltName));
  ASSERT_FALSE(ASSERT_RESULT(client->TablegroupExists(kDatabaseName, kTablegroupAltName)));

  // The alt tablegroup tablet should be deleted after dropping the tablegroup.
  std::atomic<bool> alt_tablet_found{true};
  ASSERT_OK(WaitFor(
      [&] {
        lookup_tablet(tablegroup_alt.tablet_id, tablegroup_alt.table, &alt_tablet_found);
        return !alt_tablet_found.load();
      },
      30s, "Drop tablegroup"));

  // Recreate that tablegroup. Being able to recreate it and add tables to it tests that it was
  // properly cleaned up from catalog manager maps and postgres metadata at time of DROP.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLEGROUP $0", kTablegroupAltName));

  // A parent table with one tablet should be created when the tablegroup is created.
  tablegroup_alt = ASSERT_RESULT(SelectTableGroup(
        client.get(), &conn, kDatabaseName, kTablegroupAltName));

  // Add a table back in and ensure that it is part of the recreated tablegroup.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE quuz (a INT, PRIMARY KEY (a ASC)) TABLEGROUP $0",
                               kTablegroupAltName));
  table_id = ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "quuz"));
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 1);
  ASSERT_EQ(tablets[0].tablet_id(), tablegroup_alt.tablet_id);

  // Drop the database.
  conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.ExecuteFormat("DROP DATABASE $0", kDatabaseName));
  ASSERT_FALSE(ASSERT_RESULT(client->NamespaceExists(kDatabaseName, YQL_DATABASE_PGSQL)));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo"))));
  ASSERT_FALSE(ASSERT_RESULT(
        client->TableExists(client::YBTableName(YQL_DATABASE_PGSQL, kDatabaseName, "foo_index1"))));

  // The original tablegroup tablet should be deleted after dropping the database.
  std::atomic<bool> orig_tablet_found{true};
  ASSERT_OK(WaitFor(
      [&] {
        lookup_tablet(tablegroup.tablet_id, tablegroup.table, &orig_tablet_found);
        return !orig_tablet_found.load();
      },
      30s, "Drop database with tablegroup"));

  // The second tablegroup tablet should also be deleted after dropping the database.
  std::atomic<bool> second_tablet_found{true};
  ASSERT_OK(WaitFor(
      [&] {
        lookup_tablet(tablegroup_alt.tablet_id, tablegroup_alt.table, &second_tablet_found);
        return !second_tablet_found.load();
      },
      30s, "Drop database with tablegroup"));

  // To prevent an "AddressSanitizer: stack-use-after-scope", do not return from this function until
  // all callbacks are done.
  ASSERT_OK(WaitFor(
      [&pending_lookups] {
        const int still_pending = pending_lookups.load();
        LOG(INFO) << "Waiting for " << still_pending << " RPCs to run callbacks";
        return still_pending == 0;
      },
      30s, "Drop database with tablegroup (wait for RPCs to finish)"));
}
1610
1611
// Test that the number of RPCs sent to master upon first connection is not too high.
1612
// See https://github.com/yugabyte/yugabyte-db/issues/3049
1613
0
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(NumberOfInitialRpcs)) {
  // Adds up the rpc_inbound_calls_created server metric over all master daemons.
  auto count_master_inbound_rpcs = [&cluster_ = this->cluster_]() -> Result<int64_t> {
    int64_t total = 0;
    for (auto* master : cluster_->master_daemons()) {
      const int64_t created = VERIFY_RESULT(master->GetInt64Metric(
          &METRIC_ENTITY_server, "yb.master", &METRIC_rpc_inbound_calls_created, "value"));
      total += created;
    }
    return total;
  };

  // Sample the counter right before and right after opening a single connection.
  const int64_t before_connect = ASSERT_RESULT(count_master_inbound_rpcs());
  ASSERT_RESULT(Connect());
  const int64_t after_connect = ASSERT_RESULT(count_master_inbound_rpcs());
  const int64_t rpcs_during = after_connect - before_connect;

  // Real-world numbers (debug build, local Mac): 328 RPCs before, 95 after the fix for #3049
  LOG(INFO) << "Master inbound RPC during connection: " << rpcs_during;

  // The RPC counter is affected not only by table read/write operations but also by the heartbeat
  // mechanism. ASAN/TSAN builds are slower, so they can receive more heartbeats while processing
  // requests; hence a higher limit is used for sanitizer builds.
  ASSERT_LT(rpcs_during, RegularBuildVsSanitizers(150, 200));
}
1635
1636
0
// Checks that a range-partitioned table created with two split points gets three tablets.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(RangePresplit)) {
  const std::string kDatabaseName = "yugabyte";
  auto client = ASSERT_RESULT(cluster_->CreateClient());

  auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName));
  ASSERT_OK(conn.Execute(
      "CREATE TABLE range(a int, PRIMARY KEY(a ASC)) "
      "SPLIT AT VALUES ((100), (1000))"));

  const auto ns_id = ASSERT_RESULT(GetNamespaceIdByNamespaceName(client.get(), kDatabaseName));
  ASSERT_FALSE(ns_id.empty());

  const auto table_id =
      ASSERT_RESULT(GetTableIdByTableName(client.get(), kDatabaseName, "range"));

  // Validate that number of tablets created is 3.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 3);
}
1654
1655
// Override the base test to start a cluster that kicks out unresponsive tservers faster.
1656
class PgLibPqTestSmallTSTimeout : public PgLibPqTest {
1657
 public:
1658
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
1659
0
    options->extra_master_flags.push_back("--tserver_unresponsive_timeout_ms=8000");
1660
0
    options->extra_master_flags.push_back("--unresponsive_ts_rpc_timeout_ms=10000");
1661
0
    options->extra_tserver_flags.push_back("--follower_unavailable_considered_failed_sec=8");
1662
0
  }
1663
};
1664
1665
// Test that adding a tserver and removing a tserver causes the colocation tablet to adjust raft
1666
// configuration off the old tserver and onto the new tserver.
1667
TEST_F_EX(PgLibPqTest,
          YB_DISABLE_TEST_IN_TSAN(LoadBalanceSingleColocatedDB),
          PgLibPqTestSmallTSTimeout) {
  const std::string kDatabaseName = "co";
  const auto kTimeout = 60s;
  // Same value as the --tserver_unresponsive_timeout_ms flag set by the fixture.
  static const int tserver_unresponsive_timeout_ms = 8000;
  const auto initial_ts_count = cluster_->num_tablet_servers();
  // Number of colocation tablet replicas hosted by each tserver, keyed by permanent uuid.
  std::map<std::string, int> replicas_per_ts;

  auto client = ASSERT_RESULT(cluster_->CreateClient());
  auto conn = ASSERT_RESULT(Connect());

  ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0 WITH colocated = true", kDatabaseName));

  // Collect colocation tablet replica locations.
  {
    const master::TabletLocationsPB locations = ASSERT_RESULT(
        GetColocatedTabletLocations(client.get(), kDatabaseName, kTimeout));
    for (const auto& replica : locations.replicas()) {
      ++replicas_per_ts[replica.ts_info().permanent_uuid()];
    }
  }

  // Ensure each tserver has exactly one colocation tablet replica.
  ASSERT_EQ(replicas_per_ts.size(), initial_ts_count);
  for (const auto& entry : replicas_per_ts) {
    const auto& ts_uuid = entry.first;
    const int replica_count = entry.second;
    ASSERT_NOTNULL(cluster_->tablet_server_by_uuid(ts_uuid));
    ASSERT_EQ(replica_count, 1);
    LOG(INFO) << "found ts " << ts_uuid << " has " << replica_count << " replicas";
  }

  // Add a tablet server, started with the same extra flags as the fixture's tservers.
  ExternalMiniClusterOptions new_ts_options;
  UpdateMiniClusterOptions(&new_ts_options);
  ASSERT_OK(cluster_->AddTabletServer(ExternalMiniClusterOptions::kDefaultStartCqlProxy,
                                      new_ts_options.extra_tserver_flags));
  ASSERT_OK(cluster_->WaitForTabletServerCount(initial_ts_count + 1, kTimeout));

  // Wait for load balancing.  This should move some tablet-peers (e.g. of the colocation tablet,
  // system.transactions tablets) to the new tserver.
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        const bool balancer_idle = VERIFY_RESULT(client->IsLoadBalancerIdle());
        return !balancer_idle;
      },
      kTimeout,
      "wait for load balancer to be active"));
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> { return client->IsLoadBalancerIdle(); },
      kTimeout,
      "wait for load balancer to be idle"));

  // Remove a tablet server.
  cluster_->tablet_server(0)->Shutdown();

  // Wait for the master leader to mark it dead.
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> { return cluster_->is_ts_stale(0); },
      MonoDelta::FromMilliseconds(2 * tserver_unresponsive_timeout_ms),
      "Is TS dead",
      MonoDelta::FromSeconds(1)));

  // Collect colocation tablet replica locations and verify that load has been moved off
  // from the dead TS.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        const master::TabletLocationsPB locations = VERIFY_RESULT(
            GetColocatedTabletLocations(client.get(), kDatabaseName, kTimeout));
        replicas_per_ts.clear();
        for (const auto& replica : locations.replicas()) {
          ++replicas_per_ts[replica.ts_info().permanent_uuid()];
        }
        // Ensure each colocation tablet replica is on the three tablet servers excluding the
        // first one, which is shut down.
        if (replicas_per_ts.size() != initial_ts_count) {
          return false;
        }
        for (const auto& entry : replicas_per_ts) {
          ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(entry.first);
          const bool on_live_ts_once =
              ts != nullptr && ts != cluster_->tablet_server(0) && entry.second == 1;
          if (!on_live_ts_once) {
            return false;
          }
        }
        return true;
      },
      kTimeout,
      "Wait for load to be moved off from tserver 0"));
}
1761
1762
// Test that adding a tserver causes colocation tablets to offload tablet-peers to the new tserver.
1763
0
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(LoadBalanceMultipleColocatedDB)) {
  constexpr int kNumDatabases = 3;
  const auto kTimeout = 60s;
  const size_t starting_num_tablet_servers = cluster_->num_tablet_servers();
  const std::string kDatabasePrefix = "co";
  // Number of colocation tablet replicas hosted by each tserver, keyed by permanent uuid.
  std::map<std::string, int> ts_loads;

  auto client = ASSERT_RESULT(cluster_->CreateClient());
  auto conn = ASSERT_RESULT(Connect());

  for (int i = 0; i < kNumDatabases; ++i) {
    ASSERT_OK(conn.ExecuteFormat("CREATE DATABASE $0$1 WITH colocated = true", kDatabasePrefix, i));
  }

  // Add a tablet server.
  ASSERT_OK(cluster_->AddTabletServer());
  ASSERT_OK(cluster_->WaitForTabletServerCount(starting_num_tablet_servers + 1, kTimeout));

  // Wait for load balancing.  This should move some tablet-peers (e.g. of the colocation tablets,
  // system.transactions tablets) to the new tserver.
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        bool is_idle = VERIFY_RESULT(client->IsLoadBalancerIdle());
        return !is_idle;
      },
      kTimeout,
      "wait for load balancer to be active"));
  ASSERT_OK(WaitFor(
      [&]() -> Result<bool> {
        return client->IsLoadBalancerIdle();
      },
      kTimeout,
      "wait for load balancer to be idle"));

  // Collect colocation tablets' replica locations.
  for (int i = 0; i < kNumDatabases; ++i) {
    master::TabletLocationsPB tablet_locations = ASSERT_RESULT(GetColocatedTabletLocations(
        client.get(),
        Format("$0$1", kDatabasePrefix, i),
        kTimeout));
    for (const auto& replica : tablet_locations.replicas()) {
      ts_loads[replica.ts_info().permanent_uuid()]++;
    }
  }

  // Ensure that the load is properly distributed.
  int min_load = kNumDatabases;
  int max_load = 0;
  for (const auto& entry : ts_loads) {
    // The two checks are deliberately independent: an entry that lowers the minimum may also be
    // the largest load seen so far, so it must still be considered for the maximum.
    if (entry.second < min_load) {
      min_load = entry.second;
    }
    if (entry.second > max_load) {
      max_load = entry.second;
    }
  }
  LOG(INFO) << "Found max_load on a TS = " << max_load << ", and min_load on a ts = " << min_load;
  ASSERT_LT(max_load - min_load, 2);
  // Every tserver, including the newly added one, must host at least one colocation replica.
  ASSERT_EQ(ts_loads.size(), starting_num_tablet_servers + 1);
}
1822
1823
// Override the base test to start a cluster with transparent retries on cache version mismatch
1824
// disabled.
1825
class PgLibPqTestNoRetry : public PgLibPqTest {
1826
 public:
1827
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
1828
0
    options->extra_tserver_flags.push_back(
1829
0
        "--TEST_ysql_disable_transparent_cache_refresh_retry=true");
1830
0
  }
1831
};
1832
1833
// This test is like "TestPgCacheConsistency#testVersionMismatchWithFailedRetry".  That one gets
1834
// failures because the queries are "parse" message types, and we don't consider retry for those.
1835
// These queries are "simple query" message types, so they should be considered for transparent
1836
// retry.  The last factor is whether `--TEST_ysql_disable_transparent_cache_refresh_retry` is
1837
// specified.
1838
0
void PgLibPqTest::TestCacheRefreshRetry(const bool is_retry_disabled) {
1839
0
  constexpr int kNumTries = 5;
1840
0
  const std::string kNamespaceName = "yugabyte";
1841
0
  const std::string kTableName = "t";
1842
0
  int num_successes = 0;
1843
0
  std::array<PGConn, 2> conns = {
1844
0
    ASSERT_RESULT(ConnectToDB(kNamespaceName, true /* simple_query_protocol */)),
1845
0
    ASSERT_RESULT(ConnectToDB(kNamespaceName, true /* simple_query_protocol */)),
1846
0
  };
1847
1848
0
  ASSERT_OK(conns[0].ExecuteFormat("CREATE TABLE $0 (i int)", kTableName));
1849
  // Make the catalog version cache up to date.
1850
0
  ASSERT_OK(conns[1].FetchFormat("SELECT * FROM $0", kTableName));
1851
1852
0
  for (int i = 0; i < kNumTries; ++i) {
1853
0
    ASSERT_OK(conns[0].ExecuteFormat("ALTER TABLE $0 ADD COLUMN j$1 int", kTableName, i));
1854
0
    auto res = conns[1].FetchFormat("SELECT * FROM $0", kTableName);
1855
0
    if (is_retry_disabled) {
1856
      // Ensure that we fall under one of two cases:
1857
      // 1. tserver gets updated catalog version before SELECT (rare)
1858
      //    - YBCheckSharedCatalogCacheVersion causes YBRefreshCache
1859
      //    - trying the SELECT requires getting the table schema, but it will be a cache miss since
1860
      //      the whole cache was invalidated, so we get the up-to-date table schema and succeed
1861
      // 2. tserver doesn't get updated catalog version before SELECT (common)
1862
      //    - trying the SELECT causes catalog version mismatch
1863
0
      if (res.ok()) {
1864
0
        LOG(WARNING) << "SELECT was ok";
1865
0
        num_successes++;
1866
0
        continue;
1867
0
      }
1868
0
      auto msg = res.status().message().ToBuffer();
1869
0
      ASSERT_TRUE(msg.find("Catalog Version Mismatch") != std::string::npos) << res.status();
1870
0
    } else {
1871
      // Ensure that the request is successful (thanks to retry).
1872
0
      if (!res.ok()) {
1873
0
        LOG(WARNING) << "SELECT was not ok: " << res.status();
1874
0
        continue;
1875
0
      }
1876
0
      num_successes++;
1877
0
    }
1878
    // Make the catalog version cache up to date, if needed.
1879
0
    ASSERT_OK(conns[1].FetchFormat("SELECT * FROM $0", kTableName));
1880
0
  }
1881
1882
0
  LOG(INFO) << "number of successes: " << num_successes << "/" << kNumTries;
1883
0
  if (is_retry_disabled) {
1884
    // Expect at least half of the tries to fail with catalog version mismatch.  There can be some
1885
    // successes because, between the ALTER and SELECT, the catalog version could have propogated
1886
    // through shared memory (see `YBCheckSharedCatalogCacheVersion`).
1887
0
    const int num_failures = kNumTries - num_successes;
1888
0
    ASSERT_GE(num_failures, kNumTries / 2);
1889
0
  } else {
1890
    // Expect all the tries to succeed.  This is because it is unacceptable to fail when retries are
1891
    // enabled.
1892
0
    ASSERT_EQ(num_successes, kNumTries);
1893
0
  }
1894
0
}
1895
1896
// Runs the cache refresh scenario on a cluster started by the PgLibPqTestNoRetry fixture.
TEST_F_EX(PgLibPqTest,
          YB_DISABLE_TEST_IN_TSAN(CacheRefreshRetryDisabled),
          PgLibPqTestNoRetry) {
  constexpr bool kRetryDisabled = true;
  TestCacheRefreshRetry(kRetryDisabled);
}
1901
1902
0
// Runs the cache refresh scenario on the default cluster, where retries are not disabled.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CacheRefreshRetryEnabled)) {
  constexpr bool kRetryDisabled = false;
  TestCacheRefreshRetry(kRetryDisabled);
}
1905
1906
class PgLibPqDatabaseTimeoutTest : public PgLibPqTest {
1907
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
1908
0
    options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1");
1909
0
    options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=5000");
1910
0
  }
1911
};
1912
1913
0
TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutGC)) {
  const NamespaceName test_name = "test_pgsql";
  auto client = ASSERT_RESULT(cluster_->CreateClient());

  // Create Database: will timeout because the admin setting is lower than the DB create latency.
  {
    auto conn = ASSERT_RESULT(Connect());
    ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name));
  }

  // Verify DocDB Database creation, even though it failed in PG layer.
  // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        Result<bool> exists =
            client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL);
        WARN_NOT_OK(ResultToStatus(exists), "" /* prefix */);
        return exists.ok() && exists.get();
      },
      MonoDelta::FromSeconds(60),
      "Verify Namespace was created in DocDB"));

  // After bg_task_wait, DocDB will notice the PG layer failure because the transaction aborts.
  // Confirm that DocDB async deletes the namespace.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        Result<bool> exists =
            client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL);
        WARN_NOT_OK(ResultToStatus(exists), "ret");
        return exists.ok() && !exists.get();
      },
      MonoDelta::FromSeconds(20),
      "Verify Namespace was removed by Transaction GC"));
}
1940
1941
0
TEST_F(PgLibPqDatabaseTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestDatabaseTimeoutAndRestartGC)) {
  const NamespaceName test_name = "test_pgsql";
  auto client = ASSERT_RESULT(cluster_->CreateClient());

  // Create Database: will timeout because the admin setting is lower than the DB create latency.
  {
    auto conn = ASSERT_RESULT(Connect());
    ASSERT_NOK(conn.Execute("CREATE DATABASE " + test_name));
  }

  // Verify DocDB Database creation, even though it fails in PG layer.
  // 'ysql_transaction_bg_task_wait_ms' setting ensures we can finish this before the GC.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        Result<bool> exists =
            client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL);
        WARN_NOT_OK(ResultToStatus(exists), "");
        return exists.ok() && exists.get();
      },
      MonoDelta::FromSeconds(60),
      "Verify Namespace was created in DocDB"));

  LOG(INFO) << "Restarting Master.";

  // Restart the master before the BG task can kick in and GC the failed transaction.
  auto leader_master = cluster_->GetLeaderMaster();
  leader_master->Shutdown();
  ASSERT_OK(leader_master->Restart());
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        return cluster_->GetIsMasterLeaderServiceReady(leader_master).ok();
      },
      MonoDelta::FromSeconds(20),
      "Wait for Master to be ready."));

  // Confirm that Catalog Loader deletes the namespace on master restart.
  client = ASSERT_RESULT(cluster_->CreateClient()); // Reinit the YBClient after restart.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        Result<bool> exists =
            client->NamespaceExists(test_name, YQLDatabase::YQL_DATABASE_PGSQL);
        WARN_NOT_OK(ResultToStatus(exists), "");
        return exists.ok() && !exists.get();
      },
      MonoDelta::FromSeconds(20),
      "Verify Namespace was removed by Transaction GC"));
}
1979
1980
class PgLibPqTableTimeoutTest : public PgLibPqTest {
1981
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
1982
    // Use small clock skew, to decrease number of read restarts.
1983
0
    options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=1");
1984
0
    options->extra_master_flags.push_back("--TEST_simulate_slow_table_create_secs=2");
1985
0
    options->extra_master_flags.push_back("--ysql_transaction_bg_task_wait_ms=3000");
1986
0
  }
1987
};
1988
1989
0
// A CREATE TABLE that fails in the PG layer still creates the table in DocDB; this test checks
// that the table first appears and is then removed again without any further action.
TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutGC)) {
  const std::string kDatabaseName = "yugabyte";
  const NamespaceName kTableName = "test_pgsql_table";
  auto yb_client = ASSERT_RESULT(cluster_->CreateClient());
  const client::YBTableName table(YQL_DATABASE_PGSQL, kDatabaseName, kTableName);

  // Asks the master whether the table exists and compares the answer with `expected`.
  // A failed lookup is logged as a warning and reported as "condition not met yet".
  const auto table_existence_is = [&yb_client, &table](bool expected) -> Result<bool> {
    auto exists = yb_client->TableExists(table);
    WARN_NOT_OK(ResultToStatus(exists), "");
    return exists.ok() && exists.get() == expected;
  };

  // Create Table: expected to time out because the admin setting is lower than the table create
  // latency. The connection is scoped so it is closed before the checks below.
  {
    auto pg_conn = ASSERT_RESULT(Connect());
    ASSERT_NOK(pg_conn.Execute("CREATE TABLE " + kTableName + " (key INT PRIMARY KEY)"));
  }

  // Wait for the table to show up in DocDB, even though creation failed in the PG layer.
  // 'ysql_transaction_bg_task_wait_ms' setting ensures we can observe this before the GC.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        LOG(INFO) << "Requesting TableExists";
        return table_existence_is(true);
      },
      MonoDelta::FromSeconds(20),
      "Verify Table was created in DocDB"));

  // DocDB will notice the PG layer failure because the transaction aborts.
  // Confirm that DocDB asynchronously deletes the table.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> { return table_existence_is(false); },
      MonoDelta::FromSeconds(20),
      "Verify Table was removed by Transaction GC"));
}
2019
2020
0
// Same scenario as a failed CREATE TABLE leaving a table behind in DocDB, but here the leader
// master is restarted before the background GC runs; the table must be gone after the restart.
TEST_F(PgLibPqTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestTableTimeoutAndRestartGC)) {
  const std::string kDatabaseName = "yugabyte";
  const NamespaceName kTableName = "test_pgsql_table";
  auto yb_client = ASSERT_RESULT(cluster_->CreateClient());
  const client::YBTableName table(YQL_DATABASE_PGSQL, kDatabaseName, kTableName);

  // Asks the master whether the table exists and compares the answer with `expected`.
  // A failed lookup is logged as a warning and reported as "condition not met yet".
  const auto table_existence_is = [&yb_client, &table](bool expected) -> Result<bool> {
    auto exists = yb_client->TableExists(table);
    WARN_NOT_OK(ResultToStatus(exists), "");
    return exists.ok() && exists.get() == expected;
  };

  // Create Table: expected to time out because the admin setting is lower than the table create
  // latency. The connection is scoped so it is closed before the checks below.
  {
    auto pg_conn = ASSERT_RESULT(Connect());
    ASSERT_NOK(pg_conn.Execute("CREATE TABLE " + kTableName + " (key INT PRIMARY KEY)"));
  }

  // Wait for the table to show up in DocDB, even though creation failed in the PG layer.
  // 'ysql_transaction_bg_task_wait_ms' setting ensures we can observe this before the GC.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        LOG(INFO) << "Requesting TableExists";
        return table_existence_is(true);
      },
      MonoDelta::FromSeconds(20),
      "Verify Table was created in DocDB"));

  LOG(INFO) << "Restarting Master.";

  // Bounce the leader master before the BG task can kick in and GC the failed transaction.
  auto leader_master = cluster_->GetLeaderMaster();
  leader_master->Shutdown();
  ASSERT_OK(leader_master->Restart());
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        return cluster_->GetIsMasterLeaderServiceReady(leader_master).ok();
      },
      MonoDelta::FromSeconds(20),
      "Wait for Master to be ready."));

  // After the restart the table is expected to be deleted (Catalog Loader path).
  // The YBClient is re-created because the master it talked to was restarted.
  yb_client = ASSERT_RESULT(cluster_->CreateClient());
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> { return table_existence_is(false); },
      MonoDelta::FromSeconds(20),
      "Verify Table was removed by Transaction GC"));
}
2061
2062
class PgLibPqIndexTableTimeoutTest : public PgLibPqTest {
2063
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
2064
0
    options->extra_tserver_flags.push_back("--TEST_user_ddl_operation_timeout_sec=10");
2065
0
  }
2066
};
2067
2068
0
// A CREATE INDEX that fails in the PG layer still creates the index table in DocDB; this test
// checks that the index table first appears and is then removed again.
TEST_F(PgLibPqIndexTableTimeoutTest, YB_DISABLE_TEST_IN_TSAN(TestIndexTableTimeoutGC)) {
  const std::string kDatabaseName = "yugabyte";
  const NamespaceName kTableName = "test_pgsql_table";
  const NamespaceName kIndexName = kTableName + "_idx";

  auto yb_client = ASSERT_RESULT(cluster_->CreateClient());
  const client::YBTableName index_table(YQL_DATABASE_PGSQL, kDatabaseName, kIndexName);

  // Asks the master whether the index table exists and compares the answer with `expected`.
  // A failed lookup is logged as a warning and reported as "condition not met yet".
  const auto index_existence_is = [&yb_client, &index_table](bool expected) -> Result<bool> {
    auto exists = yb_client->TableExists(index_table);
    WARN_NOT_OK(ResultToStatus(exists), "");
    return exists.ok() && exists.get() == expected;
  };

  // Phase 1: keep the delays low so the base table is created successfully.
  ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "10"));
  ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "0"));

  // Base table that the index will be defined on.
  {
    auto pg_conn = ASSERT_RESULT(Connect());
    ASSERT_OK(pg_conn.Execute("CREATE TABLE " + kTableName + " (key INT PRIMARY KEY)"));
  }

  // Phase 2: with the base table in place, raise the delays in the same spirit as
  // PgLibPqTableTimeoutTest so the next DDL times out.
  ASSERT_OK(cluster_->SetFlagOnMasters("ysql_transaction_bg_task_wait_ms", "13000"));
  ASSERT_OK(cluster_->SetFlagOnMasters("TEST_simulate_slow_table_create_secs", "12"));

  // Create Index: expected to time out because the admin setting is lower than the create latency.
  {
    auto pg_conn = ASSERT_RESULT(Connect());
    ASSERT_NOK(pg_conn.Execute("CREATE INDEX " + kIndexName + " ON " + kTableName + "(key)"));
  }

  // Wait for the index table to show up in DocDB, even though creation failed in the PG layer.
  // 'ysql_transaction_bg_task_wait_ms' setting ensures we can observe this before the GC.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> {
        LOG(INFO) << "Requesting TableExists";
        return index_existence_is(true);
      },
      MonoDelta::FromSeconds(40),
      "Verify Index Table was created in DocDB"));

  // DocDB will notice the PG layer failure because the transaction aborts.
  // Confirm that DocDB asynchronously deletes the index table.
  ASSERT_OK(LoggedWaitFor(
      [&]() -> Result<bool> { return index_existence_is(false); },
      MonoDelta::FromSeconds(40),
      "Verify Index Table was removed by Transaction GC"));
}
2114
2115
class PgLibPqTestEnumType: public PgLibPqTest {
2116
 public:
2117
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
2118
0
    options->extra_tserver_flags.push_back("--TEST_do_not_add_enum_sort_order=true");
2119
0
  }
2120
};
2121
2122
// Make sure that enum type backfill works.
2123
TEST_F_EX(PgLibPqTest,
2124
          YB_DISABLE_TEST_IN_TSAN(EnumType),
2125
0
          PgLibPqTestEnumType) {
2126
0
  const string kDatabaseName ="yugabyte";
2127
0
  const string kTableName ="enum_table";
2128
0
  const string kEnumTypeName ="enum_type";
2129
0
  auto conn = std::make_unique<PGConn>(ASSERT_RESULT(ConnectToDB(kDatabaseName)));
2130
0
  ASSERT_OK(conn->ExecuteFormat(
2131
0
    "CREATE TYPE $0 as enum('b', 'e', 'f', 'c', 'a', 'd')", kEnumTypeName));
2132
0
  ASSERT_OK(conn->ExecuteFormat(
2133
0
    "CREATE TABLE $0 (id $1)",
2134
0
    kTableName,
2135
0
    kEnumTypeName));
2136
0
  ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('a')", kTableName));
2137
0
  ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('b')", kTableName));
2138
0
  ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('c')", kTableName));
2139
2140
  // Do table scan to verify contents the table with an ORDER BY clause. This
2141
  // ensures that old enum values which did not have sort order can be read back,
2142
  // sorted and displayed correctly.
2143
0
  const std::string query = Format("SELECT * FROM $0 ORDER BY id", kTableName);
2144
0
  ASSERT_FALSE(ASSERT_RESULT(conn->HasIndexScan(query)));
2145
0
  PGResultPtr res = ASSERT_RESULT(conn->Fetch(query));
2146
0
  ASSERT_EQ(PQntuples(res.get()), 3);
2147
0
  ASSERT_EQ(PQnfields(res.get()), 1);
2148
0
  std::vector<string> values = {
2149
0
    ASSERT_RESULT(GetString(res.get(), 0, 0)),
2150
0
    ASSERT_RESULT(GetString(res.get(), 1, 0)),
2151
0
    ASSERT_RESULT(GetString(res.get(), 2, 0)),
2152
0
  };
2153
0
  ASSERT_EQ(values[0], "b");
2154
0
  ASSERT_EQ(values[1], "c");
2155
0
  ASSERT_EQ(values[2], "a");
2156
2157
  // Now alter the gflag so any new values will have sort order added.
2158
0
  ASSERT_OK(cluster_->SetFlagOnTServers(
2159
0
              "TEST_do_not_add_enum_sort_order", "false"));
2160
2161
  // Disconnect from the database so we don't have a case where the
2162
  // postmaster dies while clients are still connected.
2163
0
  conn = nullptr;
2164
2165
  // For each tablet server, kill the corresponding PostgreSQL process.
2166
  // A new PostgreSQL process will be respawned by the tablet server and
2167
  // inherit the new --TEST_do_not_add_enum_sort_order flag from the tablet
2168
  // server.
2169
0
  for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) {
2170
0
    ExternalTabletServer* ts = cluster_->tablet_server(i);
2171
0
    const string pg_pid_file = JoinPathSegments(ts->GetRootDir(), "pg_data",
2172
0
                                                "postmaster.pid");
2173
2174
0
    LOG(INFO) << "pg_pid_file: " << pg_pid_file;
2175
0
    ASSERT_TRUE(Env::Default()->FileExists(pg_pid_file));
2176
0
    std::ifstream pg_pid_in;
2177
0
    pg_pid_in.open(pg_pid_file, std::ios_base::in);
2178
0
    ASSERT_FALSE(pg_pid_in.eof());
2179
0
    pid_t pg_pid = 0;
2180
0
    pg_pid_in >> pg_pid;
2181
0
    ASSERT_GT(pg_pid, 0);
2182
0
    LOG(INFO) << "Killing PostgresSQL process: " << pg_pid;
2183
0
    ASSERT_EQ(kill(pg_pid, SIGKILL), 0);
2184
0
  }
2185
2186
  // Reconnect to the database after the new PostgreSQL starts.
2187
0
  conn = std::make_unique<PGConn>(ASSERT_RESULT(ConnectToDB(kDatabaseName)));
2188
2189
  // Insert three more rows with --TEST_do_not_add_enum_sort_order=false.
2190
  // The new enum values will have sort order added.
2191
0
  ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('d')", kTableName));
2192
0
  ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('e')", kTableName));
2193
0
  ASSERT_OK(conn->ExecuteFormat("INSERT INTO $0 VALUES ('f')", kTableName));
2194
2195
  // Do table scan again to verify contents the table with an ORDER BY clause.
2196
  // This ensures that old enum values which did not have sort order, mixed
2197
  // with new enum values which have sort order, can be read back, sorted and
2198
  // displayed correctly.
2199
0
  ASSERT_FALSE(ASSERT_RESULT(conn->HasIndexScan(query)));
2200
0
  res = ASSERT_RESULT(conn->Fetch(query));
2201
0
  ASSERT_EQ(PQntuples(res.get()), 6);
2202
0
  ASSERT_EQ(PQnfields(res.get()), 1);
2203
0
  values = {
2204
0
    ASSERT_RESULT(GetString(res.get(), 0, 0)),
2205
0
    ASSERT_RESULT(GetString(res.get(), 1, 0)),
2206
0
    ASSERT_RESULT(GetString(res.get(), 2, 0)),
2207
0
    ASSERT_RESULT(GetString(res.get(), 3, 0)),
2208
0
    ASSERT_RESULT(GetString(res.get(), 4, 0)),
2209
0
    ASSERT_RESULT(GetString(res.get(), 5, 0)),
2210
0
  };
2211
0
  ASSERT_EQ(values[0], "b");
2212
0
  ASSERT_EQ(values[1], "e");
2213
0
  ASSERT_EQ(values[2], "f");
2214
0
  ASSERT_EQ(values[3], "c");
2215
0
  ASSERT_EQ(values[4], "a");
2216
0
  ASSERT_EQ(values[5], "d");
2217
2218
  // Create an index on the enum table column.
2219
0
  ASSERT_OK(conn->ExecuteFormat("CREATE INDEX ON $0 (id ASC)", kTableName));
2220
2221
  // Index only scan to verify contents of index table.
2222
0
  ASSERT_TRUE(ASSERT_RESULT(conn->HasIndexScan(query)));
2223
0
  res = ASSERT_RESULT(conn->Fetch(query));
2224
0
  ASSERT_EQ(PQntuples(res.get()), 6);
2225
0
  ASSERT_EQ(PQnfields(res.get()), 1);
2226
0
  values = {
2227
0
    ASSERT_RESULT(GetString(res.get(), 0, 0)),
2228
0
    ASSERT_RESULT(GetString(res.get(), 1, 0)),
2229
0
    ASSERT_RESULT(GetString(res.get(), 2, 0)),
2230
0
    ASSERT_RESULT(GetString(res.get(), 3, 0)),
2231
0
    ASSERT_RESULT(GetString(res.get(), 4, 0)),
2232
0
    ASSERT_RESULT(GetString(res.get(), 5, 0)),
2233
0
  };
2234
0
  ASSERT_EQ(values[0], "b");
2235
0
  ASSERT_EQ(values[1], "e");
2236
0
  ASSERT_EQ(values[2], "f");
2237
0
  ASSERT_EQ(values[3], "c");
2238
0
  ASSERT_EQ(values[4], "a");
2239
0
  ASSERT_EQ(values[5], "d");
2240
2241
  // Test where clause.
2242
0
  const std::string query2 = Format("SELECT * FROM $0 where id = 'b'", kTableName);
2243
0
  ASSERT_TRUE(ASSERT_RESULT(conn->HasIndexScan(query2)));
2244
0
  res = ASSERT_RESULT(conn->Fetch(query2));
2245
0
  ASSERT_EQ(PQntuples(res.get()), 1);
2246
0
  ASSERT_EQ(PQnfields(res.get()), 1);
2247
0
  const string value = ASSERT_RESULT(GetString(res.get(), 0, 0));
2248
0
  ASSERT_EQ(value, "b");
2249
0
}
2250
2251
// Test postgres large oid (>= 2^31). Internally postgres oid is an unsigned 32-bit integer. But
2252
// when extended to Datum type (unsigned long), the sign-bit is extended so that the high 32-bit
2253
// is ffffffff. This caused unexpected assertion failures and errors.
2254
class PgLibPqLargeOidTest: public PgLibPqTest {
2255
 public:
2256
0
  void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
2257
0
    options->extra_tserver_flags.push_back(
2258
0
      Format("--TEST_ysql_oid_prefetch_adjustment=$0", kOidAdjustment));
2259
0
  }
2260
  const Oid kOidAdjustment = 2147483648U - kPgFirstNormalObjectId; // 2^31 - 16384
2261
};
2262
// Creates an enum type, a table and an index while OIDs are above 2^31, and checks that the enum
// OIDs really are large and that an index scan still returns rows in enum order.
TEST_F_EX(PgLibPqTest,
          YB_DISABLE_TEST_IN_TSAN(LargeOid),
          PgLibPqLargeOidTest) {
  // Test large OID with enum type which had Postgres Assert failure.
  const std::string kDatabaseName = "yugabyte";
  const std::string kTableName = "enum_table";
  const std::string kEnumTypeName = "enum_type";
  PGConn conn = ASSERT_RESULT(ConnectToDB(kDatabaseName));
  ASSERT_OK(conn.ExecuteFormat("CREATE TYPE $0 as enum('a', 'c')", kEnumTypeName));
  // ALTER TYPE makes sure the sort order is correctly placed in the high 32 bits after the
  // sign-extended ffffffff has been cleared. Had ffffffff been left there, the index scan
  // further down would return rows in the wrong order.
  ASSERT_OK(conn.ExecuteFormat("ALTER TYPE $0 ADD VALUE 'b' BEFORE 'c'", kEnumTypeName));

  PGResultPtr result = ASSERT_RESULT(conn.Fetch("SELECT oid FROM pg_enum"));
  ASSERT_EQ(PQntuples(result.get()), 3);
  ASSERT_EQ(PQnfields(result.get()), 1);
  const std::vector<int32> enum_oids = {
    ASSERT_RESULT(GetInt32(result.get(), 0, 0)),
    ASSERT_RESULT(GetInt32(result.get(), 1, 0)),
    ASSERT_RESULT(GetInt32(result.get(), 2, 0)),
  };
  // Ensure that we do see large OIDs in pg_enum table. The values are fetched as signed 32-bit
  // integers, so they are converted back to Oid before printing and comparing.
  LOG(INFO) << "enum_oids: " << static_cast<Oid>(enum_oids[0]) << ","
            << static_cast<Oid>(enum_oids[1]) << "," << static_cast<Oid>(enum_oids[2]);
  for (const int32 raw_oid : enum_oids) {
    ASSERT_GT(static_cast<Oid>(raw_oid), kOidAdjustment);
  }

  // Create a table using the enum type and insert a few rows.
  ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (id $1)", kTableName, kEnumTypeName));
  ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES ('a'), ('b'), ('c')", kTableName));

  // Create an index on the enum table column.
  ASSERT_OK(conn.ExecuteFormat("CREATE INDEX ON $0 (id ASC)", kTableName));

  // Index only scan to verify that with large OIDs, the contents of index table
  // is still correct. This also triggers index backfill statement, which used to
  // fail on large oid such as:
  // BACKFILL INDEX 2147500041 WITH x'0880011a00' READ TIME 6725053491126669312 PARTITION x'5555';
  // We fix the syntax error by rewriting it to
  // BACKFILL INDEX -2147467255 WITH x'0880011a00' READ TIME 6725053491126669312 PARTITION x'5555';
  // Internally, -2147467255 will be reinterpreted as OID 2147500041 which is the OID of the index.
  const std::string ordered_scan = Format("SELECT * FROM $0 ORDER BY id", kTableName);
  ASSERT_TRUE(ASSERT_RESULT(conn.HasIndexScan(ordered_scan)));
  result = ASSERT_RESULT(conn.Fetch(ordered_scan));
  ASSERT_EQ(PQntuples(result.get()), 3);
  ASSERT_EQ(PQnfields(result.get()), 1);
  const std::vector<std::string> enum_values = {
    ASSERT_RESULT(GetString(result.get(), 0, 0)),
    ASSERT_RESULT(GetString(result.get(), 1, 0)),
    ASSERT_RESULT(GetString(result.get(), 2, 0)),
  };
  const std::vector<std::string> expected_values = {"a", "b", "c"};
  for (size_t row = 0; row < expected_values.size(); ++row) {
    ASSERT_EQ(enum_values[row], expected_values[row]);
  }
}
2322
2323
namespace {
2324
2325
class CoordinatedRunner {
2326
 public:
2327
  using RepeatableCommand = std::function<Status()>;
2328
2329
  explicit CoordinatedRunner(std::vector<RepeatableCommand> commands)
2330
0
      : barrier_(commands.size()) {
2331
0
    threads_.reserve(commands.size());
2332
0
    for (auto& c : commands) {
2333
0
      threads_.emplace_back([this, cmd = std::move(c)] () {
2334
0
        while (!(stop_.load(std::memory_order_acquire) ||
2335
0
            error_detected_.load(std::memory_order_acquire))) {
2336
0
          barrier_.Wait();
2337
0
          const auto status = cmd();
2338
0
          if (!status.ok()) {
2339
0
            LOG(ERROR) << "Error detected: " << status;
2340
0
            error_detected_.store(true, std::memory_order_release);
2341
0
          }
2342
0
        }
2343
0
        barrier_.Detach();
2344
0
      });
2345
0
    }
2346
0
  }
2347
2348
0
  void Stop() {
2349
0
    stop_.store(true, std::memory_order_release);
2350
0
    for (auto& thread : threads_) {
2351
0
      thread.join();
2352
0
    }
2353
0
  }
2354
2355
0
  bool HasError() {
2356
0
    return error_detected_.load(std::memory_order_acquire);
2357
0
  }
2358
2359
 private:
2360
  std::vector<std::thread> threads_;
2361
  Barrier barrier_;
2362
  std::atomic<bool> stop_{false};
2363
  std::atomic<bool> error_detected_{false};
2364
};
2365
2366
0
bool RetryableError(const Status& status) {
2367
0
  const auto msg = status.message().ToBuffer();
2368
0
  const std::string expected_errors[] = {"Try again",
2369
0
                                         "Catalog Version Mismatch",
2370
0
                                         "Restart read required at",
2371
0
                                         "schema version mismatch for table"};
2372
0
  for (const auto& expected : expected_errors) {
2373
0
    if (msg.find(expected) != std::string::npos) {
2374
0
      return true;
2375
0
    }
2376
0
  }
2377
0
  return false;
2378
0
}
2379
2380
} // namespace
2381
2382
0
// Runs one connection that keeps adding and dropping a column concurrently with 20 connections
// that keep scanning the table, for 10 seconds. Readers may hit retryable errors (see
// RetryableError); any other failure on any connection fails the test.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(PagingReadRestart)) {
  auto setup_conn = ASSERT_RESULT(Connect());
  ASSERT_OK(setup_conn.Execute("CREATE TABLE t (key INT PRIMARY KEY)"));
  ASSERT_OK(setup_conn.Execute("INSERT INTO t SELECT generate_series(1, 5000)"));

  constexpr size_t kReaderCount = 20;
  std::vector<CoordinatedRunner::RepeatableCommand> commands;
  commands.reserve(kReaderCount + 1);

  // Schema changer: every round adds a column with a default and drops it again.
  // Each command owns its connection through a shared_ptr captured by the closure.
  commands.emplace_back(
      [ddl_conn = std::make_shared<PGConn>(ASSERT_RESULT(Connect()))] () -> Status {
        RETURN_NOT_OK(ddl_conn->Execute("ALTER TABLE t ADD COLUMN v INT DEFAULT 100"));
        return ddl_conn->Execute("ALTER TABLE t DROP COLUMN v");
      });

  // Readers: a full scan per round; retryable errors are treated as success.
  for (size_t reader = 0; reader < kReaderCount; ++reader) {
    commands.emplace_back(
        [read_conn = std::make_shared<PGConn>(ASSERT_RESULT(Connect()))] () -> Status {
          const auto rows = read_conn->Fetch("SELECT key FROM t");
          if (rows.ok() || RetryableError(rows.status())) {
            return Status::OK();
          }
          return rows.status();
        });
  }

  // The runner starts its threads on construction; let them run for a while, then stop.
  CoordinatedRunner runner(std::move(commands));
  std::this_thread::sleep_for(10s);
  runner.Stop();
  ASSERT_FALSE(runner.HasError());
}
2407
2408
0
// Creates a range-partitioned table whose key column uses an ICU collation and is pre-split at
// two values, then checks that there are three tablets and that every non-empty partition bound
// is long enough to be a collation-encoded key.
TEST_F(PgLibPqTest, YB_DISABLE_TEST_IN_TSAN(CollationRangePresplit)) {
  const std::string kDatabaseName = "yugabyte";
  auto yb_client = ASSERT_RESULT(cluster_->CreateClient());

  auto pg_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName));
  ASSERT_OK(pg_conn.Execute("CREATE TABLE collrange(a text COLLATE \"en-US-x-icu\", "
                            "PRIMARY KEY(a ASC)) SPLIT AT VALUES (('100'), ('200'))"));

  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  auto table_id =
      ASSERT_RESULT(GetTableIdByTableName(yb_client.get(), kDatabaseName, "collrange"));

  // Two split points must yield exactly three tablets.
  ASSERT_OK(yb_client->GetTabletsFromTableId(table_id, 0, &tablets));
  ASSERT_EQ(tablets.size(), 3);

  // Partition key length of plain encoded '100' or '200'.
  const size_t kPlainKeyLength = 7;
  // When a text value is collation encoded, we need at least 3 extra bytes.
  const size_t kMinCollationExtraBytes = 3;
  const size_t kMinEncodedKeyLength = kPlainKeyLength + kMinCollationExtraBytes;

  // A bound is acceptable when it is empty or at least as long as a collation-encoded key.
  const auto empty_or_collation_encoded = [kMinEncodedKeyLength](const std::string& bound) {
    return bound.empty() || bound.size() >= kMinEncodedKeyLength;
  };

  for (const auto& tablet : tablets) {
    ASSERT_TRUE(tablet.has_partition());
    const auto& partition = tablet.partition();
    const auto& lower_bound = partition.partition_key_start();
    const auto& upper_bound = partition.partition_key_end();
    LOG(INFO) << "partition_start: " << b2a_hex(lower_bound)
              << ", partition_end: " << b2a_hex(upper_bound);
    ASSERT_TRUE(empty_or_collation_encoded(lower_bound));
    ASSERT_TRUE(empty_or_collation_encoded(upper_bound));
  }
}
2438
2439
} // namespace pgwrapper
2440
} // namespace yb