YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_job.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/flush_job.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
32
#include <algorithm>
33
#include <vector>
34
#include <chrono>
35
36
#include "yb/rocksdb/db/builder.h"
37
#include "yb/rocksdb/db/dbformat.h"
38
#include "yb/rocksdb/db/event_helpers.h"
39
#include "yb/rocksdb/db/filename.h"
40
#include "yb/rocksdb/db/file_numbers.h"
41
#include "yb/rocksdb/db/internal_stats.h"
42
#include "yb/rocksdb/db/log_reader.h"
43
#include "yb/rocksdb/db/log_writer.h"
44
#include "yb/rocksdb/db/memtable.h"
45
#include "yb/rocksdb/db/memtable_list.h"
46
#include "yb/rocksdb/db/version_set.h"
47
#include "yb/rocksdb/port/likely.h"
48
#include "yb/rocksdb/port/port.h"
49
#include "yb/rocksdb/db.h"
50
#include "yb/rocksdb/env.h"
51
#include "yb/rocksdb/statistics.h"
52
#include "yb/rocksdb/status.h"
53
#include "yb/rocksdb/table.h"
54
#include "yb/rocksdb/table/merger.h"
55
#include "yb/rocksdb/table/scoped_arena_iterator.h"
56
#include "yb/rocksdb/util/coding.h"
57
#include "yb/rocksdb/util/event_logger.h"
58
#include "yb/rocksdb/util/log_buffer.h"
59
#include "yb/rocksdb/util/logging.h"
60
#include "yb/rocksdb/util/mutexlock.h"
61
#include "yb/rocksdb/util/statistics.h"
62
#include "yb/rocksdb/util/stop_watch.h"
63
#include "yb/rocksdb/util/sync_point.h"
64
65
#include "yb/util/atomic.h"
66
#include "yb/util/flag_tags.h"
67
#include "yb/util/logging.h"
68
#include "yb/util/result.h"
69
#include "yb/util/stats/iostats_context_imp.h"
70
71
DEFINE_int32(rocksdb_nothing_in_memtable_to_flush_sleep_ms, 10,
72
    "Used for a temporary workaround for http://bit.ly/ybissue437. How long to wait (ms) in case "
73
    "we could not flush any memtables, usually due to filters preventing us from doing so.");
74
75
DEFINE_test_flag(bool, rocksdb_crash_on_flush, false,
76
                 "When set, memtable flush in rocksdb crashes.");
77
78
DEFINE_bool(rocksdb_release_mutex_during_wait_for_memtables_to_flush, true,
79
            "When a flush is scheduled, but there isn't a memtable eligible yet, release "
80
            "the mutex before going to sleep and reacquire it post sleep.");
