YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/blocking_queue-test.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <functional>
34
#include <memory>
35
#include <string>
36
#include <thread>
37
#include <vector>
38
39
#include <glog/logging.h>
40
#include <gtest/gtest.h>
41
42
#include "yb/util/blocking_queue.h"
43
#include "yb/util/countdown_latch.h"
44
45
using std::shared_ptr;
46
using std::string;
47
using std::vector;
48
49
namespace yb {
50
51
BlockingQueue<int32_t> test1_queue(5);
52
53
1
void InsertSomeThings(void) {
54
1
  ASSERT_EQ(test1_queue.Put(1), QUEUE_SUCCESS);
55
1
  ASSERT_EQ(test1_queue.Put(2), QUEUE_SUCCESS);
56
1
  ASSERT_EQ(test1_queue.Put(3), QUEUE_SUCCESS);
57
1
}
58
59
1
TEST(BlockingQueueTest, Test1) {
60
1
  std::thread inserter_thread(InsertSomeThings);
61
1
  int32_t i;
62
1
  ASSERT_TRUE(test1_queue.BlockingGet(&i));
63
1
  ASSERT_EQ(1, i);
64
1
  ASSERT_TRUE(test1_queue.BlockingGet(&i));
65
1
  ASSERT_EQ(2, i);
66
1
  ASSERT_TRUE(test1_queue.BlockingGet(&i));
67
1
  ASSERT_EQ(3, i);
68
1
  inserter_thread.join();
69
1
}
70
71
1
TEST(BlockingQueueTest, TestBlockingDrainTo) {
72
1
  BlockingQueue<int32_t> test_queue(3);
73
1
  ASSERT_EQ(test_queue.Put(1), QUEUE_SUCCESS);
74
1
  ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS);
75
1
  ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS);
76
1
  vector<int32_t> out;
77
1
  ASSERT_TRUE(test_queue.BlockingDrainTo(&out));
78
1
  ASSERT_EQ(1, out[0]);
79
1
  ASSERT_EQ(2, out[1]);
80
1
  ASSERT_EQ(3, out[2]);
81
1
}
82
83
1
TEST(BlockingQueueTest, TestTooManyInsertions) {
84
1
  BlockingQueue<int32_t> test_queue(2);
85
1
  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
86
1
  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
87
1
  ASSERT_EQ(test_queue.Put(123), QUEUE_FULL);
88
1
}
89
90
namespace {
91
92
struct LengthLogicalSize {
93
2
  static size_t logical_size(const string& s) {
94
2
    return s.length();
95
2
  }
96
};
97
98
} // anonymous namespace
99
100
1
TEST(BlockingQueueTest, TestLogicalSize) {
101
1
  BlockingQueue<string, LengthLogicalSize> test_queue(4);
102
1
  ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS);
103
1
  ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS);
104
1
  ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL);
105
1
}
106
107
1
TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) {
108
1
  BlockingQueue<int32_t> test_queue(1);
109
1
  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
110
  // No DCHECK failure on destruct.
111
1
}
112
113
#ifndef NDEBUG
114
2
TEST(BlockingQueueDeathTest, TestPointerParamsMustBeEmptyOnDestruct) {
115
2
  ::testing::FLAGS_gtest_death_test_style = "threadsafe";
116
2
  ASSERT_DEATH({
117
2
      BlockingQueue<int32_t*> test_queue(1);
118
2
      int32_t element = 123;
119
2
      ASSERT_EQ(test_queue.Put(&element), QUEUE_SUCCESS);
120
      // Debug assertion triggered on queue destruction since type is a pointer.
121
2
    },
122
2
    "BlockingQueue holds bare pointers");
123
2
}
124
#endif // NDEBUG
125
126
1
TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
127
1
  BlockingQueue<int64_t> test_queue(2);
128
1
  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
129
1
  test_queue.Shutdown();
130
1
  ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);
131
1
  int64_t i;
