YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/column_family.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/column_family.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <vector>
32
#include <string>
33
#include <algorithm>
34
#include <limits>
35
36
#include "yb/rocksdb/db/compaction_picker.h"
37
#include "yb/rocksdb/db/db_impl.h"
38
#include "yb/rocksdb/db/internal_stats.h"
39
#include "yb/rocksdb/db/job_context.h"
40
#include "yb/rocksdb/db/table_properties_collector.h"
41
#include "yb/rocksdb/db/version_set.h"
42
#include "yb/rocksdb/db/write_controller.h"
43
#include "yb/rocksdb/db/writebuffer.h"
44
#include "yb/rocksdb/util/autovector.h"
45
#include "yb/rocksdb/util/compression.h"
46
#include "yb/rocksdb/util/options_helper.h"
47
#include "yb/rocksdb/util/statistics.h"
48
#include "yb/rocksdb/util/xfunc.h"
49
50
#include "yb/util/logging.h"
51
#include <glog/logging.h>
52
53
DEFINE_int32(memstore_arena_size_kb, 64, "Size of each arena allocation for the memstore");
54
55
namespace rocksdb {
56
57
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
58
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
59
1.36M
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
60
1.36M
  if (cfd_ != nullptr) {
61
874k
    cfd_->Ref();
62
874k
  }
63
1.36M
}
64
65
1.28M
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
66
1.28M
  if (cfd_ != nullptr) {
67
    // Job id == 0 means that this is not our background process, but rather
68
    // user thread
69
835k
    JobContext job_context(0);
70
835k
    mutex_->Lock();
71
835k
    if (cfd_->Unref()) {
72
26
      delete cfd_;
73
26
    }
74
835k
    db_->FindObsoleteFiles(&job_context, false, true);
75
835k
    mutex_->Unlock();
76
835k
    if (job_context.HaveSomethingToDelete()) {
77
256k
      db_->PurgeObsoleteFiles(job_context);
78
256k
    }
79
835k
    job_context.Clean();
80
835k
  }
81
1.28M
}
82
83
44.5M
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
84
85
0
const std::string& ColumnFamilyHandleImpl::GetName() const {
86
0
  return cfd()->GetName();
87
0
}
88
89
69
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
90
69
#ifndef ROCKSDB_LITE
91
  // accessing mutable cf-options requires db mutex.
92
69
  InstrumentedMutexLock l(mutex_);
93
69
  *desc = ColumnFamilyDescriptor(
94
69
      cfd()->GetName(),
95
69
      BuildColumnFamilyOptions(*cfd()->options(),
96
69
                               *cfd()->GetLatestMutableCFOptions()));
97
69
  return Status::OK();
98
#else
99
  return STATUS(NotSupported, "");
100
#endif  // !ROCKSDB_LITE
101
69
}
102
103
7.28M
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
104
7.28M
  return cfd()->user_comparator();
105
7.28M
}
106
107
void GetIntTblPropCollectorFactory(
108
    const ColumnFamilyOptions& cf_options,
109
879k
    IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
110
879k
  auto& collector_factories = cf_options.table_properties_collector_factories;
111
879k
  for (size_t i = 0; i < cf_options.table_properties_collector_factories.size();
112
879k
       
++i107
) {
113
107
    DCHECK(collector_factories[i]);
114
107
    int_tbl_prop_collector_factories->emplace_back(
115
107
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
116
107
  }
117
  // Add collector to collect internal key statistics
118
879k
  int_tbl_prop_collector_factories->emplace_back(
119
879k
      new InternalKeyPropertiesCollectorFactory);
120
879k
}
121
122
439k
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
123
439k
  if (!cf_options.compression_per_level.empty()) {
124
88
    for (size_t level = 0; level < cf_options.compression_per_level.size();
125
66
         ++level) {
126
66
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
127
0
        return STATUS(InvalidArgument,
128
0
            "Compression type " +
129
0
            CompressionTypeToString(cf_options.compression_per_level[level]) +
130
0
            " is not linked with the binary.");
131
0
      }
132
66
    }
133
439k
  } else {
134
439k
    if (!CompressionTypeSupported(cf_options.compression)) {
135
2
      return STATUS(InvalidArgument,
136
2
          "Compression type " +
137
2
          CompressionTypeToString(cf_options.compression) +
138
2
          " is not linked with the binary.");
139
2
    }
140
439k
  }
141
439k
  return Status::OK();
142
439k
}
143
144
386
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
145
386
  if (cf_options.inplace_update_support) {
146
0
    return STATUS(InvalidArgument,
147
0
        "In-place memtable updates (inplace_update_support) is not compatible "
148
0
        "with concurrent writes (allow_concurrent_memtable_write)");
149
0
  }
150
386
  if (cf_options.filter_deletes) {
151
0
    return STATUS(InvalidArgument,
152
0
        "Delete filtering (filter_deletes) is not compatible with concurrent "
153
0
        "memtable writes (allow_concurrent_memtable_writes)");
154
0
  }
155
386
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
156
2
    return STATUS(InvalidArgument,
157
2
        "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
158
2
  }
159
384
  return Status::OK();
160
386
}
161
162
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
163
                                    const InternalKeyComparator* icmp,
