YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/binary_call_parser.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/binary_call_parser.h"
15
16
#include "yb/gutil/endian.h"
17
18
#include "yb/rpc/connection.h"
19
#include "yb/rpc/connection_context.h"
20
#include "yb/rpc/stream.h"
21
22
#include "yb/util/logging.h"
23
#include "yb/util/result.h"
24
#include "yb/util/size_literals.h"
25
#include "yb/util/status_format.h"
26
27
using yb::operator"" _MB;
28
29
// Consulted by BinaryCallParser::Parse when the read buffer for a partially received call cannot
// be reserved from the memory tracker: if true the call is marked as rejected and its bytes are
// skipped, if false only a warning is logged.
DEFINE_bool(binary_call_parser_reject_on_mem_tracker_hard_limit, true,
            "Whether to reject/ignore calls on hitting mem tracker hard limit.");

// Size threshold, in bytes, used by ShouldThrottleRpc. The default is 1048576 (1 MiB); a negative
// value turns throttling off.
DEFINE_int64(rpc_throttle_threshold_bytes, 1048576,
             "Throttle inbound RPC calls larger than specified size on hitting mem tracker soft "
             "limit. Throttling is disabled if negative value is specified.");

// Defined in another translation unit; not referenced by the code visible in this file.
DECLARE_int32(memory_limit_warn_threshold_percentage);
39
40
namespace yb {
41
namespace rpc {
42
43
bool ShouldThrottleRpc(
44
141M
    const MemTrackerPtr& throttle_tracker, ssize_t call_data_size, const char* throttle_message) {
45
141M
  return (FLAGS_rpc_throttle_threshold_bytes >= 0 &&
46
141M
      
call_data_size > FLAGS_rpc_throttle_threshold_bytes140M
&&
47
141M
      
!CheckMemoryPressureWithLogging(throttle_tracker, 0 /* score */, throttle_message)6.62k
);
48
141M
}
49
50
// Builds a parser for length-prefixed binary calls.
//
//   parent_tracker      - parent of the "Reading" memory tracker used to account read buffers.
//   header_size         - size in bytes of the per-call header; the header scratch buffer is
//                         created with exactly this size.
//   size_offset         - offset inside the header at which the 32-bit length field is read.
//   max_message_length  - upper bound for header plus body; longer frames make Parse fail.
//   include_header      - whether the header bytes are part of the data handed to the listener.
//   skip_empty_messages - whether calls with a zero length field are dropped without being
//                         passed to the listener.
//   listener            - receives each complete call through HandleCall; stored as a raw
//                         pointer, so it must remain valid while this parser is in use.
BinaryCallParser::BinaryCallParser(
    const MemTrackerPtr& parent_tracker,
    size_t header_size,
    size_t size_offset,
    size_t max_message_length,
    IncludeHeader include_header,
    SkipEmptyMessages skip_empty_messages,
    BinaryCallParserListener* listener)
    : call_header_buffer_(header_size), size_offset_(size_offset),
      max_message_length_(max_message_length), include_header_(include_header),
      skip_empty_messages_(skip_empty_messages), listener_(listener) {
  // Assigned in the body rather than in the initializer list, as in the original code.
  buffer_tracker_ = MemTracker::FindOrCreateTracker("Reading", parent_tracker);
}
62
63
// Splits the bytes in `data` into length-prefixed calls and passes every complete call to
// listener_->HandleCall.
//
// Each call starts with a header of call_header_buffer_.size() bytes. A 32-bit length, read with
// NetworkByteOrder::Load32 at size_offset_ inside the header, gives the number of bytes that
// follow the header.
//
//   connection           - forwarded to the listener; its last-activity time is updated for every
//                          call that is dispatched from the loop below.
//   data                 - the bytes received so far.
//   read_buffer_full     - when set and no call has been consumed in this invocation, a failure
//                          to reserve memory for a partial call is logged and, depending on
//                          FLAGS_binary_call_parser_reject_on_mem_tracker_hard_limit, the call is
//                          rejected.
//   tracker_for_throttle - optional (may be nullptr); when given, a partially received call may be
//                          rejected according to ShouldThrottleRpc.
//
// Returns a ProcessCallsResult:
//   - all complete calls handled, nothing partial pending: `consumed` is the number of bytes
//     processed and `buffer` is empty;
//   - the input ends in the middle of a call and memory for it was reserved: `consumed` is the
//     whole input and `buffer` is the unfilled tail of the call's buffer, into which the caller
//     places the remaining bytes;
//   - the partial call is rejected: `consumed` is the whole input, `buffer` is empty and
//     `bytes_to_skip` is the part of the call that has not been received yet.
// Returns a NetworkError status if a frame is longer than max_message_length_, or whatever
// non-OK status the listener returns.
Result<ProcessCallsResult> BinaryCallParser::Parse(
    const rpc::ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full,
    const MemTrackerPtr* tracker_for_throttle) {
  // First settle the call left over from the previous invocation, if any.
  const bool pending_call_rejected = call_data_.should_reject();
  if (pending_call_rejected || !call_data_.empty()) {
    if (!pending_call_rejected) {
      RETURN_NOT_OK(listener_->HandleCall(connection, &call_data_));
    }
    // For a rejected call there is nothing to dispatch: its data was ignored, so we cannot
    // respond with a proper error and the client will simply time out. Partial parsing of the
    // higher level call header could be implemented if the client ever needs to detect that the
    // server hit its memory limit.
    call_data_.Reset();
    call_data_consumption_ = ScopedTrackedConsumption();
  }

  const auto input_size = IoVecsFullSize(data);
  VLOG(4) << "BinaryCallParser::Parse, full_input_size: " << input_size;

  const size_t header_len = call_header_buffer_.size();
  // Offset, relative to the start of a frame, of the first byte handed to the listener.
  const size_t body_start = include_header_ ? 0 : header_len;
  VLOG(4) << "BinaryCallParser::Parse, header_size: " << header_len
          << " body_offset: " << body_start;

  size_t parsed = 0;
  while (parsed + header_len <= input_size) {
    IoVecsToBuffer(data, parsed, parsed + header_len, &call_header_buffer_);

    const size_t payload_len = NetworkByteOrder::Load32(call_header_buffer_.data() + size_offset_);
    const size_t frame_len = payload_len + header_len;
    VLOG(4) << "BinaryCallParser::Parse, consumed: " << parsed << " data_length: " << payload_len
            << " total_length: " << frame_len;
    if (frame_len > max_message_length_) {
      return STATUS_FORMAT(
          NetworkError,
          "The frame had a length of $0, but we only support messages up to $1 bytes long.",
          frame_len, max_message_length_);
    }

    const size_t call_size = frame_len - body_start;
    VLOG(4) << "BinaryCallParser::Parse, call_data_size: " << call_size;

    if (parsed + frame_len > input_size) {
      // Only the beginning of this call is present in `data`. Reserve a buffer for the whole
      // call, copy in what we already have and let the caller append the remainder to the same
      // buffer.
      const size_t received = input_size - (parsed + body_start);
      VLOG(4) << "BinaryCallParser::Parse, call_received_size: " << received;

      // Marks the pending call as rejected and reports how many of its bytes are still to come.
      const auto reject_call = [&]() {
        call_data_ = CallData(call_size, CallData::ShouldRejectTag());
        return ProcessCallsResult{
            .consumed = input_size,
            .buffer = Slice(),
            .bytes_to_skip = call_size - received,
        };
      };

      if (tracker_for_throttle) {
        VLOG(4) << "BinaryCallParser::Parse, tracker_for_throttle memory usage: "
                << (*tracker_for_throttle)->LogUsage("");
        if (ShouldThrottleRpc(*tracker_for_throttle, call_size, "Ignoring RPC call: ")) {
          return reject_call();
        }
      }

      MemTracker* blocking_mem_tracker = nullptr;
      if (buffer_tracker_->TryConsume(call_size, &blocking_mem_tracker)) {
        // The memory is already accounted for by TryConsume; hand it to the scoped holder.
        call_data_consumption_ =
            ScopedTrackedConsumption(buffer_tracker_, call_size, AlreadyConsumed::kTrue);
        call_data_ = CallData(call_size);
        IoVecsToBuffer(data, parsed + body_start, input_size, call_data_.data());
        Slice buffer(call_data_.data() + received, call_size - received);
        VLOG(4) << "BinaryCallParser::Parse, consumed: " << parsed
                << " returning: { full_input_size: " << input_size
                << " buffer.size(): " << buffer.size() << " }";
        return ProcessCallsResult{
            .consumed = input_size,
            .buffer = buffer,
        };
      }

      if (read_buffer_full && parsed == 0) {
        auto consumption = blocking_mem_tracker ? blocking_mem_tracker->consumption() : -1;
        auto limit = blocking_mem_tracker ? blocking_mem_tracker->limit() : -1;
        if (!FLAGS_binary_call_parser_reject_on_mem_tracker_hard_limit) {
          // For backward compatibility in behavior until we fix
          // https://github.com/yugabyte/yugabyte-db/issues/2563.
          LOG(WARNING) << "Unable to allocate read buffer because of limit, required: "
                       << call_size << ", blocked by: " << AsString(blocking_mem_tracker)
                       << ", consumption: " << consumption << " of " << limit << "\n"
                       << DumpMemoryUsage();
          break;
        }
        YB_LOG_EVERY_N_SECS(WARNING, 3)
            << "Unable to allocate read buffer because of limit, required: " << call_size
            << ", blocked by: " << AsString(blocking_mem_tracker)
            << ", consumption: " << consumption << " of " << limit << ". Call will be ignored.\n"
            << DumpMemoryUsage();
        return reject_call();
      }

      // Memory could not be reserved, but earlier calls were consumed or the read buffer still
      // has room: report what was consumed so far and leave the partial call in the input.
      break;
    }

    // Empty messages serve as low level heartbeats on inter-YB RPC connections (not to be
    // confused with RAFT heartbeats, which are higher level non-empty messages) and may have to
    // be skipped.
    if (!skip_empty_messages_ || payload_len > 0) {
      connection->UpdateLastActivity();
      CallData call_data(call_size);
      IoVecsToBuffer(data, parsed + body_start, parsed + frame_len, call_data.data());
      RETURN_NOT_OK(listener_->HandleCall(connection, &call_data));
    }

    parsed += frame_len;
  }

  VLOG(4) << "BinaryCallParser::Parse, returning: { consumed: " << parsed << " buffer: empty }";
  return ProcessCallsResult{
      .consumed = parsed,
      .buffer = Slice(),
  };
}
179
180
} // namespace rpc
181
} // namespace yb