YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/refined_stream.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_RPC_REFINED_STREAM_H
15
#define YB_RPC_REFINED_STREAM_H
16
17
#include "yb/rpc/circular_read_buffer.h"
18
#include "yb/rpc/stream.h"
19
20
#include "yb/util/mem_tracker.h"
21
22
namespace yb {
23
namespace rpc {
24
25
YB_DEFINE_ENUM(RefinedStreamState, (kInitial)(kHandshake)(kEnabled)(kDisabled));
26
YB_DEFINE_ENUM(LocalSide, (kClient)(kServer));
27
28
// StreamRefiner is used by RefinedStream to perform actual stream data modification.
29
class StreamRefiner {
30
 public:
31
  virtual void Start(RefinedStream* stream) = 0;
32
  virtual CHECKED_STATUS ProcessHeader() = 0;
33
  virtual CHECKED_STATUS Send(OutboundDataPtr data) = 0;
34
  virtual CHECKED_STATUS Handshake() = 0;
35
  virtual Result<ReadBufferFull> Read(StreamReadBuffer* out) = 0;
36
  virtual const Protocol* GetProtocol() = 0;
37
38
  virtual std::string ToString() const = 0;
39
40
559k
  virtual ~StreamRefiner() = default;
41
};
42
43
// Stream that alters the data sent and received by lower layer streams.
44
// For instance it could be used to compress or encrypt the data.
45
//
46
// RefinedStream keeps the code common to all such streams,
47
// while StreamRefiner provides actual data modification.
48
class RefinedStream : public Stream, public StreamContext {
49
 public:
50
  RefinedStream(std::unique_ptr<Stream> lower_stream, std::unique_ptr<StreamRefiner> refiner,
51
                size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker);
52
53
  size_t GetPendingWriteBytes() override;
54
  void Close() override;
55
  CHECKED_STATUS TryWrite() override;
56
  void ParseReceived() override;
57
  bool Idle(std::string* reason) override;
58
  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override;
59
  const Endpoint& Remote() const override;
60
  const Endpoint& Local() const override;
61
  CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) override;
62
  void Shutdown(const Status& status) override;
63
  Result<size_t> Send(OutboundDataPtr data) override;
64
  void Cancelled(size_t handle) override;
65
  bool IsConnected() override;
66
  const Protocol* GetProtocol() override;
67
  StreamReadBuffer& ReadBuffer() override;
68
  std::string ToString() const override;
69
70
  // Implementation StreamContext
71
  Result<size_t> ProcessReceived(ReadBufferFull read_buffer_full) override;
72
  void Connected() override;
73
74
  void UpdateLastActivity() override;
75
  void UpdateLastRead() override;
76
  void UpdateLastWrite() override;
77
  void Transferred(const OutboundDataPtr& data, const Status& status) override;
78
  void Destroy(const Status& status) override;
79
80
  CHECKED_STATUS Established(RefinedStreamState state);
81
  CHECKED_STATUS SendToLower(OutboundDataPtr data);
82
  CHECKED_STATUS StartHandshake();
83
84
1.19M
  StreamContext& context() const {
85
1.19M
    return *context_;
86
1.19M
  }
87
88
281k
  LocalSide local_side() const {
89
281k
    return local_side_;
90
281k
  }
91
92
575k
  const MemTrackerPtr& buffer_tracker() const {
93
575k
    return buffer_tracker_;
94
575k
  }
95
96
 private:
97
  Result<size_t> Handshake();
98
  Result<size_t> Read();
99
100
  std::unique_ptr<Stream> lower_stream_;
101
  std::unique_ptr<StreamRefiner> refiner_;
102
  RefinedStreamState state_ = RefinedStreamState::kInitial;
103
  StreamContext* context_ = nullptr;
104
  std::vector<OutboundDataPtr> pending_data_;
105
  size_t upper_stream_bytes_to_skip_ = 0;
106
  LocalSide local_side_ = LocalSide::kServer;
107
  CircularReadBuffer read_buffer_;
108
  MemTrackerPtr buffer_tracker_;
109
};
110
111
class RefinedStreamFactory : public StreamFactory {
112
 public:
113
  using RefinerFactory = std::function<std::unique_ptr<StreamRefiner>(
114
        const StreamCreateData& data)>;
115
116
  RefinedStreamFactory(
117
      StreamFactoryPtr lower_layer_factory, const MemTrackerPtr& buffer_tracker,
118
      RefinerFactory refiner_factory);
119
120
 private:
121
  std::unique_ptr<Stream> Create(const StreamCreateData& data) override;
122
123
  StreamFactoryPtr lower_layer_factory_;
124
  MemTrackerPtr buffer_tracker_;
125
  RefinerFactory refiner_factory_;
126
};
127
128
}  // namespace rpc
129
}  // namespace yb
130
131
#endif  // YB_RPC_REFINED_STREAM_H