YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_rocksdb_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb_rocksdb_util.h"
15
16
#include <memory>
17
#include <thread>
18
19
#include "yb/common/transaction.h"
20
21
#include "yb/docdb/bounded_rocksdb_iterator.h"
22
#include "yb/docdb/consensus_frontier.h"
23
#include "yb/docdb/doc_key.h"
24
#include "yb/docdb/intent_aware_iterator.h"
25
#include "yb/docdb/key_bounds.h"
26
#include "yb/docdb/value_type.h"
27
28
#include "yb/gutil/casts.h"
29
#include "yb/gutil/sysinfo.h"
30
31
#include "yb/rocksdb/db/db_impl.h"
32
#include "yb/rocksdb/db/filename.h"
33
#include "yb/rocksdb/db/version_edit.h"
34
#include "yb/rocksdb/db/version_set.h"
35
#include "yb/rocksdb/db/writebuffer.h"
36
#include "yb/rocksdb/memtablerep.h"
37
#include "yb/rocksdb/options.h"
38
#include "yb/rocksdb/rate_limiter.h"
39
#include "yb/rocksdb/table.h"
40
#include "yb/rocksdb/table/filtering_iterator.h"
41
#include "yb/rocksdb/types.h"
42
#include "yb/rocksdb/util/compression.h"
43
44
#include "yb/rocksutil/yb_rocksdb_logger.h"
45
46
#include "yb/util/bytes_formatter.h"
47
#include "yb/util/priority_thread_pool.h"
48
#include "yb/util/result.h"
49
#include "yb/util/size_literals.h"
50
#include "yb/util/status.h"
51
#include "yb/util/status_format.h"
52
#include "yb/util/status_log.h"
53
#include "yb/util/trace.h"
54
55
using namespace yb::size_literals;  // NOLINT.
56
using namespace std::literals;
57
58
static constexpr int32_t kMinBlockStartInterval = 1;
59
static constexpr int32_t kDefaultBlockStartInterval = 16;
60
static constexpr int32_t kMaxBlockStartInterval = 256;
61
62
DEFINE_int32(rocksdb_max_background_flushes, -1, "Number threads to do background flushes.");
63
DEFINE_bool(rocksdb_disable_compactions, false, "Disable rocksdb compactions.");
64
DEFINE_bool(rocksdb_compaction_measure_io_stats, false, "Measure stats for rocksdb compactions.");
65
DEFINE_int32(rocksdb_base_background_compactions, -1,
66
             "Number threads to do background compactions.");
67
DEFINE_int32(rocksdb_max_background_compactions, -1,
68
             "Increased number of threads to do background compactions (used when compactions need "
69
             "to catch up.)");
70
DEFINE_int32(rocksdb_level0_file_num_compaction_trigger, 5,
71
             "Number of files to trigger level-0 compaction. -1 if compaction should not be "
72
             "triggered by number of files at all.");
73
74
DEFINE_int32(rocksdb_level0_slowdown_writes_trigger, -1,
75
             "The number of files above which writes are slowed down.");
76
DEFINE_int32(rocksdb_level0_stop_writes_trigger, -1,
77
             "The number of files above which compactions are stopped.");
78
DEFINE_int32(rocksdb_universal_compaction_size_ratio, 20,
79
             "The percentage upto which files that are larger are include in a compaction.");
80
DEFINE_uint64(rocksdb_universal_compaction_always_include_size_threshold, 64_MB,
81
             "Always include files of smaller or equal size in a compaction.");
82
DEFINE_int32(rocksdb_universal_compaction_min_merge_width, 4,
83
             "The minimum number of files in a single compaction run.");
84
DEFINE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec, 1_GB,
85
             "Use to control write rate of flush and compaction.");
86
DEFINE_string(rocksdb_compact_flush_rate_limit_sharing_mode, "tserver",
87
              "Allows to control rate limit sharing/calculation across RocksDB instances\n"
88
              "  tserver - rate limit is shared across all RocksDB instances"
89
              " at tabset server level\n"
90
              "  none - rate limit is calculated independently for every RocksDB instance");
91
DEFINE_uint64(rocksdb_compaction_size_threshold_bytes, 2ULL * 1024 * 1024 * 1024,
92
             "Threshold beyond which compaction is considered large.");
93
DEFINE_uint64(rocksdb_max_file_size_for_compaction, 0,
94
             "Maximal allowed file size to participate in RocksDB compaction. 0 - unlimited.");
95
DEFINE_int32(rocksdb_max_write_buffer_number, 2,
96
             "Maximum number of write buffers that are built up in memory.");
97
98
DEFINE_int64(db_block_size_bytes, 32_KB,
99
             "Size of RocksDB data block (in bytes).");
100
101
DEFINE_int64(db_filter_block_size_bytes, 64_KB,
102
             "Size of RocksDB filter block (in bytes).");
103
104
DEFINE_int64(db_index_block_size_bytes, 32_KB,
105
             "Size of RocksDB index block (in bytes).");
106
107
DEFINE_int64(db_min_keys_per_index_block, 100,
108
             "Minimum number of keys per index block.");
109
110
DEFINE_int64(db_write_buffer_size, -1,
111
             "Size of RocksDB write buffer (in bytes). -1 to use default.");
112
113
DEFINE_int32(memstore_size_mb, 128,
114
             "Max size (in mb) of the memstore, before needing to flush.");
115
116
DEFINE_bool(use_docdb_aware_bloom_filter, true,
117
            "Whether to use the DocDbAwareFilterPolicy for both bloom storage and seeks.");
118
// Empirically 2 is a minimal value that provides best performance on sequential scan.
119
DEFINE_int32(max_nexts_to_avoid_seek, 2,
120
             "The number of next calls to try before doing resorting to do a rocksdb seek.");
121
122
DEFINE_bool(use_multi_level_index, true, "Whether to use multi-level data index.");
123
124
DEFINE_string(
125
    regular_tablets_data_block_key_value_encoding, "shared_prefix",
126
    "Key-value encoding to use for regular data blocks in RocksDB. Possible options: "
127
    "shared_prefix, three_shared_parts");
