YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/concurrent_value.h
Source listing (execution counts omitted; every instrumented line in this file reports a count of 0)
//
// Copyright (c) YugaByte, Inc.
//

#ifndef YB_UTIL_CONCURRENT_VALUE_H
#define YB_UTIL_CONCURRENT_VALUE_H

#include <atomic>
#include <mutex>
#include <thread>

#if defined(__APPLE__) && __clang_major__ < 8 || YB_ZAPCC
#define YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR 1
#else
#define YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR 0
#endif

#if YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR
#include <boost/thread/tss.hpp>
#endif

#include "yb/util/logging.h"

namespace yb {

namespace internal {
typedef decltype(std::this_thread::get_id()) ThreadId;

const auto kNullThreadId = ThreadId();

// Tracks the list of threads that are using URCU.
template<class T>
class ThreadList {
 public:
  typedef T Data;

  ~ThreadList() {
    const auto current_thread_id = std::this_thread::get_id();
    destructor_thread_id_.store(current_thread_id, std::memory_order_relaxed);

    // Check if the current thread has an associated URCUThreadData object that has not been
    // retired yet. We do this by traversing the linked list, because the thread-local
    // variable might have been destructed already.
    size_t desired_allocated_threads = 0;
    for (auto* p = head_.load(std::memory_order_acquire); p;) {
      if (p->owner.load(std::memory_order_acquire) == current_thread_id) {
        desired_allocated_threads = 1;
        break;
      }
      p = p->next.load(std::memory_order_acquire);
    }

    // Wait for all threads holding URCUThreadData objects, except maybe for this thread, to call
    // Retire(). This thread might have to do that later, depending on the destruction order of
    // statics.
    while (allocated_.load(std::memory_order_acquire) != desired_allocated_threads) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    // At this point no other threads should be touching the linked list. We do not enforce that,
    // but we assume that if the ThreadList destructor is being called, the system has already
    // almost shut down.
    for (auto* p = head_.exchange(nullptr, std::memory_order_acquire); p;) {
      auto* n = p->next.load(std::memory_order_relaxed);
      if (p->owner.load(std::memory_order_relaxed) != current_thread_id) {
        // If the current thread has not called Retire() on its URCUThreadData object, then we will
        // defer deleting that object until Retire() is called.
        delete p;
      }
      p = n;
    }
  }

  Data* Alloc() {
    allocated_.fetch_add(1, std::memory_order_relaxed);
    Data* data;
    const auto current_thread_id = std::this_thread::get_id();

    // First, try to reuse a retired (non-active) record.
    for (data = head_.load(std::memory_order_acquire); data;
         data = data->next.load(std::memory_order_relaxed)) {
      auto old_value = kNullThreadId;
      if (data->owner.compare_exchange_strong(old_value,
                                              current_thread_id,
                                              std::memory_order_seq_cst,
                                              std::memory_order_relaxed)) {
        return data;
      }
    }

    data = new Data(current_thread_id);

    auto old_head = head_.load(std::memory_order_acquire);
    do {
      data->next.store(old_head, std::memory_order_relaxed);
    } while (!head_.compare_exchange_weak(old_head,
                                          data,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire));

    return data;
  }

  void Retire(Data* data) {
    DCHECK_ONLY_NOTNULL(data);
    auto old_thread_id = data->owner.exchange(kNullThreadId, std::memory_order_acq_rel);
    allocated_.fetch_sub(1, std::memory_order_release);

    // Using relaxed memory order because we only need to delete the URCUThreadData object here
    // in case we set destructor_thread_id_ earlier on the same thread. If we are in a different
    // thread, then the thread id will not match anyway.
    if (old_thread_id == destructor_thread_id_.load(std::memory_order_relaxed)) {
      delete data;
    }
  }

  Data* Head(std::memory_order mo) const {
    return head_.load(mo);
  }

  static ThreadList<T>& Instance() {
    static ThreadList<T> result;
    return result;
  }

 private:
  ThreadList() {}

  std::atomic<Data*> head_{nullptr};
  std::atomic<size_t> allocated_{0};
  std::atomic<ThreadId> destructor_thread_id_{kNullThreadId};
};
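
The claim/retire pattern above can be seen in isolation in the following minimal, self-contained sketch (the Node, Claim(), and Release() names are hypothetical and not part of this header): a thread claims a slot by CAS-ing its id into `owner`, releases it by storing the null id back, and a retired slot is reused instead of being unlinked from the list.

#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical, simplified stand-in for ThreadList<T>: nodes are prepended once and then
// recycled by swapping `owner` between a real thread id and the default ("null") id.
struct Node {
  std::atomic<std::thread::id> owner{std::thread::id()};
  std::atomic<Node*> next{nullptr};
};

std::atomic<Node*> head{nullptr};

Node* Claim() {
  const auto me = std::this_thread::get_id();
  // First try to reuse a retired node, mirroring ThreadList::Alloc().
  for (Node* n = head.load(std::memory_order_acquire); n;
       n = n->next.load(std::memory_order_relaxed)) {
    auto expected = std::thread::id();
    if (n->owner.compare_exchange_strong(expected, me)) {
      return n;
    }
  }
  // Otherwise push a fresh node with a CAS loop on the list head.
  auto* n = new Node;
  n->owner.store(me);
  Node* old_head = head.load(std::memory_order_acquire);
  do {
    n->next.store(old_head, std::memory_order_relaxed);
  } while (!head.compare_exchange_weak(old_head, n, std::memory_order_acq_rel));
  return n;
}

void Release(Node* n) {
  n->owner.store(std::thread::id());
}

int main() {
  Node* a = Claim();
  Release(a);
  assert(Claim() == a);  // The retired node is reused rather than allocated again.
  return 0;
}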

// URCU data associated with a thread.
struct URCUThreadData {
  std::atomic<uint32_t> access_control{0};
  std::atomic<URCUThreadData*> next{nullptr};
  std::atomic<ThreadId> owner;

  explicit URCUThreadData(ThreadId owner_) : owner(owner_) {}
};

constexpr uint32_t kControlBit = 0x80000000;
constexpr uint32_t kNestMask = kControlBit - 1;
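// Together these constants define the layout of URCUThreadData::access_control: the high bit
// (kControlBit) holds the epoch bit copied from URCU::global_control_word_ when the outermost
// AccessLock() was taken, and the low 31 bits (kNestMask) count the nesting depth of
// AccessLock() calls on that thread.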

// Userspace read-copy-update (URCU).
// Full description: https://en.wikipedia.org/wiki/Read-copy-update
// Read-copy-update (RCU) is a synchronization mechanism that lets readers avoid lock-based
// mutual exclusion. It is used when read performance is crucial and is an example of a
// space-time tradeoff, enabling fast operations at the cost of more space.
//
// Read-copy-update allows multiple threads to efficiently read from shared memory by deferring
// updates until after all pre-existing reads have completed, while simultaneously marking the
// data so that new readers read the updated data. All readers therefore proceed as if there
// were no synchronization involved, which makes them fast, but it also makes updates more
// difficult.
class URCU {
 public:
  URCU() {}

  URCU(const URCU&) = delete;
  void operator=(const URCU&) = delete;

  void AccessLock() {
    auto* data = DCHECK_NOTNULL(ThreadData());

    uint32_t tmp = data->access_control.load(std::memory_order_relaxed);
    if ((tmp & kNestMask) == 0) {
      // Outermost lock: snapshot the global control word (epoch bit plus a nest count of 1).
      data->access_control.store(global_control_word_.load(std::memory_order_relaxed),
          std::memory_order_relaxed);

      std::atomic_thread_fence(std::memory_order_seq_cst);
    } else {
      // Nested lock: just bump the nesting count.
      data->access_control.store(tmp + 1, std::memory_order_relaxed);
    }
  }

  void AccessUnlock() {
    auto* data = DCHECK_NOTNULL(ThreadData());

    uint32_t tmp = data->access_control.load(std::memory_order_relaxed);
    CHECK_GT(tmp & kNestMask, 0);

    data->access_control.store(tmp - 1, std::memory_order_release);
  }

  void Synchronize() {
    std::lock_guard<std::mutex> lock(mutex_);
    FlipAndWait();
    FlipAndWait();
  }

 private:
  URCUThreadData* ThreadData() {
    auto result = data_.get();
    if (!result) {
      data_.reset(result = ThreadList<URCUThreadData>::Instance().Alloc());
    }
    return result;
  }

  void FlipAndWait() {
    global_control_word_.fetch_xor(kControlBit, std::memory_order_seq_cst);

    for (auto* data = ThreadList<URCUThreadData>::Instance().Head(std::memory_order_acquire);
         data;
         data = data->next.load(std::memory_order_acquire)) {
      while (data->owner.load(std::memory_order_acquire) != kNullThreadId &&
             CheckGracePeriod(data)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::atomic_thread_fence(std::memory_order_seq_cst);
      }
    }
  }

  bool CheckGracePeriod(URCUThreadData* data) {
    const uint32_t v = data->access_control.load(std::memory_order_acquire);
    return (v & kNestMask) &&
           ((v ^ global_control_word_.load(std::memory_order_relaxed)) & ~kNestMask);
  }

  std::atomic<uint32_t> global_control_word_{1};
  std::mutex mutex_;
#if YB_CONCURRENT_VALUE_USE_BOOST_THREAD_SPECIFIC_PTR
  static void CleanupThreadData(URCUThreadData* data) {
    ThreadList<URCUThreadData>::Instance().Retire(data);
  }

  static boost::thread_specific_ptr<URCUThreadData> data_;
#else
  struct CleanupThreadData {
    void operator()(URCUThreadData* data) {
      ThreadList<URCUThreadData>::Instance().Retire(data);
    }
  };

  static thread_local std::unique_ptr<URCUThreadData, CleanupThreadData> data_;
#endif
};
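
A sketch of the read/write protocol this class is meant to support. Everything below is illustrative: the Payload type, the `current` pointer, and the ReadName()/Publish() helpers are hypothetical, and building it assumes the definition of URCU::data_ from the accompanying .cc file is linked in. Readers bracket each access with AccessLock()/AccessUnlock() and never block; a writer publishes a new version and calls Synchronize() before reclaiming the old one.

#include <atomic>
#include <string>

struct Payload {
  std::string name;
};

yb::internal::URCU urcu;                           // Synchronization domain shared by all threads.
std::atomic<Payload*> current{new Payload{"v1"}};  // Shared pointer, swapped by the writer.

// Reader: cheap and non-blocking; must not keep the pointer after AccessUnlock().
std::string ReadName() {
  urcu.AccessLock();
  std::string result = current.load(std::memory_order_acquire)->name;
  urcu.AccessUnlock();
  return result;
}

// Writer: swap in the new version, wait for the grace period, then reclaim the old one.
void Publish(Payload* next_version) {
  Payload* old = current.exchange(next_version, std::memory_order_acq_rel);
  urcu.Synchronize();  // After this returns, no reader can still hold a pointer to `old`.
  delete old;
}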

// Reference to a concurrent value. Provides read access to the concurrent value and
// should have a short lifetime.
template<class T>
class ConcurrentValueReference {
 public:
  explicit ConcurrentValueReference(std::atomic<T*>* value, URCU* urcu)
      : urcu_(urcu) {
    urcu_->AccessLock();
    value_ = value->load(std::memory_order_acquire);
  }

  ~ConcurrentValueReference() {
    if (urcu_) {
      urcu_->AccessUnlock();
    }
  }

  ConcurrentValueReference(const ConcurrentValueReference&) = delete;
  void operator=(const ConcurrentValueReference&) = delete;

  ConcurrentValueReference(ConcurrentValueReference&& rhs)
      : value_(rhs.value_), urcu_(rhs.urcu_) {
    rhs.urcu_ = nullptr;
  }

  const T& operator*() const {
    return get();
  }

  const T* operator->() const {
    return &get();
  }

  const T& get() const {
    DCHECK_ONLY_NOTNULL(urcu_);
    return *value_;
  }
 private:
  const T* value_;
  URCU* urcu_;
};
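
The "short lifetime" rule above, in a minimal sketch (the CopyCurrent() helper is hypothetical and assumes a std::string published through an std::atomic<std::string*> plus a URCU instance): take the reference, copy what you need while the read-side lock is held, and let the reference go out of scope right away.

#include <atomic>
#include <string>

std::string CopyCurrent(std::atomic<std::string*>* value, yb::internal::URCU* urcu) {
  yb::internal::ConcurrentValueReference<std::string> ref(value, urcu);  // AccessLock() in the ctor.
  return *ref;  // Copy out while the read-side lock is held.
}               // AccessUnlock() when `ref` is destroyed.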

// ConcurrentValue is used for cases where an object has a lot of reads and only a small
// number of writes.
template<class T>
class ConcurrentValue {
 public:
  template<class... Args>
  explicit ConcurrentValue(Args&&... args) : value_(new T(std::forward<Args>(args)...)) {}

  ~ConcurrentValue() {
    delete value_.load(std::memory_order_relaxed);
  }

  ConcurrentValueReference<T> get() {
    return ConcurrentValueReference<T>(&value_, &urcu_);
  }

  template<class... Args>
  void Emplace(Args&&... args) {
    DoSet(new T(std::forward<Args>(args)...));
  }

  void Set(const T& t) {
    DoSet(new T(t));
  }

  void Set(T&& t) {
    DoSet(new T(std::move(t)));
  }

 private:
  void DoSet(T* new_value) {
    auto* old_value = value_.exchange(new_value, std::memory_order_acq_rel);
    urcu_.Synchronize();
    delete old_value;
  }

  std::atomic<T*> value_ = {nullptr};
  URCU urcu_;
};
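
A usage sketch for ConcurrentValue, written the way client code outside this header would use it through the yb::ConcurrentValue alias declared just below. The Settings type and Example() function are hypothetical, and building it assumes the header's accompanying .cc definitions are linked in: reader threads take short-lived references via get(), while an occasional writer replaces the whole value with Emplace() or Set(), and the old object is deleted only after the URCU grace period.

#include <string>
#include <thread>
#include <vector>

// Hypothetical read-mostly settings object.
struct Settings {
  std::string log_level;
  int max_batch_size;
};

void Example() {
  yb::ConcurrentValue<Settings> settings(Settings{"INFO", 128});

  // Readers: cheap and non-blocking; each reference must stay short-lived.
  std::vector<std::thread> readers;
  for (int i = 0; i != 4; ++i) {
    readers.emplace_back([&settings] {
      for (int j = 0; j != 1000; ++j) {
        auto ref = settings.get();        // AccessLock() under the hood.
        int batch = ref->max_batch_size;  // Read through the reference...
        (void)batch;
      }                                   // ...AccessUnlock() when `ref` is destroyed.
    });
  }

  // Rare writer: build a new Settings, swap it in, and wait out the grace period inside
  // DoSet() before the old Settings object is deleted.
  settings.Emplace(Settings{"DEBUG", 256});

  for (auto& t : readers) {
    t.join();
  }
}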

} // namespace internal

using internal::ConcurrentValue;

} // namespace yb

#endif // YB_UTIL_CONCURRENT_VALUE_H