/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_loader.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/transaction_loader.h" |
15 | | |
16 | | #include "yb/docdb/bounded_rocksdb_iterator.h" |
17 | | #include "yb/docdb/doc_key.h" |
18 | | #include "yb/docdb/docdb_rocksdb_util.h" |
19 | | #include "yb/docdb/intent.h" |
20 | | |
21 | | #include "yb/tablet/transaction_status_resolver.h" |
22 | | |
23 | | #include "yb/util/bitmap.h" |
24 | | #include "yb/util/flag_tags.h" |
25 | | #include "yb/util/logging.h" |
26 | | #include "yb/util/metrics.h" |
27 | | #include "yb/util/operation_counter.h" |
28 | | #include "yb/util/pb_util.h" |
29 | | #include "yb/util/scope_exit.h" |
30 | | #include "yb/util/thread.h" |
31 | | |
32 | | using namespace std::literals; |
33 | | |
34 | | DEFINE_test_flag(int32, inject_load_transaction_delay_ms, 0, |
35 | | "Inject delay before loading each transaction at startup."); |
36 | | |
37 | | DECLARE_bool(TEST_fail_on_replicated_batch_idx_set_in_txn_record); |
38 | | |
39 | | METRIC_DEFINE_simple_counter( |
40 | | tablet, transaction_load_attempts, |
41 | | "Total number of attempts to load transaction metadata from the intents RocksDB", |
42 | | yb::MetricUnit::kTransactions); |
43 | | |
44 | | namespace yb { |
45 | | namespace tablet { |
46 | | |
47 | | namespace { |
48 | | |
49 | 113k | docdb::BoundedRocksDbIterator CreateFullScanIterator(rocksdb::DB* db) { |
50 | 113k | return docdb::BoundedRocksDbIterator(docdb::CreateRocksDBIterator( |
51 | 113k | db, &docdb::KeyBounds::kNoBounds, |
52 | 113k | docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, |
53 | 113k | /* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId)); |
54 | 113k | } |
55 | | |
56 | | } // namespace |
57 | | |
58 | | class TransactionLoader::Executor { |
59 | | public: |
60 | | explicit Executor( |
61 | | TransactionLoader* loader, |
62 | | RWOperationCounter* pending_op_counter) |
63 | 56.9k | : loader_(*loader), scoped_pending_operation_(pending_op_counter) { |
64 | 56.9k | metric_transaction_load_attempts_ = METRIC_transaction_load_attempts.Instantiate( |
65 | 56.9k | loader_.entity_); |
66 | 56.9k | } |
67 | | |
68 | 56.9k | bool Start(const docdb::DocDB& db) { |
69 | 56.9k | if (!scoped_pending_operation_.ok()) { |
70 | 0 | return false; |
71 | 0 | } |
72 | 56.9k | regular_iterator_ = CreateFullScanIterator(db.regular); |
73 | 56.9k | intents_iterator_ = CreateFullScanIterator(db.intents); |
74 | 56.9k | auto& load_thread = loader_.load_thread_; |
75 | 56.9k | load_thread = std::thread(&Executor::Execute, this); |
76 | 56.9k | return true; |
77 | 56.9k | } |
78 | | |
79 | | private: |
80 | 56.9k | void Execute() { |
81 | 56.9k | CDSAttacher attacher; |
82 | | |
83 | 56.9k | SetThreadName("TransactionLoader"); |
84 | | |
85 | 56.9k | auto se = ScopeExit([this] { |
86 | 56.9k | auto pending_applies = std::move(pending_applies_); |
87 | 56.9k | TransactionLoaderContext& context = loader_.context_; |
88 | 56.9k | loader_.executor_.reset(); |
89 | | |
90 | 56.9k | context.LoadFinished(pending_applies); |
91 | 56.9k | }); |
92 | | |
93 | 56.9k | LOG_WITH_PREFIX(INFO) << "Load transactions start"; |
94 | | |
95 | 56.9k | LoadPendingApplies(); |
96 | 56.9k | LoadTransactions(); |
97 | 56.9k | } |
98 | | |
99 | 56.9k | void LoadTransactions() { |
100 | 56.9k | size_t loaded_transactions = 0; |
101 | 56.9k | TransactionId id = TransactionId::Nil(); |
102 | 56.9k | AppendTransactionKeyPrefix(id, ¤t_key_); |
103 | 56.9k | intents_iterator_.Seek(current_key_.AsSlice()); |
104 | 56.9k | while (intents_iterator_.Valid()) { |
105 | 4 | auto key = intents_iterator_.key(); |
106 | 4 | if (!key.TryConsumeByte(docdb::ValueTypeAsChar::kTransactionId)) { |
107 | 0 | break; |
108 | 0 | } |
109 | 4 | auto decode_id_result = DecodeTransactionId(&key); |
110 | 4 | if (!decode_id_result.ok()) { |
111 | 0 | LOG_WITH_PREFIX(DFATAL) |
112 | 0 | << "Failed to decode transaction id from: " << key.ToDebugHexString(); |
113 | 0 | intents_iterator_.Next(); |
114 | 0 | continue; |
115 | 0 | } |
116 | 4 | id = *decode_id_result; |
117 | 4 | current_key_.Clear(); |
118 | 4 | AppendTransactionKeyPrefix(id, ¤t_key_); |
119 | 4 | if (key.empty()) { // The key only contains a transaction id - it is a metadata record. |
120 | 4 | if (FLAGS_TEST_inject_load_transaction_delay_ms > 0) { |
121 | 0 | std::this_thread::sleep_for(FLAGS_TEST_inject_load_transaction_delay_ms * 1ms); |
122 | 0 | } |
123 | 4 | LoadTransaction(id); |
124 | 4 | ++loaded_transactions; |
125 | 4 | } |
126 | 4 | current_key_.AppendValueType(docdb::ValueType::kMaxByte); |
127 | 4 | intents_iterator_.Seek(current_key_.AsSlice()); |
128 | 4 | } |
129 | | |
130 | 56.9k | intents_iterator_.Reset(); |
131 | | |
132 | 56.9k | context().CompleteLoad([this] { |
133 | 56.9k | loader_.all_loaded_.store(true, std::memory_order_release); |
134 | 56.9k | }); |
135 | 56.9k | { |
136 | | // We need to lock and unlock the mutex here to avoid missing a notification in WaitLoaded |
137 | | // and WaitAllLoaded. The waiting loop in those functions is equivalent to the following, |
138 | | // after locking the mutex (and of course wait(...) releases the mutex while waiting): |
139 | | // |
140 | | // 1 while (!all_loaded_.load(std::memory_order_acquire)) { |
141 | | // 2 load_cond_.wait(lock); |
142 | | // 3 } |
143 | | // |
144 | | // If we did not have the lock/unlock here, it would be possible that all_loaded_ would be set |
145 | | // to true and notify_all() would be called between lines 1 and 2, and we would miss the |
146 | | // notification and wait indefinitely at line 2. With lock/unlock this is no longer |
147 | | // possible: we cannot acquire the mutex while the waiter is between lines 1 and 2, so by |
148 | | // the time notify_all() is called the waiter is either already blocked in wait(...) and |
149 | | // will receive the notification, or has not yet run line 1 and will see all_loaded_ == true. |
150 | 56.9k | std::lock_guard<std::mutex> lock(loader_.mutex_); |
151 | 56.9k | } |
152 | 56.9k | loader_.load_cond_.notify_all(); |
153 | 56.9k | LOG_WITH_PREFIX(INFO) << __func__ << " done: loaded " << loaded_transactions << " transactions"; |
154 | 56.9k | } |
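The comment above describes a classic missed-notification race with condition variables. As a standalone illustration (not part of this file), here is a minimal sketch of the same pattern; the class name NotifierSketch and its members are hypothetical and only mirror all_loaded_, mutex_ and load_cond_ from this loader.

#include <atomic>
#include <condition_variable>
#include <mutex>

class NotifierSketch {
 public:
  // Loader side: publish the flag, then lock/unlock the mutex before notifying. The empty
  // critical section guarantees we cannot notify while a waiter sits between its flag check
  // and its wait(...) call, because that waiter still holds the mutex at that point.
  void MarkAllLoaded() {
    all_loaded_.store(true, std::memory_order_release);
    { std::lock_guard<std::mutex> lock(mutex_); }  // barrier against the check/wait gap
    cond_.notify_all();
  }

  // Waiter side: the same loop shape as WaitAllLoaded in this file.
  void WaitAllLoaded() {
    if (all_loaded_.load(std::memory_order_acquire)) {
      return;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    while (!all_loaded_.load(std::memory_order_acquire)) {
      cond_.wait(lock);  // releases mutex_ while blocked, reacquires it on wakeup
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::atomic<bool> all_loaded_{false};
};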
155 | | |
156 | 56.9k | void LoadPendingApplies() { |
157 | 56.9k | std::array<char, 1 + sizeof(TransactionId) + 1> seek_buffer; |
158 | 56.9k | seek_buffer[0] = docdb::ValueTypeAsChar::kTransactionApplyState; |
159 | 56.9k | seek_buffer[seek_buffer.size() - 1] = docdb::ValueTypeAsChar::kMaxByte; |
160 | 56.9k | regular_iterator_.Seek(Slice(seek_buffer.data(), 1)); |
161 | | |
162 | 56.9k | while (regular_iterator_.Valid()) { |
163 | 197 | auto key = regular_iterator_.key(); |
164 | 197 | if (!key.TryConsumeByte(docdb::ValueTypeAsChar::kTransactionApplyState)) { |
165 | 166 | break; |
166 | 166 | } |
167 | 31 | auto txn_id = DecodeTransactionId(&key); |
168 | 31 | if (!txn_id.ok() || !key.TryConsumeByte(docdb::ValueTypeAsChar::kGroupEnd)) { |
169 | 0 | LOG_WITH_PREFIX(DFATAL) << "Wrong txn id: " << regular_iterator_.key().ToDebugString(); |
170 | 0 | regular_iterator_.Next(); |
171 | 0 | continue; |
172 | 0 | } |
173 | 31 | Slice value = regular_iterator_.value(); |
174 | 31 | if (value.TryConsumeByte(docdb::ValueTypeAsChar::kString)) { |
175 | 0 | auto pb = pb_util::ParseFromSlice<docdb::ApplyTransactionStatePB>(value); |
176 | 0 | if (!pb.ok()) { |
177 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to decode apply state pb from RocksDB" |
178 | 0 | << key.ToDebugString() << ": " << pb.status(); |
179 | 0 | regular_iterator_.Next(); |
180 | 0 | continue; |
181 | 0 | } |
182 | | |
183 | 0 | auto state = docdb::ApplyTransactionState::FromPB(*pb); |
184 | 0 | if (!state.ok()) { |
185 | 0 | LOG_WITH_PREFIX(DFATAL) << "Failed to decode apply state from stored pb " |
186 | 0 | << state.status(); |
187 | 0 | regular_iterator_.Next(); |
188 | 0 | continue; |
189 | 0 | } |
190 | | |
191 | 0 | auto it = pending_applies_.emplace(*txn_id, ApplyStateWithCommitHt { |
192 | 0 | .state = state.get(), |
193 | 0 | .commit_ht = HybridTime(pb->commit_ht()) |
194 | 0 | }).first; |
195 | | |
196 | 0 | VLOG_WITH_PREFIX(4) << "Loaded pending apply for " << *txn_id << ": " |
197 | 0 | << it->second.ToString(); |
198 | 31 | } else if (value.TryConsumeByte(docdb::ValueTypeAsChar::kTombstone)) { |
199 | 18.4E | VLOG_WITH_PREFIX(4) << "Found deleted large apply for " << *txn_id; |
200 | 29 | } else { |
201 | 2 | LOG_WITH_PREFIX(DFATAL) |
202 | 2 | << "Unexpected value type in apply state: " << value.ToDebugString(); |
203 | 2 | } |
204 | | |
205 | 31 | memcpy(seek_buffer.data() + 1, txn_id->data(), txn_id->size()); |
206 | 31 | ROCKSDB_SEEK(®ular_iterator_, Slice(seek_buffer)); |
207 | 31 | } |
208 | 56.9k | } |
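LoadPendingApplies relies on a prefix-seek pattern: seek to the single-byte kTransactionApplyState prefix, and after processing one transaction's record rebuild the seek key as prefix + transaction id + kMaxByte to jump past any remaining keys of that transaction. As a rough, self-contained sketch of that technique (not part of this file), using an ordered std::map in place of the RocksDB iterator and hypothetical byte values:

#include <map>
#include <string>

constexpr char kApplyStatePrefixByte = '\x01';  // stand-in for ValueTypeAsChar::kTransactionApplyState
constexpr char kMaxByteMarker = '\xff';         // stand-in for ValueTypeAsChar::kMaxByte

// Assumes, like the docdb encoding, that kMaxByteMarker never occurs inside stored keys.
void ScanApplyStates(const std::map<std::string, std::string>& store, size_t txn_id_size) {
  std::string seek_key(1, kApplyStatePrefixByte);
  for (auto it = store.lower_bound(seek_key); it != store.end(); ) {
    const std::string& key = it->first;
    if (key.empty() || key[0] != kApplyStatePrefixByte) {
      break;  // left the apply-state section of the key space
    }
    const std::string txn_id = key.substr(1, txn_id_size);
    // ... decode the value and remember the pending apply state for txn_id here ...

    // Jump past every remaining key that belongs to this transaction id.
    seek_key = std::string(1, kApplyStatePrefixByte) + txn_id + std::string(1, kMaxByteMarker);
    it = store.lower_bound(seek_key);
  }
}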
209 | | |
210 | | // id - transaction id to load. |
211 | 4 | void LoadTransaction(const TransactionId& id) { |
212 | 4 | metric_transaction_load_attempts_->Increment(); |
213 | 4 | VLOG_WITH_PREFIX(1) << "Loading transaction: " << id; |
214 | | |
215 | 4 | TransactionMetadataPB metadata_pb; |
216 | | |
217 | 4 | const Slice& value = intents_iterator_.value(); |
218 | 4 | if (!metadata_pb.ParseFromArray(value.cdata(), narrow_cast<int>(value.size()))) { |
219 | 0 | LOG_WITH_PREFIX(DFATAL) << "Unable to parse stored metadata: " |
220 | 0 | << value.ToDebugHexString(); |
221 | 0 | return; |
222 | 0 | } |
223 | | |
224 | 4 | auto metadata = TransactionMetadata::FromPB(metadata_pb); |
225 | 4 | if (!metadata.ok()) { |
226 | 0 | LOG_WITH_PREFIX(DFATAL) << "Loaded bad metadata: " << metadata.status(); |
227 | 0 | return; |
228 | 0 | } |
229 | | |
230 | 4 | if (!metadata->start_time.is_valid()) { |
231 | 0 | metadata->start_time = HybridTime::kMin; |
232 | 0 | LOG_WITH_PREFIX(INFO) << "Patched start time " << metadata->transaction_id << ": " |
233 | 0 | << metadata->start_time; |
234 | 0 | } |
235 | | |
236 | 4 | TransactionalBatchData last_batch_data; |
237 | 4 | OneWayBitmap replicated_batches; |
238 | 4 | FetchLastBatchData(id, &last_batch_data, &replicated_batches); |
239 | | |
240 | 4 | if (!status_resolver_) { |
241 | 4 | status_resolver_ = &context().AddStatusResolver(); |
242 | 4 | } |
243 | 4 | status_resolver_->Add(metadata->status_tablet, id); |
244 | | |
245 | 4 | auto pending_apply_it = pending_applies_.find(id); |
246 | 4 | context().LoadTransaction( |
247 | 4 | std::move(*metadata), std::move(last_batch_data), std::move(replicated_batches), |
248 | 4 | pending_apply_it != pending_applies_.end() ? &pending_apply_it->second : nullptr); |
249 | 4 | { |
250 | 4 | std::lock_guard<std::mutex> lock(loader_.mutex_); |
251 | 4 | loader_.last_loaded_ = id; |
252 | 4 | } |
253 | 4 | loader_.load_cond_.notify_all(); |
254 | 4 | } |
255 | | |
256 | | void FetchLastBatchData( |
257 | | const TransactionId& id, |
258 | | TransactionalBatchData* last_batch_data, |
259 | 4 | OneWayBitmap* replicated_batches) { |
260 | 4 | current_key_.AppendValueType(docdb::ValueType::kMaxByte); |
261 | 4 | intents_iterator_.Seek(current_key_.AsSlice()); |
262 | 4 | if (intents_iterator_.Valid()) { |
263 | 0 | intents_iterator_.Prev(); |
264 | 4 | } else { |
265 | 4 | intents_iterator_.SeekToLast(); |
266 | 4 | } |
267 | 4 | current_key_.RemoveLastByte(); |
268 | 2.00k | while (intents_iterator_.Valid() && intents_iterator_.key().starts_with(current_key_)) { |
269 | 2.00k | auto decoded_key = docdb::DecodeIntentKey(intents_iterator_.value()); |
270 | 2.00k | LOG_IF_WITH_PREFIX(DFATAL, !decoded_key.ok()) |
271 | 1 | << "Failed to decode intent while loading transaction " << id << ", " |
272 | 1 | << intents_iterator_.key().ToDebugHexString() << " => " |
273 | 1 | << intents_iterator_.value().ToDebugHexString() << ": " << decoded_key.status(); |
274 | 2.00k | if (decoded_key.ok() && docdb::HasStrong(decoded_key->intent_types)) { |
275 | 4 | last_batch_data->hybrid_time = decoded_key->doc_ht.hybrid_time(); |
276 | 4 | Slice rev_key_slice(intents_iterator_.value()); |
277 | | // Required by the transaction sealing protocol. |
278 | 4 | if (!rev_key_slice.empty() && rev_key_slice[0] == docdb::ValueTypeAsChar::kBitSet) { |
279 | 0 | CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record); |
280 | 0 | rev_key_slice.remove_prefix(1); |
281 | 0 | auto result = OneWayBitmap::Decode(&rev_key_slice); |
282 | 0 | if (result.ok()) { |
283 | 0 | *replicated_batches = std::move(*result); |
284 | 0 | VLOG_WITH_PREFIX(1) << "Decoded replicated batches for " << id << ": " |
285 | 0 | << replicated_batches->ToString(); |
286 | 0 | } else { |
287 | 0 | LOG_WITH_PREFIX(DFATAL) |
288 | 0 | << "Failed to decode replicated batches from " |
289 | 0 | << intents_iterator_.value().ToDebugHexString() << ": " << result.status(); |
290 | 0 | } |
291 | 0 | } |
292 | 4 | std::string rev_key = rev_key_slice.ToBuffer(); |
293 | 4 | intents_iterator_.Seek(rev_key); |
294 | | // A delete could run in parallel with this load, and since our deletes break the snapshot |
295 | | // read we could get into a situation where the metadata and reverse records were read |
296 | | // successfully, but the intent record could not be found. |
297 | 4 | if (intents_iterator_.Valid() && intents_iterator_.key().starts_with(rev_key)) { |
298 | 4 | VLOG_WITH_PREFIX(1) |
299 | 0 | << "Found latest record for " << id |
300 | 0 | << ": " << docdb::SubDocKey::DebugSliceToString(intents_iterator_.key()) |
301 | 0 | << " => " << intents_iterator_.value().ToDebugHexString(); |
302 | 4 | auto txn_id_slice = id.AsSlice(); |
303 | 4 | auto decoded_value_or_status = docdb::DecodeIntentValue( |
304 | 4 | intents_iterator_.value(), &txn_id_slice); |
305 | 4 | LOG_IF_WITH_PREFIX(DFATAL, !decoded_value_or_status.ok()) |
306 | 0 | << "Failed to decode intent value: " << decoded_value_or_status.status() << ", " |
307 | 0 | << docdb::SubDocKey::DebugSliceToString(intents_iterator_.key()) << " => " |
308 | 0 | << intents_iterator_.value().ToDebugHexString(); |
309 | 4 | if (decoded_value_or_status.ok()) { |
310 | 4 | last_batch_data->next_write_id = decoded_value_or_status->write_id; |
311 | 4 | } |
312 | 4 | ++last_batch_data->next_write_id; |
313 | 4 | } |
314 | 4 | break; |
315 | 4 | } |
316 | 2.00k | intents_iterator_.Prev(); |
317 | 2.00k | } |
318 | 4 | } |
319 | | |
320 | 170k | TransactionLoaderContext& context() const { |
321 | 170k | return loader_.context_; |
322 | 170k | } |
323 | | |
324 | 113k | const std::string& LogPrefix() const { |
325 | 113k | return context().LogPrefix(); |
326 | 113k | } |
327 | | |
328 | | TransactionLoader& loader_; |
329 | | ScopedRWOperation scoped_pending_operation_; |
330 | | |
331 | | docdb::BoundedRocksDbIterator regular_iterator_; |
332 | | docdb::BoundedRocksDbIterator intents_iterator_; |
333 | | |
334 | | // Buffer that contains the key of the current record, i.e. value type + transaction id. |
335 | | docdb::KeyBytes current_key_; |
336 | | |
337 | | TransactionStatusResolver* status_resolver_ = nullptr; |
338 | | |
339 | | ApplyStatesMap pending_applies_; |
340 | | |
341 | | scoped_refptr<Counter> metric_transaction_load_attempts_; |
342 | | }; |
343 | | |
344 | | TransactionLoader::TransactionLoader( |
345 | | TransactionLoaderContext* context, const scoped_refptr<MetricEntity>& entity) |
346 | 56.9k | : context_(*context), entity_(entity) {} |
347 | | |
348 | 44.6k | TransactionLoader::~TransactionLoader() { |
349 | 44.6k | } |
350 | | |
351 | 56.9k | void TransactionLoader::Start(RWOperationCounter* pending_op_counter, const docdb::DocDB& db) { |
352 | 56.9k | executor_ = std::make_unique<Executor>(this, pending_op_counter); |
353 | 56.9k | if (!executor_->Start(db)) { |
354 | 0 | executor_ = nullptr; |
355 | 0 | } |
356 | 56.9k | } |
357 | | |
358 | | namespace { |
359 | | |
360 | | // Waiting threads will only wake up on a timeout if there is still an uncaught race condition that |
361 | | // causes us to miss a notification on the condition variable. |
362 | | constexpr auto kWaitLoadedWakeUpInterval = 10s; |
363 | | |
364 | | } // namespace |
365 | | |
366 | 10.8M | void TransactionLoader::WaitLoaded(const TransactionId& id) NO_THREAD_SAFETY_ANALYSIS { |
367 | 10.8M | if (all_loaded_.load(std::memory_order_acquire)) { |
368 | 10.8M | return; |
369 | 10.8M | } |
370 | 18.4E | std::unique_lock<std::mutex> lock(mutex_); |
371 | | // Defensively wake up every kWaitLoadedWakeUpInterval to avoid a deadlock due to any issue similar to #8696. |
372 | 18.4E | while (!all_loaded_.load(std::memory_order_acquire)) { |
373 | 0 | if (last_loaded_ >= id) { |
374 | 0 | break; |
375 | 0 | } |
376 | 0 | load_cond_.wait_for(lock, kWaitLoadedWakeUpInterval); |
377 | 0 | } |
378 | 18.4E | } |
379 | | |
380 | | // Disable thread safety analysis because std::unique_lock is used. |
381 | 88.4k | void TransactionLoader::WaitAllLoaded() NO_THREAD_SAFETY_ANALYSIS { |
382 | 88.4k | if (all_loaded_.load(std::memory_order_acquire)) { |
383 | 88.4k | return; |
384 | 88.4k | } |
385 | | // Defensively wake up every kWaitLoadedWakeUpInterval to avoid a deadlock due to any issue similar to #8696. |
386 | 18.4E | std::unique_lock<std::mutex> lock(mutex_); |
387 | 18.4E | while (!all_loaded_.load(std::memory_order_acquire)) { |
388 | 0 | load_cond_.wait_for(lock, kWaitLoadedWakeUpInterval); |
389 | 0 | } |
390 | 18.4E | } |
391 | | |
392 | 44.7k | void TransactionLoader::Shutdown() { |
393 | 44.7k | if (load_thread_.joinable()) { |
394 | 44.6k | load_thread_.join(); |
395 | 44.6k | } |
396 | 44.7k | } |
397 | | |
398 | | } // namespace tablet |
399 | | } // namespace yb |