YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/column_family.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/db/column_family.h"
25
26
#ifndef __STDC_FORMAT_MACROS
27
#define __STDC_FORMAT_MACROS
28
#endif
29
30
#include <inttypes.h>
31
#include <vector>
32
#include <string>
33
#include <algorithm>
34
#include <limits>
35
36
#include "yb/rocksdb/db/compaction_picker.h"
37
#include "yb/rocksdb/db/db_impl.h"
38
#include "yb/rocksdb/db/internal_stats.h"
39
#include "yb/rocksdb/db/job_context.h"
40
#include "yb/rocksdb/db/table_properties_collector.h"
41
#include "yb/rocksdb/db/version_set.h"
42
#include "yb/rocksdb/db/write_controller.h"
43
#include "yb/rocksdb/db/writebuffer.h"
44
#include "yb/rocksdb/util/autovector.h"
45
#include "yb/rocksdb/util/compression.h"
46
#include "yb/rocksdb/util/options_helper.h"
47
#include "yb/rocksdb/util/statistics.h"
48
#include "yb/rocksdb/util/xfunc.h"
49
50
#include "yb/util/logging.h"
51
#include <glog/logging.h>
52
53
DEFINE_int32(memstore_arena_size_kb, 64, "Size of each arena allocation for the memstore");
54
55
namespace rocksdb {
56
57
// Builds a handle around `column_family_data`, remembering the owning DB and
// the mutex that guards it. A non-null column family gets one extra reference
// for as long as the handle exists.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    // Nothing to pin when the handle wraps no column family.
    return;
  }
  cfd_->Ref();
}
64
65
1.04M
// Releases the reference held on the column family (deleting it when this was
// the last reference) and lets the DB collect obsolete files while the mutex
// is held. Files that were collected are purged after the mutex is released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }

  // Job id == 0 means that this is not our background process, but rather
  // user thread
  JobContext job_context(0);

  mutex_->Lock();
  const bool was_last_reference = cfd_->Unref();
  if (was_last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();

  // Purging happens without the mutex held.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
}
82
83
44.8M
// Returns the id reported by the wrapped column family.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  const uint32_t column_family_id = cfd()->GetID();
  return column_family_id;
}
84
85
0
const std::string& ColumnFamilyHandleImpl::GetName() const {
86
0
  return cfd()->GetName();
87
0
}
88
89
69
// Fills `*desc` with the column family's name and a ColumnFamilyOptions built
// from its stored options plus its latest mutable options. Returns OK on
// success; in ROCKSDB_LITE builds the call is not supported and `*desc` is
// left untouched.
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  auto* const column_family = cfd();
  *desc = ColumnFamilyDescriptor(
      column_family->GetName(),
      BuildColumnFamilyOptions(*column_family->options(),
                               *column_family->GetLatestMutableCFOptions()));
  return Status::OK();
#else
  return STATUS(NotSupported, "");
