YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/blocking_queue.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_UTIL_BLOCKING_QUEUE_H
33
#define YB_UTIL_BLOCKING_QUEUE_H
34
35
#include <unistd.h>
36
37
#include <list>
38
#include <string>
39
#include <type_traits>
40
#include <vector>
41
42
#include "yb/util/condition_variable.h"
43
#include "yb/util/mutex.h"
44
45
namespace yb {
46
47
// Return values for BlockingQueue::Put()
48
enum QueueStatus {
49
  QUEUE_SUCCESS = 0,
50
  QUEUE_SHUTDOWN = 1,
51
  QUEUE_FULL = 2
52
};
53
54
// Default logical length implementation: always returns 1.
55
struct DefaultLogicalSize {
56
  template<typename T>
57
27.9M
  static size_t logical_size(const T& /* unused */) {
58
27.9M
    return 1;
59
27.9M
  }
_ZN2yb18DefaultLogicalSize12logical_sizeIxEEmRKT_
Line
Count
Source
57
169k
  static size_t logical_size(const T& /* unused */) {
58
169k
    return 1;
59
169k
  }
_ZN2yb18DefaultLogicalSize12logical_sizeIiEEmRKT_
Line
Count
Source
57
95
  static size_t logical_size(const T& /* unused */) {
58
95
    return 1;
59
95
  }
_ZN2yb18DefaultLogicalSize12logical_sizeIPiEEmRKT_
Line
Count
Source
57
29
  static size_t logical_size(const T& /* unused */) {
58
29
    return 1;
59
29
  }
_ZN2yb18DefaultLogicalSize12logical_sizeIPNS_3log13LogEntryBatchEEEmRKT_
Line
Count
Source
57
27.8M
  static size_t logical_size(const T& /* unused */) {
58
27.8M
    return 1;
59
27.8M
  }
_ZN2yb18DefaultLogicalSize12logical_sizeINSt3__14pairINS_6SchemaENS2_12basic_stringIcNS2_11char_traitsIcEENS2_9allocatorIcEEEEEEEEmRKT_
Line
Count
Source
57
8.39k
  static size_t logical_size(const T& /* unused */) {
58
8.39k
    return 1;
59
8.39k
  }
60
};
61
62
template <typename T, class LOGICAL_SIZE = DefaultLogicalSize>
63
class BlockingQueue {
64
 public:
65
  // If T is a pointer, this will be the base type.  If T is not a pointer, you
66
  // can ignore this and the functions which make use of it.
67
  // Template substitution failure is not an error.
68
  typedef typename std::remove_pointer<T>::type T_VAL;
69
70
  explicit BlockingQueue(size_t max_size)
71
    : shutdown_(false),
72
      size_(0),
73
      max_size_(max_size),
74
      not_empty_(&lock_),
75
91.3k
      not_full_(&lock_) {
76
91.3k
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEEC2Em
Line
Count
Source
75
3
      not_full_(&lock_) {
76
3
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEEC2Em
Line
Count
Source
75
14
      not_full_(&lock_) {
76
14
  }
blocking_queue-test.cc:_ZN2yb13BlockingQueueINSt3__112basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS_12_GLOBAL__N_117LengthLogicalSizeEEC2Em
Line
Count
Source
75
1
      not_full_(&lock_) {
76
1
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEEC2Em
Line
Count
Source
75
5
      not_full_(&lock_) {
76
5
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEEC2Em
Line
Count
Source
75
89.4k
      not_full_(&lock_) {
76
89.4k
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEEC2Em
Line
Count
Source
75
1.87k
      not_full_(&lock_) {
76
1.87k
  }
77
78
  // If the queue holds a bare pointer, it must be empty on destruction, since
79
  // it may have ownership of the pointer.
80
50.5k
  ~BlockingQueue() {
81
18.4E
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
18.4E
        << "BlockingQueue holds bare pointers at destruction time";
83
50.5k
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEED2Ev
Line
Count
Source
80
3
  ~BlockingQueue() {
81
0
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
0
        << "BlockingQueue holds bare pointers at destruction time";
83
3
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEED2Ev
Line
Count
Source
80
13
  ~BlockingQueue() {
81
0
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
0
        << "BlockingQueue holds bare pointers at destruction time";
83
13
  }
blocking_queue-test.cc:_ZN2yb13BlockingQueueINSt3__112basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS_12_GLOBAL__N_117LengthLogicalSizeEED2Ev
Line
Count
Source
80
1
  ~BlockingQueue() {
81
0
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
0
        << "BlockingQueue holds bare pointers at destruction time";
83
1
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEED2Ev
Line
Count
Source
80
5
  ~BlockingQueue() {
81
1
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
1
        << "BlockingQueue holds bare pointers at destruction time";
83
5
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEED2Ev
Line
Count
Source
80
48.6k
  ~BlockingQueue() {
81
18.4E
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
18.4E
        << "BlockingQueue holds bare pointers at destruction time";
83
48.6k
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEED2Ev
Line
Count
Source
80
1.87k
  ~BlockingQueue() {
81
0
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
82
0
        << "BlockingQueue holds bare pointers at destruction time";
83
1.87k
  }
84
85
  // Get an element from the queue. Returns false if we were shut down prior to
86
  // getting the element.
87
96.5k
  bool BlockingGet(T *out) {
88
96.5k
    MutexLock l(lock_);
89
97.9k
    while (true) {
90
97.9k
      if (!list_.empty()) {
91
89.0k
        *out = list_.front();
92
89.0k
        list_.pop_front();
93
89.0k
        decrement_size_unlocked(*out);
94
89.0k
        not_full_.Signal();
95
89.0k
        return true;
96
89.0k
      }
97
8.87k
      if (shutdown_) {
98
7.51k
        return false;
99
7.51k
      }
100
1.36k
      not_empty_.Wait();
101
1.36k
    }
102
96.5k
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEE11BlockingGetEPx
Line
Count
Source
87
84.8k
  bool BlockingGet(T *out) {
88
84.8k
    MutexLock l(lock_);
89
86.1k
    while (true) {
90
86.1k
      if (!list_.empty()) {
91
84.8k
        *out = list_.front();
92
84.8k
        list_.pop_front();
93
84.8k
        decrement_size_unlocked(*out);
94
84.8k
        not_full_.Signal();
95
84.8k
        return true;
96
84.8k
      }
97
1.34k
      if (shutdown_) {
98
3
        return false;
99
3
      }
100
1.34k
      not_empty_.Wait();
101
1.34k
    }
102
84.8k
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE11BlockingGetEPi
Line
Count
Source
87
51
  bool BlockingGet(T *out) {
88
51
    MutexLock l(lock_);
89
71
    while (true) {
90
71
      if (!list_.empty()) {
91
43
        *out = list_.front();
92
43
        list_.pop_front();
93
43
        decrement_size_unlocked(*out);
94
43
        not_full_.Signal();
95
43
        return true;
96
43
      }
97
28
      if (shutdown_) {
98
8
        return false;
99
8
      }
100
20
      not_empty_.Wait();
101
20
    }
102
51
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE11BlockingGetEPS1_
Line
Count
Source
87
1
  bool BlockingGet(T *out) {
88
1
    MutexLock l(lock_);
89
1
    while (true) {
90
1
      if (!list_.empty()) {
91
1
        *out = list_.front();
92
1
        list_.pop_front();
93
1
        decrement_size_unlocked(*out);
94
1
        not_full_.Signal();
95
1
        return true;
96
1
      }
97
0
      if (shutdown_) {
98
0
        return false;
99
0
      }
100
0
      not_empty_.Wait();
101
0
    }
102
1
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEE11BlockingGetEPSA_
Line
Count
Source
87
11.6k
  bool BlockingGet(T *out) {
88
11.6k
    MutexLock l(lock_);
89
11.6k
    while (true) {
90
11.6k
      if (!list_.empty()) {
91
4.19k
        *out = list_.front();
92
4.19k
        list_.pop_front();
93
4.19k
        decrement_size_unlocked(*out);
94
4.19k
        not_full_.Signal();
95
4.19k
        return true;
96
4.19k
      }
97
7.50k
      if (shutdown_) {
98
7.50k
        return false;
99
7.50k
      }
100
0
      not_empty_.Wait();
101
0
    }
102
11.6k
  }
103
104
  // Get an element from the queue. Returns false if the queue is empty and
105
  // we were shut down prior to getting the element.
106
1
  bool BlockingGet(std::unique_ptr<T_VAL> *out) {
107
1
    T t = NULL;
108
1
    bool got_element = BlockingGet(&t);
109
1
    if (!got_element) {
110
0
      return false;
111
0
    }
112
1
    out->reset(t);
113
1
    return true;
114
1
  }
115
116
  // Get all elements from the queue and append them to a
117
  // vector. Returns false if shutdown prior to getting the elements.
118
1
  bool BlockingDrainTo(std::vector<T>* out) {
119
1
    return BlockingDrainTo(out, MonoTime::kMax);
120
1
  }
121
122
14.0M
  bool BlockingDrainTo(std::vector<T>* out, const MonoTime& wait_timeout_deadline) {
123
14.0M
    MutexLock l(lock_);
124
27.3M
    while (true) {
125
27.3M
      if (!list_.empty()) {
126
13.8M
        out->reserve(list_.size());
127
13.9M
        for (const T& elt : list_) {
128
13.9M
          out->push_back(elt);
129
13.9M
          decrement_size_unlocked(elt);
130
13.9M
        }
131
13.8M
        list_.clear();
132
13.8M
        not_full_.Signal();
133
13.8M
        return true;
134
13.8M
      }
135
13.5M
      if (shutdown_) {
136
20.3k
        return false;
137
20.3k
      }
138
13.4M
      if (!not_empty_.WaitUntil(wait_timeout_deadline)) {
139
151k
        return true;
140
151k
      }
141
13.4M
    }
142
14.0M
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE15BlockingDrainToEPNSt3__16vectorIiNS3_9allocatorIiEEEERKNS_8MonoTimeE
Line
Count
Source
122
1
  bool BlockingDrainTo(std::vector<T>* out, const MonoTime& wait_timeout_deadline) {
123
1
    MutexLock l(lock_);
124
1
    while (true) {
125
1
      if (!list_.empty()) {
126
1
        out->reserve(list_.size());
127
3
        for (const T& elt : list_) {
128
3
          out->push_back(elt);
129
3
          decrement_size_unlocked(elt);
130
3
        }
131
1
        list_.clear();
132
1
        not_full_.Signal();
133
1
        return true;
134
1
      }
135
0
      if (shutdown_) {
136
0
        return false;
137
0
      }
138
0
      if (!not_empty_.WaitUntil(wait_timeout_deadline)) {
139
0
        return true;
140
0
      }
141
0
    }
142
1
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE15BlockingDrainToEPNSt3__16vectorIS1_NS4_9allocatorIS1_EEEERKNS_8MonoTimeE
Line
Count
Source
122
6
  bool BlockingDrainTo(std::vector<T>* out, const MonoTime& wait_timeout_deadline) {
123
6
    MutexLock l(lock_);
124
6
    while (true) {
125
6
      if (!list_.empty()) {
126
3
        out->reserve(list_.size());
127
13
        for (const T& elt : list_) {
128
13
          out->push_back(elt);
129
13
          decrement_size_unlocked(elt);
130
13
        }
131
3
        list_.clear();
132
3
        not_full_.Signal();
133
3
        return true;
134
3
      }
135
3
      if (shutdown_) {
136
0
        return false;
137
0
      }
138
3
      if (!not_empty_.WaitUntil(wait_timeout_deadline)) {
139
3
        return true;
140
3
      }
141
3
    }
142
6
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE15BlockingDrainToEPNSt3__16vectorIS3_NS6_9allocatorIS3_EEEERKNS_8MonoTimeE
Line
Count
Source
122
14.0M
  bool BlockingDrainTo(std::vector<T>* out, const MonoTime& wait_timeout_deadline) {
123
14.0M
    MutexLock l(lock_);
124
27.3M
    while (true) {
125
27.3M
      if (!list_.empty()) {
126
13.8M
        out->reserve(list_.size());
127
13.9M
        for (const T& elt : list_) {
128
13.9M
          out->push_back(elt);
129
13.9M
          decrement_size_unlocked(elt);
130
13.9M
        }
131
13.8M
        list_.clear();
132
13.8M
        not_full_.Signal();
133
13.8M
        return true;
134
13.8M
      }
135
13.5M
      if (shutdown_) {
136
20.3k
        return false;
137
20.3k
      }
138
13.4M
      if (!not_empty_.WaitUntil(wait_timeout_deadline)) {
139
151k
        return true;
140
151k
      }
141
13.4M
    }
142
14.0M
  }
143
144
  // Attempts to put the given value in the queue.
145
  // Returns:
146
  //   QUEUE_SUCCESS: if successfully inserted
147
  //   QUEUE_FULL: if the queue has reached max_size
148
  //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
149
89.0k
  QueueStatus Put(const T &val) {
150
89.0k
    MutexLock l(lock_);
151
89.0k
    if (size_ >= max_size_) {
152
2
      return QUEUE_FULL;
153
2
    }
154
89.0k
    if (shutdown_) {
155
1
      return QUEUE_SHUTDOWN;
156
1
    }
157
89.0k
    list_.push_back(val);
158
89.0k
    increment_size_unlocked(val);
159
89.0k
    l.Unlock();
160
89.0k
    not_empty_.Signal();
161
89.0k
    return QUEUE_SUCCESS;
162
89.0k
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEE3PutERKx
Line
Count
Source
149
84.8k
  QueueStatus Put(const T &val) {
150
84.8k
    MutexLock l(lock_);
151
84.8k
    if (size_ >= max_size_) {
152
0
      return QUEUE_FULL;
153
0
    }
154
84.8k
    if (shutdown_) {
155
1
      return QUEUE_SHUTDOWN;
156
1
    }
157
84.8k
    list_.push_back(val);
158
84.8k
    increment_size_unlocked(val);
159
84.8k
    l.Unlock();
160
84.8k
    not_empty_.Signal();
161
84.8k
    return QUEUE_SUCCESS;
162
84.8k
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE3PutERKi
Line
Count
Source
149
30
  QueueStatus Put(const T &val) {
150
30
    MutexLock l(lock_);
151
30
    if (size_ >= max_size_) {
152
1
      return QUEUE_FULL;
153
1
    }
154
29
    if (shutdown_) {
155
0
      return QUEUE_SHUTDOWN;
156
0
    }
157
29
    list_.push_back(val);
158
29
    increment_size_unlocked(val);
159
29
    l.Unlock();
160
29
    not_empty_.Signal();
161
29
    return QUEUE_SUCCESS;
162
29
  }
blocking_queue-test.cc:_ZN2yb13BlockingQueueINSt3__112basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS_12_GLOBAL__N_117LengthLogicalSizeEE3PutERKS7_
Line
Count
Source
149
3
  QueueStatus Put(const T &val) {
150
3
    MutexLock l(lock_);
151
3
    if (size_ >= max_size_) {
152
1
      return QUEUE_FULL;
153
1
    }
154
2
    if (shutdown_) {
155
0
      return QUEUE_SHUTDOWN;
156
0
    }
157
2
    list_.push_back(val);
158
2
    increment_size_unlocked(val);
159
2
    l.Unlock();
160
2
    not_empty_.Signal();
161
2
    return QUEUE_SUCCESS;
162
2
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE3PutERKS1_
Line
Count
Source
149
2
  QueueStatus Put(const T &val) {
150
2
    MutexLock l(lock_);
151
2
    if (size_ >= max_size_) {
152
0
      return QUEUE_FULL;
153
0
    }
154
2
    if (shutdown_) {
155
0
      return QUEUE_SHUTDOWN;
156
0
    }
157
2
    list_.push_back(val);
158
2
    increment_size_unlocked(val);
159
2
    l.Unlock();
160
2
    not_empty_.Signal();
161
2
    return QUEUE_SUCCESS;
162
2
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEE3PutERKSA_
Line
Count
Source
149
4.19k
  QueueStatus Put(const T &val) {
150
4.19k
    MutexLock l(lock_);
151
4.19k
    if (size_ >= max_size_) {
152
0
      return QUEUE_FULL;
153
0
    }
154
4.19k
    if (shutdown_) {
155
0
      return QUEUE_SHUTDOWN;
156
0
    }
157
4.19k
    list_.push_back(val);
158
4.19k
    increment_size_unlocked(val);
159
4.19k
    l.Unlock();
160
4.19k
    not_empty_.Signal();
161
4.19k
    return QUEUE_SUCCESS;
162
4.19k
  }
163
164
  // Returns the same as the other Put() overload above.
165
  // If the element was inserted, the std::unique_ptr releases its contents.
166
1
  QueueStatus Put(std::unique_ptr<T_VAL> *val) {
167
1
    QueueStatus s = Put(val->get());
168
1
    if (s == QUEUE_SUCCESS) {
169
1
      val->release();
170
1
    }
171
1
    return s;
172
1
  }
173
174
  // Gets an element for the queue; if the queue is full, blocks until
175
  // space becomes available. Returns false if we were shutdown prior
176
  // to enqueueing the element.
177
13.8M
  bool BlockingPut(const T& val) {
178
13.8M
    MutexLock l(lock_);
179
13.8M
    while (true) {
180
13.8M
      if (shutdown_) {
181
0
        return false;
182
0
      }
183
13.8M
      if (size_ < max_size_) {
184
13.8M
        list_.push_back(val);
185
13.8M
        increment_size_unlocked(val);
186
13.8M
        l.Unlock();
187
13.8M
        not_empty_.Signal();
188
13.8M
        return true;
189
13.8M
      }
190
39
      not_full_.Wait();
191
39
    }
192
13.8M
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE11BlockingPutERKi
Line
Count
Source
177
20
  bool BlockingPut(const T& val) {
178
20
    MutexLock l(lock_);
179
20
    while (true) {
180
20
      if (shutdown_) {
181
0
        return false;
182
0
      }
183
20
      if (size_ < max_size_) {
184
20
        list_.push_back(val);
185
20
        increment_size_unlocked(val);
186
20
        l.Unlock();
187
20
        not_empty_.Signal();
188
20
        return true;
189
20
      }
190
0
      not_full_.Wait();
191
0
    }
192
20
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE11BlockingPutERKS1_
Line
Count
Source
177
13
  bool BlockingPut(const T& val) {
178
13
    MutexLock l(lock_);
179
13
    while (true) {
180
13
      if (shutdown_) {
181
0
        return false;
182
0
      }
183
13
      if (size_ < max_size_) {
184
13
        list_.push_back(val);
185
13
        increment_size_unlocked(val);
186
13
        l.Unlock();
187
13
        not_empty_.Signal();
188
13
        return true;
189
13
      }
190
0
      not_full_.Wait();
191
0
    }
192
13
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE11BlockingPutERKS3_
Line
Count
Source
177
13.8M
  bool BlockingPut(const T& val) {
178
13.8M
    MutexLock l(lock_);
179
13.8M
    while (true) {
180
13.8M
      if (shutdown_) {
181
0
        return false;
182
0
      }
183
13.8M
      if (size_ < max_size_) {
184
13.8M
        list_.push_back(val);
185
13.8M
        increment_size_unlocked(val);
186
13.8M
        l.Unlock();
187
13.8M
        not_empty_.Signal();
188
13.8M
        return true;
189
13.8M
      }
190
39
      not_full_.Wait();
191
39
    }
192
13.8M
  }
193
194
  // Same as other BlockingPut() overload above. If the element was
195
  // enqueued, std::unique_ptr releases its contents.
196
  bool BlockingPut(std::unique_ptr<T_VAL>* val) {
197
    bool ret = Put(val->get());
198
    if (ret) {
199
      val->release();
200
    }
201
    return ret;
202
  }
203
204
  // Shut down the queue.
205
  // When a blocking queue is shut down, no more elements can be added to it,
206
  // and Put() will return QUEUE_SHUTDOWN.
207
  // Existing elements will drain out of it, and then BlockingGet will start
208
  // returning false.
209
99.2k
  void Shutdown() {
210
99.2k
    MutexLock l(lock_);
211
99.2k
    shutdown_ = true;
212
99.2k
    not_full_.Broadcast();
213
99.2k
    not_empty_.Broadcast();
214
99.2k
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEE8ShutdownEv
Line
Count
Source
209
3
  void Shutdown() {
210
3
    MutexLock l(lock_);
211
3
    shutdown_ = true;
212
3
    not_full_.Broadcast();
213
3
    not_empty_.Broadcast();
214
3
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE8ShutdownEv
Line
Count
Source
209
1
  void Shutdown() {
210
1
    MutexLock l(lock_);
211
1
    shutdown_ = true;
212
1
    not_full_.Broadcast();
213
1
    not_empty_.Broadcast();
214
1
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE8ShutdownEv
Line
Count
Source
209
7
  void Shutdown() {
210
7
    MutexLock l(lock_);
211
7
    shutdown_ = true;
212
7
    not_full_.Broadcast();
213
7
    not_empty_.Broadcast();
214
7
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE8ShutdownEv
Line
Count
Source
209
97.3k
  void Shutdown() {
210
97.3k
    MutexLock l(lock_);
211
97.3k
    shutdown_ = true;
212
97.3k
    not_full_.Broadcast();
213
97.3k
    not_empty_.Broadcast();
214
97.3k
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEE8ShutdownEv
Line
Count
Source
209
1.87k
  void Shutdown() {
210
1.87k
    MutexLock l(lock_);
211
1.87k
    shutdown_ = true;
212
1.87k
    not_full_.Broadcast();
213
1.87k
    not_empty_.Broadcast();
214
1.87k
  }
215
216
220k
  bool empty() const {
217
220k
    MutexLock l(lock_);
218
220k
    return list_.empty();
219
220k
  }
_ZNK2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE5emptyEv
Line
Count
Source
216
6
  bool empty() const {
217
6
    MutexLock l(lock_);
218
6
    return list_.empty();
219
6
  }
_ZNK2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE5emptyEv
Line
Count
Source
216
220k
  bool empty() const {
217
220k
    MutexLock l(lock_);
218
220k
    return list_.empty();
219
220k
  }
220
221
0
  size_t max_size() const {
222
0
    return max_size_;
223
0
  }
Unexecuted instantiation: _ZNK2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE8max_sizeEv
Unexecuted instantiation: _ZNK2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE8max_sizeEv
224
225
0
  std::string ToString() const {
226
0
    std::string ret;
227
228
0
    MutexLock l(lock_);
229
0
    for (const T& t : list_) {
230
0
      ret.append(t->ToString());
231
0
      ret.append("\n");
232
0
    }
233
0
    return ret;
234
0
  }
235
236
 private:
237
238
  // Increments queue size. Must be called when 'lock_' is held.
239
13.9M
  void increment_size_unlocked(const T& t) {
240
13.9M
    size_ += LOGICAL_SIZE::logical_size(t);
241
13.9M
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEE23increment_size_unlockedERKx
Line
Count
Source
239
84.8k
  void increment_size_unlocked(const T& t) {
240
84.8k
    size_ += LOGICAL_SIZE::logical_size(t);
241
84.8k
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE23increment_size_unlockedERKi
Line
Count
Source
239
49
  void increment_size_unlocked(const T& t) {
240
49
    size_ += LOGICAL_SIZE::logical_size(t);
241
49
  }
blocking_queue-test.cc:_ZN2yb13BlockingQueueINSt3__112basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEENS_12_GLOBAL__N_117LengthLogicalSizeEE23increment_size_unlockedERKS7_
Line
Count
Source
239
2
  void increment_size_unlocked(const T& t) {
240
2
    size_ += LOGICAL_SIZE::logical_size(t);
241
2
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE23increment_size_unlockedERKS1_
Line
Count
Source
239
15
  void increment_size_unlocked(const T& t) {
240
15
    size_ += LOGICAL_SIZE::logical_size(t);
241
15
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE23increment_size_unlockedERKS3_
Line
Count
Source
239
13.8M
  void increment_size_unlocked(const T& t) {
240
13.8M
    size_ += LOGICAL_SIZE::logical_size(t);
241
13.8M
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEE23increment_size_unlockedERKSA_
Line
Count
Source
239
4.19k
  void increment_size_unlocked(const T& t) {
240
4.19k
    size_ += LOGICAL_SIZE::logical_size(t);
241
4.19k
  }
242
243
  // Decrements queue size. Must be called when 'lock_' is held.
244
13.9M
  void decrement_size_unlocked(const T& t) {
245
13.9M
    size_ -= LOGICAL_SIZE::logical_size(t);
246
13.9M
  }
_ZN2yb13BlockingQueueIxNS_18DefaultLogicalSizeEE23decrement_size_unlockedERKx
Line
Count
Source
244
84.8k
  void decrement_size_unlocked(const T& t) {
245
84.8k
    size_ -= LOGICAL_SIZE::logical_size(t);
246
84.8k
  }
_ZN2yb13BlockingQueueIiNS_18DefaultLogicalSizeEE23decrement_size_unlockedERKi
Line
Count
Source
244
46
  void decrement_size_unlocked(const T& t) {
245
46
    size_ -= LOGICAL_SIZE::logical_size(t);
246
46
  }
_ZN2yb13BlockingQueueIPiNS_18DefaultLogicalSizeEE23decrement_size_unlockedERKS1_
Line
Count
Source
244
14
  void decrement_size_unlocked(const T& t) {
245
14
    size_ -= LOGICAL_SIZE::logical_size(t);
246
14
  }
_ZN2yb13BlockingQueueIPNS_3log13LogEntryBatchENS_18DefaultLogicalSizeEE23decrement_size_unlockedERKS3_
Line
Count
Source
244
13.9M
  void decrement_size_unlocked(const T& t) {
245
13.9M
    size_ -= LOGICAL_SIZE::logical_size(t);
246
13.9M
  }
_ZN2yb13BlockingQueueINSt3__14pairINS_6SchemaENS1_12basic_stringIcNS1_11char_traitsIcEENS1_9allocatorIcEEEEEENS_18DefaultLogicalSizeEE23decrement_size_unlockedERKSA_
Line
Count
Source
244
4.19k
  void decrement_size_unlocked(const T& t) {
245
4.19k
    size_ -= LOGICAL_SIZE::logical_size(t);
246
4.19k
  }
247
248
  bool shutdown_;
249
  size_t size_;
250
  size_t max_size_;
251
  mutable Mutex lock_;
252
  ConditionVariable not_empty_;
253
  ConditionVariable not_full_;
254
  std::list<T> list_;
255
};
256
257
} // namespace yb
258
259
#endif  // YB_UTIL_BLOCKING_QUEUE_H