YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/growable_buffer.cc
 Line|  Count|Source
    1|       |//
    2|       |// Copyright (c) YugaByte, Inc.
    3|       |//
    4|       |// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
    5|       |// in compliance with the License.  You may obtain a copy of the License at
    6|       |//
    7|       |// http://www.apache.org/licenses/LICENSE-2.0
    8|       |//
    9|       |// Unless required by applicable law or agreed to in writing, software distributed under the License
   10|       |// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
   11|       |// or implied.  See the License for the specific language governing permissions and limitations
   12|       |// under the License.
   13|       |//
   14|       |//
   15|       |
   16|       |#include "yb/rpc/growable_buffer.h"
   17|       |
   18|       |#include <stdint.h>
   19|       |
   20|       |#include <functional>
   21|       |#include <thread>
   22|       |
   23|       |#include <boost/lockfree/stack.hpp>
   24|       |#include <glog/logging.h>
   25|       |
   26|       |#include "yb/util/mem_tracker.h"
   27|       |#include "yb/util/result.h"
   28|       |#include "yb/util/status_format.h"
   29|       |
   30|       |using namespace std::placeholders;
   31|       |
   32|       |namespace yb {
   33|       |namespace rpc {
   34|       |
   35|       |class GrowableBufferAllocator::Impl : public GarbageCollector,
   36|       |                                      public std::enable_shared_from_this<Impl> {
   37|       | public:
   38|       |  Impl(size_t block_size,
   39|       |       const MemTrackerPtr& mem_tracker)
   40|       |      : block_size_(block_size),
   41|       |        mandatory_tracker_(MemTracker::FindOrCreateTracker(
   42|       |            "Mandatory", mem_tracker, AddToParent::kFalse)),
   43|       |        used_tracker_(MemTracker::FindOrCreateTracker("Used", mem_tracker)),
   44|       |        allocated_tracker_(MemTracker::FindOrCreateTracker("Allocated", mem_tracker)),
   45|  2.94k|        pool_(0) {
   46|  2.94k|  }
   47|       |
   48|  2.94k|  void CompleteInit() {
   49|  2.94k|    allocated_tracker_->parent()->AddGarbageCollector(shared_from_this());
   50|  2.94k|  }
   51|       |
   52|      3|  virtual ~Impl() {
   53|      3|    CollectGarbage(std::numeric_limits<size_t>::max());
   54|      3|  }
   55|       |
   56|  6.16k|  uint8_t* Allocate(bool forced) {
   57|  6.16k|    uint8_t* result = nullptr;
   58|       |
   59|  6.16k|    if (forced) {
   60|  1.52k|      if (pool_.pop(result)) {
   61|    205|        allocated_tracker_->Release(block_size_);
   62|  1.31k|      } else {
   63|  1.31k|        result = static_cast<uint8_t*>(malloc(block_size_));
   64|  1.31k|      }
   65|  1.52k|      mandatory_tracker_->Consume(block_size_);
   66|  1.52k|      return result;
   67|  1.52k|    }
   68|       |
   69|  4.63k|    if (!pool_.pop(result)) {
   70|     61|      if (!allocated_tracker_->TryConsume(block_size_)) {
   71|      0|        return nullptr;
   72|      0|      }
   73|     61|      result = static_cast<uint8_t*>(malloc(block_size_));
   74|     61|    }
   75|  4.63k|    used_tracker_->Consume(block_size_);
   76|  4.63k|    allocated_tracker_->Release(block_size_);
   77|  4.63k|    return result;
   78|  4.63k|  }
   79|       |
   80|  5.12k|  void Free(uint8_t* buffer, bool was_forced) {
   81|  5.12k|    if (!buffer) {
   82|      0|      return;
   83|      0|    }
   84|       |
   85|  5.12k|    auto* tracker = was_forced ? mandatory_tracker_.get() : used_tracker_.get();
   86|  5.12k|    tracker->Release(block_size_);
   87|  5.12k|    if (allocated_tracker_->TryConsume(block_size_)) {
   88|  5.12k|      if (!pool_.push(buffer)) {
   89|      0|        allocated_tracker_->Release(block_size_);
   90|      0|        free(buffer);
   91|      0|      }
   92|  18.4E|    } else {
   93|  18.4E|      free(buffer);
   94|  18.4E|    }
   95|  5.12k|  }
   96|       |
   97|  1.51k|  size_t block_size() const {
   98|  1.51k|    return block_size_;
   99|  1.51k|  }
  100|       |
  101|       | private:
  102|      3|  void CollectGarbage(size_t required) override {
  103|      3|    uint8_t* buffer = nullptr;
  104|      3|    size_t total = 0;
  105|     42|    while (total < required && pool_.pop(buffer)) {
  106|     39|      free(buffer);
  107|     39|      total += block_size_;
  108|     39|    }
  109|      3|    allocated_tracker_->Release(total);
  110|      3|  }
  111|       |
  112|       |  const size_t block_size_;
  113|       |  // Buffers allocated with the force flag; they are not counted in the parent tracker.
  114|       |  MemTrackerPtr mandatory_tracker_;
  115|       |  // Buffers that are in use by clients of this class.
  116|       |  MemTrackerPtr used_tracker_;
  117|       |  // Buffers that are currently held in the pool.
  118|       |  MemTrackerPtr allocated_tracker_;
  119|       |  boost::lockfree::stack<uint8_t*> pool_;
  120|       |};
  121|       |
  122|       |GrowableBufferAllocator::GrowableBufferAllocator(
  123|       |    size_t block_size, const MemTrackerPtr& mem_tracker)
  124|  2.94k|    : impl_(std::make_shared<Impl>(block_size, mem_tracker)) {
  125|  2.94k|  impl_->CompleteInit();
  126|  2.94k|}
  127|       |
  128|      3|GrowableBufferAllocator::~GrowableBufferAllocator() {
  129|      3|}
  130|       |
  131|  1.51k|size_t GrowableBufferAllocator::block_size() const {
  132|  1.51k|  return impl_->block_size();
  133|  1.51k|}
  134|       |
  135|  6.16k|uint8_t* GrowableBufferAllocator::Allocate(bool forced) {
  136|  6.16k|  return impl_->Allocate(forced);
  137|  6.16k|}
  138|       |
  139|  5.12k|void GrowableBufferAllocator::Free(uint8_t* buffer, bool was_forced) {
  140|  5.12k|  impl_->Free(buffer, was_forced);
  141|  5.12k|}
  142|       |
  143|       |namespace {
  144|       |
  145|       |constexpr size_t kDefaultBuffersCapacity = 8;
  146|       |
  147|       |}
  148|       |
  149|       |GrowableBuffer::GrowableBuffer(GrowableBufferAllocator* allocator, size_t limit)
  150|       |    : allocator_(*allocator),
  151|       |      block_size_(allocator->block_size()),
  152|       |      limit_(limit),
  153|  1.51k|      buffers_(kDefaultBuffersCapacity) {
  154|  1.51k|  buffers_.push_back(
  155|  1.51k|      BufferPtr(allocator_.Allocate(true), GrowableBufferDeleter(&allocator_, true)));
  156|  1.51k|}
  157|       |
  158|     15|std::string GrowableBuffer::ToString() const {
  159|     15|  return YB_CLASS_TO_STRING(size, limit);
  160|     15|}
  161|       |
  162|   225k|void GrowableBuffer::Consume(size_t count, const Slice& prepend) {
  163|   225k|  if (count > size_) {
  164|      0|    LOG(DFATAL) << "Consume more bytes than contained: " << size_ << " vs " << count;
  165|      0|  }
  166|   225k|  if (!prepend.empty()) {
  167|      0|    LOG(DFATAL) << "GrowableBuffer does not support prepending";
  168|      0|  }
  169|   225k|  if (count) {
  170|   219k|    pos_ += count;
  171|   230k|    while (pos_ >= block_size_) {
  172|  10.3k|      pos_ -= block_size_;
  173|  10.3k|      auto buffer = std::move(buffers_.front());
  174|  10.3k|      buffers_.pop_front();
  175|       |      // Reuse buffer if only one left.
  176|  10.3k|      if (buffers_.size() < 2) {
  177|  5.75k|        buffers_.push_back(std::move(buffer));
  178|  5.75k|      }
  179|  10.3k|    }
  180|   219k|    size_ -= count;
  181|   219k|    if (size_ == 0) { // Buffer was fully read, so we could reset start position also.
  182|   209k|      pos_ = 0;
  183|   209k|    }
  184|   219k|  }
  185|   225k|}
  186|       |
  187|      0|void GrowableBuffer::Swap(GrowableBuffer* rhs) {
  188|      0|  DCHECK_EQ(limit_, rhs->limit_);
  189|      0|  DCHECK_EQ(block_size_, rhs->block_size_);
  190|      0|  DCHECK_EQ(&allocator_, &rhs->allocator_);
  191|       |
  192|      0|  buffers_.swap(rhs->buffers_);
  193|      0|  consumption_.Swap(&rhs->consumption_);
  194|      0|  std::swap(size_, rhs->size_);
  195|      0|  std::swap(pos_, rhs->pos_);
  196|      0|}
  197|       |
  198|   665k|IoVecs GrowableBuffer::IoVecsForRange(size_t begin, size_t end) {
  199|   665k|  DCHECK_LE(begin, end);
  200|       |
  201|   665k|  auto it = buffers_.begin();
  202|   736k|  while (begin >= block_size_) {
  203|  70.9k|    ++it;
  204|  70.9k|    begin -= block_size_;
  205|  70.9k|    end -= block_size_;
  206|  70.9k|  }
  207|   665k|  IoVecs result;
  208|   699k|  while (end > block_size_) {
  209|  34.4k|    result.push_back({ it->get() + begin, block_size_ - begin });
  210|  34.4k|    begin = 0;
  211|  34.4k|    end -= block_size_;
  212|  34.4k|    ++it;
  213|  34.4k|  }
  214|   665k|  result.push_back({ it->get() + begin, end - begin });
  215|   665k|  return result;
  216|   665k|}
  217|       |
  218|       |// Returns currently read data.
  219|   225k|IoVecs GrowableBuffer::AppendedVecs() {
  220|   225k|  return IoVecsForRange(pos_, pos_ + size_);
  221|   225k|}
  222|       |
  223|   440k|Result<IoVecs> GrowableBuffer::PrepareAppend() {
  224|   440k|  if (!valid()) {
  225|      0|    return STATUS(IllegalState, "Read buffer was reset");
  226|      0|  }
  227|       |
  228|   440k|  DCHECK_LT(pos_, block_size_);
  229|       |
  230|       |  // Check if we have too small capacity left.
  231|   440k|  if (pos_ + size_ * 2 >= block_size_ && capacity_left() * 2 < block_size_) {
  232|  4.77k|    if (buffers_.size() == buffers_.capacity()) {
  233|      7|      buffers_.set_capacity(buffers_.capacity() * 2);
  234|      7|    }
  235|  4.77k|    BufferPtr new_buffer;
  236|  4.77k|    if (buffers_.size() * block_size_ < limit_) {
  237|       |      // We need at least 2 buffers for normal functioning, because with only one
  238|       |      // buffer we could reach a situation where our command limit is just several
  239|       |      // bytes.
  240|  4.64k|      bool forced = buffers_.size() < 2;
  241|  4.64k|      new_buffer = BufferPtr(
  242|  4.64k|          allocator_.Allocate(forced), GrowableBufferDeleter(&allocator_, forced));
  243|  4.64k|    }
  244|  4.77k|    if (new_buffer) {
  245|  4.64k|      buffers_.push_back(std::move(new_buffer));
  246|  4.64k|    } else if (capacity_left() == 0) {
  247|      1|      return STATUS_FORMAT(
  248|      1|          Busy, "Prepare read when buffer already full, size: $0, limit: $1", size_, limit_);
  249|      1|    }
  250|  4.77k|  }
  251|       |
  252|   440k|  return IoVecsForRange(pos_ + size_, buffers_.size() * block_size_);
  253|   440k|}
  254|       |
  255|   229k|void GrowableBuffer::DataAppended(size_t len) {
  256|   229k|  if (len > capacity_left()) {
  257|      0|    LOG(DFATAL) << "Data appended over capacity: " << len << " > " << capacity_left();
  258|      0|  }
  259|   229k|  size_ += len;
  260|   229k|}
  261|       |
  262|    947|void GrowableBuffer::Reset() {
  263|    947|  Clear();
  264|    947|  buffers_.clear();
  265|    947|  buffers_.set_capacity(0);
  266|    947|}
  267|       |
  268|   440k|bool GrowableBuffer::valid() const {
  269|   440k|  return !buffers_.empty();
  270|   440k|}
  271|       |
  272|      0|std::ostream& operator<<(std::ostream& out, const GrowableBuffer& receiver) {
  273|      0|  return out << receiver.ToString();
  274|      0|}
  275|       |
  276|       |} // namespace rpc
  277|       |} // namespace yb
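
For orientation, the sketch below shows one plausible way the classes measured in this file are wired together, based only on the signatures visible in the listing above (GrowableBufferAllocator, GrowableBuffer, PrepareAppend, DataAppended, AppendedVecs, Consume). It is an illustrative sketch, not code from the YugabyteDB tree: the MemTracker::GetRootTracker() call, the block size, and the limit are assumptions chosen for the example.

// Illustrative sketch (not part of growable_buffer.cc): one way to wire a
// GrowableBufferAllocator and a GrowableBuffer for a read path.
// MemTracker::GetRootTracker() and the sizes used here are assumptions.
#include "yb/rpc/growable_buffer.h"

#include "yb/util/mem_tracker.h"
#include "yb/util/result.h"
#include "yb/util/slice.h"

namespace yb {
namespace rpc {

void ExampleReadPath() {
  // One allocator hands out fixed-size blocks and charges them to a
  // MemTracker hierarchy ("Mandatory" / "Used" / "Allocated" children).
  GrowableBufferAllocator allocator(
      /* block_size = */ 64 * 1024, MemTracker::GetRootTracker());

  // Each consumer owns a GrowableBuffer bounded by `limit`.
  GrowableBuffer buffer(&allocator, /* limit = */ 1024 * 1024);

  // PrepareAppend() grows the buffer when capacity runs low and returns
  // writable iovecs; it fails with a Busy status once `limit` is exhausted.
  auto vecs = buffer.PrepareAppend();
  if (!vecs.ok()) {
    return;
  }

  // In real code the iovecs in *vecs would be filled by readv() or similar;
  // DataAppended() records how many bytes were actually written.
  size_t bytes_read = 0;
  buffer.DataAppended(bytes_read);

  // AppendedVecs() exposes the readable region; Consume() drops a processed
  // prefix (prepending is not supported by this buffer).
  IoVecs data = buffer.AppendedVecs();
  (void)data;
  buffer.Consume(bytes_read, Slice());
}

}  // namespace rpc
}  // namespace yb

The counts in the report are consistent with this shape: PrepareAppend() is the hot path (440k executions at line 223), while the Busy branch at line 247 was taken only once.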