YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/cdc/cdc_rpc.h"
17
18
#include "yb/cdc/cdc_service.pb.h"
19
#include "yb/cdc/cdc_service.proxy.h"
20
21
#include "yb/client/client.h"
22
#include "yb/client/client-internal.h"
23
#include "yb/client/meta_cache.h"
24
#include "yb/client/tablet_rpc.h"
25
26
#include "yb/rpc/rpc.h"
27
#include "yb/rpc/rpc_controller.h"
28
29
#include "yb/tserver/tserver_service.pb.h"
30
#include "yb/tserver/tserver_service.proxy.h"
31
32
#include "yb/util/trace.h"
33
34
using namespace std::literals;
35
36
using yb::tserver::TabletServerErrorPB;
37
using yb::tserver::TabletServerServiceProxy;
38
using yb::tserver::WriteRequestPB;
39
using yb::tserver::WriteResponsePB;
40
41
namespace yb {
42
namespace cdc {
43
44
// Tablet RPC that sends a WriteRequestPB to a tablet server via TabletInvoker and hands the
// final status and WriteResponsePB to a WriteCDCRecordCallback.
//
// The user callback is invoked at most once; the destructor CHECK-fails if it was never invoked.
// Instances must be owned by a shared_ptr (CreateCDCWriteRpc builds them with make_shared),
// because both the in-flight response callback and Finished() take a shared reference to this
// object to keep it alive.
class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
 public:
  // Constructs the RPC.
  //
  // deadline          - passed to the rpc::Rpc base.
  // tablet, table     - forwarded to the TabletInvoker.
  // client            - supplies the messenger and proxy cache; also forwarded to the invoker.
  // req               - its contents are swapped into this object's request; after the call *req
  //                     holds what the (default-constructed) member held before.
  // callback          - invoked with the final status and the response.
  // use_local_tserver - forwarded to the invoker as its local_tserver_only flag.
  CDCWriteRpc(CoarseTimePoint deadline,
              client::internal::RemoteTablet *tablet,
              const std::shared_ptr<const client::YBTable>& table,
              client::YBClient *client,
              WriteRequestPB *req,
              WriteCDCRecordCallback callback,
              bool use_local_tserver)
      : rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()),
        trace_(new Trace),
        invoker_(use_local_tserver /* local_tserver_only */,
                 false /* consistent_prefix */,
                 client,
                 this,
                 this,
                 tablet,
                 table,
                 mutable_retrier(),
                 trace_.get()),
        callback_(std::move(callback)) {
    req_.Swap(req);
  }

  // The user callback must have been invoked before the RPC is destroyed.
  virtual ~CDCWriteRpc() {
    CHECK(called_);
  }

  // Starts (or restarts) the RPC by asking the invoker to execute against the request's tablet.
  void SendRpc() override {
    invoker_.Execute(tablet_id());
  }

  // Called when an attempt completes. Lets the invoker inspect/adjust the status; only when the
  // invoker reports the RPC as done is the user callback invoked.
  void Finished(const Status &status) override {
    // Ensure we don't destruct until after the callback (same guard as CDCReadRpc::Finished).
    auto retained = shared_from_this();
    Status new_status = status;
    if (invoker_.Done(&new_status)) {
      InvokeCallback(new_status);
    }
  }

  // Intentionally a no-op.
  void Failed(const Status &status) override {}

  // Delegates to the rpc::Rpc implementation.
  void Abort() override {
    rpc::Rpc::Abort();
  }

  // Returns the error embedded in the response, or nullptr when the response carries none.
  const TabletServerErrorPB *response_error() const override {
    return resp_.has_error() ? &resp_.error() : nullptr;
  }

 private:
  // Issues the asynchronous write through the proxy selected by the invoker.
  void SendRpcToTserver(int attempt_num) override {
    // Bind a shared reference rather than raw `this`, so the object cannot be destroyed while the
    // response callback is still pending (consistent with CDCReadRpc::SendRpcToTserver).
    auto self = std::static_pointer_cast<CDCWriteRpc>(shared_from_this());
    InvokeAsync(invoker_.proxy().get(),
                PrepareController(),
                std::bind(&CDCWriteRpc::Finished, self, Status::OK()));
  }

