/Users/deen/code/yugabyte-db/src/yb/rpc/circular_read_buffer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/circular_read_buffer.h" |
15 | | |
16 | | #include "yb/util/result.h" |
17 | | #include "yb/util/tostring.h" |
18 | | |
19 | | namespace yb { |
20 | | namespace rpc { |
21 | | |
22 | | CircularReadBuffer::CircularReadBuffer(size_t capacity, const MemTrackerPtr& parent_tracker) |
23 | | : consumption_(MemTracker::FindOrCreateTracker("Receive", parent_tracker, AddToParent::kFalse), |
24 | | capacity), |
25 | 1.82M | buffer_(static_cast<char*>(malloc(capacity))), capacity_(capacity) { |
26 | 1.82M | } |
27 | | |
28 | 101M | bool CircularReadBuffer::Empty() { |
29 | 101M | return size_ == 0; |
30 | 101M | } |
31 | | |
32 | 1.21M | void CircularReadBuffer::Reset() { |
33 | 1.21M | buffer_.reset(); |
34 | 1.21M | } |
35 | | |
36 | 85.8M | Result<IoVecs> CircularReadBuffer::PrepareAppend() { |
37 | 85.8M | if (!buffer_) { |
38 | 0 | return STATUS(IllegalState, "Read buffer was reset"); |
39 | 0 | } |
40 | | |
41 | 85.8M | IoVecs result; |
42 | | |
43 | 85.8M | if (!prepend_.empty()) { |
44 | 99.9k | result.push_back(iovec{prepend_.mutable_data(), prepend_.size()}); |
45 | 99.9k | } |
46 | | |
47 | 85.8M | size_t end = pos_ + size_; |
48 | 85.8M | if (end < capacity_) { |
49 | 85.7M | result.push_back(iovec{buffer_.get() + end, capacity_ - end}); |
50 | 85.7M | } |
51 | 85.7M | size_t start = end <= capacity_ ? 0 : end - capacity_; |
52 | 85.8M | if (pos_ > start) { |
53 | 977 | result.push_back(iovec{buffer_.get() + start, pos_ - start}); |
54 | 977 | } |
55 | | |
56 | 85.8M | if (result.empty()) { |
57 | 0 | static Status busy_status = STATUS(Busy, "Circular read buffer is full"); |
58 | 0 | return busy_status; |
59 | 0 | } |
60 | | |
61 | 85.8M | return result; |
62 | 85.8M | } |
63 | | |
64 | 122 | std::string CircularReadBuffer::ToString() const { |
65 | 122 | return YB_CLASS_TO_STRING(capacity, pos, size); |
66 | 122 | } |
67 | | |
68 | 43.0M | void CircularReadBuffer::DataAppended(size_t len) { |
69 | 43.0M | if (!prepend_.empty()) { |
70 | 99.2k | size_t prepend_len = std::min(len, prepend_.size()); |
71 | 99.2k | prepend_.remove_prefix(prepend_len); |
72 | 99.2k | len -= prepend_len; |
73 | 99.2k | } |
74 | 43.0M | size_ += len; |
75 | 43.0M | } |
76 | | |
77 | 43.3M | IoVecs CircularReadBuffer::AppendedVecs() { |
78 | 43.3M | IoVecs result; |
79 | | |
80 | 43.3M | size_t end = pos_ + size_; |
81 | 43.3M | if (end <= capacity_) { |
82 | 43.3M | result.push_back(iovec{buffer_.get() + pos_, size_}); |
83 | 9.98k | } else { |
84 | 9.98k | result.push_back(iovec{buffer_.get() + pos_, capacity_ - pos_}); |
85 | 9.98k | result.push_back(iovec{buffer_.get(), end - capacity_}); |
86 | 9.98k | } |
87 | | |
88 | 43.3M | return result; |
89 | 43.3M | } |
90 | | |
91 | 42.4M | bool CircularReadBuffer::Full() { |
92 | 42.4M | return size_ == capacity_; |
93 | 42.4M | } |
94 | | |
95 | 67.0k | size_t CircularReadBuffer::DataAvailable() { |
96 | 67.0k | return size_; |
97 | 67.0k | } |
98 | | |
99 | 43.0M | void CircularReadBuffer::Consume(size_t count, const Slice& prepend) { |
100 | 43.0M | pos_ += count; |
101 | 43.0M | if (pos_ >= capacity_) { |
102 | 8.57k | pos_ -= capacity_; |
103 | 8.57k | } |
104 | 43.0M | size_ -= count; |
105 | 43.0M | if (size_ == 0) { |
106 | 42.9M | pos_ = 0; |
107 | 42.9M | } |
108 | 43.0M | DCHECK(prepend_.empty()); |
109 | 43.0M | prepend_ = prepend; |
110 | 43.0M | had_prepend_ = !prepend.empty(); |
111 | 43.0M | } |
112 | | |
113 | 42.4M | bool CircularReadBuffer::ReadyToRead() { |
114 | 42.4M | return prepend_.empty() && (had_prepend_ || !Empty()); |
115 | 42.4M | } |
116 | | |
117 | | } // namespace rpc |
118 | | } // namespace yb |