81
82
namespace rocksdb {
83
84
// Captures everything a single memtable flush needs. All pointer arguments are stored as-is
// (not owned); `existing_snapshots` and `mem_table_flush_filter` are moved into the job.
FlushJob::FlushJob(
    const std::string& dbname,
    ColumnFamilyData* cfd,
    const DBOptions& db_options,
    const MutableCFOptions& mutable_cf_options,
    const EnvOptions& env_options,
    VersionSet* versions,
    InstrumentedMutex* db_mutex,
    std::atomic<bool>* shutting_down,
    std::atomic<bool>* disable_flush_on_shutdown,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    MemTableFilter mem_table_flush_filter,
    FileNumbersProvider* file_numbers_provider,
    JobContext* job_context,
    LogBuffer* log_buffer,
    Directory* db_directory,
    Directory* output_file_directory,
    CompressionType output_compression,
    Statistics* stats,
    EventLogger* event_logger)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      disable_flush_on_shutdown_(disable_flush_on_shutdown),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      mem_table_flush_filter_(std::move(mem_table_flush_filter)),
      file_numbers_provider_(file_numbers_provider),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger) {
  // Start the job with a zeroed bytes_written IO counter.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}
123
124
59.6k
// Nothing to release explicitly: the job does not own any of the pointers it was given.
FlushJob::~FlushJob() = default;
126
127
59.6k
void FlushJob::ReportStartedFlush() {
128
59.6k
  IOSTATS_RESET(bytes_written);
129
59.6k
}
130
131
57.0k
void FlushJob::RecordFlushIOStats() {
132
57.0k
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
133
57.0k
  IOSTATS_RESET(bytes_written);
134
57.0k
}
135
136
59.6k
// Flushes the immutable memtables accepted by mem_table_flush_filter_ into one level-0 table.
//
// Expects db_mutex_ to be held on entry (WriteLevel0Table asserts it).
// - If no memtable is eligible, sleeps for FLAGS_rocksdb_nothing_in_memtable_to_flush_sleep_ms
//   (dropping the mutex meanwhile when
//   FLAGS_rocksdb_release_mutex_during_wait_for_memtables_to_flush is set) and returns an
//   empty FileNumbersHolder.
// - Otherwise the table is written. If the DB is shutting down with flush-on-shutdown disabled,
//   or the column family was dropped, the result becomes ShutdownInProgress.
// - On success the results are installed and, when `file_meta` is non-null, the new file's
//   metadata is copied into it. On any failure the memtable flush is rolled back and the error
//   is returned.
Result<FileNumbersHolder> FlushJob::Run(FileMetaData* file_meta) {
  if (PREDICT_FALSE(yb::GetAtomicFlag(&FLAGS_TEST_rocksdb_crash_on_flush))) {
    CHECK(false) << "a flush should not have been scheduled.";
  }

  // The contents of the picked memtables are saved as a new table.
  FileMetaData flushed_meta;
  autovector<MemTable*> picked;
  cfd_->imm()->PickMemtablesToFlush(&picked, mem_table_flush_filter_);

  if (picked.empty()) {
    // Temporary workaround for repeated "Nothing in memtable to flush" messages in a
    // transactional workload, where the flush filter can prevent flushing any memtable of the
    // provisional records RocksDB. See https://github.com/yugabyte/yugabyte-db/issues/437.
    YB_LOG_EVERY_N_SECS(INFO, 1)
        << db_options_.log_prefix
        << "[" << cfd_->GetName() << "] No eligible memtables to flush.";

    // Read the flag once so the Unlock()/Lock() pair below stays balanced.
    const bool unlock_while_sleeping =
        FLAGS_rocksdb_release_mutex_during_wait_for_memtables_to_flush;
    if (unlock_while_sleeping) {
      // Let writers proceed while this thread sleeps.
      db_mutex_->Unlock();
    }
    std::this_thread::sleep_for(
        std::chrono::milliseconds(FLAGS_rocksdb_nothing_in_memtable_to_flush_sleep_ms));
    if (unlock_while_sleeping) {
      db_mutex_->Lock();
    }
    return FileNumbersHolder();
  }

  // Picked memtables are (implicitly) in ascending creation order; the first one's edit keeps
  // the meta info for this flush. SetLogNumber(n) means logs numbered below n will no longer
  // be picked up for recovery.
  VersionEdit* edit = picked[0]->GetEdits();
  edit->SetPrevLogNumber(0);
  edit->SetLogNumber(picked.back()->GetNextLogNumber());
  edit->SetColumnFamily(cfd_->GetID());

  // Releases and re-acquires db_mutex_ internally.
  auto result = WriteLevel0Table(picked, edit, &flushed_meta);

  if (result.ok()) {
    const bool shutdown_disables_flush =
        shutting_down_->load(std::memory_order_acquire) &&
        disable_flush_on_shutdown_->load(std::memory_order_acquire);
    if (shutdown_disables_flush || cfd_->IsDropped()) {
      result = STATUS(ShutdownInProgress, "Database shutdown or Column family drop during flush");
    }
  }

  if (result.ok()) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace the immutable memtables with the generated table.
    Status install_status = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, picked, versions_, db_mutex_,
        flushed_meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, *result);
    if (!install_status.ok()) {
      result = install_status;
    }
  } else {
    cfd_->imm()->RollbackMemtableFlush(picked, flushed_meta.fd.GetNumber());
  }

  if (result.ok() && file_meta != nullptr) {
    *file_meta = flushed_meta;
  }

  // Covers both SST and MANIFEST file IO.
  RecordFlushIOStats();

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event" << "flush_finished";
  stream << "lsm_state";
  stream.StartArray();
  auto* storage = cfd_->current()->storage_info();
  for (int level = 0; level < storage->num_levels(); ++level) {
    stream << storage->NumLevelFiles(level);
  }
  stream.EndArray();

  return result;
}
226
227
// Builds a level-0 table file from `mems` and, on success, records it in `edit`.
//
// Must be called with db_mutex_ held (asserted). The mutex is released while the table is
// built and the output directory is synced. It is re-acquired before returning on EVERY path,
// including failures, because callers continue to use state guarded by it.
//
// mems: memtables merged into a single table. Must be non-empty, since &memtables[0] is
//       passed to NewMergingIterator (Run only calls this with a non-empty list).
// edit: receives the new file via AddCleanedFile when the build (and directory sync) succeeds
//       and the resulting file is non-empty.
// meta: filled with the new file's descriptor, user frontiers and build results.
// Returns the holder of the allocated file number on success. Otherwise returns the failing
// Status from BuildTable or, if the build succeeded, from the output-directory Fsync.
Result<FileNumbersHolder> FlushJob::WriteLevel0Table(
    const autovector<MemTable*>& mems, VersionEdit* edit, FileMetaData* meta) {
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  auto file_number_holder = file_numbers_provider_->NewFileNumber();
  auto file_number = file_number_holder.Last();
  // path 0 for level 0 file.
  meta->fd = FileDescriptor(file_number, 0, 0, 0);

  Status s;
  {
    // The I/O below runs without the DB mutex. Every path through this scope must reach the
    // db_mutex_->Lock() at its end, so errors are accumulated in `s` instead of returned early.
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    std::vector<InternalIterator*> memtables;
    memtables.reserve(mems.size());
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems) {
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
      // Widen the output file's frontiers to cover each memtable that has them.
      const auto* range = m->Frontiers();
      if (range) {
        UserFrontier::Update(
            &range->Smallest(), UpdateUserValueType::kSmallest, &meta->smallest.user_frontier);
        UserFrontier::Update(
            &range->Largest(), UpdateUserValueType::kLargest, &meta->largest.user_frontier);
      }
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;

    TableFileCreationInfo info;
    {
      ScopedArenaIterator iter(
          NewMergingIterator(cfd_->internal_comparator().get(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
          cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      s = BuildTable(dbname_,
                     db_options_.env,
                     *cfd_->ioptions(),
                     env_options_,
                     cfd_->table_cache(),
                     iter.get(),
                     meta,
                     cfd_->internal_comparator(),
                     cfd_->int_tbl_prop_collector_factories(),
                     cfd_->GetID(),
                     existing_snapshots_,
                     earliest_write_conflict_snapshot_,
                     output_compression_,
                     cfd_->ioptions()->compression_opts,
                     mutable_cf_options_.paranoid_file_checks,
                     cfd_->internal_stats(),
                     db_options_.boundary_extractor.get(),
                     Env::IO_HIGH,
                     &table_properties_);
      info.table_properties = table_properties_;
      LogFlush(db_options_.info_log);
    }
    RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
        " bytes %s%s %s",
        cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(),
        meta->fd.GetTotalFileSize(), s.ToString().c_str(),
        meta->marked_for_compaction ? " (needs compaction)" : "",
        meta->FrontiersToString().c_str());

    // output to event logger
    if (s.ok()) {
      info.db_name = dbname_;
      info.cf_name = cfd_->GetName();
      info.file_path = TableFileName(db_options_.db_paths,
                                     meta->fd.GetNumber(),
                                     meta->fd.GetPathId());
      info.file_size = meta->fd.GetTotalFileSize();
      info.job_id = job_context_->job_id;
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, db_options_.listeners,
          meta->fd, info);
      TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()");
    }

    // Persist the new file's directory entry. A failure here must not return directly: that
    // would leave db_mutex_ unlocked for the caller. Record it in `s` so the mutex is
    // re-acquired, the file is not added to `edit`, and the error is propagated below.
    if (s.ok() && !db_options_.disableDataSync && output_file_directory_ != nullptr) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }

  // Note that if total_file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta->fd.GetTotalFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit->AddCleanedFile(0 /* level */, *meta);
  }

  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta->fd.GetTotalFileSize();
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
      meta->fd.GetTotalFileSize());
  RecordFlushIOStats();
  if (s.ok()) {
    return file_number_holder;
  } else {
    return s;
  }
}
359
360
}  // namespace rocksdb