  // Tablet id taken from the request.
  const std::string &tablet_id() const {
    return req_.tablet_id();
  }

  std::string ToString() const override {
    return Format("CDCWriteRpc: $0, retrier: $1", req_, retrier());
  }

  // Invokes the user callback on the first call; later calls only log a warning.
  void InvokeCallback(const Status &status) {
    if (!called_) {
      called_ = true;
      callback_(status, resp_);
    } else {
      LOG(WARNING) << "Multiple invocation of CDCWriteRpc: "
                   << status.ToString() << " : " << resp_.DebugString();
    }
  }

  // Sends req_ with WriteAsync; the response is written into resp_.
  void InvokeAsync(TabletServerServiceProxy *proxy,
                   rpc::RpcController *controller,
                   rpc::ResponseCallback callback) {
    proxy->WriteAsync(req_, &resp_, controller, std::move(callback));
  }

  // NOTE: trace_ is declared before invoker_ because invoker_ is constructed from trace_.get().
  TracePtr trace_;
  client::internal::TabletInvoker invoker_;
  WriteRequestPB req_;
  WriteResponsePB resp_;
  WriteCDCRecordCallback callback_;
  bool called_ = false;  // Set once the user callback has been invoked.
};
131
132
// Factory for the write RPC: builds a shared_ptr-owned CDCWriteRpc from the given arguments and
// returns it as a generic rpc::RpcCommandPtr. All arguments are forwarded unchanged to the
// CDCWriteRpc constructor (which swaps the contents of *req into the new RPC).
rpc::RpcCommandPtr CreateCDCWriteRpc(
    CoarseTimePoint deadline,
    client::internal::RemoteTablet* tablet,
    const std::shared_ptr<const client::YBTable>& table,
    client::YBClient* client,
    WriteRequestPB* req,
    WriteCDCRecordCallback callback,
    bool use_local_tserver) {
  rpc::RpcCommandPtr write_rpc = std::make_shared<CDCWriteRpc>(
      deadline, tablet, table, client, req, std::move(callback), use_local_tserver);
  return write_rpc;
}
143
144
///////////////////////////////////////////////////////////////////////////////
145
///////////////////////////////////////////////////////////////////////////////
146
///////////////////////////////////////////////////////////////////////////////
147
148
// Tablet RPC that issues GetChanges against the CDC service of the tablet server chosen by
// TabletInvoker, and passes the final status and GetChangesResponsePB to a
// GetChangesCDCRpcCallback.
//
// The user callback runs at most once; the destructor CHECK-fails if it never ran. The object
// relies on shared_from_this(), so it must be owned by a shared_ptr.
class CDCReadRpc : public rpc::Rpc, public client::internal::TabletRpc {
 public:
  // The contents of *req are swapped into this object's request. The invoker is set up without a
  // table, with local_tserver_only and consistent_prefix disabled, and with
  // master::IncludeInactive::kTrue.
  CDCReadRpc(CoarseTimePoint deadline,
             client::internal::RemoteTablet *tablet,
             client::YBClient *client,
             GetChangesRequestPB* req,
             GetChangesCDCRpcCallback callback)
      : rpc::Rpc(deadline, client->messenger(), &client->proxy_cache()),
        trace_(new Trace),
        invoker_(/* local_tserver_only = */ false,
                 /* consistent_prefix = */ false,
                 client,
                 this,
                 this,
                 tablet,
                 /* table = */ nullptr,
                 mutable_retrier(),
                 trace_.get(),
                 master::IncludeInactive::kTrue),
        callback_(std::move(callback)) {
    req_.Swap(req);
  }

  // The user callback must already have been invoked.
  virtual ~CDCReadRpc() {
    CHECK(called_);
  }

  // Hands the request's tablet id to the invoker for execution.
  void SendRpc() override {
    invoker_.Execute(tablet_id());
  }

  // Completion hook for a single attempt. The user callback fires only if the invoker declares
  // the RPC done; the status passed on is whatever the invoker left in its in/out argument.
  void Finished(const Status &status) override {
    // Hold a reference for the duration of this call so that the callback cannot destroy us
    // underneath ourselves.
    auto keep_alive = shared_from_this();
    Status final_status = status;
    if (!invoker_.Done(&final_status)) {
      return;
    }
    InvokeCallback(final_status);
  }

