YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/wal_manager.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/wal_manager.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <algorithm>
32
#include <vector>
33
#include <memory>
34
35
#include "yb/rocksdb/db/filename.h"
36
#include "yb/rocksdb/db/transaction_log_impl.h"
37
#include "yb/rocksdb/db/log_reader.h"
38
#include "yb/rocksdb/db/write_batch_internal.h"
39
#include "yb/rocksdb/port/port.h"
40
#include "yb/rocksdb/env.h"
41
#include "yb/rocksdb/options.h"
42
#include "yb/rocksdb/write_batch.h"
43
#include "yb/rocksdb/util/coding.h"
44
#include "yb/rocksdb/util/file_reader_writer.h"
45
#include "yb/rocksdb/util/logging.h"
46
#include "yb/rocksdb/util/mutexlock.h"
47
#include "yb/rocksdb/util/sync_point.h"
48
49
#include "yb/util/status_log.h"
50
#include "yb/util/string_util.h"
51
52
namespace rocksdb {
53
54
#ifndef ROCKSDB_LITE
55
56
2.15k
Status WalManager::GetSortedWalFiles(VectorLogPtr* files) {
57
  // First get sorted files in db dir, then get sorted files from archived
58
  // dir, to avoid a race condition where a log file is moved to archived
59
  // dir in between.
60
2.15k
  Status s;
61
  // list wal files in main db dir.
62
2.15k
  VectorLogPtr logs;
63
2.15k
  s = GetSortedWalsOfType(db_options_.wal_dir, &logs, kAliveLogFile);
64
2.15k
  if (!s.ok()) {
65
0
    return s;
66
0
  }
67
68
  // Reproduce the race condition where a log file is moved
69
  // to archived dir, between these two sync points, used in
70
  // (DBTest,TransactionLogIteratorRace)
71
2.15k
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
72
2.15k
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
73
74
2.15k
  files->clear();
75
  // list wal files in archive dir.
76
2.15k
  std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
77
2.15k
  Status exists = env_->FileExists(archivedir);
78
2.15k
  if (exists.ok()) {
79
57
    s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
80
57
    if (!s.ok()) {
81
0
      return s;
82
0
    }
83
2.10k
  } else if (!exists.IsNotFound()) {
84
0
    assert(s.IsIOError());
85
0
    return s;
86
0
  }
87
88
2.15k
  uint64_t latest_archived_log_number = 0;
89
2.15k
  if (!files->empty()) {
90
45
    latest_archived_log_number = files->back()->LogNumber();
91
45
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
92
45
        "Latest Archived log: %" PRIu64,
93
45
        latest_archived_log_number);
94
45
  }
95
96
2.15k
  files->reserve(files->size() + logs.size());
97
2.15k
  for (auto& log : logs) {
98
208
    if (log->LogNumber() > latest_archived_log_number) {
99
202
      files->push_back(std::move(log));
100
202
    } else {
101
      // When the race condition happens, we could see the
102
      // same log in both db dir and archived dir. Simply
103
      // ignore the one in db dir. Note that, if we read
104
      // archived dir first, we would have missed the log file.
105
6
      RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
106
6
          "%s already moved to archive", log->PathName().c_str());
107
6
    }
108
208
  }
109
110
2.15k
  return s;
111
2.15k
}
112
113
Status WalManager::GetUpdatesSince(
114
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
115
    const TransactionLogIterator::ReadOptions& read_options,
116
50
    VersionSet* version_set) {
117
118
  //  Get all sorted Wal Files.
119
  //  Do binary search and open files and find the seq number.
120
121
50
  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
122
50
  Status s = GetSortedWalFiles(wal_files.get());
123
50
  if (!s.ok()) {
124
0
    return s;
125
0
  }
126
127
50
  s = RetainProbableWalFiles(wal_files.get(), seq);
128
50
  if (!s.ok()) {
129
0
    return s;
130
0
  }
131
50
  iter->reset(new TransactionLogIteratorImpl(
132
50
      db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
133
50
      std::move(wal_files), version_set));
134
50
  return (*iter)->status();
135
50
}
136
137
// 1. Go through all archived files and
138
//    a. if ttl is enabled, delete outdated files
139
//    b. if archive size limit is enabled, delete empty files,
140
//        compute file number and size.
141
// 2. If size limit is enabled:
142
//    a. compute how many files should be deleted
143
//    b. get sorted non-empty archived logs
144
//    c. delete what should be deleted
145
1.27M
void WalManager::PurgeObsoleteWALFiles() {
146
1.27M
  bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
147
1.27M
  bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
148
1.27M
  if (!ttl_enabled && 
!size_limit_enabled1.27M
) {
149
1.27M
    return;
150
1.27M
  }
151
152
387
  int64_t current_time;
153
387
  Status s = env_->GetCurrentTime(&current_time);
154
387
  if (!s.ok()) {
155
0
    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
156
0
        "Can't get current time: %s", s.ToString().c_str());
157
0
    assert(false);
158
0
    return;
159
0
  }
