YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_rocksdb_util.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb_rocksdb_util.h"
15
16
#include <memory>
17
#include <thread>
18
19
#include "yb/common/transaction.h"
20
21
#include "yb/docdb/bounded_rocksdb_iterator.h"
22
#include "yb/docdb/consensus_frontier.h"
23
#include "yb/docdb/doc_key.h"
24
#include "yb/docdb/intent_aware_iterator.h"
25
#include "yb/docdb/key_bounds.h"
26
#include "yb/docdb/value_type.h"
27
28
#include "yb/gutil/casts.h"
29
#include "yb/gutil/sysinfo.h"
30
31
#include "yb/rocksdb/db/db_impl.h"
32
#include "yb/rocksdb/db/filename.h"
33
#include "yb/rocksdb/db/version_edit.h"
34
#include "yb/rocksdb/db/version_set.h"
35
#include "yb/rocksdb/db/writebuffer.h"
36
#include "yb/rocksdb/memtablerep.h"
37
#include "yb/rocksdb/options.h"
38
#include "yb/rocksdb/rate_limiter.h"
39
#include "yb/rocksdb/table.h"
40
#include "yb/rocksdb/table/filtering_iterator.h"
41
#include "yb/rocksdb/types.h"
42
#include "yb/rocksdb/util/compression.h"
43
44
#include "yb/rocksutil/yb_rocksdb_logger.h"
45
46
#include "yb/util/bytes_formatter.h"
47
#include "yb/util/priority_thread_pool.h"
48
#include "yb/util/result.h"
49
#include "yb/util/size_literals.h"
50
#include "yb/util/status.h"
51
#include "yb/util/status_format.h"
52
#include "yb/util/status_log.h"
53
#include "yb/util/trace.h"
54
55
using namespace yb::size_literals;  // NOLINT.
56
using namespace std::literals;
57
58
static constexpr int32_t kMinBlockStartInterval = 1;
59
static constexpr int32_t kDefaultBlockStartInterval = 16;
60
static constexpr int32_t kMaxBlockStartInterval = 256;
61
62
// Number of background flush threads per RocksDB instance. When left at -1,
// GetMaxBackgroundFlushes() derives the value from the CPU count.
DEFINE_int32(rocksdb_max_background_flushes, -1,
             "Number of threads to do background flushes. "
             "-1 to configure automatically based on the number of CPUs.");
63
DEFINE_bool(rocksdb_disable_compactions, false, "Disable rocksdb compactions.");
64
DEFINE_bool(rocksdb_compaction_measure_io_stats, false, "Measure stats for rocksdb compactions.");
65
// Base number of background compaction threads per RocksDB instance. When left at -1,
// GetBaseBackgroundCompactions() falls back to the max background compactions value.
DEFINE_int32(rocksdb_base_background_compactions, -1,
             "Number of threads to do background compactions. "
             "-1 to use the same value as the maximum number of background compactions.");
67
DEFINE_int32(rocksdb_max_background_compactions, -1,
68
             "Increased number of threads to do background compactions (used when compactions need "
69
             "to catch up.) Unless rocksdb_disable_compactions=true, this cannot be set to zero.");
70
DEFINE_int32(rocksdb_level0_file_num_compaction_trigger, 5,
71
             "Number of files to trigger level-0 compaction. -1 if compaction should not be "
72
             "triggered by number of files at all.");
73
74
DEFINE_int32(rocksdb_level0_slowdown_writes_trigger, -1,
75
             "The number of files above which writes are slowed down.");
76
// Level-0 file count used for options->level0_stop_writes_trigger. The flag value is passed
// through max_if_negative() in InitRocksDBOptions before being applied.
DEFINE_int32(rocksdb_level0_stop_writes_trigger, -1,
             "The number of files above which writes are stopped.");
78
// Copied into options->compaction_options_universal.size_ratio when compactions are enabled.
DEFINE_int32(rocksdb_universal_compaction_size_ratio, 20,
             "The percentage up to which files that are larger are included in a compaction.");
80
DEFINE_uint64(rocksdb_universal_compaction_always_include_size_threshold, 64_MB,
81
             "Always include files of smaller or equal size in a compaction.");
82
DEFINE_int32(rocksdb_universal_compaction_min_merge_width, 4,
83
             "The minimum number of files in a single compaction run.");
84
DEFINE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec, 1_GB,
85
             "Use to control write rate of flush and compaction.");
86
// Selects how the flush/compaction rate limit is shared between RocksDB instances.
DEFINE_string(rocksdb_compact_flush_rate_limit_sharing_mode, "tserver",
              "Allows to control rate limit sharing/calculation across RocksDB instances\n"
              "  tserver - rate limit is shared across all RocksDB instances"
              " at tablet server level\n"
              "  none - rate limit is calculated independently for every RocksDB instance");
91
DEFINE_uint64(rocksdb_compaction_size_threshold_bytes, 2ULL * 1024 * 1024 * 1024,
92
             "Threshold beyond which compaction is considered large.");
93
DEFINE_uint64(rocksdb_max_file_size_for_compaction, 0,
94
             "Maximal allowed file size to participate in RocksDB compaction. 0 - unlimited.");
95
DEFINE_int32(rocksdb_max_write_buffer_number, 2,
96
             "Maximum number of write buffers that are built up in memory.");
97
98
DEFINE_int64(db_block_size_bytes, 32_KB,
99
             "Size of RocksDB data block (in bytes).");
100
101
DEFINE_int64(db_filter_block_size_bytes, 64_KB,
102
             "Size of RocksDB filter block (in bytes).");
103
104
DEFINE_int64(db_index_block_size_bytes, 32_KB,
105
             "Size of RocksDB index block (in bytes).");
106
107
DEFINE_int64(db_min_keys_per_index_block, 100,
108
             "Minimum number of keys per index block.");
109
110
DEFINE_int64(db_write_buffer_size, -1,
111
             "Size of RocksDB write buffer (in bytes). -1 to use default.");
112
113
DEFINE_int32(memstore_size_mb, 128,
114
             "Max size (in mb) of the memstore, before needing to flush.");
115
116
DEFINE_bool(use_docdb_aware_bloom_filter, true,
117
            "Whether to use the DocDbAwareFilterPolicy for both bloom storage and seeks.");
118
// Empirically 2 is a minimal value that provides best performance on sequential scan.
119
// Upper bound on the Next() calls SeekPossiblyUsingNext() tries before issuing a real Seek().
DEFINE_int32(max_nexts_to_avoid_seek, 2,
             "The number of next calls to try before resorting to a rocksdb seek.");
121
122
DEFINE_bool(use_multi_level_index, true, "Whether to use multi-level data index.");
123
124
DEFINE_string(
125
    regular_tablets_data_block_key_value_encoding, "shared_prefix",
126
    "Key-value encoding to use for regular data blocks in RocksDB. Possible options: "
127
    "shared_prefix, three_shared_parts");
