YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/lockfree-test.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include <atomic>
17
#include <string>
18
#include <thread>
19
20
#include <boost/lockfree/queue.hpp>
21
#include <cds/container/basket_queue.h>
22
#include <cds/container/moir_queue.h>
23
#include <cds/container/optimistic_queue.h>
24
#include <cds/container/rwqueue.h>
25
#include <cds/container/segmented_queue.h>
26
#include <cds/container/vyukov_mpmc_cycle_queue.h>
27
#include <cds/gc/dhp.h>
28
#include <gtest/gtest.h>
29
30
#include "yb/util/lockfree.h"
31
#include "yb/util/logging.h"
32
#include "yb/util/monotime.h"
33
#include "yb/util/random_util.h"
34
#include "yb/util/test_thread_holder.h"
35
#include "yb/util/thread.h"
36
#include "yb/util/tsan_util.h"
37
38
using namespace std::literals;
39
40
namespace yb {
41
42
struct TestEntry : public MPSCQueueEntry<TestEntry> {
43
  size_t thread_index;
44
  size_t index;
45
};
46
47
1
TEST(LockfreeTest, MPSCQueueSimple) {
48
1
  const size_t kTotalEntries = 10;
49
1
  std::vector<TestEntry> entries(kTotalEntries);
50
11
  for (size_t i = 0; i != entries.size(); ++i) {
51
10
    entries[i].index = i;
52
10
  }
53
1
  MPSCQueue<TestEntry> queue;
54
55
  // Push pop 1 entry
56
1
  queue.Push(&entries[0]);
57
1
  ASSERT_EQ(&entries[0], queue.Pop());
58
59
  // Push pop multiple entries
60
10
  for (auto& entry : entries) {
61
10
    queue.Push(&entry);
62
10
  }
63
64
10
  for (auto& entry : entries) {
65
10
    ASSERT_EQ(&entry, queue.Pop());
66
10
  }
67
68
  // Mixed push and pop
69
1
  queue.Push(&entries[0]);
70
1
  queue.Push(&entries[1]);
71
1
  ASSERT_EQ(&entries[0], queue.Pop());
72
1
  queue.Push(&entries[2]);
73
1
  queue.Push(&entries[3]);
74
1
  ASSERT_EQ(&entries[1], queue.Pop());
75
1
  ASSERT_EQ(&entries[2], queue.Pop());
76
1
  queue.Push(&entries[4]);
77
1
  ASSERT_EQ(&entries[3], queue.Pop());
78
1
  ASSERT_EQ(&entries[4], queue.Pop());
79
1
  queue.Push(&entries[5]);
80
1
  queue.Push(&entries[6]);
81
1
  queue.Push(&entries[7]);
82
1
  ASSERT_EQ(&entries[5], queue.Pop());
83
1
  ASSERT_EQ(&entries[6], queue.Pop());
84
1
  ASSERT_EQ(&entries[7], queue.Pop());
85
1
  ASSERT_EQ(nullptr, queue.Pop());
86
1
  queue.Push(&entries[8]);
87
1
  queue.Push(&entries[9]);
88
1
  ASSERT_EQ(&entries[8], queue.Pop());
89
1
  ASSERT_EQ(&entries[9], queue.Pop());
90
1
  ASSERT_EQ(nullptr, queue.Pop());
91
1
}
92
93
1
TEST(LockfreeTest, MPSCQueueConcurrent) {
94
1
  constexpr size_t kThreads = 10;
95
1
  constexpr size_t kEntriesPerThread = 200000;
96
97
1
  std::vector<TestEntry> entries(kThreads * kEntriesPerThread);
98
1
  MPSCQueue<TestEntry> queue;
99
100
1
  auto start_time = MonoTime::Now();
101
1
  std::vector<std::thread> threads;
102
11
  while (threads.size() != kThreads) {
103
10
    size_t thread_index = threads.size();
104
10
    threads.emplace_back([&queue, thread_index, &entries] {
105
10
      size_t base = thread_index * kEntriesPerThread;
106
1.99M
      for (size_t i = 0; i != kEntriesPerThread; ++i) {
107
1.99M
        auto& entry = entries[base + i];
108
1.99M
        entry.thread_index = thread_index;
109
1.99M
        entry.index = i;
110
1.99M
        queue.Push(&entry);
111
1.99M
      }
112
10
    });
113
10
  }
114
115
1
  size_t threads_left = kThreads;
116
1
  std::vector<size_t> next_index(kThreads);
117
2.04M
  while (threads_left > 0) {
118
2.04M
    auto* entry = queue.Pop();
119
2.04M
    if (!entry) {
120
43.8k
      continue;
121
43.8k
    }
122
123
2.00M
    ASSERT_EQ(entry->index, next_index[entry->thread_index]);
124
2.00M
    if (++next_index[entry->thread_index] == kEntriesPerThread) {
125
10
      --threads_left;
126
10
    }
127
2.00M
  }
128
129
1
  auto passed = MonoTime::Now() - start_time;
130
131
1
  LOG(INFO) << "Passed: " << passed;
132
133
10
  for (auto i : next_index) {
134
10
    ASSERT_EQ(i, kEntriesPerThread);
135
10
  }
136
137
10
  for (auto& thread : threads) {
138
10
    thread.join();
139
10
  }
140
1
}
141
142
#ifndef NDEBUG
143
constexpr auto kEntries = RegularBuildVsSanitizers(1000000, 1000);
144
#else
145
constexpr auto kEntries = 10000000;
146
#endif
147
148
template <class T, class Allocator = std::allocator<T>>
149
struct BlockAllocator {
150
  template <class U>
151
  struct rebind {
152
    typedef BlockAllocator<U, typename Allocator::template rebind<U>::other> other;
153
  };
154
155
  typedef typename Allocator::value_type value_type;
156
  typedef typename Allocator::pointer pointer;
157
  typedef typename Allocator::size_type size_type;
158
159
1.28M
  void deallocate(pointer p, size_type n) {
160
1.28M
    BlockEntry* entry = OBJECT_FROM_MEMBER(BlockEntry, value, p);
161
1.28M
    if (entry->counter->fetch_sub(1, std::memory_order_acq_rel) == 1) {
162
15.6k
      Block* block = OBJECT_FROM_MEMBER(Block, counter, entry->counter);
163
15.6k
      impl_.deallocate(block, 1);
164
15.6k
    }
165
1.28M
  }
_ZN2yb14BlockAllocatorIN3cds9container7details17make_basket_queueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_12basket_queue6traitsEEEE9node_typeENSA_ISJ_EEE10deallocateEPSJ_m
Line
Count
Source
159
303k
  void deallocate(pointer p, size_type n) {
160
303k
    BlockEntry* entry = OBJECT_FROM_MEMBER(BlockEntry, value, p);
161
303k
    if (entry->counter->fetch_sub(1, std::memory_order_acq_rel) == 1) {
162
7.77k
      Block* block = OBJECT_FROM_MEMBER(Block, counter, entry->counter);
163
7.77k
      impl_.deallocate(block, 1);
164
7.77k
    }
165
303k
  }
_ZN2yb14BlockAllocatorIN3cds9container7details17make_basket_queueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_12basket_queue6traitsEEEE9node_typeENSA_ISJ_EEE10deallocateEPSJ_m
Line
Count
Source
159
986k
  void deallocate(pointer p, size_type n) {
160
986k
    BlockEntry* entry = OBJECT_FROM_MEMBER(BlockEntry, value, p);
161
986k
    if (entry->counter->fetch_sub(1, std::memory_order_acq_rel) == 1) {
162
7.87k
      Block* block = OBJECT_FROM_MEMBER(Block, counter, entry->counter);
163
7.87k
      impl_.deallocate(block, 1);
164
7.87k
    }
165
986k
  }
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details12make_msqueueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_7msqueue6traitsEEEE9node_typeENSA_ISJ_EEE10deallocateEPSJ_m
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details12make_msqueueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_7msqueue6traitsEEEE9node_typeENSA_ISJ_EEE10deallocateEPSJ_m
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details21make_optimistic_queueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_16optimistic_queue6traitsEEEE9node_typeENSA_ISJ_EEE10deallocateEPSJ_m
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details21make_optimistic_queueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_16optimistic_queue6traitsEEEE9node_typeENSA_ISJ_EEE10deallocateEPSJ_m
166
167
  static constexpr size_t kBlockEntries = 0x80;
168
169
  struct BlockEntry {
170
    std::atomic<size_t>* counter;
171
    T value;
172
  };
173
174
  struct Block {
175
    std::atomic<size_t> counter;
176
    BlockEntry entries[kBlockEntries];
177
  };
178
179
  struct TSS {
180
    size_t idx = kBlockEntries;
181
    Block* block = nullptr;
182
183
254
    ~TSS() {
184
254
      auto delta = kBlockEntries - idx;
185
254
      if (delta != 0 && block->counter.fetch_sub(delta, std::memory_order_acq_rel) == delta) {
186
0
        Impl().deallocate(block, 1);
187
0
      }
188
254
    }
_ZN2yb14BlockAllocatorIN3cds9container7details17make_basket_queueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_12basket_queue6traitsEEEE9node_typeENSA_ISJ_EEE3TSSD2Ev
Line
Count
Source
183
128
    ~TSS() {
184
128
      auto delta = kBlockEntries - idx;
185
128
      if (delta != 0 && block->counter.fetch_sub(delta, std::memory_order_acq_rel) == delta) {
186
0
        Impl().deallocate(block, 1);
187
0
      }
188
128
    }
_ZN2yb14BlockAllocatorIN3cds9container7details17make_basket_queueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_12basket_queue6traitsEEEE9node_typeENSA_ISJ_EEE3TSSD2Ev
Line
Count
Source
183
126
    ~TSS() {
184
126
      auto delta = kBlockEntries - idx;
185
126
      if (delta != 0 && block->counter.fetch_sub(delta, std::memory_order_acq_rel) == delta) {
186
0
        Impl().deallocate(block, 1);
187
0
      }
188
126
    }
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details12make_msqueueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_7msqueue6traitsEEEE9node_typeENSA_ISJ_EEE3TSSD2Ev
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details12make_msqueueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_7msqueue6traitsEEEE9node_typeENSA_ISJ_EEE3TSSD2Ev
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details21make_optimistic_queueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_16optimistic_queue6traitsEEEE9node_typeENSA_ISJ_EEE3TSSD2Ev
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details21make_optimistic_queueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_16optimistic_queue6traitsEEEE9node_typeENSA_ISJ_EEE3TSSD2Ev
189
  };
190
191
  static thread_local std::unique_ptr<TSS> tss_;
192
193
1.99M
  pointer allocate(size_type n) {
194
1.99M
    TSS* tss = tss_.get();
195
1.99M
    if (PREDICT_FALSE(!tss)) {
196
256
      tss_.reset(new TSS);
197
256
      tss = tss_.get();
198
256
    }
199
1.99M
    if (PREDICT_FALSE(tss->idx == kBlockEntries)) {
200
15.7k
      tss->block = impl_.allocate(1);
201
15.7k
      tss->block->counter.store(kBlockEntries, std::memory_order_release);
202
15.7k
      tss->idx = 0;
203
15.7k
    }
204
1.99M
    auto& entry = tss->block->entries[tss->idx++];
205
1.99M
    entry.counter = &tss->block->counter;
206
1.99M
    return &entry.value;
207
1.99M
  }
_ZN2yb14BlockAllocatorIN3cds9container7details17make_basket_queueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_12basket_queue6traitsEEEE9node_typeENSA_ISJ_EEE8allocateEm
Line
Count
Source
193
998k
  pointer allocate(size_type n) {
194
998k
    TSS* tss = tss_.get();
195
998k
    if (PREDICT_FALSE(!tss)) {
196
128
      tss_.reset(new TSS);
197
128
      tss = tss_.get();
198
128
    }
199
998k
    if (PREDICT_FALSE(tss->idx == kBlockEntries)) {
200
7.87k
      tss->block = impl_.allocate(1);
201
7.87k
      tss->block->counter.store(kBlockEntries, std::memory_order_release);
202
7.87k
      tss->idx = 0;
203
7.87k
    }
204
998k
    auto& entry = tss->block->entries[tss->idx++];
205
998k
    entry.counter = &tss->block->counter;
206
998k
    return &entry.value;
207
998k
  }
_ZN2yb14BlockAllocatorIN3cds9container7details17make_basket_queueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_12basket_queue6traitsEEEE9node_typeENSA_ISJ_EEE8allocateEm
Line
Count
Source
193
999k
  pointer allocate(size_type n) {
194
999k
    TSS* tss = tss_.get();
195
999k
    if (PREDICT_FALSE(!tss)) {
196
128
      tss_.reset(new TSS);
197
128
      tss = tss_.get();
198
128
    }
199
999k
    if (PREDICT_FALSE(tss->idx == kBlockEntries)) {
200
7.87k
      tss->block = impl_.allocate(1);
201
7.87k
      tss->block->counter.store(kBlockEntries, std::memory_order_release);
202
7.87k
      tss->idx = 0;
203
7.87k
    }
204
999k
    auto& entry = tss->block->entries[tss->idx++];
205
999k
    entry.counter = &tss->block->counter;
206
999k
    return &entry.value;
207
999k
  }
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details12make_msqueueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_7msqueue6traitsEEEE9node_typeENSA_ISJ_EEE8allocateEm
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details12make_msqueueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_7msqueue6traitsEEEE9node_typeENSA_ISJ_EEE8allocateEm
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details21make_optimistic_queueINS1_2gc2HPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_16optimistic_queue6traitsEEEE9node_typeENSA_ISJ_EEE8allocateEm
Unexecuted instantiation: _ZN2yb14BlockAllocatorIN3cds9container7details21make_optimistic_queueINS1_2gc3DHPElNS1_3opt9allocatorINS0_IiNSt3__19allocatorIiEEEEE4packINS2_16optimistic_queue6traitsEEEE9node_typeENSA_ISJ_EEE8allocateEm
208
209
  typedef typename Allocator::template rebind<Block>::other Impl;
210
  Impl impl_;
211
};
212
213
template <class T, class Allocator>
214
thread_local std::unique_ptr<typename BlockAllocator<T, Allocator>::TSS>
215
    BlockAllocator<T, Allocator>::tss_;
216
217
class QueuePerformanceHelper {
218
 public:
219
1
  void Warmup() {
220
    // Empty name would not be printed, so we use it for warmup.
221
1
    TestQueue<boost::lockfree::queue<ptrdiff_t>>("", 1000);
222
1
  }
223
224
1
  void Perform(size_t workers, bool mixed_mode) {
225
1
    Setup(workers, mixed_mode);
226
1
    RunAll();
227
1
  }
228
229
 private:
230
1
  void Setup(size_t workers, bool mixed_mode) {
231
1
    workers_ = workers;
232
1
    mixed_mode_ = mixed_mode;
233
234
1
    LOG(INFO) << "Setup, workers: " << workers << ", mixed mode: " << mixed_mode;
235
1
  }
236
237
1
  void RunAll() {
238
1
    typedef cds::opt::allocator<BlockAllocator<int>> OptAllocator;
239
1
    TestQueue<boost::lockfree::queue<ptrdiff_t, boost::lockfree::fixed_sized<true>>>(
240
1
        "boost::lockfree::queue", 50000);
241
1
    TestQueue<cds::container::BasketQueue<cds::gc::HP, ptrdiff_t>>("BasketQueue");
242
1
    TestQueue<cds::container::BasketQueue<cds::gc::DHP, ptrdiff_t>>("BasketQueue/DHP");
243
1
    TestQueue<cds::container::BasketQueue<
244
1
        cds::gc::HP, ptrdiff_t,
245
1
        cds::container::basket_queue::make_traits<OptAllocator>::type>>(
246
1
            "BasketQueue/BlockAllocator");
247
1
    TestQueue<cds::container::BasketQueue<
248
1
        cds::gc::DHP, ptrdiff_t,
249
1
        cds::container::basket_queue::make_traits<OptAllocator>::type>>(
250
1
            "BasketQueue/BlockAllocator/DHP");
251
    // FCQueue disabled, since looks like it has bugs.
252
    // TestQueue<cds::container::FCQueue<ptrdiff_t>>("FCQueue");
253
1
    TestQueue<cds::container::MoirQueue<cds::gc::HP, ptrdiff_t>>("MoirQueue");
254
1
    TestQueue<cds::container::MoirQueue<cds::gc::DHP, ptrdiff_t>>("MoirQueue/DHP");
255
1
    TestQueue<cds::container::MoirQueue<
256
1
        cds::gc::HP, ptrdiff_t,
257
1
        cds::container::msqueue::make_traits<OptAllocator>::type>>(
258
1
            "MoirQueue/BlockAllocator");
259
1
    TestQueue<cds::container::MoirQueue<
260
1
        cds::gc::DHP, ptrdiff_t,
261
1
        cds::container::msqueue::make_traits<OptAllocator>::type>>(
262
1
            "MoirQueue/BlockAllocator/DHP");
263
1
    TestQueue<cds::container::MSQueue<cds::gc::HP, ptrdiff_t>>("MSQueue");
264
1
    TestQueue<cds::container::MSQueue<cds::gc::DHP, ptrdiff_t>>("MSQueue/DHP");
265
1
    TestQueue<cds::container::MSQueue<
266
1
        cds::gc::HP, ptrdiff_t,
267
1
        cds::container::msqueue::make_traits<OptAllocator>::type>>(
268
1
            "MSQueue/BlockAllocator");
269
1
    TestQueue<cds::container::MSQueue<
270
1
        cds::gc::DHP, ptrdiff_t,
271
1
        cds::container::msqueue::make_traits<OptAllocator>::type>>(
272
1
            "MSQueue/BlockAllocator/DHP");
273
1
    TestQueue<cds::container::OptimisticQueue<cds::gc::HP, ptrdiff_t>>("OptimisticQueue");
274
1
    TestQueue<cds::container::OptimisticQueue<cds::gc::DHP, ptrdiff_t>>("OptimisticQueue/DHP");
275
1
    TestQueue<cds::container::OptimisticQueue<
276
1
        cds::gc::HP, ptrdiff_t,
277
1
        cds::container::optimistic_queue::make_traits<OptAllocator>::type>>(
278
1
            "OptimisticQueue/BlockAllocator");
279
1
    TestQueue<cds::container::OptimisticQueue<
280
1
        cds::gc::DHP, ptrdiff_t,
281
1
        cds::container::optimistic_queue::make_traits<OptAllocator>::type>>(
282
1
            "OptimisticQueue/BlockAllocator/DHP");
283
1
    TestQueue<cds::container::RWQueue<ptrdiff_t>>("RWQueue");
284
1
    TestQueue<cds::container::SegmentedQueue<cds::gc::HP, ptrdiff_t>>("SegmentedQueue/16", 16);
285
1
    TestQueue<cds::container::SegmentedQueue<cds::gc::HP, ptrdiff_t>>("SegmentedQueue/128", 128);
286
1
    TestQueue<cds::container::VyukovMPMCCycleQueue<ptrdiff_t>>("VyukovMPMCCycleQueue", 50000);
287
1
  }
288
 private:
289
  template <class T, class... Args>
290
8
  void DoTestQueue(const std::string& name, T* queue) {
291
8
    std::atomic<size_t> pushes(0);
292
8
    std::atomic<size_t> pops(0);
293
294
8
    std::vector<std::thread> threads;
295
8
    threads.reserve(workers_);
296
297
8
    CountDownLatch start_latch(workers_);
298
8
    CountDownLatch finish_latch(workers_);
299
300
8
    enum class Role {
301
8
      kReader,
302
8
      kWriter,
303
8
      kBoth,
304
8
    };
305
306
2.05k
    for (size_t i = 0; i != workers_; ++i) {
307
2.04k
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
2.02k
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
2.02k
        CDSAttacher attacher;
310
2.02k
        start_latch.CountDown();
311
2.02k
        start_latch.Wait();
312
2.02k
        bool push_done = false;
313
2.02k
        bool pop_done = false;
314
2.02k
        int commands_left = 0;
315
2.02k
        uint64_t commands = 0;
316
2.02k
        std::mt19937_64& random = ThreadLocalRandom();
317
2.02k
        if (role == Role::kWriter) {
318
900
          pop_done = true;
319
1.12k
        } else if (role == Role::kReader) {
320
900
          push_done = true;
321
900
        }
322
15.3M
        while (!push_done || !pop_done) {
323
15.3M
          if (commands_left == 0) {
324
277k
            switch (role) {
325
167k
              case Role::kReader:
326
167k
                commands = 0;
327
167k
                break;
328
110k
              case Role::kWriter:
329
110k
                commands = std::numeric_limits<uint64_t>::max();
330
110k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
277k
            }
335
277k
            commands_left = sizeof(commands) * 8;
336
277k
          }
337
15.3M
          bool push = (commands & 1) != 0;
338
15.3M
          commands >>= 1;
339
15.3M
          --commands_left;
340
15.3M
          if (push) {
341
7.04M
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
7.04M
            if (entry > kEntries) {
343
896
              push_done = true;
344
896
              continue;
345
896
            }
346
7.04M
            while (!queue->push(entry)) {}
347
8.29M
          } else {
348
8.29M
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
866
              pop_done = true;
350
866
              continue;
351
866
            }
352
8.29M
            typename T::value_type entry;
353
8.29M
            if (queue->pop(entry)) {
354
6.89M
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
7
                pop_done = true;
356
7
                continue;
357
7
              }
358
6.89M
            }
359
8.29M
          }
360
15.3M
        }
361
1.82k
        finish_latch.CountDown();
362
1.82k
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN5boost8lockfree5queueIlJEEEJEEEvRKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
254
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
254
        CDSAttacher attacher;
310
254
        start_latch.CountDown();
311
254
        start_latch.Wait();
312
254
        bool push_done = false;
313
254
        bool pop_done = false;
314
254
        int commands_left = 0;
315
254
        uint64_t commands = 0;
316
254
        std::mt19937_64& random = ThreadLocalRandom();
317
254
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
2.66M
        while (!push_done || !pop_done) {
323
2.66M
          if (commands_left == 0) {
324
61.6k
            switch (role) {
325
45.9k
              case Role::kReader:
326
45.9k
                commands = 0;
327
45.9k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
61.6k
            }
335
61.6k
            commands_left = sizeof(commands) * 8;
336
61.6k
          }
337
2.66M
          bool push = (commands & 1) != 0;
338
2.66M
          commands >>= 1;
339
2.66M
          --commands_left;
340
2.66M
          if (push) {
341
1.00M
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
1.00M
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
1.66M
          } else {
348
1.66M
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
118
              pop_done = true;
350
118
              continue;
351
118
            }
352
1.66M
            typename T::value_type entry;
353
1.66M
            if (queue->pop(entry)) {
354
999k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
999k
            }
359
1.66M
          }
360
2.66M
        }
361
250
        finish_latch.CountDown();
362
250
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN5boost8lockfree5queueIlJNS3_11fixed_sizedILb1EEEEEEJEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
253
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
253
        CDSAttacher attacher;
310
253
        start_latch.CountDown();
311
253
        start_latch.Wait();
312
253
        bool push_done = false;
313
253
        bool pop_done = false;
314
253
        int commands_left = 0;
315
253
        uint64_t commands = 0;
316
253
        std::mt19937_64& random = ThreadLocalRandom();
317
253
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
2.69M
        while (!push_done || !pop_done) {
323
2.69M
          if (commands_left == 0) {
324
55.8k
            switch (role) {
325
40.1k
              case Role::kReader:
326
40.1k
                commands = 0;
327
40.1k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
55.8k
            }
335
55.8k
            commands_left = sizeof(commands) * 8;
336
55.8k
          }
337
2.69M
          bool push = (commands & 1) != 0;
338
2.69M
          commands >>= 1;
339
2.69M
          --commands_left;
340
2.69M
          if (push) {
341
1.00M
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
1.00M
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
1.69M
          } else {
348
1.69M
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
126
              pop_done = true;
350
126
              continue;
351
126
            }
352
1.69M
            typename T::value_type entry;
353
1.69M
            if (queue->pop(entry)) {
354
999k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
999k
            }
359
1.69M
          }
360
2.69M
        }
