YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_impl.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include "yb/rocksdb/utilities/transactions/transaction_impl.h"
24
25
#include <map>
26
#include <set>
27
#include <string>
28
#include <vector>
29
30
#include "yb/rocksdb/comparator.h"
31
#include "yb/rocksdb/db.h"
32
#include "yb/rocksdb/db/db_impl.h"
33
#include "yb/rocksdb/snapshot.h"
34
#include "yb/rocksdb/status.h"
35
#include "yb/rocksdb/util/sync_point.h"
36
#include "yb/rocksdb/utilities/transactions/transaction_db_impl.h"
37
#include "yb/rocksdb/utilities/transactions/transaction_util.h"
38
39
namespace rocksdb {
40
41
struct WriteOptions;
42
43
std::atomic<TransactionID> TransactionImpl::txn_id_counter_(1);
44
45
3.18k
TransactionID TransactionImpl::GenTxnID() {
46
3.18k
  return txn_id_counter_.fetch_add(1);
47
3.18k
}
48
49
// Creates a transaction operating on the base DB of `txn_db`.
//
// `txn_db` must be a TransactionDBImpl (checked with an assert).  All
// option-dependent state is filled in by Initialize().
TransactionImpl::TransactionImpl(TransactionDB* txn_db,
                                 const WriteOptions& write_options,
                                 const TransactionOptions& txn_options)
    : TransactionBaseImpl(txn_db->GetBaseDB(), write_options),
      txn_db_impl_(dynamic_cast<TransactionDBImpl*>(txn_db)),
      txn_id_(0),
      expiration_time_(0),
      lock_timeout_(0),
      exec_status_(STARTED) {
  assert(txn_db_impl_);

  Initialize(txn_options);
}
63
64
3.18k
void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
65
3.18k
  txn_id_ = GenTxnID();
66
67
3.18k
  exec_status_ = STARTED;
68
69
3.18k
  lock_timeout_ = txn_options.lock_timeout * 1000;
70
3.18k
  if (lock_timeout_ < 0) {
71
    // Lock timeout not set, use default
72
3.17k
    lock_timeout_ =
73
3.17k
        txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000;
74
3.17k
  }
75
76
3.18k
  if (txn_options.expiration >= 0) {
77
9
    expiration_time_ = start_time_ + txn_options.expiration * 1000;
78
3.17k
  } else {
79
3.17k
    expiration_time_ = 0;
80
3.17k
  }
81
82
3.18k
  if (txn_options.set_snapshot) {
83
19
    SetSnapshot();
84
19
  }
85
86
3.18k
  if (expiration_time_ > 0) {
87
9
    txn_db_impl_->InsertExpirableTransaction(txn_id_, this);
88
9
  }
89
3.18k
}
90
91
3.18k
// Releases the locks for all keys this transaction is tracking and, if the
// transaction has an expiration time, removes it from the transaction DB's
// set of expirable transactions.
TransactionImpl::~TransactionImpl() {
  const auto& still_tracked = GetTrackedKeys();
  txn_db_impl_->UnLock(this, &still_tracked);

  const bool was_expirable = expiration_time_ > 0;
  if (was_expirable) {
    txn_db_impl_->RemoveExpirableTransaction(txn_id_);
  }
}
97
98
3.17k
void TransactionImpl::Clear() {
99
3.17k
  txn_db_impl_->UnLock(this, &GetTrackedKeys());
100
3.17k
  TransactionBaseImpl::Clear();
101
3.17k
}
102
103
void TransactionImpl::Reinitialize(TransactionDB* txn_db,
104
                                   const WriteOptions& write_options,
105
6
                                   const TransactionOptions& txn_options) {
106
6
  TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options);
107
6
  Initialize(txn_options);
