/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/flush_scheduler.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/db/flush_scheduler.h" |
22 | | |
23 | | #include <cassert> |
24 | | |
25 | | #include "yb/rocksdb/db/column_family.h" |
26 | | |
27 | | namespace rocksdb { |
28 | | |
29 | 11.8k | void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { |
30 | 11.8k | #ifndef NDEBUG |
31 | 11.8k | { |
32 | 11.8k | std::lock_guard<std::mutex> lock(checking_mutex_); |
33 | 11.8k | assert(checking_set_.count(cfd) == 0); |
34 | 11.8k | checking_set_.insert(cfd); |
35 | 11.8k | } |
36 | 11.8k | #endif // NDEBUG |
37 | 11.8k | cfd->Ref(); |
38 | 11.8k | Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)}; |
39 | 11.8k | while (!head_.compare_exchange_strong( |
40 | 0 | node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) { |
41 | | // failing CAS updates the first param, so we are already set for |
42 | | // retry. TakeNextColumnFamily won't happen until after another |
43 | | // inter-thread synchronization, so we don't even need release |
44 | | // semantics for this CAS |
45 | 0 | } |
46 | 11.8k | } |
47 | | |
48 | 3.31M | ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { |
49 | 3.31M | while (true) { |
50 | 3.31M | if (Empty()) { |
51 | 3.29M | return nullptr; |
52 | 3.29M | } |
53 | | |
54 | | // dequeue the head |
55 | 11.7k | Node* node = head_.load(std::memory_order_relaxed); |
56 | 11.7k | head_.store(node->next, std::memory_order_relaxed); |
57 | 11.7k | ColumnFamilyData* cfd = node->column_family; |
58 | 11.7k | delete node; |
59 | | |
60 | 11.7k | #ifndef NDEBUG |
61 | 11.7k | { |
62 | 11.7k | auto iter = checking_set_.find(cfd); |
63 | 11.7k | assert(iter != checking_set_.end()); |
64 | 11.7k | checking_set_.erase(iter); |
65 | 11.7k | } |
66 | 11.7k | #endif // NDEBUG |
67 | | |
68 | 11.7k | if (!cfd->IsDropped()) { |
69 | | // success |
70 | 11.7k | return cfd; |
71 | 11.7k | } |
72 | | |
73 | | // no longer relevant, retry |
74 | 27 | if (cfd->Unref()) { |
75 | 0 | delete cfd; |
76 | 0 | } |
77 | 27 | } |
78 | 3.31M | } |
79 | | |
80 | 25.5M | bool FlushScheduler::Empty() { |
81 | 25.5M | auto rv = head_.load(std::memory_order_relaxed) == nullptr; |
82 | 25.5M | assert(rv == checking_set_.empty()); |
83 | 25.5M | return rv; |
84 | 25.5M | } |
85 | | |
86 | 332k | void FlushScheduler::Clear() { |
87 | 332k | ColumnFamilyData* cfd; |
88 | 332k | while ((cfd = TakeNextColumnFamily()) != nullptr) { |
89 | 92 | if (cfd->Unref()) { |
90 | 0 | delete cfd; |
91 | 0 | } |
92 | 92 | } |
93 | 332k | assert(Empty()); |
94 | 332k | } |
95 | | |
96 | | } // namespace rocksdb |