YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/repair.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
//
24
// Repairer does best effort recovery to recover as much data as possible after
25
// a disaster without compromising consistency. It does not guarantee bringing
26
// the database to a time consistent state.
27
//
28
// Repair process is broken into 4 phases:
29
// (a) Find files
30
// (b) Convert logs to tables
31
// (c) Extract metadata
32
// (d) Write Descriptor
33
//
34
// (a) Find files
35
//
36
// The repairer goes through all the files in the directory, and classifies them
37
// based on their file name. Any file that cannot be identified by name will be
38
// ignored.
39
//
40
// (b) Convert logs to table
41
//
42
// Every log file that is active is replayed. All sections of the file where the
43
// checksum does not match is skipped over. We intentionally give preference to
44
// data consistency.
45
//
46
// (c) Extract metadata
47
//
48
// We scan every table to compute
49
// (1) smallest/largest for the table
50
// (2) largest sequence number in the table
51
//
52
// If we are unable to scan the file, then we ignore the table.
53
//
54
// (d) Write Descriptor
55
//
56
// We generate descriptor contents:
57
//  - log number is set to zero
58
//  - next-file-number is set to 1 + largest file number we found
59
//  - last-sequence-number is set to largest sequence# found across
60
//    all tables (see 2c)
61
//  - compaction pointers are cleared
62
//  - every table file is added at level 0
63
//
64
// Possible optimization 1:
65
//   (a) Compute total size and use to pick appropriate max-level M
66
//   (b) Sort tables by largest sequence# in the table
67
//   (c) For each table: if it overlaps earlier table, place in level-0,
68
//       else place in level-M.
69
//   (d) We can provide options for time consistent recovery and unsafe recovery
70
//       (ignore checksum failure when applicable)
71
// Possible optimization 2:
72
//   Store per-table metadata (smallest, largest, largest-seq#, ...)
73
//   in the table's meta section to speed up ScanTable.
74
75
#ifndef ROCKSDB_LITE
76
77
#ifndef __STDC_FORMAT_MACROS
78
#define __STDC_FORMAT_MACROS
79
#endif
80
81
#include <inttypes.h>
82
#include "yb/rocksdb/db/builder.h"
83
#include "yb/rocksdb/db/db_impl.h"
84
#include "yb/rocksdb/db/dbformat.h"
85
#include "yb/rocksdb/db/filename.h"
86
#include "yb/rocksdb/db/log_reader.h"
87
#include "yb/rocksdb/db/log_writer.h"
88
#include "yb/rocksdb/db/memtable.h"
89
#include "yb/rocksdb/db/table_cache.h"
90
#include "yb/rocksdb/db/version_edit.h"
91
#include "yb/rocksdb/db/writebuffer.h"
92
#include "yb/rocksdb/db/write_batch_internal.h"
93
#include "yb/rocksdb/comparator.h"
94
#include "yb/rocksdb/db.h"
95
#include "yb/rocksdb/env.h"
96
#include "yb/rocksdb/options.h"
97
#include "yb/rocksdb/immutable_options.h"
98
#include "yb/rocksdb/table/scoped_arena_iterator.h"
99
#include "yb/rocksdb/table/table_reader.h"
100
#include "yb/rocksdb/util/file_reader_writer.h"
101
#include "yb/rocksdb/util/logging.h"
102
103
#include "yb/util/status_log.h"
104
105
namespace rocksdb {
106
107
namespace {
108
109
class Repairer {
110
 public:
111
  // Builds a repairer for |dbname|. The supplied Options are sanitized
  // against the internal key comparator before use. Note the initializer
  // list order matches member declaration order: options_ depends on
  // icmp_, and ioptions_/raw_table_cache_ depend on options_.
  Repairer(const std::string& dbname, const Options& options)
      : dbname_(dbname),
        env_(options.env),
        icmp_(std::make_shared<InternalKeyComparator>(options.comparator)),
        options_(SanitizeOptions(dbname, icmp_.get(), options)),
        ioptions_(options_),
        raw_table_cache_(
            // TableCache can be small since we expect each table to be opened
            // once.
            NewLRUCache(10, options_.table_cache_numshardbits)),
        next_file_number_(1) {
    GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_);