128
129
DEFINE_uint64(initial_seqno, 1ULL << 50, "Initial seqno for new RocksDB instances.");
130
131
DEFINE_int32(num_reserved_small_compaction_threads, -1, "Number of reserved small compaction "
132
             "threads. It allows splitting small vs. large compactions.");
133
134
DEFINE_bool(enable_ondisk_compression, true,
135
            "Determines whether SSTable compression is enabled or not.");
136
137
DEFINE_int32(priority_thread_pool_size, -1,
138
             "Max running workers in compaction thread pool. "
139
             "If -1 and max_background_compactions is specified - use max_background_compactions. "
140
             "If -1 and max_background_compactions is not specified - use sqrt(num_cpus).");
141
142
DEFINE_string(compression_type, "Snappy",
143
              "On-disk compression type to use in RocksDB."
144
              "By default, Snappy is used if supported.");
145
146
DEFINE_int32(block_restart_interval, kDefaultBlockStartInterval,
147
             "Controls the number of keys to look at for computing the diff encoding.");
148
149
namespace yb {
150
151
namespace {
152
153
441k
Result<rocksdb::CompressionType> GetConfiguredCompressionType(const std::string& flag_value) {
154
441k
  if (!FLAGS_enable_ondisk_compression) {
155
1
    return rocksdb::kNoCompression;
156
1
  }
157
441k
  const std::vector<rocksdb::CompressionType> kValidRocksDBCompressionTypes = {
158
441k
    rocksdb::kNoCompression,
159
441k
    rocksdb::kSnappyCompression,
160
441k
    rocksdb::kZlibCompression,
161
441k
    rocksdb::kLZ4Compression
162
441k
  };
163
882k
  for (const auto& compression_type : kValidRocksDBCompressionTypes) {
164
882k
    if (flag_value == rocksdb::CompressionTypeToString(compression_type)) {
165
441k
      if (rocksdb::CompressionTypeSupported(compression_type)) {
166
441k
        return compression_type;
167
441k
      }
168
11
      return STATUS_FORMAT(
169
11
          InvalidArgument, "Configured compression type $0 is not supported.", flag_value);
170
11
    }
171
882k
  }
172
18.4E
  return STATUS_FORMAT(
173
441k
      InvalidArgument, "Configured compression type $0 is not valid.", flag_value);
174
441k
}
175
176
} // namespace
177
178
namespace docdb {
179
180
  Result<rocksdb::KeyValueEncodingFormat> GetConfiguredKeyValueEncodingFormat(
181
141k
    const std::string& flag_value) {
182
141k
    for (const auto& encoding_format : rocksdb::kKeyValueEncodingFormatList) {
183
141k
      if (flag_value == KeyValueEncodingFormatToString(encoding_format)) {
184
141k
        return encoding_format;
185
141k
      }
186
141k
    }
187
18.4E
    return STATUS_FORMAT(InvalidArgument, "Key-value encoding format $0 is not valid.", flag_value);
188
141k
  }
189
190
} // namespace docdb
191
192
} // namespace yb
193
194
namespace {
195
196
11.3k
bool CompressionTypeValidator(const char* flagname, const std::string& flag_compression_type) {
197
11.3k
  auto res = yb::GetConfiguredCompressionType(flag_compression_type);
198
11.3k
  if (!res.ok()) {
199
    // Below we CHECK_RESULT on the same value returned here, and validating the result here ensures
200
    // that CHECK_RESULT will never fail once the process is running.
201
0
    LOG(ERROR) << res.status().ToString();
202
0
    return false;
203
0
  }
204
11.3k
  return true;
205
11.3k
}
206
207
11.3k
bool KeyValueEncodingFormatValidator(const char* flag_name, const std::string& flag_value) {
208
11.3k
  auto res = yb::docdb::GetConfiguredKeyValueEncodingFormat(flag_value);
209
11.3k
  bool ok = res.ok();
210
11.3k
  if (!ok) {
211
0
    LOG(ERROR) << flag_name << ": " << res.status();
212
0
  }
213
11.3k
  return ok;
214
11.3k
}
215
216
} // namespace
217
218
__attribute__((unused))
219
DEFINE_validator(compression_type, &CompressionTypeValidator);
220
__attribute__((unused))
221
DEFINE_validator(regular_tablets_data_block_key_value_encoding, &KeyValueEncodingFormatValidator);
222
223
using std::shared_ptr;
224
using std::string;
225
using std::unique_ptr;
226
using strings::Substitute;
227
228
namespace yb {
229
namespace docdb {
230
231
std::shared_ptr<rocksdb::BoundaryValuesExtractor> DocBoundaryValuesExtractorInstance();
232
233
441M
void SeekForward(const rocksdb::Slice& slice, rocksdb::Iterator *iter) {
234
441M
  if (!iter->Valid() || iter->key().compare(slice) >= 0) {
235
193M
    return;
236
193M
  }
237
247M
  ROCKSDB_SEEK(iter, slice);
238
247M
}
239
240
64.2M
void SeekForward(const KeyBytes& key_bytes, rocksdb::Iterator *iter) {
241
64.2M
  SeekForward(key_bytes.AsSlice(), iter);
242
64.2M
}
243
244
23.0M
KeyBytes AppendDocHt(const Slice& key, const DocHybridTime& doc_ht) {
245
23.0M
  char buf[kMaxBytesPerEncodedHybridTime + 1];
246
23.0M
  buf[0] = ValueTypeAsChar::kHybridTime;
247
23.0M
  auto end = doc_ht.EncodedInDocDbFormat(buf + 1);
248
23.0M
  return KeyBytes(key, Slice(buf, end));
249
23.0M
}
250
251
23.0M
void SeekPastSubKey(const Slice& key, rocksdb::Iterator* iter) {
252
23.0M
  SeekForward(AppendDocHt(key, DocHybridTime::kMin), iter);
253
23.0M
}
254
255
41.2M
void SeekOutOfSubKey(KeyBytes* key_bytes, rocksdb::Iterator* iter) {
256
41.2M
  key_bytes->AppendValueType(ValueType::kMaxByte);
257
41.2M
  SeekForward(*key_bytes, iter);
258
41.2M
  key_bytes->RemoveValueTypeSuffix(ValueType::kMaxByte);
259
41.2M
}
260
261
void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key,
262
260M
                           int* next_count, int* seek_count) {
263
530M
  for (int nexts = FLAGS_max_nexts_to_avoid_seek; nexts-- > 0;) {
264
520M
    if (!iter->Valid() || iter->key().compare(seek_key) >= 0) {
265
251M
      VTRACE(3, "Did $0 Next(s) instead of a Seek", nexts);
266
251M
      return;
267
251M
    }
268
18.4E
    VLOG(4) << "Skipping: " << SubDocKey::DebugSliceToString(iter->key());
269
270
269M
    iter->Next();
271
269M
    ++*next_count;
272
269M
  }
273
274
9.53M
  VTRACE(3, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek);
275
9.53M
  iter->Seek(seek_key);
276
9.53M
  ++*seek_count;
277
9.53M
}
278
279
void PerformRocksDBSeek(
280
    rocksdb::Iterator *iter,
281
    const rocksdb::Slice &seek_key,
282
    const char* file_name,
283
272M
    int line) {
284
272M
  int next_count = 0;
285
272M
  int seek_count = 0;
286
272M
  if (seek_key.size() == 0) {
287
66.9k
    iter->SeekToFirst();
288
66.9k
    ++seek_count;
289
272M
  } else if (!iter->Valid() || iter->key().compare(seek_key) > 0) {
290
12.3M
    iter->Seek(seek_key);
291
12.3M
    ++seek_count;
292
260M
  } else {
293
260M
    SeekPossiblyUsingNext(iter, seek_key, &next_count, &seek_count);
294
260M
  }
295
18.4E
  VLOG(4) << Substitute(
296
18.4E
      "PerformRocksDBSeek at $0:$1:\n"
297
18.4E
      "    Seek key:         $2\n"
298
18.4E
      "    Seek key (raw):   $3\n"
299
18.4E
      "    Actual key:       $4\n"
300
18.4E
      "    Actual key (raw): $5\n"
301
18.4E
      "    Actual value:     $6\n"
302
18.4E
      "    Next() calls:     $7\n"
303
18.4E
      "    Seek() calls:     $8\n",
304
18.4E
      file_name, line,
305
18.4E
      BestEffortDocDBKeyToStr(seek_key),
306
18.4E
      FormatSliceAsStr(seek_key),
307
18.4E
      iter->Valid() ? BestEffortDocDBKeyToStr(KeyBytes(iter->key())) : "N/A",
308
18.4E
      iter->Valid() ? FormatSliceAsStr(iter->key()) : "N/A",
309
18.4E
      iter->Valid() ? FormatSliceAsStr(iter->value()) : "N/A",
310
18.4E
      next_count,
311
18.4E
      seek_count);
312
272M
}
313
314
namespace {
315
316
rocksdb::ReadOptions PrepareReadOptions(
317
    rocksdb::DB* rocksdb,
318
    BloomFilterMode bloom_filter_mode,
319
    const boost::optional<const Slice>& user_key_for_filter,
320
    const rocksdb::QueryId query_id,
321
    std::shared_ptr<rocksdb::ReadFileFilter> file_filter,
322
15.3M
    const Slice* iterate_upper_bound) {
323
15.3M
  rocksdb::ReadOptions read_opts;
324
15.3M
  read_opts.query_id = query_id;
325
15.3M
  if (FLAGS_use_docdb_aware_bloom_filter &&
326
13.9M
    bloom_filter_mode == BloomFilterMode::USE_BLOOM_FILTER) {
327
8.61M
    DCHECK(user_key_for_filter);
328
8.61M
    read_opts.table_aware_file_filter = rocksdb->GetOptions().table_factory->
329
8.61M
        NewTableAwareReadFileFilter(read_opts, user_key_for_filter.get());
330
8.61M
  }
331
15.3M
  read_opts.file_filter = std::move(file_filter);
332
15.3M
  read_opts.iterate_upper_bound = iterate_upper_bound;
333
15.3M
  return read_opts;
334
15.3M
}
335
336
} // namespace
337
338
BoundedRocksDbIterator CreateRocksDBIterator(
339
    rocksdb::DB* rocksdb,
340
    const KeyBounds* docdb_key_bounds,
341
    BloomFilterMode bloom_filter_mode,
342
    const boost::optional<const Slice>& user_key_for_filter,
343
    const rocksdb::QueryId query_id,
344
    std::shared_ptr<rocksdb::ReadFileFilter> file_filter,
345
7.06M
    const Slice* iterate_upper_bound) {
346
7.06M
  rocksdb::ReadOptions read_opts = PrepareReadOptions(rocksdb, bloom_filter_mode,
347
7.06M
      user_key_for_filter, query_id, std::move(file_filter), iterate_upper_bound);
348
7.06M
  return BoundedRocksDbIterator(rocksdb, read_opts, docdb_key_bounds);
349
7.06M
}
350
351
unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
352
    const DocDB& doc_db,
353
    BloomFilterMode bloom_filter_mode,
354
    const boost::optional<const Slice>& user_key_for_filter,
355
    const rocksdb::QueryId query_id,
356
    const TransactionOperationContext& txn_op_context,
357
    CoarseTimePoint deadline,
358
    const ReadHybridTime& read_time,
359
    std::shared_ptr<rocksdb::ReadFileFilter> file_filter,
360
8.29M
    const Slice* iterate_upper_bound) {
361
  // TODO(dtxn) do we need separate options for intents db?
362
8.29M
  rocksdb::ReadOptions read_opts = PrepareReadOptions(doc_db.regular, bloom_filter_mode,
363
8.29M
      user_key_for_filter, query_id, std::move(file_filter), iterate_upper_bound);
364
8.29M
  return std::make_unique<IntentAwareIterator>(
365
8.29M
      doc_db, read_opts, deadline, read_time, txn_op_context);
366
8.29M
}
367
368
namespace {
369
370
std::mutex rocksdb_flags_mutex;
371
372
860k
int32_t GetMaxBackgroundFlushes() {
373
860k
  const auto kNumCpus = base::NumCPUs();
374
860k
  if (FLAGS_rocksdb_max_background_flushes == -1) {
375
860k
    constexpr auto kCpusPerFlushThread = 8;
376
860k
    constexpr auto kAutoMaxBackgroundFlushesHighLimit = 4;
377
860k
    auto flushes = 1 + kNumCpus / kCpusPerFlushThread;
378
860k
    auto max_flushes = std::min(flushes, kAutoMaxBackgroundFlushesHighLimit);
379
860k
    LOG(INFO) << "Overriding FLAGS_rocksdb_max_background_flushes to " << max_flushes;
380
860k
    return max_flushes;
381
0
  } else {
382
0
    return FLAGS_rocksdb_max_background_flushes;
383
0
  }
384
860k
}
385
386
// This controls the maximum number of schedulable compactions, per each instance of rocksdb, of
387
// which we will have many. We also do not want to waste resources by having too many queued
388
// compactions.
389
1.72M
int32_t GetMaxBackgroundCompactions() {
390
1.72M
  if (FLAGS_rocksdb_disable_compactions) {
391
0
    return 0;
392
0
  }
393
1.72M
  int rocksdb_max_background_compactions = FLAGS_rocksdb_max_background_compactions;
394
395
1.72M
  if (rocksdb_max_background_compactions >= 0) {
396
7
    return rocksdb_max_background_compactions;
397
7
  }
398
399
1.72M
  const auto kNumCpus = base::NumCPUs();
400
1.72M
  if (kNumCpus <= 4) {
401
0
    rocksdb_max_background_compactions = 1;
402
1.72M
  } else if (kNumCpus <= 8) {
403
0
    rocksdb_max_background_compactions = 2;
404
1.72M
  } else if (kNumCpus <= 32) {
405
1.72M
    rocksdb_max_background_compactions = 3;
406
0
  } else {
407
0
    rocksdb_max_background_compactions = 4;
408
0
  }
409
1.72M
  LOG(INFO) << "FLAGS_rocksdb_max_background_compactions was not set, automatically configuring "
410
1.72M
      << rocksdb_max_background_compactions << " background compactions.";
411
1.72M
  return rocksdb_max_background_compactions;
412
1.72M
}
413
414
860k
int32_t GetBaseBackgroundCompactions() {
415
860k
  if (FLAGS_rocksdb_disable_compactions) {
416
0
    return 0;
417
0
  }
418
419
860k
  if (FLAGS_rocksdb_base_background_compactions == -1) {
420
860k
    const auto base_background_compactions = GetMaxBackgroundCompactions();
421
860k
    LOG(INFO) << "FLAGS_rocksdb_base_background_compactions was not set, automatically configuring "
422
860k
        << base_background_compactions << " base background compactions.";
423
860k
    return base_background_compactions;
424
860k
  }
425
426
0
  return FLAGS_rocksdb_base_background_compactions;
427
0
}
428
429
// Auto initialize some of the RocksDB flags.
430
859k
void AutoInitFromRocksDBFlags(rocksdb::Options* options) {
431
859k
  std::unique_lock<std::mutex> lock(rocksdb_flags_mutex);
432
433
859k
  options->max_background_flushes = GetMaxBackgroundFlushes();
434
435
859k
  if (FLAGS_rocksdb_disable_compactions) {
436
4
    return;
437
4
  }
438
439
859k
  options->max_background_compactions = GetMaxBackgroundCompactions();
440
859k
  options->base_background_compactions = GetBaseBackgroundCompactions();
441
859k
}
442
443
430k
void AutoInitFromBlockBasedTableOptions(rocksdb::BlockBasedTableOptions* table_options) {
444
430k
  std::unique_lock<std::mutex> lock(rocksdb_flags_mutex);
445
446
430k
  table_options->block_size = FLAGS_db_block_size_bytes;
447
430k
  table_options->filter_block_size = FLAGS_db_filter_block_size_bytes;
448
430k
  table_options->index_block_size = FLAGS_db_index_block_size_bytes;
449
430k
  table_options->min_keys_per_index_block = FLAGS_db_min_keys_per_index_block;
450
451
430k
  if (FLAGS_block_restart_interval < kMinBlockStartInterval) {
452
0
      LOG(INFO) << "FLAGS_block_restart_interval was set to a very low value, overriding "
453
0
                << "block_restart_interval to " << kDefaultBlockStartInterval << ".";
454
0
      table_options->block_restart_interval = kDefaultBlockStartInterval;
455
430k
    } else if (FLAGS_block_restart_interval > kMaxBlockStartInterval) {
456
0
      LOG(INFO) << "FLAGS_block_restart_interval was set to a very high value, overriding "
457
0
                << "block_restart_interval to " << kMaxBlockStartInterval << ".";
458
0
      table_options->block_restart_interval = kMaxBlockStartInterval;
459
430k
    } else {
460
430k
      table_options->block_restart_interval = FLAGS_block_restart_interval;
461
430k
    }
462
430k
}
463
464
class HybridTimeFilteringIterator : public rocksdb::FilteringIterator {
465
 public:
466
  HybridTimeFilteringIterator(
467
      rocksdb::InternalIterator* iterator, bool arena_mode, HybridTime hybrid_time_filter)
468
0
      : rocksdb::FilteringIterator(iterator, arena_mode), hybrid_time_filter_(hybrid_time_filter) {}
469
470
 private:
471
0
  bool Satisfied(Slice key) override {
472
0
    auto user_key = rocksdb::ExtractUserKey(key);
473
0
    auto doc_ht = DocHybridTime::DecodeFromEnd(&user_key);
474
0
    if (!doc_ht.ok()) {
475
0
      LOG(DFATAL) << "Unable to decode doc ht " << rocksdb::ExtractUserKey(key) << ": "
476
0
                  << doc_ht.status();
477
0
      return true;
478
0
    }
479
0
    return doc_ht->hybrid_time() <= hybrid_time_filter_;
480
0
  }
481
482
  HybridTime hybrid_time_filter_;
483
};
484
485
template <class T, class... Args>
486
0
T* CreateOnArena(rocksdb::Arena* arena, Args&&... args) {
487
0
  if (!arena) {
488
0
    return new T(std::forward<Args>(args)...);
489
0
  }
490
0
  auto mem = arena->AllocateAligned(sizeof(T));
491
0
  return new (mem) T(std::forward<Args>(args)...);
492
0
}
493
494
rocksdb::InternalIterator* WrapIterator(
495
3.42M
    rocksdb::InternalIterator* iterator, rocksdb::Arena* arena, const Slice& filter) {
496
3.42M
  if (!filter.empty()) {
497
0
    HybridTime hybrid_time_filter;
498
0
    memcpy(&hybrid_time_filter, filter.data(), sizeof(hybrid_time_filter));
499
0
    return CreateOnArena<HybridTimeFilteringIterator>(
500
0
        arena, iterator, arena != nullptr, hybrid_time_filter);
501
0
  }
502
3.42M
  return iterator;
503
3.42M
}
504
505
void AddSupportedFilterPolicy(
506
    const rocksdb::BlockBasedTableOptions::FilterPolicyPtr& filter_policy,
507
844k
    rocksdb::BlockBasedTableOptions* table_options) {
508
844k
  table_options->supported_filter_policies->emplace(filter_policy->Name(), filter_policy);
509
844k
}
510
511
PriorityThreadPool* GetGlobalPriorityThreadPool
512
430k
  (const scoped_refptr<MetricEntity>& metric_entity = nullptr) {
513
430k
    static PriorityThreadPool priority_thread_pool_for_compactions_and_flushes(
514
430k
      GetGlobalRocksDBPriorityThreadPoolSize(), metric_entity);
515
430k
    return &priority_thread_pool_for_compactions_and_flushes;
516
430k
}
517
518
} // namespace
519
520
0
rocksdb::Options TEST_AutoInitFromRocksDBFlags() {
521
0
  rocksdb::Options options;
522
0
  AutoInitFromRocksDBFlags(&options);
523
0
  return options;
524
0
}
525
526
0
rocksdb::BlockBasedTableOptions TEST_AutoInitFromRocksDbTableFlags() {
527
0
  rocksdb::BlockBasedTableOptions blockBasedTableOptions;
528
0
  AutoInitFromBlockBasedTableOptions(&blockBasedTableOptions);
529
0
  return blockBasedTableOptions;
530
0
}
531
532
15.5k
int32_t GetGlobalRocksDBPriorityThreadPoolSize() {
533
15.5k
  if (FLAGS_rocksdb_disable_compactions) {
534
6
    return 1;
535
6
  }
536
537
15.5k
  auto priority_thread_pool_size = FLAGS_priority_thread_pool_size;
538
15.5k
  if (priority_thread_pool_size >= 0) {
539
0
    return priority_thread_pool_size;
540
0
  }
541
542
15.5k
  if (FLAGS_rocksdb_max_background_compactions != -1) {
543
    // If we did set the per-rocksdb flag, but not FLAGS_priority_thread_pool_size, just port
544
    // over that value.
545
3
    priority_thread_pool_size = GetMaxBackgroundCompactions();
546
15.5k
  } else {
547
15.5k
    const int kNumCpus = base::NumCPUs();
548
    // If we did not override the per-rocksdb queue size, then just use a production friendly
549
    // formula.
550
    //
551
    // For less then 8cpus, just manually tune to 1-2 threads. Above that, we can use 3.5/8.
552
15.5k
    if (kNumCpus < 4) {
553
0
      priority_thread_pool_size = 1;
554
15.5k
    } else if (kNumCpus < 8) {
555
0
      priority_thread_pool_size = 2;
556
15.5k
    } else {
557
15.5k
      priority_thread_pool_size = (int32_t) std::floor(kNumCpus * 3.5 / 8.0);
558
15.5k
    }
559
15.5k
  }
560
561
15.5k
  LOG(INFO) << "FLAGS_priority_thread_pool_size was not set, automatically configuring to "
562
15.5k
      << priority_thread_pool_size << ".";
563
564
15.5k
  return priority_thread_pool_size;
565
15.5k
}
566
567
void InitRocksDBOptions(
568
    rocksdb::Options* options, const string& log_prefix,
569
    const shared_ptr<rocksdb::Statistics>& statistics,
570
    const tablet::TabletOptions& tablet_options,
571
429k
    rocksdb::BlockBasedTableOptions table_options) {
572
429k
  AutoInitFromRocksDBFlags(options);
573
429k
  SetLogPrefix(options, log_prefix);
574
429k
  options->create_if_missing = true;
575
429k
  options->disableDataSync = true;
576
429k
  options->statistics = statistics;
577
429k
  options->info_log_level = YBRocksDBLogger::ConvertToRocksDBLogLevel(FLAGS_minloglevel);
578
429k
  options->initial_seqno = FLAGS_initial_seqno;
579
429k
  options->boundary_extractor = DocBoundaryValuesExtractorInstance();
580
429k
  options->compaction_measure_io_stats = FLAGS_rocksdb_compaction_measure_io_stats;
581
429k
  options->memory_monitor = tablet_options.memory_monitor;
582
429k
  if (FLAGS_db_write_buffer_size != -1) {
583
163
    options->write_buffer_size = FLAGS_db_write_buffer_size;
584
429k
  } else {
585
429k
    options->write_buffer_size = FLAGS_memstore_size_mb * 1_MB;
586
429k
  }
587
429k
  options->env = tablet_options.rocksdb_env;
588
429k
  options->checkpoint_env = rocksdb::Env::Default();
589
429k
  options->priority_thread_pool_for_compactions_and_flushes =
590
429k
    (tablet_options.ServerMetricEntity) ?
591
373k
    GetGlobalPriorityThreadPool(tablet_options.ServerMetricEntity) :
592
55.7k
    GetGlobalPriorityThreadPool();
593
594
429k
  if (FLAGS_num_reserved_small_compaction_threads != -1) {
595
0
    options->num_reserved_small_compaction_threads = FLAGS_num_reserved_small_compaction_threads;
596
0
  }
597
598
  // Since the flag validator for FLAGS_compression_type will fail if the result of this call is not
599
  // OK, this CHECK_RESULT should never fail and is safe.
600
429k
  options->compression = CHECK_RESULT(GetConfiguredCompressionType(FLAGS_compression_type));
601
602
429k
  options->listeners.insert(
603
429k
      options->listeners.end(), tablet_options.listeners.begin(),
604
429k
      tablet_options.listeners.end()); // Append listeners
605
606
  // Set block cache options.
607
429k
  if (tablet_options.block_cache) {
608
381k
    table_options.block_cache = tablet_options.block_cache;
609
    // Cache the bloom filters in the block cache.
610
381k
    table_options.cache_index_and_filter_blocks = true;
611
47.8k
  } else {
612
47.8k
    table_options.no_block_cache = true;
613
47.8k
    table_options.cache_index_and_filter_blocks = false;
614
47.8k
  }
615
616
429k
  AutoInitFromBlockBasedTableOptions(&table_options);
617
618
  // Set our custom bloom filter that is docdb aware.
619
429k
  if (FLAGS_use_docdb_aware_bloom_filter) {
620
422k
    const auto filter_block_size_bits = table_options.filter_block_size * 8;
621
422k
    table_options.filter_policy = std::make_shared<const DocDbAwareV3FilterPolicy>(
622
422k
        filter_block_size_bits, options->info_log.get());
623
422k
    table_options.supported_filter_policies =
624
422k
        std::make_shared<rocksdb::BlockBasedTableOptions::FilterPoliciesMap>();
625
422k
    AddSupportedFilterPolicy(std::make_shared<const DocDbAwareHashedComponentsFilterPolicy>(
626
422k
            filter_block_size_bits, options->info_log.get()), &table_options);
627
422k
    AddSupportedFilterPolicy(std::make_shared<const DocDbAwareV2FilterPolicy>(
628
422k
            filter_block_size_bits, options->info_log.get()), &table_options);
629
422k
  }
630
631
430k
  if (FLAGS_use_multi_level_index) {
632
430k
    table_options.index_type = rocksdb::IndexType::kMultiLevelBinarySearch;
633
18.4E
  } else {
634
18.4E
    table_options.index_type = rocksdb::IndexType::kBinarySearch;
635
18.4E
  }
636
637
429k
  options->table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
638
639
  // Compaction related options.
640
641
  // Enable universal style compactions.
642
429k
  bool compactions_enabled = !FLAGS_rocksdb_disable_compactions;
643
429k
  options->compaction_style = compactions_enabled
644
429k
    ? rocksdb::CompactionStyle::kCompactionStyleUniversal
645
18.4E
    : rocksdb::CompactionStyle::kCompactionStyleNone;
646
  // Set the number of levels to 1.
647
429k
  options->num_levels = 1;
648
649
429k
  AutoInitFromRocksDBFlags(options);
650
430k
  if (compactions_enabled) {
651
430k
    options->level0_file_num_compaction_trigger = FLAGS_rocksdb_level0_file_num_compaction_trigger;
652
430k
    options->level0_slowdown_writes_trigger = max_if_negative(
653
430k
        FLAGS_rocksdb_level0_slowdown_writes_trigger);
654
430k
    options->level0_stop_writes_trigger = max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger);
655
    // This determines the algo used to compute which files will be included. The "total size" based
656
    // computation compares the size of every new file with the sum of all files included so far.
657
430k
    options->compaction_options_universal.stop_style =
658
430k
        rocksdb::CompactionStopStyle::kCompactionStopStyleTotalSize;
659
430k
    options->compaction_options_universal.size_ratio =
660
430k
        FLAGS_rocksdb_universal_compaction_size_ratio;
661
430k
    options->compaction_options_universal.always_include_size_threshold =
662
430k
        FLAGS_rocksdb_universal_compaction_always_include_size_threshold;
663
430k
    options->compaction_options_universal.min_merge_width =
664
430k
        FLAGS_rocksdb_universal_compaction_min_merge_width;
665
430k
    options->compaction_size_threshold_bytes = FLAGS_rocksdb_compaction_size_threshold_bytes;
666
370k
    options->rate_limiter = tablet_options.rate_limiter ? tablet_options.rate_limiter
667
59.2k
                                                        : CreateRocksDBRateLimiter();
668
18.4E
  } else {
669
18.4E
    options->level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
670
18.4E
    options->level0_stop_writes_trigger = std::numeric_limits<int>::max();
671
18.4E
  }