128
129
DEFINE_uint64(initial_seqno, 1ULL << 50, "Initial seqno for new RocksDB instances.");
130
131
DEFINE_int32(num_reserved_small_compaction_threads, -1, "Number of reserved small compaction "
132
             "threads. It allows splitting small vs. large compactions.");
133
134
DEFINE_bool(enable_ondisk_compression, true,
135
            "Determines whether SSTable compression is enabled or not.");
136
137
// Size of the global priority thread pool; resolved by GetGlobalRocksDBPriorityThreadPoolSize().
// The help text mirrors the automatic sizing implemented there.
DEFINE_int32(priority_thread_pool_size, -1,
             "Max running workers in compaction thread pool. "
             "If -1 and max_background_compactions is specified - use max_background_compactions. "
             "If -1 and max_background_compactions is not specified - use 1 for fewer than 4 "
             "CPUs, 2 for fewer than 8 CPUs, and floor(num_cpus * 3.5 / 8) otherwise.");
141
142
// Name of the on-disk compression type; validated by CompressionTypeValidator and resolved by
// GetConfiguredCompressionType().
DEFINE_string(compression_type, "Snappy",
              "On-disk compression type to use in RocksDB. "
              "By default, Snappy is used if supported.");
145
146
DEFINE_int32(block_restart_interval, kDefaultBlockStartInterval,
147
             "Controls the number of keys to look at for computing the diff encoding.");
