YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb_util.h"
15
16
#include "yb/docdb/consensus_frontier.h"
17
#include "yb/docdb/doc_key.h"
18
#include "yb/docdb/docdb.h"
19
#include "yb/docdb/docdb_debug.h"
20
#include "yb/docdb/docdb_rocksdb_util.h"
21
#include "yb/docdb/rocksdb_writer.h"
22
23
#include "yb/rocksutil/write_batch_formatter.h"
24
#include "yb/rocksutil/yb_rocksdb.h"
25
26
#include "yb/tablet/tablet_options.h"
27
28
#include "yb/util/env.h"
29
#include "yb/util/status_format.h"
30
#include "yb/util/string_trim.h"
31
#include "yb/docdb/docdb_pgapi.h"
32
33
using std::string;
34
using std::make_shared;
35
using std::endl;
36
using strings::Substitute;
37
using yb::FormatBytesAsStr;
38
using yb::util::ApplyEagerLineContinuation;
39
using std::vector;
40
41
namespace yb {
42
namespace docdb {
43
44
void SetValueFromQLBinaryWrapper(
45
78.3k
    QLValuePB ql_value, const int pg_data_type, DatumMessagePB* cdc_datum_message) {
46
78.3k
  WARN_NOT_OK(yb::docdb::SetValueFromQLBinary(ql_value, pg_data_type, cdc_datum_message), "Failed");
47
78.3k
}
48
49
647k
// Accessor for the regular RocksDB instance. The pointer is passed through DCHECK_NOTNULL
// before being handed back to the caller.
rocksdb::DB* DocDBRocksDBUtil::rocksdb() {
  auto* const regular_db = DCHECK_NOTNULL(regular_db_.get());
  return regular_db;
}
52
53
642k
// Accessor for the intents RocksDB instance. The pointer is passed through DCHECK_NOTNULL
// before being handed back to the caller.
rocksdb::DB* DocDBRocksDBUtil::intents_db() {
  auto* const intents_db = DCHECK_NOTNULL(intents_db_.get());
  return intents_db;
}
56
57
443
std::string DocDBRocksDBUtil::IntentsDBDir() {
58
443
  return rocksdb_dir_ + ".intents";
59
443
}
60
61
190
// Opens the regular DB at rocksdb_dir_ and then the intents DB at IntentsDBDir().
//
// Returns the first non-OK status encountered. If opening the intents DB fails, the regular DB
// opened just before it stays assigned to regular_db_.
Status DocDBRocksDBUtil::OpenRocksDB() {
  // No directory configured yet: initialize one first.
  if (rocksdb_dir_.empty()) {
    RETURN_NOT_OK(InitRocksDBDir());
  }

  // rocksdb::DB::Open reports the instance through a raw out-pointer; each pointer is stored in
  // the corresponding member only after the open succeeded.
  rocksdb::DB* regular_raw = nullptr;
  RETURN_NOT_OK(rocksdb::DB::Open(regular_db_options_, rocksdb_dir_, &regular_raw));
  LOG(INFO) << "Opened RocksDB at " << rocksdb_dir_;
  regular_db_.reset(regular_raw);

  rocksdb::DB* intents_raw = nullptr;
  RETURN_NOT_OK(rocksdb::DB::Open(intents_db_options_, IntentsDBDir(), &intents_raw));
  intents_db_.reset(intents_raw);

  return Status::OK();
}
78
79
205
void DocDBRocksDBUtil::CloseRocksDB() {
80
205
  intents_db_.reset();
81
205
  regular_db_.reset();
82
205
}
83
84
69
// Closes both DBs and opens them again; returns whatever OpenRocksDB() reports.
Status DocDBRocksDBUtil::ReopenRocksDB() {
  CloseRocksDB();
  Status reopen_status = OpenRocksDB();
  return reopen_status;
}
88
89
134
// Closes both DBs, then destroys the regular DB followed by the intents DB.
//
// If destroying the regular DB fails, its status is returned and the intents DB is not touched.
Status DocDBRocksDBUtil::DestroyRocksDB() {
  CloseRocksDB();
  LOG(INFO) << "Destroying RocksDB database at " << rocksdb_dir_;

  RETURN_NOT_OK(rocksdb::DestroyDB(rocksdb_dir_, regular_db_options_));
  // The result of the last step is the result of the whole operation.
  return rocksdb::DestroyDB(IntentsDBDir(), intents_db_options_);
}
96
97
119
void DocDBRocksDBUtil::ResetMonotonicCounter() {
98
119
  monotonic_counter_.store(0);
99
119
}
100
101
namespace {
102
103
class DirectWriteToWriteBatchHandler : public rocksdb::DirectWriteHandler {
104
 public:
105
  explicit DirectWriteToWriteBatchHandler(rocksdb::WriteBatch *write_batch)
106
254
      : write_batch_(write_batch) {}
107
108
2.48k
  void Put(const SliceParts& key, const SliceParts& value) override {
109
2.48k
    write_batch_->Put(key, value);
110
2.48k
  }
111
112
0
  void SingleDelete(const Slice& key) override {
113
0
    write_batch_->SingleDelete(key);
114
0
  }
115
116
 private:
117
  rocksdb::WriteBatch *write_batch_;
118
};
119
120
} // namespace
121
122
// Converts the key/value pairs of `dwb` into operations on `rocksdb_write_batch`.
//
// - decode_dockey: when true, every key (except those starting with
//   ValueTypeAsChar::kExternalTransactionId) must fully decode as a SubDocKey; the first key that
//   does not decode aborts the call with that error.
// - When current_txn_id_ is set, the batch is produced by a TransactionalWriter and
//   intra_txn_write_id_ is advanced; increment_write_id must be true in that mode.
// - Otherwise each pair becomes a Put. If hybrid_time is valid, the encoded
//   DocHybridTime(hybrid_time, write_id) is appended to the key; write_id starts at 0 and grows
//   by one per pair only when increment_write_id is true.
Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch(
    const DocWriteBatch& dwb,
    rocksdb::WriteBatch* rocksdb_write_batch,
    HybridTime hybrid_time,
    bool decode_dockey,
    bool increment_write_id,
    PartialRangeKeyIntents partial_range_key_intents) const {
  if (decode_dockey) {
    for (const auto& kv : dwb.key_value_pairs()) {
      const auto& encoded_key = kv.first;
      // Skip key validation for external intents.
      const bool is_external_intent =
          !encoded_key.empty() && encoded_key[0] == ValueTypeAsChar::kExternalTransactionId;
      if (is_external_intent) {
        continue;
      }
      // We don't expect any invalid encoded keys in the write batch. However, these encoded keys
      // don't contain the HybridTime.
      SubDocKey decoded_key;
      RETURN_NOT_OK_PREPEND(
          decoded_key.FullyDecodeFromKeyWithOptionalHybridTime(encoded_key),
          Substitute("when decoding key: $0", FormatBytesAsStr(encoded_key)));
    }
  }

  // Transactional path: delegate to TransactionalWriter and finish early.
  if (current_txn_id_.is_initialized()) {
    if (!increment_write_id) {
      return STATUS(
          InternalError, "For transactional write only increment_write_id=true is supported");
    }
    KeyValueWriteBatchPB kv_write_batch;
    dwb.TEST_CopyToWriteBatchPB(&kv_write_batch);
    TransactionalWriter writer(
        kv_write_batch, hybrid_time, *current_txn_id_, txn_isolation_level_,
        partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_);
    DirectWriteToWriteBatchHandler handler(rocksdb_write_batch);
    RETURN_NOT_OK(writer.Apply(&handler));
    // Remember where the writer stopped so that the next call continues from there.
    intra_txn_write_id_ = writer.intra_txn_write_id();
    return Status::OK();
  }

  // Non-transactional path.
  // TODO: this block has common code with docdb::PrepareExternalWriteBatch and probably
  // can be refactored, so common code is reused.
  const bool has_hybrid_time = hybrid_time.is_valid();
  IntraTxnWriteId next_write_id = 0;
  for (const auto& kv : dwb.key_value_pairs()) {
    string rocksdb_key;
    if (has_hybrid_time) {
      // HybridTime provided. Append a PrimitiveValue with the HybridTime to the key.
      const KeyBytes encoded_ht =
          PrimitiveValue(DocHybridTime(hybrid_time, next_write_id)).ToKeyBytes();
      rocksdb_key = kv.first + encoded_ht.ToStringBuffer();
    } else {
      // Useful when printing out a write batch that does not yet know the HybridTime it will be
      // committed with.
      rocksdb_key = kv.first;
    }
    rocksdb_write_batch->Put(rocksdb_key, kv.second);
    if (increment_write_id) {
      ++next_write_id;
    }
  }
  return Status::OK();
}
180
181
// Writes `doc_write_batch` at `hybrid_time` to the intents DB (when current_txn_id_ is set) or
// to the regular DB (otherwise).
//
// - An empty batch is a no-op that returns OK.
// - An invalid hybrid_time yields InvalidArgument.
// - When op_id_ is non-empty, its index is incremented and the op id and hybrid time are
//   attached to the RocksDB batch as frontiers.
// - A failed RocksDB write is logged and reported as RuntimeError.
Status DocDBRocksDBUtil::WriteToRocksDB(
    const DocWriteBatch& doc_write_batch,
    const HybridTime& hybrid_time,
    bool decode_dockey,
    bool increment_write_id,
    PartialRangeKeyIntents partial_range_key_intents) {
  if (doc_write_batch.IsEmpty()) {
    return Status::OK();
  }
  if (!hybrid_time.is_valid()) {
    return STATUS_SUBSTITUTE(InvalidArgument, "Hybrid time is not valid: $0",
                             hybrid_time.ToString());
  }

  // `frontiers` is declared before the batch that receives a pointer to it, so it is destroyed
  // after the batch.
  ConsensusFrontiers frontiers;
  rocksdb::WriteBatch rocksdb_write_batch;
  if (!op_id_.empty()) {
    ++op_id_.index;
    set_op_id(op_id_, &frontiers);
    set_hybrid_time(hybrid_time, &frontiers);
    rocksdb_write_batch.SetFrontiers(&frontiers);
  }

  RETURN_NOT_OK(PopulateRocksDBWriteBatch(
      doc_write_batch, &rocksdb_write_batch, hybrid_time, decode_dockey, increment_write_id,
      partial_range_key_intents));

  // Transactional writes go to the intents DB, everything else to the regular DB.
  rocksdb::DB* target_db = nullptr;
  if (current_txn_id_) {
    target_db = intents_db_.get();
  } else {
    target_db = regular_db_.get();
  }

  const rocksdb::Status write_status = target_db->Write(write_options(), &rocksdb_write_batch);
  if (write_status.ok()) {
    return Status::OK();
  }

  LOG(ERROR) << "Failed writing to RocksDB: " << write_status.ToString();
  return STATUS_SUBSTITUTE(RuntimeError,
                           "Error writing to RocksDB: $0", write_status.ToString());
}
218
219
119
// Prepares the block cache, per-DB test statistics, DB options and write options.
// Returns the status of ReinitDBOptions() if that step fails.
Status DocDBRocksDBUtil::InitCommonRocksDBOptionsForTests() {
  // TODO(bojanserafimov): create MemoryMonitor?
  const size_t cache_size = block_cache_size();
  // A zero size leaves block_cache_ untouched.
  if (cache_size != 0) {
    block_cache_ = rocksdb::NewLRUCache(cache_size);
  }

  // Separate statistics objects for the regular and the intents DB.
  regular_db_options_.statistics = rocksdb::CreateDBStatisticsForTests(/* for intents */ false);
  intents_db_options_.statistics = rocksdb::CreateDBStatisticsForTests(/* for intents */ true);

  RETURN_NOT_OK(ReinitDBOptions());
  InitRocksDBWriteOptions(&write_options_);
  return Status::OK();
}
232
233
0
// Prepares the block cache, DB options and write options with statistics disabled for both DBs.
// Returns the status of ReinitDBOptions() if that step fails.
Status DocDBRocksDBUtil::InitCommonRocksDBOptionsForBulkLoad() {
  const size_t cache_size = block_cache_size();
  // A zero size leaves block_cache_ untouched.
  if (cache_size != 0) {
    block_cache_ = rocksdb::NewLRUCache(cache_size);
  }

  // Don't care about statistics/metrics as we don't keep metric registries during
  // bulk load.
  regular_db_options_.statistics = nullptr;
  intents_db_options_.statistics = nullptr;

  RETURN_NOT_OK(ReinitDBOptions());
  InitRocksDBWriteOptions(&write_options_);
  return Status::OK();
}
247
248
// Writes `*dwb` via WriteToRocksDB and, only if that succeeded, clears the batch.
// On failure the batch is left as it was and the error is returned.
Status DocDBRocksDBUtil::WriteToRocksDBAndClear(
    DocWriteBatch* dwb,
    const HybridTime& hybrid_time,
    bool decode_dockey, bool increment_write_id) {
  RETURN_NOT_OK(WriteToRocksDB(*dwb, hybrid_time, decode_dockey, increment_write_id));

  dwb->Clear();
  return Status::OK();
}
256
257
10
// Writes one int32 value derived from `index` through the default DocWriteBatch:
//   doc key    : ("row<index>", 11111 * index)
//   column     : ColumnId(10)
//   value      : index
//   hybrid time: 1000 * index microseconds
// Also sets op_id_ to {term = index / 2, index = index} before writing.
Status DocDBRocksDBUtil::WriteSimple(int index) {
  auto row_key = DocKey(PrimitiveValues(Format("row$0", index), 11111 * index)).Encode();

  op_id_.term = index / 2;
  op_id_.index = index;

  auto& batch = DefaultDocWriteBatch();

  QLValuePB int_value;
  int_value.set_int32_value(index);

  RETURN_NOT_OK(batch.SetPrimitive(
      DocPath(row_key, PrimitiveValue(ColumnId(10))),
      ValueRef(int_value, SortingType::kNotSpecified)));

  const auto write_time = HybridTime::FromMicros(1000 * index);
  return WriteToRocksDBAndClear(&batch, write_time);
}
269
270
588
// Forwards the given history cutoff to the retention policy.
void DocDBRocksDBUtil::SetHistoryCutoffHybridTime(HybridTime history_cutoff) {
  retention_policy_->SetHistoryCutoff(history_cutoff);
}
273
274
2
void DocDBRocksDBUtil::SetTableTTL(uint64_t ttl_msec) {
275
2
  schema_.SetDefaultTimeToLive(ttl_msec);
276
2
  retention_policy_->SetTableTTLForTests(MonoDelta::FromMilliseconds(ttl_msec));
277
2
}
278
279
292
// Returns the debug dump of the regular DB followed by the debug dump of the intents DB.
string DocDBRocksDBUtil::DocDBDebugDumpToStr() {
  const string regular_dump = yb::docdb::DocDBDebugDumpToStr(rocksdb());
  const string intents_dump =
      yb::docdb::DocDBDebugDumpToStr(intents_db(), StorageDbType::kIntents);
  return regular_dump + intents_dump;
}
283
284
// Stages a single SetPrimitive operation in a fresh DocWriteBatch and writes it at
// `hybrid_time`. Returns the first error from staging or from the write.
Status DocDBRocksDBUtil::SetPrimitive(
    const DocPath& doc_path,
    const ValueControlFields& control_fields,
    const ValueRef& value,
    const HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  DocWriteBatch batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.SetPrimitive(doc_path, control_fields, value, read_ht));