164
1.30M
                                    const ColumnFamilyOptions& src) {
165
1.30M
  ColumnFamilyOptions result = src;
166
1.30M
  result.comparator = icmp;
167
1.30M
#ifdef OS_MACOSX
168
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
169
1.30M
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
170
#else
171
  ClipToRange(&result.write_buffer_size,
172
              ((size_t)64) << 10, ((size_t)64) << 30);
173
#endif
174
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
175
  // calculate a proper value from writer_buffer_size;
176
1.30M
  if (result.arena_block_size <= 0) {
177
1.30M
    result.arena_block_size = std::min(
178
1.30M
        result.write_buffer_size / 8, static_cast<size_t>(FLAGS_memstore_arena_size_kb << 10));
179
180
    // Align up to 4k
181
1.30M
    const size_t align = 4 * 1024;
182
1.30M
    result.arena_block_size =
183
1.30M
        ((result.arena_block_size + align - 1) / align) * align;
184
1.30M
  }
185
1.30M
  result.min_write_buffer_number_to_merge =
186
1.30M
      std::min(result.min_write_buffer_number_to_merge,
187
1.30M
               result.max_write_buffer_number - 1);
188
1.30M
  if (result.num_levels < 1) {
189
54
    result.num_levels = 1;
190
54
  }
191
1.30M
  if (result.compaction_style == kCompactionStyleLevel &&
192
1.30M
      
result.num_levels < 2461k
) {
193
57
    result.num_levels = 2;
194
57
  }
195
1.30M
  if (result.max_write_buffer_number < 2) {
196
0
    result.max_write_buffer_number = 2;
197
0
  }
198
1.30M
  if (result.max_write_buffer_number_to_maintain < 0) {
199
88
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
200
88
  }
201
1.30M
  XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
202
1.30M
             xf_transaction_set_memtable_history1,
203
1.30M
             xf_transaction_set_memtable_history,
204
1.30M
             &result.max_write_buffer_number_to_maintain);
205
1.30M
  XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
206
1.30M
             xf_transaction_clear_memtable_history1,
207
1.30M
             xf_transaction_clear_memtable_history,
208
1.30M
             &result.max_write_buffer_number_to_maintain);
209
210
1.30M
  if (!result.prefix_extractor) {
211
1.29M
    DCHECK(result.memtable_factory);
212
1.29M
    Slice name = result.memtable_factory->Name();
213
1.29M
    if (name.compare("HashSkipListRepFactory") == 0 ||
214
1.29M
        name.compare("HashLinkListRepFactory") == 0) {
215
80
      result.memtable_factory = std::make_shared<SkipListFactory>();
216
80
    }
217
1.29M
  }
218
219
1.30M
  if (result.compaction_style == kCompactionStyleFIFO) {
220
321
    result.num_levels = 1;
221
    // since we delete level0 files in FIFO compaction when there are too many
222
    // of them, these options don't really mean anything
223
321
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
224
321
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
225
321
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
226
321
  }
227
228
1.30M
  if (result.level0_file_num_compaction_trigger == 0) {
229
10
    RWARN(db_options.info_log.get(),
230
10
        "level0_file_num_compaction_trigger cannot be 0");
231
10
    result.level0_file_num_compaction_trigger = 1;
232
10
  }
233
234
1.30M
  if (result.level0_stop_writes_trigger <
235
1.30M
          result.level0_slowdown_writes_trigger ||
236
1.30M
      result.level0_slowdown_writes_trigger <
237
1.30M
          result.level0_file_num_compaction_trigger) {
238
109
    RWARN(db_options.info_log.get(),
239
109
        "This condition must be satisfied: "
240
109
            "level0_stop_writes_trigger(%d) >= "
241
109
            "level0_slowdown_writes_trigger(%d) >= "
242
109
            "level0_file_num_compaction_trigger(%d)",
243
109
        result.level0_stop_writes_trigger,
244
109
        result.level0_slowdown_writes_trigger,
245
109
        result.level0_file_num_compaction_trigger);
246
109
    if (result.level0_slowdown_writes_trigger <
247
109
        result.level0_file_num_compaction_trigger) {
248
61
      result.level0_slowdown_writes_trigger =
249
61
          result.level0_file_num_compaction_trigger;
250
61
    }
251
109
    if (result.level0_stop_writes_trigger <
252
109
        result.level0_slowdown_writes_trigger) {
253
85
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
254
85
    }
255
109
    RWARN(db_options.info_log.get(),
256
109
        "Adjust the value to "
257
109
            "level0_stop_writes_trigger(%d)"
258
109
            "level0_slowdown_writes_trigger(%d)"
259
109
            "level0_file_num_compaction_trigger(%d)",
260
109
        result.level0_stop_writes_trigger,
261
109
        result.level0_slowdown_writes_trigger,
262
109
        result.level0_file_num_compaction_trigger);
263
109
  }
264
265
1.30M
  if (
result.soft_pending_compaction_bytes_limit == 01.30M
) {
266
1.30M
    result.soft_pending_compaction_bytes_limit =
267
1.30M
        result.hard_pending_compaction_bytes_limit;
268
18.4E
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
269
18.4E
             result.soft_pending_compaction_bytes_limit >
270
4
                 result.hard_pending_compaction_bytes_limit) {
271
1
    result.soft_pending_compaction_bytes_limit =
272
1
        result.hard_pending_compaction_bytes_limit;
273
1
  }
