/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdcsdk_producer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/cdc/cdc_producer.h" |
14 | | |
15 | | #include "yb/cdc/cdc_common_util.h" |
16 | | |
17 | | #include "yb/common/wire_protocol.h" |
18 | | #include "yb/common/ql_expr.h" |
19 | | |
20 | | #include "yb/docdb/docdb_util.h" |
21 | | #include "yb/docdb/doc_key.h" |
22 | | |
23 | | DEFINE_int32(cdc_snapshot_batch_size, 250, "Batch size for the snapshot operation in CDC"); |
24 | | |
25 | | namespace yb { |
26 | | namespace cdc { |
27 | | |
28 | | using consensus::ReplicateMsgPtr; |
29 | | using consensus::ReplicateMsgs; |
30 | | using docdb::PrimitiveValue; |
31 | | using tablet::TransactionParticipant; |
32 | | using yb::QLTableRow; |
33 | | |
34 | | YB_DEFINE_ENUM(OpType, (INSERT)(UPDATE)(DELETE)); |
35 | | |
36 | 378 | void SetOperation(RowMessage* row_message, OpType type, const Schema& schema) { |
37 | 378 | switch (type) { |
38 | 242 | case OpType::INSERT: |
39 | 242 | row_message->set_op(RowMessage_Op_INSERT); |
40 | 242 | break; |
41 | 69 | case OpType::UPDATE: |
42 | 69 | row_message->set_op(RowMessage_Op_UPDATE); |
43 | 69 | break; |
44 | 67 | case OpType::DELETE: |
45 | 67 | row_message->set_op(RowMessage_Op_DELETE); |
46 | 67 | break; |
47 | 378 | } |
48 | | |
49 | 378 | row_message->set_pgschema_name(schema.SchemaName()); |
50 | 378 | } |
51 | | |
52 | | void AddColumnToMap( |
53 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
54 | | const ColumnSchema& col_schema, |
55 | | const docdb::PrimitiveValue& col, |
56 | 930 | DatumMessagePB* cdc_datum_message) { |
57 | 930 | cdc_datum_message->set_column_name(col_schema.name()); |
58 | 930 | QLValuePB ql_value; |
59 | 930 | if (tablet_peer->tablet()->table_type() == PGSQL_TABLE_TYPE) { |
60 | 930 | docdb::PrimitiveValue::ToQLValuePB(col, col_schema.type(), &ql_value); |
61 | 930 | if (!IsNull(ql_value) && col_schema.pg_type_oid() != 0 /*kInvalidOid*/) { |
62 | 885 | docdb::SetValueFromQLBinaryWrapper(ql_value, col_schema.pg_type_oid(), cdc_datum_message); |
63 | 45 | } else { |
64 | 45 | cdc_datum_message->set_column_type(col_schema.pg_type_oid()); |
65 | 45 | } |
66 | 930 | } |
67 | 930 | } |
68 | | |
69 | 447 | DatumMessagePB* AddTuple(RowMessage* row_message) { |
70 | 447 | if (!row_message) { |
71 | 0 | return nullptr; |
72 | 0 | } |
73 | 447 | DatumMessagePB* tuple = nullptr; |
74 | | |
75 | 447 | if (row_message->op() == RowMessage_Op_DELETE) { |
76 | 86 | tuple = row_message->add_old_tuple(); |
77 | 86 | row_message->add_new_tuple(); |
78 | 361 | } else { |
79 | 361 | tuple = row_message->add_new_tuple(); |
80 | 361 | row_message->add_old_tuple(); |
81 | 361 | } |
82 | 447 | return tuple; |
83 | 447 | } |
84 | | |
85 | | void AddPrimaryKey( |
86 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, const docdb::SubDocKey& decoded_key, |
87 | 378 | const Schema& tablet_schema, RowMessage* row_message) { |
88 | 378 | size_t i = 0; |
89 | 378 | for (const auto& col : decoded_key.doc_key().hashed_group()) { |
90 | 378 | DatumMessagePB* tuple = AddTuple(row_message); |
91 | | |
92 | 378 | AddColumnToMap( |
93 | 378 | tablet_peer, tablet_schema.column(i), col, tuple); |
94 | 378 | i++; |
95 | 378 | } |
96 | | |
97 | 69 | for (const auto& col : decoded_key.doc_key().range_group()) { |
98 | 69 | DatumMessagePB* tuple = AddTuple(row_message); |
99 | | |
100 | 69 | AddColumnToMap( |
101 | 69 | tablet_peer, tablet_schema.column(i), col, tuple); |
102 | 69 | i++; |
103 | 69 | } |
104 | 378 | } |
105 | | |
106 | | void SetCDCSDKOpId( |
107 | | int64_t term, int64_t index, uint32_t write_id, const std::string& key, |
108 | 604 | CDCSDKOpIdPB* cdc_sdk_op_id_pb) { |
109 | 604 | cdc_sdk_op_id_pb->set_term(term); |
110 | 604 | cdc_sdk_op_id_pb->set_index(index); |
111 | 604 | cdc_sdk_op_id_pb->set_write_id(write_id); |
112 | 604 | cdc_sdk_op_id_pb->set_write_id_key(key); |
113 | 604 | } |
114 | | |
115 | | void SetCheckpoint( |
116 | | int64_t term, int64_t index, int32 write_id, const std::string& key, uint64 time, |
117 | 4.10k | CDCSDKCheckpointPB* cdc_sdk_checkpoint_pb, OpId* last_streamed_op_id) { |
118 | 4.10k | cdc_sdk_checkpoint_pb->set_term(term); |
119 | 4.10k | cdc_sdk_checkpoint_pb->set_index(index); |
120 | 4.10k | cdc_sdk_checkpoint_pb->set_write_id(write_id); |
121 | 4.10k | cdc_sdk_checkpoint_pb->set_key(key); |
122 | 4.10k | cdc_sdk_checkpoint_pb->set_snapshot_time(time); |
123 | 4.10k | if (last_streamed_op_id) { |
124 | 4.09k | last_streamed_op_id->term = term; |
125 | 4.09k | last_streamed_op_id->index = index; |
126 | 4.09k | } |
127 | 4.10k | } |
128 | | |
129 | | bool ShouldCreateNewProtoRecord( |
130 | 309 | const RowMessage& row_message, const Schema& schema, size_t col_count) { |
131 | 309 | return (row_message.op() == RowMessage_Op_INSERT && col_count == schema.num_columns()) || |
132 | 228 | (row_message.op() == RowMessage_Op_UPDATE || row_message.op() == RowMessage_Op_DELETE); |
133 | 309 | } |
134 | | |
135 | 309 | bool IsInsertOperation(const RowMessage& row_message) { |
136 | 309 | return row_message.op() == RowMessage_Op_INSERT; |
137 | 309 | } |
138 | | |
139 | 792 | bool IsInsertOrUpdate(const RowMessage& row_message) { |
140 | 792 | return row_message.IsInitialized() && |
141 | 792 | (row_message.op() == RowMessage_Op_INSERT |
142 | 136 | || row_message.op() == RowMessage_Op_UPDATE); |
143 | 792 | } |
144 | | |
145 | | void MakeNewProtoRecord( |
146 | | const docdb::IntentKeyValueForCDC& intent, const OpId& op_id, const RowMessage& row_message, |
147 | | const Schema& schema, size_t col_count, CDCSDKProtoRecordPB* proto_record, |
148 | 309 | GetChangesResponsePB* resp, IntraTxnWriteId* write_id, std::string* reverse_index_key) { |
149 | 309 | if (ShouldCreateNewProtoRecord(row_message, schema, col_count)) { |
150 | 173 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
151 | 173 | SetCDCSDKOpId( |
152 | 173 | op_id.term, op_id.index, intent.write_id, intent.reverse_index_key, cdc_sdk_op_id_pb); |
153 | | |
154 | 173 | CDCSDKProtoRecordPB* record_to_be_added = resp->add_cdc_sdk_proto_records(); |
155 | 173 | record_to_be_added->CopyFrom(*proto_record); |
156 | 173 | record_to_be_added->mutable_row_message()->CopyFrom(row_message); |
157 | | |
158 | 173 | *write_id = intent.write_id; |
159 | 173 | *reverse_index_key = intent.reverse_index_key; |
160 | 173 | } |
161 | 309 | } |
162 | | // Populate CDC record corresponding to WAL batch in ReplicateMsg. |
163 | | CHECKED_STATUS PopulateCDCSDKIntentRecord( |
164 | | const OpId& op_id, |
165 | | const TransactionId& transaction_id, |
166 | | const std::vector<docdb::IntentKeyValueForCDC>& intents, |
167 | | const StreamMetadata& metadata, |
168 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
169 | | GetChangesResponsePB* resp, |
170 | | ScopedTrackedConsumption* consumption, |
171 | | IntraTxnWriteId* write_id, |
172 | | std::string* reverse_index_key, |
173 | 74 | Schema* old_schema) { |
174 | 74 | Schema& schema = old_schema ? *old_schema : *tablet_peer->tablet()->schema(); |
175 | 74 | Slice prev_key; |
176 | 74 | CDCSDKProtoRecordPB proto_record; |
177 | 74 | RowMessage* row_message = proto_record.mutable_row_message(); |
178 | 74 | size_t col_count = 0; |
179 | 309 | for (const auto& intent : intents) { |
180 | 309 | Slice key(intent.key_buf); |
181 | 309 | Slice value(intent.value_buf); |
182 | 309 | const auto key_size = |
183 | 309 | VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey)); |
184 | | |
185 | 309 | docdb::PrimitiveValue column_id; |
186 | 309 | boost::optional<docdb::PrimitiveValue> column_id_opt; |
187 | 309 | Slice key_column = key.WithoutPrefix(key_size); |
188 | 309 | if (!key_column.empty()) { |
189 | 268 | RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id)); |
190 | 268 | column_id_opt = boost::optional<docdb::PrimitiveValue>(column_id); |
191 | 268 | } |
192 | | |
193 | 309 | Slice sub_doc_key = key; |
194 | 309 | docdb::SubDocKey decoded_key; |
195 | 309 | RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); |
196 | | |
197 | 309 | docdb::Value decoded_value; |
198 | 309 | RETURN_NOT_OK(decoded_value.Decode(value)); |
199 | | |
200 | 309 | if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId && |
201 | 187 | schema.is_key_column(column_id_opt->GetColumnId())) { |
202 | 0 | *write_id = intent.write_id; |
203 | 0 | *reverse_index_key = intent.reverse_index_key; |
204 | 0 | continue; |
205 | 0 | } |
206 | | |
207 | 309 | if (*consumption) { |
208 | 307 | consumption->Add(key.size()); |
209 | 307 | } |
210 | | |
211 | | // Compare key hash with previously seen key hash to determine whether the write pair |
212 | | // is part of the same row or not. |
213 | 309 | Slice primary_key(key.data(), key_size); |
214 | 309 | if (prev_key != primary_key || col_count >= schema.num_columns()) { |
215 | 173 | proto_record.Clear(); |
216 | 173 | row_message->Clear(); |
217 | | |
218 | | // Check whether operation is WRITE or DELETE. |
219 | 173 | if (decoded_value.value_type() == docdb::ValueType::kTombstone && |
220 | 41 | decoded_key.num_subkeys() == 0) { |
221 | 41 | SetOperation(row_message, OpType::DELETE, schema); |
222 | 41 | *write_id = intent.write_id; |
223 | 132 | } else { |
224 | 132 | if (column_id_opt && |
225 | 132 | column_id_opt->value_type() == docdb::ValueType::kSystemColumnId && |
226 | 81 | decoded_value.value_type() == docdb::ValueType::kNullLow) { |
227 | 81 | SetOperation(row_message, OpType::INSERT, schema); |
228 | 81 | col_count = schema.num_key_columns() - 1; |
229 | 51 | } else { |
230 | 51 | SetOperation(row_message, OpType::UPDATE, schema); |
231 | 51 | col_count = schema.num_columns(); |
232 | 51 | *write_id = intent.write_id; |
233 | 51 | } |
234 | 132 | } |
235 | | |
236 | | // Write pair contains record for different row. Create a new CDCRecord in this case. |
237 | 173 | row_message->set_transaction_id(transaction_id.ToString()); |
238 | 173 | AddPrimaryKey(tablet_peer, decoded_key, schema, row_message); |
239 | 173 | } |
240 | | |
241 | 309 | if (IsInsertOperation(*row_message)) { |
242 | 217 | ++col_count; |
243 | 217 | } |
244 | | |
245 | 309 | prev_key = primary_key; |
246 | 309 | if (IsInsertOrUpdate(*row_message)) { |
247 | 268 | if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId) { |
248 | 187 | const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id_opt->GetColumnId())); |
249 | | |
250 | 187 | AddColumnToMap( |
251 | 187 | tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple()); |
252 | 187 | row_message->add_old_tuple(); |
253 | | |
254 | 81 | } else if ( |
255 | 81 | column_id_opt && column_id_opt->value_type() != docdb::ValueType::kSystemColumnId) { |
256 | 0 | LOG(DFATAL) << "Unexpected value type in key: " << column_id_opt->value_type() |
257 | 0 | << " key: " << decoded_key.ToString() |
258 | 0 | << " value: " << decoded_value.primitive_value(); |
259 | 0 | } |
260 | 268 | } |
261 | 309 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
262 | 309 | MakeNewProtoRecord( |
263 | 309 | intent, op_id, *row_message, schema, col_count, &proto_record, resp, write_id, |
264 | 309 | reverse_index_key); |
265 | 309 | } |
266 | | |
267 | 74 | return Status::OK(); |
268 | 74 | } |
269 | | |
270 | | // Populate CDC record corresponding to WAL batch in ReplicateMsg. |
271 | | CHECKED_STATUS PopulateCDCSDKWriteRecord( |
272 | | const ReplicateMsgPtr& msg, |
273 | | const StreamMetadata& metadata, |
274 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
275 | | GetChangesResponsePB* resp, |
276 | 205 | const Schema& schema) { |
277 | 205 | const auto& batch = msg->write().write_batch(); |
278 | 205 | CDCSDKProtoRecordPB* proto_record = nullptr; |
279 | 205 | RowMessage* row_message = nullptr; |
280 | | |
281 | | // Write batch may contain records from different rows. |
282 | | // For CDC, we need to split the batch into 1 CDC record per row of the table. |
283 | | // We'll use DocDB key hash to identify the records that belong to the same row. |
284 | 205 | Slice prev_key; |
285 | | |
286 | 483 | for (const auto& write_pair : batch.write_pairs()) { |
287 | 483 | Slice key = write_pair.key(); |
288 | 483 | const auto key_size = |
289 | 483 | VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey)); |
290 | | |
291 | 483 | Slice value = write_pair.value(); |
292 | 483 | docdb::Value decoded_value; |
293 | 483 | RETURN_NOT_OK(decoded_value.Decode(value)); |
294 | | |
295 | | // Compare key hash with previously seen key hash to determine whether the write pair |
296 | | // is part of the same row or not. |
297 | 483 | Slice primary_key(key.data(), key_size); |
298 | 483 | if (prev_key != primary_key) { |
299 | | // Write pair contains record for different row. Create a new CDCRecord in this case. |
300 | 205 | proto_record = resp->add_cdc_sdk_proto_records(); |
301 | 205 | row_message = proto_record->mutable_row_message(); |
302 | 205 | row_message->set_pgschema_name(schema.SchemaName()); |
303 | 205 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
304 | | |
305 | 205 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
306 | 205 | SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb); |
307 | | |
308 | 205 | Slice sub_doc_key = key; |
309 | 205 | docdb::SubDocKey decoded_key; |
310 | 205 | RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); |
311 | | |
312 | | // Check whether operation is WRITE or DELETE. |
313 | 205 | if (decoded_value.value_type() == docdb::ValueType::kTombstone && |
314 | 26 | decoded_key.num_subkeys() == 0) { |
315 | 26 | SetOperation(row_message, OpType::DELETE, schema); |
316 | 179 | } else { |
317 | 179 | docdb::PrimitiveValue column_id; |
318 | 179 | Slice key_column(key.WithoutPrefix(key_size)); |
319 | 179 | RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id)); |
320 | | |
321 | 179 | if (column_id.value_type() == docdb::ValueType::kSystemColumnId && |
322 | 161 | decoded_value.value_type() == docdb::ValueType::kNullLow) { |
323 | 161 | SetOperation(row_message, OpType::INSERT, schema); |
324 | 18 | } else { |
325 | 18 | SetOperation(row_message, OpType::UPDATE, schema); |
326 | 18 | } |
327 | 179 | } |
328 | | |
329 | 205 | AddPrimaryKey(tablet_peer, decoded_key, schema, row_message); |
330 | | |
331 | | // Process intent records. |
332 | 205 | row_message->set_commit_time(msg->hybrid_time()); |
333 | 205 | } |
334 | 483 | prev_key = primary_key; |
335 | 483 | DCHECK(proto_record); |
336 | | |
337 | 483 | if (IsInsertOrUpdate(*row_message)) { |
338 | 457 | docdb::PrimitiveValue column_id; |
339 | 457 | Slice key_column((const char*)(key.data() + key_size)); |
340 | 457 | RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id)); |
341 | 457 | if (column_id.value_type() == docdb::ValueType::kColumnId) { |
342 | 296 | const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId())); |
343 | | |
344 | 296 | AddColumnToMap( |
345 | 296 | tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple()); |
346 | 296 | row_message->add_old_tuple(); |
347 | | |
348 | 161 | } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) { |
349 | 0 | LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type(); |
350 | 0 | } |
351 | 457 | } |
352 | 483 | } |
353 | | |
354 | 205 | return Status::OK(); |
355 | 205 | } |
356 | | |
357 | | void SetTableProperties( |
358 | | const TablePropertiesPB* table_properties, |
359 | 314 | CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb) { |
360 | 314 | cdc_sdk_table_properties_pb->set_default_time_to_live(table_properties->default_time_to_live()); |
361 | 314 | cdc_sdk_table_properties_pb->set_num_tablets(table_properties->num_tablets()); |
362 | 314 | cdc_sdk_table_properties_pb->set_is_ysql_catalog_table(table_properties->is_ysql_catalog_table()); |
363 | 314 | } |
364 | | |
365 | 883 | void SetColumnInfo(const ColumnSchemaPB& column, CDCSDKColumnInfoPB* column_info) { |
366 | 883 | column_info->set_name(column.name()); |
367 | 883 | column_info->mutable_type()->CopyFrom(column.type()); |
368 | 883 | column_info->set_is_key(column.is_key()); |
369 | 883 | column_info->set_is_hash_key(column.is_hash_key()); |
370 | 883 | column_info->set_is_nullable(column.is_nullable()); |
371 | 883 | column_info->set_oid(column.pg_type_oid()); |
372 | 883 | } |
373 | | |
374 | | CHECKED_STATUS PopulateCDCSDKDDLRecord( |
375 | | const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const string& table_name, |
376 | 152 | const Schema& schema) { |
377 | 152 | SCHECK( |
378 | 152 | msg->has_change_metadata_request(), InvalidArgument, |
379 | 152 | Format( |
380 | 152 | "Change metadata (DDL) message requires metadata information: $0", |
381 | 152 | msg->ShortDebugString())); |
382 | | |
383 | 152 | RowMessage* row_message = nullptr; |
384 | | |
385 | 152 | row_message = proto_record->mutable_row_message(); |
386 | 152 | row_message->set_op(RowMessage_Op_DDL); |
387 | 152 | row_message->set_table(table_name); |
388 | | |
389 | 152 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
390 | 152 | SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb); |
391 | | |
392 | 425 | for (const auto& column : msg->change_metadata_request().schema().columns()) { |
393 | 425 | CDCSDKColumnInfoPB* column_info = nullptr; |
394 | 425 | column_info = row_message->mutable_schema()->add_column_info(); |
395 | 425 | SetColumnInfo(column, column_info); |
396 | 425 | } |
397 | | |
398 | 152 | CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb; |
399 | 152 | const TablePropertiesPB* table_properties = |
400 | 152 | &(msg->change_metadata_request().schema().table_properties()); |
401 | | |
402 | 152 | cdc_sdk_table_properties_pb = row_message->mutable_schema()->mutable_tab_info(); |
403 | 152 | row_message->set_schema_version(msg->change_metadata_request().schema_version()); |
404 | 152 | row_message->set_new_table_name(msg->change_metadata_request().new_table_name()); |
405 | 152 | row_message->set_pgschema_name(schema.SchemaName()); |
406 | 152 | SetTableProperties(table_properties, cdc_sdk_table_properties_pb); |
407 | | |
408 | 152 | return Status::OK(); |
409 | 152 | } |
410 | | |
411 | | CHECKED_STATUS PopulateCDCSDKTruncateRecord( |
412 | 0 | const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const Schema& schema) { |
413 | 0 | SCHECK( |
414 | 0 | msg->has_truncate(), InvalidArgument, |
415 | 0 | Format( |
416 | 0 | "Truncate message requires truncate request information: $0", msg->ShortDebugString())); |
417 | |
|
418 | 0 | RowMessage* row_message = nullptr; |
419 | |
|
420 | 0 | row_message = proto_record->mutable_row_message(); |
421 | 0 | row_message->set_op(RowMessage_Op_TRUNCATE); |
422 | 0 | row_message->set_pgschema_name(schema.SchemaName()); |
423 | 0 | row_message->mutable_truncate_request_info()->CopyFrom(msg->truncate()); |
424 | |
|
425 | 0 | CDCSDKOpIdPB* cdc_sdk_op_id_pb; |
426 | |
|
427 | 0 | cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
428 | 0 | SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb); |
429 | |
|
430 | 0 | return Status::OK(); |
431 | 0 | } |
432 | | |
433 | 74 | void SetTermIndex(int64_t term, int64_t index, CDCSDKCheckpointPB* checkpoint) { |
434 | 74 | checkpoint->set_term(term); |
435 | 74 | checkpoint->set_index(index); |
436 | 74 | } |
437 | | |
438 | 74 | void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint) { |
439 | 74 | checkpoint->set_key(key); |
440 | 74 | checkpoint->set_write_id(write_id); |
441 | 74 | } |
442 | | |
443 | | CHECKED_STATUS ProcessIntents( |
444 | | const OpId& op_id, |
445 | | const TransactionId& transaction_id, |
446 | | const StreamMetadata& metadata, |
447 | | GetChangesResponsePB* resp, |
448 | | ScopedTrackedConsumption* consumption, |
449 | | CDCSDKCheckpointPB* checkpoint, |
450 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
451 | | std::vector<docdb::IntentKeyValueForCDC>* keyValueIntents, |
452 | | docdb::ApplyTransactionState* stream_state, |
453 | 74 | Schema* schema) { |
454 | 74 | if (stream_state->key.empty() && stream_state->write_id == 0) { |
455 | 74 | CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records(); |
456 | 74 | RowMessage* row_message = proto_record->mutable_row_message(); |
457 | 74 | row_message->set_op(RowMessage_Op_BEGIN); |
458 | 74 | row_message->set_transaction_id(transaction_id.ToString()); |
459 | 74 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
460 | 74 | } |
461 | | |
462 | 74 | auto tablet = tablet_peer->shared_tablet(); |
463 | 74 | RETURN_NOT_OK(tablet->GetIntents(transaction_id, keyValueIntents, stream_state)); |
464 | | |
465 | 309 | for (auto& keyValue : *keyValueIntents) { |
466 | 309 | docdb::SubDocKey sub_doc_key; |
467 | 309 | CHECK_OK( |
468 | 309 | sub_doc_key.FullyDecodeFrom(Slice(keyValue.key_buf), docdb::HybridTimeRequired::kFalse)); |
469 | 309 | docdb::Value decoded_value; |
470 | 309 | RETURN_NOT_OK(decoded_value.Decode(Slice(keyValue.value_buf))); |
471 | 309 | } |
472 | | |
473 | 74 | std::string reverse_index_key; |
474 | 74 | IntraTxnWriteId write_id = 0; |
475 | | |
476 | | // Need to populate the CDCSDKRecords |
477 | 74 | RETURN_NOT_OK(PopulateCDCSDKIntentRecord( |
478 | 74 | op_id, transaction_id, *keyValueIntents, metadata, tablet_peer, resp, consumption, &write_id, |
479 | 74 | &reverse_index_key, schema)); |
480 | | |
481 | 74 | SetTermIndex(op_id.term, op_id.index, checkpoint); |
482 | | |
483 | 74 | if (stream_state->key.empty() && stream_state->write_id == 0) { |
484 | 74 | CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records(); |
485 | 74 | RowMessage* row_message = proto_record->mutable_row_message(); |
486 | | |
487 | 74 | row_message->set_op(RowMessage_Op_COMMIT); |
488 | 74 | row_message->set_transaction_id(transaction_id.ToString()); |
489 | 74 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
490 | | |
491 | 74 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
492 | 74 | SetCDCSDKOpId(op_id.term, op_id.index, 0, "", cdc_sdk_op_id_pb); |
493 | 74 | SetKeyWriteId("", 0, checkpoint); |
494 | 0 | } else { |
495 | 0 | SetKeyWriteId(reverse_index_key, write_id, checkpoint); |
496 | 0 | } |
497 | | |
498 | 74 | return Status::OK(); |
499 | 74 | } |
500 | | |
501 | | CHECKED_STATUS PopulateCDCSDKSnapshotRecord( |
502 | | GetChangesResponsePB* resp, |
503 | | const QLTableRow* row, |
504 | | const Schema& schema, |
505 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
506 | 12.7k | ReadHybridTime time) { |
507 | 12.7k | CDCSDKProtoRecordPB* proto_record = nullptr; |
508 | 12.7k | RowMessage* row_message = nullptr; |
509 | 12.7k | string table_name = tablet_peer->tablet()->metadata()->table_name(); |
510 | | |
511 | 12.7k | proto_record = resp->add_cdc_sdk_proto_records(); |
512 | 12.7k | row_message = proto_record->mutable_row_message(); |
513 | 12.7k | row_message->set_table(table_name); |
514 | 12.7k | row_message->set_op(RowMessage_Op_READ); |
515 | 12.7k | row_message->set_pgschema_name(schema.SchemaName()); |
516 | 12.7k | row_message->set_commit_time(time.read.ToUint64()); |
517 | | |
518 | 12.7k | DatumMessagePB* cdc_datum_message = nullptr; |
519 | | |
520 | 51.0k | for (size_t col_idx = 0; col_idx < schema.num_columns(); col_idx++) { |
521 | 38.2k | ColumnId col_id = schema.column_id(col_idx); |
522 | 38.2k | const auto* value = row->GetColumn(col_id); |
523 | 38.2k | const ColumnSchema& col_schema = VERIFY_RESULT(schema.column_by_id(col_id)); |
524 | | |
525 | 38.2k | cdc_datum_message = row_message->add_new_tuple(); |
526 | 38.2k | cdc_datum_message->set_column_name(col_schema.name()); |
527 | | |
528 | 38.2k | if (value && value->value_case() != QLValuePB::VALUE_NOT_SET |
529 | 38.2k | && col_schema.pg_type_oid() != 0 /*kInvalidOid*/) { |
530 | 38.2k | docdb::SetValueFromQLBinaryWrapper(*value, col_schema.pg_type_oid(), cdc_datum_message); |
531 | 0 | } else { |
532 | 0 | cdc_datum_message->set_column_type(col_schema.pg_type_oid()); |
533 | 0 | } |
534 | | |
535 | 38.2k | row_message->add_old_tuple(); |
536 | 38.2k | } |
537 | | |
538 | 12.7k | return Status::OK(); |
539 | 12.7k | } |
540 | | |
541 | 162 | void FillDDLInfo(RowMessage* row_message, const SchemaPB& schema, const uint32_t schema_version) { |
542 | 458 | for (const auto& column : schema.columns()) { |
543 | 458 | CDCSDKColumnInfoPB* column_info; |
544 | 458 | column_info = row_message->mutable_schema()->add_column_info(); |
545 | 458 | SetColumnInfo(column, column_info); |
546 | 458 | } |
547 | | |
548 | 162 | row_message->set_schema_version(schema_version); |
549 | 162 | row_message->set_pgschema_name(schema.pgschema_name()); |
550 | 162 | CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb = |
551 | 162 | row_message->mutable_schema()->mutable_tab_info(); |
552 | | |
553 | 162 | const TablePropertiesPB* table_properties = &(schema.table_properties()); |
554 | 162 | SetTableProperties(table_properties, cdc_sdk_table_properties_pb); |
555 | 162 | } |
556 | | |
557 | | // CDC get changes is different from 2DC as it doesn't need |
558 | | // to read intents from WAL. |
559 | | |
560 | | Status GetChangesForCDCSDK( |
561 | | const std::string& stream_id, |
562 | | const std::string& tablet_id, |
563 | | const CDCSDKCheckpointPB& from_op_id, |
564 | | const StreamMetadata& stream_metadata, |
565 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
566 | | const MemTrackerPtr& mem_tracker, |
567 | | consensus::ReplicateMsgsHolder* msgs_holder, |
568 | | GetChangesResponsePB* resp, |
569 | | std::string* commit_timestamp, |
570 | | std::shared_ptr<Schema>* cached_schema, |
571 | | OpId* last_streamed_op_id, |
572 | | int64_t* last_readable_opid_index, |
573 | 319 | const CoarseTimePoint deadline) { |
574 | 319 | OpId op_id{from_op_id.term(), from_op_id.index()}; |
575 | 319 | ScopedTrackedConsumption consumption; |
576 | 319 | CDCSDKProtoRecordPB* proto_record = nullptr; |
577 | 319 | RowMessage* row_message = nullptr; |
578 | 319 | CDCSDKCheckpointPB checkpoint; |
579 | 319 | bool checkpoint_updated = false; |
580 | | |
581 | | // It is snapshot call. |
582 | 319 | if (from_op_id.write_id() == -1) { |
583 | 12 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
584 | 12 | tablet::RemoveIntentsData data; |
585 | 12 | ReadHybridTime time; |
586 | 12 | std::string nextKey; |
587 | 12 | SchemaPB schema_pb; |
588 | | // It is first call in snapshot then take snapshot. |
589 | 12 | if ((from_op_id.key().empty()) && (from_op_id.snapshot_time() == 0)) { |
590 | 6 | if (txn_participant == nullptr || txn_participant->context() == nullptr) |
591 | 0 | return STATUS_SUBSTITUTE( |
592 | 6 | Corruption, "Cannot read data as the transaction participant context is null"); |
593 | 6 | txn_participant->context()->GetLastReplicatedData(&data); |
594 | | // Set the checkpoint and communicate to the follower. |
595 | 0 | VLOG(1) << "The first snapshot term " << data.op_id.term << "index " << data.op_id.index |
596 | 0 | << "time " << data.log_ht.ToUint64(); |
597 | | // Update the CDCConsumerOpId. |
598 | 6 | { |
599 | 6 | std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus(); |
600 | 6 | shared_consensus->UpdateCDCConsumerOpId(data.op_id); |
601 | 6 | } |
602 | 6 | if (txn_participant == nullptr || txn_participant->context() == nullptr) |
603 | 0 | return STATUS_SUBSTITUTE( |
604 | 6 | Corruption, "Cannot read data as the transaction participant context is null"); |
605 | 6 | txn_participant->context()->GetLastReplicatedData(&data); |
606 | 6 | time = ReadHybridTime::SingleTime(data.log_ht); |
607 | | |
608 | | // This should go to cdc_state table. |
609 | | // Below condition update the checkpoint in cdc_state table. |
610 | 6 | SetCheckpoint( |
611 | 6 | data.op_id.term, data.op_id.index, -1, "", time.read.ToUint64(), &checkpoint, nullptr); |
612 | 6 | checkpoint_updated = true; |
613 | 6 | } else { |
614 | | // Snapshot is already taken. |
615 | 6 | HybridTime ht; |
616 | 6 | time = ReadHybridTime::FromUint64(from_op_id.snapshot_time()); |
617 | 6 | nextKey = from_op_id.key(); |
618 | 0 | VLOG(1) << "The after snapshot term " << from_op_id.term() << "index " << from_op_id.index() |
619 | 0 | << "key " << from_op_id.key() << "snapshot time " << from_op_id.snapshot_time(); |
620 | | |
621 | 6 | Schema schema = *tablet_peer->tablet()->schema().get(); |
622 | 6 | int limit = FLAGS_cdc_snapshot_batch_size; |
623 | 6 | int fetched = 0; |
624 | 6 | std::vector<QLTableRow> rows; |
625 | 6 | auto iter = VERIFY_RESULT(tablet_peer->tablet()->CreateCDCSnapshotIterator( |
626 | 6 | schema.CopyWithoutColumnIds(), time, nextKey)); |
627 | | |
628 | 6 | QLTableRow row; |
629 | 6 | SchemaToPB(*tablet_peer->tablet()->schema().get(), &schema_pb); |
630 | | |
631 | 6 | proto_record = resp->add_cdc_sdk_proto_records(); |
632 | 6 | row_message = proto_record->mutable_row_message(); |
633 | 6 | row_message->set_op(RowMessage_Op_DDL); |
634 | 6 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
635 | | |
636 | 6 | FillDDLInfo(row_message, schema_pb, tablet_peer->tablet()->metadata()->schema_version()); |
637 | | |
638 | 12.7k | while (VERIFY_RESULT(iter->HasNext()) && fetched < limit) { |
639 | 12.7k | RETURN_NOT_OK(iter->NextRow(&row)); |
640 | 12.7k | RETURN_NOT_OK(PopulateCDCSDKSnapshotRecord(resp, &row, schema, tablet_peer, time)); |
641 | 12.7k | fetched++; |
642 | 12.7k | } |
643 | 6 | docdb::SubDocKey sub_doc_key; |
644 | 6 | RETURN_NOT_OK(iter->GetNextReadSubDocKey(&sub_doc_key)); |
645 | | |
646 | | // Snapshot ends when next key is empty. |
647 | 6 | if (sub_doc_key.doc_key().empty()) { |
648 | 0 | VLOG(1) << "Setting next sub doc key empty "; |
649 | | // Get the checkpoint or read the checkpoint from the table/cache. |
650 | 4 | SetCheckpoint(from_op_id.term(), from_op_id.index(), 0, "", 0, &checkpoint, nullptr); |
651 | 4 | checkpoint_updated = true; |
652 | 2 | } else { |
653 | 0 | VLOG(1) << "Setting next sub doc key is " << sub_doc_key.Encode().ToStringBuffer(); |
654 | | |
655 | 2 | checkpoint.set_write_id(-1); |
656 | 2 | SetCheckpoint( |
657 | 2 | from_op_id.term(), from_op_id.index(), -1, sub_doc_key.Encode().ToStringBuffer(), |
658 | 2 | time.read.ToUint64(), &checkpoint, nullptr); |
659 | 2 | checkpoint_updated = true; |
660 | 2 | } |
661 | 6 | } |
662 | 307 | } else if (!from_op_id.key().empty() && from_op_id.write_id() != 0) { |
663 | 0 | std::string reverse_index_key = from_op_id.key(); |
664 | 0 | Slice reverse_index_key_slice(reverse_index_key); |
665 | 0 | std::vector<docdb::IntentKeyValueForCDC> keyValueIntents; |
666 | 0 | docdb::ApplyTransactionState stream_state; |
667 | 0 | stream_state.key = from_op_id.key(); |
668 | 0 | stream_state.write_id = from_op_id.write_id(); |
669 | |
|
670 | 0 | RETURN_NOT_OK(reverse_index_key_slice.consume_byte(docdb::ValueTypeAsChar::kTransactionId)); |
671 | 0 | auto transaction_id = VERIFY_RESULT(DecodeTransactionId(&reverse_index_key_slice)); |
672 | |
|
673 | 0 | RETURN_NOT_OK(ProcessIntents( |
674 | 0 | op_id, transaction_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer, |
675 | 0 | &keyValueIntents, &stream_state, nullptr)); |
676 | |
|
677 | 0 | if (checkpoint.write_id() == 0 && checkpoint.key().empty()) { |
678 | 0 | last_streamed_op_id->term = checkpoint.term(); |
679 | 0 | last_streamed_op_id->index = checkpoint.index(); |
680 | 0 | } |
681 | 0 | checkpoint_updated = true; |
682 | 307 | } else { |
683 | 307 | OpId checkpoint_op_id; |
684 | 307 | RequestScope request_scope; |
685 | | |
686 | 307 | auto read_ops = VERIFY_RESULT(tablet_peer->consensus()->ReadReplicatedMessagesForCDC( |
687 | 307 | op_id, last_readable_opid_index, deadline)); |
688 | | |
689 | 307 | if (read_ops.read_from_disk_size && mem_tracker) { |
690 | 301 | consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size); |
691 | 301 | } |
692 | | |
693 | 307 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
694 | 307 | if (txn_participant) { |
695 | 307 | request_scope = RequestScope(txn_participant); |
696 | 307 | } |
697 | | |
698 | 307 | Schema current_schema; |
699 | 307 | bool pending_intents = false; |
700 | 307 | bool schema_streamed = false; |
701 | | |
702 | 4.30k | for (const auto& msg : read_ops.messages) { |
703 | 4.30k | if (!schema_streamed && !(**cached_schema).initialized()) { |
704 | 156 | current_schema.CopyFrom(*tablet_peer->tablet()->schema().get()); |
705 | 156 | string table_name = tablet_peer->tablet()->metadata()->table_name(); |
706 | 156 | schema_streamed = true; |
707 | | |
708 | 156 | proto_record = resp->add_cdc_sdk_proto_records(); |
709 | 156 | row_message = proto_record->mutable_row_message(); |
710 | 156 | row_message->set_op(RowMessage_Op_DDL); |
711 | 156 | row_message->set_table(table_name); |
712 | | |
713 | 156 | *cached_schema = std::make_shared<Schema>(std::move(current_schema)); |
714 | 156 | SchemaPB current_schema_pb; |
715 | 156 | SchemaToPB(**cached_schema, ¤t_schema_pb); |
716 | 156 | FillDDLInfo(row_message, |
717 | 156 | current_schema_pb, |
718 | 156 | tablet_peer->tablet()->metadata()->schema_version()); |
719 | 4.15k | } else { |
720 | 4.15k | current_schema = **cached_schema; |
721 | 4.15k | } |
722 | | |
723 | 4.30k | const auto& batch = msg->write().write_batch(); |
724 | | |
725 | 4.30k | switch (msg->op_type()) { |
726 | 74 | case consensus::OperationType::UPDATE_TRANSACTION_OP: |
727 | | // Ignore intents. |
728 | | // Read from IntentDB after they have been applied. |
729 | 74 | if (msg->transaction_state().status() == TransactionStatus::APPLYING) { |
730 | 74 | auto txn_id = |
731 | 74 | VERIFY_RESULT(FullyDecodeTransactionId(msg->transaction_state().transaction_id())); |
732 | 74 | auto result = GetTransactionStatus(txn_id, tablet_peer->Now(), txn_participant); |
733 | 74 | std::vector<docdb::IntentKeyValueForCDC> intents; |
734 | 74 | docdb::ApplyTransactionState new_stream_state; |
735 | | |
736 | 74 | *commit_timestamp = msg->transaction_state().commit_hybrid_time(); |
737 | 74 | op_id.term = msg->id().term(); |
738 | 74 | op_id.index = msg->id().index(); |
739 | 74 | RETURN_NOT_OK(ProcessIntents( |
740 | 74 | op_id, txn_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer, |
741 | 74 | &intents, &new_stream_state, ¤t_schema)); |
742 | | |
743 | 74 | if (new_stream_state.write_id != 0 && !new_stream_state.key.empty()) { |
744 | 0 | pending_intents = true; |
745 | 74 | } else { |
746 | 74 | last_streamed_op_id->term = msg->id().term(); |
747 | 74 | last_streamed_op_id->index = msg->id().index(); |
748 | 74 | } |
749 | 74 | } |
750 | 74 | checkpoint_updated = true; |
751 | 74 | break; |
752 | | |
753 | 344 | case consensus::OperationType::WRITE_OP: |
754 | 344 | if (!batch.has_transaction()) { |
755 | 205 | RETURN_NOT_OK( |
756 | 205 | PopulateCDCSDKWriteRecord(msg, stream_metadata, tablet_peer, resp, current_schema)); |
757 | | |
758 | 205 | SetCheckpoint( |
759 | 205 | msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id); |
760 | 205 | checkpoint_updated = true; |
761 | 205 | } |
762 | 344 | break; |
763 | | |
764 | 3.88k | case consensus::OperationType::CHANGE_METADATA_OP: { |
765 | 3.88k | RETURN_NOT_OK(SchemaFromPB(msg->change_metadata_request().schema(), ¤t_schema)); |
766 | 3.88k | string table_name = tablet_peer->tablet()->metadata()->table_name(); |
767 | 3.88k | *cached_schema = std::make_shared<Schema>(std::move(current_schema)); |
768 | 3.88k | if ((resp->cdc_sdk_proto_records_size() > 0 && |
769 | 3.73k | resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1) |
770 | 3.73k | .row_message() |
771 | 3.73k | .op() == RowMessage_Op_DDL)) { |
772 | 3.73k | if ((resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1) |
773 | 3.73k | .row_message() |
774 | 0 | .schema_version() != msg->change_metadata_request().schema_version())) { |
775 | 0 | RETURN_NOT_OK(PopulateCDCSDKDDLRecord( |
776 | 0 | msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema)); |
777 | 0 | } |
778 | 152 | } else { |
779 | 152 | RETURN_NOT_OK(PopulateCDCSDKDDLRecord( |
780 | 152 | msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema)); |
781 | 152 | } |
782 | 3.88k | SetCheckpoint( |
783 | 3.88k | msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id); |
784 | 3.88k | checkpoint_updated = true; |
785 | 3.88k | } |
786 | 3.88k | break; |
787 | | |
788 | 0 | case consensus::OperationType::TRUNCATE_OP: { |
789 | 0 | RETURN_NOT_OK( |
790 | 0 | PopulateCDCSDKTruncateRecord(msg, resp->add_cdc_sdk_proto_records(), current_schema)); |
791 | |
|
792 | 0 | SetCheckpoint( |
793 | 0 | msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id); |
794 | 0 | checkpoint_updated = true; |
795 | 0 | } break; |
796 | |
|
797 | 0 | default: |
798 | | // Nothing to do for other operation types. |
799 | 0 | break; |
800 | 4.30k | } |
801 | | |
802 | 4.30k | if (pending_intents) break; |
803 | 4.30k | } |
804 | 307 | if (read_ops.messages.size() > 0) |
805 | 307 | *msgs_holder = consensus::ReplicateMsgsHolder( |
806 | 307 | nullptr, std::move(read_ops.messages), std::move(consumption)); |
807 | 307 | } |
808 | | |
809 | 319 | if (consumption) { |
810 | 0 | consumption.Add(resp->SpaceUsedLong()); |
811 | 0 | } |
812 | | |
813 | 319 | checkpoint_updated ? resp->mutable_cdc_sdk_checkpoint()->CopyFrom(checkpoint) |
814 | 0 | : resp->mutable_cdc_sdk_checkpoint()->CopyFrom(from_op_id); |
815 | | |
816 | 319 | if (checkpoint_updated) { |
817 | 0 | VLOG(1) << "The checkpoint is updated " << resp->checkpoint().DebugString(); |
818 | 0 | } else { |
819 | 0 | VLOG(1) << "The checkpoint is not updated " << resp->checkpoint().DebugString(); |
820 | 0 | } |
821 | | |
822 | 319 | if (last_streamed_op_id->index > 0) { |
823 | 307 | resp->mutable_checkpoint()->mutable_op_id()->set_term(last_streamed_op_id->term); |
824 | 307 | resp->mutable_checkpoint()->mutable_op_id()->set_index(last_streamed_op_id->index); |
825 | 307 | } |
826 | | |
827 | 319 | return Status::OK(); |
828 | 319 | } |
829 | | |
830 | | } // namespace cdc |
831 | | } // namespace yb |