YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tserver/twodc_output_client.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/tserver/twodc_output_client.h"
14
15
#include <shared_mutex>
16
17
#include "yb/cdc/cdc_util.h"
18
#include "yb/cdc/cdc_rpc.h"
19
#include "yb/client/client.h"
20
#include "yb/client/client_error.h"
21
#include "yb/client/client_utils.h"
22
#include "yb/client/meta_cache.h"
23
#include "yb/client/table.h"
24
#include "yb/gutil/strings/join.h"
25
#include "yb/master/master_replication.pb.h"
26
#include "yb/rpc/rpc.h"
27
#include "yb/rpc/rpc_fwd.h"
28
#include "yb/tserver/cdc_consumer.h"
29
#include "yb/tserver/tserver_service.proxy.h"
30
#include "yb/tserver/twodc_write_interface.h"
31
#include "yb/util/flag_tags.h"
32
#include "yb/util/logging.h"
33
#include "yb/util/net/net_util.h"
34
#include "yb/util/result.h"
35
#include "yb/util/status.h"
36
#include "yb/util/stol_utils.h"
37
38
DECLARE_int32(cdc_write_rpc_timeout_ms);
39
40
DEFINE_bool(cdc_force_remote_tserver, false,
41
            "Avoid local tserver apply optimization for CDC and force remote RPCs.");
42
TAG_FLAG(cdc_force_remote_tserver, runtime);
43
44
DECLARE_int32(cdc_read_rpc_timeout_ms);
45
46
DEFINE_test_flag(bool, xcluster_consumer_fail_after_process_split_op, false,
47
                 "Whether or not to fail after processing a replicated split_op on the consumer.");