672
673
429k
  options->max_write_buffer_number = FLAGS_rocksdb_max_write_buffer_number;
674
675
429k
  options->memtable_factory = std::make_shared<rocksdb::SkipListFactory>(
676
429k
      0 /* lookahead */, rocksdb::ConcurrentWrites::kFalse);
677
678
429k
  options->iterator_replacer = std::make_shared<rocksdb::IteratorReplacer>(&WrapIterator);
679
429k
}
680
681
538k
void SetLogPrefix(rocksdb::Options* options, const std::string& log_prefix) {
682
538k
  options->log_prefix = log_prefix;
683
538k
  options->info_log = std::make_shared<YBRocksDBLogger>(options->log_prefix);
684
538k
}
685
686
namespace {
687
688
// Helper class for RocksDBPatcher.
689
class RocksDBPatcherHelper {
690
 public:
691
  explicit RocksDBPatcherHelper(rocksdb::VersionSet* version_set)
692
      : version_set_(version_set), cfd_(version_set->GetColumnFamilySet()->GetDefault()),
693
862
        delete_edit_(cfd_), add_edit_(cfd_) {
694
862
  }
695
696
1.72k
  int Levels() const {
697
1.72k
    return cfd_->NumberLevels();
698
1.72k
  }
699
700
862
  const std::vector<rocksdb::FileMetaData*>& LevelFiles(int level) {
701
862
    return cfd_->current()->storage_info()->LevelFiles(level);
702
862
  }
703
704
  template <class F>
705
862
  auto IterateFiles(const F& f) {
706
    // Auto routing based on f return type.
707
862
    return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr));
708
862
  }
