/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_rocksdb_util.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb_rocksdb_util.h" |
15 | | |
16 | | #include <memory> |
17 | | #include <thread> |
18 | | |
19 | | #include "yb/common/transaction.h" |
20 | | |
21 | | #include "yb/docdb/bounded_rocksdb_iterator.h" |
22 | | #include "yb/docdb/consensus_frontier.h" |
23 | | #include "yb/docdb/doc_key.h" |
24 | | #include "yb/docdb/intent_aware_iterator.h" |
25 | | #include "yb/docdb/key_bounds.h" |
26 | | #include "yb/docdb/value_type.h" |
27 | | |
28 | | #include "yb/gutil/casts.h" |
29 | | #include "yb/gutil/sysinfo.h" |
30 | | |
31 | | #include "yb/rocksdb/db/db_impl.h" |
32 | | #include "yb/rocksdb/db/filename.h" |
33 | | #include "yb/rocksdb/db/version_edit.h" |
34 | | #include "yb/rocksdb/db/version_set.h" |
35 | | #include "yb/rocksdb/db/writebuffer.h" |
36 | | #include "yb/rocksdb/memtablerep.h" |
37 | | #include "yb/rocksdb/options.h" |
38 | | #include "yb/rocksdb/rate_limiter.h" |
39 | | #include "yb/rocksdb/table.h" |
40 | | #include "yb/rocksdb/table/filtering_iterator.h" |
41 | | #include "yb/rocksdb/types.h" |
42 | | #include "yb/rocksdb/util/compression.h" |
43 | | |
44 | | #include "yb/rocksutil/yb_rocksdb_logger.h" |
45 | | |
46 | | #include "yb/util/bytes_formatter.h" |
47 | | #include "yb/util/priority_thread_pool.h" |
48 | | #include "yb/util/result.h" |
49 | | #include "yb/util/size_literals.h" |
50 | | #include "yb/util/status.h" |
51 | | #include "yb/util/status_format.h" |
52 | | #include "yb/util/status_log.h" |
53 | | #include "yb/util/trace.h" |
54 | | |
55 | | using namespace yb::size_literals; // NOLINT. |
56 | | using namespace std::literals; |
57 | | |
// Bounds and default for FLAGS_block_restart_interval. AutoInitFromBlockBasedTableOptions replaces
// a flag value below the minimum with the default and a value above the maximum with the maximum.
58 | | static constexpr int32_t kMinBlockStartInterval = 1; |
59 | | static constexpr int32_t kDefaultBlockStartInterval = 16; |
60 | | static constexpr int32_t kMaxBlockStartInterval = 256; |
61 | | |
62 | | DEFINE_int32(rocksdb_max_background_flushes, -1, "Number threads to do background flushes."); |
63 | | DEFINE_bool(rocksdb_disable_compactions, false, "Disable rocksdb compactions."); |
64 | | DEFINE_bool(rocksdb_compaction_measure_io_stats, false, "Measure stats for rocksdb compactions."); |
65 | | DEFINE_int32(rocksdb_base_background_compactions, -1, |
66 | | "Number threads to do background compactions."); |
67 | | DEFINE_int32(rocksdb_max_background_compactions, -1, |
68 | | "Increased number of threads to do background compactions (used when compactions need " |
69 | | "to catch up.)"); |
70 | | DEFINE_int32(rocksdb_level0_file_num_compaction_trigger, 5, |
71 | | "Number of files to trigger level-0 compaction. -1 if compaction should not be " |
72 | | "triggered by number of files at all."); |
73 | | |
74 | | DEFINE_int32(rocksdb_level0_slowdown_writes_trigger, -1, |
75 | | "The number of files above which writes are slowed down."); |
76 | | DEFINE_int32(rocksdb_level0_stop_writes_trigger, -1, |
77 | | "The number of files above which compactions are stopped."); |
78 | | DEFINE_int32(rocksdb_universal_compaction_size_ratio, 20, |
79 | | "The percentage upto which files that are larger are include in a compaction."); |
80 | | DEFINE_uint64(rocksdb_universal_compaction_always_include_size_threshold, 64_MB, |
81 | | "Always include files of smaller or equal size in a compaction."); |
82 | | DEFINE_int32(rocksdb_universal_compaction_min_merge_width, 4, |
83 | | "The minimum number of files in a single compaction run."); |
84 | | DEFINE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec, 1_GB, |
85 | | "Use to control write rate of flush and compaction."); |
86 | | DEFINE_string(rocksdb_compact_flush_rate_limit_sharing_mode, "tserver", |
87 | | "Allows to control rate limit sharing/calculation across RocksDB instances\n" |
88 | | " tserver - rate limit is shared across all RocksDB instances" |
89 | | " at tabset server level\n" |
90 | | " none - rate limit is calculated independently for every RocksDB instance"); |
91 | | DEFINE_uint64(rocksdb_compaction_size_threshold_bytes, 2ULL * 1024 * 1024 * 1024, |
92 | | "Threshold beyond which compaction is considered large."); |
93 | | DEFINE_uint64(rocksdb_max_file_size_for_compaction, 0, |
94 | | "Maximal allowed file size to participate in RocksDB compaction. 0 - unlimited."); |
95 | | DEFINE_int32(rocksdb_max_write_buffer_number, 2, |
96 | | "Maximum number of write buffers that are built up in memory."); |
97 | | |
98 | | DEFINE_int64(db_block_size_bytes, 32_KB, |
99 | | "Size of RocksDB data block (in bytes)."); |
100 | | |
101 | | DEFINE_int64(db_filter_block_size_bytes, 64_KB, |
102 | | "Size of RocksDB filter block (in bytes)."); |
103 | | |
104 | | DEFINE_int64(db_index_block_size_bytes, 32_KB, |
105 | | "Size of RocksDB index block (in bytes)."); |
106 | | |
107 | | DEFINE_int64(db_min_keys_per_index_block, 100, |
108 | | "Minimum number of keys per index block."); |
109 | | |
110 | | DEFINE_int64(db_write_buffer_size, -1, |
111 | | "Size of RocksDB write buffer (in bytes). -1 to use default."); |
112 | | |
113 | | DEFINE_int32(memstore_size_mb, 128, |
114 | | "Max size (in mb) of the memstore, before needing to flush."); |
115 | | |
116 | | DEFINE_bool(use_docdb_aware_bloom_filter, true, |
117 | | "Whether to use the DocDbAwareFilterPolicy for both bloom storage and seeks."); |
118 | | // Empirically 2 is a minimal value that provides best performance on sequential scan. |
119 | | DEFINE_int32(max_nexts_to_avoid_seek, 2, |
120 | | "The number of next calls to try before doing resorting to do a rocksdb seek."); |
121 | | |
122 | | DEFINE_bool(use_multi_level_index, true, "Whether to use multi-level data index."); |
123 | | |
124 | | DEFINE_string( |
125 | | regular_tablets_data_block_key_value_encoding, "shared_prefix", |
126 | | "Key-value encoding to use for regular data blocks in RocksDB. Possible options: " |
127 | | "shared_prefix, three_shared_parts"); |
128 | | |
129 | | DEFINE_uint64(initial_seqno, 1ULL << 50, "Initial seqno for new RocksDB instances."); |
130 | | |
131 | | DEFINE_int32(num_reserved_small_compaction_threads, -1, "Number of reserved small compaction " |
132 | | "threads. It allows splitting small vs. large compactions."); |
133 | | |
134 | | DEFINE_bool(enable_ondisk_compression, true, |
135 | | "Determines whether SSTable compression is enabled or not."); |
136 | | |
137 | | DEFINE_int32(priority_thread_pool_size, -1, |
138 | | "Max running workers in compaction thread pool. " |
139 | | "If -1 and max_background_compactions is specified - use max_background_compactions. " |
140 | | "If -1 and max_background_compactions is not specified - use sqrt(num_cpus)."); |
141 | | |
142 | | DEFINE_string(compression_type, "Snappy", |
143 | | "On-disk compression type to use in RocksDB." |
144 | | "By default, Snappy is used if supported."); |
145 | | |
146 | | DEFINE_int32(block_restart_interval, kDefaultBlockStartInterval, |
147 | | "Controls the number of keys to look at for computing the diff encoding."); |
148 | | |
149 | | namespace yb { |
150 | | |
151 | | namespace { |
152 | | |
153 | 441k | Result<rocksdb::CompressionType> GetConfiguredCompressionType(const std::string& flag_value) { |
154 | 441k | if (!FLAGS_enable_ondisk_compression) { |
155 | 1 | return rocksdb::kNoCompression; |
156 | 1 | } |
157 | 441k | const std::vector<rocksdb::CompressionType> kValidRocksDBCompressionTypes = { |
158 | 441k | rocksdb::kNoCompression, |
159 | 441k | rocksdb::kSnappyCompression, |
160 | 441k | rocksdb::kZlibCompression, |
161 | 441k | rocksdb::kLZ4Compression |
162 | 441k | }; |
163 | 882k | for (const auto& compression_type : kValidRocksDBCompressionTypes) { |
164 | 882k | if (flag_value == rocksdb::CompressionTypeToString(compression_type)) { |
165 | 441k | if (rocksdb::CompressionTypeSupported(compression_type)) { |
166 | 441k | return compression_type; |
167 | 441k | } |
168 | 11 | return STATUS_FORMAT( |
169 | 11 | InvalidArgument, "Configured compression type $0 is not supported.", flag_value); |
170 | 11 | } |
171 | 882k | } |
172 | 18.4E | return STATUS_FORMAT( |
173 | 441k | InvalidArgument, "Configured compression type $0 is not valid.", flag_value); |
174 | 441k | } |
175 | | |
176 | | } // namespace |
177 | | |
178 | | namespace docdb { |
179 | | |
180 | | Result<rocksdb::KeyValueEncodingFormat> GetConfiguredKeyValueEncodingFormat( |
181 | 141k | const std::string& flag_value) { |
182 | 141k | for (const auto& encoding_format : rocksdb::kKeyValueEncodingFormatList) { |
183 | 141k | if (flag_value == KeyValueEncodingFormatToString(encoding_format)) { |
184 | 141k | return encoding_format; |
185 | 141k | } |
186 | 141k | } |
187 | 18.4E | return STATUS_FORMAT(InvalidArgument, "Key-value encoding format $0 is not valid.", flag_value); |
188 | 141k | } |
189 | | |
190 | | } // namespace docdb |
191 | | |
192 | | } // namespace yb |
193 | | |
194 | | namespace { |
195 | | |
196 | 11.3k | bool CompressionTypeValidator(const char* flagname, const std::string& flag_compression_type) { |
197 | 11.3k | auto res = yb::GetConfiguredCompressionType(flag_compression_type); |
198 | 11.3k | if (!res.ok()) { |
199 | | // Below we CHECK_RESULT on the same value returned here, and validating the result here ensures |
200 | | // that CHECK_RESULT will never fail once the process is running. |
201 | 0 | LOG(ERROR) << res.status().ToString(); |
202 | 0 | return false; |
203 | 0 | } |
204 | 11.3k | return true; |
205 | 11.3k | } |
206 | | |
207 | 11.3k | bool KeyValueEncodingFormatValidator(const char* flag_name, const std::string& flag_value) { |
208 | 11.3k | auto res = yb::docdb::GetConfiguredKeyValueEncodingFormat(flag_value); |
209 | 11.3k | bool ok = res.ok(); |
210 | 11.3k | if (!ok) { |
211 | 0 | LOG(ERROR) << flag_name << ": " << res.status(); |
212 | 0 | } |
213 | 11.3k | return ok; |
214 | 11.3k | } |
215 | | |
216 | | } // namespace |
217 | | |
// Attach the validators defined above to their flags.
// NOTE(review): the unused attribute presumably silences an unused-variable warning for the
// registration object that DEFINE_validator declares - confirm against the gflags macro.
218 | | __attribute__((unused)) |
219 | | DEFINE_validator(compression_type, &CompressionTypeValidator); |
220 | | __attribute__((unused)) |
221 | | DEFINE_validator(regular_tablets_data_block_key_value_encoding, &KeyValueEncodingFormatValidator); |
222 | | |
223 | | using std::shared_ptr; |
224 | | using std::string; |
225 | | using std::unique_ptr; |
226 | | using strings::Substitute; |
227 | | |
228 | | namespace yb { |
229 | | namespace docdb { |
230 | | |
231 | | std::shared_ptr<rocksdb::BoundaryValuesExtractor> DocBoundaryValuesExtractorInstance(); |
232 | | |
233 | 441M | void SeekForward(const rocksdb::Slice& slice, rocksdb::Iterator *iter) { |
234 | 441M | if (!iter->Valid() || iter->key().compare(slice) >= 0) { |
235 | 193M | return; |
236 | 193M | } |
237 | 247M | ROCKSDB_SEEK(iter, slice); |
238 | 247M | } |
239 | | |
240 | 64.2M | void SeekForward(const KeyBytes& key_bytes, rocksdb::Iterator *iter) { |
241 | 64.2M | SeekForward(key_bytes.AsSlice(), iter); |
242 | 64.2M | } |
243 | | |
244 | 23.0M | KeyBytes AppendDocHt(const Slice& key, const DocHybridTime& doc_ht) { |
245 | 23.0M | char buf[kMaxBytesPerEncodedHybridTime + 1]; |
246 | 23.0M | buf[0] = ValueTypeAsChar::kHybridTime; |
247 | 23.0M | auto end = doc_ht.EncodedInDocDbFormat(buf + 1); |
248 | 23.0M | return KeyBytes(key, Slice(buf, end)); |
249 | 23.0M | } |
250 | | |
251 | 23.0M | void SeekPastSubKey(const Slice& key, rocksdb::Iterator* iter) { |
252 | 23.0M | SeekForward(AppendDocHt(key, DocHybridTime::kMin), iter); |
253 | 23.0M | } |
254 | | |
255 | 41.2M | void SeekOutOfSubKey(KeyBytes* key_bytes, rocksdb::Iterator* iter) { |
256 | 41.2M | key_bytes->AppendValueType(ValueType::kMaxByte); |
257 | 41.2M | SeekForward(*key_bytes, iter); |
258 | 41.2M | key_bytes->RemoveValueTypeSuffix(ValueType::kMaxByte); |
259 | 41.2M | } |
260 | | |
261 | | void SeekPossiblyUsingNext(rocksdb::Iterator* iter, const Slice& seek_key, |
262 | 260M | int* next_count, int* seek_count) { |
263 | 530M | for (int nexts = FLAGS_max_nexts_to_avoid_seek; nexts-- > 0;) { |
264 | 520M | if (!iter->Valid() || iter->key().compare(seek_key) >= 0) { |
265 | 251M | VTRACE(3, "Did $0 Next(s) instead of a Seek", nexts); |
266 | 251M | return; |
267 | 251M | } |
268 | 18.4E | VLOG(4) << "Skipping: " << SubDocKey::DebugSliceToString(iter->key()); |
269 | | |
270 | 269M | iter->Next(); |
271 | 269M | ++*next_count; |
272 | 269M | } |
273 | | |
274 | 9.53M | VTRACE(3, "Forced to do an actual Seek after $0 Next(s)", FLAGS_max_nexts_to_avoid_seek); |
275 | 9.53M | iter->Seek(seek_key); |
276 | 9.53M | ++*seek_count; |
277 | 9.53M | } |
278 | | |
279 | | void PerformRocksDBSeek( |
280 | | rocksdb::Iterator *iter, |
281 | | const rocksdb::Slice &seek_key, |
282 | | const char* file_name, |
283 | 272M | int line) { |
284 | 272M | int next_count = 0; |
285 | 272M | int seek_count = 0; |
286 | 272M | if (seek_key.size() == 0) { |
287 | 66.9k | iter->SeekToFirst(); |
288 | 66.9k | ++seek_count; |
289 | 272M | } else if (!iter->Valid() || iter->key().compare(seek_key) > 0) { |
290 | 12.3M | iter->Seek(seek_key); |
291 | 12.3M | ++seek_count; |
292 | 260M | } else { |
293 | 260M | SeekPossiblyUsingNext(iter, seek_key, &next_count, &seek_count); |
294 | 260M | } |
295 | 18.4E | VLOG(4) << Substitute( |
296 | 18.4E | "PerformRocksDBSeek at $0:$1:\n" |
297 | 18.4E | " Seek key: $2\n" |
298 | 18.4E | " Seek key (raw): $3\n" |
299 | 18.4E | " Actual key: $4\n" |
300 | 18.4E | " Actual key (raw): $5\n" |
301 | 18.4E | " Actual value: $6\n" |
302 | 18.4E | " Next() calls: $7\n" |
303 | 18.4E | " Seek() calls: $8\n", |
304 | 18.4E | file_name, line, |
305 | 18.4E | BestEffortDocDBKeyToStr(seek_key), |
306 | 18.4E | FormatSliceAsStr(seek_key), |
307 | 18.4E | iter->Valid() ? BestEffortDocDBKeyToStr(KeyBytes(iter->key())) : "N/A", |
308 | 18.4E | iter->Valid() ? FormatSliceAsStr(iter->key()) : "N/A", |
309 | 18.4E | iter->Valid() ? FormatSliceAsStr(iter->value()) : "N/A", |
310 | 18.4E | next_count, |
311 | 18.4E | seek_count); |
312 | 272M | } |
313 | | |
314 | | namespace { |
315 | | |
316 | | rocksdb::ReadOptions PrepareReadOptions( |
317 | | rocksdb::DB* rocksdb, |
318 | | BloomFilterMode bloom_filter_mode, |
319 | | const boost::optional<const Slice>& user_key_for_filter, |
320 | | const rocksdb::QueryId query_id, |
321 | | std::shared_ptr<rocksdb::ReadFileFilter> file_filter, |
322 | 15.3M | const Slice* iterate_upper_bound) { |
323 | 15.3M | rocksdb::ReadOptions read_opts; |
324 | 15.3M | read_opts.query_id = query_id; |
325 | 15.3M | if (FLAGS_use_docdb_aware_bloom_filter && |
326 | 13.9M | bloom_filter_mode == BloomFilterMode::USE_BLOOM_FILTER) { |
327 | 8.61M | DCHECK(user_key_for_filter); |
328 | 8.61M | read_opts.table_aware_file_filter = rocksdb->GetOptions().table_factory-> |
329 | 8.61M | NewTableAwareReadFileFilter(read_opts, user_key_for_filter.get()); |
330 | 8.61M | } |
331 | 15.3M | read_opts.file_filter = std::move(file_filter); |
332 | 15.3M | read_opts.iterate_upper_bound = iterate_upper_bound; |
333 | 15.3M | return read_opts; |
334 | 15.3M | } |
335 | | |
336 | | } // namespace |
337 | | |
338 | | BoundedRocksDbIterator CreateRocksDBIterator( |
339 | | rocksdb::DB* rocksdb, |
340 | | const KeyBounds* docdb_key_bounds, |
341 | | BloomFilterMode bloom_filter_mode, |
342 | | const boost::optional<const Slice>& user_key_for_filter, |
343 | | const rocksdb::QueryId query_id, |
344 | | std::shared_ptr<rocksdb::ReadFileFilter> file_filter, |
345 | 7.06M | const Slice* iterate_upper_bound) { |
346 | 7.06M | rocksdb::ReadOptions read_opts = PrepareReadOptions(rocksdb, bloom_filter_mode, |
347 | 7.06M | user_key_for_filter, query_id, std::move(file_filter), iterate_upper_bound); |
348 | 7.06M | return BoundedRocksDbIterator(rocksdb, read_opts, docdb_key_bounds); |
349 | 7.06M | } |
350 | | |
351 | | unique_ptr<IntentAwareIterator> CreateIntentAwareIterator( |
352 | | const DocDB& doc_db, |
353 | | BloomFilterMode bloom_filter_mode, |
354 | | const boost::optional<const Slice>& user_key_for_filter, |
355 | | const rocksdb::QueryId query_id, |
356 | | const TransactionOperationContext& txn_op_context, |
357 | | CoarseTimePoint deadline, |
358 | | const ReadHybridTime& read_time, |
359 | | std::shared_ptr<rocksdb::ReadFileFilter> file_filter, |
360 | 8.29M | const Slice* iterate_upper_bound) { |
361 | | // TODO(dtxn) do we need separate options for intents db? |
362 | 8.29M | rocksdb::ReadOptions read_opts = PrepareReadOptions(doc_db.regular, bloom_filter_mode, |
363 | 8.29M | user_key_for_filter, query_id, std::move(file_filter), iterate_upper_bound); |
364 | 8.29M | return std::make_unique<IntentAwareIterator>( |
365 | 8.29M | doc_db, read_opts, deadline, read_time, txn_op_context); |
366 | 8.29M | } |
367 | | |
368 | | namespace { |
369 | | |
// Held by AutoInitFromRocksDBFlags and AutoInitFromBlockBasedTableOptions for their whole body.
370 | | std::mutex rocksdb_flags_mutex; |
371 | | |
372 | 860k | int32_t GetMaxBackgroundFlushes() { |
373 | 860k | const auto kNumCpus = base::NumCPUs(); |
374 | 860k | if (FLAGS_rocksdb_max_background_flushes == -1) { |
375 | 860k | constexpr auto kCpusPerFlushThread = 8; |
376 | 860k | constexpr auto kAutoMaxBackgroundFlushesHighLimit = 4; |
377 | 860k | auto flushes = 1 + kNumCpus / kCpusPerFlushThread; |
378 | 860k | auto max_flushes = std::min(flushes, kAutoMaxBackgroundFlushesHighLimit); |
379 | 860k | LOG(INFO) << "Overriding FLAGS_rocksdb_max_background_flushes to " << max_flushes; |
380 | 860k | return max_flushes; |
381 | 0 | } else { |
382 | 0 | return FLAGS_rocksdb_max_background_flushes; |
383 | 0 | } |
384 | 860k | } |
385 | | |
386 | | // This controls the maximum number of schedulable compactions, per each instance of rocksdb, of |
387 | | // which we will have many. We also do not want to waste resources by having too many queued |
388 | | // compactions. |
389 | 1.72M | int32_t GetMaxBackgroundCompactions() { |
390 | 1.72M | if (FLAGS_rocksdb_disable_compactions) { |
391 | 0 | return 0; |
392 | 0 | } |
393 | 1.72M | int rocksdb_max_background_compactions = FLAGS_rocksdb_max_background_compactions; |
394 | | |
395 | 1.72M | if (rocksdb_max_background_compactions >= 0) { |
396 | 7 | return rocksdb_max_background_compactions; |
397 | 7 | } |
398 | | |
399 | 1.72M | const auto kNumCpus = base::NumCPUs(); |
400 | 1.72M | if (kNumCpus <= 4) { |
401 | 0 | rocksdb_max_background_compactions = 1; |
402 | 1.72M | } else if (kNumCpus <= 8) { |
403 | 0 | rocksdb_max_background_compactions = 2; |
404 | 1.72M | } else if (kNumCpus <= 32) { |
405 | 1.72M | rocksdb_max_background_compactions = 3; |
406 | 0 | } else { |
407 | 0 | rocksdb_max_background_compactions = 4; |
408 | 0 | } |
409 | 1.72M | LOG(INFO) << "FLAGS_rocksdb_max_background_compactions was not set, automatically configuring " |
410 | 1.72M | << rocksdb_max_background_compactions << " background compactions."; |
411 | 1.72M | return rocksdb_max_background_compactions; |
412 | 1.72M | } |
413 | | |
414 | 860k | int32_t GetBaseBackgroundCompactions() { |
415 | 860k | if (FLAGS_rocksdb_disable_compactions) { |
416 | 0 | return 0; |
417 | 0 | } |
418 | | |
419 | 860k | if (FLAGS_rocksdb_base_background_compactions == -1) { |
420 | 860k | const auto base_background_compactions = GetMaxBackgroundCompactions(); |
421 | 860k | LOG(INFO) << "FLAGS_rocksdb_base_background_compactions was not set, automatically configuring " |
422 | 860k | << base_background_compactions << " base background compactions."; |
423 | 860k | return base_background_compactions; |
424 | 860k | } |
425 | | |
426 | 0 | return FLAGS_rocksdb_base_background_compactions; |
427 | 0 | } |
428 | | |
429 | | // Auto initialize some of the RocksDB flags. |
430 | 859k | void AutoInitFromRocksDBFlags(rocksdb::Options* options) { |
431 | 859k | std::unique_lock<std::mutex> lock(rocksdb_flags_mutex); |
432 | | |
433 | 859k | options->max_background_flushes = GetMaxBackgroundFlushes(); |
434 | | |
435 | 859k | if (FLAGS_rocksdb_disable_compactions) { |
436 | 4 | return; |
437 | 4 | } |
438 | | |
439 | 859k | options->max_background_compactions = GetMaxBackgroundCompactions(); |
440 | 859k | options->base_background_compactions = GetBaseBackgroundCompactions(); |
441 | 859k | } |
442 | | |
443 | 430k | void AutoInitFromBlockBasedTableOptions(rocksdb::BlockBasedTableOptions* table_options) { |
444 | 430k | std::unique_lock<std::mutex> lock(rocksdb_flags_mutex); |
445 | | |
446 | 430k | table_options->block_size = FLAGS_db_block_size_bytes; |
447 | 430k | table_options->filter_block_size = FLAGS_db_filter_block_size_bytes; |
448 | 430k | table_options->index_block_size = FLAGS_db_index_block_size_bytes; |
449 | 430k | table_options->min_keys_per_index_block = FLAGS_db_min_keys_per_index_block; |
450 | | |
451 | 430k | if (FLAGS_block_restart_interval < kMinBlockStartInterval) { |
452 | 0 | LOG(INFO) << "FLAGS_block_restart_interval was set to a very low value, overriding " |
453 | 0 | << "block_restart_interval to " << kDefaultBlockStartInterval << "."; |
454 | 0 | table_options->block_restart_interval = kDefaultBlockStartInterval; |
455 | 430k | } else if (FLAGS_block_restart_interval > kMaxBlockStartInterval) { |
456 | 0 | LOG(INFO) << "FLAGS_block_restart_interval was set to a very high value, overriding " |
457 | 0 | << "block_restart_interval to " << kMaxBlockStartInterval << "."; |
458 | 0 | table_options->block_restart_interval = kMaxBlockStartInterval; |
459 | 430k | } else { |
460 | 430k | table_options->block_restart_interval = FLAGS_block_restart_interval; |
461 | 430k | } |
462 | 430k | } |
463 | | |
464 | | class HybridTimeFilteringIterator : public rocksdb::FilteringIterator { |
465 | | public: |
466 | | HybridTimeFilteringIterator( |
467 | | rocksdb::InternalIterator* iterator, bool arena_mode, HybridTime hybrid_time_filter) |
468 | 0 | : rocksdb::FilteringIterator(iterator, arena_mode), hybrid_time_filter_(hybrid_time_filter) {} |
469 | | |
470 | | private: |
471 | 0 | bool Satisfied(Slice key) override { |
472 | 0 | auto user_key = rocksdb::ExtractUserKey(key); |
473 | 0 | auto doc_ht = DocHybridTime::DecodeFromEnd(&user_key); |
474 | 0 | if (!doc_ht.ok()) { |
475 | 0 | LOG(DFATAL) << "Unable to decode doc ht " << rocksdb::ExtractUserKey(key) << ": " |
476 | 0 | << doc_ht.status(); |
477 | 0 | return true; |
478 | 0 | } |
479 | 0 | return doc_ht->hybrid_time() <= hybrid_time_filter_; |
480 | 0 | } |
481 | | |
482 | | HybridTime hybrid_time_filter_; |
483 | | }; |
484 | | |
485 | | template <class T, class... Args> |
486 | 0 | T* CreateOnArena(rocksdb::Arena* arena, Args&&... args) { |
487 | 0 | if (!arena) { |
488 | 0 | return new T(std::forward<Args>(args)...); |
489 | 0 | } |
490 | 0 | auto mem = arena->AllocateAligned(sizeof(T)); |
491 | 0 | return new (mem) T(std::forward<Args>(args)...); |
492 | 0 | } |
493 | | |
494 | | rocksdb::InternalIterator* WrapIterator( |
495 | 3.42M | rocksdb::InternalIterator* iterator, rocksdb::Arena* arena, const Slice& filter) { |
496 | 3.42M | if (!filter.empty()) { |
497 | 0 | HybridTime hybrid_time_filter; |
498 | 0 | memcpy(&hybrid_time_filter, filter.data(), sizeof(hybrid_time_filter)); |
499 | 0 | return CreateOnArena<HybridTimeFilteringIterator>( |
500 | 0 | arena, iterator, arena != nullptr, hybrid_time_filter); |
501 | 0 | } |
502 | 3.42M | return iterator; |
503 | 3.42M | } |
504 | | |
505 | | void AddSupportedFilterPolicy( |
506 | | const rocksdb::BlockBasedTableOptions::FilterPolicyPtr& filter_policy, |
507 | 844k | rocksdb::BlockBasedTableOptions* table_options) { |
508 | 844k | table_options->supported_filter_policies->emplace(filter_policy->Name(), filter_policy); |
509 | 844k | } |
510 | | |
511 | | PriorityThreadPool* GetGlobalPriorityThreadPool |
512 | 430k | (const scoped_refptr<MetricEntity>& metric_entity = nullptr) { |
513 | 430k | static PriorityThreadPool priority_thread_pool_for_compactions_and_flushes( |
514 | 430k | GetGlobalRocksDBPriorityThreadPoolSize(), metric_entity); |
515 | 430k | return &priority_thread_pool_for_compactions_and_flushes; |
516 | 430k | } |
517 | | |
518 | | } // namespace |
519 | | |
520 | 0 | rocksdb::Options TEST_AutoInitFromRocksDBFlags() { |
521 | 0 | rocksdb::Options options; |
522 | 0 | AutoInitFromRocksDBFlags(&options); |
523 | 0 | return options; |
524 | 0 | } |
525 | | |
526 | 0 | rocksdb::BlockBasedTableOptions TEST_AutoInitFromRocksDbTableFlags() { |
527 | 0 | rocksdb::BlockBasedTableOptions blockBasedTableOptions; |
528 | 0 | AutoInitFromBlockBasedTableOptions(&blockBasedTableOptions); |
529 | 0 | return blockBasedTableOptions; |
530 | 0 | } |
531 | | |
532 | 15.5k | int32_t GetGlobalRocksDBPriorityThreadPoolSize() { |
533 | 15.5k | if (FLAGS_rocksdb_disable_compactions) { |
534 | 6 | return 1; |
535 | 6 | } |
536 | | |
537 | 15.5k | auto priority_thread_pool_size = FLAGS_priority_thread_pool_size; |
538 | 15.5k | if (priority_thread_pool_size >= 0) { |
539 | 0 | return priority_thread_pool_size; |
540 | 0 | } |
541 | | |
542 | 15.5k | if (FLAGS_rocksdb_max_background_compactions != -1) { |
543 | | // If we did set the per-rocksdb flag, but not FLAGS_priority_thread_pool_size, just port |
544 | | // over that value. |
545 | 3 | priority_thread_pool_size = GetMaxBackgroundCompactions(); |
546 | 15.5k | } else { |
547 | 15.5k | const int kNumCpus = base::NumCPUs(); |
548 | | // If we did not override the per-rocksdb queue size, then just use a production friendly |
549 | | // formula. |
550 | | // |
551 | | // For less then 8cpus, just manually tune to 1-2 threads. Above that, we can use 3.5/8. |
552 | 15.5k | if (kNumCpus < 4) { |
553 | 0 | priority_thread_pool_size = 1; |
554 | 15.5k | } else if (kNumCpus < 8) { |
555 | 0 | priority_thread_pool_size = 2; |
556 | 15.5k | } else { |
557 | 15.5k | priority_thread_pool_size = (int32_t) std::floor(kNumCpus * 3.5 / 8.0); |
558 | 15.5k | } |
559 | 15.5k | } |
560 | | |
561 | 15.5k | LOG(INFO) << "FLAGS_priority_thread_pool_size was not set, automatically configuring to " |
562 | 15.5k | << priority_thread_pool_size << "."; |
563 | | |
564 | 15.5k | return priority_thread_pool_size; |
565 | 15.5k | } |
566 | | |
567 | | void InitRocksDBOptions( |
568 | | rocksdb::Options* options, const string& log_prefix, |
569 | | const shared_ptr<rocksdb::Statistics>& statistics, |
570 | | const tablet::TabletOptions& tablet_options, |
571 | 429k | rocksdb::BlockBasedTableOptions table_options) { |
572 | 429k | AutoInitFromRocksDBFlags(options); |
573 | 429k | SetLogPrefix(options, log_prefix); |
574 | 429k | options->create_if_missing = true; |
575 | 429k | options->disableDataSync = true; |
576 | 429k | options->statistics = statistics; |
577 | 429k | options->info_log_level = YBRocksDBLogger::ConvertToRocksDBLogLevel(FLAGS_minloglevel); |
578 | 429k | options->initial_seqno = FLAGS_initial_seqno; |
579 | 429k | options->boundary_extractor = DocBoundaryValuesExtractorInstance(); |
580 | 429k | options->compaction_measure_io_stats = FLAGS_rocksdb_compaction_measure_io_stats; |
581 | 429k | options->memory_monitor = tablet_options.memory_monitor; |
582 | 429k | if (FLAGS_db_write_buffer_size != -1) { |
583 | 163 | options->write_buffer_size = FLAGS_db_write_buffer_size; |
584 | 429k | } else { |
585 | 429k | options->write_buffer_size = FLAGS_memstore_size_mb * 1_MB; |
586 | 429k | } |
587 | 429k | options->env = tablet_options.rocksdb_env; |
588 | 429k | options->checkpoint_env = rocksdb::Env::Default(); |
589 | 429k | options->priority_thread_pool_for_compactions_and_flushes = |
590 | 429k | (tablet_options.ServerMetricEntity) ? |
591 | 373k | GetGlobalPriorityThreadPool(tablet_options.ServerMetricEntity) : |
592 | 55.7k | GetGlobalPriorityThreadPool(); |
593 | | |
594 | 429k | if (FLAGS_num_reserved_small_compaction_threads != -1) { |
595 | 0 | options->num_reserved_small_compaction_threads = FLAGS_num_reserved_small_compaction_threads; |
596 | 0 | } |
597 | | |
598 | | // Since the flag validator for FLAGS_compression_type will fail if the result of this call is not |
599 | | // OK, this CHECK_RESULT should never fail and is safe. |
600 | 429k | options->compression = CHECK_RESULT(GetConfiguredCompressionType(FLAGS_compression_type)); |
601 | | |
602 | 429k | options->listeners.insert( |
603 | 429k | options->listeners.end(), tablet_options.listeners.begin(), |
604 | 429k | tablet_options.listeners.end()); // Append listeners |
605 | | |
606 | | // Set block cache options. |
607 | 429k | if (tablet_options.block_cache) { |
608 | 381k | table_options.block_cache = tablet_options.block_cache; |
609 | | // Cache the bloom filters in the block cache. |
610 | 381k | table_options.cache_index_and_filter_blocks = true; |
611 | 47.8k | } else { |
612 | 47.8k | table_options.no_block_cache = true; |
613 | 47.8k | table_options.cache_index_and_filter_blocks = false; |
614 | 47.8k | } |
615 | | |
616 | 429k | AutoInitFromBlockBasedTableOptions(&table_options); |
617 | | |
618 | | // Set our custom bloom filter that is docdb aware. |
619 | 429k | if (FLAGS_use_docdb_aware_bloom_filter) { |
620 | 422k | const auto filter_block_size_bits = table_options.filter_block_size * 8; |
621 | 422k | table_options.filter_policy = std::make_shared<const DocDbAwareV3FilterPolicy>( |
622 | 422k | filter_block_size_bits, options->info_log.get()); |
623 | 422k | table_options.supported_filter_policies = |
624 | 422k | std::make_shared<rocksdb::BlockBasedTableOptions::FilterPoliciesMap>(); |
625 | 422k | AddSupportedFilterPolicy(std::make_shared<const DocDbAwareHashedComponentsFilterPolicy>( |
626 | 422k | filter_block_size_bits, options->info_log.get()), &table_options); |
627 | 422k | AddSupportedFilterPolicy(std::make_shared<const DocDbAwareV2FilterPolicy>( |
628 | 422k | filter_block_size_bits, options->info_log.get()), &table_options); |
629 | 422k | } |
630 | | |
631 | 430k | if (FLAGS_use_multi_level_index) { |
632 | 430k | table_options.index_type = rocksdb::IndexType::kMultiLevelBinarySearch; |
633 | 18.4E | } else { |
634 | 18.4E | table_options.index_type = rocksdb::IndexType::kBinarySearch; |
635 | 18.4E | } |
636 | | |
637 | 429k | options->table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); |
638 | | |
639 | | // Compaction related options. |
640 | | |
641 | | // Enable universal style compactions. |
642 | 429k | bool compactions_enabled = !FLAGS_rocksdb_disable_compactions; |
643 | 429k | options->compaction_style = compactions_enabled |
644 | 429k | ? rocksdb::CompactionStyle::kCompactionStyleUniversal |
645 | 18.4E | : rocksdb::CompactionStyle::kCompactionStyleNone; |
646 | | // Set the number of levels to 1. |
647 | 429k | options->num_levels = 1; |
648 | | |
649 | 429k | AutoInitFromRocksDBFlags(options); |
650 | 430k | if (compactions_enabled) { |
651 | 430k | options->level0_file_num_compaction_trigger = FLAGS_rocksdb_level0_file_num_compaction_trigger; |
652 | 430k | options->level0_slowdown_writes_trigger = max_if_negative( |
653 | 430k | FLAGS_rocksdb_level0_slowdown_writes_trigger); |
654 | 430k | options->level0_stop_writes_trigger = max_if_negative(FLAGS_rocksdb_level0_stop_writes_trigger); |
655 | | // This determines the algo used to compute which files will be included. The "total size" based |
656 | | // computation compares the size of every new file with the sum of all files included so far. |
657 | 430k | options->compaction_options_universal.stop_style = |
658 | 430k | rocksdb::CompactionStopStyle::kCompactionStopStyleTotalSize; |
659 | 430k | options->compaction_options_universal.size_ratio = |
660 | 430k | FLAGS_rocksdb_universal_compaction_size_ratio; |
661 | 430k | options->compaction_options_universal.always_include_size_threshold = |
662 | 430k | FLAGS_rocksdb_universal_compaction_always_include_size_threshold; |
663 | 430k | options->compaction_options_universal.min_merge_width = |
664 | 430k | FLAGS_rocksdb_universal_compaction_min_merge_width; |
665 | 430k | options->compaction_size_threshold_bytes = FLAGS_rocksdb_compaction_size_threshold_bytes; |
666 | 370k | options->rate_limiter = tablet_options.rate_limiter ? tablet_options.rate_limiter |
667 | 59.2k | : CreateRocksDBRateLimiter(); |
668 | 18.4E | } else { |
669 | 18.4E | options->level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); |
670 | 18.4E | options->level0_stop_writes_trigger = std::numeric_limits<int>::max(); |
671 | 18.4E | } |
672 | | |
673 | 429k | options->max_write_buffer_number = FLAGS_rocksdb_max_write_buffer_number; |
674 | | |
675 | 429k | options->memtable_factory = std::make_shared<rocksdb::SkipListFactory>( |
676 | 429k | 0 /* lookahead */, rocksdb::ConcurrentWrites::kFalse); |
677 | | |
678 | 429k | options->iterator_replacer = std::make_shared<rocksdb::IteratorReplacer>(&WrapIterator); |
679 | 429k | } |
680 | | |
681 | 538k | void SetLogPrefix(rocksdb::Options* options, const std::string& log_prefix) { |
682 | 538k | options->log_prefix = log_prefix; |
683 | 538k | options->info_log = std::make_shared<YBRocksDBLogger>(options->log_prefix); |
684 | 538k | } |
685 | | |
686 | | namespace { |
687 | | |
688 | | // Helper class for RocksDBPatcher. |
689 | | class RocksDBPatcherHelper { |
690 | | public: |
691 | | explicit RocksDBPatcherHelper(rocksdb::VersionSet* version_set) |
692 | | : version_set_(version_set), cfd_(version_set->GetColumnFamilySet()->GetDefault()), |
693 | 862 | delete_edit_(cfd_), add_edit_(cfd_) { |
694 | 862 | } |
695 | | |
696 | 1.72k | int Levels() const { |
697 | 1.72k | return cfd_->NumberLevels(); |
698 | 1.72k | } |
699 | | |
700 | 862 | const std::vector<rocksdb::FileMetaData*>& LevelFiles(int level) { |
701 | 862 | return cfd_->current()->storage_info()->LevelFiles(level); |
702 | 862 | } |
703 | | |
704 | | template <class F> |
705 | 862 | auto IterateFiles(const F& f) { |
706 | | // Auto routing based on f return type. |
707 | 862 | return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr)); |
708 | 862 | } Unexecuted instantiation: docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper12IterateFilesIZNS0_14RocksDBPatcher4Impl19SetHybridTimeFilterENS_10HybridTimeEEUliRKN7rocksdb12FileMetaDataEE_EEDaRKT_ docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper12IterateFilesIZNS0_14RocksDBPatcher4Impl21ModifyFlushedFrontierERKNS0_17ConsensusFrontierEEUliN7rocksdb12FileMetaDataEE_EEDaRKT_ Line | Count | Source | 705 | 862 | auto IterateFiles(const F& f) { | 706 | | // Auto routing based on f return type. | 707 | 862 | return IterateFilesHelper(f, static_cast<decltype(f(0, *LevelFiles(0).front()))*>(nullptr)); | 708 | 862 | } |
Unexecuted instantiation: docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper12IterateFilesIZNS0_14RocksDBPatcher4Impl15UpdateFileSizesEvEUliRKN7rocksdb12FileMetaDataEE_EEDaRKT_ |
709 | | |
710 | 1.72k | void ModifyFile(int level, const rocksdb::FileMetaData& fmd) { |
711 | 1.72k | delete_edit_->DeleteFile(level, fmd.fd.GetNumber()); |
712 | 1.72k | add_edit_->AddCleanedFile(level, fmd); |
713 | 1.72k | } |
714 | | |
715 | 862 | rocksdb::VersionEdit& Edit() { |
716 | 862 | return *add_edit_; |
717 | 862 | } |
718 | | |
719 | | CHECKED_STATUS Apply( |
720 | 862 | const rocksdb::Options& options, const rocksdb::ImmutableCFOptions& imm_cf_options) { |
721 | 862 | if (!delete_edit_.modified() && !add_edit_.modified()) { |
722 | 0 | return Status::OK(); |
723 | 0 | } |
724 | | |
725 | 862 | rocksdb::MutableCFOptions mutable_cf_options(options, imm_cf_options); |
726 | 862 | { |
727 | 862 | rocksdb::InstrumentedMutex mutex; |
728 | 862 | rocksdb::InstrumentedMutexLock lock(&mutex); |
729 | 1.72k | for (auto* edit : {&delete_edit_, &add_edit_}) { |
730 | 1.72k | if (edit->modified()) { |
731 | 1.72k | RETURN_NOT_OK(version_set_->LogAndApply(cfd_, mutable_cf_options, edit->get(), &mutex)); |
732 | 1.72k | } |
733 | 1.72k | } |
734 | 862 | } |
735 | | |
736 | 862 | return Status::OK(); |
737 | 862 | } |
738 | | |
739 | | private: |
740 | | template <class F> |
741 | 862 | void IterateFilesHelper(const F& f, void*) { |
742 | 1.72k | for (int level = 0; level < Levels(); ++level) { |
743 | 1.72k | for (const auto* file : LevelFiles(level)) { |
744 | 1.72k | f(level, *file); |
745 | 1.72k | } |
746 | 862 | } |
747 | 862 | } Unexecuted instantiation: docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper18IterateFilesHelperIZNS0_14RocksDBPatcher4Impl19SetHybridTimeFilterENS_10HybridTimeEEUliRKN7rocksdb12FileMetaDataEE_EEvRKT_Pv docdb_rocksdb_util.cc:_ZN2yb5docdb12_GLOBAL__N_120RocksDBPatcherHelper18IterateFilesHelperIZNS0_14RocksDBPatcher4Impl21ModifyFlushedFrontierERKNS0_17ConsensusFrontierEEUliN7rocksdb12FileMetaDataEE_EEvRKT_Pv Line | Count | Source | 741 | 862 | void IterateFilesHelper(const F& f, void*) { | 742 | 1.72k | for (int level = 0; level < Levels(); ++level) { | 743 | 1.72k | for (const auto* file : LevelFiles(level)) { | 744 | 1.72k | f(level, *file); | 745 | 1.72k | } | 746 | 862 | } | 747 | 862 | } |
|
748 | | |
749 | | template <class F> |
750 | 0 | CHECKED_STATUS IterateFilesHelper(const F& f, Status*) { |
751 | 0 | for (int level = 0; level < Levels(); ++level) { |
752 | 0 | for (const auto* file : LevelFiles(level)) { |
753 | 0 | RETURN_NOT_OK(f(level, *file)); |
754 | 0 | } |
755 | 0 | } |
756 | 0 | return Status::OK(); |
757 | 0 | } |
758 | | |
759 | | class TrackedEdit { |
760 | | public: |
761 | 1.72k | explicit TrackedEdit(rocksdb::ColumnFamilyData* cfd) { |
762 | 1.72k | edit_.SetColumnFamily(cfd->GetID()); |
763 | 1.72k | } |
764 | | |
765 | 6.03k | rocksdb::VersionEdit* get() { |
766 | 6.03k | modified_ = true; |
767 | 6.03k | return &edit_; |
768 | 6.03k | } |
769 | | |
770 | 3.44k | rocksdb::VersionEdit* operator->() { |
771 | 3.44k | return get(); |
772 | 3.44k | } |
773 | | |
774 | 862 | rocksdb::VersionEdit& operator*() { |
775 | 862 | return *get(); |
776 | 862 | } |
777 | | |
778 | 2.58k | bool modified() const { |
779 | 2.58k | return modified_; |
780 | 2.58k | } |
781 | | |
782 | | private: |
783 | | rocksdb::VersionEdit edit_; |
784 | | bool modified_ = false; |
785 | | }; |
786 | | |
787 | | rocksdb::VersionSet* version_set_; |
788 | | rocksdb::ColumnFamilyData* cfd_; |
789 | | TrackedEdit delete_edit_; |
790 | | TrackedEdit add_edit_; |
791 | | }; |
792 | | |
793 | | } // namespace |
794 | | |
795 | | class RocksDBPatcher::Impl { |
796 | | public: |
797 | | Impl(const std::string& dbpath, const rocksdb::Options& options) |
798 | | : options_(SanitizeOptions(dbpath, &comparator_, options)), |
799 | | imm_cf_options_(options_), |
800 | | env_options_(options_), |
801 | | cf_options_(options_), |
802 | 862 | version_set_(dbpath, &options_, env_options_, block_cache_.get(), &write_buffer_, nullptr) { |
803 | 862 | cf_options_.comparator = comparator_.user_comparator(); |
804 | 862 | } |
805 | | |
806 | 862 | CHECKED_STATUS Load() { |
807 | 862 | std::vector<rocksdb::ColumnFamilyDescriptor> column_families; |
808 | 862 | column_families.emplace_back("default", cf_options_); |
809 | 862 | return version_set_.Recover(column_families); |
810 | 862 | } |
811 | | |
812 | 0 | CHECKED_STATUS SetHybridTimeFilter(HybridTime value) { |
813 | 0 | RocksDBPatcherHelper helper(&version_set_); |
814 | |
|
815 | 0 | helper.IterateFiles([&helper, value](int level, const rocksdb::FileMetaData& file) { |
816 | 0 | if (!file.largest.user_frontier) { |
817 | 0 | return; |
818 | 0 | } |
819 | 0 | auto& consensus_frontier = down_cast<ConsensusFrontier&>(*file.largest.user_frontier); |
820 | 0 | if (consensus_frontier.hybrid_time() <= value || |
821 | 0 | consensus_frontier.hybrid_time_filter() <= value) { |
822 | 0 | return; |
823 | 0 | } |
824 | 0 | rocksdb::FileMetaData fmd = file; |
825 | 0 | down_cast<ConsensusFrontier&>(*fmd.largest.user_frontier).set_hybrid_time_filter(value); |
826 | 0 | helper.ModifyFile(level, fmd); |
827 | 0 | }); |
828 | |
|
829 | 0 | return helper.Apply(options_, imm_cf_options_); |
830 | 0 | } |
831 | | |
832 | 862 | CHECKED_STATUS ModifyFlushedFrontier(const ConsensusFrontier& frontier) { |
833 | 862 | RocksDBPatcherHelper helper(&version_set_); |
834 | | |
835 | 862 | docdb::ConsensusFrontier final_frontier = frontier; |
836 | | |
837 | 862 | auto* existing_frontier = down_cast<docdb::ConsensusFrontier*>(version_set_.FlushedFrontier()); |
838 | 862 | if (existing_frontier) { |
839 | 862 | if (!frontier.history_cutoff()) { |
840 | 862 | final_frontier.set_history_cutoff(existing_frontier->history_cutoff()); |
841 | 862 | } |
842 | 862 | if (!frontier.op_id()) { |
843 | | // Update op id only if it was specified in frontier. |
844 | 0 | final_frontier.set_op_id(existing_frontier->op_id()); |
845 | 0 | } |
846 | 862 | } |
847 | | |
848 | 862 | helper.Edit().ModifyFlushedFrontier( |
849 | 862 | final_frontier.Clone(), rocksdb::FrontierModificationMode::kForce); |
850 | | |
851 | 1.72k | helper.IterateFiles([&helper, &frontier](int level, rocksdb::FileMetaData fmd) { |
852 | 1.72k | bool modified = false; |
853 | 3.44k | for (auto* user_frontier : {&fmd.smallest.user_frontier, &fmd.largest.user_frontier}) { |
854 | 3.44k | if (!*user_frontier) { |
855 | 0 | continue; |
856 | 0 | } |
857 | 3.44k | auto& consensus_frontier = down_cast<ConsensusFrontier&>(**user_frontier); |
858 | 3.44k | if (!consensus_frontier.op_id().empty()) { |
859 | 3.44k | consensus_frontier.set_op_id(OpId()); |
860 | 3.44k | modified = true; |
861 | 3.44k | } |
862 | 3.44k | if (frontier.history_cutoff()) { |
863 | 0 | consensus_frontier.set_history_cutoff(frontier.history_cutoff()); |
864 | 0 | modified = true; |
865 | 0 | } |
866 | 3.44k | } |
867 | 1.72k | if (modified) { |
868 | 1.72k | helper.ModifyFile(level, fmd); |
869 | 1.72k | } |
870 | 1.72k | }); |
871 | | |
872 | 862 | return helper.Apply(options_, imm_cf_options_); |
873 | 862 | } |
874 | | |
875 | 0 | CHECKED_STATUS UpdateFileSizes() { |
876 | 0 | RocksDBPatcherHelper helper(&version_set_); |
877 | |
|
878 | 0 | RETURN_NOT_OK(helper.IterateFiles( |
879 | 0 | [&helper, this](int level, const rocksdb::FileMetaData& file) -> Status { |
880 | 0 | auto base_path = rocksdb::MakeTableFileName( |
881 | 0 | options_.db_paths[file.fd.GetPathId()].path, file.fd.GetNumber()); |
882 | 0 | auto data_path = rocksdb::TableBaseToDataFileName(base_path); |
883 | 0 | auto base_size = VERIFY_RESULT(options_.env->GetFileSize(base_path)); |
884 | 0 | auto data_size = VERIFY_RESULT(options_.env->GetFileSize(data_path)); |
885 | 0 | auto total_size = base_size + data_size; |
886 | 0 | if (file.fd.base_file_size == base_size && file.fd.total_file_size == total_size) { |
887 | 0 | return Status::OK(); |
888 | 0 | } |
889 | 0 | rocksdb::FileMetaData fmd = file; |
890 | 0 | fmd.fd.base_file_size = base_size; |
891 | 0 | fmd.fd.total_file_size = total_size; |
892 | 0 | helper.ModifyFile(level, fmd); |
893 | 0 | return Status::OK(); |
894 | 0 | })); |
895 | |
|
896 | 0 | return helper.Apply(options_, imm_cf_options_); |
897 | 0 | } |
898 | | |
899 | | private: |
900 | | const rocksdb::InternalKeyComparator comparator_{rocksdb::BytewiseComparator()}; |
901 | | rocksdb::WriteBuffer write_buffer_{1_KB}; |
902 | | std::shared_ptr<rocksdb::Cache> block_cache_{rocksdb::NewLRUCache(1_MB)}; |
903 | | |
904 | | rocksdb::Options options_; |
905 | | rocksdb::ImmutableCFOptions imm_cf_options_; |
906 | | rocksdb::EnvOptions env_options_; |
907 | | rocksdb::ColumnFamilyOptions cf_options_; |
908 | | rocksdb::VersionSet version_set_; |
909 | | }; |
910 | | |
911 | | RocksDBPatcher::RocksDBPatcher(const std::string& dbpath, const rocksdb::Options& options) |
912 | 862 | : impl_(new Impl(dbpath, options)) { |
913 | 862 | } |
914 | | |
915 | 862 | RocksDBPatcher::~RocksDBPatcher() { |
916 | 862 | } |
917 | | |
918 | 862 | Status RocksDBPatcher::Load() { |
919 | 862 | return impl_->Load(); |
920 | 862 | } |
921 | | |
922 | 0 | Status RocksDBPatcher::SetHybridTimeFilter(HybridTime value) { |
923 | 0 | return impl_->SetHybridTimeFilter(value); |
924 | 0 | } |
925 | | |
926 | 862 | Status RocksDBPatcher::ModifyFlushedFrontier(const ConsensusFrontier& frontier) { |
927 | 862 | return impl_->ModifyFlushedFrontier(frontier); |
928 | 862 | } |
929 | | |
930 | 0 | Status RocksDBPatcher::UpdateFileSizes() { |
931 | 0 | return impl_->UpdateFileSizes(); |
932 | 0 | } |
933 | | |
934 | 134 | Status ForceRocksDBCompact(rocksdb::DB* db) { |
935 | 134 | RETURN_NOT_OK_PREPEND( |
936 | 134 | db->CompactRange(rocksdb::CompactRangeOptions(), /* begin = */ nullptr, /* end = */ nullptr), |
937 | 134 | "Compact range failed:"); |
938 | 134 | return Status::OK(); |
939 | 134 | } |
940 | | |
941 | 5.81k | RateLimiterSharingMode GetRocksDBRateLimiterSharingMode() { |
942 | 5.81k | auto result = ParseEnumInsensitive<RateLimiterSharingMode>( |
943 | 5.81k | FLAGS_rocksdb_compact_flush_rate_limit_sharing_mode); |
944 | 5.81k | if (PREDICT_TRUE(result.ok())) { |
945 | 5.81k | return *result; |
946 | 5.81k | } |
947 | 0 | LOG(DFATAL) << result.status(); |
948 | 0 | return RateLimiterSharingMode::NONE; |
949 | 0 | } |
950 | | |
951 | 63.8k | std::shared_ptr<rocksdb::RateLimiter> CreateRocksDBRateLimiter() { |
952 | 63.8k | if (PREDICT_TRUE((FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec > 0))) { |
953 | 63.8k | return std::shared_ptr<rocksdb::RateLimiter>( |
954 | 63.8k | rocksdb::NewGenericRateLimiter(FLAGS_rocksdb_compact_flush_rate_limit_bytes_per_sec)); |
955 | 63.8k | } |
956 | 10 | return nullptr; |
957 | 10 | } |
958 | | |
959 | | } // namespace docdb |
960 | | } // namespace yb |