YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_snapshots.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/tablet_snapshots.h"
15
16
#include <boost/algorithm/string/predicate.hpp>
17
18
#include "yb/common/index.h"
19
#include "yb/common/schema.h"
20
#include "yb/common/snapshot.h"
21
#include "yb/common/wire_protocol.h"
22
23
#include "yb/docdb/consensus_frontier.h"
24
#include "yb/docdb/docdb_rocksdb_util.h"
25
26
#include "yb/rocksdb/db.h"
27
#include "yb/rocksdb/util/file_util.h"
28
#include "yb/rocksdb/utilities/checkpoint.h"
29
30
#include "yb/tablet/operations/snapshot_operation.h"
31
#include "yb/tablet/tablet.h"
32
#include "yb/tablet/tablet_metadata.h"
33
34
#include "yb/util/file_util.h"
35
#include "yb/util/format.h"
36
#include "yb/util/logging.h"
37
#include "yb/util/operation_counter.h"
38
#include "yb/util/scope_exit.h"
39
#include "yb/util/status_format.h"
40
#include "yb/util/status_log.h"
41
42
using namespace std::literals;
43
44
namespace yb {
45
namespace tablet {
46
47
namespace {

// Suffix appended to a snapshot directory path to name its temporary counterpart
// (see Create() and RestoreToTemporary()); IsTempSnapshotDir() tests for it.
const std::string kTempSnapshotDirSuffix(".tmp");

} // namespace
52
53
// Tablet metadata that a restore request may ask to re-apply together with the checkpoint.
//
// Restore() default-constructs this struct and fills it only when the request carries a schema;
// RestoreCheckpoint() reads the remaining fields only when `schema` is set. The scalar members
// are nevertheless given defaults so that a default-constructed instance never holds
// indeterminate values.
struct TabletSnapshots::RestoreMetadata {
  // Schema to install; left unset when the request has no schema.
  boost::optional<Schema> schema;
  // Index map to install; populated together with `schema`.
  boost::optional<IndexMap> index_map;
  // Schema version passed to SetSchema() along with `schema`.
  uint32_t schema_version = 0;
  // Value passed to SetHidden() when the schema is re-applied.
  bool hide = false;
};
59
60
89.1k
// Forwards the owning tablet to the TabletComponent base; no other initialization is done here.
TabletSnapshots::TabletSnapshots(Tablet* tablet)
    : TabletComponent(tablet) {
}
61
62
313k
std::string TabletSnapshots::SnapshotsDirName(const std::string& rocksdb_dir) {
63
313k
  return rocksdb_dir + kSnapshotsDirSuffix;
64
313k
}
65
66
12
bool TabletSnapshots::IsTempSnapshotDir(const std::string& dir) {
67
12
  return boost::ends_with(dir, kTempSnapshotDirSuffix);
68
12
}
69
70
864
// Preparation hook for a snapshot operation.
//
// There is nothing to prepare: `operation` is not inspected and the call always succeeds.
Status TabletSnapshots::Prepare(SnapshotOperation* operation) {
  // Intentionally a no-op.
  return Status::OK();
}
73
74
14
// Creates a snapshot as described by a snapshot operation.
//
// Collects the snapshot hybrid time, operation hybrid time, op id, snapshot directory and
// (optional) schedule id from `operation` and delegates to Create(const CreateSnapshotData&).
// Returns early with the error if the snapshot directory cannot be determined.
Status TabletSnapshots::Create(SnapshotOperation* operation) {
  // Fields are evaluated in declaration order, same as when the aggregate is built inline.
  CreateSnapshotData snapshot_data {
    .snapshot_hybrid_time = HybridTime::FromPB(operation->request()->snapshot_hybrid_time()),
    .hybrid_time = operation->hybrid_time(),
    .op_id = operation->op_id(),
    .snapshot_dir = VERIFY_RESULT(operation->GetSnapshotDir()),
    .schedule_id = TryFullyDecodeSnapshotScheduleId(operation->request()->schedule_id()),
  };
  return Create(snapshot_data);
}
83
84
14
// Creates a tablet snapshot in data.snapshot_dir.
//
// Sequence:
//   1. Flush the regular RocksDB.
//   2. Remove any existing snapshot directory and any leftover "<snapshot_dir>.tmp" directory.
//   3. Create a RocksDB checkpoint in the temp directory.
//   4. If data.snapshot_hybrid_time is valid, set it as the hybrid time filter of the checkpoint.
//   5. Rename the temp directory to the final one and sync the parent directory.
//   6. If a schedule id is present and newly added to the tablet metadata, flush the metadata.
//
// A scope-exit handler always removes the temp directory if it still exists, and also removes
// the final directory unless the function reached its successful end.
Status TabletSnapshots::Create(const CreateSnapshotData& data) {
  LongOperationTracker long_operation_tracker("Create snapshot", 5s);

  ScopedRWOperation scoped_read_operation(&pending_op_counter());
  RETURN_NOT_OK(scoped_read_operation);

  Status flush_status = regular_db().Flush(rocksdb::FlushOptions());
  if (PREDICT_FALSE(!flush_status.ok())) {
    LOG_WITH_PREFIX(WARNING) << "RocksDB flush status: " << flush_status;
    return flush_status.CloneAndPrepend("Unable to flush RocksDB");
  }

  const std::string& final_dir = data.snapshot_dir;

  Env* const fs_env = metadata().fs_manager()->env();
  auto snapshot_hybrid_time = data.snapshot_hybrid_time;
  const bool transactional = snapshot_hybrid_time.is_valid();

  // A previous snapshot in the same directory is replaced.
  RETURN_NOT_OK(CleanupSnapshotDir(final_dir));

  LOG_WITH_PREFIX(INFO) << "Started tablet snapshot creation in folder: " << final_dir;

  const auto parent_dir = DirName(final_dir);
  const auto staging_dir = final_dir + kTempSnapshotDirSuffix;

  // A stale temp directory is removed before the checkpoint is created there.
  RETURN_NOT_OK(CleanupSnapshotDir(staging_dir));

  // Flipped to true only right before the successful return.
  bool succeeded = false;
  // Removes snapshot (RocksDB checkpoint) directories when leaving the function.
  auto cleanup_on_exit = ScopeExit(
      [this, fs_env, &succeeded, &final_dir, &staging_dir, &parent_dir] {
    bool need_sync = false;

    if (fs_env->FileExists(staging_dir)) {
      need_sync = true;
      const Status delete_status = fs_env->DeleteRecursively(staging_dir);
      if (PREDICT_FALSE(!delete_status.ok())) {
        LOG_WITH_PREFIX(WARNING)
            << "Cannot recursively delete temp snapshot dir "
            << staging_dir << ": " << delete_status;
      }
    }

    if (!succeeded && fs_env->FileExists(final_dir)) {
      need_sync = true;
      const Status delete_status = fs_env->DeleteRecursively(final_dir);
      if (PREDICT_FALSE(!delete_status.ok())) {
        LOG_WITH_PREFIX(WARNING)
            << "Cannot recursively delete snapshot dir " << final_dir << ": " << delete_status;
      }
    }

    if (need_sync) {
      const Status sync_status = fs_env->SyncDir(parent_dir);
      if (PREDICT_FALSE(!sync_status.ok())) {
        LOG_WITH_PREFIX(WARNING)
            << "Cannot sync top snapshots dir " << parent_dir << ": " << sync_status;
      }
    }
  });

  // Note: checkpoint::CreateCheckpoint() calls DisableFileDeletions()/EnableFileDeletions()
  //       for the RocksDB object.
  Status checkpoint_status = CreateCheckpoint(staging_dir);
  if (PREDICT_FALSE(!checkpoint_status.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Cannot create RocksDB checkpoint: " << checkpoint_status;
    return checkpoint_status.CloneAndPrepend("Cannot create RocksDB checkpoint");
  }

  if (transactional) {
    // Patch the checkpoint in the temp directory with the snapshot hybrid time filter.
    rocksdb::Options rocksdb_options;
    tablet().InitRocksDBOptions(&rocksdb_options, LogPrefix());
    docdb::RocksDBPatcher patcher(staging_dir, rocksdb_options);

    RETURN_NOT_OK(patcher.Load());
    RETURN_NOT_OK(patcher.SetHybridTimeFilter(snapshot_hybrid_time));
  }

  RETURN_NOT_OK_PREPEND(
      fs_env->RenameFile(staging_dir, final_dir),
      Format("Cannot rename temp snapshot dir $0 to $1", staging_dir, final_dir));
  RETURN_NOT_OK_PREPEND(
      fs_env->SyncDir(parent_dir),
      Format("Cannot sync top snapshots dir $0", parent_dir));

  if (data.schedule_id && tablet().metadata()->AddSnapshotSchedule(data.schedule_id)) {
    RETURN_NOT_OK(tablet().metadata()->Flush());
  }

  LOG_WITH_PREFIX(INFO) << "Complete snapshot creation in folder: " << final_dir
                        << ", snapshot hybrid time: " << snapshot_hybrid_time;

  succeeded = true;
  return Status::OK();
}
181
182
28
// Returns the Env of the tablet metadata's FsManager.
Env& TabletSnapshots::env() {
  Env* const fs_env = metadata().fs_manager()->env();
  return *fs_env;
}
185
186
28
// Removes `dir` recursively if it exists and then syncs its parent directory.
//
// Returns OK when `dir` does not exist. A failed deletion or a failed parent sync is returned
// with a descriptive prefix.
Status TabletSnapshots::CleanupSnapshotDir(const std::string& dir) {
  auto& fs_env = env();
  if (fs_env.FileExists(dir)) {
    LOG_WITH_PREFIX(INFO) << "Deleting old snapshot dir " << dir;
    RETURN_NOT_OK_PREPEND(fs_env.DeleteRecursively(dir),
                          "Cannot recursively delete old snapshot dir " + dir);
    auto parent_dir = DirName(dir);
    RETURN_NOT_OK_PREPEND(fs_env.SyncDir(parent_dir),
                          "Cannot sync top snapshots dir " + parent_dir);
  }

  return Status::OK();
}
201
202
862
// Applies a restore-snapshot operation.
//
// Verifies that the snapshot directory (when non-empty) exists, builds the consensus frontier
// from the operation's op id and hybrid time, collects optional schema metadata from the request
// and delegates to RestoreCheckpoint(). When the checkpoint was restored successfully and the
// request carries a restoration id, the tablet is notified via RestoreStarted().
Status TabletSnapshots::Restore(SnapshotOperation* operation) {
  // NOTE: `snapshot_dir` and `restore_at` are printed by name through YB_STRUCT_TO_STRING below.
  const std::string snapshot_dir = VERIFY_RESULT(operation->GetSnapshotDir());
  const auto& request = *operation->request();
  auto restore_at = HybridTime::FromPB(request.snapshot_hybrid_time());
  auto restoration_id = TryFullyDecodeTxnSnapshotRestorationId(request.restoration_id());

  VLOG_WITH_PREFIX_AND_FUNC(1) << YB_STRUCT_TO_STRING(snapshot_dir, restore_at);

  if (!snapshot_dir.empty()) {
    RETURN_NOT_OK_PREPEND(
        FileExists(&rocksdb_env(), snapshot_dir),
        Format("Snapshot directory does not exist: $0", snapshot_dir));
  }

  docdb::ConsensusFrontier frontier;
  frontier.set_op_id(operation->op_id());
  frontier.set_hybrid_time(operation->hybrid_time());

  RestoreMetadata restore_metadata;
  if (request.has_schema()) {
    // Schema, indexes, schema version and hidden flag are taken from the request as a group.
    restore_metadata.schema.emplace();
    RETURN_NOT_OK(SchemaFromPB(request.schema(), restore_metadata.schema.get_ptr()));
    restore_metadata.index_map.emplace(request.indexes());
    restore_metadata.schema_version = request.schema_version();
    restore_metadata.hide = request.hide();
  }

  Status restore_status = RestoreCheckpoint(snapshot_dir, restore_at, restore_metadata, frontier);
  VLOG_WITH_PREFIX(1) << "Complete checkpoint restoring with result " << restore_status
                      << " in folder: " << metadata().rocksdb_dir();
  if (!restore_status.ok() || !restoration_id) {
    return restore_status;
  }
  return tablet().RestoreStarted(restoration_id);
}
235
236
// Restores the tablet's RocksDB from the checkpoint in `dir` and reopens it.
//
// Parameters:
//   dir              - checkpoint directory to restore from. When empty, the current DB files are
//                      kept (the DB is shut down without being destroyed) and only patched.
//   restore_at       - when valid, set as the hybrid time filter of the restored DB.
//   restore_metadata - optional schema/index/hidden state to store in the tablet metadata.
//   frontier         - flushed frontier to record in the restored DB.
//
// Returns the first error encountered. Note that once the RocksDB instances have been shut down,
// an early error return leaves them closed (see the TODO below).
Status TabletSnapshots::RestoreCheckpoint(
    const std::string& dir, HybridTime restore_at, const RestoreMetadata& restore_metadata,
    const docdb::ConsensusFrontier& frontier) {
  LongOperationTracker long_operation_tracker("Restore checkpoint", 5s);

  // The existing DB is destroyed only when there is a checkpoint directory to replace it with.
  const auto destroy = !dir.empty();

  // op_pauses has to stay in scope until the end of the function, so this cannot be reduced to
  // a bare RETURN_NOT_OK(...) on the call.
  auto op_pauses = VERIFY_RESULT(StartShutdownRocksDBs(DisableFlushOnShutdown(destroy)));

  std::lock_guard<std::mutex> lock(create_checkpoint_lock());

  // Remember the DB directory before the DB object goes away.
  const std::string db_dir = regular_db().GetName();

  // Shutdown is completed the same way in both modes; `destroy` selects whether files are removed.
  // TODO: snapshot current DB and try to restore it in case of failure.
  RETURN_NOT_OK(CompleteShutdownRocksDBs(Destroy(destroy), &op_pauses));

  if (destroy) {
    // Populate the (now destroyed) DB directory from the checkpoint using hard links.
    auto copy_status = CopyDirectory(
        &rocksdb_env(), dir, db_dir, UseHardLinks::kTrue, CreateIfMissing::kTrue);
    if (PREDICT_FALSE(!copy_status.ok())) {
      LOG_WITH_PREFIX(WARNING) << "Copy checkpoint files status: " << copy_status;
      return STATUS(IllegalState, "Unable to copy checkpoint files", copy_status.ToString());
    }
  }
  // Otherwise: just change rocksdb hybrid time limit, because it should be in retention interval.
  // TODO(pitr) apply transactions and reset intents.

  {
    rocksdb::Options rocksdb_options;
    tablet().InitRocksDBOptions(&rocksdb_options, LogPrefix());
    docdb::RocksDBPatcher patcher(db_dir, rocksdb_options);

    RETURN_NOT_OK(patcher.Load());
    RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
    if (restore_at) {
      RETURN_NOT_OK(patcher.SetHybridTimeFilter(restore_at));
    }
  }

  if (restore_metadata.schema) {
    // TODO(pitr) check deleted columns
    tablet().metadata()->SetSchema(
        *restore_metadata.schema, *restore_metadata.index_map, {} /* deleted_columns */,
        restore_metadata.schema_version);
    tablet().metadata()->SetHidden(restore_metadata.hide);
    RETURN_NOT_OK(tablet().metadata()->Flush());
    RefreshYBMetaDataCache();
  }

  // Reopen database from copied checkpoint.
  // Note: db_dir == metadata()->rocksdb_dir() is still valid db dir.
  auto s = OpenRocksDBs();
  if (PREDICT_FALSE(!s.ok())) {
    LOG_WITH_PREFIX(WARNING) << "Failed tablet db opening from checkpoint: " << s;
    return s;
  }

  LOG_WITH_PREFIX(INFO) << "Checkpoint restored from " << dir;
  LOG_WITH_PREFIX(INFO) << "Re-enabling compactions";
  s = tablet().EnableCompactions(&op_pauses.non_abortable);
  if (!s.ok()) {
    // Include the status so the cause of the failure is visible in the log.
    LOG_WITH_PREFIX(WARNING) << "Failed to enable compactions after restoring a checkpoint: "
                             << s;
    return s;
  }

  // Ensure that op_pauses stays in scope throughout this function.
  for (auto* op_pause : op_pauses.AsArray()) {
    DFATAL_OR_RETURN_NOT_OK(op_pause->status());
  }

  return Status::OK();
}
314
315
// Materializes snapshot `snapshot_id` as of `restore_at` in a temporary directory.
//
// The snapshot directory under the top snapshots dir is copied (with hard links) to a sibling
// directory carrying the temp suffix, after removing any previous content there. The copy is
// then patched with `restore_at` as its hybrid time filter. Returns the temp directory path.
Result<std::string> TabletSnapshots::RestoreToTemporary(
    const TxnSnapshotId& snapshot_id, HybridTime restore_at) {
  const auto snapshot_path = JoinPathSegments(
      VERIFY_RESULT(metadata().TopSnapshotsDir()), snapshot_id.ToString());
  auto temp_path = snapshot_path + kTempSnapshotDirSuffix;

  RETURN_NOT_OK(CleanupSnapshotDir(temp_path));
  RETURN_NOT_OK(CopyDirectory(
      &rocksdb_env(), snapshot_path, temp_path, UseHardLinks::kTrue, CreateIfMissing::kTrue));

  {
    // Scoped so the patcher is gone before the path is handed back to the caller.
    rocksdb::Options rocksdb_options;
    tablet().InitRocksDBOptions(&rocksdb_options, LogPrefix());
    docdb::RocksDBPatcher patcher(temp_path, rocksdb_options);

    RETURN_NOT_OK(patcher.Load());
    RETURN_NOT_OK(patcher.SetHybridTimeFilter(restore_at));
  }

  return temp_path;
}
335
336
0
// Applies a delete-snapshot operation.
//
// The snapshot directory name is the decoded transaction snapshot id when the request's id
// decodes as one, otherwise the raw id from the request. Deletion and the following directory
// sync are best effort: failures are logged and do not fail the operation. Afterwards the
// operation's op id and hybrid time are recorded in the flushed frontier.
Status TabletSnapshots::Delete(const SnapshotOperation& operation) {
  const std::string snapshots_root = metadata().snapshots_dir();
  const auto& raw_snapshot_id = operation.request()->snapshot_id();
  auto txn_snapshot_id = TryFullyDecodeTxnSnapshotId(raw_snapshot_id);
  const std::string snapshot_dir = JoinPathSegments(
      snapshots_root, txn_snapshot_id ? txn_snapshot_id.ToString() : raw_snapshot_id);

  std::lock_guard<std::mutex> lock(create_checkpoint_lock());
  Env* const fs_env = metadata().fs_manager()->env();

  if (fs_env->FileExists(snapshot_dir)) {
    const Status delete_status = fs_env->DeleteRecursively(snapshot_dir);
    if (PREDICT_FALSE(!delete_status.ok())) {
      LOG_WITH_PREFIX(WARNING) << "Cannot recursively delete snapshot dir " << snapshot_dir
                               << ": " << delete_status;
    }

    // The parent directory is synced regardless of whether the deletion succeeded.
    const Status sync_status = fs_env->SyncDir(snapshots_root);
    if (PREDICT_FALSE(!sync_status.ok())) {
      LOG_WITH_PREFIX(WARNING) << "Cannot sync top snapshots dir " << snapshots_root
                               << ": " << sync_status;
    }
  }

  docdb::ConsensusFrontier frontier;
  frontier.set_op_id(operation.op_id());
  frontier.set_hybrid_time(operation.hybrid_time());
  // Here we are just recording the fact that we've executed the "delete snapshot" Raft operation
  // so that it won't get replayed if we crash. No need to force the flushed frontier to be the
  // exact value set above.
  RETURN_NOT_OK(tablet().ModifyFlushedFrontier(
      frontier, rocksdb::FrontierModificationMode::kUpdate));

  LOG_WITH_PREFIX(INFO) << "Complete snapshot deletion on tablet in folder: " << snapshot_dir;

  return Status::OK();
}
373
374
// Creates RocksDB checkpoints of the tablet's databases.
//
// The regular DB is checkpointed into `dir`. When the tablet has an intents DB, it is first
// checkpointed into `dir + kIntentsDBSuffix`; where it ends up is controlled by
// `create_intents_checkpoint_in`:
//   - kUseIntentsDbSuffix: the intents checkpoint stays at `dir + kIntentsDBSuffix`.
//   - otherwise:           it is moved to the kIntentsSubdir subdirectory of `dir`.
//
// Returns NotSupported when the tablet has no regular RocksDB, and IllegalState when any
// checkpoint or the rename fails.
Status TabletSnapshots::CreateCheckpoint(
    const std::string& dir, const CreateIntentsCheckpointIn create_intents_checkpoint_in) {
  ScopedRWOperation scoped_read_operation(&pending_op_counter());
  RETURN_NOT_OK(scoped_read_operation);

  auto temp_intents_dir = dir + kIntentsDBSuffix;
  auto final_intents_dir = JoinPathSegments(dir, kIntentsSubdir);

  std::lock_guard<std::mutex> lock(create_checkpoint_lock());

  if (!has_regular_db()) {
    LOG_WITH_PREFIX(INFO) << "Skipped creating checkpoint in " << dir;
    return STATUS(NotSupported,
                  "Tablet does not have a RocksDB (could be a transaction status tablet)");
  }

  auto parent_dir = DirName(dir);
  RETURN_NOT_OK_PREPEND(metadata().fs_manager()->CreateDirIfMissing(parent_dir),
                        Format("Unable to create checkpoints directory $0", parent_dir));

  // Order does not matter because we flush both DBs and do not have parallel writes.
  Status status;
  if (has_intents_db()) {
    status = rocksdb::checkpoint::CreateCheckpoint(&intents_db(), temp_intents_dir);
  }
  if (status.ok()) {
    status = rocksdb::checkpoint::CreateCheckpoint(&regular_db(), dir);
  }
  // Move the intents checkpoint into the subdirectory of `dir` only when the caller did NOT ask
  // for it to stay at the DB-suffix location. The previous condition performed the move exactly
  // when kUseIntentsDbSuffix was requested, which is the opposite of what that option names.
  // NOTE(review): assumes the only other enumerator requests the subdirectory layout - confirm
  // against the CreateIntentsCheckpointIn definition.
  if (status.ok() && has_intents_db() &&
      create_intents_checkpoint_in != CreateIntentsCheckpointIn::kUseIntentsDbSuffix) {
    status = Env::Default()->RenameFile(temp_intents_dir, final_intents_dir);
  }

  if (!status.ok()) {
    LOG_WITH_PREFIX(WARNING) << "Create checkpoint status: " << status;
    return STATUS_FORMAT(IllegalState, "Unable to create checkpoint: $0", status);
  }
  LOG_WITH_PREFIX(INFO) << "Checkpoint created in " << dir;

  TEST_last_rocksdb_checkpoint_dir_ = dir;

  return Status::OK();
}
417
418
220k
// Ensures that the top-level snapshots directory for `rocksdb_dir` exists.
//
// The directory name is derived with SnapshotsDirName(); creation (and sync) is delegated to
// `fs`. A failure is returned with the directory name in the message.
Status TabletSnapshots::CreateDirectories(const string& rocksdb_dir, FsManager* fs) {
  const auto snapshots_root = SnapshotsDirName(rocksdb_dir);
  RETURN_NOT_OK_PREPEND(
      fs->CreateDirIfMissingAndSync(snapshots_root),
      Format("Unable to create snapshots directory $0", snapshots_root));
  return Status::OK();
}
424
425
0
// Notifies the tablet that a restoration has finished.
//
// Decodes the restoration id from the request (returning the decode error on failure) and passes
// it, together with the request's restoration hybrid time, to Tablet::RestoreFinished().
Status TabletSnapshots::RestoreFinished(SnapshotOperation* operation) {
  const auto restoration_id = VERIFY_RESULT(
      FullyDecodeTxnSnapshotRestorationId(operation->request()->restoration_id()));
  const auto restoration_hybrid_time =
      HybridTime::FromPB(operation->request()->restoration_hybrid_time());
  return tablet().RestoreFinished(restoration_id, restoration_hybrid_time);
}
430
431
} // namespace tablet
432
} // namespace yb