160
387
  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
161
387
  uint64_t const time_to_check = (ttl_enabled && 
!size_limit_enabled314
)
162
387
                                     ? 
db_options_.WAL_ttl_seconds / 2232
163
387
                                     : 
kDefaultIntervalToDeleteObsoleteWAL155
;
164
165
387
  if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
166
216
    return;
167
216
  }
168
169
171
  purge_wal_files_last_run_ = now_seconds;
170
171
171
  std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
172
171
  std::vector<std::string> files;
173
171
  s = env_->GetChildren(archival_dir, &files);
174
171
  if (!s.ok()) {
175
0
    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
176
0
        "Can't get archive files: %s", s.ToString().c_str());
177
0
    assert(false);
178
0
    return;
179
0
  }
180
181
171
  size_t log_files_num = 0;
182
171
  uint64_t log_file_size = 0;
183
184
314
  for (auto& f : files) {
185
314
    uint64_t number;
186
314
    FileType type;
187
314
    if (ParseFileName(f, &number, &type) && 
type == kLogFile122
) {
188
122
      std::string const file_path = archival_dir + "/" + f;
189
122
      if (ttl_enabled) {
190
102
        uint64_t file_m_time;
191
102
        s = env_->GetFileModificationTime(file_path, &file_m_time);
192
102
        if (!s.ok()) {
193
0
          RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
194
0
              "Can't get file mod time: %s: %s",
195
0
              file_path.c_str(), s.ToString().c_str());
196
0
          continue;
197
0
        }
198
102
        if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
199
21
          s = env_->DeleteFile(file_path);
200
21
          if (!s.ok()) {
201
0
            RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
202
0
                "Can't delete file: %s: %s",
203
0
                file_path.c_str(), s.ToString().c_str());
204
0
            continue;
205
21
          } else {
206
21
            MutexLock l(&read_first_record_cache_mutex_);
207
21
            read_first_record_cache_.erase(number);
208
21
          }
209
21
          continue;
210
21
        }
211
102
      }
212
213
101
      if (size_limit_enabled) {
214
20
        uint64_t file_size;
215
20
        s = env_->GetFileSize(file_path, &file_size);
216
20
        if (!s.ok()) {
217
0
          RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
218
0
              "Unable to get file size: %s: %s",
219
0
              file_path.c_str(), s.ToString().c_str());
220
0
          return;
221
20
        } else {
222
20
          if (file_size > 0) {
223
20
            log_file_size = std::max(log_file_size, file_size);
224
20
            ++log_files_num;
225
20
          } else {
226
0
            s = env_->DeleteFile(file_path);
227
0
            if (!s.ok()) {
228
0
              RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
229
0
                  "Unable to delete file: %s: %s",
230
0
                  file_path.c_str(), s.ToString().c_str());
231
0
              continue;
232
0
            } else {
233
0
              MutexLock l(&read_first_record_cache_mutex_);
234
0
              read_first_record_cache_.erase(number);
235
0
            }
236
0
          }
237
20
        }
238
20
      }
239
101
    }
240
314
  }
241
242
171
  if (0 == log_files_num || 
!size_limit_enabled1
) {
243
98
    return;
244
98
  }
245
246
73
  size_t const files_keep_num =
247
73
      db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size;
248
73
  if (log_files_num <= files_keep_num) {
249
0
    return;
250
0
  }
251
252
73
  size_t files_del_num = log_files_num - files_keep_num;
253
73
  VectorLogPtr archived_logs;
254
73
  CHECK_OK(GetSortedWalsOfType(archival_dir, &archived_logs, kArchivedLogFile));
255
256
73
  if (files_del_num > archived_logs.size()) {
257
0
    RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
258
0
        "Trying to delete more archived log files than "
259
0
        "exist. Deleting all");
260
0
    files_del_num = archived_logs.size();
261
0
  }
262
263
92
  for (size_t i = 0; i < files_del_num; 
++i19
) {
264
19
    std::string const file_path = archived_logs[i]->PathName();
265
19
    s = env_->DeleteFile(db_options_.wal_dir + "/" + file_path);
266
19
    if (!s.ok()) {
267
0
      RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
268
0
          "Unable to delete file: %s: %s", file_path.c_str(),
269
0
          s.ToString().c_str());
270
0
      continue;
271
19
    } else {
272
19
      MutexLock l(&read_first_record_cache_mutex_);
273
19
      read_first_record_cache_.erase(archived_logs[i]->LogNumber());
274
19
    }
275
19
  }
