YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_loader.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/transaction_loader.h"
15
16
#include "yb/docdb/bounded_rocksdb_iterator.h"
17
#include "yb/docdb/doc_key.h"
18
#include "yb/docdb/docdb_rocksdb_util.h"
19
#include "yb/docdb/intent.h"
20
21
#include "yb/tablet/transaction_status_resolver.h"
22
23
#include "yb/util/bitmap.h"
24
#include "yb/util/flag_tags.h"
25
#include "yb/util/logging.h"
26
#include "yb/util/metrics.h"
27
#include "yb/util/operation_counter.h"
28
#include "yb/util/pb_util.h"
29
#include "yb/util/scope_exit.h"
30
#include "yb/util/thread.h"
31
32
using namespace std::literals;
33
34
DEFINE_test_flag(int32, inject_load_transaction_delay_ms, 0,
35
                 "Inject delay before loading each transaction at startup.");
36
37
DECLARE_bool(TEST_fail_on_replicated_batch_idx_set_in_txn_record);
38
39
METRIC_DEFINE_simple_counter(
40
    tablet, transaction_load_attempts,
41
    "Total number of attempts to load transaction metadata from the intents RocksDB",
42
    yb::MetricUnit::kTransactions);
43
44
namespace yb {
45
namespace tablet {
46
47
namespace {
48
49
113k
docdb::BoundedRocksDbIterator CreateFullScanIterator(rocksdb::DB* db) {
50
113k
  return docdb::BoundedRocksDbIterator(docdb::CreateRocksDBIterator(
51
113k
      db, &docdb::KeyBounds::kNoBounds,
52
113k
      docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
53
113k
      /* user_key_for_filter= */ boost::none, rocksdb::kDefaultQueryId));
54
113k
}
55
56
} // namespace
57
58
class TransactionLoader::Executor {
59
 public:
60
  explicit Executor(
61
      TransactionLoader* loader,
62
      RWOperationCounter* pending_op_counter)
63
56.9k
      : loader_(*loader), scoped_pending_operation_(pending_op_counter) {
64
56.9k
    metric_transaction_load_attempts_ = METRIC_transaction_load_attempts.Instantiate(
65
56.9k
        loader_.entity_);
66
56.9k
  }
67
68
56.9k
  bool Start(const docdb::DocDB& db) {
69
56.9k
    if (!scoped_pending_operation_.ok()) {
70
0
      return false;
71
0
    }
72
56.9k
    regular_iterator_ = CreateFullScanIterator(db.regular);
73
56.9k
    intents_iterator_ = CreateFullScanIterator(db.intents);
74
56.9k
    auto& load_thread = loader_.load_thread_;
75
56.9k
    load_thread = std::thread(&Executor::Execute, this);
76
56.9k
    return true;
77
56.9k
  }
78
79
 private:
80
56.9k
  void Execute() {
81
56.9k
    CDSAttacher attacher;
82
83
56.9k
    SetThreadName("TransactionLoader");
84
85
56.9k
    auto se = ScopeExit([this] {
86
56.9k
      auto pending_applies = std::move(pending_applies_);
87
56.9k
      TransactionLoaderContext& context = loader_.context_;
88
56.9k
      loader_.executor_.reset();
89
90
56.9k
      context.LoadFinished(pending_applies);
91
56.9k
    });
92
93
56.9k
    LOG_WITH_PREFIX(INFO) << "Load transactions start";
94
95
56.9k
    LoadPendingApplies();
96
56.9k
    LoadTransactions();
97
56.9k
  }
98
99
56.9k
  void LoadTransactions() {
100
56.9k
    size_t loaded_transactions = 0;
101
56.9k
    TransactionId id = TransactionId::Nil();
102
56.9k
    AppendTransactionKeyPrefix(id, &current_key_);
103
56.9k
    intents_iterator_.Seek(current_key_.AsSlice());
104
56.9k
    while (intents_iterator_.Valid()) {
105
4
      auto key = intents_iterator_.key();
106
4
      if (!key.TryConsumeByte(docdb::ValueTypeAsChar::kTransactionId)) {
107
0
        break;
108
0
      }
109
4
      auto decode_id_result = DecodeTransactionId(&key);
110
4
      if (!decode_id_result.ok()) {
111
0
        LOG_WITH_PREFIX(DFATAL)
112
0
            << "Failed to decode transaction id from: " << key.ToDebugHexString();
113
0
        intents_iterator_.Next();
114
0
        continue;
115
0
      }
116
4
      id = *decode_id_result;
117
4
      current_key_.Clear();
118
4
      AppendTransactionKeyPrefix(id, &current_key_);
119
4
      if (key.empty()) { // The key only contains a transaction id - it is metadata record.
120
4
        if (FLAGS_TEST_inject_load_transaction_delay_ms > 0) {
121
0
          std::this_thread::sleep_for(FLAGS_TEST_inject_load_transaction_delay_ms * 1ms);
122
0
        }
123
4
        LoadTransaction(id);
124
4
        ++loaded_transactions;
125
4
      }
126
4
      current_key_.AppendValueType(docdb::ValueType::kMaxByte);
127
4
      intents_iterator_.Seek(current_key_.AsSlice());
128
4
    }
129
130
56.9k
    intents_iterator_.Reset();
131
132
56.9k
    context().CompleteLoad([this] {
133
56.9k
      loader_.all_loaded_.store(true, std::memory_order_release);
134
56.9k
    });
135
56.9k
    {
136
      // We need to lock and unlock the mutex here to avoid missing a notification in WaitLoaded
137
      // and WaitAllLoaded. The waiting loop in those functions is equivalent to the following,
138
      // after locking the mutex (and of course wait(...) releases the mutex while waiting):
139
      //
140
      // 1 while (!all_loaded_.load(std::memory_order_acquire)) {
141
      // 2   load_cond_.wait(lock);
142
      // 3 }
143
      //
144
      // If we did not have the lock/unlock here, it would be possible that all_loaded_ would be set
145
      // to true and notify_all() would be called between lines 1 and 2, and we would miss the
146
      // notification and wait indefinitely at line 2. With lock/unlock this is no longer possible
147
      // because if we set all_loaded_ to true between lines 1 and 2, the only time we would be able
148
      // to send a notification is at line 2, as wait(...) releases the mutex, and then the waiter
149
      // would wake up, re-check all_loaded_ and exit the loop at line 1.
150
56.9k
      std::lock_guard<std::mutex> lock(loader_.mutex_);
151
56.9k
    }
152
56.9k
    loader_.load_cond_.notify_all();
153
56.9k
    LOG_WITH_PREFIX(INFO) << __func__ << " done: loaded " << loaded_transactions << " transactions";
154
56.9k
  }
155
156
56.9k
  void LoadPendingApplies() {
157
56.9k
    std::array<char, 1 + sizeof(TransactionId) + 1> seek_buffer;
158
56.9k
    seek_buffer[0] = docdb::ValueTypeAsChar::kTransactionApplyState;
159
56.9k
    seek_buffer[seek_buffer.size() - 1] = docdb::ValueTypeAsChar::kMaxByte;
160
56.9k
    regular_iterator_.Seek(Slice(seek_buffer.data(), 1));
161
162
56.9k
    while (regular_iterator_.Valid()) {
163
197
      auto key = regular_iterator_.key();
164
197
      if (!key.TryConsumeByte(docdb::ValueTypeAsChar::kTransactionApplyState)) {
165
166
        break;
166
166
      }
167
31
      auto txn_id = DecodeTransactionId(&key);
168
31
      if (!txn_id.ok() || 
!key.TryConsumeByte(docdb::ValueTypeAsChar::kGroupEnd)27
) {
169
0
        LOG_WITH_PREFIX(DFATAL) << "Wrong txn id: " << regular_iterator_.key().ToDebugString();
170
0
        regular_iterator_.Next();
171
0
        continue;
172
0
      }
173
31
      Slice value = regular_iterator_.value();
174
31
      if (value.TryConsumeByte(docdb::ValueTypeAsChar::kString)) {
175
0
        auto pb = pb_util::ParseFromSlice<docdb::ApplyTransactionStatePB>(value);
176
0
        if (!pb.ok()) {
177
0
          LOG_WITH_PREFIX(DFATAL) << "Failed to decode apply state pb from RocksDB"
178
0
                                  << key.ToDebugString() << ": " << pb.status();
179
0
          regular_iterator_.Next();
180
0
          continue;
181
0
        }
182
183
0
        auto state = docdb::ApplyTransactionState::FromPB(*pb);
184
0
        if (!state.ok()) {
185
0
          LOG_WITH_PREFIX(DFATAL) << "Failed to decode apply state from stored pb "
186
0
              << state.status();
187
0
          regular_iterator_.Next();
188
0
          continue;
189
0
        }
190
191
0
        auto it = pending_applies_.emplace(*txn_id, ApplyStateWithCommitHt {
192
0
          .state = state.get(),
193
0
          .commit_ht = HybridTime(pb->commit_ht())
194
0
        }).first;
195
196
0
        VLOG_WITH_PREFIX(4) << "Loaded pending apply for " << *txn_id << ": "
197
0
                            << it->second.ToString();
198
31
      } else if (value.TryConsumeByte(docdb::ValueTypeAsChar::kTombstone)) {
199
18.4E
        VLOG_WITH_PREFIX(4) << "Found deleted large apply for " << *txn_id;
200
29
      } else {
201
2
        LOG_WITH_PREFIX(DFATAL)
202
2
            << "Unexpected value type in apply state: " << value.ToDebugString();
203
2
      }
204
205
31
      memcpy(seek_buffer.data() + 1, txn_id->data(), txn_id->size());
206
31
      ROCKSDB_SEEK(&regular_iterator_, Slice(seek_buffer));
207
31
    }
208
56.9k
  }
209
210
  // id - transaction id to load.
211
4
  void LoadTransaction(const TransactionId& id) {
212
4
    metric_transaction_load_attempts_->Increment();
213
4
    
VLOG_WITH_PREFIX0
(1) << "Loading transaction: " << id0
;
214
215
4
    TransactionMetadataPB metadata_pb;
216
217
4
    const Slice& value = intents_iterator_.value();
218
4
    if (!metadata_pb.ParseFromArray(value.cdata(), narrow_cast<int>(value.size()))) {
219
0
      LOG_WITH_PREFIX(DFATAL) << "Unable to parse stored metadata: "
220
0
                              << value.ToDebugHexString();
221
0
      return;
222
0
    }
223
224
4
    auto metadata = TransactionMetadata::FromPB(metadata_pb);
225
4
    if (!metadata.ok()) {
226
0
      LOG_WITH_PREFIX(DFATAL) << "Loaded bad metadata: " << metadata.status();
227
0
      return;
228
0
    }
229
230
4
    if (!metadata->start_time.is_valid()) {
231
0
      metadata->start_time = HybridTime::kMin;
232
0
      LOG_WITH_PREFIX(INFO) << "Patched start time " << metadata->transaction_id << ": "
233
0
                            << metadata->start_time;
234
0
    }
235
236
4
    TransactionalBatchData last_batch_data;
237
4
    OneWayBitmap replicated_batches;
238
4
    FetchLastBatchData(id, &last_batch_data, &replicated_batches);
239
240
4
    if (!status_resolver_) {
241
4
      status_resolver_ = &context().AddStatusResolver();
242
4
    }
243
4
    status_resolver_->Add(metadata->status_tablet, id);
244
245
4
    auto pending_apply_it = pending_applies_.find(id);
246
4
    context().LoadTransaction(
247
4
        std::move(*metadata), std::move(last_batch_data), std::move(replicated_batches),
248
4
        pending_apply_it != pending_applies_.end() ? 
&pending_apply_it->second0
: nullptr);
249
4
    {
250
4
      std::lock_guard<std::mutex> lock(loader_.mutex_);
251
4
      loader_.last_loaded_ = id;
252
4
    }
253
4
    loader_.load_cond_.notify_all();
254
4
  }
255
256
  void FetchLastBatchData(
257
      const TransactionId& id,
258
      TransactionalBatchData* last_batch_data,
259
4
      OneWayBitmap* replicated_batches) {
260
4
    current_key_.AppendValueType(docdb::ValueType::kMaxByte);
261
4
    intents_iterator_.Seek(current_key_.AsSlice());
262
4
    if (intents_iterator_.Valid()) {
263
0
      intents_iterator_.Prev();
264
4
    } else {
265
4
      intents_iterator_.SeekToLast();
266
4
    }
267
4
    current_key_.RemoveLastByte();
268
2.00k
    while (intents_iterator_.Valid() && 
intents_iterator_.key().starts_with(current_key_)2.00k
) {
269
2.00k
      auto decoded_key = docdb::DecodeIntentKey(intents_iterator_.value());
270
2.00k
      
LOG_IF_WITH_PREFIX1
(DFATAL, !decoded_key.ok())
271
1
          << "Failed to decode intent while loading transaction " << id << ", "
272
1
          << intents_iterator_.key().ToDebugHexString() << " => "
273
1
          << intents_iterator_.value().ToDebugHexString() << ": " << decoded_key.status();
274
2.00k
      if (decoded_key.ok() && 
docdb::HasStrong(decoded_key->intent_types)2.00k
) {
275
4
        last_batch_data->hybrid_time = decoded_key->doc_ht.hybrid_time();
276
4
        Slice rev_key_slice(intents_iterator_.value());
277
        // Required by the transaction sealing protocol.
278
4
        if (!rev_key_slice.empty() && rev_key_slice[0] == docdb::ValueTypeAsChar::kBitSet) {
279
0
          CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record);
280
0
          rev_key_slice.remove_prefix(1);
281
0
          auto result = OneWayBitmap::Decode(&rev_key_slice);
282
0
          if (result.ok()) {
283
0
            *replicated_batches = std::move(*result);
284
0
            VLOG_WITH_PREFIX(1) << "Decoded replicated batches for " << id << ": "
285
0
                                << replicated_batches->ToString();
286
0
          } else {
287
0
            LOG_WITH_PREFIX(DFATAL)
288
0
                << "Failed to decode replicated batches from "
289
0
                << intents_iterator_.value().ToDebugHexString() << ": " << result.status();
290
0
          }
291
0
        }
292
4
        std::string rev_key = rev_key_slice.ToBuffer();
293
4
        intents_iterator_.Seek(rev_key);
294
        // Delete could run in parallel to this load, and since our deletes break snapshot read
295
        // we could get into a situation when metadata and reverse record were successfully read,
296
        // but intent record could not be found.
297
4
        if (intents_iterator_.Valid() && intents_iterator_.key().starts_with(rev_key)) {
298
4
          
VLOG_WITH_PREFIX0
(1)
299
0
              << "Found latest record for " << id
300
0
              << ": " << docdb::SubDocKey::DebugSliceToString(intents_iterator_.key())
301
0
              << " => " << intents_iterator_.value().ToDebugHexString();
302
4
          auto txn_id_slice = id.AsSlice();
303
4
          auto decoded_value_or_status = docdb::DecodeIntentValue(
304
4
              intents_iterator_.value(), &txn_id_slice);
305
4
          
LOG_IF_WITH_PREFIX0
(DFATAL, !decoded_value_or_status.ok())
306
0
              << "Failed to decode intent value: " << decoded_value_or_status.status() << ", "
307
0
              << docdb::SubDocKey::DebugSliceToString(intents_iterator_.key()) << " => "
308
0
              << intents_iterator_.value().ToDebugHexString();
309
4
          if (decoded_value_or_status.ok()) {
310
4
            last_batch_data->next_write_id = decoded_value_or_status->write_id;
311
4
          }
312
4
          ++last_batch_data->next_write_id;
313
4
        }
314
4
        break;
315
4
      }
316
2.00k
      intents_iterator_.Prev();
317
2.00k
    }
318
4
  }
319
320
170k
  TransactionLoaderContext& context() const {
321
170k
    return loader_.context_;
322
170k
  }
323
324
113k
  const std::string& LogPrefix() const {
325
113k
    return context().LogPrefix();
326
113k
  }
327
328
  TransactionLoader& loader_;
329
  ScopedRWOperation scoped_pending_operation_;
330
331
  docdb::BoundedRocksDbIterator regular_iterator_;
332
  docdb::BoundedRocksDbIterator intents_iterator_;
333
334
  // Buffer that contains key of current record, i.e. value type + transaction id.
335
  docdb::KeyBytes current_key_;
336
337
  TransactionStatusResolver* status_resolver_ = nullptr;
338
339
  ApplyStatesMap pending_applies_;
340
341
  scoped_refptr<Counter> metric_transaction_load_attempts_;
342
};
343
344
TransactionLoader::TransactionLoader(
345
    TransactionLoaderContext* context, const scoped_refptr<MetricEntity>& entity)
346
56.9k
    : context_(*context), entity_(entity) {}
347
348
44.6k
TransactionLoader::~TransactionLoader() {
349
44.6k
}
350
351
56.9k
void TransactionLoader::Start(RWOperationCounter* pending_op_counter, const docdb::DocDB& db) {
352
56.9k
  executor_ = std::make_unique<Executor>(this, pending_op_counter);
353
56.9k
  if (!executor_->Start(db)) {
354
0
    executor_ = nullptr;
355
0
  }
356
56.9k
}
357
358
namespace {
359
360
// Waiting threads will only wake up on a timeout if there is still an uncaught race condition that
361
// causes us to miss a notification on the condition variable.
362
constexpr auto kWaitLoadedWakeUpInterval = 10s;
363
364
}  // namespace
365
366
10.8M
void TransactionLoader::WaitLoaded(const TransactionId& id) NO_THREAD_SAFETY_ANALYSIS {
367
10.8M
  if (
all_loaded_.load(std::memory_order_acquire)10.8M
) {
368
10.8M
    return;
369
10.8M
  }
370
18.4E
  std::unique_lock<std::mutex> lock(mutex_);
371
  // Defensively wake up every kWaitLoadedWakeUpInterval to avoid deadlocking on issues like #8696.
372
18.4E
  while (!all_loaded_.load(std::memory_order_acquire)) {
373
0
    if (last_loaded_ >= id) {
374
0
      break;
375
0
    }
376
0
    load_cond_.wait_for(lock, kWaitLoadedWakeUpInterval);
377
0
  }
378
18.4E
}
379
380
// Disable thread safety analysis because std::unique_lock is used.
381
88.4k
void TransactionLoader::WaitAllLoaded() NO_THREAD_SAFETY_ANALYSIS {
382
88.4k
  if (
all_loaded_.load(std::memory_order_acquire)88.4k
) {
383
88.4k
    return;
384
88.4k
  }
385
  // Defensively wake up every kWaitLoadedWakeUpInterval to avoid deadlocking on issues like #8696.
386
18.4E
  std::unique_lock<std::mutex> lock(mutex_);
387
18.4E
  while (!all_loaded_.load(std::memory_order_acquire)) {
388
0
    load_cond_.wait_for(lock, kWaitLoadedWakeUpInterval);
389
0
  }
390
18.4E
}
391
392
44.7k
void TransactionLoader::Shutdown() {
393
44.7k
  if (load_thread_.joinable()) {
394
44.6k
    load_thread_.join();
395
44.6k
  }
396
44.7k
}
397
398
} // namespace tablet
399
} // namespace yb