YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdcsdk_producer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/cdc/cdc_producer.h"
14
15
#include "yb/cdc/cdc_common_util.h"
16
17
#include "yb/common/wire_protocol.h"
18
#include "yb/common/ql_expr.h"
19
20
#include "yb/docdb/docdb_util.h"
21
#include "yb/docdb/doc_key.h"
22
23
// gflag: batch size for the CDC snapshot operation (default 250). It is not referenced in the
// visible part of this file; presumably it is consumed by the snapshot path of
// GetChangesForCDCSDK. TODO confirm.
DEFINE_int32(cdc_snapshot_batch_size, 250, "Batch size for the snapshot operation in CDC");
24
25
namespace yb {
26
namespace cdc {
27
28
using consensus::ReplicateMsgPtr;
29
using consensus::ReplicateMsgs;
30
using docdb::PrimitiveValue;
31
using tablet::TransactionParticipant;
32
using yb::QLTableRow;
33
34
// Row-level operation kinds recognised while decoding DocDB writes/intents; SetOperation()
// maps each one onto the corresponding RowMessage op code.
YB_DEFINE_ENUM(OpType, (INSERT)(UPDATE)(DELETE));
35
36
757
// Stamps `row_message` with the RowMessage op code that corresponds to `type`, then stores
// `schema.SchemaName()` in the message's pgschema_name field.
void SetOperation(RowMessage* row_message, OpType type, const Schema& schema) {
  if (type == OpType::INSERT) {
    row_message->set_op(RowMessage_Op_INSERT);
  } else if (type == OpType::UPDATE) {
    row_message->set_op(RowMessage_Op_UPDATE);
  } else if (type == OpType::DELETE) {
    row_message->set_op(RowMessage_Op_DELETE);
  }

  row_message->set_pgschema_name(schema.SchemaName());
}
51
52
void AddColumnToMap(
53
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
54
    const ColumnSchema& col_schema,
55
    const docdb::PrimitiveValue& col,
56
1.86k
    DatumMessagePB* cdc_datum_message) {
57
1.86k
  cdc_datum_message->set_column_name(col_schema.name());
58
1.86k
  QLValuePB ql_value;
59
1.86k
  if (tablet_peer->tablet()->table_type() == PGSQL_TABLE_TYPE) {
60
1.86k
    docdb::PrimitiveValue::ToQLValuePB(col, col_schema.type(), &ql_value);
61
1.86k
    if (!IsNull(ql_value) && 
col_schema.pg_type_oid() != 01.77k
/*kInvalidOid*/) {
62
1.77k
      docdb::SetValueFromQLBinaryWrapper(ql_value, col_schema.pg_type_oid(), cdc_datum_message);
63
1.77k
    } else {
64
92
      cdc_datum_message->set_column_type(col_schema.pg_type_oid());
65
92
    }
66
1.86k
  }
67
1.86k
}
68
69
895
// Appends one entry to both new_tuple and old_tuple of `row_message`, keeping the two lists the
// same length. Returns the entry that should carry data: the old_tuple entry for a DELETE,
// otherwise the new_tuple entry. Returns nullptr when `row_message` is null.
DatumMessagePB* AddTuple(RowMessage* row_message) {
  if (row_message == nullptr) {
    return nullptr;
  }

  if (row_message->op() != RowMessage_Op_DELETE) {
    DatumMessagePB* data_entry = row_message->add_new_tuple();
    row_message->add_old_tuple();
    return data_entry;
  }

  DatumMessagePB* data_entry = row_message->add_old_tuple();
  row_message->add_new_tuple();
  return data_entry;
}
84
85
void AddPrimaryKey(
86
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer, const docdb::SubDocKey& decoded_key,
87
757
    const Schema& tablet_schema, RowMessage* row_message) {
88
757
  size_t i = 0;
89
757
  for (const auto& col : decoded_key.doc_key().hashed_group()) {
90
757
    DatumMessagePB* tuple = AddTuple(row_message);
91
92
757
    AddColumnToMap(
93
757
        tablet_peer, tablet_schema.column(i), col, tuple);
94
757
    i++;
95
757
  }
96
97
757
  for (const auto& col : decoded_key.doc_key().range_group()) {
98
138
    DatumMessagePB* tuple = AddTuple(row_message);
99
100
138
    AddColumnToMap(
101
138
        tablet_peer, tablet_schema.column(i), col, tuple);
102
138
    i++;
103
138
  }
104
757
}
105
106
void SetCDCSDKOpId(
107
    int64_t term, int64_t index, uint32_t write_id, const std::string& key,
108
1.21k
    CDCSDKOpIdPB* cdc_sdk_op_id_pb) {
109
1.21k
  cdc_sdk_op_id_pb->set_term(term);
110
1.21k
  cdc_sdk_op_id_pb->set_index(index);
111
1.21k
  cdc_sdk_op_id_pb->set_write_id(write_id);
112
1.21k
  cdc_sdk_op_id_pb->set_write_id_key(key);
113
1.21k
}
114
115
void SetCheckpoint(
116
    int64_t term, int64_t index, int32 write_id, const std::string& key, uint64 time,
117
8.21k
    CDCSDKCheckpointPB* cdc_sdk_checkpoint_pb, OpId* last_streamed_op_id) {
118
8.21k
  cdc_sdk_checkpoint_pb->set_term(term);
119
8.21k
  cdc_sdk_checkpoint_pb->set_index(index);
120
8.21k
  cdc_sdk_checkpoint_pb->set_write_id(write_id);
121
8.21k
  cdc_sdk_checkpoint_pb->set_key(key);
122
8.21k
  cdc_sdk_checkpoint_pb->set_snapshot_time(time);
123
8.21k
  if (last_streamed_op_id) {
124
8.19k
    last_streamed_op_id->term = term;
125
8.19k
    last_streamed_op_id->index = index;
126
8.19k
  }
127
8.21k
}
128
129
bool ShouldCreateNewProtoRecord(
130
617
    const RowMessage& row_message, const Schema& schema, size_t col_count) {
131
617
  return (row_message.op() == RowMessage_Op_INSERT && 
col_count == schema.num_columns()434
) ||
132
617
         
(455
row_message.op() == RowMessage_Op_UPDATE455
||
row_message.op() == RowMessage_Op_DELETE354
);
133
617
}
134
135
617
bool IsInsertOperation(const RowMessage& row_message) {
136
617
  return row_message.op() == RowMessage_Op_INSERT;
137
617
}
138
139
1.58k
bool IsInsertOrUpdate(const RowMessage& row_message) {
140
1.58k
  return row_message.IsInitialized()  &&
141
1.58k
      (row_message.op() == RowMessage_Op_INSERT
142
1.58k
       || 
row_message.op() == RowMessage_Op_UPDATE271
);
143
1.58k
}
144
145
void MakeNewProtoRecord(
146
    const docdb::IntentKeyValueForCDC& intent, const OpId& op_id, const RowMessage& row_message,
147
    const Schema& schema, size_t col_count, CDCSDKProtoRecordPB* proto_record,
148
617
    GetChangesResponsePB* resp, IntraTxnWriteId* write_id, std::string* reverse_index_key) {
149
617
  if (ShouldCreateNewProtoRecord(row_message, schema, col_count)) {
150
345
    CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
151
345
    SetCDCSDKOpId(
152
345
        op_id.term, op_id.index, intent.write_id, intent.reverse_index_key, cdc_sdk_op_id_pb);
153
154
345
    CDCSDKProtoRecordPB* record_to_be_added = resp->add_cdc_sdk_proto_records();
155
345
    record_to_be_added->CopyFrom(*proto_record);
156
345
    record_to_be_added->mutable_row_message()->CopyFrom(row_message);
157
158
345
    *write_id = intent.write_id;
159
345
    *reverse_index_key = intent.reverse_index_key;
160
345
  }
161
617
}
162
// Populate CDC record corresponding to WAL batch in ReplicateMsg.
163
CHECKED_STATUS PopulateCDCSDKIntentRecord(
164
    const OpId& op_id,
165
    const TransactionId& transaction_id,
166
    const std::vector<docdb::IntentKeyValueForCDC>& intents,
167
    const StreamMetadata& metadata,
168
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
169
    GetChangesResponsePB* resp,
170
    ScopedTrackedConsumption* consumption,
171
    IntraTxnWriteId* write_id,
172
    std::string* reverse_index_key,
173
147
    Schema* old_schema) {
174
147
  Schema& schema = old_schema ? *old_schema : 
*tablet_peer->tablet()->schema()0
;
175
147
  Slice prev_key;
176
147
  CDCSDKProtoRecordPB proto_record;
177
147
  RowMessage* row_message = proto_record.mutable_row_message();
178
147
  size_t col_count = 0;
179
617
  for (const auto& intent : intents) {
180
617
    Slice key(intent.key_buf);
181
617
    Slice value(intent.value_buf);
182
617
    const auto key_size =
183
617
        VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
184
185
0
    docdb::PrimitiveValue column_id;
186
617
    boost::optional<docdb::PrimitiveValue> column_id_opt;
187
617
    Slice key_column = key.WithoutPrefix(key_size);
188
617
    if (!key_column.empty()) {
189
535
      RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id));
190
535
      column_id_opt = boost::optional<docdb::PrimitiveValue>(column_id);
191
535
    }
192
193
617
    Slice sub_doc_key = key;
194
617
    docdb::SubDocKey decoded_key;
195
617
    RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
196
197
617
    docdb::Value decoded_value;
198
617
    RETURN_NOT_OK(decoded_value.Decode(value));
199
200
617
    if (column_id_opt && 
column_id_opt->value_type() == docdb::ValueType::kColumnId535
&&
201
617
        
schema.is_key_column(column_id_opt->GetColumnId())373
) {
202
0
      *write_id = intent.write_id;
203
0
      *reverse_index_key = intent.reverse_index_key;
204
0
      continue;
205
0
    }
206
207
617
    if (*consumption) {
208
613
      consumption->Add(key.size());
209
613
    }
210
211
    // Compare key hash with previously seen key hash to determine whether the write pair
212
    // is part of the same row or not.
213
617
    Slice primary_key(key.data(), key_size);
214
617
    if (prev_key != primary_key || 
col_count >= schema.num_columns()382
) {
215
345
      proto_record.Clear();
216
345
      row_message->Clear();
217
218
      // Check whether operation is WRITE or DELETE.
219
345
      if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
220
345
          
decoded_key.num_subkeys() == 082
) {
221
82
        SetOperation(row_message, OpType::DELETE, schema);
222
82
        *write_id = intent.write_id;
223
263
      } else {
224
263
        if (column_id_opt &&
225
263
            column_id_opt->value_type() == docdb::ValueType::kSystemColumnId &&
226
263
            
decoded_value.value_type() == docdb::ValueType::kNullLow162
) {
227
162
          SetOperation(row_message, OpType::INSERT, schema);
228
162
          col_count = schema.num_key_columns() - 1;
229
162
        } else {
230
101
          SetOperation(row_message, OpType::UPDATE, schema);
231
101
          col_count = schema.num_columns();
232
101
          *write_id = intent.write_id;
233
101
        }
234
263
      }
235
236
      // Write pair contains record for different row. Create a new CDCRecord in this case.
237
345
      row_message->set_transaction_id(transaction_id.ToString());
238
345
      AddPrimaryKey(tablet_peer, decoded_key, schema, row_message);
239
345
    }
240
241
617
    if (IsInsertOperation(*row_message)) {
242
434
      ++col_count;
243
434
    }
244
245
617
    prev_key = primary_key;
246
617
    if (IsInsertOrUpdate(*row_message)) {
247
535
      if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId) {
248
373
        const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id_opt->GetColumnId()));
