/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_db_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #include "yb/rocksdb/utilities/transactions/transaction_db_impl.h" |
24 | | |
25 | | #include <string> |
26 | | #include <vector> |
27 | | |
28 | | #include "yb/rocksdb/db.h" |
29 | | #include "yb/rocksdb/options.h" |
30 | | #include "yb/rocksdb/utilities/transaction_db.h" |
31 | | #include "yb/rocksdb/utilities/transactions/transaction_db_mutex_impl.h" |
32 | | #include "yb/rocksdb/utilities/transactions/transaction_impl.h" |
33 | | |
34 | | namespace rocksdb { |
35 | | |
36 | | TransactionDBImpl::TransactionDBImpl(DB* db, |
37 | | const TransactionDBOptions& txn_db_options) |
38 | | : TransactionDB(db), |
39 | | txn_db_options_(txn_db_options), |
40 | | lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, |
41 | | txn_db_options_.custom_mutex_factory |
42 | | ? txn_db_options_.custom_mutex_factory |
43 | | : std::shared_ptr<TransactionDBMutexFactory>( |
44 | 47 | new TransactionDBMutexFactoryImpl())) {} |
45 | | |
46 | | Transaction* TransactionDBImpl::BeginTransaction( |
47 | | const WriteOptions& write_options, const TransactionOptions& txn_options, |
48 | 3.18k | Transaction* old_txn) { |
49 | 3.18k | if (old_txn != nullptr) { |
50 | 6 | ReinitializeTransaction(old_txn, write_options, txn_options); |
51 | 6 | return old_txn; |
52 | 3.18k | } else { |
53 | 3.18k | return new TransactionImpl(this, write_options, txn_options); |
54 | 3.18k | } |
55 | 3.18k | } |
56 | | |
57 | | TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions( |
58 | 47 | const TransactionDBOptions& txn_db_options) { |
59 | 47 | TransactionDBOptions validated = txn_db_options; |
60 | | |
61 | 47 | if (txn_db_options.num_stripes == 0) { |
62 | 0 | validated.num_stripes = 1; |
63 | 0 | } |
64 | | |
65 | 47 | return validated; |
66 | 47 | } |
67 | | |
68 | | Status TransactionDB::Open(const Options& options, |
69 | | const TransactionDBOptions& txn_db_options, |
70 | 45 | const std::string& dbname, TransactionDB** dbptr) { |
71 | 45 | DBOptions db_options(options); |
72 | 45 | ColumnFamilyOptions cf_options(options); |
73 | 45 | std::vector<ColumnFamilyDescriptor> column_families; |
74 | 45 | column_families.push_back( |
75 | 45 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); |
76 | 45 | std::vector<ColumnFamilyHandle*> handles; |
77 | 45 | Status s = TransactionDB::Open(db_options, txn_db_options, dbname, |
78 | 45 | column_families, &handles, dbptr); |
79 | 45 | if (s.ok()) { |
80 | 45 | assert(handles.size() == 1); |
81 | | // It is safe to delete the handle here since DBImpl always holds a |
82 | | // reference to the default column family. |
83 | 45 | delete handles[0]; |
84 | 45 | } |
85 | | |
86 | 45 | return s; |
87 | 45 | } |
88 | | |
89 | | Status TransactionDB::Open( |
90 | | const DBOptions& db_options, const TransactionDBOptions& txn_db_options, |
91 | | const std::string& dbname, |
92 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
93 | 47 | std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) { |
94 | 47 | Status s; |
95 | 47 | DB* db; |
96 | | |
97 | 47 | std::vector<ColumnFamilyDescriptor> column_families_copy = column_families; |
98 | 47 | std::vector<size_t> compaction_enabled_cf_indices; |
99 | | |
100 | | // Enable MemTable History if not already enabled |
101 | 98 | for (size_t i = 0; i < column_families_copy.size(); i++) { |
102 | 51 | ColumnFamilyOptions* options = &column_families_copy[i].options; |
103 | | |
104 | 51 | if (options->max_write_buffer_number_to_maintain == 0) { |
105 | | // Setting to -1 will set the History size to max_write_buffer_number. |
106 | 51 | options->max_write_buffer_number_to_maintain = -1; |
107 | 51 | } |
108 | | |
109 | 51 | if (!options->disable_auto_compactions) { |
110 | | // Disable compactions momentarily to prevent race with DB::Open |
111 | 50 | options->disable_auto_compactions = true; |
112 | 50 | compaction_enabled_cf_indices.push_back(i); |
113 | 50 | } |
114 | 51 | } |
115 | | |
116 | 47 | s = DB::Open(db_options, dbname, column_families_copy, handles, &db); |
117 | | |
118 | 47 | if (s.ok()) { |
119 | 47 | TransactionDBImpl* txn_db = new TransactionDBImpl( |
120 | 47 | db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options)); |
121 | 47 | *dbptr = txn_db; |
122 | | |
123 | 51 | for (auto cf_ptr : *handles) { |
124 | 51 | txn_db->AddColumnFamily(cf_ptr); |
125 | 51 | } |
126 | | |
127 | | // Re-enable compaction for the column families that initially had |
128 | | // compaction enabled. |
129 | 47 | assert(column_families_copy.size() == (*handles).size()); |
130 | 47 | std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles; |
131 | 47 | compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); |
132 | 50 | for (auto index : compaction_enabled_cf_indices) { |
133 | 50 | compaction_enabled_cf_handles.push_back((*handles)[index]); |
134 | 50 | } |
135 | | |
136 | 47 | s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles); |
137 | 47 | } |
138 | | |
139 | 47 | return s; |
140 | 47 | } |
141 | | |
142 | | // Let TransactionLockMgr know that this column family exists so it can |
143 | | // allocate a LockMap for it. |
144 | 51 | void TransactionDBImpl::AddColumnFamily(const ColumnFamilyHandle* handle) { |
145 | 51 | lock_mgr_.AddColumnFamily(handle->GetID()); |
146 | 51 | } |
147 | | |
148 | | Status TransactionDBImpl::CreateColumnFamily( |
149 | | const ColumnFamilyOptions& options, const std::string& column_family_name, |
150 | 6 | ColumnFamilyHandle** handle) { |
151 | 6 | InstrumentedMutexLock l(&column_family_mutex_); |
152 | | |
153 | 6 | Status s = db_->CreateColumnFamily(options, column_family_name, handle); |
154 | 6 | if (s.ok()) { |
155 | 6 | lock_mgr_.AddColumnFamily((*handle)->GetID()); |
156 | 6 | } |
157 | | |
158 | 6 | return s; |
159 | 6 | } |
160 | | |
161 | | // Let TransactionLockMgr know that it can deallocate the LockMap for this |
162 | | // column family. |
163 | 4 | Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { |
164 | 4 | InstrumentedMutexLock l(&column_family_mutex_); |
165 | | |
166 | 4 | Status s = db_->DropColumnFamily(column_family); |
167 | 4 | if (s.ok()) { |
168 | 4 | lock_mgr_.RemoveColumnFamily(column_family->GetID()); |
169 | 4 | } |
170 | | |
171 | 4 | return s; |
172 | 4 | } |
173 | | |
174 | | Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, |
175 | 3.33k | const std::string& key) { |
176 | 3.33k | return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv()); |
177 | 3.33k | } |
178 | | |
179 | | void TransactionDBImpl::UnLock(TransactionImpl* txn, |
180 | 6.37k | const TransactionKeyMap* keys) { |
181 | 6.37k | lock_mgr_.UnLock(txn, keys, GetEnv()); |
182 | 6.37k | } |
183 | | |
184 | | void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id, |
185 | 31 | const std::string& key) { |
186 | 31 | lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); |
187 | 31 | } |
188 | | |
189 | | // Used when wrapping DB write operations in a transaction |
190 | | Transaction* TransactionDBImpl::BeginInternalTransaction( |
191 | 3.09k | const WriteOptions& options) { |
192 | 3.09k | TransactionOptions txn_options; |
193 | 3.09k | Transaction* txn = BeginTransaction(options, txn_options, nullptr); |
194 | | |
195 | 3.09k | assert(dynamic_cast<TransactionImpl*>(txn) != nullptr); |
196 | 3.09k | auto txn_impl = reinterpret_cast<TransactionImpl*>(txn); |
197 | | |
198 | | // Use default timeout for non-transactional writes |
199 | 3.09k | txn_impl->SetLockTimeout(txn_db_options_.default_lock_timeout); |
200 | | |
201 | 3.09k | return txn; |
202 | 3.09k | } |
203 | | |
204 | | // All user Put, Merge, Delete, and Write requests must be intercepted to make |
205 | | // sure that they lock all keys that they are writing to avoid causing conflicts |
206 | | // with any concurrent transactions. The easiest way to do this is to wrap all |
207 | | // write operations in a transaction. |
208 | | // |
209 | | // Put(), Merge(), and Delete() only lock a single key per call. Write() will |
210 | | // sort its keys before locking them. This guarantees that TransactionDB write |
211 | | // methods cannot deadlock with each other (but still could deadlock with a |
212 | | // Transaction). |
213 | | Status TransactionDBImpl::Put(const WriteOptions& options, |
214 | | ColumnFamilyHandle* column_family, |
215 | 3.08k | const Slice& key, const Slice& val) { |
216 | 3.08k | Status s; |
217 | | |
218 | 3.08k | Transaction* txn = BeginInternalTransaction(options); |
219 | 3.08k | txn->DisableIndexing(); |
220 | | |
221 | | // Since the client didn't create a transaction, they don't care about |
222 | | // conflict checking for this write. So we just need to do PutUntracked(). |
223 | 3.08k | s = txn->PutUntracked(column_family, key, val); |
224 | | |
225 | 3.08k | if (s.ok()) { |
226 | 3.07k | s = txn->Commit(); |
227 | 3.07k | } |
228 | | |
229 | 3.08k | delete txn; |
230 | | |
231 | 3.08k | return s; |
232 | 3.08k | } |
233 | | |
234 | | Status TransactionDBImpl::Delete(const WriteOptions& wopts, |
235 | | ColumnFamilyHandle* column_family, |
236 | 10 | const Slice& key) { |
237 | 10 | Status s; |
238 | | |
239 | 10 | Transaction* txn = BeginInternalTransaction(wopts); |
240 | 10 | txn->DisableIndexing(); |
241 | | |
242 | | // Since the client didn't create a transaction, they don't care about |
243 | | // conflict checking for this write. So we just need to do |
244 | | // DeleteUntracked(). |
245 | 10 | s = txn->DeleteUntracked(column_family, key); |
246 | | |
247 | 10 | if (s.ok()) { |
248 | 1 | s = txn->Commit(); |
249 | 1 | } |
250 | | |
251 | 10 | delete txn; |
252 | | |
253 | 10 | return s; |
254 | 10 | } |
255 | | |
256 | | Status TransactionDBImpl::Merge(const WriteOptions& options, |
257 | | ColumnFamilyHandle* column_family, |
258 | 0 | const Slice& key, const Slice& value) { |
259 | 0 | Status s; |
260 | | |
261 | 0 | Transaction* txn = BeginInternalTransaction(options); |
262 | 0 | txn->DisableIndexing(); |
263 | | |
264 | | // Since the client didn't create a transaction, they don't care about |
265 | | // conflict checking for this write. So we just need to do |
266 | | // MergeUntracked(). |
267 | 0 | s = txn->MergeUntracked(column_family, key, value); |
268 | | |
269 | 0 | if (s.ok()) { |
270 | 0 | s = txn->Commit(); |
271 | 0 | } |
272 | | |
273 | 0 | delete txn; |
274 | | |
275 | 0 | return s; |
276 | 0 | } |
277 | | |
278 | 3 | Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { |
279 | | // Need to lock all keys in this batch to prevent write conflicts with |
280 | | // concurrent transactions. |
281 | 3 | Transaction* txn = BeginInternalTransaction(opts); |
282 | 3 | txn->DisableIndexing(); |
283 | | |
284 | 3 | assert(dynamic_cast<TransactionImpl*>(txn) != nullptr); |
285 | 3 | auto txn_impl = reinterpret_cast<TransactionImpl*>(txn); |
286 | | |
287 | | // Since CommitBatch sorts the keys before locking, concurrent Write() |
288 | | // operations will not cause a deadlock. |
289 | | // In order to avoid a deadlock with a concurrent Transaction, Transactions |
290 | | // should use a lock timeout. |
291 | 3 | Status s = txn_impl->CommitBatch(updates); |
292 | | |
293 | 3 | delete txn; |
294 | | |
295 | 3 | return s; |
296 | 3 | } |
297 | | |
298 | | void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id, |
299 | 9 | TransactionImpl* tx) { |
300 | 9 | assert(tx->GetExpirationTime() > 0); |
301 | 9 | std::lock_guard<std::mutex> lock(map_mutex_); |
302 | 9 | expirable_transactions_map_.insert({tx_id, tx}); |
303 | 9 | } |
304 | | |
305 | 8 | void TransactionDBImpl::RemoveExpirableTransaction(TransactionID tx_id) { |
306 | 8 | std::lock_guard<std::mutex> lock(map_mutex_); |
307 | 8 | expirable_transactions_map_.erase(tx_id); |
308 | 8 | } |
309 | | |
310 | | bool TransactionDBImpl::TryStealingExpiredTransactionLocks( |
311 | 119 | TransactionID tx_id) { |
312 | 119 | std::lock_guard<std::mutex> lock(map_mutex_); |
313 | | |
314 | 119 | auto tx_it = expirable_transactions_map_.find(tx_id); |
315 | 119 | if (tx_it == expirable_transactions_map_.end()) { |
316 | 114 | return true; |
317 | 114 | } |
318 | 5 | TransactionImpl& tx = *(tx_it->second); |
319 | 5 | return tx.TryStealingLocks(); |
320 | 5 | } |
321 | | |
322 | | void TransactionDBImpl::ReinitializeTransaction( |
323 | | Transaction* txn, const WriteOptions& write_options, |
324 | 6 | const TransactionOptions& txn_options) { |
325 | 6 | assert(dynamic_cast<TransactionImpl*>(txn) != nullptr); |
326 | 6 | auto txn_impl = reinterpret_cast<TransactionImpl*>(txn); |
327 | | |
328 | 6 | txn_impl->Reinitialize(this, write_options, txn_options); |
329 | 6 | } |
330 | | |
331 | | } // namespace rocksdb |
332 | | #endif // ROCKSDB_LITE |