YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tserver/cdc_poller.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/cdc_poller.h"
15
#include "yb/tserver/cdc_consumer.h"
16
#include "yb/tserver/twodc_output_client.h"
17
18
#include "yb/cdc/cdc_rpc.h"
19
#include "yb/cdc/cdc_service.pb.h"
20
#include "yb/cdc/cdc_service.proxy.h"
21
#include "yb/client/client.h"
22
23
#include "yb/consensus/opid_util.h"
24
25
#include "yb/gutil/strings/substitute.h"
26
27
#include "yb/util/logging.h"
28
#include "yb/util/status_log.h"
29
#include "yb/util/threadpool.h"
30
31
// Similar heuristic to heartbeat_interval in heartbeater.cc.
32
DEFINE_int32(async_replication_polling_delay_ms, 0,
33
             "How long to delay in ms between applying and polling.");
34
DEFINE_int32(async_replication_idle_delay_ms, 100,
35
             "How long to delay between polling when we expect no data at the destination.");
36
DEFINE_int32(async_replication_max_idle_wait, 3,
37
             "Maximum number of consecutive empty GetChanges until the poller "
38
             "backs off to the idle interval, rather than immediately retrying.");
39
DEFINE_int32(replication_failure_delay_exponent, 16 /* ~ 2^16/1000 ~= 65 sec */,
40
             "Max number of failures (N) to use when calculating exponential backoff (2^N-1).");
41
DEFINE_bool(cdc_consumer_use_proxy_forwarding, false,
42
            "When enabled, read requests from the CDC Consumer that go to the wrong node are "
43
            "forwarded to the correct node by the Producer.");
