YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/refined_stream.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/refined_stream.h"
15
16
#include "yb/rpc/rpc_util.h"
17
18
#include "yb/util/logging.h"
19
#include "yb/util/result.h"
20
#include "yb/util/status_format.h"
21
22
namespace yb {
23
namespace rpc {
24
25
RefinedStream::RefinedStream(
26
    std::unique_ptr<Stream> lower_stream, std::unique_ptr<StreamRefiner> refiner,
27
    size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker)
28
    : lower_stream_(std::move(lower_stream)), refiner_(std::move(refiner)),
29
3.69M
      read_buffer_(receive_buffer_size, buffer_tracker) {
30
3.69M
}
31
32
324M
size_t RefinedStream::GetPendingWriteBytes() {
33
324M
  return lower_stream_->GetPendingWriteBytes();
34
324M
}
35
36
2.20k
void RefinedStream::Close() {
37
2.20k
  lower_stream_->Close();
38
2.20k
}
39
40
160M
Status RefinedStream::TryWrite() {
41
160M
  return lower_stream_->TryWrite();
42
160M
}
43
44
0
void RefinedStream::ParseReceived() {
45
0
  lower_stream_->ParseReceived();
46
0
}
47
48
913M
bool RefinedStream::Idle(std::string* reason) {
49
913M
  return lower_stream_->Idle(reason);
50
913M
}
51
52
8
void RefinedStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) {
53
8
  lower_stream_->DumpPB(req, resp);
54
8
}
55
56
52.6M
const Endpoint& RefinedStream::Remote() const {
57
52.6M
  return lower_stream_->Remote();
58
52.6M
}
59
60
212k
const Endpoint& RefinedStream::Local() const {
61
212k
  return lower_stream_->Local();
62
212k
}
63
64
3.69M
Status RefinedStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) {
65
3.69M
  local_side_ = connect ? 
LocalSide::kClient3.07M
:
LocalSide::kServer623k
;
66
3.69M
  context_ = context;
67
3.69M
  refiner_->Start(this);
68
3.69M
  return lower_stream_->Start(connect, loop, this);
69
3.69M
}
70
71
6.23M
void RefinedStream::Shutdown(const Status& status) {
72
6.23M
  
VLOG_WITH_PREFIX1.11k
(1) << "Shutdown with status: " << status1.11k
;
73
74
6.23M
  for (auto& data : pending_data_) {
75
5.01M
    if (
data5.01M
) {
76
5.01M
      context().Transferred(data, status);
77
5.01M
    }
78
5.01M
  }
79
80
6.23M
  pending_data_.clear();
81
6.23M
  lower_stream_->Shutdown(status);
82
6.23M
}
83
84
163M
Result<size_t> RefinedStream::Send(OutboundDataPtr data) {
85
163M
  switch (state_) {
86
6.24M
  case RefinedStreamState::kInitial:
87
6.24M
  case RefinedStreamState::kHandshake:
88
6.24M
    pending_data_.push_back(std::move(data));
89
6.24M
    return std::numeric_limits<size_t>::max();
90
156k
  case RefinedStreamState::kEnabled:
91
156k
    RETURN_NOT_OK(refiner_->Send(std::move(data)));
92
156k
    return std::numeric_limits<size_t>::max();
93
157M
  case RefinedStreamState::kDisabled:
94
157M
    return lower_stream_->Send(std::move(data));
95
163M
  }
96
97
0
  FATAL_INVALID_ENUM_VALUE(RefinedStreamState, state_);
98
0
}
99
100
146M
void RefinedStream::UpdateLastActivity() {
101
146M
  context_->UpdateLastActivity();
102
146M
}
103
104
157M
void RefinedStream::UpdateLastRead() {
105
157M
  context_->UpdateLastRead();
106
157M
}
107
108
155M
void RefinedStream::UpdateLastWrite() {
109
155M
  context_->UpdateLastWrite();
110
155M
}
111
112
157M
void RefinedStream::Transferred(const OutboundDataPtr& data, const Status& status) {
113
157M
  context_->Transferred(data, status);
114
157M
}
115
116
3.00M
void RefinedStream::Destroy(const Status& status) {
117
3.00M
  context_->Destroy(status);
118
3.00M
}
119
120
506
std::string RefinedStream::ToString() const {
121
506
  return Format("$0[$1] $2 $3",
122
506
                *refiner_, local_side_ == LocalSide::kClient ? 
"C"1
:
"S"505
, state_, *lower_stream_);
123
506
}
124
125
18.8k
void RefinedStream::Cancelled(size_t handle) {
126
18.8k
  if (state_ == RefinedStreamState::kDisabled) {
127
18.7k
    lower_stream_->Cancelled(handle);
128
18.7k
    return;
129
18.7k
  }
130
1
  LOG_WITH_PREFIX(DFATAL) << "Cancel is not supported for proxy stream: " << handle;
131
1
}
132
133
57.9M
bool RefinedStream::IsConnected() {
134
57.9M
  return state_ == RefinedStreamState::kEnabled || 
state_ == RefinedStreamState::kDisabled57.9M
;
135
57.9M
}
136
137
22.6M
const Protocol* RefinedStream::GetProtocol() {
138
22.6M
  return refiner_->GetProtocol();
139
22.6M
}
140
141
1.55G
StreamReadBuffer& RefinedStream::ReadBuffer() {
142
1.55G
  return state_ != RefinedStreamState::kDisabled ? 
read_buffer_16.1M
:
context_->ReadBuffer()1.53G
;
143
1.55G
}
144
145
155M
Result<size_t> RefinedStream::ProcessReceived(ReadBufferFull read_buffer_full) {
146
155M
  switch (state_) {
147
622k
    case RefinedStreamState::kInitial: {
148
622k
      RETURN_NOT_OK(refiner_->ProcessHeader());
149
622k
      if (state_ == RefinedStreamState::kInitial) {
150
        // Received data was not enough to check stream header.
151
0
        return 0;
152
0
      }
153
622k
      return ProcessReceived(read_buffer_full);
154
622k
    }
155
156
154M
    case RefinedStreamState::kDisabled:
157
154M
      return context_->ProcessReceived(read_buffer_full);
158
159
8.84k
    case RefinedStreamState::kHandshake:
160
8.84k
      return Handshake();
161
162
155k
    case RefinedStreamState::kEnabled: {
163
155k
      return Read();
164
622k
    }
165
155M
  }
166
167
0
  return STATUS_FORMAT(IllegalState, "Unexpected state: $0", to_underlying(state_));
168
155M
}
169
170
1.20M
void RefinedStream::Connected() {
171
1.20M
  if (local_side_ != LocalSide::kClient) {
172
622k
    return;
173
622k
  }
174
175
579k
  auto status = StartHandshake();
176
579k
  if (status.ok()) {
177
576k
    status = refiner_->Handshake();
178
576k
  }
179
579k
  if (!status.ok()) {
180
0
    context_->Destroy(status);
181
0
  }
182
579k
}
183
184
1.19M
CHECKED_STATUS TransferData(StreamReadBuffer* source, StreamReadBuffer* dest) {
185
1.19M
  auto dst = VERIFY_RESULT(dest->PrepareAppend());
186
0
  auto dst_it = dst.begin();
187
1.19M
  size_t total_len = 0;
188
1.19M
  for (auto src_vec : source->AppendedVecs()) {
189
1.81M
    while (src_vec.iov_len != 0) {
190
619k
      if (dst_it->iov_len == 0) {
191
0
        if (++dst_it == dst.end()) {
192
0
          return STATUS(RuntimeError, "No enough space in destination buffer");
193
0
        }
194
0
      }
195
619k
      size_t len = std::min(dst_it->iov_len, src_vec.iov_len);
196
619k
      memcpy(dst_it->iov_base, src_vec.iov_base, len);
197
619k
      IoVecRemovePrefix(len, &*dst_it);
198
619k
      IoVecRemovePrefix(len, &src_vec);
199
619k
      total_len += len;
200
619k
    }
201
1.19M
  }
202
1.19M
  source->Consume(total_len, Slice());
203
1.19M
  dest->DataAppended(total_len);
204
1.19M
  return Status::OK();
205
1.19M
}
206
207
1.19M
Status RefinedStream::Established(RefinedStreamState state) {
208
1.19M
  state_ = state;
209
210
1.19M
  if (state == RefinedStreamState::kDisabled) {
211
1.19M
    RETURN_NOT_OK(TransferData(&read_buffer_, &context_->ReadBuffer()));
212
1.19M
  }
213
214
1.19M
  ResetLogPrefix();
215
216
1.19M
  
VLOG_WITH_PREFIX407
(1) << __func__ << ": " << state407
;
217
218
1.19M
  context().Connected();
219
1.24M
  for (auto& data : pending_data_) {
220
1.24M
    RETURN_NOT_OK(Send(std::move(data)));
221
1.24M
  }
222
1.19M
  pending_data_.clear();
223
1.19M
  return Status::OK();
224
1.19M
}
225
226
224k
Status RefinedStream::SendToLower(OutboundDataPtr data) {
227
224k
  return ResultToStatus(lower_stream_->Send(std::move(data)));
228
224k
}
229
230
582k
Status RefinedStream::StartHandshake() {
231
582k
  state_ = RefinedStreamState::kHandshake;
232
582k
  ResetLogPrefix();
233
582k
  return Status::OK();
234
582k
}
235
236
8.84k
Result<size_t> RefinedStream::Handshake() {
237
8.84k
  auto handshake_status = refiner_->Handshake();
238
8.84k
  
LOG_IF_WITH_PREFIX457
(INFO, !handshake_status.ok()) << "Handshake failed: " << handshake_status457
;
239
8.84k
  RETURN_NOT_OK(handshake_status);
240
241
8.39k
  if (state_ == RefinedStreamState::kEnabled) {
242
5.21k
    return Read();
243
5.21k
  }
244
245
3.18k
  return 0;
246
8.39k
}
247
248
// Used to read bytes to global skip buffer.
249
class SkipStreamReadBuffer : public StreamReadBuffer {
250
 public:
251
  explicit SkipStreamReadBuffer(const Slice& out, size_t skip_len)
252
32.4k
      : out_(out), wpos_(out_.mutable_data()), skip_len_(skip_len) {}
253
254
0
  size_t DataAvailable() override { return 0; }
255
256
0
  bool ReadyToRead() override { return false; }
257
258
0
  bool Empty() override { return true; }
259
260
0
  void Reset() override {
261
0
    wpos_ = out_.mutable_data();
262
0
  }
263
264
31.7k
  bool Full() override {
265
31.7k
    return wpos_ == out_.end();
266
31.7k
  }
267
268
32.4k
  Result<IoVecs> PrepareAppend() override {
269
32.4k
    return IoVecs({iovec{
270
32.4k
      .iov_base = wpos_,
271
32.4k
      .iov_len = std::min(static_cast<size_t>(out_.end() - wpos_), skip_len_),
272
32.4k
    }});
273
32.4k
  }
274
275
32.4k
  void DataAppended(size_t len) override {
276
32.4k
    skip_len_ -= len;
277
32.4k
    wpos_ += len;
278
32.4k
  }
279
280
32.4k
  IoVecs AppendedVecs() override {
281
32.4k
    return IoVecs({iovec{
282
32.4k
      .iov_base = out_.mutable_data(),
283
32.4k
      .iov_len = static_cast<size_t>(wpos_ - out_.mutable_data()),
284
32.4k
    }});
285
32.4k
  }
286
287
0
  void Consume(size_t count, const Slice& prepend) override {
288
0
  }
289
290
0
  std::string ToString() const override {
291
0
    return "SkipStreamReadBuffer";
292
0
  }
293
294
 private:
295
  Slice out_;
296
  uint8_t* wpos_;
297
  size_t skip_len_;
298
};
299
300
160k
Result<size_t> RefinedStream::Read() {
301
168k
  for (;;) {
302
168k
    auto& out_buffer = context_->ReadBuffer();
303
168k
    auto data_available_before_reading = read_buffer_.DataAvailable();
304
305
168k
    if (upper_stream_bytes_to_skip_ > 0) {
306
1.60k
      auto global_skip_buffer = GetGlobalSkipBuffer();
307
32.4k
      do {
308
32.4k
        SkipStreamReadBuffer buffer(global_skip_buffer, upper_stream_bytes_to_skip_);
309
32.4k
        RETURN_NOT_OK(refiner_->Read(&buffer));
310
32.4k
        size_t len = buffer.AppendedVecs()[0].iov_len;
311
32.4k
        if (!len) {
312
1.35k
          break;
313
1.35k
        }
314
31.0k
        
VLOG_WITH_PREFIX0
(4) << "Skip upper: " << len << " of " << upper_stream_bytes_to_skip_0
;
315
31.0k
        upper_stream_bytes_to_skip_ -= len;
316
31.0k
      } while (upper_stream_bytes_to_skip_ > 0);
317
1.60k
    }
318
319
168k
    if (upper_stream_bytes_to_skip_ == 0) {
320
166k
      auto read_buffer_full = 
VERIFY_RESULT166k
(refiner_->Read(&out_buffer));166k
321
166k
      if (out_buffer.ReadyToRead()) {
322
154k
        auto temp = VERIFY_RESULT(context_->ProcessReceived(read_buffer_full));
323
0
        upper_stream_bytes_to_skip_ = temp;
324
154k
        
VLOG_IF_WITH_PREFIX59
(3, temp != 0) << "Skip: " << upper_stream_bytes_to_skip_59
;
325
154k
      }
326
166k
    }
327
328
168k
    if (read_buffer_.Empty() || 
read_buffer_.DataAvailable() == data_available_before_reading8.64k
) {
329
160k
      break;
330
160k
    }
331
168k
  }
332
333
160k
  return 0;
334
160k
}
335
336
RefinedStreamFactory::RefinedStreamFactory(
337
    StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker,
338
    RefinerFactory refiner_factory)
339
    : lower_layer_factory_(std::move(lower_layer_factory)), buffer_tracker_(buffer_tracker),
340
23.7k
      refiner_factory_(std::move(refiner_factory)) {
341
23.7k
}
342
343
3.69M
std::unique_ptr<Stream> RefinedStreamFactory::Create(const StreamCreateData& data) {
344
3.69M
  return std::make_unique<RefinedStream>(
345
3.69M
      lower_layer_factory_->Create(data), refiner_factory_(data), data.receive_buffer_size,
346
3.69M
      buffer_tracker_);
347
3.69M
}
348
349
}  // namespace rpc
350
}  // namespace yb