YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/growable_buffer.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/growable_buffer.h"
17
18
#include <stdint.h>
19
20
#include <functional>
21
#include <thread>
22
23
#include <boost/lockfree/stack.hpp>
24
#include <glog/logging.h>
25
26
#include "yb/util/mem_tracker.h"
27
#include "yb/util/result.h"
28
#include "yb/util/status_format.h"
29
30
using namespace std::placeholders;
31
32
namespace yb {
33
namespace rpc {
34
35
// Implementation of GrowableBufferAllocator: hands out blocks of block_size_ bytes, keeps released
// blocks in pool_ for reuse and accounts for every block in one of three memory trackers:
//   - "Mandatory": blocks handed out by forced allocations (created with AddToParent::kFalse),
//   - "Used":      blocks handed out by regular (non-forced) allocations,
//   - "Allocated": blocks that are currently parked in pool_.
// The object registers itself as a garbage collector on the parent of the "Allocated" tracker
// (see CompleteInit), and CollectGarbage() frees pooled blocks on request.
class GrowableBufferAllocator::Impl : public GarbageCollector,
                                      public std::enable_shared_from_this<Impl> {
 public:
  // block_size  - size in bytes of every block handed out by this allocator.
  // mem_tracker - parent under which the "Mandatory", "Used" and "Allocated" trackers are
  //               found or created.
  Impl(size_t block_size,
       const MemTrackerPtr& mem_tracker)
      : block_size_(block_size),
        mandatory_tracker_(MemTracker::FindOrCreateTracker(
            "Mandatory", mem_tracker, AddToParent::kFalse)),
        used_tracker_(MemTracker::FindOrCreateTracker("Used", mem_tracker)),
        allocated_tracker_(MemTracker::FindOrCreateTracker("Allocated", mem_tracker)),
        pool_(0) {
  }

  // Second construction phase. It uses shared_from_this(), so it may only be called once this
  // object is already owned by a std::shared_ptr, i.e. not from the constructor.
  void CompleteInit() {
    allocated_tracker_->parent()->AddGarbageCollector(shared_from_this());
  }

  // Frees every block that is still parked in the pool.
  virtual ~Impl() {
    CollectGarbage(std::numeric_limits<size_t>::max());
  }

  // Returns a block of block_size_ bytes, or nullptr when no block could be provided.
  //
  // forced == true:  the block is charged to the "Mandatory" tracker without asking whether
  //                  memory is available; nullptr is returned only if malloc itself fails.
  // forced == false: the block is charged to the "Used" tracker; nullptr is returned when the
  //                  pool is empty and the "Allocated" tracker refuses the reservation, or when
  //                  malloc fails.
  //
  // A failed malloc must not leave anything charged: Free() ignores null buffers, so a charge
  // made for a block that was never handed out could never be released again.
  uint8_t* Allocate(bool forced) {
    uint8_t* result = nullptr;

    if (forced) {
      if (pool_.pop(result)) {
        // The block leaves the pool, so it is no longer accounted as "Allocated".
        allocated_tracker_->Release(block_size_);
      } else {
        result = static_cast<uint8_t*>(malloc(block_size_));
        if (!result) {
          return nullptr;
        }
      }
      mandatory_tracker_->Consume(block_size_);
      return result;
    }

    if (!pool_.pop(result)) {
      // Reserve the memory in the "Allocated" tracker first, so that a non-forced request is
      // rejected when the tracker does not allow consuming another block.
      if (!allocated_tracker_->TryConsume(block_size_)) {
        return nullptr;
      }
      result = static_cast<uint8_t*>(malloc(block_size_));
      if (!result) {
        // Undo the reservation made above.
        allocated_tracker_->Release(block_size_);
        return nullptr;
      }
    }
    // Move the accounting of the block from "Allocated" to "Used".
    used_tracker_->Consume(block_size_);
    allocated_tracker_->Release(block_size_);
    return result;
  }

  // Takes back a block previously returned by Allocate(). `was_forced` must match the `forced`
  // flag the block was allocated with, because it selects the tracker that gets released.
  // A null buffer is ignored. The block is parked in the pool when the "Allocated" tracker
  // accepts it and the push succeeds; otherwise it is freed.
  void Free(uint8_t* buffer, bool was_forced) {
    if (!buffer) {
      return;
    }

    auto* tracker = was_forced ? mandatory_tracker_.get() : used_tracker_.get();
    tracker->Release(block_size_);
    if (allocated_tracker_->TryConsume(block_size_)) {
      if (!pool_.push(buffer)) {
        // The pool did not take the block: undo the accounting and release the memory.
        allocated_tracker_->Release(block_size_);
        free(buffer);
      }
    } else {
      free(buffer);
    }
  }

  // Size in bytes of every block handed out by this allocator.
  size_t block_size() const {
    return block_size_;
  }

 private:
  // GarbageCollector callback: frees pooled blocks until at least `required` bytes were freed or
  // the pool is empty, then releases the freed amount from the "Allocated" tracker.
  void CollectGarbage(size_t required) override {
    uint8_t* buffer = nullptr;
    size_t total = 0;
    while (total < required && pool_.pop(buffer)) {
      free(buffer);
      total += block_size_;
    }
    allocated_tracker_->Release(total);
  }

  const size_t block_size_;
  // Buffers that were allocated with the forced flag; this tracker does not count in the parent.
  MemTrackerPtr mandatory_tracker_;
  // Buffers that are in use by clients of this class.
  MemTrackerPtr used_tracker_;
  // Buffers that are contained in the pool.
  MemTrackerPtr allocated_tracker_;
  // Released blocks that are kept for reuse.
  boost::lockfree::stack<uint8_t*> pool_;
};
121
122
// Creates the shared implementation object and finishes its two-phase initialization.
//
// block_size  - size in bytes of every block the allocator hands out.
// mem_tracker - tracker forwarded to the implementation for memory accounting.
GrowableBufferAllocator::GrowableBufferAllocator(
    size_t block_size, const MemTrackerPtr& mem_tracker)
    : impl_(std::make_shared<Impl>(block_size, mem_tracker)) {
  // CompleteInit() calls shared_from_this(), so it has to run after make_shared has returned.
  impl_->CompleteInit();
}
127
128
3
// Defined in this translation unit, where Impl is a complete type. Nothing is done besides
// destroying the members.
GrowableBufferAllocator::~GrowableBufferAllocator() = default;
130
131
531
size_t GrowableBufferAllocator::block_size() const {
132
531
  return impl_->block_size();
133
531
}
134
135
4.85k
uint8_t* GrowableBufferAllocator::Allocate(bool forced) {
136
4.85k
  return impl_->Allocate(forced);
137
4.85k
}
138
139
4.44k
void GrowableBufferAllocator::Free(uint8_t* buffer, bool was_forced) {
140
4.44k
  impl_->Free(buffer, was_forced);
141
4.44k
}
142
143
namespace {

// Initial capacity of the container of blocks owned by a GrowableBuffer. PrepareAppend()
// doubles the capacity whenever the container is full.
constexpr size_t kDefaultBuffersCapacity{8};

}  // namespace
148
149
// Creates a buffer that takes its blocks from `allocator`.
//
// allocator - source of blocks; it is stored by reference, so it has to outlive this buffer.
// limit     - PrepareAppend() stops adding blocks once the total size of the owned blocks has
//             reached this number of bytes.
//
// The first block is requested right away with the forced flag.
GrowableBuffer::GrowableBuffer(GrowableBufferAllocator* allocator, size_t limit)
    : allocator_(*allocator),
      block_size_(allocator->block_size()),
      limit_(limit),
      buffers_(kDefaultBuffersCapacity) {
  constexpr bool kForced = true;
  BufferPtr first_block(allocator_.Allocate(kForced), GrowableBufferDeleter(&allocator_, kForced));
  buffers_.push_back(std::move(first_block));
}
157
158
2
std::string GrowableBuffer::ToString() const {
159
2
  return YB_CLASS_TO_STRING(size, limit);
160
2
}
161
162
114k
void GrowableBuffer::Consume(size_t count, const Slice& prepend) {
163
114k
  if (count > size_) {
164
0
    LOG(DFATAL) << "Consume more bytes than contained: " << size_ << " vs " << count;
165
0
  }
166
114k
  if (!prepend.empty()) {
167
0
    LOG(DFATAL) << "GrowableBuffer does not support prepending";
168
0
  }
169
114k
  if (count) {
170
114k
    pos_ += count;
171
124k
    while (pos_ >= block_size_) {
172
9.75k
      pos_ -= block_size_;
173
9.75k
      auto buffer = std::move(buffers_.front());
174
9.75k
      buffers_.pop_front();
175
      // Reuse buffer if only one left.
176
9.75k
      if (buffers_.size() < 2) {
177
5.46k
        buffers_.push_back(std::move(buffer));
178
5.46k
      }
179
9.75k
    }
180
114k
    size_ -= count;
181
114k
    if (size_ == 0) { // Buffer was fully read, so we could reset start position also.
182
104k
      pos_ = 0;
183
104k
    }
184
114k
  }
185
114k
}
186
187
0
// Exchanges the contents (blocks, consumption, size and read position) of this buffer with
// `rhs`. Both buffers are expected to share the same limit, block size and allocator; this is
// verified with DCHECKs only.
void GrowableBuffer::Swap(GrowableBuffer* rhs) {
  GrowableBuffer& other = *rhs;

  DCHECK_EQ(limit_, other.limit_);
  DCHECK_EQ(block_size_, other.block_size_);
  DCHECK_EQ(&allocator_, &other.allocator_);

  buffers_.swap(other.buffers_);
  consumption_.Swap(&other.consumption_);
  std::swap(size_, other.size_);
  std::swap(pos_, other.pos_);
}
197
198
338k
// Builds the list of I/O vectors that cover the byte range [begin, end).
//
// Both offsets are counted from the start of the first block in buffers_, and begin <= end is
// expected (DCHECK). The result always contains at least one entry; for an empty range that
// entry has zero length.
IoVecs GrowableBuffer::IoVecsForRange(size_t begin, size_t end) {
  DCHECK_LE(begin, end);

  // Skip the blocks that lie completely in front of the range. Afterwards both offsets are
  // relative to the block `block` points at.
  auto block = buffers_.begin();
  for (; begin >= block_size_; ++block) {
    begin -= block_size_;
    end -= block_size_;
  }

  IoVecs vecs;
  // While the range reaches past the current block, take that block from `begin` up to its end
  // and continue at the start of the next one.
  for (; end > block_size_; ++block) {
    vecs.push_back({ block->get() + begin, block_size_ - begin });
    begin = 0;
    end -= block_size_;
  }

  // The remainder of the range fits into the current block.
  vecs.push_back({ block->get() + begin, end - begin });
  return vecs;
}
217
218
// Returns currently read data.
219
114k
IoVecs GrowableBuffer::AppendedVecs() {
220
114k
  return IoVecsForRange(pos_, pos_ + size_);
221
114k
}
222
223
223k
// Returns I/O vectors covering the free space behind the buffered data, so that new data can be
// written there. When little free space is left, one more block is requested from the allocator
// first, as long as the total size of the owned blocks is below limit_.
//
// Errors:
//   IllegalState - the buffer owns no blocks, which is the state Reset() leaves it in.
//   Busy         - no block could be added and there is no free space left at all.
Result<IoVecs> GrowableBuffer::PrepareAppend() {
  if (!valid()) {
    return STATUS(IllegalState, "Read buffer was reset");
  }

  DCHECK_LT(pos_, block_size_);

  // Check if we have too small capacity left.
  const bool low_on_space =
      pos_ + size_ * 2 >= block_size_ && capacity_left() * 2 < block_size_;
  if (low_on_space) {
    // Make room in the container before a block is possibly pushed into it.
    if (buffers_.size() == buffers_.capacity()) {
      buffers_.set_capacity(buffers_.capacity() * 2);
    }

    BufferPtr extra_block;
    const bool below_limit = buffers_.size() * block_size_ < limit_;
    if (below_limit) {
      // We need at least 2 buffers for normal functioning.
      // Because with one buffer we could reach situation when our command limit is just several
      // bytes.
      const bool forced = buffers_.size() < 2;
      extra_block = BufferPtr(
          allocator_.Allocate(forced), GrowableBufferDeleter(&allocator_, forced));
    }

    if (extra_block) {
      buffers_.push_back(std::move(extra_block));
    } else if (capacity_left() == 0) {
      // Neither a new block nor any remaining free space: nothing can be appended.
      return STATUS_FORMAT(
          Busy, "Prepare read when buffer already full, size: $0, limit: $1", size_, limit_);
    }
  }

  return IoVecsForRange(pos_ + size_, buffers_.size() * block_size_);
}
254
255
118k
void GrowableBuffer::DataAppended(size_t len) {
256
118k
  if (len > capacity_left()) {
257
0
    LOG(DFATAL) << "Data appended over capacity: " << len << " > " << capacity_left();
258
0
  }
259
118k
  size_ += len;
260
118k
}
261
262
250
void GrowableBuffer::Reset() {
263
250
  Clear();
264
250
  buffers_.clear();
265
250
  buffers_.set_capacity(0);
266
250
}
267
268
223k
bool GrowableBuffer::valid() const {
269
223k
  return !buffers_.empty();
270
223k
}
271
272
0
std::ostream& operator<<(std::ostream& out, const GrowableBuffer& receiver) {
273
0
  return out << receiver.ToString();
274
0
}
275
276
} // namespace rpc
277
} // namespace yb