YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_libpq_err-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/yql/pgwrapper/libpq_test_base.h"
14
#include "yb/yql/pgwrapper/libpq_utils.h"
15
16
namespace yb {
17
namespace pgwrapper {
18
19
class PgLibPqErrTest : public LibPqTestBase {
20
};
21
22
0
TEST_F(PgLibPqErrTest, YB_DISABLE_TEST_IN_TSAN(BeginWithoutCommit)) {
23
0
  constexpr auto kIterations = 10;
24
25
  // Create table and insert some rows.
26
0
  auto conn = ASSERT_RESULT(Connect());
27
0
  ASSERT_OK(conn.Execute("CREATE TABLE terr (k INT PRIMARY KEY, v INT)"));
28
0
  ASSERT_OK(conn.Execute("INSERT INTO terr (k, v) VALUES (1, 1)"));
29
30
  // Run "BEGIN; SELECT;" blocks on several connections.
31
  // When there's many threads, Postgres didn't have enough time to clear its memory contexts
32
  // before exiting. This test is to show that Postgres's handles to SELECT statements will be
33
  // released during the exiting process.
34
  //
35
  // NOTES
36
  // - Transaction manager (TxnMan) holds reference to SELECT statement objects.
37
  // - Postgres will have to release SELECT handles when exiting. Otherwise, TxnMan process will
38
  //   not destroy the SELECT statement objects as it assumes Postgres process needs the handles.
39
0
  for (int iteration = 0; iteration < kIterations; ++iteration) {
40
0
    std::atomic<int> complete{ 0 };
41
0
    std::vector<std::thread> threads;
42
0
    threads.emplace_back([this, iteration, &complete] {
43
      // Exec BEGIN block.
44
0
      auto connection = ASSERT_RESULT(Connect());
45
0
      ASSERT_OK(connection.Execute("BEGIN ISOLATION LEVEL SERIALIZABLE"));
46
0
      auto res = connection.Fetch("SELECT * FROM terr");
47
48
      // Test early termination.
49
      // - Just return for one of the thread.
50
      // - We are not meant to test SELECT, so ignore its error for timeout, abort, ...
51
0
      if (iteration == 3 || !res.ok()) {
52
0
        return;
53
0
      }
54
55
0
      ++complete;
56
0
    });
57
58
0
    for (auto& thread : threads) {
59
0
      thread.join();
60
0
    }
61
62
0
    if (complete == 0) {
63
0
      continue;
64
0
    }
65
0
  }
66
0
}
67
68
0
TEST_F(PgLibPqErrTest, YB_DISABLE_TEST_IN_TSAN(InsertWithoutCommit)) {
69
0
  constexpr auto kRetryCount = 3;
70
0
  constexpr auto kIterations = 10;
71
0
  constexpr auto kRowPerSeed = 100;
72
73
  // Create table.
74
0
  auto conn = ASSERT_RESULT(Connect());
75
0
  ASSERT_OK(conn.Execute("CREATE TABLE terr (k INT PRIMARY KEY, v INT)"));
76
77
0
  int seed = 0;
78
0
  for (int iteration = 0; iteration < kIterations; ++iteration) {
79
    // Use different seed for primary key to avoid duplicate among thread.
80
0
    seed++;
81
82
0
    std::atomic<int> complete{ 0 };
83
0
    std::vector<std::thread> threads;
84
0
    threads.emplace_back([this, seed, &complete] {
85
0
      auto connection = ASSERT_RESULT(Connect());
86
0
      ASSERT_OK(connection.Execute("BEGIN"));
87
88
      // INSERT.
89
0
      int row_count = 0;
90
0
      for (int k = 0; k != kRowPerSeed; ++k) {
91
        // For each row of data, retry a number of times or until success.
92
0
        for (int rt = 0; rt < kRetryCount; rt++) {
93
0
          auto status = connection.ExecuteFormat("INSERT INTO terr VALUES ($0, $1)",
94
0
                                      seed * (k + 1), seed);
95
0
          if (!status.ok()) {
96
0
            string serr = status.ToString();
97
0
            if (serr.find("aborted") != std::string::npos) {
98
              // If DocDB aborted the execution, the transaction cannot be continued.
99
0
              return;
100
0
            }
101
            // Run the statement again as "Try again" was reported.
102
0
            ASSERT_TRUE(HasTryAgain(status)) << status;
103
0
            continue;
104
0
          }
105
106
          // INSERT succeeded.
107
0
          row_count++;
108
0
          break;
109
0
        }
110
0
      }
111
112
      // SELECT: Retry number of times or until success.
113
0
      for (int rt = 0; rt < kRetryCount; rt++) {
114
0
        auto res = connection.Fetch("SELECT * FROM terr");
115
0
        if (!res.ok()) {
116
0
          ASSERT_TRUE(HasTryAgain(res.status())) << res.status();
117
0
          continue;
118
0
        }
119
120
        // SELECT succeeded.
121
0
        auto columns = PQnfields(res->get());
122
0
        ASSERT_EQ(2, columns);
123
124
0
        auto lines = PQntuples(res->get());
125
0
        ASSERT_EQ(row_count, lines);
126
0
        break;
127
0
      }
128
129
0
      ++complete;
130
0
    });
131
132
0
    for (auto& thread : threads) {
133
0
      thread.join();
134
0
    }
135
136
0
    if (complete == 0) {
137
0
      continue;
138
0
    }
139
140
    // Table should have no data as transactions were not committed.
141
0
    auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM terr"));
142
0
    auto columns = PQnfields(res.get());
143
0
    ASSERT_EQ(2, columns);
144
145
0
    auto lines = PQntuples(res.get());
146
0
    ASSERT_EQ(0, lines);
147
0
  }
148
0
}
149
150
0
TEST_F(PgLibPqErrTest, YB_DISABLE_TEST_IN_TSAN(InsertDuplicateWithoutCommit)) {
151
0
  constexpr auto kRetryCount = 3;
152
0
  constexpr auto kIterations = 10;
153
0
  constexpr auto kRowPerSeed = 100;
154
155
  // Create table.
156
0
  auto conn = ASSERT_RESULT(Connect());
157
0
  ASSERT_OK(conn.Execute("CREATE TABLE terr (k INT PRIMARY KEY, v INT)"));
158
159
  // Use the same insertion seed for all threads to get same row value.
160
  // This should NOT cause duplicate key error as they are on different transaction.
161
0
  int seed = 1;
162
0
  for (int iteration = 0; iteration < kIterations; ++iteration) {
163
0
    std::atomic<int> complete{ 0 };
164
0
    std::vector<std::thread> threads;
165
0
    threads.emplace_back([this, seed, &complete] {
166
0
      auto connection = ASSERT_RESULT(Connect());
167
0
      ASSERT_OK(connection.Execute("BEGIN"));
168
169
      // INSERT.
170
0
      int row_count = 0;
171
0
      for (int k = 0; k != kRowPerSeed; ++k) {
172
        // For each row of data, retry a number of times or until success.
173
0
        for (int rt = 0; rt < kRetryCount; rt++) {
174
0
          auto status = connection.ExecuteFormat("INSERT INTO terr VALUES ($0, $1)",
175
0
                                           seed * (k + 1), seed);
176
0
          if (!status.ok()) {
177
0
            string serr = status.ToString();
178
0
            if (serr.find("aborted") != std::string::npos) {
179
              // If DocDB aborted the execution, the transaction cannot be continued.
180
0
              return;
181
0
            }
182
            // Run the statement again as "Try again" was reported.
183
0
            ASSERT_TRUE(HasTryAgain(status)) << status;
184
0
            continue;
185
0
          }
186
187
          // INSERT succeeded.
188
0
          row_count++;
189
0
          break;
190
0
        }
191
0
      }
192
193
      // SELECT: Retry number of times or until success.
194
0
      for (int rt = 0; rt < kRetryCount; rt++) {
195
0
        auto res = connection.Fetch("SELECT * FROM terr");
196
0
        if (!res.ok()) {
197
0
          ASSERT_TRUE(HasTryAgain(res.status())) << res.status();
198
0
          continue;
199
0
        }
200
201
        // SELECT succeeded.
202
0
        auto columns = PQnfields(res->get());
203
0
        ASSERT_EQ(2, columns);
204
205
0
        auto lines = PQntuples(res->get());
206
0
        ASSERT_EQ(row_count, lines);
207
0
        break;
208
0
      }
209
210
0
      ++complete;
211
0
    });
212
213
0
    for (auto& thread : threads) {
214
0
      thread.join();
215
0
    }
216
217
0
    if (complete == 0) {
218
0
      continue;
219
0
    }
220
221
    // Table should have no data as transactions were not committed.
222
0
    auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM terr"));
223
0
    auto columns = PQnfields(res.get());
224
0
    ASSERT_EQ(2, columns);
225
226
0
    auto lines = PQntuples(res.get());
227
0
    ASSERT_EQ(0, lines);
228
0
  }
229
0
}
230
231
0
TEST_F(PgLibPqErrTest, YB_DISABLE_TEST_IN_TSAN(UpdateWithoutCommit)) {
232
0
  constexpr auto kRetryCount = 3;
233
0
  constexpr auto kIterations = 10;
234
0
  constexpr auto kRowCount = 100;
235
0
  constexpr auto kSeed = 7;
236
237
  // Create table.
238
0
  auto conn = ASSERT_RESULT(Connect());
239
0
  ASSERT_OK(conn.Execute("CREATE TABLE terr (k INT PRIMARY KEY, v INT)"));
240
241
  // INSERT a few rows.
242
0
  int seed = kSeed;
243
0
  for (int k = 0; k != kRowCount; ++k) {
244
0
    ASSERT_OK(conn.ExecuteFormat("INSERT INTO terr VALUES ($0, $1)", k, seed));
245
0
  }
246
247
0
  for (int iteration = 1; iteration <= kIterations; ++iteration) {
248
0
    seed = kSeed + iteration;
249
250
0
    std::atomic<int> complete{ 0 };
251
0
    std::vector<std::thread> threads;
252
0
    threads.emplace_back([this, seed, kRowCount, &complete] {
253
0
      auto connection = ASSERT_RESULT(Connect());
254
0
      ASSERT_OK(connection.Execute("BEGIN"));
255
256
      // UPDATE using different seeds.
257
0
      for (int k = 0; k != kRowCount; ++k) {
258
0
        for (int rt = 0; rt < kRetryCount; rt++) {
259
0
          auto status = connection.ExecuteFormat("UPDATE terr SET v = $0 WHERE k = $1", seed, k);
260
0
          if (!status.ok()) {
261
0
            string serr = status.ToString();
262
0
            if (serr.find("aborted") != std::string::npos) {
263
              // If DocDB aborted the execution, the transaction cannot be continued.
264
0
              return;
265
0
            }
266
            // Run the statement again as "Try again" was reported.
267
0
            ASSERT_TRUE(HasTryAgain(status)) << status;
268
0
            continue;
269
0
          }
270
0
        }
271
0
      }
272
273
      // SELECT: Retry number of times or until success.
274
0
      for (int rt = 0; rt < kRetryCount; rt++) {
275
0
        auto res = connection.Fetch("SELECT * FROM terr");
276
0
        if (!res.ok()) {
277
0
          ASSERT_TRUE(HasTryAgain(res.status())) << res.status();
278
0
          continue;
279
0
        }
280
281
        // Done selecting. Retry not needed.
282
0
        auto columns = PQnfields(res->get());
283
0
        ASSERT_EQ(2, columns);
284
285
0
        auto lines = PQntuples(res->get());
286
0
        ASSERT_EQ(kRowCount, lines);
287
288
        // Check column 'v' vs 'seed'.
289
0
        for (int i = 0; i != lines; ++i) {
290
0
          int32_t v = ASSERT_RESULT(GetInt32(res->get(), i, 1));
291
0
          ASSERT_EQ(v, seed);
292
0
        }
293
0
        break;
294
0
      }
295
296
0
      ++complete;
297
0
    });
298
299
0
    for (auto& thread : threads) {
300
0
      thread.join();
301
0
    }
302
303
0
    if (complete == 0) {
304
0
      continue;
305
0
    }
306
307
    // Table should have no updates as transactions were not committed.
308
0
    auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM terr"));
309
0
    auto columns = PQnfields(res.get());
310
0
    ASSERT_EQ(2, columns);
311
312
0
    auto lines = PQntuples(res.get());
313
0
    ASSERT_EQ(kRowCount, lines);
314
315
    // Check column 'v' vs original value 'kSeed'.
316
0
    for (int i = 0; i != lines; ++i) {
317
0
      int32_t v = ASSERT_RESULT(GetInt32(res.get(), i, 1));
318
0
      ASSERT_EQ(v, kSeed);
319
0
    }
320
0
  }
321
0
}
322
323
0
TEST_F(PgLibPqErrTest, YB_DISABLE_TEST_IN_TSAN(DeleteWithoutCommit)) {
324
0
  constexpr auto kRetryCount = 3;
325
0
  constexpr auto kIterations = 10;
326
0
  constexpr auto kRowCount = 100;
327
0
  constexpr auto kSeed = 7;
328
329
  // Create table.
330
0
  auto conn = ASSERT_RESULT(Connect());
331
0
  ASSERT_OK(conn.Execute("CREATE TABLE terr (k INT PRIMARY KEY, v INT)"));
332
333
  // INSERT a few rows.
334
0
  int seed = kSeed;
335
0
  for (int k = 0; k != kRowCount; ++k) {
336
0
    ASSERT_OK(conn.ExecuteFormat("INSERT INTO terr VALUES ($0, $1)", k, seed));
337
0
  }
338
339
0
  for (int iteration = 1; iteration <= kIterations; ++iteration) {
340
0
    std::atomic<int> complete{ 0 };
341
0
    std::vector<std::thread> threads;
342
0
    threads.emplace_back([this, &complete] {
343
0
      auto connection = ASSERT_RESULT(Connect());
344
0
      ASSERT_OK(connection.Execute("BEGIN"));
345
346
      // DELETE.
347
0
      int delete_count = 0;
348
0
      for (int k = 0; k != kRowCount; ++k) {
349
        // For each row of data, retry a number of times or until success.
350
0
        for (int rt = 0; rt < kRetryCount; rt++) {
351
0
          auto status = connection.ExecuteFormat("DELETE FROM terr WHERE k = $0", k);
352
0
          if (!status.ok()) {
353
0
            string serr = status.ToString();
354
0
            if (serr.find("aborted") != std::string::npos) {
355
              // If DocDB aborted the execution, the transaction cannot be continued.
356
0
              return;
357
0
            }
358
            // Run the statement again as "Try again" was reported.
359
0
            ASSERT_TRUE(HasTryAgain(status)) << status;
360
0
            continue;
361
0
          }
362
363
          // DELETE succeeded.
364
0
          delete_count++;
365
0
          break;
366
0
        }
367
0
      }
368
369
      // SELECT: Retry number of times or until success.
370
0
      for (int rt = 0; rt < kRetryCount; rt++) {
371
0
        auto res = connection.Fetch("SELECT * FROM terr");
372
0
        if (!res.ok()) {
373
0
          ASSERT_TRUE(HasTryAgain(res.status())) << res.status();
374
0
          continue;
375
0
        }
376
377
        // SELECT succeeded.
378
0
        auto columns = PQnfields(res->get());
379
0
        ASSERT_EQ(2, columns);
380
381
0
        auto lines = PQntuples(res->get());
382
0
        ASSERT_EQ(kRowCount - delete_count, lines);
383
384
0
        break;
385
0
      }
386
387
0
      ++complete;
388
0
    });
389
390
0
    for (auto& thread : threads) {
391
0
      thread.join();
392
0
    }
393
394
0
    if (complete == 0) {
395
0
      continue;
396
0
    }
397
398
    // Table should still have all the rows as transactions were not committed.
399
0
    auto res = ASSERT_RESULT(conn.Fetch("SELECT * FROM terr"));
400
0
    auto columns = PQnfields(res.get());
401
0
    ASSERT_EQ(2, columns);
402
403
0
    auto lines = PQntuples(res.get());
404
0
    ASSERT_EQ(kRowCount, lines);
405
0
  }
406
0
}
407
408
0
TEST_F(PgLibPqErrTest, YB_DISABLE_TEST_IN_TSAN(InsertTransactionAborted)) {
409
0
  constexpr auto kIterations = 10;
410
0
  constexpr auto kRowPerSeed = 100;
411
412
  // Create table.
413
0
  auto conn = ASSERT_RESULT(Connect());
414
0
  ASSERT_OK(conn.Execute("CREATE TABLE terr (k INT PRIMARY KEY, v INT)"));
415
416
  // INSERT.
417
0
  int seed = 1;
418
0
  for (int k = 0; k != kRowPerSeed; ++k) {
419
0
      ASSERT_OK(conn.ExecuteFormat("INSERT INTO terr VALUES ($0, $1)",
420
0
                                   seed * (k + 1), seed));
421
0
  }
422
423
  // Use the same insertion seed for all threads to cause duplicate key.
424
0
  for (int iteration = 0; iteration < kIterations; ++iteration) {
425
0
    std::atomic<int> complete{ 0 };
426
0
    std::vector<std::thread> threads;
427
0
    threads.emplace_back([this, seed, &complete] {
428
0
      auto connection = ASSERT_RESULT(Connect());
429
0
      ASSERT_OK(connection.Execute("BEGIN"));
430
431
      // INSERT.
432
0
      for (int k = 0; k != kRowPerSeed; ++k) {
433
0
        auto status = connection.ExecuteFormat("INSERT INTO terr VALUES ($0, $1)",
434
0
                                         seed * (k + 1), seed);
435
0
        CHECK(!status.ok()) << "INSERT is expected to fail";
436
437
0
        string serr = status.ToString();
438
0
        CHECK(serr.find("duplicate") != std::string::npos ||
439
0
              serr.find("aborted") != std::string::npos)
440
0
          << "Expecting duplicate key or transaction aborted";
441
0
      }
442
443
0
      ++complete;
444
0
    });
445
446
0
    for (auto& thread : threads) {
447
0
      thread.join();
448
0
    }
449
450
0
    if (complete == 0) {
451
0
      continue;
452
0
    }
453
0
  }
454
0
}
455
456
} // namespace pgwrapper
457
} // namespace yb