YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_producer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/cdc/cdc_producer.h"
14
#include "yb/cdc/cdc_common_util.h"
15
16
#include "yb/cdc/cdc_service.pb.h"
17
#include "yb/common/schema.h"
18
#include "yb/common/transaction.h"
19
#include "yb/common/wire_protocol.h"
20
21
#include "yb/consensus/raft_consensus.h"
22
#include "yb/consensus/replicate_msgs_holder.h"
23
24
#include "yb/docdb/doc_key.h"
25
#include "yb/docdb/docdb.pb.h"
26
#include "yb/docdb/primitive_value.h"
27
#include "yb/docdb/value.h"
28
#include "yb/docdb/value_type.h"
29
30
#include "yb/tablet/tablet.h"
31
#include "yb/tablet/tablet_metadata.h"
32
#include "yb/tablet/tablet_peer.h"
33
#include "yb/tablet/transaction_participant.h"
34
35
#include "yb/tserver/tablet_server.h"
36
#include "yb/tserver/ts_tablet_manager.h"
37
38
// Grace period, in milliseconds, for transactions the transaction participant reports as
// NotFound. BuildTxnStatusMap treats such a transaction as aborted only when no more WAL
// messages remain to be read and the write's hybrid time plus this duration is already earlier
// than the CDC read hybrid time; otherwise the NotFound transaction is left PENDING.
DEFINE_int32(cdc_transaction_timeout_ms, 0,
39
  "Don't check for an aborted transaction unless its original write is lagging by this duration.");
40
41
// When true, GetChangesForXCluster emits transactional writes as intents tagged with their
// transaction id (plus separate APPLY records for APPLYING transaction updates) in WAL order.
// When false, transaction statuses are resolved on the producer and records are ordered by
// commit time.
// Todo(Rahul): Enable this by default (https://github.com/yugabyte/yugabyte-db/issues/6128)
// NOTE(review): the default below is already `true`, so the TODO above looks stale — confirm
// and remove.
42
DEFINE_bool(cdc_enable_replicate_intents, true,
43
            "Enable replication of intents before they've been committed.");
