YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/binary_call_parser.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/binary_call_parser.h"
15
16
#include "yb/gutil/endian.h"
17
18
#include "yb/rpc/connection.h"
19
#include "yb/rpc/connection_context.h"
20
#include "yb/rpc/stream.h"
21
22
#include "yb/util/logging.h"
23
#include "yb/util/result.h"
24
#include "yb/util/size_literals.h"
25
#include "yb/util/status_format.h"
26
27
using yb::operator"" _MB;
28
29
DEFINE_bool(
30
    binary_call_parser_reject_on_mem_tracker_hard_limit, true,
31
    "Whether to reject/ignore calls on hitting mem tracker hard limit.");
32
33
DEFINE_int64(
34
    rpc_throttle_threshold_bytes, 1048576,
35
    "Throttle inbound RPC calls larger than specified size on hitting mem tracker soft limit. "
36
    "Throttling is disabled if negative value is specified.");
37
38
DECLARE_int32(memory_limit_warn_threshold_percentage);
39
40
namespace yb {
41
namespace rpc {
42
43
bool ShouldThrottleRpc(
44
38.9M
    const MemTrackerPtr& throttle_tracker, ssize_t call_data_size, const char* throttle_message) {
45
38.9M
  return (FLAGS_rpc_throttle_threshold_bytes >= 0 &&
46
38.9M
      call_data_size > FLAGS_rpc_throttle_threshold_bytes &&
47
1.24k
      !CheckMemoryPressureWithLogging(throttle_tracker, 0 /* score */, throttle_message));
48
38.9M
}
49
50
BinaryCallParser::BinaryCallParser(
51
    const MemTrackerPtr& parent_tracker, size_t header_size, size_t size_offset,
52
    size_t max_message_length, IncludeHeader include_header, SkipEmptyMessages skip_empty_messages,
53
    BinaryCallParserListener* listener)
54
    : call_header_buffer_(header_size),
55
      size_offset_(size_offset),
56
      max_message_length_(max_message_length),
57
      include_header_(include_header),
58
      skip_empty_messages_(skip_empty_messages),
59
942k
      listener_(listener) {
60
942k
  buffer_tracker_ = MemTracker::FindOrCreateTracker("Reading", parent_tracker);
61
942k
}
62
63
Result<ProcessCallsResult> BinaryCallParser::Parse(
64
    const rpc::ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full,
65
36.7M
    const MemTrackerPtr* tracker_for_throttle) {
66
36.7M
  if (call_data_.should_reject()) {
67
    // We can't respond with a proper error because we ignored the call data and so don't have
68
    // enough of it. Instead, we just drop this call and the client will time out.
69
    // If we ever need the client to detect that the server is hitting its memory limit, we can
70
    // implement partial parsing of the higher-level call header to respond with a proper error.
71
1.53M
    call_data_.Reset();
72
1.53M
    call_data_consumption_ = ScopedTrackedConsumption();
73
35.2M
  } else if (!call_data_.empty()) {
74
8.19k
    RETURN_NOT_OK(listener_->HandleCall(connection, &call_data_));
75
8.19k
    call_data_.Reset();
76
8.19k
    call_data_consumption_ = ScopedTrackedConsumption();
77
8.19k
  }
78
79
36.7M
  const auto full_input_size = IoVecsFullSize(data);
80
14.4k
  VLOG(4) << "BinaryCallParser::Parse, full_input_size: " << full_input_size;
81
36.7M
  size_t consumed = 0;
82
36.7M
  const size_t header_size = call_header_buffer_.size();
83
32.8M
  const size_t body_offset = include_header_ ? 0 : header_size;
84
25.0k
  VLOG(4) << "BinaryCallParser::Parse, header_size: " << header_size
85
25.0k
          << " body_offset: " << body_offset;
86
74.6M
  while (full_input_size >= consumed + header_size) {
87
37.9M
    IoVecsToBuffer(data, consumed, consumed + header_size, &call_header_buffer_);
88
89
37.9M
    const size_t data_length = NetworkByteOrder::Load32(call_header_buffer_.data() + size_offset_);
90
37.9M
    const size_t total_length = data_length + header_size;
91
60.7k
    VLOG(4) << "BinaryCallParser::Parse, consumed: " << consumed << " data_length: " << data_length
92
60.7k
            << " total_length: " << total_length;
93
37.9M
    if (total_length > max_message_length_) {
94
0
      return STATUS_FORMAT(
95
0
          NetworkError,
96
0
          "The frame had a length of $0, but we only support messages up to $1 bytes long.",
97
0
          total_length, max_message_length_);
98
0
    }
99
37.9M
    const size_t call_data_size = total_length - body_offset;
100
42.8k
    VLOG(4) << "BinaryCallParser::Parse, call_data_size: " << call_data_size;
101
37.9M
    if (consumed + total_length > full_input_size) {
102
      // `data` only contains the beginning of the current call. Here we allocate a memory buffer
103
      // for the whole call data and fill its beginning from `data`; the rest of the data for the
104
      // call will be received and appended by the caller into the same buffer.
105
8.19k
      const size_t call_received_size = full_input_size - (consumed + body_offset);
106
1
      VLOG(4) << "BinaryCallParser::Parse, call_received_size: " << call_received_size;
107
108
8.19k
      if (tracker_for_throttle) {
109
0
        VLOG(4) << "BinaryCallParser::Parse, tracker_for_throttle memory usage: "
110
0
                << (*tracker_for_throttle)->LogUsage("");
111
920
        if (ShouldThrottleRpc(*tracker_for_throttle, call_data_size, "Ignoring RPC call: ")) {
112
0
          call_data_ = CallData(call_data_size, CallData::ShouldRejectTag());
113
0
          return ProcessCallsResult{
114
0
              .consumed = full_input_size,
115
0
              .buffer = Slice(),
116
0
              .bytes_to_skip = call_data_size - call_received_size
117
0
          };
118
0
        }
119
8.19k
      }
120
121
8.19k
      MemTracker* blocking_mem_tracker = nullptr;
122
8.19k
      if (buffer_tracker_->TryConsume(call_data_size, &blocking_mem_tracker)) {
123
8.19k
        call_data_consumption_ = ScopedTrackedConsumption(
124
8.19k
            buffer_tracker_, call_data_size, AlreadyConsumed::kTrue);
125
8.19k
        call_data_ = CallData(call_data_size);
126
8.19k
        IoVecsToBuffer(data, consumed + body_offset, full_input_size, call_data_.data());
127
8.19k
        Slice buffer(call_data_.data() + call_received_size, call_data_size - call_received_size);
128
0
        VLOG(4) << "BinaryCallParser::Parse, consumed: " << consumed
129
0
                << " returning: { full_input_size: " << full_input_size
130
0
                << " buffer.size(): " << buffer.size() << " }";
131
8.19k
        return ProcessCallsResult{
132
8.19k
          .consumed = full_input_size,
133
8.19k
          .buffer = buffer,
134
8.19k
        };
135
18.4E
      } else if (read_buffer_full && consumed == 0) {
136
0
        auto consumption = blocking_mem_tracker ? blocking_mem_tracker->consumption() : -1;
137
0
        auto limit = blocking_mem_tracker ? blocking_mem_tracker->limit() : -1;
138
0
        if (FLAGS_binary_call_parser_reject_on_mem_tracker_hard_limit) {
139
0
          YB_LOG_EVERY_N_SECS(WARNING, 3)
140
0
              << "Unable to allocate read buffer because of limit, required: " << call_data_size
141
0
              << ", blocked by: " << AsString(blocking_mem_tracker)
142
0
              << ", consumption: " << consumption << " of " << limit << ". Call will be ignored.\n"
143
0
              << DumpMemoryUsage();
144
0
          call_data_ = CallData(call_data_size, CallData::ShouldRejectTag());
145
0
          return ProcessCallsResult{
146
0
            .consumed = full_input_size,
147
0
            .buffer = Slice(),
148
0
            .bytes_to_skip = call_data_size - call_received_size
149
0
          };
150
0
        } else {
151
          // Only log a warning, for backward compatibility in behavior, until we fix
152
          // https://github.com/yugabyte/yugabyte-db/issues/2563.
153
0
          LOG(WARNING) << "Unable to allocate read buffer because of limit, required: "
154
0
                       << call_data_size << ", blocked by: " << AsString(blocking_mem_tracker)
155
0
                       << ", consumption: " << consumption << " of " << limit << "\n"
156
0
                       << DumpMemoryUsage();
157
0
        }
158
0
      }
159
18.4E
      break;
160
37.9M
    }
161
162
    // We might need to skip empty messages (we use them as low-level heartbeats for inter-YB RPC
163
    // connections; these are not Raft heartbeats, which are higher-level non-empty messages).
164
37.9M
    if (!skip_empty_messages_ || data_length > 0) {
165
37.6M
      connection->UpdateLastActivity();
166
37.6M
      CallData call_data(call_data_size);
167
37.6M
      IoVecsToBuffer(data, consumed + body_offset, consumed + total_length, call_data.data());
168
37.6M
      RETURN_NOT_OK(listener_->HandleCall(connection, &call_data));
169
37.6M
    }
170
171
37.9M
    consumed += total_length;
172
37.9M
  }
173
18.4E
  VLOG(4) << "BinaryCallParser::Parse, returning: { consumed: " << consumed << " buffer: empty }";
174
36.7M
  return ProcessCallsResult {
175
36.7M
    .consumed = consumed,
176
36.7M
    .buffer = Slice(),
177
36.7M
  };
178
36.7M
}
179
180
} // namespace rpc
181
} // namespace yb