274
275
1.30M
  if (result.level_compaction_dynamic_level_bytes) {
276
32
    if (result.compaction_style != kCompactionStyleLevel ||
277
32
        db_options.db_paths.size() > 1U) {
278
      // 1. level_compaction_dynamic_level_bytes only makes sense for
279
      //    level-based compaction.
280
      // 2. we don't yet know how to make both of this feature and multiple
281
      //    DB path work.
282
0
      result.level_compaction_dynamic_level_bytes = false;
283
0
    }
284
32
  }
285
286
1.30M
  return result;
287
1.30M
}
288
289
int SuperVersion::dummy = 0;
290
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
291
void* const SuperVersion::kSVObsolete = nullptr;
292
293
1.28M
SuperVersion::~SuperVersion() {
294
1.28M
  for (auto td : to_delete) {
295
28.9k
    delete td;
296
28.9k
  }
297
1.28M
}
298
299
53.5M
SuperVersion* SuperVersion::Ref() {
300
53.5M
  refs.fetch_add(1, std::memory_order_relaxed);
301
53.5M
  return this;
302
53.5M
}
303
304
54.7M
bool SuperVersion::Unref() {
305
  // fetch_sub returns the previous value of ref
306
54.7M
  uint32_t previous_refs = refs.fetch_sub(1);
307
54.7M
  DCHECK_GT(previous_refs, 0);
308
54.7M
  return previous_refs == 1;
309
54.7M
}
310
311
1.20M
void SuperVersion::Cleanup() {
312
1.20M
  DCHECK_EQ(refs.load(std::memory_order_relaxed), 0);
313
1.20M
  imm->Unref(&to_delete);
314
1.20M
  MemTable* m = mem->Unref();
315
1.20M
  if (m != nullptr) {
316
181
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
317
181
    DCHECK_GE(*memory_usage, m->ApproximateMemoryUsage());
318
181
    *memory_usage -= m->ApproximateMemoryUsage();
319
181
    to_delete.push_back(m);
320
181
  }
321
1.20M
  current->Unref();
322
1.20M
}
323
324
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
325
1.24M
                        Version* new_current) {
326
1.24M
  mem = new_mem;
327
1.24M
  imm = new_imm;
328
1.24M
  current = new_current;
329
1.24M
  mem->Ref();
330
1.24M
  imm->Ref();
331
1.24M
  current->Ref();
332
1.24M
  refs.store(1, std::memory_order_relaxed);
333
1.24M
}
334
335
namespace {
336
272k
void SuperVersionUnrefHandle(void* ptr) {
337
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
338
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
339
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
340
  // well.
341
272k
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
342
272k
  if (sv->Unref()) {
343
0
    sv->db_mutex->Lock();
344
0
    sv->Cleanup();
345
0
    sv->db_mutex->Unlock();
346
0
    delete sv;
347
0
  }
348
272k
}
349
}  // anonymous namespace
350
351
ColumnFamilyData::ColumnFamilyData(
352
    uint32_t id, const std::string& name, Version* _dummy_versions,
353
    Cache* _table_cache, WriteBuffer* write_buffer,
354
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
355
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
356
    : id_(id),
357
      name_(name),
358
      dummy_versions_(_dummy_versions),
359
      current_(nullptr),
360
      refs_(0),
361
      dropped_(false),
362
      internal_comparator_(std::make_shared<InternalKeyComparator>(cf_options.comparator)),
363
      options_(*db_options,
364
               SanitizeOptions(*db_options, internal_comparator_.get(), cf_options)),
365
      ioptions_(options_),
366
      mutable_cf_options_(options_, ioptions_),
367
      write_buffer_(write_buffer),
368
      mem_(nullptr),
369
      imm_(options_.min_write_buffer_number_to_merge,
370
           options_.max_write_buffer_number_to_maintain),
371
      super_version_(nullptr),
372
      super_version_number_(0),
373
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
374
      next_(nullptr),
375
      prev_(nullptr),
376
      log_number_(0),
377
      column_family_set_(column_family_set),
378
      pending_flush_(false),
379
      pending_compaction_(false),
380
879k
      prev_compaction_needed_bytes_(0) {
381
879k
  Ref();
382
383
  // Convert user defined table properties collector factories to internal ones.
384
879k
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);
385
386
  // if _dummy_versions is nullptr, then this is a dummy column family.
387
879k
  if (_dummy_versions != nullptr) {
388
442k
    internal_stats_.reset(
389
442k
        new InternalStats(ioptions_.num_levels, db_options->env, this));
390
442k
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
391
442k
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
392
15.6k
      compaction_picker_.reset(
393
15.6k
          new LevelCompactionPicker(ioptions_, internal_comparator_.get()));
394
15.6k
#ifndef ROCKSDB_LITE
395
426k
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
396
425k
      compaction_picker_.reset(
397
425k
          new UniversalCompactionPicker(ioptions_, internal_comparator_.get()));
398
425k
    } else 
if (671
ioptions_.compaction_style == kCompactionStyleFIFO671
) {
399
289
      compaction_picker_.reset(
400
289
          new FIFOCompactionPicker(ioptions_, internal_comparator_.get()));
401
382
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
402
354
      compaction_picker_.reset(new NullCompactionPicker(
403
354
          ioptions_, internal_comparator_.get()));
404
354
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
405
354
          "Column family %s does not use any background compaction. "
406
354
          "Compactions can only be done via CompactFiles\n",
407
354
          GetName().c_str());
408
354
#endif  // !ROCKSDB_LITE
409
354
    } else {
410
28
      RLOG(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
411
28
          "Unable to recognize the specified compaction style %d. "
412
28
          "Column family %s will use kCompactionStyleLevel.\n",
413
28
          ioptions_.compaction_style, GetName().c_str());
414
28
      compaction_picker_.reset(
415
28
          new LevelCompactionPicker(ioptions_, internal_comparator_.get()));
416
28
    }