  // Intentionally empty.
  void Failed(const Status &status) override { }

  // Forwards to the base-class abort.
  void Abort() override {
    rpc::Rpc::Abort();
  }

  // Translates a CDC error in the response into the equivalent tablet-server error, stored in
  // last_error_. Returns nullptr when the response has no error, the error has no code, or the
  // code is not one of the mapped ones.
  const tserver::TabletServerErrorPB *response_error() const override {
    // This function is invoked again on retry, so always start from an empty last_error_.
    last_error_.Clear();

    if (!resp_.has_error() || !resp_.error().has_code()) {
      return nullptr;
    }

    // Map CDC Errors to TabletServer Errors.
    const auto& cdc_error = resp_.error();
    switch (cdc_error.code()) {
      case CDCErrorPB::TABLET_NOT_FOUND:
        last_error_.set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND);
        break;
      case CDCErrorPB::LEADER_NOT_READY:
        last_error_.set_code(tserver::TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE);
        break;
      // TS.STALE_FOLLOWER => pattern not used.
      default:
        return nullptr;
    }

    // Both mapped codes carry the original status along when one is present.
    if (cdc_error.has_status()) {
      last_error_.mutable_status()->CopyFrom(cdc_error.status());
    }
    return &last_error_;
  }

  // Creates a CDC service proxy for the invoker's current endpoint and sends GetChanges
  // asynchronously; the response lands in resp_.
  void SendRpcToTserver(int attempt_num) override {
    // Should be fast because the proxy cache has EndPoint from the tablet lookup.
    cdc_proxy_ = std::make_shared<CDCServiceProxy>(
        &invoker_.client().proxy_cache(), invoker_.ProxyEndpoint());

    // The completion callback owns a reference to this RPC until it has run.
    auto self = std::static_pointer_cast<CDCReadRpc>(shared_from_this());
    cdc_proxy_->GetChangesAsync(
        req_, &resp_, PrepareController(),
        [self] { self->Finished(Status::OK()); });
  }

 private:
  // Tablet id as recorded in the request.
  const std::string &tablet_id() const {
    return req_.tablet_id();
  }

  std::string ToString() const override {
    return Format("CDCReadRpc: $0, retrier: $1", req_, retrier());
  }

  // First call: marks the RPC as called and moves the response into the user callback.
  // Any later call only logs a warning.
  void InvokeCallback(const Status &status) {
    if (called_) {
      LOG(WARNING) << "Multiple invocation of CDCReadRpc: "
                   << status.ToString() << " : " << resp_.DebugString();
      return;
    }
    called_ = true;
    callback_(status, std::move(resp_));
  }

  // trace_ precedes invoker_ because the invoker is constructed from trace_.get().
  TracePtr trace_;
  client::internal::TabletInvoker invoker_;

  GetChangesRequestPB req_;
  GetChangesResponsePB resp_;
  GetChangesCDCRpcCallback callback_;

  // Proxy used for the most recent attempt; replaced on every SendRpcToTserver call.
  std::shared_ptr<CDCServiceProxy> cdc_proxy_;
  // Scratch storage for response_error(); mutable because that accessor is const.
  mutable tserver::TabletServerErrorPB last_error_;
  bool called_ = false;
};
269
270
// Factory for the GetChanges RPC: builds a shared_ptr-owned CDCReadRpc from the given arguments
// and returns it as a generic rpc::RpcCommandPtr. Arguments are forwarded unchanged to the
// CDCReadRpc constructor (which swaps the contents of *req into the new RPC).
MUST_USE_RESULT rpc::RpcCommandPtr CreateGetChangesCDCRpc(
    CoarseTimePoint deadline,
    client::internal::RemoteTablet* tablet,
    client::YBClient* client,
    GetChangesRequestPB* req,
    GetChangesCDCRpcCallback callback) {
  rpc::RpcCommandPtr read_rpc = std::make_shared<CDCReadRpc>(
      deadline, tablet, client, req, std::move(callback));
  return read_rpc;
}
279
280
281
} // namespace cdc
282
} // namespace yb