YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/pg_on_conflict-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/common/common.pb.h"
14
15
#include "yb/util/random_util.h"
16
#include "yb/util/scope_exit.h"
17
#include "yb/util/test_thread_holder.h"
18
#include "yb/util/tsan_util.h"
19
20
#include "yb/yql/pgwrapper/libpq_test_base.h"
21
#include "yb/yql/pgwrapper/libpq_utils.h"
22
23
using namespace std::literals;
24
25
DECLARE_int64(external_mini_cluster_max_log_bytes);
26
27
namespace yb {
28
namespace pgwrapper {
29
30
class PgOnConflictTest : public LibPqTestBase {
31
 protected:
32
  void TestOnConflict(bool kill_master, const MonoDelta& duration);
33
};
34
35
namespace {
36
37
struct OnConflictKey {
38
  int key;
39
  size_t operation_index = 0;
40
};
41
42
constexpr int kMaxBatchSize = 5;
43
44
struct BatchInfo {
45
  int key;
46
  char append_char; // Zero means read request
47
  std::string read_value;
48
49
0
  std::string ToString() const {
50
0
    if (append_char) {
51
0
      char x[2] = {append_char, 0};
52
0
      return Format("[$0+$1]", key, x);
53
0
    } else {
54
0
      return Format("[$0 $1]", key, read_value);
55
0
    }
56
0
  }
57
58
0
  bool ComesBefore(const BatchInfo& rhs) const {
59
0
    if (key != rhs.key) {
60
0
      return false;
61
0
    }
62
0
    if (append_char) {
63
0
      if (rhs.append_char) {
64
0
        return false;
65
0
      }
66
      // rhs see our append
67
0
      return rhs.read_value.find(append_char) != std::string::npos;
68
0
    } else if (!rhs.append_char) {
69
      // rhs has larger list
70
0
      return read_value.length() < rhs.read_value.length();
71
0
    } else {
72
      // we don't see the result of rhs
73
0
      return read_value.find(rhs.append_char) == std::string::npos;
74
0
    }
75
0
  }
76
};
77
78
struct TransactionInfo {
79
  typedef std::array<BatchInfo, kMaxBatchSize> Batches;
80
  typedef Batches::const_iterator const_iterator;
81
82
  int batch_size = 0;
83
  Batches batches;
84
  int last_visit = 0; // Used to check whether this vertex was visited by current DFS run.
85
86
0
  const_iterator begin() const {
87
0
    return batches.begin();
88
0
  }
89
90
0
  const_iterator end() const {
91
0
    return batches.begin() + batch_size;
92
0
  }
93
94
0
  bool ComesBefore(const TransactionInfo& rhs) const {
95
0
    for (const auto& lbatch : *this) {
96
0
      for (const auto& rbatch : rhs) {
97
0
        if (lbatch.ComesBefore(rbatch)) {
98
0
          return true;
99
0
        }
100
0
      }
101
0
    }
102
0
    return false;
103
0
  }
104
};
105
106
class OnConflictHelper {
107
 public:
108
  explicit OnConflictHelper(size_t concurrent_keys)
109
0
      : concurrent_keys_(concurrent_keys), active_keys_(concurrent_keys) {
110
0
    for(size_t i = 0; i != concurrent_keys; ++i) {
111
0
      active_keys_[i].key = ++next_key_;
112
0
    }
113
0
    for (auto i = 'A'; i <= 'Z'; ++i) {
114
0
      chars_.push_back(i);
115
0
    }
116
0
  }
117
118
0
  std::pair<int, char> RandomPair() {
119
0
    size_t i = RandomUniformInt<size_t>(0, concurrent_keys_ - 1);
120
0
    std::lock_guard<std::mutex> lock(mutex_);
121
0
    auto& key = active_keys_[i];
122
0
    char append_char;
123
0
    if (RandomUniformBool()) {
124
0
      append_char = 0; // Read key
125
0
    } else {
126
0
      append_char = chars_[key.operation_index];
127
0
      if (++key.operation_index == chars_.size()) {
128
0
        key.key = ++next_key_;
129
0
        key.operation_index = 0;
130
0
      }
131
0
    }
132
0
    return std::make_pair(key.key, append_char);
133
0
  }
134
135
0
  void Committed(TransactionInfo&& info) {
136
0
    std::lock_guard<std::mutex> lock(mutex_);
137
0
    committed_.push_back(std::move(info));
138
0
  }
139
140
0
  void Report() {
141
0
    LOG(INFO) << "Committed transactions:";
142
143
0
    ordered_.reserve(committed_.size());
144
    // Iteration order does not matter here, so we iterate from end to have lower keys at the start
145
    // of the list.
146
0
    for (auto it = committed_.rbegin(); it != committed_.rend(); ++it) {
147
0
      if (it->last_visit == 0) {
148
0
        DepthFirstSearch(&*it, nullptr /* dest */);
149
0
      }
150
0
    }
151
152
0
    std::reverse(ordered_.begin(), ordered_.end());
153
154
0
    for (const auto* info : ordered_) {
155
0
      LOG(INFO) << "  " << yb::ToString(*info);
156
0
    }
157
158
0
    int inversions = 0;
159
0
    for (auto it = ordered_.begin(); it != ordered_.end(); ++it) {
160
0
      for (auto j = ordered_.begin(); j != it; ++j) {
161
0
        if ((**it).ComesBefore(**j)) {
162
0
          LOG(INFO) << "Order inversion: " << yb::ToString(**it) << " and " << yb::ToString(**j);
163
0
          ++inversions;
164
0
          ++query_;
165
0
          DepthFirstSearch(*j, *it);
166
0
        }
167
0
      }
168
0
    }
169
170
0
    ASSERT_EQ(inversions, 0);
171
0
  }
172
173
 private:
174
  // Returns true if dest was reached.
175
0
  bool DepthFirstSearch(TransactionInfo* v, TransactionInfo* dest) {
176
0
    v->last_visit = query_;
177
0
    if (v == dest) {
178
0
      LOG(INFO) << "  " << yb::ToString(*v);
179
0
      return true;
180
0
    }
181
0
    for (auto& target : committed_) {
182
0
      if (target.last_visit < query_ && v->ComesBefore(target)) {
183
0
        if (DepthFirstSearch(&target, dest)) {
184
0
          LOG(INFO) << "  " << yb::ToString(*v);
185
0
          return true;
186
0
        }
187
0
      }
188
0
    }
189
0
    if (!dest) {
190
0
      ordered_.push_back(v);
191
0
    }
192
0
    return false;
193
0
  }
194
195
  const size_t concurrent_keys_;
196
  std::string chars_;
197
198
  std::mutex mutex_;
199
  int next_key_ = 0;
200
  std::vector<OnConflictKey> active_keys_;
201
  std::vector<TransactionInfo> committed_;
202
  std::vector<TransactionInfo*> ordered_;
203
  // Number of depth-first search run, used to filter visited vertexes.
204
  int query_ = 1;
205
};
206
207
}  // anonymous namespace
208
209
// Check that INSERT .. ON CONFLICT .. does not generate duplicate key errors.
210
0
void PgOnConflictTest::TestOnConflict(bool kill_master, const MonoDelta& duration) {
211
0
#ifndef NDEBUG
212
0
  constexpr int kWriters = RegularBuildVsSanitizers(15, 5);
213
#else
214
  constexpr int kWriters = 25;
215
#endif
216
0
  auto conn = ASSERT_RESULT(Connect());
217
218
0
  ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY, v TEXT)"));
219
220
0
  std::atomic<int> processed(0);
221
0
  TestThreadHolder thread_holder;
222
0
  OnConflictHelper helper(3);
223
0
  for (int i = 0; i != kWriters; ++i) {
224
0
    thread_holder.AddThreadFunctor(
225
0
        [this, &stop = thread_holder.stop_flag(), &processed, &helper] {
226
0
      SetFlagOnExit set_flag_on_exit(&stop);
227
0
      auto connection = ASSERT_RESULT(Connect());
228
0
      char value[2] = "0";
229
0
      while (!stop.load(std::memory_order_acquire)) {
230
0
        int batch_size = RandomUniformInt(2, kMaxBatchSize);
231
0
        TransactionInfo transaction_info;
232
0
        transaction_info.batch_size = batch_size;
233
0
        bool ok = false;
234
0
        if (batch_size != 1) {
235
0
          ASSERT_OK(connection.Execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE"));
236
0
        }
237
0
        auto se = ScopeExit([&connection, batch_size, &ok, &processed, &helper, &transaction_info] {
238
0
          if (batch_size != 1) {
239
0
            if (ok) {
240
0
              auto status = connection.Execute("COMMIT");
241
0
              if (status.ok()) {
242
0
                ++processed;
243
0
                helper.Committed(std::move(transaction_info));
244
0
                return;
245
0
              }
246
0
              auto msg = status.message().ToBuffer();
247
0
              if (msg.find("expired or aborted by a conflict") == std::string::npos &&
248
0
                  msg.find("Transaction aborted") == std::string::npos) {
249
0
                ASSERT_OK(status);
250
0
              }
251
0
            }
252
0
            ASSERT_OK(connection.Execute("ROLLBACK"));
253
0
          } else if (ok) {
254
            // To re-enable this we need to decrease the lower bound of batch_size to 1.
255
0
            ++processed;
256
0
          }
257
0
        });
258
0
        ok = true;
259
0
        for (int j = 0; j != batch_size; ++j) {
260
0
          auto key_and_appended_char = helper.RandomPair();
261
0
          Status status;
262
0
          auto& current_batch = transaction_info.batches[j];
263
0
          current_batch.key = key_and_appended_char.first;
264
0
          current_batch.append_char = key_and_appended_char.second;
265
0
          if (key_and_appended_char.second) {
266
0
            value[0] = key_and_appended_char.second;
267
0
            status = connection.ExecuteFormat(
268
0
                "INSERT INTO test (k, v) VALUES ($0, '$1') ON CONFLICT (K) DO "
269
0
                "UPDATE SET v = CONCAT(test.v, '$1')",
270
0
                key_and_appended_char.first, value);
271
0
          } else {
272
0
            auto result = connection.FetchFormat(
273
0
                "SELECT v FROM test WHERE k = $0", key_and_appended_char.first);
274
0
            if (!result.ok()) {
275
0
              status = result.status();
276
0
            } else {
277
0
              auto tuples = PQntuples(result->get());
278
0
              if (tuples == 1) {
279
0
                ASSERT_EQ(PQnfields(result->get()), 1);
280
0
                current_batch.read_value = ASSERT_RESULT(
281
0
                    GetString(result->get(), 0, 0));
282
0
              } else {
283
0
                ASSERT_EQ(tuples, 0);
284
0
              }
285
0
            }
286
0
          }
287
0
          if (status.ok()) {
288
0
            continue;
289
0
          }
290
0
          ok = false;
291
0
          if (TransactionalFailure(status)) {
292
0
            break;
293
0
          }
294
0
          auto msg = status.message().ToBuffer();
295
0
          if (msg.find("Snapshot too old: Snapshot too old.") != std::string::npos ||
296
0
              msg.find("Commit of expired transaction") != std::string::npos ||
297
0
              msg.find("Catalog Version Mismatch") != std::string::npos ||
298
0
              msg.find("Soft memory limit exceeded") != std::string::npos ||
299
0
              msg.find("Missing metadata for transaction") != std::string::npos ||
300
0
              msg.find("timed out after deadline expired") != std::string::npos) {
301
0
            break;
302
0
          }
303
304
0
          ASSERT_OK(status);
305
0
        }
306
0
      }
307
0
    });
308
0
  }
309
310
0
  if (!kill_master) {
311
0
    thread_holder.WaitAndStop(duration.ToSteadyDuration());
312
0
  } else {
313
    // Every 15 seconds, pick a random master, then kill it if it is running, otherwise resume it.
314
0
    auto deadline = CoarseMonoClock::now() + duration;
315
0
    auto num_masters = cluster_->num_masters();
316
0
    while (!thread_holder.stop_flag().load(std::memory_order_acquire)) {
317
0
      MonoDelta left(deadline - CoarseMonoClock::now());
318
0
      if (left < MonoDelta::kZero) {
319
0
        break;
320
0
      }
321
0
      auto* master = cluster_->master(RandomUniformInt<size_t>(0, num_masters - 1));
322
0
      if (master->IsProcessAlive()) {
323
0
        std::this_thread::sleep_for(
324
0
            std::min(left, MonoDelta(20s) * kTimeMultiplier).ToSteadyDuration());
325
0
        LOG(INFO) << "Killing: " << master->uuid();
326
0
        master->Shutdown();
327
0
      } else {
328
0
        std::this_thread::sleep_for(
329
0
            std::min(left, MonoDelta(15s)).ToSteadyDuration());
330
0
        LOG(INFO) << "Resuming: " << master->uuid();
331
0
        ASSERT_OK(master->Start());
332
0
      }
333
0
      int live_masters = 0;
334
0
      for (size_t i = 0; i != num_masters; ++i) {
335
0
        if (cluster_->master(i)->IsProcessAlive()) {
336
0
          ++live_masters;
337
0
        }
338
0
      }
339
0
      LOG(INFO) << "Live masters: " << live_masters;
340
0
    }
341
342
0
    for (size_t i = 0; i != num_masters; ++i) {
343
0
      if (!cluster_->master(i)->IsProcessAlive()) {
344
0
        ASSERT_OK(cluster_->master(i)->Start());
345
0
      }
346
0
    }
347
348
0
    thread_holder.Stop();
349
0
  }
350
351
0
  for (;;) {
352
0
    auto res = conn.Fetch("SELECT * FROM test ORDER BY k");
353
0
    if (!res.ok()) {
354
0
      ASSERT_TRUE(TransactionalFailure(res.status())) << res.status();
355
0
      continue;
356
0
    }
357
0
    int cols = PQnfields(res->get());
358
0
    ASSERT_EQ(cols, 2);
359
0
    int rows = PQntuples(res->get());
360
0
    for (int i = 0; i != rows; ++i) {
361
0
      auto key = GetInt32(res->get(), i, 0);
362
0
      auto value = GetString(res->get(), i, 1);
363
0
      LOG(INFO) << "  " << key << ": " << value;
364
0
    }
365
0
    LOG(INFO) << "Total processed: " << processed.load(std::memory_order_acquire);
366
0
    break;
367
0
  }
368
369
0
  helper.Report();
370
0
}
371
372
0
TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(OnConflict)) {
373
0
  TestOnConflict(false /* kill_master */, 120s);
374
0
}
375
376
0
TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(OnConflictWithKillMaster)) {
377
0
  TestOnConflict(true /* kill_master */, 180s);
378
0
}
379
380
// When auto-commit fails block state switched to TBLOCK_ABORT.
381
// But correct state in this case is TBLOCK_DEFAULT.
382
// https://github.com/YugaByte/yugabyte-db/commit/73e966e5735efc21bf2ad43f9d961a488afbe050
383
0
TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(NoTxnOnConflict)) {
384
0
  constexpr int kWriters = 5;
385
0
  constexpr int kKeys = 20;
386
0
  auto conn = ASSERT_RESULT(Connect());
387
388
0
  ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY, v TEXT)"));
