/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | | // Use of this source code is governed by a BSD-style license that can be |
3 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
4 | | // |
5 | | // The following only applies to changes made to this file as part of YugaByte development. |
6 | | // |
7 | | // Portions Copyright (c) YugaByte, Inc. |
8 | | // |
9 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
10 | | // in compliance with the License. You may obtain a copy of the License at |
11 | | // |
12 | | // http://www.apache.org/licenses/LICENSE-2.0 |
13 | | // |
14 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
15 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
16 | | // or implied. See the License for the specific language governing permissions and limitations |
17 | | // under the License. |
18 | | // |
19 | | #ifndef ROCKSDB_LITE |
20 | | |
21 | | #include "yb/rocksdb/utilities/ttl/db_ttl_impl.h" |
22 | | |
23 | | #include "yb/rocksdb/convenience.h" |
24 | | #include "yb/rocksdb/env.h" |
25 | | #include "yb/rocksdb/iterator.h" |
26 | | #include "yb/rocksdb/util/coding.h" |
27 | | |
28 | | namespace rocksdb { |
29 | | |
30 | | void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, |
31 | 39 | Env* env) { |
32 | 39 | if (options->compaction_filter) { |
33 | 0 | options->compaction_filter = |
34 | 0 | new TtlCompactionFilter(ttl, env, options->compaction_filter); |
35 | 39 | } else { |
36 | 39 | options->compaction_filter_factory = |
37 | 39 | std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory( |
38 | 39 | ttl, env, options->compaction_filter_factory)); |
39 | 39 | } |
40 | | |
41 | 39 | if (options->merge_operator) { |
42 | 15 | options->merge_operator.reset( |
43 | 15 | new TtlMergeOperator(options->merge_operator, env)); |
44 | 15 | } |
45 | 39 | } |
46 | | |
47 | | // Open the db inside DBWithTTLImpl because options needs pointer to its ttl |
48 | 37 | DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {} |
49 | | |
50 | 37 | DBWithTTLImpl::~DBWithTTLImpl() { |
51 | | // Need to stop background compaction before getting rid of the filter |
52 | 37 | CancelAllBackgroundWork(db_, /* wait = */ true); |
53 | 37 | delete GetOptions().compaction_filter; |
54 | 37 | } |
55 | | |
56 | | Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname, |
57 | 0 | StackableDB** dbptr, int32_t ttl, bool read_only) { |
58 | 0 | DBWithTTL* db; |
59 | 0 | Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only); |
60 | 0 | if (s.ok()) { |
61 | 0 | *dbptr = db; |
62 | 0 | } else { |
63 | 0 | *dbptr = nullptr; |
64 | 0 | } |
65 | 0 | return s; |
66 | 0 | } |
67 | | |
68 | | Status DBWithTTL::Open(const Options& options, const std::string& dbname, |
69 | 36 | DBWithTTL** dbptr, int32_t ttl, bool read_only) { |
70 | | |
71 | 36 | DBOptions db_options(options); |
72 | 36 | ColumnFamilyOptions cf_options(options); |
73 | 36 | std::vector<ColumnFamilyDescriptor> column_families; |
74 | 36 | column_families.push_back( |
75 | 36 | ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); |
76 | 36 | std::vector<ColumnFamilyHandle*> handles; |
77 | 36 | Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles, |
78 | 36 | dbptr, {ttl}, read_only); |
79 | 36 | if (s.ok()) { |
80 | 36 | assert(handles.size() == 1); |
81 | | // i can delete the handle since DBImpl is always holding a reference to |
82 | | // default column family |
83 | 0 | delete handles[0]; |
84 | 36 | } |
85 | 0 | return s; |
86 | 36 | } |
87 | | |
88 | | Status DBWithTTL::Open( |
89 | | const DBOptions& db_options, const std::string& dbname, |
90 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
91 | | std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr, |
92 | 37 | std::vector<int32_t> ttls, bool read_only) { |
93 | | |
94 | 37 | if (ttls.size() != column_families.size()) { |
95 | 0 | return STATUS(InvalidArgument, |
96 | 0 | "ttls size has to be the same as number of column families"); |
97 | 0 | } |
98 | | |
99 | 37 | std::vector<ColumnFamilyDescriptor> column_families_sanitized = |
100 | 37 | column_families; |
101 | 75 | for (size_t i = 0; i < column_families_sanitized.size(); ++i38 ) { |
102 | 38 | DBWithTTLImpl::SanitizeOptions( |
103 | 38 | ttls[i], &column_families_sanitized[i].options, |
104 | 38 | db_options.env == nullptr ? Env::Default()0 : db_options.env); |
105 | 38 | } |
106 | 37 | DB* db; |
107 | | |
108 | 37 | Status st; |
109 | 37 | if (read_only) { |
110 | 1 | st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized, |
111 | 1 | handles, &db); |
112 | 36 | } else { |
113 | 36 | st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db); |
114 | 36 | } |
115 | 37 | if (st.ok()) { |
116 | 37 | *dbptr = new DBWithTTLImpl(db); |
117 | 37 | } else { |
118 | 0 | *dbptr = nullptr; |
119 | 0 | } |
120 | 37 | return st; |
121 | 37 | } |
122 | | |
123 | | Status DBWithTTLImpl::CreateColumnFamilyWithTtl( |
124 | | const ColumnFamilyOptions& options, const std::string& column_family_name, |
125 | 1 | ColumnFamilyHandle** handle, int ttl) { |
126 | 1 | ColumnFamilyOptions sanitized_options = options; |
127 | 1 | DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv()); |
128 | | |
129 | 1 | return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name, |
130 | 1 | handle); |
131 | 1 | } |
132 | | |
133 | | Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options, |
134 | | const std::string& column_family_name, |
135 | 0 | ColumnFamilyHandle** handle) { |
136 | 0 | return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0); |
137 | 0 | } |
138 | | |
139 | | // Appends the current timestamp to the string. |
140 | | // Returns false if could not get the current_time, true if append succeeds |
141 | | Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts, |
142 | 2.66k | Env* env) { |
143 | 2.66k | val_with_ts->reserve(kTSLength + val.size()); |
144 | 2.66k | char ts_string[kTSLength]; |
145 | 2.66k | int64_t curtime; |
146 | 2.66k | Status st = env->GetCurrentTime(&curtime); |
147 | 2.66k | if (!st.ok()) { |
148 | 0 | return st; |
149 | 0 | } |
150 | 2.66k | EncodeFixed32(ts_string, (int32_t)curtime); |
151 | 2.66k | val_with_ts->append(val.cdata(), val.size()); |
152 | 2.66k | val_with_ts->append(ts_string, kTSLength); |
153 | 2.66k | return st; |
154 | 2.66k | } |
155 | | |
156 | | // Returns corruption if the length of the string is lesser than timestamp, or |
157 | | // timestamp refers to a time lesser than ttl-feature release time |
158 | 2.14k | Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { |
159 | 2.14k | if (str.size() < kTSLength) { |
160 | 0 | return STATUS(Corruption, "Error: value's length less than timestamp's\n"); |
161 | 0 | } |
162 | | // Checks that TS is not lesser than kMinTimestamp |
163 | | // Gaurds against corruption & normal database opened incorrectly in ttl mode |
164 | 2.14k | int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength); |
165 | 2.14k | if (timestamp_value < kMinTimestamp) { |
166 | 0 | return STATUS(Corruption, "Error: Timestamp < ttl feature release time!\n"); |
167 | 0 | } |
168 | 2.14k | return Status::OK(); |
169 | 2.14k | } |
170 | | |
171 | | // Checks if the string is stale or not according to TTl provided |
172 | 2.33k | bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) { |
173 | 2.33k | if (ttl <= 0) { // Data is fresh if TTL is non-positive |
174 | 372 | return false; |
175 | 372 | } |
176 | 1.95k | int64_t curtime; |
177 | 1.95k | if (!env->GetCurrentTime(&curtime).ok()) { |
178 | 0 | return false; // Treat the data as fresh if could not get current time |
179 | 0 | } |
180 | 1.95k | int32_t timestamp_value = |
181 | 1.95k | DecodeFixed32(value.data() + value.size() - kTSLength); |
182 | 1.95k | return (timestamp_value + ttl) < curtime; |
183 | 1.95k | } |
184 | | |
185 | | // Strips the TS from the end of the string |
186 | 2.02k | Status DBWithTTLImpl::StripTS(std::string* str) { |
187 | 2.02k | Status st; |
188 | 2.02k | if (str->length() < kTSLength) { |
189 | 0 | return STATUS(Corruption, "Bad timestamp in key-value"); |
190 | 0 | } |
191 | | // Erasing characters which hold the TS |
192 | 2.02k | str->erase(str->length() - kTSLength, kTSLength); |
193 | 2.02k | return st; |
194 | 2.02k | } |
195 | | |
196 | | Status DBWithTTLImpl::Put(const WriteOptions& options, |
197 | | ColumnFamilyHandle* column_family, const Slice& key, |
198 | 1.87k | const Slice& val) { |
199 | 1.87k | WriteBatch batch; |
200 | 1.87k | batch.Put(column_family, key, val); |
201 | 1.87k | return Write(options, &batch); |
202 | 1.87k | } |
203 | | |
204 | | Status DBWithTTLImpl::Get(const ReadOptions& options, |
205 | | ColumnFamilyHandle* column_family, const Slice& key, |
206 | 2.71k | std::string* value) { |
207 | 2.71k | Status st = db_->Get(options, column_family, key, value); |
208 | 2.71k | if (!st.ok()) { |
209 | 896 | return st; |
210 | 896 | } |
211 | 1.82k | st = SanityCheckTimestamp(*value); |
212 | 1.82k | if (!st.ok()) { |
213 | 0 | return st; |
214 | 0 | } |
215 | 1.82k | return StripTS(value); |
216 | 1.82k | } |
217 | | |
218 | | std::vector<Status> DBWithTTLImpl::MultiGet( |
219 | | const ReadOptions& options, |
220 | | const std::vector<ColumnFamilyHandle*>& column_family, |
221 | 1 | const std::vector<Slice>& keys, std::vector<std::string>* values) { |
222 | 1 | auto statuses = db_->MultiGet(options, column_family, keys, values); |
223 | 101 | for (size_t i = 0; i < keys.size(); ++i100 ) { |
224 | 100 | if (!statuses[i].ok()) { |
225 | 0 | continue; |
226 | 0 | } |
227 | 100 | statuses[i] = SanityCheckTimestamp((*values)[i]); |
228 | 100 | if (!statuses[i].ok()) { |
229 | 0 | continue; |
230 | 0 | } |
231 | 100 | statuses[i] = StripTS(&(*values)[i]); |
232 | 100 | } |
233 | 1 | return statuses; |
234 | 1 | } |
235 | | |
236 | | bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options, |
237 | | ColumnFamilyHandle* column_family, |
238 | | const Slice& key, std::string* value, |
239 | 100 | bool* value_found) { |
240 | 100 | bool ret = db_->KeyMayExist(options, column_family, key, value, value_found); |
241 | 100 | if (ret && value != nullptr && value_found != nullptr && *value_found) { |
242 | 100 | if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) { |
243 | 0 | return false; |
244 | 0 | } |
245 | 100 | } |
246 | 100 | return ret; |
247 | 100 | } |
248 | | |
249 | | Status DBWithTTLImpl::Merge(const WriteOptions& options, |
250 | | ColumnFamilyHandle* column_family, const Slice& key, |
251 | 683 | const Slice& value) { |
252 | 683 | WriteBatch batch; |
253 | 683 | batch.Merge(column_family, key, value); |
254 | 683 | return Write(options, &batch); |
255 | 683 | } |
256 | | |
257 | 2.55k | Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { |
258 | 2.55k | class Handler : public WriteBatch::Handler { |
259 | 2.55k | public: |
260 | 2.55k | explicit Handler(Env* env) : env_(env) {} |
261 | 2.55k | WriteBatch updates_ttl; |
262 | 2.55k | Status batch_rewrite_status; |
263 | | |
264 | 2.55k | Status PutCF( |
265 | 2.55k | uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override { |
266 | 1.97k | std::string value_with_ts; |
267 | 1.97k | Status st = AppendTS(value.TheOnlyPart(), &value_with_ts, env_); |
268 | 1.97k | if (!st.ok()) { |
269 | 0 | batch_rewrite_status = st; |
270 | 1.97k | } else { |
271 | 1.97k | WriteBatchInternal::Put(&updates_ttl, column_family_id, key.TheOnlyPart(), |
272 | 1.97k | value_with_ts); |
273 | 1.97k | } |
274 | 1.97k | return Status::OK(); |
275 | 1.97k | } |
276 | 2.55k | virtual Status MergeCF(uint32_t column_family_id, const Slice& key, |
277 | 2.55k | const Slice& value) override { |
278 | 690 | std::string value_with_ts; |
279 | 690 | Status st = AppendTS(value, &value_with_ts, env_); |
280 | 690 | if (!st.ok()) { |
281 | 0 | batch_rewrite_status = st; |
282 | 690 | } else { |
283 | 690 | WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, |
284 | 690 | value_with_ts); |
285 | 690 | } |
286 | 690 | return Status::OK(); |
287 | 690 | } |
288 | 2.55k | virtual Status DeleteCF(uint32_t column_family_id, |
289 | 2.55k | const Slice& key) override { |
290 | 50 | WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); |
291 | 50 | return Status::OK(); |
292 | 50 | } |
293 | 2.55k | void LogData(const Slice& blob) override { |
294 | 0 | updates_ttl.PutLogData(blob); |
295 | 0 | } |
296 | | |
297 | 2.55k | private: |
298 | 2.55k | Env* env_; |
299 | 2.55k | }; |
300 | 2.55k | Handler handler(GetEnv()); |
301 | 2.55k | RETURN_NOT_OK(updates->Iterate(&handler)); |
302 | 2.55k | if (!handler.batch_rewrite_status.ok()) { |
303 | 0 | return handler.batch_rewrite_status; |
304 | 2.55k | } else { |
305 | 2.55k | return db_->Write(opts, &(handler.updates_ttl)); |
306 | 2.55k | } |
307 | 2.55k | } |
308 | | |
309 | | Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts, |
310 | 12 | ColumnFamilyHandle* column_family) { |
311 | 12 | return new TtlIterator(db_->NewIterator(opts, column_family)); |
312 | 12 | } |
313 | | |
314 | | } // namespace rocksdb |
315 | | #endif // ROCKSDB_LITE |