YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdcsdk_producer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/cdc/cdc_producer.h"
14
15
#include "yb/cdc/cdc_common_util.h"
16
17
#include "yb/common/wire_protocol.h"
18
#include "yb/common/ql_expr.h"
19
20
#include "yb/docdb/docdb_util.h"
21
#include "yb/docdb/doc_key.h"
22
23
// Upper bound on the number of rows read from the CDC snapshot iterator (and emitted as READ
// records) by a single GetChangesForCDCSDK call while a snapshot is in progress.
DEFINE_int32(cdc_snapshot_batch_size, 250, "Batch size for the snapshot operation in CDC");
24
25
namespace yb {
26
namespace cdc {
27
28
using consensus::ReplicateMsgPtr;
29
using consensus::ReplicateMsgs;
30
using docdb::PrimitiveValue;
31
using tablet::TransactionParticipant;
32
using yb::QLTableRow;
33
34
// Row-level change kinds decoded from DocDB write pairs / intents; translated into the
// corresponding RowMessage op by SetOperation().
YB_DEFINE_ENUM(OpType, (INSERT)(UPDATE)(DELETE));
35
36
378
void SetOperation(RowMessage* row_message, OpType type, const Schema& schema) {
37
378
  switch (type) {
38
242
    case OpType::INSERT:
39
242
      row_message->set_op(RowMessage_Op_INSERT);
40
242
      break;
41
69
    case OpType::UPDATE:
42
69
      row_message->set_op(RowMessage_Op_UPDATE);
43
69
      break;
44
67
    case OpType::DELETE:
45
67
      row_message->set_op(RowMessage_Op_DELETE);
46
67
      break;
47
378
  }
48
49
378
  row_message->set_pgschema_name(schema.SchemaName());
50
378
}
51
52
void AddColumnToMap(
53
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
54
    const ColumnSchema& col_schema,
55
    const docdb::PrimitiveValue& col,
56
930
    DatumMessagePB* cdc_datum_message) {
57
930
  cdc_datum_message->set_column_name(col_schema.name());
58
930
  QLValuePB ql_value;
59
930
  if (tablet_peer->tablet()->table_type() == PGSQL_TABLE_TYPE) {
60
930
    docdb::PrimitiveValue::ToQLValuePB(col, col_schema.type(), &ql_value);
61
930
    if (!IsNull(ql_value) && col_schema.pg_type_oid() != 0 /*kInvalidOid*/) {
62
885
      docdb::SetValueFromQLBinaryWrapper(ql_value, col_schema.pg_type_oid(), cdc_datum_message);
63
45
    } else {
64
45
      cdc_datum_message->set_column_type(col_schema.pg_type_oid());
65
45
    }
66
930
  }
67
930
}
68
69
447
DatumMessagePB* AddTuple(RowMessage* row_message) {
70
447
  if (!row_message) {
71
0
    return nullptr;
72
0
  }
73
447
  DatumMessagePB* tuple = nullptr;
74
75
447
  if (row_message->op() == RowMessage_Op_DELETE) {
76
86
    tuple = row_message->add_old_tuple();
77
86
    row_message->add_new_tuple();
78
361
  } else {
79
361
    tuple = row_message->add_new_tuple();
80
361
    row_message->add_old_tuple();
81
361
  }
82
447
  return tuple;
83
447
}
84
85
void AddPrimaryKey(
86
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer, const docdb::SubDocKey& decoded_key,
87
378
    const Schema& tablet_schema, RowMessage* row_message) {
88
378
  size_t i = 0;
89
378
  for (const auto& col : decoded_key.doc_key().hashed_group()) {
90
378
    DatumMessagePB* tuple = AddTuple(row_message);
91
92
378
    AddColumnToMap(
93
378
        tablet_peer, tablet_schema.column(i), col, tuple);
94
378
    i++;
95
378
  }
96
97
69
  for (const auto& col : decoded_key.doc_key().range_group()) {
98
69
    DatumMessagePB* tuple = AddTuple(row_message);
99
100
69
    AddColumnToMap(
101
69
        tablet_peer, tablet_schema.column(i), col, tuple);
102
69
    i++;
103
69
  }
104
378
}
105
106
void SetCDCSDKOpId(
107
    int64_t term, int64_t index, uint32_t write_id, const std::string& key,
108
604
    CDCSDKOpIdPB* cdc_sdk_op_id_pb) {
109
604
  cdc_sdk_op_id_pb->set_term(term);
110
604
  cdc_sdk_op_id_pb->set_index(index);
111
604
  cdc_sdk_op_id_pb->set_write_id(write_id);
112
604
  cdc_sdk_op_id_pb->set_write_id_key(key);
113
604
}
114
115
void SetCheckpoint(
116
    int64_t term, int64_t index, int32 write_id, const std::string& key, uint64 time,
117
4.10k
    CDCSDKCheckpointPB* cdc_sdk_checkpoint_pb, OpId* last_streamed_op_id) {
118
4.10k
  cdc_sdk_checkpoint_pb->set_term(term);
119
4.10k
  cdc_sdk_checkpoint_pb->set_index(index);
120
4.10k
  cdc_sdk_checkpoint_pb->set_write_id(write_id);
121
4.10k
  cdc_sdk_checkpoint_pb->set_key(key);
122
4.10k
  cdc_sdk_checkpoint_pb->set_snapshot_time(time);
123
4.10k
  if (last_streamed_op_id) {
124
4.09k
    last_streamed_op_id->term = term;
125
4.09k
    last_streamed_op_id->index = index;
126
4.09k
  }
127
4.10k
}
128
129
bool ShouldCreateNewProtoRecord(
130
309
    const RowMessage& row_message, const Schema& schema, size_t col_count) {
131
309
  return (row_message.op() == RowMessage_Op_INSERT && col_count == schema.num_columns()) ||
132
228
         (row_message.op() == RowMessage_Op_UPDATE || row_message.op() == RowMessage_Op_DELETE);
133
309
}
134
135
309
bool IsInsertOperation(const RowMessage& row_message) {
136
309
  return row_message.op() == RowMessage_Op_INSERT;
137
309
}
138
139
792
bool IsInsertOrUpdate(const RowMessage& row_message) {
140
792
  return row_message.IsInitialized()  &&
141
792
      (row_message.op() == RowMessage_Op_INSERT
142
136
       || row_message.op() == RowMessage_Op_UPDATE);
143
792
}
144
145
void MakeNewProtoRecord(
146
    const docdb::IntentKeyValueForCDC& intent, const OpId& op_id, const RowMessage& row_message,
147
    const Schema& schema, size_t col_count, CDCSDKProtoRecordPB* proto_record,
148
309
    GetChangesResponsePB* resp, IntraTxnWriteId* write_id, std::string* reverse_index_key) {
149
309
  if (ShouldCreateNewProtoRecord(row_message, schema, col_count)) {
150
173
    CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
151
173
    SetCDCSDKOpId(
152
173
        op_id.term, op_id.index, intent.write_id, intent.reverse_index_key, cdc_sdk_op_id_pb);
153
154
173
    CDCSDKProtoRecordPB* record_to_be_added = resp->add_cdc_sdk_proto_records();
155
173
    record_to_be_added->CopyFrom(*proto_record);
156
173
    record_to_be_added->mutable_row_message()->CopyFrom(row_message);
157
158
173
    *write_id = intent.write_id;
159
173
    *reverse_index_key = intent.reverse_index_key;
160
173
  }
161
309
}
162
// Populates CDCSDK change records in |resp| from the intents of a single transaction
// (|transaction_id|).
//
// Consecutive intents sharing the same encoded DocKey are folded into one RowMessage. After each
// intent is applied, MakeNewProtoRecord() copies the row into |resp| if
// ShouldCreateNewProtoRecord() is true. For UPDATE/DELETE rows that happens after every intent.
// For INSERT rows it happens only once |col_count| reaches schema.num_columns().
// |*write_id| and |*reverse_index_key| are advanced as intents are consumed.
// |old_schema|, when non-null, is used instead of the tablet's current schema.
// |metadata| is not read here.
163
CHECKED_STATUS PopulateCDCSDKIntentRecord(
164
    const OpId& op_id,
165
    const TransactionId& transaction_id,
166
    const std::vector<docdb::IntentKeyValueForCDC>& intents,
167
    const StreamMetadata& metadata,
168
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
169
    GetChangesResponsePB* resp,
170
    ScopedTrackedConsumption* consumption,
171
    IntraTxnWriteId* write_id,
172
    std::string* reverse_index_key,
173
74
    Schema* old_schema) {
174
74
  // Decode against the caller-supplied schema if given, else the tablet's current schema.
  Schema& schema = old_schema ? *old_schema : *tablet_peer->tablet()->schema();
175
74
  // Primary key (DocKey prefix) of the previous intent; empty before the first intent.
  Slice prev_key;
176
74
  // Scratch record for the row currently being assembled. MakeNewProtoRecord() copies it
  // into |resp| once the row is complete.
  CDCSDKProtoRecordPB proto_record;
177
74
  RowMessage* row_message = proto_record.mutable_row_message();
178
74
  // Number of columns accounted for in the current row (compared with num_columns()).
  size_t col_count = 0;
179
309
  for (const auto& intent : intents) {
180
309
    Slice key(intent.key_buf);
181
309
    Slice value(intent.value_buf);
182
309
    // Length of the encoded DocKey (the row's primary key) at the start of the intent key.
    const auto key_size =
183
309
        VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
184
185
309
    docdb::PrimitiveValue column_id;
186
309
    boost::optional<docdb::PrimitiveValue> column_id_opt;
187
309
    // Bytes after the DocKey, if any, start with the column id subkey. Row-level intents have
    // nothing there and leave |column_id_opt| unset.
    Slice key_column = key.WithoutPrefix(key_size);
188
309
    if (!key_column.empty()) {
189
268
      RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id));
190
268
      column_id_opt = boost::optional<docdb::PrimitiveValue>(column_id);
191
268
    }
192
193
309
    Slice sub_doc_key = key;
194
309
    docdb::SubDocKey decoded_key;
195
309
    RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
196
197
309
    docdb::Value decoded_value;
198
309
    RETURN_NOT_OK(decoded_value.Decode(value));
199
200
309
    // Intents on primary-key columns are skipped; key values come from the DocKey via
    // AddPrimaryKey(). Only the resume position is advanced.
    if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId &&
201
187
        schema.is_key_column(column_id_opt->GetColumnId())) {
202
0
      *write_id = intent.write_id;
203
0
      *reverse_index_key = intent.reverse_index_key;
204
0
      continue;
205
0
    }
206
207
309
    // Charge the key bytes to the tracked memory consumption when it is active.
    // NOTE(review): relies on ScopedTrackedConsumption's bool conversion, and |consumption|
    // itself must be non-null.
    if (*consumption) {
208
307
      consumption->Add(key.size());
209
307
    }
210
211
    // Compare this intent's primary key with the previous one to decide whether it continues
212
    // the same row. A row is also closed once col_count reaches num_columns().
213
309
    Slice primary_key(key.data(), key_size);
214
309
    if (prev_key != primary_key || col_count >= schema.num_columns()) {
215
173
      proto_record.Clear();
216
173
      row_message->Clear();
217
218
      // Classify the new row from this intent:
      //   - a tombstone with no subkeys is a DELETE;
      //   - a kNullLow value on a system column is an INSERT;
      //   - anything else is an UPDATE.
219
173
      if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
220
41
          decoded_key.num_subkeys() == 0) {
221
41
        SetOperation(row_message, OpType::DELETE, schema);
222
41
        *write_id = intent.write_id;
223
132
      } else {
224
132
        if (column_id_opt &&
225
132
            column_id_opt->value_type() == docdb::ValueType::kSystemColumnId &&
226
81
            decoded_value.value_type() == docdb::ValueType::kNullLow) {
227
81
          SetOperation(row_message, OpType::INSERT, schema);
228
81
          // The ++col_count for this intent below brings the count to num_key_columns().
          // Each later non-key column intent of the same row adds one more.
          col_count = schema.num_key_columns() - 1;
229
51
        } else {
230
51
          SetOperation(row_message, OpType::UPDATE, schema);
231
51
          // Marks the row as full, so the next intent always starts a new row.
          col_count = schema.num_columns();
232
51
          *write_id = intent.write_id;
233
51
        }
234
132
      }
235
236
      // Start the new row: tag it with the transaction and fill in its primary-key columns.
237
173
      row_message->set_transaction_id(transaction_id.ToString());
238
173
      AddPrimaryKey(tablet_peer, decoded_key, schema, row_message);
239
173
    }