Unexecuted instantiation: docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper12IterateFilesIZNS0_14RocksDBPatcher4Impl19SetHybridTimeFilterENS_10HybridTimeEEUliRKN7rocksdb12FileMetaDataEE_EEDaRKT_
docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper12IterateFilesIZNS0_14RocksDBPatcher4Impl21ModifyFlushedFrontierERKNS0_17ConsensusFrontierEEUliN7rocksdb12FileMetaDataEE_EEDaRKT_
Line
Count
Source
705
862
  auto IterateFiles(const F& f) {
706
    // Auto routing based on f return type.
707
862
    return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr));
708
862
  }
Unexecuted instantiation: docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper12IterateFilesIZNS0_14RocksDBPatcher4Impl15UpdateFileSizesEvEUliRKN7rocksdb12FileMetaDataEE_EEDaRKT_
709
710
1.72k
  void ModifyFile(int level, const rocksdb::FileMetaData& fmd) {
711
1.72k
    delete_edit_->DeleteFile(level, fmd.fd.GetNumber());
712
1.72k
    add_edit_->AddCleanedFile(level, fmd);
713
1.72k
  }
714
715
862
  rocksdb::VersionEdit& Edit() {
716
862
    return *add_edit_;
717
862
  }
718
719
  CHECKED_STATUS Apply(
720
862
      const rocksdb::Options& options, const rocksdb::ImmutableCFOptions& imm_cf_options) {
721
862
    if (!delete_edit_.modified() && !add_edit_.modified()) {
722
0
      return Status::OK();
723
0
    }
724
725
862
    rocksdb::MutableCFOptions mutable_cf_options(options, imm_cf_options);
726
862
    {
727
862
      rocksdb::InstrumentedMutex mutex;
728
862
      rocksdb::InstrumentedMutexLock lock(&mutex);
729
1.72k
      for (auto* edit : {&delete_edit_, &add_edit_}) {
730
1.72k
        if (edit->modified()) {
731
1.72k
          RETURN_NOT_OK(version_set_->LogAndApply(cfd_, mutable_cf_options, edit->get(), &mutex));
732
1.72k
        }
733
1.72k
      }
734
862
    }
735
736
862
    return Status::OK();
737
862
  }
