YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/circular_read_buffer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/circular_read_buffer.h"
15
16
#include "yb/util/result.h"
17
#include "yb/util/tostring.h"
18
19
namespace yb {
20
namespace rpc {
21
22
// Creates a read buffer backed by `capacity` bytes obtained from malloc().
//
// consumption_ is initialized with the "Receive" tracker found or created under `parent_tracker`
// (AddToParent::kFalse) together with `capacity`.
//
// NOTE(review): the malloc() result is not checked here. PrepareAppend() returns an error while
// buffer_ is null, so a failed allocation is reported there as "Read buffer was reset".
CircularReadBuffer::CircularReadBuffer(size_t capacity, const MemTrackerPtr& parent_tracker)
    : consumption_(
          MemTracker::FindOrCreateTracker("Receive", parent_tracker, AddToParent::kFalse),
          capacity),
      buffer_(static_cast<char*>(malloc(capacity))),
      capacity_(capacity) {}
27
28
1.08G
bool CircularReadBuffer::Empty() {
29
1.08G
  return size_ == 0;
30
1.08G
}
31
32
6.33M
void CircularReadBuffer::Reset() {
33
6.33M
  buffer_.reset();
34
6.33M
}
35
36
322M
// Builds the list of writable regions that incoming data may be placed into.
//
// The regions are returned in the order they should be filled:
//   1. the not yet filled part of prepend_, if any;
//   2. the free space between the end of the stored data and the end of the storage;
//   3. the free space at the start of the storage, in front of pos_.
//
// Returns IllegalState when the buffer has been reset, and Busy when there is no region to
// write into.
Result<IoVecs> CircularReadBuffer::PrepareAppend() {
  if (!buffer_) {
    return STATUS(IllegalState, "Read buffer was reset");
  }

  IoVecs vecs;

  if (!prepend_.empty()) {
    vecs.push_back(iovec{prepend_.mutable_data(), prepend_.size()});
  }

  // Logical end of the stored data; a value above capacity_ means the data wraps around.
  const size_t data_end = pos_ + size_;

  // Offset at which the free space at the front of the storage begins.
  size_t front_free_begin = 0;
  if (data_end < capacity_) {
    vecs.push_back(iovec{buffer_.get() + data_end, capacity_ - data_end});
  } else if (data_end > capacity_) {
    front_free_begin = data_end - capacity_;
  }

  if (pos_ > front_free_begin) {
    vecs.push_back(iovec{buffer_.get() + front_free_begin, pos_ - front_free_begin});
  }

  if (vecs.empty()) {
    static Status busy_status = STATUS(Busy, "Circular read buffer is full");
    return busy_status;
  }

  return vecs;
}
63
64
4.74k
std::string CircularReadBuffer::ToString() const {
65
4.74k
  return YB_CLASS_TO_STRING(capacity, pos, size);
66
4.74k
}
67
68
160M
void CircularReadBuffer::DataAppended(size_t len) {
69
160M
  if (!prepend_.empty()) {
70
251k
    size_t prepend_len = std::min(len, prepend_.size());
71
251k
    prepend_.remove_prefix(prepend_len);
72
251k
    len -= prepend_len;
73
251k
  }
74
160M
  size_ += len;
75
160M
}
76
77
160M
// Returns the stored data as one region, or as two regions when the data wraps around the end of
// the storage (tail part first, then the part at the start of the storage).
IoVecs CircularReadBuffer::AppendedVecs() {
  IoVecs vecs;

  auto* const base = buffer_.get();
  const size_t data_end = pos_ + size_;

  if (data_end > capacity_) {
    // Wrapped: [pos_, capacity_) followed by [0, data_end - capacity_).
    vecs.push_back(iovec{base + pos_, capacity_ - pos_});
    vecs.push_back(iovec{base, data_end - capacity_});
  } else {
    vecs.push_back(iovec{base + pos_, size_});
  }

  return vecs;
}
90
91
159M
bool CircularReadBuffer::Full() {
92
159M
  return size_ == capacity_;
93
159M
}
94
95
176k
size_t CircularReadBuffer::DataAvailable() {
96
176k
  return size_;
97
176k
}
98
99
160M
void CircularReadBuffer::Consume(size_t count, const Slice& prepend) {
100
160M
  pos_ += count;
101
160M
  if (pos_ >= capacity_) {
102
19.1k
    pos_ -= capacity_;
103
19.1k
  }
104
160M
  size_ -= count;
105
160M
  if (size_ == 0) {
106
160M
    pos_ = 0;
107
160M
  }
108
160M
  DCHECK(prepend_.empty());
109
160M
  prepend_ = prepend;
110
160M
  had_prepend_ = !prepend.empty();
111
160M
}
112
113
159M
bool CircularReadBuffer::ReadyToRead() {
114
159M
  return prepend_.empty() && 
(159M
had_prepend_159M
||
!Empty()158M
);
115
159M
}
116
117
} // namespace rpc
118
} // namespace yb