417
418
442k
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
419
441k
      RLOG(InfoLogLevel::DEBUG_LEVEL, ioptions_.info_log,
420
441k
          "--------------- Options for column family [%s]:\n", name.c_str());
421
441k
      if (ioptions_.info_log != nullptr &&
422
441k
          
ioptions_.info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL441k
) {
423
17.7k
        options_.DumpCFOptions(ioptions_.info_log);
424
17.7k
      }
425
441k
    } else {
426
123
      RLOG(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
427
123
          "\t(skipping printing options)\n");
428
123
    }
429
442k
  }
430
431
879k
  RecalculateWriteStallConditions(mutable_cf_options_);
432
879k
}
433
434
// DB mutex held
435
799k
ColumnFamilyData::~ColumnFamilyData() {
436
799k
  DCHECK_EQ
(refs_.load(std::memory_order_relaxed), 0) << this0
;
437
  // remove from linked list
438
799k
  auto prev = prev_;
439
799k
  auto next = next_;
440
799k
  prev->next_ = next;
441
799k
  next->prev_ = prev;
442
443
799k
  if (!dropped_ && 
column_family_set_ != nullptr798k
) {
444
    // If it's dropped, it's already removed from column family set
445
    // If column_family_set_ == nullptr, this is dummy CFD and not in
446
    // ColumnFamilySet
447
402k
    column_family_set_->RemoveColumnFamily(this);
448
402k
  }
449
450
799k
  {
451
799k
    Version* const current_version = current();
452
799k
    if (current_version != nullptr) {
453
402k
      current_version->Unref();
454
402k
    }
455
799k
  }
456
457
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
458
  // compaction_queue_ and we destroyed it
459
799k
  DCHECK(!pending_flush_);
460
799k
  DCHECK(!pending_compaction_);
461
462
799k
  if (super_version_ != nullptr) {
463
    // Release SuperVersion reference kept in ThreadLocalPtr.
464
    // This must be done outside of mutex_ since unref handler can lock mutex.
465
399k
    super_version_->db_mutex->Unlock();
466
399k
    local_sv_.reset();
467
399k
    super_version_->db_mutex->Lock();
468
469
399k
    bool is_last_reference __attribute__((unused));
470
399k
    is_last_reference = super_version_->Unref();
471
399k
    DCHECK(is_last_reference);
472
399k
    super_version_->Cleanup();
473
399k
    delete super_version_;
474
399k
    super_version_ = nullptr;
475
399k
  }
476
477
799k
  if (dummy_versions_ != nullptr) {
478
    // List must be empty
479
402k
    DCHECK_EQ(dummy_versions_->TEST_Next(), dummy_versions_);
480
402k
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
481
402k
    DCHECK(deleted);
482
402k
  }
483
484
799k
  if (mem_ != nullptr) {
485
402k
    delete mem_->Unref();
486
402k
  }
487
799k
  autovector<MemTable*> to_delete;
488
799k
  imm_.current()->Unref(&to_delete);
489
799k
  for (MemTable* m : to_delete) {
490
107
    delete m;
491
107
  }
492
799k
}
493
494
26
void ColumnFamilyData::SetDropped() {
495
  // can't drop default CF
496
26
  DCHECK_NE(id_, 0);
497
26
  dropped_ = true;
498
26
  write_controller_token_.reset();
499
500
  // remove from column_family_set
501
26
  column_family_set_->RemoveColumnFamily(this);
502
26
}
503
504
const double kSlowdownRatio = 1.2;
505
506
namespace {
507
std::unique_ptr<WriteControllerToken> SetupDelay(
508
    uint64_t max_write_rate, WriteController* write_controller,
509
    uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes,
510
899
    bool auto_comapctions_disabled) {
511
899
  const uint64_t kMinWriteRate = 1024u;  // Minimum write rate 1KB/s.
512
513
899
  uint64_t write_rate = write_controller->delayed_write_rate();
514
515
899
  if (auto_comapctions_disabled) {
516
    // When auto compaction is disabled, always use the value user gave.
517
6
    write_rate = max_write_rate;
518
893
  } else if (write_controller->NeedsDelay() && 
max_write_rate > kMinWriteRate716
) {
519
    // If user gives rate less than kMinWriteRate, don't adjust it.
520
    //
521
    // If already delayed, need to adjust based on previous compaction debt.
522
    // When there are two or more column families require delay, we always
523
    // increase or reduce write rate based on information for one single
524
    // column family. It is likely to be OK but we can improve if there is a
525
    // problem.
526
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
527
    // is only available in level-based compaction
528
    //
529
    // If the compaction debt stays the same as previously, we also further slow
530
    // down. It usually means a mem table is full. It's mainly for the case
531
    // where both of flush and compaction are much slower than the speed we
532
    // insert to mem tables, so we need to actively slow down before we get
533
    // feedback signal from compaction and flushes to avoid the full stop
534
    // because of hitting the max write buffer number.
535
716
    if (prev_compaction_neeed_bytes > 0 &&
536
716
        
prev_compaction_neeed_bytes <= compaction_needed_bytes552
) {
537
166
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) /
538
166
                                         kSlowdownRatio);
539
166
      if (write_rate < kMinWriteRate) {
540
0
        write_rate = kMinWriteRate;
541
0
      }
542
550
    } else if (prev_compaction_neeed_bytes > compaction_needed_bytes) {
543
      // We are speeding up by ratio of kSlowdownRatio when we have paid
544
      // compaction debt. But we'll never speed up to faster than the write rate
545
      // given by users.
546
386
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
547
386
                                         kSlowdownRatio);
548
386
      if (write_rate > max_write_rate) {
549
282
        write_rate = max_write_rate;
550
282
      }
551
386
    }