249
250
0
        AddColumnToMap(
251
373
            tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple());
252
373
        row_message->add_old_tuple();
253
254
373
      } else if (
255
162
          column_id_opt && column_id_opt->value_type() != docdb::ValueType::kSystemColumnId) {
256
0
        LOG(DFATAL) << "Unexpected value type in key: " << column_id_opt->value_type()
257
0
                    << " key: " << decoded_key.ToString()
258
0
                    << " value: " << decoded_value.primitive_value();
259
0
      }
260
535
    }
261
617
    row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
262
617
    MakeNewProtoRecord(
263
617
        intent, op_id, *row_message, schema, col_count, &proto_record, resp, write_id,
264
617
        reverse_index_key);
265
617
  }
266
267
147
  return Status::OK();
268
147
}
269
270
// Populate CDC record corresponding to WAL batch in ReplicateMsg.
271
CHECKED_STATUS PopulateCDCSDKWriteRecord(
272
    const ReplicateMsgPtr& msg,
273
    const StreamMetadata& metadata,
274
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
275
    GetChangesResponsePB* resp,
276
412
    const Schema& schema) {
277
412
  const auto& batch = msg->write().write_batch();
278
412
  CDCSDKProtoRecordPB* proto_record = nullptr;
279
412
  RowMessage* row_message = nullptr;
280
281
  // Write batch may contain records from different rows.
282
  // For CDC, we need to split the batch into 1 CDC record per row of the table.
283
  // We'll use DocDB key hash to identify the records that belong to the same row.
284
412
  Slice prev_key;
285
286
970
  for (const auto& write_pair : batch.write_pairs()) {
287
970
    Slice key = write_pair.key();
288
970
    const auto key_size =
289
970
        VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
290
291
0
    Slice value = write_pair.value();
292
970
    docdb::Value decoded_value;
293
970
    RETURN_NOT_OK(decoded_value.Decode(value));
294
295
    // Compare key hash with previously seen key hash to determine whether the write pair
296
    // is part of the same row or not.
297
970
    Slice primary_key(key.data(), key_size);
298
970
    if (prev_key != primary_key) {
299
      // Write pair contains record for different row. Create a new CDCRecord in this case.
300
412
      proto_record = resp->add_cdc_sdk_proto_records();
301
412
      row_message = proto_record->mutable_row_message();
302
412
      row_message->set_pgschema_name(schema.SchemaName());
303
412
      row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
304
305
412
      CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
306
412
      SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb);
307
308
412
      Slice sub_doc_key = key;
309
412
      docdb::SubDocKey decoded_key;
310
412
      RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
311
312
      // Check whether operation is WRITE or DELETE.
313
412
      if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
314
412
          
decoded_key.num_subkeys() == 052
) {
315
52
        SetOperation(row_message, OpType::DELETE, schema);
316
360
      } else {
317
360
        docdb::PrimitiveValue column_id;
318
360
        Slice key_column(key.WithoutPrefix(key_size));
319
360
        RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id));
