YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_producer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/cdc/cdc_producer.h"
14
#include "yb/cdc/cdc_common_util.h"
15
16
#include "yb/cdc/cdc_service.pb.h"
17
#include "yb/common/schema.h"
18
#include "yb/common/transaction.h"
19
#include "yb/common/wire_protocol.h"
20
21
#include "yb/consensus/raft_consensus.h"
22
#include "yb/consensus/replicate_msgs_holder.h"
23
24
#include "yb/docdb/doc_key.h"
25
#include "yb/docdb/docdb.pb.h"
26
#include "yb/docdb/primitive_value.h"
27
#include "yb/docdb/value.h"
28
#include "yb/docdb/value_type.h"
29
30
#include "yb/tablet/tablet.h"
31
#include "yb/tablet/tablet_metadata.h"
32
#include "yb/tablet/tablet_peer.h"
33
#include "yb/tablet/transaction_participant.h"
34
35
#include "yb/tserver/tablet_server.h"
36
#include "yb/tserver/ts_tablet_manager.h"
37
38
#include "yb/util/flag_tags.h"
39
#include "yb/util/logging.h"
40
41
DEFINE_int32(cdc_transaction_timeout_ms, 0,
42
  "Don't check for an aborted transaction unless its original write is lagging by this duration.");
43
44
DEFINE_bool(cdc_enable_replicate_intents, true,
45
            "Enable replication of intents before they've been committed.");
46
47
DEFINE_test_flag(bool, xcluster_simulate_have_more_records, false, 
48
                 "Whether GetChanges should indicate that it has more records for safe time "
49
                 "calculation.");
50
51
namespace yb {
52
namespace cdc {
53
54
using consensus::ReplicateMsgPtr;
55
using consensus::ReplicateMsgs;
56
using docdb::PrimitiveValue;
57
using tablet::TransactionParticipant;
58
59
void AddColumnToMap(const ColumnSchema& col_schema,
60
                    const docdb::PrimitiveValue& col,
61
0
                    cdc::KeyValuePairPB* kv_pair) {
62
0
  kv_pair->set_key(col_schema.name());
63
0
  PrimitiveValue::ToQLValuePB(col, col_schema.type(), kv_pair->mutable_value());
64
0
}
65
66
void AddPrimaryKey(const docdb::SubDocKey& decoded_key,
67
                   const Schema& tablet_schema,
68
0
                   CDCRecordPB* record) {
69
0
  size_t i = 0;
70
0
  for (const auto& col : decoded_key.doc_key().hashed_group()) {
71
0
    AddColumnToMap(tablet_schema.column(i), col, record->add_key());
72
0
    i++;
73
0
  }
74
0
  for (const auto& col : decoded_key.doc_key().range_group()) {
75
0
    AddColumnToMap(tablet_schema.column(i), col, record->add_key());
76
0
    i++;
77
0
  }
78
0
}
79
80
// Set committed record information including commit time for record.
81
// This will look at transaction status to determine commit time to be used for CDC record.
82
// Returns true if we need to stop processing WAL records beyond this, false otherwise.
83
Result<bool> SetCommittedRecordIndexForReplicateMsg(
84
    const ReplicateMsgPtr& msg, size_t index, const TxnStatusMap& txn_map,
85
0
    ReplicateIntents replicate_intents, std::vector<RecordTimeIndex>* records) {
86
0
  if (replicate_intents) {
87
    // If we're replicating intents, we have no stop condition, so add the record and continue.
88
0
    records->emplace_back(msg->hybrid_time(), index);
89
0
    return false;
90
0
  }
91
0
  switch (msg->op_type()) {
92
0
    case consensus::OperationType::UPDATE_TRANSACTION_OP: {
93
0
      if (msg->transaction_state().status() == TransactionStatus::APPLYING) {
94
0
        records->emplace_back(msg->transaction_state().commit_hybrid_time(), index);
95
0
      }
96
      // Ignore other transaction statuses since we only care about APPLYING
97
      // while sending CDC records.
98
0
      return false;
99
0
    }
100
101
0
    case consensus::OperationType::WRITE_OP: {
102
0
      if (msg->write().write_batch().has_transaction()) {
103
0
        auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
104
0
            msg->write().write_batch().transaction().transaction_id()));
105
0
        const auto txn_status = txn_map.find(txn_id);
106
0
        if (txn_status == txn_map.end()) {
107
0
          return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString());
108
0
        }
109
110
0
        if (txn_status->second.status == PENDING || txn_status->second.status == CREATED) {
111
          // Ignore all records beyond this because we don't know whether those records
112
          // were committed before or after this record without the transaction commit time.
113
0
          return true;
114
0
        } else if (txn_status->second.status == COMMITTED) {
115
          // Add record to txn_msgs because there may be records appearing after this in WAL
116
          // but committed before this one. Example:
117
          // T0: WRITE K1 [TXN1]
118
          // T1: WRITE K2
119
          // T2: APPLYING TXN1
120
          // Here, WRITE K2 appears after WRITE K1 but is committed before K1.
121
0
          records->emplace_back(txn_status->second.status_time.ToUint64(), index);
122
0
        }
123
0
      } else {
124
        // Msg is not part of transaction. Use write hybrid time from msg itself.
125
0
        records->emplace_back(msg->hybrid_time(), index);
126
0
      }
127
0
      return false;
128
0
    }
129
0
    case consensus::OperationType::SPLIT_OP: {
130
0
      records->emplace_back(msg->hybrid_time(), index);
131
0
      return true;  // Don't need to process any records after a SPLIT_OP.
132
0
    }
133
134
0
    case consensus::OperationType::CHANGE_CONFIG_OP:
135
0
      FALLTHROUGH_INTENDED;
136
0
    case consensus::OperationType::CHANGE_METADATA_OP:
137
0
      FALLTHROUGH_INTENDED;
138
0
    case consensus::OperationType::HISTORY_CUTOFF_OP:
139
0
      FALLTHROUGH_INTENDED;
140
0
    case consensus::OperationType::NO_OP:
141
0
      FALLTHROUGH_INTENDED;
142
0
    case consensus::OperationType::SNAPSHOT_OP:
143
0
      FALLTHROUGH_INTENDED;
144
0
    case consensus::OperationType::TRUNCATE_OP:
145
0
      FALLTHROUGH_INTENDED;
146
0
    case consensus::OperationType::UNKNOWN_OP:
147
0
      return false;
148
0
  }