552
716
  }
553
899
  return write_controller->GetDelayToken(write_rate);
554
899
}
555
556
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
557
1.23M
                                    int level0_slowdown_writes_trigger) {
558
  // SanitizeOptions() ensures it.
559
1.23M
  DCHECK_LE(level0_file_num_compaction_trigger, level0_slowdown_writes_trigger);
560
561
1.23M
  const int64_t level0_file_num_compaction_trigger64 = level0_file_num_compaction_trigger;
562
1.23M
  const int64_t level0_slowdown_writes_trigger64 = level0_slowdown_writes_trigger;
563
  // 1/4 of the way between L0 compaction trigger threshold and slowdown
564
  // condition.
565
  // Or twice as compaction trigger, if it is smaller.
566
1.23M
  auto result = std::min(level0_file_num_compaction_trigger64 * 2,
567
1.23M
                         level0_file_num_compaction_trigger64 +
568
1.23M
                             (level0_slowdown_writes_trigger64 -
569
1.23M
                              level0_file_num_compaction_trigger64) / 4);
570
1.23M
  return static_cast<int>(std::min<int64_t>(result, std::numeric_limits<int>::max()));
571
1.23M
}
572
}  // namespace
573
574
void ColumnFamilyData::RecalculateWriteStallConditions(
575
2.12M
      const MutableCFOptions& mutable_cf_options) {
576
2.12M
  Version* current_version = current();
577
2.12M
  if (current_version != nullptr) {
578
1.24M
    auto* vstorage = current_version ->storage_info();
579
1.24M
    auto write_controller = column_family_set_->write_controller_;
580
1.24M
    uint64_t compaction_needed_bytes =
581
1.24M
        vstorage->estimated_compaction_needed_bytes();
582
583
1.24M
    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
584
3.77k
      write_controller_token_ = write_controller->GetStopToken();
585
3.77k
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
586
3.77k
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
587
3.77k
          "[%s] Stopping writes because we have %d immutable memtables "
588
3.77k
          "(waiting for flush), max_write_buffer_number is set to %d",
589
3.77k
          name_.c_str(), imm()->NumNotFlushed(),
590
3.77k
          mutable_cf_options.max_write_buffer_number);
591
1.23M
    } else if (vstorage->l0_delay_trigger_count() >=
592
1.23M
               mutable_cf_options.level0_stop_writes_trigger) {
593
616
      write_controller_token_ = write_controller->GetStopToken();
594
616
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
595
616
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
596
32
        internal_stats_->AddCFStats(
597
32
            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
598
32
      }
599
616
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
600
616
          "[%s] Stopping writes because we have %d level-0 files",
601
616
          name_.c_str(), vstorage->l0_delay_trigger_count());
602
1.23M
    } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
603
1.23M
               compaction_needed_bytes >=
604
194
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
605
26
      write_controller_token_ = write_controller->GetStopToken();
606
26
      internal_stats_->AddCFStats(
607
26
          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
608
26
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
609
26
          "[%s] Stopping writes because of estimated pending compaction "
610
26
          "bytes %" PRIu64,
611
26
          name_.c_str(), compaction_needed_bytes);
612
1.23M
    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
613
1.23M
               imm()->NumNotFlushed() >=
614
1.69k
                   mutable_cf_options.max_write_buffer_number - 1) {
615
14
      write_controller_token_ =
616
14
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
617
14
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
618
14
                     mutable_cf_options.disable_auto_compactions);
619
14
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
620
14
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
621
14
          "[%s] Stalling writes because we have %d immutable memtables "
622
14
          "(waiting for flush), max_write_buffer_number is set to %d "
623
14
          "rate %" PRIu64,
624
14
          name_.c_str(), imm()->NumNotFlushed(),
625
14
          mutable_cf_options.max_write_buffer_number,
626
14
          write_controller->delayed_write_rate());
627
1.23M
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
628
1.23M
               vstorage->l0_delay_trigger_count() >=
629
1.23M
                   mutable_cf_options.level0_slowdown_writes_trigger) {
630
806
      write_controller_token_ =
631
806
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
632
806
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
633
806
                     mutable_cf_options.disable_auto_compactions);
634
806
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
635
806
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
636
257
        internal_stats_->AddCFStats(
637
257
            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
638
257
      }
639
806
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
640
806
          "[%s] Stalling writes because we have %d level-0 files "