44
45
namespace yb {
46
namespace cdc {
47
48
using consensus::ReplicateMsgPtr;
49
using consensus::ReplicateMsgs;
50
using docdb::PrimitiveValue;
51
using tablet::TransactionParticipant;
52
53
void AddColumnToMap(const ColumnSchema& col_schema,
54
                    const docdb::PrimitiveValue& col,
55
0
                    cdc::KeyValuePairPB* kv_pair) {
56
0
  kv_pair->set_key(col_schema.name());
57
0
  PrimitiveValue::ToQLValuePB(col, col_schema.type(), kv_pair->mutable_value());
58
0
}
59
60
// Appends the primary-key components of `decoded_key` to `record`'s key list.
// Hashed components come first, followed by range components. The N-th component overall is
// paired with column N of `tablet_schema`, so this assumes the key columns are the schema's
// leading columns in that order.
void AddPrimaryKey(const docdb::SubDocKey& decoded_key,
                   const Schema& tablet_schema,
                   CDCRecordPB* record) {
  const auto& doc_key = decoded_key.doc_key();
  size_t column_idx = 0;
  auto append_component = [&](const auto& component) {
    AddColumnToMap(tablet_schema.column(column_idx), component, record->add_key());
    ++column_idx;
  };

  for (const auto& hashed : doc_key.hashed_group()) {
    append_component(hashed);
  }
  for (const auto& ranged : doc_key.range_group()) {
    append_component(ranged);
  }
}
73
74
// Set committed record information including commit time for record.
75
// This will look at transaction status to determine commit time to be used for CDC record.
76
// Returns true if we need to stop processing WAL records beyond this, false otherwise.
77
Result<bool> SetCommittedRecordIndexForReplicateMsg(
78
    const ReplicateMsgPtr& msg, size_t index, const TxnStatusMap& txn_map,
79
0
    ReplicateIntents replicate_intents, std::vector<RecordTimeIndex>* records) {
80
0
  if (replicate_intents) {
81
    // If we're replicating intents, we have no stop condition, so add the record and continue.
82
0
    records->emplace_back(msg->hybrid_time(), index);
83
0
    return false;
84
0
  }
85
0
  switch (msg->op_type()) {
86
0
    case consensus::OperationType::UPDATE_TRANSACTION_OP: {
87
0
      if (msg->transaction_state().status() == TransactionStatus::APPLYING) {
88
0
        records->emplace_back(msg->transaction_state().commit_hybrid_time(), index);
89
0
      }
90
      // Ignore other transaction statuses since we only care about APPLYING
91
      // while sending CDC records.
92
0
      return false;
93
0
    }
94
95
0
    case consensus::OperationType::WRITE_OP: {
96
0
      if (msg->write().write_batch().has_transaction()) {
97
0
        auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
98
0
            msg->write().write_batch().transaction().transaction_id()));
99
0
        const auto txn_status = txn_map.find(txn_id);
100
0
        if (txn_status == txn_map.end()) {
101
0
          return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString());
102
0
        }
103
104
0
        if (txn_status->second.status == PENDING || txn_status->second.status == CREATED) {
105
          // Ignore all records beyond this because we don't know whether those records
106
          // were committed before or after this record without the transaction commit time.
107
0
          return true;
108
0
        } else if (txn_status->second.status == COMMITTED) {
109
          // Add record to txn_msgs because there may be records appearing after this in WAL
110
          // but committed before this one. Example:
111
          // T0: WRITE K1 [TXN1]
112
          // T1: WRITE K2
113
          // T2: APPLYING TXN1
114
          // Here, WRITE K2 appears after WRITE K1 but is committed before K1.
115
0
          records->emplace_back(txn_status->second.status_time.ToUint64(), index);
116
0
        }
117
0
      } else {
118
        // Msg is not part of transaction. Use write hybrid time from msg itself.
119
0
        records->emplace_back(msg->hybrid_time(), index);
120
0
      }
121
0
      return false;
122
0
    }
123
0
    case consensus::OperationType::SPLIT_OP: {
124
0
      records->emplace_back(msg->hybrid_time(), index);
125
0
      return true;  // Don't need to process any records after a SPLIT_OP.
126
0
    }
127
128
0
    case consensus::OperationType::CHANGE_CONFIG_OP:
129
0
      FALLTHROUGH_INTENDED;
130
0
    case consensus::OperationType::CHANGE_METADATA_OP:
131
0
      FALLTHROUGH_INTENDED;
132
0
    case consensus::OperationType::HISTORY_CUTOFF_OP:
133
0
      FALLTHROUGH_INTENDED;
134
0
    case consensus::OperationType::NO_OP:
135
0
      FALLTHROUGH_INTENDED;
136
0
    case consensus::OperationType::SNAPSHOT_OP:
137
0
      FALLTHROUGH_INTENDED;
138
0
    case consensus::OperationType::TRUNCATE_OP:
139
0
      FALLTHROUGH_INTENDED;
140
0
    case consensus::OperationType::UNKNOWN_OP:
141
0
      return false;
142
0
  }
143
0
  FATAL_INVALID_ENUM_VALUE(consensus::OperationType, msg->op_type());