148
149
namespace yb {
150
151
namespace {
152
153
544k
// Resolves a compression type name (the value of a flag) to a rocksdb::CompressionType.
//
// When --enable_ondisk_compression is false, kNoCompression is returned without looking at
// flag_value. Otherwise flag_value must equal rocksdb::CompressionTypeToString() of one of the
// types in the table below and that type must be reported as supported by
// rocksdb::CompressionTypeSupported(); an InvalidArgument status is returned when either
// condition does not hold.
Result<rocksdb::CompressionType> GetConfiguredCompressionType(const std::string& flag_value) {
  if (!FLAGS_enable_ondisk_compression) {
    return rocksdb::kNoCompression;
  }
  // Constant table with static storage, so no container is allocated on every call.
  static constexpr rocksdb::CompressionType kValidRocksDBCompressionTypes[] = {
    rocksdb::kNoCompression,
    rocksdb::kSnappyCompression,
    rocksdb::kZlibCompression,
    rocksdb::kLZ4Compression
  };
  for (const auto& compression_type : kValidRocksDBCompressionTypes) {
    if (flag_value == rocksdb::CompressionTypeToString(compression_type)) {
      if (rocksdb::CompressionTypeSupported(compression_type)) {
        return compression_type;
      }
      // Known name, but this build cannot use it.
      return STATUS_FORMAT(
          InvalidArgument, "Configured compression type $0 is not supported.", flag_value);
    }
  }
  return STATUS_FORMAT(
      InvalidArgument, "Configured compression type $0 is not valid.", flag_value);
}
175
176
} // namespace
177
178
namespace docdb {
179
180
  // Looks up the key-value encoding format whose string form equals flag_value, scanning
  // rocksdb::kKeyValueEncodingFormatList in order. Returns InvalidArgument when none matches.
  Result<rocksdb::KeyValueEncodingFormat> GetConfiguredKeyValueEncodingFormat(
    const std::string& flag_value) {
    for (const auto& candidate : rocksdb::kKeyValueEncodingFormatList) {
      const bool matches = (flag_value == KeyValueEncodingFormatToString(candidate));
      if (!matches) {
        continue;
      }
      return candidate;
    }
    return STATUS_FORMAT(InvalidArgument, "Key-value encoding format $0 is not valid.", flag_value);
  }
189
190
} // namespace docdb
191
192
} // namespace yb
193
194
namespace {
195
196
16.8k
bool CompressionTypeValidator(const char* flagname, const std::string& flag_compression_type) {
197
16.8k
  auto res = yb::GetConfiguredCompressionType(flag_compression_type);
198
16.8k
  if (!res.ok()) {
199
    // Below we CHECK_RESULT on the same value returned here, and validating the result here ensures
200
    // that CHECK_RESULT will never fail once the process is running.
201
0
    LOG(ERROR) << res.status().ToString();
202
0
    return false;
203
0
  }
204
16.8k
  return true;
205
16.8k
}
206
207
16.8k
bool KeyValueEncodingFormatValidator(const char* flag_name, const std::string& flag_value) {
208
16.8k
  auto res = yb::docdb::GetConfiguredKeyValueEncodingFormat(flag_value);
209
16.8k
  bool ok = res.ok();
210
16.8k
  if (!ok) {
211
0
    LOG(ERROR) << flag_name << ": " << res.status();
212
0
  }
213
16.8k
  return ok;
214
16.8k
}
215
216
} // namespace
217
218
DEFINE_validator(compression_type, &CompressionTypeValidator);
219
DEFINE_validator(regular_tablets_data_block_key_value_encoding, &KeyValueEncodingFormatValidator);
220
221
using std::shared_ptr;
222
using std::string;
223
using std::unique_ptr;
224
using strings::Substitute;
225
226
namespace yb {
227
namespace docdb {
228
229
std::shared_ptr<rocksdb::BoundaryValuesExtractor> DocBoundaryValuesExtractorInstance();
230
231
1.27G
// Seeks iter to slice, but only when the iterator is valid and currently positioned strictly
// before slice. An invalid iterator, or one already at or past slice, is left untouched.
void SeekForward(const rocksdb::Slice& slice, rocksdb::Iterator *iter) {
  const bool positioned_before_target = iter->Valid() && iter->key().compare(slice) < 0;
  if (positioned_before_target) {
    ROCKSDB_SEEK(iter, slice);
  }
}
237
238
234M
void SeekForward(const KeyBytes& key_bytes, rocksdb::Iterator *iter) {
239
234M
  SeekForward(key_bytes.AsSlice(), iter);
240
234M
}
241
242
98.3M
// Builds a KeyBytes consisting of key followed by a kHybridTime value-type byte and the DocDB
// encoding of doc_ht.
KeyBytes AppendDocHt(const Slice& key, const DocHybridTime& doc_ht) {
  // One byte for the value type marker plus room for the encoded hybrid time.
  char encoded[kMaxBytesPerEncodedHybridTime + 1];
  encoded[0] = ValueTypeAsChar::kHybridTime;
  const auto encoded_end = doc_ht.EncodedInDocDbFormat(encoded + 1);
  return KeyBytes(key, Slice(encoded, encoded_end));
}
248
249
98.3M
void SeekPastSubKey(const Slice& key, rocksdb::Iterator* iter) {
250
98.3M
  SeekForward(AppendDocHt(key, DocHybridTime::kMin), iter);
251
98.3M
}
252
253
135M
// Seeks forward to *key_bytes with a kMaxByte value type appended. key_bytes is modified only
// temporarily: the suffix is removed again before returning.
void SeekOutOfSubKey(KeyBytes* key_bytes, rocksdb::Iterator* iter) {
  // Temporarily extend the key with the kMaxByte marker to form the seek target.
  key_bytes->AppendValueType(ValueType::kMaxByte);
  SeekForward(*key_bytes, iter);
  // Restore the caller's key.
  key_bytes->RemoveValueTypeSuffix(ValueType::kMaxByte);
}
258
259
// Positions iter at the first key >= seek_key, trying up to --max_nexts_to_avoid_seek Next()
// calls before falling back to a real Seek().
//
// *next_count is incremented once per Next() call and *seek_count once if Seek() is used.
// If the iterator becomes invalid or reaches a key >= seek_key within the allowed number of
// Next() calls, no Seek() is issued.
void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key,
                           int* next_count, int* seek_count) {
  // Read the flag once so the loop bound and the trace message below always agree.
  const int max_nexts = FLAGS_max_nexts_to_avoid_seek;
  for (int nexts_done = 0; nexts_done < max_nexts; ++nexts_done) {
    if (!iter->Valid() || iter->key().compare(seek_key) >= 0) {
      // Report the number of Next() calls actually performed, not the remaining budget.
      VTRACE(3, "Did $0 Next(s) instead of a Seek", nexts_done);
      return;
    }
    VLOG(4) << "Skipping: " << SubDocKey::DebugSliceToString(iter->key());

    iter->Next();
    ++*next_count;
  }

  VTRACE(3, "Forced to do an actual Seek after $0 Next(s)", max_nexts);
  iter->Seek(seek_key);
  ++*seek_count;
}
276
277
void PerformRocksDBSeek(
278
    rocksdb::Iterator *iter,
279
    const rocksdb::Slice &seek_key,
280
    const char* file_name,
281
800M
    int line) {
282
800M
  int next_count = 0;
283
800M
  int seek_count = 0;
284
800M
  if (seek_key.size() == 0) {
285
98.2k
    iter->SeekToFirst();
286
98.2k
    ++seek_count;
287
799M
  } else if (!iter->Valid() || 
iter->key().compare(seek_key) > 0771M
) {
288
31.6M
    iter->Seek(seek_key);
289
31.6M
    ++seek_count;
290
768M
  } else {
291
768M
    SeekPossiblyUsingNext(iter, seek_key, &next_count, &seek_count);
292
768M
  }
293
800M
  VLOG(4) << Substitute(
294
1.10M
      "PerformRocksDBSeek at $0:$1:\n"
295
1.10M
      "    Seek key:         $2\n"
296
1.10M
      "    Seek key (raw):   $3\n"
297
1.10M
      "    Actual key:       $4\n"
298
1.10M
      "    Actual key (raw): $5\n"
299
1.10M
      "    Actual value:     $6\n"
300
1.10M
      "    Next() calls:     $7\n"
301
1.10M
      "    Seek() calls:     $8\n",
302
1.10M
      file_name, line,
303
1.10M
      BestEffortDocDBKeyToStr(seek_key),
304
1.10M
      FormatSliceAsStr(seek_key),
305
1.10M
      iter->Valid() ? 
BestEffortDocDBKeyToStr(KeyBytes(iter->key()))0
: "N/A",
306
1.10M
      iter->Valid() ? 
FormatSliceAsStr(iter->key())0
: "N/A",
307
1.10M
      iter->Valid() ? 
FormatSliceAsStr(iter->value())0
: "N/A",
308
1.10M
      next_count,
309
1.10M
      seek_count);
310
800M
}
311
312
namespace {
313
314
// Assembles the rocksdb::ReadOptions shared by the iterator factories in this file.
//
// A table-aware file filter built from user_key_for_filter is installed only when
// --use_docdb_aware_bloom_filter is on and bloom_filter_mode is USE_BLOOM_FILTER; in that case
// user_key_for_filter is expected to be set (DCHECK). query_id, file_filter and
// iterate_upper_bound are stored in the options as given.
rocksdb::ReadOptions PrepareReadOptions(
    rocksdb::DB* rocksdb,
    BloomFilterMode bloom_filter_mode,
    const boost::optional<const Slice>& user_key_for_filter,
    const rocksdb::QueryId query_id,
    std::shared_ptr<rocksdb::ReadFileFilter> file_filter,
    const Slice* iterate_upper_bound) {
  rocksdb::ReadOptions result;
  result.query_id = query_id;

  const bool install_bloom_filter =
      FLAGS_use_docdb_aware_bloom_filter &&
      bloom_filter_mode == BloomFilterMode::USE_BLOOM_FILTER;
  if (install_bloom_filter) {
    DCHECK(user_key_for_filter);
    result.table_aware_file_filter = rocksdb->GetOptions().table_factory->
        NewTableAwareReadFileFilter(result, user_key_for_filter.get());
  }

  result.file_filter = std::move(file_filter);
  result.iterate_upper_bound = iterate_upper_bound;
  return result;
}
333
334
} // namespace
335
336
// Creates a BoundedRocksDbIterator over rocksdb limited by docdb_key_bounds, using read options
// produced by PrepareReadOptions() from the remaining arguments.
BoundedRocksDbIterator CreateRocksDBIterator(
    rocksdb::DB* rocksdb,
    const KeyBounds* docdb_key_bounds,
    BloomFilterMode bloom_filter_mode,
    const boost::optional<const Slice>& user_key_for_filter,
    const rocksdb::QueryId query_id,
    std::shared_ptr<rocksdb::ReadFileFilter> file_filter,
    const Slice* iterate_upper_bound) {
  auto read_options = PrepareReadOptions(
      rocksdb, bloom_filter_mode, user_key_for_filter, query_id, std::move(file_filter),
      iterate_upper_bound);
  return BoundedRocksDbIterator(rocksdb, read_options, docdb_key_bounds);
}
348
349
// Creates an IntentAwareIterator for doc_db. The read options are prepared against the regular
// RocksDB instance (doc_db.regular) and passed to the iterator along with deadline, read_time
// and txn_op_context.
unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
    const DocDB& doc_db,
    BloomFilterMode bloom_filter_mode,
    const boost::optional<const Slice>& user_key_for_filter,
    const rocksdb::QueryId query_id,
    const TransactionOperationContext& txn_op_context,
    CoarseTimePoint deadline,
    const ReadHybridTime& read_time,
    std::shared_ptr<rocksdb::ReadFileFilter> file_filter,
    const Slice* iterate_upper_bound) {
  // TODO(dtxn) do we need separate options for intents db?
  auto read_options = PrepareReadOptions(
      doc_db.regular, bloom_filter_mode, user_key_for_filter, query_id, std::move(file_filter),
      iterate_upper_bound);
  return std::make_unique<IntentAwareIterator>(
      doc_db, read_options, deadline, read_time, txn_op_context);
}
365
366
namespace {
367
368
std::mutex rocksdb_flags_mutex;
369
370
1.05M
int32_t GetMaxBackgroundFlushes() {
371
1.05M
  const auto kNumCpus = base::NumCPUs();
372
1.05M
  if (FLAGS_rocksdb_max_background_flushes == -1) {
373
1.05M
    constexpr auto kCpusPerFlushThread = 8;
374
1.05M
    constexpr auto kAutoMaxBackgroundFlushesHighLimit = 4;
375
1.05M
    auto flushes = 1 + kNumCpus / kCpusPerFlushThread;
376
1.05M
    auto max_flushes = std::min(flushes, kAutoMaxBackgroundFlushesHighLimit);
377
1.05M
    LOG(INFO) << "Overriding FLAGS_rocksdb_max_background_flushes to " << max_flushes;
378
1.05M
    return max_flushes;
379
1.05M
  } else {
380
1
    return FLAGS_rocksdb_max_background_flushes;
381
1
  }
382
1.05M
}
383
384
// This controls the maximum number of schedulable compactions, per each instance of rocksdb, of
385
// which we will have many. We also do not want to waste resources by having too many queued
386
// compactions.
387
2.11M
int32_t GetMaxBackgroundCompactions() {
388
2.11M
  if (FLAGS_rocksdb_disable_compactions) {
389
0
    return 0;
390
0
  }
391
2.11M
  int rocksdb_max_background_compactions = FLAGS_rocksdb_max_background_compactions;
392
393
2.11M
  if (rocksdb_max_background_compactions == 0) {
394
0
    LOG(FATAL) << "--rocksdb_max_background_compactions may not be set to zero with compactions "
395
0
        << "enabled. Either change this flag or set --rocksdb_disable_compactions=true.";
396
2.11M
  } else if (rocksdb_max_background_compactions > 0) {
397
12
    return rocksdb_max_background_compactions;
398
12
  }
399
400
2.11M
  const auto kNumCpus = base::NumCPUs();
401
2.11M
  if (kNumCpus <= 4) {
402
0
    rocksdb_max_background_compactions = 1;
403
2.11M
  } else if (kNumCpus <= 8) {
404
0
    rocksdb_max_background_compactions = 2;
405
2.11M
  } else if (kNumCpus <= 32) {
406
2.11M
    rocksdb_max_background_compactions = 3;
407
2.11M
  } else {
408
0
    rocksdb_max_background_compactions = 4;
409
0
  }
410
2.11M
  LOG(INFO) << "FLAGS_rocksdb_max_background_compactions was not set, automatically configuring "
411
2.11M
      << rocksdb_max_background_compactions << " background compactions.";
412
2.11M
  return rocksdb_max_background_compactions;
413
2.11M
}
414
415
1.05M
int32_t GetBaseBackgroundCompactions() {
416
1.05M
  if (FLAGS_rocksdb_disable_compactions) {
417
0
    return 0;
418
0
  }
419
420
1.05M
  if (FLAGS_rocksdb_base_background_compactions == -1) {
421
1.05M
    const auto base_background_compactions = GetMaxBackgroundCompactions();
422
1.05M
    LOG(INFO) << "FLAGS_rocksdb_base_background_compactions was not set, automatically configuring "
423
1.05M
        << base_background_compactions << " base background compactions.";
424
1.05M
    return base_background_compactions;
425
1.05M
  }
426
427
1
  return FLAGS_rocksdb_base_background_compactions;
428
1.05M
}
429
430
// Auto initialize some of the RocksDB flags.
431
1.05M
void AutoInitFromRocksDBFlags(rocksdb::Options* options) {
432
1.05M
  std::unique_lock<std::mutex> lock(rocksdb_flags_mutex);
433
434
1.05M
  options->max_background_flushes = GetMaxBackgroundFlushes();
435
436
1.05M
  if (FLAGS_rocksdb_disable_compactions) {
437
7
    return;
438
7
  }
439
440
1.05M
  options->max_background_compactions = GetMaxBackgroundCompactions();
441
1.05M
  options->base_background_compactions = GetBaseBackgroundCompactions();
442
1.05M
}
443
444
527k
// Copies the block-size related flags into table_options under rocksdb_flags_mutex and sets
// block_restart_interval from --block_restart_interval:
//   - below kMinBlockStartInterval: kDefaultBlockStartInterval is used instead;
//   - above kMaxBlockStartInterval: kMaxBlockStartInterval is used instead;
//   - otherwise the flag value is used unchanged.
// Both overrides are logged.
void AutoInitFromBlockBasedTableOptions(rocksdb::BlockBasedTableOptions* table_options) {
  std::unique_lock<std::mutex> lock(rocksdb_flags_mutex);

  table_options->block_size = FLAGS_db_block_size_bytes;
  table_options->filter_block_size = FLAGS_db_filter_block_size_bytes;
  table_options->index_block_size = FLAGS_db_index_block_size_bytes;
  table_options->min_keys_per_index_block = FLAGS_db_min_keys_per_index_block;

  const int32_t requested_interval = FLAGS_block_restart_interval;
  if (requested_interval < kMinBlockStartInterval) {
    LOG(INFO) << "FLAGS_block_restart_interval was set to a very low value, overriding "
              << "block_restart_interval to " << kDefaultBlockStartInterval << ".";
    table_options->block_restart_interval = kDefaultBlockStartInterval;
    return;
  }
  if (requested_interval > kMaxBlockStartInterval) {
    LOG(INFO) << "FLAGS_block_restart_interval was set to a very high value, overriding "
              << "block_restart_interval to " << kMaxBlockStartInterval << ".";
    table_options->block_restart_interval = kMaxBlockStartInterval;
    return;
  }
  table_options->block_restart_interval = requested_interval;
}
464
465
class HybridTimeFilteringIterator : public rocksdb::FilteringIterator {
466
 public:
467
  HybridTimeFilteringIterator(
468
      rocksdb::InternalIterator* iterator, bool arena_mode, HybridTime hybrid_time_filter)
469
36
      : rocksdb::FilteringIterator(iterator, arena_mode), hybrid_time_filter_(hybrid_time_filter) {}
470
471
 private:
472
4.10k
  bool Satisfied(Slice key) override {
473
4.10k
    auto user_key = rocksdb::ExtractUserKey(key);
474
4.10k
    auto doc_ht = DocHybridTime::DecodeFromEnd(&user_key);
475
4.10k
    if (!doc_ht.ok()) {
476
0
      LOG(DFATAL) << "Unable to decode doc ht " << rocksdb::ExtractUserKey(key) << ": "
477
0
                  << doc_ht.status();
478
0
      return true;
479
0
    }
480
4.10k
    return doc_ht->hybrid_time() <= hybrid_time_filter_;
481
4.10k
  }
482
483
  HybridTime hybrid_time_filter_;
484
};
485
486
template <class T, class... Args>
487
36
T* CreateOnArena(rocksdb::Arena* arena, Args&&... args) {
488
36
  if (!arena) {
489
5
    return new T(std::forward<Args>(args)...);
490
5
  }
491
31
  auto mem = arena->AllocateAligned(sizeof(T));
492
31
  return new (mem) T(std::forward<Args>(args)...);
493
36
}
494
495
// Iterator replacer hook (installed as options->iterator_replacer in InitRocksDBOptions).
//
// With an empty filter the iterator is returned unchanged. Otherwise the first
// sizeof(HybridTime) bytes of filter are copied into a HybridTime and the iterator is wrapped
// in a HybridTimeFilteringIterator, created on arena when one is provided.
rocksdb::InternalIterator* WrapIterator(
    rocksdb::InternalIterator* iterator, rocksdb::Arena* arena, const Slice& filter) {
  if (filter.empty()) {
    return iterator;
  }

  HybridTime hybrid_time_filter;
  // The memcpy below reads sizeof(hybrid_time_filter) bytes; catch shorter filters in debug
  // builds instead of silently reading past the end of the slice.
  DCHECK_GE(filter.size(), sizeof(hybrid_time_filter));
  memcpy(&hybrid_time_filter, filter.data(), sizeof(hybrid_time_filter));
  return CreateOnArena<HybridTimeFilteringIterator>(
      arena, iterator, arena != nullptr, hybrid_time_filter);
}
505
506
void AddSupportedFilterPolicy(
507
    const rocksdb::BlockBasedTableOptions::FilterPolicyPtr& filter_policy,
508
1.03M
    rocksdb::BlockBasedTableOptions* table_options) {
509
1.03M
  table_options->supported_filter_policies->emplace(filter_policy->Name(), filter_policy);
510
1.03M
}
511
512
// Returns the process-wide priority thread pool used for compactions and flushes.
//
// The pool is a function-local static, so it is constructed on the first call only, using
// GetGlobalRocksDBPriorityThreadPoolSize() and the metric_entity passed to that first call;
// the metric_entity argument of later calls is not used.
PriorityThreadPool* GetGlobalPriorityThreadPool(
    const scoped_refptr<MetricEntity>& metric_entity = nullptr) {
  static PriorityThreadPool global_pool(GetGlobalRocksDBPriorityThreadPoolSize(), metric_entity);
  return &global_pool;
}
518
519
} // namespace
520
521
9
// Test helper: returns default-constructed rocksdb::Options after applying
// AutoInitFromRocksDBFlags() to them.
rocksdb::Options TEST_AutoInitFromRocksDBFlags() {
  rocksdb::Options initialized_options;
  AutoInitFromRocksDBFlags(&initialized_options);
  return initialized_options;
}
526
527
4
// Test helper: returns default-constructed rocksdb::BlockBasedTableOptions after applying
// AutoInitFromBlockBasedTableOptions() to them.
rocksdb::BlockBasedTableOptions TEST_AutoInitFromRocksDbTableFlags() {
  rocksdb::BlockBasedTableOptions initialized_table_options;
  AutoInitFromBlockBasedTableOptions(&initialized_table_options);
  return initialized_table_options;
}
532
533
23.3k
int32_t GetGlobalRocksDBPriorityThreadPoolSize() {
534
23.3k
  if (FLAGS_rocksdb_disable_compactions) {
535
7
    return 1;
536
7
  }
537
538
23.3k
  auto priority_thread_pool_size = FLAGS_priority_thread_pool_size;
539
540
23.3k
  if (priority_thread_pool_size == 0) {
541
0
    LOG(FATAL) << "--priority_thread_pool_size may not be set to zero with compactions "
542
0
        << "enabled. Either change this flag or set --rocksdb_disable_compactions=true.";
543
23.3k
  } else if (priority_thread_pool_size > 0) {
544
1
    return priority_thread_pool_size;
545
1
  }
546
547
23.3k
  if (FLAGS_rocksdb_max_background_compactions != -1) {
548
    // If we did set the per-rocksdb flag, but not FLAGS_priority_thread_pool_size, just port
549
    // over that value.
550
4
    priority_thread_pool_size = GetMaxBackgroundCompactions();
551
23.3k
  } else {
552
23.3k
    const int kNumCpus = base::NumCPUs();
553
    // If we did not override the per-rocksdb queue size, then just use a production friendly
554
    // formula.
555
    //
556
    // For less then 8cpus, just manually tune to 1-2 threads. Above that, we can use 3.5/8.
557
23.3k
    if (kNumCpus < 4) {
558
1
      priority_thread_pool_size = 1;
559
23.3k
    } else if (kNumCpus < 8) {
560
1
      priority_thread_pool_size = 2;
561
23.3k
    } else {
562
23.3k
      priority_thread_pool_size = (int32_t) std::floor(kNumCpus * 3.5 / 8.0);
563
23.3k
    }
564
23.3k
  }
565
566
23.3k
  LOG(INFO) << "FLAGS_priority_thread_pool_size was not set, automatically configuring to "
567
23.3k
      << priority_thread_pool_size << ".";
568
569
23.3k
  return priority_thread_pool_size;
570
23.3k
}
571
572
void InitRocksDBOptions(
573
    rocksdb::Options* options, const string& log_prefix,
574
    const shared_ptr<rocksdb::Statistics>& statistics,
575
    const tablet::TabletOptions& tablet_options,
576
527k
    rocksdb::BlockBasedTableOptions table_options) {
577
527k
  AutoInitFromRocksDBFlags(options);
578
527k
  SetLogPrefix(options, log_prefix);
579
527k
  options->create_if_missing = true;
580
527k
  options->disableDataSync = true;
581
527k
  options->statistics = statistics;
582
527k
  options->info_log_level = YBRocksDBLogger::ConvertToRocksDBLogLevel(FLAGS_minloglevel);
583
527k
  options->initial_seqno = FLAGS_initial_seqno;
584
527k
  options->boundary_extractor = DocBoundaryValuesExtractorInstance();
585
527k
  options->compaction_measure_io_stats = FLAGS_rocksdb_compaction_measure_io_stats;
586
527k
  options->memory_monitor = tablet_options.memory_monitor;
587
527k
  if (FLAGS_db_write_buffer_size != -1) {
588
410
    options->write_buffer_size = FLAGS_db_write_buffer_size;
589
527k
  } else {
590
527k
    options->write_buffer_size = FLAGS_memstore_size_mb * 1_MB;
591
527k
  }
592
527k
  options->env = tablet_options.rocksdb_env;
593
527k
  options->checkpoint_env = rocksdb::Env::Default();
594
527k
  options->priority_thread_pool_for_compactions_and_flushes =
595
527k
    (tablet_options.ServerMetricEntity) ?
596
438k
    GetGlobalPriorityThreadPool(tablet_options.ServerMetricEntity) :
597
527k
    
GetGlobalPriorityThreadPool()89.3k
;
598
599
527k
  if (FLAGS_num_reserved_small_compaction_threads != -1) {
600
0
    options->num_reserved_small_compaction_threads = FLAGS_num_reserved_small_compaction_threads;
601
0
  }
602
603
  // Since the flag validator for FLAGS_compression_type will fail if the result of this call is not
604
  // OK, this CHECK_RESULT should never fail and is safe.
605
527k
  options->compression = CHECK_RESULT(GetConfiguredCompressionType(FLAGS_compression_type));
606
607
527k
  options->listeners.insert(
608
527k
      options->listeners.end(), tablet_options.listeners.begin(),
609
527k
      tablet_options.listeners.end()); // Append listeners
610
611
  // Set block cache options.
612
527k
  if (tablet_options.block_cache) {
613
451k
    table_options.block_cache = tablet_options.block_cache;
614
    // Cache the bloom filters in the block cache.
615
451k
    table_options.cache_index_and_filter_blocks = true;
616
451k
  } else {
617
75.8k
    table_options.no_block_cache = true;
618
75.8k
    table_options.cache_index_and_filter_blocks = false;
619
75.8k
  }
620
621
527k
  AutoInitFromBlockBasedTableOptions(&table_options);
622
623
  // Set our custom bloom filter that is docdb aware.
624
527k
  if (FLAGS_use_docdb_aware_bloom_filter) {
625
515k
    const auto filter_block_size_bits = table_options.filter_block_size * 8;
626
515k
    table_options.filter_policy = std::make_shared<const DocDbAwareV3FilterPolicy>(
627
515k
        filter_block_size_bits, options->info_log.get());
628
515k
    table_options.supported_filter_policies =
629
515k
        std::make_shared<rocksdb::BlockBasedTableOptions::FilterPoliciesMap>();
630
515k
    AddSupportedFilterPolicy(std::make_shared<const DocDbAwareHashedComponentsFilterPolicy>(
631
515k
            filter_block_size_bits, options->info_log.get()), &table_options);
632
515k
    AddSupportedFilterPolicy(std::make_shared<const DocDbAwareV2FilterPolicy>(
633
515k
            filter_block_size_bits, options->info_log.get()), &table_options);
634
515k
  }
635
636
527k
  if (
FLAGS_use_multi_level_index527k
) {
637
527k
    table_options.index_type = rocksdb::IndexType::kMultiLevelBinarySearch;
638
18.4E
  } else {
639
18.4E
    table_options.index_type = rocksdb::IndexType::kBinarySearch;
640
18.4E
  }
641
642
527k
  options->table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
643
644
  // Compaction related options.
645
646
  // Enable universal style compactions.
647
527k
  bool compactions_enabled = !FLAGS_rocksdb_disable_compactions;
648
527k
  options->compaction_style = compactions_enabled
649
527k
    ? rocksdb::CompactionStyle::kCompactionStyleUniversal
650
18.4E
    : rocksdb::CompactionStyle::kCompactionStyleNone;
651
  // Set the number of levels to 1.
652
527k
  options->num_levels = 1;
653
654
527k
  AutoInitFromRocksDBFlags(options);
655
527k
  if (
compactions_enabled527k
) {
656
527k
    options->level0_file_num_compaction_trigger = FLAGS_rocksdb_level0_file_num_compaction_trigger;
657
527k
    options->level0_slowdown_writes_trigger = max_if_negative(
658
527k
        FLAGS_rocksdb_level0_slowdown_writes_trigger);
659
527k
    options->level0_stop_writes_trigger = max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger);
660
    // This determines the algo used to compute which files will be included. The "total size" based
661
    // computation compares the size of every new file with the sum of all files included so far.
662
527k
    options->compaction_options_universal.stop_style =
663
527k
        rocksdb::CompactionStopStyle::kCompactionStopStyleTotalSize;
664
527k
    options->compaction_options_universal.size_ratio =
665
527k
        FLAGS_rocksdb_universal_compaction_size_ratio;
666
527k
    options->compaction_options_universal.always_include_size_threshold =
667
527k
        FLAGS_rocksdb_universal_compaction_always_include_size_threshold;
668
527k
    options->compaction_options_universal.min_merge_width =
669
527k
        FLAGS_rocksdb_universal_compaction_min_merge_width;
670
527k
    options->compaction_size_threshold_bytes = FLAGS_rocksdb_compaction_size_threshold_bytes;
671
527k
    options->rate_limiter = tablet_options.rate_limiter ? 
tablet_options.rate_limiter438k
672
527k
                                                        : 
CreateRocksDBRateLimiter()89.6k
;
673
18.4E
  } else {
674
18.4E
    options->level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
675
18.4E
    options->level0_stop_writes_trigger = std::numeric_limits<int>::max();
676
18.4E
  }
