YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/refined_stream.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/rpc/refined_stream.h"
15
16
#include "yb/rpc/rpc_util.h"
17
18
#include "yb/util/logging.h"
19
#include "yb/util/result.h"
20
#include "yb/util/status_format.h"
21
22
namespace yb {
23
namespace rpc {
24
25
RefinedStream::RefinedStream(
26
    std::unique_ptr<Stream> lower_stream, std::unique_ptr<StreamRefiner> refiner,
27
    size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker)
28
    : lower_stream_(std::move(lower_stream)), refiner_(std::move(refiner)),
29
876k
      read_buffer_(receive_buffer_size, buffer_tracker) {
30
876k
}
31
32
84.0M
size_t RefinedStream::GetPendingWriteBytes() {
33
84.0M
  return lower_stream_->GetPendingWriteBytes();
34
84.0M
}
35
36
1.14k
void RefinedStream::Close() {
37
1.14k
  lower_stream_->Close();
38
1.14k
}
39
40
41.2M
Status RefinedStream::TryWrite() {
41
41.2M
  return lower_stream_->TryWrite();
42
41.2M
}
43
44
0
void RefinedStream::ParseReceived() {
45
0
  lower_stream_->ParseReceived();
46
0
}
47
48
58.5M
bool RefinedStream::Idle(std::string* reason) {
49
58.5M
  return lower_stream_->Idle(reason);
50
58.5M
}
51
52
8
void RefinedStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) {
53
8
  lower_stream_->DumpPB(req, resp);
54
8
}
55
56
15.0M
const Endpoint& RefinedStream::Remote() const {
57
15.0M
  return lower_stream_->Remote();
58
15.0M
}
59
60
23.4k
const Endpoint& RefinedStream::Local() const {
61
23.4k
  return lower_stream_->Local();
62
23.4k
}
63
64
879k
Status RefinedStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) {
65
564k
  local_side_ = connect ? LocalSide::kClient : LocalSide::kServer;
66
879k
  context_ = context;
67
879k
  refiner_->Start(this);
68
879k
  return lower_stream_->Start(connect, loop, this);
69
879k
}
70
71
1.10M
void RefinedStream::Shutdown(const Status& status) {
72
415
  VLOG_WITH_PREFIX(1) << "Shutdown with status: " << status;
73
74
608k
  for (auto& data : pending_data_) {
75
608k
    if (data) {
76
607k
      context().Transferred(data, status);
77
607k
    }
78
608k
  }
79
80
1.10M
  pending_data_.clear();
81
1.10M
  lower_stream_->Shutdown(status);
82
1.10M
}
83
84
42.6M
Result<size_t> RefinedStream::Send(OutboundDataPtr data) {
85
42.6M
  switch (state_) {
86
1.20M
  case RefinedStreamState::kInitial:
87
1.20M
  case RefinedStreamState::kHandshake:
88
1.20M
    pending_data_.push_back(std::move(data));
89
1.20M
    return std::numeric_limits<size_t>::max();
90
57.4k
  case RefinedStreamState::kEnabled:
91
57.4k
    RETURN_NOT_OK(refiner_->Send(std::move(data)));
92
57.4k
    return std::numeric_limits<size_t>::max();
93
41.4M
  case RefinedStreamState::kDisabled:
94
41.4M
    return lower_stream_->Send(std::move(data));
95
0
  }
96
97
0
  FATAL_INVALID_ENUM_VALUE(RefinedStreamState, state_);
98
0
}
99
100
39.9M
void RefinedStream::UpdateLastActivity() {
101
39.9M
  context_->UpdateLastActivity();
102
39.9M
}
103
104
40.2M
void RefinedStream::UpdateLastRead() {
105
40.2M
  context_->UpdateLastRead();
106
40.2M
}
107
108
40.4M
void RefinedStream::UpdateLastWrite() {
109
40.4M
  context_->UpdateLastWrite();
110
40.4M
}
111
112
41.4M
void RefinedStream::Transferred(const OutboundDataPtr& data, const Status& status) {
113
41.4M
  context_->Transferred(data, status);
114
41.4M
}
115
116
550k
void RefinedStream::Destroy(const Status& status) {
117
550k
  context_->Destroy(status);
118
550k
}
119
120
189
std::string RefinedStream::ToString() const {
121
189
  return Format("$0[$1] $2 $3",
122
188
                *refiner_, local_side_ == LocalSide::kClient ? "C" : "S", state_, *lower_stream_);
123
189
}
124
125
3.26k
void RefinedStream::Cancelled(size_t handle) {
126
3.26k
  if (state_ == RefinedStreamState::kDisabled) {
127
3.26k
    lower_stream_->Cancelled(handle);
128
3.26k
    return;
129
3.26k
  }
130
0
  LOG_WITH_PREFIX(DFATAL) << "Cancel is not supported for proxy stream: " << handle;
131
0
}
132
133
13.9M
bool RefinedStream::IsConnected() {
134
13.9M
  return state_ == RefinedStreamState::kEnabled || state_ == RefinedStreamState::kDisabled;
135
13.9M
}
136
137
3.32M
const Protocol* RefinedStream::GetProtocol() {
138
3.32M
  return refiner_->GetProtocol();
139
3.32M
}
140
141
221M
StreamReadBuffer& RefinedStream::ReadBuffer() {
142
218M
  return state_ != RefinedStreamState::kDisabled ? read_buffer_ : context_->ReadBuffer();
143
221M
}
144
145
40.1M
Result<size_t> RefinedStream::ProcessReceived(ReadBufferFull read_buffer_full) {
146
40.1M
  switch (state_) {
147
314k
    case RefinedStreamState::kInitial: {
148
314k
      RETURN_NOT_OK(refiner_->ProcessHeader());
149
314k
      if (state_ == RefinedStreamState::kInitial) {
150
        // Received data was not enough to check stream header.
151
0
        return 0;
152
0
      }
153
314k
      return ProcessReceived(read_buffer_full);
154
314k
    }
155
156
39.7M
    case RefinedStreamState::kDisabled:
157
39.7M
      return context_->ProcessReceived(read_buffer_full);
158
159
3.21k
    case RefinedStreamState::kHandshake:
160
3.21k
      return Handshake();
161
162
58.0k
    case RefinedStreamState::kEnabled: {
163
58.0k
      return Read();
164
0
    }
165
0
  }
166
167
0
  return STATUS_FORMAT(IllegalState, "Unexpected state: $0", to_underlying(state_));
168
0
}
169
170
587k
void RefinedStream::Connected() {
171
587k
  if (local_side_ != LocalSide::kClient) {
172
314k
    return;
173
314k
  }
174
175
273k
  auto status = StartHandshake();
176
273k
  if (status.ok()) {
177
271k
    status = refiner_->Handshake();
178
271k
  }
179
273k
  if (!status.ok()) {
180
0
    context_->Destroy(status);
181
0
  }
182
273k
}
183
184
583k
CHECKED_STATUS TransferData(StreamReadBuffer* source, StreamReadBuffer* dest) {
185
583k
  auto dst = VERIFY_RESULT(dest->PrepareAppend());
186
583k
  auto dst_it = dst.begin();
187
583k
  size_t total_len = 0;
188
585k
  for (auto src_vec : source->AppendedVecs()) {
189
898k
    while (src_vec.iov_len != 0) {
190
313k
      if (dst_it->iov_len == 0) {
191
0
        if (++dst_it == dst.end()) {
192
0
          return STATUS(RuntimeError, "No enough space in destination buffer");
193
0
        }
194
313k
      }
195
313k
      size_t len = std::min(dst_it->iov_len, src_vec.iov_len);
196
313k
      memcpy(dst_it->iov_base, src_vec.iov_base, len);
197
313k
      IoVecRemovePrefix(len, &*dst_it);
198
313k
      IoVecRemovePrefix(len, &src_vec);
199
313k
      total_len += len;
200
313k
    }
201
585k
  }
202
583k
  source->Consume(total_len, Slice());
203
583k
  dest->DataAppended(total_len);
204
583k
  return Status::OK();
205
583k
}
206
207
586k
Status RefinedStream::Established(RefinedStreamState state) {
208
586k
  state_ = state;
209
210
586k
  if (state == RefinedStreamState::kDisabled) {
211
584k
    RETURN_NOT_OK(TransferData(&read_buffer_, &context_->ReadBuffer()));
212
584k
  }
213
214
586k
  ResetLogPrefix();
215
216
18.4E
  VLOG_WITH_PREFIX(1) << __func__ << ": " << state;
217
218
586k
  context().Connected();
219
604k
  for (auto& data : pending_data_) {
220
604k
    RETURN_NOT_OK(Send(std::move(data)));
221
604k
  }
222
586k
  pending_data_.clear();
223
586k
  return Status::OK();
224
586k
}
225
226
112k
Status RefinedStream::SendToLower(OutboundDataPtr data) {
227
112k
  return ResultToStatus(lower_stream_->Send(std::move(data)));
228
112k
}
229
230
274k
Status RefinedStream::StartHandshake() {
231
274k
  state_ = RefinedStreamState::kHandshake;
232
274k
  ResetLogPrefix();
233
274k
  return Status::OK();
234
274k
}
235
236
3.21k
Result<size_t> RefinedStream::Handshake() {
237
3.21k
  auto handshake_status = refiner_->Handshake();
238
147
  LOG_IF_WITH_PREFIX(INFO, !handshake_status.ok()) << "Handshake failed: " << handshake_status;
239
3.21k
  RETURN_NOT_OK(handshake_status);
240
241
3.06k
  if (state_ == RefinedStreamState::kEnabled) {
242
1.91k
    return Read();
243
1.91k
  }
244
245
1.15k
  return 0;
246
1.15k
}
247
248
// Used to read bytes to global skip buffer.
249
class SkipStreamReadBuffer : public StreamReadBuffer {
250
 public:
251
  explicit SkipStreamReadBuffer(const Slice& out, size_t skip_len)
252
32.4k
      : out_(out), wpos_(out_.mutable_data()), skip_len_(skip_len) {}
253
254
0
  size_t DataAvailable() override { return 0; }
255
256
0
  bool ReadyToRead() override { return false; }
257
258
0
  bool Empty() override { return true; }
259
260
0
  void Reset() override {
261
0
    wpos_ = out_.mutable_data();
262
0
  }
263
264
31.7k
  bool Full() override {
265
31.7k
    return wpos_ == out_.end();
266
31.7k
  }
267
268
32.4k
  Result<IoVecs> PrepareAppend() override {
269
32.4k
    return IoVecs({iovec{
270
32.4k
      .iov_base = wpos_,
271
32.4k
      .iov_len = std::min(static_cast<size_t>(out_.end() - wpos_), skip_len_),
272
32.4k
    }});
273
32.4k
  }
274
275
32.4k
  void DataAppended(size_t len) override {
276
32.4k
    skip_len_ -= len;
277
32.4k
    wpos_ += len;
278
32.4k
  }
279
280
32.4k
  IoVecs AppendedVecs() override {
281
32.4k
    return IoVecs({iovec{
282
32.4k
      .iov_base = out_.mutable_data(),
283
32.4k
      .iov_len = static_cast<size_t>(wpos_ - out_.mutable_data()),
284
32.4k
    }});
285
32.4k
  }
286
287
0
  void Consume(size_t count, const Slice& prepend) override {
288
0
  }
289
290
0
  std::string ToString() const override {
291
0
    return "SkipStreamReadBuffer";
292
0
  }
293
294
 private:
295
  Slice out_;
296
  uint8_t* wpos_;
297
  size_t skip_len_;
298
};
299
300
59.9k
Result<size_t> RefinedStream::Read() {
301
63.2k
  for (;;) {
302
63.2k
    auto& out_buffer = context_->ReadBuffer();
303
63.2k
    auto data_available_before_reading = read_buffer_.DataAvailable();
304
305
63.2k
    if (upper_stream_bytes_to_skip_ > 0) {
306
1.61k
      auto global_skip_buffer = GetGlobalSkipBuffer();
307
32.4k
      do {
308
32.4k
        SkipStreamReadBuffer buffer(global_skip_buffer, upper_stream_bytes_to_skip_);
309
32.4k
        RETURN_NOT_OK(refiner_->Read(&buffer));
310
32.4k
        size_t len = buffer.AppendedVecs()[0].iov_len;
311
32.4k
        if (!len) {
312
1.36k
          break;
313
1.36k
        }
314
31.0k
        VLOG_WITH_PREFIX(4) << "Skip upper: " << len << " of " << upper_stream_bytes_to_skip_;
315
31.0k
        upper_stream_bytes_to_skip_ -= len;
316
31.0k
      } while (upper_stream_bytes_to_skip_ > 0);
317
1.61k
    }
318
319
63.2k
    if (upper_stream_bytes_to_skip_ == 0) {
320
61.8k
      auto read_buffer_full = VERIFY_RESULT(refiner_->Read(&out_buffer));
321
61.8k
      if (out_buffer.ReadyToRead()) {
322
57.2k
        auto temp = VERIFY_RESULT(context_->ProcessReceived(read_buffer_full));
323
57.2k
        upper_stream_bytes_to_skip_ = temp;
324
42
        VLOG_IF_WITH_PREFIX(3, temp != 0) << "Skip: " << upper_stream_bytes_to_skip_;
325
57.2k
      }
326
61.8k
    }
327
328
63.2k
    if (read_buffer_.Empty() || read_buffer_.DataAvailable() == data_available_before_reading) {
329
59.8k
      break;
330
59.8k
    }
331
63.2k
  }
332
333
59.8k
  return 0;
334
59.9k
}
335
336
RefinedStreamFactory::RefinedStreamFactory(
337
    StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker,
338
    RefinerFactory refiner_factory)
339
    : lower_layer_factory_(std::move(lower_layer_factory)), buffer_tracker_(buffer_tracker),
340
16.2k
      refiner_factory_(std::move(refiner_factory)) {
341
16.2k
}
342
343
873k
std::unique_ptr<Stream> RefinedStreamFactory::Create(const StreamCreateData& data) {
344
873k
  return std::make_unique<RefinedStream>(
345
873k
      lower_layer_factory_->Create(data), refiner_factory_(data), data.receive_buffer_size,
346
873k
      buffer_tracker_);
347
873k
}
348
349
}  // namespace rpc
350
}  // namespace yb