YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/db_impl.h
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef YB_ROCKSDB_DB_DB_IMPL_H
#define YB_ROCKSDB_DB_DB_IMPL_H

#pragma once

#include <algorithm>
#include <atomic>
#include <deque>
#include <limits>
#include <list>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "yb/gutil/thread_annotations.h"

#include "yb/rocksdb/db.h"
#include "yb/rocksdb/db/column_family.h"
#include "yb/rocksdb/db/compaction.h"
#include "yb/rocksdb/db/dbformat.h"
#include "yb/rocksdb/db/flush_scheduler.h"
#include "yb/rocksdb/db/internal_stats.h"
#include "yb/rocksdb/db/log_writer.h"
#include "yb/rocksdb/db/memtable_list.h"
#include "yb/rocksdb/db/snapshot_impl.h"
#include "yb/rocksdb/db/version_edit.h"
#include "yb/rocksdb/db/wal_manager.h"
#include "yb/rocksdb/db/write_controller.h"
#include "yb/rocksdb/db/write_thread.h"
#include "yb/rocksdb/db/writebuffer.h"
#include "yb/rocksdb/env.h"
#include "yb/rocksdb/memtablerep.h"
#include "yb/rocksdb/port/port.h"
#include "yb/rocksdb/transaction_log.h"
#include "yb/rocksdb/util/autovector.h"
#include "yb/rocksdb/util/event_logger.h"
#include "yb/rocksdb/util/instrumented_mutex.h"
#include "yb/rocksdb/util/stop_watch.h"
#include "yb/rocksdb/util/thread_local.h"

namespace rocksdb {

class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;
class WriteCallback;
class FileNumbersProvider;
struct JobContext;
struct ExternalSstFileInfo;

class DBImpl : public DB {
 public:
  DBImpl(const DBOptions& options, const std::string& dbname);
  virtual ~DBImpl();

  // Implementations of the DB interface
  using DB::Put;
  virtual Status Put(const WriteOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) override;
  using DB::Merge;
  virtual Status Merge(const WriteOptions& options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) override;
  using DB::Delete;
  virtual Status Delete(const WriteOptions& options,
                        ColumnFamilyHandle* column_family,
                        const Slice& key) override;
  using DB::SingleDelete;
  virtual Status SingleDelete(const WriteOptions& options,
                              ColumnFamilyHandle* column_family,
                              const Slice& key) override;
  using DB::Write;
  virtual Status Write(const WriteOptions& options,
                       WriteBatch* updates) override;

  using DB::Get;
  virtual Status Get(const ReadOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
                     std::string* value) override;
  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_family,
      const std::vector<Slice>& keys,
      std::vector<std::string>* values) override;

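  // A minimal usage sketch for the point operations above, going through the
  // public DB handle (illustrative only; the path and option values are
  // hypothetical, not taken from this codebase):
  //
  //   rocksdb::DB* db = nullptr;
  //   rocksdb::Options options;
  //   options.create_if_missing = true;
  //   rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/example_db", &db);
  //   if (s.ok()) {
  //     s = db->Put(rocksdb::WriteOptions(), "key", "value");
  //     std::string value;
  //     if (s.ok()) s = db->Get(rocksdb::ReadOptions(), "key", &value);
  //   }
  //   delete db;
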
  virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
                                    const std::string& column_family,
                                    ColumnFamilyHandle** handle) override;
  virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;

  // Returns false if key doesn't exist in the database and true if it may.
  // If value_found is not passed in as null, then return the value if found in
  // memory. On return, if the value was found, then value_found will be set to
  // true, otherwise false.
  using DB::KeyMayExist;
  virtual bool KeyMayExist(const ReadOptions& options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           std::string* value,
                           bool* value_found = nullptr) override;
  using DB::NewIterator;
  virtual Iterator* NewIterator(const ReadOptions& options,
                                ColumnFamilyHandle* column_family) override;
  virtual Status NewIterators(
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;
  virtual const Snapshot* GetSnapshot() override;
  virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
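
  // A consistent-read sketch for the snapshot API above (illustrative;
  // assumes an already-open DB handle `db`):
  //
  //   const rocksdb::Snapshot* snapshot = db->GetSnapshot();
  //   rocksdb::ReadOptions read_options;
  //   read_options.snapshot = snapshot;  // all reads see one consistent state
  //   std::string value;
  //   rocksdb::Status s = db->Get(read_options, "key", &value);
  //   db->ReleaseSnapshot(snapshot);     // must be released on the same DB
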
  using DB::GetProperty;
  virtual bool GetProperty(ColumnFamilyHandle* column_family,
                           const Slice& property, std::string* value) override;
  using DB::GetIntProperty;
  virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
                              const Slice& property, uint64_t* value) override;
  using DB::GetAggregatedIntProperty;
  virtual bool GetAggregatedIntProperty(const Slice& property,
                                        uint64_t* aggregated_value) override;
  using DB::GetApproximateSizes;
  virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
                                   const Range* range, int n, uint64_t* sizes,
                                   bool include_memtable = false) override;
  using DB::CompactRange;
  virtual Status CompactRange(const CompactRangeOptions& options,
                              ColumnFamilyHandle* column_family,
                              const Slice* begin, const Slice* end) override;

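  // A manual-compaction sketch for CompactRange() above; nullptr bounds mean
  // "compact the whole key space" (illustrative call against the default
  // column family, assuming an open DB handle `db`):
  //
  //   rocksdb::CompactRangeOptions cr_options;
  //   rocksdb::Status s = db->CompactRange(
  //       cr_options, db->DefaultColumnFamily(), nullptr, nullptr);
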
  using DB::CompactFiles;
  virtual Status CompactFiles(const CompactionOptions& compact_options,
                              ColumnFamilyHandle* column_family,
                              const std::vector<std::string>& input_file_names,
                              const int output_level,
                              const int output_path_id = -1) override;

  virtual Status PauseBackgroundWork() override;
  virtual Status ContinueBackgroundWork() override;

  virtual Status EnableAutoCompaction(
      const std::vector<ColumnFamilyHandle*>& column_family_handles) override;

  using DB::SetOptions;
  Status SetOptions(
      ColumnFamilyHandle* column_family,
      const std::unordered_map<std::string, std::string>& options_map,
      bool dump_options = true) override;

  // Set whether DB should be flushed on shutdown.
  void SetDisableFlushOnShutdown(bool disable_flush_on_shutdown) override;
  void StartShutdown() override;

  using DB::NumberLevels;
  virtual int NumberLevels(ColumnFamilyHandle* column_family) override;
  using DB::MaxMemCompactionLevel;
  virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) override;
  using DB::Level0StopWriteTrigger;
  virtual int Level0StopWriteTrigger(
      ColumnFamilyHandle* column_family) override;
  virtual const std::string& GetName() const override;
  virtual Env* GetEnv() const override;
  Env* GetCheckpointEnv() const override;
  using DB::GetOptions;
  virtual const Options& GetOptions(
      ColumnFamilyHandle* column_family) const override;
  using DB::GetDBOptions;
  virtual const DBOptions& GetDBOptions() const override;
  using DB::Flush;
  virtual Status Flush(const FlushOptions& options,
                       ColumnFamilyHandle* column_family) override;
  using DB::WaitForFlush;
  virtual Status WaitForFlush(ColumnFamilyHandle* column_family) override;
  virtual Status SyncWAL() override;

  virtual SequenceNumber GetLatestSequenceNumber() const override;

#ifndef ROCKSDB_LITE
  virtual Status DisableFileDeletions() override;
  virtual Status EnableFileDeletions(bool force) override;
  virtual int IsFileDeletionsEnabled() const;
  // All the returned filenames start with "/"
  virtual Status GetLiveFiles(std::vector<std::string>&,
                              uint64_t* manifest_file_size,
                              bool flush_memtable = true) override;
  virtual Status GetSortedWalFiles(VectorLogPtr* files) override;

  virtual Status GetUpdatesSince(
      SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
      const TransactionLogIterator::ReadOptions&
          read_options = TransactionLogIterator::ReadOptions()) override;
  virtual Status DeleteFile(std::string name) override;
  Status DeleteFilesInRange(ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end);

  void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) override;

  UserFrontierPtr GetFlushedFrontier() override;

  CHECKED_STATUS ModifyFlushedFrontier(
      UserFrontierPtr frontier,
      FrontierModificationMode mode) override;

  FlushAbility GetFlushAbility() override;

  UserFrontierPtr GetMutableMemTableFrontier(UpdateUserValueType type) override;

  // Obtains the metadata of the specified column family of the DB.
  // STATUS(NotFound, "") will be returned if the current DB does not have
  // any column family matching the specified name.
  // TODO(yhchiang): output parameter is placed at the end in this codebase.
  virtual void GetColumnFamilyMetaData(
      ColumnFamilyHandle* column_family,
      ColumnFamilyMetaData* metadata) override;

  // Obtains all column family options and corresponding names;
  // dropped column families are not included in the resulting collections.
  virtual void GetColumnFamiliesOptions(
      std::vector<std::string>* column_family_names,
      std::vector<ColumnFamilyOptions>* column_family_options) override;

  // experimental API
  Status SuggestCompactRange(ColumnFamilyHandle* column_family,
                             const Slice* begin, const Slice* end);

  Status PromoteL0(ColumnFamilyHandle* column_family, int target_level);

  // Similar to Write() but will call the callback once on the single write
  // thread to determine whether it is safe to perform the write.
  virtual Status WriteWithCallback(const WriteOptions& write_options,
                                   WriteBatch* my_batch,
                                   WriteCallback* callback);

  // Returns the sequence number that is guaranteed to be smaller than or equal
  // to the sequence number of any key that could be inserted into the current
  // memtables. It can then be assumed that any write with a larger (or equal)
  // sequence number will be present in this memtable or a later memtable.
  //
  // If the earliest sequence number could not be determined,
  // kMaxSequenceNumber will be returned.
  //
  // If include_history=true, will also search Memtables in MemTableList
  // History.
  SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv,
                                                   bool include_history);

  // For a given key, check to see if there are any records for this key
  // in the memtables, including memtable history.  If cache_only is false,
  // SST files will also be checked.
  //
  // If a key is found, *found_record_for_key will be set to true and
  // *seq will be set to the stored sequence number for the latest
  // operation on this key or kMaxSequenceNumber if unknown.
  // If no key is found, *found_record_for_key will be set to false.
  //
  // Note: If cache_only=false, it is possible for *seq to be set to 0 if
  // the sequence number has been cleared from the record.  If the caller is
  // holding an active db snapshot, we know the missing sequence must be less
  // than the snapshot's sequence number (sequence numbers are only cleared
  // when there are no earlier active snapshots).
  //
  // If NotFound is returned and found_record_for_key is set to false, then no
  // record for this key was found.  If the caller is holding an active db
  // snapshot, we know that no key could have existed after this snapshot
  // (since we do not compact keys that have an earlier snapshot).
  //
  // Returns OK or NotFound on success,
  // other status on unexpected error.
  Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                 bool cache_only, SequenceNumber* seq,
                                 bool* found_record_for_key);

  using DB::AddFile;
  virtual Status AddFile(ColumnFamilyHandle* column_family,
                         const ExternalSstFileInfo* file_info,
                         bool move_file) override;
  virtual Status AddFile(ColumnFamilyHandle* column_family,
                         const std::string& file_path, bool move_file) override;

#endif  // ROCKSDB_LITE

  // Similar to GetSnapshot(), but also lets the db know that this snapshot
  // will be used for transaction write-conflict checking.  The DB can then
  // make sure not to compact any keys that would prevent a write-conflict from
  // being detected.
  const Snapshot* GetSnapshotForWriteConflictBoundary();

  // Checks if all live files exist on the file system and that their file
  // sizes match our in-memory records.
  virtual Status CheckConsistency();

  virtual Status GetDbIdentity(std::string* identity) const override;

  Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                             int output_level, uint32_t output_path_id,
                             const Slice* begin, const Slice* end,
                             bool exclusive,
                             bool disallow_trivial_move = false);

  // Return an internal iterator over the current state of the database.
  // The keys of this iterator are internal keys (see format.h).
  // The returned iterator should be deleted when no longer needed.
  InternalIterator* NewInternalIterator(
      Arena* arena, ColumnFamilyHandle* column_family = nullptr);

  // Extra methods (for testing) that are not in the public DB interface
  // Implemented in db_impl_debug.cc

  // Compact any files in the named level that overlap [*begin, *end]
  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
                           ColumnFamilyHandle* column_family = nullptr,
                           bool disallow_trivial_move = false);

  // Force current memtable contents to be flushed.
  Status TEST_FlushMemTable(bool wait = true);

  // Wait for memtable flush to complete.
  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);

  // Wait for any compaction
  Status TEST_WaitForCompact();

  // Return the maximum overlapping data (in bytes) at next level for any
  // file at a level >= 1.
  int64_t TEST_MaxNextLevelOverlappingBytes(ColumnFamilyHandle* column_family =
                                                nullptr);

  // Return the current manifest file number.
  uint64_t TEST_Current_Manifest_FileNo();

  // Get total level-0 file size. Only for testing.
  uint64_t TEST_GetLevel0TotalSize();

  int TEST_NumRunningLargeCompactions();

  int TEST_NumTotalRunningCompactions();

  int TEST_NumRunningFlushes();

  int TEST_NumBackgroundCompactionsScheduled();

  void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
                             std::vector<std::vector<FileMetaData>>* metadata);

  void TEST_LockMutex();

  void TEST_UnlockMutex();

  // REQUIRES: mutex locked
  void* TEST_BeginWrite();

  // REQUIRES: mutex locked
  // Pass the pointer that you got from TEST_BeginWrite().
  void TEST_EndWrite(void* w);

  uint64_t TEST_MaxTotalInMemoryState() const {
    return max_total_in_memory_state_;
  }

  size_t TEST_LogsToFreeSize();

  uint64_t TEST_LogfileNumber();

  // Returns column family name to ImmutableCFOptions map.
  Status TEST_GetAllImmutableCFOptions(
      std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);

  Cache* TEST_table_cache() { return table_cache_.get(); }

  WriteController& TEST_write_controler() { return write_controller_; }

  // Return the maximum number of background compactions allowed to be
  // scheduled, based on compaction status.
  int BGCompactionsAllowed() const;

  // Returns the list of live files in 'live' and the list
  // of all files in the filesystem in 'candidate_files'.
  // If force == false and the last call was less than
  // db_options_.delete_obsolete_files_period_micros microseconds ago,
  // it will not fill up the job_context.
  void FindObsoleteFiles(JobContext* job_context, bool force,
                         bool no_full_scan = false);

  // Diffs the files listed in filenames and those that do not
  // belong to live files are possibly removed. Also, removes all the
  // files in sst_delete_files and log_delete_files.
  // It is not necessary to hold the mutex when invoking this method.
  void PurgeObsoleteFiles(const JobContext& background_context);

  ColumnFamilyHandle* DefaultColumnFamily() const override;

  const SnapshotList& snapshots() const { return snapshots_; }

  void CancelAllBackgroundWork(bool wait);

  // Find the SuperVersion and reference it. Based on options, it might return
  // the thread-local cached one.
  // Call ReturnAndCleanupSuperVersion() when it is no longer needed.
  SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);

  // Similar to the previous function but looks up based on a column family id.
  // nullptr will be returned if this column family no longer exists.
  // REQUIRED: this function should only be called on the write thread or if the
  // mutex is held.
  SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);

  // Same as above, should be called without mutex held and not on write thread.
  SuperVersion* GetAndRefSuperVersionUnlocked(uint32_t column_family_id);

  // Un-reference the super version and return it to thread-local cache if
  // needed. If it is the last reference of the super version, clean it up
  // after un-referencing it.
  void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv);

  // Similar to the previous function but looks up based on a column family id.
  // nullptr will be returned if this column family no longer exists.
  // REQUIRED: this function should only be called on the write thread.
  void ReturnAndCleanupSuperVersion(uint32_t column_family_id, SuperVersion* sv);

  // Same as above, should be called without mutex held and not on write thread.
  void ReturnAndCleanupSuperVersionUnlocked(uint32_t column_family_id,
                                            SuperVersion* sv);

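  // The acquire/release pairing above lends itself to a small RAII guard, in
  // the spirit of the ReferencedColumnFamily TODO further below (hypothetical
  // helper, not part of this class):
  //
  //   class ScopedSuperVersion {
  //    public:
  //     ScopedSuperVersion(DBImpl* db, ColumnFamilyData* cfd)
  //         : db_(db), cfd_(cfd), sv_(db->GetAndRefSuperVersion(cfd)) {}
  //     ~ScopedSuperVersion() { db_->ReturnAndCleanupSuperVersion(cfd_, sv_); }
  //     SuperVersion* get() const { return sv_; }
  //    private:
  //     DBImpl* db_;
  //     ColumnFamilyData* cfd_;
  //     SuperVersion* sv_;
  //   };
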
  // REQUIRED: this function should only be called on the write thread or if the
  // mutex is held.  Return value only valid until next call to this function or
  // mutex is released.
  ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);

  // Same as above, should be called without mutex held and not on write thread.
  ColumnFamilyHandle* GetColumnFamilyHandleUnlocked(uint32_t column_family_id);

  // Returns the number of currently running flushes.
  // REQUIREMENT: mutex_ must be held when calling this function.
  int num_running_flushes() {
    mutex_.AssertHeld();
    return num_running_flushes_;
  }

  // Returns the number of currently running compactions.
  // REQUIREMENT: mutex_ must be held when calling this function.
  int num_running_compactions() {
    mutex_.AssertHeld();
    return num_total_running_compactions_;
  }

  int num_running_large_compactions() {
    mutex_.AssertHeld();
    return num_running_large_compactions_;
  }

  // Imports data from another database dir. The source database is left unmodified.
  // Checks that the source database has appropriate seqnos: i.e. the seqno ranges of the
  // imported database do not overlap with the seqno ranges of the destination db, and the
  // max seqno of the imported database is less than the active seqno of the destination db.
  CHECKED_STATUS Import(const std::string& source_dir) override;

  bool AreWritesStopped();
  bool NeedsDelay() override;

  Result<std::string> GetMiddleKey() override;

  // Used in testing to make the old memtable immutable and start writing to a new one.
  void TEST_SwitchMemtable() override;

 protected:
  Env* const env_;
  Env* const checkpoint_env_;
  const std::string dbname_;
  unique_ptr<VersionSet> versions_;
  const DBOptions db_options_;
  Statistics* stats_;

  InternalIterator* NewInternalIterator(const ReadOptions&,
                                        ColumnFamilyData* cfd,
                                        SuperVersion* super_version,
                                        Arena* arena);

  // Except in DB::Open(), WriteOptionsFile can only be called when:
  // 1. WriteThread::Writer::EnterUnbatched() is used.
  // 2. db_mutex is held
  Status WriteOptionsFile();

  // The following two functions can only be called when:
  // 1. WriteThread::Writer::EnterUnbatched() is used.
  // 2. db_mutex is NOT held
  Status RenameTempFileToOptionsFile(const std::string& file_name);
  Status DeleteObsoleteOptionsFiles();

  void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
                              const MutableCFOptions& mutable_cf_options,
                              int job_id, TableProperties prop);

  void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
                                   Compaction *c, const Status &st,
                                   const CompactionJobStats& job_stats,
                                   int job_id);

  Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                   WriteCallback* callback);

 private:
  friend class DB;
  friend class InternalStats;
#ifndef ROCKSDB_LITE
  friend class ForwardIterator;
#endif
  friend struct SuperVersion;
  friend class CompactedDBImpl;
#ifndef NDEBUG
  friend class XFTransactionWriteHandler;
#endif
  struct CompactionState;

  struct ManualCompaction;

  struct WriteContext;

  class ThreadPoolTask;

  class CompactionTask;
  friend class CompactionTask;

  class FlushTask;
  friend class FlushTask;

  class TaskPriorityUpdater;
  friend class TaskPriorityUpdater;

  Status NewDB();

  // Recover the descriptor from persistent storage.  May do a significant
  // amount of work to recover recently logged updates.  Any changes to
  // be made to the descriptor are added to *edit.
  Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
                 bool read_only = false, bool error_if_log_file_exist = false);

  void MaybeIgnoreError(Status* s) const;

  const Status CreateArchivalDirectory();

  // Delete any unneeded files and stale in-memory entries.
  void DeleteObsoleteFiles();

  // Flush the in-memory write buffer to storage.  Switches to a new
  // log-file/memtable and writes a new descriptor iff successful.
  Result<FileNumbersHolder> FlushMemTableToOutputFile(
      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
      bool* made_progress, JobContext* job_context, LogBuffer* log_buffer);

  // REQUIRES: log_numbers are sorted in ascending order
  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                         SequenceNumber* max_sequence, bool read_only);

  // The following two methods are used to flush a memtable to
  // storage. The first one is used at database recovery time (when the
  // database is opened) and is heavyweight because it holds the mutex
  // for the entire period. The second method, WriteLevel0Table, supports
  // concurrent flush of memtables to storage.
  Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                     MemTable* mem, VersionEdit* edit);

  // num_bytes: for slowdown case, delay time is calculated based on
  //            `num_bytes` going through.
  Status DelayWrite(uint64_t num_bytes);

  Status ScheduleFlushes(WriteContext* context);

  Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);

  // Force current memtable contents to be flushed.
  Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);

  // Wait for the memtable to be flushed.
  Status WaitForFlushMemTable(ColumnFamilyData* cfd);

#ifndef ROCKSDB_LITE
  Status CompactFilesImpl(
      const CompactionOptions& compact_options, ColumnFamilyData* cfd,
      Version* version, const std::vector<std::string>& input_file_names,
      const int output_level, int output_path_id, JobContext* job_context,
      LogBuffer* log_buffer);
#endif  // ROCKSDB_LITE

  ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);

  void MaybeScheduleFlushOrCompaction();
  void SchedulePendingFlush(ColumnFamilyData* cfd);
  void SchedulePendingCompaction(ColumnFamilyData* cfd);
  static void BGWorkCompaction(void* arg);
  static void BGWorkFlush(void* db);
  static void UnscheduleCallback(void* arg);
  void WaitAfterBackgroundError(const Status& s, const char* job_name, LogBuffer* log_buffer);
  void BackgroundCallCompaction(
      ManualCompaction* manual_compaction, std::unique_ptr<Compaction> compaction = nullptr,
      CompactionTask* compaction_task = nullptr);
  void BackgroundCallFlush(ColumnFamilyData* cfd);
  Result<FileNumbersHolder> BackgroundCompaction(
      bool* made_progress, JobContext* job_context, LogBuffer* log_buffer,
      ManualCompaction* manual_compaction = nullptr,
      std::unique_ptr<Compaction> compaction = nullptr);
  Result<FileNumbersHolder> BackgroundFlush(
      bool* made_progress, JobContext* job_context, LogBuffer* log_buffer, ColumnFamilyData* cfd);
  void BackgroundJobComplete(const Status& s, JobContext* job_context, LogBuffer* log_buffer);

  uint64_t GetCurrentVersionSstFilesSize() override;

  uint64_t GetCurrentVersionSstFilesUncompressedSize() override;

  std::pair<uint64_t, uint64_t> GetCurrentVersionSstFilesAllSizes() override;

  uint64_t GetCurrentVersionDataSstFilesSize() override;

  uint64_t GetCurrentVersionNumSSTFiles() override;

  int GetCfdImmNumNotFlushed() override;

  // Updates stats_ object with SST files size metrics.
  void SetSSTFileTickers();

  // Return the minimum empty level that could hold the total data in the
  // input level. Return the input level, if such level could not be found.
  int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
      const MutableCFOptions& mutable_cf_options, int level);

  // Move the files in the input level to the target level.
  // If target_level < 0, automatically calculate the minimum level that could
  // hold the data set.
  Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);

  // Helper functions for adding to and removing from the flush & compaction queues.
  bool AddToCompactionQueue(ColumnFamilyData* cfd);
  std::unique_ptr<Compaction> PopFirstFromSmallCompactionQueue();
  std::unique_ptr<Compaction> PopFirstFromLargeCompactionQueue();
  bool IsEmptyCompactionQueue();
  void AddToFlushQueue(ColumnFamilyData* cfd);
  ColumnFamilyData* PopFirstFromFlushQueue();

  // A compaction is marked as large based on options, so this cannot be a
  // static or free function.
  bool IsLargeCompaction(const Compaction& compaction);

  // Helper function to call after some of the logs_ were synced.
  void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);

  const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);

  CHECKED_STATUS ApplyVersionEdit(VersionEdit* edit);

  void SubmitCompactionOrFlushTask(std::unique_ptr<ThreadPoolTask> task);

  // Returns true if we have some background work,
  // i.e. a scheduled but not yet complete compaction or flush.
  // prefix is used for logging.
  bool CheckBackgroundWorkAndLog(const char* prefix) const;

  void ListenFilesChanged(std::function<void()> listener) override;

  std::function<void()> GetFilesChangedListener() const;

  bool HasFilesChangedListener() const;

  void FilesChanged();

  bool IsShuttingDown() { return shutting_down_.load(std::memory_order_acquire); }

  struct TaskPriorityChange {
    size_t task_serial_no;
    int new_priority;
  };

  const std::string& LogPrefix() const;

  // table_cache_ provides its own synchronization
  std::shared_ptr<Cache> table_cache_;

  // Lock over the persistent DB state.  Non-nullptr iff successfully acquired.
  FileLock* db_lock_;

  // The mutex for options file related operations.
  // NOTE: should never acquire options_file_mutex_ and mutex_ at the
  //       same time.
  InstrumentedMutex options_files_mutex_;
  // State below is protected by mutex_
  InstrumentedMutex mutex_;

  std::atomic<bool> shutting_down_;

  // This condition variable is signaled when the state of having background work changes.
  InstrumentedCondVar bg_cv_;

  uint64_t logfile_number_;
  std::deque<uint64_t>
      log_recycle_files;  // a list of log files that we can recycle
  bool log_dir_synced_;
  bool log_empty_;
  ColumnFamilyHandleImpl* default_cf_handle_;
  InternalStats* default_cf_internal_stats_;
  unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
  struct LogFileNumberSize {
    explicit LogFileNumberSize(uint64_t _number)
        : number(_number) {}
    void AddSize(uint64_t new_size) { size += new_size; }
    uint64_t number;
    uint64_t size = 0;
    bool getting_flushed = false;
  };
  struct LogWriterNumber {
    // pass ownership of _writer
    LogWriterNumber(uint64_t _number, log::Writer* _writer)
        : number(_number), writer(_writer) {}

    log::Writer* ReleaseWriter() {
      auto* w = writer;
      writer = nullptr;
      return w;
    }
    void ClearWriter() {
      delete writer;
      writer = nullptr;
    }

    uint64_t number;
    // Visual Studio doesn't support a deque member being noncopyable because
    // of a unique_ptr member, so a raw pointer is used here.
    log::Writer* writer;  // own
    // true for some prefix of logs_
    bool getting_synced = false;
  };
  std::deque<LogFileNumberSize> alive_log_files_;
  // Log files that aren't fully synced, and the current log file.
  // Synchronization:
  //  - push_back() is done from write thread with locked mutex_,
  //  - pop_front() is done from any thread with locked mutex_,
  //  - back() and items with getting_synced=true are not popped,
  //  - it follows that write thread with unlocked mutex_ can safely access
  //    back() and items with getting_synced=true.
  std::deque<LogWriterNumber> logs_;
  // Signaled when getting_synced becomes false for some of the logs_.
  InstrumentedCondVar log_sync_cv_;

  uint64_t total_log_size() {
    // TODO: use a weaker memory order for higher performance?
    const int64_t total_log_size_signed = total_log_size_.load();
    assert(total_log_size_signed >= 0);
    if (total_log_size_signed < 0)  // Just in case, for release builds.
      return 0;
    return static_cast<uint64_t>(total_log_size_signed);
  }

  // We are using a signed int for the total log size to avoid weird effects in case of underflow.
  std::atomic<int64_t> total_log_size_;
  // Only used for dynamically adjusting max_total_wal_size. It is a sum of
  // [write_buffer_size * max_write_buffer_number] over all column families.
  uint64_t max_total_in_memory_state_;
  // If true, we have only one (default) column family. We use this to optimize
  // some code-paths.
  bool single_column_family_mode_;
  // If this is non-empty, we need to delete these log files in background
  // threads. Protected by db mutex.
  autovector<log::Writer*> logs_to_free_;

  bool is_snapshot_supported_;

  // Class to maintain directories for all database paths other than the main one.
  class Directories {
   public:
    Status SetDirectories(Env* env, const std::string& dbname,
                          const std::string& wal_dir,
                          const std::vector<DbPath>& data_paths);

    Directory* GetDataDir(size_t path_id);

    Directory* GetWalDir() {
      if (wal_dir_) {
        return wal_dir_.get();
      }
      return db_dir_.get();
    }

    Directory* GetDbDir() { return db_dir_.get(); }

   private:
    std::unique_ptr<Directory> db_dir_;
    std::vector<std::unique_ptr<Directory>> data_dirs_;
    std::unique_ptr<Directory> wal_dir_;

    Status CreateAndNewDirectory(Env* env, const std::string& dirname,
                                 std::unique_ptr<Directory>* directory) const;
  };

  Directories directories_;

  WriteBuffer write_buffer_;

  WriteThread write_thread_;

#ifndef NDEBUG
  std::atomic<int> write_waiters_{0};
#endif

  WriteBatch tmp_batch_;

  WriteController write_controller_;

  // Size of the last batch group. In slowdown mode, next write needs to
  // sleep if it uses up the quota.
  uint64_t last_batch_group_size_;

  FlushScheduler flush_scheduler_;

  SnapshotList snapshots_;

  // For each background job, pending_outputs_ keeps the file number being written by that job.
  // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that is present in
  // pending_outputs_. After a background job is done executing, its file number is
  // deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean
  // it up.
  //
  // A background job needs to call
  //   {
  //     auto file_num_holder = pending_outputs_->NewFileNumber();
  //     auto file_num = *file_num_holder;
  //     <do something>
  //   }
  // This protects the file with number `file_num` from being deleted while <do something> is
  // running. NewFileNumber() allocates a new file number and appends it to pending_outputs_,
  // which prevents any background process from deleting this file. The file number is
  // automatically removed from pending_outputs_ when file_num_holder is released on scope exit.
  std::unique_ptr<FileNumbersProvider> pending_outputs_;

  // flush_queue_ and compaction_queue_ hold column families that we need to
  // flush and compact, respectively.
  // A column family is inserted into flush_queue_ when it satisfies condition
  // cfd->imm()->IsFlushPending()
  // A column family is inserted into compaction_queue_ when it satisfies
  // condition cfd->NeedsCompaction()
  // Column families in this list are all Ref()-erenced
  // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will
  // do RAII on ColumnFamilyData
  // Column families are in this queue when they need to be flushed or
  // compacted. Consumers of these queues are flush and compaction threads. When
  // a column family is put on this queue, we increase unscheduled_flushes_ and
  // unscheduled_compactions_. When these variables are bigger than zero, that
  // means we need to schedule background threads for flush and compaction.
  // Once the background threads are scheduled, we decrease unscheduled_flushes_
  // and unscheduled_compactions_. That way we keep track of the number of
  // compaction and flush threads we need to schedule. This scheduling is done
  // in MaybeScheduleFlushOrCompaction().
  // invariant(column family present in flush_queue_ <==>
  // ColumnFamilyData::pending_flush_ == true)
  std::deque<ColumnFamilyData*> flush_queue_;
  // invariant(column family present in compaction_queue_ <==>
  // ColumnFamilyData::pending_compaction_ == true)
  std::deque<std::unique_ptr<Compaction>> small_compaction_queue_;
  std::deque<std::unique_ptr<Compaction>> large_compaction_queue_;
  int unscheduled_flushes_;
  int unscheduled_compactions_;

  // Count of how many background compactions are running or have been scheduled.
  // This variable is left untouched when the priority thread pool is used.
  int bg_compaction_scheduled_;

  // These tasks are managed by the thread pool; we remove them from this set
  // when they are processed or aborted by the thread pool.
  std::unordered_set<CompactionTask*> compaction_tasks_;

  // Stores the total number of compactions that are currently running.
  int num_total_running_compactions_;

  // Stores the number of large compactions that are currently running.
  int num_running_large_compactions_;

  // Number of background memtable flush jobs, submitted to the HIGH pool.
  int bg_flush_scheduled_;

  // Stores the number of flushes that are currently running.
  int num_running_flushes_;

  // Information for a manual compaction
  struct ManualCompaction {
    ColumnFamilyData* cfd;
    int input_level;
    int output_level;
    uint32_t output_path_id;
    Status status;
    bool done;
    bool in_progress;             // compaction request being processed?
    bool incomplete;              // only part of requested range compacted
    bool exclusive;               // current behavior of only one manual
    bool disallow_trivial_move;   // Force actual compaction to run
    const InternalKey* begin;     // nullptr means beginning of key range
    const InternalKey* end;       // nullptr means end of key range
    InternalKey* manual_end;      // how far we are compacting
    InternalKey tmp_storage;      // Used to keep track of compaction progress
    InternalKey tmp_storage1;     // Used to keep track of compaction progress
    std::unique_ptr<Compaction> compaction;
  };
  std::deque<ManualCompaction*> manual_compaction_dequeue_;

  struct CompactionArg {
    DBImpl* db;
    ManualCompaction* m;
  };

  // Have we encountered a background error in paranoid mode?
  Status bg_error_;

  // Shall we disable deletion of obsolete files?
  // If 0, deletion is enabled.
  // If non-zero, files will not be deleted.
  // This enables two different threads to call
  // EnableFileDeletions() and DisableFileDeletions()
  // without any synchronization.
  int disable_delete_obsolete_files_;

  // Next time we should run DeleteObsoleteFiles with a full scan.
  uint64_t delete_obsolete_files_next_run_;

  // Last time stats were dumped to LOG.
  std::atomic<uint64_t> last_stats_dump_time_microsec_;

  // Each flush or compaction gets its own job id. This counter makes sure
  // they're unique.
  std::atomic<int> next_job_id_;

  // A flag indicating whether the current rocksdb database has any
  // data that is not yet persisted into either WAL or SST file.
  // Used when disableWAL is true.
  bool has_unpersisted_data_;

  static const int KEEP_LOG_FILE_NUM = 1000;
  // MSVC version 1800 still does not have constexpr for ::max()
  static const uint64_t kNoTimeOut = port::kMaxUint64;

  std::string db_absolute_path_;

  // The options to access storage files
  const EnvOptions env_options_;

#ifndef ROCKSDB_LITE
  WalManager wal_manager_;
#endif  // ROCKSDB_LITE

  // Unified interface for logging events
  EventLogger event_logger_;

  // A value > 0 temporarily disables scheduling of background work.
  int bg_work_paused_;

  // A value > 0 temporarily disables scheduling of background compaction.
  int bg_compaction_paused_;

  // Guard against multiple concurrent refitting
  bool refitting_level_;

  // Indicates whether the DB was opened successfully.
  bool opened_successfully_;

  // Flush tick of the last flush of this DB.
  int64_t last_flush_at_tick_ = 0;

  // Whether DB should be flushed on shutdown.
  std::atomic<bool> disable_flush_on_shutdown_{false};

  mutable std::mutex files_changed_listener_mutex_;

  std::function<void()> files_changed_listener_ GUARDED_BY(files_changed_listener_mutex_);

  // No copying allowed
  DBImpl(const DBImpl&) = delete;
  void operator=(const DBImpl&) = delete;

  // Background threads call this function, which is just a wrapper around
  // the InstallSuperVersion() function. Background threads carry
  // job_context which can have new_superversion already
  // allocated.
  void InstallSuperVersionAndScheduleWorkWrapper(
      ColumnFamilyData* cfd, JobContext* job_context,
      const MutableCFOptions& mutable_cf_options);

  // All ColumnFamily state changes go through this function. Here we analyze
  // the new state and we schedule background work if we detect that the new
  // state needs flush or compaction.
  std::unique_ptr<SuperVersion> InstallSuperVersionAndScheduleWork(
      ColumnFamilyData* cfd, SuperVersion* new_sv,
      const MutableCFOptions& mutable_cf_options);

#ifndef ROCKSDB_LITE
  using DB::GetPropertiesOfAllTables;
  virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                          TablePropertiesCollection* props)
      override;
  virtual Status GetPropertiesOfTablesInRange(
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
      TablePropertiesCollection* props) override;

  // Obtains all column family options and corresponding names;
  // dropped column families are not included in the resulting collections.
  // REQUIREMENT: mutex_ must be held when calling this function.
  void GetColumnFamiliesOptionsUnlocked(
      std::vector<std::string>* column_family_names,
      std::vector<ColumnFamilyOptions>* column_family_options);
#endif  // ROCKSDB_LITE

  // Function that Get and KeyMayExist call with no_io true or false
  // Note: 'value_found' from KeyMayExist propagates here
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, std::string* value,
                 bool* value_found = nullptr);

  bool GetIntPropertyInternal(ColumnFamilyData* cfd,
                              const DBPropertyInfo& property_info,
                              bool is_locked, uint64_t* value);

  bool HasPendingManualCompaction();
  bool HasExclusiveManualCompaction();
  void AddManualCompaction(ManualCompaction* m);
  void RemoveManualCompaction(ManualCompaction* m);
  bool ShouldntRunManualCompaction(ManualCompaction* m);
  bool HaveManualCompaction(ColumnFamilyData* cfd);
  bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
};

// Sanitize db options.  The caller should delete result.info_log if
// it is not equal to src.info_log.
extern Options SanitizeOptions(const std::string& db,
                               const InternalKeyComparator* icmp,
                               const Options& src);
extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
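
// A ClipToRange() usage sketch: option-sanitization code clamps a
// user-supplied value into a supported window (bounds are illustrative only):
//
//   size_t write_buffer_size = size_t{64} << 30;  // user asked for 64GB
//   ClipToRange(&write_buffer_size,
//               size_t{64} << 10,                 // minimum: 64KB
//               size_t{1} << 30);                 // maximum: 1GB
//   // write_buffer_size is now 1GB.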

}  // namespace rocksdb

#endif // YB_ROCKSDB_DB_DB_IMPL_H