/Users/deen/code/yugabyte-db/src/yb/rpc/refined_stream.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/refined_stream.h" |
15 | | |
16 | | #include "yb/rpc/rpc_util.h" |
17 | | |
18 | | #include "yb/util/logging.h" |
19 | | #include "yb/util/result.h" |
20 | | #include "yb/util/status_format.h" |
21 | | |
22 | | namespace yb { |
23 | | namespace rpc { |
24 | | |
25 | | RefinedStream::RefinedStream( |
26 | | std::unique_ptr<Stream> lower_stream, std::unique_ptr<StreamRefiner> refiner, |
27 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker) |
28 | | : lower_stream_(std::move(lower_stream)), refiner_(std::move(refiner)), |
29 | 3.69M | read_buffer_(receive_buffer_size, buffer_tracker) { |
30 | 3.69M | } |
31 | | |
32 | 324M | size_t RefinedStream::GetPendingWriteBytes() { |
33 | 324M | return lower_stream_->GetPendingWriteBytes(); |
34 | 324M | } |
35 | | |
36 | 2.20k | void RefinedStream::Close() { |
37 | 2.20k | lower_stream_->Close(); |
38 | 2.20k | } |
39 | | |
40 | 160M | Status RefinedStream::TryWrite() { |
41 | 160M | return lower_stream_->TryWrite(); |
42 | 160M | } |
43 | | |
44 | 0 | void RefinedStream::ParseReceived() { |
45 | 0 | lower_stream_->ParseReceived(); |
46 | 0 | } |
47 | | |
48 | 913M | bool RefinedStream::Idle(std::string* reason) { |
49 | 913M | return lower_stream_->Idle(reason); |
50 | 913M | } |
51 | | |
52 | 8 | void RefinedStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) { |
53 | 8 | lower_stream_->DumpPB(req, resp); |
54 | 8 | } |
55 | | |
56 | 52.6M | const Endpoint& RefinedStream::Remote() const { |
57 | 52.6M | return lower_stream_->Remote(); |
58 | 52.6M | } |
59 | | |
60 | 212k | const Endpoint& RefinedStream::Local() const { |
61 | 212k | return lower_stream_->Local(); |
62 | 212k | } |
63 | | |
64 | 3.69M | Status RefinedStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) { |
65 | 3.69M | local_side_ = connect ? LocalSide::kClient3.07M : LocalSide::kServer623k ; |
66 | 3.69M | context_ = context; |
67 | 3.69M | refiner_->Start(this); |
68 | 3.69M | return lower_stream_->Start(connect, loop, this); |
69 | 3.69M | } |
70 | | |
71 | 6.23M | void RefinedStream::Shutdown(const Status& status) { |
72 | 6.23M | VLOG_WITH_PREFIX1.11k (1) << "Shutdown with status: " << status1.11k ; |
73 | | |
74 | 6.23M | for (auto& data : pending_data_) { |
75 | 5.01M | if (data5.01M ) { |
76 | 5.01M | context().Transferred(data, status); |
77 | 5.01M | } |
78 | 5.01M | } |
79 | | |
80 | 6.23M | pending_data_.clear(); |
81 | 6.23M | lower_stream_->Shutdown(status); |
82 | 6.23M | } |
83 | | |
84 | 163M | Result<size_t> RefinedStream::Send(OutboundDataPtr data) { |
85 | 163M | switch (state_) { |
86 | 6.24M | case RefinedStreamState::kInitial: |
87 | 6.24M | case RefinedStreamState::kHandshake: |
88 | 6.24M | pending_data_.push_back(std::move(data)); |
89 | 6.24M | return std::numeric_limits<size_t>::max(); |
90 | 156k | case RefinedStreamState::kEnabled: |
91 | 156k | RETURN_NOT_OK(refiner_->Send(std::move(data))); |
92 | 156k | return std::numeric_limits<size_t>::max(); |
93 | 157M | case RefinedStreamState::kDisabled: |
94 | 157M | return lower_stream_->Send(std::move(data)); |
95 | 163M | } |
96 | | |
97 | 0 | FATAL_INVALID_ENUM_VALUE(RefinedStreamState, state_); |
98 | 0 | } |
99 | | |
100 | 146M | void RefinedStream::UpdateLastActivity() { |
101 | 146M | context_->UpdateLastActivity(); |
102 | 146M | } |
103 | | |
104 | 157M | void RefinedStream::UpdateLastRead() { |
105 | 157M | context_->UpdateLastRead(); |
106 | 157M | } |
107 | | |
108 | 155M | void RefinedStream::UpdateLastWrite() { |
109 | 155M | context_->UpdateLastWrite(); |
110 | 155M | } |
111 | | |
112 | 157M | void RefinedStream::Transferred(const OutboundDataPtr& data, const Status& status) { |
113 | 157M | context_->Transferred(data, status); |
114 | 157M | } |
115 | | |
116 | 3.00M | void RefinedStream::Destroy(const Status& status) { |
117 | 3.00M | context_->Destroy(status); |
118 | 3.00M | } |
119 | | |
120 | 506 | std::string RefinedStream::ToString() const { |
121 | 506 | return Format("$0[$1] $2 $3", |
122 | 506 | *refiner_, local_side_ == LocalSide::kClient ? "C"1 : "S"505 , state_, *lower_stream_); |
123 | 506 | } |
124 | | |
125 | 18.8k | void RefinedStream::Cancelled(size_t handle) { |
126 | 18.8k | if (state_ == RefinedStreamState::kDisabled) { |
127 | 18.7k | lower_stream_->Cancelled(handle); |
128 | 18.7k | return; |
129 | 18.7k | } |
130 | 1 | LOG_WITH_PREFIX(DFATAL) << "Cancel is not supported for proxy stream: " << handle; |
131 | 1 | } |
132 | | |
133 | 57.9M | bool RefinedStream::IsConnected() { |
134 | 57.9M | return state_ == RefinedStreamState::kEnabled || state_ == RefinedStreamState::kDisabled57.9M ; |
135 | 57.9M | } |
136 | | |
137 | 22.6M | const Protocol* RefinedStream::GetProtocol() { |
138 | 22.6M | return refiner_->GetProtocol(); |
139 | 22.6M | } |
140 | | |
141 | 1.55G | StreamReadBuffer& RefinedStream::ReadBuffer() { |
142 | 1.55G | return state_ != RefinedStreamState::kDisabled ? read_buffer_16.1M : context_->ReadBuffer()1.53G ; |
143 | 1.55G | } |
144 | | |
145 | 155M | Result<size_t> RefinedStream::ProcessReceived(ReadBufferFull read_buffer_full) { |
146 | 155M | switch (state_) { |
147 | 622k | case RefinedStreamState::kInitial: { |
148 | 622k | RETURN_NOT_OK(refiner_->ProcessHeader()); |
149 | 622k | if (state_ == RefinedStreamState::kInitial) { |
150 | | // Received data was not enough to check stream header. |
151 | 0 | return 0; |
152 | 0 | } |
153 | 622k | return ProcessReceived(read_buffer_full); |
154 | 622k | } |
155 | | |
156 | 154M | case RefinedStreamState::kDisabled: |
157 | 154M | return context_->ProcessReceived(read_buffer_full); |
158 | | |
159 | 8.84k | case RefinedStreamState::kHandshake: |
160 | 8.84k | return Handshake(); |
161 | | |
162 | 155k | case RefinedStreamState::kEnabled: { |
163 | 155k | return Read(); |
164 | 622k | } |
165 | 155M | } |
166 | | |
167 | 0 | return STATUS_FORMAT(IllegalState, "Unexpected state: $0", to_underlying(state_)); |
168 | 155M | } |
169 | | |
170 | 1.20M | void RefinedStream::Connected() { |
171 | 1.20M | if (local_side_ != LocalSide::kClient) { |
172 | 622k | return; |
173 | 622k | } |
174 | | |
175 | 579k | auto status = StartHandshake(); |
176 | 579k | if (status.ok()) { |
177 | 576k | status = refiner_->Handshake(); |
178 | 576k | } |
179 | 579k | if (!status.ok()) { |
180 | 0 | context_->Destroy(status); |
181 | 0 | } |
182 | 579k | } |
183 | | |
184 | 1.19M | CHECKED_STATUS TransferData(StreamReadBuffer* source, StreamReadBuffer* dest) { |
185 | 1.19M | auto dst = VERIFY_RESULT(dest->PrepareAppend()); |
186 | 0 | auto dst_it = dst.begin(); |
187 | 1.19M | size_t total_len = 0; |
188 | 1.19M | for (auto src_vec : source->AppendedVecs()) { |
189 | 1.81M | while (src_vec.iov_len != 0) { |
190 | 619k | if (dst_it->iov_len == 0) { |
191 | 0 | if (++dst_it == dst.end()) { |
192 | 0 | return STATUS(RuntimeError, "No enough space in destination buffer"); |
193 | 0 | } |
194 | 0 | } |
195 | 619k | size_t len = std::min(dst_it->iov_len, src_vec.iov_len); |
196 | 619k | memcpy(dst_it->iov_base, src_vec.iov_base, len); |
197 | 619k | IoVecRemovePrefix(len, &*dst_it); |
198 | 619k | IoVecRemovePrefix(len, &src_vec); |
199 | 619k | total_len += len; |
200 | 619k | } |
201 | 1.19M | } |
202 | 1.19M | source->Consume(total_len, Slice()); |
203 | 1.19M | dest->DataAppended(total_len); |
204 | 1.19M | return Status::OK(); |
205 | 1.19M | } |
206 | | |
207 | 1.19M | Status RefinedStream::Established(RefinedStreamState state) { |
208 | 1.19M | state_ = state; |
209 | | |
210 | 1.19M | if (state == RefinedStreamState::kDisabled) { |
211 | 1.19M | RETURN_NOT_OK(TransferData(&read_buffer_, &context_->ReadBuffer())); |
212 | 1.19M | } |
213 | | |
214 | 1.19M | ResetLogPrefix(); |
215 | | |
216 | 1.19M | VLOG_WITH_PREFIX407 (1) << __func__ << ": " << state407 ; |
217 | | |
218 | 1.19M | context().Connected(); |
219 | 1.24M | for (auto& data : pending_data_) { |
220 | 1.24M | RETURN_NOT_OK(Send(std::move(data))); |
221 | 1.24M | } |
222 | 1.19M | pending_data_.clear(); |
223 | 1.19M | return Status::OK(); |
224 | 1.19M | } |
225 | | |
226 | 224k | Status RefinedStream::SendToLower(OutboundDataPtr data) { |
227 | 224k | return ResultToStatus(lower_stream_->Send(std::move(data))); |
228 | 224k | } |
229 | | |
230 | 582k | Status RefinedStream::StartHandshake() { |
231 | 582k | state_ = RefinedStreamState::kHandshake; |
232 | 582k | ResetLogPrefix(); |
233 | 582k | return Status::OK(); |
234 | 582k | } |
235 | | |
236 | 8.84k | Result<size_t> RefinedStream::Handshake() { |
237 | 8.84k | auto handshake_status = refiner_->Handshake(); |
238 | 8.84k | LOG_IF_WITH_PREFIX457 (INFO, !handshake_status.ok()) << "Handshake failed: " << handshake_status457 ; |
239 | 8.84k | RETURN_NOT_OK(handshake_status); |
240 | | |
241 | 8.39k | if (state_ == RefinedStreamState::kEnabled) { |
242 | 5.21k | return Read(); |
243 | 5.21k | } |
244 | | |
245 | 3.18k | return 0; |
246 | 8.39k | } |
247 | | |
248 | | // Used to read bytes to global skip buffer. |
249 | | class SkipStreamReadBuffer : public StreamReadBuffer { |
250 | | public: |
251 | | explicit SkipStreamReadBuffer(const Slice& out, size_t skip_len) |
252 | 32.4k | : out_(out), wpos_(out_.mutable_data()), skip_len_(skip_len) {} |
253 | | |
254 | 0 | size_t DataAvailable() override { return 0; } |
255 | | |
256 | 0 | bool ReadyToRead() override { return false; } |
257 | | |
258 | 0 | bool Empty() override { return true; } |
259 | | |
260 | 0 | void Reset() override { |
261 | 0 | wpos_ = out_.mutable_data(); |
262 | 0 | } |
263 | | |
264 | 31.7k | bool Full() override { |
265 | 31.7k | return wpos_ == out_.end(); |
266 | 31.7k | } |
267 | | |
268 | 32.4k | Result<IoVecs> PrepareAppend() override { |
269 | 32.4k | return IoVecs({iovec{ |
270 | 32.4k | .iov_base = wpos_, |
271 | 32.4k | .iov_len = std::min(static_cast<size_t>(out_.end() - wpos_), skip_len_), |
272 | 32.4k | }}); |
273 | 32.4k | } |
274 | | |
275 | 32.4k | void DataAppended(size_t len) override { |
276 | 32.4k | skip_len_ -= len; |
277 | 32.4k | wpos_ += len; |
278 | 32.4k | } |
279 | | |
280 | 32.4k | IoVecs AppendedVecs() override { |
281 | 32.4k | return IoVecs({iovec{ |
282 | 32.4k | .iov_base = out_.mutable_data(), |
283 | 32.4k | .iov_len = static_cast<size_t>(wpos_ - out_.mutable_data()), |
284 | 32.4k | }}); |
285 | 32.4k | } |
286 | | |
287 | 0 | void Consume(size_t count, const Slice& prepend) override { |
288 | 0 | } |
289 | | |
290 | 0 | std::string ToString() const override { |
291 | 0 | return "SkipStreamReadBuffer"; |
292 | 0 | } |
293 | | |
294 | | private: |
295 | | Slice out_; |
296 | | uint8_t* wpos_; |
297 | | size_t skip_len_; |
298 | | }; |
299 | | |
300 | 160k | Result<size_t> RefinedStream::Read() { |
301 | 168k | for (;;) { |
302 | 168k | auto& out_buffer = context_->ReadBuffer(); |
303 | 168k | auto data_available_before_reading = read_buffer_.DataAvailable(); |
304 | | |
305 | 168k | if (upper_stream_bytes_to_skip_ > 0) { |
306 | 1.60k | auto global_skip_buffer = GetGlobalSkipBuffer(); |
307 | 32.4k | do { |
308 | 32.4k | SkipStreamReadBuffer buffer(global_skip_buffer, upper_stream_bytes_to_skip_); |
309 | 32.4k | RETURN_NOT_OK(refiner_->Read(&buffer)); |
310 | 32.4k | size_t len = buffer.AppendedVecs()[0].iov_len; |
311 | 32.4k | if (!len) { |
312 | 1.35k | break; |
313 | 1.35k | } |
314 | 31.0k | VLOG_WITH_PREFIX0 (4) << "Skip upper: " << len << " of " << upper_stream_bytes_to_skip_0 ; |
315 | 31.0k | upper_stream_bytes_to_skip_ -= len; |
316 | 31.0k | } while (upper_stream_bytes_to_skip_ > 0); |
317 | 1.60k | } |
318 | | |
319 | 168k | if (upper_stream_bytes_to_skip_ == 0) { |
320 | 166k | auto read_buffer_full = VERIFY_RESULT166k (refiner_->Read(&out_buffer));166k |
321 | 166k | if (out_buffer.ReadyToRead()) { |
322 | 154k | auto temp = VERIFY_RESULT(context_->ProcessReceived(read_buffer_full)); |
323 | 0 | upper_stream_bytes_to_skip_ = temp; |
324 | 154k | VLOG_IF_WITH_PREFIX59 (3, temp != 0) << "Skip: " << upper_stream_bytes_to_skip_59 ; |
325 | 154k | } |
326 | 166k | } |
327 | | |
328 | 168k | if (read_buffer_.Empty() || read_buffer_.DataAvailable() == data_available_before_reading8.64k ) { |
329 | 160k | break; |
330 | 160k | } |
331 | 168k | } |
332 | | |
333 | 160k | return 0; |
334 | 160k | } |
335 | | |
336 | | RefinedStreamFactory::RefinedStreamFactory( |
337 | | StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker, |
338 | | RefinerFactory refiner_factory) |
339 | | : lower_layer_factory_(std::move(lower_layer_factory)), buffer_tracker_(buffer_tracker), |
340 | 23.7k | refiner_factory_(std::move(refiner_factory)) { |
341 | 23.7k | } |
342 | | |
343 | 3.69M | std::unique_ptr<Stream> RefinedStreamFactory::Create(const StreamCreateData& data) { |
344 | 3.69M | return std::make_unique<RefinedStream>( |
345 | 3.69M | lower_layer_factory_->Create(data), refiner_factory_(data), data.receive_buffer_size, |
346 | 3.69M | buffer_tracker_); |
347 | 3.69M | } |
348 | | |
349 | | } // namespace rpc |
350 | | } // namespace yb |