  return WriteToRocksDB(batch, hybrid_time);
}
294
295
// Convenience overload: wraps the QLValuePB in a ValueRef and forwards to the ValueRef-based
// SetPrimitive overload.
Status DocDBRocksDBUtil::SetPrimitive(
    const DocPath& doc_path,
    const QLValuePB& value,
    const HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  return SetPrimitive(
      doc_path,
      ValueRef(value),
      hybrid_time,
      read_ht);
}
302
303
// Combines `intents` into one external-intents record via CombineExternalIntents and writes it
// to the intents DB. Returns the status of that write.
Status DocDBRocksDBUtil::AddExternalIntents(
    const TransactionId& txn_id,
    const std::vector<ExternalIntent>& intents,
    const Uuid& involved_tablet,
    HybridTime hybrid_time) {
  // ExternalIntentsProvider that iterates over `intents`, captures the key and value that
  // CombineExternalIntents hands back through SetKey/SetValue, and can then put that pair into a
  // RocksDB write batch.
  class Provider : public ExternalIntentsProvider {
   public:
    Provider(
        const std::vector<ExternalIntent>* intents, const Uuid& involved_tablet,
        HybridTime hybrid_time)
        : intents_(*intents), involved_tablet_(involved_tablet), hybrid_time_(hybrid_time) {}

    // Appends the received bytes to the captured key.
    void SetKey(const Slice& slice) override {
      key_.AppendRawBytes(slice);
    }

    // Replaces the captured value with the received bytes.
    void SetValue(const Slice& slice) override {
      value_ = slice;
    }

    // Adds the captured key/value pair to `batch` as an external pair with write id 0.
    void Apply(rocksdb::WriteBatch* batch) {
      KeyValuePairPB captured_pair;
      captured_pair.set_key(key_.ToStringBuffer());
      captured_pair.set_value(value_.ToStringBuffer());
      ExternalTxnApplyState apply_state;
      AddExternalPairToWriteBatch(
          captured_pair, hybrid_time_, /* write_id= */ 0, &apply_state,
          /* regular_write_batch= */ nullptr, batch);
    }

    // Yields the next intent as a (key, value) pair of slices, or boost::none once all intents
    // have been consumed. The slices point into members that are overwritten by the next call.
    boost::optional<std::pair<Slice, Slice>> Next() override {
      if (next_idx_ >= intents_.size()) {
        return boost::none;
      }

      // It is ok to have inefficient code in tests.
      const auto& current = intents_[next_idx_++];

      intent_key_ = current.doc_path.encoded_doc_key();
      for (const auto& subkey : current.doc_path.subkeys()) {
        subkey.AppendToKey(&intent_key_);
      }
      intent_value_ = current.value;

      return std::pair<Slice, Slice>(intent_key_.AsSlice(), intent_value_);
    }

    const Uuid& InvolvedTablet() override {
      return involved_tablet_;
    }

   private:
    const std::vector<ExternalIntent>& intents_;
    const Uuid involved_tablet_;
    const HybridTime hybrid_time_;
    // Index of the intent that the next call to Next() will return.
    size_t next_idx_ = 0;
    // Key and value received through SetKey / SetValue.
    KeyBytes key_;
    KeyBuffer value_;

    // Backing storage for the slices returned by Next().
    KeyBytes intent_key_;
    std::string intent_value_;
  };

  Provider provider(&intents, involved_tablet, hybrid_time);
  CombineExternalIntents(txn_id, &provider);

  rocksdb::WriteBatch intents_batch;
  provider.Apply(&intents_batch);

  return intents_db_->Write(write_options(), &intents_batch);
}
375
376
// Stages an InsertSubDocument operation (no deadline, default query id, the given ttl) in a
// fresh DocWriteBatch and writes it at `hybrid_time`.
Status DocDBRocksDBUtil::InsertSubDocument(
    const DocPath& doc_path,
    const ValueRef& value,
    const HybridTime hybrid_time,
    MonoDelta ttl,
    const ReadHybridTime& read_ht) {
  DocWriteBatch batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.InsertSubDocument(
      doc_path, value, read_ht, CoarseTimePoint::max(), rocksdb::kDefaultQueryId, ttl));

