/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_producer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/cdc/cdc_producer.h" |
14 | | #include "yb/cdc/cdc_common_util.h" |
15 | | |
16 | | #include "yb/cdc/cdc_service.pb.h" |
17 | | #include "yb/common/schema.h" |
18 | | #include "yb/common/transaction.h" |
19 | | #include "yb/common/wire_protocol.h" |
20 | | |
21 | | #include "yb/consensus/raft_consensus.h" |
22 | | #include "yb/consensus/replicate_msgs_holder.h" |
23 | | |
24 | | #include "yb/docdb/doc_key.h" |
25 | | #include "yb/docdb/docdb.pb.h" |
26 | | #include "yb/docdb/primitive_value.h" |
27 | | #include "yb/docdb/value.h" |
28 | | #include "yb/docdb/value_type.h" |
29 | | |
30 | | #include "yb/tablet/tablet.h" |
31 | | #include "yb/tablet/tablet_metadata.h" |
32 | | #include "yb/tablet/tablet_peer.h" |
33 | | #include "yb/tablet/transaction_participant.h" |
34 | | |
35 | | #include "yb/tserver/tablet_server.h" |
36 | | #include "yb/tserver/ts_tablet_manager.h" |
37 | | |
38 | | #include "yb/util/flag_tags.h" |
39 | | #include "yb/util/logging.h" |
40 | | |
41 | | DEFINE_int32(cdc_transaction_timeout_ms, 0, |
42 | | "Don't check for an aborted transaction unless its original write is lagging by this duration."); |
43 | | |
44 | | DEFINE_bool(cdc_enable_replicate_intents, true, |
45 | | "Enable replication of intents before they've been committed."); |
46 | | |
47 | | DEFINE_test_flag(bool, xcluster_simulate_have_more_records, false, |
48 | | "Whether GetChanges should indicate that it has more records for safe time " |
49 | | "calculation."); |
50 | | |
51 | | namespace yb { |
52 | | namespace cdc { |
53 | | |
54 | | using consensus::ReplicateMsgPtr; |
55 | | using consensus::ReplicateMsgs; |
56 | | using docdb::PrimitiveValue; |
57 | | using tablet::TransactionParticipant; |
58 | | |
59 | | void AddColumnToMap(const ColumnSchema& col_schema, |
60 | | const docdb::PrimitiveValue& col, |
61 | 0 | cdc::KeyValuePairPB* kv_pair) { |
62 | 0 | kv_pair->set_key(col_schema.name()); |
63 | 0 | PrimitiveValue::ToQLValuePB(col, col_schema.type(), kv_pair->mutable_value()); |
64 | 0 | } |
65 | | |
66 | | void AddPrimaryKey(const docdb::SubDocKey& decoded_key, |
67 | | const Schema& tablet_schema, |
68 | 0 | CDCRecordPB* record) { |
69 | 0 | size_t i = 0; |
70 | 0 | for (const auto& col : decoded_key.doc_key().hashed_group()) { |
71 | 0 | AddColumnToMap(tablet_schema.column(i), col, record->add_key()); |
72 | 0 | i++; |
73 | 0 | } |
74 | 0 | for (const auto& col : decoded_key.doc_key().range_group()) { |
75 | 0 | AddColumnToMap(tablet_schema.column(i), col, record->add_key()); |
76 | 0 | i++; |
77 | 0 | } |
78 | 0 | } |
79 | | |
80 | | // Set committed record information including commit time for record. |
81 | | // This will look at transaction status to determine commit time to be used for CDC record. |
82 | | // Returns true if we need to stop processing WAL records beyond this, false otherwise. |
83 | | Result<bool> SetCommittedRecordIndexForReplicateMsg( |
84 | | const ReplicateMsgPtr& msg, size_t index, const TxnStatusMap& txn_map, |
85 | 0 | ReplicateIntents replicate_intents, std::vector<RecordTimeIndex>* records) { |
86 | 0 | if (replicate_intents) { |
87 | | // If we're replicating intents, we have no stop condition, so add the record and continue. |
88 | 0 | records->emplace_back(msg->hybrid_time(), index); |
89 | 0 | return false; |
90 | 0 | } |
91 | 0 | switch (msg->op_type()) { |
92 | 0 | case consensus::OperationType::UPDATE_TRANSACTION_OP: { |
93 | 0 | if (msg->transaction_state().status() == TransactionStatus::APPLYING) { |
94 | 0 | records->emplace_back(msg->transaction_state().commit_hybrid_time(), index); |
95 | 0 | } |
96 | | // Ignore other transaction statuses since we only care about APPLYING |
97 | | // while sending CDC records. |
98 | 0 | return false; |
99 | 0 | } |
100 | | |
101 | 0 | case consensus::OperationType::WRITE_OP: { |
102 | 0 | if (msg->write().write_batch().has_transaction()) { |
103 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
104 | 0 | msg->write().write_batch().transaction().transaction_id())); |
105 | 0 | const auto txn_status = txn_map.find(txn_id); |
106 | 0 | if (txn_status == txn_map.end()) { |
107 | 0 | return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString()); |
108 | 0 | } |
109 | | |
110 | 0 | if (txn_status->second.status == PENDING || txn_status->second.status == CREATED) { |
111 | | // Ignore all records beyond this because we don't know whether those records |
112 | | // were committed before or after this record without the transaction commit time. |
113 | 0 | return true; |
114 | 0 | } else if (txn_status->second.status == COMMITTED) { |
115 | | // Add record to txn_msgs because there may be records appearing after this in WAL |
116 | | // but committed before this one. Example: |
117 | | // T0: WRITE K1 [TXN1] |
118 | | // T1: WRITE K2 |
119 | | // T2: APPLYING TXN1 |
120 | | // Here, WRITE K2 appears after WRITE K1 but is committed before K1. |
121 | 0 | records->emplace_back(txn_status->second.status_time.ToUint64(), index); |
122 | 0 | } |
123 | 0 | } else { |
124 | | // Msg is not part of transaction. Use write hybrid time from msg itself. |
125 | 0 | records->emplace_back(msg->hybrid_time(), index); |
126 | 0 | } |
127 | 0 | return false; |
128 | 0 | } |
129 | 0 | case consensus::OperationType::SPLIT_OP: { |
130 | 0 | records->emplace_back(msg->hybrid_time(), index); |
131 | 0 | return true; // Don't need to process any records after a SPLIT_OP. |
132 | 0 | } |
133 | | |
134 | 0 | case consensus::OperationType::CHANGE_CONFIG_OP: |
135 | 0 | FALLTHROUGH_INTENDED; |
136 | 0 | case consensus::OperationType::CHANGE_METADATA_OP: |
137 | 0 | FALLTHROUGH_INTENDED; |
138 | 0 | case consensus::OperationType::HISTORY_CUTOFF_OP: |
139 | 0 | FALLTHROUGH_INTENDED; |
140 | 0 | case consensus::OperationType::NO_OP: |
141 | 0 | FALLTHROUGH_INTENDED; |
142 | 0 | case consensus::OperationType::SNAPSHOT_OP: |
143 | 0 | FALLTHROUGH_INTENDED; |
144 | 0 | case consensus::OperationType::TRUNCATE_OP: |
145 | 0 | FALLTHROUGH_INTENDED; |
146 | 0 | case consensus::OperationType::UNKNOWN_OP: |
147 | 0 | return false; |
148 | 0 | } |
149 | 0 | FATAL_INVALID_ENUM_VALUE(consensus::OperationType, msg->op_type()); |
150 | 0 | } |
151 | | |
152 | | Result<std::vector<RecordTimeIndex>> GetCommittedRecordIndexes( |
153 | | const ReplicateMsgs& msgs, const TxnStatusMap& txn_map, ReplicateIntents replicate_intents, |
154 | 0 | OpId* checkpoint) { |
155 | 0 | size_t index = 0; |
156 | 0 | std::vector<RecordTimeIndex> records; |
157 | | |
158 | | // Order ReplicateMsgs based on commit time. |
159 | 0 | for (const auto &msg : msgs) { |
160 | 0 | if (!msg->write().has_external_hybrid_time()) { |
161 | | // If the message came from an external source, ignore it when producing change list. |
162 | | // Note that checkpoint, however, will be updated and will account for external message too. |
163 | 0 | bool stop = VERIFY_RESULT(SetCommittedRecordIndexForReplicateMsg( |
164 | 0 | msg, index, txn_map, replicate_intents, &records)); |
165 | 0 | if (stop) { |
166 | 0 | return records; |
167 | 0 | } |
168 | 0 | } |
169 | 0 | *checkpoint = OpId::FromPB(msg->id()); |
170 | 0 | index++; |
171 | 0 | } |
172 | 0 | return records; |
173 | 0 | } |
174 | | |
175 | | // Filter out WAL records that are external and order records based on transaction commit time. |
176 | | // Records in WAL don't represent the exact order in which records are written in DB due to delay |
177 | | // in writing txn APPLYING record. |
178 | | // Consider the following WAL entries: |
179 | | // TO: WRITE K0 |
180 | | // T1: WRITE K1 (TXN1) |
181 | | // T2: WRITE K2 (TXN2) |
182 | | // T3: WRITE K3 |
183 | | // T4: APPLYING TXN2 |
184 | | // T5: APPLYING TXN1 |
185 | | // T6: WRITE K4 |
186 | | // The order in which keys are written to DB in this example is K0, K3, K2, K1, K4. |
187 | | // This method will also set checkpoint to the op id of last processed record. |
188 | | Result<ReplicateMsgs> FilterAndSortWrites(const ReplicateMsgs& msgs, |
189 | | const TxnStatusMap& txn_map, |
190 | | ReplicateIntents replicate_intents, |
191 | 0 | OpId* checkpoint) { |
192 | 0 | std::vector<RecordTimeIndex> records = VERIFY_RESULT(GetCommittedRecordIndexes( |
193 | 0 | msgs, txn_map, replicate_intents, checkpoint)); |
194 | | |
195 | 0 | if (!replicate_intents) { |
196 | 0 | std::sort(records.begin(), records.end()); |
197 | 0 | } |
198 | |
|
199 | 0 | ReplicateMsgs ordered_msgs; |
200 | 0 | ordered_msgs.reserve(records.size()); |
201 | 0 | for (const auto& record : records) { |
202 | 0 | ordered_msgs.emplace_back(msgs[record.second]); |
203 | 0 | } |
204 | 0 | return ordered_msgs; |
205 | 0 | } |
206 | | |
207 | | Result<TransactionStatusResult> GetTransactionStatus( |
208 | | const TransactionId& txn_id, |
209 | | const HybridTime& hybrid_time, |
210 | 147 | TransactionParticipant* txn_participant) { |
211 | 147 | static const std::string reason = "cdc"; |
212 | | |
213 | 147 | std::promise<Result<TransactionStatusResult>> txn_status_promise; |
214 | 147 | auto future = txn_status_promise.get_future(); |
215 | 147 | auto callback = [&txn_status_promise](Result<TransactionStatusResult> result) { |
216 | 147 | txn_status_promise.set_value(std::move(result)); |
217 | 147 | }; |
218 | | |
219 | 147 | txn_participant->RequestStatusAt( |
220 | 147 | {&txn_id, hybrid_time, hybrid_time, 0, &reason, TransactionLoadFlags{}, callback}); |
221 | 147 | future.wait(); |
222 | 147 | return future.get(); |
223 | 147 | } |
224 | | |
225 | | // Build transaction status as of hybrid_time. |
226 | | Result<TxnStatusMap> BuildTxnStatusMap(const ReplicateMsgs& messages, |
227 | | bool more_replicate_msgs, |
228 | | const HybridTime& cdc_read_hybrid_time, |
229 | 0 | TransactionParticipant* txn_participant) { |
230 | 0 | TxnStatusMap txn_map; |
231 | | // First go through all APPLYING records and mark transaction as committed. |
232 | 0 | for (const auto& msg : messages) { |
233 | 0 | if (msg->op_type() == consensus::OperationType::UPDATE_TRANSACTION_OP |
234 | 0 | && msg->transaction_state().status() == TransactionStatus::APPLYING) { |
235 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
236 | 0 | msg->transaction_state().transaction_id())); |
237 | 0 | txn_map.emplace(txn_id, |
238 | 0 | TransactionStatusResult( |
239 | 0 | TransactionStatus::COMMITTED, |
240 | 0 | HybridTime(msg->transaction_state().commit_hybrid_time()))); |
241 | 0 | } |
242 | 0 | } |
243 | | |
244 | | // Now go through all WRITE_OP records and get transaction status of records for which |
245 | | // corresponding APPLYING record does not exist in WAL as yet. |
246 | 0 | for (const auto& msg : messages) { |
247 | 0 | if (msg->op_type() == consensus::OperationType::WRITE_OP |
248 | 0 | && msg->write().write_batch().has_transaction()) { |
249 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
250 | 0 | msg->write().write_batch().transaction().transaction_id())); |
251 | | |
252 | 0 | if (!txn_map.count(txn_id)) { |
253 | 0 | TransactionStatusResult txn_status(TransactionStatus::PENDING, HybridTime::kMin); |
254 | |
|
255 | 0 | auto result = GetTransactionStatus(txn_id, cdc_read_hybrid_time, txn_participant); |
256 | 0 | if (!result.ok()) { |
257 | 0 | if (result.status().IsNotFound()) { |
258 | | // Naive heuristic for handling whether a transaction is aborted or still pending: |
259 | | // 1. If the normal transaction timeout is not reached, assume good operation. |
260 | | // 2. If more_replicate_messages, assume a race between reading |
261 | | // TransactionParticipant & LogCache. |
262 | | // TODO (#2405) : Handle long running or very large transactions correctly. |
263 | 0 | if (!more_replicate_msgs) { |
264 | 0 | auto timeout = HybridTime::FromPB(msg->hybrid_time()) |
265 | 0 | .AddMilliseconds(FLAGS_cdc_transaction_timeout_ms); |
266 | 0 | if (timeout < cdc_read_hybrid_time) { |
267 | 0 | LOG(INFO) << "Transaction not found, considering it aborted: " << txn_id; |
268 | 0 | txn_status = TransactionStatusResult::Aborted(); |
269 | 0 | } |
270 | 0 | } |
271 | 0 | } else { |
272 | 0 | return result.status(); |
273 | 0 | } |
274 | 0 | } else { |
275 | 0 | txn_status = *result; |
276 | 0 | } |
277 | 0 | txn_map.emplace(txn_id, txn_status); |
278 | 0 | } |
279 | 0 | } |
280 | 0 | } |
281 | 0 | return txn_map; |
282 | 0 | } |
283 | | |
284 | | CHECKED_STATUS SetRecordTime(const TransactionId& txn_id, |
285 | | const TxnStatusMap& txn_map, |
286 | 0 | CDCRecordPB* record) { |
287 | 0 | auto txn_status = txn_map.find(txn_id); |
288 | 0 | if (txn_status == txn_map.end()) { |
289 | 0 | return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString()); |
290 | 0 | } |
291 | 0 | record->set_time(txn_status->second.status_time.ToUint64()); |
292 | 0 | return Status::OK(); |
293 | 0 | } |
294 | | |
295 | | // Populate CDC record corresponding to WAL batch in ReplicateMsg. |
296 | | CHECKED_STATUS PopulateWriteRecord(const ReplicateMsgPtr& msg, |
297 | | const TxnStatusMap& txn_map, |
298 | | const StreamMetadata& metadata, |
299 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
300 | | ReplicateIntents replicate_intents, |
301 | 0 | GetChangesResponsePB* resp) { |
302 | 0 | const auto& batch = msg->write().write_batch(); |
303 | 0 | const auto& schema = *tablet_peer->tablet()->schema(); |
304 | | // Write batch may contain records from different rows. |
305 | | // For CDC, we need to split the batch into 1 CDC record per row of the table. |
306 | | // We'll use DocDB key hash to identify the records that belong to the same row. |
307 | 0 | Slice prev_key; |
308 | 0 | CDCRecordPB* record = nullptr; |
309 | 0 | for (const auto& write_pair : batch.write_pairs()) { |
310 | 0 | Slice key = write_pair.key(); |
311 | 0 | const auto key_size = VERIFY_RESULT( |
312 | 0 | docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey)); |
313 | | |
314 | 0 | Slice value = write_pair.value(); |
315 | 0 | docdb::Value decoded_value; |
316 | 0 | RETURN_NOT_OK(decoded_value.Decode(value)); |
317 | | |
318 | | // Compare key hash with previously seen key hash to determine whether the write pair |
319 | | // is part of the same row or not. |
320 | 0 | Slice primary_key(key.data(), key_size); |
321 | 0 | if (prev_key != primary_key) { |
322 | | // Write pair contains record for different row. Create a new CDCRecord in this case. |
323 | 0 | record = resp->add_records(); |
324 | 0 | Slice sub_doc_key = key; |
325 | 0 | docdb::SubDocKey decoded_key; |
326 | 0 | RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); |
327 | | |
328 | 0 | if (metadata.record_format == CDCRecordFormat::WAL) { |
329 | | // For 2DC, populate serialized data from WAL, to avoid unnecessary deserializing on |
330 | | // producer and re-serializing on consumer. |
331 | 0 | auto kv_pair = record->add_key(); |
332 | 0 | kv_pair->set_key(std::to_string(decoded_key.doc_key().hash())); |
333 | 0 | kv_pair->mutable_value()->set_binary_value(write_pair.key()); |
334 | 0 | } else { |
335 | 0 | AddPrimaryKey(decoded_key, schema, record); |
336 | 0 | } |
337 | | |
338 | | // Check whether operation is WRITE or DELETE. |
339 | 0 | if (decoded_value.value_type() == docdb::ValueType::kTombstone && |
340 | 0 | decoded_key.num_subkeys() == 0) { |
341 | 0 | record->set_operation(CDCRecordPB::DELETE); |
342 | 0 | } else { |
343 | 0 | record->set_operation(CDCRecordPB::WRITE); |
344 | 0 | } |
345 | | |
346 | | // Process intent records. |
347 | 0 | record->set_time(msg->hybrid_time()); |
348 | 0 | if (batch.has_transaction()) { |
349 | 0 | if (!replicate_intents) { |
350 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
351 | 0 | batch.transaction().transaction_id())); |
352 | | // If we're not replicating intents, set record time using the transaction map. |
353 | 0 | RETURN_NOT_OK(SetRecordTime(txn_id, txn_map, record)); |
354 | 0 | } else { |
355 | 0 | record->mutable_transaction_state()->set_transaction_id( |
356 | 0 | batch.transaction().transaction_id()); |
357 | 0 | record->mutable_transaction_state()->add_tablets(tablet_peer->tablet_id()); |
358 | 0 | } |
359 | 0 | } |
360 | 0 | } |
361 | 0 | prev_key = primary_key; |
362 | 0 | DCHECK(record); |
363 | |
|
364 | 0 | if (metadata.record_format == CDCRecordFormat::WAL) { |
365 | 0 | auto kv_pair = record->add_changes(); |
366 | 0 | kv_pair->set_key(write_pair.key()); |
367 | 0 | kv_pair->mutable_value()->set_binary_value(write_pair.value()); |
368 | 0 | } else if (record->operation() == CDCRecordPB_OperationType_WRITE) { |
369 | 0 | PrimitiveValue column_id; |
370 | 0 | Slice key_column = write_pair.key().data() + key_size; |
371 | 0 | RETURN_NOT_OK(PrimitiveValue::DecodeKey(&key_column, &column_id)); |
372 | 0 | if (column_id.value_type() == docdb::ValueType::kColumnId) { |
373 | 0 | const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId())); |
374 | 0 | AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes()); |
375 | 0 | } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) { |
376 | 0 | LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type(); |
377 | 0 | } |
378 | 0 | } |
379 | 0 | } |
380 | 0 | return Status::OK(); |
381 | 0 | } |
382 | | |
383 | | // Populate CDC record corresponding to WAL UPDATE_TRANSACTION_OP entry. |
384 | | CHECKED_STATUS PopulateTransactionRecord(const ReplicateMsgPtr& msg, |
385 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
386 | | ReplicateIntents replicate_intents, |
387 | 0 | CDCRecordPB* record) { |
388 | 0 | SCHECK(msg->has_transaction_state(), InvalidArgument, |
389 | 0 | Format("Update transaction message requires transaction_state: $0", |
390 | 0 | msg->ShortDebugString())); |
391 | 0 | record->set_operation(CDCRecordPB_OperationType_WRITE); |
392 | 0 | record->set_time(replicate_intents ? |
393 | 0 | msg->hybrid_time() : msg->transaction_state().commit_hybrid_time()); |
394 | 0 | record->mutable_transaction_state()->CopyFrom(msg->transaction_state()); |
395 | 0 | if (replicate_intents && msg->transaction_state().status() == TransactionStatus::APPLYING) { |
396 | | // Add the partition metadata so the consumer knows which tablets to apply the transaction |
397 | | // to. |
398 | 0 | tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition()); |
399 | 0 | } |
400 | 0 | return Status::OK(); |
401 | 0 | } |
402 | | |
403 | 0 | CHECKED_STATUS PopulateSplitOpRecord(const ReplicateMsgPtr& msg, CDCRecordPB* record) { |
404 | 0 | SCHECK(msg->has_split_request(), InvalidArgument, |
405 | 0 | Format("Split op message requires split_request: $0", msg->ShortDebugString())); |
406 | 0 | record->set_operation(CDCRecordPB::SPLIT_OP); |
407 | 0 | record->set_time(msg->hybrid_time()); |
408 | 0 | record->mutable_split_tablet_request()->CopyFrom(msg->split_request()); |
409 | 0 | return Status::OK(); |
410 | 0 | } |
411 | | |
412 | | Result<HybridTime> GetSafeTimeForTarget(const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
413 | | HybridTime ht_of_last_returned_message, |
414 | 0 | HaveMoreMessages have_more_messages) { |
415 | 0 | if (ht_of_last_returned_message != HybridTime::kInvalid && have_more_messages) { |
416 | 0 | return ht_of_last_returned_message; |
417 | 0 | } |
418 | 0 | return tablet_peer->LeaderSafeTime(); |
419 | 0 | } |
420 | | |
421 | | Status GetChangesForXCluster(const std::string& stream_id, |
422 | | const std::string& tablet_id, |
423 | | const OpId& from_op_id, |
424 | | const StreamMetadata& stream_metadata, |
425 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
426 | | const MemTrackerPtr& mem_tracker, |
427 | | consensus::ReplicateMsgsHolder* msgs_holder, |
428 | | GetChangesResponsePB* resp, |
429 | | int64_t* last_readable_opid_index, |
430 | 0 | const CoarseTimePoint deadline) { |
431 | 0 | auto replicate_intents = ReplicateIntents(GetAtomicFlag(&FLAGS_cdc_enable_replicate_intents)); |
432 | | // Request scope on transaction participant so that transactions are not removed from participant |
433 | | // while RequestScope is active. |
434 | 0 | RequestScope request_scope; |
435 | |
|
436 | 0 | auto read_ops = VERIFY_RESULT(tablet_peer->consensus()-> |
437 | 0 | ReadReplicatedMessagesForCDC(from_op_id, last_readable_opid_index, deadline)); |
438 | 0 | ScopedTrackedConsumption consumption; |
439 | 0 | if (read_ops.read_from_disk_size && mem_tracker) { |
440 | 0 | consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size); |
441 | 0 | } |
442 | |
|
443 | 0 | OpId checkpoint; |
444 | 0 | TxnStatusMap txn_map; |
445 | 0 | if (!replicate_intents) { |
446 | 0 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
447 | 0 | if (txn_participant) { |
448 | 0 | request_scope = RequestScope(txn_participant); |
449 | 0 | } |
450 | 0 | txn_map = TxnStatusMap(VERIFY_RESULT(BuildTxnStatusMap( |
451 | 0 | read_ops.messages, read_ops.have_more_messages, tablet_peer->Now(), txn_participant))); |
452 | 0 | } |
453 | 0 | ReplicateMsgs messages = VERIFY_RESULT(FilterAndSortWrites( |
454 | 0 | read_ops.messages, txn_map, replicate_intents, &checkpoint)); |
455 | | |
456 | 0 | for (const auto& msg : messages) { |
457 | 0 | switch (msg->op_type()) { |
458 | 0 | case consensus::OperationType::UPDATE_TRANSACTION_OP: |
459 | 0 | if (!replicate_intents) { |
460 | 0 | RETURN_NOT_OK(PopulateTransactionRecord( |
461 | 0 | msg, tablet_peer, replicate_intents, resp->add_records())); |
462 | 0 | } else if (msg->transaction_state().status() == TransactionStatus::APPLYING) { |
463 | 0 | auto record = resp->add_records(); |
464 | 0 | record->set_operation(CDCRecordPB::APPLY); |
465 | 0 | record->set_time(msg->hybrid_time()); |
466 | 0 | auto* txn_state = record->mutable_transaction_state(); |
467 | 0 | txn_state->set_transaction_id(msg->transaction_state().transaction_id()); |
468 | 0 | txn_state->set_commit_hybrid_time(msg->transaction_state().commit_hybrid_time()); |
469 | 0 | tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition()); |
470 | 0 | } |
471 | 0 | break; |
472 | 0 | case consensus::OperationType::WRITE_OP: |
473 | 0 | RETURN_NOT_OK(PopulateWriteRecord(msg, txn_map, stream_metadata, tablet_peer, |
474 | 0 | replicate_intents, resp)); |
475 | 0 | break; |
476 | 0 | case consensus::OperationType::SPLIT_OP: |
477 | 0 | RETURN_NOT_OK(PopulateSplitOpRecord(msg, resp->add_records())); |
478 | 0 | break; |
479 | | |
480 | 0 | default: |
481 | | // Nothing to do for other operation types. |
482 | 0 | break; |
483 | 0 | } |
484 | 0 | } |
485 | | |
486 | 0 | if (consumption) { |
487 | 0 | consumption.Add(resp->SpaceUsedLong()); |
488 | 0 | } |
489 | 0 | auto ht_of_last_returned_message = messages.empty() ? |
490 | 0 | HybridTime::kInvalid : HybridTime(messages.back()->hybrid_time()); |
491 | 0 | auto have_more_messages = PREDICT_FALSE(FLAGS_TEST_xcluster_simulate_have_more_records) ? |
492 | 0 | HaveMoreMessages::kTrue : read_ops.have_more_messages; |
493 | 0 | auto safe_time_result = GetSafeTimeForTarget( |
494 | 0 | tablet_peer, ht_of_last_returned_message, have_more_messages); |
495 | 0 | if (safe_time_result.ok()) { |
496 | 0 | resp->set_safe_hybrid_time((*safe_time_result).ToUint64()); |
497 | 0 | } else { |
498 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 10) << |
499 | 0 | "Could not compute safe time: " << safe_time_result.status(); |
500 | 0 | } |
501 | 0 | *msgs_holder = consensus::ReplicateMsgsHolder( |
502 | 0 | nullptr, std::move(messages), std::move(consumption)); |
503 | 0 | (checkpoint.index > 0 ? checkpoint : from_op_id).ToPB( |
504 | 0 | resp->mutable_checkpoint()->mutable_op_id()); |
505 | 0 | return Status::OK(); |
506 | 0 | } |
507 | | |
508 | | } // namespace cdc |
509 | | } // namespace yb |