44
45
DECLARE_int32(cdc_read_rpc_timeout_ms);
46
47
namespace yb {
48
namespace tserver {
49
namespace enterprise {
50
51
CDCPoller::CDCPoller(const cdc::ProducerTabletInfo& producer_tablet_info,
52
                     const cdc::ConsumerTabletInfo& consumer_tablet_info,
53
                     std::function<bool(void)> should_continue_polling,
54
                     std::function<void(void)> remove_self_from_pollers_map,
55
                     ThreadPool* thread_pool,
56
                     rpc::Rpcs* rpcs,
57
                     const std::shared_ptr<CDCClient>& local_client,
58
                     const std::shared_ptr<CDCClient>& producer_client,
59
                     CDCConsumer* cdc_consumer,
60
                     bool use_local_tserver) :
61
    producer_tablet_info_(producer_tablet_info),
62
    consumer_tablet_info_(consumer_tablet_info),
63
    should_continue_polling_(std::move(should_continue_polling)),
64
    remove_self_from_pollers_map_(std::move(remove_self_from_pollers_map)),
65
    op_id_(consensus::MinimumOpId()),
66
    resp_(std::make_unique<cdc::GetChangesResponsePB>()),
67
    output_client_(CreateTwoDCOutputClient(
68
        cdc_consumer,
69
        consumer_tablet_info,
70
        producer_tablet_info,
71
        local_client,
72
        rpcs,
73
        std::bind(&CDCPoller::HandleApplyChanges, this, std::placeholders::_1),
74
        use_local_tserver)),
75
    producer_client_(producer_client),
76
    thread_pool_(thread_pool),
77
    rpcs_(rpcs),
78
    poll_handle_(rpcs_->InvalidHandle()),
79
0
    cdc_consumer_(cdc_consumer) {}
80
81
0
// Asks rpcs_ to abort the GetChanges RPC tracked by poll_handle_ before the poller's members
// are torn down.
CDCPoller::~CDCPoller() {
  auto* const tracked_poll = &poll_handle_;
  rpcs_->Abort({tracked_poll});
}
84
85
0
std::string CDCPoller::LogPrefixUnlocked() const {
86
0
  return strings::Substitute("P [$0:$1] C [$2:$3]: ",
87
0
                             producer_tablet_info_.stream_id,
88
0
                             producer_tablet_info_.tablet_id,
89
0
                             consumer_tablet_info_.table_id,
90
0
                             consumer_tablet_info_.tablet_id);
91
0
}
92
93
0
bool CDCPoller::CheckOnline() {
94
0
  return cdc_consumer_ != nullptr;
95
0
}
96
97
// Returns from the enclosing void member function, after logging a warning, when CheckOnline()
// reports the poller is offline. Expands to a single statement (do/while(false)) so that
// `if (c) RETURN_WHEN_OFFLINE(); else ...` parses as intended and always needs the trailing `;`.
#define RETURN_WHEN_OFFLINE() \
  do { \
    if (!CheckOnline()) { \
      LOG_WITH_PREFIX_UNLOCKED(WARNING) << "CDC Poller went offline"; \
      return; \
    } \
  } while (false)
102
103
0
void CDCPoller::Poll() {
104
0
  RETURN_WHEN_OFFLINE();
105
0
  WARN_NOT_OK(thread_pool_->SubmitFunc(std::bind(&CDCPoller::DoPoll, this)),
106
0
              "Could not submit Poll to thread pool");
107
0
}
108
109
0
void CDCPoller::DoPoll() {
110
0
  RETURN_WHEN_OFFLINE();
111
112
0
  auto retained = shared_from_this();
113
0
  std::lock_guard<std::mutex> l(data_mutex_);
114
115
  // determine if we should delay our upcoming poll
116
0
  int64_t delay = FLAGS_async_replication_polling_delay_ms; // normal throttling.
117
0
  if (idle_polls_ >= FLAGS_async_replication_max_idle_wait) {
118
0
    delay = std::max(delay, (int64_t)FLAGS_async_replication_idle_delay_ms); // idle backoff.
119
0
  }
120
0
  if (poll_failures_ > 0) {
121
0
    delay = std::max(delay, (int64_t)1 << poll_failures_); // exponential backoff for failures.
122
0
  }
123
0
  if (delay > 0) {
124
0
    SleepFor(MonoDelta::FromMilliseconds(delay));
125
0
  }
126
127
0
  cdc::GetChangesRequestPB req;
128
0
  req.set_stream_id(producer_tablet_info_.stream_id);
129
0
  req.set_tablet_id(producer_tablet_info_.tablet_id);
130
0
  req.set_serve_as_proxy(FLAGS_cdc_consumer_use_proxy_forwarding);
131
132
0
  cdc::CDCCheckpointPB checkpoint;
133
0
  *checkpoint.mutable_op_id() = op_id_;
134
0
  if (checkpoint.op_id().index() > 0 || checkpoint.op_id().term() > 0) {
135
    // Only send non-zero checkpoints in request.
136
    // If we don't know the latest checkpoint, then CDC producer can use the checkpoint from
137
    // cdc_state table.
138
    // This is useful in scenarios where a new tablet peer becomes replication leader for a
139
    // producer tablet and is not aware of the last checkpoint.
140
0
    *req.mutable_from_checkpoint() = checkpoint;
141
0
  }
142
143
0
  auto rpcs = rpcs_;
144
0
  poll_handle_ = rpcs->Prepare();
145
0
  if (poll_handle_ == rpcs->InvalidHandle()) {
146
0
    return remove_self_from_pollers_map_();
147
0
  }
148
149
0
  *poll_handle_ = CreateGetChangesCDCRpc(
150
0
      CoarseMonoClock::now() + MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms),
151
0
      nullptr, /* RemoteTablet: will get this from 'req' */
152
0
      producer_client_->client.get(),
153
0
      &req,
154
0
      [=](const Status &status, cdc::GetChangesResponsePB &&new_resp) {
155
0
        auto retained = rpcs->Unregister(&poll_handle_);
156
0
        auto resp = std::make_shared<cdc::GetChangesResponsePB>(std::move(new_resp));
157
0
        WARN_NOT_OK(thread_pool_->SubmitFunc(std::bind(&CDCPoller::HandlePoll, this,
158
0
                                                       status, resp)),
159
0
                    "Could not submit HandlePoll to thread pool");
160
0
      });
161
0
  (**poll_handle_).SendRpc();
162
0
}
163
164
void CDCPoller::HandlePoll(yb::Status status,
165
0
                           std::shared_ptr<cdc::GetChangesResponsePB> resp) {
166
0
  RETURN_WHEN_OFFLINE();
167
168
0
  auto retained = shared_from_this();
169
0
  std::lock_guard<std::mutex> l(data_mutex_);
170
171
0
  if (!should_continue_polling_()) {
172
0
    return remove_self_from_pollers_map_();
173
0
  }
174
175
0
  status_ = status;
176
0
  resp_ = resp;
177
178
0
  bool failed = false;
179
0
  if (!status_.ok()) {
180
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << "CDCPoller failure: " << status_.ToString();
181
0
    failed = true;
182
0
  } else if (resp_->has_error()) {
183
0
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "CDCPoller failure response: code="
184
0
                                      << resp_->error().code()
185
0
                                      << ", status=" << resp->error().status().DebugString();
186
0
    failed = true;
187
0
  } else if (!resp_->has_checkpoint()) {
188
0
    LOG_WITH_PREFIX_UNLOCKED(ERROR) << "CDCPoller failure: no checkpoint";
189
0
    failed = true;
190
0
  }
191
0
  if (failed) {
192
    // In case of errors, try polling again with backoff
193
0
    poll_failures_ = std::min(poll_failures_ + 1, FLAGS_replication_failure_delay_exponent);
194
0
    return Poll();
195
0
  }
196
0
  poll_failures_ = std::max(poll_failures_ - 2, 0); // otherwise, recover slowly if we're congested
197
198
  // Success Case: ApplyChanges() from Poll
199
0
  WARN_NOT_OK(output_client_->ApplyChanges(resp_.get()), "Could not ApplyChanges");
200
0
}
201
202
0
// Apply callback handed to the TwoDC output client in the constructor. Defers handling of the
// ApplyChanges() result to DoHandleApplyChanges() on thread_pool_.
//
// The queued task owns a shared_ptr to this poller instead of a raw `this`, so the poller stays
// alive until the task has run. Like DoHandleApplyChanges(), this requires the poller to be
// owned by a std::shared_ptr (shared_from_this() throws std::bad_weak_ptr otherwise).
void CDCPoller::HandleApplyChanges(cdc::OutputClientResponse response) {
  RETURN_WHEN_OFFLINE();

  WARN_NOT_OK(
      thread_pool_->SubmitFunc(
          std::bind(&CDCPoller::DoHandleApplyChanges, shared_from_this(), std::move(response))),
      "Could not submit HandleApplyChanges to thread pool");
}
208
209
0
// Handles the result of an ApplyChanges() call; runs on thread_pool_ (see HandleApplyChanges()).
//
// If polling should no longer continue, the poller removes itself from the pollers map.
// On failure, resp_ is re-applied after sleeping 2^apply_failures_ - 1 ms, where
// apply_failures_ is capped at FLAGS_replication_failure_delay_exponent. On success,
// apply_failures_ is lowered by 2 (not below 0), op_id_ advances to the last applied op id,
// idle_polls_ counts consecutive results with no processed records, and the next poll is
// scheduled.
void CDCPoller::DoHandleApplyChanges(cdc::OutputClientResponse response) {
  RETURN_WHEN_OFFLINE();

  auto retained = shared_from_this();
  std::lock_guard<std::mutex> l(data_mutex_);

  if (!should_continue_polling_()) {
    return remove_self_from_pollers_map_();
  }

  if (!response.status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "ApplyChanges failure: " << response.status;
    // Repeat the ApplyChanges step, with exponential backoff.
    // The shift is done in 64 bits with a non-negative exponent clamped to 62:
    // FLAGS_replication_failure_delay_exponent is user-settable, and `1 << n` on a plain int
    // overflows for n >= 31 and is undefined for n >= 32 or n < 0.
    constexpr int kMaxBackoffExponent = 62;
    apply_failures_ = std::max(
        std::min(apply_failures_ + 1, FLAGS_replication_failure_delay_exponent), 0);
    const int64_t delay =
        (static_cast<int64_t>(1) << std::min(apply_failures_, kMaxBackoffExponent)) - 1;
    SleepFor(MonoDelta::FromMilliseconds(delay));
    WARN_NOT_OK(output_client_->ApplyChanges(resp_.get()), "Could not ApplyChanges");
    return;
  }

  // Recover slowly if we've gotten congested.
  apply_failures_ = std::max(apply_failures_ - 2, 0);

  op_id_ = response.last_applied_op_id;

  // Consecutive applies with no records eventually switch DoPoll() to the idle delay.
  idle_polls_ = (response.processed_record_count == 0) ? idle_polls_ + 1 : 0;

  Poll();
}
236
#undef RETURN_WHEN_OFFLINE
237
238
} // namespace enterprise
239
} // namespace tserver
240
} // namespace yb