YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/blocking_queue.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_UTIL_BLOCKING_QUEUE_H
33
#define YB_UTIL_BLOCKING_QUEUE_H
34
35
#include <unistd.h>
36
37
#include <list>
38
#include <string>
39
#include <type_traits>
40
#include <vector>
41
42
#include "yb/util/condition_variable.h"
43
#include "yb/util/mutex.h"
44
45
namespace yb {
46
47
// Result codes returned by BlockingQueue::Put().
//
// NOTE: QUEUE_SUCCESS is zero, so a QueueStatus that is implicitly converted
// to bool reads as 'false' on success. Always compare against the enumerators
// explicitly instead of testing the value for truthiness.
enum QueueStatus {
  // The element was added to the queue.
  QUEUE_SUCCESS = 0,
  // Shutdown() had already been called on the queue; nothing was added.
  QUEUE_SHUTDOWN = 1,
  // The queue had reached its maximum size; nothing was added.
  QUEUE_FULL = 2,
};
53
54
// Default size policy for BlockingQueue: every element, whatever its type or
// value, counts as exactly one unit towards the queue's capacity.
struct DefaultLogicalSize {
  // Returns the logical size of 'item', which is always 1. The argument is
  // accepted only so that the policy has the expected signature; it is never
  // inspected.
  template <class Item>
  static size_t logical_size(const Item& /* item */) {
    return 1;
  }
};
61
62
template <typename T, class LOGICAL_SIZE = DefaultLogicalSize>
63
class BlockingQueue {
64
 public:
65
  // If T is a pointer, this will be the base type.  If T is not a pointer, you
66
  // can ignore this and the functions which make use of it.
67
  // Template substitution failure is not an error.
68
  typedef typename std::remove_pointer<T>::type T_VAL;
69
70
  explicit BlockingQueue(size_t max_size)
71
    : shutdown_(false),
72
      size_(0),
73
      max_size_(max_size),
74
      not_empty_(&lock_),
75
152k
      not_full_(&lock_) {
76
152k
  }
yb::BlockingQueue<std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, yb::DefaultLogicalSize>::BlockingQueue(unsigned long)
Line
Count
Source
75
1.96k
      not_full_(&lock_) {
76
1.96k
  }
yb::BlockingQueue<yb::log::LogEntryBatch*, yb::DefaultLogicalSize>::BlockingQueue(unsigned long)
Line
Count
Source
75
150k
      not_full_(&lock_) {
76
150k
  }
77
78
  // If the queue holds a bare pointer, it must be empty on destruction, since
79
  // it may have ownership of the pointer.
80
78.4k
  ~BlockingQueue() {
81
78.4k
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
8
        << "BlockingQueue holds bare pointers at destruction time";
83
78.4k
  }
yb::BlockingQueue<std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, yb::DefaultLogicalSize>::~BlockingQueue()
Line
Count
Source
80
1.96k
  ~BlockingQueue() {
81
1.96k
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
0
        << "BlockingQueue holds bare pointers at destruction time";
83
1.96k
  }
yb::BlockingQueue<yb::log::LogEntryBatch*, yb::DefaultLogicalSize>::~BlockingQueue()
Line
Count
Source
80
76.5k
  ~BlockingQueue() {
81
76.5k
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
8
        << "BlockingQueue holds bare pointers at destruction time";
83
76.5k
  }
84
85
  // Get an element from the queue. Returns false if we were shut down prior to
86
  // getting the element.
87
12.2k
  bool BlockingGet(T *out) {
88
12.2k
    MutexLock l(lock_);
89
12.2k
    while (true) {
90
12.2k
      if (!list_.empty()) {
91
4.37k
        *out = list_.front();
92
4.37k
        list_.pop_front();
93
4.37k
        decrement_size_unlocked(*out);
94
4.37k
        not_full_.Signal();
95
4.37k
        return true;
96
4.37k
      }
97
7.86k
      if (shutdown_) {
98
7.86k
        return false;
99
7.86k
      }
100
0
      not_empty_.Wait();
101
0
    }
102
12.2k
  }
103
104
  // Get an element from the queue. Returns false if the queue is empty and
105
  // we were shut down prior to getting the element.
106
  bool BlockingGet(std::unique_ptr<T_VAL> *out) {
107
    T t = NULL;
108
    bool got_element = BlockingGet(&t);
109
    if (!got_element) {
110
      return false;
111
    }
112
    out->reset(t);
113
    return true;
114
  }
115
116
  // Get all elements from the queue and append them to a
117
  // vector. Returns false if shutdown prior to getting the elements.
118
  bool BlockingDrainTo(std::vector<T>* out) {
119
    return BlockingDrainTo(out, MonoTime::kMax);
120
  }
121
122
25.2M
  bool BlockingDrainTo(std::vector<T>* out, const MonoTime& wait_timeout_deadline) {
123
25.2M
    MutexLock l(lock_);
124
49.3M
    while (true) {
125
49.2M
      if (!list_.empty()) {
126
24.9M
        out->reserve(list_.size());
127
25.0M
        for (const T& elt : list_) {
128
25.0M
          out->push_back(elt);
129
25.0M
          decrement_size_unlocked(elt);
130
25.0M
        }
131
24.9M
        list_.clear();
132
24.9M
        not_full_.Signal();
133
24.9M
        return true;
134
24.9M
      }
135
24.3M
      if (shutdown_) {
136
30.6k
        return false;
137
30.6k
      }
138
24.3M
      if (!not_empty_.WaitUntil(wait_timeout_deadline)) {
139
282k
        return true;
140
282k
      }
141
24.3M
    }
142
25.2M
  }
143
144
  // Attempts to put the given value in the queue.
145
  // Returns:
146
  //   QUEUE_SUCCESS: if successfully inserted
147
  //   QUEUE_FULL: if the queue has reached max_size
148
  //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
149
4.37k
  QueueStatus Put(const T &val) {
150
4.37k
    MutexLock l(lock_);
151
4.37k
    if (size_ >= max_size_) {
152
0
      return QUEUE_FULL;
153
0
    }
154
4.37k
    if (shutdown_) {
155
0
      return QUEUE_SHUTDOWN;
156
0
    }
157
4.37k
    list_.push_back(val);
158
4.37k
    increment_size_unlocked(val);
159
4.37k
    l.Unlock();
160
4.37k
    not_empty_.Signal();
161
4.37k
    return QUEUE_SUCCESS;
162
4.37k
  }
163
164
  // Returns the same as the other Put() overload above.
165
  // If the element was inserted, the std::unique_ptr releases its contents.
166
  QueueStatus Put(std::unique_ptr<T_VAL> *val) {
167
    QueueStatus s = Put(val->get());
168
    if (s == QUEUE_SUCCESS) {
169
      val->release();
170
    }
171
    return s;
172
  }
173
174
  // Gets an element for the queue; if the queue is full, blocks until
175
  // space becomes available. Returns false if we were shutdown prior
176
  // to enqueueing the element.
177
25.0M
  bool BlockingPut(const T& val) {
178
25.0M
    MutexLock l(lock_);
179
25.0M
    while (
true25.0M
) {
180
25.0M
      if (shutdown_) {
181
0
        return false;
182
0
      }
183
25.0M
      
if (25.0M
size_ < max_size_25.0M
) {
184
25.0M
        list_.push_back(val);
185
25.0M
        increment_size_unlocked(val);
186
25.0M
        l.Unlock();
187
25.0M
        not_empty_.Signal();
188
25.0M
        return true;
189
25.0M
      }
190
18.4E
      not_full_.Wait();
191
18.4E
    }
192
25.0M
  }
193
194
  // Same as other BlockingPut() overload above. If the element was
195
  // enqueued, std::unique_ptr releases its contents.
196
  bool BlockingPut(std::unique_ptr<T_VAL>* val) {
197
    bool ret = Put(val->get());
198
    if (ret) {
199
      val->release();
200
    }
201
    return ret;
202
  }
203
204
  // Shut down the queue.
205
  // When a blocking queue is shut down, no more elements can be added to it,
206
  // and Put() will return QUEUE_SHUTDOWN.
207
  // Existing elements will drain out of it, and then BlockingGet will start
208
  // returning false.
209
154k
  void Shutdown() {
210
154k
    MutexLock l(lock_);
211
154k
    shutdown_ = true;
212
154k
    not_full_.Broadcast();
213
154k
    not_empty_.Broadcast();
214
154k
  }
yb::BlockingQueue<std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, yb::DefaultLogicalSize>::Shutdown()
Line
Count
Source
209
1.96k
  void Shutdown() {
210
1.96k
    MutexLock l(lock_);
211
1.96k
    shutdown_ = true;
212
1.96k
    not_full_.Broadcast();
213
1.96k
    not_empty_.Broadcast();
214
1.96k
  }
yb::BlockingQueue<yb::log::LogEntryBatch*, yb::DefaultLogicalSize>::Shutdown()
Line
Count
Source
209
152k
  void Shutdown() {
210
152k
    MutexLock l(lock_);
211
152k
    shutdown_ = true;
212
152k
    not_full_.Broadcast();
213
152k
    not_empty_.Broadcast();
214
152k
  }
215
216
389k
  bool empty() const {
217
389k
    MutexLock l(lock_);
218
389k
    return list_.empty();
219
389k
  }
220
221
0
  size_t max_size() const {
222
0
    return max_size_;
223
0
  }
224
225
0
  std::string ToString() const {
226
0
    std::string ret;
227
228
0
    MutexLock l(lock_);
229
0
    for (const T& t : list_) {
230
0
      ret.append(t->ToString());
231
0
      ret.append("\n");
232
0
    }
233
0
    return ret;
234
0
  }
235
236
 private:
237
238
  // Increments queue size. Must be called when 'lock_' is held.
239
25.0M
  void increment_size_unlocked(const T& t) {
240
25.0M
    size_ += LOGICAL_SIZE::logical_size(t);
241
25.0M
  }
yb::BlockingQueue<std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, yb::DefaultLogicalSize>::increment_size_unlocked(std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)
Line
Count
Source
239
4.37k
  void increment_size_unlocked(const T& t) {
240
4.37k
    size_ += LOGICAL_SIZE::logical_size(t);
241
4.37k
  }
yb::BlockingQueue<yb::log::LogEntryBatch*, yb::DefaultLogicalSize>::increment_size_unlocked(yb::log::LogEntryBatch* const&)
Line
Count
Source
239
25.0M
  void increment_size_unlocked(const T& t) {
240
25.0M
    size_ += LOGICAL_SIZE::logical_size(t);
241
25.0M
  }
242
243
  // Decrements queue size. Must be called when 'lock_' is held.
244
25.0M
  void decrement_size_unlocked(const T& t) {
245
25.0M
    size_ -= LOGICAL_SIZE::logical_size(t);
246
25.0M
  }
yb::BlockingQueue<std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, yb::DefaultLogicalSize>::decrement_size_unlocked(std::__1::pair<yb::Schema, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > const&)
Line
Count
Source
244
4.37k
  void decrement_size_unlocked(const T& t) {
245
4.37k
    size_ -= LOGICAL_SIZE::logical_size(t);
246
4.37k
  }
yb::BlockingQueue<yb::log::LogEntryBatch*, yb::DefaultLogicalSize>::decrement_size_unlocked(yb::log::LogEntryBatch* const&)
Line
Count
Source
244
25.0M
  void decrement_size_unlocked(const T& t) {
245
25.0M
    size_ -= LOGICAL_SIZE::logical_size(t);
246
25.0M
  }
247
248
  bool shutdown_;
249
  size_t size_;
250
  size_t max_size_;
251
  mutable Mutex lock_;
252
  ConditionVariable not_empty_;
253
  ConditionVariable not_full_;
254
  std::list<T> list_;
255
};
256
257
} // namespace yb
258
259
#endif  // YB_UTIL_BLOCKING_QUEUE_H