#endif  // !ROCKSDB_LITE
}
102
103
7.36M
// Returns the user comparator reported by the wrapped column family.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  const Comparator* const comparator = cfd()->user_comparator();
  return comparator;
}
106
107
void GetIntTblPropCollectorFactory(
108
    const ColumnFamilyOptions& cf_options,
109
690k
    IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
110
690k
  auto& collector_factories = cf_options.table_properties_collector_factories;
111
690k
  for (size_t i = 0; i < cf_options.table_properties_collector_factories.size();
112
107
       ++i) {
113
107
    DCHECK(collector_factories[i]);
114
107
    int_tbl_prop_collector_factories->emplace_back(
115
107
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
116
107
  }
117
  // Add collector to collect internal key statistics
118
690k
  int_tbl_prop_collector_factories->emplace_back(
119
690k
      new InternalKeyPropertiesCollectorFactory);
120
690k
}
121
122
346k
// Verifies that every compression type requested by `cf_options` passes
// CompressionTypeSupported(). When compression_per_level is non-empty only
// those per-level types are checked; otherwise the single `compression`
// setting is checked. Returns InvalidArgument naming the first unsupported
// type, or OK.
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  // Builds the error returned for a compression type that is unavailable.
  const auto not_linked = [](const auto type) {
    return STATUS(InvalidArgument,
        "Compression type " +
        CompressionTypeToString(type) +
        " is not linked with the binary.");
  };

  if (cf_options.compression_per_level.empty()) {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return not_linked(cf_options.compression);
    }
    return Status::OK();
  }

  for (const auto level_type : cf_options.compression_per_level) {
    if (!CompressionTypeSupported(level_type)) {
      return not_linked(level_type);
    }
  }
  return Status::OK();
}
143
144
386
// Checks whether `cf_options` can be combined with concurrent memtable
// writes. Returns InvalidArgument when in-place updates or delete filtering
// are enabled, or when the configured memtable factory reports that it does
// not support concurrent inserts; returns OK otherwise.
//
// The error messages name the option consistently as
// allow_concurrent_memtable_write, and the last message now includes the verb
// that was missing ("doesn't support").
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return STATUS(InvalidArgument,
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (cf_options.filter_deletes) {
    return STATUS(InvalidArgument,
        "Delete filtering (filter_deletes) is not compatible with concurrent "
        "memtable writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return STATUS(InvalidArgument,
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}
161
162
// Returns a copy of `src` with out-of-range or mutually inconsistent settings
// adjusted:
//   * comparator is replaced by `icmp`;
//   * write_buffer_size is clipped to a platform-dependent range;
//   * arena_block_size, when unset, is derived from write_buffer_size and the
//     memstore_arena_size_kb flag and rounded up to a multiple of 4 KB;
//   * write-buffer counts and num_levels are raised to their minimums;
//   * hash-based memtable factories are swapped for a skip list when no
//     prefix extractor is configured;
//   * FIFO compaction forces a single level and disables the level-0 triggers;
//   * the level-0 triggers are made to satisfy stop >= slowdown >= compaction;
//   * the soft pending-compaction limit is kept at or below the hard limit;
//   * level_compaction_dynamic_level_bytes is cleared where it does not apply.
// `db_options` supplies the info log used for warnings and the db_paths list.
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
#endif
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    // Widen the flag before shifting so the KB -> bytes conversion is not
    // carried out in 32-bit signed arithmetic.
    const size_t flag_arena_bytes =
        static_cast<size_t>(FLAGS_memstore_arena_size_kb) << 10;
    result.arena_block_size =
        std::min(result.write_buffer_size / 8, flag_arena_bytes);

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // max_write_buffer_number is only raised to its minimum further down, so
  // the subtraction above can yield zero or a negative count. At least one
  // write buffer is always needed for a merge.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }
  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
             xf_transaction_set_memtable_history1,
             xf_transaction_set_memtable_history,
             &result.max_write_buffer_number_to_maintain);
  XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
             xf_transaction_clear_memtable_history1,
             xf_transaction_clear_memtable_history,
             &result.max_write_buffer_number_to_maintain);

  // Without a prefix extractor the hash-based memtable factories are replaced
  // by the skip-list factory.
  if (!result.prefix_extractor) {
    DCHECK(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    RWARN(db_options.info_log.get(),
        "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger, raising the larger
  // thresholds as needed and logging both the violation and the result.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    RWARN(db_options.info_log.get(),
        "This condition must be satisfied: "
            "level0_stop_writes_trigger(%d) >= "
            "level0_slowdown_writes_trigger(%d) >= "
            "level0_file_num_compaction_trigger(%d)",
        result.level0_stop_writes_trigger,
        result.level0_slowdown_writes_trigger,
        result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    RWARN(db_options.info_log.get(),
        "Adjust the value to "
            "level0_stop_writes_trigger(%d)"
            "level0_slowdown_writes_trigger(%d)"
            "level0_file_num_compaction_trigger(%d)",
        result.level0_stop_writes_trigger,
        result.level0_slowdown_writes_trigger,
        result.level0_file_num_compaction_trigger);
  }

  // An unset soft limit inherits the hard limit; a soft limit above a
  // non-zero hard limit is lowered to it.
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  return result;
}
288
289
// Storage whose address is used for the kSVInUse sentinel pointer; the value
// stored here is not read anywhere in this section of the file.
int SuperVersion::dummy = 0;
290
// Sentinel pointer equal to the address of SuperVersion::dummy.
// NOTE(review): presumably marks a thread-local SuperVersion slot as being in
// use -- confirm against the code that reads kSVInUse.
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
291
// Sentinel pointer defined as nullptr.
// NOTE(review): presumably marks a cached SuperVersion as obsolete -- confirm
// against the code that reads kSVObsolete.
void* const SuperVersion::kSVObsolete = nullptr;
292
293
1.05M
// Deletes every memtable that was queued in `to_delete`.
SuperVersion::~SuperVersion() {
  for (auto* pending_memtable : to_delete) {
    delete pending_memtable;
  }
}
298
299
30.6M
// Adds one reference (relaxed atomic increment) and returns `this` so calls
// can be chained.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  SuperVersion* const self = this;
  return self;
}
303
304
31.6M
bool SuperVersion::Unref() {
305
  // fetch_sub returns the previous value of ref
306
31.6M
  uint32_t previous_refs = refs.fetch_sub(1);
307
31.6M
  DCHECK_GT(previous_refs, 0);
308
31.6M
  return previous_refs == 1;
309
31.6M
}
310
311
985k
void SuperVersion::Cleanup() {
312
985k
  DCHECK_EQ(refs.load(std::memory_order_relaxed), 0);
313
985k
  imm->Unref(&to_delete);
314
985k
  MemTable* m = mem->Unref();
315
985k
  if (m != nullptr) {
316
155
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
317
155
    DCHECK_GE(*memory_usage, m->ApproximateMemoryUsage());
318
155
    *memory_usage -= m->ApproximateMemoryUsage();
319
155
    to_delete.push_back(m);
320
155
  }
321
985k
  current->Unref();
322
985k
}
323
324
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
325
1.00M
                        Version* new_current) {
326
1.00M
  mem = new_mem;
327
1.00M
  imm = new_imm;
328
1.00M
  current = new_current;
329
1.00M
  mem->Ref();
330
1.00M
  imm->Ref();
331
1.00M
  current->Ref();
332
1.00M
  refs.store(1, std::memory_order_relaxed);
333
1.00M
}
334
335
namespace {
336
160k
void SuperVersionUnrefHandle(void* ptr) {
337
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
338
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
339
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
340
  // well.
341
160k
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
342
160k
  if (sv->Unref()) {
343
0
    sv->db_mutex->Lock();
344
0
    sv->Cleanup();
345
0
    sv->db_mutex->Unlock();
346
0
    delete sv;
347
0
  }
348
160k
}
349
}  // anonymous namespace
350
351
// Constructs the per-column-family state. Options are sanitized before being
// stored. A null `_dummy_versions` denotes a dummy column family, for which
// no stats, table cache or compaction picker are created. The new object
// starts with one reference.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(std::make_shared<InternalKeyComparator>(cf_options.comparator)),
      options_(*db_options,
               SanitizeOptions(*db_options, internal_comparator_.get(), cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge,
           options_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false),
      prev_compaction_needed_bytes_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  const bool is_real_column_family = (_dummy_versions != nullptr);
  if (is_real_column_family) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    // Choose the compaction picker that matches the configured style. Styles
    // that are not recognized (or not compiled in) fall back to level
    // compaction after logging an error.
    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel: {
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, internal_comparator_.get()));
        break;
      }
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal: {
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, internal_comparator_.get()));
        break;
      }
      case kCompactionStyleFIFO: {
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, internal_comparator_.get()));
        break;
      }
      case kCompactionStyleNone: {
        compaction_picker_.reset(new NullCompactionPicker(
            ioptions_, internal_comparator_.get()));
        RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
      }