149
0
  FATAL_INVALID_ENUM_VALUE(consensus::OperationType, msg->op_type());
150
0
}
151
152
Result<std::vector<RecordTimeIndex>> GetCommittedRecordIndexes(
153
    const ReplicateMsgs& msgs, const TxnStatusMap& txn_map, ReplicateIntents replicate_intents,
154
0
    OpId* checkpoint) {
155
0
  size_t index = 0;
156
0
  std::vector<RecordTimeIndex> records;
157
158
  // Order ReplicateMsgs based on commit time.
159
0
  for (const auto &msg : msgs) {
160
0
    if (!msg->write().has_external_hybrid_time()) {
161
      // If the message came from an external source, ignore it when producing change list.
162
      // Note that checkpoint, however, will be updated and will account for external message too.
163
0
      bool stop = VERIFY_RESULT(SetCommittedRecordIndexForReplicateMsg(
164
0
          msg, index, txn_map, replicate_intents, &records));
165
0
      if (stop) {
166
0
        return records;
167
0
      }
168
0
    }
169
0
    *checkpoint = OpId::FromPB(msg->id());
170
0
    index++;
171
0
  }
172
0
  return records;
173
0
}
174
175
// Filter out WAL records that are external and order records based on transaction commit time.
176
// Records in WAL don't represent the exact order in which records are written in DB due to delay
177
// in writing txn APPLYING record.
178
// Consider the following WAL entries:
179
// TO: WRITE K0
180
// T1: WRITE K1 (TXN1)
181
// T2: WRITE K2 (TXN2)
182
// T3: WRITE K3
183
// T4: APPLYING TXN2
184
// T5: APPLYING TXN1
185
// T6: WRITE K4
186
// The order in which keys are written to DB in this example is K0, K3, K2, K1, K4.
187
// This method will also set checkpoint to the op id of last processed record.
188
Result<ReplicateMsgs> FilterAndSortWrites(const ReplicateMsgs& msgs,
189
                                          const TxnStatusMap& txn_map,
190
                                          ReplicateIntents replicate_intents,
191
0
                                          OpId* checkpoint) {
192
0
  std::vector<RecordTimeIndex> records = VERIFY_RESULT(GetCommittedRecordIndexes(
193
0
      msgs, txn_map, replicate_intents, checkpoint));
194
195
0
  if (!replicate_intents) {
196
0
    std::sort(records.begin(), records.end());
197
0
  }
198
199
0
  ReplicateMsgs ordered_msgs;
200
0
  ordered_msgs.reserve(records.size());
201
0
  for (const auto& record : records) {
202
0
    ordered_msgs.emplace_back(msgs[record.second]);
203
0
  }
204
0
  return ordered_msgs;
205
0
}
206
207
Result<TransactionStatusResult> GetTransactionStatus(
208
    const TransactionId& txn_id,
209
    const HybridTime& hybrid_time,
210
147
    TransactionParticipant* txn_participant) {
211
147
  static const std::string reason = "cdc";
212
213
147
  std::promise<Result<TransactionStatusResult>> txn_status_promise;
214
147
  auto future = txn_status_promise.get_future();
215
147
  auto callback = [&txn_status_promise](Result<TransactionStatusResult> result) {
216
147
    txn_status_promise.set_value(std::move(result));
217
147
  };
218
219
147
  txn_participant->RequestStatusAt(
220
147
      {&txn_id, hybrid_time, hybrid_time, 0, &reason, TransactionLoadFlags{}, callback});
221
147
  future.wait();
222
147
  return future.get();
223
147
}
224
225
// Build transaction status as of hybrid_time.
226
Result<TxnStatusMap> BuildTxnStatusMap(const ReplicateMsgs& messages,
227
                                       bool more_replicate_msgs,
228
                                       const HybridTime& cdc_read_hybrid_time,
229
0
                                       TransactionParticipant* txn_participant) {
230
0
  TxnStatusMap txn_map;
231
  // First go through all APPLYING records and mark transaction as committed.
232
0
  for (const auto& msg : messages) {
233
0
    if (msg->op_type() == consensus::OperationType::UPDATE_TRANSACTION_OP
234
0
        && msg->transaction_state().status() == TransactionStatus::APPLYING) {
235
0
      auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
236
0
          msg->transaction_state().transaction_id()));
237
0
      txn_map.emplace(txn_id,
238
0
                      TransactionStatusResult(
239
0
                          TransactionStatus::COMMITTED,
240
0
                          HybridTime(msg->transaction_state().commit_hybrid_time())));
241
0
    }
