YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_compaction_filter.cc
Line | Count | Source
   1 |       | // Copyright (c) YugaByte, Inc.
   2 |       | //
   3 |       | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
   4 |       | // in compliance with the License.  You may obtain a copy of the License at
   5 |       | //
   6 |       | // http://www.apache.org/licenses/LICENSE-2.0
   7 |       | //
   8 |       | // Unless required by applicable law or agreed to in writing, software distributed under the License
   9 |       | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  10 |       | // or implied.  See the License for the specific language governing permissions and limitations
  11 |       | // under the License.
  12 |       | //
  13 |       |
  14 |       | #include "yb/docdb/docdb_compaction_filter.h"
  15 |       |
  16 |       | #include <memory>
  17 |       |
  18 |       | #include <glog/logging.h>
  19 |       |
  20 |       | #include "yb/docdb/consensus_frontier.h"
  21 |       | #include "yb/docdb/doc_key.h"
  22 |       | #include "yb/docdb/doc_ttl_util.h"
  23 |       | #include "yb/docdb/key_bounds.h"
  24 |       | #include "yb/docdb/value.h"
  25 |       | #include "yb/docdb/value_type.h"
  26 |       |
  27 |       | #include "yb/rocksdb/compaction_filter.h"
  28 |       |
  29 |       | #include "yb/util/fast_varint.h"
  30 |       | #include "yb/util/result.h"
  31 |       | #include "yb/util/status_format.h"
  32 |       | #include "yb/util/string_util.h"
  33 |       |
  34 |       | using std::shared_ptr;
  35 |       | using std::unique_ptr;
  36 |       | using std::unordered_set;
  37 |       | using rocksdb::CompactionFilter;
  38 |       | using rocksdb::VectorToString;
  39 |       | using rocksdb::FilterDecision;
  40 |       |
  41 |       | namespace yb {
  42 |       | namespace docdb {
  43 |       |
  44 |       | // ------------------------------------------------------------------------------------------------
  45 |       |
  46 |       | DocDBCompactionFilter::DocDBCompactionFilter(
  47 |       |     HistoryRetentionDirective retention,
  48 |       |     IsMajorCompaction is_major_compaction,
  49 |       |     const KeyBounds* key_bounds)
  50 |       |     : retention_(std::move(retention)),
  51 |       |       key_bounds_(key_bounds),
  52 |   436 |       is_major_compaction_(is_major_compaction) {
  53 |   436 | }
  54 |       |
  55 |   433 | DocDBCompactionFilter::~DocDBCompactionFilter() {
  56 |   433 | }
  57 |       |
  58 |       | FilterDecision DocDBCompactionFilter::Filter(
  59 |       |     int level, const Slice& key, const Slice& existing_value, std::string* new_value,
  60 | 10.2M |     bool* value_changed) {
  61 | 10.2M |   auto result = const_cast<DocDBCompactionFilter*>(this)->DoFilter(
  62 | 10.2M |       level, key, existing_value, new_value, value_changed);
  63 | 10.2M |   if (!result.ok()) {
  64 |     0 |     LOG(FATAL) << "Error filtering " << key.ToDebugString() << ": " << result.status();
  65 |     0 |   }
  66 | 10.2M |   if (*result != FilterDecision::kKeep) {
  67 |     0 |     VLOG(3) << "Discarding key: " << BestEffortDocDBKeyToStr(key);
  68 | 10.2M |   } else {
  69 | 18.4E |     VLOG(4) << "Keeping key: " << BestEffortDocDBKeyToStr(key);
  70 | 10.2M |   }
  71 | 10.2M |   return *result;
  72 | 10.2M | }
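
Note: the "18.4E" values in the Count column are llvm-cov's rendering of a 64-bit
counter that wrapped around, not real execution counts. On the code itself:
RocksDB's Filter() hook has no way to report a failure, so DoFilter() returns a
Result<FilterDecision> and the wrapper above turns any error into LOG(FATAL).
A minimal sketch of that pattern, using std::variant as a stand-in for
yb::Result (all names below are invented for illustration):

    #include <cstdlib>
    #include <iostream>
    #include <string>
    #include <variant>

    enum class FilterDecision { kKeep, kDiscard };

    // Stand-in for yb::Result<T>: either a value or an error message.
    template <typename T>
    using Result = std::variant<T, std::string>;

    Result<FilterDecision> DoFilterSketch(const std::string& key) {
      if (key.empty()) return std::string("empty key");  // error path
      return FilterDecision::kKeep;
    }

    // The hook itself cannot fail, so an error becomes a crash, mirroring
    // the LOG(FATAL) in the real code.
    FilterDecision FilterSketch(const std::string& key) {
      auto result = DoFilterSketch(key);
      if (auto* error = std::get_if<std::string>(&result)) {
        std::cerr << "Error filtering " << key << ": " << *error << "\n";
        std::abort();
      }
      return std::get<FilterDecision>(result);
    }
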
  73 |       |
  74 |       | Result<FilterDecision> DocDBCompactionFilter::DoFilter(
  75 |       |     int level, const Slice& key, const Slice& existing_value, std::string* new_value,
  76 | 10.2M |     bool* value_changed) {
  77 | 10.2M |   const HybridTime history_cutoff = retention_.history_cutoff;
  78 |       |
  79 | 10.2M |   if (!filter_usage_logged_) {
  80 |       |     // TODO: switch this to VLOG if it becomes too chatty.
  81 |   436 |     LOG(INFO) << "DocDB compaction filter is being used for a "
  82 |   233 |               << (is_major_compaction_ ? "major" : "minor") << " compaction"
  83 |   436 |               << ", history_cutoff=" << history_cutoff;
  84 |   436 |     filter_usage_logged_ = true;
  85 |   436 |   }
  86 |       |
  87 | 10.2M |   if (!IsWithinBounds(key_bounds_, key) &&
  88 |     0 |       DecodeValueType(key) != ValueType::kTransactionApplyState) {
  89 |       |     // If we reach this point, then we're processing a record which should have been excluded by
  90 |       |     // proper use of GetLiveRanges(). We include this as a sanity check, but we should never get
  91 |       |     // here.
  92 |     0 |     LOG(DFATAL) << "Unexpectedly filtered out-of-bounds key during compaction: "
  93 |     0 |         << SubDocKey::DebugSliceToString(key)
  94 |     0 |         << " with bounds: " << key_bounds_->ToString();
  95 |     0 |     return FilterDecision::kDiscard;
  96 |     0 |   }
  97 |       |
  98 |       |   // Just remove intent records from the regular DB, because storing them there was a beta feature.
  99 |       |   // Currently intents are stored in a separate DB.
 100 | 10.2M |   if (DecodeValueType(key) == ValueType::kObsoleteIntentPrefix) {
 101 |     0 |     return FilterDecision::kDiscard;
 102 |     0 |   }
 103 |       |
 104 | 10.2M |   auto same_bytes = strings::MemoryDifferencePos(
 105 | 10.2M |       key.data(), prev_subdoc_key_.data(), std::min(key.size(), prev_subdoc_key_.size()));
 106 |       |
 107 |       |   // The number of initial components (including document key and subkeys) that this
 108 |       |   // SubDocKey shares with the previous one. This does not care about the hybrid_time field.
 109 | 10.2M |   size_t num_shared_components = sub_key_ends_.size();
 110 | 25.2M |   while (num_shared_components > 0 && sub_key_ends_[num_shared_components - 1] > same_bytes) {
 111 | 15.0M |     --num_shared_components;
 112 | 15.0M |   }
 113 |       |
 114 | 10.2M |   sub_key_ends_.resize(num_shared_components);
 115 |       |
 116 | 10.2M |   RETURN_NOT_OK(SubDocKey::DecodeDocKeyAndSubKeyEnds(key, &sub_key_ends_));
 117 | 10.2M |   const size_t new_stack_size = sub_key_ends_.size();
 118 |       |
 119 |       |   // Remove overwrite hybrid_times for components that are no longer relevant for the current
 120 |       |   // SubDocKey.
 121 | 10.2M |   overwrite_.resize(std::min(overwrite_.size(), num_shared_components));
 122 | 10.2M |   DocHybridTime ht;
 123 | 10.2M |   RETURN_NOT_OK(ht.DecodeFromEnd(key));
 124 |       |   // We're comparing the hybrid time in this key with the stack top of overwrite_ht_ after
 125 |       |   // truncating the stack to the number of components in the common prefix of the previous and
 126 |       |   // current key.
 127 |       |   //
 128 |       |   // Example (history_cutoff_ = 12):
 129 |       |   // --------------------------------------------------------------------------------------------
 130 |       |   // Key          overwrite_ht_ stack and relevant notes
 131 |       |   // --------------------------------------------------------------------------------------------
 132 |       |   // k1 T10       [MinHT]
 133 |       |   //
 134 |       |   // k1 T5        [T10]
 135 |       |   //
 136 |       |   // k1 col1 T11  [T10, T11]
 137 |       |   //
 138 |       |   // k1 col1 T7   The stack does not get truncated (shared prefix length is 2), so
 139 |       |   //              prev_overwrite_ht = 11. Removing this entry because 7 < 11.
 140 |       |   //              The stack stays at [T10, T11].
 141 |       |   //
 142 |       |   // k1 col2 T9   Truncating the stack to [T10], setting prev_overwrite_ht to 10, and therefore
 143 |       |   //              deciding to remove this entry because 9 < 10.
 144 |       |   //
 145 | 10.2M |   const DocHybridTime prev_overwrite_ht =
 146 | 10.2M |       overwrite_.empty() ? DocHybridTime::kMin : overwrite_.back().doc_ht;
 147 | 10.2M |   const Expiration prev_exp =
 148 | 10.2M |       overwrite_.empty() ? Expiration() : overwrite_.back().expiration;
 149 |       |
 150 |       |   // We only keep entries with hybrid_time equal to or later than the latest time the subdocument
 151 |       |   // was fully overwritten or deleted prior to or at the history cutoff time. The intuition is that
 152 |       |   // key/value pairs that were overwritten at or before history cutoff time will not be visible at
 153 |       |   // history cutoff time or any later time anyway.
 154 |       |   //
 155 |       |   // Furthermore, we only need to update the overwrite hybrid_time stack in case we have decided to
 156 |       |   // keep the new entry. Otherwise, the current entry's hybrid time ht is less than the previous
 157 |       |   // overwrite hybrid_time prev_overwrite_ht, and therefore it does not provide any new information
 158 |       |   // about key/value pairs that follow being overwritten at a particular hybrid time. Another way to
 159 |       |   // explain this is to look at the logic that follows. If we don't early-exit here while ht is less
 160 |       |   // than prev_overwrite_ht, we'll end up adding more prev_overwrite_ht values to the overwrite
 161 |       |   // hybrid_time stack, and we might as well do that while handling the next key/value pair that
 162 |       |   // does not get cleaned up the same way as this one.
 163 |       |   //
 164 |       |   // TODO: When more merge records are supported, isTtlRow should be redefined appropriately.
 165 | 10.2M |   bool isTtlRow = IsMergeRecord(existing_value);
 166 | 10.2M |   if (ht < prev_overwrite_ht && !isTtlRow) {
 167 | 3.28k |     return FilterDecision::kDiscard;
 168 | 3.28k |   }
 169 |       |
 170 |       |   // Every subdocument was fully overwritten at least at the time any of its parents was fully
 171 |       |   // overwritten.
 172 | 10.2M |   if (overwrite_.size() < new_stack_size - 1) {
 173 | 4.84M |     overwrite_.resize(new_stack_size - 1, {prev_overwrite_ht, prev_exp});
 174 | 4.84M |   }
 175 |       |
 176 | 10.2M |   Expiration popped_exp = overwrite_.empty() ? Expiration() : overwrite_.back().expiration;
 177 |       |   // This will happen in case the previous key has the same document key and subkeys as the current
 178 |       |   // key, and the only difference is in the hybrid_time. We want to replace the hybrid_time at the
 179 |       |   // top of the overwrite_ht stack in this case.
 180 | 10.2M |   if (overwrite_.size() == new_stack_size) {
 181 | 5.22k |     overwrite_.pop_back();
 182 | 5.22k |   }
 183 |       |
 184 |       |   // Check whether the current key is the same as the previous key, except for the timestamp.
 185 | 10.2M |   if (same_bytes != sub_key_ends_.back()) {
 186 | 10.2M |     within_merge_block_ = false;
 187 | 10.2M |   }
 188 |       |
 189 |       |   // See if we found a higher hybrid time not exceeding the history cutoff hybrid time at which the
 190 |       |   // subdocument (including a primitive value) rooted at the current key was fully overwritten.
 191 |       |   // In case of ht > history_cutoff_, we just keep the parent document's highest known overwrite
 192 |       |   // hybrid time that does not exceed the cutoff hybrid time. In that case this entry is obviously
 193 |       |   // too new to be garbage-collected.
 194 | 10.2M |   if (ht.hybrid_time() > history_cutoff) {
 195 | 10.2M |     AssignPrevSubDocKey(key.cdata(), same_bytes);
 196 | 10.2M |     overwrite_.push_back({prev_overwrite_ht, prev_exp});
 197 | 10.2M |     return FilterDecision::kKeep;
 198 | 10.2M |   }
 199 |       |
 200 |       |   // Check for CQL columns deleted from the schema. This is done regardless of whether this is a
 201 |       |   // major or minor compaction.
 202 |       |   //
 203 |       |   // TODO: could there be a case when there is still a read request running that uses an old schema,
 204 |       |   //       and we end up removing some data that the client expects to see?
 205 | 18.4E |   if (sub_key_ends_.size() > 1) {
 206 |       |     // The column ID is the first subkey in every CQL row.
 207 | 20.9k |     if (key[sub_key_ends_[0]] == ValueTypeAsChar::kColumnId) {
 208 |     0 |       Slice column_id_slice(key.data() + sub_key_ends_[0] + 1, key.data() + sub_key_ends_[1]);
 209 |     0 |       auto column_id_as_int64 = VERIFY_RESULT(util::FastDecodeSignedVarIntUnsafe(&column_id_slice));
 210 |     0 |       ColumnId column_id;
 211 |     0 |       RETURN_NOT_OK(ColumnId::FromInt64(column_id_as_int64, &column_id));
 212 |     0 |       if (retention_.deleted_cols->count(column_id) != 0) {
 213 |     0 |         return FilterDecision::kDiscard;
 214 |     0 |       }
 215 | 18.4E |     }
 216 | 20.9k |   }
 217 |       |
 218 | 18.4E |   auto overwrite_ht = isTtlRow ? prev_overwrite_ht : std::max(prev_overwrite_ht, ht);
 219 |       |
 220 | 18.4E |   Value value;
 221 | 18.4E |   Slice value_slice = existing_value;
 222 | 18.4E |   RETURN_NOT_OK(value.DecodeControlFields(&value_slice));
 223 | 18.4E |   const auto value_type = static_cast<ValueType>(
 224 | 18.4E |       value_slice.FirstByteOr(ValueTypeAsChar::kInvalid));
 225 | 18.4E |   const Expiration curr_exp(ht.hybrid_time(), value.ttl());
 226 |       |
 227 |       |   // If within the merge block:
 228 |       |   //     If the row is a TTL row, delete it.
 229 |       |   //     Otherwise, replace it with the cached TTL (i.e., apply the merge).
 230 |       |   // Otherwise:
 231 |       |   //     If this is a TTL row, cache its TTL (start a merge block).
 232 |       |   //     If it is a normal row, compute its TTL and continue.
 233 |       |
 234 | 18.4E |   Expiration expiration;
 235 | 18.4E |   if (within_merge_block_) {
 236 |     0 |     expiration = popped_exp;
 237 | 18.4E |   } else if (ht.hybrid_time() >= prev_exp.write_ht &&
 238 | 20.9k |              (curr_exp.ttl != Value::kMaxTtl || isTtlRow)) {
 239 |     0 |     expiration = curr_exp;
 240 | 18.4E |   } else {
 241 | 18.4E |     expiration = prev_exp;
 242 | 18.4E |   }
 243 |       |
 244 | 18.4E |   overwrite_.push_back({overwrite_ht, expiration});
 245 |       |
 246 | 18.4E |   if (overwrite_.size() != new_stack_size) {
 247 |     0 |     return STATUS_FORMAT(Corruption, "Overwrite size does not match new_stack_size: $0 vs $1",
 248 |     0 |                          overwrite_.size(), new_stack_size);
 249 |     0 |   }
 250 | 18.4E |   AssignPrevSubDocKey(key.cdata(), same_bytes);
 251 |       |
 252 |       |   // If the entry has the TTL flag, delete the entry.
 253 | 18.4E |   if (isTtlRow) {
 254 |     0 |     within_merge_block_ = true;
 255 |     0 |     return FilterDecision::kDiscard;
 256 |     0 |   }
 257 |       |
 258 |       |   // Only check for expiration if the current hybrid time is at or below the history cutoff.
 259 |       |   // The key could not have possibly expired by history_cutoff_ otherwise.
 260 | 18.4E |   MonoDelta true_ttl = ComputeTTL(expiration.ttl, retention_.table_ttl);
 261 | 18.4E |   const auto has_expired = HasExpiredTTL(
 262 | 18.4E |       true_ttl == expiration.ttl ? expiration.write_ht : ht.hybrid_time(),
 263 | 18.4E |       true_ttl,
 264 | 18.4E |       history_cutoff);
 265 |       |   // As of 02/2017, we don't have init markers for top-level documents in QL. As a result, we can
 266 |       |   // compact away each column if it has expired, including the liveness system column. The init
 267 |       |   // markers in Redis wouldn't be affected since they don't have any TTL associated with them and
 268 |       |   // the TTL would default to kMaxTtl, which would make has_expired false.
 269 | 18.4E |   if (has_expired) {
 270 |       |     // This is consistent with the condition we're testing for deletes at the bottom of the function
 271 |       |     // because ht_at_or_below_cutoff is implied by has_expired.
 272 |     0 |     if (is_major_compaction_ && !retention_.retain_delete_markers_in_major_compaction) {
 273 |     0 |       return FilterDecision::kDiscard;
 274 |     0 |     }
 275 |       |
 276 |       |     // During minor compactions, expired values are written back as tombstones because removing the
 277 |       |     // record might expose earlier values, which would be incorrect.
 278 |     0 |     *value_changed = true;
 279 |     0 |     *new_value = Value::EncodedTombstone();
 280 | 18.4E |   } else if (within_merge_block_) {
 281 |     0 |     *value_changed = true;
 282 |       |
 283 |     0 |     if (expiration.ttl != Value::kMaxTtl) {
 284 |     0 |       expiration.ttl += MonoDelta::FromMicroseconds(
 285 |     0 |           overwrite_.back().expiration.write_ht.PhysicalDiff(ht.hybrid_time()));
 286 |     0 |       overwrite_.back().expiration.ttl = expiration.ttl;
 287 |     0 |     }
 288 |       |
 289 |     0 |     *value.mutable_ttl() = expiration.ttl;
 290 |     0 |     new_value->clear();
 291 |       |
 292 |       |     // We are reusing the existing encoded value without decoding/re-encoding it.
 293 |     0 |     value.EncodeAndAppend(new_value, &value_slice);
 294 |     0 |     within_merge_block_ = false;
 295 | 18.4E |   } else if (value.intent_doc_ht().is_valid() && ht.hybrid_time() < history_cutoff) {
 296 |       |     // Clean up the intent doc hybrid time when we don't need it anymore.
 297 |       |     // See https://github.com/yugabyte/yugabyte-db/issues/4535 for details.
 298 |     0 |     value.ClearIntentDocHt();
 299 |       |
 300 |     0 |     new_value->clear();
 301 |       |
 302 |       |     // We are reusing the existing encoded value without decoding/re-encoding it.
 303 |     0 |     value.EncodeAndAppend(new_value, &value_slice);
 304 |     0 |   }
 305 |       |
 306 |       |   // If we are backfilling an index table, we want to preserve the delete markers in the table
 307 |       |   // until the backfill process is complete. For other normal use cases, delete markers/tombstones
 308 |       |   // can be cleaned up on a major compaction.
 309 |       |   // retention_.retain_delete_markers_in_major_compaction will be set to true until the index
 310 |       |   // backfill is complete.
 311 |       |   //
 312 |       |   // Tombstones at or below the history cutoff hybrid_time can always be cleaned up on full (major)
 313 |       |   // compactions. However, we do need to update the overwrite hybrid time stack in this case (as we
 314 |       |   // just did), because this deletion (tombstone) entry might be the only reason for cleaning up
 315 |       |   // more entries appearing at earlier hybrid times.
 316 | 18.4E |   return value_type == ValueType::kTombstone && is_major_compaction_ &&
 317 |   300 |                  !retention_.retain_delete_markers_in_major_compaction
 318 |   300 |              ? FilterDecision::kDiscard
 319 | 18.4E |              : FilterDecision::kKeep;
 320 | 18.4E | }
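
The overwrite-hybrid-time stack is the core of DoFilter(). The toy model below
reproduces just that mechanism on plain strings and integers, ignoring
TTL/merge records, deleted columns, and tombstones; all names are invented for
the sketch, and real keys are binary-encoded SubDocKeys rather than component
vectors:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    struct OverwriteStackModel {
      uint64_t history_cutoff;
      std::vector<std::string> prev;       // components of the last kept key
      std::vector<uint64_t> overwrite_ht;  // overwrite time per key component

      // Returns true to keep an entry, false to discard it.
      bool Process(std::vector<std::string> components, uint64_t ht) {
        // Count leading components shared with the previous key
        // (the hybrid time is not a component).
        size_t shared = 0;
        while (shared < std::min(components.size(), prev.size()) &&
               components[shared] == prev[shared]) {
          ++shared;
        }
        // Truncate the stack to the shared prefix.
        overwrite_ht.resize(std::min(overwrite_ht.size(), shared));
        const uint64_t prev_overwrite =
            overwrite_ht.empty() ? 0 : overwrite_ht.back();
        if (ht < prev_overwrite) {
          return false;  // overwritten at/before the cutoff: collectible
        }
        // A parent's overwrite time propagates to newly entered components.
        if (overwrite_ht.size() < components.size()) {
          overwrite_ht.resize(components.size() - 1, prev_overwrite);
        } else {
          // Same doc key and subkeys as before; only the hybrid time differs.
          overwrite_ht.pop_back();
        }
        // Entries newer than the cutoff never raise the overwrite time.
        overwrite_ht.push_back(
            ht > history_cutoff ? prev_overwrite
                                : std::max(prev_overwrite, ht));
        prev = std::move(components);
        return true;
      }
    };

    int main() {
      // The worked example from the comment block (history_cutoff = 12).
      OverwriteStackModel m{12, {}, {}};
      std::cout << m.Process({"k1"}, 10) << "\n";          // 1: kept, stack [10]
      std::cout << m.Process({"k1"}, 5) << "\n";           // 0: 5 < 10, discarded
      std::cout << m.Process({"k1", "col1"}, 11) << "\n";  // 1: kept, stack [10, 11]
      std::cout << m.Process({"k1", "col1"}, 7) << "\n";   // 0: 7 < 11, discarded
      std::cout << m.Process({"k1", "col2"}, 9) << "\n";   // 0: 9 < 10, discarded
    }
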
 321 |       |
 322 |       | void DocDBCompactionFilter::AssignPrevSubDocKey(
 323 | 10.2M |     const char* data, size_t same_bytes) {
 324 | 10.2M |   size_t size = sub_key_ends_.back();
 325 | 10.2M |   prev_subdoc_key_.resize(size);
 326 | 10.2M |   memcpy(prev_subdoc_key_.data() + same_bytes, data + same_bytes, size - same_bytes);
 327 | 10.2M | }
 328 |       |
 329 |       |
 330 |   436 | rocksdb::UserFrontierPtr DocDBCompactionFilter::GetLargestUserFrontier() const {
 331 |   436 |   auto* consensus_frontier = new ConsensusFrontier();
 332 |   436 |   consensus_frontier->set_history_cutoff(retention_.history_cutoff);
 333 |   436 |   return rocksdb::UserFrontierPtr(consensus_frontier);
 334 |   436 | }
 335 |       |
 336 |     0 | const char* DocDBCompactionFilter::Name() const {
 337 |     0 |   return "DocDBCompactionFilter";
 338 |     0 | }
 339 |       |
 340 |   436 | std::vector<std::pair<Slice, Slice>> DocDBCompactionFilter::GetLiveRanges() const {
 341 |   436 |   static constexpr char kApplyStateEndChar = ValueTypeAsChar::kTransactionApplyState + 1;
 342 |   436 |   if (!key_bounds_ || (key_bounds_->lower.empty() && key_bounds_->upper.empty())) {
 343 |   257 |     return {};
 344 |   257 |   }
 345 |   179 |   auto end_apply_state_region = Slice(&kApplyStateEndChar, 1);
 346 |   179 |   auto first_range = std::make_pair(Slice(), end_apply_state_region);
 347 |   179 |   auto second_range = std::make_pair(
 348 |   179 |     key_bounds_->lower.AsSlice().Less(end_apply_state_region)
 349 |    74 |         ? end_apply_state_region
 350 |   105 |         : key_bounds_->lower.AsSlice(),
 351 |   179 |     key_bounds_->upper.AsSlice());
 352 |       |
 353 |   179 |   return {first_range, second_range};
 354 |   179 | }
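
GetLiveRanges() is the other half of the out-of-bounds check in DoFilter(): it
reports the only regions that can hold live data, so the compaction can drop
everything outside them in bulk instead of asking Filter() about each record.
There are two such regions: transaction apply-state records at the very front
of the keyspace, and the tablet's own [lower, upper) key bounds. A sketch of
that shape on plain strings (illustrative only; names invented):

    #include <algorithm>
    #include <string>
    #include <utility>
    #include <vector>

    std::vector<std::pair<std::string, std::string>> LiveRangesSketch(
        const std::string& lower, const std::string& upper,
        const std::string& apply_state_end) {
      if (lower.empty() && upper.empty()) {
        return {};  // no bounds: the whole keyspace is live
      }
      return {
          // Apply-state records sort before all regular doc keys.
          {std::string(), apply_state_end},
          // The tablet's own rows; the lower bound is clamped so the two
          // ranges do not overlap.
          {std::max(lower, apply_state_end), upper},
      };
    }
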
 355 |       |
 356 |       | // ------------------------------------------------------------------------------------------------
 357 |       |
 358 |       | DocDBCompactionFilterFactory::DocDBCompactionFilterFactory(
 359 |       |     std::shared_ptr<HistoryRetentionPolicy> retention_policy, const KeyBounds* key_bounds)
 360 |  220k |     : retention_policy_(std::move(retention_policy)), key_bounds_(key_bounds) {
 361 |  220k | }
 362 |       |
 363 |  208k | DocDBCompactionFilterFactory::~DocDBCompactionFilterFactory() {
 364 |  208k | }
 365 |       |
 366 |       | unique_ptr<CompactionFilter> DocDBCompactionFilterFactory::CreateCompactionFilter(
 367 |   436 |     const CompactionFilter::Context& context) {
 368 |   436 |   return std::make_unique<DocDBCompactionFilter>(
 369 |   436 |       retention_policy_->GetRetentionDirective(),
 370 |   436 |       IsMajorCompaction(context.is_full_compaction),
 371 |   436 |       key_bounds_);
 372 |   436 | }
 373 |       |
 374 | 1.32M | const char* DocDBCompactionFilterFactory::Name() const {
 375 | 1.32M |   return "DocDBCompactionFilterFactory";
 376 | 1.32M | }
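
A factory is needed because RocksDB creates one filter per compaction, and each
DocDBCompactionFilter carries per-compaction state (the overwrite stack and
prev_subdoc_key_). A sketch of how such a factory is typically installed,
following stock RocksDB conventions (the Options field name and the header
paths here are assumptions for illustration):

    #include <memory>

    #include "yb/docdb/docdb_compaction_filter.h"
    #include "yb/rocksdb/options.h"

    rocksdb::Options MakeDocDBOptions(
        std::shared_ptr<yb::docdb::HistoryRetentionPolicy> retention_policy,
        const yb::docdb::KeyBounds* key_bounds) {
      rocksdb::Options options;
      options.compaction_filter_factory =
          std::make_shared<yb::docdb::DocDBCompactionFilterFactory>(
              std::move(retention_policy), key_bounds);
      // CreateCompactionFilter() then runs once per compaction, so every
      // compaction gets a fresh filter with a clean overwrite stack.
      return options;
    }
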
 377 |       |
 378 |       | // ------------------------------------------------------------------------------------------------
 379 |       |
 380 |     0 | HistoryRetentionDirective ManualHistoryRetentionPolicy::GetRetentionDirective() {
 381 |     0 |   std::lock_guard<std::mutex> lock(deleted_cols_mtx_);
 382 |     0 |   return {history_cutoff_.load(std::memory_order_acquire),
 383 |     0 |           std::make_shared<ColumnIds>(deleted_cols_), table_ttl_.load(std::memory_order_acquire),
 384 |     0 |           ShouldRetainDeleteMarkersInMajorCompaction::kFalse};
 385 |     0 | }
 386 |       |
 387 |     0 | void ManualHistoryRetentionPolicy::SetHistoryCutoff(HybridTime history_cutoff) {
 388 |     0 |   history_cutoff_.store(history_cutoff, std::memory_order_release);
 389 |     0 | }
 390 |       |
 391 |     0 | void ManualHistoryRetentionPolicy::AddDeletedColumn(ColumnId col) {
 392 |     0 |   std::lock_guard<std::mutex> lock(deleted_cols_mtx_);
 393 |     0 |   deleted_cols_.insert(col);
 394 |     0 | }
 395 |       |
 396 |     0 | void ManualHistoryRetentionPolicy::SetTableTTLForTests(MonoDelta ttl) {
 397 |     0 |   table_ttl_.store(ttl, std::memory_order_release);
 398 |     0 | }
 399 |       |
 400 |       | }  // namespace docdb
 401 |       | }  // namespace yb
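
ManualHistoryRetentionPolicy is the test hook for the retention machinery: each
field of the HistoryRetentionDirective can be pinned by hand instead of being
derived from the running cluster, which is why it goes unexecuted (Count 0) in
this report. A sketch of how a test might drive it (the helper and all values
below are invented for illustration, and the factory functions on HybridTime
and MonoDelta are assumptions about the yb util API):

    #include "yb/docdb/docdb_compaction_filter.h"

    // Hypothetical test helper; values are arbitrary.
    void ConfigureRetentionForTest(
        yb::docdb::ManualHistoryRetentionPolicy* policy) {
      // Entries overwritten at or before this hybrid time become collectible.
      policy->SetHistoryCutoff(yb::HybridTime::FromMicros(1000));
      // Treat rows older than 300 seconds as expired, table-wide.
      policy->SetTableTTLForTests(yb::MonoDelta::FromSeconds(300));
      // Values of this column are dropped regardless of timestamps.
      policy->AddDeletedColumn(yb::ColumnId(42));
    }
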