677
678
527k
  options->max_write_buffer_number = FLAGS_rocksdb_max_write_buffer_number;
679
680
527k
  options->memtable_factory = std::make_shared<rocksdb::SkipListFactory>(
681
527k
      0 /* lookahead */, rocksdb::ConcurrentWrites::kFalse);
682
683
527k
  options->iterator_replacer = std::make_shared<rocksdb::IteratorReplacer>(&WrapIterator);
684
527k
}
685
686
673k
// Stores log_prefix in the given options and installs a YBRocksDBLogger, constructed from the
// stored prefix, as the options' info log.
void SetLogPrefix(rocksdb::Options* options, const std::string& log_prefix) {
  rocksdb::Options& target = *options;
  target.log_prefix = log_prefix;
  // The logger is built from the copy held by the options object, not from the argument.
  target.info_log = std::make_shared<YBRocksDBLogger>(target.log_prefix);
}
690
691
namespace {
692
693
// Helper class for RocksDBPatcher.
694
class RocksDBPatcherHelper {
695
 public:
696
  explicit RocksDBPatcherHelper(rocksdb::VersionSet* version_set)
697
      : version_set_(version_set), cfd_(version_set->GetColumnFamilySet()->GetDefault()),
698
1.96k
        delete_edit_(cfd_), add_edit_(cfd_) {
699
1.96k
  }
700
701
3.92k
  int Levels() const {
702
3.92k
    return cfd_->NumberLevels();
703
3.92k
  }
704
705
1.96k
  const std::vector<rocksdb::FileMetaData*>& LevelFiles(int level) {
706
1.96k
    return cfd_->current()->storage_info()->LevelFiles(level);
707
1.96k
  }
708
709
  template <class F>
710
1.96k
  auto IterateFiles(const F& f) {
711
    // Auto routing based on f return type.
712
1.96k
    return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr));
713
1.96k
  }