242
0
  }
243
244
  // Now go through all WRITE_OP records and get transaction status of records for which
245
  // corresponding APPLYING record does not exist in WAL as yet.
246
0
  for (const auto& msg : messages) {
247
0
    if (msg->op_type() == consensus::OperationType::WRITE_OP
248
0
        && msg->write().write_batch().has_transaction()) {
249
0
      auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
250
0
          msg->write().write_batch().transaction().transaction_id()));
251
252
0
      if (!txn_map.count(txn_id)) {
253
0
        TransactionStatusResult txn_status(TransactionStatus::PENDING, HybridTime::kMin);
254
255
0
        auto result = GetTransactionStatus(txn_id, cdc_read_hybrid_time, txn_participant);
256
0
        if (!result.ok()) {
257
0
          if (result.status().IsNotFound()) {
258
            // Naive heuristic for handling whether a transaction is aborted or still pending:
259
            // 1. If the normal transaction timeout is not reached, assume good operation.
260
            // 2. If more_replicate_messages, assume a race between reading
261
            //    TransactionParticipant & LogCache.
262
            // TODO (#2405) : Handle long running or very large transactions correctly.
263
0
            if (!more_replicate_msgs) {
264
0
              auto timeout = HybridTime::FromPB(msg->hybrid_time())
265
0
                  .AddMilliseconds(FLAGS_cdc_transaction_timeout_ms);
266
0
              if (timeout < cdc_read_hybrid_time) {
267
0
                LOG(INFO) << "Transaction not found, considering it aborted: " << txn_id;
268
0
                txn_status = TransactionStatusResult::Aborted();
269
0
              }
270
0
            }
271
0
          } else {
272
0
            return result.status();
273
0
          }
274
0
        } else {
275
0
          txn_status = *result;
276
0
        }
277
0
        txn_map.emplace(txn_id, txn_status);
278
0
      }