389
390
0
  TestThreadHolder thread_holder;
391
0
  for (int i = 0; i != kWriters; ++i) {
392
0
    thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag()] {
393
0
      SetFlagOnExit set_flag_on_exit(&stop);
394
0
      auto connection = ASSERT_RESULT(Connect());
395
0
      char value[2] = "0";
396
0
      while (!stop.load(std::memory_order_acquire)) {
397
0
        int key = RandomUniformInt(1, kKeys);
398
0
        value[0] = RandomUniformInt('A', 'Z');
399
0
        auto status = connection.ExecuteFormat(
400
0
            "INSERT INTO test (k, v) VALUES ($0, '$1') ON CONFLICT (K) DO "
401
0
            "UPDATE SET v = CONCAT(test.v, '$1')",
402
0
            key, value);
403
0
        if (status.ok() || TransactionalFailure(status)) {
404
0
          continue;
405
0
        }
406
0
        ASSERT_OK(status);
407
0
      }
408
0
    });
409
0
  }
410
411
0
  thread_holder.WaitAndStop(30s);
412
0
  LogResult(ASSERT_RESULT(conn.Fetch("SELECT * FROM test ORDER BY k")).get());
413
0
}
414
415
0
TEST_F(PgOnConflictTest, YB_DISABLE_TEST_IN_TSAN(ValidSessionAfterTxnCommitConflict)) {
416
0
  auto conn = ASSERT_RESULT(Connect());
417
0
  ASSERT_OK(conn.Execute("CREATE TABLE test (k int PRIMARY KEY)"));
418
0
  ASSERT_OK(conn.Execute("BEGIN"));
419
0
  ASSERT_OK(conn.Execute("INSERT INTO test VALUES(1)"));
420
0
  auto extra_conn = ASSERT_RESULT(Connect());
421
0
  ASSERT_OK(extra_conn.Execute("INSERT INTO test VALUES(1)"));
422
0
  ASSERT_NOK(conn.Execute("COMMIT"));
423
  // Check connection is in valid state after failed COMMIT
424
0
  auto result_ptr = ASSERT_RESULT(conn.Fetch("SELECT * FROM test"));
425
0
  auto value = ASSERT_RESULT(GetInt32(result_ptr.get(), 0, 0));
426
0
  ASSERT_EQ(value, 1);
427
0
}
428
429
} // namespace pgwrapper
430
} // namespace yb