YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/ref_cnt_buffer-test.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include <condition_variable>
17
#include <mutex>
18
#include <thread>
19
20
#include <boost/ptr_container/ptr_vector.hpp>
21
#include <gtest/gtest.h>
22
23
#include "yb/util/ref_cnt_buffer.h"
24
#include "yb/util/test_util.h"
25
26
using namespace std::literals;
27
28
namespace yb {
29
namespace util {
30
31
// Fixture shared by all RefCntBuffer tests; it adds no state or hooks on top of YBTest.
class RefCntBufferTest : public YBTest {};
33
34
// Upper bound (inclusive) for randomly chosen buffer sizes, and the fixed size used by the
// vector and threading tests.
constexpr size_t kSizeLimit = 0x1000;
35
36
// Test buffer allocation by its size. Also check copy semantics.
37
1
TEST_F(RefCntBufferTest, TestSize) {
  constexpr int kRounds = 10000;
  unsigned int rng_state = SeedRandom();
  for (int round = 0; round < kRounds; ++round) {
    // Lengths are drawn from [0, kSizeLimit]; an empty buffer is a valid case.
    const size_t length = rand_r(&rng_state) % (kSizeLimit + 1);
    RefCntBuffer buffer(length);
    auto copy = buffer;
    // Fill through the original handle only, after the copy has been taken.
    size_t offset = 0;
    while (offset != length) {
      buffer.begin()[offset] = offset;
      ++offset;
    }
    // The copy must report exactly the same range and size as the original.
    ASSERT_EQ(buffer.begin(), copy.begin());
    ASSERT_EQ(buffer.end(), copy.end());
    ASSERT_EQ(buffer.size(), copy.size());
  }
}
51
52
// Test buffer allocation by data block.
53
1
TEST_F(RefCntBufferTest, TestFromData) {
  constexpr int kRounds = 10000;
  unsigned int rng_state = SeedRandom();
  for (int round = 0; round < kRounds; ++round) {
    // Sizes are drawn from [0, kSizeLimit]; zero is a valid size.
    const size_t size = rand_r(&rng_state) % (kSizeLimit + 1);
    RefCntBuffer buffer(size);
    for (size_t offset = 0; offset < size; ++offset) {
      buffer.begin()[offset] = offset;
    }

    // Building a buffer from a [begin, end) range must give different addresses but the same
    // size and the same bytes.
    RefCntBuffer copy(buffer.begin(), buffer.end());
    ASSERT_NE(buffer.begin(), copy.begin());
    ASSERT_NE(buffer.end(), copy.end());
    ASSERT_EQ(buffer.size(), copy.size());
    for (size_t index = 0; index < size; ++index) {
      ASSERT_EQ(buffer.begin()[index], copy.begin()[index]);
    }
  }
}
71
72
// Test vector of buffers.
73
1
TEST_F(RefCntBufferTest, TestVector) {
  constexpr int kBufferCount = 10000;
  std::vector<RefCntBuffer> v;
  for (int created = 0; created < kBufferCount; ++created) {
    v.emplace_back(kSizeLimit);
    ASSERT_TRUE(v.back());
  }

  // Drain the vector in random order: take a copy of the chosen slot, overwrite the slot with
  // the last element, drop the last element, then check the copy is still a valid buffer.
  unsigned int rng_state = SeedRandom();
  while (!v.empty()) {
    const size_t victim = rand_r(&rng_state) % v.size();
    auto temp = v[victim];
    v[victim] = v.back();
    v.pop_back();
    ASSERT_TRUE(temp);
  }
}
89
90
namespace {
91
92
// Number of buffers each TestQueue allocates for itself when Run() starts.
constexpr size_t kInitialBuffers = 1000;
93
94
class TestQueue {
95
 public:
96
  TestQueue(const TestQueue&) = delete;
97
  TestQueue& operator=(const TestQueue&) = delete;
98
99
4
  TestQueue() {}
100
101
0
  void TalkTo(boost::ptr_vector<TestQueue>* queues) {
102
0
    queues_ = queues;
103
0
  }
104
105
10.7k
  void Enqueue(RefCntBuffer buffer) {
106
10.7k
    {
107
10.7k
      std::lock_guard<std::mutex> lock(mutex_);
108
10.7k
      ASSERT_TRUE(buffer);
109
      // We don't use std::move in this test because we want to check reference counting.
110
10.7k
      buffers_.push_back(buffer);
111
10.7k
      ++received_buffers_;
112
10.7k
    }
113
10.7k
    cond_.notify_one();
114
10.7k
  }
115
116
4
  void Interrupt() {
117
4
    {
118
4
      std::lock_guard<std::mutex> lock(mutex_);
119
4
      interruption_requested_ = true;
120
4
    }
121
4
    cond_.notify_one();
122
4
  }
123
124
4
  void Assert() {
125
4
    LOG(INFO) << "Sent buffers: " << sent_buffers_ << ", received buffers: " << received_buffers_
126
4
              << ", has buffers: " << buffers_.size();
127
4
    ASSERT_EQ(kInitialBuffers + received_buffers_ - sent_buffers_, buffers_.size());
128
4
  }
129
130
4
  void Run() {
131
4
    std::unique_lock<std::mutex> lock(mutex_);
132
3.46k
    for (auto i = kInitialBuffers; i--;) {
133
3.45k
      buffers_.emplace_back(kSizeLimit);
134
3.45k
      ASSERT_TRUE(buffers_.back());
135
3.45k
    }
136
137
6
    unsigned int seed = SeedRandom();
138
10.8k
    while (!interruption_requested_) {
139
10.7k
      RefCntBuffer buffer;
140
10.7k
      if (!buffers_.empty()) {
141
10.7k
        size_t idx = rand_r(&seed) % buffers_.size();
142
10.7k
        buffer = buffers_[idx];
143
10.7k
        buffers_[idx] = buffers_.back();
144
10.7k
        buffers_.pop_back();
145
10.7k
        ++sent_buffers_;
146
10.7k
        ASSERT_TRUE(buffer);
147
10.7k
      }
148
149
10.8k
      if (buffer) {
150
10.7k
        lock.unlock();
151
10.7k
        size_t queue_index = rand_r(&seed) % queues_->size();
152
10.7k
        (*queues_)[queue_index].Enqueue(buffer);
153
10.7k
        lock.lock();
154
10.7k
      }
155
156
10.8k
      cond_.wait_for(lock, 1ms); // Wait until something enqueued, or timeout.
157
10.8k
    }
158
6
  }
159
 private:
160
  boost::ptr_vector<TestQueue>* queues_;
161
  std::vector<RefCntBuffer> buffers_;
162
  std::atomic<bool> interruption_requested_ = {false};
163
  std::mutex mutex_;
164
  std::condition_variable cond_;
165
  size_t sent_buffers_ = 0;
166
  size_t received_buffers_ = 0;
167
};
168
169
} // namespace
170
171
// Test how buffers behave with multiple threads. Mostly for ASAN and TSAN.
172
1
TEST_F(RefCntBufferTest, TestThreads) {
  const size_t kQueuesCount = 4;
  boost::ptr_vector<TestQueue> queues;
  for (size_t i = kQueuesCount; i--;) {
    // boost::ptr_vector takes ownership of the pointer passed to push_back.
    queues.push_back(new TestQueue);
  }
  // Every queue may send to every queue, including itself.
  for (auto& queue : queues) {
    queue.TalkTo(&queues);
  }

  // One worker thread per queue. A lambda is used instead of std::bind so that this file does
  // not rely on <functional> being pulled in transitively.
  std::vector<std::thread> threads;
  threads.reserve(queues.size());
  for (auto& queue : queues) {
    threads.emplace_back([peer = &queue] { peer->Run(); });
  }

  // Let the workers exchange buffers for a while.
  std::this_thread::sleep_for(2s);

  for (auto& queue : queues) {
    queue.Interrupt();
  }

  for (auto& thread : threads) {
    thread.join();
  }

  // All workers are joined, so the per-queue bookkeeping can be inspected safely.
  for (auto& queue : queues) {
    queue.Assert();
  }
}
201
202
} // namespace util
203
} // namespace yb