/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_producer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/cdc/cdc_producer.h" |
14 | | #include "yb/cdc/cdc_common_util.h" |
15 | | |
16 | | #include "yb/cdc/cdc_service.pb.h" |
17 | | #include "yb/common/schema.h" |
18 | | #include "yb/common/transaction.h" |
19 | | #include "yb/common/wire_protocol.h" |
20 | | |
21 | | #include "yb/consensus/raft_consensus.h" |
22 | | #include "yb/consensus/replicate_msgs_holder.h" |
23 | | |
24 | | #include "yb/docdb/doc_key.h" |
25 | | #include "yb/docdb/docdb.pb.h" |
26 | | #include "yb/docdb/primitive_value.h" |
27 | | #include "yb/docdb/value.h" |
28 | | #include "yb/docdb/value_type.h" |
29 | | |
30 | | #include "yb/tablet/tablet.h" |
31 | | #include "yb/tablet/tablet_metadata.h" |
32 | | #include "yb/tablet/tablet_peer.h" |
33 | | #include "yb/tablet/transaction_participant.h" |
34 | | |
35 | | #include "yb/tserver/tablet_server.h" |
36 | | #include "yb/tserver/ts_tablet_manager.h" |
37 | | |
38 | | DEFINE_int32(cdc_transaction_timeout_ms, 0, |
39 | | "Don't check for an aborted transaction unless its original write is lagging by this duration."); |
40 | | |
41 | | // Todo(Rahul): Enable this by default (https://github.com/yugabyte/yugabyte-db/issues/6128) |
42 | | DEFINE_bool(cdc_enable_replicate_intents, true, |
43 | | "Enable replication of intents before they've been committed."); |
44 | | |
45 | | namespace yb { |
46 | | namespace cdc { |
47 | | |
48 | | using consensus::ReplicateMsgPtr; |
49 | | using consensus::ReplicateMsgs; |
50 | | using docdb::PrimitiveValue; |
51 | | using tablet::TransactionParticipant; |
52 | | |
53 | | void AddColumnToMap(const ColumnSchema& col_schema, |
54 | | const docdb::PrimitiveValue& col, |
55 | 0 | cdc::KeyValuePairPB* kv_pair) { |
56 | 0 | kv_pair->set_key(col_schema.name()); |
57 | 0 | PrimitiveValue::ToQLValuePB(col, col_schema.type(), kv_pair->mutable_value()); |
58 | 0 | } |
59 | | |
60 | | void AddPrimaryKey(const docdb::SubDocKey& decoded_key, |
61 | | const Schema& tablet_schema, |
62 | 0 | CDCRecordPB* record) { |
63 | 0 | size_t i = 0; |
64 | 0 | for (const auto& col : decoded_key.doc_key().hashed_group()) { |
65 | 0 | AddColumnToMap(tablet_schema.column(i), col, record->add_key()); |
66 | 0 | i++; |
67 | 0 | } |
68 | 0 | for (const auto& col : decoded_key.doc_key().range_group()) { |
69 | 0 | AddColumnToMap(tablet_schema.column(i), col, record->add_key()); |
70 | 0 | i++; |
71 | 0 | } |
72 | 0 | } |
73 | | |
74 | | // Set committed record information including commit time for record. |
75 | | // This will look at transaction status to determine commit time to be used for CDC record. |
76 | | // Returns true if we need to stop processing WAL records beyond this, false otherwise. |
77 | | Result<bool> SetCommittedRecordIndexForReplicateMsg( |
78 | | const ReplicateMsgPtr& msg, size_t index, const TxnStatusMap& txn_map, |
79 | 0 | ReplicateIntents replicate_intents, std::vector<RecordTimeIndex>* records) { |
80 | 0 | if (replicate_intents) { |
81 | | // If we're replicating intents, we have no stop condition, so add the record and continue. |
82 | 0 | records->emplace_back(msg->hybrid_time(), index); |
83 | 0 | return false; |
84 | 0 | } |
85 | 0 | switch (msg->op_type()) { |
86 | 0 | case consensus::OperationType::UPDATE_TRANSACTION_OP: { |
87 | 0 | if (msg->transaction_state().status() == TransactionStatus::APPLYING) { |
88 | 0 | records->emplace_back(msg->transaction_state().commit_hybrid_time(), index); |
89 | 0 | } |
90 | | // Ignore other transaction statuses since we only care about APPLYING |
91 | | // while sending CDC records. |
92 | 0 | return false; |
93 | 0 | } |
94 | | |
95 | 0 | case consensus::OperationType::WRITE_OP: { |
96 | 0 | if (msg->write().write_batch().has_transaction()) { |
97 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
98 | 0 | msg->write().write_batch().transaction().transaction_id())); |
99 | 0 | const auto txn_status = txn_map.find(txn_id); |
100 | 0 | if (txn_status == txn_map.end()) { |
101 | 0 | return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString()); |
102 | 0 | } |
103 | | |
104 | 0 | if (txn_status->second.status == PENDING || txn_status->second.status == CREATED) { |
105 | | // Ignore all records beyond this because we don't know whether those records |
106 | | // were committed before or after this record without the transaction commit time. |
107 | 0 | return true; |
108 | 0 | } else if (txn_status->second.status == COMMITTED) { |
109 | | // Add record to txn_msgs because there may be records appearing after this in WAL |
110 | | // but committed before this one. Example: |
111 | | // T0: WRITE K1 [TXN1] |
112 | | // T1: WRITE K2 |
113 | | // T2: APPLYING TXN1 |
114 | | // Here, WRITE K2 appears after WRITE K1 but is committed before K1. |
115 | 0 | records->emplace_back(txn_status->second.status_time.ToUint64(), index); |
116 | 0 | } |
117 | 0 | } else { |
118 | | // Msg is not part of transaction. Use write hybrid time from msg itself. |
119 | 0 | records->emplace_back(msg->hybrid_time(), index); |
120 | 0 | } |
121 | 0 | return false; |
122 | 0 | } |
123 | 0 | case consensus::OperationType::SPLIT_OP: { |
124 | 0 | records->emplace_back(msg->hybrid_time(), index); |
125 | 0 | return true; // Don't need to process any records after a SPLIT_OP. |
126 | 0 | } |
127 | |
|
128 | 0 | case consensus::OperationType::CHANGE_CONFIG_OP: |
129 | 0 | FALLTHROUGH_INTENDED; |
130 | 0 | case consensus::OperationType::CHANGE_METADATA_OP: |
131 | 0 | FALLTHROUGH_INTENDED; |
132 | 0 | case consensus::OperationType::HISTORY_CUTOFF_OP: |
133 | 0 | FALLTHROUGH_INTENDED; |
134 | 0 | case consensus::OperationType::NO_OP: |
135 | 0 | FALLTHROUGH_INTENDED; |
136 | 0 | case consensus::OperationType::SNAPSHOT_OP: |
137 | 0 | FALLTHROUGH_INTENDED; |
138 | 0 | case consensus::OperationType::TRUNCATE_OP: |
139 | 0 | FALLTHROUGH_INTENDED; |
140 | 0 | case consensus::OperationType::UNKNOWN_OP: |
141 | 0 | return false; |
142 | 0 | } |
143 | 0 | FATAL_INVALID_ENUM_VALUE(consensus::OperationType, msg->op_type()); |
144 | 0 | } |
145 | | |
146 | | Result<std::vector<RecordTimeIndex>> GetCommittedRecordIndexes( |
147 | | const ReplicateMsgs& msgs, const TxnStatusMap& txn_map, ReplicateIntents replicate_intents, |
148 | 0 | OpId* checkpoint) { |
149 | 0 | size_t index = 0; |
150 | 0 | std::vector<RecordTimeIndex> records; |
151 | | |
152 | | // Order ReplicateMsgs based on commit time. |
153 | 0 | for (const auto &msg : msgs) { |
154 | 0 | if (!msg->write().has_external_hybrid_time()) { |
155 | | // If the message came from an external source, ignore it when producing change list. |
156 | | // Note that checkpoint, however, will be updated and will account for external message too. |
157 | 0 | bool stop = VERIFY_RESULT(SetCommittedRecordIndexForReplicateMsg( |
158 | 0 | msg, index, txn_map, replicate_intents, &records)); |
159 | 0 | if (stop) { |
160 | 0 | return records; |
161 | 0 | } |
162 | 0 | } |
163 | 0 | *checkpoint = OpId::FromPB(msg->id()); |
164 | 0 | index++; |
165 | 0 | } |
166 | 0 | return records; |
167 | 0 | } |
168 | | |
169 | | // Filter out WAL records that are external and order records based on transaction commit time. |
170 | | // Records in WAL don't represent the exact order in which records are written in DB due to delay |
171 | | // in writing txn APPLYING record. |
172 | | // Consider the following WAL entries: |
173 | | // TO: WRITE K0 |
174 | | // T1: WRITE K1 (TXN1) |
175 | | // T2: WRITE K2 (TXN2) |
176 | | // T3: WRITE K3 |
177 | | // T4: APPLYING TXN2 |
178 | | // T5: APPLYING TXN1 |
179 | | // T6: WRITE K4 |
180 | | // The order in which keys are written to DB in this example is K0, K3, K2, K1, K4. |
181 | | // This method will also set checkpoint to the op id of last processed record. |
182 | | Result<ReplicateMsgs> FilterAndSortWrites(const ReplicateMsgs& msgs, |
183 | | const TxnStatusMap& txn_map, |
184 | | ReplicateIntents replicate_intents, |
185 | 0 | OpId* checkpoint) { |
186 | 0 | std::vector<RecordTimeIndex> records = VERIFY_RESULT(GetCommittedRecordIndexes( |
187 | 0 | msgs, txn_map, replicate_intents, checkpoint)); |
188 | |
|
189 | 0 | if (!replicate_intents) { |
190 | 0 | std::sort(records.begin(), records.end()); |
191 | 0 | } |
192 | |
|
193 | 0 | ReplicateMsgs ordered_msgs; |
194 | 0 | ordered_msgs.reserve(records.size()); |
195 | 0 | for (const auto& record : records) { |
196 | 0 | ordered_msgs.emplace_back(msgs[record.second]); |
197 | 0 | } |
198 | 0 | return ordered_msgs; |
199 | 0 | } |
200 | | |
201 | | Result<TransactionStatusResult> GetTransactionStatus( |
202 | | const TransactionId& txn_id, |
203 | | const HybridTime& hybrid_time, |
204 | 74 | TransactionParticipant* txn_participant) { |
205 | 74 | static const std::string reason = "cdc"; |
206 | | |
207 | 74 | std::promise<Result<TransactionStatusResult>> txn_status_promise; |
208 | 74 | auto future = txn_status_promise.get_future(); |
209 | 74 | auto callback = [&txn_status_promise](Result<TransactionStatusResult> result) { |
210 | 74 | txn_status_promise.set_value(std::move(result)); |
211 | 74 | }; |
212 | | |
213 | 74 | txn_participant->RequestStatusAt( |
214 | 74 | {&txn_id, hybrid_time, hybrid_time, 0, &reason, TransactionLoadFlags{}, callback}); |
215 | 74 | future.wait(); |
216 | 74 | return future.get(); |
217 | 74 | } |
218 | | |
219 | | // Build transaction status as of hybrid_time. |
220 | | Result<TxnStatusMap> BuildTxnStatusMap(const ReplicateMsgs& messages, |
221 | | bool more_replicate_msgs, |
222 | | const HybridTime& cdc_read_hybrid_time, |
223 | 0 | TransactionParticipant* txn_participant) { |
224 | 0 | TxnStatusMap txn_map; |
225 | | // First go through all APPLYING records and mark transaction as committed. |
226 | 0 | for (const auto& msg : messages) { |
227 | 0 | if (msg->op_type() == consensus::OperationType::UPDATE_TRANSACTION_OP |
228 | 0 | && msg->transaction_state().status() == TransactionStatus::APPLYING) { |
229 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
230 | 0 | msg->transaction_state().transaction_id())); |
231 | 0 | txn_map.emplace(txn_id, |
232 | 0 | TransactionStatusResult( |
233 | 0 | TransactionStatus::COMMITTED, |
234 | 0 | HybridTime(msg->transaction_state().commit_hybrid_time()))); |
235 | 0 | } |
236 | 0 | } |
237 | | |
238 | | // Now go through all WRITE_OP records and get transaction status of records for which |
239 | | // corresponding APPLYING record does not exist in WAL as yet. |
240 | 0 | for (const auto& msg : messages) { |
241 | 0 | if (msg->op_type() == consensus::OperationType::WRITE_OP |
242 | 0 | && msg->write().write_batch().has_transaction()) { |
243 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
244 | 0 | msg->write().write_batch().transaction().transaction_id())); |
245 | |
|
246 | 0 | if (!txn_map.count(txn_id)) { |
247 | 0 | TransactionStatusResult txn_status(TransactionStatus::PENDING, HybridTime::kMin); |
248 | |
|
249 | 0 | auto result = GetTransactionStatus(txn_id, cdc_read_hybrid_time, txn_participant); |
250 | 0 | if (!result.ok()) { |
251 | 0 | if (result.status().IsNotFound()) { |
252 | | // Naive heuristic for handling whether a transaction is aborted or still pending: |
253 | | // 1. If the normal transaction timeout is not reached, assume good operation. |
254 | | // 2. If more_replicate_messages, assume a race between reading |
255 | | // TransactionParticipant & LogCache. |
256 | | // TODO (#2405) : Handle long running or very large transactions correctly. |
257 | 0 | if (!more_replicate_msgs) { |
258 | 0 | auto timeout = HybridTime::FromPB(msg->hybrid_time()) |
259 | 0 | .AddMilliseconds(FLAGS_cdc_transaction_timeout_ms); |
260 | 0 | if (timeout < cdc_read_hybrid_time) { |
261 | 0 | LOG(INFO) << "Transaction not found, considering it aborted: " << txn_id; |
262 | 0 | txn_status = TransactionStatusResult::Aborted(); |
263 | 0 | } |
264 | 0 | } |
265 | 0 | } else { |
266 | 0 | return result.status(); |
267 | 0 | } |
268 | 0 | } else { |
269 | 0 | txn_status = *result; |
270 | 0 | } |
271 | 0 | txn_map.emplace(txn_id, txn_status); |
272 | 0 | } |
273 | 0 | } |
274 | 0 | } |
275 | 0 | return txn_map; |
276 | 0 | } |
277 | | |
278 | | CHECKED_STATUS SetRecordTime(const TransactionId& txn_id, |
279 | | const TxnStatusMap& txn_map, |
280 | 0 | CDCRecordPB* record) { |
281 | 0 | auto txn_status = txn_map.find(txn_id); |
282 | 0 | if (txn_status == txn_map.end()) { |
283 | 0 | return STATUS(IllegalState, "Unexpected transaction ID", txn_id.ToString()); |
284 | 0 | } |
285 | 0 | record->set_time(txn_status->second.status_time.ToUint64()); |
286 | 0 | return Status::OK(); |
287 | 0 | } |
288 | | |
289 | | // Populate CDC record corresponding to WAL batch in ReplicateMsg. |
290 | | CHECKED_STATUS PopulateWriteRecord(const ReplicateMsgPtr& msg, |
291 | | const TxnStatusMap& txn_map, |
292 | | const StreamMetadata& metadata, |
293 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
294 | | ReplicateIntents replicate_intents, |
295 | 0 | GetChangesResponsePB* resp) { |
296 | 0 | const auto& batch = msg->write().write_batch(); |
297 | 0 | const auto& schema = *tablet_peer->tablet()->schema(); |
298 | | // Write batch may contain records from different rows. |
299 | | // For CDC, we need to split the batch into 1 CDC record per row of the table. |
300 | | // We'll use DocDB key hash to identify the records that belong to the same row. |
301 | 0 | Slice prev_key; |
302 | 0 | CDCRecordPB* record = nullptr; |
303 | 0 | for (const auto& write_pair : batch.write_pairs()) { |
304 | 0 | Slice key = write_pair.key(); |
305 | 0 | const auto key_size = VERIFY_RESULT( |
306 | 0 | docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey)); |
307 | |
|
308 | 0 | Slice value = write_pair.value(); |
309 | 0 | docdb::Value decoded_value; |
310 | 0 | RETURN_NOT_OK(decoded_value.Decode(value)); |
311 | | |
312 | | // Compare key hash with previously seen key hash to determine whether the write pair |
313 | | // is part of the same row or not. |
314 | 0 | Slice primary_key(key.data(), key_size); |
315 | 0 | if (prev_key != primary_key) { |
316 | | // Write pair contains record for different row. Create a new CDCRecord in this case. |
317 | 0 | record = resp->add_records(); |
318 | 0 | Slice sub_doc_key = key; |
319 | 0 | docdb::SubDocKey decoded_key; |
320 | 0 | RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); |
321 | |
|
322 | 0 | if (metadata.record_format == CDCRecordFormat::WAL) { |
323 | | // For 2DC, populate serialized data from WAL, to avoid unnecessary deserializing on |
324 | | // producer and re-serializing on consumer. |
325 | 0 | auto kv_pair = record->add_key(); |
326 | 0 | kv_pair->set_key(std::to_string(decoded_key.doc_key().hash())); |
327 | 0 | kv_pair->mutable_value()->set_binary_value(write_pair.key()); |
328 | 0 | } else { |
329 | 0 | AddPrimaryKey(decoded_key, schema, record); |
330 | 0 | } |
331 | | |
332 | | // Check whether operation is WRITE or DELETE. |
333 | 0 | if (decoded_value.value_type() == docdb::ValueType::kTombstone && |
334 | 0 | decoded_key.num_subkeys() == 0) { |
335 | 0 | record->set_operation(CDCRecordPB::DELETE); |
336 | 0 | } else { |
337 | 0 | record->set_operation(CDCRecordPB::WRITE); |
338 | 0 | } |
339 | | |
340 | | // Process intent records. |
341 | 0 | record->set_time(msg->hybrid_time()); |
342 | 0 | if (batch.has_transaction()) { |
343 | 0 | if (!replicate_intents) { |
344 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId( |
345 | 0 | batch.transaction().transaction_id())); |
346 | | // If we're not replicating intents, set record time using the transaction map. |
347 | 0 | RETURN_NOT_OK(SetRecordTime(txn_id, txn_map, record)); |
348 | 0 | } else { |
349 | 0 | record->mutable_transaction_state()->set_transaction_id( |
350 | 0 | batch.transaction().transaction_id()); |
351 | 0 | record->mutable_transaction_state()->add_tablets(tablet_peer->tablet_id()); |
352 | 0 | } |
353 | 0 | } |
354 | 0 | } |
355 | 0 | prev_key = primary_key; |
356 | 0 | DCHECK(record); |
357 | |
|
358 | 0 | if (metadata.record_format == CDCRecordFormat::WAL) { |
359 | 0 | auto kv_pair = record->add_changes(); |
360 | 0 | kv_pair->set_key(write_pair.key()); |
361 | 0 | kv_pair->mutable_value()->set_binary_value(write_pair.value()); |
362 | 0 | } else if (record->operation() == CDCRecordPB_OperationType_WRITE) { |
363 | 0 | PrimitiveValue column_id; |
364 | 0 | Slice key_column = write_pair.key().data() + key_size; |
365 | 0 | RETURN_NOT_OK(PrimitiveValue::DecodeKey(&key_column, &column_id)); |
366 | 0 | if (column_id.value_type() == docdb::ValueType::kColumnId) { |
367 | 0 | const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId())); |
368 | 0 | AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes()); |
369 | 0 | } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) { |
370 | 0 | LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type(); |
371 | 0 | } |
372 | 0 | } |
373 | 0 | } |
374 | 0 | return Status::OK(); |
375 | 0 | } |
376 | | |
377 | | // Populate CDC record corresponding to WAL UPDATE_TRANSACTION_OP entry. |
378 | | CHECKED_STATUS PopulateTransactionRecord(const ReplicateMsgPtr& msg, |
379 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
380 | | ReplicateIntents replicate_intents, |
381 | 0 | CDCRecordPB* record) { |
382 | 0 | SCHECK(msg->has_transaction_state(), InvalidArgument, |
383 | 0 | Format("Update transaction message requires transaction_state: $0", |
384 | 0 | msg->ShortDebugString())); |
385 | 0 | record->set_operation(CDCRecordPB_OperationType_WRITE); |
386 | 0 | record->set_time(replicate_intents ? |
387 | 0 | msg->hybrid_time() : msg->transaction_state().commit_hybrid_time()); |
388 | 0 | record->mutable_transaction_state()->CopyFrom(msg->transaction_state()); |
389 | 0 | if (replicate_intents && msg->transaction_state().status() == TransactionStatus::APPLYING) { |
390 | | // Add the partition metadata so the consumer knows which tablets to apply the transaction |
391 | | // to. |
392 | 0 | tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition()); |
393 | 0 | } |
394 | 0 | return Status::OK(); |
395 | 0 | } |
396 | | |
397 | 0 | CHECKED_STATUS PopulateSplitOpRecord(const ReplicateMsgPtr& msg, CDCRecordPB* record) { |
398 | 0 | SCHECK(msg->has_split_request(), InvalidArgument, |
399 | 0 | Format("Split op message requires split_request: $0", msg->ShortDebugString())); |
400 | 0 | record->set_operation(CDCRecordPB::SPLIT_OP); |
401 | 0 | record->set_time(msg->hybrid_time()); |
402 | 0 | record->mutable_split_tablet_request()->CopyFrom(msg->split_request()); |
403 | 0 | return Status::OK(); |
404 | 0 | } |
405 | | |
406 | | Status GetChangesForXCluster(const std::string& stream_id, |
407 | | const std::string& tablet_id, |
408 | | const OpId& from_op_id, |
409 | | const StreamMetadata& stream_metadata, |
410 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
411 | | const MemTrackerPtr& mem_tracker, |
412 | | consensus::ReplicateMsgsHolder* msgs_holder, |
413 | | GetChangesResponsePB* resp, |
414 | | int64_t* last_readable_opid_index, |
415 | 0 | const CoarseTimePoint deadline) { |
416 | 0 | auto replicate_intents = ReplicateIntents(GetAtomicFlag(&FLAGS_cdc_enable_replicate_intents)); |
417 | | // Request scope on transaction participant so that transactions are not removed from participant |
418 | | // while RequestScope is active. |
419 | 0 | RequestScope request_scope; |
420 | |
|
421 | 0 | auto read_ops = VERIFY_RESULT(tablet_peer->consensus()-> |
422 | 0 | ReadReplicatedMessagesForCDC(from_op_id, last_readable_opid_index, deadline)); |
423 | 0 | ScopedTrackedConsumption consumption; |
424 | 0 | if (read_ops.read_from_disk_size && mem_tracker) { |
425 | 0 | consumption = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size); |
426 | 0 | } |
427 | |
|
428 | 0 | OpId checkpoint; |
429 | 0 | TxnStatusMap txn_map; |
430 | 0 | if (!replicate_intents) { |
431 | 0 | auto txn_participant = tablet_peer->tablet()->transaction_participant(); |
432 | 0 | if (txn_participant) { |
433 | 0 | request_scope = RequestScope(txn_participant); |
434 | 0 | } |
435 | 0 | txn_map = TxnStatusMap(VERIFY_RESULT(BuildTxnStatusMap( |
436 | 0 | read_ops.messages, read_ops.have_more_messages, tablet_peer->Now(), txn_participant))); |
437 | 0 | } |
438 | 0 | ReplicateMsgs messages = VERIFY_RESULT(FilterAndSortWrites( |
439 | 0 | read_ops.messages, txn_map, replicate_intents, &checkpoint)); |
440 | |
|
441 | 0 | for (const auto& msg : messages) { |
442 | 0 | switch (msg->op_type()) { |
443 | 0 | case consensus::OperationType::UPDATE_TRANSACTION_OP: |
444 | 0 | if (!replicate_intents) { |
445 | 0 | RETURN_NOT_OK(PopulateTransactionRecord( |
446 | 0 | msg, tablet_peer, replicate_intents, resp->add_records())); |
447 | 0 | } else if (msg->transaction_state().status() == TransactionStatus::APPLYING) { |
448 | 0 | auto record = resp->add_records(); |
449 | 0 | record->set_operation(CDCRecordPB::APPLY); |
450 | 0 | record->set_time(msg->hybrid_time()); |
451 | 0 | auto* txn_state = record->mutable_transaction_state(); |
452 | 0 | txn_state->set_transaction_id(msg->transaction_state().transaction_id()); |
453 | 0 | txn_state->set_commit_hybrid_time(msg->transaction_state().commit_hybrid_time()); |
454 | 0 | tablet_peer->tablet()->metadata()->partition()->ToPB(record->mutable_partition()); |
455 | 0 | } |
456 | 0 | break; |
457 | 0 | case consensus::OperationType::WRITE_OP: |
458 | 0 | RETURN_NOT_OK(PopulateWriteRecord(msg, txn_map, stream_metadata, tablet_peer, |
459 | 0 | replicate_intents, resp)); |
460 | 0 | break; |
461 | 0 | case consensus::OperationType::SPLIT_OP: |
462 | 0 | RETURN_NOT_OK(PopulateSplitOpRecord(msg, resp->add_records())); |
463 | 0 | break; |
464 | |
|
465 | 0 | default: |
466 | | // Nothing to do for other operation types. |
467 | 0 | break; |
468 | 0 | } |
469 | 0 | } |
470 | |
|
471 | 0 | if (consumption) { |
472 | 0 | consumption.Add(resp->SpaceUsedLong()); |
473 | 0 | } |
474 | 0 | *msgs_holder = consensus::ReplicateMsgsHolder( |
475 | 0 | nullptr, std::move(messages), std::move(consumption)); |
476 | 0 | (checkpoint.index > 0 ? checkpoint : from_op_id).ToPB( |
477 | 0 | resp->mutable_checkpoint()->mutable_op_id()); |
478 | 0 | return Status::OK(); |
479 | 0 | } |
480 | | |
481 | | } // namespace cdc |
482 | | } // namespace yb |