    table_cache_ =
        new TableCache(ioptions_, env_options_, raw_table_cache_.get());
    edit_ = new VersionEdit();
  }
128
129
3
  // Tear down in dependency order: table_cache_ holds entries in the raw
  // LRU cache, so it is deleted before the cache itself is released.
  ~Repairer() {
    delete table_cache_;
    raw_table_cache_.reset();
    delete edit_;
  }
134
135
3
  // Runs the full repair pipeline described in the file header:
  // (a) FindFiles, (b) ConvertLogFilesToTables, (c) ExtractMetaData,
  // (d) WriteDescriptor. Returns the first non-OK status encountered.
  // On success, logs a summary of how many files/bytes were recovered.
  Status Run() {
    Status status = FindFiles();
    if (status.ok()) {
      ConvertLogFilesToTables();
      ExtractMetaData();
      status = WriteDescriptor();
    }
    if (status.ok()) {
      // Summarize the total size of all recovered tables.
      uint64_t bytes = 0;
      for (size_t i = 0; i < tables_.size(); i++) {
        bytes += tables_[i].meta.fd.GetTotalFileSize();
      }
      RLOG(InfoLogLevel::WARN_LEVEL, options_.info_log,
          "**** Repaired rocksdb %s; "
          "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
          // Leading space is required: without it the byte count and the
          // word "bytes" run together in the log message.
          " bytes. "
          "Some data may have been lost. "
          "****",
          dbname_.c_str(), tables_.size(), bytes);
    }
    return status;
  }
157
158
 private:
159
  // Per-table metadata recovered by ScanTable().
  struct TableInfo {
    FileMetaData meta;            // file descriptor + smallest/largest keys
    SequenceNumber min_sequence;  // smallest sequence number seen in the table
    SequenceNumber max_sequence;  // largest sequence number seen in the table
  };

  std::string const dbname_;
  Env* const env_;
  InternalKeyComparatorPtr icmp_;
  IntTblPropCollectorFactories int_tbl_prop_collector_factories_;
  const Options options_;            // sanitized copy of the caller's options
  const ImmutableCFOptions ioptions_;
  std::shared_ptr<Cache> raw_table_cache_;  // backing LRU cache for table_cache_
  TableCache* table_cache_;          // owned; deleted in ~Repairer
  VersionEdit* edit_;                // owned; accumulated then written by WriteDescriptor