144
0
}
145
146
// Walks `msgs` in WAL order and collects (time, index) entries for the messages to be emitted
// (see SetCommittedRecordIndexForReplicateMsg).
//
// Messages that carry an external hybrid time (i.e. came from an external source) produce no
// entry, but they still advance `checkpoint`. `checkpoint` ends up as the OpId of the last
// message fully processed. When a message asks to stop, scanning ends before the checkpoint is
// advanced to that message.
Result<std::vector<RecordTimeIndex>> GetCommittedRecordIndexes(
    const ReplicateMsgs& msgs, const TxnStatusMap& txn_map, ReplicateIntents replicate_intents,
    OpId* checkpoint) {
  std::vector<RecordTimeIndex> records;

  for (size_t idx = 0; idx < msgs.size(); ++idx) {
    const auto& msg = msgs[idx];
    const bool from_external_source = msg->write().has_external_hybrid_time();
    if (!from_external_source) {
      const bool stop = VERIFY_RESULT(SetCommittedRecordIndexForReplicateMsg(
          msg, idx, txn_map, replicate_intents, &records));
      if (stop) {
        return records;
      }
    }
    *checkpoint = OpId::FromPB(msg->id());
  }
  return records;
}
168
169
// Filter out WAL records that are external and order records based on transaction commit time.
170
// Records in WAL don't represent the exact order in which records are written in DB due to delay
171
// in writing txn APPLYING record.
172
// Consider the following WAL entries:
173
// TO: WRITE K0
174
// T1: WRITE K1 (TXN1)
175
// T2: WRITE K2 (TXN2)
176
// T3: WRITE K3
177
// T4: APPLYING TXN2
178
// T5: APPLYING TXN1
179
// T6: WRITE K4
180
// The order in which keys are written to DB in this example is K0, K3, K2, K1, K4.
181
// This method will also set checkpoint to the op id of last processed record.
182
Result<ReplicateMsgs> FilterAndSortWrites(const ReplicateMsgs& msgs,
183
                                          const TxnStatusMap& txn_map,
184
                                          ReplicateIntents replicate_intents,
185
0
                                          OpId* checkpoint) {
186
0
  std::vector<RecordTimeIndex> records = VERIFY_RESULT(GetCommittedRecordIndexes(
187
0
      msgs, txn_map, replicate_intents, checkpoint));
188
189
0
  if (!replicate_intents) {
190
0
    std::sort(records.begin(), records.end());
191
0
  }
192
193
0
  ReplicateMsgs ordered_msgs;
194
0
  ordered_msgs.reserve(records.size());
195
0
  for (const auto& record : records) {
196
0
    ordered_msgs.emplace_back(msgs[record.second]);
197
0
  }
198
0
  return ordered_msgs;
199
0
}
200
201
// Synchronously asks `txn_participant` for the status of transaction `txn_id` as of
// `hybrid_time`. Blocks the calling thread until the participant invokes the callback.
//
// Returns IllegalState when `txn_participant` is null. Callers such as GetChangesForXCluster
// only obtain a participant conditionally, so this turns a null dereference into an error.
// Otherwise returns whatever result the participant reports.
Result<TransactionStatusResult> GetTransactionStatus(
    const TransactionId& txn_id,
    const HybridTime& hybrid_time,
    TransactionParticipant* txn_participant) {
  if (txn_participant == nullptr) {
    return STATUS(IllegalState, "No transaction participant to query transaction status",
                  txn_id.ToString());
  }

  static const std::string reason = "cdc";

  std::promise<Result<TransactionStatusResult>> txn_status_promise;
  auto future = txn_status_promise.get_future();
  // The promise is captured by reference. This relies on the function blocking on `future`
  // below, and assumes the participant invokes the callback exactly once.
  auto callback = [&txn_status_promise](Result<TransactionStatusResult> result) {
    txn_status_promise.set_value(std::move(result));
  };

  txn_participant->RequestStatusAt(
      {&txn_id, hybrid_time, hybrid_time, 0, &reason, TransactionLoadFlags{}, callback});
  // get() blocks until the callback has fulfilled the promise.
  return future.get();
}
218
219
// Build transaction status as of hybrid_time.
220
Result<TxnStatusMap> BuildTxnStatusMap(const ReplicateMsgs& messages,
221
                                       bool more_replicate_msgs,
222
                                       const HybridTime& cdc_read_hybrid_time,
223
0
                                       TransactionParticipant* txn_participant) {
224
0
  TxnStatusMap txn_map;
225
  // First go through all APPLYING records and mark transaction as committed.
226
0
  for (const auto& msg : messages) {
227
0
    if (msg->op_type() == consensus::OperationType::UPDATE_TRANSACTION_OP
228
0
        && msg->transaction_state().status() == TransactionStatus::APPLYING) {
229
0
      auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
230
0
          msg->transaction_state().transaction_id()));
231
0
      txn_map.emplace(txn_id,
232
0
                      TransactionStatusResult(
233
0
                          TransactionStatus::COMMITTED,
234
0
                          HybridTime(msg->transaction_state().commit_hybrid_time())));
235
0
    }
236
0
  }
237
238
  // Now go through all WRITE_OP records and get transaction status of records for which
239
  // corresponding APPLYING record does not exist in WAL as yet.
240
0
  for (const auto& msg : messages) {
241
0
    if (msg->op_type() == consensus::OperationType::WRITE_OP
242
0
        && msg->write().write_batch().has_transaction()) {
243
0
      auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
244
0
          msg->write().write_batch().transaction().transaction_id()));
