YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/utilities/ttl/db_ttl_impl.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
//
5
// The following only applies to changes made to this file as part of YugaByte development.
6
//
7
// Portions Copyright (c) YugaByte, Inc.
8
//
9
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
10
// in compliance with the License.  You may obtain a copy of the License at
11
//
12
// http://www.apache.org/licenses/LICENSE-2.0
13
//
14
// Unless required by applicable law or agreed to in writing, software distributed under the License
15
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
16
// or implied.  See the License for the specific language governing permissions and limitations
17
// under the License.
18
//
19
#ifndef ROCKSDB_LITE
20
21
#include "yb/rocksdb/utilities/ttl/db_ttl_impl.h"
22
23
#include "yb/rocksdb/convenience.h"
24
#include "yb/rocksdb/env.h"
25
#include "yb/rocksdb/iterator.h"
26
#include "yb/rocksdb/util/coding.h"
27
28
namespace rocksdb {
29
30
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
31
39
                                    Env* env) {
32
39
  if (options->compaction_filter) {
33
0
    options->compaction_filter =
34
0
        new TtlCompactionFilter(ttl, env, options->compaction_filter);
35
39
  } else {
36
39
    options->compaction_filter_factory =
37
39
        std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
38
39
            ttl, env, options->compaction_filter_factory));
39
39
  }
40
41
39
  if (options->merge_operator) {
42
15
    options->merge_operator.reset(
43
15
        new TtlMergeOperator(options->merge_operator, env));
44
15
  }
45
39
}
46
47
// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
48
37
DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {}
49
50
37
DBWithTTLImpl::~DBWithTTLImpl() {
51
  // Need to stop background compaction before getting rid of the filter
52
37
  CancelAllBackgroundWork(db_, /* wait = */ true);
53
37
  delete GetOptions().compaction_filter;
54
37
}
55
56
Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
57
0
                            StackableDB** dbptr, int32_t ttl, bool read_only) {
58
0
  DBWithTTL* db;
59
0
  Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
60
0
  if (s.ok()) {
61
0
    *dbptr = db;
62
0
  } else {
63
0
    *dbptr = nullptr;
64
0
  }
65
0
  return s;
66
0
}
67
68
Status DBWithTTL::Open(const Options& options, const std::string& dbname,
69
36
                       DBWithTTL** dbptr, int32_t ttl, bool read_only) {
70
71
36
  DBOptions db_options(options);
72
36
  ColumnFamilyOptions cf_options(options);
73
36
  std::vector<ColumnFamilyDescriptor> column_families;
74
36
  column_families.push_back(
75
36
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
76
36
  std::vector<ColumnFamilyHandle*> handles;
77
36
  Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
78
36
                             dbptr, {ttl}, read_only);
79
36
  if (s.ok()) {
80
36
    assert(handles.size() == 1);
81
    // It is safe to delete the handle since DBImpl is always holding a reference to the
82
    // default column family
83
0
    delete handles[0];
84
36
  }
85
0
  return s;
86
36
}
87
88
Status DBWithTTL::Open(
89
    const DBOptions& db_options, const std::string& dbname,
90
    const std::vector<ColumnFamilyDescriptor>& column_families,
91
    std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
92
37
    std::vector<int32_t> ttls, bool read_only) {
93
94
37
  if (ttls.size() != column_families.size()) {
95
0
    return STATUS(InvalidArgument,
96
0
        "ttls size has to be the same as number of column families");
97
0
  }
98
99
37
  std::vector<ColumnFamilyDescriptor> column_families_sanitized =
100
37
      column_families;
101
75
  for (size_t i = 0; i < column_families_sanitized.size(); 
++i38
) {
102
38
    DBWithTTLImpl::SanitizeOptions(
103
38
        ttls[i], &column_families_sanitized[i].options,
104
38
        db_options.env == nullptr ? 
Env::Default()0
: db_options.env);
105
38
  }
106
37
  DB* db;
107
108
37
  Status st;
109
37
  if (read_only) {
110
1
    st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
111
1
                             handles, &db);
112
36
  } else {
113
36
    st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
114
36
  }
