/Users/deen/code/yugabyte-db/src/yb/rpc/refined_stream.h
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #ifndef YB_RPC_REFINED_STREAM_H |
15 | | #define YB_RPC_REFINED_STREAM_H |
16 | | |
17 | | #include "yb/rpc/circular_read_buffer.h" |
18 | | #include "yb/rpc/stream.h" |
19 | | |
20 | | #include "yb/util/mem_tracker.h" |
21 | | |
22 | | namespace yb { |
23 | | namespace rpc { |
24 | | |
25 | | YB_DEFINE_ENUM(RefinedStreamState, (kInitial)(kHandshake)(kEnabled)(kDisabled)); |
26 | | YB_DEFINE_ENUM(LocalSide, (kClient)(kServer)); |
27 | | |
28 | | // StreamRefiner is used by RefinedStream to perform actual stream data modification. |
29 | | class StreamRefiner { |
30 | | public: |
31 | | virtual void Start(RefinedStream* stream) = 0; |
32 | | virtual CHECKED_STATUS ProcessHeader() = 0; |
33 | | virtual CHECKED_STATUS Send(OutboundDataPtr data) = 0; |
34 | | virtual CHECKED_STATUS Handshake() = 0; |
35 | | virtual Result<ReadBufferFull> Read(StreamReadBuffer* out) = 0; |
36 | | virtual const Protocol* GetProtocol() = 0; |
37 | | |
38 | | virtual std::string ToString() const = 0; |
39 | | |
40 | 3.19M | virtual ~StreamRefiner() = default; |
41 | | }; |
42 | | |
43 | | // Stream that alters the data sent and received by lower layer streams. |
44 | | // For instance it could be used to compress or encrypt the data. |
45 | | // |
46 | | // RefinedStream keeps the code common to all such streams, |
47 | | // while StreamRefiner provides actual data modification. |
48 | | class RefinedStream : public Stream, public StreamContext { |
49 | | public: |
50 | | RefinedStream(std::unique_ptr<Stream> lower_stream, std::unique_ptr<StreamRefiner> refiner, |
51 | | size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker); |
52 | | |
53 | | size_t GetPendingWriteBytes() override; |
54 | | void Close() override; |
55 | | CHECKED_STATUS TryWrite() override; |
56 | | void ParseReceived() override; |
57 | | bool Idle(std::string* reason) override; |
58 | | void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override; |
59 | | const Endpoint& Remote() const override; |
60 | | const Endpoint& Local() const override; |
61 | | CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) override; |
62 | | void Shutdown(const Status& status) override; |
63 | | Result<size_t> Send(OutboundDataPtr data) override; |
64 | | void Cancelled(size_t handle) override; |
65 | | bool IsConnected() override; |
66 | | const Protocol* GetProtocol() override; |
67 | | StreamReadBuffer& ReadBuffer() override; |
68 | | std::string ToString() const override; |
69 | | |
70 | | // Implementation StreamContext |
71 | | Result<size_t> ProcessReceived(ReadBufferFull read_buffer_full) override; |
72 | | void Connected() override; |
73 | | |
74 | | void UpdateLastActivity() override; |
75 | | void UpdateLastRead() override; |
76 | | void UpdateLastWrite() override; |
77 | | void Transferred(const OutboundDataPtr& data, const Status& status) override; |
78 | | void Destroy(const Status& status) override; |
79 | | |
80 | | CHECKED_STATUS Established(RefinedStreamState state); |
81 | | CHECKED_STATUS SendToLower(OutboundDataPtr data); |
82 | | CHECKED_STATUS StartHandshake(); |
83 | | |
84 | 6.21M | StreamContext& context() const { |
85 | 6.21M | return *context_; |
86 | 6.21M | } |
87 | | |
88 | 600k | LocalSide local_side() const { |
89 | 600k | return local_side_; |
90 | 600k | } |
91 | | |
92 | 1.18M | const MemTrackerPtr& buffer_tracker() const { |
93 | 1.18M | return buffer_tracker_; |
94 | 1.18M | } |
95 | | |
96 | | private: |
97 | | Result<size_t> Handshake(); |
98 | | Result<size_t> Read(); |
99 | | |
100 | | std::unique_ptr<Stream> lower_stream_; |
101 | | std::unique_ptr<StreamRefiner> refiner_; |
102 | | RefinedStreamState state_ = RefinedStreamState::kInitial; |
103 | | StreamContext* context_ = nullptr; |
104 | | std::vector<OutboundDataPtr> pending_data_; |
105 | | size_t upper_stream_bytes_to_skip_ = 0; |
106 | | LocalSide local_side_ = LocalSide::kServer; |
107 | | CircularReadBuffer read_buffer_; |
108 | | MemTrackerPtr buffer_tracker_; |
109 | | }; |
110 | | |
111 | | class RefinedStreamFactory : public StreamFactory { |
112 | | public: |
113 | | using RefinerFactory = std::function<std::unique_ptr<StreamRefiner>( |
114 | | const StreamCreateData& data)>; |
115 | | |
116 | | RefinedStreamFactory( |
117 | | StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker, |
118 | | RefinerFactory refiner_factory); |
119 | | |
120 | | private: |
121 | | std::unique_ptr<Stream> Create(const StreamCreateData& data) override; |
122 | | |
123 | | StreamFactoryPtr lower_layer_factory_; |
124 | | MemTrackerPtr buffer_tracker_; |
125 | | RefinerFactory refiner_factory_; |
126 | | }; |
127 | | |
128 | | } // namespace rpc |
129 | | } // namespace yb |
130 | | |
131 | | #endif // YB_RPC_REFINED_STREAM_H |