/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_job.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/flush_job.h" |
25 | | |
26 | | #ifndef __STDC_FORMAT_MACROS |
27 | | #define __STDC_FORMAT_MACROS |
28 | | #endif |
29 | | |
30 | | #include <inttypes.h> |
31 | | |
32 | | #include <algorithm> |
33 | | #include <vector> |
34 | | #include <chrono> |
35 | | |
36 | | #include "yb/rocksdb/db/builder.h" |
37 | | #include "yb/rocksdb/db/dbformat.h" |
38 | | #include "yb/rocksdb/db/event_helpers.h" |
39 | | #include "yb/rocksdb/db/filename.h" |
40 | | #include "yb/rocksdb/db/file_numbers.h" |
41 | | #include "yb/rocksdb/db/internal_stats.h" |
42 | | #include "yb/rocksdb/db/log_reader.h" |
43 | | #include "yb/rocksdb/db/log_writer.h" |
44 | | #include "yb/rocksdb/db/memtable.h" |
45 | | #include "yb/rocksdb/db/memtable_list.h" |
46 | | #include "yb/rocksdb/db/version_set.h" |
47 | | #include "yb/rocksdb/port/likely.h" |
48 | | #include "yb/rocksdb/port/port.h" |
49 | | #include "yb/rocksdb/db.h" |
50 | | #include "yb/rocksdb/env.h" |
51 | | #include "yb/rocksdb/statistics.h" |
52 | | #include "yb/rocksdb/status.h" |
53 | | #include "yb/rocksdb/table.h" |
54 | | #include "yb/rocksdb/table/merger.h" |
55 | | #include "yb/rocksdb/table/scoped_arena_iterator.h" |
56 | | #include "yb/rocksdb/util/coding.h" |
57 | | #include "yb/rocksdb/util/event_logger.h" |
58 | | #include "yb/rocksdb/util/log_buffer.h" |
59 | | #include "yb/rocksdb/util/logging.h" |
60 | | #include "yb/rocksdb/util/mutexlock.h" |
61 | | #include "yb/rocksdb/util/statistics.h" |
62 | | #include "yb/rocksdb/util/stop_watch.h" |
63 | | #include "yb/rocksdb/util/sync_point.h" |
64 | | |
65 | | #include "yb/util/atomic.h" |
66 | | #include "yb/util/flag_tags.h" |
67 | | #include "yb/util/logging.h" |
68 | | #include "yb/util/result.h" |
69 | | #include "yb/util/stats/iostats_context_imp.h" |
70 | | |
71 | | DEFINE_int32(rocksdb_nothing_in_memtable_to_flush_sleep_ms, 10, |
72 | | "Used for a temporary workaround for http://bit.ly/ybissue437. How long to wait (ms) in case " |
73 | | "we could not flush any memtables, usually due to filters preventing us from doing so."); |
74 | | |
75 | | DEFINE_test_flag(bool, rocksdb_crash_on_flush, false, |
76 | | "When set, memtable flush in rocksdb crashes."); |
77 | | |
78 | | DEFINE_bool(rocksdb_release_mutex_during_wait_for_memtables_to_flush, true, |
79 | | "When a flush is scheduled, but there isn't a memtable eligible yet, release " |
80 | | "the mutex before going to sleep and reacquire it post sleep."); |
81 | | |
82 | | namespace rocksdb { |
83 | | |
84 | | FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, |
85 | | const DBOptions& db_options, |
86 | | const MutableCFOptions& mutable_cf_options, |
87 | | const EnvOptions& env_options, VersionSet* versions, |
88 | | InstrumentedMutex* db_mutex, |
89 | | std::atomic<bool>* shutting_down, |
90 | | std::atomic<bool>* disable_flush_on_shutdown, |
91 | | std::vector<SequenceNumber> existing_snapshots, |
92 | | SequenceNumber earliest_write_conflict_snapshot, |
93 | | MemTableFilter mem_table_flush_filter, |
94 | | FileNumbersProvider* file_numbers_provider, |
95 | | JobContext* job_context, LogBuffer* log_buffer, |
96 | | Directory* db_directory, Directory* output_file_directory, |
97 | | CompressionType output_compression, Statistics* stats, |
98 | | EventLogger* event_logger) |
99 | | : dbname_(dbname), |
100 | | cfd_(cfd), |
101 | | db_options_(db_options), |
102 | | mutable_cf_options_(mutable_cf_options), |
103 | | env_options_(env_options), |
104 | | versions_(versions), |
105 | | db_mutex_(db_mutex), |
106 | | shutting_down_(shutting_down), |
107 | | disable_flush_on_shutdown_(disable_flush_on_shutdown), |
108 | | existing_snapshots_(std::move(existing_snapshots)), |
109 | | earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), |
110 | | mem_table_flush_filter_(std::move(mem_table_flush_filter)), |
111 | | file_numbers_provider_(file_numbers_provider), |
112 | | job_context_(job_context), |
113 | | log_buffer_(log_buffer), |
114 | | db_directory_(db_directory), |
115 | | output_file_directory_(output_file_directory), |
116 | | output_compression_(output_compression), |
117 | | stats_(stats), |
118 | 31.8k | event_logger_(event_logger) { |
119 | | // Update the thread status to indicate flush. |
120 | 31.8k | ReportStartedFlush(); |
121 | 31.8k | TEST_SYNC_POINT("FlushJob::FlushJob()"); |
122 | 31.8k | } |
123 | | |
124 | 31.8k | FlushJob::~FlushJob() { |
125 | 31.8k | } |
126 | | |
127 | 31.8k | void FlushJob::ReportStartedFlush() { |
128 | 31.8k | IOSTATS_RESET(bytes_written); |
129 | 31.8k | } |
130 | | |
131 | 25.0k | void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) { |
132 | 25.0k | uint64_t input_size = 0; |
133 | 25.4k | for (auto* mem : mems) { |
134 | 25.4k | input_size += mem->ApproximateMemoryUsage(); |
135 | 25.4k | } |
136 | 25.0k | } |
137 | | |
138 | 25.0k | void FlushJob::RecordFlushIOStats() { |
139 | 25.0k | } |
140 | | |
141 | 31.8k | Result<FileNumbersHolder> FlushJob::Run(FileMetaData* file_meta) { |
142 | 31.8k | if (PREDICT_FALSE(yb::GetAtomicFlag(&FLAGS_TEST_rocksdb_crash_on_flush))) { |
143 | 0 | CHECK(false) << "a flush should not have been scheduled."; |
144 | 0 | } |
145 | | |
146 | | // Save the contents of the earliest memtable as a new Table |
147 | 31.8k | FileMetaData meta; |
148 | 31.8k | autovector<MemTable*> mems; |
149 | 31.8k | cfd_->imm()->PickMemtablesToFlush(&mems, mem_table_flush_filter_); |
150 | 31.8k | if (mems.empty()) { |
151 | | // A temporary workaround for repeated "Nothing in memtable to flush" messages in a |
152 | | // transactional workload due to the flush filter preventing us from flushing any memtables in |
153 | | // the provisional records RocksDB. |
154 | | // |
155 | | // See https://github.com/yugabyte/yugabyte-db/issues/437 for more details. |
156 | 6.84k | YB_LOG_EVERY_N_SECS(INFO, 1) |
157 | 129 | << db_options_.log_prefix |
158 | 129 | << "[" << cfd_->GetName() << "] No eligible memtables to flush."; |
159 | | |
160 | 6.84k | bool release_mutex = FLAGS_rocksdb_release_mutex_during_wait_for_memtables_to_flush; |
161 | | |
162 | 6.84k | if (release_mutex) { |
163 | | // Release the mutex before the sleep, so as to unblock writers. |
164 | 6.84k | db_mutex_->Unlock(); |
165 | 6.84k | } |
166 | | |
167 | 6.84k | std::this_thread::sleep_for(std::chrono::milliseconds( |
168 | 6.84k | FLAGS_rocksdb_nothing_in_memtable_to_flush_sleep_ms)); |
169 | | |
170 | 6.84k | if (release_mutex) { |
171 | 6.84k | db_mutex_->Lock(); |
172 | 6.84k | } |
173 | | |
174 | 6.84k | return FileNumbersHolder(); |
175 | 6.84k | } |
176 | | |
177 | 25.0k | ReportFlushInputSize(mems); |
178 | | |
179 | | // entries mems are (implicitly) sorted in ascending order by their created |
180 | | // time. We will use the first memtable's `edit` to keep the meta info for |
181 | | // this flush. |
182 | 25.0k | MemTable* m = mems[0]; |
183 | 25.0k | VersionEdit* edit = m->GetEdits(); |
184 | 25.0k | edit->SetPrevLogNumber(0); |
185 | | // SetLogNumber(log_num) indicates logs with number smaller than log_num |
186 | | // will no longer be picked up for recovery. |
187 | 25.0k | edit->SetLogNumber(mems.back()->GetNextLogNumber()); |
188 | 25.0k | edit->SetColumnFamily(cfd_->GetID()); |
189 | | |
190 | | // This will release and re-acquire the mutex. |
191 | 25.0k | auto fnum = WriteLevel0Table(mems, edit, &meta); |
192 | | |
193 | 25.0k | if (fnum.ok() && ((shutting_down_->load(std::memory_order_acquire) && |
194 | 721 | disable_flush_on_shutdown_->load(std::memory_order_acquire)) || |
195 | 25.0k | cfd_->IsDropped())) { |
196 | 0 | fnum = STATUS(ShutdownInProgress, "Database shutdown or Column family drop during flush"); |
197 | 0 | } |
198 | | |
199 | 25.0k | if (!fnum.ok()) { |
200 | 18 | cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); |
201 | 25.0k | } else { |
202 | 25.0k | TEST_SYNC_POINT("FlushJob::InstallResults"); |
203 | | // Replace immutable memtable with the generated Table |
204 | 25.0k | Status s = cfd_->imm()->InstallMemtableFlushResults( |
205 | 25.0k | cfd_, mutable_cf_options_, mems, versions_, db_mutex_, |
206 | 25.0k | meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, |
207 | 25.0k | log_buffer_, *fnum); |
208 | 25.0k | if (!s.ok()) { |
209 | 1 | fnum = s; |
210 | 1 | } |
211 | 25.0k | } |
212 | | |
213 | 25.0k | if (fnum.ok() && file_meta != nullptr) { |
214 | 25.0k | *file_meta = meta; |
215 | 25.0k | } |
216 | 25.0k | RecordFlushIOStats(); |
217 | | |
218 | 25.0k | auto stream = event_logger_->LogToBuffer(log_buffer_); |
219 | 25.0k | stream << "job" << job_context_->job_id << "event" |
220 | 25.0k | << "flush_finished"; |
221 | 25.0k | stream << "lsm_state"; |
222 | 25.0k | stream.StartArray(); |
223 | 25.0k | auto vstorage = cfd_->current()->storage_info(); |
224 | 166k | for (int level = 0; level < vstorage->num_levels(); ++level) { |
225 | 141k | stream << vstorage->NumLevelFiles(level); |
226 | 141k | } |
227 | 25.0k | stream.EndArray(); |
228 | | |
229 | 25.0k | return fnum; |
230 | 25.0k | } |
231 | | |
232 | | Result<FileNumbersHolder> FlushJob::WriteLevel0Table( |
233 | 25.0k | const autovector<MemTable*>& mems, VersionEdit* edit, FileMetaData* meta) { |
234 | 25.0k | db_mutex_->AssertHeld(); |
235 | 25.0k | const uint64_t start_micros = db_options_.env->NowMicros(); |
236 | 25.0k | auto file_number_holder = file_numbers_provider_->NewFileNumber(); |
237 | 25.0k | auto file_number = file_number_holder.Last(); |
238 | | // path 0 for level 0 file. |
239 | 25.0k | meta->fd = FileDescriptor(file_number, 0, 0, 0); |
240 | | |
241 | 25.0k | Status s; |
242 | 25.0k | { |
243 | 25.0k | db_mutex_->Unlock(); |
244 | 25.0k | if (log_buffer_) { |
245 | 25.0k | log_buffer_->FlushBufferToLog(); |
246 | 25.0k | } |
247 | 25.0k | std::vector<InternalIterator*> memtables; |
248 | 25.0k | ReadOptions ro; |
249 | 25.0k | ro.total_order_seek = true; |
250 | 25.0k | Arena arena; |
251 | 25.0k | uint64_t total_num_entries = 0, total_num_deletes = 0; |
252 | 25.0k | size_t total_memory_usage = 0; |
253 | 25.4k | for (MemTable* m : mems) { |
254 | 25.4k | RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, |
255 | 25.4k | "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", |
256 | 25.4k | cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); |
257 | 25.4k | memtables.push_back(m->NewIterator(ro, &arena)); |
258 | 25.4k | total_num_entries += m->num_entries(); |
259 | 25.4k | total_num_deletes += m->num_deletes(); |
260 | 25.4k | total_memory_usage += m->ApproximateMemoryUsage(); |
261 | 25.4k | const auto* range = m->Frontiers(); |
262 | 25.4k | if (range) { |
263 | 2.93k | UserFrontier::Update( |
264 | 2.93k | &range->Smallest(), UpdateUserValueType::kSmallest, &meta->smallest.user_frontier); |
265 | 2.93k | UserFrontier::Update( |
266 | 2.93k | &range->Largest(), UpdateUserValueType::kLargest, &meta->largest.user_frontier); |
267 | 2.93k | } |
268 | 25.4k | } |
269 | | |
270 | 25.0k | event_logger_->Log() << "job" << job_context_->job_id << "event" |
271 | 25.0k | << "flush_started" |
272 | 25.0k | << "num_memtables" << mems.size() << "num_entries" |
273 | 25.0k | << total_num_entries << "num_deletes" |
274 | 25.0k | << total_num_deletes << "memory_usage" |
275 | 25.0k | << total_memory_usage; |
276 | | |
277 | 25.0k | TableFileCreationInfo info; |
278 | 25.0k | { |
279 | 25.0k | ScopedArenaIterator iter( |
280 | 25.0k | NewMergingIterator(cfd_->internal_comparator().get(), &memtables[0], |
281 | 25.0k | static_cast<int>(memtables.size()), &arena)); |
282 | 25.0k | RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, |
283 | 25.0k | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", |
284 | 25.0k | cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber()); |
285 | | |
286 | 25.0k | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", |
287 | 25.0k | &output_compression_); |
288 | 25.0k | s = BuildTable(dbname_, |
289 | 25.0k | db_options_.env, |
290 | 25.0k | *cfd_->ioptions(), |
291 | 25.0k | env_options_, |
292 | 25.0k | cfd_->table_cache(), |
293 | 25.0k | iter.get(), |
294 | 25.0k | meta, |
295 | 25.0k | cfd_->internal_comparator(), |
296 | 25.0k | cfd_->int_tbl_prop_collector_factories(), |
297 | 25.0k | cfd_->GetID(), |
298 | 25.0k | existing_snapshots_, |
299 | 25.0k | earliest_write_conflict_snapshot_, |
300 | 25.0k | output_compression_, |
301 | 25.0k | cfd_->ioptions()->compression_opts, |
302 | 25.0k | mutable_cf_options_.paranoid_file_checks, |
303 | 25.0k | cfd_->internal_stats(), |
304 | 25.0k | db_options_.boundary_extractor.get(), |
305 | 25.0k | Env::IO_HIGH, |
306 | 25.0k | &table_properties_); |
307 | 25.0k | info.table_properties = table_properties_; |
308 | 25.0k | LogFlush(db_options_.info_log); |
309 | 25.0k | } |
310 | 25.0k | RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, |
311 | 25.0k | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 |
312 | 25.0k | " bytes %s%s %s", |
313 | 25.0k | cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(), |
314 | 25.0k | meta->fd.GetTotalFileSize(), s.ToString().c_str(), |
315 | 25.0k | meta->marked_for_compaction ? " (needs compaction)" : "", |
316 | 25.0k | meta->FrontiersToString().c_str()); |
317 | | |
318 | | // output to event logger |
319 | 25.0k | if (s.ok()) { |
320 | 25.0k | info.db_name = dbname_; |
321 | 25.0k | info.cf_name = cfd_->GetName(); |
322 | 25.0k | info.file_path = TableFileName(db_options_.db_paths, |
323 | 25.0k | meta->fd.GetNumber(), |
324 | 25.0k | meta->fd.GetPathId()); |
325 | 25.0k | info.file_size = meta->fd.GetTotalFileSize(); |
326 | 25.0k | info.job_id = job_context_->job_id; |
327 | 25.0k | EventHelpers::LogAndNotifyTableFileCreation( |
328 | 25.0k | event_logger_, db_options_.listeners, |
329 | 25.0k | meta->fd, info); |
330 | 25.0k | TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()"); |
331 | 25.0k | } |
332 | | |
333 | 25.0k | if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { |
334 | 22.1k | RETURN_NOT_OK(output_file_directory_->Fsync()); |
335 | 22.1k | } |
336 | 25.0k | TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); |
337 | 25.0k | db_mutex_->Lock(); |
338 | 25.0k | } |
339 | | |
340 | | // Note that if total_file_size is zero, the file has been deleted and |
341 | | // should not be added to the manifest. |
342 | 25.0k | if (s.ok() && meta->fd.GetTotalFileSize() > 0) { |
343 | | // if we have more than 1 background thread, then we cannot |
344 | | // insert files directly into higher levels because some other |
345 | | // threads could be concurrently producing compacted files for |
346 | | // that key range. |
347 | | // Add file to L0 |
348 | 24.1k | edit->AddCleanedFile(0 /* level */, *meta); |
349 | 24.1k | } |
350 | | |
351 | 25.0k | InternalStats::CompactionStats stats(1); |
352 | 25.0k | stats.micros = db_options_.env->NowMicros() - start_micros; |
353 | 25.0k | stats.bytes_written = meta->fd.GetTotalFileSize(); |
354 | 25.0k | cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats); |
355 | 25.0k | cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, |
356 | 25.0k | meta->fd.GetTotalFileSize()); |
357 | 25.0k | RecordTick(stats_, COMPACT_WRITE_BYTES, meta->fd.GetTotalFileSize()); |
358 | 25.0k | if (s.ok()) { |
359 | 25.0k | return file_number_holder; |
360 | 20 | } else { |
361 | 20 | return s; |
362 | 20 | } |
363 | 25.0k | } |
364 | | |
365 | | } // namespace rocksdb |