738
739
 private:
740
  template <class F>
741
862
  void IterateFilesHelper(const F& f, void*) {
742
1.72k
    for (int level = 0; level < Levels(); ++level) {
743
1.72k
      for (const auto* file : LevelFiles(level)) {
744
1.72k
        f(level, *file);
745
1.72k
      }
746
862
    }
747
862
  }
Unexecuted instantiation: docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper18IterateFilesHelperIZNS0_14RocksDBPatcher4Impl19SetHybridTimeFilterENS_10HybridTimeEEUliRKN7rocksdb12FileMetaDataEE_EEvRKT_Pv
docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper18IterateFilesHelperIZNS0_14RocksDBPatcher4Impl21ModifyFlushedFrontierERKNS0_17ConsensusFrontierEEUliN7rocksdb12FileMetaDataEE_EEvRKT_Pv
Line
Count
Source
741
862
  void IterateFilesHelper(const F& f, void*) {
742
1.72k
    for (int level = 0; level < Levels(); ++level) {
743
1.72k
      for (const auto* file : LevelFiles(level)) {
744
1.72k
        f(level, *file);
745
1.72k
      }
746
862
    }
747
862
  }
748
749
  template <class F>
750
0
  CHECKED_STATUS IterateFilesHelper(const F& f, Status*) {
751
0
    for (int level = 0; level < Levels(); ++level) {
752
0
      for (const auto* file : LevelFiles(level)) {
753
0
        RETURN_NOT_OK(f(level, *file));
754
0
      }
755
0
    }
756
0
    return Status::OK();
757
0
  }
