YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_scheduler.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/db/flush_scheduler.h"
22
23
#include <cassert>
24
25
#include "yb/rocksdb/db/column_family.h"
26
27
namespace rocksdb {
28
29
12.1k
// Queues `cfd` for flushing by pushing a new Node onto the front of the
// singly linked list rooted at head_.
//
// Ref() is called on `cfd` before it is linked in; this function never
// releases that reference itself.  In builds without NDEBUG, the call first
// asserts that `cfd` is not already present in checking_set_ and then records
// it there, both while holding checking_mutex_.
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  {
    std::lock_guard<std::mutex> guard(checking_mutex_);
    assert(checking_set_.find(cfd) == checking_set_.end());
    checking_set_.insert(cfd);
  }
#endif  // NDEBUG
  cfd->Ref();

  // Start with the currently observed head as our successor.
  Node* entry = new Node{cfd, head_.load(std::memory_order_relaxed)};
  for (;;) {
    const bool published = head_.compare_exchange_strong(
        entry->next, entry, std::memory_order_relaxed,
        std::memory_order_relaxed);
    if (published) {
      break;
    }
    // A failed CAS writes the current head back into entry->next, so the
    // node is already prepared for the next attempt.  TakeNextColumnFamily
    // won't happen until after another inter-thread synchronization, so
    // release semantics are not needed for this CAS.
  }
}
47
48
3.38M
// Pops entries off the front of the scheduled list until one is found whose
// column family is not dropped, and returns it.  Returns nullptr once the
// list is empty.
//
// The returned pointer has not been Unref()'d by this function.  Entries
// whose column family reports IsDropped() are Unref()'d here (and deleted
// when Unref() returns true) and then skipped.
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
  while (!Empty()) {
    // Detach the front node and free it, keeping only its payload.
    Node* front = head_.load(std::memory_order_relaxed);
    head_.store(front->next, std::memory_order_relaxed);
    ColumnFamilyData* candidate = front->column_family;
    delete front;

#ifndef NDEBUG
    {
      // Debug bookkeeping: everything dequeued must have been recorded when
      // it was scheduled.
      auto pos = checking_set_.find(candidate);
      assert(pos != checking_set_.end());
      checking_set_.erase(pos);
    }
#endif  // NDEBUG

    if (candidate->IsDropped()) {
      // No longer relevant: release it and try the next entry.
      if (candidate->Unref()) {
        delete candidate;
      }
      continue;
    }

    // Success.
    return candidate;
  }
  return nullptr;
}
79
80
31.7M
bool FlushScheduler::Empty() {
81
31.7M
  auto rv = head_.load(std::memory_order_relaxed) == nullptr;
82
31.7M
  assert(rv == checking_set_.empty());
83
0
  return rv;
84
31.7M
}
85
86
405k
void FlushScheduler::Clear() {
87
405k
  ColumnFamilyData* cfd;
88
405k
  while ((cfd = TakeNextColumnFamily()) != nullptr) {
89
98
    if (cfd->Unref()) {
90
0
      delete cfd;
91
0
    }
92
98
  }
93
405k
  assert(Empty());
94
405k
}
95
96
}  // namespace rocksdb