48
49
using namespace std::placeholders;
50
51
namespace yb {
52
namespace tserver {
53
namespace enterprise {
54
55
using rpc::Rpc;
56
57
class TwoDCOutputClient : public cdc::CDCOutputClient {
58
 public:
59
  TwoDCOutputClient(
60
      CDCConsumer* cdc_consumer,
61
      const cdc::ConsumerTabletInfo& consumer_tablet_info,
62
      const cdc::ProducerTabletInfo& producer_tablet_info,
63
      const std::shared_ptr<CDCClient>& local_client,
64
      rpc::Rpcs* rpcs,
65
      std::function<void(const cdc::OutputClientResponse& response)> apply_changes_clbk,
66
      bool use_local_tserver) :
67
      cdc_consumer_(cdc_consumer),
68
      consumer_tablet_info_(consumer_tablet_info),
69
      producer_tablet_info_(producer_tablet_info),
70
      local_client_(local_client),
71
      rpcs_(rpcs),
72
      write_handle_(rpcs->InvalidHandle()),
73
      apply_changes_clbk_(std::move(apply_changes_clbk)),
74
      use_local_tserver_(use_local_tserver),
75
0
      all_tablets_result_(STATUS(Uninitialized, "Result has not been initialized.")) {}
76
77
0
  ~TwoDCOutputClient() {
78
0
    std::lock_guard<decltype(lock_)> l(lock_);
79
0
    rpcs_->Abort({&write_handle_});
80
0
  }
81
82
  CHECKED_STATUS ApplyChanges(const cdc::GetChangesResponsePB* resp) override;
83
84
  void WriteCDCRecordDone(const Status& status, const WriteResponsePB& response);
85
86
 private:
87
88
  // Process all records in twodc_resp_copy_ starting from the start index. If we find a ddl
89
  // record, then we process the current changes first, wait for those to complete, then process
90
  // the ddl + other changes after.
91
  CHECKED_STATUS ProcessChangesStartingFromIndex(int start);
92
93
  CHECKED_STATUS ProcessRecordForTablet(
94
      const int record_idx, const Result<client::internal::RemoteTabletPtr>& tablet);
95
96
  CHECKED_STATUS ProcessRecordForLocalTablet(const int record_idx);
97
98
  CHECKED_STATUS ProcessRecordForTabletRange(
99
      const int record_idx,
100
      const std::string partition_key_start,
101
      const std::string partition_key_end,
102
      const Result<std::vector<client::internal::RemoteTabletPtr>>& tablets);
103
104
  CHECKED_STATUS ProcessSplitOp(const cdc::CDCRecordPB& record);
105
106
  // Processes the Record and sends the CDCWrite for it.
107
  CHECKED_STATUS ProcessRecord(
108
      const std::vector<std::string>& tablet_ids, const cdc::CDCRecordPB& record);
109
110
  void SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB> write_request);
111
112
  // Increment processed record count.
113
  // Returns true if all records are processed, false if there are still some pending records.
114
  bool IncProcessedRecordCount() REQUIRES(lock_);
115
116
  cdc::OutputClientResponse PrepareResponse() REQUIRES(lock_);
117
  void SendResponse(const cdc::OutputClientResponse& resp) EXCLUDES(lock_);
118
119
  void HandleResponse() EXCLUDES(lock_);
120
  void HandleError(const Status& s, bool done) EXCLUDES(lock_);
121
122
  bool UseLocalTserver();
123
124
  CDCConsumer* cdc_consumer_;
125
  cdc::ConsumerTabletInfo consumer_tablet_info_;
126
  cdc::ProducerTabletInfo producer_tablet_info_;
127
  std::shared_ptr<CDCClient> local_client_;
128
  rpc::Rpcs* rpcs_;
129
  rpc::Rpcs::Handle write_handle_ GUARDED_BY(lock_);
130
  std::function<void(const cdc::OutputClientResponse& response)> apply_changes_clbk_;
131
132
  bool use_local_tserver_;
133
134
  std::shared_ptr<client::YBTable> table_;
135
136
  // Used to protect error_status_, op_id_, done_processing_, write_handle_ and record counts.
137
  mutable rw_spinlock lock_;
138
  Status error_status_ GUARDED_BY(lock_);
139
  OpIdPB op_id_ GUARDED_BY(lock_) = consensus::MinimumOpId();
140
  bool done_processing_ GUARDED_BY(lock_) = false;
141
142
  uint32_t processed_record_count_ GUARDED_BY(lock_) = 0;
143
  uint32_t record_count_ GUARDED_BY(lock_) = 0;
144
145
  // This will cache the response to an ApplyChanges() request.
146
  cdc::GetChangesResponsePB twodc_resp_copy_;
147
148
  // Store the result of the lookup for all the tablets.
149
  yb::Result<std::vector<scoped_refptr<yb::client::internal::RemoteTablet>>> all_tablets_result_;
150
151
  yb::MonoDelta timeout_ms_;
152
153
  std::unique_ptr<TwoDCWriteInterface> write_strategy_ GUARDED_BY(lock_);
154
};
155
156
0
// Evaluates `status` exactly once. If it is not OK, reports it through
// HandleError(..., true /* done */) and returns it from the enclosing function; otherwise
// execution continues past the macro.
//
// The expansion intentionally does NOT end with a semicolon: the caller's own `;` completes the
// do/while, so `HANDLE_ERROR_AND_RETURN_IF_NOT_OK(x);` is a single statement and remains valid as
// the body of an unbraced if/else.
#define HANDLE_ERROR_AND_RETURN_IF_NOT_OK(status) do { \
  auto&& _s = (status); \
  if (!_s.ok()) { \
    HandleError(_s, true); \
    return std::move(_s); \
  } \
} while (0)
163
164
0
// Starts applying one batch of changes.
//
// Resets the per-batch state, copies the response, looks up all tablets of the consumer table and
// begins processing at record 0. An empty batch is answered immediately via HandleResponse().
// Any failure on this path is reported through HandleError() and also returned to the caller.
Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* poller_resp) {
  // ApplyChanges is called in a single threaded manner.
  // The flow is: find the destination tablet(s) for every record, apply the records in the order
  // they were received, and once everything has been applied (successfully or not) invoke the
  // callback, which then either polls for the next set of changes or retries.
  DCHECK(poller_resp->has_checkpoint());
  twodc_resp_copy_.Clear();

  const int num_records = poller_resp->records_size();

  // Reset the state shared with the RPC callback threads.
  {
    std::lock_guard<decltype(lock_)> guard(lock_);
    DCHECK(consensus::OpIdEquals(op_id_, consensus::MinimumOpId()));
    op_id_ = poller_resp->checkpoint().op_id();
    error_status_ = Status::OK();
    done_processing_ = false;
    processed_record_count_ = 0;
    record_count_ = num_records;
    ResetWriteInterface(&write_strategy_);
  }

  // Nothing to apply: respond right away.
  if (num_records == 0) {
    HandleResponse();
    return Status::OK();
  }

  // Open the consumer table once and keep it cached in table_.
  if (!table_) {
    HANDLE_ERROR_AND_RETURN_IF_NOT_OK(
        local_client_->client->OpenTable(consumer_tablet_info_.table_id, &table_));
  }

  twodc_resp_copy_ = *poller_resp;
  timeout_ms_ = MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms);

  // Block on the future so that all tablets are known before any record is processed. Records
  // must then be handled sequentially, in order.
  all_tablets_result_ = local_client_->client->LookupAllTabletsFuture(
      table_, CoarseMonoClock::now() + timeout_ms_).get();

  HANDLE_ERROR_AND_RETURN_IF_NOT_OK(ProcessChangesStartingFromIndex(0));

  return Status::OK();
}
210
211
0
// Buffers the records of twodc_resp_copy_ from index `start` onwards and sends the first
// resulting write request.
//
// A SPLIT_OP record is processed inline only when no write record has been buffered in this
// pass; otherwise the loop stops so that the buffered writes go out first and
// WriteCDCRecordDone() resumes from the split op afterwards.
//
// Returns the first error encountered. Callers are responsible for reporting it (this function
// does not call HandleError itself).
Status TwoDCOutputClient::ProcessChangesStartingFromIndex(int start) {
  bool processed_write_record = false;
  for (int i = start; i < twodc_resp_copy_.records_size(); i++) {
    const auto& record = twodc_resp_copy_.records(i);

    if (record.operation() == cdc::CDCRecordPB::SPLIT_OP) {
      if (processed_write_record) {
        // We have existing write operations to handle first, so we'll handle those first, and
        // then return to processing this split op later (see WriteCDCRecordDone).
        // It is important to handle these buffered writes first, since handling the split op will
        // cause us to replace this poller, and thus if any of those writes fail, we would end up
        // losing those records (as the new pollers would start processing ops after the split op).
        break;
      }
      // No other records to process, so we can process the SPLIT_OP.
      RETURN_NOT_OK(ProcessSplitOp(record));
      continue;
    }

    if (UseLocalTserver()) {
      RETURN_NOT_OK(ProcessRecordForLocalTablet(i));
    } else if (record.operation() == cdc::CDCRecordPB::APPLY) {
      RETURN_NOT_OK(ProcessRecordForTabletRange(
          i, record.partition().partition_key_start(),
          record.partition().partition_key_end(), all_tablets_result_));
    } else {
      // All KV-pairs within a single CDC record will be for the same row.
      // key(0).key() will contain the hash code for that row. We use this to lookup the tablet.
      // Guard the index: accessing key(0) on a record without keys is out of range.
      if (record.key_size() == 0) {
        return STATUS(IllegalState, "CDC record has no key to look up the consumer tablet");
      }
      auto partition_hash_key = PartitionSchema::EncodeMultiColumnHashValue(
          VERIFY_RESULT(CheckedStoInt<uint16_t>(record.key(0).key())));
      auto tablet_result = local_client_->client->LookupTabletByKeyFuture(
          table_, partition_hash_key, CoarseMonoClock::now() + timeout_ms_).get();
      RETURN_NOT_OK(ProcessRecordForTablet(i, tablet_result));
    }
    processed_write_record = true;
  }

  if (processed_write_record) {
    // Send out the buffered writes.
    std::unique_ptr<WriteRequestPB> write_request;
    {
      std::lock_guard<decltype(lock_)> l(lock_);
      write_request = write_strategy_->GetNextWriteRequest();
    }
    if (!write_request) {
      LOG(WARNING) << "Expected to find a write_request but were unable to";
      return STATUS(IllegalState, "Could not find a write request to send");
    }
    SendNextCDCWriteToTablet(std::move(write_request));
  }

  return Status::OK();
}
266
267
0
bool TwoDCOutputClient::UseLocalTserver() {
268
0
  return use_local_tserver_ && !FLAGS_cdc_force_remote_tserver;
269
0
}
270
271
// Passes `record` to write_strategy_ once per tablet id, all under lock_. The first failure is
// stored in error_status_ and returned without counting the record; on success the record is
// counted as processed.
Status TwoDCOutputClient::ProcessRecord(const std::vector<std::string>& tablet_ids,
                                      const cdc::CDCRecordPB& record) {
  std::lock_guard<decltype(lock_)> guard(lock_);
  for (const auto& tablet_id : tablet_ids) {
    auto status = write_strategy_->ProcessRecord(tablet_id, record);
    if (status.ok()) {
      continue;
    }
    error_status_ = status;
    return status;
  }
  IncProcessedRecordCount();
  return Status::OK();
}
284
285
// Propagates a failed tablet lookup; otherwise buffers record `record_idx` of twodc_resp_copy_
// for the looked-up tablet.
Status TwoDCOutputClient::ProcessRecordForTablet(
    const int record_idx,
    const Result<client::internal::RemoteTabletPtr>& tablet) {
  RETURN_NOT_OK(tablet);
  const std::vector<std::string> target_tablet_ids{(*tablet)->tablet_id()};
  return ProcessRecord(target_tablet_ids, twodc_resp_copy_.records(record_idx));
}
291
292
// Buffers record `record_idx` of twodc_resp_copy_ for every tablet that
// client::FilterTabletsByHashPartitionKeyRange selects from `tablets` for the range
// [partition_key_start, partition_key_end].
//
// Returns the lookup error carried by `tablets`, the filter error, or the result of
// ProcessRecord().
Status TwoDCOutputClient::ProcessRecordForTabletRange(
    const int record_idx,
    const std::string partition_key_start,
    const std::string partition_key_end,
    const Result<std::vector<client::internal::RemoteTabletPtr>>& tablets) {
  RETURN_NOT_OK(tablets);

  auto filtered_tablets_result = client::FilterTabletsByHashPartitionKeyRange(
      *tablets, partition_key_start, partition_key_end);
  RETURN_NOT_OK(filtered_tablets_result);

  // Bind by reference instead of copying the vector of tablet pointers, and collect the ids
  // directly rather than default-constructing strings that are immediately overwritten.
  const auto& filtered_tablets = *filtered_tablets_result;
  std::vector<std::string> tablet_ids;
  tablet_ids.reserve(filtered_tablets.size());
  for (const auto& tablet_ptr : filtered_tablets) {
    tablet_ids.push_back(tablet_ptr->tablet_id());
  }
  return ProcessRecord(tablet_ids, twodc_resp_copy_.records(record_idx));
}
311
312
0
// Buffers record `record_idx` of twodc_resp_copy_ for this client's own consumer tablet.
Status TwoDCOutputClient::ProcessRecordForLocalTablet(const int record_idx) {
  const std::vector<std::string> local_tablet_ids{consumer_tablet_info_.tablet_id};
  return ProcessRecord(local_tablet_ids, twodc_resp_copy_.records(record_idx));
}
315
316
0
// Handles a SPLIT_OP record: copies the split details into a ProducerSplitTabletInfoPB, sends it
// through UpdateConsumerOnProducerSplit, counts the record as processed and, if it was the last
// record of the batch, sends the response.
Status TwoDCOutputClient::ProcessSplitOp(const cdc::CDCRecordPB& record) {
  // Build the update request from the record's split request.
  const auto& split_request = record.split_tablet_request();
  master::ProducerSplitTabletInfoPB split_info;
  split_info.set_tablet_id(split_request.tablet_id());
  split_info.set_new_tablet1_id(split_request.new_tablet1_id());
  split_info.set_new_tablet2_id(split_request.new_tablet2_id());
  split_info.set_split_encoded_key(split_request.split_encoded_key());
  split_info.set_split_partition_key(split_request.split_partition_key());

  RETURN_NOT_OK(local_client_->client->UpdateConsumerOnProducerSplit(
      producer_tablet_info_.universe_uuid, producer_tablet_info_.stream_id, split_info));

  // Test hook: fail after the update above has already gone through.
  if (PREDICT_FALSE(FLAGS_TEST_xcluster_consumer_fail_after_process_split_op)) {
    return STATUS(
        InternalError, "Fail due to FLAGS_TEST_xcluster_consumer_fail_after_process_split_op");
  }

  // Count the split op and respond if the whole batch is now done.
  bool all_records_processed = false;
  {
    std::lock_guard<decltype(lock_)> guard(lock_);
    all_records_processed = IncProcessedRecordCount();
  }
  if (all_records_processed) {
    HandleResponse();
  }
  return Status::OK();
}
344
345
0
void TwoDCOutputClient::SendNextCDCWriteToTablet(std::unique_ptr<WriteRequestPB> write_request) {
346
  // TODO: This should be parallelized for better performance with M:N setups.
347
0
  auto deadline = CoarseMonoClock::Now() +
348
0
                  MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms);