279
0
    }
280
0
  }
281
0
  return txn_map;
282
0
}
283
284
CHECKED_STATUS SetRecordTime(const TransactionId& txn_id,
285
                             const TxnStatusMap& txn_map,
286
0
                             CDCRecordPB* record) {
287
0
  auto txn_status = txn_map.find(txn_id);
288
0
  if (txn_status == txn_map.end()) {
289
0
    return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString());
290
0
  }
291
0
  record->set_time(txn_status->second.status_time.ToUint64());
292
0
  return Status::OK();
293
0
}
294
295
// Populate CDC record corresponding to WAL batch in ReplicateMsg.
296
CHECKED_STATUS PopulateWriteRecord(const ReplicateMsgPtr& msg,
297
                                   const TxnStatusMap& txn_map,
298
                                   const StreamMetadata& metadata,
299
                                   const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
300
                                   ReplicateIntents replicate_intents,
301
0
                                   GetChangesResponsePB* resp) {
302
0
  const auto& batch = msg->write().write_batch();
303
0
  const auto& schema = *tablet_peer->tablet()->schema();
304
  // Write batch may contain records from different rows.
305
  // For CDC, we need to split the batch into 1 CDC record per row of the table.
306
  // We'll use DocDB key hash to identify the records that belong to the same row.
307
0
  Slice prev_key;
308
0
  CDCRecordPB* record = nullptr;
309
0
  for (const auto& write_pair : batch.write_pairs()) {
310
0
    Slice key = write_pair.key();
311
0
    const auto key_size = VERIFY_RESULT(
312
0
        docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
313
314
0
    Slice value = write_pair.value();
315
0
    docdb::Value decoded_value;
316
0
    RETURN_NOT_OK(decoded_value.Decode(value));
317
318
    // Compare key hash with previously seen key hash to determine whether the write pair
319
    // is part of the same row or not.
320
0
    Slice primary_key(key.data(), key_size);
321
0
    if (prev_key != primary_key) {
322
      // Write pair contains record for different row. Create a new CDCRecord in this case.
323
0
      record = resp->add_records();
324
0
      Slice sub_doc_key = key;
325
0
      docdb::SubDocKey decoded_key;
326
0
      RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
327
328
0
      if (metadata.record_format == CDCRecordFormat::WAL) {
329
        // For 2DC, populate serialized data from WAL, to avoid unnecessary deserializing on
330
        // producer and re-serializing on consumer.
331
0
        auto kv_pair = record->add_key();
332
0
        kv_pair->set_key(std::to_string(decoded_key.doc_key().hash()));
333
0
        kv_pair->mutable_value()->set_binary_value(write_pair.key());
334
0
      } else {
335
0
        AddPrimaryKey(decoded_key, schema, record);
336
0
      }
337
338
      // Check whether operation is WRITE or DELETE.
339
0
      if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
340
0
          decoded_key.num_subkeys() == 0) {
341
0
        record->set_operation(CDCRecordPB::DELETE);
342
0
      } else {
343
0
        record->set_operation(CDCRecordPB::WRITE);
344
0
      }
345
346
      // Process intent records.
347
0
      record->set_time(msg->hybrid_time());
348
0
      if (batch.has_transaction()) {
349
0
        if (!replicate_intents) {
350
0
          auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
351
0
              batch.transaction().transaction_id()));
352
          // If we're not replicating intents, set record time using the transaction map.
353
0
          RETURN_NOT_OK(SetRecordTime(txn_id, txn_map, record));
354
0
        } else {
355
0
          record->mutable_transaction_state()->set_transaction_id(
356
0
              batch.transaction().transaction_id());
357
0
          record->mutable_transaction_state()->add_tablets(tablet_peer->tablet_id());
358
0
        }
359
0
      }
360
0
    }