#endif  // !ROCKSDB_LITE
      default: {
        RLOG(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, internal_comparator_.get()));
        break;
      }
    }

    // Options are only dumped while the set holds fewer than 10 column
    // families, and only when the info log is at DEBUG level.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      RLOG(InfoLogLevel::DEBUG_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      const bool debug_logging_enabled =
          ioptions_.info_log != nullptr &&
          ioptions_.info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL;
      if (debug_logging_enabled) {
        options_.DumpCFOptions(ioptions_.info_log);
      }
    } else {
      RLOG(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
433
434
// DB mutex held
435
652k
ColumnFamilyData::~ColumnFamilyData() {
436
0
  DCHECK_EQ(refs_.load(std::memory_order_relaxed), 0) << this;
437
  // remove from linked list
438
652k
  auto prev = prev_;
439
652k
  auto next = next_;
440
652k
  prev->next_ = next;
441
652k
  next->prev_ = prev;
442
443
652k
  if (!dropped_ && column_family_set_ != nullptr) {
444
    // If it's dropped, it's already removed from column family set
445
    // If column_family_set_ == nullptr, this is dummy CFD and not in
446
    // ColumnFamilySet
447
328k
    column_family_set_->RemoveColumnFamily(this);
448
328k
  }
449
450
652k
  {
451
652k
    Version* const current_version = current();
452
652k
    if (current_version != nullptr) {
453
328k
      current_version->Unref();
454
328k
    }
455
652k
  }
456
457
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
458
  // compaction_queue_ and we destroyed it
459
652k
  DCHECK(!pending_flush_);
460
652k
  DCHECK(!pending_compaction_);
461
462
652k
  if (super_version_ != nullptr) {
463
    // Release SuperVersion reference kept in ThreadLocalPtr.
464
    // This must be done outside of mutex_ since unref handler can lock mutex.
465
327k
    super_version_->db_mutex->Unlock();
466
327k
    local_sv_.reset();
467
327k
    super_version_->db_mutex->Lock();
468
469
327k
    bool is_last_reference __attribute__((unused));
470
327k
    is_last_reference = super_version_->Unref();
471
327k
    DCHECK(is_last_reference);
472
327k
    super_version_->Cleanup();
473
327k
    delete super_version_;
474
327k
    super_version_ = nullptr;
475
327k
  }
476
477
652k
  if (dummy_versions_ != nullptr) {
478
    // List must be empty
479
329k
    DCHECK_EQ(dummy_versions_->TEST_Next(), dummy_versions_);
480
329k
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
481
329k
    DCHECK(deleted);
482
329k
  }
483
484
652k
  if (mem_ != nullptr) {
485
329k
    delete mem_->Unref();
486
329k
  }
487
652k
  autovector<MemTable*> to_delete;
488
652k
  imm_.current()->Unref(&to_delete);
489
103
  for (MemTable* m : to_delete) {
490
103
    delete m;
491
103
  }
492
652k
}
493
494
26
void ColumnFamilyData::SetDropped() {
495
  // can't drop default CF
496
26
  DCHECK_NE(id_, 0);
497
26
  dropped_ = true;
498
26
  write_controller_token_.reset();
499
500
  // remove from column_family_set
501
26
  column_family_set_->RemoveColumnFamily(this);
502
26
}
503
504
// Factor by which SetupDelay() divides the delayed write rate when slowing
// writes down further, and multiplies it when speeding them back up.
const double kSlowdownRatio = 1.2;
505
506
namespace {
507
std::unique_ptr<WriteControllerToken> SetupDelay(
508
    uint64_t max_write_rate, WriteController* write_controller,
509
    uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes,
510
956
    bool auto_comapctions_disabled) {
511
956
  const uint64_t kMinWriteRate = 1024u;  // Minimum write rate 1KB/s.
512
513
956
  uint64_t write_rate = write_controller->delayed_write_rate();
514
515
956
  if (auto_comapctions_disabled) {
516
    // When auto compaction is disabled, always use the value user gave.
517
6
    write_rate = max_write_rate;
518
950
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
519
    // If user gives rate less than kMinWriteRate, don't adjust it.
520
    //
521
    // If already delayed, need to adjust based on previous compaction debt.
522
    // When there are two or more column families require delay, we always
523
    // increase or reduce write rate based on information for one single
524
    // column family. It is likely to be OK but we can improve if there is a
525
    // problem.
526
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
527
    // is only available in level-based compaction
528
    //
529
    // If the compaction debt stays the same as previously, we also further slow
530
    // down. It usually means a mem table is full. It's mainly for the case
531
    // where both of flush and compaction are much slower than the speed we
532
    // insert to mem tables, so we need to actively slow down before we get
533
    // feedback signal from compaction and flushes to avoid the full stop
534
    // because of hitting the max write buffer number.
535
723
    if (prev_compaction_neeed_bytes > 0 &&
536
548
        prev_compaction_neeed_bytes <= compaction_needed_bytes) {
537
161
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) /
538
161
                                         kSlowdownRatio);
539
161
      if (write_rate < kMinWriteRate) {
540
0
        write_rate = kMinWriteRate;
541
0
      }
542
562
    } else if (prev_compaction_neeed_bytes > compaction_needed_bytes) {
543
      // We are speeding up by ratio of kSlowdownRatio when we have paid
544
      // compaction debt. But we'll never speed up to faster than the write rate
545
      // given by users.
546
387
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
547
387
                                         kSlowdownRatio);
548
387
      if (write_rate > max_write_rate) {
549
287
        write_rate = max_write_rate;
550
287
      }
551
387
    }
552
723
  }
553
956
  return write_controller->GetDelayToken(write_rate);
554
956
}
555
556
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
557
998k
                                    int level0_slowdown_writes_trigger) {
558
  // SanitizeOptions() ensures it.
559
998k
  DCHECK_LE(level0_file_num_compaction_trigger, level0_slowdown_writes_trigger);
560
561
998k
  const int64_t level0_file_num_compaction_trigger64 = level0_file_num_compaction_trigger;
562
998k
  const int64_t level0_slowdown_writes_trigger64 = level0_slowdown_writes_trigger;
563
  // 1/4 of the way between L0 compaction trigger threshold and slowdown
564
  // condition.
565
  // Or twice as compaction trigger, if it is smaller.
566
998k
  auto result = std::min(level0_file_num_compaction_trigger64 * 2,
567
998k
                         level0_file_num_compaction_trigger64 +
568
998k
                             (level0_slowdown_writes_trigger64 -
569
998k
                              level0_file_num_compaction_trigger64) / 4);
570
998k
  return static_cast<int>(std::min<int64_t>(result, std::numeric_limits<int>::max()));
571
998k
}
572
}  // namespace
573
574
void ColumnFamilyData::RecalculateWriteStallConditions(
575
1.69M
      const MutableCFOptions& mutable_cf_options) {
576
1.69M
  Version* current_version = current();
577
1.69M
  if (current_version != nullptr) {
578
1.00M
    auto* vstorage = current_version ->storage_info();
579
1.00M
    auto write_controller = column_family_set_->write_controller_;
580
1.00M
    uint64_t compaction_needed_bytes =
581
1.00M
        vstorage->estimated_compaction_needed_bytes();
582
583
1.00M
    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
584
3.61k
      write_controller_token_ = write_controller->GetStopToken();
585
3.61k
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
586
3.61k
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
587
3.61k
          "[%s] Stopping writes because we have %d immutable memtables "
588
3.61k
          "(waiting for flush), max_write_buffer_number is set to %d",
589
3.61k
          name_.c_str(), imm()->NumNotFlushed(),
590
3.61k
          mutable_cf_options.max_write_buffer_number);
591
999k
    } else if (vstorage->l0_delay_trigger_count() >=
592
663
               mutable_cf_options.level0_stop_writes_trigger) {
593
663
      write_controller_token_ = write_controller->GetStopToken();
594
663
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
595
663
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
596
76
        internal_stats_->AddCFStats(
597
76
            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
598
76
      }
599
663
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
600
663
          "[%s] Stopping writes because we have %d level-0 files",
601
663
          name_.c_str(), vstorage->l0_delay_trigger_count());
