/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/transactions/transaction_base.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef ROCKSDB_LITE |
22 | | |
23 | | #include "yb/rocksdb/utilities/transactions/transaction_base.h" |
24 | | |
25 | | #include "yb/rocksdb/comparator.h" |
26 | | #include "yb/rocksdb/db.h" |
27 | | #include "yb/rocksdb/db/db_impl.h" |
28 | | #include "yb/rocksdb/status.h" |
29 | | |
30 | | namespace rocksdb { |
31 | | |
// Constructs a transaction over `db`, capturing the write options, the
// default column family's user comparator, and the start timestamp.
// NOTE(review): write_batch_ is constructed from cmp_, so this relies on
// cmp_ being declared before write_batch_ in the class definition
// (member-init order follows declaration order) — confirm in the header.
TransactionBaseImpl::TransactionBaseImpl(DB* db,
                                         const WriteOptions& write_options)
    : db_(db),
      write_options_(write_options),
      cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
      start_time_(db_->GetEnv()->NowMicros()),
      write_batch_(cmp_, 0, true),
      indexing_enabled_(true) {}
40 | | |
// Destructor: passing nullptr to SetSnapshotInternal drops the current
// snapshot reference (if any), which returns it to the DB via the custom
// deleter installed when the snapshot was set.
TransactionBaseImpl::~TransactionBaseImpl() {
  // Release snapshot if snapshot is set
  SetSnapshotInternal(nullptr);
}
45 | | |
46 | 3.21k | void TransactionBaseImpl::Clear() { |
47 | 3.21k | save_points_.reset(nullptr); |
48 | 3.21k | write_batch_.Clear(); |
49 | 3.21k | tracked_keys_.clear(); |
50 | 3.21k | num_puts_ = 0; |
51 | 3.21k | num_deletes_ = 0; |
52 | 3.21k | num_merges_ = 0; |
53 | 3.21k | } |
54 | | |
55 | | void TransactionBaseImpl::Reinitialize(DB* db, |
56 | 12 | const WriteOptions& write_options) { |
57 | 12 | Clear(); |
58 | 12 | ClearSnapshot(); |
59 | 12 | db_ = db; |
60 | 12 | write_options_ = write_options; |
61 | 12 | start_time_ = db_->GetEnv()->NowMicros(); |
62 | 12 | indexing_enabled_ = true; |
63 | 12 | cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily()); |
64 | 12 | } |
65 | | |
66 | 71 | void TransactionBaseImpl::SetSnapshot() { |
67 | 71 | assert(dynamic_cast<DBImpl*>(db_) != nullptr); |
68 | 71 | auto db_impl = reinterpret_cast<DBImpl*>(db_); |
69 | | |
70 | 71 | const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary(); |
71 | | |
72 | 71 | SetSnapshotInternal(snapshot); |
73 | 71 | } |
74 | | |
75 | 3.30k | void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) { |
76 | | // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to |
77 | | // be released, not deleted when it is no longer referenced. |
78 | 3.30k | snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot, |
79 | 3.30k | this, std::placeholders::_1, db_)); |
80 | 3.30k | snapshot_needed_ = false; |
81 | 3.30k | snapshot_notifier_ = nullptr; |
82 | 3.30k | } |
83 | | |
84 | | void TransactionBaseImpl::SetSnapshotOnNextOperation( |
85 | 5 | std::shared_ptr<TransactionNotifier> notifier) { |
86 | 5 | snapshot_needed_ = true; |
87 | 5 | snapshot_notifier_ = notifier; |
88 | 5 | } |
89 | | |
90 | 3.53k | void TransactionBaseImpl::SetSnapshotIfNeeded() { |
91 | 3.53k | if (snapshot_needed_) { |
92 | 6 | std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_; |
93 | 6 | SetSnapshot(); |
94 | 6 | if (notifier != nullptr) { |
95 | 1 | notifier->SnapshotCreated(GetSnapshot()); |
96 | 1 | } |
97 | 6 | } |
98 | 3.53k | } |
99 | | |
100 | | Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, |
101 | | const SliceParts& key, bool read_only, |
102 | 4 | bool untracked) { |
103 | 4 | size_t key_size = 0; |
104 | 12 | for (int i = 0; i < key.num_parts; ++i) { |
105 | 8 | key_size += key.parts[i].size(); |
106 | 8 | } |
107 | | |
108 | 4 | std::string str; |
109 | 4 | str.reserve(key_size); |
110 | | |
111 | 12 | for (int i = 0; i < key.num_parts; ++i) { |
112 | 8 | str.append(key.parts[i].cdata(), key.parts[i].size()); |
113 | 8 | } |
114 | | |
115 | 4 | return TryLock(column_family, str, read_only, untracked); |
116 | 4 | } |
117 | | |
118 | 20 | void TransactionBaseImpl::SetSavePoint() { |
119 | 20 | if (save_points_ == nullptr) { |
120 | 12 | save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>()); |
121 | 12 | } |
122 | 20 | save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, |
123 | 20 | num_puts_, num_deletes_, num_merges_); |
124 | 20 | write_batch_.SetSavePoint(); |
125 | 20 | } |
126 | | |
127 | 22 | Status TransactionBaseImpl::RollbackToSavePoint() { |
128 | 22 | if (save_points_ != nullptr && save_points_->size() > 0) { |
129 | | // Restore saved SavePoint |
130 | 16 | TransactionBaseImpl::SavePoint& save_point = save_points_->top(); |
131 | 16 | snapshot_ = save_point.snapshot_; |
132 | 16 | snapshot_needed_ = save_point.snapshot_needed_; |
133 | 16 | snapshot_notifier_ = save_point.snapshot_notifier_; |
134 | 16 | num_puts_ = save_point.num_puts_; |
135 | 16 | num_deletes_ = save_point.num_deletes_; |
136 | 16 | num_merges_ = save_point.num_merges_; |
137 | | |
138 | | // Rollback batch |
139 | 16 | Status s = write_batch_.RollbackToSavePoint(); |
140 | 16 | assert(s.ok()); |
141 | | |
142 | | // Rollback any keys that were tracked since the last savepoint |
143 | 16 | const TransactionKeyMap& key_map = save_point.new_keys_; |
144 | 12 | for (const auto& key_map_iter : key_map) { |
145 | 12 | uint32_t column_family_id = key_map_iter.first; |
146 | 12 | auto& keys = key_map_iter.second; |
147 | | |
148 | 12 | auto& cf_tracked_keys = tracked_keys_[column_family_id]; |
149 | | |
150 | 22 | for (const auto& key_iter : keys) { |
151 | 22 | const std::string& key = key_iter.first; |
152 | 22 | uint32_t num_reads = key_iter.second.num_reads; |
153 | 22 | uint32_t num_writes = key_iter.second.num_writes; |
154 | | |
155 | 22 | auto tracked_keys_iter = cf_tracked_keys.find(key); |
156 | 22 | assert(tracked_keys_iter != cf_tracked_keys.end()); |
157 | | |
158 | | // Decrement the total reads/writes of this key by the number of |
159 | | // reads/writes done since the last SavePoint. |
160 | 22 | if (num_reads > 0) { |
161 | 0 | assert(tracked_keys_iter->second.num_reads >= num_reads); |
162 | 0 | tracked_keys_iter->second.num_reads -= num_reads; |
163 | 0 | } |
164 | 22 | if (num_writes > 0) { |
165 | 22 | assert(tracked_keys_iter->second.num_writes >= num_writes); |
166 | 22 | tracked_keys_iter->second.num_writes -= num_writes; |
167 | 22 | } |
168 | 22 | if (tracked_keys_iter->second.num_reads == 0 && |
169 | 22 | tracked_keys_iter->second.num_writes == 0) { |
170 | 13 | tracked_keys_[column_family_id].erase(tracked_keys_iter); |
171 | 13 | } |
172 | 22 | } |
173 | 12 | } |
174 | | |
175 | 16 | save_points_->pop(); |
176 | | |
177 | 16 | return s; |
178 | 6 | } else { |
179 | 6 | assert(write_batch_.RollbackToSavePoint().IsNotFound()); |
180 | 6 | return STATUS(NotFound, ""); |
181 | 6 | } |
182 | 22 | } |
183 | | |
// Reads `key`, giving precedence to this transaction's own (uncommitted)
// writes buffered in the batch, falling back to the DB otherwise.
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
                                ColumnFamilyHandle* column_family,
                                const Slice& key, std::string* value) {
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
                                        value);
}
190 | | |
191 | | Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, |
192 | | ColumnFamilyHandle* column_family, |
193 | 99 | const Slice& key, std::string* value) { |
194 | 99 | Status s = TryLock(column_family, key, true /* read_only */); |
195 | | |
196 | 99 | if (s.ok() && value != nullptr) { |
197 | 80 | s = Get(read_options, column_family, key, value); |
198 | 80 | } |
199 | 99 | return s; |
200 | 99 | } |
201 | | |
202 | | std::vector<Status> TransactionBaseImpl::MultiGet( |
203 | | const ReadOptions& read_options, |
204 | | const std::vector<ColumnFamilyHandle*>& column_family, |
205 | 0 | const std::vector<Slice>& keys, std::vector<std::string>* values) { |
206 | 0 | size_t num_keys = keys.size(); |
207 | 0 | values->resize(num_keys); |
208 | |
|
209 | 0 | std::vector<Status> stat_list(num_keys); |
210 | 0 | for (size_t i = 0; i < num_keys; ++i) { |
211 | 0 | std::string* value = values ? &(*values)[i] : nullptr; |
212 | 0 | stat_list[i] = Get(read_options, column_family[i], keys[i], value); |
213 | 0 | } |
214 | |
|
215 | 0 | return stat_list; |
216 | 0 | } |
217 | | |
218 | | std::vector<Status> TransactionBaseImpl::MultiGetForUpdate( |
219 | | const ReadOptions& read_options, |
220 | | const std::vector<ColumnFamilyHandle*>& column_family, |
221 | 8 | const std::vector<Slice>& keys, std::vector<std::string>* values) { |
222 | | // Regardless of whether the MultiGet succeeded, track these keys. |
223 | 8 | size_t num_keys = keys.size(); |
224 | 8 | values->resize(num_keys); |
225 | | |
226 | | // Lock all keys |
227 | 33 | for (size_t i = 0; i < num_keys; ++i) { |
228 | 26 | Status s = TryLock(column_family[i], keys[i], true /* read_only */); |
229 | 26 | if (!s.ok()) { |
230 | | // Fail entire multiget if we cannot lock all keys |
231 | 1 | return std::vector<Status>(num_keys, s); |
232 | 1 | } |
233 | 26 | } |
234 | | |
235 | | // TODO(agiardullo): optimize multiget? |
236 | 7 | std::vector<Status> stat_list(num_keys); |
237 | 31 | for (size_t i = 0; i < num_keys; ++i) { |
238 | 24 | std::string* value = values ? &(*values)[i] : nullptr; |
239 | 24 | stat_list[i] = Get(read_options, column_family[i], keys[i], value); |
240 | 24 | } |
241 | | |
242 | 7 | return stat_list; |
243 | 8 | } |
244 | | |
// Returns an iterator over the default column family that overlays this
// transaction's pending batch writes on top of the DB's data.
// The caller owns the returned iterator.
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
  Iterator* db_iter = db_->NewIterator(read_options);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(db_iter);
}
251 | | |
// Column-family-specific variant of GetIterator(): overlays pending batch
// writes for `column_family` on top of the DB's data for that family.
// The caller owns the returned iterator.
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
                                           ColumnFamilyHandle* column_family) {
  Iterator* db_iter = db_->NewIterator(read_options, column_family);
  assert(db_iter);

  return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
259 | | |
260 | | Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, |
261 | 263 | const Slice& key, const Slice& value) { |
262 | 263 | Status s = TryLock(column_family, key, false /* read_only */); |
263 | | |
264 | 263 | if (s.ok()) { |
265 | 197 | GetBatchForWrite()->Put(column_family, key, value); |
266 | 197 | num_puts_++; |
267 | 197 | } |
268 | | |
269 | 263 | return s; |
270 | 263 | } |
271 | | |
272 | | Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, |
273 | | const SliceParts& key, |
274 | 4 | const SliceParts& value) { |
275 | 4 | Status s = TryLock(column_family, key, false /* read_only */); |
276 | | |
277 | 4 | if (s.ok()) { |
278 | 3 | GetBatchForWrite()->Put(column_family, key, value); |
279 | 3 | num_puts_++; |
280 | 3 | } |
281 | | |
282 | 4 | return s; |
283 | 4 | } |
284 | | |
285 | | Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, |
286 | 4 | const Slice& key, const Slice& value) { |
287 | 4 | Status s = TryLock(column_family, key, false /* read_only */); |
288 | | |
289 | 4 | if (s.ok()) { |
290 | 3 | GetBatchForWrite()->Merge(column_family, key, value); |
291 | 3 | num_merges_++; |
292 | 3 | } |
293 | | |
294 | 4 | return s; |
295 | 4 | } |
296 | | |
297 | | Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, |
298 | 39 | const Slice& key) { |
299 | 39 | Status s = TryLock(column_family, key, false /* read_only */); |
300 | | |
301 | 39 | if (s.ok()) { |
302 | 24 | GetBatchForWrite()->Delete(column_family, key); |
303 | 24 | num_deletes_++; |
304 | 24 | } |
305 | | |
306 | 39 | return s; |
307 | 39 | } |
308 | | |
309 | | Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, |
310 | 0 | const SliceParts& key) { |
311 | 0 | Status s = TryLock(column_family, key, false /* read_only */); |
312 | |
|
313 | 0 | if (s.ok()) { |
314 | 0 | GetBatchForWrite()->Delete(column_family, key); |
315 | 0 | num_deletes_++; |
316 | 0 | } |
317 | |
|
318 | 0 | return s; |
319 | 0 | } |
320 | | |
321 | | Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, |
322 | 7 | const Slice& key) { |
323 | 7 | Status s = TryLock(column_family, key, false /* read_only */); |
324 | | |
325 | 7 | if (s.ok()) { |
326 | 7 | GetBatchForWrite()->SingleDelete(column_family, key); |
327 | 7 | num_deletes_++; |
328 | 7 | } |
329 | | |
330 | 7 | return s; |
331 | 7 | } |
332 | | |
333 | | Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, |
334 | 0 | const SliceParts& key) { |
335 | 0 | Status s = TryLock(column_family, key, false /* read_only */); |
336 | |
|
337 | 0 | if (s.ok()) { |
338 | 0 | GetBatchForWrite()->SingleDelete(column_family, key); |
339 | 0 | num_deletes_++; |
340 | 0 | } |
341 | |
|
342 | 0 | return s; |
343 | 0 | } |
344 | | |
345 | | Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, |
346 | 3.08k | const Slice& key, const Slice& value) { |
347 | 3.08k | Status s = |
348 | 3.08k | TryLock(column_family, key, false /* read_only */, true /* untracked */); |
349 | | |
350 | 3.08k | if (s.ok()) { |
351 | 3.08k | GetBatchForWrite()->Put(column_family, key, value); |
352 | 3.08k | num_puts_++; |
353 | 3.08k | } |
354 | | |
355 | 3.08k | return s; |
356 | 3.08k | } |
357 | | |
358 | | Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, |
359 | | const SliceParts& key, |
360 | 0 | const SliceParts& value) { |
361 | 0 | Status s = |
362 | 0 | TryLock(column_family, key, false /* read_only */, true /* untracked */); |
363 | |
|
364 | 0 | if (s.ok()) { |
365 | 0 | GetBatchForWrite()->Put(column_family, key, value); |
366 | 0 | num_puts_++; |
367 | 0 | } |
368 | |
|
369 | 0 | return s; |
370 | 0 | } |
371 | | |
372 | | Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, |
373 | | const Slice& key, |
374 | 2 | const Slice& value) { |
375 | 2 | Status s = |
376 | 2 | TryLock(column_family, key, false /* read_only */, true /* untracked */); |
377 | | |
378 | 2 | if (s.ok()) { |
379 | 2 | GetBatchForWrite()->Merge(column_family, key, value); |
380 | 2 | num_merges_++; |
381 | 2 | } |
382 | | |
383 | 2 | return s; |
384 | 2 | } |
385 | | |
386 | | Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, |
387 | 12 | const Slice& key) { |
388 | 12 | Status s = |
389 | 12 | TryLock(column_family, key, false /* read_only */, true /* untracked */); |
390 | | |
391 | 12 | if (s.ok()) { |
392 | 3 | GetBatchForWrite()->Delete(column_family, key); |
393 | 3 | num_deletes_++; |
394 | 3 | } |
395 | | |
396 | 12 | return s; |
397 | 12 | } |
398 | | |
399 | | Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, |
400 | 0 | const SliceParts& key) { |
401 | 0 | Status s = |
402 | 0 | TryLock(column_family, key, false /* read_only */, true /* untracked */); |
403 | |
|
404 | 0 | if (s.ok()) { |
405 | 0 | GetBatchForWrite()->Delete(column_family, key); |
406 | 0 | num_deletes_++; |
407 | 0 | } |
408 | |
|
409 | 0 | return s; |
410 | 0 | } |
411 | | |
// Appends a log-data blob to the write batch. Per WriteBatch::PutLogData
// semantics, such blobs go to the WAL only and are not applied to the DB.
void TransactionBaseImpl::PutLogData(const Slice& blob) {
  write_batch_.PutLogData(blob);
}
415 | | |
// Exposes the transaction's indexed write batch (non-owning pointer; valid
// for the lifetime of this transaction object).
WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
  return &write_batch_;
}
419 | | |
// Elapsed wall-clock time since construction/Reinitialize, in milliseconds
// (start_time_ is recorded in microseconds, hence the /1000).
uint64_t TransactionBaseImpl::GetElapsedTime() const {
  return (db_->GetEnv()->NowMicros() - start_time_) / 1000;
}
423 | | |
// Number of Put operations buffered by this transaction so far.
uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
425 | | |
// Number of Delete/SingleDelete operations buffered by this transaction.
uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
427 | | |
// Number of Merge operations buffered by this transaction so far.
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
429 | | |
430 | 6 | uint64_t TransactionBaseImpl::GetNumKeys() const { |
431 | 6 | uint64_t count = 0; |
432 | | |
433 | | // sum up locked keys in all column families |
434 | 14 | for (const auto& key_map_iter : tracked_keys_) { |
435 | 14 | const auto& keys = key_map_iter.second; |
436 | 14 | count += keys.size(); |
437 | 14 | } |
438 | | |
439 | 6 | return count; |
440 | 6 | } |
441 | | |
// Records that `key` in column family `cfh_id` was read (read_only == true)
// or written at sequence number `seq`. Updates the transaction-wide key map
// and, when a savepoint is active, the top savepoint's own key map so the
// counts can be unwound by RollbackToSavePoint().
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
                                   SequenceNumber seq, bool read_only) {
  // Update map of all tracked keys for this transaction
  TrackKey(&tracked_keys_, cfh_id, key, seq, read_only);

  if (save_points_ != nullptr && !save_points_->empty()) {
    // Update map of tracked keys in this SavePoint
    TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only);
  }
}
452 | | |
453 | | // Add a key to the given TransactionKeyMap |
454 | | void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id, |
455 | | const std::string& key, SequenceNumber seq, |
456 | 3.48k | bool read_only) { |
457 | 3.48k | auto& cf_key_map = (*key_map)[cfh_id]; |
458 | 3.48k | auto iter = cf_key_map.find(key); |
459 | 3.48k | if (iter == cf_key_map.end()) { |
460 | 3.36k | auto result = cf_key_map.insert({key, TransactionKeyMapInfo(seq)}); |
461 | 3.36k | iter = result.first; |
462 | 122 | } else if (seq < iter->second.seq) { |
463 | | // Now tracking this key with an earlier sequence number |
464 | 0 | iter->second.seq = seq; |
465 | 0 | } |
466 | | |
467 | 3.48k | if (read_only) { |
468 | 126 | iter->second.num_reads++; |
469 | 3.35k | } else { |
470 | 3.35k | iter->second.num_writes++; |
471 | 3.35k | } |
472 | 3.48k | } |
473 | | |
474 | | std::unique_ptr<TransactionKeyMap> |
475 | 15 | TransactionBaseImpl::GetTrackedKeysSinceSavePoint() { |
476 | 15 | if (save_points_ != nullptr && !save_points_->empty()) { |
477 | | // Examine the number of reads/writes performed on all keys written |
478 | | // since the last SavePoint and compare to the total number of reads/writes |
479 | | // for each key. |
480 | 12 | TransactionKeyMap* result = new TransactionKeyMap(); |
481 | 9 | for (const auto& key_map_iter : save_points_->top().new_keys_) { |
482 | 9 | uint32_t column_family_id = key_map_iter.first; |
483 | 9 | auto& keys = key_map_iter.second; |
484 | | |
485 | 9 | auto& cf_tracked_keys = tracked_keys_[column_family_id]; |
486 | | |
487 | 16 | for (const auto& key_iter : keys) { |
488 | 16 | const std::string& key = key_iter.first; |
489 | 16 | uint32_t num_reads = key_iter.second.num_reads; |
490 | 16 | uint32_t num_writes = key_iter.second.num_writes; |
491 | | |
492 | 16 | auto total_key_info = cf_tracked_keys.find(key); |
493 | 16 | assert(total_key_info != cf_tracked_keys.end()); |
494 | 16 | assert(total_key_info->second.num_reads >= num_reads); |
495 | 16 | assert(total_key_info->second.num_writes >= num_writes); |
496 | | |
497 | 16 | if (total_key_info->second.num_reads == num_reads && |
498 | 16 | total_key_info->second.num_writes == num_writes) { |
499 | | // All the reads/writes to this key were done in the last savepoint. |
500 | 10 | bool read_only = (num_writes == 0); |
501 | 10 | TrackKey(result, column_family_id, key, key_iter.second.seq, |
502 | 10 | read_only); |
503 | 10 | } |
504 | 16 | } |
505 | 9 | } |
506 | 12 | return std::unique_ptr<TransactionKeyMap>(result); |
507 | 12 | } |
508 | | |
509 | | // No SavePoint |
510 | 3 | return nullptr; |
511 | 3 | } |
512 | | |
513 | | // Gets the write batch that should be used for Put/Merge/Deletes. |
514 | | // |
515 | | // Returns either a WriteBatch or WriteBatchWithIndex depending on whether |
516 | | // DisableIndexing() has been called. |
517 | 3.32k | WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { |
518 | 3.32k | if (indexing_enabled_) { |
519 | | // Use WriteBatchWithIndex |
520 | 241 | return &write_batch_; |
521 | 3.08k | } else { |
522 | | // Don't use WriteBatchWithIndex. Return base WriteBatch. |
523 | 3.08k | return write_batch_.GetWriteBatch(); |
524 | 3.08k | } |
525 | 3.32k | } |
526 | | |
// Deleter used by snapshot_: snapshots are owned by the DB and must be
// returned via DB::ReleaseSnapshot(), never deleted directly. A null
// `snapshot` (e.g. from SetSnapshotInternal(nullptr)) is a no-op.
void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
  if (snapshot != nullptr) {
    db->ReleaseSnapshot(snapshot);
  }
}
532 | | |
// Undoes the tracking side effects of a previous GetForUpdate() on `key`:
// decrements the key's read count — in the active savepoint first, when one
// exists — and unlocks the key once no reads or writes remain on it.
// The savepoint check gates the global decrement: if the read being undone
// is not recorded in the current savepoint, the global count is left alone.
void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
                                           const Slice& key) {
  uint32_t column_family_id = GetColumnFamilyID(column_family);
  auto& cf_tracked_keys = tracked_keys_[column_family_id];
  std::string key_str = key.ToString();
  // can_decrement: the read count may be decremented in the global map.
  // can_unlock: the key's last savepoint-local reference went away (used
  // only to cross-check the global erase below via assert).
  bool can_decrement = false;
  bool can_unlock __attribute__((unused)) = false;

  if (save_points_ != nullptr && !save_points_->empty()) {
    // Check if this key was fetched ForUpdate in this SavePoint
    auto& cf_savepoint_keys = save_points_->top().new_keys_[column_family_id];

    auto savepoint_iter = cf_savepoint_keys.find(key_str);
    if (savepoint_iter != cf_savepoint_keys.end()) {
      if (savepoint_iter->second.num_reads > 0) {
        savepoint_iter->second.num_reads--;
        can_decrement = true;

        if (savepoint_iter->second.num_reads == 0 &&
            savepoint_iter->second.num_writes == 0) {
          // No other GetForUpdates or write on this key in this SavePoint
          cf_savepoint_keys.erase(savepoint_iter);
          can_unlock = true;
        }
      }
    }
  } else {
    // No SavePoint set
    can_decrement = true;
    can_unlock = true;
  }

  // We can only decrement the read count for this key if we were able to
  // decrement the read count in the current SavePoint, OR if there is no
  // SavePoint set.
  if (can_decrement) {
    auto key_iter = cf_tracked_keys.find(key_str);

    if (key_iter != cf_tracked_keys.end()) {
      if (key_iter->second.num_reads > 0) {
        key_iter->second.num_reads--;

        if (key_iter->second.num_reads == 0 &&
            key_iter->second.num_writes == 0) {
          // No other GetForUpdates or writes on this key
          assert(can_unlock);
          cf_tracked_keys.erase(key_iter);
          UnlockGetForUpdate(column_family, key);
        }
      }
    }
  }
}
586 | | |
587 | | } // namespace rocksdb |
588 | | |
589 | | #endif // ROCKSDB_LITE |