361
0
    prev_key = primary_key;
362
0
    DCHECK(record);
363
364
0
    if (metadata.record_format == CDCRecordFormat::WAL) {
365
0
      auto kv_pair = record->add_changes();
366
0
      kv_pair->set_key(write_pair.key());
367
0
      kv_pair->mutable_value()->set_binary_value(write_pair.value());
368
0
    } else if (record->operation() == CDCRecordPB_OperationType_WRITE) {
369
0
      PrimitiveValue column_id;
370
0
      Slice key_column = write_pair.key().data() + key_size;
371
0
      RETURN_NOT_OK(PrimitiveValue::DecodeKey(&key_column, &column_id));
372
0
      if (column_id.value_type() == docdb::ValueType::kColumnId) {
373
0
        const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId()));
374
0
        AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes());
375
0
      } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) {
376
0
        LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type();
377
0
      }
378
0
    }
379
0
  }
380
0
  return Status::OK();
381
0
}
382
383
// Populate CDC record corresponding to WAL UPDATE_TRANSACTION_OP entry.
384
CHECKED_STATUS PopulateTransactionRecord(const ReplicateMsgPtr& msg,
385
                                         const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
386
                                         ReplicateIntents replicate_intents,
387
0
                                         CDCRecordPB* record) {
388
0
  SCHECK(msg->has_transaction_state(), InvalidArgument,
389
0
         Format("Update transaction message requires transaction_state: $0",
390
0
                msg->ShortDebugString()));
391
0
  record->set_operation(CDCRecordPB_OperationType_WRITE);
392
0
  record->set_time(replicate_intents ?
393
0
      msg->hybrid_time() : msg->transaction_state().commit_hybrid_time());
394
0
  record->mutable_transaction_state()->CopyFrom(msg->transaction_state());
395
0
  if (replicate_intents && msg->transaction_state().status() == TransactionStatus::APPLYING) {
396
    // Add the partition metadata so the consumer knows which tablets to apply the transaction
397
    // to.
398
0
    tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition());
399
0
  }
400
0
  return Status::OK();
401
0
}
402
403
0
CHECKED_STATUS PopulateSplitOpRecord(const ReplicateMsgPtr& msg, CDCRecordPB* record) {
404
0
  SCHECK(msg->has_split_request(), InvalidArgument,
405
0
         Format("Split op message requires split_request: $0", msg->ShortDebugString()));
406
0
  record->set_operation(CDCRecordPB::SPLIT_OP);
407
0
  record->set_time(msg->hybrid_time());
408
0
  record->mutable_split_tablet_request()->CopyFrom(msg->split_request());
409
0
  return Status::OK();
410
0
}
411
412
Result<HybridTime> GetSafeTimeForTarget(const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
413
                                        HybridTime ht_of_last_returned_message,
414
0
                                        HaveMoreMessages have_more_messages) {
415
0
  if (ht_of_last_returned_message != HybridTime::kInvalid && have_more_messages) {
416
0
    return ht_of_last_returned_message;
417
0
  }
418
0
  return tablet_peer->LeaderSafeTime();
419
0
}
420
421
Status GetChangesForXCluster(const std::string& stream_id,
422
                             const std::string& tablet_id,
423
                             const OpId& from_op_id,
424
                             const StreamMetadata& stream_metadata,
425
                             const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
426
                             const MemTrackerPtr& mem_tracker,
427
                             consensus::ReplicateMsgsHolder* msgs_holder,
428
                             GetChangesResponsePB* resp,
429
                             int64_t* last_readable_opid_index,
430
0
                             const CoarseTimePoint deadline) {
431
0
  auto replicate_intents = ReplicateIntents(GetAtomicFlag(&FLAGS_cdc_enable_replicate_intents));
432
  // Request scope on transaction participant so that transactions are not removed from participant
433
  // while RequestScope is active.
434
0
  RequestScope request_scope;
435
436
0
  auto read_ops = VERIFY_RESULT(tablet_peer->consensus()->
437
0
    ReadReplicatedMessagesForCDC(from_op_id, last_readable_opid_index, deadline));
438
0
  ScopedTrackedConsumption consumption;
439
0
  if (read_ops.read_from_disk_size && mem_tracker) {
440
0
    consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size);
441
0
  }
