/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_rocksdb_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb_rocksdb_util.h" |
15 | | |
16 | | #include <memory> |
17 | | #include <thread> |
18 | | |
19 | | #include "yb/common/transaction.h" |
20 | | |
21 | | #include "yb/docdb/bounded_rocksdb_iterator.h" |
22 | | #include "yb/docdb/consensus_frontier.h" |
23 | | #include "yb/docdb/doc_key.h" |
24 | | #include "yb/docdb/intent_aware_iterator.h" |
25 | | #include "yb/docdb/key_bounds.h" |
26 | | #include "yb/docdb/value_type.h" |
27 | | |
28 | | #include "yb/gutil/casts.h" |
29 | | #include "yb/gutil/sysinfo.h" |
30 | | |
31 | | #include "yb/rocksdb/db/db_impl.h" |
32 | | #include "yb/rocksdb/db/filename.h" |
33 | | #include "yb/rocksdb/db/version_edit.h" |
34 | | #include "yb/rocksdb/db/version_set.h" |
35 | | #include "yb/rocksdb/db/writebuffer.h" |
36 | | #include "yb/rocksdb/memtablerep.h" |
37 | | #include "yb/rocksdb/options.h" |
38 | | #include "yb/rocksdb/rate_limiter.h" |
39 | | #include "yb/rocksdb/table.h" |
40 | | #include "yb/rocksdb/table/filtering_iterator.h" |
41 | | #include "yb/rocksdb/types.h" |
42 | | #include "yb/rocksdb/util/compression.h" |
43 | | |
44 | | #include "yb/rocksutil/yb_rocksdb_logger.h" |
45 | | |
46 | | #include "yb/util/bytes_formatter.h" |
47 | | #include "yb/util/priority_thread_pool.h" |
48 | | #include "yb/util/result.h" |
49 | | #include "yb/util/size_literals.h" |
50 | | #include "yb/util/status.h" |
51 | | #include "yb/util/status_format.h" |
52 | | #include "yb/util/status_log.h" |
53 | | #include "yb/util/trace.h" |
54 | | |
55 | | using namespace yb::size_literals; // NOLINT. |
56 | | using namespace std::literals; |
57 | | |
58 | | static constexpr int32_t kMinBlockStartInterval = 1; |
59 | | static constexpr int32_t kDefaultBlockStartInterval = 16; |
60 | | static constexpr int32_t kMaxBlockStartInterval = 256; |
61 | | |
62 | | DEFINE_int32(rocksdb_max_background_flushes, -1, "Number threads to do background flushes."); |
63 | | DEFINE_bool(rocksdb_disable_compactions, false, "Disable rocksdb compactions."); |
64 | | DEFINE_bool(rocksdb_compaction_measure_io_stats, false, "Measure stats for rocksdb compactions."); |
65 | | DEFINE_int32(rocksdb_base_background_compactions, -1, |
66 | | "Number threads to do background compactions."); |
67 | | DEFINE_int32(rocksdb_max_background_compactions, -1, |
68 | | "Increased number of threads to do background compactions (used when compactions need " |
69 | | "to catch up.) Unless rocksdb_disable_compactions=true, this cannot be set to zero."); |
70 | | DEFINE_int32(rocksdb_level0_file_num_compaction_trigger, 5, |
71 | | "Number of files to trigger level-0 compaction. -1 if compaction should not be " |
72 | | "triggered by number of files at all."); |
73 | | |
74 | | DEFINE_int32(rocksdb_level0_slowdown_writes_trigger, -1, |
75 | | "The number of files above which writes are slowed down."); |
76 | | DEFINE_int32(rocksdb_level0_stop_writes_trigger, -1, |
77 | | "The number of files above which compactions are stopped."); |
78 | | DEFINE_int32(rocksdb_universal_compaction_size_ratio, 20, |
79 | | "The percentage upto which files that are larger are include in a compaction."); |
80 | | DEFINE_uint64(rocksdb_universal_compaction_always_include_size_threshold, 64_MB, |
81 | | "Always include files of smaller or equal size in a compaction."); |
82 | | DEFINE_int32(rocksdb_universal_compaction_min_merge_width, 4, |
83 | | "The minimum number of files in a single compaction run."); |
84 | | DEFINE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec, 1_GB, |
85 | | "Use to control write rate of flush and compaction."); |
86 | | DEFINE_string(rocksdb_compact_flush_rate_limit_sharing_mode, "tserver", |
87 | | "Allows to control rate limit sharing/calculation across RocksDB instances\n" |
88 | | " tserver - rate limit is shared across all RocksDB instances" |
89 | | " at tabset server level\n" |
90 | | " none - rate limit is calculated independently for every RocksDB instance"); |
91 | | DEFINE_uint64(rocksdb_compaction_size_threshold_bytes, 2ULL * 1024 * 1024 * 1024, |
92 | | "Threshold beyond which compaction is considered large."); |
93 | | DEFINE_uint64(rocksdb_max_file_size_for_compaction, 0, |
94 | | "Maximal allowed file size to participate in RocksDB compaction. 0 - unlimited."); |
95 | | DEFINE_int32(rocksdb_max_write_buffer_number, 2, |
96 | | "Maximum number of write buffers that are built up in memory."); |
97 | | |
98 | | DEFINE_int64(db_block_size_bytes, 32_KB, |
99 | | "Size of RocksDB data block (in bytes)."); |
100 | | |
101 | | DEFINE_int64(db_filter_block_size_bytes, 64_KB, |
102 | | "Size of RocksDB filter block (in bytes)."); |
103 | | |
104 | | DEFINE_int64(db_index_block_size_bytes, 32_KB, |
105 | | "Size of RocksDB index block (in bytes)."); |
106 | | |
107 | | DEFINE_int64(db_min_keys_per_index_block, 100, |
108 | | "Minimum number of keys per index block."); |
109 | | |
110 | | DEFINE_int64(db_write_buffer_size, -1, |
111 | | "Size of RocksDB write buffer (in bytes). -1 to use default."); |
112 | | |
113 | | DEFINE_int32(memstore_size_mb, 128, |
114 | | "Max size (in mb) of the memstore, before needing to flush."); |
115 | | |
116 | | DEFINE_bool(use_docdb_aware_bloom_filter, true, |
117 | | "Whether to use the DocDbAwareFilterPolicy for both bloom storage and seeks."); |
118 | | // Empirically 2 is a minimal value that provides best performance on sequential scan. |
119 | | DEFINE_int32(max_nexts_to_avoid_seek, 2, |
120 | | "The number of next calls to try before doing resorting to do a rocksdb seek."); |
121 | | |
122 | | DEFINE_bool(use_multi_level_index, true, "Whether to use multi-level data index."); |
123 | | |
124 | | DEFINE_string( |
125 | | regular_tablets_data_block_key_value_encoding, "shared_prefix", |
126 | | "Key-value encoding to use for regular data blocks in RocksDB. Possible options: " |
127 | | "shared_prefix, three_shared_parts"); |
128 | | |
129 | | DEFINE_uint64(initial_seqno, 1ULL << 50, "Initial seqno for new RocksDB instances."); |
130 | | |
131 | | DEFINE_int32(num_reserved_small_compaction_threads, -1, "Number of reserved small compaction " |
132 | | "threads. It allows splitting small vs. large compactions."); |
133 | | |
134 | | DEFINE_bool(enable_ondisk_compression, true, |
135 | | "Determines whether SSTable compression is enabled or not."); |
136 | | |
137 | | DEFINE_int32(priority_thread_pool_size, -1, |
138 | | "Max running workers in compaction thread pool. " |
139 | | "If -1 and max_background_compactions is specified - use max_background_compactions. " |
140 | | "If -1 and max_background_compactions is not specified - use sqrt(num_cpus)."); |
141 | | |
142 | | DEFINE_string(compression_type, "Snappy", |
143 | | "On-disk compression type to use in RocksDB." |
144 | | "By default, Snappy is used if supported."); |
145 | | |
146 | | DEFINE_int32(block_restart_interval, kDefaultBlockStartInterval, |
147 | | "Controls the number of keys to look at for computing the diff encoding."); |
148 | | |
149 | | namespace yb { |
150 | | |
151 | | namespace { |
152 | | |
153 | 544k | Result<rocksdb::CompressionType> GetConfiguredCompressionType(const std::string& flag_value) { |
154 | 544k | if (!FLAGS_enable_ondisk_compression) { |
155 | 1 | return rocksdb::kNoCompression; |
156 | 1 | } |
157 | 544k | const std::vector<rocksdb::CompressionType> kValidRocksDBCompressionTypes = { |
158 | 544k | rocksdb::kNoCompression, |
159 | 544k | rocksdb::kSnappyCompression, |
160 | 544k | rocksdb::kZlibCompression, |
161 | 544k | rocksdb::kLZ4Compression |
162 | 544k | }; |
163 | 1.08M | for (const auto& compression_type : kValidRocksDBCompressionTypes) { |
164 | 1.08M | if (flag_value == rocksdb::CompressionTypeToString(compression_type)) { |
165 | 544k | if (rocksdb::CompressionTypeSupported(compression_type)) { |
166 | 544k | return compression_type; |
167 | 544k | } |
168 | 30 | return STATUS_FORMAT( |
169 | 544k | InvalidArgument, "Configured compression type $0 is not supported.", flag_value); |
170 | 544k | } |
171 | 1.08M | } |
172 | 18.4E | return STATUS_FORMAT( |
173 | 544k | InvalidArgument, "Configured compression type $0 is not valid.", flag_value); |
174 | 544k | } |
175 | | |
176 | | } // namespace |
177 | | |
178 | | namespace docdb { |
179 | | |
180 | | Result<rocksdb::KeyValueEncodingFormat> GetConfiguredKeyValueEncodingFormat( |
181 | 196k | const std::string& flag_value) { |
182 | 196k | for (const auto& encoding_format : rocksdb::kKeyValueEncodingFormatList) { |
183 | 195k | if (flag_value == KeyValueEncodingFormatToString(encoding_format)195k ) { |
184 | 195k | return encoding_format; |
185 | 195k | } |
186 | 195k | } |
187 | 29 | return STATUS_FORMAT(InvalidArgument, "Key-value encoding format $0 is not valid.", flag_value); |
188 | 196k | } |
189 | | |
190 | | } // namespace docdb |
191 | | |
192 | | } // namespace yb |
193 | | |
194 | | namespace { |
195 | | |
196 | 16.8k | bool CompressionTypeValidator(const char* flagname, const std::string& flag_compression_type) { |
197 | 16.8k | auto res = yb::GetConfiguredCompressionType(flag_compression_type); |
198 | 16.8k | if (!res.ok()) { |
199 | | // Below we CHECK_RESULT on the same value returned here, and validating the result here ensures |
200 | | // that CHECK_RESULT will never fail once the process is running. |
201 | 0 | LOG(ERROR) << res.status().ToString(); |
202 | 0 | return false; |
203 | 0 | } |
204 | 16.8k | return true; |
205 | 16.8k | } |
206 | | |
207 | 16.8k | bool KeyValueEncodingFormatValidator(const char* flag_name, const std::string& flag_value) { |
208 | 16.8k | auto res = yb::docdb::GetConfiguredKeyValueEncodingFormat(flag_value); |
209 | 16.8k | bool ok = res.ok(); |
210 | 16.8k | if (!ok) { |
211 | 0 | LOG(ERROR) << flag_name << ": " << res.status(); |
212 | 0 | } |
213 | 16.8k | return ok; |
214 | 16.8k | } |
215 | | |
216 | | } // namespace |
217 | | |
// Register the validators above for their flags, so that values which cannot be resolved to a
// compression type / key-value encoding format are rejected by the validator rather than failing
// later when the flag is consumed.
DEFINE_validator(compression_type, &CompressionTypeValidator);
DEFINE_validator(regular_tablets_data_block_key_value_encoding, &KeyValueEncodingFormatValidator);
220 | | |
221 | | using std::shared_ptr; |
222 | | using std::string; |
223 | | using std::unique_ptr; |
224 | | using strings::Substitute; |
225 | | |
226 | | namespace yb { |
227 | | namespace docdb { |
228 | | |
// Forward declaration; the definition is not in this part of the file. InitRocksDBOptions installs
// the returned extractor as options->boundary_extractor.
std::shared_ptr<rocksdb::BoundaryValuesExtractor> DocBoundaryValuesExtractorInstance();
230 | | |
231 | 1.27G | void SeekForward(const rocksdb::Slice& slice, rocksdb::Iterator *iter) { |
232 | 1.27G | if (!iter->Valid() || iter->key().compare(slice) >= 01.12G ) { |
233 | 528M | return; |
234 | 528M | } |
235 | 747M | ROCKSDB_SEEK(iter, slice); |
236 | 747M | } |
237 | | |
238 | 234M | void SeekForward(const KeyBytes& key_bytes, rocksdb::Iterator *iter) { |
239 | 234M | SeekForward(key_bytes.AsSlice(), iter); |
240 | 234M | } |
241 | | |
242 | 98.3M | KeyBytes AppendDocHt(const Slice& key, const DocHybridTime& doc_ht) { |
243 | 98.3M | char buf[kMaxBytesPerEncodedHybridTime + 1]; |
244 | 98.3M | buf[0] = ValueTypeAsChar::kHybridTime; |
245 | 98.3M | auto end = doc_ht.EncodedInDocDbFormat(buf + 1); |
246 | 98.3M | return KeyBytes(key, Slice(buf, end)); |
247 | 98.3M | } |
248 | | |
249 | 98.3M | void SeekPastSubKey(const Slice& key, rocksdb::Iterator* iter) { |
250 | 98.3M | SeekForward(AppendDocHt(key, DocHybridTime::kMin), iter); |
251 | 98.3M | } |
252 | | |
253 | 135M | void SeekOutOfSubKey(KeyBytes* key_bytes, rocksdb::Iterator* iter) { |
254 | 135M | key_bytes->AppendValueType(ValueType::kMaxByte); |
255 | 135M | SeekForward(*key_bytes, iter); |
256 | 135M | key_bytes->RemoveValueTypeSuffix(ValueType::kMaxByte); |
257 | 135M | } |
258 | | |
259 | | void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key, |
260 | 770M | int* next_count, int* seek_count) { |
261 | 1.57G | for (int nexts = FLAGS_max_nexts_to_avoid_seek; nexts-- > 0;) { |
262 | 1.53G | if (!iter->Valid() || iter->key().compare(seek_key) >= 01.53G ) { |
263 | 736M | VTRACE(3, "Did $0 Next(s) instead of a Seek", nexts); |
264 | 736M | return; |
265 | 736M | } |
266 | 803M | VLOG(4) << "Skipping: " << SubDocKey::DebugSliceToString(iter->key())282k ; |
267 | | |
268 | 803M | iter->Next(); |
269 | 803M | ++*next_count; |
270 | 803M | } |
271 | | |
272 | 34.1M | VTRACE(3, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek); |
273 | 34.1M | iter->Seek(seek_key); |
274 | 34.1M | ++*seek_count; |
275 | 34.1M | } |
276 | | |
277 | | void PerformRocksDBSeek( |
278 | | rocksdb::Iterator *iter, |
279 | | const rocksdb::Slice &seek_key, |
280 | | const char* file_name, |
281 | 800M | int line) { |
282 | 800M | int next_count = 0; |
283 | 800M | int seek_count = 0; |
284 | 800M | if (seek_key.size() == 0) { |
285 | 98.2k | iter->SeekToFirst(); |
286 | 98.2k | ++seek_count; |
287 | 799M | } else if (!iter->Valid() || iter->key().compare(seek_key) > 0771M ) { |
288 | 31.6M | iter->Seek(seek_key); |
289 | 31.6M | ++seek_count; |
290 | 768M | } else { |
291 | 768M | SeekPossiblyUsingNext(iter, seek_key, &next_count, &seek_count); |
292 | 768M | } |
293 | 800M | VLOG(4) << Substitute( |
294 | 1.10M | "PerformRocksDBSeek at $0:$1:\n" |
295 | 1.10M | " Seek key: $2\n" |
296 | 1.10M | " Seek key (raw): $3\n" |
297 | 1.10M | " Actual key: $4\n" |
298 | 1.10M | " Actual key (raw): $5\n" |
299 | 1.10M | " Actual value: $6\n" |
300 | 1.10M | " Next() calls: $7\n" |
301 | 1.10M | " Seek() calls: $8\n", |
302 | 1.10M | file_name, line, |
303 | 1.10M | BestEffortDocDBKeyToStr(seek_key), |
304 | 1.10M | FormatSliceAsStr(seek_key), |
305 | 1.10M | iter->Valid() ? BestEffortDocDBKeyToStr(KeyBytes(iter->key()))0 : "N/A", |
306 | 1.10M | iter->Valid() ? FormatSliceAsStr(iter->key())0 : "N/A", |
307 | 1.10M | iter->Valid() ? FormatSliceAsStr(iter->value())0 : "N/A", |
308 | 1.10M | next_count, |
309 | 1.10M | seek_count); |
310 | 800M | } |
311 | | |
312 | | namespace { |
313 | | |
314 | | rocksdb::ReadOptions PrepareReadOptions( |
315 | | rocksdb::DB* rocksdb, |
316 | | BloomFilterMode bloom_filter_mode, |
317 | | const boost::optional<const Slice>& user_key_for_filter, |
318 | | const rocksdb::QueryId query_id, |
319 | | std::shared_ptr<rocksdb::ReadFileFilter> file_filter, |
320 | 38.0M | const Slice* iterate_upper_bound) { |
321 | 38.0M | rocksdb::ReadOptions read_opts; |
322 | 38.0M | read_opts.query_id = query_id; |
323 | 38.0M | if (FLAGS_use_docdb_aware_bloom_filter && |
324 | 38.0M | bloom_filter_mode == BloomFilterMode::USE_BLOOM_FILTER33.7M ) { |
325 | 21.7M | DCHECK(user_key_for_filter); |
326 | 21.7M | read_opts.table_aware_file_filter = rocksdb->GetOptions().table_factory-> |
327 | 21.7M | NewTableAwareReadFileFilter(read_opts, user_key_for_filter.get()); |
328 | 21.7M | } |
329 | 38.0M | read_opts.file_filter = std::move(file_filter); |
330 | 38.0M | read_opts.iterate_upper_bound = iterate_upper_bound; |
331 | 38.0M | return read_opts; |
332 | 38.0M | } |
333 | | |
334 | | } // namespace |
335 | | |
336 | | BoundedRocksDbIterator CreateRocksDBIterator( |
337 | | rocksdb::DB* rocksdb, |
338 | | const KeyBounds* docdb_key_bounds, |
339 | | BloomFilterMode bloom_filter_mode, |
340 | | const boost::optional<const Slice>& user_key_for_filter, |
341 | | const rocksdb::QueryId query_id, |
342 | | std::shared_ptr<rocksdb::ReadFileFilter> file_filter, |
343 | 17.3M | const Slice* iterate_upper_bound) { |
344 | 17.3M | rocksdb::ReadOptions read_opts = PrepareReadOptions(rocksdb, bloom_filter_mode, |
345 | 17.3M | user_key_for_filter, query_id, std::move(file_filter), iterate_upper_bound); |
346 | 17.3M | return BoundedRocksDbIterator(rocksdb, read_opts, docdb_key_bounds); |
347 | 17.3M | } |
348 | | |
349 | | unique_ptr<IntentAwareIterator> CreateIntentAwareIterator( |
350 | | const DocDB& doc_db, |
351 | | BloomFilterMode bloom_filter_mode, |
352 | | const boost::optional<const Slice>& user_key_for_filter, |
353 | | const rocksdb::QueryId query_id, |
354 | | const TransactionOperationContext& txn_op_context, |
355 | | CoarseTimePoint deadline, |
356 | | const ReadHybridTime& read_time, |
357 | | std::shared_ptr<rocksdb::ReadFileFilter> file_filter, |
358 | 20.7M | const Slice* iterate_upper_bound) { |
359 | | // TODO(dtxn) do we need separate options for intents db? |
360 | 20.7M | rocksdb::ReadOptions read_opts = PrepareReadOptions(doc_db.regular, bloom_filter_mode, |
361 | 20.7M | user_key_for_filter, query_id, std::move(file_filter), iterate_upper_bound); |
362 | 20.7M | return std::make_unique<IntentAwareIterator>( |
363 | 20.7M | doc_db, read_opts, deadline, read_time, txn_op_context); |
364 | 20.7M | } |
365 | | |
366 | | namespace { |
367 | | |
// Held by AutoInitFromRocksDBFlags and AutoInitFromBlockBasedTableOptions while they read the
// RocksDB-related flags and write the derived values into the options structs.
std::mutex rocksdb_flags_mutex;
369 | | |
370 | 1.05M | int32_t GetMaxBackgroundFlushes() { |
371 | 1.05M | const auto kNumCpus = base::NumCPUs(); |
372 | 1.05M | if (FLAGS_rocksdb_max_background_flushes == -1) { |
373 | 1.05M | constexpr auto kCpusPerFlushThread = 8; |
374 | 1.05M | constexpr auto kAutoMaxBackgroundFlushesHighLimit = 4; |
375 | 1.05M | auto flushes = 1 + kNumCpus / kCpusPerFlushThread; |
376 | 1.05M | auto max_flushes = std::min(flushes, kAutoMaxBackgroundFlushesHighLimit); |
377 | 1.05M | LOG(INFO) << "Overriding FLAGS_rocksdb_max_background_flushes to " << max_flushes; |
378 | 1.05M | return max_flushes; |
379 | 1.05M | } else { |
380 | 1 | return FLAGS_rocksdb_max_background_flushes; |
381 | 1 | } |
382 | 1.05M | } |
383 | | |
384 | | // This controls the maximum number of schedulable compactions, per each instance of rocksdb, of |
385 | | // which we will have many. We also do not want to waste resources by having too many queued |
386 | | // compactions. |
387 | 2.11M | int32_t GetMaxBackgroundCompactions() { |
388 | 2.11M | if (FLAGS_rocksdb_disable_compactions) { |
389 | 0 | return 0; |
390 | 0 | } |
391 | 2.11M | int rocksdb_max_background_compactions = FLAGS_rocksdb_max_background_compactions; |
392 | | |
393 | 2.11M | if (rocksdb_max_background_compactions == 0) { |
394 | 0 | LOG(FATAL) << "--rocksdb_max_background_compactions may not be set to zero with compactions " |
395 | 0 | << "enabled. Either change this flag or set --rocksdb_disable_compactions=true."; |
396 | 2.11M | } else if (rocksdb_max_background_compactions > 0) { |
397 | 12 | return rocksdb_max_background_compactions; |
398 | 12 | } |
399 | | |
400 | 2.11M | const auto kNumCpus = base::NumCPUs(); |
401 | 2.11M | if (kNumCpus <= 4) { |
402 | 0 | rocksdb_max_background_compactions = 1; |
403 | 2.11M | } else if (kNumCpus <= 8) { |
404 | 0 | rocksdb_max_background_compactions = 2; |
405 | 2.11M | } else if (kNumCpus <= 32) { |
406 | 2.11M | rocksdb_max_background_compactions = 3; |
407 | 2.11M | } else { |
408 | 0 | rocksdb_max_background_compactions = 4; |
409 | 0 | } |
410 | 2.11M | LOG(INFO) << "FLAGS_rocksdb_max_background_compactions was not set, automatically configuring " |
411 | 2.11M | << rocksdb_max_background_compactions << " background compactions."; |
412 | 2.11M | return rocksdb_max_background_compactions; |
413 | 2.11M | } |
414 | | |
415 | 1.05M | int32_t GetBaseBackgroundCompactions() { |
416 | 1.05M | if (FLAGS_rocksdb_disable_compactions) { |
417 | 0 | return 0; |
418 | 0 | } |
419 | | |
420 | 1.05M | if (FLAGS_rocksdb_base_background_compactions == -1) { |
421 | 1.05M | const auto base_background_compactions = GetMaxBackgroundCompactions(); |
422 | 1.05M | LOG(INFO) << "FLAGS_rocksdb_base_background_compactions was not set, automatically configuring " |
423 | 1.05M | << base_background_compactions << " base background compactions."; |
424 | 1.05M | return base_background_compactions; |
425 | 1.05M | } |
426 | | |
427 | 1 | return FLAGS_rocksdb_base_background_compactions; |
428 | 1.05M | } |
429 | | |
430 | | // Auto initialize some of the RocksDB flags. |
431 | 1.05M | void AutoInitFromRocksDBFlags(rocksdb::Options* options) { |
432 | 1.05M | std::unique_lock<std::mutex> lock(rocksdb_flags_mutex); |
433 | | |
434 | 1.05M | options->max_background_flushes = GetMaxBackgroundFlushes(); |
435 | | |
436 | 1.05M | if (FLAGS_rocksdb_disable_compactions) { |
437 | 7 | return; |
438 | 7 | } |
439 | | |
440 | 1.05M | options->max_background_compactions = GetMaxBackgroundCompactions(); |
441 | 1.05M | options->base_background_compactions = GetBaseBackgroundCompactions(); |
442 | 1.05M | } |
443 | | |
444 | 527k | void AutoInitFromBlockBasedTableOptions(rocksdb::BlockBasedTableOptions* table_options) { |
445 | 527k | std::unique_lock<std::mutex> lock(rocksdb_flags_mutex); |
446 | | |
447 | 527k | table_options->block_size = FLAGS_db_block_size_bytes; |
448 | 527k | table_options->filter_block_size = FLAGS_db_filter_block_size_bytes; |
449 | 527k | table_options->index_block_size = FLAGS_db_index_block_size_bytes; |
450 | 527k | table_options->min_keys_per_index_block = FLAGS_db_min_keys_per_index_block; |
451 | | |
452 | 527k | if (FLAGS_block_restart_interval < kMinBlockStartInterval) { |
453 | 1 | LOG(INFO) << "FLAGS_block_restart_interval was set to a very low value, overriding " |
454 | 1 | << "block_restart_interval to " << kDefaultBlockStartInterval << "."; |
455 | 1 | table_options->block_restart_interval = kDefaultBlockStartInterval; |
456 | 527k | } else if (FLAGS_block_restart_interval > kMaxBlockStartInterval) { |
457 | 1 | LOG(INFO) << "FLAGS_block_restart_interval was set to a very high value, overriding " |
458 | 1 | << "block_restart_interval to " << kMaxBlockStartInterval << "."; |
459 | 1 | table_options->block_restart_interval = kMaxBlockStartInterval; |
460 | 527k | } else { |
461 | 527k | table_options->block_restart_interval = FLAGS_block_restart_interval; |
462 | 527k | } |
463 | 527k | } |
464 | | |
465 | | class HybridTimeFilteringIterator : public rocksdb::FilteringIterator { |
466 | | public: |
467 | | HybridTimeFilteringIterator( |
468 | | rocksdb::InternalIterator* iterator, bool arena_mode, HybridTime hybrid_time_filter) |
469 | 36 | : rocksdb::FilteringIterator(iterator, arena_mode), hybrid_time_filter_(hybrid_time_filter) {} |
470 | | |
471 | | private: |
472 | 4.10k | bool Satisfied(Slice key) override { |
473 | 4.10k | auto user_key = rocksdb::ExtractUserKey(key); |
474 | 4.10k | auto doc_ht = DocHybridTime::DecodeFromEnd(&user_key); |
475 | 4.10k | if (!doc_ht.ok()) { |
476 | 0 | LOG(DFATAL) << "Unable to decode doc ht " << rocksdb::ExtractUserKey(key) << ": " |
477 | 0 | << doc_ht.status(); |
478 | 0 | return true; |
479 | 0 | } |
480 | 4.10k | return doc_ht->hybrid_time() <= hybrid_time_filter_; |
481 | 4.10k | } |
482 | | |
483 | | HybridTime hybrid_time_filter_; |
484 | | }; |
485 | | |
486 | | template <class T, class... Args> |
487 | 36 | T* CreateOnArena(rocksdb::Arena* arena, Args&&... args) { |
488 | 36 | if (!arena) { |
489 | 5 | return new T(std::forward<Args>(args)...); |
490 | 5 | } |
491 | 31 | auto mem = arena->AllocateAligned(sizeof(T)); |
492 | 31 | return new (mem) T(std::forward<Args>(args)...); |
493 | 36 | } |
494 | | |
495 | | rocksdb::InternalIterator* WrapIterator( |
496 | 14.2M | rocksdb::InternalIterator* iterator, rocksdb::Arena* arena, const Slice& filter) { |
497 | 14.2M | if (!filter.empty()) { |
498 | 36 | HybridTime hybrid_time_filter; |
499 | 36 | memcpy(&hybrid_time_filter, filter.data(), sizeof(hybrid_time_filter)); |
500 | 36 | return CreateOnArena<HybridTimeFilteringIterator>( |
501 | 36 | arena, iterator, arena != nullptr, hybrid_time_filter); |
502 | 36 | } |
503 | 14.2M | return iterator; |
504 | 14.2M | } |
505 | | |
506 | | void AddSupportedFilterPolicy( |
507 | | const rocksdb::BlockBasedTableOptions::FilterPolicyPtr& filter_policy, |
508 | 1.03M | rocksdb::BlockBasedTableOptions* table_options) { |
509 | 1.03M | table_options->supported_filter_policies->emplace(filter_policy->Name(), filter_policy); |
510 | 1.03M | } |
511 | | |
512 | | PriorityThreadPool* GetGlobalPriorityThreadPool |
513 | 527k | (const scoped_refptr<MetricEntity>& metric_entity = nullptr) { |
514 | 527k | static PriorityThreadPool priority_thread_pool_for_compactions_and_flushes( |
515 | 527k | GetGlobalRocksDBPriorityThreadPoolSize(), metric_entity); |
516 | 527k | return &priority_thread_pool_for_compactions_and_flushes; |
517 | 527k | } |
518 | | |
519 | | } // namespace |
520 | | |
521 | 9 | rocksdb::Options TEST_AutoInitFromRocksDBFlags() { |
522 | 9 | rocksdb::Options options; |
523 | 9 | AutoInitFromRocksDBFlags(&options); |
524 | 9 | return options; |
525 | 9 | } |
526 | | |
527 | 4 | rocksdb::BlockBasedTableOptions TEST_AutoInitFromRocksDbTableFlags() { |
528 | 4 | rocksdb::BlockBasedTableOptions blockBasedTableOptions; |
529 | 4 | AutoInitFromBlockBasedTableOptions(&blockBasedTableOptions); |
530 | 4 | return blockBasedTableOptions; |
531 | 4 | } |
532 | | |
533 | 23.3k | int32_t GetGlobalRocksDBPriorityThreadPoolSize() { |
534 | 23.3k | if (FLAGS_rocksdb_disable_compactions) { |
535 | 7 | return 1; |
536 | 7 | } |
537 | | |
538 | 23.3k | auto priority_thread_pool_size = FLAGS_priority_thread_pool_size; |
539 | | |
540 | 23.3k | if (priority_thread_pool_size == 0) { |
541 | 0 | LOG(FATAL) << "--priority_thread_pool_size may not be set to zero with compactions " |
542 | 0 | << "enabled. Either change this flag or set --rocksdb_disable_compactions=true."; |
543 | 23.3k | } else if (priority_thread_pool_size > 0) { |
544 | 1 | return priority_thread_pool_size; |
545 | 1 | } |
546 | | |
547 | 23.3k | if (FLAGS_rocksdb_max_background_compactions != -1) { |
548 | | // If we did set the per-rocksdb flag, but not FLAGS_priority_thread_pool_size, just port |
549 | | // over that value. |
550 | 4 | priority_thread_pool_size = GetMaxBackgroundCompactions(); |
551 | 23.3k | } else { |
552 | 23.3k | const int kNumCpus = base::NumCPUs(); |
553 | | // If we did not override the per-rocksdb queue size, then just use a production friendly |
554 | | // formula. |
555 | | // |
556 | | // For less then 8cpus, just manually tune to 1-2 threads. Above that, we can use 3.5/8. |
557 | 23.3k | if (kNumCpus < 4) { |
558 | 1 | priority_thread_pool_size = 1; |
559 | 23.3k | } else if (kNumCpus < 8) { |
560 | 1 | priority_thread_pool_size = 2; |
561 | 23.3k | } else { |
562 | 23.3k | priority_thread_pool_size = (int32_t) std::floor(kNumCpus * 3.5 / 8.0); |
563 | 23.3k | } |
564 | 23.3k | } |
565 | | |
566 | 23.3k | LOG(INFO) << "FLAGS_priority_thread_pool_size was not set, automatically configuring to " |
567 | 23.3k | << priority_thread_pool_size << "."; |
568 | | |
569 | 23.3k | return priority_thread_pool_size; |
570 | 23.3k | } |
571 | | |
572 | | void InitRocksDBOptions( |
573 | | rocksdb::Options* options, const string& log_prefix, |
574 | | const shared_ptr<rocksdb::Statistics>& statistics, |
575 | | const tablet::TabletOptions& tablet_options, |
576 | 527k | rocksdb::BlockBasedTableOptions table_options) { |
577 | 527k | AutoInitFromRocksDBFlags(options); |
578 | 527k | SetLogPrefix(options, log_prefix); |
579 | 527k | options->create_if_missing = true; |
580 | 527k | options->disableDataSync = true; |
581 | 527k | options->statistics = statistics; |
582 | 527k | options->info_log_level = YBRocksDBLogger::ConvertToRocksDBLogLevel(FLAGS_minloglevel); |
583 | 527k | options->initial_seqno = FLAGS_initial_seqno; |
584 | 527k | options->boundary_extractor = DocBoundaryValuesExtractorInstance(); |
585 | 527k | options->compaction_measure_io_stats = FLAGS_rocksdb_compaction_measure_io_stats; |
586 | 527k | options->memory_monitor = tablet_options.memory_monitor; |
587 | 527k | if (FLAGS_db_write_buffer_size != -1) { |
588 | 410 | options->write_buffer_size = FLAGS_db_write_buffer_size; |
589 | 527k | } else { |
590 | 527k | options->write_buffer_size = FLAGS_memstore_size_mb * 1_MB; |
591 | 527k | } |
592 | 527k | options->env = tablet_options.rocksdb_env; |
593 | 527k | options->checkpoint_env = rocksdb::Env::Default(); |
594 | 527k | options->priority_thread_pool_for_compactions_and_flushes = |
595 | 527k | (tablet_options.ServerMetricEntity) ? |
596 | 438k | GetGlobalPriorityThreadPool(tablet_options.ServerMetricEntity) : |
597 | 527k | GetGlobalPriorityThreadPool()89.3k ; |
598 | | |
599 | 527k | if (FLAGS_num_reserved_small_compaction_threads != -1) { |
600 | 0 | options->num_reserved_small_compaction_threads = FLAGS_num_reserved_small_compaction_threads; |
601 | 0 | } |
602 | | |
603 | | // Since the flag validator for FLAGS_compression_type will fail if the result of this call is not |
604 | | // OK, this CHECK_RESULT should never fail and is safe. |
605 | 527k | options->compression = CHECK_RESULT(GetConfiguredCompressionType(FLAGS_compression_type)); |
606 | | |
607 | 527k | options->listeners.insert( |
608 | 527k | options->listeners.end(), tablet_options.listeners.begin(), |
609 | 527k | tablet_options.listeners.end()); // Append listeners |
610 | | |
611 | | // Set block cache options. |
612 | 527k | if (tablet_options.block_cache) { |
613 | 451k | table_options.block_cache = tablet_options.block_cache; |
614 | | // Cache the bloom filters in the block cache. |
615 | 451k | table_options.cache_index_and_filter_blocks = true; |
616 | 451k | } else { |
617 | 75.8k | table_options.no_block_cache = true; |
618 | 75.8k | table_options.cache_index_and_filter_blocks = false; |
619 | 75.8k | } |
620 | | |
621 | 527k | AutoInitFromBlockBasedTableOptions(&table_options); |
622 | | |
623 | | // Set our custom bloom filter that is docdb aware. |
624 | 527k | if (FLAGS_use_docdb_aware_bloom_filter) { |
625 | 515k | const auto filter_block_size_bits = table_options.filter_block_size * 8; |
626 | 515k | table_options.filter_policy = std::make_shared<const DocDbAwareV3FilterPolicy>( |
627 | 515k | filter_block_size_bits, options->info_log.get()); |
628 | 515k | table_options.supported_filter_policies = |
629 | 515k | std::make_shared<rocksdb::BlockBasedTableOptions::FilterPoliciesMap>(); |
630 | 515k | AddSupportedFilterPolicy(std::make_shared<const DocDbAwareHashedComponentsFilterPolicy>( |
631 | 515k | filter_block_size_bits, options->info_log.get()), &table_options); |
632 | 515k | AddSupportedFilterPolicy(std::make_shared<const DocDbAwareV2FilterPolicy>( |
633 | 515k | filter_block_size_bits, options->info_log.get()), &table_options); |
634 | 515k | } |
635 | | |
636 | 527k | if (FLAGS_use_multi_level_index527k ) { |
637 | 527k | table_options.index_type = rocksdb::IndexType::kMultiLevelBinarySearch; |
638 | 18.4E | } else { |
639 | 18.4E | table_options.index_type = rocksdb::IndexType::kBinarySearch; |
640 | 18.4E | } |
641 | | |
642 | 527k | options->table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); |
643 | | |
644 | | // Compaction related options. |
645 | | |
646 | | // Enable universal style compactions. |
647 | 527k | bool compactions_enabled = !FLAGS_rocksdb_disable_compactions; |
648 | 527k | options->compaction_style = compactions_enabled |
649 | 527k | ? rocksdb::CompactionStyle::kCompactionStyleUniversal |
650 | 18.4E | : rocksdb::CompactionStyle::kCompactionStyleNone; |
651 | | // Set the number of levels to 1. |
652 | 527k | options->num_levels = 1; |
653 | | |
654 | 527k | AutoInitFromRocksDBFlags(options); |
655 | 527k | if (compactions_enabled527k ) { |
656 | 527k | options->level0_file_num_compaction_trigger = FLAGS_rocksdb_level0_file_num_compaction_trigger; |
657 | 527k | options->level0_slowdown_writes_trigger = max_if_negative( |
658 | 527k | FLAGS_rocksdb_level0_slowdown_writes_trigger); |
659 | 527k | options->level0_stop_writes_trigger = max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger); |
660 | | // This determines the algo used to compute which files will be included. The "total size" based |
661 | | // computation compares the size of every new file with the sum of all files included so far. |
662 | 527k | options->compaction_options_universal.stop_style = |
663 | 527k | rocksdb::CompactionStopStyle::kCompactionStopStyleTotalSize; |
664 | 527k | options->compaction_options_universal.size_ratio = |
665 | 527k | FLAGS_rocksdb_universal_compaction_size_ratio; |
666 | 527k | options->compaction_options_universal.always_include_size_threshold = |
667 | 527k | FLAGS_rocksdb_universal_compaction_always_include_size_threshold; |
668 | 527k | options->compaction_options_universal.min_merge_width = |
669 | 527k | FLAGS_rocksdb_universal_compaction_min_merge_width; |
670 | 527k | options->compaction_size_threshold_bytes = FLAGS_rocksdb_compaction_size_threshold_bytes; |
671 | 527k | options->rate_limiter = tablet_options.rate_limiter ? tablet_options.rate_limiter438k |
672 | 527k | : CreateRocksDBRateLimiter()89.6k ; |
673 | 18.4E | } else { |
674 | 18.4E | options->level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); |
675 | 18.4E | options->level0_stop_writes_trigger = std::numeric_limits<int>::max(); |
676 | 18.4E | } |
677 | | |
678 | 527k | options->max_write_buffer_number = FLAGS_rocksdb_max_write_buffer_number; |
679 | | |
680 | 527k | options->memtable_factory = std::make_shared<rocksdb::SkipListFactory>( |
681 | 527k | 0 /* lookahead */, rocksdb::ConcurrentWrites::kFalse); |
682 | | |
683 | 527k | options->iterator_replacer = std::make_shared<rocksdb::IteratorReplacer>(&WrapIterator); |
684 | 527k | } |
685 | | |
686 | 673k | void SetLogPrefix(rocksdb::Options* options, const std::string& log_prefix) { |
687 | 673k | options->log_prefix = log_prefix; |
688 | 673k | options->info_log = std::make_shared<YBRocksDBLogger>(options->log_prefix); |
689 | 673k | } |
690 | | |
691 | | namespace { |
692 | | |
693 | | // Helper class for RocksDBPatcher. |
694 | | class RocksDBPatcherHelper { |
695 | | public: |
696 | | explicit RocksDBPatcherHelper(rocksdb::VersionSet* version_set) |
697 | | : version_set_(version_set), cfd_(version_set->GetColumnFamilySet()->GetDefault()), |
698 | 1.96k | delete_edit_(cfd_), add_edit_(cfd_) { |
699 | 1.96k | } |
700 | | |
701 | 3.92k | int Levels() const { |
702 | 3.92k | return cfd_->NumberLevels(); |
703 | 3.92k | } |
704 | | |
705 | 1.96k | const std::vector<rocksdb::FileMetaData*>& LevelFiles(int level) { |
706 | 1.96k | return cfd_->current()->storage_info()->LevelFiles(level); |
707 | 1.96k | } |
708 | | |
709 | | template <class F> |
710 | 1.96k | auto IterateFiles(const F& f) { |
711 | | // Auto routing based on f return type. |
712 | 1.96k | return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr)); |
713 | 1.96k | } docdb_rocksdb_util.cc:auto yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFiles<yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&)>(yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&) const&) Line | Count | Source | 710 | 161 | auto IterateFiles(const F& f) { | 711 | | // Auto routing based on f return type. | 712 | 161 | return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr)); | 713 | 161 | } |
docdb_rocksdb_util.cc:auto yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFiles<yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData)>(yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData) const&) Line | Count | Source | 710 | 1.79k | auto IterateFiles(const F& f) { | 711 | | // Auto routing based on f return type. | 712 | 1.79k | return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr)); | 713 | 1.79k | } |
Unexecuted instantiation: docdb_rocksdb_util.cc:auto yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFiles<yb::docdb::RocksDBPatcher::Impl::UpdateFileSizes()::'lambda'(int, rocksdb::FileMetaData const&)>(yb::docdb::RocksDBPatcher::Impl::UpdateFileSizes()::'lambda'(int, rocksdb::FileMetaData const&) const&) |
714 | | |
715 | 3.55k | void ModifyFile(int level, const rocksdb::FileMetaData& fmd) { |
716 | 3.55k | delete_edit_->DeleteFile(level, fmd.fd.GetNumber()); |
717 | 3.55k | add_edit_->AddCleanedFile(level, fmd); |
718 | 3.55k | } |
719 | | |
720 | 1.79k | rocksdb::VersionEdit& Edit() { |
721 | 1.79k | return *add_edit_; |
722 | 1.79k | } |
723 | | |
724 | | CHECKED_STATUS Apply( |
725 | 1.96k | const rocksdb::Options& options, const rocksdb::ImmutableCFOptions& imm_cf_options) { |
726 | 1.96k | if (!delete_edit_.modified() && !add_edit_.modified()168 ) { |
727 | 147 | return Status::OK(); |
728 | 147 | } |
729 | | |
730 | 1.81k | rocksdb::MutableCFOptions mutable_cf_options(options, imm_cf_options); |
731 | 1.81k | { |
732 | 1.81k | rocksdb::InstrumentedMutex mutex; |
733 | 1.81k | rocksdb::InstrumentedMutexLock lock(&mutex); |
734 | 3.62k | for (auto* edit : {&delete_edit_, &add_edit_}) { |
735 | 3.62k | if (edit->modified()) { |
736 | 3.60k | RETURN_NOT_OK(version_set_->LogAndApply(cfd_, mutable_cf_options, edit->get(), &mutex)); |
737 | 3.60k | } |
738 | 3.62k | } |
739 | 1.81k | } |
740 | | |
741 | 1.81k | return Status::OK(); |
742 | 1.81k | } |
743 | | |
744 | | private: |
745 | | template <class F> |
746 | 1.96k | void IterateFilesHelper(const F& f, void*) { |
747 | 3.92k | for (int level = 0; level < Levels(); ++level1.96k ) { |
748 | 3.72k | for (const auto* file : LevelFiles(level)) { |
749 | 3.72k | f(level, *file); |
750 | 3.72k | } |
751 | 1.96k | } |
752 | 1.96k | } docdb_rocksdb_util.cc:void yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFilesHelper<yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&)>(yb::docdb::RocksDBPatcher::Impl::SetHybridTimeFilter(yb::HybridTime)::'lambda'(int, rocksdb::FileMetaData const&) const&, void*) Line | Count | Source | 746 | 161 | void IterateFilesHelper(const F& f, void*) { | 747 | 322 | for (int level = 0; level < Levels(); ++level161 ) { | 748 | 185 | for (const auto* file : LevelFiles(level)) { | 749 | 185 | f(level, *file); | 750 | 185 | } | 751 | 161 | } | 752 | 161 | } |
docdb_rocksdb_util.cc:void yb::docdb::(anonymous namespace)::RocksDBPatcherHelper::IterateFilesHelper<yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData)>(yb::docdb::RocksDBPatcher::Impl::ModifyFlushedFrontier(yb::docdb::ConsensusFrontier const&)::'lambda'(int, rocksdb::FileMetaData) const&, void*) Line | Count | Source | 746 | 1.79k | void IterateFilesHelper(const F& f, void*) { | 747 | 3.59k | for (int level = 0; level < Levels(); ++level1.79k ) { | 748 | 3.53k | for (const auto* file : LevelFiles(level)) { | 749 | 3.53k | f(level, *file); | 750 | 3.53k | } | 751 | 1.79k | } | 752 | 1.79k | } |
|
753 | | |
754 | | template <class F> |
755 | 0 | CHECKED_STATUS IterateFilesHelper(const F& f, Status*) { |
756 | 0 | for (int level = 0; level < Levels(); ++level) { |
757 | 0 | for (const auto* file : LevelFiles(level)) { |
758 | 0 | RETURN_NOT_OK(f(level, *file)); |
759 | 0 | } |
760 | 0 | } |
761 | 0 | return Status::OK(); |
762 | 0 | } |
763 | | |
764 | | class TrackedEdit { |
765 | | public: |
766 | 3.92k | explicit TrackedEdit(rocksdb::ColumnFamilyData* cfd) { |
767 | 3.92k | edit_.SetColumnFamily(cfd->GetID()); |
768 | 3.92k | } |
769 | | |
770 | 12.5k | rocksdb::VersionEdit* get() { |
771 | 12.5k | modified_ = true; |
772 | 12.5k | return &edit_; |
773 | 12.5k | } |
774 | | |
775 | 7.10k | rocksdb::VersionEdit* operator->() { |
776 | 7.10k | return get(); |
777 | 7.10k | } |
778 | | |
779 | 1.79k | rocksdb::VersionEdit& operator*() { |
780 | 1.79k | return *get(); |
781 | 1.79k | } |
782 | | |
783 | 5.75k | bool modified() const { |
784 | 5.75k | return modified_; |
785 | 5.75k | } |
786 | | |
787 | | private: |
788 | | rocksdb::VersionEdit edit_; |
789 | | bool modified_ = false; |
790 | | }; |
791 | | |
792 | | rocksdb::VersionSet* version_set_; |
793 | | rocksdb::ColumnFamilyData* cfd_; |
794 | | TrackedEdit delete_edit_; |
795 | | TrackedEdit add_edit_; |
796 | | }; |
797 | | |
798 | | } // namespace |
799 | | |
800 | | class RocksDBPatcher::Impl { |
801 | | public: |
802 | | Impl(const std::string& dbpath, const rocksdb::Options& options) |
803 | | : options_(SanitizeOptions(dbpath, &comparator_, options)), |
804 | | imm_cf_options_(options_), |
805 | | env_options_(options_), |
806 | | cf_options_(options_), |
807 | 1.93k | version_set_(dbpath, &options_, env_options_, block_cache_.get(), &write_buffer_, nullptr) { |
808 | 1.93k | cf_options_.comparator = comparator_.user_comparator(); |
809 | 1.93k | } |
810 | | |
811 | 1.93k | CHECKED_STATUS Load() { |
812 | 1.93k | std::vector<rocksdb::ColumnFamilyDescriptor> column_families; |
813 | 1.93k | column_families.emplace_back("default", cf_options_); |
814 | 1.93k | return version_set_.Recover(column_families); |
815 | 1.93k | } |
816 | | |
817 | 161 | CHECKED_STATUS SetHybridTimeFilter(HybridTime value) { |
818 | 161 | RocksDBPatcherHelper helper(&version_set_); |
819 | | |
820 | 185 | helper.IterateFiles([&helper, value](int level, const rocksdb::FileMetaData& file) { |
821 | 185 | if (!file.largest.user_frontier) { |
822 | 0 | return; |
823 | 0 | } |
824 | 185 | auto& consensus_frontier = down_cast<ConsensusFrontier&>(*file.largest.user_frontier); |
825 | 185 | if (consensus_frontier.hybrid_time() <= value || |
826 | 185 | consensus_frontier.hybrid_time_filter() <= value14 ) { |
827 | 171 | return; |
828 | 171 | } |
829 | 14 | rocksdb::FileMetaData fmd = file; |
830 | 14 | down_cast<ConsensusFrontier&>(*fmd.largest.user_frontier).set_hybrid_time_filter(value); |
831 | 14 | helper.ModifyFile(level, fmd); |
832 | 14 | }); |
833 | | |
834 | 161 | return helper.Apply(options_, imm_cf_options_); |
835 | 161 | } |
836 | | |
837 | 1.79k | CHECKED_STATUS ModifyFlushedFrontier(const ConsensusFrontier& frontier) { |
838 | 1.79k | RocksDBPatcherHelper helper(&version_set_); |
839 | | |
840 | 1.79k | docdb::ConsensusFrontier final_frontier = frontier; |
841 | | |
842 | 1.79k | auto* existing_frontier = down_cast<docdb::ConsensusFrontier*>(version_set_.FlushedFrontier()); |
843 | 1.79k | if (existing_frontier) { |
844 | 1.78k | if (!frontier.history_cutoff()) { |
845 | 1.78k | final_frontier.set_history_cutoff(existing_frontier->history_cutoff()); |
846 | 1.78k | } |
847 | 1.78k | if (!frontier.op_id()) { |
848 | | // Update op id only if it was specified in frontier. |
849 | 0 | final_frontier.set_op_id(existing_frontier->op_id()); |
850 | 0 | } |
851 | 1.78k | } |
852 | | |
853 | 1.79k | helper.Edit().ModifyFlushedFrontier( |
854 | 1.79k | final_frontier.Clone(), rocksdb::FrontierModificationMode::kForce); |
855 | | |
856 | 3.53k | helper.IterateFiles([&helper, &frontier](int level, rocksdb::FileMetaData fmd) { |
857 | 3.53k | bool modified = false; |
858 | 7.07k | for (auto* user_frontier : {&fmd.smallest.user_frontier, &fmd.largest.user_frontier}) { |
859 | 7.07k | if (!*user_frontier) { |
860 | 0 | continue; |
861 | 0 | } |
862 | 7.07k | auto& consensus_frontier = down_cast<ConsensusFrontier&>(**user_frontier); |
863 | 7.07k | if (!consensus_frontier.op_id().empty()) { |
864 | 7.07k | consensus_frontier.set_op_id(OpId()); |
865 | 7.07k | modified = true; |
866 | 7.07k | } |
867 | 7.07k | if (frontier.history_cutoff()) { |
868 | 0 | consensus_frontier.set_history_cutoff(frontier.history_cutoff()); |
869 | 0 | modified = true; |
870 | 0 | } |
871 | 7.07k | } |
872 | 3.53k | if (modified) { |
873 | 3.53k | helper.ModifyFile(level, fmd); |
874 | 3.53k | } |
875 | 3.53k | }); |
876 | | |
877 | 1.79k | return helper.Apply(options_, imm_cf_options_); |
878 | 1.79k | } |
879 | | |
880 | 0 | CHECKED_STATUS UpdateFileSizes() { |
881 | 0 | RocksDBPatcherHelper helper(&version_set_); |
882 | |
|
883 | 0 | RETURN_NOT_OK(helper.IterateFiles( |
884 | 0 | [&helper, this](int level, const rocksdb::FileMetaData& file) -> Status { |
885 | 0 | auto base_path = rocksdb::MakeTableFileName( |
886 | 0 | options_.db_paths[file.fd.GetPathId()].path, file.fd.GetNumber()); |
887 | 0 | auto data_path = rocksdb::TableBaseToDataFileName(base_path); |
888 | 0 | auto base_size = VERIFY_RESULT(options_.env->GetFileSize(base_path)); |
889 | 0 | auto data_size = VERIFY_RESULT(options_.env->GetFileSize(data_path)); |
890 | 0 | auto total_size = base_size + data_size; |
891 | 0 | if (file.fd.base_file_size == base_size && file.fd.total_file_size == total_size) { |
892 | 0 | return Status::OK(); |
893 | 0 | } |
894 | 0 | rocksdb::FileMetaData fmd = file; |
895 | 0 | fmd.fd.base_file_size = base_size; |
896 | 0 | fmd.fd.total_file_size = total_size; |
897 | 0 | helper.ModifyFile(level, fmd); |
898 | 0 | return Status::OK(); |
899 | 0 | })); |
900 | | |
901 | 0 | return helper.Apply(options_, imm_cf_options_); |
902 | 0 | } |
903 | | |
904 | | private: |
905 | | const rocksdb::InternalKeyComparator comparator_{rocksdb::BytewiseComparator()}; |
906 | | rocksdb::WriteBuffer write_buffer_{1_KB}; |
907 | | std::shared_ptr<rocksdb::Cache> block_cache_{rocksdb::NewLRUCache(1_MB)}; |
908 | | |
909 | | rocksdb::Options options_; |
910 | | rocksdb::ImmutableCFOptions imm_cf_options_; |
911 | | rocksdb::EnvOptions env_options_; |
912 | | rocksdb::ColumnFamilyOptions cf_options_; |
913 | | rocksdb::VersionSet version_set_; |
914 | | }; |
915 | | |
916 | | RocksDBPatcher::RocksDBPatcher(const std::string& dbpath, const rocksdb::Options& options) |
917 | 1.93k | : impl_(new Impl(dbpath, options)) { |
918 | 1.93k | } |
919 | | |
920 | 1.93k | RocksDBPatcher::~RocksDBPatcher() { |
921 | 1.93k | } |
922 | | |
923 | 1.93k | Status RocksDBPatcher::Load() { |
924 | 1.93k | return impl_->Load(); |
925 | 1.93k | } |
926 | | |
927 | 161 | Status RocksDBPatcher::SetHybridTimeFilter(HybridTime value) { |
928 | 161 | return impl_->SetHybridTimeFilter(value); |
929 | 161 | } |
930 | | |
931 | 1.79k | Status RocksDBPatcher::ModifyFlushedFrontier(const ConsensusFrontier& frontier) { |
932 | 1.79k | return impl_->ModifyFlushedFrontier(frontier); |
933 | 1.79k | } |
934 | | |
935 | 0 | Status RocksDBPatcher::UpdateFileSizes() { |
936 | 0 | return impl_->UpdateFileSizes(); |
937 | 0 | } |
938 | | |
939 | 235 | Status ForceRocksDBCompact(rocksdb::DB* db) { |
940 | 235 | RETURN_NOT_OK_PREPEND( |
941 | 235 | db->CompactRange(rocksdb::CompactRangeOptions(), /* begin = */ nullptr, /* end = */ nullptr), |
942 | 235 | "Compact range failed:"); |
943 | 235 | return Status::OK(); |
944 | 235 | } |
945 | | |
946 | 8.75k | RateLimiterSharingMode GetRocksDBRateLimiterSharingMode() { |
947 | 8.75k | auto result = ParseEnumInsensitive<RateLimiterSharingMode>( |
948 | 8.75k | FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode); |
949 | 8.75k | if (PREDICT_TRUE(result.ok())) { |
950 | 8.75k | return *result; |
951 | 8.75k | } |
952 | 0 | LOG(DFATAL) << result.status(); |
953 | 0 | return RateLimiterSharingMode::NONE; |
954 | 8.75k | } |
955 | | |
956 | 98.3k | std::shared_ptr<rocksdb::RateLimiter> CreateRocksDBRateLimiter() { |
957 | 98.3k | if (PREDICT_TRUE((FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec > 0))) { |
958 | 98.3k | return std::shared_ptr<rocksdb::RateLimiter>( |
959 | 98.3k | rocksdb::NewGenericRateLimiter(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec)); |
960 | 98.3k | } |
961 | 11 | return nullptr; |
962 | 98.3k | } |
963 | | |
964 | | } // namespace docdb |
965 | | } // namespace yb |