320
321
360
        if (column_id.value_type() == docdb::ValueType::kSystemColumnId &&
322
360
            
decoded_value.value_type() == docdb::ValueType::kNullLow324
) {
323
324
          SetOperation(row_message, OpType::INSERT, schema);
324
324
        } else {
325
36
          SetOperation(row_message, OpType::UPDATE, schema);
326
36
        }
327
360
      }
328
329
412
      AddPrimaryKey(tablet_peer, decoded_key, schema, row_message);
330
331
      // Process intent records.
332
412
      row_message->set_commit_time(msg->hybrid_time());
333
412
    }
334
970
    prev_key = primary_key;
335
970
    DCHECK(proto_record);
336
337
970
    if (IsInsertOrUpdate(*row_message)) {
338
918
      docdb::PrimitiveValue column_id;
339
918
      Slice key_column((const char*)(key.data() + key_size));
340
918
      RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id));
341
918
      if (column_id.value_type() == docdb::ValueType::kColumnId) {
342
594
        const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId()));
343
344
0
        AddColumnToMap(
345
594
            tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple());
346
594
        row_message->add_old_tuple();
347
348
594
      } else 
if (324
column_id.value_type() != docdb::ValueType::kSystemColumnId324
) {
349
0
        LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type();
350
0
      }