758
759
  class TrackedEdit {
760
   public:
761
1.72k
    explicit TrackedEdit(rocksdb::ColumnFamilyData* cfd) {
762
1.72k
      edit_.SetColumnFamily(cfd->GetID());
763
1.72k
    }
764
765
6.03k
    rocksdb::VersionEdit* get() {
766
6.03k
      modified_ = true;
767
6.03k
      return &edit_;
768
6.03k
    }
769
770
3.44k
    rocksdb::VersionEdit* operator->() {
771
3.44k
      return get();
772
3.44k
    }
773
774
862
    rocksdb::VersionEdit& operator*() {
775
862
      return *get();
776
862
    }
777
778
2.58k
    bool modified() const {
779
2.58k
      return modified_;
780
2.58k
    }
781
782
   private:
783
    rocksdb::VersionEdit edit_;
784
    bool modified_ = false;
785
  };
786
787
  rocksdb::VersionSet* version_set_;
788
  rocksdb::ColumnFamilyData* cfd_;
789
  TrackedEdit delete_edit_;
790
  TrackedEdit add_edit_;
791
};
792
793
} // namespace
794
795
class RocksDBPatcher::Impl {
796
 public:
797
  Impl(const std::string& dbpath, const rocksdb::Options& options)
798
      : options_(SanitizeOptions(dbpath, &comparator_, options)),
799
        imm_cf_options_(options_),
800
        env_options_(options_),
801
        cf_options_(options_),
802
862
        version_set_(dbpath, &options_, env_options_, block_cache_.get(), &write_buffer_, nullptr) {
803
862
    cf_options_.comparator = comparator_.user_comparator();
804
862
  }
805
806
862
  CHECKED_STATUS Load() {
807
862
    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
808
862
    column_families.emplace_back("default", cf_options_);
809
862
    return version_set_.Recover(column_families);
810
862
  }
811
812
0
  CHECKED_STATUS SetHybridTimeFilter(HybridTime value) {
813
0
    RocksDBPatcherHelper helper(&version_set_);
814
815
0
    helper.IterateFiles([&helper, value](int level, const rocksdb::FileMetaData& file) {
816
0
      if (!file.largest.user_frontier) {
817
0
        return;
818
0
      }
819
0
      auto& consensus_frontier = down_cast<ConsensusFrontier&>(*file.largest.user_frontier);
820
0
      if (consensus_frontier.hybrid_time() <= value ||
821
0
          consensus_frontier.hybrid_time_filter() <= value) {
822
0
        return;
823
0
      }
824
0
      rocksdb::FileMetaData fmd = file;
825
0
      down_cast<ConsensusFrontier&>(*fmd.largest.user_frontier).set_hybrid_time_filter(value);
826
0
      helper.ModifyFile(level, fmd);
827
0
    });
828
829
0
    return helper.Apply(options_, imm_cf_options_);
830
0
  }
831
832
862
  CHECKED_STATUS ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
833
862
    RocksDBPatcherHelper helper(&version_set_);
834
835
862
    docdb::ConsensusFrontier final_frontier = frontier;
836
837
862
    auto* existing_frontier = down_cast<docdb::ConsensusFrontier*>(version_set_.FlushedFrontier());
838
862
    if (existing_frontier) {
839
862
      if (!frontier.history_cutoff()) {
840
862
        final_frontier.set_history_cutoff(existing_frontier->history_cutoff());
841
862
      }
842
862
      if (!frontier.op_id()) {
843
        // Update op id only if it was specified in frontier.
844
0
        final_frontier.set_op_id(existing_frontier->op_id());
845
0
      }
846
862
    }
847
848
862
    helper.Edit().ModifyFlushedFrontier(
849
862
        final_frontier.Clone(), rocksdb::FrontierModificationMode::kForce);
850
851
1.72k
    helper.IterateFiles([&helper, &frontier](int level, rocksdb::FileMetaData fmd) {
852
1.72k
      bool modified = false;
853
3.44k
      for (auto* user_frontier : {&fmd.smallest.user_frontier, &fmd.largest.user_frontier}) {
854
3.44k
        if (!*user_frontier) {
855
0
          continue;
856
0
        }
857
3.44k
        auto& consensus_frontier = down_cast<ConsensusFrontier&>(**user_frontier);
858
3.44k
        if (!consensus_frontier.op_id().empty()) {
859
3.44k
          consensus_frontier.set_op_id(OpId());
860
3.44k
          modified = true;
861
3.44k
        }
862
3.44k
        if (frontier.history_cutoff()) {
863
0
          consensus_frontier.set_history_cutoff(frontier.history_cutoff());
864
0
          modified = true;
865
0
        }
866
3.44k
      }
867
1.72k
      if (modified) {
868
1.72k
        helper.ModifyFile(level, fmd);
869
1.72k
      }
870
1.72k
    });
871
872
862
    return helper.Apply(options_, imm_cf_options_);
873
862
  }