349
0
  std::lock_guard<decltype(lock_)> l(lock_);
350
0
  write_handle_ = rpcs_->Prepare();
351
0
  if (write_handle_ != rpcs_->InvalidHandle()) {
352
    // Send in nullptr for RemoteTablet since cdc rpc now gets the tablet_id from the write request.
353
0
    *write_handle_ = CreateCDCWriteRpc(
354
0
        deadline,
355
0
        nullptr /* RemoteTablet */,
356
0
        table_,
357
0
        local_client_->client.get(),
358
0
        write_request.get(),
359
0
        std::bind(&TwoDCOutputClient::WriteCDCRecordDone, this, _1, _2),
360
0
        UseLocalTserver());
361
0
    (**write_handle_).SendRpc();
362
0
  } else {
363
0
    LOG(WARNING) << "Invalid handle for CDC write, tablet ID: " << write_request->tablet_id();
364
0
  }
365
0
}
366
367
0
void TwoDCOutputClient::WriteCDCRecordDone(const Status& status, const WriteResponsePB& response) {
368
  // Handle response.
369
0
  rpc::RpcCommandPtr retained = nullptr;
370
0
  {
371
0
    std::lock_guard<decltype(lock_)> l(lock_);
372
0
    retained = rpcs_->Unregister(&write_handle_);
373
0
  }
374
0
  if (!status.ok()) {
375
0
    HandleError(status, true /* done */);
376
0
    return;
377
0
  } else if (response.has_error()) {
378
0
    HandleError(StatusFromPB(response.error().status()), true /* done */);
379
0
    return;
380
0
  }
381
0
  cdc_consumer_->IncrementNumSuccessfulWriteRpcs();
382
383
  // See if we need to handle any more writes.
384
0
  std::unique_ptr <WriteRequestPB> write_request;
385
0
  {
386
0
    std::lock_guard<decltype(lock_)> l(lock_);
387
0
    write_request = write_strategy_->GetNextWriteRequest();
388
0
  }
389
390
0
  if (write_request) {
391
0
    SendNextCDCWriteToTablet(std::move(write_request));
392
0
  } else {
393
    // We may still have more records to process (in case of ddls/master requests).
394
0
    int next_record = 0;
395
0
    {
396
0
      SharedLock<decltype(lock_)> l(lock_);
397
0
      if (processed_record_count_ < record_count_) {
398
        // processed_record_count_ is 1-based, so no need to add 1 to get next record.
399
0
        next_record = processed_record_count_;
400
0
      }
401
0
    }
402
0
    if (next_record > 0) {
403
      // Process rest of the records.
404
0
      Status s = ProcessChangesStartingFromIndex(next_record);
405
0
      if (!s.ok()) {
406
0
        HandleError(s, true);
407
0
      }
408
0
    } else {
409
      // Last record, return response to caller.
410
0
      HandleResponse();
411
0
    }
412
0
  }
413
0
}
414
415
0
void TwoDCOutputClient::HandleError(const Status& s, bool done) {
416
0
  LOG(ERROR) << "Error while applying replicated record: " << s
417
0
             << ", consumer tablet: " << consumer_tablet_info_.tablet_id;
418
0
  {
419
0
    std::lock_guard<decltype(lock_)> l(lock_);
420
0
    error_status_ = s;
421
    // In case of a consumer side tablet split, need to refresh the partitions.
422
0
    if (client::ClientError(error_status_) == client::ClientErrorCode::kTablePartitionListIsStale) {
423
0
      table_->MarkPartitionsAsStale();
424
0
    }
425
0
  }
426
0
  if (done) {
427
0
    HandleResponse();
428
0
  }
429
0
}
430
431
0
// Builds the response for the current batch from error_status_. The last applied op id and the
// processed record count are filled in only when the status is OK. op_id_ and
// processed_record_count_ are reset afterwards in either case.
cdc::OutputClientResponse TwoDCOutputClient::PrepareResponse() {
  cdc::OutputClientResponse result;
  result.status = error_status_;
  if (result.status.ok()) {
    result.last_applied_op_id = op_id_;
    result.processed_record_count = processed_record_count_;
  }

  // Reset so that the next ApplyChanges() starts from a clean state.
  op_id_ = consensus::MinimumOpId();
  processed_record_count_ = 0;
  return result;
}
442
443
0
// Delivers `resp` to the callback supplied at construction time.
void TwoDCOutputClient::SendResponse(const cdc::OutputClientResponse& resp) {
  apply_changes_clbk_(resp);
}
446
447
0
void TwoDCOutputClient::HandleResponse() {
448
0
  cdc::OutputClientResponse response;
449
0
  {
450
0
    std::lock_guard<decltype(lock_)> l(lock_);
451
0
    response = PrepareResponse();
452
0
  }
453
0
  SendResponse(response);
454
0
}
455
456
0
bool TwoDCOutputClient::IncProcessedRecordCount() {
457
0
  processed_record_count_++;
458
0
  if (processed_record_count_ == record_count_) {
459
0
    done_processing_ = true;
460
0
  }
461
0
  CHECK(processed_record_count_ <= record_count_);
462
0
  return done_processing_;
463
0
}
464
465
std::unique_ptr<cdc::CDCOutputClient> CreateTwoDCOutputClient(
466
    CDCConsumer* cdc_consumer,
467
    const cdc::ConsumerTabletInfo& consumer_tablet_info,
468
    const cdc::ProducerTabletInfo& producer_tablet_info,
469
    const std::shared_ptr<CDCClient>& local_client,
470
    rpc::Rpcs* rpcs,
471
    std::function<void(const cdc::OutputClientResponse& response)> apply_changes_clbk,
472
0
    bool use_local_tserver) {
473
0
  return std::make_unique<TwoDCOutputClient>(
474
0
      cdc_consumer, consumer_tablet_info, producer_tablet_info, local_client, rpcs,
475
0
      std::move(apply_changes_clbk), use_local_tserver);
476
0
}
477
478
} // namespace enterprise
479
} // namespace tserver
480
} // namespace yb