YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/serializable-txn-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/error.h"
15
#include "yb/client/session.h"
16
#include "yb/client/transaction.h"
17
#include "yb/client/txn-test-base.h"
18
#include "yb/client/yb_op.h"
19
20
#include "yb/gutil/casts.h"
21
22
#include "yb/util/async_util.h"
23
#include "yb/util/random_util.h"
24
#include "yb/util/thread.h"
25
#include "yb/util/tsan_util.h"
26
27
using namespace std::literals;
28
29
DECLARE_int64(transaction_rpc_timeout_ms);
30
DECLARE_int32(txn_max_apply_batch_records);
31
32
namespace yb {
33
namespace client {
34
35
class SerializableTxnTest
36
    : public TransactionCustomLogSegmentSizeTest<0, TransactionTestBase<MiniCluster>> {
37
 protected:
38
6
  void SetUp() override {
39
6
    SetIsolationLevel(IsolationLevel::SERIALIZABLE_ISOLATION);
40
6
    TransactionTestBase::SetUp();
41
6
  }
42
43
  void TestIncrements(bool transactional);
44
  void TestIncrement(int key, bool transactional);
45
  void TestColoring();
46
};
47
48
0
// Starts several transactions that all write to the same key, flushes and commits them
// asynchronously, and expects every flush, commit and write response to succeed.
TEST_F(SerializableTxnTest, NonConflictingWrites) {
  const auto kTransactions = 10;
  const auto kKey = 0;

  // Per-transaction bookkeeping for the asynchronous flush/commit pipeline.
  struct Entry {
    YBTransactionPtr txn;
    YBqlWriteOpPtr op;
    std::future<FlushStatus> flush_future;
    std::future<Status> commit_future;
    bool done = false;
  };

  std::vector<Entry> entries;
  for (int value = 0; value != kTransactions; ++value) {
    entries.emplace_back();
    auto& current = entries.back();
    current.txn = CreateTransaction();
    auto session = CreateSession(current.txn);
    current.op = ASSERT_RESULT(WriteRow(session, kKey, value));
    current.flush_future = session->FlushFuture();
  }

  // One polling step: advance flushed entries to commit, mark committed entries as done, and
  // report whether everything has finished. Any failed flush or commit aborts the wait.
  auto poll_all = [&entries]() -> Result<bool> {
    // Pass 1: start commit for every entry whose flush has just finished.
    for (auto& e : entries) {
      if (!e.flush_future.valid() || !IsReady(e.flush_future)) {
        continue;
      }
      LOG(INFO) << "Flush done";
      // get() invalidates the future, so this entry is skipped by this pass next time.
      RETURN_NOT_OK(e.flush_future.get().status);
      e.commit_future = e.txn->CommitFuture();
    }

    // Pass 2: collect results of finished commits.
    for (auto& e : entries) {
      if (!e.commit_future.valid() || !IsReady(e.commit_future)) {
        continue;
      }
      LOG(INFO) << "Commit done";
      RETURN_NOT_OK(e.commit_future.get());
      e.done = true;
    }

    // Pass 3: we are finished only when every entry is done.
    for (const auto& e : entries) {
      if (!e.done) {
        return false;
      }
    }

    return true;
  };

  ASSERT_OK(WaitFor(poll_all, 10s, "Complete all operations"));

  for (const auto& e : entries) {
    ASSERT_EQ(e.op->response().status(), QLResponsePB::YQL_STATUS_OK);
  }
}
100
101
0
// For each key, a reading transaction and a writing transaction touch the same row.
// Exactly one of them is expected to succeed, and each side is expected to win at least a
// quarter of the time.
TEST_F(SerializableTxnTest, ReadWriteConflict) {
  const auto kKeys = 20;

  size_t reads_won = 0;
  size_t writes_won = 0;
  for (int key = 0; key != kKeys; ++key) {
    // Reader: issue the read and flush it before the writer starts.
    auto reader_txn = CreateTransaction();
    auto reader_session = CreateSession(reader_txn);
    auto read = ReadRow(reader_session, key);
    ASSERT_OK(reader_session->Flush());

    // Writer: insert into the same key, flushing immediately.
    auto writer_txn = CreateTransaction();
    auto writer_session = CreateSession(writer_txn);
    auto write_status = ResultToStatus(WriteRow(
        writer_session, key, key, WriteOpType::INSERT, Flush::kTrue));

    // Start the read commit first, then commit the write only if the write itself succeeded.
    auto reader_commit = reader_txn->CommitFuture();
    if (write_status.ok()) {
      write_status = writer_txn->CommitFuture().get();
    }
    auto read_status = reader_commit.get();

    LOG(INFO) << "Read: " << read_status << ", write: " << write_status;

    if (read_status.ok()) {
      ASSERT_NOK(write_status);
      ++reads_won;
    } else {
      ASSERT_OK(write_status);
      ++writes_won;
    }
  }

  LOG(INFO) << "Reads won: " << reads_won << ", writes won: " << writes_won;
  ASSERT_GE(reads_won, kKeys / 4);
  ASSERT_GE(writes_won, kKeys / 4);
}
137
138
// Execute UPDATE table SET value = value + 1 WHERE key = kKey in parallel, using
139
// serializable isolation.
140
// With retries the resulting value should be equal to number of increments.
141
0
void SerializableTxnTest::TestIncrement(int key, bool transactional) {
142
0
  const auto kIncrements = RegularBuildVsSanitizers(100, 20);
143
144
0
  {
145
0
    auto session = CreateSession();
146
0
    auto op = ASSERT_RESULT(WriteRow(session, key, 0));
147
0
    ASSERT_EQ(op->response().status(), QLResponsePB::YQL_STATUS_OK);
148
0
  }
149
150
0
  struct Entry {
151
0
    YBqlWriteOpPtr op;
152
0
    YBTransactionPtr txn;
153
0
    YBSessionPtr session;
154
0
    std::shared_future<FlushStatus> write_future;
155
0
    std::shared_future<Status> commit_future;
156
0
  };
157
158
0
  std::vector<Entry> entries;
159
160
0
  for (int i = 0; i != kIncrements; ++i) {
161
0
    Entry entry;
162
0
    entry.txn = transactional ? CreateTransaction() : nullptr;
163
0
    entry.session = CreateSession(entry.txn, clock_);
164
0
    entry.session->SetReadPoint(Restart::kFalse);
165
0
    entries.push_back(entry);
166
0
  }
167
168
  // For each of entries we do the following:
169
  // 1) Write increment operation.
170
  // 2) Wait until write complete and commit transaction of this entry.
171
  // 3) Wait until commit complete.
172
  // When failure happens on any step - retry from step 1.
173
  // Exit from loop when all entries successfully committed their transactions.
174
  // We do all actions in busy loop to get most possible concurrency for operations.
175
0
  for (;;) {
176
0
    bool incomplete = false;
177
0
    for (auto& entry : entries) {
178
0
      bool entry_complete = false;
179
0
      if (!entry.op) {
180
        // Execute UPDATE table SET value = value + 1 WHERE key = kKey
181
0
        entry.session->SetTransaction(entry.txn);
182
0
        entry.op = ASSERT_RESULT(kv_table_test::Increment(&table_, entry.session, key));
183
0
        entry.write_future = entry.session->FlushFuture();
184
0
      } else if (entry.write_future.valid()) {
185
0
        if (IsReady(entry.write_future)) {
186
0
          auto write_status = entry.write_future.get().status;
187
0
          entry.write_future = std::shared_future<FlushStatus>();
188
0
          if (!write_status.ok()) {
189
0
            ASSERT_TRUE(write_status.IsTryAgain() ||
190
0
                        ((write_status.IsTimedOut() || write_status.IsServiceUnavailable())
191
0
                            && transactional)) << write_status;
192
0
            entry.txn = transactional ? CreateTransaction() : nullptr;
193
0
            entry.op = nullptr;
194
0
          } else {
195
0
            if (entry.op->response().status() == QLResponsePB::YQL_STATUS_RESTART_REQUIRED_ERROR) {
196
0
              auto old_txn = entry.txn;
197
0
              if (transactional) {
198
0
                entry.txn = ASSERT_RESULT(entry.txn->CreateRestartedTransaction());
199
0
              } else {
200
0
                entry.session->SetReadPoint(Restart::kTrue);
201
0
              }
202
0
              entry.op = nullptr;
203
0
            } else {
204
0
              ASSERT_EQ(entry.op->response().status(), QLResponsePB::YQL_STATUS_OK);
205
0
              if (transactional) {
206
0
                entry.commit_future = entry.txn->CommitFuture();
207
0
              }
208
0
            }
209
0
          }
210
0
        }
211
0
      } else if (entry.commit_future.valid()) {
212
0
        if (IsReady(entry.commit_future)) {
213
0
          auto status = entry.commit_future.get();
214
0
          if (status.IsExpired()) {
215
0
            entry.txn = transactional ? CreateTransaction() : nullptr;
216
0
            entry.op = nullptr;
217
0
          } else {
218
0
            ASSERT_OK(status);
219
0
            entry.commit_future = std::shared_future<Status>();
220
0
          }
221
0
        }
222
0
      } else {
223
0
        entry_complete = true;
224
0
      }
225
0
      incomplete = incomplete || !entry_complete;
226
0
    }
227
0
    if (!incomplete) {
228
0
      break;
229
0
    }
230
0
  }
231
232
0
  auto value = ASSERT_RESULT(SelectRow(CreateSession(), key));
233
0
  ASSERT_EQ(value, kIncrements);
234
0
}
235
236
// Run TestIncrement concurrently on several threads, each thread using its own key, with
237
// serializable isolation.
238
// With retries the resulting value for each key should be equal to number of increments.
239
0
void SerializableTxnTest::TestIncrements(bool transactional) {
240
0
  FLAGS_transaction_rpc_timeout_ms = MonoDelta(1min).ToMilliseconds();
241
242
0
  const auto kThreads = RegularBuildVsSanitizers(3, 2);
243
244
0
  std::vector<std::thread> threads;
245
0
  while (threads.size() != kThreads) {
246
0
    int key = narrow_cast<int>(threads.size());
247
0
    threads.emplace_back([this, key, transactional] {
248
0
      CDSAttacher attacher;
249
0
      TestIncrement(key, transactional);
250
0
    });
251
0
  }
252
253
0
  for (auto& thread : threads) {
254
0
    thread.join();
255
0
  }
256
0
}
257
258
0
// Concurrent increments where every increment runs inside its own transaction.
TEST_F(SerializableTxnTest, Increment) {
  constexpr bool kTransactional = true;
  TestIncrements(kTransactional);
}
261
262
0
// Concurrent increments issued through sessions that have no transaction attached.
TEST_F(SerializableTxnTest, IncrementNonTransactional) {
  constexpr bool kTransactional = false;
  TestIncrements(kTransactional);
}
265
266
// Test that repeats example from this article:
267
// https://blogs.msdn.microsoft.com/craigfr/2007/05/16/serializable-vs-snapshot-isolation-level/
268
//
269
// Multiple rows with values 0 and 1 are stored in table.
270
// Two concurrent transactions fetch all rows from the table and do the following.
271
// First transaction changes value of all rows with value 0 to 1.
272
// Second transaction changes value of all rows with value 1 to 0.
273
// As outcome we should have rows with the same value.
274
//
275
// The described procedure is repeated multiple times to increase probability of catching bug,
276
// w/o running test multiple times.
277
0
void SerializableTxnTest::TestColoring() {
278
0
  constexpr auto kKeys = 20;
279
0
  constexpr auto kColors = 2;
280
0
  constexpr auto kIterations = 20;
281
282
0
  size_t iterations_left = kIterations;
283
0
  for (int i = 0; iterations_left > 0 && !testing::Test::HasFailure(); ++i) {
284
0
    SCOPED_TRACE(Format("Iteration: $0", i));
285
286
0
    auto session = CreateSession(nullptr /* transaction */, clock_);
287
0
    session->SetForceConsistentRead(ForceConsistentRead::kTrue);
288
289
0
    {
290
0
      std::vector<YBqlWriteOpPtr> ops;
291
0
      for (int j = 0; j != kKeys; ++j) {
292
0
        auto color = RandomUniformInt(0, kColors - 1);
293
0
        ops.push_back(ASSERT_RESULT(WriteRow(session,
294
0
            j,
295
0
            color,
296
0
            WriteOpType::INSERT,
297
0
            Flush::kFalse)));
298
0
      }
299
300
0
      ASSERT_OK(session->Flush());
301
302
0
      for (const auto& op : ops) {
303
0
        ASSERT_OK(CheckOp(op.get()));
304
0
      }
305
0
    }
306
307
0
    std::vector<std::thread> threads;
308
0
    std::atomic<size_t> successes(0);
309
310
0
    while (threads.size() != kColors) {
311
0
      int32_t color = narrow_cast<int32_t>(threads.size());
312
0
      threads.emplace_back([this, color, &successes, kKeys] {
313
0
        CDSAttacher attacher;
314
0
        for (;;) {
315
0
          auto txn = CreateTransaction();
316
0
          LOG(INFO) << "Start: " << txn->id() << ", color: " << color;
317
0
          auto session = CreateSession(txn);
318
0
          session->SetTransaction(txn);
319
0
          auto values = SelectAllRows(session);
320
0
          if (!values.ok()) {
321
0
            ASSERT_TRUE(values.status().IsTryAgain()) << values.status();
322
0
            continue;
323
0
          }
324
0
          ASSERT_EQ(values->size(), kKeys);
325
326
0
          std::vector<YBqlWriteOpPtr> ops;
327
0
          for (const auto& p : *values) {
328
0
            if (p.second == color) {
329
0
              continue;
330
0
            }
331
0
            ops.push_back(ASSERT_RESULT(WriteRow(
332
0
                session, p.first, color, WriteOpType::INSERT, Flush::kFalse)));
333
0
          }
334
335
0
          if (ops.empty()) {
336
0
            break;
337
0
          }
338
339
0
          auto flush_status = session->Flush();
340
0
          if (!flush_status.ok()) {
341
0
            ASSERT_TRUE(flush_status.IsTryAgain()) << flush_status;
342
0
            break;
343
0
          }
344
345
0
          for (const auto& op : ops) {
346
0
            ASSERT_OK(CheckOp(op.get()));
347
0
          }
348
349
0
          LOG(INFO) << "Commit: " << txn->id() << ", color: " << color;
350
0
          auto commit_status = txn->CommitFuture().get();
351
0
          if (!commit_status.ok()) {
352
0
            ASSERT_TRUE(commit_status.IsExpired()) << commit_status;
353
0
            break;
354
0
          }
355
356
0
          ++successes;
357
0
          break;
358
0
        }
359
0
      });
360
0
    }
361
362
0
    for (auto& thread : threads) {
363
0
      thread.join();
364
0
    }
365
366
0
    if (successes == 0) {
367
0
      continue;
368
0
    }
369
370
0
    session->SetReadPoint(Restart::kFalse);
371
0
    auto values = ASSERT_RESULT(SelectAllRows(session));
372
0
    ASSERT_EQ(values.size(), kKeys);
373
0
    LOG(INFO) << "Values: " << yb::ToString(values);
374
0
    int32_t color = -1;
375
0
    for (const auto& p : values) {
376
0
      if (color == -1) {
377
0
        color = p.second;
378
0
      } else {
379
0
        ASSERT_EQ(color, p.second);
380
0
      }
381
0
    }
382
0
    --iterations_left;
383
0
  }
384
0
}
385
386
0
// Runs the coloring scenario with default flag values.
TEST_F(SerializableTxnTest, Coloring) {
  this->TestColoring();
}
389
390
0
// Runs the coloring scenario with txn_max_apply_batch_records lowered to 3.
// NOTE(review): presumably this makes applying a transaction take multiple batches - confirm
// against the flag definition.
TEST_F(SerializableTxnTest, ColoringWithLongApply) {
  FLAGS_txn_max_apply_batch_records = 3;
  this->TestColoring();
}
394
395
} // namespace client
396
} // namespace yb