YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_db_impl.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
23
#include "yb/rocksdb/utilities/transactions/transaction_db_impl.h"
24
25
#include <string>
26
#include <vector>
27
28
#include "yb/rocksdb/db.h"
29
#include "yb/rocksdb/options.h"
30
#include "yb/rocksdb/utilities/transaction_db.h"
31
#include "yb/rocksdb/utilities/transactions/transaction_db_mutex_impl.h"
32
#include "yb/rocksdb/utilities/transactions/transaction_impl.h"
33
34
namespace rocksdb {
35
36
TransactionDBImpl::TransactionDBImpl(DB* db,
37
                                     const TransactionDBOptions& txn_db_options)
38
    : TransactionDB(db),
39
      txn_db_options_(txn_db_options),
40
      lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
41
                txn_db_options_.custom_mutex_factory
42
                    ? txn_db_options_.custom_mutex_factory
43
                    : std::shared_ptr<TransactionDBMutexFactory>(
44
47
                          new TransactionDBMutexFactoryImpl())) {}
45
46
Transaction* TransactionDBImpl::BeginTransaction(
47
    const WriteOptions& write_options, const TransactionOptions& txn_options,
48
3.18k
    Transaction* old_txn) {
49
3.18k
  if (old_txn != nullptr) {
50
6
    ReinitializeTransaction(old_txn, write_options, txn_options);
51
6
    return old_txn;
52
3.18k
  } else {
53
3.18k
    return new TransactionImpl(this, write_options, txn_options);
54
3.18k
  }
55
3.18k
}
56
57
TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions(
58
47
    const TransactionDBOptions& txn_db_options) {
59
47
  TransactionDBOptions validated = txn_db_options;
60
61
47
  if (txn_db_options.num_stripes == 0) {
62
0
    validated.num_stripes = 1;
63
0
  }
64
65
47
  return validated;
66
47
}
67
68
Status TransactionDB::Open(const Options& options,
69
                           const TransactionDBOptions& txn_db_options,
70
45
                           const std::string& dbname, TransactionDB** dbptr) {
71
45
  DBOptions db_options(options);
72
45
  ColumnFamilyOptions cf_options(options);
73
45
  std::vector<ColumnFamilyDescriptor> column_families;
74
45
  column_families.push_back(
75
45
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
76
45
  std::vector<ColumnFamilyHandle*> handles;
77
45
  Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
78
45
                                 column_families, &handles, dbptr);
79
45
  if (s.ok()) {
80
45
    assert(handles.size() == 1);
81
    // i can delete the handle since DBImpl is always holding a reference to
82
    // default column family
83
45
    delete handles[0];
84
45
  }
85
86
45
  return s;
87
45
}
88
89
Status TransactionDB::Open(
90
    const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
91
    const std::string& dbname,
92
    const std::vector<ColumnFamilyDescriptor>& column_families,
93
47
    std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
94
47
  Status s;
95
47
  DB* db;
96
97
47
  std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
98
47
  std::vector<size_t> compaction_enabled_cf_indices;
99
100
  // Enable MemTable History if not already enabled
101
98
  for (size_t i = 0; i < column_families_copy.size(); i++) {
102
51
    ColumnFamilyOptions* options = &column_families_copy[i].options;
103
104
51
    if (options->max_write_buffer_number_to_maintain == 0) {
105
      // Setting to -1 will set the History size to max_write_buffer_number.
106
51
      options->max_write_buffer_number_to_maintain = -1;
107
51
    }
108
109
51
    if (!options->disable_auto_compactions) {
110
      // Disable compactions momentarily to prevent race with DB::Open
111
50
      options->disable_auto_compactions = true;
112
50
      compaction_enabled_cf_indices.push_back(i);
113
50
    }
114
51
  }
115
116
47
  s = DB::Open(db_options, dbname, column_families_copy, handles, &db);
117
118
47
  if (s.ok()) {
119
47
    TransactionDBImpl* txn_db = new TransactionDBImpl(
120
47
        db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options));
121
47
    *dbptr = txn_db;
122
123
51
    for (auto cf_ptr : *handles) {
124
51
      txn_db->AddColumnFamily(cf_ptr);
125
51
    }
126
127
    // Re-enable compaction for the column families that initially had
128
    // compaction enabled.
129
47
    assert(column_families_copy.size() == (*handles).size());
130
47
    std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
131
47
    compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
132
50
    for (auto index : compaction_enabled_cf_indices) {
133
50
      compaction_enabled_cf_handles.push_back((*handles)[index]);
134
50
    }
135
136
47
    s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles);
