/Users/deen/code/yugabyte-db/src/yb/rpc/growable_buffer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rpc/growable_buffer.h" |
17 | | |
18 | | #include <stdint.h> |
19 | | |
20 | | #include <functional> |
21 | | #include <thread> |
22 | | |
23 | | #include <boost/lockfree/stack.hpp> |
24 | | #include <glog/logging.h> |
25 | | |
26 | | #include "yb/util/mem_tracker.h" |
27 | | #include "yb/util/result.h" |
28 | | #include "yb/util/status_format.h" |
29 | | |
30 | | using namespace std::placeholders; |
31 | | |
32 | | namespace yb { |
33 | | namespace rpc { |
34 | | |
35 | | class GrowableBufferAllocator::Impl : public GarbageCollector, |
36 | | public std::enable_shared_from_this<Impl> { |
37 | | public: |
38 | | Impl(size_t block_size, |
39 | | const MemTrackerPtr& mem_tracker) |
40 | | : block_size_(block_size), |
41 | | mandatory_tracker_(MemTracker::FindOrCreateTracker( |
42 | | "Mandatory", mem_tracker, AddToParent::kFalse)), |
43 | | used_tracker_(MemTracker::FindOrCreateTracker("Used", mem_tracker)), |
44 | | allocated_tracker_(MemTracker::FindOrCreateTracker("Allocated", mem_tracker)), |
45 | 1.40k | pool_(0) { |
46 | 1.40k | } |
47 | | |
48 | 1.40k | void CompleteInit() { |
49 | 1.40k | allocated_tracker_->parent()->AddGarbageCollector(shared_from_this()); |
50 | 1.40k | } |
51 | | |
52 | 3 | virtual ~Impl() { |
53 | 3 | CollectGarbage(std::numeric_limits<size_t>::max()); |
54 | 3 | } |
55 | | |
56 | 4.85k | uint8_t* Allocate(bool forced) { |
57 | 4.85k | uint8_t* result = nullptr; |
58 | | |
59 | 4.85k | if (forced) { |
60 | 534 | if (pool_.pop(result)) { |
61 | 101 | allocated_tracker_->Release(block_size_); |
62 | 433 | } else { |
63 | 433 | result = static_cast<uint8_t*>(malloc(block_size_)); |
64 | 433 | } |
65 | 534 | mandatory_tracker_->Consume(block_size_); |
66 | 534 | return result; |
67 | 534 | } |
68 | | |
69 | 4.31k | if (!pool_.pop(result)) { |
70 | 32 | if (!allocated_tracker_->TryConsume(block_size_)) { |
71 | 0 | return nullptr; |
72 | 0 | } |
73 | 32 | result = static_cast<uint8_t*>(malloc(block_size_)); |
74 | 32 | } |
75 | 4.31k | used_tracker_->Consume(block_size_); |
76 | 4.31k | allocated_tracker_->Release(block_size_); |
77 | 4.31k | return result; |
78 | 4.31k | } |
79 | | |
80 | 4.44k | void Free(uint8_t* buffer, bool was_forced) { |
81 | 4.44k | if (!buffer) { |
82 | 0 | return; |
83 | 0 | } |
84 | | |
85 | 4.44k | auto* tracker = was_forced ? mandatory_tracker_.get() : used_tracker_.get(); |
86 | 4.44k | tracker->Release(block_size_); |
87 | 4.44k | if (allocated_tracker_->TryConsume(block_size_)) { |
88 | 4.44k | if (!pool_.push(buffer)) { |
89 | 0 | allocated_tracker_->Release(block_size_); |
90 | 0 | free(buffer); |
91 | 0 | } |
92 | 0 | } else { |
93 | 0 | free(buffer); |
94 | 0 | } |
95 | 4.44k | } |
96 | | |
97 | 531 | size_t block_size() const { |
98 | 531 | return block_size_; |
99 | 531 | } |
100 | | |
101 | | private: |
102 | 3 | void CollectGarbage(size_t required) override { |
103 | 3 | uint8_t* buffer = nullptr; |
104 | 3 | size_t total = 0; |
105 | 41 | while (total < required && pool_.pop(buffer)) { |
106 | 38 | free(buffer); |
107 | 38 | total += block_size_; |
108 | 38 | } |
109 | 3 | allocated_tracker_->Release(total); |
110 | 3 | } |
111 | | |
112 | | const size_t block_size_; |
113 | | // Buffers that allocated with force flag, does not could in parent. |
114 | | MemTrackerPtr mandatory_tracker_; |
115 | | // Buffers that are in use by client of this class. |
116 | | MemTrackerPtr used_tracker_; |
117 | | // Buffers that is contained in pool. |
118 | | MemTrackerPtr allocated_tracker_; |
119 | | boost::lockfree::stack<uint8_t*> pool_; |
120 | | }; |
121 | | |
// Builds the shared Impl and completes its two-phase initialization.
// CompleteInit() must run after construction because it calls
// shared_from_this(), which needs impl_ to already own the object.
GrowableBufferAllocator::GrowableBufferAllocator(
    size_t block_size, const MemTrackerPtr& mem_tracker)
    : impl_(std::make_shared<Impl>(block_size, mem_tracker)) {
  impl_->CompleteInit();
}
127 | | |
128 | 3 | GrowableBufferAllocator::~GrowableBufferAllocator() { |
129 | 3 | } |
130 | | |
// Size in bytes of every block handed out by this allocator.
size_t GrowableBufferAllocator::block_size() const {
  return impl_->block_size();
}
134 | | |
// Returns a block of block_size() bytes, or nullptr when forced is false and
// the memory budget is exhausted. Delegates to Impl::Allocate.
uint8_t* GrowableBufferAllocator::Allocate(bool forced) {
  return impl_->Allocate(forced);
}
138 | | |
// Returns a block to the allocator; was_forced must match the flag passed to
// Allocate() for this block. Delegates to Impl::Free.
void GrowableBufferAllocator::Free(uint8_t* buffer, bool was_forced) {
  impl_->Free(buffer, was_forced);
}
142 | | |
namespace {

// Initial capacity (in blocks) of GrowableBuffer's circular buffer list.
constexpr size_t kDefaultBuffersCapacity = 8;

}  // namespace
148 | | |
// Creates a buffer that grows in fixed-size blocks taken from `allocator`,
// holding at most `limit` bytes of data. The first block is a forced
// allocation, charged to the allocator's mandatory tracker.
GrowableBuffer::GrowableBuffer(GrowableBufferAllocator* allocator, size_t limit)
    : allocator_(*allocator),
      block_size_(allocator->block_size()),
      limit_(limit),
      buffers_(kDefaultBuffersCapacity) {
  buffers_.push_back(
      BufferPtr(allocator_.Allocate(true), GrowableBufferDeleter(&allocator_, true)));
}
157 | | |
// Human-readable summary containing the current size and the limit.
std::string GrowableBuffer::ToString() const {
  return YB_CLASS_TO_STRING(size, limit);
}
161 | | |
162 | 114k | void GrowableBuffer::Consume(size_t count, const Slice& prepend) { |
163 | 114k | if (count > size_) { |
164 | 0 | LOG(DFATAL) << "Consume more bytes than contained: " << size_ << " vs " << count; |
165 | 0 | } |
166 | 114k | if (!prepend.empty()) { |
167 | 0 | LOG(DFATAL) << "GrowableBuffer does not support prepending"; |
168 | 0 | } |
169 | 114k | if (count) { |
170 | 114k | pos_ += count; |
171 | 124k | while (pos_ >= block_size_) { |
172 | 9.75k | pos_ -= block_size_; |
173 | 9.75k | auto buffer = std::move(buffers_.front()); |
174 | 9.75k | buffers_.pop_front(); |
175 | | // Reuse buffer if only one left. |
176 | 9.75k | if (buffers_.size() < 2) { |
177 | 5.46k | buffers_.push_back(std::move(buffer)); |
178 | 5.46k | } |
179 | 9.75k | } |
180 | 114k | size_ -= count; |
181 | 114k | if (size_ == 0) { // Buffer was fully read, so we could reset start position also. |
182 | 104k | pos_ = 0; |
183 | 104k | } |
184 | 114k | } |
185 | 114k | } |
186 | | |
// Exchanges contents with `rhs`. Only valid between buffers created with the
// same allocator, block size and limit — debug builds enforce this.
void GrowableBuffer::Swap(GrowableBuffer* rhs) {
  DCHECK_EQ(limit_, rhs->limit_);
  DCHECK_EQ(block_size_, rhs->block_size_);
  DCHECK_EQ(&allocator_, &rhs->allocator_);

  buffers_.swap(rhs->buffers_);
  consumption_.Swap(&rhs->consumption_);
  std::swap(size_, rhs->size_);
  std::swap(pos_, rhs->pos_);
}
197 | | |
198 | 338k | IoVecs GrowableBuffer::IoVecsForRange(size_t begin, size_t end) { |
199 | 338k | DCHECK_LE(begin, end); |
200 | | |
201 | 338k | auto it = buffers_.begin(); |
202 | 384k | while (begin >= block_size_) { |
203 | 45.8k | ++it; |
204 | 45.8k | begin -= block_size_; |
205 | 45.8k | end -= block_size_; |
206 | 45.8k | } |
207 | 338k | IoVecs result; |
208 | 346k | while (end > block_size_) { |
209 | 7.94k | result.push_back({ it->get() + begin, block_size_ - begin }); |
210 | 7.94k | begin = 0; |
211 | 7.94k | end -= block_size_; |
212 | 7.94k | ++it; |
213 | 7.94k | } |
214 | 338k | result.push_back({ it->get() + begin, end - begin }); |
215 | 338k | return result; |
216 | 338k | } |
217 | | |
// Returns currently read data, i.e. iovecs covering the `size_` bytes that
// start at offset `pos_`.
IoVecs GrowableBuffer::AppendedVecs() {
  return IoVecsForRange(pos_, pos_ + size_);
}
222 | | |
223 | 223k | Result<IoVecs> GrowableBuffer::PrepareAppend() { |
224 | 223k | if (!valid()) { |
225 | 0 | return STATUS(IllegalState, "Read buffer was reset"); |
226 | 0 | } |
227 | | |
228 | 223k | DCHECK_LT(pos_, block_size_); |
229 | | |
230 | | // Check if we have too small capacity left. |
231 | 223k | if (pos_ + size_ * 2 >= block_size_ && capacity_left() * 2 < block_size_) { |
232 | 4.45k | if (buffers_.size() == buffers_.capacity()) { |
233 | 4 | buffers_.set_capacity(buffers_.capacity() * 2); |
234 | 4 | } |
235 | 4.45k | BufferPtr new_buffer; |
236 | 4.45k | if (buffers_.size() * block_size_ < limit_) { |
237 | | // We need at least 2 buffers for normal functioning. |
238 | | // Because with one buffer we could reach situation when our command limit is just several |
239 | | // bytes. |
240 | 4.31k | bool forced = buffers_.size() < 2; |
241 | 4.31k | new_buffer = BufferPtr( |
242 | 4.31k | allocator_.Allocate(forced), GrowableBufferDeleter(&allocator_, forced)); |
243 | 4.31k | } |
244 | 4.45k | if (new_buffer) { |
245 | 4.31k | buffers_.push_back(std::move(new_buffer)); |
246 | 131 | } else if (capacity_left() == 0) { |
247 | 1 | return STATUS_FORMAT( |
248 | 1 | Busy, "Prepare read when buffer already full, size: $0, limit: $1", size_, limit_); |
249 | 1 | } |
250 | 223k | } |
251 | | |
252 | 223k | return IoVecsForRange(pos_ + size_, buffers_.size() * block_size_); |
253 | 223k | } |
254 | | |
255 | 118k | void GrowableBuffer::DataAppended(size_t len) { |
256 | 118k | if (len > capacity_left()) { |
257 | 0 | LOG(DFATAL) << "Data appended over capacity: " << len << " > " << capacity_left(); |
258 | 0 | } |
259 | 118k | size_ += len; |
260 | 118k | } |
261 | | |
// Drops all data and returns every block to the allocator; afterwards the
// buffer is invalid (valid() == false) and PrepareAppend() will fail.
void GrowableBuffer::Reset() {
  Clear();
  buffers_.clear();
  buffers_.set_capacity(0);
}
267 | | |
// A buffer is valid while it owns at least one block; Reset() invalidates it.
bool GrowableBuffer::valid() const {
  return !buffers_.empty();
}
271 | | |
272 | 0 | std::ostream& operator<<(std::ostream& out, const GrowableBuffer& receiver) { |
273 | 0 | return out << receiver.ToString(); |
274 | 0 | } |
275 | | |
276 | | } // namespace rpc |
277 | | } // namespace yb |