361
248
        finish_latch.CountDown();
362
248
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc2HPElNS3_12basket_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
252
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
252
        CDSAttacher attacher;
310
252
        start_latch.CountDown();
311
252
        start_latch.Wait();
312
252
        bool push_done = false;
313
252
        bool pop_done = false;
314
252
        int commands_left = 0;
315
252
        uint64_t commands = 0;
316
252
        std::mt19937_64& random = ThreadLocalRandom();
317
252
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
1.98M
        while (!push_done || !pop_done) {
323
1.97M
          if (commands_left == 0) {
324
31.8k
            switch (role) {
325
16.1k
              case Role::kReader:
326
16.1k
                commands = 0;
327
16.1k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
31.8k
            }
335
31.8k
            commands_left = sizeof(commands) * 8;
336
31.8k
          }
337
1.97M
          bool push = (commands & 1) != 0;
338
1.97M
          commands >>= 1;
339
1.97M
          --commands_left;
340
1.97M
          if (push) {
341
999k
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
999k
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
980k
          } else {
348
980k
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
125
              pop_done = true;
350
125
              continue;
351
125
            }
352
980k
            typename T::value_type entry;
353
980k
            if (queue->pop(entry)) {
354
953k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
953k
            }
359
980k
          }
360
1.97M
        }
