/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/ttl/db_ttl_impl.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | | // Use of this source code is governed by a BSD-style license that can be |
3 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
4 | | // |
5 | | // The following only applies to changes made to this file as part of YugaByte development. |
6 | | // |
7 | | // Portions Copyright (c) YugaByte, Inc. |
8 | | // |
9 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
10 | | // in compliance with the License. You may obtain a copy of the License at |
11 | | // |
12 | | // http://www.apache.org/licenses/LICENSE-2.0 |
13 | | // |
14 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
15 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
16 | | // or implied. See the License for the specific language governing permissions and limitations |
17 | | // under the License. |
18 | | // |
19 | | #ifndef YB_ROCKSDB_UTILITIES_TTL_DB_TTL_IMPL_H |
20 | | #define YB_ROCKSDB_UTILITIES_TTL_DB_TTL_IMPL_H |
21 | | |
22 | | #pragma once |
23 | | |
24 | | #ifndef ROCKSDB_LITE |
25 | | #include <deque> |
26 | | #include <string> |
27 | | #include <vector> |
28 | | |
29 | | #include "yb/rocksdb/db.h" |
30 | | #include "yb/rocksdb/env.h" |
31 | | #include "yb/rocksdb/compaction_filter.h" |
32 | | #include "yb/rocksdb/merge_operator.h" |
33 | | #include "yb/rocksdb/utilities/utility_db.h" |
34 | | #include "yb/rocksdb/utilities/db_ttl.h" |
35 | | #include "yb/rocksdb/db/db_impl.h" |
36 | | |
37 | | #ifdef _WIN32 |
38 | | // Windows API macro interference |
39 | | #undef GetCurrentTime |
40 | | #endif |
41 | | |
42 | | |
43 | | namespace rocksdb { |
44 | | |
45 | | class DBWithTTLImpl : public DBWithTTL { |
46 | | public: |
47 | | static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, |
48 | | Env* env); |
49 | | |
50 | | explicit DBWithTTLImpl(DB* db); |
51 | | |
52 | | virtual ~DBWithTTLImpl(); |
53 | | |
54 | | Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options, |
55 | | const std::string& column_family_name, |
56 | | ColumnFamilyHandle** handle, |
57 | | int ttl) override; |
58 | | |
59 | | Status CreateColumnFamily(const ColumnFamilyOptions& options, |
60 | | const std::string& column_family_name, |
61 | | ColumnFamilyHandle** handle) override; |
62 | | |
63 | | using StackableDB::Put; |
64 | | virtual Status Put(const WriteOptions& options, |
65 | | ColumnFamilyHandle* column_family, const Slice& key, |
66 | | const Slice& val) override; |
67 | | |
68 | | using StackableDB::Get; |
69 | | virtual Status Get(const ReadOptions& options, |
70 | | ColumnFamilyHandle* column_family, const Slice& key, |
71 | | std::string* value) override; |
72 | | |
73 | | using StackableDB::MultiGet; |
74 | | virtual std::vector<Status> MultiGet( |
75 | | const ReadOptions& options, |
76 | | const std::vector<ColumnFamilyHandle*>& column_family, |
77 | | const std::vector<Slice>& keys, |
78 | | std::vector<std::string>* values) override; |
79 | | |
80 | | using StackableDB::KeyMayExist; |
81 | | virtual bool KeyMayExist(const ReadOptions& options, |
82 | | ColumnFamilyHandle* column_family, const Slice& key, |
83 | | std::string* value, |
84 | | bool* value_found = nullptr) override; |
85 | | |
86 | | using StackableDB::Merge; |
87 | | virtual Status Merge(const WriteOptions& options, |
88 | | ColumnFamilyHandle* column_family, const Slice& key, |
89 | | const Slice& value) override; |
90 | | |
91 | | virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; |
92 | | |
93 | | using StackableDB::NewIterator; |
94 | | virtual Iterator* NewIterator(const ReadOptions& opts, |
95 | | ColumnFamilyHandle* column_family) override; |
96 | | |
97 | 0 | virtual DB* GetBaseDB() override { return db_; } |
98 | | |
99 | | static bool IsStale(const Slice& value, int32_t ttl, Env* env); |
100 | | |
101 | | static Status AppendTS(const Slice& val, std::string* val_with_ts, Env* env); |
102 | | |
103 | | static Status SanityCheckTimestamp(const Slice& str); |
104 | | |
105 | | static Status StripTS(std::string* str); |
106 | | |
107 | | static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp |
108 | | |
109 | | static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 |
110 | | |
111 | | static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8 |
112 | | }; |
113 | | |
114 | | class TtlIterator : public Iterator { |
115 | | |
116 | | public: |
117 | 12 | explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); } |
118 | | |
119 | 12 | ~TtlIterator() { delete iter_; } |
120 | | |
121 | 135 | bool Valid() const override { return iter_->Valid(); } |
122 | | |
123 | 7 | void SeekToFirst() override { iter_->SeekToFirst(); } |
124 | | |
125 | 0 | void SeekToLast() override { iter_->SeekToLast(); } |
126 | | |
127 | 8 | void Seek(const Slice& target) override { iter_->Seek(target); } |
128 | | |
129 | 121 | void Next() override { iter_->Next(); } |
130 | | |
131 | 0 | void Prev() override { iter_->Prev(); } |
132 | | |
133 | 11 | Slice key() const override { return iter_->key(); } |
134 | | |
135 | 0 | int32_t timestamp() const { |
136 | 0 | return DecodeFixed32(iter_->value().data() + iter_->value().size() - |
137 | 0 | DBWithTTLImpl::kTSLength); |
138 | 0 | } |
139 | | |
140 | 121 | Slice value() const override { |
141 | | // TODO: handle timestamp corruption like in general iterator semantics |
142 | 121 | assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok()); |
143 | 121 | Slice trimmed_value = iter_->value(); |
144 | 121 | trimmed_value.remove_suffix(DBWithTTLImpl::kTSLength); |
145 | 121 | return trimmed_value; |
146 | 121 | } |
147 | | |
148 | 7 | Status status() const override { return iter_->status(); } |
149 | | |
150 | | private: |
151 | | Iterator* iter_; |
152 | | }; |
153 | | |
154 | | class TtlCompactionFilter : public CompactionFilter { |
155 | | public: |
156 | | TtlCompactionFilter( |
157 | | int32_t ttl, Env* env, CompactionFilter* user_comp_filter, |
158 | | std::unique_ptr<CompactionFilter> user_comp_filter_from_factory = nullptr) |
159 | | : ttl_(ttl), |
160 | | env_(env), |
161 | | user_comp_filter_(user_comp_filter), |
162 | | user_comp_filter_from_factory_( |
163 | 32 | std::move(user_comp_filter_from_factory)) { |
164 | | // Unlike the merge operator, compaction filter is necessary for TTL, hence |
165 | | // this would be called even if user doesn't specify any compaction-filter |
166 | 32 | if (!user_comp_filter_) { |
167 | 32 | user_comp_filter_ = user_comp_filter_from_factory_.get(); |
168 | 32 | } |
169 | 32 | } |
170 | | |
171 | | FilterDecision Filter(int level, const Slice& key, const Slice& old_val, |
172 | 2.33k | std::string* new_val, bool* value_changed) override { |
173 | 2.33k | if (DBWithTTLImpl::IsStale(old_val, ttl_, env_)) { |
174 | 707 | return FilterDecision::kDiscard; |
175 | 707 | } |
176 | 1.62k | if (user_comp_filter_ == nullptr) { |
177 | 1.38k | return FilterDecision::kKeep; |
178 | 1.38k | } |
179 | 237 | assert(old_val.size() >= DBWithTTLImpl::kTSLength); |
180 | 237 | Slice old_val_without_ts(old_val.data(), |
181 | 237 | old_val.size() - DBWithTTLImpl::kTSLength); |
182 | 237 | if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val, |
183 | 33 | value_changed) != FilterDecision::kKeep) { |
184 | 33 | return FilterDecision::kDiscard; |
185 | 33 | } |
186 | 204 | if (*value_changed) { |
187 | 102 | new_val->append(old_val.cend() - DBWithTTLImpl::kTSLength, DBWithTTLImpl::kTSLength); |
188 | 102 | } |
189 | 204 | return FilterDecision::kKeep; |
190 | 204 | } |
191 | | |
192 | 0 | virtual const char* Name() const override { return "Delete By TTL"; } |
193 | | |
194 | | private: |
195 | | int32_t ttl_; |
196 | | Env* env_; |
197 | | CompactionFilter* user_comp_filter_; |
198 | | std::unique_ptr<CompactionFilter> user_comp_filter_from_factory_; |
199 | | }; |
200 | | |
201 | | class TtlCompactionFilterFactory : public CompactionFilterFactory { |
202 | | public: |
203 | | TtlCompactionFilterFactory( |
204 | | int32_t ttl, Env* env, |
205 | | std::shared_ptr<CompactionFilterFactory> comp_filter_factory) |
206 | 39 | : ttl_(ttl), env_(env), user_comp_filter_factory_(comp_filter_factory) {} |
207 | | |
208 | | std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
209 | 32 | const CompactionFilter::Context& context) override { |
210 | 32 | std::unique_ptr<CompactionFilter> user_comp_filter_from_factory; |
211 | 32 | if (user_comp_filter_factory_) { |
212 | 4 | user_comp_filter_from_factory = |
213 | 4 | user_comp_filter_factory_->CreateCompactionFilter(context); |
214 | 4 | } |
215 | | |
216 | 32 | return std::make_unique<TtlCompactionFilter>( |
217 | 32 | ttl_, env_, nullptr, std::move(user_comp_filter_from_factory)); |
218 | 32 | } |
219 | | |
220 | 119 | virtual const char* Name() const override { |
221 | 119 | return "TtlCompactionFilterFactory"; |
222 | 119 | } |
223 | | |
224 | | private: |
225 | | int32_t ttl_; |
226 | | Env* env_; |
227 | | std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_; |
228 | | }; |
229 | | |
230 | | class TtlMergeOperator : public MergeOperator { |
231 | | |
232 | | public: |
233 | | explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op, |
234 | | Env* env) |
235 | 15 | : user_merge_op_(merge_op), env_(env) { |
236 | 15 | assert(merge_op); |
237 | 15 | assert(env); |
238 | 15 | } |
239 | | |
240 | | virtual bool FullMerge(const Slice& key, const Slice* existing_value, |
241 | | const std::deque<std::string>& operands, |
242 | | std::string* new_value, Logger* logger) const |
243 | 586 | override { |
244 | 586 | constexpr uint32_t kTsLen = DBWithTTLImpl::kTSLength; |
245 | 586 | if (existing_value && existing_value->size() < kTsLen) { |
246 | 0 | RLOG(InfoLogLevel::ERROR_LEVEL, logger, |
247 | 0 | "Error: Could not remove timestamp from existing value."); |
248 | 0 | return false; |
249 | 0 | } |
250 | | |
251 | | // Extract time-stamp from each operand to be passed to user_merge_op_ |
252 | 586 | std::deque<std::string> operands_without_ts; |
253 | 22.0k | for (const auto& operand : operands) { |
254 | 22.0k | if (operand.size() < kTsLen) { |
255 | 0 | RLOG(InfoLogLevel::ERROR_LEVEL, logger, |
256 | 0 | "Error: Could not remove timestamp from operand value."); |
257 | 0 | return false; |
258 | 0 | } |
259 | 22.0k | operands_without_ts.push_back(operand.substr(0, operand.size() - kTsLen)); |
260 | 22.0k | } |
261 | | |
262 | | // Apply the user merge operator (store result in *new_value) |
263 | 586 | bool good = true; |
264 | 586 | if (existing_value) { |
265 | 35 | Slice existing_value_without_ts(existing_value->data(), |
266 | 35 | existing_value->size() - kTsLen); |
267 | 35 | good = user_merge_op_->FullMerge(key, &existing_value_without_ts, |
268 | 35 | operands_without_ts, new_value, logger); |
269 | 551 | } else { |
270 | 551 | good = user_merge_op_->FullMerge(key, nullptr, operands_without_ts, |
271 | 551 | new_value, logger); |
272 | 551 | } |
273 | | |
274 | | // Return false if the user merge operator returned false |
275 | 586 | if (!good) { |
276 | 0 | return false; |
277 | 0 | } |
278 | | |
279 | | // Augment the *new_value with the ttl time-stamp |
280 | 586 | int64_t curtime; |
281 | 586 | if (!env_->GetCurrentTime(&curtime).ok()) { |
282 | 0 | RLOG(InfoLogLevel::ERROR_LEVEL, logger, |
283 | 0 | "Error: Could not get current time to be attached internally " |
284 | 0 | "to the new value."); |
285 | 0 | return false; |
286 | 586 | } else { |
287 | 586 | char ts_string[kTsLen]; |
288 | 586 | EncodeFixed32(ts_string, static_cast<uint32_t>(curtime)); |
289 | 586 | new_value->append(ts_string, kTsLen); |
290 | 586 | return true; |
291 | 586 | } |
292 | 586 | } |
293 | | |
294 | | virtual bool PartialMergeMulti(const Slice& key, |
295 | | const std::deque<Slice>& operand_list, |
296 | | std::string* new_value, Logger* logger) const |
297 | 14 | override { |
298 | 14 | constexpr uint32_t kTsLen = DBWithTTLImpl::kTSLength; |
299 | 14 | std::deque<Slice> operands_without_ts; |
300 | | |
301 | 39 | for (const auto& operand : operand_list) { |
302 | 39 | if (operand.size() < kTsLen) { |
303 | 0 | RLOG(InfoLogLevel::ERROR_LEVEL, logger, |
304 | 0 | "Error: Could not remove timestamp from value."); |
305 | 0 | return false; |
306 | 0 | } |
307 | | |
308 | 39 | operands_without_ts.push_back( |
309 | 39 | Slice(operand.data(), operand.size() - kTsLen)); |
310 | 39 | } |
311 | | |
312 | | // Apply the user partial-merge operator (store result in *new_value) |
313 | 14 | assert(new_value); |
314 | 14 | if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, |
315 | 14 | logger)) { |
316 | 14 | return false; |
317 | 14 | } |
318 | | |
319 | | // Augment the *new_value with the ttl time-stamp |
320 | 0 | int64_t curtime; |
321 | 0 | if (!env_->GetCurrentTime(&curtime).ok()) { |
322 | 0 | RLOG(InfoLogLevel::ERROR_LEVEL, logger, |
323 | 0 | "Error: Could not get current time to be attached internally " |
324 | 0 | "to the new value."); |
325 | 0 | return false; |
326 | 0 | } else { |
327 | 0 | char ts_string[kTsLen]; |
328 | 0 | EncodeFixed32(ts_string, static_cast<uint32_t>(curtime)); |
329 | 0 | new_value->append(ts_string, kTsLen); |
330 | 0 | return true; |
331 | 0 | } |
332 | 0 | } |
333 | | |
334 | 45 | virtual const char* Name() const override { return "Merge By TTL"; } |
335 | | |
336 | | private: |
337 | | std::shared_ptr<MergeOperator> user_merge_op_; |
338 | | Env* env_; |
339 | | }; |
340 | | } // namespace rocksdb |
341 | | #endif // ROCKSDB_LITE |
342 | | |
343 | | #endif // YB_ROCKSDB_UTILITIES_TTL_DB_TTL_IMPL_H |