  std::vector<std::string> manifests_;     // old MANIFEST files found on disk
  std::vector<FileDescriptor> table_fds_;  // tables found + tables built from logs
  std::vector<uint64_t> logs_;             // WAL numbers to replay
  std::vector<TableInfo> tables_;          // successfully scanned tables
  uint64_t next_file_number_;              // 1 + largest file number seen
  const EnvOptions env_options_;
181
182
3
  // Phase (a): walk every configured db path and classify each file by
  // name. Manifests, logs and tables are recorded; files whose names
  // cannot be parsed are silently skipped. Fails with Corruption if no
  // files at all were found.
  Status FindFiles() {
    std::vector<std::string> filenames;
    bool found_file = false;
    for (size_t path_id = 0; path_id < options_.db_paths.size(); ++path_id) {
      Status status =
          env_->GetChildren(options_.db_paths[path_id].path, &filenames);
      if (!status.ok()) {
        return status;
      }
      found_file = found_file || !filenames.empty();

      uint64_t number;
      FileType type;
      for (const std::string& fname : filenames) {
        if (!ParseFileName(fname, &number, &type)) {
          continue;  // not a recognized rocksdb file name
        }
        if (type == kDescriptorFile) {
          assert(path_id == 0);
          manifests_.push_back(fname);
          continue;
        }
        // Track the highest file number so newly created files don't
        // collide with existing ones.
        if (number + 1 > next_file_number_) {
          next_file_number_ = number + 1;
        }
        if (type == kLogFile) {
          assert(path_id == 0);
          logs_.push_back(number);
        } else if (type == kTableFile) {
          table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
                                  0, 0);
        }
        // Every other file type is deliberately ignored.
      }
    }
    if (!found_file) {
      return STATUS(Corruption, dbname_, "repair found no files");
    }
    return Status::OK();
  }
224
225
3
  void ConvertLogFilesToTables() {
226
6
    for (size_t i = 0; i < logs_.size(); 
i++3
) {
227
3
      std::string logname = LogFileName(dbname_, logs_[i]);
228
3
      Status status = ConvertLogToTable(logs_[i]);
229
3
      if (!status.ok()) {
230
0
        RLOG(InfoLogLevel::WARN_LEVEL, options_.info_log,
231
0
            "Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i],
232
0
            status.ToString().c_str());
233
0
      }
234
3
      ArchiveFile(logname);
235
3
    }
236
3
  }
237
238
3
  // Replays WAL |log| into a fresh memtable and flushes it to a new
  // sstable, which is appended to table_fds_ if it received any data.
  // Corrupt log sections are reported and skipped; per-batch insert
  // failures are logged and ignored so as much data as possible is
  // recovered. Returns the status of the final BuildTable step.
  Status ConvertLogToTable(uint64_t log) {
    struct LogReporter : public log::Reader::Reporter {
      Env* env;
      std::shared_ptr<Logger> info_log;
      uint64_t lognum;
      void Corruption(size_t bytes, const Status& s) override {
        // We print error messages for corruption, but continue repairing.
        RLOG(InfoLogLevel::ERROR_LEVEL, info_log,
            "Log #%" PRIu64 ": dropping %d bytes; %s", lognum,
            static_cast<int>(bytes), s.ToString().c_str());
      }
    };

    // Open the log file
    std::string logname = LogFileName(dbname_, log);
    unique_ptr<SequentialFile> lfile;
    Status status = env_->NewSequentialFile(logname, &lfile, env_options_);
    if (!status.ok()) {
      return status;
    }
    unique_ptr<SequentialFileReader> lfile_reader(
        new SequentialFileReader(std::move(lfile)));

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = options_.info_log;
    reporter.lognum = log;
    // We intentially make log::Reader do checksumming so that
    // corruptions cause entire commits to be skipped instead of
    // propagating bad information (like overly large sequence
    // numbers).
    log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter,
                       true /*enable checksum*/, 0 /*initial_offset*/, log);

