/Users/deen/code/yugabyte-db/src/yb/util/concurrent_value.h
Line | Count | Source
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | |
5 | | #ifndef YB_UTIL_CONCURRENT_VALUE_H |
6 | | #define YB_UTIL_CONCURRENT_VALUE_H |
7 | | |
8 | | #include <atomic> |
9 | | #include <mutex> |
10 | | #include <thread> |
11 | | |
12 | | #if defined(__APPLE__) && __clang_major__ < 8 || YB_ZAPCC |
13 | | #define YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR 1 |
14 | | #else |
15 | | #define YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR 0 |
16 | | #endif |
17 | | |
18 | | #if YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR |
19 | | #include <boost/thread/tss.hpp> |
20 | | #endif |
21 | | |
22 | | #include "yb/util/logging.h" |
23 | | |
24 | | namespace yb { |
25 | | |
26 | | namespace internal { |
27 | | typedef decltype(std::this_thread::get_id()) ThreadId; |
28 | | |
29 | | const auto kNullThreadId = ThreadId(); |
30 | | |
31 | | // Tracks the list of threads that are using URCU.
32 | | template<class T> |
33 | | class ThreadList { |
34 | | public: |
35 | | typedef T Data; |
36 | | |
37 | 0 | ~ThreadList() { |
38 | 0 | const auto current_thread_id = std::this_thread::get_id(); |
39 | 0 | destructor_thread_id_.store(current_thread_id, std::memory_order_relaxed); |
40 | | |
41 | | // Check if the current thread has an associated URCUThreadData object that has not been
42 | | // retired yet. We do this by traversing the linked list, because the thread-local
43 | | // variable might have been destroyed already.
44 | 0 | size_t desired_allocated_threads = 0; |
45 | 0 | for (auto* p = head_.load(std::memory_order_acquire); p;) { |
46 | 0 | if (p->owner.load(std::memory_order_acquire) == current_thread_id) { |
47 | 0 | desired_allocated_threads = 1; |
48 | 0 | break; |
49 | 0 | } |
50 | 0 | p = p->next.load(std::memory_order_acquire); |
51 | 0 | } |
52 | | |
53 | | // Wait for all threads holding URCUThreadData objects, except possibly this thread, to call
54 | | // Retire(). This thread might have to do that later, depending on the destruction order of
55 | | // statics.
56 | 0 | while (allocated_.load(std::memory_order_acquire) != desired_allocated_threads) { |
57 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
58 | 0 | } |
59 | | |
60 | | // At this point no other threads should be touching the linked list. We do not enforce that,
61 | | // but we assume that if the ThreadList destructor is being called, the system is already
62 | | // almost shut down.
63 | 0 | for (auto* p = head_.exchange(nullptr, std::memory_order_acquire); p;) { |
64 | 0 | auto* n = p->next.load(std::memory_order_relaxed); |
65 | 0 | if (p->owner.load(std::memory_order_relaxed) != current_thread_id) { |
66 | | // If the current thread has not called Retire() on its URCUThreadData object, then we will |
67 | | // defer deleting that object until Retire() is called. |
68 | 0 | delete p; |
69 | 0 | } |
70 | 0 | p = n; |
71 | 0 | } |
72 | 0 | } |
73 | | |
74 | 0 | Data* Alloc() { |
75 | 0 | allocated_.fetch_add(1, std::memory_order_relaxed); |
76 | 0 | Data* data; |
77 | 0 | const auto current_thread_id = std::this_thread::get_id(); |
78 | 0 |
79 | 0 | // First, try to reuse a retired (non-active) record.
80 | 0 | for (data = head_.load(std::memory_order_acquire); data; |
81 | 0 | data = data->next.load(std::memory_order_relaxed)) { |
82 | 0 | auto old_value = kNullThreadId; |
83 | 0 | if (data->owner.compare_exchange_strong(old_value, |
84 | 0 | current_thread_id, |
85 | 0 | std::memory_order_seq_cst, |
86 | 0 | std::memory_order_relaxed)) { |
87 | 0 | return data; |
88 | 0 | } |
89 | 0 | } |
90 | 0 |
91 | 0 | data = new Data(current_thread_id); |
92 | 0 |
93 | 0 | auto old_head = head_.load(std::memory_order_acquire); |
94 | 0 | do { |
95 | 0 | data->next.store(old_head, std::memory_order_relaxed); |
96 | 0 | } while (!head_.compare_exchange_weak(old_head, |
97 | 0 | data, |
98 | 0 | std::memory_order_acq_rel, |
99 | 0 | std::memory_order_acquire)); |
100 | 0 |
101 | 0 | return data; |
102 | 0 | } |
103 | | |
104 | 0 | void Retire(Data* data) { |
105 | 0 | DCHECK_ONLY_NOTNULL(data); |
106 | 0 | auto old_thread_id = data->owner.exchange(kNullThreadId, std::memory_order_acq_rel); |
107 | 0 | allocated_.fetch_sub(1, std::memory_order_release); |
108 | | |
109 | | // Using relaxed memory order because we only need to delete the URCUThreadData object here |
110 | | // in case we set destructor_thread_id_ earlier on the same thread. If we are in a different |
111 | | // thread, then the thread id will not match anyway. |
112 | 0 | if (old_thread_id == destructor_thread_id_.load(std::memory_order_relaxed)) { |
113 | 0 | delete data; |
114 | 0 | } |
115 | 0 | }
Unexecuted instantiation: _ZN2yb8internal10ThreadListINS0_14URCUThreadDataEE6RetireEPS2_
116 | | |
117 | 0 | Data* Head(std::memory_order mo) const { |
118 | 0 | return head_.load(mo); |
119 | 0 | } |
120 | | |
121 | 0 | static ThreadList<T>& Instance() { |
122 | 0 | static ThreadList<T> result; |
123 | 0 | return result; |
124 | 0 | } |
125 | | |
126 | | private: |
127 | 0 | ThreadList() {} |
128 | | |
129 | | std::atomic<Data*> head_{nullptr}; |
130 | | std::atomic<size_t> allocated_{0}; |
131 | | std::atomic<ThreadId> destructor_thread_id_{kNullThreadId}; |
132 | | }; |
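
A minimal sketch of the per-thread lifecycle that ThreadList manages (illustrative only, not part of
the original header; ExampleThreadBody is a hypothetical name). Each thread obtains one record via
Alloc(), which either reclaims a retired record or links a new one at the head, and hands it back
via Retire() when the thread exits:

    #include "yb/util/concurrent_value.h"

    void ExampleThreadBody() {
      using yb::internal::ThreadList;
      using yb::internal::URCUThreadData;

      // First use on this thread: claim a retired record or allocate and link a new one.
      URCUThreadData* data = ThreadList<URCUThreadData>::Instance().Alloc();

      // ... read-side critical sections toggle data->access_control (see URCU below) ...

      // Thread exit: mark the record as unowned so a future thread can reuse it.
      ThreadList<URCUThreadData>::Instance().Retire(data);
    }

In the header itself this pairing happens automatically: URCU::ThreadData() calls Alloc() on first
use, and the thread-local data_ deleter (CleanupThreadData) calls Retire() at thread exit.
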
133 | | |
134 | | // URCU data associated with a thread.
135 | | struct URCUThreadData { |
136 | | std::atomic<uint32_t> access_control{0}; |
137 | | std::atomic<URCUThreadData*> next{nullptr}; |
138 | | std::atomic<ThreadId> owner; |
139 | | |
140 | 0 | explicit URCUThreadData(ThreadId owner_) : owner(owner_) {} |
141 | | }; |
142 | | |
143 | | constexpr uint32_t kControlBit = 0x80000000; |
144 | | constexpr uint32_t kNestMask = kControlBit - 1; |
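
Note added for clarity (inferred from the code below): each thread's access_control word packs two
fields. The low 31 bits (kNestMask) count how deeply that thread has nested AccessLock() calls, and
the top bit (kControlBit) records which "phase" of global_control_word_ the thread observed when it
entered its outermost read-side critical section. FlipAndWait() toggles the global phase bit and
then waits, via CheckGracePeriod(), for readers still inside a critical section started under the
previous phase.
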
145 | | |
146 | | // Userspace read-copy-update (URCU).
147 | | // Full description: https://en.wikipedia.org/wiki/Read-copy-update
148 | | // Read-copy-update (RCU) is a synchronization mechanism that lets readers proceed without
149 | | // blocking. It is used when read performance is crucial and is an example of a space-time
150 | | // tradeoff, enabling fast operations at the cost of more space.
151 | | //
152 | | // Read-copy-update allows multiple threads to read shared memory efficiently by deferring
153 | | // updates until all pre-existing readers have finished, while publishing the new data so
154 | | // that new readers observe the updated version. This lets readers proceed as if there
155 | | // were no synchronization involved, hence they are fast, but it also makes updates more
156 | | // expensive.
157 | | class URCU { |
158 | | public: |
159 | 0 | URCU() {} |
160 | | |
161 | | URCU(const URCU&) = delete; |
162 | | void operator=(const URCU&) = delete; |
163 | | |
164 | 0 | void AccessLock() { |
165 | 0 | auto* data = DCHECK_NOTNULL(ThreadData()); |
166 | 0 |
167 | 0 | uint32_t tmp = data->access_control.load(std::memory_order_relaxed); |
168 | 0 | if ((tmp & kNestMask) == 0) { |
169 | 0 | data->access_control.store(global_control_word_.load(std::memory_order_relaxed), |
170 | 0 | std::memory_order_relaxed); |
171 | 0 |
172 | 0 | std::atomic_thread_fence(std::memory_order_seq_cst); |
173 | 0 | } else { |
174 | 0 | // nested lock |
175 | 0 | data->access_control.store(tmp + 1, std::memory_order_relaxed); |
176 | 0 | } |
177 | 0 | } |
178 | | |
179 | 0 | void AccessUnlock() { |
180 | 0 | auto* data = DCHECK_NOTNULL(ThreadData()); |
181 | 0 |
182 | 0 | uint32_t tmp = data->access_control.load(std::memory_order_relaxed); |
183 | 0 | CHECK_GT(tmp & kNestMask, 0); |
184 | 0 |
185 | 0 | data->access_control.store(tmp - 1, std::memory_order_release); |
186 | 0 | } |
187 | | |
188 | 0 | void Synchronize() { |
189 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
190 | 0 | FlipAndWait(); |
191 | 0 | FlipAndWait(); |
192 | 0 | } |
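
Note added for clarity: Synchronize() deliberately runs two FlipAndWait() phases, as in other
userspace RCU implementations of this style. With a single flip, a reader that loaded
global_control_word_ just before the flip but published it to its access_control only afterwards
would carry the old phase bit; a later grace period that flips the bit back could then match that
reader's phase and complete while the reader is still inside its critical section. The second flip
makes the updater wait for such readers as well.
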
193 | | |
194 | | private: |
195 | 0 | URCUThreadData* ThreadData() { |
196 | 0 | auto result = data_.get(); |
197 | 0 | if (!result) { |
198 | 0 | data_.reset(result = ThreadList<URCUThreadData>::Instance().Alloc()); |
199 | 0 | } |
200 | 0 | return result; |
201 | 0 | } |
202 | | |
203 | 0 | void FlipAndWait() { |
204 | 0 | global_control_word_.fetch_xor(kControlBit, std::memory_order_seq_cst); |
205 | 0 |
206 | 0 | for (auto* data = ThreadList<URCUThreadData>::Instance().Head(std::memory_order_acquire); |
207 | 0 | data; |
208 | 0 | data = data->next.load(std::memory_order_acquire)) { |
209 | 0 | while (data->owner.load(std::memory_order_acquire) != kNullThreadId && |
210 | 0 | CheckGracePeriod(data)) { |
211 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
212 | 0 | std::atomic_thread_fence(std::memory_order_seq_cst); |
213 | 0 | } |
214 | 0 | } |
215 | 0 | } |
216 | | |
217 | 0 | bool CheckGracePeriod(URCUThreadData* data) { |
218 | 0 | const uint32_t v = data->access_control.load(std::memory_order_acquire); |
219 | 0 | return (v & kNestMask) && |
220 | 0 | ((v ^ global_control_word_.load(std::memory_order_relaxed)) & ~kNestMask); |
221 | 0 | } |
222 | | |
223 | | std::atomic<uint32_t> global_control_word_{1};
224 | | std::mutex mutex_; |
225 | | #if YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR |
226 | | static void CleanupThreadData(URCUThreadData* data) { |
227 | | ThreadList<URCUThreadData>::Instance().Retire(data); |
228 | | } |
229 | | |
230 | | static boost::thread_specific_ptr<URCUThreadData> data_; |
231 | | #else |
232 | | struct CleanupThreadData { |
233 | 0 | void operator()(URCUThreadData* data) { |
234 | 0 | ThreadList<URCUThreadData>::Instance().Retire(data); |
235 | 0 | } |
236 | | }; |
237 | | |
238 | | static thread_local std::unique_ptr<URCUThreadData, CleanupThreadData> data_; |
239 | | #endif |
240 | | }; |
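
For orientation, a sketch of the reader/writer pattern URCU supports (illustrative only; the Config
type, the `config` variable, and the two functions are hypothetical). ConcurrentValue below wraps
essentially this sequence:

    #include <atomic>
    #include "yb/util/concurrent_value.h"

    struct Config { int max_connections = 0; };          // hypothetical payload type

    std::atomic<Config*> config{new Config()};
    yb::internal::URCU urcu;

    int ReadMaxConnections() {
      urcu.AccessLock();                                  // enter read-side critical section
      const Config* current = config.load(std::memory_order_acquire);
      int result = current->max_connections;              // safe: `current` cannot be freed yet
      urcu.AccessUnlock();                                // leave read-side critical section
      return result;
    }

    void Replace(Config* replacement) {
      Config* old = config.exchange(replacement, std::memory_order_acq_rel);
      urcu.Synchronize();   // wait for all pre-existing readers to leave their critical sections
      delete old;           // no reader can still hold `old`
    }
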
241 | | |
242 | | // Reference to a concurrent value. Provides read access to the concurrent value.
243 | | // It should have a short lifetime.
244 | | template<class T> |
245 | | class ConcurrentValueReference { |
246 | | public: |
247 | | explicit ConcurrentValueReference(std::atomic<T*>* value, URCU* urcu) |
248 | | : urcu_(urcu) { |
249 | | urcu_->AccessLock(); |
250 | | value_ = value->load(std::memory_order_acquire); |
251 | | } |
252 | | |
253 | | ~ConcurrentValueReference() { |
254 | | if (urcu_) { |
255 | | urcu_->AccessUnlock(); |
256 | | } |
257 | | } |
258 | | |
259 | | ConcurrentValueReference(const ConcurrentValueReference&) = delete; |
260 | | void operator=(const ConcurrentValueReference&) = delete; |
261 | | |
262 | | ConcurrentValueReference(ConcurrentValueReference&& rhs) |
263 | | : value_(rhs.value_), urcu_(rhs.urcu_) { |
264 | | rhs.urcu_ = nullptr; |
265 | | } |
266 | | |
267 | | const T& operator*() const { |
268 | | return get(); |
269 | | } |
270 | | |
271 | | const T* operator->() const { |
272 | | return &get(); |
273 | | } |
274 | | |
275 | | const T& get() const { |
276 | | DCHECK_ONLY_NOTNULL(urcu_); |
277 | | return *value_; |
278 | | } |
279 | | private: |
280 | | const T* value_; |
281 | | URCU* urcu_; |
282 | | }; |
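
A short illustration of the "short lifetime" rule above (illustrative only; Schema, Process, and
ReadSchema are hypothetical, and ConcurrentValue is defined below):

    #include "yb/util/concurrent_value.h"

    struct Schema { /* hypothetical payload */ };

    void Process(const Schema& snapshot);                 // hypothetical consumer

    void ReadSchema(yb::ConcurrentValue<Schema>& table) {
      auto ref = table.get();                             // acquires the URCU read-side lock
      Process(*ref);                                      // work against a stable snapshot
    }                                                     // lock released when `ref` is destroyed

Holding the reference longer (for example, storing it in a member) keeps the read lock held and
makes writers block inside ConcurrentValue::DoSet(), which waits in urcu_.Synchronize().
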
283 | | |
284 | | // ConcurrentValue is used for cases when an object has a lot of reads and only a small
285 | | // number of writes.
286 | | template<class T> |
287 | | class ConcurrentValue { |
288 | | public: |
289 | | template<class... Args> |
290 | | explicit ConcurrentValue(Args&&... args) : value_(new T(std::forward<Args>(args)...)) {} |
291 | | |
292 | | ~ConcurrentValue() { |
293 | | delete value_.load(std::memory_order_relaxed); |
294 | | } |
295 | | |
296 | | ConcurrentValueReference<T> get() { |
297 | | return ConcurrentValueReference<T>(&value_, &urcu_); |
298 | | } |
299 | | |
300 | | template<class... Args> |
301 | | void Emplace(Args&& ... args) { |
302 | | DoSet(new T(std::forward<Args>(args)...)); |
303 | | } |
304 | | |
305 | | void Set(const T& t) { |
306 | | DoSet(new T(t)); |
307 | | } |
308 | | |
309 | | void Set(T&& t) { |
310 | | DoSet(new T(std::move(t))); |
311 | | } |
312 | | |
313 | | private: |
314 | | void DoSet(T* new_value) { |
315 | | auto* old_value = value_.exchange(new_value, std::memory_order_acq_rel); |
316 | | urcu_.Synchronize(); |
317 | | delete old_value; |
318 | | } |
319 | | |
320 | | std::atomic<T*> value_ = {nullptr}; |
321 | | URCU urcu_; |
322 | | }; |
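
A brief usage sketch (illustrative only; the Limits type, its fields, and the two functions are
hypothetical):

    #include "yb/util/concurrent_value.h"

    struct Limits {                                   // hypothetical payload type
      Limits(size_t soft, size_t hard) : soft_limit(soft), hard_limit(hard) {}
      size_t soft_limit;
      size_t hard_limit;
    };

    yb::ConcurrentValue<Limits> limits(10, 20);       // constructs Limits(10, 20) in place

    // Readers take a short-lived reference; they never block on a mutex.
    size_t ReadSoftLimit() {
      auto ref = limits.get();                        // ConcurrentValueReference<Limits>
      return ref->soft_limit;
    }

    // Writers allocate a new value, publish it atomically, wait for pre-existing
    // readers via URCU, and only then delete the previous value.
    void Refresh() {
      limits.Set(Limits(20, 40));                     // move an existing value in
      limits.Emplace(30, 60);                         // or construct the new value in place
    }
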
323 | | |
324 | | } // namespace internal |
325 | | |
326 | | using internal::ConcurrentValue; |
327 | | |
328 | | } // namespace yb |
329 | | |
330 | | #endif // YB_UTIL_CONCURRENT_VALUE_H |