/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_job.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/db/flush_job.h" |
25 | | |
26 | | #ifndef __STDC_FORMAT_MACROS |
27 | | #define __STDC_FORMAT_MACROS |
28 | | #endif |
29 | | |
30 | | #include <inttypes.h> |
31 | | |
32 | | #include <algorithm> |
33 | | #include <vector> |
34 | | #include <chrono> |
35 | | |
36 | | #include "yb/rocksdb/db/builder.h" |
37 | | #include "yb/rocksdb/db/dbformat.h" |
38 | | #include "yb/rocksdb/db/event_helpers.h" |
39 | | #include "yb/rocksdb/db/filename.h" |
40 | | #include "yb/rocksdb/db/file_numbers.h" |
41 | | #include "yb/rocksdb/db/internal_stats.h" |
42 | | #include "yb/rocksdb/db/log_reader.h" |
43 | | #include "yb/rocksdb/db/log_writer.h" |
44 | | #include "yb/rocksdb/db/memtable.h" |
45 | | #include "yb/rocksdb/db/memtable_list.h" |
46 | | #include "yb/rocksdb/db/version_set.h" |
47 | | #include "yb/rocksdb/port/likely.h" |
48 | | #include "yb/rocksdb/port/port.h" |
49 | | #include "yb/rocksdb/db.h" |
50 | | #include "yb/rocksdb/env.h" |
51 | | #include "yb/rocksdb/statistics.h" |
52 | | #include "yb/rocksdb/status.h" |
53 | | #include "yb/rocksdb/table.h" |
54 | | #include "yb/rocksdb/table/merger.h" |
55 | | #include "yb/rocksdb/table/scoped_arena_iterator.h" |
56 | | #include "yb/rocksdb/util/coding.h" |
57 | | #include "yb/rocksdb/util/event_logger.h" |
58 | | #include "yb/rocksdb/util/log_buffer.h" |
59 | | #include "yb/rocksdb/util/logging.h" |
60 | | #include "yb/rocksdb/util/mutexlock.h" |
61 | | #include "yb/rocksdb/util/statistics.h" |
62 | | #include "yb/rocksdb/util/stop_watch.h" |
63 | | #include "yb/rocksdb/util/sync_point.h" |
64 | | |
65 | | #include "yb/util/atomic.h" |
66 | | #include "yb/util/flag_tags.h" |
67 | | #include "yb/util/logging.h" |
68 | | #include "yb/util/result.h" |
69 | | #include "yb/util/stats/iostats_context_imp.h" |
70 | | |
71 | | DEFINE_int32(rocksdb_nothing_in_memtable_to_flush_sleep_ms, 10, |
72 | | "Used for a temporary workaround for http://bit.ly/ybissue437. How long to wait (ms) in case " |
73 | | "we could not flush any memtables, usually due to filters preventing us from doing so."); |
74 | | |
75 | | DEFINE_test_flag(bool, rocksdb_crash_on_flush, false, |
76 | | "When set, memtable flush in rocksdb crashes."); |
77 | | |
78 | | DEFINE_bool(rocksdb_release_mutex_during_wait_for_memtables_to_flush, true, |
79 | | "When a flush is scheduled, but there isn't a memtable eligible yet, release " |
80 | | "the mutex before going to sleep and reacquire it post sleep."); |
81 | | |
82 | | namespace rocksdb { |
83 | | |
84 | | FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, |
85 | | const DBOptions& db_options, |
86 | | const MutableCFOptions& mutable_cf_options, |
87 | | const EnvOptions& env_options, VersionSet* versions, |
88 | | InstrumentedMutex* db_mutex, |
89 | | std::atomic<bool>* shutting_down, |
90 | | std::atomic<bool>* disable_flush_on_shutdown, |
91 | | std::vector<SequenceNumber> existing_snapshots, |
92 | | SequenceNumber earliest_write_conflict_snapshot, |
93 | | MemTableFilter mem_table_flush_filter, |
94 | | FileNumbersProvider* file_numbers_provider, |
95 | | JobContext* job_context, LogBuffer* log_buffer, |
96 | | Directory* db_directory, Directory* output_file_directory, |
97 | | CompressionType output_compression, Statistics* stats, |
98 | | EventLogger* event_logger) |
99 | | : dbname_(dbname), |
100 | | cfd_(cfd), |
101 | | db_options_(db_options), |
102 | | mutable_cf_options_(mutable_cf_options), |
103 | | env_options_(env_options), |
104 | | versions_(versions), |
105 | | db_mutex_(db_mutex), |
106 | | shutting_down_(shutting_down), |
107 | | disable_flush_on_shutdown_(disable_flush_on_shutdown), |
108 | | existing_snapshots_(std::move(existing_snapshots)), |
109 | | earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), |
110 | | mem_table_flush_filter_(std::move(mem_table_flush_filter)), |
111 | | file_numbers_provider_(file_numbers_provider), |
112 | | job_context_(job_context), |
113 | | log_buffer_(log_buffer), |
114 | | db_directory_(db_directory), |
115 | | output_file_directory_(output_file_directory), |
116 | | output_compression_(output_compression), |
117 | | stats_(stats), |
118 | 59.6k | event_logger_(event_logger) { |
119 | | // Update the thread status to indicate flush. |
120 | 59.6k | ReportStartedFlush(); |
121 | 59.6k | TEST_SYNC_POINT("FlushJob::FlushJob()"); |
122 | 59.6k | } |
123 | | |
124 | 59.6k | FlushJob::~FlushJob() { |
125 | 59.6k | } |
126 | | |
127 | 59.6k | void FlushJob::ReportStartedFlush() { |
128 | 59.6k | IOSTATS_RESET(bytes_written); |
129 | 59.6k | } |
130 | | |
131 | 57.0k | void FlushJob::RecordFlushIOStats() { |
132 | 57.0k | RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written)); |
133 | 57.0k | IOSTATS_RESET(bytes_written); |
134 | 57.0k | } |
135 | | |
136 | 59.6k | Result<FileNumbersHolder> FlushJob::Run(FileMetaData* file_meta) { |
137 | 59.6k | if (PREDICT_FALSE(yb::GetAtomicFlag(&FLAGS_TEST_rocksdb_crash_on_flush))) { |
138 | 0 | CHECK(false) << "a flush should not have been scheduled."; |
139 | 0 | } |
140 | | |
141 | | // Save the contents of the earliest memtable as a new Table |
142 | 59.6k | FileMetaData meta; |
143 | 59.6k | autovector<MemTable*> mems; |
144 | 59.6k | cfd_->imm()->PickMemtablesToFlush(&mems, mem_table_flush_filter_); |
145 | 59.6k | if (mems.empty()) { |
146 | | // A temporary workaround for repeated "Nothing in memtable to flush" messages in a |
147 | | // transactional workload due to the flush filter preventing us from flushing any memtables in |
148 | | // the provisional records RocksDB. |
149 | | // |
150 | | // See https://github.com/yugabyte/yugabyte-db/issues/437 for more details. |
151 | 31.1k | YB_LOG_EVERY_N_SECS(INFO, 1) |
152 | 567 | << db_options_.log_prefix |
153 | 567 | << "[" << cfd_->GetName() << "] No eligible memtables to flush."; |
154 | | |
155 | 31.1k | bool release_mutex = FLAGS_rocksdb_release_mutex_during_wait_for_memtables_to_flush; |
156 | | |
157 | 31.1k | if (release_mutex) { |
158 | | // Release the mutex before the sleep, so as to unblock writers. |
159 | 31.1k | db_mutex_->Unlock(); |
160 | 31.1k | } |
161 | | |
162 | 31.1k | std::this_thread::sleep_for(std::chrono::milliseconds( |
163 | 31.1k | FLAGS_rocksdb_nothing_in_memtable_to_flush_sleep_ms)); |
164 | | |
165 | 31.1k | if (release_mutex) { |
166 | 31.1k | db_mutex_->Lock(); |
167 | 31.1k | } |
168 | | |
169 | 31.1k | return FileNumbersHolder(); |
170 | 31.1k | } |
171 | | |
172 | | // entries mems are (implicitly) sorted in ascending order by their created |
173 | | // time. We will use the first memtable's `edit` to keep the meta info for |
174 | | // this flush. |
175 | 28.5k | MemTable* m = mems[0]; |
176 | 28.5k | VersionEdit* edit = m->GetEdits(); |
177 | 28.5k | edit->SetPrevLogNumber(0); |
178 | | // SetLogNumber(log_num) indicates logs with number smaller than log_num |
179 | | // will no longer be picked up for recovery. |
180 | 28.5k | edit->SetLogNumber(mems.back()->GetNextLogNumber()); |
181 | 28.5k | edit->SetColumnFamily(cfd_->GetID()); |
182 | | |
183 | | // This will release and re-acquire the mutex. |
184 | 28.5k | auto fnum = WriteLevel0Table(mems, edit, &meta); |
185 | | |
186 | 28.5k | if (fnum.ok() && (28.5k (28.5k shutting_down_->load(std::memory_order_acquire)28.5k && |
187 | 28.5k | disable_flush_on_shutdown_->load(std::memory_order_acquire)826 ) || |
188 | 28.5k | cfd_->IsDropped()28.5k )) { |
189 | 3 | fnum = STATUS(ShutdownInProgress, "Database shutdown or Column family drop during flush"); |
190 | 3 | } |
191 | | |
192 | 28.5k | if (!fnum.ok()) { |
193 | 21 | cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); |
194 | 28.5k | } else { |
195 | 28.5k | TEST_SYNC_POINT("FlushJob::InstallResults"); |
196 | | // Replace immutable memtable with the generated Table |
197 | 28.5k | Status s = cfd_->imm()->InstallMemtableFlushResults( |
198 | 28.5k | cfd_, mutable_cf_options_, mems, versions_, db_mutex_, |
199 | 28.5k | meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, |
200 | 28.5k | log_buffer_, *fnum); |
201 | 28.5k | if (!s.ok()) { |
202 | 1 | fnum = s; |
203 | 1 | } |
204 | 28.5k | } |
205 | | |
206 | 28.5k | if (fnum.ok() && file_meta != nullptr28.5k ) { |
207 | 28.5k | *file_meta = meta; |
208 | 28.5k | } |
209 | | |
210 | | // This includes both SST and MANIFEST files IO. |
211 | 28.5k | RecordFlushIOStats(); |
212 | | |
213 | 28.5k | auto stream = event_logger_->LogToBuffer(log_buffer_); |
214 | 28.5k | stream << "job" << job_context_->job_id << "event" |
215 | 28.5k | << "flush_finished"; |
216 | 28.5k | stream << "lsm_state"; |
217 | 28.5k | stream.StartArray(); |
218 | 28.5k | auto vstorage = cfd_->current()->storage_info(); |
219 | 173k | for (int level = 0; level < vstorage->num_levels(); ++level144k ) { |
220 | 144k | stream << vstorage->NumLevelFiles(level); |
221 | 144k | } |
222 | 28.5k | stream.EndArray(); |
223 | | |
224 | 28.5k | return fnum; |
225 | 59.6k | } |
226 | | |
227 | | Result<FileNumbersHolder> FlushJob::WriteLevel0Table( |
228 | 28.5k | const autovector<MemTable*>& mems, VersionEdit* edit, FileMetaData* meta) { |
229 | 28.5k | db_mutex_->AssertHeld(); |
230 | 28.5k | const uint64_t start_micros = db_options_.env->NowMicros(); |
231 | 28.5k | auto file_number_holder = file_numbers_provider_->NewFileNumber(); |
232 | 28.5k | auto file_number = file_number_holder.Last(); |
233 | | // path 0 for level 0 file. |
234 | 28.5k | meta->fd = FileDescriptor(file_number, 0, 0, 0); |
235 | | |
236 | 28.5k | Status s; |
237 | 28.5k | { |
238 | 28.5k | db_mutex_->Unlock(); |
239 | 28.5k | if (log_buffer_) { |
240 | 28.5k | log_buffer_->FlushBufferToLog(); |
241 | 28.5k | } |
242 | 28.5k | std::vector<InternalIterator*> memtables; |
243 | 28.5k | ReadOptions ro; |
244 | 28.5k | ro.total_order_seek = true; |
245 | 28.5k | Arena arena; |
246 | 28.5k | uint64_t total_num_entries = 0, total_num_deletes = 0; |
247 | 28.5k | size_t total_memory_usage = 0; |
248 | 29.0k | for (MemTable* m : mems) { |
249 | 29.0k | RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, |
250 | 29.0k | "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", |
251 | 29.0k | cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); |
252 | 29.0k | memtables.push_back(m->NewIterator(ro, &arena)); |
253 | 29.0k | total_num_entries += m->num_entries(); |
254 | 29.0k | total_num_deletes += m->num_deletes(); |
255 | 29.0k | total_memory_usage += m->ApproximateMemoryUsage(); |
256 | 29.0k | const auto* range = m->Frontiers(); |
257 | 29.0k | if (range) { |
258 | 4.61k | UserFrontier::Update( |
259 | 4.61k | &range->Smallest(), UpdateUserValueType::kSmallest, &meta->smallest.user_frontier); |
260 | 4.61k | UserFrontier::Update( |
261 | 4.61k | &range->Largest(), UpdateUserValueType::kLargest, &meta->largest.user_frontier); |
262 | 4.61k | } |
263 | 29.0k | } |
264 | | |
265 | 28.5k | event_logger_->Log() << "job" << job_context_->job_id << "event" |
266 | 28.5k | << "flush_started" |
267 | 28.5k | << "num_memtables" << mems.size() << "num_entries" |
268 | 28.5k | << total_num_entries << "num_deletes" |
269 | 28.5k | << total_num_deletes << "memory_usage" |
270 | 28.5k | << total_memory_usage; |
271 | | |
272 | 28.5k | TableFileCreationInfo info; |
273 | 28.5k | { |
274 | 28.5k | ScopedArenaIterator iter( |
275 | 28.5k | NewMergingIterator(cfd_->internal_comparator().get(), &memtables[0], |
276 | 28.5k | static_cast<int>(memtables.size()), &arena)); |
277 | 28.5k | RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, |
278 | 28.5k | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", |
279 | 28.5k | cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber()); |
280 | | |
281 | 28.5k | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", |
282 | 28.5k | &output_compression_); |
283 | 28.5k | s = BuildTable(dbname_, |
284 | 28.5k | db_options_.env, |
285 | 28.5k | *cfd_->ioptions(), |
286 | 28.5k | env_options_, |
287 | 28.5k | cfd_->table_cache(), |
288 | 28.5k | iter.get(), |
289 | 28.5k | meta, |
290 | 28.5k | cfd_->internal_comparator(), |
291 | 28.5k | cfd_->int_tbl_prop_collector_factories(), |
292 | 28.5k | cfd_->GetID(), |
293 | 28.5k | existing_snapshots_, |
294 | 28.5k | earliest_write_conflict_snapshot_, |
295 | 28.5k | output_compression_, |
296 | 28.5k | cfd_->ioptions()->compression_opts, |
297 | 28.5k | mutable_cf_options_.paranoid_file_checks, |
298 | 28.5k | cfd_->internal_stats(), |
299 | 28.5k | db_options_.boundary_extractor.get(), |
300 | 28.5k | Env::IO_HIGH, |
301 | 28.5k | &table_properties_); |
302 | 28.5k | info.table_properties = table_properties_; |
303 | 28.5k | LogFlush(db_options_.info_log); |
304 | 28.5k | } |
305 | 28.5k | RLOG(InfoLogLevel::INFO_LEVEL, db_options_.info_log, |
306 | 28.5k | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 |
307 | 28.5k | " bytes %s%s %s", |
308 | 28.5k | cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(), |
309 | 28.5k | meta->fd.GetTotalFileSize(), s.ToString().c_str(), |
310 | 28.5k | meta->marked_for_compaction ? " (needs compaction)" : "", |
311 | 28.5k | meta->FrontiersToString().c_str()); |
312 | | |
313 | | // output to event logger |
314 | 28.5k | if (s.ok()) { |
315 | 28.5k | info.db_name = dbname_; |
316 | 28.5k | info.cf_name = cfd_->GetName(); |
317 | 28.5k | info.file_path = TableFileName(db_options_.db_paths, |
318 | 28.5k | meta->fd.GetNumber(), |
319 | 28.5k | meta->fd.GetPathId()); |
320 | 28.5k | info.file_size = meta->fd.GetTotalFileSize(); |
321 | 28.5k | info.job_id = job_context_->job_id; |
322 | 28.5k | EventHelpers::LogAndNotifyTableFileCreation( |
323 | 28.5k | event_logger_, db_options_.listeners, |
324 | 28.5k | meta->fd, info); |
325 | 28.5k | TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()"); |
326 | 28.5k | } |
327 | | |
328 | 28.5k | if (!db_options_.disableDataSync && output_file_directory_ != nullptr22.1k ) { |
329 | 22.1k | RETURN_NOT_OK(output_file_directory_->Fsync()); |
330 | 22.1k | } |
331 | 28.5k | TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); |
332 | 28.5k | db_mutex_->Lock(); |
333 | 28.5k | } |
334 | | |
335 | | // Note that if total_file_size is zero, the file has been deleted and |
336 | | // should not be added to the manifest. |
337 | 28.5k | if (s.ok() && meta->fd.GetTotalFileSize() > 028.5k ) { |
338 | | // if we have more than 1 background thread, then we cannot |
339 | | // insert files directly into higher levels because some other |
340 | | // threads could be concurrently producing compacted files for |
341 | | // that key range. |
342 | | // Add file to L0 |
343 | 27.4k | edit->AddCleanedFile(0 /* level */, *meta); |
344 | 27.4k | } |
345 | | |
346 | 28.5k | InternalStats::CompactionStats stats(1); |
347 | 28.5k | stats.micros = db_options_.env->NowMicros() - start_micros; |
348 | 28.5k | stats.bytes_written = meta->fd.GetTotalFileSize(); |
349 | 28.5k | cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats); |
350 | 28.5k | cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, |
351 | 28.5k | meta->fd.GetTotalFileSize()); |
352 | 28.5k | RecordFlushIOStats(); |
353 | 28.5k | if (s.ok()) { |
354 | 28.5k | return file_number_holder; |
355 | 28.5k | } else { |
356 | 23 | return s; |
357 | 23 | } |
358 | 28.5k | } |
359 | | |
360 | | } // namespace rocksdb |