115
37
  if (st.ok()) {
116
37
    *dbptr = new DBWithTTLImpl(db);
117
37
  } else {
118
0
    *dbptr = nullptr;
119
0
  }
120
37
  return st;
121
37
}
122
123
Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
124
    const ColumnFamilyOptions& options, const std::string& column_family_name,
125
1
    ColumnFamilyHandle** handle, int ttl) {
126
1
  ColumnFamilyOptions sanitized_options = options;
127
1
  DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv());
128
129
1
  return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
130
1
                                       handle);
131
1
}
132
133
Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
134
                                         const std::string& column_family_name,
135
0
                                         ColumnFamilyHandle** handle) {
136
0
  return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
137
0
}
138
139
// Appends the current timestamp to the string.
140
// Returns the failing Status if the current time could not be obtained, OK if the append succeeds
141
Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
142
2.66k
                               Env* env) {
143
2.66k
  val_with_ts->reserve(kTSLength + val.size());
144
2.66k
  char ts_string[kTSLength];
145
2.66k
  int64_t curtime;
146
2.66k
  Status st = env->GetCurrentTime(&curtime);
147
2.66k
  if (!st.ok()) {
148
0
    return st;
149
0
  }
150
2.66k
  EncodeFixed32(ts_string, (int32_t)curtime);
151
2.66k
  val_with_ts->append(val.cdata(), val.size());
152
2.66k
  val_with_ts->append(ts_string, kTSLength);
153
2.66k
  return st;
154
2.66k
}
155
156
// Returns corruption if the length of the string is less than the timestamp length, or
157
// timestamp refers to a time earlier than the ttl-feature release time
158
2.14k
Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
159
2.14k
  if (str.size() < kTSLength) {
160
0
    return STATUS(Corruption, "Error: value's length less than timestamp's\n");
161
0
  }
162
  // Checks that TS is not less than kMinTimestamp
163
  // Guards against corruption and a normal database opened incorrectly in ttl mode
164
2.14k
  int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
165
2.14k
  if (timestamp_value < kMinTimestamp) {
166
0
    return STATUS(Corruption, "Error: Timestamp < ttl feature release time!\n");
167
0
  }
168
2.14k
  return Status::OK();
169
2.14k
}
170
171
// Checks if the string is stale or not according to the TTL provided
172
2.33k
bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
173
2.33k
  if (ttl <= 0) {  // Data is fresh if TTL is non-positive
174
372
    return false;
175
372
  }
176
1.95k
  int64_t curtime;
177
1.95k
  if (!env->GetCurrentTime(&curtime).ok()) {
178
0
    return false;  // Treat the data as fresh if could not get current time
179
0
  }
180
1.95k
  int32_t timestamp_value =
181
1.95k
      DecodeFixed32(value.data() + value.size() - kTSLength);
182
1.95k
  return (timestamp_value + ttl) < curtime;
183
1.95k
}
184
185
// Strips the TS from the end of the string
186
2.02k
Status DBWithTTLImpl::StripTS(std::string* str) {
187
2.02k
  Status st;
188
2.02k
  if (str->length() < kTSLength) {
189
0
    return STATUS(Corruption, "Bad timestamp in key-value");
190
0
  }
191
  // Erasing characters which hold the TS
192
2.02k
  str->erase(str->length() - kTSLength, kTSLength);
193
2.02k
  return st;
194
2.02k
}
195
196
Status DBWithTTLImpl::Put(const WriteOptions& options,
197
                          ColumnFamilyHandle* column_family, const Slice& key,
198
1.87k
                          const Slice& val) {
199
1.87k
  WriteBatch batch;
200
1.87k
  batch.Put(column_family, key, val);
201
1.87k
  return Write(options, &batch);
202
1.87k
}
203
204
Status DBWithTTLImpl::Get(const ReadOptions& options,
205
                          ColumnFamilyHandle* column_family, const Slice& key,
206
2.71k
                          std::string* value) {
207
2.71k
  Status st = db_->Get(options, column_family, key, value);
208
2.71k
  if (!st.ok()) {
209
896
    return st;
210
896
  }
211
1.82k
  st = SanityCheckTimestamp(*value);
212
1.82k
  if (!st.ok()) {
213
0
    return st;
214
0
  }
215
1.82k
  return StripTS(value);
216
1.82k
}
217
218
std::vector<Status> DBWithTTLImpl::MultiGet(
219
    const ReadOptions& options,
220
    const std::vector<ColumnFamilyHandle*>& column_family,
221
1
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
222
1
  auto statuses = db_->MultiGet(options, column_family, keys, values);
223
101
  for (size_t i = 0; i < keys.size(); 
++i100
) {
224
100
    if (!statuses[i].ok()) {
225
0
      continue;
226
0
    }
227
100
    statuses[i] = SanityCheckTimestamp((*values)[i]);
228
100
    if (!statuses[i].ok()) {
229
0
      continue;
230
0
    }
231
100
    statuses[i] = StripTS(&(*values)[i]);
232
100
  }
233
1
  return statuses;
234
1
}
235
236
bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
237
                                ColumnFamilyHandle* column_family,
