YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_job.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/flush_job.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
32
#include <algorithm>
33
#include <vector>
34
#include <chrono>
35
36
#include "yb/rocksdb/db/builder.h"
37
#include "yb/rocksdb/db/dbformat.h"
38
#include "yb/rocksdb/db/event_helpers.h"
39
#include "yb/rocksdb/db/filename.h"
40
#include "yb/rocksdb/db/file_numbers.h"
41
#include "yb/rocksdb/db/internal_stats.h"
42
#include "yb/rocksdb/db/log_reader.h"
43
#include "yb/rocksdb/db/log_writer.h"
44
#include "yb/rocksdb/db/memtable.h"
45
#include "yb/rocksdb/db/memtable_list.h"
46
#include "yb/rocksdb/db/version_set.h"
47
#include "yb/rocksdb/port/likely.h"
48
#include "yb/rocksdb/port/port.h"
49
#include "yb/rocksdb/db.h"
50
#include "yb/rocksdb/env.h"
51
#include "yb/rocksdb/statistics.h"
52
#include "yb/rocksdb/status.h"
53
#include "yb/rocksdb/table.h"
54
#include "yb/rocksdb/table/merger.h"
55
#include "yb/rocksdb/table/scoped_arena_iterator.h"
56
#include "yb/rocksdb/util/coding.h"
57
#include "yb/rocksdb/util/event_logger.h"
58
#include "yb/rocksdb/util/log_buffer.h"
59
#include "yb/rocksdb/util/logging.h"
60
#include "yb/rocksdb/util/mutexlock.h"
61
#include "yb/rocksdb/util/statistics.h"
62
#include "yb/rocksdb/util/stop_watch.h"
63
#include "yb/rocksdb/util/sync_point.h"
64
65
#include "yb/util/atomic.h"
66
#include "yb/util/flag_tags.h"
67
#include "yb/util/logging.h"
68
#include "yb/util/result.h"
69
#include "yb/util/stats/iostats_context_imp.h"
70
71
DEFINE_int32(rocksdb_nothing_in_memtable_to_flush_sleep_ms, 10,
72
    "Used for a temporary workaround for http://bit.ly/ybissue437. How long to wait (ms) in case "
73
    "we could not flush any memtables, usually due to filters preventing us from doing so.");
74
75
DEFINE_test_flag(bool, rocksdb_crash_on_flush, false,
76
                 "When set, memtable flush in rocksdb crashes.");
77
78
DEFINE_bool(rocksdb_release_mutex_during_wait_for_memtables_to_flush, true,
79
            "When a flush is scheduled, but there isn't a memtable eligible yet, release "
80
            "the mutex before going to sleep and reacquire it post sleep.");