240
241
309
    if (IsInsertOperation(*row_message)) {
242
217
      ++col_count;
243
217
    }
244
245
309
    prev_key = primary_key;
246
309
    // For INSERT/UPDATE rows, append this intent's regular-column value to new_tuple, paired
    // with an empty old_tuple entry (as AddTuple() also does).
    if (IsInsertOrUpdate(*row_message)) {
247
268
      if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId) {
248
187
        const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id_opt->GetColumnId()));
249
250
187
        AddColumnToMap(
251
187
            tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple());
252
187
        row_message->add_old_tuple();
253
254
81
      } else if (
255
81
          column_id_opt && column_id_opt->value_type() != docdb::ValueType::kSystemColumnId) {
256
0
        LOG(DFATAL) << "Unexpected value type in key: " << column_id_opt->value_type()
257
0
                    << " key: " << decoded_key.ToString()
258
0
                    << " value: " << decoded_value.primitive_value();
259
0
      }
260
268
    }
261
309
    row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
262
309
    // Copy the row into |resp| if it is complete (see ShouldCreateNewProtoRecord()).
    MakeNewProtoRecord(
263
309
        intent, op_id, *row_message, schema, col_count, &proto_record, resp, write_id,
264
309
        reverse_index_key);
265
309
  }
266
267
74
  return Status::OK();