602
999k
    } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
603
194
               compaction_needed_bytes >=
604
26
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
605
26
      write_controller_token_ = write_controller->GetStopToken();
606
26
      internal_stats_->AddCFStats(
607
26
          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
608
26
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
609
26
          "[%s] Stopping writes because of estimated pending compaction "
610
26
          "bytes %" PRIu64,
611
26
          name_.c_str(), compaction_needed_bytes);
612
999k
    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
613
1.69k
               imm()->NumNotFlushed() >=
614
13
                   mutable_cf_options.max_write_buffer_number - 1) {
615
13
      write_controller_token_ =
616
13
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
617
13
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
618
13
                     mutable_cf_options.disable_auto_compactions);
619
13
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
620
13
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
621
13
          "[%s] Stalling writes because we have %d immutable memtables "
622
13
          "(waiting for flush), max_write_buffer_number is set to %d "
623
13
          "rate %" PRIu64,
624
13
          name_.c_str(), imm()->NumNotFlushed(),
625
13
          mutable_cf_options.max_write_buffer_number,
626
13
          write_controller->delayed_write_rate());
627
999k
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
628
999k
               vstorage->l0_delay_trigger_count() >=
629
867
                   mutable_cf_options.level0_slowdown_writes_trigger) {
630
867
      write_controller_token_ =
631
867
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
632
867
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
633
867
                     mutable_cf_options.disable_auto_compactions);