docdb_rocksdb_util.cc:auto yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFiles<yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&)>(yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&) const&)
Line
Count
Source
710
161
  auto IterateFiles(const F& f) {
711
    // Auto routing based on f return type.
712
161
    return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr));
713
161
  }
docdb_rocksdb_util.cc:auto yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFiles<yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData)>(yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData) const&)
Line
Count
Source
710
1.79k
  auto IterateFiles(const F& f) {
711
    // Auto routing based on f return type.
712
1.79k
    return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr));
713
1.79k
  }
Unexecuted instantiation: docdb_rocksdb_util.cc:auto yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFiles<yb::docdb::RocksDBPatcher::Impl::UpdateFileSizes()::'lambda'(int, rocksdb::FileMetaData const&)>(yb::docdb::RocksDBPatcher::Impl::UpdateFileSizes()::'lambda'(int, rocksdb::FileMetaData const&) const&)
714
715
3.55k
  void ModifyFile(int level, const rocksdb::FileMetaData& fmd) {
716
3.55k
    delete_edit_->DeleteFile(level, fmd.fd.GetNumber());
717
3.55k
    add_edit_->AddCleanedFile(level, fmd);
718
3.55k
  }
719
720
1.79k
  rocksdb::VersionEdit& Edit() {
721
1.79k
    return *add_edit_;
722
1.79k
  }