268
74
}
269
270
// Populate CDC record corresponding to WAL batch in ReplicateMsg.
271
CHECKED_STATUS PopulateCDCSDKWriteRecord(
272
    const ReplicateMsgPtr& msg,
273
    const StreamMetadata& metadata,
274
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
275
    GetChangesResponsePB* resp,
276
205
    const Schema& schema) {
277
205
  const auto& batch = msg->write().write_batch();
278
205
  CDCSDKProtoRecordPB* proto_record = nullptr;
279
205
  RowMessage* row_message = nullptr;
280
281
  // Write batch may contain records from different rows.
282
  // For CDC, we need to split the batch into 1 CDC record per row of the table.
283
  // We'll use DocDB key hash to identify the records that belong to the same row.
284
205
  Slice prev_key;
285
286
483
  for (const auto& write_pair : batch.write_pairs()) {
287
483
    Slice key = write_pair.key();
288
483
    const auto key_size =
289
483
        VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
290
291
483
    Slice value = write_pair.value();
292
483
    docdb::Value decoded_value;
293
483
    RETURN_NOT_OK(decoded_value.Decode(value));
294
295
    // Compare key hash with previously seen key hash to determine whether the write pair
296
    // is part of the same row or not.
297
483
    Slice primary_key(key.data(), key_size);
298
483
    if (prev_key != primary_key) {
299
      // Write pair contains record for different row. Create a new CDCRecord in this case.
300
205
      proto_record = resp->add_cdc_sdk_proto_records();
301
205
      row_message = proto_record->mutable_row_message();
302
205
      row_message->set_pgschema_name(schema.SchemaName());
303
205
      row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
304
305
205
      CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
306
205
      SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb);
307
308
205
      Slice sub_doc_key = key;
309
205
      docdb::SubDocKey decoded_key;
310
205
      RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
311
312
      // Check whether operation is WRITE or DELETE.
313
205
      if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
314
26
          decoded_key.num_subkeys() == 0) {
315
26
        SetOperation(row_message, OpType::DELETE, schema);
316
179
      } else {
317
179
        docdb::PrimitiveValue column_id;
318
179
        Slice key_column(key.WithoutPrefix(key_size));
319
179
        RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id));
