/Users/deen/code/yugabyte-db/src/yb/util/ref_cnt_buffer-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include <condition_variable> |
17 | | #include <mutex> |
18 | | #include <thread> |
19 | | |
20 | | #include <boost/ptr_container/ptr_vector.hpp> |
21 | | #include <gtest/gtest.h> |
22 | | |
23 | | #include "yb/util/ref_cnt_buffer.h" |
24 | | #include "yb/util/test_util.h" |
25 | | |
26 | | using namespace std::literals; |
27 | | |
28 | | namespace yb { |
29 | | namespace util { |
30 | | |
31 | | class RefCntBufferTest : public YBTest { |
32 | | }; |
33 | | |
34 | | const size_t kSizeLimit = 0x1000; |
35 | | |
36 | | // Test buffer allocation by its size. Also check copy semantics. |
37 | 1 | TEST_F(RefCntBufferTest, TestSize) { |
38 | 1 | unsigned int seed = SeedRandom(); |
39 | 10.0k | for (auto i = 10000; i--;) { |
40 | 10.0k | size_t size = rand_r(&seed) % (kSizeLimit + 1); // Zero size is also allowed |
41 | 10.0k | RefCntBuffer buffer(size); |
42 | 10.0k | auto copy = buffer; |
43 | 20.5M | for (size_t index = 0; index != size; ++index) { |
44 | 20.5M | buffer.begin()[index] = index; |
45 | 20.5M | } |
46 | 10.0k | ASSERT_EQ(buffer.begin(), copy.begin()); |
47 | 10.0k | ASSERT_EQ(buffer.end(), copy.end()); |
48 | 10.0k | ASSERT_EQ(buffer.size(), copy.size()); |
49 | 10.0k | } |
50 | 1 | } |
51 | | |
52 | | // Test buffer allocation by data block. |
53 | 1 | TEST_F(RefCntBufferTest, TestFromData) { |
54 | 1 | unsigned int seed = SeedRandom(); |
55 | 10.0k | for (auto i = 10000; i--;) { |
56 | 10.0k | size_t size = rand_r(&seed) % (kSizeLimit + 1); // Zero size is also allowed |
57 | 10.0k | RefCntBuffer buffer(size); |
58 | 20.4M | for (size_t index = 0; index != size; ++index) { |
59 | 20.4M | buffer.begin()[index] = index; |
60 | 20.4M | } |
61 | | |
62 | 10.0k | RefCntBuffer copy(buffer.begin(), buffer.end()); |
63 | 10.0k | ASSERT_NE(buffer.begin(), copy.begin()); |
64 | 10.0k | ASSERT_NE(buffer.end(), copy.end()); |
65 | 10.0k | ASSERT_EQ(buffer.size(), copy.size()); |
66 | 20.4M | for (size_t index = 0; index != size; ++index) { |
67 | 20.4M | ASSERT_EQ(buffer.begin()[index], copy.begin()[index]); |
68 | 20.4M | } |
69 | 10.0k | } |
70 | 1 | } |
71 | | |
72 | | // Test vector of buffers. |
73 | 1 | TEST_F(RefCntBufferTest, TestVector) { |
74 | 1 | std::vector<RefCntBuffer> v; |
75 | 10.0k | for (auto i = 10000; i--;) { |
76 | 10.0k | v.emplace_back(kSizeLimit); |
77 | 10.0k | ASSERT_TRUE(v.back()); |
78 | 10.0k | } |
79 | | |
80 | 1 | unsigned int seed = SeedRandom(); |
81 | 10.0k | while (!v.empty()) { |
82 | 10.0k | size_t idx = rand_r(&seed) % v.size(); |
83 | 10.0k | auto temp = v[idx]; |
84 | 10.0k | v[idx] = v.back(); |
85 | 10.0k | v.pop_back(); |
86 | 10.0k | ASSERT_TRUE(temp); |
87 | 10.0k | } |
88 | 1 | } |
89 | | |
90 | | namespace { |
91 | | |
92 | | const size_t kInitialBuffers = 1000; |
93 | | |
94 | | class TestQueue { |
95 | | public: |
96 | | TestQueue(const TestQueue&) = delete; |
97 | | TestQueue& operator=(const TestQueue&) = delete; |
98 | | |
99 | 4 | TestQueue() {} |
100 | | |
101 | 0 | void TalkTo(boost::ptr_vector<TestQueue>* queues) { |
102 | 0 | queues_ = queues; |
103 | 0 | } |
104 | | |
105 | 10.7k | void Enqueue(RefCntBuffer buffer) { |
106 | 10.7k | { |
107 | 10.7k | std::lock_guard<std::mutex> lock(mutex_); |
108 | 10.7k | ASSERT_TRUE(buffer); |
109 | | // We don't use std::move in this test because we want to check reference counting. |
110 | 10.7k | buffers_.push_back(buffer); |
111 | 10.7k | ++received_buffers_; |
112 | 10.7k | } |
113 | 10.7k | cond_.notify_one(); |
114 | 10.7k | } |
115 | | |
116 | 4 | void Interrupt() { |
117 | 4 | { |
118 | 4 | std::lock_guard<std::mutex> lock(mutex_); |
119 | 4 | interruption_requested_ = true; |
120 | 4 | } |
121 | 4 | cond_.notify_one(); |
122 | 4 | } |
123 | | |
124 | 4 | void Assert() { |
125 | 4 | LOG(INFO) << "Sent buffers: " << sent_buffers_ << ", received buffers: " << received_buffers_ |
126 | 4 | << ", has buffers: " << buffers_.size(); |
127 | 4 | ASSERT_EQ(kInitialBuffers + received_buffers_ - sent_buffers_, buffers_.size()); |
128 | 4 | } |
129 | | |
130 | 4 | void Run() { |
131 | 4 | std::unique_lock<std::mutex> lock(mutex_); |
132 | 3.46k | for (auto i = kInitialBuffers; i--;) { |
133 | 3.45k | buffers_.emplace_back(kSizeLimit); |
134 | 3.45k | ASSERT_TRUE(buffers_.back()); |
135 | 3.45k | } |
136 | | |
137 | 6 | unsigned int seed = SeedRandom(); |
138 | 10.8k | while (!interruption_requested_) { |
139 | 10.7k | RefCntBuffer buffer; |
140 | 10.7k | if (!buffers_.empty()) { |
141 | 10.7k | size_t idx = rand_r(&seed) % buffers_.size(); |
142 | 10.7k | buffer = buffers_[idx]; |
143 | 10.7k | buffers_[idx] = buffers_.back(); |
144 | 10.7k | buffers_.pop_back(); |
145 | 10.7k | ++sent_buffers_; |
146 | 10.7k | ASSERT_TRUE(buffer); |
147 | 10.7k | } |
148 | | |
149 | 10.8k | if (buffer) { |
150 | 10.7k | lock.unlock(); |
151 | 10.7k | size_t queue_index = rand_r(&seed) % queues_->size(); |
152 | 10.7k | (*queues_)[queue_index].Enqueue(buffer); |
153 | 10.7k | lock.lock(); |
154 | 10.7k | } |
155 | | |
156 | 10.8k | cond_.wait_for(lock, 1ms); // Wait until something enqueued, or timeout. |
157 | 10.8k | } |
158 | 6 | } |
159 | | private: |
160 | | boost::ptr_vector<TestQueue>* queues_; |
161 | | std::vector<RefCntBuffer> buffers_; |
162 | | std::atomic<bool> interruption_requested_ = {false}; |
163 | | std::mutex mutex_; |
164 | | std::condition_variable cond_; |
165 | | size_t sent_buffers_ = 0; |
166 | | size_t received_buffers_ = 0; |
167 | | }; |
168 | | |
169 | | } // namespace |
170 | | |
171 | | // Test how buffers behave with multiple threads. Mostly for ASAN and TSAN. |
172 | 1 | TEST_F(RefCntBufferTest, TestThreads) { |
173 | 1 | const size_t kQueuesCount = 4; |
174 | 1 | boost::ptr_vector<TestQueue> queues; |
175 | 5 | for (size_t i = kQueuesCount; i--;) { |
176 | 4 | queues.push_back(new TestQueue); |
177 | 4 | } |
178 | 4 | for (auto& queue : queues) { |
179 | 4 | queue.TalkTo(&queues); |
180 | 4 | } |
181 | | |
182 | 1 | std::vector<std::thread> threads; |
183 | 4 | for (auto& queue : queues) { |
184 | 4 | threads.emplace_back(std::bind(&TestQueue::Run, &queue)); |
185 | 4 | } |
186 | | |
187 | 1 | std::this_thread::sleep_for(2s); |
188 | | |
189 | 4 | for (auto& queue : queues) { |
190 | 4 | queue.Interrupt(); |
191 | 4 | } |
192 | | |
193 | 4 | for (auto& thread : threads) { |
194 | 4 | thread.join(); |
195 | 4 | } |
196 | | |
197 | 4 | for (auto& queue : queues) { |
198 | 4 | queue.Assert(); |
199 | 4 | } |
200 | 1 | } |
201 | | |
202 | | } // namespace util |
203 | | } // namespace yb |