/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #include "yb/rocksdb/utilities/transactions/transaction_impl.h" |
24 | | |
25 | | #include <map> |
26 | | #include <set> |
27 | | #include <string> |
28 | | #include <vector> |
29 | | |
30 | | #include "yb/rocksdb/comparator.h" |
31 | | #include "yb/rocksdb/db.h" |
32 | | #include "yb/rocksdb/db/db_impl.h" |
33 | | #include "yb/rocksdb/snapshot.h" |
34 | | #include "yb/rocksdb/status.h" |
35 | | #include "yb/rocksdb/util/sync_point.h" |
36 | | #include "yb/rocksdb/utilities/transactions/transaction_db_impl.h" |
37 | | #include "yb/rocksdb/utilities/transactions/transaction_util.h" |
38 | | |
39 | | namespace rocksdb { |
40 | | |
41 | | struct WriteOptions; |
42 | | |
// Counter shared by all TransactionImpl instances; it starts at 1, so 1 is the
// first ID that GenTxnID() hands out.
43 | | std::atomic<TransactionID> TransactionImpl::txn_id_counter_(1); |
44 | | |
// Returns the counter's current value and atomically advances it by one
// (fetch_add yields the value held before the increment).
45 | 3.18k | TransactionID TransactionImpl::GenTxnID() { |
46 | 3.18k | return txn_id_counter_.fetch_add(1); |
47 | 3.18k | } |
48 | | |
// Builds a transaction on top of txn_db's base DB.
//
// txn_db is expected to be a TransactionDBImpl. The dynamic_cast result is
// checked only by assert(), so with asserts compiled out a mismatching type
// would leave txn_db_impl_ null. The members zeroed in the initializer list
// are given their real values by Initialize().
49 | | TransactionImpl::TransactionImpl(TransactionDB* txn_db, |
50 | | const WriteOptions& write_options, |
51 | | const TransactionOptions& txn_options) |
52 | | : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), |
53 | | txn_db_impl_(nullptr), |
54 | | txn_id_(0), |
55 | | expiration_time_(0), |
56 | | lock_timeout_(0), |
57 | 3.18k | exec_status_(STARTED) { |
58 | 3.18k | txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db); |
59 | 3.18k | assert(txn_db_impl_); |
60 | | |
61 | 3.18k | Initialize(txn_options); |
62 | 3.18k | } |
63 | | |
// Sets up per-transaction state from txn_options: assigns a fresh transaction
// ID, resets the execution status to STARTED, resolves the lock timeout and
// the expiration time, optionally takes a snapshot, and registers the
// transaction with the TransactionDBImpl when it can expire.
// Called by both the constructor and Reinitialize().
64 | 3.18k | void TransactionImpl::Initialize(const TransactionOptions& txn_options) { |
65 | 3.18k | txn_id_ = GenTxnID(); |
66 | | |
67 | 3.18k | exec_status_ = STARTED; |
68 | | |
// NOTE(review): the "* 1000" here and below looks like a milliseconds ->
// microseconds conversion; confirm against the documented option units.
69 | 3.18k | lock_timeout_ = txn_options.lock_timeout * 1000; |
70 | 3.18k | if (lock_timeout_ < 0) { |
71 | | // Lock timeout not set (negative), so use the DB-wide default instead |
72 | 3.17k | lock_timeout_ = |
73 | 3.17k | txn_db_impl_->GetTxnDBOptions().transaction_lock_timeout * 1000; |
74 | 3.17k | } |
75 | | |
// expiration_time_ is an absolute time: IsExpired() compares it against
// NowMicros(). A value of 0 is treated as "no expiration" by every
// "expiration_time_ > 0" check in this file. start_time_ is not assigned in
// this file (presumably maintained by the base class).
76 | 3.18k | if (txn_options.expiration >= 0) { |
77 | 9 | expiration_time_ = start_time_ + txn_options.expiration * 1000; |
78 | 3.17k | } else { |
79 | 3.17k | expiration_time_ = 0; |
80 | 3.17k | } |
81 | | |
82 | 3.18k | if (txn_options.set_snapshot) { |
83 | 19 | SetSnapshot(); |
84 | 19 | } |
85 | | |
// Register under the new ID; the destructor removes this registration.
86 | 3.18k | if (expiration_time_ > 0) { |
87 | 9 | txn_db_impl_->InsertExpirableTransaction(txn_id_, this); |
88 | 9 | } |
89 | 3.18k | } |
90 | | |
// Releases the locks on every tracked key and, if this transaction was
// registered as expirable in Initialize(), removes that registration.
91 | 3.18k | TransactionImpl::~TransactionImpl() { |
92 | 3.18k | txn_db_impl_->UnLock(this, &GetTrackedKeys()); |
93 | 3.18k | if (expiration_time_ > 0) { |
94 | 8 | txn_db_impl_->RemoveExpirableTransaction(txn_id_); |
95 | 8 | } |
96 | 3.18k | } |
97 | | |
// Releases the locks on every tracked key, then lets the base class clear its
// own state. The unlock comes first because it reads GetTrackedKeys(), which
// the base-class Clear() presumably resets.
98 | 3.17k | void TransactionImpl::Clear() { |
99 | 3.17k | txn_db_impl_->UnLock(this, &GetTrackedKeys()); |
100 | 3.17k | TransactionBaseImpl::Clear(); |
101 | 3.17k | } |
102 | | |
// Resets this object so it can be reused for a new transaction: the base class
// is reinitialized against txn_db's base DB, then Initialize() assigns a new
// ID and re-derives timeouts, snapshot and expiration from txn_options.
//
// txn_db_impl_ is not updated here, so txn_db is assumed to be the same
// database the object was constructed with.
// NOTE(review): if the previous incarnation was expirable, nothing in this
// function removes its registration (made under the old txn_id_) before
// Initialize() generates a new ID; confirm whether that is handled elsewhere.
103 | | void TransactionImpl::Reinitialize(TransactionDB* txn_db, |
104 | | const WriteOptions& write_options, |
105 | 6 | const TransactionOptions& txn_options) { |
106 | 6 | TransactionBaseImpl::Reinitialize(txn_db->GetBaseDB(), write_options); |
107 | 6 | Initialize(txn_options); |
108 | 6 | } |
109 | | |
// Returns true when an expiration time is set (expiration_time_ > 0) and the
// environment clock has reached or passed it; returns false otherwise,
// including for transactions that never expire.
110 | 13 | bool TransactionImpl::IsExpired() const { |
111 | 13 | if (expiration_time_ > 0) { |
112 | 13 | if (db_->GetEnv()->NowMicros() >= expiration_time_) { |
113 | | // The deadline has been reached: the transaction is expired. |
114 | 9 | return true; |
115 | 9 | } |
116 | 4 | } |
117 | | |
118 | 4 | return false; |
119 | 4 | } |
120 | | |
// Locks every key touched by the caller-supplied batch, writes the batch via
// DoCommit(), and then releases exactly the locks taken for it.
//
// Returns the LockBatch() error if locking fails (LockBatch() releases any
// locks it had already taken in that case), otherwise the DoCommit() status.
121 | 3 | Status TransactionImpl::CommitBatch(WriteBatch* batch) { |
122 | 3 | TransactionKeyMap keys_to_unlock; |
123 | | |
124 | 3 | Status s = LockBatch(batch, &keys_to_unlock); |
125 | | |
126 | 3 | if (s.ok()) { |
127 | 3 | s = DoCommit(batch); |
128 | | |
// Unlock whether or not the write succeeded.
129 | 3 | txn_db_impl_->UnLock(this, &keys_to_unlock); |
130 | 3 | } |
131 | | |
132 | 3 | return s; |
133 | 3 | } |
134 | | |
// Writes this transaction's accumulated write batch via DoCommit() and then
// calls Clear() regardless of the outcome, so the locks are released even when
// the commit fails. Returns the DoCommit() status.
135 | 3.15k | Status TransactionImpl::Commit() { |
136 | 3.15k | Status s = DoCommit(GetWriteBatch()->GetWriteBatch()); |
137 | | |
138 | 3.15k | Clear(); |
139 | | |
140 | 3.15k | return s; |
141 | 3.15k | } |
142 | | |
// Writes batch to the underlying DB on behalf of this transaction.
//
// Non-expirable transactions (expiration_time_ == 0) write directly.
// Expirable ones must not have expired yet and must win the atomic
// STARTED -> COMMITTING transition before writing; otherwise an Expired
// status is returned and nothing is written.
143 | 3.16k | Status TransactionImpl::DoCommit(WriteBatch* batch) { |
144 | 3.16k | Status s; |
145 | | |
146 | 3.16k | if (expiration_time_ > 0) { |
147 | 8 | if (IsExpired()) { |
148 | 4 | return STATUS(Expired, ""); |
149 | 4 | } |
150 | | |
151 | | // Transaction should only be committed if the thread succeeds |
152 | | // changing its execution status to COMMITTING. This is because |
153 | | // a different transaction may consider this one expired and attempt |
154 | | // to steal its locks between the IsExpired() check and the beginning |
155 | | // of a commit. |
156 | 4 | ExecutionStatus expected = STARTED; |
157 | 4 | bool can_commit = std::atomic_compare_exchange_strong( |
158 | 4 | &exec_status_, &expected, COMMITTING); |
159 | | |
160 | 4 | TEST_SYNC_POINT("TransactionTest::ExpirableTransactionDataRace:1"); |
161 | | |
162 | 4 | if (can_commit) { |
163 | 4 | s = db_->Write(write_options_, batch); |
164 | 0 | } else { |
// The exchange failed, so the status was no longer STARTED; the only other
// state expected here is LOCKS_STOLEN (set by TryStealingLocks()).
165 | 0 | assert(exec_status_ == LOCKS_STOLEN); |
166 | 0 | return STATUS(Expired, ""); |
167 | 0 | } |
168 | 3.15k | } else { |
169 | 3.15k | s = db_->Write(write_options_, batch); |
170 | 3.15k | } |
171 | | |
172 | 3.15k | return s; |
173 | 3.16k | } |
174 | | |
// Abandons the transaction: nothing is written here; Clear() releases the
// locks on tracked keys and clears the base-class state.
175 | 8 | void TransactionImpl::Rollback() { Clear(); } |
176 | | |
// Releases the locks on keys tracked since the most recent savepoint, then
// delegates the actual rollback to the base class and returns its status.
177 | 15 | Status TransactionImpl::RollbackToSavePoint() { |
178 | | // Unlock any keys locked since the last savepoint |
179 | 15 | const std::unique_ptr<TransactionKeyMap>& keys = |
180 | 15 | GetTrackedKeysSinceSavePoint(); |
181 | | |
// A null pointer means there is nothing to unlock.
182 | 15 | if (keys) { |
183 | 12 | txn_db_impl_->UnLock(this, keys.get()); |
184 | 12 | } |
185 | | |
186 | 15 | return TransactionBaseImpl::RollbackToSavePoint(); |
187 | 15 | } |
188 | | |
189 | | // Lock all keys in this batch. |
190 | | // On success, the caller should unlock keys_to_unlock once it is done. |
// On a locking failure, every lock recorded in keys_to_unlock so far is
// released here before the error is returned. If iterating the batch itself
// fails, that error is returned before any lock is attempted.
191 | | Status TransactionImpl::LockBatch(WriteBatch* batch, |
192 | 3 | TransactionKeyMap* keys_to_unlock) { |
// Local WriteBatch visitor that collects the key of every Put, Merge and
// Delete record, grouped by column family.
193 | 3 | class Handler : public WriteBatch::Handler { |
194 | 3 | public: |
195 | | // Sorted map of column_family_id to sorted set of keys. |
196 | | // Since LockBatch() always locks keys in sorted order, it cannot deadlock |
197 | | // with itself. We're not using a comparator here since it doesn't matter |
198 | | // what the sorting is as long as it's consistent. |
199 | 3 | std::map<uint32_t, std::set<std::string>> keys_; |
200 | | |
201 | 3 | Handler() {} |
202 | | |
// Adds key to the set for column_family_id, creating the set if needed.
// NOTE(review): std::set::insert already ignores duplicates, so the
// find()/end() check and its two extra map lookups look redundant.
203 | 3 | void RecordKey(uint32_t column_family_id, const Slice& key) { |
204 | 3 | std::string key_str = key.ToString(); |
205 | | |
206 | 3 | auto iter = (keys_)[column_family_id].find(key_str); |
207 | 3 | if (iter == (keys_)[column_family_id].end()) { |
208 | | // key not yet seen, store it. |
209 | 3 | (keys_)[column_family_id].insert({std::move(key_str)}); |
210 | 3 | } |
211 | 3 | } |
212 | | |
// Each callback only records the key; the value is ignored.
213 | 3 | virtual Status PutCF(uint32_t column_family_id, const SliceParts& key, |
214 | 3 | const SliceParts& value) override { |
215 | 3 | RecordKey(column_family_id, key.TheOnlyPart()); |
216 | 3 | return Status::OK(); |
217 | 3 | } |
218 | 3 | virtual Status MergeCF(uint32_t column_family_id, const Slice& key, |
219 | 0 | const Slice& value) override { |
220 | 0 | RecordKey(column_family_id, key); |
221 | 0 | return Status::OK(); |
222 | 0 | } |
223 | 3 | virtual Status DeleteCF(uint32_t column_family_id, |
224 | 0 | const Slice& key) override { |
225 | 0 | RecordKey(column_family_id, key); |
226 | 0 | return Status::OK(); |
227 | 0 | } |
228 | 3 | }; |
229 | | |
230 | | // Iterating the batch with this handler fills handler.keys_ |
231 | 3 | Handler handler; |
232 | 3 | RETURN_NOT_OK(batch->Iterate(&handler)); |
233 | | |
234 | 3 | Status s; |
235 | | |
236 | | // Attempt to lock all keys, stopping at the first failure |
237 | 2 | for (const auto& cf_iter : handler.keys_) { |
238 | 2 | uint32_t cfh_id = cf_iter.first; |
239 | 2 | auto& cfh_keys = cf_iter.second; |
240 | | |
241 | 3 | for (const auto& key_iter : cfh_keys) { |
242 | 3 | const std::string& key = key_iter; |
243 | | |
244 | 3 | s = txn_db_impl_->TryLock(this, cfh_id, key); |
245 | 3 | if (!s.ok()) { |
246 | 0 | break; |
247 | 0 | } |
// Remember the key only after its lock was obtained, so the cleanup below
// never touches a key this call failed to lock.
// NOTE(review): key is a const reference, so std::move(key) cannot move
// from it and the argument is passed as a const rvalue (effectively a copy).
248 | 3 | TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber, |
249 | 3 | false); |
250 | 3 | } |
251 | | |
// Propagate the inner loop's failure out of the outer loop.
252 | 2 | if (!s.ok()) { |
253 | 0 | break; |
254 | 0 | } |
255 | 2 | } |
256 | | |
// Roll back: release whatever was locked before the failure.
257 | 3 | if (!s.ok()) { |
258 | 0 | txn_db_impl_->UnLock(this, keys_to_unlock); |
259 | 0 | } |
260 | | |
261 | 3 | return s; |
262 | 3 | } |
263 | | |
264 | | // Attempt to lock this key. |
265 | | // Returns OK if the key has been successfully locked. Non-ok, otherwise. |
266 | | // If untracked is false and this transaction has a snapshot set, |
267 | | // this key will only be locked if there have been no writes to this key since |
268 | | // the snapshot time. |
// On success the key is recorded through TrackKey() together with read_only
// and the sequence number determined below.
269 | | Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, |
270 | | const Slice& key, bool read_only, |
271 | 3.41k | bool untracked) { |
272 | 3.41k | uint32_t cfh_id = GetColumnFamilyID(column_family); |
273 | 3.41k | std::string key_str = key.ToString(); |
274 | 3.41k | bool previously_locked; |
275 | 3.41k | Status s; |
276 | | |
277 | | // current_seqno: sequence number already recorded for this key, if tracked. new_seqno: the value to record on success. |
278 | 3.41k | SequenceNumber current_seqno = kMaxSequenceNumber; |
279 | 3.41k | SequenceNumber new_seqno = kMaxSequenceNumber; |
280 | | |
// Look the key up in the tracked keys to see whether this transaction
// already holds its lock.
281 | 3.41k | const auto& tracked_keys = GetTrackedKeys(); |
282 | 3.41k | const auto tracked_keys_cf = tracked_keys.find(cfh_id); |
283 | 3.41k | if (tracked_keys_cf == tracked_keys.end()) { |
284 | 3.21k | previously_locked = false; |
285 | 197 | } else { |
286 | 197 | auto iter = tracked_keys_cf->second.find(key_str); |
287 | 197 | if (iter == tracked_keys_cf->second.end()) { |
288 | 112 | previously_locked = false; |
289 | 85 | } else { |
290 | 85 | previously_locked = true; |
291 | 85 | current_seqno = iter->second.seq; |
292 | 85 | } |
293 | 197 | } |
294 | | |
295 | | // lock this key if this transaction hasn't already locked it |
296 | 3.41k | if (!previously_locked) { |
297 | 3.33k | s = txn_db_impl_->TryLock(this, cfh_id, key_str); |
298 | 3.33k | } |
299 | | |
// This runs even when the lock attempt above failed.
300 | 3.41k | SetSnapshotIfNeeded(); |
301 | | |
302 | | // Even though we do not care about doing conflict checking for this write, |
303 | | // we still need to take a lock to make sure we do not cause a conflict with |
304 | | // some other write. However, we do not need to check if there have been |
305 | | // any writes since this transaction's snapshot. |
306 | | // TODO(agiardullo): could optimize by supporting shared txn locks in the |
307 | | // future |
308 | 3.41k | if (untracked || snapshot_ == nullptr) { |
309 | | // Need to remember the earliest sequence number that we know that this |
310 | | // key has not been modified after. This is useful if this same |
311 | | // transaction |
312 | | // later tries to lock this key again. |
313 | 3.30k | if (current_seqno == kMaxSequenceNumber) { |
314 | | // Since we haven't checked a snapshot, we only know this key has not |
315 | | // been modified since after we locked it. |
316 | 3.25k | new_seqno = db_->GetLatestSequenceNumber(); |
317 | 58 | } else { |
318 | 58 | new_seqno = current_seqno; |
319 | 58 | } |
320 | 106 | } else { |
321 | | // If a snapshot is set, we need to make sure the key hasn't been modified |
322 | | // since the snapshot. This must be done after we locked the key. |
323 | 106 | if (s.ok()) { |
324 | 102 | s = ValidateSnapshot(column_family, key, current_seqno, &new_seqno); |
325 | | |
326 | 102 | if (!s.ok()) { |
327 | | // Failed to validate key |
328 | 25 | if (!previously_locked) { |
329 | | // Unlock key we just locked |
// NOTE(review): key.ToString() rebuilds a string equal to key_str.
330 | 24 | txn_db_impl_->UnLock(this, cfh_id, key.ToString()); |
331 | 24 | } |
332 | 25 | } |
333 | 102 | } |
334 | 106 | } |
335 | | |
336 | 3.41k | if (s.ok()) { |
337 | | // Let base class know we've conflict checked this key. |
338 | 3.31k | TrackKey(cfh_id, key_str, new_seqno, read_only); |
339 | 3.31k | } |
340 | | |
341 | 3.41k | return s; |
342 | 3.41k | } |
343 | | |
344 | | // Return OK() if this key has not been modified more recently than the |
345 | | // transaction snapshot_. |
// prev_seqno is the sequence number at which the key was last validated
// (TryLock() passes kMaxSequenceNumber when it is not tracked yet).
// *new_seqno is set to the snapshot's sequence number only when a conflict
// check is actually performed; on the early return it is left untouched.
// Requires snapshot_ to be set (asserted).
346 | | Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, |
347 | | const Slice& key, |
348 | | SequenceNumber prev_seqno, |
349 | 102 | SequenceNumber* new_seqno) { |
350 | 102 | assert(snapshot_); |
351 | | |
352 | 102 | SequenceNumber seq = snapshot_->GetSequenceNumber(); |
353 | 102 | if (prev_seqno <= seq) { |
354 | | // If the key has been previously validated at a sequence number no later |
355 | | // than the current snapshot's sequence number, we already know it has not |
356 | | // been modified. |
357 | 26 | return Status::OK(); |
358 | 26 | } |
359 | | |
360 | 76 | *new_seqno = seq; |
361 | | |
// NOTE(review): the assert verifies the dynamic type, but the conversion
// itself uses reinterpret_cast; static_cast is the conventional named cast
// for a checked downcast like this.
362 | 76 | assert(dynamic_cast<DBImpl*>(db_) != nullptr); |
363 | 76 | auto db_impl = reinterpret_cast<DBImpl*>(db_); |
364 | | |
// A null column_family selects the default column family.
365 | 76 | ColumnFamilyHandle* cfh = |
366 | 41 | column_family ? column_family : db_impl->DefaultColumnFamily(); |
367 | | |
368 | 76 | return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(), |
369 | 76 | snapshot_->GetSequenceNumber(), |
370 | 76 | false /* cache_only */); |
371 | 76 | } |
372 | | |
// Atomically moves the execution status from STARTED to LOCKS_STOLEN.
// Returns true if the transition happened, false if the status was no longer
// STARTED (DoCommit() may already have switched it to COMMITTING).
// Must only be called on an expired transaction (asserted).
373 | 5 | bool TransactionImpl::TryStealingLocks() { |
374 | 5 | assert(IsExpired()); |
375 | 5 | ExecutionStatus expected = STARTED; |
376 | 5 | return std::atomic_compare_exchange_strong(&exec_status_, &expected, |
377 | 5 | LOCKS_STOLEN); |
378 | 5 | } |
379 | | |
// Releases this transaction's lock on a single key in the given column
// family. Only the lock manager is called here; tracked-key bookkeeping is
// not modified in this function.
380 | | void TransactionImpl::UnlockGetForUpdate(ColumnFamilyHandle* column_family, |
381 | 7 | const Slice& key) { |
382 | 7 | txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); |
383 | 7 | } |
384 | | |
385 | | } // namespace rocksdb |
386 | | |
387 | | #endif // ROCKSDB_LITE |