361
253
        finish_latch.CountDown();
362
253
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc3DHPElNS3_12basket_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
255
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
255
        CDSAttacher attacher;
310
255
        start_latch.CountDown();
311
255
        start_latch.Wait();
312
255
        bool push_done = false;
313
255
        bool pop_done = false;
314
255
        int commands_left = 0;
315
255
        uint64_t commands = 0;
316
255
        std::mt19937_64& random = ThreadLocalRandom();
317
255
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
1.99M
        while (!push_done || !pop_done) {
323
1.99M
          if (commands_left == 0) {
324
31.4k
            switch (role) {
325
15.8k
              case Role::kReader:
326
15.8k
                commands = 0;
327
15.8k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
31.4k
            }
335
31.4k
            commands_left = sizeof(commands) * 8;
336
31.4k
          }
337
1.99M
          bool push = (commands & 1) != 0;
338
1.99M
          commands >>= 1;
339
1.99M
          --commands_left;
340
1.99M
          if (push) {
341
999k
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
999k
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
992k
          } else {
348
992k
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
124
              pop_done = true;
350
124
              continue;
351
124
            }
352
992k
            typename T::value_type entry;
353
992k
            if (queue->pop(entry)) {
354
985k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
985k
            }
359
992k
          }
360
1.99M
        }
