/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdcsdk_producer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/cdc/cdc_producer.h" |
14 | | |
15 | | #include "yb/cdc/cdc_common_util.h" |
16 | | |
17 | | #include "yb/common/wire_protocol.h" |
18 | | #include "yb/common/ql_expr.h" |
19 | | |
20 | | #include "yb/docdb/docdb_util.h" |
21 | | #include "yb/docdb/doc_key.h" |
22 | | |
23 | | DEFINE_int32(cdc_snapshot_batch_size, 250, "Batch size for the snapshot operation in CDC"); |
24 | | |
25 | | namespace yb { |
26 | | namespace cdc { |
27 | | |
28 | | using consensus::ReplicateMsgPtr; |
29 | | using consensus::ReplicateMsgs; |
30 | | using docdb::PrimitiveValue; |
31 | | using tablet::TransactionParticipant; |
32 | | using yb::QLTableRow; |
33 | | |
34 | | YB_DEFINE_ENUM(OpType, (INSERT)(UPDATE)(DELETE)); |
35 | | |
36 | 757 | void SetOperation(RowMessage* row_message, OpType type, const Schema& schema) { |
37 | 757 | switch (type) { |
38 | 486 | case OpType::INSERT: |
39 | 486 | row_message->set_op(RowMessage_Op_INSERT); |
40 | 486 | break; |
41 | 137 | case OpType::UPDATE: |
42 | 137 | row_message->set_op(RowMessage_Op_UPDATE); |
43 | 137 | break; |
44 | 134 | case OpType::DELETE: |
45 | 134 | row_message->set_op(RowMessage_Op_DELETE); |
46 | 134 | break; |
47 | 757 | } |
48 | | |
49 | 757 | row_message->set_pgschema_name(schema.SchemaName()); |
50 | 757 | } |
51 | | |
52 | | void AddColumnToMap( |
53 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
54 | | const ColumnSchema& col_schema, |
55 | | const docdb::PrimitiveValue& col, |
56 | 1.86k | DatumMessagePB* cdc_datum_message) { |
57 | 1.86k | cdc_datum_message->set_column_name(col_schema.name()); |
58 | 1.86k | QLValuePB ql_value; |
59 | 1.86k | if (tablet_peer->tablet()->table_type() == PGSQL_TABLE_TYPE) { |
60 | 1.86k | docdb::PrimitiveValue::ToQLValuePB(col, col_schema.type(), &ql_value); |
61 | 1.86k | if (!IsNull(ql_value) && col_schema.pg_type_oid() != 01.77k /*kInvalidOid*/) { |
62 | 1.77k | docdb::SetValueFromQLBinaryWrapper(ql_value, col_schema.pg_type_oid(), cdc_datum_message); |
63 | 1.77k | } else { |
64 | 92 | cdc_datum_message->set_column_type(col_schema.pg_type_oid()); |
65 | 92 | } |
66 | 1.86k | } |
67 | 1.86k | } |
68 | | |
69 | 895 | DatumMessagePB* AddTuple(RowMessage* row_message) { |
70 | 895 | if (!row_message) { |
71 | 0 | return nullptr; |
72 | 0 | } |
73 | 895 | DatumMessagePB* tuple = nullptr; |
74 | | |
75 | 895 | if (row_message->op() == RowMessage_Op_DELETE) { |
76 | 172 | tuple = row_message->add_old_tuple(); |
77 | 172 | row_message->add_new_tuple(); |
78 | 723 | } else { |
79 | 723 | tuple = row_message->add_new_tuple(); |
80 | 723 | row_message->add_old_tuple(); |
81 | 723 | } |
82 | 895 | return tuple; |
83 | 895 | } |
84 | | |
85 | | void AddPrimaryKey( |
86 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, const docdb::SubDocKey& decoded_key, |
87 | 757 | const Schema& tablet_schema, RowMessage* row_message) { |
88 | 757 | size_t i = 0; |
89 | 757 | for (const auto& col : decoded_key.doc_key().hashed_group()) { |
90 | 757 | DatumMessagePB* tuple = AddTuple(row_message); |
91 | | |
92 | 757 | AddColumnToMap( |
93 | 757 | tablet_peer, tablet_schema.column(i), col, tuple); |
94 | 757 | i++; |
95 | 757 | } |
96 | | |
97 | 757 | for (const auto& col : decoded_key.doc_key().range_group()) { |
98 | 138 | DatumMessagePB* tuple = AddTuple(row_message); |
99 | | |
100 | 138 | AddColumnToMap( |
101 | 138 | tablet_peer, tablet_schema.column(i), col, tuple); |
102 | 138 | i++; |
103 | 138 | } |
104 | 757 | } |
105 | | |
106 | | void SetCDCSDKOpId( |
107 | | int64_t term, int64_t index, uint32_t write_id, const std::string& key, |
108 | 1.21k | CDCSDKOpIdPB* cdc_sdk_op_id_pb) { |
109 | 1.21k | cdc_sdk_op_id_pb->set_term(term); |
110 | 1.21k | cdc_sdk_op_id_pb->set_index(index); |
111 | 1.21k | cdc_sdk_op_id_pb->set_write_id(write_id); |
112 | 1.21k | cdc_sdk_op_id_pb->set_write_id_key(key); |
113 | 1.21k | } |
114 | | |
115 | | void SetCheckpoint( |
116 | | int64_t term, int64_t index, int32 write_id, const std::string& key, uint64 time, |
117 | 8.21k | CDCSDKCheckpointPB* cdc_sdk_checkpoint_pb, OpId* last_streamed_op_id) { |
118 | 8.21k | cdc_sdk_checkpoint_pb->set_term(term); |
119 | 8.21k | cdc_sdk_checkpoint_pb->set_index(index); |
120 | 8.21k | cdc_sdk_checkpoint_pb->set_write_id(write_id); |
121 | 8.21k | cdc_sdk_checkpoint_pb->set_key(key); |
122 | 8.21k | cdc_sdk_checkpoint_pb->set_snapshot_time(time); |
123 | 8.21k | if (last_streamed_op_id) { |
124 | 8.19k | last_streamed_op_id->term = term; |
125 | 8.19k | last_streamed_op_id->index = index; |
126 | 8.19k | } |
127 | 8.21k | } |
128 | | |
129 | | bool ShouldCreateNewProtoRecord( |
130 | 617 | const RowMessage& row_message, const Schema& schema, size_t col_count) { |
131 | 617 | return (row_message.op() == RowMessage_Op_INSERT && col_count == schema.num_columns()434 ) || |
132 | 617 | (455 row_message.op() == RowMessage_Op_UPDATE455 || row_message.op() == RowMessage_Op_DELETE354 ); |
133 | 617 | } |
134 | | |
135 | 617 | bool IsInsertOperation(const RowMessage& row_message) { |
136 | 617 | return row_message.op() == RowMessage_Op_INSERT; |
137 | 617 | } |
138 | | |
139 | 1.58k | bool IsInsertOrUpdate(const RowMessage& row_message) { |
140 | 1.58k | return row_message.IsInitialized() && |
141 | 1.58k | (row_message.op() == RowMessage_Op_INSERT |
142 | 1.58k | || row_message.op() == RowMessage_Op_UPDATE271 ); |
143 | 1.58k | } |
144 | | |
145 | | void MakeNewProtoRecord( |
146 | | const docdb::IntentKeyValueForCDC& intent, const OpId& op_id, const RowMessage& row_message, |
147 | | const Schema& schema, size_t col_count, CDCSDKProtoRecordPB* proto_record, |
148 | 617 | GetChangesResponsePB* resp, IntraTxnWriteId* write_id, std::string* reverse_index_key) { |
149 | 617 | if (ShouldCreateNewProtoRecord(row_message, schema, col_count)) { |
150 | 345 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
151 | 345 | SetCDCSDKOpId( |
152 | 345 | op_id.term, op_id.index, intent.write_id, intent.reverse_index_key, cdc_sdk_op_id_pb); |
153 | | |
154 | 345 | CDCSDKProtoRecordPB* record_to_be_added = resp->add_cdc_sdk_proto_records(); |
155 | 345 | record_to_be_added->CopyFrom(*proto_record); |
156 | 345 | record_to_be_added->mutable_row_message()->CopyFrom(row_message); |
157 | | |
158 | 345 | *write_id = intent.write_id; |
159 | 345 | *reverse_index_key = intent.reverse_index_key; |
160 | 345 | } |
161 | 617 | } |
162 | | // Populate CDC record corresponding to WAL batch in ReplicateMsg. |
163 | | CHECKED_STATUS PopulateCDCSDKIntentRecord( |
164 | | const OpId& op_id, |
165 | | const TransactionId& transaction_id, |
166 | | const std::vector<docdb::IntentKeyValueForCDC>& intents, |
167 | | const StreamMetadata& metadata, |
168 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
169 | | GetChangesResponsePB* resp, |
170 | | ScopedTrackedConsumption* consumption, |
171 | | IntraTxnWriteId* write_id, |
172 | | std::string* reverse_index_key, |
173 | 147 | Schema* old_schema) { |
174 | 147 | Schema& schema = old_schema ? *old_schema : *tablet_peer->tablet()->schema()0 ; |
175 | 147 | Slice prev_key; |
176 | 147 | CDCSDKProtoRecordPB proto_record; |
177 | 147 | RowMessage* row_message = proto_record.mutable_row_message(); |
178 | 147 | size_t col_count = 0; |
179 | 617 | for (const auto& intent : intents) { |
180 | 617 | Slice key(intent.key_buf); |
181 | 617 | Slice value(intent.value_buf); |
182 | 617 | const auto key_size = |
183 | 617 | VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey)); |
184 | | |
185 | 0 | docdb::PrimitiveValue column_id; |
186 | 617 | boost::optional<docdb::PrimitiveValue> column_id_opt; |
187 | 617 | Slice key_column = key.WithoutPrefix(key_size); |
188 | 617 | if (!key_column.empty()) { |
189 | 535 | RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id)); |
190 | 535 | column_id_opt = boost::optional<docdb::PrimitiveValue>(column_id); |
191 | 535 | } |
192 | | |
193 | 617 | Slice sub_doc_key = key; |
194 | 617 | docdb::SubDocKey decoded_key; |
195 | 617 | RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); |
196 | | |
197 | 617 | docdb::Value decoded_value; |
198 | 617 | RETURN_NOT_OK(decoded_value.Decode(value)); |
199 | | |
200 | 617 | if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId535 && |
201 | 617 | schema.is_key_column(column_id_opt->GetColumnId())373 ) { |
202 | 0 | *write_id = intent.write_id; |
203 | 0 | *reverse_index_key = intent.reverse_index_key; |
204 | 0 | continue; |
205 | 0 | } |
206 | | |
207 | 617 | if (*consumption) { |
208 | 613 | consumption->Add(key.size()); |
209 | 613 | } |
210 | | |
211 | | // Compare key hash with previously seen key hash to determine whether the write pair |
212 | | // is part of the same row or not. |
213 | 617 | Slice primary_key(key.data(), key_size); |
214 | 617 | if (prev_key != primary_key || col_count >= schema.num_columns()382 ) { |
215 | 345 | proto_record.Clear(); |
216 | 345 | row_message->Clear(); |
217 | | |
218 | | // Check whether operation is WRITE or DELETE. |
219 | 345 | if (decoded_value.value_type() == docdb::ValueType::kTombstone && |
220 | 345 | decoded_key.num_subkeys() == 082 ) { |
221 | 82 | SetOperation(row_message, OpType::DELETE, schema); |
222 | 82 | *write_id = intent.write_id; |
223 | 263 | } else { |
224 | 263 | if (column_id_opt && |
225 | 263 | column_id_opt->value_type() == docdb::ValueType::kSystemColumnId && |
226 | 263 | decoded_value.value_type() == docdb::ValueType::kNullLow162 ) { |
227 | 162 | SetOperation(row_message, OpType::INSERT, schema); |
228 | 162 | col_count = schema.num_key_columns() - 1; |
229 | 162 | } else { |
230 | 101 | SetOperation(row_message, OpType::UPDATE, schema); |
231 | 101 | col_count = schema.num_columns(); |
232 | 101 | *write_id = intent.write_id; |
233 | 101 | } |
234 | 263 | } |
235 | | |
236 | | // Write pair contains record for different row. Create a new CDCRecord in this case. |
237 | 345 | row_message->set_transaction_id(transaction_id.ToString()); |
238 | 345 | AddPrimaryKey(tablet_peer, decoded_key, schema, row_message); |
239 | 345 | } |
240 | | |
241 | 617 | if (IsInsertOperation(*row_message)) { |
242 | 434 | ++col_count; |
243 | 434 | } |
244 | | |
245 | 617 | prev_key = primary_key; |
246 | 617 | if (IsInsertOrUpdate(*row_message)) { |
247 | 535 | if (column_id_opt && column_id_opt->value_type() == docdb::ValueType::kColumnId) { |
248 | 373 | const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id_opt->GetColumnId())); |
249 | | |
250 | 0 | AddColumnToMap( |
251 | 373 | tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple()); |
252 | 373 | row_message->add_old_tuple(); |
253 | | |
254 | 373 | } else if ( |
255 | 162 | column_id_opt && column_id_opt->value_type() != docdb::ValueType::kSystemColumnId) { |
256 | 0 | LOG(DFATAL) << "Unexpected value type in key: " << column_id_opt->value_type() |
257 | 0 | << " key: " << decoded_key.ToString() |
258 | 0 | << " value: " << decoded_value.primitive_value(); |
259 | 0 | } |
260 | 535 | } |
261 | 617 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
262 | 617 | MakeNewProtoRecord( |
263 | 617 | intent, op_id, *row_message, schema, col_count, &proto_record, resp, write_id, |
264 | 617 | reverse_index_key); |
265 | 617 | } |
266 | | |
267 | 147 | return Status::OK(); |
268 | 147 | } |
269 | | |
270 | | // Populate CDC record corresponding to WAL batch in ReplicateMsg. |
271 | | CHECKED_STATUS PopulateCDCSDKWriteRecord( |
272 | | const ReplicateMsgPtr& msg, |
273 | | const StreamMetadata& metadata, |
274 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
275 | | GetChangesResponsePB* resp, |
276 | 412 | const Schema& schema) { |
277 | 412 | const auto& batch = msg->write().write_batch(); |
278 | 412 | CDCSDKProtoRecordPB* proto_record = nullptr; |
279 | 412 | RowMessage* row_message = nullptr; |
280 | | |
281 | | // Write batch may contain records from different rows. |
282 | | // For CDC, we need to split the batch into 1 CDC record per row of the table. |
283 | | // We'll use DocDB key hash to identify the records that belong to the same row. |
284 | 412 | Slice prev_key; |
285 | | |
286 | 970 | for (const auto& write_pair : batch.write_pairs()) { |
287 | 970 | Slice key = write_pair.key(); |
288 | 970 | const auto key_size = |
289 | 970 | VERIFY_RESULT(docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey)); |
290 | | |
291 | 0 | Slice value = write_pair.value(); |
292 | 970 | docdb::Value decoded_value; |
293 | 970 | RETURN_NOT_OK(decoded_value.Decode(value)); |
294 | | |
295 | | // Compare key hash with previously seen key hash to determine whether the write pair |
296 | | // is part of the same row or not. |
297 | 970 | Slice primary_key(key.data(), key_size); |
298 | 970 | if (prev_key != primary_key) { |
299 | | // Write pair contains record for different row. Create a new CDCRecord in this case. |
300 | 412 | proto_record = resp->add_cdc_sdk_proto_records(); |
301 | 412 | row_message = proto_record->mutable_row_message(); |
302 | 412 | row_message->set_pgschema_name(schema.SchemaName()); |
303 | 412 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
304 | | |
305 | 412 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
306 | 412 | SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb); |
307 | | |
308 | 412 | Slice sub_doc_key = key; |
309 | 412 | docdb::SubDocKey decoded_key; |
310 | 412 | RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); |
311 | | |
312 | | // Check whether operation is WRITE or DELETE. |
313 | 412 | if (decoded_value.value_type() == docdb::ValueType::kTombstone && |
314 | 412 | decoded_key.num_subkeys() == 052 ) { |
315 | 52 | SetOperation(row_message, OpType::DELETE, schema); |
316 | 360 | } else { |
317 | 360 | docdb::PrimitiveValue column_id; |
318 | 360 | Slice key_column(key.WithoutPrefix(key_size)); |
319 | 360 | RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id)); |
320 | | |
321 | 360 | if (column_id.value_type() == docdb::ValueType::kSystemColumnId && |
322 | 360 | decoded_value.value_type() == docdb::ValueType::kNullLow324 ) { |
323 | 324 | SetOperation(row_message, OpType::INSERT, schema); |
324 | 324 | } else { |
325 | 36 | SetOperation(row_message, OpType::UPDATE, schema); |
326 | 36 | } |
327 | 360 | } |
328 | | |
329 | 412 | AddPrimaryKey(tablet_peer, decoded_key, schema, row_message); |
330 | | |
331 | | // Process intent records. |
332 | 412 | row_message->set_commit_time(msg->hybrid_time()); |
333 | 412 | } |
334 | 970 | prev_key = primary_key; |
335 | 970 | DCHECK(proto_record); |
336 | | |
337 | 970 | if (IsInsertOrUpdate(*row_message)) { |
338 | 918 | docdb::PrimitiveValue column_id; |
339 | 918 | Slice key_column((const char*)(key.data() + key_size)); |
340 | 918 | RETURN_NOT_OK(docdb::SubDocument::DecodeKey(&key_column, &column_id)); |
341 | 918 | if (column_id.value_type() == docdb::ValueType::kColumnId) { |
342 | 594 | const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId())); |
343 | | |
344 | 0 | AddColumnToMap( |
345 | 594 | tablet_peer, col, decoded_value.primitive_value(), row_message->add_new_tuple()); |
346 | 594 | row_message->add_old_tuple(); |
347 | | |
348 | 594 | } else if (324 column_id.value_type() != docdb::ValueType::kSystemColumnId324 ) { |
349 | 0 | LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type(); |
350 | 0 | } |
351 | 918 | } |
352 | 970 | } |
353 | | |
354 | 412 | return Status::OK(); |
355 | 412 | } |
356 | | |
357 | | void SetTableProperties( |
358 | | const TablePropertiesPB* table_properties, |
359 | 631 | CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb) { |
360 | 631 | cdc_sdk_table_properties_pb->set_default_time_to_live(table_properties->default_time_to_live()); |
361 | 631 | cdc_sdk_table_properties_pb->set_num_tablets(table_properties->num_tablets()); |
362 | 631 | cdc_sdk_table_properties_pb->set_is_ysql_catalog_table(table_properties->is_ysql_catalog_table()); |
363 | 631 | } |
364 | | |
365 | 1.77k | void SetColumnInfo(const ColumnSchemaPB& column, CDCSDKColumnInfoPB* column_info) { |
366 | 1.77k | column_info->set_name(column.name()); |
367 | 1.77k | column_info->mutable_type()->CopyFrom(column.type()); |
368 | 1.77k | column_info->set_is_key(column.is_key()); |
369 | 1.77k | column_info->set_is_hash_key(column.is_hash_key()); |
370 | 1.77k | column_info->set_is_nullable(column.is_nullable()); |
371 | 1.77k | column_info->set_oid(column.pg_type_oid()); |
372 | 1.77k | } |
373 | | |
374 | | CHECKED_STATUS PopulateCDCSDKDDLRecord( |
375 | | const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const string& table_name, |
376 | 306 | const Schema& schema) { |
377 | 306 | SCHECK( |
378 | 306 | msg->has_change_metadata_request(), InvalidArgument, |
379 | 306 | Format( |
380 | 306 | "Change metadata (DDL) message requires metadata information: $0", |
381 | 306 | msg->ShortDebugString())); |
382 | | |
383 | 306 | RowMessage* row_message = nullptr; |
384 | | |
385 | 306 | row_message = proto_record->mutable_row_message(); |
386 | 306 | row_message->set_op(RowMessage_Op_DDL); |
387 | 306 | row_message->set_table(table_name); |
388 | | |
389 | 306 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
390 | 306 | SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb); |
391 | | |
392 | 854 | for (const auto& column : msg->change_metadata_request().schema().columns()) { |
393 | 854 | CDCSDKColumnInfoPB* column_info = nullptr; |
394 | 854 | column_info = row_message->mutable_schema()->add_column_info(); |
395 | 854 | SetColumnInfo(column, column_info); |
396 | 854 | } |
397 | | |
398 | 306 | CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb; |
399 | 306 | const TablePropertiesPB* table_properties = |
400 | 306 | &(msg->change_metadata_request().schema().table_properties()); |
401 | | |
402 | 306 | cdc_sdk_table_properties_pb = row_message->mutable_schema()->mutable_tab_info(); |
403 | 306 | row_message->set_schema_version(msg->change_metadata_request().schema_version()); |
404 | 306 | row_message->set_new_table_name(msg->change_metadata_request().new_table_name()); |
405 | 306 | row_message->set_pgschema_name(schema.SchemaName()); |
406 | 306 | SetTableProperties(table_properties, cdc_sdk_table_properties_pb); |
407 | | |
408 | 306 | return Status::OK(); |
409 | 306 | } |
410 | | |
411 | | CHECKED_STATUS PopulateCDCSDKTruncateRecord( |
412 | 0 | const ReplicateMsgPtr& msg, CDCSDKProtoRecordPB* proto_record, const Schema& schema) { |
413 | 0 | SCHECK( |
414 | 0 | msg->has_truncate(), InvalidArgument, |
415 | 0 | Format( |
416 | 0 | "Truncate message requires truncate request information: $0", msg->ShortDebugString())); |
417 | | |
418 | 0 | RowMessage* row_message = nullptr; |
419 | |
|
420 | 0 | row_message = proto_record->mutable_row_message(); |
421 | 0 | row_message->set_op(RowMessage_Op_TRUNCATE); |
422 | 0 | row_message->set_pgschema_name(schema.SchemaName()); |
423 | 0 | row_message->mutable_truncate_request_info()->CopyFrom(msg->truncate()); |
424 | |
|
425 | 0 | CDCSDKOpIdPB* cdc_sdk_op_id_pb; |
426 | |
|
427 | 0 | cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
428 | 0 | SetCDCSDKOpId(msg->id().term(), msg->id().index(), 0, "", cdc_sdk_op_id_pb); |
429 | |
|
430 | 0 | return Status::OK(); |
431 | 0 | } |
432 | | |
433 | 147 | void SetTermIndex(int64_t term, int64_t index, CDCSDKCheckpointPB* checkpoint) { |
434 | 147 | checkpoint->set_term(term); |
435 | 147 | checkpoint->set_index(index); |
436 | 147 | } |
437 | | |
438 | 147 | void SetKeyWriteId(string key, int32_t write_id, CDCSDKCheckpointPB* checkpoint) { |
439 | 147 | checkpoint->set_key(key); |
440 | 147 | checkpoint->set_write_id(write_id); |
441 | 147 | } |
442 | | |
443 | | CHECKED_STATUS ProcessIntents( |
444 | | const OpId& op_id, |
445 | | const TransactionId& transaction_id, |
446 | | const StreamMetadata& metadata, |
447 | | GetChangesResponsePB* resp, |
448 | | ScopedTrackedConsumption* consumption, |
449 | | CDCSDKCheckpointPB* checkpoint, |
450 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
451 | | std::vector<docdb::IntentKeyValueForCDC>* keyValueIntents, |
452 | | docdb::ApplyTransactionState* stream_state, |
453 | 147 | Schema* schema) { |
454 | 147 | if (stream_state->key.empty() && stream_state->write_id == 0) { |
455 | 147 | CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records(); |
456 | 147 | RowMessage* row_message = proto_record->mutable_row_message(); |
457 | 147 | row_message->set_op(RowMessage_Op_BEGIN); |
458 | 147 | row_message->set_transaction_id(transaction_id.ToString()); |
459 | 147 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
460 | 147 | } |
461 | | |
462 | 147 | auto tablet = tablet_peer->shared_tablet(); |
463 | 147 | RETURN_NOT_OK(tablet->GetIntents(transaction_id, keyValueIntents, stream_state)); |
464 | | |
465 | 617 | for (auto& keyValue : *keyValueIntents)147 { |
466 | 617 | docdb::SubDocKey sub_doc_key; |
467 | 617 | CHECK_OK( |
468 | 617 | sub_doc_key.FullyDecodeFrom(Slice(keyValue.key_buf), docdb::HybridTimeRequired::kFalse)); |
469 | 617 | docdb::Value decoded_value; |
470 | 617 | RETURN_NOT_OK(decoded_value.Decode(Slice(keyValue.value_buf))); |
471 | 617 | } |
472 | | |
473 | 147 | std::string reverse_index_key; |
474 | 147 | IntraTxnWriteId write_id = 0; |
475 | | |
476 | | // Need to populate the CDCSDKRecords |
477 | 147 | RETURN_NOT_OK(PopulateCDCSDKIntentRecord( |
478 | 147 | op_id, transaction_id, *keyValueIntents, metadata, tablet_peer, resp, consumption, &write_id, |
479 | 147 | &reverse_index_key, schema)); |
480 | | |
481 | 147 | SetTermIndex(op_id.term, op_id.index, checkpoint); |
482 | | |
483 | 147 | if (stream_state->key.empty() && stream_state->write_id == 0) { |
484 | 147 | CDCSDKProtoRecordPB* proto_record = resp->add_cdc_sdk_proto_records(); |
485 | 147 | RowMessage* row_message = proto_record->mutable_row_message(); |
486 | | |
487 | 147 | row_message->set_op(RowMessage_Op_COMMIT); |
488 | 147 | row_message->set_transaction_id(transaction_id.ToString()); |
489 | 147 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
490 | | |
491 | 147 | CDCSDKOpIdPB* cdc_sdk_op_id_pb = proto_record->mutable_cdc_sdk_op_id(); |
492 | 147 | SetCDCSDKOpId(op_id.term, op_id.index, 0, "", cdc_sdk_op_id_pb); |
493 | 147 | SetKeyWriteId("", 0, checkpoint); |
494 | 147 | } else { |
495 | 0 | SetKeyWriteId(reverse_index_key, write_id, checkpoint); |
496 | 0 | } |
497 | | |
498 | 147 | return Status::OK(); |
499 | 147 | } |
500 | | |
501 | | CHECKED_STATUS PopulateCDCSDKSnapshotRecord( |
502 | | GetChangesResponsePB* resp, |
503 | | const QLTableRow* row, |
504 | | const Schema& schema, |
505 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
506 | 25.5k | ReadHybridTime time) { |
507 | 25.5k | CDCSDKProtoRecordPB* proto_record = nullptr; |
508 | 25.5k | RowMessage* row_message = nullptr; |
509 | 25.5k | string table_name = tablet_peer->tablet()->metadata()->table_name(); |
510 | | |
511 | 25.5k | proto_record = resp->add_cdc_sdk_proto_records(); |
512 | 25.5k | row_message = proto_record->mutable_row_message(); |
513 | 25.5k | row_message->set_table(table_name); |
514 | 25.5k | row_message->set_op(RowMessage_Op_READ); |
515 | 25.5k | row_message->set_pgschema_name(schema.SchemaName()); |
516 | 25.5k | row_message->set_commit_time(time.read.ToUint64()); |
517 | | |
518 | 25.5k | DatumMessagePB* cdc_datum_message = nullptr; |
519 | | |
520 | 102k | for (size_t col_idx = 0; col_idx < schema.num_columns(); col_idx++76.5k ) { |
521 | 76.5k | ColumnId col_id = schema.column_id(col_idx); |
522 | 76.5k | const auto* value = row->GetColumn(col_id); |
523 | 76.5k | const ColumnSchema& col_schema = VERIFY_RESULT(schema.column_by_id(col_id)); |
524 | | |
525 | 0 | cdc_datum_message = row_message->add_new_tuple(); |
526 | 76.5k | cdc_datum_message->set_column_name(col_schema.name()); |
527 | | |
528 | 76.5k | if (value && value->value_case() != QLValuePB::VALUE_NOT_SET |
529 | 76.5k | && col_schema.pg_type_oid() != 0 /*kInvalidOid*/) { |
530 | 76.5k | docdb::SetValueFromQLBinaryWrapper(*value, col_schema.pg_type_oid(), cdc_datum_message); |
531 | 76.5k | } else { |
532 | 0 | cdc_datum_message->set_column_type(col_schema.pg_type_oid()); |
533 | 0 | } |
534 | | |
535 | 76.5k | row_message->add_old_tuple(); |
536 | 76.5k | } |
537 | | |
538 | 25.5k | return Status::OK(); |
539 | 25.5k | } |
540 | | |
541 | 325 | void FillDDLInfo(RowMessage* row_message, const SchemaPB& schema, const uint32_t schema_version) { |
542 | 918 | for (const auto& column : schema.columns()) { |
543 | 918 | CDCSDKColumnInfoPB* column_info; |
544 | 918 | column_info = row_message->mutable_schema()->add_column_info(); |
545 | 918 | SetColumnInfo(column, column_info); |
546 | 918 | } |
547 | | |
548 | 325 | row_message->set_schema_version(schema_version); |
549 | 325 | row_message->set_pgschema_name(schema.pgschema_name()); |
550 | 325 | CDCSDKTablePropertiesPB* cdc_sdk_table_properties_pb = |
551 | 325 | row_message->mutable_schema()->mutable_tab_info(); |
552 | | |
553 | 325 | const TablePropertiesPB* table_properties = &(schema.table_properties()); |
554 | 325 | SetTableProperties(table_properties, cdc_sdk_table_properties_pb); |
555 | 325 | } |
556 | | |
557 | | // CDC get changes is different from 2DC as it doesn't need |
558 | | // to read intents from WAL. |
559 | | |
560 | | Status GetChangesForCDCSDK( |
561 | | const std::string& stream_id, |
562 | | const std::string& tablet_id, |
563 | | const CDCSDKCheckpointPB& from_op_id, |
564 | | const StreamMetadata& stream_metadata, |
565 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
566 | | const MemTrackerPtr& mem_tracker, |
567 | | consensus::ReplicateMsgsHolder* msgs_holder, |
568 | | GetChangesResponsePB* resp, |
569 | | std::string* commit_timestamp, |
570 | | std::shared_ptr<Schema>* cached_schema, |
571 | | OpId* last_streamed_op_id, |
572 | | int64_t* last_readable_opid_index, |
573 | 641 | const CoarseTimePoint deadline) { |
574 | 641 | OpId op_id{from_op_id.term(), from_op_id.index()}; |
575 | 641 | ScopedTrackedConsumption consumption; |
576 | 641 | CDCSDKProtoRecordPB* proto_record = nullptr; |
577 | 641 | RowMessage* row_message = nullptr; |
578 | 641 | CDCSDKCheckpointPB checkpoint; |
579 | 641 | bool checkpoint_updated = false; |
580 | | |
581 | | // It is snapshot call. |
582 | 641 | if (from_op_id.write_id() == -1) { |
583 | 24 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
584 | 24 | ReadHybridTime time; |
585 | 24 | std::string nextKey; |
586 | 24 | SchemaPB schema_pb; |
587 | | // It is first call in snapshot then take snapshot. |
588 | 24 | if ((from_op_id.key().empty()) && (from_op_id.snapshot_time() == 0)) { |
589 | 12 | if (txn_participant == nullptr || txn_participant->context() == nullptr) |
590 | 0 | return STATUS_SUBSTITUTE( |
591 | 12 | Corruption, "Cannot read data as the transaction participant context is null"); |
592 | 12 | tablet::RemoveIntentsData data; |
593 | 12 | RETURN_NOT_OK(txn_participant->context()->GetLastReplicatedData(&data)); |
594 | | // Set the checkpoint and communicate to the follower. |
595 | 12 | VLOG(1) << "The first snapshot term " << data.op_id.term << "index " << data.op_id.index |
596 | 0 | << "time " << data.log_ht.ToUint64(); |
597 | | // Update the CDCConsumerOpId. |
598 | 12 | { |
599 | 12 | std::shared_ptr<consensus::Consensus> shared_consensus = tablet_peer->shared_consensus(); |
600 | 12 | shared_consensus->UpdateCDCConsumerOpId(data.op_id); |
601 | 12 | } |
602 | 12 | if (txn_participant == nullptr || txn_participant->context() == nullptr) { |
603 | 0 | return STATUS_SUBSTITUTE( |
604 | 0 | Corruption, "Cannot read data as the transaction participant context is null"); |
605 | 0 | } |
606 | 12 | RETURN_NOT_OK(txn_participant->context()->GetLastReplicatedData(&data)); |
607 | 12 | time = ReadHybridTime::SingleTime(data.log_ht); |
608 | | |
609 | | // This should go to cdc_state table. |
610 | | // Below condition update the checkpoint in cdc_state table. |
611 | 12 | SetCheckpoint( |
612 | 12 | data.op_id.term, data.op_id.index, -1, "", time.read.ToUint64(), &checkpoint, nullptr); |
613 | 12 | checkpoint_updated = true; |
614 | 12 | } else { |
615 | | // Snapshot is already taken. |
616 | 12 | HybridTime ht; |
617 | 12 | time = ReadHybridTime::FromUint64(from_op_id.snapshot_time()); |
618 | 12 | nextKey = from_op_id.key(); |
619 | 12 | VLOG(1) << "The after snapshot term " << from_op_id.term() << "index " << from_op_id.index() |
620 | 0 | << "key " << from_op_id.key() << "snapshot time " << from_op_id.snapshot_time(); |
621 | | |
622 | 12 | Schema schema = *tablet_peer->tablet()->schema().get(); |
623 | 12 | int limit = FLAGS_cdc_snapshot_batch_size; |
624 | 12 | int fetched = 0; |
625 | 12 | std::vector<QLTableRow> rows; |
626 | 12 | auto iter = VERIFY_RESULT(tablet_peer->tablet()->CreateCDCSnapshotIterator( |
627 | 12 | schema.CopyWithoutColumnIds(), time, nextKey)); |
628 | | |
629 | 0 | QLTableRow row; |
630 | 12 | SchemaToPB(*tablet_peer->tablet()->schema().get(), &schema_pb); |
631 | | |
632 | 12 | proto_record = resp->add_cdc_sdk_proto_records(); |
633 | 12 | row_message = proto_record->mutable_row_message(); |
634 | 12 | row_message->set_op(RowMessage_Op_DDL); |
635 | 12 | row_message->set_table(tablet_peer->tablet()->metadata()->table_name()); |
636 | | |
637 | 12 | FillDDLInfo(row_message, schema_pb, tablet_peer->tablet()->metadata()->schema_version()); |
638 | | |
639 | 25.5k | while (VERIFY_RESULT(iter->HasNext()) && fetched < limit25.5k ) { |
640 | 25.5k | RETURN_NOT_OK(iter->NextRow(&row)); |
641 | 25.5k | RETURN_NOT_OK(PopulateCDCSDKSnapshotRecord(resp, &row, schema, tablet_peer, time)); |
642 | 25.5k | fetched++; |
643 | 25.5k | } |
644 | 12 | docdb::SubDocKey sub_doc_key; |
645 | 12 | RETURN_NOT_OK(iter->GetNextReadSubDocKey(&sub_doc_key)); |
646 | | |
647 | | // Snapshot ends when next key is empty. |
648 | 12 | if (sub_doc_key.doc_key().empty()) { |
649 | 8 | VLOG(1) << "Setting next sub doc key empty "0 ; |
650 | | // Get the checkpoint or read the checkpoint from the table/cache. |
651 | 8 | SetCheckpoint(from_op_id.term(), from_op_id.index(), 0, "", 0, &checkpoint, nullptr); |
652 | 8 | checkpoint_updated = true; |
653 | 8 | } else { |
654 | 4 | VLOG(1) << "Setting next sub doc key is " << sub_doc_key.Encode().ToStringBuffer()0 ; |
655 | | |
656 | 4 | checkpoint.set_write_id(-1); |
657 | 4 | SetCheckpoint( |
658 | 4 | from_op_id.term(), from_op_id.index(), -1, sub_doc_key.Encode().ToStringBuffer(), |
659 | 4 | time.read.ToUint64(), &checkpoint, nullptr); |
660 | 4 | checkpoint_updated = true; |
661 | 4 | } |
662 | 12 | } |
663 | 617 | } else if (!from_op_id.key().empty() && from_op_id.write_id() != 00 ) { |
664 | 0 | std::string reverse_index_key = from_op_id.key(); |
665 | 0 | Slice reverse_index_key_slice(reverse_index_key); |
666 | 0 | std::vector<docdb::IntentKeyValueForCDC> keyValueIntents; |
667 | 0 | docdb::ApplyTransactionState stream_state; |
668 | 0 | stream_state.key = from_op_id.key(); |
669 | 0 | stream_state.write_id = from_op_id.write_id(); |
670 | |
|
671 | 0 | RETURN_NOT_OK(reverse_index_key_slice.consume_byte(docdb::ValueTypeAsChar::kTransactionId)); |
672 | 0 | auto transaction_id = VERIFY_RESULT(DecodeTransactionId(&reverse_index_key_slice)); |
673 | | |
674 | 0 | RETURN_NOT_OK(ProcessIntents( |
675 | 0 | op_id, transaction_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer, |
676 | 0 | &keyValueIntents, &stream_state, nullptr)); |
677 | | |
678 | 0 | if (checkpoint.write_id() == 0 && checkpoint.key().empty()) { |
679 | 0 | last_streamed_op_id->term = checkpoint.term(); |
680 | 0 | last_streamed_op_id->index = checkpoint.index(); |
681 | 0 | } |
682 | 0 | checkpoint_updated = true; |
683 | 617 | } else { |
684 | 617 | OpId checkpoint_op_id; |
685 | 617 | RequestScope request_scope; |
686 | | |
687 | 617 | auto read_ops = VERIFY_RESULT(tablet_peer->consensus()->ReadReplicatedMessagesForCDC( |
688 | 617 | op_id, last_readable_opid_index, deadline)); |
689 | | |
690 | 617 | if (read_ops.read_from_disk_size && mem_tracker605 ) { |
691 | 605 | consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size); |
692 | 605 | } |
693 | | |
694 | 617 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
695 | 617 | if (txn_participant) { |
696 | 617 | request_scope = RequestScope(txn_participant); |
697 | 617 | } |
698 | | |
699 | 617 | Schema current_schema; |
700 | 617 | bool pending_intents = false; |
701 | 617 | bool schema_streamed = false; |
702 | | |
703 | 8.61k | for (const auto& msg : read_ops.messages) { |
704 | 8.61k | if (!schema_streamed && !(**cached_schema).initialized()6.21k ) { |
705 | 313 | current_schema.CopyFrom(*tablet_peer->tablet()->schema().get()); |
706 | 313 | string table_name = tablet_peer->tablet()->metadata()->table_name(); |
707 | 313 | schema_streamed = true; |
708 | | |
709 | 313 | proto_record = resp->add_cdc_sdk_proto_records(); |
710 | 313 | row_message = proto_record->mutable_row_message(); |
711 | 313 | row_message->set_op(RowMessage_Op_DDL); |
712 | 313 | row_message->set_table(table_name); |
713 | | |
714 | 313 | *cached_schema = std::make_shared<Schema>(std::move(current_schema)); |
715 | 313 | SchemaPB current_schema_pb; |
716 | 313 | SchemaToPB(**cached_schema, ¤t_schema_pb); |
717 | 313 | FillDDLInfo(row_message, |
718 | 313 | current_schema_pb, |
719 | 313 | tablet_peer->tablet()->metadata()->schema_version()); |
720 | 8.30k | } else { |
721 | 8.30k | current_schema = **cached_schema; |
722 | 8.30k | } |
723 | | |
724 | 8.61k | const auto& batch = msg->write().write_batch(); |
725 | | |
726 | 8.61k | switch (msg->op_type()) { |
727 | 147 | case consensus::OperationType::UPDATE_TRANSACTION_OP: |
728 | | // Ignore intents. |
729 | | // Read from IntentDB after they have been applied. |
730 | 147 | if (msg->transaction_state().status() == TransactionStatus::APPLYING) { |
731 | 147 | auto txn_id = |
732 | 147 | VERIFY_RESULT(FullyDecodeTransactionId(msg->transaction_state().transaction_id())); |
733 | 0 | auto result = GetTransactionStatus(txn_id, tablet_peer->Now(), txn_participant); |
734 | 147 | std::vector<docdb::IntentKeyValueForCDC> intents; |
735 | 147 | docdb::ApplyTransactionState new_stream_state; |
736 | | |
737 | 147 | *commit_timestamp = msg->transaction_state().commit_hybrid_time(); |
738 | 147 | op_id.term = msg->id().term(); |
739 | 147 | op_id.index = msg->id().index(); |
740 | 147 | RETURN_NOT_OK(ProcessIntents( |
741 | 147 | op_id, txn_id, stream_metadata, resp, &consumption, &checkpoint, tablet_peer, |
742 | 147 | &intents, &new_stream_state, ¤t_schema)); |
743 | | |
744 | 147 | if (new_stream_state.write_id != 0 && !new_stream_state.key.empty()0 ) { |
745 | 0 | pending_intents = true; |
746 | 147 | } else { |
747 | 147 | last_streamed_op_id->term = msg->id().term(); |
748 | 147 | last_streamed_op_id->index = msg->id().index(); |
749 | 147 | } |
750 | 147 | } |
751 | 147 | checkpoint_updated = true; |
752 | 147 | break; |
753 | | |
754 | 690 | case consensus::OperationType::WRITE_OP: |
755 | 690 | if (!batch.has_transaction()) { |
756 | 412 | RETURN_NOT_OK( |
757 | 412 | PopulateCDCSDKWriteRecord(msg, stream_metadata, tablet_peer, resp, current_schema)); |
758 | | |
759 | 412 | SetCheckpoint( |
760 | 412 | msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id); |
761 | 412 | checkpoint_updated = true; |
762 | 412 | } |
763 | 690 | break; |
764 | | |
765 | 7.78k | case consensus::OperationType::CHANGE_METADATA_OP: { |
766 | 7.78k | RETURN_NOT_OK(SchemaFromPB(msg->change_metadata_request().schema(), ¤t_schema)); |
767 | 7.78k | string table_name = tablet_peer->tablet()->metadata()->table_name(); |
768 | 7.78k | *cached_schema = std::make_shared<Schema>(std::move(current_schema)); |
769 | 7.78k | if ((resp->cdc_sdk_proto_records_size() > 0 && |
770 | 7.78k | resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1) |
771 | 7.47k | .row_message() |
772 | 7.47k | .op() == RowMessage_Op_DDL)) { |
773 | 7.47k | if ((resp->cdc_sdk_proto_records(resp->cdc_sdk_proto_records_size() - 1) |
774 | 7.47k | .row_message() |
775 | 7.47k | .schema_version() != msg->change_metadata_request().schema_version())) { |
776 | 0 | RETURN_NOT_OK(PopulateCDCSDKDDLRecord( |
777 | 0 | msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema)); |
778 | 0 | } |
779 | 7.47k | } else { |
780 | 306 | RETURN_NOT_OK(PopulateCDCSDKDDLRecord( |
781 | 306 | msg, resp->add_cdc_sdk_proto_records(), table_name, current_schema)); |
782 | 306 | } |
783 | 7.78k | SetCheckpoint( |
784 | 7.78k | msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id); |
785 | 7.78k | checkpoint_updated = true; |
786 | 7.78k | } |
787 | 0 | break; |
788 | | |
789 | 0 | case consensus::OperationType::TRUNCATE_OP: { |
790 | 0 | RETURN_NOT_OK( |
791 | 0 | PopulateCDCSDKTruncateRecord(msg, resp->add_cdc_sdk_proto_records(), current_schema)); |
792 | | |
793 | 0 | SetCheckpoint( |
794 | 0 | msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id); |
795 | 0 | checkpoint_updated = true; |
796 | 0 | } break; |
797 | | |
798 | 0 | default: |
799 | | // Nothing to do for other operation types. |
800 | 0 | break; |
801 | 8.61k | } |
802 | | |
803 | 8.61k | if (pending_intents) break0 ; |
804 | 8.61k | } |
805 | 617 | if (read_ops.messages.size() > 0) |
806 | 617 | *msgs_holder = consensus::ReplicateMsgsHolder( |
807 | 617 | nullptr, std::move(read_ops.messages), std::move(consumption)); |
808 | 617 | } |
809 | | |
810 | 641 | if (consumption) { |
811 | 0 | consumption.Add(resp->SpaceUsedLong()); |
812 | 0 | } |
813 | | |
814 | 641 | checkpoint_updated ? resp->mutable_cdc_sdk_checkpoint()->CopyFrom(checkpoint) |
815 | 641 | : resp->mutable_cdc_sdk_checkpoint()->CopyFrom(from_op_id)0 ; |
816 | | |
817 | 641 | if (checkpoint_updated) { |
818 | 641 | VLOG(1) << "The checkpoint is updated " << resp->checkpoint().DebugString()0 ; |
819 | 641 | } else { |
820 | 0 | VLOG(1) << "The checkpoint is not updated " << resp->checkpoint().DebugString(); |
821 | 0 | } |
822 | | |
823 | 641 | if (last_streamed_op_id->index > 0) { |
824 | 617 | resp->mutable_checkpoint()->mutable_op_id()->set_term(last_streamed_op_id->term); |
825 | 617 | resp->mutable_checkpoint()->mutable_op_id()->set_index(last_streamed_op_id->index); |
826 | 617 | } |
827 | | |
828 | 641 | return Status::OK(); |
829 | 641 | } |
830 | | |
831 | | } // namespace cdc |
832 | | } // namespace yb |