137
47
  }
138
139
47
  return s;
140
47
}
141
142
// Let TransactionLockMgr know that this column family exists so it can
143
// allocate a LockMap for it.
144
51
void TransactionDBImpl::AddColumnFamily(const ColumnFamilyHandle* handle) {
145
51
  lock_mgr_.AddColumnFamily(handle->GetID());
146
51
}
147
148
Status TransactionDBImpl::CreateColumnFamily(
149
    const ColumnFamilyOptions& options, const std::string& column_family_name,
150
6
    ColumnFamilyHandle** handle) {
151
6
  InstrumentedMutexLock l(&column_family_mutex_);
152
153
6
  Status s = db_->CreateColumnFamily(options, column_family_name, handle);
154
6
  if (s.ok()) {
155
6
    lock_mgr_.AddColumnFamily((*handle)->GetID());
156
6
  }
157
158
6
  return s;
159
6
}
160
161
// Let TransactionLockMgr know that it can deallocate the LockMap for this
162
// column family.
163
4
Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
164
4
  InstrumentedMutexLock l(&column_family_mutex_);
165
166
4
  Status s = db_->DropColumnFamily(column_family);
167
4
  if (s.ok()) {
168
4
    lock_mgr_.RemoveColumnFamily(column_family->GetID());
169
4
  }
170
171
4
  return s;
172
4
}
173
174
Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id,
175
3.33k
                                  const std::string& key) {
176
3.33k
  return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv());
177
3.33k
}
178
179
void TransactionDBImpl::UnLock(TransactionImpl* txn,
180
6.37k
                               const TransactionKeyMap* keys) {
181
6.37k
  lock_mgr_.UnLock(txn, keys, GetEnv());
182
6.37k
}
183
184
void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id,
185
31
                               const std::string& key) {
186
31
  lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
187
31
}
188
189
// Used when wrapping DB write operations in a transaction
190
Transaction* TransactionDBImpl::BeginInternalTransaction(
191
3.09k
    const WriteOptions& options) {
192
3.09k
  TransactionOptions txn_options;
193
3.09k
  Transaction* txn = BeginTransaction(options, txn_options, nullptr);
194
195
3.09k
  assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
196
3.09k
  auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);
197
198
  // Use default timeout for non-transactional writes
199
3.09k
  txn_impl->SetLockTimeout(txn_db_options_.default_lock_timeout);
200
201
3.09k
  return txn;