634
867
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
635
867
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
636
322
        internal_stats_->AddCFStats(
637
322
            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
638
322
      }
639
867
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
640
867
          "[%s] Stalling writes because we have %d level-0 files "
641
867
          "rate %" PRIu64,
642
867
          name_.c_str(), vstorage->l0_delay_trigger_count(),
643
867
          write_controller->delayed_write_rate());
644
998k
    } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
645
681
               vstorage->estimated_compaction_needed_bytes() >=
646
76
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
647
76
      write_controller_token_ =
648
76
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
649
76
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
650
76
                     mutable_cf_options.disable_auto_compactions);
651
76
      internal_stats_->AddCFStats(
652
76
          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
653
76
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
654
76
          "[%s] Stalling writes because of estimated pending compaction "
655
76
          "bytes %" PRIu64 " rate %" PRIu64,
656
76
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
657
76
          write_controller->delayed_write_rate());
658
998k
    } else if (vstorage->l0_delay_trigger_count() >=
659
998k
               GetL0ThresholdSpeedupCompaction(
660
998k
                   mutable_cf_options.level0_file_num_compaction_trigger,
661
12.5k
                   mutable_cf_options.level0_slowdown_writes_trigger)) {
662
12.5k
      write_controller_token_ = write_controller->GetCompactionPressureToken();
663
12.5k
      RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
664
12.5k
          "[%s] Increasing compaction threads because we have %d level-0 "
665
12.5k
          "files ",
666
12.5k
          name_.c_str(), vstorage->l0_delay_trigger_count());
667
985k
    } else if (vstorage->estimated_compaction_needed_bytes() >=
668
985k
               mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
669
      // Increase compaction threads if bytes needed for compaction exceeds
670
      // 1/4 of threshold for slowing down.
671
      // If soft pending compaction byte limit is not set, always speed up
672
      // compaction.
673
985k
      write_controller_token_ = write_controller->GetCompactionPressureToken();
674
985k
      if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
675
103
        RLOG(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
676
103
            "[%s] Increasing compaction threads because of estimated pending "
677
103
            "compaction "
678
103
            "bytes %" PRIu64,
679
103
            name_.c_str(), vstorage->estimated_compaction_needed_bytes());
680
103
      }
681
433
    } else {
682
433
      write_controller_token_.reset();
683
433
    }
684
1.00M
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
685
1.00M
  }
686
1.69M
}
687
688
1.73k
// Returns the address of the EnvOptions stored in the owning ColumnFamilySet.
// The pointee belongs to column_family_set_, not to the caller.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& shared_env_options = column_family_set_->env_options_;
  return &shared_env_options;
}
691
692
1.00M
// Records `current_version` as the Version that current_ refers to.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  // store() is called without an explicit memory-order argument.
  current_.store(current_version);
}
695
696
5
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
697
5
  return VersionSet::GetNumLiveVersions(dummy_versions_);
