YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb_util.h"
15
16
#include "yb/docdb/consensus_frontier.h"
17
#include "yb/docdb/doc_key.h"
18
#include "yb/docdb/docdb.h"
19
#include "yb/docdb/docdb_debug.h"
20
#include "yb/docdb/docdb_rocksdb_util.h"
21
#include "yb/docdb/rocksdb_writer.h"
22
23
#include "yb/rocksutil/write_batch_formatter.h"
24
#include "yb/rocksutil/yb_rocksdb.h"
25
26
#include "yb/tablet/tablet_options.h"
27
28
#include "yb/util/env.h"
29
#include "yb/util/status_format.h"
30
#include "yb/util/string_trim.h"
31
#include "yb/docdb/docdb_pgapi.h"
32
33
using std::string;
34
using std::make_shared;
35
using std::endl;
36
using strings::Substitute;
37
using yb::FormatBytesAsStr;
38
using yb::util::ApplyEagerLineContinuation;
39
using std::vector;
40
41
namespace yb {
42
namespace docdb {
43
44
void SetValueFromQLBinaryWrapper(
45
39.1k
    QLValuePB ql_value, const int pg_data_type, DatumMessagePB* cdc_datum_message) {
46
39.1k
  WARN_NOT_OK(yb::docdb::SetValueFromQLBinary(ql_value, pg_data_type, cdc_datum_message), "Failed");
47
39.1k
}
48
49
0
// Returns the regular (non-intents) DB. regular_db_ is set by OpenRocksDB() and cleared by
// CloseRocksDB(); DCHECK_NOTNULL flags use of the DB outside that window.
rocksdb::DB* DocDBRocksDBUtil::rocksdb() {
  rocksdb::DB* const regular = DCHECK_NOTNULL(regular_db_.get());
  return regular;
}
52
53
0
// Returns the intents DB. intents_db_ is set by OpenRocksDB() and cleared by CloseRocksDB();
// DCHECK_NOTNULL flags use of the DB outside that window.
rocksdb::DB* DocDBRocksDBUtil::intents_db() {
  rocksdb::DB* const intents = DCHECK_NOTNULL(intents_db_.get());
  return intents;
}
56
57
0
std::string DocDBRocksDBUtil::IntentsDBDir() {
58
0
  return rocksdb_dir_ + ".intents";
59
0
}
60
61
0
// Opens the regular DB at rocksdb_dir_ and then the intents DB at IntentsDBDir(), taking
// ownership of both handles. If rocksdb_dir_ is empty, InitRocksDBDir() is called first.
// If opening the intents DB fails, the regular DB remains open.
Status DocDBRocksDBUtil::OpenRocksDB() {
  if (rocksdb_dir_.empty()) {
    RETURN_NOT_OK(InitRocksDBDir());
  }

  rocksdb::DB* regular_raw = nullptr;
  RETURN_NOT_OK(rocksdb::DB::Open(regular_db_options_, rocksdb_dir_, &regular_raw));
  LOG(INFO) << "Opened RocksDB at " << rocksdb_dir_;
  regular_db_.reset(regular_raw);

  rocksdb::DB* intents_raw = nullptr;
  RETURN_NOT_OK(rocksdb::DB::Open(intents_db_options_, IntentsDBDir(), &intents_raw));
  intents_db_.reset(intents_raw);

  return Status::OK();
}
78
79
0
void DocDBRocksDBUtil::CloseRocksDB() {
80
0
  intents_db_.reset();
81
0
  regular_db_.reset();
82
0
}
83
84
0
// Closes both DBs and opens them again with the current option objects.
Status DocDBRocksDBUtil::ReopenRocksDB() {
  CloseRocksDB();
  auto open_status = OpenRocksDB();
  return open_status;
}
88
89
0
// Closes both DBs, then destroys the regular DB and, if that succeeds, the intents DB.
Status DocDBRocksDBUtil::DestroyRocksDB() {
  CloseRocksDB();
  LOG(INFO) << "Destroying RocksDB database at " << rocksdb_dir_;
  RETURN_NOT_OK(rocksdb::DestroyDB(rocksdb_dir_, regular_db_options_));
  return rocksdb::DestroyDB(IntentsDBDir(), intents_db_options_);
}
96
97
0
void DocDBRocksDBUtil::ResetMonotonicCounter() {
98
0
  monotonic_counter_.store(0);
99
0
}
100
101
namespace {
102
103
class DirectWriteToWriteBatchHandler : public rocksdb::DirectWriteHandler {
104
 public:
105
  explicit DirectWriteToWriteBatchHandler(rocksdb::WriteBatch *write_batch)
106
0
      : write_batch_(write_batch) {}
107
108
0
  void Put(const SliceParts& key, const SliceParts& value) override {
109
0
    write_batch_->Put(key, value);
110
0
  }
111
112
0
  void SingleDelete(const Slice& key) override {
113
0
    write_batch_->SingleDelete(key);
114
0
  }
115
116
 private:
117
  rocksdb::WriteBatch *write_batch_;
118
};
119
120
} // namespace
121
122
// Converts `dwb` into raw RocksDB operations appended to `rocksdb_write_batch`.
//
// - decode_dockey: when true, every key (except external-transaction-id intents) must fully
//   decode as a SubDocKey; otherwise an error naming the key is returned.
// - If a transaction is active (current_txn_id_ set), the batch is written as intents through
//   TransactionalWriter. This requires increment_write_id == true, and intra_txn_write_id_ is
//   advanced past the writes performed.
// - Otherwise each key is written directly, with an encoded DocHybridTime appended when
//   `hybrid_time` is valid. The write id starts at 0 and increments per entry only if
//   increment_write_id is true.
Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch(
    const DocWriteBatch& dwb,
    rocksdb::WriteBatch* rocksdb_write_batch,
    HybridTime hybrid_time,
    bool decode_dockey,
    bool increment_write_id,
    PartialRangeKeyIntents partial_range_key_intents) const {
  if (decode_dockey) {
    for (const auto& kv : dwb.key_value_pairs()) {
      const auto& key = kv.first;
      // Skip key validation for external intents.
      const bool is_external_intent =
          !key.empty() && key[0] == ValueTypeAsChar::kExternalTransactionId;
      if (is_external_intent) {
        continue;
      }
      // Keys in the write batch are expected to be valid, but they do not contain the
      // HybridTime yet, hence the "optional hybrid time" decode.
      SubDocKey decoded;
      RETURN_NOT_OK_PREPEND(decoded.FullyDecodeFromKeyWithOptionalHybridTime(key),
          Substitute("when decoding key: $0", FormatBytesAsStr(key)));
    }
  }

  if (current_txn_id_.is_initialized()) {
    if (!increment_write_id) {
      return STATUS(
          InternalError, "For transactional write only increment_write_id=true is supported");
    }
    KeyValueWriteBatchPB kv_write_batch;
    dwb.TEST_CopyToWriteBatchPB(&kv_write_batch);
    TransactionalWriter writer(
        kv_write_batch, hybrid_time, *current_txn_id_, txn_isolation_level_,
        partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_);
    DirectWriteToWriteBatchHandler handler(rocksdb_write_batch);
    RETURN_NOT_OK(writer.Apply(&handler));
    // Continue the intra-transaction write id sequence on the next call.
    intra_txn_write_id_ = writer.intra_txn_write_id();
    return Status::OK();
  }

  // TODO: this block has common code with docdb::PrepareExternalWriteBatch and probably
  // can be refactored, so common code is reused.
  IntraTxnWriteId write_id = 0;
  for (const auto& kv : dwb.key_value_pairs()) {
    // Without a valid HybridTime the key is written as-is. This is useful when printing out a
    // write batch that does not yet know the HybridTime it will be committed with.
    string rocksdb_key = kv.first;
    if (hybrid_time.is_valid()) {
      const KeyBytes encoded_ht =
          PrimitiveValue(DocHybridTime(hybrid_time, write_id)).ToKeyBytes();
      rocksdb_key += encoded_ht.ToStringBuffer();
    }
    rocksdb_write_batch->Put(rocksdb_key, kv.second);
    if (increment_write_id) {
      ++write_id;
    }
  }
  return Status::OK();
}
180
181
// Writes `doc_write_batch` at `hybrid_time`.
//
// An empty batch is a no-op, and this check happens before the hybrid time is validated. An
// invalid hybrid time yields InvalidArgument. When op_id_ is non-empty, its index is
// incremented and attached (with the hybrid time) as consensus frontiers. Batches go to the
// intents DB while a transaction is active, otherwise to the regular DB. A failed RocksDB
// write is logged and reported as RuntimeError.
Status DocDBRocksDBUtil::WriteToRocksDB(
    const DocWriteBatch& doc_write_batch,
    const HybridTime& hybrid_time,
    bool decode_dockey,
    bool increment_write_id,
    PartialRangeKeyIntents partial_range_key_intents) {
  if (doc_write_batch.IsEmpty()) {
    return Status::OK();
  }
  if (!hybrid_time.is_valid()) {
    return STATUS_SUBSTITUTE(InvalidArgument, "Hybrid time is not valid: $0",
                             hybrid_time.ToString());
  }

  // Declared ahead of the batch because SetFrontiers() receives a pointer to it.
  ConsensusFrontiers frontiers;
  rocksdb::WriteBatch write_batch;
  const bool track_op_id = !op_id_.empty();
  if (track_op_id) {
    ++op_id_.index;
    set_op_id(op_id_, &frontiers);
    set_hybrid_time(hybrid_time, &frontiers);
    write_batch.SetFrontiers(&frontiers);
  }

  RETURN_NOT_OK(PopulateRocksDBWriteBatch(
      doc_write_batch, &write_batch, hybrid_time, decode_dockey, increment_write_id,
      partial_range_key_intents));

  rocksdb::DB* const target_db = current_txn_id_ ? intents_db_.get() : regular_db_.get();
  const rocksdb::Status write_status = target_db->Write(write_options(), &write_batch);
  if (write_status.ok()) {
    return Status::OK();
  }
  LOG(ERROR) << "Failed writing to RocksDB: " << write_status.ToString();
  return STATUS_SUBSTITUTE(RuntimeError,
                           "Error writing to RocksDB: $0", write_status.ToString());
}
218
219
0
// Test configuration: optional LRU block cache, separate test statistics objects for the
// regular and intents DBs, then option/write-option reinitialization.
Status DocDBRocksDBUtil::InitCommonRocksDBOptionsForTests() {
  // TODO(bojanserafimov): create MemoryMonitor?
  // A zero cache size leaves block_cache_ unchanged.
  const size_t cache_size = block_cache_size();
  if (cache_size != 0) {
    block_cache_ = rocksdb::NewLRUCache(cache_size);
  }

  regular_db_options_.statistics = rocksdb::CreateDBStatisticsForTests(/* for intents */ false);
  intents_db_options_.statistics = rocksdb::CreateDBStatisticsForTests(/* for intents */ true);
  RETURN_NOT_OK(ReinitDBOptions());
  InitRocksDBWriteOptions(&write_options_);
  return Status::OK();
}
232
233
0
// Bulk-load configuration: optional LRU block cache, no statistics, then option/write-option
// reinitialization.
Status DocDBRocksDBUtil::InitCommonRocksDBOptionsForBulkLoad() {
  // A zero cache size leaves block_cache_ unchanged.
  const size_t cache_size = block_cache_size();
  if (cache_size != 0) {
    block_cache_ = rocksdb::NewLRUCache(cache_size);
  }

  // Don't care about statistics/metrics as we don't keep metric registries during
  // bulk load.
  regular_db_options_.statistics = nullptr;
  intents_db_options_.statistics = nullptr;
  RETURN_NOT_OK(ReinitDBOptions());
  InitRocksDBWriteOptions(&write_options_);
  return Status::OK();
}
247
248
// Writes `*dwb` via WriteToRocksDB() and clears it only if the write succeeded; on failure the
// batch is left untouched and the error is returned.
Status DocDBRocksDBUtil::WriteToRocksDBAndClear(
    DocWriteBatch* dwb,
    const HybridTime& hybrid_time,
    bool decode_dockey, bool increment_write_id) {
  Status write_status = WriteToRocksDB(*dwb, hybrid_time, decode_dockey, increment_write_id);
  if (!write_status.ok()) {
    return write_status;
  }
  dwb->Clear();
  return Status::OK();
}
256
257
0
// Writes value `index` into column 10 of the row keyed ("row<index>", 11111 * index), using
// the default batch. op_id_ is set to (index / 2, index) and the write time is 1000 * index us.
Status DocDBRocksDBUtil::WriteSimple(int index) {
  op_id_.term = index / 2;
  op_id_.index = index;

  auto encoded_doc_key = DocKey(PrimitiveValues(Format("row$0", index), 11111 * index)).Encode();
  auto& batch = DefaultDocWriteBatch();
  const DocPath column_path(encoded_doc_key, PrimitiveValue(ColumnId(10)));
  RETURN_NOT_OK(batch.SetPrimitive(column_path, PrimitiveValue(index)));
  return WriteToRocksDBAndClear(&batch, HybridTime::FromMicros(1000 * index));
}
266
267
0
// Forwards the history cutoff to retention_policy_ (the same policy object that
// ReinitDBOptions() hands to the compaction filter factory).
void DocDBRocksDBUtil::SetHistoryCutoffHybridTime(HybridTime history_cutoff) {
  auto& policy = *retention_policy_;
  policy.SetHistoryCutoff(history_cutoff);
}
270
271
0
void DocDBRocksDBUtil::SetTableTTL(uint64_t ttl_msec) {
272
0
  schema_.SetDefaultTimeToLive(ttl_msec);
273
0
  retention_policy_->SetTableTTLForTests(MonoDelta::FromMilliseconds(ttl_msec));
274
0
}
275
276
0
// Debug dump of the regular DB followed by the intents DB, concatenated into one string.
string DocDBRocksDBUtil::DocDBDebugDumpToStr() {
  std::string dump = yb::docdb::DocDBDebugDumpToStr(rocksdb());
  dump += yb::docdb::DocDBDebugDumpToStr(intents_db(), StorageDbType::kIntents);
  return dump;
}
280
281
// Sets `value` at `doc_path` in a freshly created batch (not the default batch), then writes it
// at `hybrid_time`.
Status DocDBRocksDBUtil::SetPrimitive(
    const DocPath& doc_path,
    const Value& value,
    const HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  auto batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.SetPrimitive(doc_path, value, read_ht));
  return WriteToRocksDB(batch, hybrid_time);
}
290
291
// Convenience overload: wraps `primitive_value` in a Value and delegates to the Value overload.
Status DocDBRocksDBUtil::SetPrimitive(
    const DocPath& doc_path,
    const PrimitiveValue& primitive_value,
    const HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  const Value wrapped(primitive_value);
  return SetPrimitive(doc_path, wrapped, hybrid_time, read_ht);
}
298
299
// Combines `intents` for transaction `txn_id` via CombineExternalIntents() and writes the
// resulting key/value pair into the intents DB at `hybrid_time`.
Status DocDBRocksDBUtil::AddExternalIntents(
    const TransactionId& txn_id,
    const std::vector<ExternalIntent>& intents,
    const Uuid& involved_tablet,
    HybridTime hybrid_time) {
  // Supplies encoded intents to CombineExternalIntents() and captures the combined key and
  // value it reports back through SetKey()/SetValue().
  class Provider : public ExternalIntentsProvider {
   public:
    // Holds a reference to `*intents`; the vector must outlive this provider.
    Provider(
        const std::vector<ExternalIntent>* intents, const Uuid& involved_tablet,
        HybridTime hybrid_time)
        : intents_(*intents), involved_tablet_(involved_tablet), hybrid_time_(hybrid_time) {}

    // Appends to any previously captured key bytes.
    void SetKey(const Slice& slice) override {
      key_.AppendRawBytes(slice);
    }

    void SetValue(const Slice& slice) override {
      value_ = slice;
    }

    // Adds the captured key/value pair to `batch` via AddExternalPairToWriteBatch(), with
    // write id 0 and no regular write batch.
    void Apply(rocksdb::WriteBatch* batch) {
      KeyValuePairPB kv_pair;
      kv_pair.set_key(key_.ToStringBuffer());
      kv_pair.set_value(value_.ToStringBuffer());
      ExternalTxnApplyState external_txn_apply_state;
      AddExternalPairToWriteBatch(
          kv_pair, hybrid_time_, /* write_id= */ 0, &external_txn_apply_state,
          /* regular_write_batch= */ nullptr, batch);
    }

    // Returns the next intent as (encoded doc key + subkeys, encoded value), or none when all
    // intents have been returned. The slices point into members and are overwritten by the
    // next call.
    boost::optional<std::pair<Slice, Slice>> Next() override {
      if (next_idx_ < intents_.size()) {
        // It is ok to have inefficient code in tests.
        const auto& intent = intents_[next_idx_++];
        intent_key_ = intent.doc_path.encoded_doc_key();
        for (const auto& subkey : intent.doc_path.subkeys()) {
          subkey.AppendToKey(&intent_key_);
        }
        intent_value_ = intent.value.Encode();
        return std::pair<Slice, Slice>(intent_key_.AsSlice(), intent_value_);
      }
      return boost::none;
    }

    const Uuid& InvolvedTablet() override {
      return involved_tablet_;
    }

   private:
    const std::vector<ExternalIntent>& intents_;
    const Uuid involved_tablet_;
    const HybridTime hybrid_time_;
    size_t next_idx_ = 0;
    // Combined key/value captured from CombineExternalIntents().
    KeyBytes key_;
    KeyBuffer value_;
    // Scratch storage backing the slices returned by Next().
    KeyBytes intent_key_;
    std::string intent_value_;
  };

  Provider provider(&intents, involved_tablet, hybrid_time);
  CombineExternalIntents(txn_id, &provider);

  rocksdb::WriteBatch write_batch;
  provider.Apply(&write_batch);
  return intents_db_->Write(write_options(), &write_batch);
}
371
372
// Inserts `value` at `doc_path` with `ttl` in a freshly created batch and writes it at
// `hybrid_time`. The deadline argument is CoarseTimePoint::max() (presumably "no deadline").
Status DocDBRocksDBUtil::InsertSubDocument(
    const DocPath& doc_path,
    const SubDocument& value,
    const HybridTime hybrid_time,
    MonoDelta ttl,
    const ReadHybridTime& read_ht) {
  auto batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.InsertSubDocument(
      doc_path, value, read_ht, CoarseTimePoint::max(), rocksdb::kDefaultQueryId, ttl));
  return WriteToRocksDB(batch, hybrid_time);
}
383
384
// Extends the subdocument at `doc_path` with `value` (and `ttl`) in a freshly created batch and
// writes it at `hybrid_time`. The deadline argument is CoarseTimePoint::max().
Status DocDBRocksDBUtil::ExtendSubDocument(
    const DocPath& doc_path,
    const SubDocument& value,
    const HybridTime hybrid_time,
    MonoDelta ttl,
    const ReadHybridTime& read_ht) {
  auto batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.ExtendSubDocument(
      doc_path, value, read_ht, CoarseTimePoint::max(), rocksdb::kDefaultQueryId, ttl));
  return WriteToRocksDB(batch, hybrid_time);
}
395
396
// Extends the list at `doc_path` with `value` in a freshly created batch and writes it at
// `hybrid_time`. The deadline argument is CoarseTimePoint::max().
Status DocDBRocksDBUtil::ExtendList(
    const DocPath& doc_path,
    const SubDocument& value,
    HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  auto batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.ExtendList(doc_path, value, read_ht, CoarseTimePoint::max()));
  return WriteToRocksDB(batch, hybrid_time);
}
405
406
// Replaces the CQL list element at `target_cql_index` under `doc_path` with `value` in a freshly
// created batch and writes it at `hybrid_time`.
// NOTE(review): `user_timestamp` is accepted but not forwarded to ReplaceCqlInList(), so it has
// no effect here.
Status DocDBRocksDBUtil::ReplaceInList(
    const DocPath &doc_path,
    const int target_cql_index,
    const SubDocument& value,
    const ReadHybridTime& read_ht,
    const HybridTime& hybrid_time,
    const rocksdb::QueryId query_id,
    MonoDelta default_ttl,
    MonoDelta ttl,
    UserTimeMicros user_timestamp) {
  auto batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.ReplaceCqlInList(
      doc_path, target_cql_index, value, read_ht, CoarseTimePoint::max(), query_id,
      default_ttl, ttl));
  return WriteToRocksDB(batch, hybrid_time);
}
422
423
// Deletes the subdocument at `doc_path` in a freshly created batch and writes it at
// `hybrid_time`.
Status DocDBRocksDBUtil::DeleteSubDoc(
    const DocPath& doc_path,
    HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  auto batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.DeleteSubDoc(doc_path, read_ht));
  return WriteToRocksDB(batch, hybrid_time);
}
431
432
0
void DocDBRocksDBUtil::DocDBDebugDumpToConsole() {
433
0
  DocDBDebugDump(regular_db_.get(), std::cerr, StorageDbType::kRegular);
434
0
}
435
436
0
// Flushes the regular DB and waits for the flush to finish. The intents DB is not flushed here.
Status DocDBRocksDBUtil::FlushRocksDbAndWait() {
  rocksdb::FlushOptions options;
  options.wait = true;
  return rocksdb()->Flush(options);
}
441
442
0
// Rebuilds the regular and intents DB options around the shared block_cache_. It then installs
// the DocDB compaction filter factory and compaction-file settings on the regular DB only. If
// the DBs are currently open, they are reopened so the new options are used.
Status DocDBRocksDBUtil::ReinitDBOptions() {
  tablet::TabletOptions tablet_options;
  tablet_options.block_cache = block_cache_;
  docdb::InitRocksDBOptions(
      &regular_db_options_, "[R] " /* log_prefix */, regular_db_options_.statistics,
      tablet_options);
  docdb::InitRocksDBOptions(
      &intents_db_options_, "[I] " /* log_prefix */, intents_db_options_.statistics,
      tablet_options);

  regular_db_options_.compaction_filter_factory =
      std::make_shared<docdb::DocDBCompactionFilterFactory>(
          retention_policy_, &KeyBounds::kNoBounds);
  regular_db_options_.compaction_file_filter_factory = compaction_file_filter_factory_;
  regular_db_options_.max_file_size_for_compaction = max_file_size_for_compaction_;

  return regular_db_ ? ReopenRocksDB() : Status::OK();
}
463
464
0
// Creates a new DocWriteBatch using the currently configured init marker behavior.
DocWriteBatch DocDBRocksDBUtil::MakeDocWriteBatch() {
  return MakeDocWriteBatch(init_marker_behavior_);
}
468
469
0
// Creates a new DocWriteBatch bound to the regular DB (via DocDB::FromRegularUnbounded) with
// the given init marker behavior and this util's shared monotonic counter.
DocWriteBatch DocDBRocksDBUtil::MakeDocWriteBatch(InitMarkerBehavior init_marker_behavior) {
  const auto doc_db = DocDB::FromRegularUnbounded(regular_db_.get());
  return DocWriteBatch(doc_db, init_marker_behavior, &monotonic_counter_);
}
473
474
0
// Returns the default batch, creating it with MakeDocWriteBatch() on first use.
DocWriteBatch& DocDBRocksDBUtil::DefaultDocWriteBatch() {
  if (doc_write_batch_) {
    return *doc_write_batch_;
  }
  doc_write_batch_ = MakeDocWriteBatch();
  return *doc_write_batch_;
}
481
482
0
// Updates the init marker behavior used by future MakeDocWriteBatch() calls. The change is
// logged; setting the current value again is a silent no-op.
void DocDBRocksDBUtil::SetInitMarkerBehavior(InitMarkerBehavior init_marker_behavior) {
  if (init_marker_behavior_ == init_marker_behavior) {
    return;
  }
  LOG(INFO) << "Setting init marker behavior to " << init_marker_behavior;
  init_marker_behavior_ = init_marker_behavior;
}
488
489
}  // namespace docdb
490
}  // namespace yb