  return WriteToRocksDB(batch, hybrid_time);
}
387
388
// Stages an ExtendSubDocument operation (no deadline, default query id, the given ttl) in a
// fresh DocWriteBatch and writes it at `hybrid_time`.
Status DocDBRocksDBUtil::ExtendSubDocument(
    const DocPath& doc_path,
    const ValueRef& value,
    const HybridTime hybrid_time,
    MonoDelta ttl,
    const ReadHybridTime& read_ht) {
  DocWriteBatch batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.ExtendSubDocument(
      doc_path, value, read_ht, CoarseTimePoint::max(), rocksdb::kDefaultQueryId, ttl));

  return WriteToRocksDB(batch, hybrid_time);
}
399
400
// Stages an ExtendList operation (no deadline) in a fresh DocWriteBatch and writes it at
// `hybrid_time`.
Status DocDBRocksDBUtil::ExtendList(
    const DocPath& doc_path,
    const ValueRef& value,
    HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  DocWriteBatch batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.ExtendList(doc_path, value, read_ht, CoarseTimePoint::max()));

  return WriteToRocksDB(batch, hybrid_time);
}
409
410
// Stages a ReplaceCqlInList operation for the element at `target_cql_index` in a fresh
// DocWriteBatch and writes it at `hybrid_time`. If staging fails, nothing is written and the
// staging error is returned.
//
// Note: `user_timestamp` is accepted but not used by this function.
Status DocDBRocksDBUtil::ReplaceInList(
    const DocPath &doc_path,
    const int target_cql_index,
    const ValueRef& value,
    const ReadHybridTime& read_ht,
    const HybridTime& hybrid_time,
    const rocksdb::QueryId query_id,
    MonoDelta default_ttl,
    MonoDelta ttl,
    UserTimeMicros user_timestamp) {
  DocWriteBatch batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.ReplaceCqlInList(
      doc_path, target_cql_index, value, read_ht, CoarseTimePoint::max(), query_id, default_ttl,
      ttl));

  return WriteToRocksDB(batch, hybrid_time);
}
426
427
// Stages a DeleteSubDoc operation in a fresh DocWriteBatch and writes it at `hybrid_time`.
Status DocDBRocksDBUtil::DeleteSubDoc(
    const DocPath& doc_path,
    HybridTime hybrid_time,
    const ReadHybridTime& read_ht) {
  DocWriteBatch batch = MakeDocWriteBatch();
  RETURN_NOT_OK(batch.DeleteSubDoc(doc_path, read_ht));

  return WriteToRocksDB(batch, hybrid_time);
}
435
436
0
void DocDBRocksDBUtil::DocDBDebugDumpToConsole() {
437
0
  DocDBDebugDump(regular_db_.get(), std::cerr, StorageDbType::kRegular);
438
0
}
439
440
2.01k
// Flushes the regular DB with FlushOptions::wait set to true and returns the flush status.
Status DocDBRocksDBUtil::FlushRocksDbAndWait() {
  rocksdb::FlushOptions blocking_flush;
  blocking_flush.wait = true;

  return rocksdb()->Flush(blocking_flush);
}
445
446
124
// Rebuilds the options of both DBs from the current members (block cache, statistics, retention
// policy, compaction file filter factory, max file size for compaction). If the regular DB is
// currently open, both DBs are reopened; otherwise only the options are updated.
Status DocDBRocksDBUtil::ReinitDBOptions() {
  tablet::TabletOptions tablet_options;
  tablet_options.block_cache = block_cache_;

  // Each DB keeps the statistics object it already had and gets its own log prefix.
  docdb::InitRocksDBOptions(
      &regular_db_options_, "[R] " /* log_prefix */, regular_db_options_.statistics,
      tablet_options);
  docdb::InitRocksDBOptions(
      &intents_db_options_, "[I] " /* log_prefix */, intents_db_options_.statistics,
      tablet_options);

  // Compaction settings are applied to the regular DB only.
  regular_db_options_.compaction_filter_factory =
      std::make_shared<docdb::DocDBCompactionFilterFactory>(
          retention_policy_, &KeyBounds::kNoBounds);
  regular_db_options_.compaction_file_filter_factory = compaction_file_filter_factory_;
  regular_db_options_.max_file_size_for_compaction = max_file_size_for_compaction_;

  if (regular_db_) {
    return ReopenRocksDB();
  }
  return Status::OK();
}
467
468
362k
// Creates a DocWriteBatch over the regular DB using the currently configured init marker
// behavior and the shared monotonic counter.
DocWriteBatch DocDBRocksDBUtil::MakeDocWriteBatch() {
  rocksdb::DB* const regular_db = regular_db_.get();
  return DocWriteBatch(
      DocDB::FromRegularUnbounded(regular_db), init_marker_behavior_, &monotonic_counter_);
}
472
473
4
// Creates a DocWriteBatch over the regular DB using the init marker behavior supplied by the
// caller (the member init_marker_behavior_ is ignored) and the shared monotonic counter.
DocWriteBatch DocDBRocksDBUtil::MakeDocWriteBatch(InitMarkerBehavior init_marker_behavior) {
  rocksdb::DB* const regular_db = regular_db_.get();
  return DocWriteBatch(
      DocDB::FromRegularUnbounded(regular_db), init_marker_behavior, &monotonic_counter_);
}
477
478
10
// Returns the cached default DocWriteBatch, creating it with MakeDocWriteBatch() on first use.
DocWriteBatch& DocDBRocksDBUtil::DefaultDocWriteBatch() {
  const bool needs_init = !doc_write_batch_;
  if (needs_init) {
    doc_write_batch_ = MakeDocWriteBatch();
  }
  return *doc_write_batch_;
}
485
486
23
// Updates init_marker_behavior_. Nothing is logged or assigned when the requested value is
// already in effect.
void DocDBRocksDBUtil::SetInitMarkerBehavior(InitMarkerBehavior init_marker_behavior) {
  if (init_marker_behavior_ == init_marker_behavior) {
    return;
  }
  LOG(INFO) << "Setting init marker behavior to " << init_marker_behavior;
  init_marker_behavior_ = init_marker_behavior;
}
492
493
}  // namespace docdb
494
}  // namespace yb