202
3.09k
}
203
204
// All user Put, Merge, Delete, and Write requests must be intercepted to make
205
// sure that they lock all keys that they are writing to avoid causing conflicts
206
// with any concurent transactions. The easiest way to do this is to wrap all
207
// write operations in a transaction.
208
//
209
// Put(), Merge(), and Delete() only lock a single key per call.  Write() will
210
// sort its keys before locking them.  This guarantees that TransactionDB write
211
// methods cannot deadlock with eachother (but still could deadlock with a
212
// Transaction).
213
Status TransactionDBImpl::Put(const WriteOptions& options,
214
                              ColumnFamilyHandle* column_family,
215
3.08k
                              const Slice& key, const Slice& val) {
216
3.08k
  Status s;
217
218
3.08k
  Transaction* txn = BeginInternalTransaction(options);
219
3.08k
  txn->DisableIndexing();
220
221
  // Since the client didn't create a transaction, they don't care about
222
  // conflict checking for this write.  So we just need to do PutUntracked().
223
3.08k
  s = txn->PutUntracked(column_family, key, val);
224
225
3.08k
  if (s.ok()) {
226
3.07k
    s = txn->Commit();
227
3.07k
  }
228
229
3.08k
  delete txn;
230
231
3.08k
  return s;
232
3.08k
}
233
234
Status TransactionDBImpl::Delete(const WriteOptions& wopts,
235
                                 ColumnFamilyHandle* column_family,
236
10
                                 const Slice& key) {
237
10
  Status s;
238
239
10
  Transaction* txn = BeginInternalTransaction(wopts);
240
10
  txn->DisableIndexing();
241
242
  // Since the client didn't create a transaction, they don't care about
243
  // conflict checking for this write.  So we just need to do
244
  // DeleteUntracked().
245
10
  s = txn->DeleteUntracked(column_family, key);
246
247
10
  if (s.ok()) {
248
1
    s = txn->Commit();
249
1
  }
250
251
10
  delete txn;
252
253
10
  return s;
254
10
}
255
256
Status TransactionDBImpl::Merge(const WriteOptions& options,
257
                                ColumnFamilyHandle* column_family,
258
0
                                const Slice& key, const Slice& value) {
259
0
  Status s;
260
261
0
  Transaction* txn = BeginInternalTransaction(options);
262
0
  txn->DisableIndexing();
263
264
  // Since the client didn't create a transaction, they don't care about
265
  // conflict checking for this write.  So we just need to do
266
  // MergeUntracked().
267
0
  s = txn->MergeUntracked(column_family, key, value);
268
269
0
  if (s.ok()) {
270
0
    s = txn->Commit();
271
0
  }
272
273
0
  delete txn;
274
275
0
  return s;
276
0
}
277
278
3
Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
279
  // Need to lock all keys in this batch to prevent write conflicts with
280
  // concurrent transactions.
281
3
  Transaction* txn = BeginInternalTransaction(opts);
282
3
  txn->DisableIndexing();
283
284
3
  assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
285
3
  auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);
286
287
  // Since commitBatch sorts the keys before locking, concurrent Write()
288
  // operations will not cause a deadlock.
289
  // In order to avoid a deadlock with a concurrent Transaction, Transactions
290
  // should use a lock timeout.
291
3
  Status s = txn_impl->CommitBatch(updates);
292
293
3
  delete txn;
294
295
3
  return s;
296
3
}
297
298
void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id,
299
9
                                                   TransactionImpl* tx) {
300
9
  assert(tx->GetExpirationTime() > 0);
301
9
  std::lock_guard<std::mutex> lock(map_mutex_);
302
9
  expirable_transactions_map_.insert({tx_id, tx});
303
9
}
304
305
8
void TransactionDBImpl::RemoveExpirableTransaction(TransactionID tx_id) {
306
8
  std::lock_guard<std::mutex> lock(map_mutex_);
307
8
  expirable_transactions_map_.erase(tx_id);
308
8
}
309
310
bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
311
119
    TransactionID tx_id) {
312
119
  std::lock_guard<std::mutex> lock(map_mutex_);
313
314
119
  auto tx_it = expirable_transactions_map_.find(tx_id);
315
119
  if (tx_it == expirable_transactions_map_.end()) {
316
114
    return true;
317
114
  }
318
5
  TransactionImpl& tx = *(tx_it->second);
319
5
  return tx.TryStealingLocks();
320
5
}
321
322
void TransactionDBImpl::ReinitializeTransaction(
323
    Transaction* txn, const WriteOptions& write_options,
324
6
    const TransactionOptions& txn_options) {
325
6
  assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
326
6
  auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);
327
328
6
  txn_impl->Reinitialize(this, write_options, txn_options);
329
6
}
330
331
}  //  namespace rocksdb
332
#endif  // ROCKSDB_LITE