/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_util.cc
Line | Count | Source
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb_util.h" |
15 | | |
16 | | #include "yb/docdb/consensus_frontier.h" |
17 | | #include "yb/docdb/doc_key.h" |
18 | | #include "yb/docdb/docdb.h" |
19 | | #include "yb/docdb/docdb_debug.h" |
20 | | #include "yb/docdb/docdb_rocksdb_util.h" |
21 | | #include "yb/docdb/rocksdb_writer.h" |
22 | | |
23 | | #include "yb/rocksutil/write_batch_formatter.h" |
24 | | #include "yb/rocksutil/yb_rocksdb.h" |
25 | | |
26 | | #include "yb/tablet/tablet_options.h" |
27 | | |
28 | | #include "yb/util/env.h" |
29 | | #include "yb/util/status_format.h" |
30 | | #include "yb/util/string_trim.h" |
31 | | #include "yb/docdb/docdb_pgapi.h" |
32 | | |
33 | | using std::string; |
34 | | using std::make_shared; |
35 | | using std::endl; |
36 | | using strings::Substitute; |
37 | | using yb::FormatBytesAsStr; |
38 | | using yb::util::ApplyEagerLineContinuation; |
39 | | using std::vector; |
40 | | |
41 | | namespace yb { |
42 | | namespace docdb { |
43 | | |
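| | // Converts a QLValuePB of the given PostgreSQL data type into a CDC datum message, logging a
| | // warning instead of returning a status on failure.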
44 | | void SetValueFromQLBinaryWrapper( |
45 | 39.1k | QLValuePB ql_value, const int pg_data_type, DatumMessagePB* cdc_datum_message) { |
46 | 39.1k | WARN_NOT_OK(yb::docdb::SetValueFromQLBinary(ql_value, pg_data_type, cdc_datum_message), "Failed to set value from QL binary");
47 | 39.1k | } |
48 | | |
49 | 0 | rocksdb::DB* DocDBRocksDBUtil::rocksdb() { |
50 | 0 | return DCHECK_NOTNULL(regular_db_.get()); |
51 | 0 | } |
52 | | |
53 | 0 | rocksdb::DB* DocDBRocksDBUtil::intents_db() { |
54 | 0 | return DCHECK_NOTNULL(intents_db_.get()); |
55 | 0 | } |
56 | | |
57 | 0 | std::string DocDBRocksDBUtil::IntentsDBDir() { |
58 | 0 | return rocksdb_dir_ + ".intents"; |
59 | 0 | } |
60 | | |
61 | 0 | Status DocDBRocksDBUtil::OpenRocksDB() { |
62 | | // Init the directory if needed. |
63 | 0 | if (rocksdb_dir_.empty()) { |
64 | 0 | RETURN_NOT_OK(InitRocksDBDir()); |
65 | 0 | } |
66 | |
67 | 0 | rocksdb::DB* rocksdb = nullptr; |
68 | 0 | RETURN_NOT_OK(rocksdb::DB::Open(regular_db_options_, rocksdb_dir_, &rocksdb)); |
69 | 0 | LOG(INFO) << "Opened RocksDB at " << rocksdb_dir_; |
70 | 0 | regular_db_.reset(rocksdb); |
71 | |
72 | 0 | rocksdb = nullptr; |
73 | 0 | RETURN_NOT_OK(rocksdb::DB::Open(intents_db_options_, IntentsDBDir(), &rocksdb)); |
74 | 0 | intents_db_.reset(rocksdb); |
75 | |
76 | 0 | return Status::OK(); |
77 | 0 | } |
78 | | |
79 | 0 | void DocDBRocksDBUtil::CloseRocksDB() { |
80 | 0 | intents_db_.reset(); |
81 | 0 | regular_db_.reset(); |
82 | 0 | } |
83 | | |
84 | 0 | Status DocDBRocksDBUtil::ReopenRocksDB() { |
85 | 0 | CloseRocksDB(); |
86 | 0 | return OpenRocksDB(); |
87 | 0 | } |
88 | | |
89 | 0 | Status DocDBRocksDBUtil::DestroyRocksDB() { |
90 | 0 | CloseRocksDB(); |
91 | 0 | LOG(INFO) << "Destroying RocksDB database at " << rocksdb_dir_; |
92 | 0 | RETURN_NOT_OK(rocksdb::DestroyDB(rocksdb_dir_, regular_db_options_)); |
93 | 0 | RETURN_NOT_OK(rocksdb::DestroyDB(IntentsDBDir(), intents_db_options_)); |
94 | 0 | return Status::OK(); |
95 | 0 | } |
96 | | |
97 | 0 | void DocDBRocksDBUtil::ResetMonotonicCounter() { |
98 | 0 | monotonic_counter_.store(0); |
99 | 0 | } |
100 | | |
101 | | namespace { |
102 | | |
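| | // Forwards Put/SingleDelete calls from the DirectWriteHandler interface into a plain
| | // rocksdb::WriteBatch, so intents produced by TransactionalWriter below can be captured.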
103 | | class DirectWriteToWriteBatchHandler : public rocksdb::DirectWriteHandler { |
104 | | public: |
105 | | explicit DirectWriteToWriteBatchHandler(rocksdb::WriteBatch *write_batch) |
106 | 0 | : write_batch_(write_batch) {} |
107 | | |
108 | 0 | void Put(const SliceParts& key, const SliceParts& value) override { |
109 | 0 | write_batch_->Put(key, value); |
110 | 0 | } |
111 | | |
112 | 0 | void SingleDelete(const Slice& key) override { |
113 | 0 | write_batch_->SingleDelete(key); |
114 | 0 | } |
115 | | |
116 | | private: |
117 | | rocksdb::WriteBatch *write_batch_; |
118 | | }; |
119 | | |
120 | | } // namespace |
121 | | |
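| | // Converts a DocWriteBatch into a rocksdb::WriteBatch. When decode_dockey is set, keys are first
| | // validated by decoding them as SubDocKeys. For transactional writes the entries are turned into
| | // intents via TransactionalWriter; otherwise each key is suffixed with the encoded DocHybridTime
| | // (when a valid hybrid time is given) before being added to the batch.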
122 | | Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch( |
123 | | const DocWriteBatch& dwb, |
124 | | rocksdb::WriteBatch* rocksdb_write_batch, |
125 | | HybridTime hybrid_time, |
126 | | bool decode_dockey, |
127 | | bool increment_write_id, |
128 | 0 | PartialRangeKeyIntents partial_range_key_intents) const { |
129 | 0 | if (decode_dockey) { |
130 | 0 | for (const auto& entry : dwb.key_value_pairs()) { |
131 | | // Skip key validation for external intents. |
132 | 0 | if (!entry.first.empty() && entry.first[0] == ValueTypeAsChar::kExternalTransactionId) { |
133 | 0 | continue; |
134 | 0 | } |
135 | 0 | SubDocKey subdoc_key; |
136 | | // We don't expect any invalid encoded keys in the write batch. However, these encoded keys |
137 | | // don't contain the HybridTime. |
138 | 0 | RETURN_NOT_OK_PREPEND(subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(entry.first), |
139 | 0 | Substitute("when decoding key: $0", FormatBytesAsStr(entry.first))); |
140 | 0 | } |
141 | 0 | } |
142 | |
143 | 0 | if (current_txn_id_.is_initialized()) { |
144 | 0 | if (!increment_write_id) { |
145 | 0 | return STATUS( |
146 | 0 | InternalError, "For transactional write only increment_write_id=true is supported"); |
147 | 0 | } |
148 | 0 | KeyValueWriteBatchPB kv_write_batch; |
149 | 0 | dwb.TEST_CopyToWriteBatchPB(&kv_write_batch); |
150 | 0 | TransactionalWriter writer( |
151 | 0 | kv_write_batch, hybrid_time, *current_txn_id_, txn_isolation_level_, |
152 | 0 | partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_); |
153 | 0 | DirectWriteToWriteBatchHandler handler(rocksdb_write_batch); |
154 | 0 | RETURN_NOT_OK(writer.Apply(&handler)); |
155 | 0 | intra_txn_write_id_ = writer.intra_txn_write_id(); |
156 | 0 | } else { |
157 | | // TODO: this block has common code with docdb::PrepareExternalWriteBatch and could probably
158 | | // be refactored so that the common code is reused.
159 | 0 | IntraTxnWriteId write_id = 0; |
160 | 0 | for (const auto& entry : dwb.key_value_pairs()) { |
161 | 0 | string rocksdb_key; |
162 | 0 | if (hybrid_time.is_valid()) { |
163 | | // HybridTime provided. Append a PrimitiveValue with the HybridTime to the key. |
164 | 0 | const KeyBytes encoded_ht = |
165 | 0 | PrimitiveValue(DocHybridTime(hybrid_time, write_id)).ToKeyBytes(); |
166 | 0 | rocksdb_key = entry.first + encoded_ht.ToStringBuffer(); |
167 | 0 | } else { |
168 | | // Useful when printing out a write batch that does not yet know the HybridTime it will be |
169 | | // committed with. |
170 | 0 | rocksdb_key = entry.first; |
171 | 0 | } |
172 | 0 | rocksdb_write_batch->Put(rocksdb_key, entry.second); |
173 | 0 | if (increment_write_id) { |
174 | 0 | ++write_id; |
175 | 0 | } |
176 | 0 | } |
177 | 0 | } |
178 | 0 | return Status::OK(); |
179 | 0 | } |
180 | | |
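| | // Applies a DocWriteBatch at the given hybrid time: transactional writes go to the intents DB,
| | // other writes to the regular DB. Consensus frontiers are set when an OpId is being tracked.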
181 | | Status DocDBRocksDBUtil::WriteToRocksDB( |
182 | | const DocWriteBatch& doc_write_batch, |
183 | | const HybridTime& hybrid_time, |
184 | | bool decode_dockey, |
185 | | bool increment_write_id, |
186 | 0 | PartialRangeKeyIntents partial_range_key_intents) { |
187 | 0 | if (doc_write_batch.IsEmpty()) { |
188 | 0 | return Status::OK(); |
189 | 0 | } |
190 | 0 | if (!hybrid_time.is_valid()) { |
191 | 0 | return STATUS_SUBSTITUTE(InvalidArgument, "Hybrid time is not valid: $0", |
192 | 0 | hybrid_time.ToString()); |
193 | 0 | } |
194 | | |
195 | 0 | ConsensusFrontiers frontiers; |
196 | 0 | rocksdb::WriteBatch rocksdb_write_batch; |
197 | 0 | if (!op_id_.empty()) { |
198 | 0 | ++op_id_.index; |
199 | 0 | set_op_id(op_id_, &frontiers); |
200 | 0 | set_hybrid_time(hybrid_time, &frontiers); |
201 | 0 | rocksdb_write_batch.SetFrontiers(&frontiers); |
202 | 0 | } |
203 | |
204 | 0 | RETURN_NOT_OK(PopulateRocksDBWriteBatch( |
205 | 0 | doc_write_batch, &rocksdb_write_batch, hybrid_time, decode_dockey, increment_write_id, |
206 | 0 | partial_range_key_intents)); |
207 | |
208 | 0 | rocksdb::DB* db = current_txn_id_ ? intents_db_.get() : regular_db_.get(); |
209 | 0 | rocksdb::Status rocksdb_write_status = db->Write(write_options(), &rocksdb_write_batch); |
210 | |
211 | 0 | if (!rocksdb_write_status.ok()) { |
212 | 0 | LOG(ERROR) << "Failed writing to RocksDB: " << rocksdb_write_status.ToString(); |
213 | 0 | return STATUS_SUBSTITUTE(RuntimeError, |
214 | 0 | "Error writing to RocksDB: $0", rocksdb_write_status.ToString()); |
215 | 0 | } |
216 | 0 | return Status::OK(); |
217 | 0 | } |
218 | | |
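| | // Sets up the block cache and per-DB test statistics, then initializes the RocksDB options and
| | // write options used by the tests.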
219 | 0 | Status DocDBRocksDBUtil::InitCommonRocksDBOptionsForTests() { |
220 | | // TODO(bojanserafimov): create MemoryMonitor? |
221 | 0 | const size_t cache_size = block_cache_size(); |
222 | 0 | if (cache_size > 0) { |
223 | 0 | block_cache_ = rocksdb::NewLRUCache(cache_size); |
224 | 0 | } |
225 | |
226 | 0 | regular_db_options_.statistics = rocksdb::CreateDBStatisticsForTests(/* for intents */ false); |
227 | 0 | intents_db_options_.statistics = rocksdb::CreateDBStatisticsForTests(/* for intents */ true); |
228 | 0 | RETURN_NOT_OK(ReinitDBOptions()); |
229 | 0 | InitRocksDBWriteOptions(&write_options_); |
230 | 0 | return Status::OK(); |
231 | 0 | } |
232 | | |
233 | 0 | Status DocDBRocksDBUtil::InitCommonRocksDBOptionsForBulkLoad() { |
234 | 0 | const size_t cache_size = block_cache_size(); |
235 | 0 | if (cache_size > 0) { |
236 | 0 | block_cache_ = rocksdb::NewLRUCache(cache_size); |
237 | 0 | } |
238 | | |
239 | | // Don't care about statistics/metrics as we don't keep metric registries during |
240 | | // bulk load. |
241 | 0 | regular_db_options_.statistics = nullptr; |
242 | 0 | intents_db_options_.statistics = nullptr; |
243 | 0 | RETURN_NOT_OK(ReinitDBOptions()); |
244 | 0 | InitRocksDBWriteOptions(&write_options_); |
245 | 0 | return Status::OK(); |
246 | 0 | } |
247 | | |
248 | | Status DocDBRocksDBUtil::WriteToRocksDBAndClear( |
249 | | DocWriteBatch* dwb, |
250 | | const HybridTime& hybrid_time, |
251 | 0 | bool decode_dockey, bool increment_write_id) { |
252 | 0 | RETURN_NOT_OK(WriteToRocksDB(*dwb, hybrid_time, decode_dockey, increment_write_id)); |
253 | 0 | dwb->Clear(); |
254 | 0 | return Status::OK(); |
255 | 0 | } |
256 | | |
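| | // Writes a single test row keyed by the given index, setting column 10 to the index at hybrid
| | // time 1000 * index microseconds, and updates the tracked OpId.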
257 | 0 | Status DocDBRocksDBUtil::WriteSimple(int index) { |
258 | 0 | auto encoded_doc_key = DocKey(PrimitiveValues(Format("row$0", index), 11111 * index)).Encode(); |
259 | 0 | op_id_.term = index / 2; |
260 | 0 | op_id_.index = index; |
261 | 0 | auto& dwb = DefaultDocWriteBatch(); |
262 | 0 | RETURN_NOT_OK(dwb.SetPrimitive( |
263 | 0 | DocPath(encoded_doc_key, PrimitiveValue(ColumnId(10))), PrimitiveValue(index))); |
264 | 0 | return WriteToRocksDBAndClear(&dwb, HybridTime::FromMicros(1000 * index)); |
265 | 0 | } |
266 | | |
267 | 0 | void DocDBRocksDBUtil::SetHistoryCutoffHybridTime(HybridTime history_cutoff) { |
268 | 0 | retention_policy_->SetHistoryCutoff(history_cutoff); |
269 | 0 | } |
270 | | |
271 | 0 | void DocDBRocksDBUtil::SetTableTTL(uint64_t ttl_msec) { |
272 | 0 | schema_.SetDefaultTimeToLive(ttl_msec); |
273 | 0 | retention_policy_->SetTableTTLForTests(MonoDelta::FromMilliseconds(ttl_msec)); |
274 | 0 | } |
275 | | |
276 | 0 | string DocDBRocksDBUtil::DocDBDebugDumpToStr() { |
277 | 0 | return yb::docdb::DocDBDebugDumpToStr(rocksdb()) + |
278 | 0 | yb::docdb::DocDBDebugDumpToStr(intents_db(), StorageDbType::kIntents); |
279 | 0 | } |
280 | | |
281 | | Status DocDBRocksDBUtil::SetPrimitive( |
282 | | const DocPath& doc_path, |
283 | | const Value& value, |
284 | | const HybridTime hybrid_time, |
285 | 0 | const ReadHybridTime& read_ht) { |
286 | 0 | auto dwb = MakeDocWriteBatch(); |
287 | 0 | RETURN_NOT_OK(dwb.SetPrimitive(doc_path, value, read_ht)); |
288 | 0 | return WriteToRocksDB(dwb, hybrid_time); |
289 | 0 | } |
290 | | |
291 | | Status DocDBRocksDBUtil::SetPrimitive( |
292 | | const DocPath& doc_path, |
293 | | const PrimitiveValue& primitive_value, |
294 | | const HybridTime hybrid_time, |
295 | 0 | const ReadHybridTime& read_ht) { |
296 | 0 | return SetPrimitive(doc_path, Value(primitive_value), hybrid_time, read_ht); |
297 | 0 | } |
298 | | |
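| | // Encodes the given external intents for txn_id through a local ExternalIntentsProvider and
| | // writes the result directly to the intents DB.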
299 | | Status DocDBRocksDBUtil::AddExternalIntents( |
300 | | const TransactionId& txn_id, |
301 | | const std::vector<ExternalIntent>& intents, |
302 | | const Uuid& involved_tablet, |
303 | 0 | HybridTime hybrid_time) { |
304 | 0 | class Provider : public ExternalIntentsProvider { |
305 | 0 | public: |
306 | 0 | Provider( |
307 | 0 | const std::vector<ExternalIntent>* intents, const Uuid& involved_tablet, |
308 | 0 | HybridTime hybrid_time) |
309 | 0 | : intents_(*intents), involved_tablet_(involved_tablet), hybrid_time_(hybrid_time) {} |
310 | |
311 | 0 | void SetKey(const Slice& slice) override { |
312 | 0 | key_.AppendRawBytes(slice); |
313 | 0 | } |
314 | |
315 | 0 | void SetValue(const Slice& slice) override { |
316 | 0 | value_ = slice; |
317 | 0 | } |
318 | |
319 | 0 | void Apply(rocksdb::WriteBatch* batch) { |
320 | 0 | KeyValuePairPB kv_pair; |
321 | 0 | kv_pair.set_key(key_.ToStringBuffer()); |
322 | 0 | kv_pair.set_value(value_.ToStringBuffer()); |
323 | 0 | ExternalTxnApplyState external_txn_apply_state; |
324 | 0 | AddExternalPairToWriteBatch( |
325 | 0 | kv_pair, hybrid_time_, /* write_id= */ 0, &external_txn_apply_state, |
326 | 0 | /* regular_write_batch= */ nullptr, batch); |
327 | 0 | } |
328 | |
329 | 0 | boost::optional<std::pair<Slice, Slice>> Next() override { |
330 | 0 | if (next_idx_ >= intents_.size()) { |
331 | 0 | return boost::none; |
332 | 0 | } |
333 | | |
334 | | // It is ok to have inefficient code in tests. |
335 | 0 | const auto& intent = intents_[next_idx_]; |
336 | 0 | ++next_idx_; |
337 | |
338 | 0 | intent_key_ = intent.doc_path.encoded_doc_key(); |
339 | 0 | for (const auto& subkey : intent.doc_path.subkeys()) { |
340 | 0 | subkey.AppendToKey(&intent_key_); |
341 | 0 | } |
342 | 0 | intent_value_ = intent.value.Encode(); |
343 | |
344 | 0 | return std::pair<Slice, Slice>(intent_key_.AsSlice(), intent_value_); |
345 | 0 | } |
346 | |
347 | 0 | const Uuid& InvolvedTablet() override { |
348 | 0 | return involved_tablet_; |
349 | 0 | } |
350 | |
351 | 0 | private: |
352 | 0 | const std::vector<ExternalIntent>& intents_; |
353 | 0 | const Uuid involved_tablet_; |
354 | 0 | const HybridTime hybrid_time_; |
355 | 0 | size_t next_idx_ = 0; |
356 | 0 | KeyBytes key_; |
357 | 0 | KeyBuffer value_; |
358 | |
359 | 0 | KeyBytes intent_key_; |
360 | 0 | std::string intent_value_; |
361 | 0 | }; |
362 | |
363 | 0 | Provider provider(&intents, involved_tablet, hybrid_time); |
364 | 0 | CombineExternalIntents(txn_id, &provider); |
365 | |
366 | 0 | rocksdb::WriteBatch rocksdb_write_batch; |
367 | 0 | provider.Apply(&rocksdb_write_batch); |
368 | |
369 | 0 | return intents_db_->Write(write_options(), &rocksdb_write_batch); |
370 | 0 | } |
371 | | |
372 | | Status DocDBRocksDBUtil::InsertSubDocument( |
373 | | const DocPath& doc_path, |
374 | | const SubDocument& value, |
375 | | const HybridTime hybrid_time, |
376 | | MonoDelta ttl, |
377 | 0 | const ReadHybridTime& read_ht) { |
378 | 0 | auto dwb = MakeDocWriteBatch(); |
379 | 0 | RETURN_NOT_OK(dwb.InsertSubDocument(doc_path, value, read_ht, |
380 | 0 | CoarseTimePoint::max(), rocksdb::kDefaultQueryId, ttl)); |
381 | 0 | return WriteToRocksDB(dwb, hybrid_time); |
382 | 0 | } |
383 | | |
384 | | Status DocDBRocksDBUtil::ExtendSubDocument( |
385 | | const DocPath& doc_path, |
386 | | const SubDocument& value, |
387 | | const HybridTime hybrid_time, |
388 | | MonoDelta ttl, |
389 | 0 | const ReadHybridTime& read_ht) { |
390 | 0 | auto dwb = MakeDocWriteBatch(); |
391 | 0 | RETURN_NOT_OK(dwb.ExtendSubDocument(doc_path, value, read_ht, |
392 | 0 | CoarseTimePoint::max(), rocksdb::kDefaultQueryId, ttl)); |
393 | 0 | return WriteToRocksDB(dwb, hybrid_time); |
394 | 0 | } |
395 | | |
396 | | Status DocDBRocksDBUtil::ExtendList( |
397 | | const DocPath& doc_path, |
398 | | const SubDocument& value, |
399 | | HybridTime hybrid_time, |
400 | 0 | const ReadHybridTime& read_ht) { |
401 | 0 | auto dwb = MakeDocWriteBatch(); |
402 | 0 | RETURN_NOT_OK(dwb.ExtendList(doc_path, value, read_ht, CoarseTimePoint::max())); |
403 | 0 | return WriteToRocksDB(dwb, hybrid_time); |
404 | 0 | } |
405 | | |
406 | | Status DocDBRocksDBUtil::ReplaceInList( |
407 | | const DocPath &doc_path, |
408 | | const int target_cql_index, |
409 | | const SubDocument& value, |
410 | | const ReadHybridTime& read_ht, |
411 | | const HybridTime& hybrid_time, |
412 | | const rocksdb::QueryId query_id, |
413 | | MonoDelta default_ttl, |
414 | | MonoDelta ttl, |
415 | 0 | UserTimeMicros user_timestamp) { |
416 | 0 | auto dwb = MakeDocWriteBatch(); |
417 | 0 | RETURN_NOT_OK(dwb.ReplaceCqlInList( |
418 | 0 | doc_path, target_cql_index, value, read_ht, CoarseTimePoint::max(), query_id, default_ttl, |
419 | 0 | ttl)); |
420 | 0 | return WriteToRocksDB(dwb, hybrid_time); |
421 | 0 | } |
422 | | |
423 | | Status DocDBRocksDBUtil::DeleteSubDoc( |
424 | | const DocPath& doc_path, |
425 | | HybridTime hybrid_time, |
426 | 0 | const ReadHybridTime& read_ht) { |
427 | 0 | auto dwb = MakeDocWriteBatch(); |
428 | 0 | RETURN_NOT_OK(dwb.DeleteSubDoc(doc_path, read_ht)); |
429 | 0 | return WriteToRocksDB(dwb, hybrid_time); |
430 | 0 | } |
431 | | |
432 | 0 | void DocDBRocksDBUtil::DocDBDebugDumpToConsole() { |
433 | 0 | DocDBDebugDump(regular_db_.get(), std::cerr, StorageDbType::kRegular); |
434 | 0 | } |
435 | | |
436 | 0 | Status DocDBRocksDBUtil::FlushRocksDbAndWait() { |
437 | 0 | rocksdb::FlushOptions flush_options; |
438 | 0 | flush_options.wait = true; |
439 | 0 | return rocksdb()->Flush(flush_options); |
440 | 0 | } |
441 | | |
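| | // Rebuilds the regular and intents RocksDB options from the default tablet options, installs the
| | // compaction filter and file filter factories on the regular DB options, and reopens the
| | // databases if they are already open.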
442 | 0 | Status DocDBRocksDBUtil::ReinitDBOptions() { |
443 | 0 | tablet::TabletOptions tablet_options; |
444 | 0 | tablet_options.block_cache = block_cache_; |
445 | 0 | docdb::InitRocksDBOptions( |
446 | 0 | ®ular_db_options_, "[R] " /* log_prefix */, regular_db_options_.statistics, |
447 | 0 | tablet_options); |
448 | 0 | docdb::InitRocksDBOptions( |
449 | 0 | &intents_db_options_, "[I] " /* log_prefix */, intents_db_options_.statistics, |
450 | 0 | tablet_options); |
451 | 0 | regular_db_options_.compaction_filter_factory = |
452 | 0 | std::make_shared<docdb::DocDBCompactionFilterFactory>( |
453 | 0 | retention_policy_, &KeyBounds::kNoBounds); |
454 | 0 | regular_db_options_.compaction_file_filter_factory = |
455 | 0 | compaction_file_filter_factory_; |
456 | 0 | regular_db_options_.max_file_size_for_compaction = |
457 | 0 | max_file_size_for_compaction_; |
458 | 0 | if (!regular_db_) { |
459 | 0 | return Status::OK(); |
460 | 0 | } |
461 | 0 | return ReopenRocksDB(); |
462 | 0 | } |
463 | | |
464 | 0 | DocWriteBatch DocDBRocksDBUtil::MakeDocWriteBatch() { |
465 | 0 | return DocWriteBatch( |
466 | 0 | DocDB::FromRegularUnbounded(regular_db_.get()), init_marker_behavior_, &monotonic_counter_); |
467 | 0 | } |
468 | | |
469 | 0 | DocWriteBatch DocDBRocksDBUtil::MakeDocWriteBatch(InitMarkerBehavior init_marker_behavior) { |
470 | 0 | return DocWriteBatch( |
471 | 0 | DocDB::FromRegularUnbounded(regular_db_.get()), init_marker_behavior, &monotonic_counter_); |
472 | 0 | } |
473 | | |
474 | 0 | DocWriteBatch& DocDBRocksDBUtil::DefaultDocWriteBatch() { |
475 | 0 | if (!doc_write_batch_) { |
476 | 0 | doc_write_batch_ = MakeDocWriteBatch(); |
477 | 0 | } |
478 | |
|
479 | 0 | return *doc_write_batch_; |
480 | 0 | } |
481 | | |
482 | 0 | void DocDBRocksDBUtil::SetInitMarkerBehavior(InitMarkerBehavior init_marker_behavior) { |
483 | 0 | if (init_marker_behavior_ != init_marker_behavior) { |
484 | 0 | LOG(INFO) << "Setting init marker behavior to " << init_marker_behavior; |
485 | 0 | init_marker_behavior_ = init_marker_behavior; |
486 | 0 | } |
487 | 0 | } |
488 | | |
489 | | } // namespace docdb |
490 | | } // namespace yb |