442
443
0
  OpId checkpoint;
444
0
  TxnStatusMap txn_map;
445
0
  if (!replicate_intents) {
446
0
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
447
0
    if (txn_participant) {
448
0
      request_scope = RequestScope(txn_participant);
449
0
    }
450
0
    txn_map = TxnStatusMap(VERIFY_RESULT(BuildTxnStatusMap(
451
0
      read_ops.messages, read_ops.have_more_messages, tablet_peer->Now(), txn_participant)));
452
0
  }
453
0
  ReplicateMsgs messages = VERIFY_RESULT(FilterAndSortWrites(
454
0
      read_ops.messages, txn_map, replicate_intents, &checkpoint));
455
456
0
  for (const auto& msg : messages) {
457
0
    switch (msg->op_type()) {
458
0
      case consensus::OperationType::UPDATE_TRANSACTION_OP:
459
0
        if (!replicate_intents) {
460
0
          RETURN_NOT_OK(PopulateTransactionRecord(
461
0
              msg, tablet_peer, replicate_intents, resp->add_records()));
462
0
        } else if (msg->transaction_state().status() == TransactionStatus::APPLYING) {
463
0
          auto record = resp->add_records();
464
0
          record->set_operation(CDCRecordPB::APPLY);
465
0
          record->set_time(msg->hybrid_time());
466
0
          auto* txn_state = record->mutable_transaction_state();
467
0
          txn_state->set_transaction_id(msg->transaction_state().transaction_id());
468
0
          txn_state->set_commit_hybrid_time(msg->transaction_state().commit_hybrid_time());
469
0
          tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition());
470
0
        }
471
0
        break;
472
0
      case consensus::OperationType::WRITE_OP:
473
0
        RETURN_NOT_OK(PopulateWriteRecord(msg, txn_map, stream_metadata, tablet_peer,
474
0
                                          replicate_intents, resp));
475
0
        break;
476
0
      case consensus::OperationType::SPLIT_OP:
477
0
        RETURN_NOT_OK(PopulateSplitOpRecord(msg, resp->add_records()));
478
0
        break;
479
480
0
      default:
481
        // Nothing to do for other operation types.
482
0
        break;
483
0
    }
484
0
  }
485
486
0
  if (consumption) {
487
0
    consumption.Add(resp->SpaceUsedLong());
488
0
  }
489
0
  auto ht_of_last_returned_message = messages.empty() ? 
490
0
      HybridTime::kInvalid : HybridTime(messages.back()->hybrid_time());
491
0
  auto have_more_messages = PREDICT_FALSE(FLAGS_TEST_xcluster_simulate_have_more_records) ?
492
0
      HaveMoreMessages::kTrue : read_ops.have_more_messages;
493
0
  auto safe_time_result = GetSafeTimeForTarget(
494
0
      tablet_peer, ht_of_last_returned_message, have_more_messages);
495
0
  if (safe_time_result.ok()) {
496
0
    resp->set_safe_hybrid_time((*safe_time_result).ToUint64());
497
0
  } else {
498
0
    YB_LOG_EVERY_N_SECS(WARNING, 10) << 
499
0
        "Could not compute safe time: " << safe_time_result.status();
500
0
  }
501
0
  *msgs_holder = consensus::ReplicateMsgsHolder(
502
0
      nullptr, std::move(messages), std::move(consumption));
503
0
  (checkpoint.index > 0 ? checkpoint : from_op_id).ToPB(
504
0
      resp->mutable_checkpoint()->mutable_op_id());
505
0
  return Status::OK();
506
0
}
507
508
}  // namespace cdc
509
}  // namespace yb