245
246
0
      if (!txn_map.count(txn_id)) {
247
0
        TransactionStatusResult txn_status(TransactionStatus::PENDING, HybridTime::kMin);
248
249
0
        auto result = GetTransactionStatus(txn_id, cdc_read_hybrid_time, txn_participant);
250
0
        if (!result.ok()) {
251
0
          if (result.status().IsNotFound()) {
252
            // Naive heuristic for handling whether a transaction is aborted or still pending:
253
            // 1. If the normal transaction timeout is not reached, assume good operation.
254
            // 2. If more_replicate_messages, assume a race between reading
255
            //    TransactionParticipant & LogCache.
256
            // TODO (#2405) : Handle long running or very large transactions correctly.
257
0
            if (!more_replicate_msgs) {
258
0
              auto timeout = HybridTime::FromPB(msg->hybrid_time())
259
0
                  .AddMilliseconds(FLAGS_cdc_transaction_timeout_ms);
260
0
              if (timeout < cdc_read_hybrid_time) {
261
0
                LOG(INFO) << "Transaction not found, considering it aborted: " << txn_id;
262
0
                txn_status = TransactionStatusResult::Aborted();
263
0
              }
264
0
            }
265
0
          } else {
266
0
            return result.status();
267
0
          }
268
0
        } else {
269
0
          txn_status = *result;
270
0
        }
271
0
        txn_map.emplace(txn_id, txn_status);
272
0
      }
273
0
    }
274
0
  }
275
0
  return txn_map;
276
0
}
277
278
// Sets `record`'s time to the status time stored for `txn_id` in `txn_map`.
// Returns IllegalState if `txn_id` has no entry in the map.
CHECKED_STATUS SetRecordTime(const TransactionId& txn_id,
                             const TxnStatusMap& txn_map,
                             CDCRecordPB* record) {
  const auto entry = txn_map.find(txn_id);
  if (entry != txn_map.end()) {
    record->set_time(entry->second.status_time.ToUint64());
    return Status::OK();
  }
  return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString());
}
288
289
// Populate CDC record corresponding to WAL batch in ReplicateMsg.
290
CHECKED_STATUS PopulateWriteRecord(const ReplicateMsgPtr& msg,
291
                                   const TxnStatusMap& txn_map,
292
                                   const StreamMetadata& metadata,
293
                                   const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
294
                                   ReplicateIntents replicate_intents,
295
0
                                   GetChangesResponsePB* resp) {
296
0
  const auto& batch = msg->write().write_batch();
297
0
  const auto& schema = *tablet_peer->tablet()->schema();
298
  // Write batch may contain records from different rows.
299
  // For CDC, we need to split the batch into 1 CDC record per row of the table.
300
  // We'll use DocDB key hash to identify the records that belong to the same row.
301
0
  Slice prev_key;
302
0
  CDCRecordPB* record = nullptr;
303
0
  for (const auto& write_pair : batch.write_pairs()) {
304
0
    Slice key = write_pair.key();
305
0
    const auto key_size = VERIFY_RESULT(
306
0
        docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
307
308
0
    Slice value = write_pair.value();
309
0
    docdb::Value decoded_value;
310
0
    RETURN_NOT_OK(decoded_value.Decode(value));
311
312
    // Compare key hash with previously seen key hash to determine whether the write pair
313
    // is part of the same row or not.
314
0
    Slice primary_key(key.data(), key_size);
315
0
    if (prev_key != primary_key) {
316
      // Write pair contains record for different row. Create a new CDCRecord in this case.
317
0
      record = resp->add_records();
318
0
      Slice sub_doc_key = key;
319
0
      docdb::SubDocKey decoded_key;
320
0
      RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
321
322
0
      if (metadata.record_format == CDCRecordFormat::WAL) {
323
        // For 2DC, populate serialized data from WAL, to avoid unnecessary deserializing on
324
        // producer and re-serializing on consumer.
325
0
        auto kv_pair = record->add_key();
326
0
        kv_pair->set_key(std::to_string(decoded_key.doc_key().hash()));
327
0
        kv_pair->mutable_value()->set_binary_value(write_pair.key());
328
0
      } else {
329
0
        AddPrimaryKey(decoded_key, schema, record);
330
0
      }
331
332
      // Check whether operation is WRITE or DELETE.
333
0
      if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
334
0
          decoded_key.num_subkeys() == 0) {
335
0
        record->set_operation(CDCRecordPB::DELETE);
336
0
      } else {
337
0
        record->set_operation(CDCRecordPB::WRITE);
338
0
      }
339
340
      // Process intent records.
341
0
      record->set_time(msg->hybrid_time());
342
0
      if (batch.has_transaction()) {
343
0
        if (!replicate_intents) {
344
0
          auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(
345
0
              batch.transaction().transaction_id()));
346
          // If we're not replicating intents, set record time using the transaction map.
347
0
          RETURN_NOT_OK(SetRecordTime(txn_id, txn_map, record));
348
0
        } else {
349
0
          record->mutable_transaction_state()->set_transaction_id(
350
0
              batch.transaction().transaction_id());
351
0
          record->mutable_transaction_state()->add_tablets(tablet_peer->tablet_id());
352
0
        }
353
0
      }
354
0
    }
355
0
    prev_key = primary_key;
356
0
    DCHECK(record);
357
358
0
    if (metadata.record_format == CDCRecordFormat::WAL) {
359
0
      auto kv_pair = record->add_changes();
360
0
      kv_pair->set_key(write_pair.key());
361
0
      kv_pair->mutable_value()->set_binary_value(write_pair.value());
362
0
    } else if (record->operation() == CDCRecordPB_OperationType_WRITE) {
363
0
      PrimitiveValue column_id;
364
0
      Slice key_column = write_pair.key().data() + key_size;
365
0
      RETURN_NOT_OK(PrimitiveValue::DecodeKey(&key_column, &column_id));
366
0
      if (column_id.value_type() == docdb::ValueType::kColumnId) {
367
0
        const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId()));
368
0
        AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes());