361
255
        finish_latch.CountDown();
362
255
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_12basket_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
254
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
254
        CDSAttacher attacher;
310
254
        start_latch.CountDown();
311
254
        start_latch.Wait();
312
254
        bool push_done = false;
313
254
        bool pop_done = false;
314
254
        int commands_left = 0;
315
254
        uint64_t commands = 0;
316
254
        std::mt19937_64& random = ThreadLocalRandom();
317
254
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
1.97M
        while (!push_done || !pop_done) {
323
1.97M
          if (commands_left == 0) {
324
31.8k
            switch (role) {
325
16.2k
              case Role::kReader:
326
16.2k
                commands = 0;
327
16.2k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
31.6k
            }
335
31.6k
            commands_left = sizeof(commands) * 8;
336
31.6k
          }
337
1.97M
          bool push = (commands & 1) != 0;
338
1.97M
          commands >>= 1;
339
1.97M
          --commands_left;
340
1.97M
          if (push) {
341
999k
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
999k
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
976k
          } else {
348
976k
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
122
              pop_done = true;
350
122
              continue;
351
122
            }
352
976k
            typename T::value_type entry;
353
976k
            if (queue->pop(entry)) {
354
954k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
954k
            }
359
976k
          }
360
1.97M
        }
361
60
        finish_latch.CountDown();