81
82
namespace rocksdb {
83
84
// Captures everything a single flush needs. Pointer arguments are stored as-is (no ownership is
// taken here); the by-value arguments `existing_snapshots` and `mem_table_flush_filter` are moved
// into the corresponding members. Construction also calls ReportStartedFlush().
FlushJob::FlushJob(
    const std::string& dbname,
    ColumnFamilyData* cfd,
    const DBOptions& db_options,
    const MutableCFOptions& mutable_cf_options,
    const EnvOptions& env_options,
    VersionSet* versions,
    InstrumentedMutex* db_mutex,
    std::atomic<bool>* shutting_down,
    std::atomic<bool>* disable_flush_on_shutdown,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    MemTableFilter mem_table_flush_filter,
    FileNumbersProvider* file_numbers_provider,
    JobContext* job_context,
    LogBuffer* log_buffer,
    Directory* db_directory,
    Directory* output_file_directory,
    CompressionType output_compression,
    Statistics* stats,
    EventLogger* event_logger)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      disable_flush_on_shutdown_(disable_flush_on_shutdown),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      mem_table_flush_filter_(std::move(mem_table_flush_filter)),
      file_numbers_provider_(file_numbers_provider),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger) {
  // Start-of-flush bookkeeping, then expose a sync point for tests.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}
123
124
31.8k
// Nothing to release explicitly; members clean up after themselves.
FlushJob::~FlushJob() = default;
126
127
31.8k
void FlushJob::ReportStartedFlush() {
128
31.8k
  IOSTATS_RESET(bytes_written);
129
31.8k
}
130
131
25.0k
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
132
25.0k
  uint64_t input_size = 0;
133
25.4k
  for (auto* mem : mems) {
134
25.4k
    input_size += mem->ApproximateMemoryUsage();
135
25.4k
  }
136
25.0k
}
137
138
25.0k
// Hook called at the end of Run(); intentionally does nothing at present.
void FlushJob::RecordFlushIOStats() {
  // No-op.
}
140
141
31.8k
// Picks the immutable memtables allowed by mem_table_flush_filter_, writes them out as one
// level-0 table and installs the result into the column family.
//
// Returns the holder of the new file number on success, or the failure status otherwise. When the
// flush succeeds and `file_meta` is non-null, the metadata of the written file is copied into it.
// When no memtable is eligible, the call sleeps for
// FLAGS_rocksdb_nothing_in_memtable_to_flush_sleep_ms and returns an empty FileNumbersHolder.
Result<FileNumbersHolder> FlushJob::Run(FileMetaData* file_meta) {
  if (PREDICT_FALSE(yb::GetAtomicFlag(&FLAGS_TEST_rocksdb_crash_on_flush))) {
    CHECK(false) << "a flush should not have been scheduled.";
  }

  // Metadata of the table that will hold the flushed memtable contents.
  FileMetaData flushed_meta;
  autovector<MemTable*> picked;
  cfd_->imm()->PickMemtablesToFlush(&picked, mem_table_flush_filter_);

  if (picked.empty()) {
    // A temporary workaround for repeated "Nothing in memtable to flush" messages in a
    // transactional workload due to the flush filter preventing us from flushing any memtables in
    // the provisional records RocksDB.
    //
    // See https://github.com/yugabyte/yugabyte-db/issues/437 for more details.
    YB_LOG_EVERY_N_SECS(INFO, 1)
        << db_options_.log_prefix
        << "[" << cfd_->GetName() << "] No eligible memtables to flush.";

    // Read the flag once so that the Unlock() and Lock() calls below are always paired.
    const bool unlock_while_sleeping =
        FLAGS_rocksdb_release_mutex_during_wait_for_memtables_to_flush;

    if (unlock_while_sleeping) {
      // Let writers make progress while this thread sleeps.
      db_mutex_->Unlock();
    }

    std::this_thread::sleep_for(
        std::chrono::milliseconds(FLAGS_rocksdb_nothing_in_memtable_to_flush_sleep_ms));

    if (unlock_while_sleeping) {
      db_mutex_->Lock();
    }

    return FileNumbersHolder();
  }

  ReportFlushInputSize(picked);

  // Entries of `picked` are (implicitly) sorted in ascending order by creation time. The first
  // memtable's edit carries the meta info for this flush.
  VersionEdit* edit = picked[0]->GetEdits();
  edit->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num will no longer be
  // picked up for recovery.
  edit->SetLogNumber(picked.back()->GetNextLogNumber());
  edit->SetColumnFamily(cfd_->GetID());

  // This will release and re-acquire the mutex.
  auto result = WriteLevel0Table(picked, edit, &flushed_meta);

  if (result.ok()) {
    // A successfully written table is still discarded if flushing was disabled by a shutdown or
    // the column family was dropped in the meantime.
    const bool flush_disabled_by_shutdown =
        shutting_down_->load(std::memory_order_acquire) &&
        disable_flush_on_shutdown_->load(std::memory_order_acquire);
    if (flush_disabled_by_shutdown || cfd_->IsDropped()) {
      result = STATUS(ShutdownInProgress, "Database shutdown or Column family drop during flush");
    }
  }

  if (result.ok()) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    Status install_status = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, picked, versions_, db_mutex_,
        flushed_meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, *result);
    if (!install_status.ok()) {
      result = install_status;
    }
  } else {
    cfd_->imm()->RollbackMemtableFlush(picked, flushed_meta.fd.GetNumber());
  }

  if (result.ok() && file_meta != nullptr) {
    *file_meta = flushed_meta;
  }
  RecordFlushIOStats();

  // Emit the "flush_finished" event together with the per-level file counts.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "lsm_state";
  stream.StartArray();
  auto storage = cfd_->current()->storage_info();
  for (int level = 0; level < storage->num_levels(); ++level) {
    stream << storage->NumLevelFiles(level);
  }
  stream.EndArray();

  return result;
}
231
232
// Writes the contents of `mems` into a single new level-0 table file.
//
// Must be called with db_mutex_ held. The mutex is released while the table is built and is
// always re-acquired before this function returns, on success and on failure alike.
//
// mems: memtables to flush; the caller (Run) only passes a non-empty list.
// edit: version edit that receives the new file when the build succeeded and the file is
//       non-empty.
// meta: out-parameter describing the written file (descriptor, user frontiers, ...).
//
// Returns the holder of the freshly allocated file number, or the failure status of the table
// build / output directory sync.
Result<FileNumbersHolder> FlushJob::WriteLevel0Table(
    const autovector<MemTable*>& mems, VersionEdit* edit, FileMetaData* meta) {
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  auto file_number_holder = file_numbers_provider_->NewFileNumber();
  auto file_number = file_number_holder.Last();
  // path 0 for level 0 file.
  meta->fd = FileDescriptor(file_number, 0, 0, 0);

  Status s;
  {
    // The table is built without the DB mutex. Nothing inside this scope may return early:
    // the mutex has to be re-acquired at the end of the scope.
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    std::vector<InternalIterator*> memtables;
    memtables.reserve(mems.size());
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems) {
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
      // Fold this memtable's frontiers (when present) into the output file's frontiers.
      const auto* range = m->Frontiers();
      if (range) {
        UserFrontier::Update(
            &range->Smallest(), UpdateUserValueType::kSmallest, &meta->smallest.user_frontier);
        UserFrontier::Update(
            &range->Largest(), UpdateUserValueType::kLargest, &meta->largest.user_frontier);
      }
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;

    TableFileCreationInfo info;
    {
      // Merge all memtable iterators into one and stream it into the table builder.
      ScopedArenaIterator iter(
          NewMergingIterator(cfd_->internal_comparator().get(), memtables.data(),
                             static_cast<int>(memtables.size()), &arena));
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
          cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      s = BuildTable(dbname_,
                     db_options_.env,
                     *cfd_->ioptions(),
                     env_options_,
                     cfd_->table_cache(),
                     iter.get(),
                     meta,
                     cfd_->internal_comparator(),
                     cfd_->int_tbl_prop_collector_factories(),
                     cfd_->GetID(),
                     existing_snapshots_,
                     earliest_write_conflict_snapshot_,
                     output_compression_,
                     cfd_->ioptions()->compression_opts,
                     mutable_cf_options_.paranoid_file_checks,
                     cfd_->internal_stats(),
                     db_options_.boundary_extractor.get(),
                     Env::IO_HIGH,
                     &table_properties_);
      info.table_properties = table_properties_;
      LogFlush(db_options_.info_log);
    }
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
        " bytes %s%s %s",
        cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(),
        meta->fd.GetTotalFileSize(), s.ToString().c_str(),
        meta->marked_for_compaction ? " (needs compaction)" : "",
        meta->FrontiersToString().c_str());

    // output to event logger
    if (s.ok()) {
      info.db_name = dbname_;
      info.cf_name = cfd_->GetName();
      info.file_path = TableFileName(db_options_.db_paths,
                                     meta->fd.GetNumber(),
                                     meta->fd.GetPathId());
      info.file_size = meta->fd.GetTotalFileSize();
      info.job_id = job_context_->job_id;
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, db_options_.listeners,
          meta->fd, info);
      TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()");
    }

    if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
      // A failed directory sync must not return from here: db_mutex_ is released in this scope
      // and the caller expects it to be held again on return. Record the failure in `s` so that
      // the common exit path below re-locks the mutex and reports the error.
      const Status fsync_status = output_file_directory_->Fsync();
      if (!fsync_status.ok()) {
        s = fsync_status;
      }
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }

  // Note that if total_file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta->fd.GetTotalFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit->AddCleanedFile(0 /* level */, *meta);
  }

  // Account for the time spent and the bytes written, regardless of the outcome.
  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta->fd.GetTotalFileSize();
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
      meta->fd.GetTotalFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta->fd.GetTotalFileSize());
  if (s.ok()) {
    return file_number_holder;
  } else {
    return s;
  }
}
364
365
}  // namespace rocksdb