641
806
          "rate %" PRIu64,
642
806
          name_.c_str(), vstorage->l0_delay_trigger_count(),
643
806
          write_controller->delayed_write_rate());
644
1.23M
    } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
645
1.23M
               vstorage->estimated_compaction_needed_bytes() >=
646
679
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
647
79
      write_controller_token_ =
648
79
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
649
79
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
650
79
                     mutable_cf_options.disable_auto_compactions);
651
79
      internal_stats_->AddCFStats(
652
79
          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
653
79
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
654
79
          "[%s] Stalling writes because of estimated pending compaction "
655
79
          "bytes %" PRIu64 " rate %" PRIu64,
656
79
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
657
79
          write_controller->delayed_write_rate());
658
1.23M
    } else if (vstorage->l0_delay_trigger_count() >=
659
1.23M
               GetL0ThresholdSpeedupCompaction(
660
1.23M
                   mutable_cf_options.level0_file_num_compaction_trigger,
661
1.23M
                   mutable_cf_options.level0_slowdown_writes_trigger)) {
662
12.4k
      write_controller_token_ = write_controller->GetCompactionPressureToken();
663
12.4k
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
664
12.4k
          "[%s] Increasing compaction threads because we have %d level-0 "
665
12.4k
          "files ",
666
12.4k
          name_.c_str(), vstorage->l0_delay_trigger_count());
667
1.22M
    } else if (vstorage->estimated_compaction_needed_bytes() >=
668
1.22M
               mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
669
      // Increase compaction threads if bytes needed for compaction exceeds
670
      // 1/4 of threshold for slowing down.
671
      // If soft pending compaction byte limit is not set, always speed up
672
      // compaction.
673
1.22M
      write_controller_token_ = write_controller->GetCompactionPressureToken();
674
1.22M
      if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
675
96
        RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
676
96
            "[%s] Increasing compaction threads because of estimated pending "
677
96
            "compaction "
678
96
            "bytes %" PRIu64,
679
96
            name_.c_str(), vstorage->estimated_compaction_needed_bytes());
680
96
      }
681
1.22M
    } else {
682
483
      write_controller_token_.reset();
683
483
    }
684
1.24M
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
685
1.24M
  }
686
2.12M
}
687
688
1.67k
// Returns the address of the EnvOptions held by the ColumnFamilySet that this
// column family belongs to.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& shared_env_options = column_family_set_->env_options_;
  return &shared_env_options;
}
691
692
1.21M
// Publishes `current_version` as this column family's current Version by
// storing it into current_.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  Version* const version_to_publish = current_version;
  current_.store(version_to_publish);
}
695
696
5
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
697
5
  return VersionSet::GetNumLiveVersions(dummy_versions_);