238
                                const Slice& key, std::string* value,
239
100
                                bool* value_found) {
240
100
  bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
241
100
  if (ret && value != nullptr && value_found != nullptr && *value_found) {
242
100
    if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
243
0
      return false;
244
0
    }
245
100
  }
246
100
  return ret;
247
100
}
248
249
Status DBWithTTLImpl::Merge(const WriteOptions& options,
250
                            ColumnFamilyHandle* column_family, const Slice& key,
251
683
                            const Slice& value) {
252
683
  WriteBatch batch;
253
683
  batch.Merge(column_family, key, value);
254
683
  return Write(options, &batch);
255
683
}
256
257
2.55k
Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
258
2.55k
  class Handler : public WriteBatch::Handler {
259
2.55k
   public:
260
2.55k
    explicit Handler(Env* env) : env_(env) {}
261
2.55k
    WriteBatch updates_ttl;
262
2.55k
    Status batch_rewrite_status;
263
264
2.55k
    Status PutCF(
265
2.55k
        uint32_t column_family_id, const SliceParts& key, const SliceParts& value) override {
266
1.97k
      std::string value_with_ts;
267
1.97k
      Status st = AppendTS(value.TheOnlyPart(), &value_with_ts, env_);
268
1.97k
      if (!st.ok()) {
269
0
        batch_rewrite_status = st;
270
1.97k
      } else {
271
1.97k
        WriteBatchInternal::Put(&updates_ttl, column_family_id, key.TheOnlyPart(),
272
1.97k
                                value_with_ts);
273
1.97k
      }
274
1.97k
      return Status::OK();
275
1.97k
    }
276
2.55k
    virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
277
2.55k
                           const Slice& value) override {
278
690
      std::string value_with_ts;
279
690
      Status st = AppendTS(value, &value_with_ts, env_);
280
690
      if (!st.ok()) {
281
0
        batch_rewrite_status = st;
282
690
      } else {
283
690
        WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
284
690
                                  value_with_ts);
285
690
      }
286
690
      return Status::OK();
287
690
    }
288
2.55k
    virtual Status DeleteCF(uint32_t column_family_id,
289
2.55k
                            const Slice& key) override {
290
50
      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
291
50
      return Status::OK();
292
50
    }
293
2.55k
    void LogData(const Slice& blob) override {
294
0
      updates_ttl.PutLogData(blob);
295
0
    }
296
297
2.55k
   private:
298
2.55k
    Env* env_;
299
2.55k
  };
300
2.55k
  Handler handler(GetEnv());
301
2.55k
  RETURN_NOT_OK(updates->Iterate(&handler));
302
2.55k
  if (!handler.batch_rewrite_status.ok()) {
303
0
    return handler.batch_rewrite_status;
304
2.55k
  } else {
305
2.55k
    return db_->Write(opts, &(handler.updates_ttl));
306
2.55k
  }
307
2.55k
}
308
309
Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
310
12
                                     ColumnFamilyHandle* column_family) {
311
12
  return new TtlIterator(db_->NewIterator(opts, column_family));
312
12
}
313
314
}  // namespace rocksdb
315
#endif  // ROCKSDB_LITE