320
321
179
        if (column_id.value_type() == docdb::ValueType::kSystemColumnId &&
322
161
            decoded_value.value_type() == docdb::ValueType::kNullLow) {
323
161
          SetOperation(row_message, OpType::INSERT, schema);
324
18
        } else {
325
18
          SetOperation(row_message, OpType::UPDATE, schema);
326
18
        }
327
179
      }
328
329
205
      AddPrimaryKey(tablet_peer, decoded_key, schema, row_message);
330
331
      // Process intent records.
332
205
      row_message->set_commit_time(msg->hybrid_time());
333
205
    }
334
483
    prev_key = primary_key;
335
483
    DCHECK(proto_record);
336
337
483
    if (IsInsertOrUpdate(*row_message)) {
338
457
      docdb::PrimitiveValue column_id;
339
457
      Slice key_column((const char*)(key.data() + key_size));
340
457
      RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id));
341
457
      if (column_id.value_type() == docdb::ValueType::kColumnId) {
342
296
        const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId()));
343
344
296
        AddColumnToMap(
345
296
            tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple());
346
296
        row_message->add_old_tuple();
347
348
161
      } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) {
349
0
        LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type();
350
0
      }
351
457
    }
352
483
  }
353
354
205
  return Status::OK();
355
205
}
356
357
void SetTableProperties(
358
    const TablePropertiesPB* table_properties,
359
314
    CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb) {
360
314
  cdc_sdk_table_properties_pb->set_default_time_to_live(table_properties->default_time_to_live());
361
314
  cdc_sdk_table_properties_pb->set_num_tablets(table_properties->num_tablets());
362
314
  cdc_sdk_table_properties_pb->set_is_ysql_catalog_table(table_properties->is_ysql_catalog_table());
363
314
}
364
365
883
void SetColumnInfo(const ColumnSchemaPB& column, CDCSDKColumnInfoPB* column_info) {
366
883
  column_info->set_name(column.name());
367
883
  column_info->mutable_type()->CopyFrom(column.type());
368
883
  column_info->set_is_key(column.is_key());
369
883
  column_info->set_is_hash_key(column.is_hash_key());
370
883
  column_info->set_is_nullable(column.is_nullable());
371
883
  column_info->set_oid(column.pg_type_oid());
372
883
}
373
374
CHECKED_STATUS PopulateCDCSDKDDLRecord(
375
    const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const string& table_name,
376
152
    const Schema& schema) {
377
152
  SCHECK(
378
152
      msg->has_change_metadata_request(), InvalidArgument,
379
152
      Format(
380
152
          "Change metadata (DDL) message requires metadata information: $0",
381
152
          msg->ShortDebugString()));
382
383
152
  RowMessage* row_message = nullptr;
384
385
152
  row_message = proto_record->mutable_row_message();
386
152
  row_message->set_op(RowMessage_Op_DDL);
387
152
  row_message->set_table(table_name);
388
389
152
  CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
390
152
  SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb);
391
392
425
  for (const auto& column : msg->change_metadata_request().schema().columns()) {
393
425
    CDCSDKColumnInfoPB* column_info = nullptr;
394
425
    column_info = row_message->mutable_schema()->add_column_info();
395
425
    SetColumnInfo(column, column_info);
396
425
  }
397
398
152
  CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb;
399
152
  const TablePropertiesPB* table_properties =
400
152
      &(msg->change_metadata_request().schema().table_properties());
401
402
152
  cdc_sdk_table_properties_pb = row_message->mutable_schema()->mutable_tab_info();
403
152
  row_message->set_schema_version(msg->change_metadata_request().schema_version());
404
152
  row_message->set_new_table_name(msg->change_metadata_request().new_table_name());
405
152
  row_message->set_pgschema_name(schema.SchemaName());
406
152
  SetTableProperties(table_properties, cdc_sdk_table_properties_pb);
407
408
152
  return Status::OK();