    // Read all the records and add to a memtable
    std::string scratch;
    Slice record;
    WriteBatch batch;
    WriteBuffer wb(options_.db_write_buffer_size);
    MemTable* mem =
        new MemTable(*icmp_, ioptions_, MutableCFOptions(options_, ioptions_),
                     &wb, kMaxSequenceNumber);
    auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem);
    mem->Ref();
    int counter = 0;
    while (reader.ReadRecord(&record, &scratch)) {
      // Records shorter than a write-batch header cannot be valid.
      if (record.size() < 12) {
        reporter.Corruption(
            record.size(), STATUS(Corruption, "log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);
      status = WriteBatchInternal::InsertInto(&batch, cf_mems_default, nullptr);
      if (status.ok()) {
        counter += WriteBatchInternal::Count(&batch);
      } else {
        RLOG(InfoLogLevel::WARN_LEVEL,
            options_.info_log, "Log #%" PRIu64 ": ignoring %s", log,
            status.ToString().c_str());
        status = Status::OK();  // Keep going with rest of file
      }
    }

    // Do not record a version edit for this conversion to a Table
    // since ExtractMetaData() will also generate edits.
    FileMetaData meta;
    meta.fd = FileDescriptor(next_file_number_++, 0, 0, 0);
    {
      // Scope bounds the arena-backed iterator's lifetime to BuildTable.
      ReadOptions ro;
      ro.total_order_seek = true;
      Arena arena;
      ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
      status = BuildTable(dbname_,
                          env_,
                          ioptions_,
                          env_options_,
                          table_cache_,
                          iter.get(),
                          &meta,
                          icmp_,
                          int_tbl_prop_collector_factories_,
                          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
                          {},
                          kMaxSequenceNumber,
                          kNoCompression,
                          CompressionOptions(),
                          /* paranoid_file_checks */ false,
                          /* internal_stats */ nullptr,
                          options_.boundary_extractor.get());
    }
    // Unref() returns the memtable pointer once the count hits zero.
    delete mem->Unref();
    delete cf_mems_default;
    mem = nullptr;
    if (status.ok()) {
      // Only keep the table if it actually received any data.
      if (meta.fd.GetTotalFileSize() > 0) {
        table_fds_.push_back(meta.fd);
      }
    }
    RLOG(InfoLogLevel::INFO_LEVEL, options_.info_log,
        "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
        log, counter, meta.fd.GetNumber(), status.ToString().c_str());
    return status;
  }
342
343
3
  void ExtractMetaData() {
344
6
    for (size_t i = 0; i < table_fds_.size(); 
i++3
) {
345
3
      TableInfo t;
346
3
      t.meta.fd = table_fds_[i];
347
3
      Status status = ScanTable(&t);
348
3
      if (!status.ok()) {
349
0
        std::string fname = TableFileName(
350
0
            options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
351
0
        char file_num_buf[kFormatFileNumberBufSize];
352
0
        FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
353
0
                         file_num_buf, sizeof(file_num_buf));
354
0
        RLOG(InfoLogLevel::WARN_LEVEL, options_.info_log,
355
0
            "Table #%s: ignoring %s", file_num_buf,
356
0
            status.ToString().c_str());
357
0
        ArchiveFile(fname);
358
3
      } else {
359
3
        tables_.push_back(t);
360
3
      }
361
3
    }
362
3
  }
363
364
3
  // Phase (c) worker: iterates every key in the table identified by
  // t->meta.fd, filling in the smallest/largest internal keys and the
  // min/max sequence numbers, and fixing up total_file_size (for split
  // SSTs the data file size is added on top of the base file size).
  // Unparsable keys are logged and skipped. Returns the iterator status.
  Status ScanTable(TableInfo* t) {
    std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(),
                                      t->meta.fd.GetPathId());
    int counter = 0;
    uint64_t base_file_size;
    Status status = env_->GetFileSize(fname, &base_file_size);
    t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
                                t->meta.fd.total_file_size, base_file_size);
    if (status.ok()) {
      TableReader* reader;
      InternalIterator* iter = table_cache_->NewIterator(
          ReadOptions(), env_options_, icmp_, t->meta.fd, t->meta.UserFilter(), &reader);
      // Split SSTs store data separately; include it in the total size.
      t->meta.fd.total_file_size = base_file_size +
          (reader->IsSplitSst() ? reader->GetTableProperties()->data_size : 0);
      bool empty = true;
      ParsedInternalKey parsed;
      // NOTE(review): min_sequence starts at 0, so the `parsed.sequence <
      // t->min_sequence` update below can never fire — min_sequence is
      // effectively always 0. Confirm whether that is intentional before
      // relying on it (WriteDescriptor copies it into smallest.seqno).
      t->min_sequence = 0;
      t->max_sequence = 0;
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        Slice key = iter->key();
        if (!ParseInternalKey(key, &parsed)) {
          RLOG(InfoLogLevel::ERROR_LEVEL,
              options_.info_log, "Table #%" PRIu64 ": unparsable key %s",
              t->meta.fd.GetNumber(), EscapeString(key).c_str());
          continue;
        }

        counter++;
        // First valid key establishes the smallest key.
        if (empty) {
          empty = false;
          t->meta.smallest.key = InternalKey::DecodeFrom(key);
        }
        // Keys arrive in sorted order, so the last one seen is the largest.
        t->meta.largest.key = InternalKey::DecodeFrom(key);
        if (parsed.sequence < t->min_sequence) {
          t->min_sequence = parsed.sequence;
        }
        if (parsed.sequence > t->max_sequence) {
          t->max_sequence = parsed.sequence;
        }
      }
      if (!iter->status().ok()) {
        status = iter->status();
      }
      delete iter;
    }
    RLOG(InfoLogLevel::INFO_LEVEL,
        options_.info_log, "Table #%" PRIu64 ": %d entries %s",
        t->meta.fd.GetNumber(), counter, status.ToString().c_str());
    return status;
  }
