/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_rpc.h
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | #ifndef YB_YQL_CQL_CQLSERVER_CQL_RPC_H |
16 | | #define YB_YQL_CQL_CQLSERVER_CQL_RPC_H |
17 | | |
18 | | #include <stdint.h> |
19 | | |
20 | | #include <atomic> |
21 | | #include <mutex> |
22 | | #include <set> |
23 | | #include <type_traits> |
24 | | #include <utility> |
25 | | |
26 | | #include <boost/version.hpp> |
27 | | |
28 | | #include "yb/master/master_defaults.h" |
29 | | |
30 | | #include "yb/rpc/binary_call_parser.h" |
31 | | #include "yb/rpc/circular_read_buffer.h" |
32 | | #include "yb/rpc/rpc_with_call_id.h" |
33 | | #include "yb/rpc/server_event.h" |
34 | | |
35 | | #include "yb/util/net/net_fwd.h" |
36 | | |
37 | | #include "yb/yql/cql/ql/ql_session.h" |
38 | | #include "yb/yql/cql/ql/util/cql_message.h" |
39 | | |
40 | | namespace yb { |
41 | | namespace cqlserver { |
42 | | // Forward declarations; CQLServiceImpl is only referred to through a pointer in this header. |
43 | | class CQLStatement; |
44 | | class CQLServiceImpl; |
45 | | // Connection context for a CQL client connection: keeps its QL session, compression scheme, registered events, call parser and read buffer. |
46 | | class CQLConnectionContext : public rpc::ConnectionContextWithCallId, |
47 | | public rpc::BinaryCallParserListener { |
48 | | public: |
49 | | CQLConnectionContext(size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker, |
50 | | const MemTrackerPtr& call_tracker); |
51 | | // Override of the base DumpPB hook, defined outside this header. Presumably fills resp for a running-RPC dump - confirm in the .cc. |
52 | | void DumpPB(const rpc::DumpRunningRpcsRequestPB& req, |
53 | | rpc::RpcConnectionPB* resp) override; |
54 | | 
55 | | // Getter and setter for the CQL message compression scheme stored in compression_scheme_. |
56 | 17.8M | ql::CQLMessage::CompressionScheme compression_scheme() const { |
57 | 17.8M | return compression_scheme_; |
58 | 17.8M | } |
59 | 44 | void set_compression_scheme(ql::CQLMessage::CompressionScheme compression_scheme) { |
60 | 44 | compression_scheme_ = compression_scheme; |
61 | 44 | } |
62 | | 
63 | | // Accessors for the registered CQL events; add_registered_events ORs the given events into the stored set. |
64 | 8.94M | ql::CQLMessage::Events registered_events() const { |
65 | 8.94M | return registered_events_; |
66 | 8.94M | } |
67 | 2.31k | void add_registered_events(ql::CQLMessage::Events events) { |
68 | 2.31k | registered_events_ |= events; |
69 | 2.31k | } |
70 | | // Name of this connection context type; always returns "CQL". |
71 | 6.11k | static std::string Name() { return "CQL"; } |
72 | | 
73 | | private: |
74 | 10.0k | void Connected(const rpc::ConnectionPtr& connection) override {} |
75 | | // Always reports the connection state as OPEN. |
76 | 0 | rpc::RpcConnectionPB::StateType State() override { |
77 | 0 | return rpc::RpcConnectionPB::OPEN; |
78 | 0 | } |
79 | | // Call-processing overrides; their definitions are not in this header. |
80 | | uint64_t ExtractCallId(rpc::InboundCall* call) override; |
81 | | Result<rpc::ProcessCallsResult> ProcessCalls(const rpc::ConnectionPtr& connection, |
82 | | const IoVecs& bytes_to_process, |
83 | | rpc::ReadBufferFull read_buffer_full) override; |
84 | | // Takes ownership of call_data content. |
85 | | CHECKED_STATUS HandleCall( |
86 | | const rpc::ConnectionPtr& connection, rpc::CallData* call_data) override; |
87 | | // Exposes read_buffer_ as the stream read buffer of this context. |
88 | 47.2M | rpc::StreamReadBuffer& ReadBuffer() override { |
89 | 47.2M | return read_buffer_; |
90 | 47.2M | } |
91 | | 
92 | | // SQL session of this CQL client connection. |
93 | | ql::QLSession::SharedPtr ql_session_; |
94 | | 
95 | | // CQL message compression scheme to use; defaults to kNone. |
96 | | ql::CQLMessage::CompressionScheme compression_scheme_ = ql::CQLMessage::CompressionScheme::kNone; |
97 | | 
98 | | // Stored registered events for the connection; defaults to kNoEvents. |
99 | | ql::CQLMessage::Events registered_events_ = ql::CQLMessage::kNoEvents; |
100 | | // Binary call parser owned by this connection context. |
101 | | rpc::BinaryCallParser parser_; |
102 | | // Circular read buffer returned by ReadBuffer(). |
103 | | rpc::CircularReadBuffer read_buffer_; |
104 | | // Memory tracker for calls; presumably the call_tracker passed to the constructor - confirm in the .cc. |
105 | | MemTrackerPtr call_tracker_; |
106 | | }; |
107 | | // Inbound RPC call for CQL: keeps the QL session, stream id, request, response buffer and deadline of one call. |
108 | | class CQLInboundCall : public rpc::InboundCall { |
109 | | public: |
110 | | explicit CQLInboundCall(rpc::ConnectionPtr conn, |
111 | | CallProcessedListener call_processed_listener, |
112 | | ql::QLSession::SharedPtr ql_session); |
113 | | 
114 | | // Takes ownership of call_data content. |
115 | | CHECKED_STATUS ParseFrom(const MemTrackerPtr& call_tracker, rpc::CallData* call_data); |
116 | | 
117 | | // Serialize the response packet for the finished call. |
118 | | // The resulting slices refer to memory in this object. |
119 | | void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) override; |
120 | | // Diagnostics overrides; their definitions are not in this header. |
121 | | void LogTrace() const override; |
122 | | std::string ToString() const override; |
123 | | bool DumpPB(const rpc::DumpRunningRpcsRequestPB& req, rpc::RpcCallInProgressPB* resp) override; |
124 | | // NOTE(review): presumably returns deadline_ - confirm in the .cc. |
125 | | CoarseTimePoint GetClientDeadline() const override; |
126 | | 
127 | | // Return the response message buffer (mutable reference to response_msg_buf_). |
128 | 0 | RefCntBuffer& response_msg_buf() { |
129 | 0 | return response_msg_buf_; |
130 | 0 | } |
131 | | 
132 | | // Return the SQL session of this CQL call. |
133 | 9.01M | const ql::QLSession::SharedPtr& ql_session() const { |
134 | 9.01M | return ql_session_; |
135 | 9.01M | } |
136 | | // Returns the stored stream_id_ of this call. |
137 | 17.8M | uint16_t stream_id() const { return stream_id_; } |
138 | | 
139 | | Slice serialized_remote_method() const override; |
140 | | Slice method_name() const override; |
141 | | 
142 | | static Slice static_serialized_remote_method(); |
143 | | // SetRequest below stores service_impl first, then publishes request with a release store (plain assignment under THREAD_SANITIZER). |
144 | | void RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code, const Status& status) override; |
145 | | void RespondSuccess(const RefCntBuffer& buffer); |
146 | | void GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const; |
147 | 8.92M | void SetRequest(std::shared_ptr<const ql::CQLRequest> request, CQLServiceImpl* service_impl) { |
148 | 8.92M | service_impl_ = service_impl; |
149 | | #ifdef THREAD_SANITIZER |
150 | | request_ = request; |
151 | | #else |
152 | 8.92M | std::atomic_store_explicit(&request_, request, std::memory_order_release); |
153 | 8.92M | #endif |
154 | 8.92M | } |
155 | | // Memory accounting: ObjectSize is sizeof(*this); DynamicMemoryUsage counts only response_msg_buf_. |
156 | 8.94M | size_t ObjectSize() const override { return sizeof(*this); } |
157 | | 
158 | 8.94M | size_t DynamicMemoryUsage() const override { |
159 | | // TODO: memory held by request_ is not included here; determine who tracks it. |
160 | 8.94M | return DynamicMemoryUsageOf(response_msg_buf_); |
161 | 8.94M | } |
162 | | // NOTE(review): stream_id_ and service_impl_ have no in-class initializers; confirm they are always set before being read. |
163 | | private: |
164 | | RefCntBuffer response_msg_buf_; |
165 | | const ql::QLSession::SharedPtr ql_session_; |
166 | | uint16_t stream_id_; |
167 | | std::shared_ptr<const ql::CQLRequest> request_; |
168 | | // Pointer to the containing CQL service implementation; assigned in SetRequest. |
169 | | CQLServiceImpl* service_impl_; |
170 | | // NOTE(review): presumably accounts this call's memory against the tracker passed to ParseFrom - confirm in the .cc. |
171 | | ScopedTrackedConsumption consumption_; |
172 | | // Deadline of this call; presumably what GetClientDeadline() returns - confirm in the .cc. |
173 | | CoarseTimePoint deadline_; |
174 | | }; |
175 | | // Shared-ownership (std::shared_ptr) handle to a CQLInboundCall. |
176 | | using CQLInboundCallPtr = std::shared_ptr<CQLInboundCall>; |
177 | | |
178 | | } // namespace cqlserver |
179 | | } // namespace yb |
180 | | |
181 | | #endif // YB_YQL_CQL_CQLSERVER_CQL_RPC_H |