698
5
}
699
700
18
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
701
18
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
702
18
}
703
704
// Allocates a new MemTable configured from this column family's comparator,
// immutable options and write buffer, plus the supplied mutable options and
// earliest sequence number. The memtable is returned as a raw pointer; this
// function neither installs it nor takes a reference on it.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  // A current version is expected to exist by the time memtables are built.
  DCHECK_ONLY_NOTNULL(current());
  MemTable* const fresh_memtable =
      new MemTable(*internal_comparator_, ioptions_, mutable_cf_options,
                   write_buffer_, earliest_seq);
  return fresh_memtable;
}
710
711
void ColumnFamilyData::CreateNewMemtable(
712
445k
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
713
445k
  if (mem_ != nullptr) {
714
3.78k
    delete mem_->Unref();
715
3.78k
  }
716
445k
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
717
445k
  mem_->Ref();
718
445k
}
719
720
1.00M
bool ColumnFamilyData::NeedsCompaction() const {
721
  // TODO: do we need to check if current() is nullptr?
722
1.00M
  return compaction_picker_->NeedsCompaction(current()->storage_info());
723
1.00M
}
724
725
std::unique_ptr<Compaction> ColumnFamilyData::PickCompaction(
726
34.6k
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
727
  // TODO: do we need to check if current() is not nullptr here?
728
34.6k
  Version* const current_version = current();
729
34.6k
  auto result = compaction_picker_->PickCompaction(
730
34.6k
      GetName(), mutable_options, current_version->storage_info(), log_buffer);
731
34.6k
  if (result != nullptr) {
732
18.4k
    result->SetInputVersion(current_);
733
18.4k
  }
734
34.6k
  return result;
735
34.6k
}
736
737
// Out-of-class definition of the static constant kCompactAllLevels (-1).
// NOTE(review): presumably a sentinel level meaning "all levels" — confirm
// against the declaration in column_family.h.
const int ColumnFamilyData::kCompactAllLevels = -1;
738
// Out-of-class definition of the static constant kCompactToBaseLevel (-2).
// NOTE(review): presumably a sentinel output level meaning "the base level" —
// confirm against the declaration in column_family.h.
const int ColumnFamilyData::kCompactToBaseLevel = -2;
739
740
std::unique_ptr<Compaction> ColumnFamilyData::CompactRange(
741
    const MutableCFOptions& mutable_cf_options, int input_level,
742
    int output_level, uint32_t output_path_id, const InternalKey* begin,
743
6.27k
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
744
6.27k
  Version* const current_version = current();
745
  // TODO: do we need to check that current_version is not nullptr?
746
6.27k
  auto result = compaction_picker_->CompactRange(
747
6.27k
      GetName(), mutable_cf_options, current_version->storage_info(), input_level,
748
6.27k
      output_level, output_path_id, begin, end, compaction_end, conflict);
749
6.27k
  if (result != nullptr) {
750
5.21k
    result->SetInputVersion(current_version);
751
5.21k
  }
752
6.27k
  return result;
753
6.27k
}
754
755
// Obtains the thread-local SuperVersion, takes an extra reference on it for
// the caller, and then tries to hand the thread-local copy back. If handing it
// back fails, the reference that belonged to the thread-local slot is dropped.
// The SuperVersion is returned in either case.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* const sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();

  const bool handed_back = ReturnThreadLocalSuperVersion(sv);
  if (!handed_back) {
    sv->Unref();
  }
  return sv;
}
765
766
// Returns a SuperVersion for the calling thread, preferring the copy cached in
// thread-local storage and falling back to super_version_ (under db_mutex)
// when the cached copy is missing or out of date.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);

  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  DCHECK_NE(ptr, SuperVersion::kSVInUse);

  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  const bool cache_unusable =
      sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load();

  if (cache_unusable) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);

    // True when a stale cached SuperVersion exists and Unref() reported that
    // it must now be cleaned up and destroyed.
    const bool must_destroy_stale = sv && sv->Unref();
    if (must_destroy_stale) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
    }

    db_mutex->Lock();
    SuperVersion* sv_to_delete = nullptr;
    if (must_destroy_stale) {
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    // Destroy the stale object only after the mutex has been released.
    delete sv_to_delete;
  }

  DCHECK_ONLY_NOTNULL(sv);
  return sv;
}
811
812
44.6M
// Tries to put `sv` back into thread-local storage with a compare-and-swap
// that expects the slot to still hold kSVInUse.
//
// Returns true when the swap succeeded, false otherwise.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  DCHECK_ONLY_NOTNULL(sv);

  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  const bool slot_untouched =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);

  if (!slot_untouched) {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    DCHECK_EQ(expected, SuperVersion::kSVObsolete);
  }

  // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
  // storage has not been altered and no Scrape has happened. The
  // SuperVersion is still current.
  return slot_untouched;
}
829
830
std::unique_ptr<SuperVersion> ColumnFamilyData::InstallSuperVersion(
831
262k
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
832
262k
  db_mutex->AssertHeld();
833
262k
  return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
834
262k
}
835
836
std::unique_ptr<SuperVersion> ColumnFamilyData::InstallSuperVersion(
837
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
838
1.24M
    const MutableCFOptions& mutable_cf_options) {
839
1.24M
  new_superversion->db_mutex = db_mutex;
840
1.24M
  new_superversion->mutable_cf_options = mutable_cf_options;
841
1.24M
  new_superversion->Init(mem_, imm_.current(), current_);
842
1.24M
  SuperVersion* old_superversion = super_version_;
843
1.24M
  super_version_ = new_superversion;
844
1.24M
  ++super_version_number_;
845
1.24M
  super_version_->version_number = super_version_number_;
846
  // Reset SuperVersions cached in thread local storage
847
1.24M
  ResetThreadLocalSuperVersions();
848
849
1.24M
  RecalculateWriteStallConditions(mutable_cf_options);
850
851
1.24M
  if (old_superversion != nullptr && 
old_superversion->Unref()801k
) {
852
534k
    old_superversion->Cleanup();
853
    // will let caller delete outside of mutex
854
534k
    return std::unique_ptr<SuperVersion>(old_superversion);
855
534k
  }
856
707k
  return nullptr;
857
1.24M
}
858
859
1.24M
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
860
1.24M
  autovector<void*> sv_ptrs;
861
1.24M
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
862
1.24M
  for (auto ptr : sv_ptrs) {
863
155k
    DCHECK(ptr);
864
155k
    if (ptr == SuperVersion::kSVInUse) {
865
238
      continue;
866
238
    }
867
155k
    auto sv = static_cast<SuperVersion*>(ptr);
868
155k
    if (sv->Unref()) {
869
0
      sv->Cleanup();
870
0
      delete sv;
871
0
    }
872
155k
  }
