YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_compaction_filter_intents.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/docdb_compaction_filter_intents.h"
15
16
#include <memory>
17
18
#include <glog/logging.h>
19
20
#include "yb/common/common.pb.h"
21
22
#include "yb/docdb/doc_kv_util.h"
23
#include "yb/docdb/docdb-internal.h"
24
#include "yb/docdb/intent.h"
25
#include "yb/docdb/value.h"
26
#include "yb/docdb/value_type.h"
27
28
#include "yb/rocksdb/compaction_filter.h"
29
30
#include "yb/tablet/tablet.h"
31
#include "yb/tablet/transaction_participant.h"
32
33
#include "yb/util/atomic.h"
34
#include "yb/util/logging.h"
35
#include "yb/util/string_util.h"
36
37
// Minimum age of a transaction metadata record, measured against the compaction start time,
// before its transaction id is collected for cleanup. 1 minute by default, 1 sec for testing.
DEFINE_uint64(aborted_intent_cleanup_ms, 60000,
              "Duration in ms after which to check if a transaction is aborted.");

// Upper bound on the number of transaction ids collected by a single compaction filter.
// Cleanup 256 transactions at a time.
DEFINE_uint64(aborted_intent_cleanup_max_batch_size, 256,
              "Number of transactions to collect for possible cleanup.");

// Age after which external intent records are discarded by the filter. 24 hours by default.
DEFINE_int32(external_intent_cleanup_secs, 60 * 60 * 24,
             "Duration in secs after which to cleanup external intents.");

// Caps per-filter error logging; errors beyond this count are still counted but not logged.
DEFINE_uint64(intents_compaction_filter_max_errors_to_log, 100,
              "Maximum number of errors to log for life cycle of the intents compaction filter.");