414
415
3
  // Phase (d): writes a brand-new MANIFEST that places every recovered
  // table at level 0 (per the scheme in the file header: log number 0,
  // next-file = 1 + largest seen, last-sequence = max over all tables).
  // Old manifests are archived, then the temp file is renamed into place
  // and CURRENT is updated. The temp file is cleaned up on failure.
  Status WriteDescriptor() {
    std::string tmp = TempFileName(dbname_, 1);
    unique_ptr<WritableFile> file;
    EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
    Status status = env_->NewWritableFile(tmp, &file, env_options);
    if (!status.ok()) {
      return status;
    }

    // Last sequence number = max over all recovered tables.
    SequenceNumber max_sequence = 0;
    for (size_t i = 0; i < tables_.size(); i++) {
      if (max_sequence < tables_[i].max_sequence) {
        max_sequence = tables_[i].max_sequence;
      }
    }

    edit_->SetComparatorName(icmp_->user_comparator()->Name());
    edit_->SetLogNumber(0);
    edit_->SetNextFile(next_file_number_);
    edit_->SetLastSequence(max_sequence);

    for (size_t i = 0; i < tables_.size(); i++) {
      // TODO(opt): separate out into multiple levels
      const TableInfo& t = tables_[i];
      auto meta = t.meta;
      meta.smallest.seqno = t.min_sequence;
      meta.largest.seqno = t.max_sequence;
      edit_->AddCleanedFile(0, meta);
    }

    // fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
    {
      // Scoped so the writer is flushed/destroyed before the rename below.
      unique_ptr<WritableFileWriter> file_writer(
          new WritableFileWriter(std::move(file), env_options));
      log::Writer log(std::move(file_writer), 0, false);
      std::string record;
      edit_->AppendEncodedTo(&record);
      status = log.AddRecord(record);
    }

    if (status.ok()) {
      // Discard older manifests
      for (size_t i = 0; i < manifests_.size(); i++) {
        ArchiveFile(dbname_ + "/" + manifests_[i]);
      }

      // Install new manifest
      status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
      if (status.ok()) {
        status = SetCurrentFile(env_, dbname_, 1, nullptr, options_.disableDataSync);
      }
    }
    if (!status.ok()) {
      env_->CleanupFile(tmp);
    }
    return status;
  }
472
473
6
  void ArchiveFile(const std::string& fname) {
474
    // Move into another directory.  E.g., for
475
    //    dir/foo
476
    // rename to
477
    //    dir/lost/foo
478
6
    const char* slash = strrchr(fname.c_str(), '/');
479
6
    std::string new_dir;
480
6
    if (slash != nullptr) {
481
6
      new_dir.assign(fname.data(), slash - fname.data());
482
6
    }
483
6
    new_dir.append("/lost");
484
6
    WARN_NOT_OK(env_->CreateDir(new_dir), "Failed to create dir " + new_dir);
485
6
    std::string new_file = new_dir;
486
6
    new_file.append("/");
487
6
    new_file.append((slash == nullptr) ? 
fname.c_str()0
: slash + 1);
488
6
    Status s = env_->RenameFile(fname, new_file);
489
6
    RLOG(InfoLogLevel::INFO_LEVEL,
490
6
        options_.info_log, "Archiving %s: %s\n",
491
6
        fname.c_str(), s.ToString().c_str());
492
6
  }
493
};
494
}  // namespace
495
496
3
// Best-effort recovery entry point: rebuilds a usable MANIFEST for
// |dbname| from whatever log and table files remain on disk. Some data
// may be lost, but consistency is preserved (see file header).
Status RepairDB(const std::string& dbname, const Options& options) {
  return Repairer(dbname, options).Run();
}
500
501
}  // namespace rocksdb
502
503
#endif  // ROCKSDB_LITE