/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/cdc/cdc_rpc.h" |
17 | | |
18 | | #include "yb/cdc/cdc_service.pb.h" |
19 | | #include "yb/cdc/cdc_service.proxy.h" |
20 | | |
21 | | #include "yb/client/client.h" |
22 | | #include "yb/client/client-internal.h" |
23 | | #include "yb/client/meta_cache.h" |
24 | | #include "yb/client/tablet_rpc.h" |
25 | | |
26 | | #include "yb/rpc/rpc.h" |
27 | | #include "yb/rpc/rpc_controller.h" |
28 | | |
29 | | #include "yb/tserver/tserver_service.pb.h" |
30 | | #include "yb/tserver/tserver_service.proxy.h" |
31 | | |
32 | | #include "yb/util/trace.h" |
33 | | |
34 | | using namespace std::literals; |
35 | | |
36 | | using yb::tserver::TabletServerErrorPB; |
37 | | using yb::tserver::TabletServerServiceProxy; |
38 | | using yb::tserver::WriteRequestPB; |
39 | | using yb::tserver::WriteResponsePB; |
40 | | |
41 | | namespace yb { |
42 | | namespace cdc { |
43 | | |
44 | | class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc { |
45 | | public: |
46 | | CDCWriteRpc(CoarseTimePoint deadline, |
47 | | client::internal::RemoteTablet *tablet, |
48 | | const std::shared_ptr<const client::YBTable>& table, |
49 | | client::YBClient *client, |
50 | | WriteRequestPB *req, |
51 | | WriteCDCRecordCallback callback, |
52 | | bool use_local_tserver) |
53 | | : rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()), |
54 | | trace_(new Trace), |
55 | | invoker_(use_local_tserver /* local_tserver_only */, |
56 | | false /* consistent_prefix */, |
57 | | client, |
58 | | this, |
59 | | this, |
60 | | tablet, |
61 | | table, |
62 | | mutable_retrier(), |
63 | | trace_.get()), |
64 | 0 | callback_(std::move(callback)) { |
65 | 0 | req_.Swap(req); |
66 | 0 | } |
67 | | |
68 | 0 | virtual ~CDCWriteRpc() { |
69 | 0 | CHECK(called_); |
70 | 0 | } |
71 | | |
72 | 0 | void SendRpc() override { |
73 | 0 | invoker_.Execute(tablet_id()); |
74 | 0 | } |
75 | | |
76 | 0 | void Finished(const Status &status) override { |
77 | 0 | Status new_status = status; |
78 | 0 | if (invoker_.Done(&new_status)) { |
79 | 0 | InvokeCallback(new_status); |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | 0 | void Failed(const Status &status) override {} |
84 | | |
85 | 0 | void Abort() override { |
86 | 0 | rpc::Rpc::Abort(); |
87 | 0 | } |
88 | | |
89 | 0 | const TabletServerErrorPB *response_error() const override { |
90 | 0 | return resp_.has_error() ? &resp_.error() : nullptr; |
91 | 0 | } |
92 | | |
93 | | private: |
94 | 0 | void SendRpcToTserver(int attempt_num) override { |
95 | 0 | InvokeAsync(invoker_.proxy().get(), |
96 | 0 | PrepareController(), |
97 | 0 | std::bind(&CDCWriteRpc::Finished, this, Status::OK())); |
98 | 0 | } |
99 | | |
100 | 0 | const std::string &tablet_id() const { |
101 | 0 | return req_.tablet_id(); |
102 | 0 | } |
103 | | |
104 | 0 | std::string ToString() const override { |
105 | 0 | return Format("CDCWriteRpc: $0, retrier: $1", req_, retrier()); |
106 | 0 | } |
107 | | |
108 | 0 | void InvokeCallback(const Status &status) { |
109 | 0 | if (!called_) { |
110 | 0 | called_ = true; |
111 | 0 | callback_(status, resp_); |
112 | 0 | } else { |
113 | 0 | LOG(WARNING) << "Multiple invocation of CDCWriteRpc: " |
114 | 0 | << status.ToString() << " : " << resp_.DebugString(); |
115 | 0 | } |
116 | 0 | } |
117 | | |
118 | | void InvokeAsync(TabletServerServiceProxy *proxy, |
119 | | rpc::RpcController *controller, |
120 | 0 | rpc::ResponseCallback callback) { |
121 | 0 | proxy->WriteAsync(req_, &resp_, controller, std::move(callback)); |
122 | 0 | } |
123 | | |
124 | | TracePtr trace_; |
125 | | client::internal::TabletInvoker invoker_; |
126 | | WriteRequestPB req_; |
127 | | WriteResponsePB resp_; |
128 | | WriteCDCRecordCallback callback_; |
129 | | bool called_ = false; |
130 | | }; |
131 | | |
132 | | rpc::RpcCommandPtr CreateCDCWriteRpc( |
133 | | CoarseTimePoint deadline, |
134 | | client::internal::RemoteTablet* tablet, |
135 | | const std::shared_ptr<const client::YBTable>& table, |
136 | | client::YBClient* client, |
137 | | WriteRequestPB* req, |
138 | | WriteCDCRecordCallback callback, |
139 | 0 | bool use_local_tserver) { |
140 | 0 | return std::make_shared<CDCWriteRpc>( |
141 | 0 | deadline, tablet, table, client, req, std::move(callback), use_local_tserver); |
142 | 0 | } |
143 | | |
144 | | /////////////////////////////////////////////////////////////////////////////// |
145 | | /////////////////////////////////////////////////////////////////////////////// |
146 | | /////////////////////////////////////////////////////////////////////////////// |
147 | | |
148 | | class CDCReadRpc : public rpc::Rpc, public client::internal::TabletRpc { |
149 | | public: |
150 | | CDCReadRpc(CoarseTimePoint deadline, |
151 | | client::internal::RemoteTablet *tablet, |
152 | | client::YBClient *client, |
153 | | GetChangesRequestPB* req, |
154 | | GetChangesCDCRpcCallback callback) |
155 | | : rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()), |
156 | | trace_(new Trace), |
157 | | invoker_(false /* local_tserver_only */, |
158 | | false /* consistent_prefix */, |
159 | | client, |
160 | | this, |
161 | | this, |
162 | | tablet, |
163 | | /* table =*/ nullptr, |
164 | | mutable_retrier(), |
165 | | trace_.get(), |
166 | | master::IncludeInactive::kTrue), |
167 | 0 | callback_(std::move(callback)) { |
168 | 0 | req_.Swap(req); |
169 | 0 | } |
170 | | |
171 | 0 | virtual ~CDCReadRpc() { |
172 | 0 | CHECK(called_); |
173 | 0 | } |
174 | | |
175 | 0 | void SendRpc() override { |
176 | 0 | invoker_.Execute(tablet_id()); |
177 | 0 | } |
178 | | |
179 | 0 | void Finished(const Status &status) override { |
180 | 0 | auto retained = shared_from_this(); // Ensure we don't destruct until after the callback. |
181 | 0 | Status new_status = status; |
182 | 0 | if (invoker_.Done(&new_status)) { |
183 | 0 | InvokeCallback(new_status); |
184 | 0 | } |
185 | 0 | } |
186 | | |
187 | 0 | void Failed(const Status &status) override { } |
188 | | |
189 | 0 | void Abort() override { |
190 | 0 | rpc::Rpc::Abort(); |
191 | 0 | } |
192 | | |
193 | 0 | const tserver::TabletServerErrorPB *response_error() const override { |
194 | | // Clear the contents of last_error_, since this function is invoked again on retry. |
195 | 0 | last_error_.Clear(); |
196 | |
|
197 | 0 | if (resp_.has_error()) { |
198 | 0 | if (resp_.error().has_code()) { |
199 | | // Map CDC Errors to TabletServer Errors. |
200 | 0 | switch (resp_.error().code()) { |
201 | 0 | case CDCErrorPB::TABLET_NOT_FOUND: |
202 | 0 | last_error_.set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND); |
203 | 0 | if (resp_.error().has_status()) { |
204 | 0 | last_error_.mutable_status()->CopyFrom(resp_.error().status()); |
205 | 0 | } |
206 | 0 | return &last_error_; |
207 | 0 | case CDCErrorPB::LEADER_NOT_READY: |
208 | 0 | last_error_.set_code(tserver::TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE); |
209 | 0 | if (resp_.error().has_status()) { |
210 | 0 | last_error_.mutable_status()->CopyFrom(resp_.error().status()); |
211 | 0 | } |
212 | 0 | return &last_error_; |
213 | | // TS.STALE_FOLLOWER => pattern not used. |
214 | 0 | default: |
215 | 0 | break; |
216 | 0 | } |
217 | 0 | } |
218 | 0 | } |
219 | 0 | return nullptr; |
220 | 0 | } |
221 | | |
222 | 0 | void SendRpcToTserver(int attempt_num) override { |
223 | | // should be fast because the proxy cache has EndPoint from the tablet lookup. |
224 | 0 | cdc_proxy_ = std::make_shared<CDCServiceProxy>( |
225 | 0 | &invoker_.client().proxy_cache(), invoker_.ProxyEndpoint()); |
226 | |
|
227 | 0 | auto self = std::static_pointer_cast<CDCReadRpc>(shared_from_this()); |
228 | 0 | InvokeAsync(cdc_proxy_.get(), |
229 | 0 | PrepareController(), |
230 | 0 | std::bind(&CDCReadRpc::Finished, self, Status::OK())); |
231 | 0 | } |
232 | | |
233 | | private: |
234 | 0 | const std::string &tablet_id() const { |
235 | 0 | return req_.tablet_id(); |
236 | 0 | } |
237 | | |
238 | 0 | std::string ToString() const override { |
239 | 0 | return Format("CDCReadRpc: $0, retrier: $1", req_, retrier()); |
240 | 0 | } |
241 | | |
242 | 0 | void InvokeCallback(const Status &status) { |
243 | 0 | if (!called_) { |
244 | 0 | called_ = true; |
245 | 0 | callback_(status, std::move(resp_)); |
246 | 0 | } else { |
247 | 0 | LOG(WARNING) << "Multiple invocation of CDCReadRpc: " |
248 | 0 | << status.ToString() << " : " << resp_.DebugString(); |
249 | 0 | } |
250 | 0 | } |
251 | | |
252 | | void InvokeAsync(CDCServiceProxy *cdc_proxy, |
253 | | rpc::RpcController *controller, |
254 | 0 | rpc::ResponseCallback callback) { |
255 | 0 | cdc_proxy->GetChangesAsync(req_, &resp_, controller, std::move(callback)); |
256 | 0 | } |
257 | | |
258 | | TracePtr trace_; |
259 | | client::internal::TabletInvoker invoker_; |
260 | | |
261 | | GetChangesRequestPB req_; |
262 | | GetChangesResponsePB resp_; |
263 | | GetChangesCDCRpcCallback callback_; |
264 | | |
265 | | std::shared_ptr<CDCServiceProxy> cdc_proxy_; |
266 | | mutable tserver::TabletServerErrorPB last_error_; |
267 | | bool called_ = false; |
268 | | }; |
269 | | |
270 | | MUST_USE_RESULT rpc::RpcCommandPtr CreateGetChangesCDCRpc( |
271 | | CoarseTimePoint deadline, |
272 | | client::internal::RemoteTablet* tablet, |
273 | | client::YBClient* client, |
274 | | GetChangesRequestPB* req, |
275 | 0 | GetChangesCDCRpcCallback callback) { |
276 | 0 | return std::make_shared<CDCReadRpc>( |
277 | 0 | deadline, tablet, client, req, std::move(callback)); |
278 | 0 | } |
279 | | |
280 | | |
281 | | } // namespace cdc |
282 | | } // namespace yb |