48
49
using std::shared_ptr;
50
using std::unique_ptr;
51
using std::unordered_set;
52
using rocksdb::CompactionFilter;
53
using rocksdb::VectorToString;
54
55
namespace yb {
56
namespace docdb {
57
58
// ------------------------------------------------------------------------------------------------
59
60
namespace {
61
class DocDBIntentsCompactionFilter : public rocksdb::CompactionFilter {
62
 public:
63
  explicit DocDBIntentsCompactionFilter(tablet::Tablet* tablet, const KeyBounds* key_bounds)
64
0
      : tablet_(tablet), compaction_start_time_(tablet->clock()->Now().GetPhysicalValueMicros()) {}
65
66
  ~DocDBIntentsCompactionFilter() override;
67
68
  rocksdb::FilterDecision Filter(
69
      int level, const Slice& key, const Slice& existing_value, std::string* new_value,
70
      bool* value_changed) override;
71
72
  const char* Name() const override;
73
74
  void CompactionFinished() override;
75
76
0
  TransactionIdSet& transactions_to_cleanup() {
77
0
    return transactions_to_cleanup_;
78
0
  }
79
80
  void AddToSet(const TransactionId& transaction_id, TransactionIdSet* set);
81
82
 private:
83
  void CleanupTransactions();
84
85
  std::string LogPrefix() const;
86
87
  Result<boost::optional<TransactionId>> FilterTransactionMetadata(
88
      const Slice& key, const Slice& existing_value);
89
90
  Result<rocksdb::FilterDecision> FilterExternalIntent(const Slice& key);
91
92
  tablet::Tablet* const tablet_;
93
  const MicrosTime compaction_start_time_;
94
95
  TransactionIdSet transactions_to_cleanup_;
96
  int rejected_transactions_ = 0;
97
  uint64_t num_errors_ = 0;
98
99
  // We use this to only log a message that the filter is being used once on the first call to
100
  // the Filter function.
101
  bool filter_usage_logged_ = false;
102
};
103
104
0
// For use inside DocDBIntentsCompactionFilter member functions that return
// rocksdb::FilterDecision. If `result` is not OK, logs its status (only while fewer than
// --intents_compaction_filter_max_errors_to_log errors have been seen), increments num_errors_,
// and returns kKeep from the enclosing function so the key is retained.
//
// `result` is evaluated more than once, so pass a variable rather than an expression with side
// effects. The do/while(false) wrapper makes the expansion a single statement that is safe to
// follow with ';' in any context, including an unbraced if/else.
#define MAYBE_LOG_ERROR_AND_RETURN_KEEP(result) \
  do { \
    if (!(result).ok()) { \
      if (num_errors_ < GetAtomicFlag(&FLAGS_intents_compaction_filter_max_errors_to_log)) { \
        LOG_WITH_PREFIX(ERROR) << StatusToString((result).status()); \
      } \
      num_errors_++; \
      return rocksdb::FilterDecision::kKeep; \
    } \
  } while (false)
113
114
0
void DocDBIntentsCompactionFilter::CleanupTransactions() {
115
0
  VLOG_WITH_PREFIX(3) << "DocDB intents compaction filter is being deleted";
116
0
  if (transactions_to_cleanup_.empty()) {
117
0
    return;
118
0
  }
119
0
  TransactionStatusManager* manager = tablet_->transaction_participant();
120
0
  if (rejected_transactions_ > 0) {
121
0
    LOG_WITH_PREFIX(WARNING) << "Number of aborted transactions not cleaned up " <<
122
0
                                "on account of reaching size limits:" << rejected_transactions_;
123
0
  }
124
0
  manager->Cleanup(std::move(transactions_to_cleanup_));
125
0
}
126
127
0
// Nothing to release here: cleanup of collected transactions happens in CompactionFinished().
DocDBIntentsCompactionFilter::~DocDBIntentsCompactionFilter() = default;
129
130
// Per-key compaction callback.
//
// - External intent keys: the decision (keep/discard) comes from FilterExternalIntent().
// - Transaction metadata keys: the record is always kept, but its transaction id is collected
//   for cleanup when FilterTransactionMetadata() reports it as old enough.
// - Any other key: kept.
//
// On any decode/parse error the key is kept and the error is counted (and logged, up to the
// configured limit). level, new_value and value_changed are not used.
rocksdb::FilterDecision DocDBIntentsCompactionFilter::Filter(
    int level, const Slice& key, const Slice& existing_value, std::string* new_value,
    bool* value_changed) {
  if (!filter_usage_logged_) {
    VLOG_WITH_PREFIX(3) << "DocDB intents compaction filter is being used for a compaction";
    filter_usage_logged_ = true;
  }

  // Classify the key once instead of re-deriving its type for every branch below.
  const auto key_type = GetKeyType(key, StorageDbType::kIntents);

  if (key_type == KeyType::kExternalIntents) {
    auto filter_decision_result = FilterExternalIntent(key);
    MAYBE_LOG_ERROR_AND_RETURN_KEEP(filter_decision_result);
    return *filter_decision_result;
  }

  // Find transaction metadata row.
  if (key_type == KeyType::kTransactionMetadata) {
    auto transaction_id_result = FilterTransactionMetadata(key, existing_value);
    MAYBE_LOG_ERROR_AND_RETURN_KEEP(transaction_id_result);
    // Bind by reference: the optional lives inside transaction_id_result, no copy is needed.
    const auto& transaction_id_optional = *transaction_id_result;
    if (transaction_id_optional.has_value()) {
      AddToSet(transaction_id_optional.value(), &transactions_to_cleanup_);
    }
  }

  // TODO(dtxn): If/when we add processing of reverse index or intents here - we will need to
  // respect key_bounds passed to constructor in order to ignore/delete non-relevant keys. As of
  // 06/19/2019, intents and reverse indexes are being deleted by docdb::PrepareApplyIntentsBatch.
  return rocksdb::FilterDecision::kKeep;
}
159
160
// Examines one transaction metadata record.
//
// key:            the metadata key; the transaction id is decoded from it.
// existing_value: serialized TransactionMetadataPB.
//
// Returns:
//   - IllegalState if existing_value cannot be parsed as TransactionMetadataPB;
//   - boost::none if the record was written less than --aborted_intent_cleanup_ms before the
//     compaction start time;
//   - otherwise the decoded transaction id, or the decode error with a prepended message.
Result<boost::optional<TransactionId>> DocDBIntentsCompactionFilter::FilterTransactionMetadata(
    const Slice& key, const Slice& existing_value) {
  TransactionMetadataPB metadata_pb;
  if (!metadata_pb.ParseFromArray(
          existing_value.cdata(), narrow_cast<int>(existing_value.size()))) {
    return STATUS(IllegalState, "Failed to parse transaction metadata");
  }
  // Fall back to the physical component of the transaction start time when the record carries
  // no explicit write time (a zero metadata_write_time).
  uint64_t write_time = metadata_pb.metadata_write_time();
  if (!write_time) {
    write_time = HybridTime(metadata_pb.start_hybrid_time()).GetPhysicalValueMicros();
  }
  // Read the flag the same way the other flags in this file are read, and convert ms -> us.
  constexpr uint64_t kMicrosPerMilli = 1000;
  const uint64_t cleanup_delay_micros =
      GetAtomicFlag(&FLAGS_aborted_intent_cleanup_ms) * kMicrosPerMilli;
  if (compaction_start_time_ < write_time + cleanup_delay_micros) {
    return boost::none;
  }
  Slice key_slice = key;
  return VERIFY_RESULT_PREPEND(
      DecodeTransactionIdFromIntentValue(&key_slice), "Could not decode Transaction metadata");
}
178
179
// Decides the fate of one external intent record.
//
// The key is consumed front to back: the external transaction id marker byte, the transaction
// id (decoded only to advance past it), then the inverted doc hybrid time. The record is
// discarded when the physical time of that hybrid time is more than
// --external_intent_cleanup_secs older than the compaction start time; otherwise it is kept.
// Any decode failure is returned as an error with a prepended message.
Result<rocksdb::FilterDecision>
DocDBIntentsCompactionFilter::FilterExternalIntent(const Slice& key) {
  Slice remaining = key;
  RETURN_NOT_OK_PREPEND(remaining.consume_byte(ValueTypeAsChar::kExternalTransactionId),
                        "Could not decode external transaction byte");
  // Ignoring transaction id result since function just returns kKeep or kDiscard.
  RETURN_NOT_OK_PREPEND(
      DecodeTransactionId(&remaining), "Could not decode external transaction id");
  auto intent_doc_ht = VERIFY_RESULT_PREPEND(
      DecodeInvertedDocHt(remaining), "Could not decode hybrid time");

  const auto written_at_micros = intent_doc_ht.hybrid_time().GetPhysicalValueMicros();
  const int64_t age_micros = compaction_start_time_ - written_at_micros;
  const auto retention_micros =
      GetAtomicFlag(&FLAGS_external_intent_cleanup_secs) * MonoTime::kMicrosecondsPerSecond;

  return age_micros > retention_micros ? rocksdb::FilterDecision::kDiscard
                                       : rocksdb::FilterDecision::kKeep;
}
197
198
0
void DocDBIntentsCompactionFilter::CompactionFinished() {
199
0
  if (num_errors_ > 0) {
200
0
    LOG_WITH_PREFIX(WARNING) << Format(
201
0
        "Found $0 total errors during intents compaction filter.", num_errors_);
202
0
  }
203
0
  CleanupTransactions();
204
0
}
205
206
void DocDBIntentsCompactionFilter::AddToSet(const TransactionId& transaction_id,
207
0
                                            TransactionIdSet* set) {
208
0
  if (set->size() <= FLAGS_aborted_intent_cleanup_max_batch_size) {
209
0
    set->insert(transaction_id);
210
0
  } else {
211
0
    rejected_transactions_++;
212
0
  }
213
0
}
214
215
0
// Name reported for this compaction filter.
const char* DocDBIntentsCompactionFilter::Name() const {
  static const char kFilterName[] = "DocDBIntentsCompactionFilter";
  return kFilterName;
}
218
219
0
std::string DocDBIntentsCompactionFilter::LogPrefix() const {
220
0
  return Format("T $0: ", tablet_->tablet_id());
221
0
}
222
223
} // namespace
224
225
// ------------------------------------------------------------------------------------------------
226
227
// Stores the tablet and key bounds that are forwarded to every filter this factory creates.
DocDBIntentsCompactionFilterFactory::DocDBIntentsCompactionFilterFactory(
    tablet::Tablet* tablet, const KeyBounds* key_bounds)
    : tablet_(tablet),
      key_bounds_(key_bounds) {
}
230
231
101k
// The factory's destructor has no work of its own to do.
DocDBIntentsCompactionFilterFactory::~DocDBIntentsCompactionFilterFactory() = default;
232
233
std::unique_ptr<CompactionFilter> DocDBIntentsCompactionFilterFactory::CreateCompactionFilter(
234
0
    const CompactionFilter::Context& context) {
235
0
  return std::make_unique<DocDBIntentsCompactionFilter>(tablet_, key_bounds_);
236
0
}
237
238
647k
// Name reported for this compaction filter factory.
const char* DocDBIntentsCompactionFilterFactory::Name() const {
  static const char kFactoryName[] = "DocDBIntentsCompactionFilterFactory";
  return kFactoryName;
}
241
242
}  // namespace docdb
243
}  // namespace yb