723
724
  CHECKED_STATUS Apply(
725
1.96k
      const rocksdb::Options& options, const rocksdb::ImmutableCFOptions& imm_cf_options) {
726
1.96k
    if (!delete_edit_.modified() && 
!add_edit_.modified()168
) {
727
147
      return Status::OK();
728
147
    }
729
730
1.81k
    rocksdb::MutableCFOptions mutable_cf_options(options, imm_cf_options);
731
1.81k
    {
732
1.81k
      rocksdb::InstrumentedMutex mutex;
733
1.81k
      rocksdb::InstrumentedMutexLock lock(&mutex);
734
3.62k
      for (auto* edit : {&delete_edit_, &add_edit_}) {
735
3.62k
        if (edit->modified()) {
736
3.60k
          RETURN_NOT_OK(version_set_->LogAndApply(cfd_, mutable_cf_options, edit->get(), &mutex));
737
3.60k
        }
738
3.62k
      }
739
1.81k
    }
740
741
1.81k
    return Status::OK();
742
1.81k
  }
743
744
 private:
745
  template <class F>
746
1.96k
  void IterateFilesHelper(const F& f, void*) {
747
3.92k
    for (int level = 0; level < Levels(); 
++level1.96k
) {
748
3.72k
      for (const auto* file : LevelFiles(level)) {
749
3.72k
        f(level, *file);
750
3.72k
      }
751
1.96k
    }
752
1.96k
  }
