/Users/deen/code/yugabyte-db/src/yb/rpc/refined_stream.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/rpc/refined_stream.h" |
15 | | |
16 | | #include "yb/rpc/rpc_util.h" |
17 | | |
18 | | #include "yb/util/logging.h" |
19 | | #include "yb/util/result.h" |
20 | | #include "yb/util/status_format.h" |
21 | | |
22 | | namespace yb { |
23 | | namespace rpc { |
24 | | |
25 | | RefinedStream::RefinedStream( |
26 | | std::unique_ptr<Stream> lower_stream, std::unique_ptr<StreamRefiner> refiner, |
27 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker) |
28 | | : lower_stream_(std::move(lower_stream)), refiner_(std::move(refiner)), |
29 | 876k | read_buffer_(receive_buffer_size, buffer_tracker) { |
30 | 876k | } |
31 | | |
32 | 84.0M | size_t RefinedStream::GetPendingWriteBytes() { |
33 | 84.0M | return lower_stream_->GetPendingWriteBytes(); |
34 | 84.0M | } |
35 | | |
36 | 1.14k | void RefinedStream::Close() { |
37 | 1.14k | lower_stream_->Close(); |
38 | 1.14k | } |
39 | | |
40 | 41.2M | Status RefinedStream::TryWrite() { |
41 | 41.2M | return lower_stream_->TryWrite(); |
42 | 41.2M | } |
43 | | |
44 | 0 | void RefinedStream::ParseReceived() { |
45 | 0 | lower_stream_->ParseReceived(); |
46 | 0 | } |
47 | | |
48 | 58.5M | bool RefinedStream::Idle(std::string* reason) { |
49 | 58.5M | return lower_stream_->Idle(reason); |
50 | 58.5M | } |
51 | | |
52 | 8 | void RefinedStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) { |
53 | 8 | lower_stream_->DumpPB(req, resp); |
54 | 8 | } |
55 | | |
56 | 15.0M | const Endpoint& RefinedStream::Remote() const { |
57 | 15.0M | return lower_stream_->Remote(); |
58 | 15.0M | } |
59 | | |
60 | 23.4k | const Endpoint& RefinedStream::Local() const { |
61 | 23.4k | return lower_stream_->Local(); |
62 | 23.4k | } |
63 | | |
64 | 879k | Status RefinedStream::Start(bool connect, ev::loop_ref* loop, StreamContext* context) { |
65 | 564k | local_side_ = connect ? LocalSide::kClient : LocalSide::kServer; |
66 | 879k | context_ = context; |
67 | 879k | refiner_->Start(this); |
68 | 879k | return lower_stream_->Start(connect, loop, this); |
69 | 879k | } |
70 | | |
71 | 1.10M | void RefinedStream::Shutdown(const Status& status) { |
72 | 415 | VLOG_WITH_PREFIX(1) << "Shutdown with status: " << status; |
73 | | |
74 | 608k | for (auto& data : pending_data_) { |
75 | 608k | if (data) { |
76 | 607k | context().Transferred(data, status); |
77 | 607k | } |
78 | 608k | } |
79 | | |
80 | 1.10M | pending_data_.clear(); |
81 | 1.10M | lower_stream_->Shutdown(status); |
82 | 1.10M | } |
83 | | |
84 | 42.6M | Result<size_t> RefinedStream::Send(OutboundDataPtr data) { |
85 | 42.6M | switch (state_) { |
86 | 1.20M | case RefinedStreamState::kInitial: |
87 | 1.20M | case RefinedStreamState::kHandshake: |
88 | 1.20M | pending_data_.push_back(std::move(data)); |
89 | 1.20M | return std::numeric_limits<size_t>::max(); |
90 | 57.4k | case RefinedStreamState::kEnabled: |
91 | 57.4k | RETURN_NOT_OK(refiner_->Send(std::move(data))); |
92 | 57.4k | return std::numeric_limits<size_t>::max(); |
93 | 41.4M | case RefinedStreamState::kDisabled: |
94 | 41.4M | return lower_stream_->Send(std::move(data)); |
95 | 0 | } |
96 | | |
97 | 0 | FATAL_INVALID_ENUM_VALUE(RefinedStreamState, state_); |
98 | 0 | } |
99 | | |
100 | 39.9M | void RefinedStream::UpdateLastActivity() { |
101 | 39.9M | context_->UpdateLastActivity(); |
102 | 39.9M | } |
103 | | |
104 | 40.2M | void RefinedStream::UpdateLastRead() { |
105 | 40.2M | context_->UpdateLastRead(); |
106 | 40.2M | } |
107 | | |
108 | 40.4M | void RefinedStream::UpdateLastWrite() { |
109 | 40.4M | context_->UpdateLastWrite(); |
110 | 40.4M | } |
111 | | |
112 | 41.4M | void RefinedStream::Transferred(const OutboundDataPtr& data, const Status& status) { |
113 | 41.4M | context_->Transferred(data, status); |
114 | 41.4M | } |
115 | | |
116 | 550k | void RefinedStream::Destroy(const Status& status) { |
117 | 550k | context_->Destroy(status); |
118 | 550k | } |
119 | | |
120 | 189 | std::string RefinedStream::ToString() const { |
121 | 189 | return Format("$0[$1] $2 $3", |
122 | 188 | *refiner_, local_side_ == LocalSide::kClient ? "C" : "S", state_, *lower_stream_); |
123 | 189 | } |
124 | | |
125 | 3.26k | void RefinedStream::Cancelled(size_t handle) { |
126 | 3.26k | if (state_ == RefinedStreamState::kDisabled) { |
127 | 3.26k | lower_stream_->Cancelled(handle); |
128 | 3.26k | return; |
129 | 3.26k | } |
130 | 0 | LOG_WITH_PREFIX(DFATAL) << "Cancel is not supported for proxy stream: " << handle; |
131 | 0 | } |
132 | | |
133 | 13.9M | bool RefinedStream::IsConnected() { |
134 | 13.9M | return state_ == RefinedStreamState::kEnabled || state_ == RefinedStreamState::kDisabled; |
135 | 13.9M | } |
136 | | |
137 | 3.32M | const Protocol* RefinedStream::GetProtocol() { |
138 | 3.32M | return refiner_->GetProtocol(); |
139 | 3.32M | } |
140 | | |
141 | 221M | StreamReadBuffer& RefinedStream::ReadBuffer() { |
142 | 218M | return state_ != RefinedStreamState::kDisabled ? read_buffer_ : context_->ReadBuffer(); |
143 | 221M | } |
144 | | |
145 | 40.1M | Result<size_t> RefinedStream::ProcessReceived(ReadBufferFull read_buffer_full) { |
146 | 40.1M | switch (state_) { |
147 | 314k | case RefinedStreamState::kInitial: { |
148 | 314k | RETURN_NOT_OK(refiner_->ProcessHeader()); |
149 | 314k | if (state_ == RefinedStreamState::kInitial) { |
150 | | // Received data was not enough to check stream header. |
151 | 0 | return 0; |
152 | 0 | } |
153 | 314k | return ProcessReceived(read_buffer_full); |
154 | 314k | } |
155 | | |
156 | 39.7M | case RefinedStreamState::kDisabled: |
157 | 39.7M | return context_->ProcessReceived(read_buffer_full); |
158 | | |
159 | 3.21k | case RefinedStreamState::kHandshake: |
160 | 3.21k | return Handshake(); |
161 | | |
162 | 58.0k | case RefinedStreamState::kEnabled: { |
163 | 58.0k | return Read(); |
164 | 0 | } |
165 | 0 | } |
166 | | |
167 | 0 | return STATUS_FORMAT(IllegalState, "Unexpected state: $0", to_underlying(state_)); |
168 | 0 | } |
169 | | |
170 | 587k | void RefinedStream::Connected() { |
171 | 587k | if (local_side_ != LocalSide::kClient) { |
172 | 314k | return; |
173 | 314k | } |
174 | | |
175 | 273k | auto status = StartHandshake(); |
176 | 273k | if (status.ok()) { |
177 | 271k | status = refiner_->Handshake(); |
178 | 271k | } |
179 | 273k | if (!status.ok()) { |
180 | 0 | context_->Destroy(status); |
181 | 0 | } |
182 | 273k | } |
183 | | |
184 | 583k | CHECKED_STATUS TransferData(StreamReadBuffer* source, StreamReadBuffer* dest) { |
185 | 583k | auto dst = VERIFY_RESULT(dest->PrepareAppend()); |
186 | 583k | auto dst_it = dst.begin(); |
187 | 583k | size_t total_len = 0; |
188 | 585k | for (auto src_vec : source->AppendedVecs()) { |
189 | 898k | while (src_vec.iov_len != 0) { |
190 | 313k | if (dst_it->iov_len == 0) { |
191 | 0 | if (++dst_it == dst.end()) { |
192 | 0 | return STATUS(RuntimeError, "No enough space in destination buffer"); |
193 | 0 | } |
194 | 313k | } |
195 | 313k | size_t len = std::min(dst_it->iov_len, src_vec.iov_len); |
196 | 313k | memcpy(dst_it->iov_base, src_vec.iov_base, len); |
197 | 313k | IoVecRemovePrefix(len, &*dst_it); |
198 | 313k | IoVecRemovePrefix(len, &src_vec); |
199 | 313k | total_len += len; |
200 | 313k | } |
201 | 585k | } |
202 | 583k | source->Consume(total_len, Slice()); |
203 | 583k | dest->DataAppended(total_len); |
204 | 583k | return Status::OK(); |
205 | 583k | } |
206 | | |
207 | 586k | Status RefinedStream::Established(RefinedStreamState state) { |
208 | 586k | state_ = state; |
209 | | |
210 | 586k | if (state == RefinedStreamState::kDisabled) { |
211 | 584k | RETURN_NOT_OK(TransferData(&read_buffer_, &context_->ReadBuffer())); |
212 | 584k | } |
213 | | |
214 | 586k | ResetLogPrefix(); |
215 | | |
216 | 18.4E | VLOG_WITH_PREFIX(1) << __func__ << ": " << state; |
217 | | |
218 | 586k | context().Connected(); |
219 | 604k | for (auto& data : pending_data_) { |
220 | 604k | RETURN_NOT_OK(Send(std::move(data))); |
221 | 604k | } |
222 | 586k | pending_data_.clear(); |
223 | 586k | return Status::OK(); |
224 | 586k | } |
225 | | |
226 | 112k | Status RefinedStream::SendToLower(OutboundDataPtr data) { |
227 | 112k | return ResultToStatus(lower_stream_->Send(std::move(data))); |
228 | 112k | } |
229 | | |
230 | 274k | Status RefinedStream::StartHandshake() { |
231 | 274k | state_ = RefinedStreamState::kHandshake; |
232 | 274k | ResetLogPrefix(); |
233 | 274k | return Status::OK(); |
234 | 274k | } |
235 | | |
236 | 3.21k | Result<size_t> RefinedStream::Handshake() { |
237 | 3.21k | auto handshake_status = refiner_->Handshake(); |
238 | 147 | LOG_IF_WITH_PREFIX(INFO, !handshake_status.ok()) << "Handshake failed: " << handshake_status; |
239 | 3.21k | RETURN_NOT_OK(handshake_status); |
240 | | |
241 | 3.06k | if (state_ == RefinedStreamState::kEnabled) { |
242 | 1.91k | return Read(); |
243 | 1.91k | } |
244 | | |
245 | 1.15k | return 0; |
246 | 1.15k | } |
247 | | |
248 | | // Used to read bytes to global skip buffer. |
249 | | class SkipStreamReadBuffer : public StreamReadBuffer { |
250 | | public: |
251 | | explicit SkipStreamReadBuffer(const Slice& out, size_t skip_len) |
252 | 32.4k | : out_(out), wpos_(out_.mutable_data()), skip_len_(skip_len) {} |
253 | | |
254 | 0 | size_t DataAvailable() override { return 0; } |
255 | | |
256 | 0 | bool ReadyToRead() override { return false; } |
257 | | |
258 | 0 | bool Empty() override { return true; } |
259 | | |
260 | 0 | void Reset() override { |
261 | 0 | wpos_ = out_.mutable_data(); |
262 | 0 | } |
263 | | |
264 | 31.7k | bool Full() override { |
265 | 31.7k | return wpos_ == out_.end(); |
266 | 31.7k | } |
267 | | |
268 | 32.4k | Result<IoVecs> PrepareAppend() override { |
269 | 32.4k | return IoVecs({iovec{ |
270 | 32.4k | .iov_base = wpos_, |
271 | 32.4k | .iov_len = std::min(static_cast<size_t>(out_.end() - wpos_), skip_len_), |
272 | 32.4k | }}); |
273 | 32.4k | } |
274 | | |
275 | 32.4k | void DataAppended(size_t len) override { |
276 | 32.4k | skip_len_ -= len; |
277 | 32.4k | wpos_ += len; |
278 | 32.4k | } |
279 | | |
280 | 32.4k | IoVecs AppendedVecs() override { |
281 | 32.4k | return IoVecs({iovec{ |
282 | 32.4k | .iov_base = out_.mutable_data(), |
283 | 32.4k | .iov_len = static_cast<size_t>(wpos_ - out_.mutable_data()), |
284 | 32.4k | }}); |
285 | 32.4k | } |
286 | | |
287 | 0 | void Consume(size_t count, const Slice& prepend) override { |
288 | 0 | } |
289 | | |
290 | 0 | std::string ToString() const override { |
291 | 0 | return "SkipStreamReadBuffer"; |
292 | 0 | } |
293 | | |
294 | | private: |
295 | | Slice out_; |
296 | | uint8_t* wpos_; |
297 | | size_t skip_len_; |
298 | | }; |
299 | | |
300 | 59.9k | Result<size_t> RefinedStream::Read() { |
301 | 63.2k | for (;;) { |
302 | 63.2k | auto& out_buffer = context_->ReadBuffer(); |
303 | 63.2k | auto data_available_before_reading = read_buffer_.DataAvailable(); |
304 | | |
305 | 63.2k | if (upper_stream_bytes_to_skip_ > 0) { |
306 | 1.61k | auto global_skip_buffer = GetGlobalSkipBuffer(); |
307 | 32.4k | do { |
308 | 32.4k | SkipStreamReadBuffer buffer(global_skip_buffer, upper_stream_bytes_to_skip_); |
309 | 32.4k | RETURN_NOT_OK(refiner_->Read(&buffer)); |
310 | 32.4k | size_t len = buffer.AppendedVecs()[0].iov_len; |
311 | 32.4k | if (!len) { |
312 | 1.36k | break; |
313 | 1.36k | } |
314 | 31.0k | VLOG_WITH_PREFIX(4) << "Skip upper: " << len << " of " << upper_stream_bytes_to_skip_; |
315 | 31.0k | upper_stream_bytes_to_skip_ -= len; |
316 | 31.0k | } while (upper_stream_bytes_to_skip_ > 0); |
317 | 1.61k | } |
318 | | |
319 | 63.2k | if (upper_stream_bytes_to_skip_ == 0) { |
320 | 61.8k | auto read_buffer_full = VERIFY_RESULT(refiner_->Read(&out_buffer)); |
321 | 61.8k | if (out_buffer.ReadyToRead()) { |
322 | 57.2k | auto temp = VERIFY_RESULT(context_->ProcessReceived(read_buffer_full)); |
323 | 57.2k | upper_stream_bytes_to_skip_ = temp; |
324 | 42 | VLOG_IF_WITH_PREFIX(3, temp != 0) << "Skip: " << upper_stream_bytes_to_skip_; |
325 | 57.2k | } |
326 | 61.8k | } |
327 | | |
328 | 63.2k | if (read_buffer_.Empty() || read_buffer_.DataAvailable() == data_available_before_reading) { |
329 | 59.8k | break; |
330 | 59.8k | } |
331 | 63.2k | } |
332 | | |
333 | 59.8k | return 0; |
334 | 59.9k | } |
335 | | |
336 | | RefinedStreamFactory::RefinedStreamFactory( |
337 | | StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker, |
338 | | RefinerFactory refiner_factory) |
339 | | : lower_layer_factory_(std::move(lower_layer_factory)), buffer_tracker_(buffer_tracker), |
340 | 16.2k | refiner_factory_(std::move(refiner_factory)) { |
341 | 16.2k | } |
342 | | |
343 | 873k | std::unique_ptr<Stream> RefinedStreamFactory::Create(const StreamCreateData& data) { |
344 | 873k | return std::make_unique<RefinedStream>( |
345 | 873k | lower_layer_factory_->Create(data), refiner_factory_(data), data.receive_buffer_size, |
346 | 873k | buffer_tracker_); |
347 | 873k | } |
348 | | |
349 | | } // namespace rpc |
350 | | } // namespace yb |