409
152
}
410
411
CHECKED_STATUS PopulateCDCSDKTruncateRecord(
412
0
    const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const Schema& schema) {
413
0
  SCHECK(
414
0
      msg->has_truncate(), InvalidArgument,
415
0
      Format(
416
0
          "Truncate message requires truncate request information: $0", msg->ShortDebugString()));
417
418
0
  RowMessage* row_message = nullptr;
419
420
0
  row_message = proto_record->mutable_row_message();
421
0
  row_message->set_op(RowMessage_Op_TRUNCATE);
422
0
  row_message->set_pgschema_name(schema.SchemaName());
423
0
  row_message->mutable_truncate_request_info()->CopyFrom(msg->truncate());
424
425
0
  CDCSDKOpIdPB* cdc_sdk_op_id_pb;
426
427
0
  cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
428
0
  SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb);
429
430
0
  return Status::OK();
431
0
}
432
433
74
void SetTermIndex(int64_t term, int64_t index, CDCSDKCheckpointPB* checkpoint) {
434
74
  checkpoint->set_term(term);
435
74
  checkpoint->set_index(index);
436
74
}
437
438
74
void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint) {
439
74
  checkpoint->set_key(key);
440
74
  checkpoint->set_write_id(write_id);
441
74
}
442
443
CHECKED_STATUS ProcessIntents(
444
    const OpId& op_id,
445
    const TransactionId& transaction_id,
446
    const StreamMetadata& metadata,
447
    GetChangesResponsePB* resp,
448
    ScopedTrackedConsumption* consumption,
449
    CDCSDKCheckpointPB* checkpoint,
450
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
451
    std::vector<docdb::IntentKeyValueForCDC>* keyValueIntents,
452
    docdb::ApplyTransactionState* stream_state,
453
74
    Schema* schema) {
454
74
  if (stream_state->key.empty() && stream_state->write_id == 0) {
455
74
    CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
456
74
    RowMessage* row_message = proto_record->mutable_row_message();
457
74
    row_message->set_op(RowMessage_Op_BEGIN);
458
74
    row_message->set_transaction_id(transaction_id.ToString());
459
74
    row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
460
74
  }
461
462
74
  auto tablet = tablet_peer->shared_tablet();
463
74
  RETURN_NOT_OK(tablet->GetIntents(transaction_id, keyValueIntents, stream_state));
464
465
309
  for (auto& keyValue : *keyValueIntents) {
466
309
    docdb::SubDocKey sub_doc_key;
467
309
    CHECK_OK(
468
309
        sub_doc_key.FullyDecodeFrom(Slice(keyValue.key_buf), docdb::HybridTimeRequired::kFalse));
469
309
    docdb::Value decoded_value;
470
309
    RETURN_NOT_OK(decoded_value.Decode(Slice(keyValue.value_buf)));
471
309
  }
472
473
74
  std::string reverse_index_key;
474
74
  IntraTxnWriteId write_id = 0;
475
476
  // Need to populate the CDCSDKRecords
477
74
  RETURN_NOT_OK(PopulateCDCSDKIntentRecord(
478
74
      op_id, transaction_id, *keyValueIntents, metadata, tablet_peer, resp, consumption, &write_id,
479
74
      &reverse_index_key, schema));
480
481
74
  SetTermIndex(op_id.term, op_id.index, checkpoint);
482
483
74
  if (stream_state->key.empty() && stream_state->write_id == 0) {
484
74
    CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
485
74
    RowMessage* row_message = proto_record->mutable_row_message();
486
487
74
    row_message->set_op(RowMessage_Op_COMMIT);
488
74
    row_message->set_transaction_id(transaction_id.ToString());
489
74
    row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
490
491
74
    CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
492
74
    SetCDCSDKOpId(op_id.term, op_id.index, 0, "", cdc_sdk_op_id_pb);
493
74
    SetKeyWriteId("", 0, checkpoint);
494
0
  } else {
495
0
    SetKeyWriteId(reverse_index_key, write_id, checkpoint);
496
0
  }
497
498
74
  return Status::OK();
499
74
}
500
501
CHECKED_STATUS PopulateCDCSDKSnapshotRecord(
502
    GetChangesResponsePB* resp,
503
    const QLTableRow* row,
504
    const Schema& schema,
505
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
506
12.7k
    ReadHybridTime time) {
507
12.7k
  CDCSDKProtoRecordPB* proto_record = nullptr;
508
12.7k
  RowMessage* row_message = nullptr;
509
12.7k
  string table_name = tablet_peer->tablet()->metadata()->table_name();
510
511
12.7k
  proto_record = resp->add_cdc_sdk_proto_records();
512
12.7k
  row_message = proto_record->mutable_row_message();
513
12.7k
  row_message->set_table(table_name);
514
12.7k
  row_message->set_op(RowMessage_Op_READ);
515
12.7k
  row_message->set_pgschema_name(schema.SchemaName());
516
12.7k
  row_message->set_commit_time(time.read.ToUint64());
517
518
12.7k
  DatumMessagePB* cdc_datum_message = nullptr;
519
520
51.0k
  for (size_t col_idx = 0; col_idx < schema.num_columns(); col_idx++) {
521
38.2k
    ColumnId col_id = schema.column_id(col_idx);
522
38.2k
    const auto* value = row->GetColumn(col_id);
523
38.2k
    const ColumnSchema& col_schema = VERIFY_RESULT(schema.column_by_id(col_id));
524
525
38.2k
    cdc_datum_message = row_message->add_new_tuple();
526
38.2k
    cdc_datum_message->set_column_name(col_schema.name());
527
528
38.2k
    if (value && value->value_case() != QLValuePB::VALUE_NOT_SET
529
38.2k
        && col_schema.pg_type_oid() != 0 /*kInvalidOid*/) {
530
38.2k
      docdb::SetValueFromQLBinaryWrapper(*value, col_schema.pg_type_oid(), cdc_datum_message);
531
0
    } else {
532
0
      cdc_datum_message->set_column_type(col_schema.pg_type_oid());
533
0
    }
534
535
38.2k
    row_message->add_old_tuple();
536
38.2k
  }
537
538
12.7k
  return Status::OK();
539
12.7k
}
540
541
162
void FillDDLInfo(RowMessage* row_message, const SchemaPB& schema, const uint32_t schema_version) {
542
458
  for (const auto& column : schema.columns()) {
543
458
    CDCSDKColumnInfoPB* column_info;
544
458
    column_info = row_message->mutable_schema()->add_column_info();
545
458
    SetColumnInfo(column, column_info);
546
458
  }
547
548
162
  row_message->set_schema_version(schema_version);
549
162
  row_message->set_pgschema_name(schema.pgschema_name());
550
162
  CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb =
551
162
      row_message->mutable_schema()->mutable_tab_info();
552
553
162
  const TablePropertiesPB* table_properties = &(schema.table_properties());
554
162
  SetTableProperties(table_properties, cdc_sdk_table_properties_pb);
555
162
}
556
557
// CDC get changes is different from 2DC as it doesn't need
558
// to read intents from WAL.
559
560
Status GetChangesForCDCSDK(
561
    const std::string& stream_id,
562
    const std::string& tablet_id,
563
    const CDCSDKCheckpointPB& from_op_id,
564
    const StreamMetadata& stream_metadata,
565
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
566
    const MemTrackerPtr& mem_tracker,
567
    consensus::ReplicateMsgsHolder* msgs_holder,
568
    GetChangesResponsePB* resp,
569
    std::string* commit_timestamp,
570
    std::shared_ptr<Schema>* cached_schema,
571
    OpId* last_streamed_op_id,
572
    int64_t* last_readable_opid_index,
573
319
    const CoarseTimePoint deadline) {
574
319
  OpId op_id{from_op_id.term(), from_op_id.index()};
575
319
  ScopedTrackedConsumption consumption;
576
319
  CDCSDKProtoRecordPB* proto_record = nullptr;
577
319
  RowMessage* row_message = nullptr;
578
319
  CDCSDKCheckpointPB checkpoint;
579
319
  bool checkpoint_updated = false;
580
581
  // It is snapshot call.
582
319
  if (from_op_id.write_id() == -1) {
583
12
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
584
12
    tablet::RemoveIntentsData data;
585
12
    ReadHybridTime time;
586
12
    std::string nextKey;
587
12
    SchemaPB schema_pb;
588
    // It is first call in snapshot then take snapshot.
589
12
    if ((from_op_id.key().empty()) && (from_op_id.snapshot_time() == 0)) {
590
6
      if (txn_participant == nullptr || txn_participant->context() == nullptr)
591
0
        return STATUS_SUBSTITUTE(
592
6
            Corruption, "Cannot read data as the transaction participant context is null");
593
6
      txn_participant->context()->GetLastReplicatedData(&data);
594
      // Set the checkpoint and communicate to the follower.
595
0
      VLOG(1) << "The first snapshot term " << data.op_id.term << "index  " << data.op_id.index
596
0
              << "time " << data.log_ht.ToUint64();
597
      // Update the CDCConsumerOpId.
598
6
      {
599
6
        std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus();
600
6
        shared_consensus->UpdateCDCConsumerOpId(data.op_id);
601
6
      }
602
6
      if (txn_participant == nullptr || txn_participant->context() == nullptr)
603
0
        return STATUS_SUBSTITUTE(
604
6
            Corruption, "Cannot read data as the transaction participant context is null");
605
6
      txn_participant->context()->GetLastReplicatedData(&data);
606
6
      time = ReadHybridTime::SingleTime(data.log_ht);
607
608
      // This should go to cdc_state table.
609
      // Below condition update the checkpoint in cdc_state table.
610
6
      SetCheckpoint(
611
6
          data.op_id.term, data.op_id.index, -1, "", time.read.ToUint64(), &checkpoint, nullptr);
612
6
      checkpoint_updated = true;
613
6
    } else {
614
      // Snapshot is already taken.
615
6
      HybridTime ht;
616
6
      time = ReadHybridTime::FromUint64(from_op_id.snapshot_time());
617
6
      nextKey = from_op_id.key();
618
0
      VLOG(1) << "The after snapshot term " << from_op_id.term() << "index  " << from_op_id.index()
619
0
              << "key " << from_op_id.key() << "snapshot time " << from_op_id.snapshot_time();
620
621
6
      Schema schema = *tablet_peer->tablet()->schema().get();
622
6
      int limit = FLAGS_cdc_snapshot_batch_size;
623
6
      int fetched = 0;
624
6
      std::vector<QLTableRow> rows;
625
6
      auto iter = VERIFY_RESULT(tablet_peer->tablet()->CreateCDCSnapshotIterator(
626
6
          schema.CopyWithoutColumnIds(), time, nextKey));
627
628
6
      QLTableRow row;
629
6
      SchemaToPB(*tablet_peer->tablet()->schema().get(), &schema_pb);
630
631
6
      proto_record = resp->add_cdc_sdk_proto_records();
632
6
      row_message = proto_record->mutable_row_message();
633
6
      row_message->set_op(RowMessage_Op_DDL);
634
6
      row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
635
636
6
      FillDDLInfo(row_message, schema_pb, tablet_peer->tablet()->metadata()->schema_version());
637
638
12.7k
      while (VERIFY_RESULT(iter->HasNext()) && fetched < limit) {
639
12.7k
        RETURN_NOT_OK(iter->NextRow(&row));
640
12.7k
        RETURN_NOT_OK(PopulateCDCSDKSnapshotRecord(resp, &row, schema, tablet_peer, time));
641
12.7k
        fetched++;
642
12.7k
      }
643
6
      docdb::SubDocKey sub_doc_key;
644
6
      RETURN_NOT_OK(iter->GetNextReadSubDocKey(&sub_doc_key));
645
646
      // Snapshot ends when next key is empty.
647
6
      if (sub_doc_key.doc_key().empty()) {
648
0
        VLOG(1) << "Setting next sub doc key empty ";
649
        // Get the checkpoint or read the checkpoint from the table/cache.
650
4
        SetCheckpoint(from_op_id.term(), from_op_id.index(), 0, "", 0, &checkpoint, nullptr);
651
4
        checkpoint_updated = true;
652
2
      } else {
653
0
        VLOG(1) << "Setting next sub doc key is " << sub_doc_key.Encode().ToStringBuffer();
654
655
2
        checkpoint.set_write_id(-1);
656
2
        SetCheckpoint(
657
2
            from_op_id.term(), from_op_id.index(), -1, sub_doc_key.Encode().ToStringBuffer(),
658
2
            time.read.ToUint64(), &checkpoint, nullptr);
659
2
        checkpoint_updated = true;
660
2
      }
661
6
    }
662
307
  } else if (!from_op_id.key().empty() && from_op_id.write_id() != 0) {
663
0
    std::string reverse_index_key = from_op_id.key();
664
0
    Slice reverse_index_key_slice(reverse_index_key);
665
0
    std::vector<docdb::IntentKeyValueForCDC> keyValueIntents;
666
0
    docdb::ApplyTransactionState stream_state;
667
0
    stream_state.key = from_op_id.key();
668
0
    stream_state.write_id = from_op_id.write_id();
669
670
0
    RETURN_NOT_OK(reverse_index_key_slice.consume_byte(docdb::ValueTypeAsChar::kTransactionId));
671
0
    auto transaction_id = VERIFY_RESULT(DecodeTransactionId(&reverse_index_key_slice));
672
673
0
    RETURN_NOT_OK(ProcessIntents(
674
0
        op_id, transaction_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer,
675
0
        &keyValueIntents, &stream_state, nullptr));
676
677
0
    if (checkpoint.write_id() == 0 && checkpoint.key().empty()) {
678
0
      last_streamed_op_id->term = checkpoint.term();
679
0
      last_streamed_op_id->index = checkpoint.index();
680
0
    }
681
0
    checkpoint_updated = true;
682
307
  } else {
683
307
    OpId checkpoint_op_id;
684
307
    RequestScope request_scope;
685
686
307
    auto read_ops = VERIFY_RESULT(tablet_peer->consensus()->ReadReplicatedMessagesForCDC(
687
307
        op_id, last_readable_opid_index, deadline));
688
689
307
    if (read_ops.read_from_disk_size && mem_tracker) {
690
301
      consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size);
691
301
    }
692
693
307
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
694
307
    if (txn_participant) {
695
307
      request_scope = RequestScope(txn_participant);
696
307
    }
697
698
307
    Schema current_schema;
699
307
    bool pending_intents = false;
700
307
    bool schema_streamed = false;
701
702
4.30k
    for (const auto& msg : read_ops.messages) {
703
4.30k
      if (!schema_streamed && !(**cached_schema).initialized()) {
704
156
        current_schema.CopyFrom(*tablet_peer->tablet()->schema().get());
705
156
        string table_name = tablet_peer->tablet()->metadata()->table_name();
706
156
        schema_streamed = true;
707
708
156
        proto_record = resp->add_cdc_sdk_proto_records();
709
156
        row_message = proto_record->mutable_row_message();
710
156
        row_message->set_op(RowMessage_Op_DDL);
711
156
        row_message->set_table(table_name);
712
713
156
        *cached_schema = std::make_shared<Schema>(std::move(current_schema));
714
156
        SchemaPB current_schema_pb;
715
156
        SchemaToPB(**cached_schema, &current_schema_pb);
716
156
        FillDDLInfo(row_message,
717
156
                    current_schema_pb,
718
156
                    tablet_peer->tablet()->metadata()->schema_version());
719
4.15k
      } else {
720
4.15k
        current_schema = **cached_schema;
721
4.15k
      }
722
723
4.30k
      const auto& batch = msg->write().write_batch();
724
725
4.30k
      switch (msg->op_type()) {
726
74
        case consensus::OperationType::UPDATE_TRANSACTION_OP:
727
          // Ignore intents.
728
          // Read from IntentDB after they have been applied.
729
74
          if (msg->transaction_state().status() == TransactionStatus::APPLYING) {
730
74
            auto txn_id =
731
74
                VERIFY_RESULT(FullyDecodeTransactionId(msg->transaction_state().transaction_id()));
732
74
            auto result = GetTransactionStatus(txn_id, tablet_peer->Now(), txn_participant);
733
74
            std::vector<docdb::IntentKeyValueForCDC> intents;
734
74
            docdb::ApplyTransactionState new_stream_state;
735
736
74
            *commit_timestamp = msg->transaction_state().commit_hybrid_time();
737
74
            op_id.term = msg->id().term();
738
74
            op_id.index = msg->id().index();
739
74
            RETURN_NOT_OK(ProcessIntents(
740
74
                op_id, txn_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer,
741
74
                &intents, &new_stream_state, &current_schema));
742
743
74
            if (new_stream_state.write_id != 0 && !new_stream_state.key.empty()) {
744
0
              pending_intents = true;
745
74
            } else {
746
74
              last_streamed_op_id->term = msg->id().term();
747
74
              last_streamed_op_id->index = msg->id().index();
748
74
            }
749
74
          }
750
74
          checkpoint_updated = true;
751
74
          break;
752
753
344
        case consensus::OperationType::WRITE_OP:
754
344
          if (!batch.has_transaction()) {
755
205
            RETURN_NOT_OK(
756
205
                PopulateCDCSDKWriteRecord(msg, stream_metadata, tablet_peer, resp, current_schema));
757
758
205
            SetCheckpoint(
759
205
                msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
760
205
            checkpoint_updated = true;
761
205
          }
762
344
          break;
763
764
3.88k
        case consensus::OperationType::CHANGE_METADATA_OP: {
765
3.88k
          RETURN_NOT_OK(SchemaFromPB(msg->change_metadata_request().schema(), &current_schema));
766
3.88k
          string table_name = tablet_peer->tablet()->metadata()->table_name();
767
3.88k
          *cached_schema = std::make_shared<Schema>(std::move(current_schema));
768
3.88k
          if ((resp->cdc_sdk_proto_records_size() > 0 &&
769
3.73k
               resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1)
770
3.73k
                       .row_message()
771
3.73k
                       .op() == RowMessage_Op_DDL)) {
772
3.73k
            if ((resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1)
773
3.73k
                     .row_message()
774
0
                     .schema_version() != msg->change_metadata_request().schema_version())) {
775
0
              RETURN_NOT_OK(PopulateCDCSDKDDLRecord(
776
0
                  msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema));
777
0
            }
778
152
          } else {
779
152
            RETURN_NOT_OK(PopulateCDCSDKDDLRecord(
780
152
                msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema));
781
152
          }
782
3.88k
          SetCheckpoint(
783
3.88k
              msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
784
3.88k
          checkpoint_updated = true;
785
3.88k
        }
786
3.88k
        break;
787
788
0
        case consensus::OperationType::TRUNCATE_OP: {
789
0
          RETURN_NOT_OK(
790
0
              PopulateCDCSDKTruncateRecord(msg, resp->add_cdc_sdk_proto_records(), current_schema));
791
792
0
          SetCheckpoint(
793
0
              msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
794
0
          checkpoint_updated = true;
795
0
        } break;
796
797
0
        default:
798
          // Nothing to do for other operation types.
799
0
          break;
800
4.30k
      }
801
802
4.30k
      if (pending_intents) break;
803
4.30k
    }
804
307
    if (read_ops.messages.size() > 0)
805
307
      *msgs_holder = consensus::ReplicateMsgsHolder(
806
307
          nullptr, std::move(read_ops.messages), std::move(consumption));
807
307
  }
808
809
319
  if (consumption) {
810
0
    consumption.Add(resp->SpaceUsedLong());
811
0
  }
812
813
319
  checkpoint_updated ? resp->mutable_cdc_sdk_checkpoint()->CopyFrom(checkpoint)
814
0
                       : resp->mutable_cdc_sdk_checkpoint()->CopyFrom(from_op_id);
815
816
319
  if (checkpoint_updated) {
817
0
    VLOG(1) << "The checkpoint is updated " << resp->checkpoint().DebugString();
818
0
  } else {
819
0
    VLOG(1) << "The checkpoint is not updated " << resp->checkpoint().DebugString();
820
0
  }
821
822
319
  if (last_streamed_op_id->index > 0) {
823
307
    resp->mutable_checkpoint()->mutable_op_id()->set_term(last_streamed_op_id->term);
824
307
    resp->mutable_checkpoint()->mutable_op_id()->set_index(last_streamed_op_id->index);
825
307
  }
826
827
319
  return Status::OK();
828
319
}
829
830
}  // namespace cdc
831
}  // namespace yb