YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_rpc.h
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
#ifndef YB_YQL_CQL_CQLSERVER_CQL_RPC_H
16
#define YB_YQL_CQL_CQLSERVER_CQL_RPC_H
17
18
#include <stdint.h>
19
20
#include <atomic>
21
#include <mutex>
22
#include <set>
23
#include <type_traits>
24
#include <utility>
25
26
#include <boost/version.hpp>
27
28
#include "yb/master/master_defaults.h"
29
30
#include "yb/rpc/binary_call_parser.h"
31
#include "yb/rpc/circular_read_buffer.h"
32
#include "yb/rpc/rpc_with_call_id.h"
33
#include "yb/rpc/server_event.h"
34
35
#include "yb/util/net/net_fwd.h"
36
37
#include "yb/yql/cql/ql/ql_session.h"
38
#include "yb/yql/cql/ql/util/cql_message.h"
39
40
namespace yb {
41
namespace cqlserver {
42
43
// Forward declaration only.
// NOTE(review): CQLStatement is not referenced anywhere else in this header as shown — confirm
// whether this declaration is still needed.
class CQLStatement;
44
// Forward declaration: this header only uses CQLServiceImpl through a pointer
// (CQLInboundCall::SetRequest and its service_impl_ member), so the full definition is not
// required here.
class CQLServiceImpl;
45
46
class CQLConnectionContext : public rpc::ConnectionContextWithCallId,
47
                             public rpc::BinaryCallParserListener {
48
 public:
49
  CQLConnectionContext(size_t receive_buffer_size, const MemTrackerPtr& buffer_tracker,
50
                       const MemTrackerPtr& call_tracker);
51
52
  void DumpPB(const rpc::DumpRunningRpcsRequestPB& req,
53
              rpc::RpcConnectionPB* resp) override;
54
55
  // Accessor methods for CQL message compression scheme to use.
56
17.8M
  ql::CQLMessage::CompressionScheme compression_scheme() const {
57
17.8M
    return compression_scheme_;
58
17.8M
  }
59
44
  void set_compression_scheme(ql::CQLMessage::CompressionScheme compression_scheme) {
60
44
    compression_scheme_ = compression_scheme;
61
44
  }
62
63
  // Accessor methods for registered CQL events.
64
8.94M
  ql::CQLMessage::Events registered_events() const {
65
8.94M
    return registered_events_;
66
8.94M
  }
67
2.31k
  void add_registered_events(ql::CQLMessage::Events events) {
68
2.31k
    registered_events_ |= events;
69
2.31k
  }
70
71
6.11k
  static std::string Name() { return "CQL"; }
72
73
 private:
74
10.0k
  void Connected(const rpc::ConnectionPtr& connection) override {}
75
76
0
  rpc::RpcConnectionPB::StateType State() override {
77
0
    return rpc::RpcConnectionPB::OPEN;
78
0
  }
79
80
  uint64_t ExtractCallId(rpc::InboundCall* call) override;
81
  Result<rpc::ProcessCallsResult> ProcessCalls(const rpc::ConnectionPtr& connection,
82
                                               const IoVecs& bytes_to_process,
83
                                               rpc::ReadBufferFull read_buffer_full) override;
84
  // Takes ownership of call_data content.
85
  CHECKED_STATUS HandleCall(
86
      const rpc::ConnectionPtr& connection, rpc::CallData* call_data) override;
87
88
47.2M
  rpc::StreamReadBuffer& ReadBuffer() override {
89
47.2M
    return read_buffer_;
90
47.2M
  }
91
92
  // SQL session of this CQL client connection.
93
  ql::QLSession::SharedPtr ql_session_;
94
95
  // CQL message compression scheme to use.
96
  ql::CQLMessage::CompressionScheme compression_scheme_ = ql::CQLMessage::CompressionScheme::kNone;
97
98
  // Stored registered events for the connection.
99
  ql::CQLMessage::Events registered_events_ = ql::CQLMessage::kNoEvents;
100
101
  rpc::BinaryCallParser parser_;
102
103
  rpc::CircularReadBuffer read_buffer_;
104
105
  MemTrackerPtr call_tracker_;
106
};
107
108
class CQLInboundCall : public rpc::InboundCall {
109
 public:
110
  explicit CQLInboundCall(rpc::ConnectionPtr conn,
111
                          CallProcessedListener call_processed_listener,
112
                          ql::QLSession::SharedPtr ql_session);
113
114
  // Takes ownership of call_data content.
115
  CHECKED_STATUS ParseFrom(const MemTrackerPtr& call_tracker, rpc::CallData* call_data);
116
117
  // Serialize the response packet for the finished call.
118
  // The resulting slices refer to memory in this object.
119
  void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) override;
120
121
  void LogTrace() const override;
122
  std::string ToString() const override;
123
  bool DumpPB(const rpc::DumpRunningRpcsRequestPB& req, rpc::RpcCallInProgressPB* resp) override;
124
125
  CoarseTimePoint GetClientDeadline() const override;
126
127
  // Return the response message buffer.
128
0
  RefCntBuffer& response_msg_buf() {
129
0
    return response_msg_buf_;
130
0
  }
131
132
  // Return the SQL session of this CQL call.
133
9.01M
  const ql::QLSession::SharedPtr& ql_session() const {
134
9.01M
    return ql_session_;
135
9.01M
  }
136
137
17.8M
  uint16_t stream_id() const { return stream_id_; }
138
139
  Slice serialized_remote_method() const override;
140
  Slice method_name() const override;
141
142
  static Slice static_serialized_remote_method();
143
144
  void RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code, const Status& status) override;
145
  void RespondSuccess(const RefCntBuffer& buffer);
146
  void GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const;
147
8.92M
  void SetRequest(std::shared_ptr<const ql::CQLRequest> request, CQLServiceImpl* service_impl) {
148
8.92M
    service_impl_ = service_impl;
149
#ifdef THREAD_SANITIZER
150
    request_ = request;
151
#else
152
8.92M
    std::atomic_store_explicit(&request_, request, std::memory_order_release);
153
8.92M
#endif
154
8.92M
  }
155
156
8.94M
  size_t ObjectSize() const override { return sizeof(*this); }
157
158
8.94M
  size_t DynamicMemoryUsage() const override {
159
    // TODO - who is tracking request_ memory usage ?
160
8.94M
    return DynamicMemoryUsageOf(response_msg_buf_);
161
8.94M
  }
162
163
 private:
164
  RefCntBuffer response_msg_buf_;
165
  const ql::QLSession::SharedPtr ql_session_;
166
  uint16_t stream_id_;
167
  std::shared_ptr<const ql::CQLRequest> request_;
168
  // Pointer to the containing CQL service implementation.
169
  CQLServiceImpl* service_impl_;
170
171
  ScopedTrackedConsumption consumption_;
172
173
  CoarseTimePoint deadline_;
174
};
175
176
// Shared-ownership pointer alias for CQLInboundCall.
using CQLInboundCallPtr = std::shared_ptr<CQLInboundCall>;
177
178
} // namespace cqlserver
179
} // namespace yb
180
181
#endif // YB_YQL_CQL_CQLSERVER_CQL_RPC_H