698
5
}
699
700
18
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
701
18
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
702
18
}
703
704
// Heap-allocates a MemTable configured from this column family's comparator,
// immutable options and write buffer, plus the supplied mutable options and
// earliest sequence number.
//
// The returned raw pointer is not installed anywhere by this function; the
// caller decides what to do with it.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  // Debug-only sanity check that a current Version exists.
  DCHECK_ONLY_NOTNULL(current());
  MemTable* const fresh_memtable =
      new MemTable(*internal_comparator_, ioptions_, mutable_cf_options,
                   write_buffer_, earliest_seq);
  return fresh_memtable;
}
710
711
void ColumnFamilyData::CreateNewMemtable(
712
350k
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
713
350k
  if (mem_ != nullptr) {
714
3.79k
    delete mem_->Unref();
715
3.79k
  }
716
350k
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
717
350k
  mem_->Ref();
718
350k
}
719
720
788k
bool ColumnFamilyData::NeedsCompaction() const {
721
  // TODO: do we need to check if current() is nullptr?
722
788k
  return compaction_picker_->NeedsCompaction(current()->storage_info());
723
788k
}
724
725
std::unique_ptr<Compaction> ColumnFamilyData::PickCompaction(
726
33.8k
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
727
  // TODO: do we need to check if current() is not nullptr here?
728
33.8k
  Version* const current_version = current();
729
33.8k
  auto result = compaction_picker_->PickCompaction(
730
33.8k
      GetName(), mutable_options, current_version->storage_info(), log_buffer);
731
33.8k
  if (result != nullptr) {
732
18.1k
    result->SetInputVersion(current_);
733
18.1k
  }
734
33.8k
  return result;
735
33.8k
}
736
737
// Out-of-class definition of a negative sentinel constant.
// NOTE(review): presumably used in place of a real level number when
// requesting a range compaction -- confirm against the header.
const int ColumnFamilyData::kCompactAllLevels = -1;
738
// Out-of-class definition of a second negative sentinel constant, distinct
// from kCompactAllLevels.
// NOTE(review): presumably an output-level selector for range compaction --
// confirm against the header.
const int ColumnFamilyData::kCompactToBaseLevel = -2;
739
740
std::unique_ptr<Compaction> ColumnFamilyData::CompactRange(
741
    const MutableCFOptions& mutable_cf_options, int input_level,
742
    int output_level, uint32_t output_path_id, const InternalKey* begin,
743
5.87k
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
744
5.87k
  Version* const current_version = current();
745
  // TODO: do we need to check that current_version is not nullptr?
746
5.87k
  auto result = compaction_picker_->CompactRange(
747
5.87k
      GetName(), mutable_cf_options, current_version->storage_info(), input_level,
748
5.87k
      output_level, output_path_id, begin, end, compaction_end, conflict);
749
5.87k
  if (result != nullptr) {
750
4.86k
    result->SetInputVersion(current_version);
751
4.86k
  }
752
5.87k
  return result;
753
5.87k
}
754
755
// Returns a SuperVersion on which an extra Ref() has been taken for the
// caller.
//
// The SuperVersion is obtained through the thread-local cache and then offered
// back to it. If the cache refuses it (ReturnThreadLocalSuperVersion returns
// false), one reference is dropped here to balance the one the cache no longer
// keeps.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* const sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  const bool cache_took_it_back = ReturnThreadLocalSuperVersion(sv);
  if (!cache_took_it_back) {
    sv->Unref();
  }
  return sv;
}
765
766
// Fetches the SuperVersion cached in this thread's local storage, refreshing
// it from super_version_ (under db_mutex) when the cached one is obsolete or
// has a version number different from super_version_number_.
//
// On return the thread-local slot holds kSVInUse; the caller is expected to
// hand the result back through ReturnThreadLocalSuperVersion().
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage so that the mutex is
  // not needed while the SuperVersion has not changed since the last use.
  // When a new SuperVersion is installed, the compaction or flush thread
  // cleans up the cached SuperVersion in every thread local storage. To avoid
  // a mutex for that, an atomic Swap() on the thread local pointer guarantees
  // exclusive access. If the thread local pointer is in use while a new
  // SuperVersion is installed, the cached SuperVersion can become stale; in
  // that case the background thread will have swapped in kSVObsolete. The
  // value is re-checked when the SuperVersion is returned to thread local
  // storage, with an atomic compare and swap, and the SuperVersion must be
  // released if it is found to be stale.
  void* cached = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage.
  // (2) The Swap above (always) installs kSVInUse; ThreadLocal storage should
  //     only hold kSVInUse until the ReturnThreadLocalSuperVersion call
  //     (if no Scrape happens).
  DCHECK_NE(cached, SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(cached);

  const bool needs_refresh =
      sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load();
  if (needs_refresh) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);

    // Give up the cached reference (if there was one) before taking the lock.
    const bool dropped_last_ref = sv && sv->Unref();
    if (dropped_last_ref) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
    }

    db_mutex->Lock();
    SuperVersion* sv_to_delete = nullptr;
    if (dropped_last_ref) {
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    // The delete happens after the mutex has been released.
    delete sv_to_delete;
  }
  DCHECK_ONLY_NOTNULL(sv);
  return sv;
}
811
812
21.9M
// Tries to put `sv` back into this thread's local slot.
//
// Returns true when the slot still held kSVInUse and now holds `sv`; returns
// false when the compare-and-swap failed, in which case the slot is left as
// it was found.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  DCHECK_ONLY_NOTNULL(sv);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  const bool swapped =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!swapped) {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    DCHECK_EQ(expected, SuperVersion::kSVObsolete);
    return false;
  }
  // Seeing kSVInUse in the ThreadLocal means the storage has not been altered
  // and no Scrape has happened, so the SuperVersion is still current.
  return true;
}
829
830
std::unique_ptr<SuperVersion> ColumnFamilyData::InstallSuperVersion(
831
242k
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
832
242k
  db_mutex->AssertHeld();
833
242k
  return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
834
242k
}
835
836
std::unique_ptr<SuperVersion> ColumnFamilyData::InstallSuperVersion(
837
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
838
1.00M
    const MutableCFOptions& mutable_cf_options) {
839
1.00M
  new_superversion->db_mutex = db_mutex;
840
1.00M
  new_superversion->mutable_cf_options = mutable_cf_options;
841
1.00M
  new_superversion->Init(mem_, imm_.current(), current_);
842
1.00M
  SuperVersion* old_superversion = super_version_;
843
1.00M
  super_version_ = new_superversion;
844
1.00M
  ++super_version_number_;
845
1.00M
  super_version_->version_number = super_version_number_;
846
  // Reset SuperVersions cached in thread local storage
847
1.00M
  ResetThreadLocalSuperVersions();
848
849
1.00M
  RecalculateWriteStallConditions(mutable_cf_options);
850
851
1.00M
  if (old_superversion != nullptr && old_superversion->Unref()) {
852
410k
    old_superversion->Cleanup();
853
    // will let caller delete outside of mutex
854
410k
    return std::unique_ptr<SuperVersion>(old_superversion);
855
410k
  }
856
592k
  return nullptr;
857
592k
}
858
859
1.00M
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
860
1.00M
  autovector<void*> sv_ptrs;
