YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_scheduler.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/db/flush_scheduler.h"
22
23
#include <cassert>
24
25
#include "yb/rocksdb/db/column_family.h"
26
27
namespace rocksdb {
28
29
11.8k
// Queues `cfd` for a later flush by pushing it onto the front of the
// lock-free singly linked list rooted at head_.
//
// A reference is taken on `cfd` via Ref() before it is linked in. In debug
// builds the column family is also recorded in checking_set_ (under
// checking_mutex_), and the function asserts that it was not already queued.
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  {
    std::lock_guard<std::mutex> guard(checking_mutex_);
    assert(checking_set_.count(cfd) == 0);
    checking_set_.insert(cfd);
  }
#endif  // NDEBUG
  cfd->Ref();

  // Seed the new node's `next` with the currently observed head.
  Node* const fresh = new Node{cfd, head_.load(std::memory_order_relaxed)};

  // A failed compare-exchange writes the head it actually saw back into
  // fresh->next, so the loop body needs no extra work before retrying.
  // TakeNextColumnFamily won't happen until after another inter-thread
  // synchronization, so relaxed ordering is sufficient here; release
  // semantics are not required for this CAS.
  bool linked = false;
  do {
    linked = head_.compare_exchange_strong(fresh->next, fresh,
                                           std::memory_order_relaxed,
                                           std::memory_order_relaxed);
  } while (!linked);
}
47
48
3.31M
// Pops entries off the head of the queue until it finds a column family
// that has not been dropped.
//
// Returns the first dequeued column family for which IsDropped() is false,
// or nullptr once the queue is empty. The returned pointer still carries the
// reference taken in ScheduleFlush; Clear() releases it with Unref().
// Entries whose column family has been dropped are unreferenced here and
// deleted when Unref() returns true.
//
// The head is read and then overwritten with two separate relaxed
// operations rather than a CAS, so this relies on the external
// synchronization mentioned in ScheduleFlush's CAS comment.
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
  while (true) {
    if (Empty()) {
      return nullptr;
    }

    // dequeue the head
    Node* node = head_.load(std::memory_order_relaxed);
    head_.store(node->next, std::memory_order_relaxed);
    ColumnFamilyData* cfd = node->column_family;
    delete node;

#ifndef NDEBUG
    {
      // checking_set_ is guarded by checking_mutex_ in ScheduleFlush; take
      // the same lock here so the debug bookkeeping is never touched
      // unsynchronized.
      std::lock_guard<std::mutex> lock(checking_mutex_);
      auto iter = checking_set_.find(cfd);
      assert(iter != checking_set_.end());
      checking_set_.erase(iter);
    }
#endif  // NDEBUG

    if (!cfd->IsDropped()) {
      // success
      return cfd;
    }

    // no longer relevant, retry
    if (cfd->Unref()) {
      delete cfd;
    }
  }
}
79
80
25.5M
bool FlushScheduler::Empty() {
81
25.5M
  auto rv = head_.load(std::memory_order_relaxed) == nullptr;
82
25.5M
  assert(rv == checking_set_.empty());
83
25.5M
  return rv;
84
25.5M
}
85
86
332k
void FlushScheduler::Clear() {
87
332k
  ColumnFamilyData* cfd;
88
332k
  while ((cfd = TakeNextColumnFamily()) != nullptr) {
89
92
    if (cfd->Unref()) {
90
0
      delete cfd;
91
0
    }
92
92
  }
93
332k
  assert(Empty());
94
332k
}
95
96
}  // namespace rocksdb