276
73
}
277
278
108
void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
279
108
  auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
280
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
281
108
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
282
108
  Status s = env_->RenameFile(fname, archived_log_name);
283
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
284
108
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
285
108
  RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
286
108
      "Move log file %s to %s -- %s\n", fname.c_str(),
287
108
      archived_log_name.c_str(), s.ToString().c_str());
288
108
}
289
290
namespace {
291
struct CompareLogByPointer {
292
  bool operator()(const std::unique_ptr<LogFile>& a,
293
438
                  const std::unique_ptr<LogFile>& b) {
294
438
    LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
295
438
    LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
296
438
    return *a_impl < *b_impl;
297
438
  }
298
};
299
}
300
301
Status WalManager::GetSortedWalsOfType(const std::string& path,
302
                                       VectorLogPtr* log_files,
303
2.21k
                                       WalFileType log_type) {
304
2.21k
  std::vector<std::string> all_files;
305
2.21k
  const Status status = env_->GetChildren(path, &all_files);
306
2.21k
  if (!status.ok()) {
307
0
    return status;
308
0
  }
309
2.21k
  log_files->reserve(all_files.size());
310
27.8k
  for (const auto& f : all_files) {
311
27.8k
    uint64_t number;
312
27.8k
    FileType type;
313
27.8k
    if (ParseFileName(f, &number, &type) && 
type == kLogFile21.7k
) {
314
2.42k
      SequenceNumber sequence;
315
2.42k
      Status s = ReadFirstRecord(log_type, number, &sequence);
316
2.42k
      if (!s.ok()) {
317
0
        return s;
318
0
      }
319
2.42k
      if (sequence == 0) {
320
        // empty file
321
2.11k
        continue;
322
2.11k
      }
323
324
      // Reproduce the race condition where a log file is moved
325
      // to archived dir, between these two sync points, used in
326
      // (DBTest,TransactionLogIteratorRace)
327
306
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
328
306
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
329
330
306
      uint64_t size_bytes;
331
306
      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
332
      // re-try in case the alive log file has been moved to archive.
333
306
      std::string archived_file = ArchivedLogFileName(path, number);
334
306
      if (!s.ok() && 
log_type == kAliveLogFile1
&&
335
306
          
env_->FileExists(archived_file).ok()1
) {
336
1
        s = env_->GetFileSize(archived_file, &size_bytes);
337
1
        if (!s.ok() && 
env_->FileExists(archived_file).IsNotFound()0
) {
338
          // oops, the file just got deleted from archived dir! move on
339
0
          s = Status::OK();
340
0
          continue;
341
0
        }
342
1
      }
343
306
      if (!s.ok()) {
344
0
        return s;
345
0
      }
346
347
306
      log_files->push_back(std::unique_ptr<LogFile>(
348
306
          new LogFileImpl(number, log_type, sequence, size_bytes)));
349
306
    }
350
27.8k
  }
351
2.21k
  CompareLogByPointer compare_log_files;
352
2.21k
  std::sort(log_files->begin(), log_files->end(), compare_log_files);
353
2.21k
  return status;
354
2.21k
}
355
356
Status WalManager::RetainProbableWalFiles(VectorLogPtr* all_logs,
357
50
                                          const SequenceNumber target) {
358
50
  int64_t start = 0;  // signed to avoid overflow when target is < first file.
359
50
  int64_t end = static_cast<int64_t>(all_logs->size()) - 1;
360
  // Binary Search. avoid opening all files.
361
121
  while (end >= start) {
362
71
    int64_t mid = start + (end - start) / 2;  // Avoid overflow.
363
71
    SequenceNumber current_seq_num = (*all_logs)[mid]->StartSequence();
364
71
    if (current_seq_num == target) {
365
0
      end = mid;
366
0
      break;
367
71
    } else if (current_seq_num < target) {
368
10
      start = mid + 1;
369
61
    } else {
370
61
      end = mid - 1;
371
61
    }
372
71
  }
373
  // end could be -ve.
374
50
  size_t start_index = std::max<int64_t>(0, end);
375
  // The last wal file is always included
376
50
  all_logs->erase(all_logs->begin(), all_logs->begin() + start_index);
377
50
  return Status::OK();
378
50
}
379
380
Status WalManager::ReadFirstRecord(const WalFileType type,
381
                                   const uint64_t number,
382
2.42k
                                   SequenceNumber* sequence) {
383
2.42k
  *sequence = 0;
384
2.42k
  if (type != kAliveLogFile && 
type != kArchivedLogFile125
) {
385
0
    RLOG(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
386
0
        "[WalManger] Unknown file type %s", ToString(type).c_str());
387
0
    return STATUS(NotSupported,
388
0
        "File Type Not Known " + ToString(type));
389
0
  }
390
2.42k
  {
391
2.42k
    MutexLock l(&read_first_record_cache_mutex_);
392
2.42k
    auto itr = read_first_record_cache_.find(number);
393
2.42k
    if (itr != read_first_record_cache_.end()) {
394
168
      *sequence = itr->second;
395
168
      return Status::OK();
396
168
    }
397
2.42k
  }
398
2.25k
  Status s;
399
2.25k
  if (type == kAliveLogFile) {
400
2.16k
    std::string fname = LogFileName(db_options_.wal_dir, number);
401
2.16k
    s = ReadFirstLine(fname, sequence);
402
2.16k
    if (
env_->FileExists(fname).ok()2.16k
&& !s.ok()) {
403
      // return any error that is not caused by non-existing file
404
0
      return s;
405
0
    }
406
2.16k
  }
407
408
2.25k
  if (type == kArchivedLogFile || 
!s.ok()2.16k
) {
409
    //  check if the file got moved to archive.
410
90
    std::string archived_file =
411
90
        ArchivedLogFileName(db_options_.wal_dir, number);
412
90
    s = ReadFirstLine(archived_file, sequence);
413
    // maybe the file was deleted from archive dir. If that's the case, return
414
    // Status::OK(). The caller with identify this as empty file because
415
    // *sequence == 0
416
90
    if (!s.ok() && 
env_->FileExists(archived_file).IsNotFound()0
) {
417
0
      return Status::OK();
418
0
    }
419
90
  }
420
421
2.25k
  if (s.ok() && 
*sequence != 02.25k
) {
422
139
    MutexLock l(&read_first_record_cache_mutex_);
423
139
    read_first_record_cache_.insert({number, *sequence});
424
139
  }
425
2.25k
  return s;
426
2.25k
}
427
428
// the function returns status.ok() and sequence == 0 if the file exists, but is
429
// empty
430
Status WalManager::ReadFirstLine(const std::string& fname,
431
2.25k
                                 SequenceNumber* sequence) {
432
2.25k
  struct LogReporter : public log::Reader::Reporter {
433
2.25k
    Env* env;
434
2.25k
    Logger* info_log;
435
2.25k
    const char* fname;
436
437
2.25k
    Status* status;
438
2.25k
    bool ignore_error;  // true if db_options_.paranoid_checks==false
439
2.25k
    void Corruption(size_t bytes, const Status& s) override {
440
0
      RLOG(InfoLogLevel::WARN_LEVEL, info_log,
441
0
          "[WalManager] %s%s: dropping %d bytes; %s",
442
0
          (this->ignore_error ? "(ignoring error) " : ""), fname,
443
0
          static_cast<int>(bytes), s.ToString().c_str());
444
0
      if (this->status->ok()) {
445
        // only keep the first error
446
0
        *this->status = s;
447
0
      }
448
0
    }
449
2.25k
  };
450
451
2.25k
  std::unique_ptr<SequentialFile> file;
452
2.25k
  Status status = env_->NewSequentialFile(fname, &file, env_options_);
453
2.25k
  unique_ptr<SequentialFileReader> file_reader(
454
2.25k
      new SequentialFileReader(std::move(file)));
455
456
2.25k
  if (!status.ok()) {
457
0
    return status;
458
0
  }
459
460
2.25k
  LogReporter reporter;
461
2.25k
  reporter.env = env_;
462
2.25k
  reporter.info_log = db_options_.info_log.get();
463
2.25k
  reporter.fname = fname.c_str();
464
2.25k
  reporter.status = &status;
465
2.25k
  reporter.ignore_error = !db_options_.paranoid_checks;
466
2.25k
  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
467
2.25k
                     true /*checksum*/, 0 /*initial_offset*/, *sequence);
468
2.25k
  std::string scratch;
469
2.25k
  Slice record;
470
471
2.25k
  if (reader.ReadRecord(&record, &scratch) &&
472
2.25k
      
(139
status.ok()139
||
!db_options_.paranoid_checks0
)) {
473
139
    if (record.size() < 12) {
474
0
      reporter.Corruption(record.size(),
475
0
                          STATUS(Corruption, "log record too small"));
476
      // TODO read record's till the first no corrupt entry?
477
139
    } else {
478
139
      WriteBatch batch;
479
139
      WriteBatchInternal::SetContents(&batch, record);
480
139
      *sequence = WriteBatchInternal::Sequence(&batch);
481
139
      return Status::OK();
482
139
    }
483
139
  }
484
485
  // ReadRecord returns false on EOF, which means that the log file is empty. we
486
  // return status.ok() in that case and set sequence number to 0
487
2.11k
  *sequence = 0;
488
2.11k
  return status;
489
2.25k
}
490
491
#endif  // ROCKSDB_LITE
492
}  // namespace rocksdb