874
875
0
  CHECKED_STATUS UpdateFileSizes() {
876
0
    RocksDBPatcherHelper helper(&version_set_);
877
878
0
    RETURN_NOT_OK(helper.IterateFiles(
879
0
        [&helper, this](int level, const rocksdb::FileMetaData& file) -> Status {
880
0
      auto base_path = rocksdb::MakeTableFileName(
881
0
          options_.db_paths[file.fd.GetPathId()].path, file.fd.GetNumber());
882
0
      auto data_path = rocksdb::TableBaseToDataFileName(base_path);
883
0
      auto base_size = VERIFY_RESULT(options_.env->GetFileSize(base_path));
884
0
      auto data_size = VERIFY_RESULT(options_.env->GetFileSize(data_path));
885
0
      auto total_size = base_size + data_size;
886
0
      if (file.fd.base_file_size == base_size && file.fd.total_file_size == total_size) {
887
0
        return Status::OK();
888
0
      }
889
0
      rocksdb::FileMetaData fmd = file;
890
0
      fmd.fd.base_file_size = base_size;
891
0
      fmd.fd.total_file_size = total_size;
892
0
      helper.ModifyFile(level, fmd);
893
0
      return Status::OK();
894
0
    }));
895
896
0
    return helper.Apply(options_, imm_cf_options_);
897
0
  }
