/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_compaction_filter_intents.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb_compaction_filter_intents.h" |
15 | | |
16 | | #include <memory> |
17 | | |
18 | | #include <glog/logging.h> |
19 | | |
20 | | #include "yb/common/common.pb.h" |
21 | | |
22 | | #include "yb/docdb/doc_kv_util.h" |
23 | | #include "yb/docdb/docdb-internal.h" |
24 | | #include "yb/docdb/intent.h" |
25 | | #include "yb/docdb/value.h" |
26 | | #include "yb/docdb/value_type.h" |
27 | | |
28 | | #include "yb/rocksdb/compaction_filter.h" |
29 | | |
30 | | #include "yb/tablet/tablet.h" |
31 | | #include "yb/tablet/transaction_participant.h" |
32 | | |
33 | | #include "yb/util/atomic.h" |
34 | | #include "yb/util/logging.h" |
35 | | #include "yb/util/string_util.h" |
36 | | |
37 | | DEFINE_uint64(aborted_intent_cleanup_ms, 60000, // 1 minute by default, 1 sec for testing |
38 | | "Duration in ms after which to check if a transaction is aborted."); |
39 | | |
40 | | DEFINE_uint64(aborted_intent_cleanup_max_batch_size, 256, // Cleanup 256 transactions at a time |
41 | | "Number of transactions to collect for possible cleanup."); |
42 | | |
43 | | DEFINE_int32(external_intent_cleanup_secs, 60 * 60 * 24, // 24 hours by default |
44 | | "Duration in secs after which to cleanup external intents."); |
45 | | |
46 | | DEFINE_uint64(intents_compaction_filter_max_errors_to_log, 100, |
47 | | "Maximum number of errors to log for life cycle of the intents compcation filter."); |
48 | | |
49 | | using std::shared_ptr; |
50 | | using std::unique_ptr; |
51 | | using std::unordered_set; |
52 | | using rocksdb::CompactionFilter; |
53 | | using rocksdb::VectorToString; |
54 | | |
55 | | namespace yb { |
56 | | namespace docdb { |
57 | | |
58 | | // ------------------------------------------------------------------------------------------------ |
59 | | |
60 | | namespace { |
61 | | class DocDBIntentsCompactionFilter : public rocksdb::CompactionFilter { |
62 | | public: |
63 | | explicit DocDBIntentsCompactionFilter(tablet::Tablet* tablet, const KeyBounds* key_bounds) |
64 | 0 | : tablet_(tablet), compaction_start_time_(tablet->clock()->Now().GetPhysicalValueMicros()) {} |
65 | | |
66 | | ~DocDBIntentsCompactionFilter() override; |
67 | | |
68 | | rocksdb::FilterDecision Filter( |
69 | | int level, const Slice& key, const Slice& existing_value, std::string* new_value, |
70 | | bool* value_changed) override; |
71 | | |
72 | | const char* Name() const override; |
73 | | |
74 | | void CompactionFinished() override; |
75 | | |
76 | 0 | TransactionIdSet& transactions_to_cleanup() { |
77 | 0 | return transactions_to_cleanup_; |
78 | 0 | } |
79 | | |
80 | | void AddToSet(const TransactionId& transaction_id, TransactionIdSet* set); |
81 | | |
82 | | private: |
83 | | void CleanupTransactions(); |
84 | | |
85 | | std::string LogPrefix() const; |
86 | | |
87 | | Result<boost::optional<TransactionId>> FilterTransactionMetadata( |
88 | | const Slice& key, const Slice& existing_value); |
89 | | |
90 | | Result<rocksdb::FilterDecision> FilterExternalIntent(const Slice& key); |
91 | | |
92 | | tablet::Tablet* const tablet_; |
93 | | const MicrosTime compaction_start_time_; |
94 | | |
95 | | TransactionIdSet transactions_to_cleanup_; |
96 | | int rejected_transactions_ = 0; |
97 | | uint64_t num_errors_ = 0; |
98 | | |
99 | | // We use this to only log a message that the filter is being used once on the first call to |
100 | | // the Filter function. |
101 | | bool filter_usage_logged_ = false; |
102 | | }; |
103 | | |
// Early-exit helper for DocDBIntentsCompactionFilter member functions that return
// rocksdb::FilterDecision. When `result` holds an error, the error is logged (only while fewer
// than --intents_compaction_filter_max_errors_to_log errors have been seen), num_errors_ is
// incremented, and the enclosing function returns kKeep so that the record is preserved.
//
// The body is wrapped in do { ... } while (false) so that the invocation followed by ';' forms
// exactly one statement (safe inside an unbraced if/else), and every use of the argument is
// parenthesized so that compound expressions expand correctly.
#define MAYBE_LOG_ERROR_AND_RETURN_KEEP(result) \
  do { \
    if (!(result).ok()) { \
      if (num_errors_ < GetAtomicFlag(&FLAGS_intents_compaction_filter_max_errors_to_log)) { \
        LOG_WITH_PREFIX(ERROR) << StatusToString((result).status()); \
      } \
      num_errors_++; \
      return rocksdb::FilterDecision::kKeep; \
    } \
  } while (false)
113 | | |
114 | 0 | void DocDBIntentsCompactionFilter::CleanupTransactions() { |
115 | 0 | VLOG_WITH_PREFIX(3) << "DocDB intents compaction filter is being deleted"; |
116 | 0 | if (transactions_to_cleanup_.empty()) { |
117 | 0 | return; |
118 | 0 | } |
119 | 0 | TransactionStatusManager* manager = tablet_->transaction_participant(); |
120 | 0 | if (rejected_transactions_ > 0) { |
121 | 0 | LOG_WITH_PREFIX(WARNING) << "Number of aborted transactions not cleaned up " << |
122 | 0 | "on account of reaching size limits:" << rejected_transactions_; |
123 | 0 | } |
124 | 0 | manager->Cleanup(std::move(transactions_to_cleanup_)); |
125 | 0 | } |
126 | | |
127 | 0 | DocDBIntentsCompactionFilter::~DocDBIntentsCompactionFilter() { |
128 | 0 | } |
129 | | |
130 | | rocksdb::FilterDecision DocDBIntentsCompactionFilter::Filter( |
131 | | int level, const Slice& key, const Slice& existing_value, std::string* new_value, |
132 | 0 | bool* value_changed) { |
133 | 0 | if (!filter_usage_logged_) { |
134 | 0 | VLOG_WITH_PREFIX(3) << "DocDB intents compaction filter is being used for a compaction"; |
135 | 0 | filter_usage_logged_ = true; |
136 | 0 | } |
137 | |
|
138 | 0 | if (GetKeyType(key, StorageDbType::kIntents) == KeyType::kExternalIntents) { |
139 | 0 | auto filter_decision_result = FilterExternalIntent(key); |
140 | 0 | MAYBE_LOG_ERROR_AND_RETURN_KEEP(filter_decision_result); |
141 | 0 | return *filter_decision_result; |
142 | 0 | } |
143 | | |
144 | | // Find transaction metadata row. |
145 | 0 | if (GetKeyType(key, StorageDbType::kIntents) == KeyType::kTransactionMetadata) { |
146 | 0 | auto transaction_id_result = FilterTransactionMetadata(key, existing_value); |
147 | 0 | MAYBE_LOG_ERROR_AND_RETURN_KEEP(transaction_id_result); |
148 | 0 | auto transaction_id_optional = *transaction_id_result; |
149 | 0 | if (transaction_id_optional.has_value()) { |
150 | 0 | AddToSet(transaction_id_optional.value(), &transactions_to_cleanup_); |
151 | 0 | } |
152 | 0 | } |
153 | | |
154 | | // TODO(dtxn): If/when we add processing of reverse index or intents here - we will need to |
155 | | // respect key_bounds passed to constructor in order to ignore/delete non-relevant keys. As of |
156 | | // 06/19/2019, intents and reverse indexes are being deleted by docdb::PrepareApplyIntentsBatch. |
157 | 0 | return rocksdb::FilterDecision::kKeep; |
158 | 0 | } |
159 | | |
160 | | Result<boost::optional<TransactionId>> DocDBIntentsCompactionFilter::FilterTransactionMetadata( |
161 | 0 | const Slice& key, const Slice& existing_value) { |
162 | 0 | TransactionMetadataPB metadata_pb; |
163 | 0 | if (!metadata_pb.ParseFromArray( |
164 | 0 | existing_value.cdata(), narrow_cast<int>(existing_value.size()))) { |
165 | 0 | return STATUS(IllegalState, "Failed to parse transaction metadata"); |
166 | 0 | } |
167 | 0 | uint64_t write_time = metadata_pb.metadata_write_time(); |
168 | 0 | if (!write_time) { |
169 | 0 | write_time = HybridTime(metadata_pb.start_hybrid_time()).GetPhysicalValueMicros(); |
170 | 0 | } |
171 | 0 | if (compaction_start_time_ < write_time + FLAGS_aborted_intent_cleanup_ms * 1000) { |
172 | 0 | return boost::none; |
173 | 0 | } |
174 | 0 | Slice key_slice = key; |
175 | 0 | return VERIFY_RESULT_PREPEND( |
176 | 0 | DecodeTransactionIdFromIntentValue(&key_slice), "Could not decode Transaction metadata"); |
177 | 0 | } |
178 | | |
179 | | Result<rocksdb::FilterDecision> |
180 | 0 | DocDBIntentsCompactionFilter::FilterExternalIntent(const Slice& key) { |
181 | 0 | Slice key_slice = key; |
182 | 0 | RETURN_NOT_OK_PREPEND(key_slice.consume_byte(ValueTypeAsChar::kExternalTransactionId), |
183 | 0 | "Could not decode external transaction byte"); |
184 | | // Ignoring transaction id result since function just returns kKeep or kDiscard. |
185 | 0 | RETURN_NOT_OK_PREPEND( |
186 | 0 | DecodeTransactionId(&key_slice), "Could not decode external transaction id"); |
187 | 0 | auto doc_hybrid_time = VERIFY_RESULT_PREPEND( |
188 | 0 | DecodeInvertedDocHt(key_slice), "Could not decode hybrid time"); |
189 | 0 | auto write_time_micros = doc_hybrid_time.hybrid_time().GetPhysicalValueMicros(); |
190 | 0 | int64_t delta_micros = compaction_start_time_ - write_time_micros; |
191 | 0 | if (delta_micros > |
192 | 0 | GetAtomicFlag(&FLAGS_external_intent_cleanup_secs) * MonoTime::kMicrosecondsPerSecond) { |
193 | 0 | return rocksdb::FilterDecision::kDiscard; |
194 | 0 | } |
195 | 0 | return rocksdb::FilterDecision::kKeep; |
196 | 0 | } |
197 | | |
198 | 0 | void DocDBIntentsCompactionFilter::CompactionFinished() { |
199 | 0 | if (num_errors_ > 0) { |
200 | 0 | LOG_WITH_PREFIX(WARNING) << Format( |
201 | 0 | "Found $0 total errors during intents compaction filter.", num_errors_); |
202 | 0 | } |
203 | 0 | CleanupTransactions(); |
204 | 0 | } |
205 | | |
206 | | void DocDBIntentsCompactionFilter::AddToSet(const TransactionId& transaction_id, |
207 | 0 | TransactionIdSet* set) { |
208 | 0 | if (set->size() <= FLAGS_aborted_intent_cleanup_max_batch_size) { |
209 | 0 | set->insert(transaction_id); |
210 | 0 | } else { |
211 | 0 | rejected_transactions_++; |
212 | 0 | } |
213 | 0 | } |
214 | | |
215 | 0 | const char* DocDBIntentsCompactionFilter::Name() const { |
216 | 0 | return "DocDBIntentsCompactionFilter"; |
217 | 0 | } |
218 | | |
219 | 0 | std::string DocDBIntentsCompactionFilter::LogPrefix() const { |
220 | 0 | return Format("T $0: ", tablet_->tablet_id()); |
221 | 0 | } |
222 | | |
223 | | } // namespace |
224 | | |
225 | | // ------------------------------------------------------------------------------------------------ |
226 | | |
227 | | DocDBIntentsCompactionFilterFactory::DocDBIntentsCompactionFilterFactory( |
228 | | tablet::Tablet* tablet, const KeyBounds* key_bounds) |
229 | 107k | : tablet_(tablet), key_bounds_(key_bounds) {} |
230 | | |
231 | 101k | DocDBIntentsCompactionFilterFactory::~DocDBIntentsCompactionFilterFactory() {} |
232 | | |
233 | | std::unique_ptr<CompactionFilter> DocDBIntentsCompactionFilterFactory::CreateCompactionFilter( |
234 | 0 | const CompactionFilter::Context& context) { |
235 | 0 | return std::make_unique<DocDBIntentsCompactionFilter>(tablet_, key_bounds_); |
236 | 0 | } |
237 | | |
238 | 647k | const char* DocDBIntentsCompactionFilterFactory::Name() const { |
239 | 647k | return "DocDBIntentsCompactionFilterFactory"; |
240 | 647k | } |
241 | | |
242 | | } // namespace docdb |
243 | | } // namespace yb |