108
6
}
109
110
13
bool TransactionImpl::IsExpired() const {
111
13
  if (expiration_time_ > 0) {
112
13
    if (db_->GetEnv()->NowMicros() >= expiration_time_) {
113
      // Transaction is expired.
114
9
      return true;
115
9
    }
116
4
  }
117
118
4
  return false;
119
4
}
120
121
3
// Locks every key in `batch`, commits the batch, and releases those locks.
//
// Returns the LockBatch() error if locking fails (nothing is written in that
// case); otherwise returns the result of DoCommit().
Status TransactionImpl::CommitBatch(WriteBatch* batch) {
  TransactionKeyMap locked_keys;

  Status lock_status = LockBatch(batch, &locked_keys);
  if (!lock_status.ok()) {
    return lock_status;
  }

  Status commit_status = DoCommit(batch);

  // The locks are released whether or not the commit succeeded.
  txn_db_impl_->UnLock(this, &locked_keys);

  return commit_status;
}
134
135
3.15k
// Commits the transaction's own write batch and then clears the transaction
// (which also releases its locks), regardless of the commit outcome.
// Returns the status produced by DoCommit().
Status TransactionImpl::Commit() {
  WriteBatch* const pending_writes = GetWriteBatch()->GetWriteBatch();
  Status commit_status = DoCommit(pending_writes);

  Clear();

  return commit_status;
}
142
143
3.16k
// Writes `batch` to the underlying DB.
//
// For a transaction with an expiration time the write only happens if the
// transaction has not expired and its execution status can be switched
// atomically from STARTED to COMMITTING; otherwise an Expired status is
// returned and nothing is written.
Status TransactionImpl::DoCommit(WriteBatch* batch) {
  const bool can_expire = expiration_time_ > 0;
  if (!can_expire) {
    // No expiration configured: write the batch directly.
    return db_->Write(write_options_, batch);
  }

  if (IsExpired()) {
    return STATUS(Expired, "");
  }

  // Between the IsExpired() check above and the start of the commit, another
  // transaction may decide this one has expired and try to steal its locks.
  // Winning the STARTED -> COMMITTING exchange is what entitles this thread
  // to commit.
  ExecutionStatus expected = STARTED;
  const bool can_commit = std::atomic_compare_exchange_strong(
      &exec_status_, &expected, COMMITTING);

  TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1");

  if (!can_commit) {
    // The exchange is only expected to fail because the locks were stolen.
    assert(exec_status_ == LOCKS_STOLEN);
    return STATUS(Expired, "");
  }

  return db_->Write(write_options_, batch);
}
174
175
8
// Abandons the transaction by clearing it; Clear() also releases its locks.
void TransactionImpl::Rollback() {
  Clear();
}
176
177
15
// Releases the locks on keys tracked since the most recent save point (if
// any such key set exists) and then delegates the rollback itself to the
// base class, whose status is returned.
Status TransactionImpl::RollbackToSavePoint() {
  const auto& keys_since_save_point = GetTrackedKeysSinceSavePoint();

  if (keys_since_save_point != nullptr) {
    txn_db_impl_->UnLock(this, keys_since_save_point.get());
  }

  return TransactionBaseImpl::RollbackToSavePoint();
}
188
189
// Lock all keys in this batch.
190
// On success, caller should unlock keys_to_unlock
191
// Locks every key written by `batch`.
//
// Each key that is locked is also recorded in `keys_to_unlock`.  On success
// the caller is responsible for unlocking `keys_to_unlock`.  On failure the
// keys recorded so far are unlocked here (the map itself is not cleared) and
// the failing status is returned.
Status TransactionImpl::LockBatch(WriteBatch* batch,
                                  TransactionKeyMap* keys_to_unlock) {
  class Handler : public WriteBatch::Handler {
   public:
    // Sorted map of column_family_id to sorted set of keys.
    // Since LockBatch() always locks keys in sorted order, it cannot deadlock
    // with itself.  We're not using a comparator here since it doesn't matter
    // what the sorting is as long as it's consistent.
    std::map<uint32_t, std::set<std::string>> keys_;

    Handler() {}

    void RecordKey(uint32_t column_family_id, const Slice& key) {
      // std::set::insert() leaves the set unchanged when the key is already
      // present, so no separate lookup is needed before inserting.
      keys_[column_family_id].insert(key.ToString());
    }

    Status PutCF(uint32_t column_family_id, const SliceParts& key,
                 const SliceParts& value) override {
      RecordKey(column_family_id, key.TheOnlyPart());
      return Status::OK();
    }
    Status MergeCF(uint32_t column_family_id, const Slice& key,
                   const Slice& value) override {
      RecordKey(column_family_id, key);
      return Status::OK();
    }
    Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
      RecordKey(column_family_id, key);
      return Status::OK();
    }
  };

  // Iterating on this handler will add all keys in this batch into
  // handler.keys_.
  Handler handler;
  RETURN_NOT_OK(batch->Iterate(&handler));

  // Attempt to lock all keys, in the sorted order kept by the handler.
  for (const auto& cf_iter : handler.keys_) {
    const uint32_t cfh_id = cf_iter.first;

    for (const std::string& key : cf_iter.second) {
      Status s = txn_db_impl_->TryLock(this, cfh_id, key);
      if (!s.ok()) {
        // Give back whatever was locked before the failure.
        txn_db_impl_->UnLock(this, keys_to_unlock);
        return s;
      }
      TrackKey(keys_to_unlock, cfh_id, key, kMaxSequenceNumber, false);
    }
  }

  return Status::OK();
}
263
264
// Attempt to lock this key.
265
// Returns OK if the key has been successfully locked.  Non-ok, otherwise.
266
// If untracked is false and this transaction has a snapshot set,
267
// this key will only be locked if there have been no writes to this key since
268
// the snapshot time.
269
// Locks `key` in `column_family` for this transaction unless the transaction
// is already tracking it.
//
// When `untracked` is false and a snapshot is set, the key is additionally
// validated against the snapshot via ValidateSnapshot(); if that fails, a
// lock taken by this call is released again.  On success the key is recorded
// through TrackKey() together with `read_only`.
//
// Returns OK if the key is locked (and validated, where applicable);
// otherwise the status from the lock attempt or from the validation.
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
                                const Slice& key, bool read_only,
                                bool untracked) {
  const uint32_t cfh_id = GetColumnFamilyID(column_family);
  const std::string key_str = key.ToString();
  Status s;

  // Find out whether this transaction already tracks the key and, if so, the
  // sequence number it was recorded with.
  bool previously_locked = false;
  SequenceNumber current_seqno = kMaxSequenceNumber;
  SequenceNumber new_seqno = kMaxSequenceNumber;

  const auto& tracked_keys = GetTrackedKeys();
  const auto tracked_keys_cf = tracked_keys.find(cfh_id);
  if (tracked_keys_cf != tracked_keys.end()) {
    const auto iter = tracked_keys_cf->second.find(key_str);
    if (iter != tracked_keys_cf->second.end()) {
      previously_locked = true;
      current_seqno = iter->second.seq;
    }
  }

  // lock this key if this transactions hasn't already locked it
  if (!previously_locked) {
    s = txn_db_impl_->TryLock(this, cfh_id, key_str);
  }

  SetSnapshotIfNeeded();

  // Even though we do not care about doing conflict checking for this write,
  // we still need to take a lock to make sure we do not cause a conflict with
  // some other write.  However, we do not need to check if there have been
  // any writes since this transaction's snapshot.
  // TODO(agiardullo): could optimize by supporting shared txn locks in the
  // future
  if (untracked || snapshot_ == nullptr) {
    // Need to remember the earliest sequence number that we know that this
    // key has not been modified after.  This is useful if this same
    // transaction later tries to lock this key again.
    if (current_seqno == kMaxSequenceNumber) {
      // Since we haven't checked a snapshot, we only know this key has not
      // been modified since after we locked it.
      new_seqno = db_->GetLatestSequenceNumber();
    } else {
      new_seqno = current_seqno;
    }
  } else if (s.ok()) {
    // If a snapshot is set, we need to make sure the key hasn't been modified
    // since the snapshot.  This must be done after we locked the key.
    s = ValidateSnapshot(column_family, key, current_seqno, &new_seqno);

    if (!s.ok() && !previously_locked) {
      // Validation failed: unlock the key we just locked.  A key that was
      // already locked before this call stays locked.
      txn_db_impl_->UnLock(this, cfh_id, key_str);
    }
  }

  if (s.ok()) {
    // Let base class know we've conflict checked this key.
    TrackKey(cfh_id, key_str, new_seqno, read_only);
  }

  return s;
}
343
344
// Return OK() if this key has not been modified more recently than the
345
// transaction snapshot_.
346
// Checks that `key` has not been modified more recently than the
// transaction's snapshot_ (which must be set).
//
// `prev_seqno` is the sequence number the key was previously validated at.
// If it is not newer than the snapshot's sequence number, OK is returned
// immediately and `*new_seqno` is left untouched.  Otherwise `*new_seqno` is
// set to the snapshot's sequence number and the result of
// TransactionUtil::CheckKeyForConflicts() is returned.
Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                         const Slice& key,
                                         SequenceNumber prev_seqno,
                                         SequenceNumber* new_seqno) {
  assert(snapshot_);

  const SequenceNumber seq = snapshot_->GetSequenceNumber();
  if (prev_seqno <= seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *new_seqno = seq;

  // db_ is expected to be a DBImpl.  The assert verifies the dynamic type in
  // debug builds; static_cast is the well-defined way to downcast a base
  // pointer to its derived class.
  assert(dynamic_cast<DBImpl*>(db_) != nullptr);
  auto db_impl = static_cast<DBImpl*>(db_);

  // A null column family handle means the default column family.
  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl->DefaultColumnFamily();

  return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(),
                                               seq, false /* cache_only */);
}
372
373
5
bool TransactionImpl::TryStealingLocks() {
374
5
  assert(IsExpired());
375
5
  ExecutionStatus expected = STARTED;
376
5
  return std::atomic_compare_exchange_strong(&exec_status_, &expected,
377
5
                                             LOCKS_STOLEN);
378
5
}
379
380
void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
381
7
                                         const Slice& key) {
382
7
  txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
383
7
}
384
385
}  // namespace rocksdb
386
387
#endif  // ROCKSDB_LITE