/Users/deen/code/yugabyte-db/src/yb/client/async_rpc.h
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #ifndef YB_CLIENT_ASYNC_RPC_H_ |
15 | | #define YB_CLIENT_ASYNC_RPC_H_ |
16 | | |
17 | | #include <boost/range/iterator_range_core.hpp> |
18 | | #include <boost/version.hpp> |
19 | | |
20 | | #include "yb/client/in_flight_op.h" |
21 | | #include "yb/client/tablet_rpc.h" |
22 | | |
23 | | #include "yb/common/common_types.pb.h" |
24 | | #include "yb/common/read_hybrid_time.h" |
25 | | |
26 | | #include "yb/rpc/rpc_fwd.h" |
27 | | |
28 | | #include "yb/tserver/tserver.pb.h" |
29 | | |
30 | | #include "yb/util/metrics_fwd.h" |
31 | | |
32 | | namespace yb { |
33 | | namespace client { |
34 | | |
35 | | class YBTable; |
36 | | class YBClient; |
37 | | |
38 | | namespace internal { |
39 | | |
40 | | class Batcher; |
41 | | struct InFlightOp; |
42 | | class RemoteTablet; |
43 | | class RemoteTabletServer; |
44 | | |
45 | | // Container for async rpc metrics |
46 | | struct AsyncRpcMetrics { |
47 | | explicit AsyncRpcMetrics(const scoped_refptr<MetricEntity>& metric_entity); |
48 | | |
49 | | scoped_refptr<Histogram> remote_write_rpc_time; |
50 | | scoped_refptr<Histogram> remote_read_rpc_time; |
51 | | scoped_refptr<Histogram> local_write_rpc_time; |
52 | | scoped_refptr<Histogram> local_read_rpc_time; |
53 | | scoped_refptr<Histogram> time_to_send; |
54 | | scoped_refptr<Counter> consistent_prefix_successful_reads; |
55 | | scoped_refptr<Counter> consistent_prefix_failed_reads; |
56 | | }; |
57 | | |
58 | | using InFlightOps = boost::iterator_range<std::vector<InFlightOp>::iterator>; |
59 | | |
60 | | struct AsyncRpcData { |
61 | | BatcherPtr batcher; |
62 | | RemoteTablet* tablet = nullptr; |
63 | | bool allow_local_calls_in_curr_thread = false; |
64 | | bool need_consistent_read = false; |
65 | | InFlightOps ops; |
66 | | bool need_metadata = false; |
67 | | }; |
68 | | |
69 | | struct FlushExtraResult { |
70 | | // Latest hybrid time that was present on tserver during processing of this request. |
71 | | HybridTime propagated_hybrid_time; |
72 | | |
73 | | // When read time was not specified by client it will contain read time that servers used |
74 | | // to process this request. |
75 | | ReadHybridTime used_read_time; |
76 | | }; |
77 | | |
78 | | // An Async RPC which is in-flight to a tablet. Initially, the RPC is sent |
79 | | // to the leader replica, but it may be retried with another replica if the |
80 | | // leader fails. |
81 | | // |
82 | | // Keeps a reference on the owning batcher while alive. It doesn't take a generic callback, |
83 | | // but ProcessResponseFromTserver will update the state after getting the end response. |
84 | | // This class deletes itself after Rpc returns and is processed. |
85 | | class AsyncRpc : public rpc::Rpc, public TabletRpc { |
86 | | public: |
87 | | AsyncRpc(const AsyncRpcData& data, YBConsistencyLevel consistency_level); |
88 | | |
89 | | virtual ~AsyncRpc(); |
90 | | |
91 | | void SendRpc() override; |
92 | | string ToString() const override; |
93 | | |
94 | | std::shared_ptr<const YBTable> table() const; |
95 | 14.7M | const RemoteTablet& tablet() const { return *tablet_invoker_.tablet(); } |
96 | 183k | const InFlightOps& ops() const { return ops_; } |
97 | | |
98 | | protected: |
99 | | void Finished(const Status& status) override; |
100 | | |
101 | | void SendRpcToTserver(int attempt_num) override; |
102 | | |
103 | | virtual void CallRemoteMethod() = 0; |
104 | | |
105 | | // This is the last step where errors and responses are collected from the response and |
106 | | // stored in batcher. If there's a callback from the user, it is done in this step. |
107 | | virtual void ProcessResponseFromTserver(const Status& status) = 0; |
108 | | |
109 | | // See FlushExtraResult for details. |
110 | | virtual FlushExtraResult MakeFlushExtraResult() = 0; |
111 | | |
112 | | virtual void SwapResponses() = 0; |
113 | | |
114 | | void Failed(const Status& status) override; |
115 | | |
116 | | // Is this a local call? |
117 | | bool IsLocalCall() const; |
118 | | |
119 | | // Creates the Local node tablet invoker or remote tablet invoker based on the GFLAG |
120 | | // 'FLAGS_ysql_forward_rpcs_to_local_tserver'. |
121 | | TabletInvoker *GetTabletInvoker(AsyncRpcData *data, YBConsistencyLevel yb_consistency_level); |
122 | | |
123 | | // Pointer back to the batcher. Processes the write response when it |
124 | | // completes, regardless of success or failure. |
125 | | BatcherPtr batcher_; |
126 | | |
127 | | // Operations which were batched into this RPC. |
128 | | // These operations are in kRequestSent state. |
129 | | InFlightOps ops_; |
130 | | |
131 | | TabletInvoker tablet_invoker_; |
132 | | |
133 | | CoarseTimePoint start_; |
134 | | std::shared_ptr<AsyncRpcMetrics> async_rpc_metrics_; |
135 | | rpc::RpcCommandPtr retained_self_; |
136 | | }; |
137 | | |
138 | | template <class Req, class Resp> |
139 | | class AsyncRpcBase : public AsyncRpc { |
140 | | public: |
141 | | AsyncRpcBase(const AsyncRpcData& data, YBConsistencyLevel consistency_level); |
142 | | ~AsyncRpcBase(); |
143 | | |
144 | 7.45M | const Resp& resp() const { return resp_; } |
145 | | Resp& resp() { return resp_; } |
146 | | |
147 | | protected: |
148 | | // Returns `true` if caller should continue processing response, `false` otherwise. |
149 | | bool CommonResponseCheck(const Status& status); |
150 | | void SendRpcToTserver(int attempt_num) override; |
151 | | |
152 | | virtual void NotifyBatcher(const Status& status) = 0; |
153 | | |
154 | | void ProcessResponseFromTserver(const Status& status) override; |
155 | | |
156 | | protected: // TODO replace with private |
157 | 12.1M | const tserver::TabletServerErrorPB* response_error() const override { |
158 | 12.1M | return resp_.has_error() ? &resp_.error()213k : nullptr11.9M ; |
159 | 12.1M | } yb::client::internal::AsyncRpcBase<yb::tserver::WriteRequestPB, yb::tserver::WriteResponsePB>::response_error() const Line | Count | Source | 157 | 2.55M | const tserver::TabletServerErrorPB* response_error() const override { | 158 | 2.55M | return resp_.has_error() ? &resp_.error()97.8k : nullptr2.45M ; | 159 | 2.55M | } |
yb::client::internal::AsyncRpcBase<yb::tserver::ReadRequestPB, yb::tserver::ReadResponsePB>::response_error() const Line | Count | Source | 157 | 9.56M | const tserver::TabletServerErrorPB* response_error() const override { | 158 | 9.56M | return resp_.has_error() ? &resp_.error()115k : nullptr9.45M ; | 159 | 9.56M | } |
|
160 | | |
161 | | FlushExtraResult MakeFlushExtraResult() override; |
162 | | |
163 | | Req req_; |
164 | | Resp resp_; |
165 | | }; |
166 | | |
167 | | class WriteRpc : public AsyncRpcBase<tserver::WriteRequestPB, tserver::WriteResponsePB> { |
168 | | public: |
169 | | // Relies on ops requests to be not on arena. |
170 | | explicit WriteRpc(const AsyncRpcData& data); |
171 | | |
172 | | virtual ~WriteRpc(); |
173 | | |
174 | | private: |
175 | | void SwapResponses() override; |
176 | | void CallRemoteMethod() override; |
177 | | void NotifyBatcher(const Status& status) override; |
178 | | bool ShouldRetryExpiredRequest() override; |
179 | | }; |
180 | | |
181 | | class ReadRpc : public AsyncRpcBase<tserver::ReadRequestPB, tserver::ReadResponsePB> { |
182 | | public: |
183 | | // Relies on ops requests to be not on arena. |
184 | | explicit ReadRpc(const AsyncRpcData& data, YBConsistencyLevel yb_consistency_level); |
185 | | |
186 | | virtual ~ReadRpc(); |
187 | | |
188 | | private: |
189 | | void SwapResponses() override; |
190 | | void CallRemoteMethod() override; |
191 | | void NotifyBatcher(const Status& status) override; |
192 | | }; |
193 | | |
194 | | } // namespace internal |
195 | | } // namespace client |
196 | | } // namespace yb |
197 | | |
198 | | #endif // YB_CLIENT_ASYNC_RPC_H_ |