362
60
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_12basket_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
250
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
250
        CDSAttacher attacher;
310
250
        start_latch.CountDown();
311
250
        start_latch.Wait();
312
250
        bool push_done = false;
313
250
        bool pop_done = false;
314
250
        int commands_left = 0;
315
250
        uint64_t commands = 0;
316
250
        std::mt19937_64& random = ThreadLocalRandom();
317
250
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
2.02M
        while (!push_done || !pop_done) {
323
2.02M
          if (commands_left == 0) {
324
32.1k
            switch (role) {
325
16.4k
              case Role::kReader:
326
16.4k
                commands = 0;
327
16.4k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
32.1k
            }
335
32.1k
            commands_left = sizeof(commands) * 8;
336
32.1k
          }
337
2.02M
          bool push = (commands & 1) != 0;
338
2.02M
          commands >>= 1;
339
2.02M
          --commands_left;
340
2.02M
          if (push) {
341
999k
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
999k
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
1.02M
          } else {
348
1.02M
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
125
              pop_done = true;
350
125
              continue;
351
125
            }
352
1.02M
            typename T::value_type entry;
353
1.02M
            if (queue->pop(entry)) {
354
986k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
986k
            }
359
1.02M
          }
360
2.02M
        }
361
249
        finish_latch.CountDown();
362
249
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc2HPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        CDSAttacher attacher;
310
256
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        if (role == Role::kWriter) {
318
128
          pop_done = true;
319
128
        } else if (role == Role::kReader) {
320
128
          push_done = true;
321
128
        }
322
1.94M
        while (!push_done || !pop_done) {
323
1.94M
          if (commands_left == 0) {
324
31.9k
            switch (role) {
325
16.3k
              case Role::kReader:
326
16.3k
                commands = 0;
327
16.3k
                break;
328
15.6k
              case Role::kWriter:
329
15.6k
                commands = std::numeric_limits<uint64_t>::max();
330
15.6k
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
31.9k
            }
335
31.9k
            commands_left = sizeof(commands) * 8;
336
31.9k
          }
337
1.94M
          bool push = (commands & 1) != 0;
338
1.94M
          commands >>= 1;
339
1.94M
          --commands_left;
340
1.94M
          if (push) {
341
999k
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
999k
            if (entry > kEntries) {
343
128
              push_done = true;
344
128
              continue;
345
128
            }
346
999k
            while (!queue->push(entry)) {}
347
944k
          } else {
348
944k
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
126
              pop_done = true;
350
126
              continue;
351
126
            }
352
944k
            typename T::value_type entry;
353
998k
            if (queue->pop(entry)) {
354
998k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
1
                pop_done = true;
356
1
                continue;
357
1
              }
358
998k
            }
359
944k
          }
360
1.94M
        }
361
255
        finish_latch.CountDown();
362
255
      });
_ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc3DHPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Line
Count
Source
308
252
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
252
        CDSAttacher attacher;
310
252
        start_latch.CountDown();
311
252
        start_latch.Wait();
312
252
        bool push_done = false;
313
252
        bool pop_done = false;
314
252
        int commands_left = 0;
315
252
        uint64_t commands = 0;
316
252
        std::mt19937_64& random = ThreadLocalRandom();
317
252
        if (role == Role::kWriter) {
318
4
          pop_done = true;
319
248
        } else if (role == Role::kReader) {
320
4
          push_done = true;
321
4
        }
322
65.8k
        while (!push_done || !pop_done) {
323
65.5k
          if (commands_left == 0) {
324
1.05k
            switch (role) {
325
291
              case Role::kReader:
326
291
                commands = 0;
327
291
                break;
328
766
              case Role::kWriter:
329
766
                commands = std::numeric_limits<uint64_t>::max();
330
766
                break;
331
0
              case Role::kBoth:
332
0
                commands = random();;
333
0
                break;
334
1.05k
            }
335
1.05k
            commands_left = sizeof(commands) * 8;
336
1.05k
          }
337
65.5k
          bool push = (commands & 1) != 0;
338
65.5k
          commands >>= 1;
339
65.5k
          --commands_left;
340
65.5k
          if (push) {
341
48.8k
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
48.8k
            if (entry > kEntries) {
343
0
              push_done = true;
344
0
              continue;
345
0
            }
346
48.8k
            while (!queue->push(entry)) {}
347
16.7k
          } else {
348
16.7k
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
0
              pop_done = true;
350
0
              continue;
351
0
            }
352
16.7k
            typename T::value_type entry;
353
18.4k
            if (queue->pop(entry)) {
354
18.4k
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
0
                pop_done = true;
356
0
                continue;
357
0
              }
358
18.4k
            }
359
16.7k
          }
360
65.5k
        }
361
252
        finish_latch.CountDown();
362
252
      });
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc2HPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc3DHPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc2HPElNS3_16optimistic_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc3DHPElNS3_16optimistic_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_16optimistic_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_16optimistic_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7RWQueueIlNS3_7rwqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container14SegmentedQueueINS2_2gc2HPElNS3_15segmented_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_ENKUlvE_clEv
Unexecuted instantiation: _ZZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container20VyukovMPMCCycleQueueIlNS3_12vyukov_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEPT_ENKUlvE_clEv
363
2.04k
    }