873
1.24M
}
874
875
#ifndef ROCKSDB_LITE
876
// Parses `options_map` on top of the current mutable options. On success the
// parsed options replace mutable_cf_options_ and their derived values are
// refreshed against ioptions_; on failure mutable_cf_options_ is left
// untouched. The parse status is returned either way.
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed_options;
  Status parse_status = GetMutableOptionsFromStrings(
      mutable_cf_options_, options_map, &parsed_options);
  if (!parse_status.ok()) {
    return parse_status;
  }

  mutable_cf_options_ = parsed_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return parse_status;
}
887
#endif  // ROCKSDB_LITE
888
889
// Stores the supplied database-wide settings and collaborators, and creates
// the dummy ColumnFamilyData (id 0, empty name) that acts as the head of the
// circular doubly-linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), db_options,
          env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty list is the dummy node linked to itself in both directions.
  ColumnFamilyData* const list_head = dummy_cfd_;
  list_head->prev_ = list_head;
  list_head->next_ = list_head;
}
910
911
398k
// Unreferences and deletes every registered column family, then does the same
// for the dummy list head.
ColumnFamilySet::~ColumnFamilySet() {
  // The loop terminates because the cfd destructor removes the entry from
  // column_family_data_.
  while (!column_family_data_.empty()) {
    ColumnFamilyData* const doomed_cfd = column_family_data_.begin()->second;
    doomed_cfd->Unref();
    delete doomed_cfd;
  }

  dummy_cfd_->Unref();
  delete dummy_cfd_;
}
921
922
28.2M
// Returns the cached pointer to the default column family (the one created
// with id 0). The cache is expected to be populated before this is called.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  DCHECK_ONLY_NOTNULL(default_cfd_cache_);
  ColumnFamilyData* const default_cfd = default_cfd_cache_;
  return default_cfd;
}
926
927
22.7M
// Looks up a column family by numeric id.
// Returns the registered ColumnFamilyData, or nullptr when the id is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return found != column_family_data_.end() ? found->second : nullptr;
}
935
936
// Looks up a column family by name.
// Returns nullptr when the name is not registered; otherwise resolves the
// name to its id and returns the matching ColumnFamilyData, which is expected
// to exist.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }

  auto cfd = GetColumnFamily(name_entry->second);
  DCHECK_ONLY_NOTNULL(cfd);
  return cfd;
}
947
948
1.71k
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
949
1.71k
  return ++max_column_family_;
950
1.71k
}
951
952
718k
// Returns the largest column family id recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  const uint32_t highest_id = max_column_family_;
  return highest_id;
}
953
954
436k
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
955
436k
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
956
436k
}
957
958
19.8M
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
959
19.8M
  return column_families_.size();
960
19.8M
}
961
962
// under a DB mutex AND write thread
963
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
964
    const std::string& name, uint32_t id, Version* dummy_versions,
965
441k
    const ColumnFamilyOptions& options) {
966
441k
  DCHECK_EQ(column_families_.count(name), 0);
967
441k
  ColumnFamilyData* new_cfd =
968
441k
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
969
441k
                           write_buffer_, options, db_options_,
970
441k
                           env_options_, this);
971
441k
  column_families_.insert({name, id});
972
441k
  column_family_data_.insert({id, new_cfd});
973
441k
  max_column_family_ = std::max(max_column_family_, id);
974
  // add to linked list
975
441k
  new_cfd->next_ = dummy_cfd_;
976
441k
  auto prev = dummy_cfd_->prev_;
977
441k
  new_cfd->prev_ = prev;
978
441k
  prev->next_ = new_cfd;
979
441k
  dummy_cfd_->prev_ = new_cfd;
980
441k
  if (id == 0) {
981
436k
    default_cfd_cache_ = new_cfd;
982
436k
  }
983
441k
  return new_cfd;
984
441k
}
985
986
// REQUIRES: DB mutex held
987
175k
void ColumnFamilySet::FreeDeadColumnFamilies() {
988
175k
  autovector<ColumnFamilyData*> to_delete;
989
608k
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; 
cfd = cfd->next_432k
) {
990
432k
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
991
0
      to_delete.push_back(cfd);
992
0
    }
993
432k
  }
994
175k
  for (auto cfd : to_delete) {
995
    // this is very rare, so it's not a problem that we do it under a mutex
996
0
    delete cfd;
997
0
  }
998
175k
}
999
1000
// under a DB mutex AND from a write thread
1001
401k
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1002
401k
  auto cfd_iter = column_family_data_.find(cfd->GetID());
1003
401k
  DCHECK(cfd_iter != column_family_data_.end());
1004
401k
  column_family_data_.erase(cfd_iter);
1005
401k
  column_families_.erase(cfd->GetName());
1006
401k
}
1007
1008
// under a DB mutex OR from a write thread
1009
49.4M
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1010
49.4M
  if (column_family_id == 0) {
1011
    // optimization for common case
1012
27.5M
    current_ = column_family_set_->GetDefault();
1013
27.5M
  } else {
1014
21.8M
    current_ = column_family_set_->GetColumnFamily(column_family_id);
1015
21.8M
  }
1016
49.4M
  handle_.SetCFD(current_);
1017
49.4M
  return current_ != nullptr;
1018
49.4M
}
1019
1020
3.02M
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1021
3.02M
  DCHECK_ONLY_NOTNULL(current_);
1022
3.02M
  return current_->GetLogNumber();
1023
3.02M
}
1024
1025
49.4M
// Returns the memtable of the column family selected by the last Seek().
// A column family must currently be selected.
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  DCHECK_ONLY_NOTNULL(current_);
  MemTable* const selected_memtable = current_->mem();
  return selected_memtable;
}
1029
1030
185
// Returns a pointer to the handle embedded in this object, which Seek() keeps
// pointed at the selected column family. A column family must currently be
// selected.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  DCHECK_ONLY_NOTNULL(current_);
  ColumnFamilyHandle* const embedded_handle = &handle_;
  return embedded_handle;
}
1034
1035
54.0M
// Returns the id of the column family behind `column_family`.
// A null handle yields id 0.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  const uint32_t column_family_id = handle_impl->GetID();
  return column_family_id;
}
1043
1044
// Returns the user comparator of the column family behind `column_family`,
// or nullptr when the handle itself is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  auto handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return handle_impl->user_comparator();
}
1052
1053
}  // namespace rocksdb