docdb_rocksdb_util.cc:void yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFilesHelper<yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&)>(yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&) const&, void*)
Line
Count
Source
746
161
  void IterateFilesHelper(const F& f, void*) {
747
322
    for (int level = 0; level < Levels(); 
++level161
) {
748
185
      for (const auto* file : LevelFiles(level)) {
749
185
        f(level, *file);
750
185
      }
751
161
    }
752
161
  }
docdb_rocksdb_util.cc:void yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFilesHelper<yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData)>(yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData) const&, void*)
Line
Count
Source
746
1.79k
  void IterateFilesHelper(const F& f, void*) {
747
3.59k
    for (int level = 0; level < Levels(); 
++level1.79k
) {
748
3.53k
      for (const auto* file : LevelFiles(level)) {
749
3.53k
        f(level, *file);
750
3.53k
      }
751
1.79k
    }
752
1.79k
  }
753
754
  template <class F>
755
0
  CHECKED_STATUS IterateFilesHelper(const F& f, Status*) {
756
0
    for (int level = 0; level < Levels(); ++level) {
757
0
      for (const auto* file : LevelFiles(level)) {
758
0
        RETURN_NOT_OK(f(level, *file));
759
0
      }
760
0
    }
761
0
    return Status::OK();
762
0
  }
763
764
  class TrackedEdit {
765
   public:
766
3.92k
    explicit TrackedEdit(rocksdb::ColumnFamilyData* cfd) {
767
3.92k
      edit_.SetColumnFamily(cfd->GetID());
768
3.92k
    }
769
770
12.5k
    rocksdb::VersionEdit* get() {
771
12.5k
      modified_ = true;
772
12.5k
      return &edit_;
773
12.5k
    }
774
775
7.10k
    rocksdb::VersionEdit* operator->() {
776
7.10k
      return get();
777
7.10k
    }
778
779
1.79k
    rocksdb::VersionEdit& operator*() {
780
1.79k
      return *get();
781
1.79k
    }
782
783
5.75k
    bool modified() const {
784
5.75k
      return modified_;
785
5.75k
    }
786
787
   private:
788
    rocksdb::VersionEdit edit_;
789
    bool modified_ = false;
790
  };
791
792
  rocksdb::VersionSet* version_set_;
793
  rocksdb::ColumnFamilyData* cfd_;
794
  TrackedEdit delete_edit_;
795
  TrackedEdit add_edit_;
796
};
797
798
} // namespace
799
800
// Implementation of RocksDBPatcher. Holds its own VersionSet for the database at dbpath and
// rewrites file metadata through RocksDBPatcherHelper.
class RocksDBPatcher::Impl {
 public:
  Impl(const std::string& dbpath, const rocksdb::Options& options)
      : options_(SanitizeOptions(dbpath, &comparator_, options)),
        imm_cf_options_(options_),
        env_options_(options_),
        cf_options_(options_),
        version_set_(
            dbpath, &options_, env_options_, block_cache_.get(), &write_buffer_, nullptr) {
    cf_options_.comparator = comparator_.user_comparator();
  }

  // Recovers the version set using a single column family named "default".
  CHECKED_STATUS Load() {
    std::vector<rocksdb::ColumnFamilyDescriptor> families;
    families.emplace_back("default", cf_options_);
    return version_set_.Recover(families);
  }

  // Sets the hybrid time filter to value on the largest frontier of every file whose largest
  // frontier has both hybrid time and hybrid time filter greater than value.
  CHECKED_STATUS SetHybridTimeFilter(HybridTime value) {
    RocksDBPatcherHelper helper(&version_set_);

    helper.IterateFiles([&helper, value](int level, const rocksdb::FileMetaData& file) {
      const auto& largest_frontier = file.largest.user_frontier;
      if (!largest_frontier) {
        return;
      }
      auto& consensus_frontier = down_cast<ConsensusFrontier&>(*largest_frontier);
      const bool nothing_to_do = consensus_frontier.hybrid_time() <= value ||
                                 consensus_frontier.hybrid_time_filter() <= value;
      if (nothing_to_do) {
        return;
      }
      // Patch a copy of the metadata and schedule it as the replacement for this file.
      rocksdb::FileMetaData patched = file;
      down_cast<ConsensusFrontier&>(*patched.largest.user_frontier).set_hybrid_time_filter(value);
      helper.ModifyFile(level, patched);
    });

    return helper.Apply(options_, imm_cf_options_);
  }