369
0
      } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) {
370
0
        LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type();
371
0
      }
372
0
    }
373
0
  }
374
0
  return Status::OK();
375
0
}
376
377
// Populate CDC record corresponding to WAL UPDATE_TRANSACTION_OP entry.
378
CHECKED_STATUS PopulateTransactionRecord(const ReplicateMsgPtr& msg,
379
                                         const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
380
                                         ReplicateIntents replicate_intents,
381
0
                                         CDCRecordPB* record) {
382
0
  SCHECK(msg->has_transaction_state(), InvalidArgument,
383
0
         Format("Update transaction message requires transaction_state: $0",
384
0
                msg->ShortDebugString()));
385
0
  record->set_operation(CDCRecordPB_OperationType_WRITE);
386
0
  record->set_time(replicate_intents ?
387
0
      msg->hybrid_time() : msg->transaction_state().commit_hybrid_time());
388
0
  record->mutable_transaction_state()->CopyFrom(msg->transaction_state());
389
0
  if (replicate_intents && msg->transaction_state().status() == TransactionStatus::APPLYING) {
390
    // Add the partition metadata so the consumer knows which tablets to apply the transaction
391
    // to.
392
0
    tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition());
393
0
  }
394
0
  return Status::OK();
395
0
}
396
397
0
// Fills `record` from a SPLIT_OP WAL entry.
// Sets the operation to SPLIT_OP, uses the message's hybrid time, and copies the split request.
// Returns InvalidArgument if the message has no split_request.
CHECKED_STATUS PopulateSplitOpRecord(const ReplicateMsgPtr& msg, CDCRecordPB* record) {
  SCHECK(msg->has_split_request(), InvalidArgument,
         Format("Split op message requires split_request: $0", msg->ShortDebugString()));
  record->mutable_split_tablet_request()->CopyFrom(msg->split_request());
  record->set_time(msg->hybrid_time());
  record->set_operation(CDCRecordPB::SPLIT_OP);
  return Status::OK();
}
405
406
// Reads replicated WAL messages for `tablet_peer`, starting from `from_op_id` (via
// ReadReplicatedMessagesForCDC, bounded by `deadline`), and converts them into CDC records in
// `resp` for xCluster replication.
//
// When intents are not replicated (FLAGS_cdc_enable_replicate_intents is false), transaction
// statuses are resolved up front with BuildTxnStatusMap, and records are ordered by commit
// time. Otherwise records keep WAL order, and APPLYING transaction updates become APPLY records.
//
// The emitted messages, and any tracked memory consumption, are handed to `msgs_holder`.
// `resp`'s checkpoint is the last processed OpId, or `from_op_id` if nothing advanced it.
// `stream_id` and `tablet_id` are not used by this function.
Status GetChangesForXCluster(const std::string& stream_id,
                             const std::string& tablet_id,
                             const OpId& from_op_id,
                             const StreamMetadata& stream_metadata,
                             const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
                             const MemTrackerPtr& mem_tracker,
                             consensus::ReplicateMsgsHolder* msgs_holder,
                             GetChangesResponsePB* resp,
                             int64_t* last_readable_opid_index,
                             const CoarseTimePoint deadline) {
  auto replicate_intents = ReplicateIntents(GetAtomicFlag(&FLAGS_cdc_enable_replicate_intents));
  // While bound to a participant, keeps transactions from being removed from it.
  // It is bound below only when transaction statuses must be resolved.
  RequestScope request_scope;

  auto read_result = VERIFY_RESULT(tablet_peer->consensus()->ReadReplicatedMessagesForCDC(
      from_op_id, last_readable_opid_index, deadline));

  // Charge messages that had to be read from disk to the caller's memory tracker, if any.
  ScopedTrackedConsumption consumption;
  if (read_result.read_from_disk_size && mem_tracker) {
    consumption = ScopedTrackedConsumption(mem_tracker, read_result.read_from_disk_size);
  }

  TxnStatusMap txn_map;
  if (!replicate_intents) {
    auto participant = tablet_peer->tablet()->transaction_participant();
    if (participant) {
      request_scope = RequestScope(participant);
    }
    txn_map = TxnStatusMap(VERIFY_RESULT(BuildTxnStatusMap(
        read_result.messages, read_result.have_more_messages, tablet_peer->Now(),
        participant)));
  }

  OpId checkpoint;
  ReplicateMsgs messages = VERIFY_RESULT(FilterAndSortWrites(
      read_result.messages, txn_map, replicate_intents, &checkpoint));

  for (const auto& msg : messages) {
    const auto op_type = msg->op_type();
    if (op_type == consensus::OperationType::WRITE_OP) {
      RETURN_NOT_OK(PopulateWriteRecord(msg, txn_map, stream_metadata, tablet_peer,
                                        replicate_intents, resp));
    } else if (op_type == consensus::OperationType::SPLIT_OP) {
      RETURN_NOT_OK(PopulateSplitOpRecord(msg, resp->add_records()));
    } else if (op_type == consensus::OperationType::UPDATE_TRANSACTION_OP) {
      if (!replicate_intents) {
        RETURN_NOT_OK(PopulateTransactionRecord(
            msg, tablet_peer, replicate_intents, resp->add_records()));
      } else if (msg->transaction_state().status() == TransactionStatus::APPLYING) {
        // Under intent replication, an APPLYING update becomes an APPLY record carrying the
        // transaction id, commit time and this tablet's partition.
        const auto& src_state = msg->transaction_state();
        auto* apply_record = resp->add_records();
        apply_record->set_operation(CDCRecordPB::APPLY);
        apply_record->set_time(msg->hybrid_time());
        auto* apply_state = apply_record->mutable_transaction_state();
        apply_state->set_transaction_id(src_state.transaction_id());
        apply_state->set_commit_hybrid_time(src_state.commit_hybrid_time());
        tablet_peer->tablet()->metadata()->partition()->ToPB(apply_record->mutable_partition());
      }
    }
    // Other operation types produce no CDC record.
  }

  if (consumption) {
    consumption.Add(resp->SpaceUsedLong());
  }
  *msgs_holder = consensus::ReplicateMsgsHolder(
      nullptr, std::move(messages), std::move(consumption));

  const OpId& resp_checkpoint = checkpoint.index > 0 ? checkpoint : from_op_id;
  resp_checkpoint.ToPB(resp->mutable_checkpoint()->mutable_op_id());
  return Status::OK();
}
480
481
}  // namespace cdc
482
}  // namespace yb