861
1.00M
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
862
70.7k
  for (auto ptr : sv_ptrs) {
863
70.7k
    DCHECK(ptr);
864
70.7k
    if (ptr == SuperVersion::kSVInUse) {
865
175
      continue;
866
175
    }
867
70.5k
    auto sv = static_cast<SuperVersion*>(ptr);
868
70.5k
    if (sv->Unref()) {
869
0
      sv->Cleanup();
870
0
      delete sv;
871
0
    }
872
70.5k
  }
873
1.00M
}
874
875
#ifndef ROCKSDB_LITE
876
// Applies string-encoded option overrides on top of mutable_cf_options_.
//
// options_map is passed to GetMutableOptionsFromStrings together with the
// current options. On failure the current options are left untouched and the
// error status is returned. On success the parsed result replaces
// mutable_cf_options_ and its derived options are refreshed.
Status ColumnFamilyData::SetOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed_options;
  Status status = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                               &parsed_options);
  if (!status.ok()) {
    return status;
  }
  mutable_cf_options_ = parsed_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return status;
}
887
#endif  // ROCKSDB_LITE
888
889
// Builds an empty column family set.
//
// Stores the supplied options, cache, write buffer and write controller, and
// allocates a placeholder ColumnFamilyData (id 0, empty name) that serves as
// the head of the circular list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty circular list: the head node is its own successor and
  // predecessor.
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
910
911
324k
// Unreferences and deletes every remaining column family, then the
// placeholder list head.
ColumnFamilySet::~ColumnFamilySet() {
  // The loop re-reads begin() each time and terminates because, per the
  // original note, the cfd destructor removes itself from column_family_data_.
  while (!column_family_data_.empty()) {
    auto* const cfd = column_family_data_.begin()->second;
    cfd->Unref();
    delete cfd;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}
921
922
21.5M
// Returns the cached pointer to the column family that was created with id 0.
// Debug builds check that the cache has been populated.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  DCHECK_ONLY_NOTNULL(default_cfd_cache_);
  ColumnFamilyData* const default_cfd = default_cfd_cache_;
  return default_cfd;
}
926
927
22.7M
// Looks up a column family by numeric id.
// Returns nullptr when no column family with that id is registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto entry = column_family_data_.find(id);
  return entry == column_family_data_.end() ? nullptr : entry->second;
}
935
936
// Looks up a column family by name: name -> id through column_families_, then
// id -> data through the id overload.
// Returns nullptr when the name is not registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  auto cfd = GetColumnFamily(name_entry->second);
  // A registered name is expected to map to a registered id (debug check).
  DCHECK_ONLY_NOTNULL(cfd);
  return cfd;
}
947
948
1.71k
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
949
1.71k
  return ++max_column_family_;