351
918
    }
352
970
  }
353
354
412
  return Status::OK();
355
412
}
356
357
void SetTableProperties(
358
    const TablePropertiesPB* table_properties,
359
631
    CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb) {
360
631
  cdc_sdk_table_properties_pb->set_default_time_to_live(table_properties->default_time_to_live());
361
631
  cdc_sdk_table_properties_pb->set_num_tablets(table_properties->num_tablets());
362
631
  cdc_sdk_table_properties_pb->set_is_ysql_catalog_table(table_properties->is_ysql_catalog_table());
363
631
}
364
365
1.77k
void SetColumnInfo(const ColumnSchemaPB& column, CDCSDKColumnInfoPB* column_info) {
366
1.77k
  column_info->set_name(column.name());
367
1.77k
  column_info->mutable_type()->CopyFrom(column.type());
368
1.77k
  column_info->set_is_key(column.is_key());
369
1.77k
  column_info->set_is_hash_key(column.is_hash_key());
370
1.77k
  column_info->set_is_nullable(column.is_nullable());
371
1.77k
  column_info->set_oid(column.pg_type_oid());
372
1.77k
}
373
374
// Fills `proto_record` with a DDL record built from the change-metadata request in `msg`.
// The record carries the column list, table properties, schema version, new table name and
// schema name. Returns InvalidArgument if `msg` has no change-metadata request.
CHECKED_STATUS PopulateCDCSDKDDLRecord(
    const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const string& table_name,
    const Schema& schema) {
  SCHECK(
      msg->has_change_metadata_request(), InvalidArgument,
      Format(
          "Change metadata (DDL) message requires metadata information: $0",
          msg->ShortDebugString()));

  const auto& metadata_request = msg->change_metadata_request();

  RowMessage* ddl_row = proto_record->mutable_row_message();
  ddl_row->set_op(RowMessage_Op_DDL);
  ddl_row->set_table(table_name);

  SetCDCSDKOpId(
      msg->id().term(), msg->id().index(), 0, "", proto_record->mutable_cdc_sdk_op_id());

  auto* ddl_schema = ddl_row->mutable_schema();
  for (const auto& column : metadata_request.schema().columns()) {
    SetColumnInfo(column, ddl_schema->add_column_info());
  }

  ddl_row->set_schema_version(metadata_request.schema_version());
  ddl_row->set_new_table_name(metadata_request.new_table_name());
  ddl_row->set_pgschema_name(schema.SchemaName());
  SetTableProperties(&metadata_request.schema().table_properties(), ddl_schema->mutable_tab_info());

  return Status::OK();
}
410
411
// Fills `proto_record` with a TRUNCATE record: the schema name, the truncate request copied
// from `msg`, and an op id taken from `msg`. Returns InvalidArgument if `msg` carries no
// truncate request.
CHECKED_STATUS PopulateCDCSDKTruncateRecord(
    const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const Schema& schema) {
  SCHECK(
      msg->has_truncate(), InvalidArgument,
      Format(
          "Truncate message requires truncate request information: $0", msg->ShortDebugString()));

  SetCDCSDKOpId(
      msg->id().term(), msg->id().index(), 0, "", proto_record->mutable_cdc_sdk_op_id());

  RowMessage& truncate_row = *proto_record->mutable_row_message();
  truncate_row.set_op(RowMessage_Op_TRUNCATE);
  truncate_row.set_pgschema_name(schema.SchemaName());
  truncate_row.mutable_truncate_request_info()->CopyFrom(msg->truncate());

  return Status::OK();
}
432
433
147
void SetTermIndex(int64_t term, int64_t index, CDCSDKCheckpointPB* checkpoint) {
434
147
  checkpoint->set_term(term);
435
147
  checkpoint->set_index(index);
436
147
}
437
438
147
// Stores the intra-transaction resume position (reverse index key and write id) in
// `checkpoint`.
void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint) {
  CDCSDKCheckpointPB& target = *checkpoint;
  target.set_key(key);
  target.set_write_id(write_id);
}
442
443
// Streams the intents of transaction `transaction_id` (at `op_id`) into `resp`.
//
// Steps:
// 1. If `stream_state` shows no saved position (empty key and write_id 0), a BEGIN record is
//    emitted.
// 2. The intents are fetched from the tablet into `keyValueIntents`. GetIntents receives
//    `stream_state` and may advance it.
// 3. Each intent is validated, then converted with PopulateCDCSDKIntentRecord.
// 4. `checkpoint` is set to `op_id`.
// 5. If `stream_state` afterwards again shows no pending position, a COMMIT record is emitted
//    and the checkpoint key/write_id are reset. Otherwise the checkpoint records the last
//    consumed reverse index key and write id, so streaming can resume inside the transaction.
//
// Decode failures are returned as an error Status.
CHECKED_STATUS ProcessIntents(
    const OpId& op_id,
    const TransactionId& transaction_id,
    const StreamMetadata& metadata,
    GetChangesResponsePB* resp,
    ScopedTrackedConsumption* consumption,
    CDCSDKCheckpointPB* checkpoint,
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
    std::vector<docdb::IntentKeyValueForCDC>* keyValueIntents,
    docdb::ApplyTransactionState* stream_state,
    Schema* schema) {
  if (stream_state->key.empty() && stream_state->write_id == 0) {
    CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
    RowMessage* row_message = proto_record->mutable_row_message();
    row_message->set_op(RowMessage_Op_BEGIN);
    row_message->set_transaction_id(transaction_id.ToString());
    row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
  }

  auto tablet = tablet_peer->shared_tablet();
  RETURN_NOT_OK(tablet->GetIntents(transaction_id, keyValueIntents, stream_state));

  // Validate every fetched intent: the key must decode completely and the value must decode.
  // Malformed data is reported to the caller as an error status instead of aborting the
  // process, consistent with the rest of this Status-returning function.
  for (const auto& key_value : *keyValueIntents) {
    docdb::SubDocKey sub_doc_key;
    RETURN_NOT_OK(
        sub_doc_key.FullyDecodeFrom(Slice(key_value.key_buf), docdb::HybridTimeRequired::kFalse));
    docdb::Value decoded_value;
    RETURN_NOT_OK(decoded_value.Decode(Slice(key_value.value_buf)));
  }

  std::string reverse_index_key;
  IntraTxnWriteId write_id = 0;

  // Need to populate the CDCSDKRecords
  RETURN_NOT_OK(PopulateCDCSDKIntentRecord(
      op_id, transaction_id, *keyValueIntents, metadata, tablet_peer, resp, consumption, &write_id,
      &reverse_index_key, schema));

  SetTermIndex(op_id.term, op_id.index, checkpoint);

  // Re-checked after GetIntents: an empty position here means the transaction has been fully
  // streamed.
  if (stream_state->key.empty() && stream_state->write_id == 0) {
    CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records();
    RowMessage* row_message = proto_record->mutable_row_message();

    row_message->set_op(RowMessage_Op_COMMIT);
    row_message->set_transaction_id(transaction_id.ToString());
    row_message->set_table(tablet_peer->tablet()->metadata()->table_name());

    CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id();
    SetCDCSDKOpId(op_id.term, op_id.index, 0, "", cdc_sdk_op_id_pb);
    SetKeyWriteId("", 0, checkpoint);
  } else {
    SetKeyWriteId(reverse_index_key, write_id, checkpoint);
  }

  return Status::OK();
}
500
501
// Appends one READ (snapshot) record for `row` to `resp`, with its commit time taken from
// `time.read`.
//
// Every schema column is written to new_tuple. A value is converted only when it is present
// (set) and the column has a pg type oid; otherwise only the column type is recorded. An empty
// old_tuple entry is added per column so both lists stay the same length.
CHECKED_STATUS PopulateCDCSDKSnapshotRecord(
    GetChangesResponsePB* resp,
    const QLTableRow* row,
    const Schema& schema,
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
    ReadHybridTime time) {
  RowMessage* snapshot_row = resp->add_cdc_sdk_proto_records()->mutable_row_message();
  snapshot_row->set_table(tablet_peer->tablet()->metadata()->table_name());
  snapshot_row->set_op(RowMessage_Op_READ);
  snapshot_row->set_pgschema_name(schema.SchemaName());
  snapshot_row->set_commit_time(time.read.ToUint64());

  for (size_t idx = 0; idx < schema.num_columns(); ++idx) {
    const ColumnId col_id = schema.column_id(idx);
    const auto* value = row->GetColumn(col_id);
    const ColumnSchema& col_schema = VERIFY_RESULT(schema.column_by_id(col_id));

    DatumMessagePB* datum = snapshot_row->add_new_tuple();
    datum->set_column_name(col_schema.name());

    const bool value_is_set =
        value != nullptr && value->value_case() != QLValuePB::VALUE_NOT_SET;
    if (value_is_set && col_schema.pg_type_oid() != 0 /*kInvalidOid*/) {
      docdb::SetValueFromQLBinaryWrapper(*value, col_schema.pg_type_oid(), datum);
    } else {
      datum->set_column_type(col_schema.pg_type_oid());
    }

    snapshot_row->add_old_tuple();
  }

  return Status::OK();
}
540
541
325
// Fills the schema portion of `row_message` from `schema`: the column list, table properties,
// schema version and schema name.
void FillDDLInfo(RowMessage* row_message, const SchemaPB& schema, const uint32_t schema_version) {
  auto* ddl_schema = row_message->mutable_schema();
  for (const auto& column : schema.columns()) {
    SetColumnInfo(column, ddl_schema->add_column_info());
  }

  row_message->set_schema_version(schema_version);
  row_message->set_pgschema_name(schema.pgschema_name());
  SetTableProperties(&schema.table_properties(), ddl_schema->mutable_tab_info());
}
556
557
// CDC get changes is different from 2DC as it doesn't need
558
// to read intents from WAL.
559
560
Status GetChangesForCDCSDK(
561
    const std::string& stream_id,
562
    const std::string& tablet_id,
563
    const CDCSDKCheckpointPB& from_op_id,
564
    const StreamMetadata& stream_metadata,
565
    const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
566
    const MemTrackerPtr& mem_tracker,
567
    consensus::ReplicateMsgsHolder* msgs_holder,
568
    GetChangesResponsePB* resp,
569
    std::string* commit_timestamp,
570
    std::shared_ptr<Schema>* cached_schema,
571
    OpId* last_streamed_op_id,
572
    int64_t* last_readable_opid_index,
573
641
    const CoarseTimePoint deadline) {
574
641
  OpId op_id{from_op_id.term(), from_op_id.index()};
575
641
  ScopedTrackedConsumption consumption;
576
641
  CDCSDKProtoRecordPB* proto_record = nullptr;
577
641
  RowMessage* row_message = nullptr;
578
641
  CDCSDKCheckpointPB checkpoint;
579
641
  bool checkpoint_updated = false;
580
581
  // It is snapshot call.
582
641
  if (from_op_id.write_id() == -1) {
583
24
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
584
24
    ReadHybridTime time;
585
24
    std::string nextKey;
586
24
    SchemaPB schema_pb;
587
    // It is first call in snapshot then take snapshot.
588
24
    if ((from_op_id.key().empty()) && (from_op_id.snapshot_time() == 0)) {
589
12
      if (txn_participant == nullptr || txn_participant->context() == nullptr)
590
0
        return STATUS_SUBSTITUTE(
591
12
            Corruption, "Cannot read data as the transaction participant context is null");
592
12
      tablet::RemoveIntentsData data;
593
12
      RETURN_NOT_OK(txn_participant->context()->GetLastReplicatedData(&data));
594
      // Set the checkpoint and communicate to the follower.
595
12
      VLOG(1) << "The first snapshot term " << data.op_id.term << "index  " << data.op_id.index
596
0
              << "time " << data.log_ht.ToUint64();
597
      // Update the CDCConsumerOpId.
598
12
      {
599
12
        std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus();
600
12
        shared_consensus->UpdateCDCConsumerOpId(data.op_id);
601
12
      }
602
12
      if (txn_participant == nullptr || txn_participant->context() == nullptr) {
603
0
        return STATUS_SUBSTITUTE(
604
0
            Corruption, "Cannot read data as the transaction participant context is null");
605
0
      }
606
12
      RETURN_NOT_OK(txn_participant->context()->GetLastReplicatedData(&data));
607
12
      time = ReadHybridTime::SingleTime(data.log_ht);
608
609
      // This should go to cdc_state table.
610
      // Below condition update the checkpoint in cdc_state table.
611
12
      SetCheckpoint(
612
12
          data.op_id.term, data.op_id.index, -1, "", time.read.ToUint64(), &checkpoint, nullptr);
613
12
      checkpoint_updated = true;
614
12
    } else {
615
      // Snapshot is already taken.
616
12
      HybridTime ht;
617
12
      time = ReadHybridTime::FromUint64(from_op_id.snapshot_time());
618
12
      nextKey = from_op_id.key();
619
12
      VLOG(1) << "The after snapshot term " << from_op_id.term() << "index  " << from_op_id.index()
620
0
              << "key " << from_op_id.key() << "snapshot time " << from_op_id.snapshot_time();
621
622
12
      Schema schema = *tablet_peer->tablet()->schema().get();
623
12
      int limit = FLAGS_cdc_snapshot_batch_size;
624
12
      int fetched = 0;
625
12
      std::vector<QLTableRow> rows;
626
12
      auto iter = VERIFY_RESULT(tablet_peer->tablet()->CreateCDCSnapshotIterator(
627
12
          schema.CopyWithoutColumnIds(), time, nextKey));
628
629
0
      QLTableRow row;
630
12
      SchemaToPB(*tablet_peer->tablet()->schema().get(), &schema_pb);
631
632
12
      proto_record = resp->add_cdc_sdk_proto_records();
633
12
      row_message = proto_record->mutable_row_message();
634
12
      row_message->set_op(RowMessage_Op_DDL);
635
12
      row_message->set_table(tablet_peer->tablet()->metadata()->table_name());
636
637
12
      FillDDLInfo(row_message, schema_pb, tablet_peer->tablet()->metadata()->schema_version());
638
639
25.5k
      while (VERIFY_RESULT(iter->HasNext()) && 
fetched < limit25.5k
) {
640
25.5k
        RETURN_NOT_OK(iter->NextRow(&row));
641
25.5k
        RETURN_NOT_OK(PopulateCDCSDKSnapshotRecord(resp, &row, schema, tablet_peer, time));
642
25.5k
        fetched++;
643
25.5k
      }
644
12
      docdb::SubDocKey sub_doc_key;
645
12
      RETURN_NOT_OK(iter->GetNextReadSubDocKey(&sub_doc_key));
646
647
      // Snapshot ends when next key is empty.
648
12
      if (sub_doc_key.doc_key().empty()) {
649
8
        VLOG
(1) << "Setting next sub doc key empty "0
;
650
        // Get the checkpoint or read the checkpoint from the table/cache.
651
8
        SetCheckpoint(from_op_id.term(), from_op_id.index(), 0, "", 0, &checkpoint, nullptr);
652
8
        checkpoint_updated = true;
653
8
      } else {
654
4
        VLOG
(1) << "Setting next sub doc key is " << sub_doc_key.Encode().ToStringBuffer()0
;
655
656
4
        checkpoint.set_write_id(-1);
657
4
        SetCheckpoint(
658
4
            from_op_id.term(), from_op_id.index(), -1, sub_doc_key.Encode().ToStringBuffer(),
659
4
            time.read.ToUint64(), &checkpoint, nullptr);
660
4
        checkpoint_updated = true;
661
4
      }
662
12
    }
663
617
  } else if (!from_op_id.key().empty() && 
from_op_id.write_id() != 00
) {
664
0
    std::string reverse_index_key = from_op_id.key();
665
0
    Slice reverse_index_key_slice(reverse_index_key);
666
0
    std::vector<docdb::IntentKeyValueForCDC> keyValueIntents;
667
0
    docdb::ApplyTransactionState stream_state;
668
0
    stream_state.key = from_op_id.key();
669
0
    stream_state.write_id = from_op_id.write_id();
670
671
0
    RETURN_NOT_OK(reverse_index_key_slice.consume_byte(docdb::ValueTypeAsChar::kTransactionId));
672
0
    auto transaction_id = VERIFY_RESULT(DecodeTransactionId(&reverse_index_key_slice));
673
674
0
    RETURN_NOT_OK(ProcessIntents(
675
0
        op_id, transaction_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer,
676
0
        &keyValueIntents, &stream_state, nullptr));
677
678
0
    if (checkpoint.write_id() == 0 && checkpoint.key().empty()) {
679
0
      last_streamed_op_id->term = checkpoint.term();
680
0
      last_streamed_op_id->index = checkpoint.index();
681
0
    }
682
0
    checkpoint_updated = true;
683
617
  } else {
684
617
    OpId checkpoint_op_id;
685
617
    RequestScope request_scope;
686
687
617
    auto read_ops = VERIFY_RESULT(tablet_peer->consensus()->ReadReplicatedMessagesForCDC(
688
617
        op_id, last_readable_opid_index, deadline));
689
690
617
    if (read_ops.read_from_disk_size && 
mem_tracker605
) {
691
605
      consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size);
692
605
    }
693
694
617
    auto txn_participant = tablet_peer->tablet()->transaction_participant();
695
617
    if (txn_participant) {
696
617
      request_scope = RequestScope(txn_participant);
697
617
    }
698
699
617
    Schema current_schema;
700
617
    bool pending_intents = false;
701
617
    bool schema_streamed = false;
702
703
8.61k
    for (const auto& msg : read_ops.messages) {
704
8.61k
      if (!schema_streamed && 
!(**cached_schema).initialized()6.21k
) {
705
313
        current_schema.CopyFrom(*tablet_peer->tablet()->schema().get());
706
313
        string table_name = tablet_peer->tablet()->metadata()->table_name();
707
313
        schema_streamed = true;
708
709
313
        proto_record = resp->add_cdc_sdk_proto_records();
710
313
        row_message = proto_record->mutable_row_message();
711
313
        row_message->set_op(RowMessage_Op_DDL);
712
313
        row_message->set_table(table_name);
713
714
313
        *cached_schema = std::make_shared<Schema>(std::move(current_schema));
715
313
        SchemaPB current_schema_pb;
716
313
        SchemaToPB(**cached_schema, &current_schema_pb);
717
313
        FillDDLInfo(row_message,
718
313
                    current_schema_pb,
719
313
                    tablet_peer->tablet()->metadata()->schema_version());
720
8.30k
      } else {
721
8.30k
        current_schema = **cached_schema;
722
8.30k
      }
723
724
8.61k
      const auto& batch = msg->write().write_batch();
725
726
8.61k
      switch (msg->op_type()) {
727
147
        case consensus::OperationType::UPDATE_TRANSACTION_OP:
728
          // Ignore intents.
729
          // Read from IntentDB after they have been applied.
730
147
          if (msg->transaction_state().status() == TransactionStatus::APPLYING) {
731
147
            auto txn_id =
732
147
                VERIFY_RESULT(FullyDecodeTransactionId(msg->transaction_state().transaction_id()));
733
0
            auto result = GetTransactionStatus(txn_id, tablet_peer->Now(), txn_participant);
734
147
            std::vector<docdb::IntentKeyValueForCDC> intents;
735
147
            docdb::ApplyTransactionState new_stream_state;
736
737
147
            *commit_timestamp = msg->transaction_state().commit_hybrid_time();
738
147
            op_id.term = msg->id().term();
739
147
            op_id.index = msg->id().index();
740
147
            RETURN_NOT_OK(ProcessIntents(
741
147
                op_id, txn_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer,
742
147
                &intents, &new_stream_state, &current_schema));
743
744
147
            if (new_stream_state.write_id != 0 && 
!new_stream_state.key.empty()0
) {
745
0
              pending_intents = true;
746
147
            } else {
747
147
              last_streamed_op_id->term = msg->id().term();
748
147
              last_streamed_op_id->index = msg->id().index();
749
147
            }
750
147
          }
751
147
          checkpoint_updated = true;
752
147
          break;
753
754
690
        case consensus::OperationType::WRITE_OP:
755
690
          if (!batch.has_transaction()) {
756
412
            RETURN_NOT_OK(
757
412
                PopulateCDCSDKWriteRecord(msg, stream_metadata, tablet_peer, resp, current_schema));
758
759
412
            SetCheckpoint(
760
412
                msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
761
412
            checkpoint_updated = true;
762
412
          }
763
690
          break;
764
765
7.78k
        case consensus::OperationType::CHANGE_METADATA_OP: {
766
7.78k
          RETURN_NOT_OK(SchemaFromPB(msg->change_metadata_request().schema(), &current_schema));
767
7.78k
          string table_name = tablet_peer->tablet()->metadata()->table_name();
768
7.78k
          *cached_schema = std::make_shared<Schema>(std::move(current_schema));
769
7.78k
          if ((resp->cdc_sdk_proto_records_size() > 0 &&
770
7.78k
               resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1)
771
7.47k
                       .row_message()
772
7.47k
                       .op() == RowMessage_Op_DDL)) {
773
7.47k
            if ((resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1)
774
7.47k
                     .row_message()
775
7.47k
                     .schema_version() != msg->change_metadata_request().schema_version())) {
776
0
              RETURN_NOT_OK(PopulateCDCSDKDDLRecord(
777
0
                  msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema));
778
0
            }
779
7.47k
          } else {
780
306
            RETURN_NOT_OK(PopulateCDCSDKDDLRecord(
781
306
                msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema));
782
306
          }
783
7.78k
          SetCheckpoint(
784
7.78k
              msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
785
7.78k
          checkpoint_updated = true;
786
7.78k
        }
787
0
        break;
788
789
0
        case consensus::OperationType::TRUNCATE_OP: {
790
0
          RETURN_NOT_OK(
791
0
              PopulateCDCSDKTruncateRecord(msg, resp->add_cdc_sdk_proto_records(), current_schema));
792
793
0
          SetCheckpoint(
794
0
              msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
795
0
          checkpoint_updated = true;
796
0
        } break;
797
798
0
        default:
799
          // Nothing to do for other operation types.
800
0
          break;
801
8.61k
      }
802
803
8.61k
      if (pending_intents) 
break0
;
804
8.61k
    }
805
617
    if (read_ops.messages.size() > 0)
806
617
      *msgs_holder = consensus::ReplicateMsgsHolder(
807
617
          nullptr, std::move(read_ops.messages), std::move(consumption));
808
617
  }
809
810
641
  if (consumption) {
811
0
    consumption.Add(resp->SpaceUsedLong());
812
0
  }
813
814
641
  checkpoint_updated ? resp->mutable_cdc_sdk_checkpoint()->CopyFrom(checkpoint)
815
641
                       : 
resp->mutable_cdc_sdk_checkpoint()->CopyFrom(from_op_id)0
;
816
817
641
  if (checkpoint_updated) {
818
641
    VLOG
(1) << "The checkpoint is updated " << resp->checkpoint().DebugString()0
;
819
641
  } else {
820
0
    VLOG(1) << "The checkpoint is not updated " << resp->checkpoint().DebugString();
821
0
  }
822
823
641
  if (last_streamed_op_id->index > 0) {
824
617
    resp->mutable_checkpoint()->mutable_op_id()->set_term(last_streamed_op_id->term);
825
617
    resp->mutable_checkpoint()->mutable_op_id()->set_index(last_streamed_op_id->index);
826
617
  }
827
828
641
  return Status::OK();
829
641
}
830
831
}  // namespace cdc
832
}  // namespace yb