  // Forces the flushed frontier of the version set to frontier, and for every file clears the
  // op id of its smallest/largest frontiers and, when frontier carries a history cutoff, stores
  // that history cutoff in them.
  CHECKED_STATUS ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
    RocksDBPatcherHelper helper(&version_set_);

    docdb::ConsensusFrontier new_flushed_frontier = frontier;

    auto* current_flushed =
        down_cast<docdb::ConsensusFrontier*>(version_set_.FlushedFrontier());
    if (current_flushed) {
      // Fields that are not specified in frontier are inherited from the existing flushed
      // frontier.
      if (!frontier.history_cutoff()) {
        new_flushed_frontier.set_history_cutoff(current_flushed->history_cutoff());
      }
      if (!frontier.op_id()) {
        new_flushed_frontier.set_op_id(current_flushed->op_id());
      }
    }

    helper.Edit().ModifyFlushedFrontier(
        new_flushed_frontier.Clone(), rocksdb::FrontierModificationMode::kForce);

    // The lambda takes the metadata by value, so it patches its own copy.
    helper.IterateFiles([&helper, &frontier](int level, rocksdb::FileMetaData fmd) {
      bool changed = false;
      for (auto* frontier_ptr : {&fmd.smallest.user_frontier, &fmd.largest.user_frontier}) {
        if (*frontier_ptr) {
          auto& consensus_frontier = down_cast<ConsensusFrontier&>(**frontier_ptr);
          if (!consensus_frontier.op_id().empty()) {
            consensus_frontier.set_op_id(OpId());
            changed = true;
          }
          if (frontier.history_cutoff()) {
            consensus_frontier.set_history_cutoff(frontier.history_cutoff());
            changed = true;
          }
        }
      }
      if (changed) {
        helper.ModifyFile(level, fmd);
      }
    });

    return helper.Apply(options_, imm_cf_options_);
  }

  // Compares the sizes recorded in each file's metadata with the sizes reported by the env for
  // the base and data files, and rewrites the metadata of files where they differ.
  CHECKED_STATUS UpdateFileSizes() {
    RocksDBPatcherHelper helper(&version_set_);

    RETURN_NOT_OK(helper.IterateFiles(
        [&helper, this](int level, const rocksdb::FileMetaData& file) -> Status {
      const auto& db_path = options_.db_paths[file.fd.GetPathId()].path;
      auto base_path = rocksdb::MakeTableFileName(db_path, file.fd.GetNumber());
      auto data_path = rocksdb::TableBaseToDataFileName(base_path);
      auto base_size = VERIFY_RESULT(options_.env->GetFileSize(base_path));
      auto data_size = VERIFY_RESULT(options_.env->GetFileSize(data_path));
      auto total_size = base_size + data_size;
      const bool sizes_match =
          file.fd.base_file_size == base_size && file.fd.total_file_size == total_size;
      if (!sizes_match) {
        rocksdb::FileMetaData patched = file;
        patched.fd.base_file_size = base_size;
        patched.fd.total_file_size = total_size;
        helper.ModifyFile(level, patched);
      }
      return Status::OK();
    }));

    return helper.Apply(options_, imm_cf_options_);
  }

 private:
  // NOTE: the declaration order below matters. The constructor's initializers for options_ and
  // version_set_ use comparator_, write_buffer_ and block_cache_, which therefore have to be
  // declared (and thus initialized) first.
  const rocksdb::InternalKeyComparator comparator_{rocksdb::BytewiseComparator()};
  rocksdb::WriteBuffer write_buffer_{1_KB};
  std::shared_ptr<rocksdb::Cache> block_cache_{rocksdb::NewLRUCache(1_MB)};

  rocksdb::Options options_;
  rocksdb::ImmutableCFOptions imm_cf_options_;
  rocksdb::EnvOptions env_options_;
  rocksdb::ColumnFamilyOptions cf_options_;
  rocksdb::VersionSet version_set_;
};
915
916
// Creates the implementation object for the database at dbpath with the given options.
RocksDBPatcher::RocksDBPatcher(const std::string& dbpath, const rocksdb::Options& options)
    : impl_(new Impl(dbpath, options)) {}
919
920
1.93k
// Defined in this file, after the definition of Impl.
RocksDBPatcher::~RocksDBPatcher() = default;
922
923
1.93k
// Forwards to Impl::Load.
Status RocksDBPatcher::Load() {
  Impl& impl = *impl_;
  return impl.Load();
}
926
927
161
// Forwards to Impl::SetHybridTimeFilter.
Status RocksDBPatcher::SetHybridTimeFilter(HybridTime value) {
  Impl& impl = *impl_;
  return impl.SetHybridTimeFilter(value);
}
930
931
1.79k
// Forwards to Impl::ModifyFlushedFrontier.
Status RocksDBPatcher::ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
  Impl& impl = *impl_;
  return impl.ModifyFlushedFrontier(frontier);
}
934
935
0
// Forwards to Impl::UpdateFileSizes.
Status RocksDBPatcher::UpdateFileSizes() {
  Impl& impl = *impl_;
  return impl.UpdateFileSizes();
}
938
939
235
// Runs CompactRange over the whole key range of db (null begin and end) with default
// CompactRangeOptions.
// Returns OK on success; otherwise the CompactRange status with a "Compact range failed" prefix.
Status ForceRocksDBCompact(rocksdb::DB* db) {
  // The prefix has no trailing colon: prepending already joins the prefix and the original
  // message with ": ", so a colon here would produce "Compact range failed:: ...".
  RETURN_NOT_OK_PREPEND(
      db->CompactRange(rocksdb::CompactRangeOptions(), /* begin = */ nullptr, /* end = */ nullptr),
      "Compact range failed");
  return Status::OK();
}
945
946
8.75k
// Parses FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode, ignoring case, into a
// RateLimiterSharingMode. On a parse failure the status is logged at DFATAL severity and
// RateLimiterSharingMode::NONE is returned.
RateLimiterSharingMode GetRocksDBRateLimiterSharingMode() {
  auto parsed_mode = ParseEnumInsensitive<RateLimiterSharingMode>(
      FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode);
  if (!PREDICT_TRUE(parsed_mode.ok())) {
    LOG(DFATAL) << parsed_mode.status();
    return RateLimiterSharingMode::NONE;
  }
  return *parsed_mode;
}
955
956
98.3k
std::shared_ptr<rocksdb::RateLimiter> CreateRocksDBRateLimiter() {
957
98.3k
  if (PREDICT_TRUE((FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec > 0))) {
958
98.3k
    return std::shared_ptr<rocksdb::RateLimiter>(
959
98.3k
      rocksdb::NewGenericRateLimiter(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec));
960
98.3k
  }
961
11
  return nullptr;
962
98.3k
}
963
964
} // namespace docdb
965
} // namespace yb