950
1.71k
}
951
952
601k
// Returns the largest column family id recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}
953
954
342k
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
955
342k
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
956
342k
}
957
958
13.6M
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
959
13.6M
  return column_families_.size();
960
13.6M
}
961
962
// under a DB mutex AND write thread
963
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
964
    const std::string& name, uint32_t id, Version* dummy_versions,
965
347k
    const ColumnFamilyOptions& options) {
966
347k
  DCHECK_EQ(column_families_.count(name), 0);
967
347k
  ColumnFamilyData* new_cfd =
968
347k
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
969
347k
                           write_buffer_, options, db_options_,
970
347k
                           env_options_, this);
971
347k
  column_families_.insert({name, id});
972
347k
  column_family_data_.insert({id, new_cfd});
973
347k
  max_column_family_ = std::max(max_column_family_, id);
974
  // add to linked list
975
347k
  new_cfd->next_ = dummy_cfd_;
976
347k
  auto prev = dummy_cfd_->prev_;
977
347k
  new_cfd->prev_ = prev;
978
347k
  prev->next_ = new_cfd;
979
347k
  dummy_cfd_->prev_ = new_cfd;
980
347k
  if (id == 0) {
981
342k
    default_cfd_cache_ = new_cfd;
982
342k
  }
983
347k
  return new_cfd;
984
347k
}
985
986
// REQUIRES: DB mutex held
987
130k
void ColumnFamilySet::FreeDeadColumnFamilies() {
988
130k
  autovector<ColumnFamilyData*> to_delete;
989
462k
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
990
332k
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
991
0
      to_delete.push_back(cfd);
992
0
    }
993
332k
  }
994
0
  for (auto cfd : to_delete) {
995
    // this is very rare, so it's not a problem that we do it under a mutex
996
0
    delete cfd;
997
0
  }
998
130k
}
999
1000
// under a DB mutex AND from a write thread
1001
328k
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1002
328k
  auto cfd_iter = column_family_data_.find(cfd->GetID());
1003
328k
  DCHECK(cfd_iter != column_family_data_.end());
1004
328k
  column_family_data_.erase(cfd_iter);
1005
328k
  column_families_.erase(cfd->GetName());
1006
328k
}
1007
1008
// under a DB mutex OR from a write thread
1009
42.9M
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1010
42.9M
  if (column_family_id == 0) {
1011
    // optimization for common case
1012
20.9M
    current_ = column_family_set_->GetDefault();
1013
22.0M
  } else {
1014
22.0M
    current_ = column_family_set_->GetColumnFamily(column_family_id);
1015
22.0M
  }
1016
42.9M
  handle_.SetCFD(current_);
1017
42.9M
  return current_ != nullptr;
1018
42.9M
}
1019
1020
3.02M
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1021
3.02M
  DCHECK_ONLY_NOTNULL(current_);
1022
3.02M
  return current_->GetLogNumber();
1023
3.02M
}
1024
1025
42.9M
// Returns the memtable of the column family selected by the last Seek().
// A column family must be selected (debug check).
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  DCHECK_ONLY_NOTNULL(current_);
  MemTable* const active_memtable = current_->mem();
  return active_memtable;
}
1029
1030
185
// Returns the address of the handle_ member, which Seek() keeps pointed at
// the selected column family. A column family must be selected (debug check).
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  DCHECK_ONLY_NOTNULL(current_);
  ColumnFamilyHandle* const handle = &handle_;
  return handle;
}
1034
1035
52.9M
// Returns the id of the column family behind `column_family`, or 0 when the
// handle is nullptr.
//
// A non-null handle is treated as a ColumnFamilyHandleImpl.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  // static_cast is the correct named cast for a base-to-derived pointer
  // conversion: unlike reinterpret_cast it is checked against the class
  // hierarchy at compile time and applies any required pointer adjustment.
  auto* const cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->GetID();
}
1043
1044
// Returns the user comparator of the column family behind `column_family`, or
// nullptr when the handle is nullptr.
//
// A non-null handle is treated as a ColumnFamilyHandleImpl.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  // static_cast is the correct named cast for a base-to-derived pointer
  // conversion: unlike reinterpret_cast it is checked against the class
  // hierarchy at compile time and applies any required pointer adjustment.
  auto* const cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->user_comparator();
}
1052
1053
}  // namespace rocksdb