YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/stream.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_RPC_STREAM_H
15
#define YB_RPC_STREAM_H
16
17
#include "yb/rpc/rpc_fwd.h"
18
19
#include "yb/util/status_fwd.h"
20
#include "yb/util/net/socket.h"
21
22
// Forward declaration only: lets this header mention ev::loop_ref by pointer
// without including the event-loop library's header.
namespace ev {
struct loop_ref;
}  // namespace ev
27
28
namespace yb {
29
30
// Forward declarations; only pointers/smart pointers to these types are used below.
class MemTracker;
class MetricEntity;
32
33
namespace rpc {
34
35
class StreamReadBuffer {
36
 public:
37
  // Returns whether we could read appended data from this buffer. It is NOT always !Empty().
38
  virtual bool ReadyToRead() = 0;
39
40
  // Returns true if this buffer is empty.
41
  virtual bool Empty() = 0;
42
43
  // Resets buffer and release allocated memory.
44
  virtual void Reset() = 0;
45
46
  // Returns true if this buffer is full and we cannot anymore append into it.
47
  virtual bool Full() = 0;
48
49
  // Ensures there is some space to append into. Depending on currently used size.
50
  // Returns iov's that could be used for appending data into to this buffer.
51
  virtual Result<IoVecs> PrepareAppend() = 0;
52
53
  // Extends amount of appended data by len.
54
  virtual void DataAppended(size_t len) = 0;
55
56
  // Returns currently appended data.
57
  virtual IoVecs AppendedVecs() = 0;
58
59
  // Consumes count bytes of appended data. If prepend is not empty, then all future reads should
60
  // write data to prepend, until it is filled. I.e. unfilled part of prepend will be the first
61
  // entry of vector returned by PrepareAppend.
62
  virtual void Consume(size_t count, const Slice& prepend) = 0;
63
64
  virtual size_t DataAvailable() = 0;
65
66
  // Render this buffer to string.
67
  virtual std::string ToString() const = 0;
68
69
6.49M
  virtual ~StreamReadBuffer() {}
70
};
71
72
class StreamContext {
73
 public:
74
  virtual void UpdateLastActivity() = 0;
75
  virtual void UpdateLastRead() = 0;
76
  virtual void UpdateLastWrite() = 0;
77
  virtual void Transferred(const OutboundDataPtr& data, const Status& status) = 0;
78
  virtual void Destroy(const Status& status) = 0;
79
80
  // Called by underlying stream when stream has been connected (Stream::IsConnected() became true).
81
  virtual void Connected() = 0;
82
83
  virtual Result<size_t> ProcessReceived(ReadBufferFull read_buffer_full) = 0;
84
  virtual StreamReadBuffer& ReadBuffer() = 0;
85
86
 protected:
87
6.45M
  ~StreamContext() {}
88
};
89
90
class Stream {
91
 public:
92
7.44M
  Stream() = default;
93
94
  Stream(const Stream&) = delete;
95
  void operator=(const Stream&) = delete;
96
97
  virtual CHECKED_STATUS Start(bool connect, ev::loop_ref* loop, StreamContext* context) = 0;
98
  virtual void Close() = 0;
99
  virtual void Shutdown(const Status& status) = 0;
100
101
  // Returns handle to block associated with this data. This handle could be used to cancel
102
  // transfer of this block using Cancelled.
103
  // For instance when unsent call times out.
104
  virtual Result<size_t> Send(OutboundDataPtr data) = 0;
105
106
  virtual CHECKED_STATUS TryWrite() = 0;
107
  virtual void ParseReceived() = 0;
108
  virtual size_t GetPendingWriteBytes() = 0;
109
  virtual void Cancelled(size_t handle) = 0;
110
111
  virtual bool Idle(std::string* reason_not_idle) = 0;
112
  virtual bool IsConnected() = 0;
113
  virtual void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) = 0;
114
115
  // The address of the remote end of the connection.
116
  virtual const Endpoint& Remote() const = 0;
117
118
  // The address of the local end of the connection.
119
  virtual const Endpoint& Local() const = 0;
120
121
  virtual std::string ToString() const;
122
123
61.0k
  const std::string& LogPrefix() {
124
61.0k
    if (log_prefix_.empty()) {
125
58.6k
      log_prefix_ = ToString() + ": ";
126
58.6k
    }
127
61.0k
    return log_prefix_;
128
61.0k
  }
129
130
  virtual const Protocol* GetProtocol() = 0;
131
132
6.44M
  virtual ~Stream() {}
133
134
 protected:
135
1.78M
  void ResetLogPrefix() {
136
1.78M
    log_prefix_.clear();
137
1.78M
  }
138
139
  std::string log_prefix_;
140
};
141
142
struct StreamCreateData {
143
  Endpoint remote;
144
  const std::string& remote_hostname;
145
  Socket* socket;
146
  size_t receive_buffer_size;
147
  std::shared_ptr<MemTracker> mem_tracker;
148
  scoped_refptr<MetricEntity> metric_entity;
149
};
150
151
class StreamFactory {
152
 public:
153
  virtual std::unique_ptr<Stream> Create(const StreamCreateData& data) = 0;
154
155
9.40k
  virtual ~StreamFactory() {}
156
};
157
158
// Non-copyable protocol descriptor identified by a string id.
class Protocol {
 public:
  // Stores a copy of id as this protocol's identifier.
  explicit Protocol(const std::string& id) : id_{id} {
  }

  // Protocols are not copyable.
  Protocol(const Protocol&) = delete;
  void operator=(const Protocol&) = delete;

  // String form of the protocol; this is the identifier itself.
  const std::string& ToString() const {
    return id_;
  }

  // Identifier supplied at construction.
  const std::string& id() const {
    return id_;
  }

 private:
  std::string id_;
};
172
173
} // namespace rpc
174
} // namespace yb
175
176
#endif // YB_RPC_STREAM_H