/Users/deen/code/yugabyte-db/src/yb/util/lockfree.h
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #ifndef YB_UTIL_LOCKFREE_H |
15 | | #define YB_UTIL_LOCKFREE_H |
16 | | |
17 | | #include <atomic> |
18 | | |
19 | | #include <boost/atomic.hpp> |
20 | | |
21 | | #include "yb/gutil/dynamic_annotations.h" |
22 | | #include "yb/util/atomic.h" // For IsAcceptableAtomicImpl |
23 | | |
24 | | namespace yb { |
25 | | |
26 | | // Multi producer - single consumer queue. |
27 | | template <class T> |
28 | | class MPSCQueue { |
29 | | public: |
30 | | // Thread safe - could be invoked from multiple threads. |
31 | 6.87M | void Push(T* value) { |
32 | 6.87M | T* old_head = push_head_.load(std::memory_order_acquire); |
33 | 6.88M | for (;;) { |
34 | 6.88M | SetNext(value, old_head); |
35 | 6.88M | if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) { |
36 | 6.87M | break; |
37 | 6.87M | } |
38 | 6.88M | } |
39 | 6.87M | } yb::MPSCQueue<yb::tablet::OperationDriver>::Push(yb::tablet::OperationDriver*) Line | Count | Source | 31 | 5.04M | void Push(T* value) { | 32 | 5.04M | T* old_head = push_head_.load(std::memory_order_acquire); | 33 | 5.04M | for (;;) { | 34 | 5.04M | SetNext(value, old_head); | 35 | 5.04M | if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) { | 36 | 5.04M | break; | 37 | 5.04M | } | 38 | 5.04M | } | 39 | 5.04M | } |
yb::MPSCQueue<yb::rpc::StrandTask>::Push(yb::rpc::StrandTask*) Line | Count | Source | 31 | 1.66M | void Push(T* value) { | 32 | 1.66M | T* old_head = push_head_.load(std::memory_order_acquire); | 33 | 1.67M | for (;;) { | 34 | 1.67M | SetNext(value, old_head); | 35 | 1.67M | if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) { | 36 | 1.66M | break; | 37 | 1.66M | } | 38 | 1.67M | } | 39 | 1.66M | } |
yb::MPSCQueue<yb::client::internal::LookupData>::Push(yb::client::internal::LookupData*) Line | Count | Source | 31 | 160k | void Push(T* value) { | 32 | 160k | T* old_head = push_head_.load(std::memory_order_acquire); | 33 | 160k | for (;;) { | 34 | 160k | SetNext(value, old_head); | 35 | 160k | if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) { | 36 | 160k | break; | 37 | 160k | } | 38 | 160k | } | 39 | 160k | } |
Unexecuted instantiation: transaction_dump.cc:yb::MPSCQueue<yb::docdb::(anonymous namespace)::DumpEntry>::Push(yb::docdb::(anonymous namespace)::DumpEntry*) |
40 | | |
41 | | // Could be invoked only by one thread at a time. |
42 | 39.0M | T* Pop() { |
43 | 39.0M | if (!pop_head_) { |
44 | 38.6M | PreparePop(); |
45 | 38.6M | } |
46 | 39.0M | auto result = pop_head_; |
47 | 39.0M | if (!result) { |
48 | 32.1M | return nullptr; |
49 | 32.1M | } |
50 | 6.87M | pop_head_ = GetNext(result); |
51 | 6.87M | return result; |
52 | 39.0M | } yb::MPSCQueue<yb::tablet::OperationDriver>::Pop() Line | Count | Source | 42 | 35.8M | T* Pop() { | 43 | 35.8M | if (!pop_head_) { | 44 | 35.6M | PreparePop(); | 45 | 35.6M | } | 46 | 35.8M | auto result = pop_head_; | 47 | 35.8M | if (!result) { | 48 | 30.7M | return nullptr; | 49 | 30.7M | } | 50 | 5.04M | pop_head_ = GetNext(result); | 51 | 5.04M | return result; | 52 | 35.8M | } |
yb::MPSCQueue<yb::rpc::StrandTask>::Pop() Line | Count | Source | 42 | 3.02M | T* Pop() { | 43 | 3.02M | if (!pop_head_) { | 44 | 2.83M | PreparePop(); | 45 | 2.83M | } | 46 | 3.02M | auto result = pop_head_; | 47 | 3.02M | if (!result) { | 48 | 1.35M | return nullptr; | 49 | 1.35M | } | 50 | 1.67M | pop_head_ = GetNext(result); | 51 | 1.67M | return result; | 52 | 3.02M | } |
yb::MPSCQueue<yb::client::internal::LookupData>::Pop() Line | Count | Source | 42 | 232k | T* Pop() { | 43 | 232k | if (!pop_head_) { | 44 | 142k | PreparePop(); | 45 | 142k | } | 46 | 232k | auto result = pop_head_; | 47 | 232k | if (!result) { | 48 | 72.7k | return nullptr; | 49 | 72.7k | } | 50 | 160k | pop_head_ = GetNext(result); | 51 | 160k | return result; | 52 | 232k | } |
Unexecuted instantiation: transaction_dump.cc:yb::MPSCQueue<yb::docdb::(anonymous namespace)::DumpEntry>::Pop() |
53 | | |
54 | | private: |
55 | 38.6M | void PreparePop() { |
56 | 38.6M | T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel); |
57 | | // Reverse original list. |
58 | 38.6M | T* prev = nullptr; |
59 | 45.5M | while (current) { |
60 | 6.88M | auto next = GetNext(current); |
61 | 6.88M | SetNext(current, prev); |
62 | 6.88M | prev = current; |
63 | 6.88M | current = next; |
64 | 6.88M | } |
65 | 38.6M | pop_head_ = prev; |
66 | 38.6M | } yb::MPSCQueue<yb::tablet::OperationDriver>::PreparePop() Line | Count | Source | 55 | 35.6M | void PreparePop() { | 56 | 35.6M | T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel); | 57 | | // Reverse original list. | 58 | 35.6M | T* prev = nullptr; | 59 | 40.7M | while (current) { | 60 | 5.05M | auto next = GetNext(current); | 61 | 5.05M | SetNext(current, prev); | 62 | 5.05M | prev = current; | 63 | 5.05M | current = next; | 64 | 5.05M | } | 65 | 35.6M | pop_head_ = prev; | 66 | 35.6M | } |
yb::MPSCQueue<yb::rpc::StrandTask>::PreparePop() Line | Count | Source | 55 | 2.83M | void PreparePop() { | 56 | 2.83M | T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel); | 57 | | // Reverse original list. | 58 | 2.83M | T* prev = nullptr; | 59 | 4.50M | while (current) { | 60 | 1.67M | auto next = GetNext(current); | 61 | 1.67M | SetNext(current, prev); | 62 | 1.67M | prev = current; | 63 | 1.67M | current = next; | 64 | 1.67M | } | 65 | 2.83M | pop_head_ = prev; | 66 | 2.83M | } |
yb::MPSCQueue<yb::client::internal::LookupData>::PreparePop() Line | Count | Source | 55 | 142k | void PreparePop() { | 56 | 142k | T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel); | 57 | | // Reverse original list. | 58 | 142k | T* prev = nullptr; | 59 | 302k | while (current) { | 60 | 160k | auto next = GetNext(current); | 61 | 160k | SetNext(current, prev); | 62 | 160k | prev = current; | 63 | 160k | current = next; | 64 | 160k | } | 65 | 142k | pop_head_ = prev; | 66 | 142k | } |
Unexecuted instantiation: transaction_dump.cc:yb::MPSCQueue<yb::docdb::(anonymous namespace)::DumpEntry>::PreparePop() |
67 | | |
68 | | // List of entries ready for pop, pop head points to the entry that should be returned first. |
69 | | T* pop_head_ = nullptr; |
70 | | // List of push entries, push head points to last pushed entry. |
71 | | std::atomic<T*> push_head_{nullptr}; |
72 | | }; |
73 | | |
74 | | template <class T> |
75 | | class MPSCQueueEntry { |
76 | | public: |
77 | 13.7M | void SetNext(T* next) { |
78 | 13.7M | next_ = next; |
79 | 13.7M | } yb::MPSCQueueEntry<yb::tablet::OperationDriver>::SetNext(yb::tablet::OperationDriver*) Line | Count | Source | 77 | 10.0M | void SetNext(T* next) { | 78 | 10.0M | next_ = next; | 79 | 10.0M | } |
yb::MPSCQueueEntry<yb::rpc::StrandTask>::SetNext(yb::rpc::StrandTask*) Line | Count | Source | 77 | 3.34M | void SetNext(T* next) { | 78 | 3.34M | next_ = next; | 79 | 3.34M | } |
yb::MPSCQueueEntry<yb::client::internal::LookupData>::SetNext(yb::client::internal::LookupData*) Line | Count | Source | 77 | 320k | void SetNext(T* next) { | 78 | 320k | next_ = next; | 79 | 320k | } |
stack_trace.cc:yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry>::SetNext(yb::(anonymous namespace)::ThreadStackEntry*) Line | Count | Source | 77 | 22.5k | void SetNext(T* next) { | 78 | 22.5k | next_ = next; | 79 | 22.5k | } |
|
80 | | |
81 | 13.7M | T* GetNext() const { |
82 | 13.7M | return next_; |
83 | 13.7M | } yb::MPSCQueueEntry<yb::tablet::OperationDriver>::GetNext() const Line | Count | Source | 81 | 10.0M | T* GetNext() const { | 82 | 10.0M | return next_; | 83 | 10.0M | } |
yb::MPSCQueueEntry<yb::rpc::StrandTask>::GetNext() const Line | Count | Source | 81 | 3.34M | T* GetNext() const { | 82 | 3.34M | return next_; | 83 | 3.34M | } |
yb::MPSCQueueEntry<yb::client::internal::LookupData>::GetNext() const Line | Count | Source | 81 | 320k | T* GetNext() const { | 82 | 320k | return next_; | 83 | 320k | } |
stack_trace.cc:yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry>::GetNext() const Line | Count | Source | 81 | 13.5k | T* GetNext() const { | 82 | 13.5k | return next_; | 83 | 13.5k | } |
|
84 | | |
85 | | private: |
86 | | T* next_ = nullptr; |
87 | | }; |
88 | | |
89 | | template <class T> |
90 | 13.7M | void SetNext(MPSCQueueEntry<T>* entry, T* next) { |
91 | 13.7M | entry->SetNext(next); |
92 | 13.7M | } void yb::SetNext<yb::tablet::OperationDriver>(yb::MPSCQueueEntry<yb::tablet::OperationDriver>*, yb::tablet::OperationDriver*) Line | Count | Source | 90 | 10.0M | void SetNext(MPSCQueueEntry<T>* entry, T* next) { | 91 | 10.0M | entry->SetNext(next); | 92 | 10.0M | } |
void yb::SetNext<yb::rpc::StrandTask>(yb::MPSCQueueEntry<yb::rpc::StrandTask>*, yb::rpc::StrandTask*) Line | Count | Source | 90 | 3.34M | void SetNext(MPSCQueueEntry<T>* entry, T* next) { | 91 | 3.34M | entry->SetNext(next); | 92 | 3.34M | } |
void yb::SetNext<yb::client::internal::LookupData>(yb::MPSCQueueEntry<yb::client::internal::LookupData>*, yb::client::internal::LookupData*) Line | Count | Source | 90 | 320k | void SetNext(MPSCQueueEntry<T>* entry, T* next) { | 91 | 320k | entry->SetNext(next); | 92 | 320k | } |
stack_trace.cc:void yb::SetNext<yb::(anonymous namespace)::ThreadStackEntry>(yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry>*, yb::(anonymous namespace)::ThreadStackEntry*) Line | Count | Source | 90 | 22.5k | void SetNext(MPSCQueueEntry<T>* entry, T* next) { | 91 | 22.5k | entry->SetNext(next); | 92 | 22.5k | } |
|
93 | | |
94 | | template <class T> |
95 | 13.7M | T* GetNext(const MPSCQueueEntry<T>* entry) { |
96 | 13.7M | return entry->GetNext(); |
97 | 13.7M | } yb::tablet::OperationDriver* yb::GetNext<yb::tablet::OperationDriver>(yb::MPSCQueueEntry<yb::tablet::OperationDriver> const*) Line | Count | Source | 95 | 10.0M | T* GetNext(const MPSCQueueEntry<T>* entry) { | 96 | 10.0M | return entry->GetNext(); | 97 | 10.0M | } |
yb::rpc::StrandTask* yb::GetNext<yb::rpc::StrandTask>(yb::MPSCQueueEntry<yb::rpc::StrandTask> const*) Line | Count | Source | 95 | 3.34M | T* GetNext(const MPSCQueueEntry<T>* entry) { | 96 | 3.34M | return entry->GetNext(); | 97 | 3.34M | } |
yb::client::internal::LookupData* yb::GetNext<yb::client::internal::LookupData>(yb::MPSCQueueEntry<yb::client::internal::LookupData> const*) Line | Count | Source | 95 | 320k | T* GetNext(const MPSCQueueEntry<T>* entry) { | 96 | 320k | return entry->GetNext(); | 97 | 320k | } |
stack_trace.cc:yb::(anonymous namespace)::ThreadStackEntry* yb::GetNext<yb::(anonymous namespace)::ThreadStackEntry>(yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry> const*) Line | Count | Source | 95 | 13.5k | T* GetNext(const MPSCQueueEntry<T>* entry) { | 96 | 13.5k | return entry->GetNext(); | 97 | 13.5k | } |
|
98 | | |
99 | | // Intrusive stack implementation based on a linked list. |
100 | | template <class T> |
101 | | class LockFreeStack { |
102 | | public: |
103 | 65.4k | LockFreeStack() { |
104 | 65.4k | CHECK(IsAcceptableAtomicImpl(head_)); |
105 | 65.4k | } |
106 | | |
107 | 22.5k | void Push(T* value) { |
108 | 22.5k | Head old_head = head_.load(boost::memory_order_acquire); |
109 | 22.5k | for (;;) { |
110 | 22.5k | ANNOTATE_IGNORE_WRITES_BEGIN(); |
111 | 22.5k | SetNext(value, old_head.pointer); |
112 | 22.5k | ANNOTATE_IGNORE_WRITES_END(); |
113 | 22.5k | Head new_head{value, old_head.version + 1}; |
114 | 22.5k | if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) { |
115 | 22.5k | break; |
116 | 22.5k | } |
117 | 22.5k | } |
118 | 22.5k | } |
119 | | |
120 | 20.3k | T* Pop() { |
121 | 20.3k | Head old_head = head_.load(boost::memory_order_acquire); |
122 | 20.3k | for (;;) { |
123 | 20.3k | if (!old_head.pointer) { |
124 | 6.79k | break; |
125 | 6.79k | } |
126 | 13.5k | ANNOTATE_IGNORE_READS_BEGIN(); |
127 | 13.5k | Head new_head{GetNext(old_head.pointer), old_head.version + 1}; |
128 | 13.5k | ANNOTATE_IGNORE_READS_END(); |
129 | 13.5k | if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) { |
130 | 13.5k | break; |
131 | 13.5k | } |
132 | 13.5k | } |
133 | 20.3k | return old_head.pointer; |
134 | 20.3k | } |
135 | | |
136 | | private: |
137 | | // The clang compiler may generate code that requires 16-byte alignment, |
138 | | // which causes a SEGV if this struct is not aligned properly. |
139 | | struct Head { |
140 | | T* pointer; |
141 | | size_t version; |
142 | | } __attribute__((aligned(16))); |
143 | | |
144 | | boost::atomic<Head> head_{Head{nullptr, 0}}; |
145 | | }; |
146 | | |
147 | | } // namespace yb |
148 | | |
149 | | #endif // YB_UTIL_LOCKFREE_H |