898
899
 private:
900
  const rocksdb::InternalKeyComparator comparator_{rocksdb::BytewiseComparator()};
901
  rocksdb::WriteBuffer write_buffer_{1_KB};
902
  std::shared_ptr<rocksdb::Cache> block_cache_{rocksdb::NewLRUCache(1_MB)};
903
904
  rocksdb::Options options_;
905
  rocksdb::ImmutableCFOptions imm_cf_options_;
906
  rocksdb::EnvOptions env_options_;
907
  rocksdb::ColumnFamilyOptions cf_options_;
908
  rocksdb::VersionSet version_set_;
909
};
910
911
RocksDBPatcher::RocksDBPatcher(const std::string& dbpath, const rocksdb::Options& options)
912
862
    : impl_(new Impl(dbpath, options)) {
913
862
}
914
915
862
RocksDBPatcher::~RocksDBPatcher() {
916
862
}
917
918
862
Status RocksDBPatcher::Load() {
919
862
  return impl_->Load();
920
862
}
921
922
0
Status RocksDBPatcher::SetHybridTimeFilter(HybridTime value) {
923
0
  return impl_->SetHybridTimeFilter(value);
924
0
}
925
926
862
Status RocksDBPatcher::ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
927
862
  return impl_->ModifyFlushedFrontier(frontier);
928
862
}
929
930
0
Status RocksDBPatcher::UpdateFileSizes() {
931
0
  return impl_->UpdateFileSizes();
932
0
}
933
934
134
Status ForceRocksDBCompact(rocksdb::DB* db) {
935
134
  RETURN_NOT_OK_PREPEND(
936
134
      db->CompactRange(rocksdb::CompactRangeOptions(), /* begin = */ nullptr, /* end = */ nullptr),
937
134
      "Compact range failed:");
938
134
  return Status::OK();
939
134
}
940
941
5.81k
RateLimiterSharingMode GetRocksDBRateLimiterSharingMode() {
942
5.81k
  auto result = ParseEnumInsensitive<RateLimiterSharingMode>(
943
5.81k
      FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode);
944
5.81k
  if (PREDICT_TRUE(result.ok())) {
945
5.81k
    return *result;
946
5.81k
  }
947
0
  LOG(DFATAL) << result.status();
948
0
  return RateLimiterSharingMode::NONE;
949
0
}
950
951
63.8k
std::shared_ptr<rocksdb::RateLimiter> CreateRocksDBRateLimiter() {
952
63.8k
  if (PREDICT_TRUE((FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec > 0))) {
953
63.8k
    return std::shared_ptr<rocksdb::RateLimiter>(
954
63.8k
      rocksdb::NewGenericRateLimiter(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec));
955
63.8k
  }
956
10
  return nullptr;
957
10
}
958
959
} // namespace docdb
960
} // namespace yb