364
365
8
    start_latch.Wait();
366
8
    auto start = MonoTime::Now();
367
368
8
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
8
    auto stop = MonoTime::Now();
370
8
    auto passed = stop - start;
371
372
8
    if (!wait_result) {
373
0
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Cleanup queue, since some of implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
1.79k
    for (auto& thread : threads) {
383
1.79k
      thread.join();
384
1.79k
    }
385
386
8
    if (!name.empty()) {
387
6
      if (wait_result) {
388
6
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
6
    }
393
8
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN5boost8lockfree5queueIlJEEEJEEEvRKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEEPT_
Line
Count
Source
290
1
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        CDSAttacher attacher;
310
256
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Cleanup queue, since some of implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    if (!name.empty()) {
387
0
      if (wait_result) {
388
0
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
0
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN5boost8lockfree5queueIlJNS3_11fixed_sizedILb1EEEEEEJEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEPT_
Line
Count
Source
290
1
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        CDSAttacher attacher;
310
256
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Cleanup queue, since some of implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    if (!name.empty()) {
387
1
      if (wait_result) {
388
1
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
1
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc2HPElNS3_12basket_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Line
Count
Source
290
1
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        CDSAttacher attacher;
310
256
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Cleanup queue, since some of implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    if (!name.empty()) {
387
1
      if (wait_result) {
388
1
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
1
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc3DHPElNS3_12basket_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Line
Count
Source
290
1
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        CDSAttacher attacher;
310
256
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Cleanup queue, since some of implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    if (!name.empty()) {
387
1
      if (wait_result) {
388
1
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
1
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_12basket_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Line
Count
Source
290
1
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        CDSAttacher attacher;
310
256
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Cleanup queue, since some of implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    if (!name.empty()) {
387
1
      if (wait_result) {
388
1
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
1
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container11BasketQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_12basket_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Line
Count
Source
290
1
  // Benchmarks concurrent push/pop on `queue` using `workers_` threads and, when `name` is
  // non-empty, logs either the elapsed time or "TIMED OUT" under that name.
  //
  // Thread roles: in mixed mode every thread both pushes and pops; otherwise odd-indexed threads
  // only pop (readers) and even-indexed threads only push (writers).
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        // NOTE(review): presumably attaches this thread to the libcds GC for its lifetime - confirm.
        CDSAttacher attacher;
310
256
        // Each worker counts down and then waits, so all workers begin together.
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        // `commands` is a bit script consumed from the lowest bit: 1 = push, 0 = pop.
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        // A single-role thread treats the operation it never performs as already finished.
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          // Refill the script: all zeros for readers, all ones for writers, random bits otherwise.
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            // Number of bits available in `commands`.
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            // Claim the next value to push from the shared counter.
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            // NOTE(review): values 0..kEntries inclusive pass this check (kEntries + 1 pushes),
            // whereas pops stop at kEntries - confirm the asymmetry is intended.
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            // Spin until the queue accepts the value.
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            // Stop popping once the shared pop count has reached kEntries.
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              // The thread whose pop brings the count to kEntries stops popping right away.
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    // Start timing once every worker has reached the start latch.
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    // Give the workers at most 10 seconds to finish.
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      // On timeout, advance both counters by kEntries; presumably this drives the workers toward
      // their exit checks.
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Clean up the queue, since some implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    // An empty name suppresses the result log line.
    if (!name.empty()) {
387
1
      if (wait_result) {
388
1
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
1
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc2HPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Line
Count
Source
290
1
  // Benchmarks concurrent push/pop on `queue` using `workers_` threads and, when `name` is
  // non-empty, logs either the elapsed time or "TIMED OUT" under that name.
  //
  // Thread roles: in mixed mode every thread both pushes and pops; otherwise odd-indexed threads
  // only pop (readers) and even-indexed threads only push (writers).
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        // NOTE(review): presumably attaches this thread to the libcds GC for its lifetime - confirm.
        CDSAttacher attacher;
310
256
        // Each worker counts down and then waits, so all workers begin together.
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        // `commands` is a bit script consumed from the lowest bit: 1 = push, 0 = pop.
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        // A single-role thread treats the operation it never performs as already finished.
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          // Refill the script: all zeros for readers, all ones for writers, random bits otherwise.
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            // Number of bits available in `commands`.
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            // Claim the next value to push from the shared counter.
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            // NOTE(review): values 0..kEntries inclusive pass this check (kEntries + 1 pushes),
            // whereas pops stop at kEntries - confirm the asymmetry is intended.
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            // Spin until the queue accepts the value.
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            // Stop popping once the shared pop count has reached kEntries.
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              // The thread whose pop brings the count to kEntries stops popping right away.
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    // Start timing once every worker has reached the start latch.
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    // Give the workers at most 10 seconds to finish.
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      // On timeout, advance both counters by kEntries; presumably this drives the workers toward
      // their exit checks.
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Clean up the queue, since some implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
256
    for (auto& thread : threads) {
383
256
      thread.join();
384
256
    }
385
386
1
    // An empty name suppresses the result log line.
    if (!name.empty()) {
387
1
      if (wait_result) {
388
1
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
1
    }
393
1
  }
_ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc3DHPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Line
Count
Source
290
1
  // Benchmarks concurrent push/pop on `queue` using `workers_` threads and, when `name` is
  // non-empty, logs either the elapsed time or "TIMED OUT" under that name.
  //
  // Thread roles: in mixed mode every thread both pushes and pops; otherwise odd-indexed threads
  // only pop (readers) and even-indexed threads only push (writers).
  void DoTestQueue(const std::string& name, T* queue) {
291
1
    std::atomic<size_t> pushes(0);
292
1
    std::atomic<size_t> pops(0);
293
294
1
    std::vector<std::thread> threads;
295
1
    threads.reserve(workers_);
296
297
1
    CountDownLatch start_latch(workers_);
298
1
    CountDownLatch finish_latch(workers_);
299
300
1
    enum class Role {
301
1
      kReader,
302
1
      kWriter,
303
1
      kBoth,
304
1
    };
305
306
257
    for (size_t i = 0; i != workers_; ++i) {
307
256
      Role role = mixed_mode_ ? Role::kBoth : (i & 1 ? Role::kReader : Role::kWriter);
308
256
      threads.emplace_back([queue, &start_latch, &finish_latch, &pushes, &pops, role] {
309
256
        // NOTE(review): presumably attaches this thread to the libcds GC for its lifetime - confirm.
        CDSAttacher attacher;
310
256
        // Each worker counts down and then waits, so all workers begin together.
        start_latch.CountDown();
311
256
        start_latch.Wait();
312
256
        bool push_done = false;
313
256
        bool pop_done = false;
314
256
        // `commands` is a bit script consumed from the lowest bit: 1 = push, 0 = pop.
        int commands_left = 0;
315
256
        uint64_t commands = 0;
316
256
        std::mt19937_64& random = ThreadLocalRandom();
317
256
        // A single-role thread treats the operation it never performs as already finished.
        if (role == Role::kWriter) {
318
256
          pop_done = true;
319
256
        } else if (role == Role::kReader) {
320
256
          push_done = true;
321
256
        }
322
256
        while (!push_done || !pop_done) {
323
256
          // Refill the script: all zeros for readers, all ones for writers, random bits otherwise.
          if (commands_left == 0) {
324
256
            switch (role) {
325
256
              case Role::kReader:
326
256
                commands = 0;
327
256
                break;
328
256
              case Role::kWriter:
329
256
                commands = std::numeric_limits<uint64_t>::max();
330
256
                break;
331
256
              case Role::kBoth:
332
256
                commands = random();;
333
256
                break;
334
256
            }
335
256
            // Number of bits available in `commands`.
            commands_left = sizeof(commands) * 8;
336
256
          }
337
256
          bool push = (commands & 1) != 0;
338
256
          commands >>= 1;
339
256
          --commands_left;
340
256
          if (push) {
341
256
            // Claim the next value to push from the shared counter.
            auto entry = pushes.fetch_add(1, std::memory_order_acq_rel);
342
256
            // NOTE(review): values 0..kEntries inclusive pass this check (kEntries + 1 pushes),
            // whereas pops stop at kEntries - confirm the asymmetry is intended.
            if (entry > kEntries) {
343
256
              push_done = true;
344
256
              continue;
345
256
            }
346
256
            // Spin until the queue accepts the value.
            while (!queue->push(entry)) {}
347
256
          } else {
348
256
            // Stop popping once the shared pop count has reached kEntries.
            if (pops.load(std::memory_order_acquire) >= kEntries) {
349
256
              pop_done = true;
350
256
              continue;
351
256
            }
352
256
            typename T::value_type entry;
353
256
            if (queue->pop(entry)) {
354
256
              // The thread whose pop brings the count to kEntries stops popping right away.
              if (pops.fetch_add(1, std::memory_order_acq_rel) == kEntries - 1) {
355
256
                pop_done = true;
356
256
                continue;
357
256
              }
358
256
            }
359
256
          }
360
256
        }
361
256
        finish_latch.CountDown();
362
256
      });
363
256
    }
364
365
1
    // Start timing once every worker has reached the start latch.
    start_latch.Wait();
366
1
    auto start = MonoTime::Now();
367
368
1
    // Give the workers at most 10 seconds to finish.
    bool wait_result = finish_latch.WaitUntil(start + 10s);
369
1
    auto stop = MonoTime::Now();
370
1
    auto passed = stop - start;
371
372
1
    if (!wait_result) {
373
0
      // On timeout, advance both counters by kEntries; presumably this drives the workers toward
      // their exit checks.
      pushes.fetch_add(kEntries, std::memory_order_acq_rel);
374
0
      pops.fetch_add(kEntries, std::memory_order_acq_rel);
375
      // Clean up the queue, since some implementations could hang on queue overflow.
376
0
      while (!finish_latch.WaitFor(10ms)) {
377
0
        typename T::value_type entry;
378
0
        while (queue->pop(entry)) {}
379
0
      }
380
0
    }
381
382
0
    for (auto& thread : threads) {
383
0
      thread.join();
384
0
    }
385
386
1
    // An empty name suppresses the result log line.
    if (!name.empty()) {
387
0
      if (wait_result) {
388
0
        LOG(INFO) << name << ": " << passed;
389
0
      } else {
390
0
        LOG(INFO) << name << ": TIMED OUT";
391
0
      }
392
0
    }
393
1
  }
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container9MoirQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc2HPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc3DHPElNS3_7msqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7MSQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc2HPElNS3_16optimistic_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc3DHPElNS3_16optimistic_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_16optimistic_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container15OptimisticQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_16optimistic_queue6traitsEEEEEJEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container7RWQueueIlNS3_7rwqueue6traitsEEEJEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container14SegmentedQueueINS2_2gc2HPElNS3_15segmented_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEPT_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper11DoTestQueueIN3cds9container20VyukovMPMCCycleQueueIlNS3_12vyukov_queue6traitsEEEJEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEPT_
394
395
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  template <class T>
396
6
  void TestQueue(const std::string& name) {
397
6
    T queue;
398
6
    DoTestQueue(name, &queue);
399
6
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container11BasketQueueINS2_2gc2HPElNS3_12basket_queue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Line
Count
Source
396
1
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  void TestQueue(const std::string& name) {
397
1
    T queue;
398
1
    DoTestQueue(name, &queue);
399
1
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container11BasketQueueINS2_2gc3DHPElNS3_12basket_queue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Line
Count
Source
396
1
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  void TestQueue(const std::string& name) {
397
1
    T queue;
398
1
    DoTestQueue(name, &queue);
399
1
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container11BasketQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_12basket_queue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Line
Count
Source
396
1
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  void TestQueue(const std::string& name) {
397
1
    T queue;
398
1
    DoTestQueue(name, &queue);
399
1
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container11BasketQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_12basket_queue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Line
Count
Source
396
1
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  void TestQueue(const std::string& name) {
397
1
    T queue;
398
1
    DoTestQueue(name, &queue);
399
1
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container9MoirQueueINS2_2gc2HPElNS3_7msqueue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Line
Count
Source
396
1
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  void TestQueue(const std::string& name) {
397
1
    T queue;
398
1
    DoTestQueue(name, &queue);
399
1
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container9MoirQueueINS2_2gc3DHPElNS3_7msqueue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Line
Count
Source
396
1
  // Default-constructs a queue of type T and runs DoTestQueue on it under the given name.
  void TestQueue(const std::string& name) {
397
1
    T queue;
398
1
    DoTestQueue(name, &queue);
399
1
  }
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container9MoirQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container9MoirQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container7MSQueueINS2_2gc2HPElNS3_7msqueue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container7MSQueueINS2_2gc3DHPElNS3_7msqueue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container7MSQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container7MSQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_7msqueue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container15OptimisticQueueINS2_2gc2HPElNS3_16optimistic_queue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container15OptimisticQueueINS2_2gc3DHPElNS3_16optimistic_queue6traitsEEEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container15OptimisticQueueINS2_2gc2HPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_16optimistic_queue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container15OptimisticQueueINS2_2gc3DHPElNS2_3opt9allocatorINS_14BlockAllocatorIiNSt3__19allocatorIiEEEEE4packINS3_16optimistic_queue6traitsEEEEEEEvRKNSA_12basic_stringIcNSA_11char_traitsIcEENSB_IcEEEE
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container7RWQueueIlNS3_7rwqueue6traitsEEEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEE
400
401
  // Constructs a queue of type T from the forwarded `args` and runs DoTestQueue on it under the
  // given name.
  template <class T, class... Args>
402
2
  void TestQueue(const std::string& name, Args&&... args) {
403
2
    T queue(std::forward<Args>(args)...);
404
2
    DoTestQueue(name, &queue);
405
2
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN5boost8lockfree5queueIlJEEEJiEEEvRKNSt3__112basic_stringIcNS6_11char_traitsIcEENS6_9allocatorIcEEEEDpOT0_
Line
Count
Source
402
1
  // Constructs a queue of type T from the forwarded `args` and runs DoTestQueue on it under the
  // given name.
  void TestQueue(const std::string& name, Args&&... args) {
403
1
    T queue(std::forward<Args>(args)...);
404
1
    DoTestQueue(name, &queue);
405
1
  }
_ZN2yb22QueuePerformanceHelper9TestQueueIN5boost8lockfree5queueIlJNS3_11fixed_sizedILb1EEEEEEJiEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEDpOT0_
Line
Count
Source
402
1
  // Constructs a queue of type T from the forwarded `args` and runs DoTestQueue on it under the
  // given name.
  void TestQueue(const std::string& name, Args&&... args) {
403
1
    T queue(std::forward<Args>(args)...);
404
1
    DoTestQueue(name, &queue);
405
1
  }
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container14SegmentedQueueINS2_2gc2HPElNS3_15segmented_queue6traitsEEEJiEEEvRKNSt3__112basic_stringIcNSA_11char_traitsIcEENSA_9allocatorIcEEEEDpOT0_
Unexecuted instantiation: _ZN2yb22QueuePerformanceHelper9TestQueueIN3cds9container20VyukovMPMCCycleQueueIlNS3_12vyukov_queue6traitsEEEJiEEEvRKNSt3__112basic_stringIcNS8_11char_traitsIcEENS8_9allocatorIcEEEEDpOT0_
406
407
  size_t workers_ = 0x100;
408
  bool mixed_mode_ = false;
409
};
410
411
1
// Sets up logging, the libcds hazard-pointer garbage collector and threading, then runs the queue
// benchmark helper: one warm-up pass followed by four Perform passes.
TEST(LockfreeTest, QueuePerformance) {
412
1
  InitGoogleLoggingSafeBasic("lockfree");
413
1
  cds::gc::hp::GarbageCollector::construct(0 /* nHazardPtrCount */, 1000 /* nMaxThreadCount */);
414
1
  InitThreading();
415
416
  // We should move it to ThreadMgr in case we decide to use some data structure that uses GC,
417
  // i.e. anything that is customized with cds::gc::HP/cds::gc::DHP.
418
1
  QueuePerformanceHelper helper;
419
1
  helper.Warmup();
420
1
  // NOTE(review): the arguments are presumably (worker count, mixed mode) - confirm against
  // QueuePerformanceHelper::Perform.
  helper.Perform(0x100, false);
421
1
  helper.Perform(0x100, true);
422
1
  helper.Perform(0x10, false);
423
1
  helper.Perform(0x10, true);
424
1
}
425
426
1
// Stress test for LockFreeStack: kNumThreads threads concurrently move entries between the shared
// stack and thread-local vectors for 5 seconds, after which the stack must contain every one of
// the kNumEntries original values exactly once.
TEST(LockfreeTest, Stack) {
427
1
  constexpr int kNumEntries = 100;
428
1
  constexpr int kNumThreads = 5;
429
430
1
  struct Entry : public MPSCQueueEntry<Entry> {
431
1
    int value;
432
1
  };
433
434
1
  LockFreeStack<Entry> stack;
435
1
  // The stack stores pointers into `entries`, so the vector is sized once and never resized.
  std::vector<Entry> entries(kNumEntries);
436
101
  for (int i = 0; i != kNumEntries; ++i) {
437
100
    entries[i].value = i;
438
100
    stack.Push(&entries[i]);
439
100
  }
440
441
1
  TestThreadHolder holder;
442
6
  for (int i = 0; i != kNumThreads; ++i) {
443
    // Each thread randomly does one of:
444
    // 1) pop an item from the shared stack and store it in its local set;
445
    // 2) push a random item from its local set back to the shared stack.
446
5
    holder.AddThread([&stack, &stop = holder.stop_flag()] {
447
5
      std::vector<Entry*> local;
448
6.70M
      while (!stop.load(std::memory_order_acquire)) {
449
6.70M
        // Pushing is only possible when this thread currently holds at least one entry.
        bool push = !local.empty() && RandomUniformInt(0, 1);
450
6.70M
        if (push) {
451
3.27M
          size_t index = RandomUniformInt<size_t>(0, local.size() - 1);
452
3.27M
          stack.Push(local[index]);
453
3.27M
          // Remove the pushed element by overwriting it with the last one and shrinking.
          local[index] = local.back();
454
3.27M
          local.pop_back();
455
3.43M
        } else {
456
3.43M
          auto entry = stack.Pop();
457
3.43M
          if (entry) {
458
3.28M
            local.push_back(entry);
459
3.28M
          }
460
3.43M
        }
461
6.70M
      }
462
80
      // Return everything still held locally so that no entry is lost when the thread exits.
      while (!local.empty()) {
463
75
        stack.Push(local.back());
464
75
        local.pop_back();
465
75
      }
466
5
    });
467
5
  }
468
469
1
  holder.WaitAndStop(5s);
470
471
1
  std::vector<int> content;
472
101
  // `<=` allows one pop beyond kNumEntries, so an extra entry would make the size check below
  // fail instead of going unnoticed.
  while (content.size() <= kNumEntries) {
473
101
    auto entry = stack.Pop();
474
101
    if (!entry) {
475
1
      break;
476
1
    }
477
100
    content.push_back(entry->value);
478
100
  }
479
480
1
  LOG(INFO) << "Content: " << yb::ToString(content);
481
482
1
  ASSERT_EQ(content.size(), kNumEntries);
483
484
1
  // After sorting, the popped values must be exactly 0..kNumEntries-1.
  std::sort(content.begin(), content.end());
485
101
  for (int i = 0; i != kNumEntries; ++i) {
486
100
    ASSERT_EQ(content[i], i);
487
100
  }
488
1
}
489
490
} // namespace yb