132
1
  ASSERT_TRUE(test_queue.BlockingGet(&i));
133
1
  ASSERT_EQ(123, i);
134
1
  ASSERT_FALSE(test_queue.BlockingGet(&i));
135
1
}
136
137
1
TEST(BlockingQueueTest, TestGscopedPtrMethods) {
138
1
  BlockingQueue<int*> test_queue(2);
139
1
  std::unique_ptr<int> input_int(new int(123));
140
1
  ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
141
1
  std::unique_ptr<int> output_int;
142
1
  ASSERT_TRUE(test_queue.BlockingGet(&output_int));
143
1
  ASSERT_EQ(123, *output_int.get());
144
1
  test_queue.Shutdown();
145
1
}
146
147
class MultiThreadTest {
148
 public:
149
  typedef std::vector<std::thread> thread_vec_t;
150
151
  MultiThreadTest()
152
      : puts_(4),
153
        blocking_puts_(4),
154
        nthreads_(5),
155
        queue_(nthreads_ * puts_),
156
        num_inserters_(nthreads_),
157
1
        sync_latch_(nthreads_) {
158
1
  }
159
160
5
  void InserterThread(int arg) {
161
25
    for (int i = 0; i < puts_; i++) {
162
20
      ASSERT_EQ(queue_.Put(arg), QUEUE_SUCCESS);
163
20
    }
164
5
    sync_latch_.CountDown();
165
5
    sync_latch_.Wait();
166
25
    for (int i = 0; i < blocking_puts_; i++) {
167
20
      ASSERT_TRUE(queue_.BlockingPut(arg));
168
20
    }
169
5
    MutexLock guard(lock_);
170
5
    if (--num_inserters_ == 0) {
171
1
      queue_.Shutdown();
172
1
    }
173
5
  }
174
175
6
  void RemoverThread() {
176
54
    for (int i = 0; i < puts_ + blocking_puts_; i++) {
177
48
      int32_t arg = 0;
178
48
      bool got = queue_.BlockingGet(&arg);
179
48
      if (!got) {
180
8
        arg = -1;
181
8
      }
182
48
      MutexLock guard(lock_);
183
48
      gotten_[arg] = gotten_[arg] + 1;
184
48
    }
185
6
  }
186
187
1
  void Run() {
188
6
    for (int i = 0; i < nthreads_; i++) {
189
5
      threads_.emplace_back(std::bind(&MultiThreadTest::InserterThread, this, i));
190
5
      threads_.emplace_back(std::bind(&MultiThreadTest::RemoverThread, this));
191
5
    }
192
    // We add an extra thread to ensure that there aren't enough elements in
193
    // the queue to go around.  This way, we test removal after Shutdown.
194
1
    threads_.emplace_back(std::bind(&MultiThreadTest::RemoverThread, this));
195
11
    for (auto& thread : threads_) {
196
11
      thread.join();
197
11
    }
198
    // Let's check to make sure we got what we should have.
199
1
    MutexLock guard(lock_);
200
6
    for (int i = 0; i < nthreads_; i++) {
201
5
      ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]);
202
5
    }
203
    // And there were nthreads_ * (puts_ + blocking_puts_)
204
    // elements removed, but only nthreads_ * puts_ +
205
    // blocking_puts_ elements added.  So some removers hit the
206
    // shutdown case.
207
1
    ASSERT_EQ(puts_ + blocking_puts_, gotten_[-1]);
208
1
  }
209
210
  int puts_;
211
  int blocking_puts_;
212
  int nthreads_;
213
  BlockingQueue<int32_t> queue_;
214
  Mutex lock_;
215
  std::map<int32_t, int> gotten_;
216
  thread_vec_t threads_;
217
  int num_inserters_;
218
  CountDownLatch sync_latch_;
219
};
220
221
1
TEST(BlockingQueueTest, TestMultipleThreads) {
222
1
  MultiThreadTest test;
223
1
  test.Run();
224
1
}
225
226
}  // namespace yb