/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_compaction_filter.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb_compaction_filter.h" |
15 | | |
16 | | #include <memory> |
17 | | |
18 | | #include <glog/logging.h> |
19 | | |
20 | | #include "yb/docdb/consensus_frontier.h" |
21 | | #include "yb/docdb/doc_key.h" |
22 | | #include "yb/docdb/doc_ttl_util.h" |
23 | | #include "yb/docdb/key_bounds.h" |
24 | | #include "yb/docdb/value.h" |
25 | | #include "yb/docdb/value_type.h" |
26 | | |
27 | | #include "yb/rocksdb/compaction_filter.h" |
28 | | |
29 | | #include "yb/util/fast_varint.h" |
30 | | #include "yb/util/result.h" |
31 | | #include "yb/util/status_format.h" |
32 | | #include "yb/util/string_util.h" |
33 | | |
34 | | using std::shared_ptr; |
35 | | using std::unique_ptr; |
36 | | using std::unordered_set; |
37 | | using rocksdb::CompactionFilter; |
38 | | using rocksdb::VectorToString; |
39 | | using rocksdb::FilterDecision; |
40 | | |
41 | | namespace yb { |
42 | | namespace docdb { |
43 | | |
44 | | // ------------------------------------------------------------------------------------------------ |
45 | | |
46 | | DocDBCompactionFilter::DocDBCompactionFilter( |
47 | | HistoryRetentionDirective retention, |
48 | | IsMajorCompaction is_major_compaction, |
49 | | const KeyBounds* key_bounds) |
50 | | : retention_(std::move(retention)), |
51 | | key_bounds_(key_bounds), |
52 | 436 | is_major_compaction_(is_major_compaction) { |
53 | 436 | } |
54 | | |
55 | 433 | DocDBCompactionFilter::~DocDBCompactionFilter() { |
56 | 433 | } |
57 | | |
58 | | FilterDecision DocDBCompactionFilter::Filter( |
59 | | int level, const Slice& key, const Slice& existing_value, std::string* new_value, |
60 | 10.2M | bool* value_changed) { |
61 | 10.2M | auto result = const_cast<DocDBCompactionFilter*>(this)->DoFilter( |
62 | 10.2M | level, key, existing_value, new_value, value_changed); |
63 | 10.2M | if (!result.ok()) { |
64 | 0 | LOG(FATAL) << "Error filtering " << key.ToDebugString() << ": " << result.status(); |
65 | 0 | } |
66 | 10.2M | if (*result != FilterDecision::kKeep) { |
67 | 0 | VLOG(3) << "Discarding key: " << BestEffortDocDBKeyToStr(key); |
68 | 10.2M | } else { |
69 | 18.4E | VLOG(4) << "Keeping key: " << BestEffortDocDBKeyToStr(key); |
70 | 10.2M | } |
71 | 10.2M | return *result; |
72 | 10.2M | } |
73 | | |
74 | | Result<FilterDecision> DocDBCompactionFilter::DoFilter( |
75 | | int level, const Slice& key, const Slice& existing_value, std::string* new_value, |
76 | 10.2M | bool* value_changed) { |
77 | 10.2M | const HybridTime history_cutoff = retention_.history_cutoff; |
78 | | |
79 | 10.2M | if (!filter_usage_logged_) { |
80 | | // TODO: switch this to VLOG if it becomes too chatty. |
81 | 436 | LOG(INFO) << "DocDB compaction filter is being used for a " |
82 | 233 | << (is_major_compaction_ ? "major" : "minor") << " compaction" |
83 | 436 | << ", history_cutoff=" << history_cutoff; |
84 | 436 | filter_usage_logged_ = true; |
85 | 436 | } |
86 | | |
87 | 10.2M | if (!IsWithinBounds(key_bounds_, key) && |
88 | 0 | DecodeValueType(key) != ValueType::kTransactionApplyState) { |
89 | | // If we reach this point, then we're processing a record which should have been excluded by |
90 | | // proper use of GetLiveRanges(). We include this as a sanity check, but we should never get |
91 | | // here. |
92 | 0 | LOG(DFATAL) << "Unexpectedly filtered out-of-bounds key during compaction: " |
93 | 0 | << SubDocKey::DebugSliceToString(key) |
94 | 0 | << " with bounds: " << key_bounds_->ToString(); |
95 | 0 | return FilterDecision::kDiscard; |
96 | 0 | } |
97 | | |
98 | | // Just remove intent records from regular DB, because it was beta feature. |
99 | | // Currently intents are stored in separate DB. |
100 | 10.2M | if (DecodeValueType(key) == ValueType::kObsoleteIntentPrefix) { |
101 | 0 | return FilterDecision::kDiscard; |
102 | 0 | } |
103 | | |
104 | 10.2M | auto same_bytes = strings::MemoryDifferencePos( |
105 | 10.2M | key.data(), prev_subdoc_key_.data(), std::min(key.size(), prev_subdoc_key_.size())); |
106 | | |
107 | | // The number of initial components (including document key and subkeys) that this |
108 | | // SubDocKey shares with previous one. This does not care about the hybrid_time field. |
109 | 10.2M | size_t num_shared_components = sub_key_ends_.size(); |
110 | 25.2M | while (num_shared_components > 0 && sub_key_ends_[num_shared_components - 1] > same_bytes) { |
111 | 15.0M | --num_shared_components; |
112 | 15.0M | } |
113 | | |
114 | 10.2M | sub_key_ends_.resize(num_shared_components); |
115 | | |
116 | 10.2M | RETURN_NOT_OK(SubDocKey::DecodeDocKeyAndSubKeyEnds(key, &sub_key_ends_)); |
117 | 10.2M | const size_t new_stack_size = sub_key_ends_.size(); |
118 | | |
119 | | // Remove overwrite hybrid_times for components that are no longer relevant for the current |
120 | | // SubDocKey. |
121 | 10.2M | overwrite_.resize(std::min(overwrite_.size(), num_shared_components)); |
122 | 10.2M | DocHybridTime ht; |
123 | 10.2M | RETURN_NOT_OK(ht.DecodeFromEnd(key)); |
124 | | // We're comparing the hybrid time in this key with the stack top of overwrite_ht_ after |
125 | | // truncating the stack to the number of components in the common prefix of previous and current |
126 | | // key. |
127 | | // |
128 | | // Example (history_cutoff_ = 12): |
129 | | // -------------------------------------------------------------------------------------------- |
130 | | // Key overwrite_ht_ stack and relevant notes |
131 | | // -------------------------------------------------------------------------------------------- |
132 | | // k1 T10 [MinHT] |
133 | | // |
134 | | // k1 T5 [T10] |
135 | | // |
136 | | // k1 col1 T11 [T10, T11] |
137 | | // |
138 | | // k1 col1 T7 The stack does not get truncated (shared prefix length is 2), so |
139 | | // prev_overwrite_ht = 11. Removing this entry because 7 < 11. |
140 | | // The stack stays at [T10, T11]. |
141 | | // |
142 | | // k1 col2 T9 Truncating the stack to [T10], setting prev_overwrite_ht to 10, and therefore |
143 | | // deciding to remove this entry because 9 < 10. |
144 | | // |
145 | 10.2M | const DocHybridTime prev_overwrite_ht = |
146 | 10.2M | overwrite_.empty() ? DocHybridTime::kMin : overwrite_.back().doc_ht; |
147 | 10.2M | const Expiration prev_exp = |
148 | 10.2M | overwrite_.empty() ? Expiration() : overwrite_.back().expiration; |
149 | | |
150 | | // We only keep entries with hybrid_time equal to or later than the latest time the subdocument |
151 | | // was fully overwritten or deleted prior to or at the history cutoff time. The intuition is that |
152 | | // key/value pairs that were overwritten at or before history cutoff time will not be visible at |
153 | | // history cutoff time or any later time anyway. |
154 | | // |
155 | | // Furthermore, we only need to update the overwrite hybrid_time stack in case we have decided to |
156 | | // keep the new entry. Otherwise, the current entry's hybrid time ht is less than the previous |
157 | | // overwrite hybrid_time prev_overwrite_ht, and therefore it does not provide any new information |
158 | | // about key/value pairs that follow being overwritten at a particular hybrid time. Another way to |
159 | | // explain this is to look at the logic that follows. If we don't early-exit here while ht is less |
160 | | // than prev_overwrite_ht, we'll end up adding more prev_overwrite_ht values to the overwrite |
161 | | // hybrid_time stack, and we might as well do that while handling the next key/value pair that |
162 | | // does not get cleaned up the same way as this one. |
163 | | // |
164 | | // TODO: When more merge records are supported, isTtlRow should be redefined appropriately. |
165 | 10.2M | bool isTtlRow = IsMergeRecord(existing_value); |
166 | 10.2M | if (ht < prev_overwrite_ht && !isTtlRow) { |
167 | 3.28k | return FilterDecision::kDiscard; |
168 | 3.28k | } |
169 | | |
170 | | // Every subdocument was fully overwritten at least at the time any of its parents was fully |
171 | | // overwritten. |
172 | 10.2M | if (overwrite_.size() < new_stack_size - 1) { |
173 | 4.84M | overwrite_.resize(new_stack_size - 1, {prev_overwrite_ht, prev_exp}); |
174 | 4.84M | } |
175 | | |
176 | 10.2M | Expiration popped_exp = overwrite_.empty() ? Expiration() : overwrite_.back().expiration; |
177 | | // This will happen in case previous key has the same document key and subkeys as the current |
178 | | // key, and the only difference is in the hybrid_time. We want to replace the hybrid_time at the |
179 | | // top of the overwrite_ht stack in this case. |
180 | 10.2M | if (overwrite_.size() == new_stack_size) { |
181 | 5.22k | overwrite_.pop_back(); |
182 | 5.22k | } |
183 | | |
184 | | // Check whether current key is the same as the previous key, except for the timestamp. |
185 | 10.2M | if (same_bytes != sub_key_ends_.back()) { |
186 | 10.2M | within_merge_block_ = false; |
187 | 10.2M | } |
188 | | |
189 | | // See if we found a higher hybrid time not exceeding the history cutoff hybrid time at which the |
190 | | // subdocument (including a primitive value) rooted at the current key was fully overwritten. |
191 | | // In case of ht > history_cutoff_, we just keep the parent document's highest known overwrite |
192 | | // hybrid time that does not exceed the cutoff hybrid time. In that case this entry is obviously |
193 | | // too new to be garbage-collected. |
194 | 10.2M | if (ht.hybrid_time() > history_cutoff) { |
195 | 10.2M | AssignPrevSubDocKey(key.cdata(), same_bytes); |
196 | 10.2M | overwrite_.push_back({prev_overwrite_ht, prev_exp}); |
197 | 10.2M | return FilterDecision::kKeep; |
198 | 10.2M | } |
199 | | |
200 | | // Check for CQL columns deleted from the schema. This is done regardless of whether this is a |
201 | | // major or minor compaction. |
202 | | // |
203 | | // TODO: could there be a case when there is still a read request running that uses an old schema, |
204 | | // and we end up removing some data that the client expects to see? |
205 | 18.4E | if (sub_key_ends_.size() > 1) { |
206 | | // Column ID is the first subkey in every CQL row. |
207 | 20.9k | if (key[sub_key_ends_[0]] == ValueTypeAsChar::kColumnId) { |
208 | 0 | Slice column_id_slice(key.data() + sub_key_ends_[0] + 1, key.data() + sub_key_ends_[1]); |
209 | 0 | auto column_id_as_int64 = VERIFY_RESULT(util::FastDecodeSignedVarIntUnsafe(&column_id_slice)); |
210 | 0 | ColumnId column_id; |
211 | 0 | RETURN_NOT_OK(ColumnId::FromInt64(column_id_as_int64, &column_id)); |
212 | 0 | if (retention_.deleted_cols->count(column_id) != 0) { |
213 | 0 | return FilterDecision::kDiscard; |
214 | 0 | } |
215 | 18.4E | } |
216 | 20.9k | } |
217 | | |
218 | 18.4E | auto overwrite_ht = isTtlRow ? prev_overwrite_ht : std::max(prev_overwrite_ht, ht); |
219 | | |
220 | 18.4E | Value value; |
221 | 18.4E | Slice value_slice = existing_value; |
222 | 18.4E | RETURN_NOT_OK(value.DecodeControlFields(&value_slice)); |
223 | 18.4E | const auto value_type = static_cast<ValueType>( |
224 | 18.4E | value_slice.FirstByteOr(ValueTypeAsChar::kInvalid)); |
225 | 18.4E | const Expiration curr_exp(ht.hybrid_time(), value.ttl()); |
226 | | |
227 | | // If within the merge block. |
228 | | // If the row is a TTL row, delete it. |
229 | | // Otherwise, replace it with the cached TTL (i.e., apply merge). |
230 | | // Otherwise, |
231 | | // If this is a TTL row, cache TTL (start merge block). |
232 | | // If normal row, compute its ttl and continue. |
233 | | |
234 | 18.4E | Expiration expiration; |
235 | 18.4E | if (within_merge_block_) { |
236 | 0 | expiration = popped_exp; |
237 | 18.4E | } else if (ht.hybrid_time() >= prev_exp.write_ht && |
238 | 20.9k | (curr_exp.ttl != Value::kMaxTtl || isTtlRow)) { |
239 | 0 | expiration = curr_exp; |
240 | 18.4E | } else { |
241 | 18.4E | expiration = prev_exp; |
242 | 18.4E | } |
243 | | |
244 | 18.4E | overwrite_.push_back({overwrite_ht, expiration}); |
245 | | |
246 | 18.4E | if (overwrite_.size() != new_stack_size) { |
247 | 0 | return STATUS_FORMAT(Corruption, "Overwrite size does not match new_stack_size: $0 vs $1", |
248 | 0 | overwrite_.size(), new_stack_size); |
249 | 0 | } |
250 | 18.4E | AssignPrevSubDocKey(key.cdata(), same_bytes); |
251 | | |
252 | | // If the entry has the TTL flag, delete the entry. |
253 | 18.4E | if (isTtlRow) { |
254 | 0 | within_merge_block_ = true; |
255 | 0 | return FilterDecision::kDiscard; |
256 | 0 | } |
257 | | |
258 | | // Only check for expiration if the current hybrid time is at or below history cutoff. |
259 | | // The key could not have possibly expired by history_cutoff_ otherwise. |
260 | 18.4E | MonoDelta true_ttl = ComputeTTL(expiration.ttl, retention_.table_ttl); |
261 | 18.4E | const auto has_expired = HasExpiredTTL( |
262 | 18.4E | true_ttl == expiration.ttl ? expiration.write_ht : ht.hybrid_time(), |
263 | 18.4E | true_ttl, |
264 | 18.4E | history_cutoff); |
265 | | // As of 02/2017, we don't have init markers for top level documents in QL. As a result, we can |
266 | | // compact away each column if it has expired, including the liveness system column. The init |
267 | | // markers in Redis wouldn't be affected since they don't have any TTL associated with them and |
268 | | // the TTL would default to kMaxTtl which would make has_expired false. |
269 | 18.4E | if (has_expired) { |
270 | | // This is consistent with the condition we're testing for deletes at the bottom of the function |
271 | | // because ht_at_or_below_cutoff is implied by has_expired. |
272 | 0 | if (is_major_compaction_ && !retention_.retain_delete_markers_in_major_compaction) { |
273 | 0 | return FilterDecision::kDiscard; |
274 | 0 | } |
275 | | |
276 | | // During minor compactions, expired values are written back as tombstones because removing the |
277 | | // record might expose earlier values which would be incorrect. |
278 | 0 | *value_changed = true; |
279 | 0 | *new_value = Value::EncodedTombstone(); |
280 | 18.4E | } else if (within_merge_block_) { |
281 | 0 | *value_changed = true; |
282 | |
|
283 | 0 | if (expiration.ttl != Value::kMaxTtl) { |
284 | 0 | expiration.ttl += MonoDelta::FromMicroseconds( |
285 | 0 | overwrite_.back().expiration.write_ht.PhysicalDiff(ht.hybrid_time())); |
286 | 0 | overwrite_.back().expiration.ttl = expiration.ttl; |
287 | 0 | } |
288 | |
|
289 | 0 | *value.mutable_ttl() = expiration.ttl; |
290 | 0 | new_value->clear(); |
291 | | |
292 | | // We are reusing the existing encoded value without decoding/encoding it. |
293 | 0 | value.EncodeAndAppend(new_value, &value_slice); |
294 | 0 | within_merge_block_ = false; |
295 | 18.4E | } else if (value.intent_doc_ht().is_valid() && ht.hybrid_time() < history_cutoff) { |
296 | | // Cleanup intent doc hybrid time when we don't need it anymore. |
297 | | // See https://github.com/yugabyte/yugabyte-db/issues/4535 for details. |
298 | 0 | value.ClearIntentDocHt(); |
299 | |
|
300 | 0 | new_value->clear(); |
301 | | |
302 | | // We are reusing the existing encoded value without decoding/encoding it. |
303 | 0 | value.EncodeAndAppend(new_value, &value_slice); |
304 | 0 | } |
305 | | |
306 | | // If we are backfilling an index table, we want to preserve the delete markers in the table |
307 | | // until the backfill process is completed. For other normal use cases, delete markers/tombstones |
308 | | // can be cleaned up on a major compaction. |
309 | | // retention_.retain_delete_markers_in_major_compaction will be set to true until the index |
310 | | // backfill is complete. |
311 | | // |
312 | | // Tombstones at or below the history cutoff hybrid_time can always be cleaned up on full (major) |
313 | | // compactions. However, we do need to update the overwrite hybrid time stack in this case (as we |
314 | | // just did), because this deletion (tombstone) entry might be the only reason for cleaning up |
315 | | // more entries appearing at earlier hybrid times. |
316 | 18.4E | return value_type == ValueType::kTombstone && is_major_compaction_ && |
317 | 300 | !retention_.retain_delete_markers_in_major_compaction |
318 | 300 | ? FilterDecision::kDiscard |
319 | 18.4E | : FilterDecision::kKeep; |
320 | 18.4E | } |
321 | | |
322 | | void DocDBCompactionFilter::AssignPrevSubDocKey( |
323 | 10.2M | const char* data, size_t same_bytes) { |
324 | 10.2M | size_t size = sub_key_ends_.back(); |
325 | 10.2M | prev_subdoc_key_.resize(size); |
326 | 10.2M | memcpy(prev_subdoc_key_.data() + same_bytes, data + same_bytes, size - same_bytes); |
327 | 10.2M | } |
328 | | |
329 | | |
330 | 436 | rocksdb::UserFrontierPtr DocDBCompactionFilter::GetLargestUserFrontier() const { |
331 | 436 | auto* consensus_frontier = new ConsensusFrontier(); |
332 | 436 | consensus_frontier->set_history_cutoff(retention_.history_cutoff); |
333 | 436 | return rocksdb::UserFrontierPtr(consensus_frontier); |
334 | 436 | } |
335 | | |
336 | 0 | const char* DocDBCompactionFilter::Name() const { |
337 | 0 | return "DocDBCompactionFilter"; |
338 | 0 | } |
339 | | |
340 | 436 | std::vector<std::pair<Slice, Slice>> DocDBCompactionFilter::GetLiveRanges() const { |
341 | 436 | static constexpr char kApplyStateEndChar = ValueTypeAsChar::kTransactionApplyState + 1; |
342 | 436 | if (!key_bounds_ || (key_bounds_->lower.empty() && key_bounds_->upper.empty())) { |
343 | 257 | return {}; |
344 | 257 | } |
345 | 179 | auto end_apply_state_region = Slice(&kApplyStateEndChar, 1); |
346 | 179 | auto first_range = std::make_pair(Slice(), end_apply_state_region); |
347 | 179 | auto second_range = std::make_pair( |
348 | 179 | key_bounds_->lower.AsSlice().Less(end_apply_state_region) |
349 | 74 | ? end_apply_state_region |
350 | 105 | : key_bounds_->lower.AsSlice(), |
351 | 179 | key_bounds_->upper.AsSlice()); |
352 | | |
353 | 179 | return {first_range, second_range}; |
354 | 179 | } |
355 | | |
356 | | // ------------------------------------------------------------------------------------------------ |
357 | | |
358 | | DocDBCompactionFilterFactory::DocDBCompactionFilterFactory( |
359 | | std::shared_ptr<HistoryRetentionPolicy> retention_policy, const KeyBounds* key_bounds) |
360 | 220k | : retention_policy_(std::move(retention_policy)), key_bounds_(key_bounds) { |
361 | 220k | } |
362 | | |
363 | 208k | DocDBCompactionFilterFactory::~DocDBCompactionFilterFactory() { |
364 | 208k | } |
365 | | |
366 | | unique_ptr<CompactionFilter> DocDBCompactionFilterFactory::CreateCompactionFilter( |
367 | 436 | const CompactionFilter::Context& context) { |
368 | 436 | return std::make_unique<DocDBCompactionFilter>( |
369 | 436 | retention_policy_->GetRetentionDirective(), |
370 | 436 | IsMajorCompaction(context.is_full_compaction), |
371 | 436 | key_bounds_); |
372 | 436 | } |
373 | | |
374 | 1.32M | const char* DocDBCompactionFilterFactory::Name() const { |
375 | 1.32M | return "DocDBCompactionFilterFactory"; |
376 | 1.32M | } |
377 | | |
378 | | // ------------------------------------------------------------------------------------------------ |
379 | | |
380 | 0 | HistoryRetentionDirective ManualHistoryRetentionPolicy::GetRetentionDirective() { |
381 | 0 | std::lock_guard<std::mutex> lock(deleted_cols_mtx_); |
382 | 0 | return {history_cutoff_.load(std::memory_order_acquire), |
383 | 0 | std::make_shared<ColumnIds>(deleted_cols_), table_ttl_.load(std::memory_order_acquire), |
384 | 0 | ShouldRetainDeleteMarkersInMajorCompaction::kFalse}; |
385 | 0 | } |
386 | | |
387 | 0 | void ManualHistoryRetentionPolicy::SetHistoryCutoff(HybridTime history_cutoff) { |
388 | 0 | history_cutoff_.store(history_cutoff, std::memory_order_release); |
389 | 0 | } |
390 | | |
391 | 0 | void ManualHistoryRetentionPolicy::AddDeletedColumn(ColumnId col) { |
392 | 0 | std::lock_guard<std::mutex> lock(deleted_cols_mtx_); |
393 | 0 | deleted_cols_.insert(col); |
394 | 0 | } |
395 | | |
396 | 0 | void ManualHistoryRetentionPolicy::SetTableTTLForTests(MonoDelta ttl) { |
397 | 0 | table_ttl_.store(ttl, std::memory_order_release); |
398 | 0 | } |
399 | | |
400 | | } // namespace docdb |
401 | | } // namespace yb |