/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdc_service-txn-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/common/wire_protocol.h" |
14 | | #include "yb/common/wire_protocol-test-util.h" |
15 | | |
16 | | #include "yb/cdc/cdc_service.h" |
17 | | #include "yb/cdc/cdc_service.proxy.h" |
18 | | |
19 | | #include "yb/client/session.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/transaction.h" |
22 | | #include "yb/client/txn-test-base.h" |
23 | | |
24 | | #include "yb/docdb/primitive_value.h" |
25 | | #include "yb/docdb/value_type.h" |
26 | | |
27 | | #include "yb/integration-tests/cdc_test_util.h" |
28 | | |
29 | | #include "yb/master/master_client.pb.h" |
30 | | |
31 | | #include "yb/rpc/messenger.h" |
32 | | #include "yb/rpc/rpc_controller.h" |
33 | | |
34 | | #include "yb/tablet/tablet.h" |
35 | | |
36 | | #include "yb/tserver/mini_tablet_server.h" |
37 | | #include "yb/tserver/tablet_server.h" |
38 | | #include "yb/util/logging.h" |
39 | | |
40 | | #include "yb/util/metrics.h" |
41 | | #include "yb/util/slice.h" |
42 | | |
43 | | DECLARE_bool(cdc_enable_replicate_intents); |
44 | | |
45 | | namespace yb { |
46 | | namespace cdc { |
47 | | |
48 | | using client::Flush; |
49 | | using client::TransactionTestBase; |
50 | | using client::WriteOpType; |
51 | | using rpc::RpcController; |
52 | | |
53 | | class CDCServiceTxnTest : public TransactionTestBase<MiniCluster>, |
54 | | public testing::WithParamInterface<bool /* enable_intents */> { |
55 | | protected: |
56 | 0 | void SetUp() override { |
57 | 0 | mini_cluster_opt_.num_masters = 1; |
58 | 0 | mini_cluster_opt_.num_tablet_servers = 1; |
59 | 0 | SetAtomicFlag(GetParam(), &FLAGS_cdc_enable_replicate_intents); |
60 | 0 | create_table_ = false; |
61 | 0 | SetIsolationLevel(IsolationLevel::SERIALIZABLE_ISOLATION); |
62 | 0 | SetNumTablets(1); |
63 | 0 | TransactionTestBase::SetUp(); |
64 | 0 | CreateTable(); |
65 | |
|
66 | 0 | const auto mini_server = cluster_->mini_tablet_servers().front(); |
67 | 0 | cdc_proxy_ = std::make_unique<CDCServiceProxy>( |
68 | 0 | &client_->proxy_cache(), HostPort::FromBoundEndpoint(mini_server->bound_rpc_addr())); |
69 | 0 | } |
70 | | |
71 | | std::unique_ptr<CDCServiceProxy> cdc_proxy_; |
72 | | }; |
73 | | |
74 | | INSTANTIATE_TEST_CASE_P(EnableIntentReplication, CDCServiceTxnTest, ::testing::Bool()); |
75 | | |
76 | | void AssertValue(const google::protobuf::Map<string, QLValuePB>& changes, int32_t expected_value) { |
77 | | ASSERT_EQ(changes.size(), 1); |
78 | | const auto& value = changes.find("value"); |
79 | | ASSERT_NE(value, changes.end()); |
80 | | ASSERT_EQ(value->second.int32_value(), expected_value); |
81 | | } |
82 | | |
83 | | void CheckIntentRecord(const CDCRecordPB& record, int expected_value, bool replicate_intents) { |
84 | | ASSERT_EQ(record.changes_size(), 1); |
85 | | // Check the key. |
86 | | ASSERT_NO_FATALS(AssertIntKey(record.key(), expected_value)); |
87 | | // Make sure transaction metadata is set. |
88 | | if (replicate_intents) { |
89 | | ASSERT_TRUE(record.has_transaction_state()); |
90 | | ASSERT_TRUE(record.has_time()); |
91 | | const auto& transaction_state = record.transaction_state(); |
92 | | ASSERT_TRUE(transaction_state.has_transaction_id()); |
93 | | } |
94 | | } |
95 | | |
96 | | void CheckApplyRecord(const CDCRecordPB& apply_record, bool replicate_intents) { |
97 | | ASSERT_EQ(apply_record.changes_size(), 0); |
98 | | if (replicate_intents) { |
99 | | ASSERT_TRUE(apply_record.has_transaction_state()); |
100 | | ASSERT_TRUE(apply_record.has_partition()); |
101 | | const auto& txn_state = apply_record.transaction_state(); |
102 | | ASSERT_TRUE(txn_state.has_transaction_id()); |
103 | | ASSERT_EQ(apply_record.operation(), cdc::CDCRecordPB::APPLY); |
104 | | ASSERT_TRUE(apply_record.has_time()); |
105 | | } |
106 | | } |
107 | | |
108 | 0 | void CheckRegularRecord(const CDCRecordPB& record, int expected_value) { |
109 | 0 | ASSERT_EQ(record.changes_size(), 1); |
110 | 0 | ASSERT_NO_FATALS(AssertIntKey(record.key(), expected_value)); |
111 | 0 | } |
112 | | |
113 | 0 | TEST_P(CDCServiceTxnTest, TestGetChanges) { |
114 | | // Consider the following writes: |
115 | | // TO: WRITE K0 |
116 | | // T1: WRITE K1 (TXN1) |
117 | | // T2: WRITE K2 (TXN2) |
118 | | // T3: WRITE K3 |
119 | | // T4: APPLYING TXN2 |
120 | | // T5: APPLYING TXN1 |
121 | | // T6: WRITE K4 |
122 | 0 | bool replicate_intents = GetParam(); |
123 | 0 | auto session = CreateSession(); |
124 | 0 | ASSERT_RESULT(WriteRow(session, 10000 /* key */, 10000 /* value */, WriteOpType::INSERT, |
125 | 0 | Flush::kTrue)); |
126 | |
|
127 | 0 | auto txn1 = CreateTransaction(); |
128 | 0 | auto session1 = CreateSession(txn1); |
129 | 0 | ASSERT_RESULT(WriteRow(session1, 10001 /* key */, 10001 /* value */, WriteOpType::INSERT, |
130 | 0 | Flush::kTrue)); |
131 | |
|
132 | 0 | auto txn2 = CreateTransaction(); |
133 | 0 | auto session2 = CreateSession(txn2); |
134 | 0 | ASSERT_RESULT(WriteRow(session2, 10002 /* key */, 10002 /* value */, WriteOpType::INSERT, |
135 | 0 | Flush::kTrue)); |
136 | |
|
137 | 0 | ASSERT_RESULT(WriteRow(session, 10003 /* key */, 10003 /* value */, WriteOpType::INSERT, |
138 | 0 | Flush::kTrue)); |
139 | |
|
140 | 0 | ASSERT_OK(txn2->CommitFuture().get()); |
141 | 0 | ASSERT_OK(txn1->CommitFuture().get()); |
142 | |
|
143 | 0 | ASSERT_RESULT(WriteRow(session, 10004 /* key */, 10004 /* value */, WriteOpType::INSERT, |
144 | 0 | Flush::kTrue)); |
145 | | |
146 | | // Get tablet ID. |
147 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
148 | 0 | ASSERT_OK( |
149 | 0 | client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr)); |
150 | 0 | ASSERT_EQ(tablets.size(), 1); |
151 | | |
152 | | // Create CDC stream on table. |
153 | 0 | CDCStreamId stream_id; |
154 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id); |
155 | |
|
156 | 0 | GetChangesRequestPB change_req; |
157 | 0 | GetChangesResponsePB change_resp; |
158 | |
|
159 | 0 | change_req.set_stream_id(stream_id); |
160 | 0 | change_req.set_tablet_id(tablets.Get(0).tablet_id()); |
161 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
162 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
163 | | |
164 | | // Get CDC changes. |
165 | 0 | { |
166 | 0 | RpcController rpc; |
167 | 0 | SCOPED_TRACE(change_req.DebugString()); |
168 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
169 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
170 | 0 | ASSERT_FALSE(change_resp.has_error()); |
171 | | |
172 | | // Expect total 7 records: 5 WRITE_OP records and 2 UPDATE_TRANSACTION_OP records. |
173 | 0 | ASSERT_EQ(change_resp.records_size(), 7); |
174 | |
|
175 | 0 | struct Record { |
176 | 0 | int32_t value; // 0 if apply record. |
177 | 0 | bool is_intent; // Differentiate between intent and regular record. |
178 | 0 | }; |
179 | |
|
180 | 0 | Record expected_records_in_order[7] = |
181 | 0 | {{10000, false}, {10001, true}, {10002, true}, {10003, false}, {0, false}, {0, false}, |
182 | 0 | {10004, false}}; |
183 | 0 | Record expected_records_out_of_order[7] = |
184 | 0 | {{10000, false}, {10003, false}, {10002, true}, {0, false}, {10001, true}, {0, false}, |
185 | 0 | {10004, false}}; |
186 | | |
187 | | // We expect a different order of records based on whether we replicate intents, since with the |
188 | | // feature enabled, we will receive intents at write time and not commit time. |
189 | 0 | const Record* expected_order = replicate_intents ? |
190 | 0 | expected_records_in_order : expected_records_out_of_order; |
191 | |
|
192 | 0 | for (int i = 0; i < 7; i++) { |
193 | 0 | const auto& record = expected_order[i]; |
194 | 0 | if (record.value == 0) { |
195 | | // This contains the record for APPLYING transaction. |
196 | 0 | ASSERT_NO_FATALS(CheckApplyRecord(change_resp.records(i), replicate_intents)); |
197 | 0 | } else if (record.is_intent) { |
198 | 0 | ASSERT_NO_FATALS(CheckIntentRecord(change_resp.records(i), record.value, |
199 | 0 | replicate_intents)); |
200 | 0 | } else { |
201 | 0 | ASSERT_NO_FATALS(CheckRegularRecord(change_resp.records(i), record.value)); |
202 | 0 | } |
203 | 0 | } |
204 | 0 | } |
205 | 0 | } |
206 | | |
207 | 0 | TEST_P(CDCServiceTxnTest, TestGetChangesForPendingTransaction) { |
208 | | // If GetChanges is called in the middle of a transaction, ensure that transaction is not |
209 | | // incorrectly considered as aborted if we can't find the transaction commit record. |
210 | | // A subsequent call to GetChanges after the transaction is committed should get the |
211 | | // rows committed by transaction. |
212 | |
|
213 | 0 | static const int32_t kNumIntentsToWrite = 3; |
214 | 0 | static const int32_t kStartKey = 10000; |
215 | | // Get tablet ID. |
216 | 0 | bool replicate_intents = GetParam(); |
217 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
218 | 0 | ASSERT_OK( |
219 | 0 | client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr)); |
220 | 0 | ASSERT_EQ(tablets.size(), 1); |
221 | | |
222 | | // Create CDC stream on table. |
223 | 0 | CDCStreamId stream_id; |
224 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id); |
225 | |
|
226 | 0 | auto txn = CreateTransaction(); |
227 | 0 | auto session = CreateSession(txn); |
228 | 0 | ASSERT_RESULT(WriteRow(session, kStartKey /* key */, kStartKey /* value */, WriteOpType::INSERT, |
229 | 0 | Flush::kTrue)); |
230 | 0 | ASSERT_RESULT(WriteRow(session, kStartKey + 1 /* key */, kStartKey + 1 /* value */, |
231 | 0 | WriteOpType::INSERT, Flush::kTrue)); |
232 | 0 | ASSERT_RESULT(WriteRow(session, kStartKey + 2 /* key */, kStartKey + 2 /* value */, |
233 | 0 | WriteOpType::INSERT, Flush::kTrue)); |
234 | |
|
235 | 0 | GetChangesRequestPB change_req; |
236 | 0 | GetChangesResponsePB change_resp; |
237 | |
|
238 | 0 | change_req.set_stream_id(stream_id); |
239 | 0 | change_req.set_tablet_id(tablets.Get(0).tablet_id()); |
240 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
241 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
242 | | |
243 | | // Get CDC changes. |
244 | 0 | { |
245 | 0 | RpcController rpc; |
246 | 0 | SCOPED_TRACE(change_req.DebugString()); |
247 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
248 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
249 | 0 | ASSERT_FALSE(change_resp.has_error()); |
250 | | |
251 | | // Expect 3 records for the 3 intents if we replicate intents but 0 if we don't. |
252 | 0 | ASSERT_EQ(change_resp.records_size(), replicate_intents ? kNumIntentsToWrite : 0); |
253 | 0 | } |
254 | |
|
255 | 0 | int32_t expected_order[kNumIntentsToWrite] = {kStartKey, kStartKey + 1, kStartKey + 2}; |
256 | |
|
257 | 0 | for (int i = 0; i < change_resp.records_size(); i++) { |
258 | 0 | ASSERT_NO_FATALS(CheckIntentRecord(change_resp.records(i), expected_order[i], |
259 | 0 | replicate_intents)); |
260 | 0 | } |
261 | | |
262 | | // Commit transaction. |
263 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
264 | 0 | ASSERT_OK(session->Flush()); |
265 | |
|
266 | 0 | auto checkpoint = change_resp.checkpoint(); |
267 | | |
268 | | // Get CDC changes. |
269 | 0 | { |
270 | | // Need to poll because Flush returns on majority_replicated and CDC waits for fully committed. |
271 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
272 | 0 | RpcController rpc; |
273 | 0 | *change_req.mutable_from_checkpoint() = checkpoint; |
274 | 0 | change_resp.Clear(); |
275 | 0 | RETURN_NOT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
276 | 0 | if (change_resp.has_error()) return Result<bool>(StatusFromPB(change_resp.error().status())); |
277 | | // Expect 1 new record if we replicate intents and 4 if we don't. |
278 | 0 | return change_resp.records_size() == (replicate_intents ? 1 : kNumIntentsToWrite + 1); |
279 | 0 | }, MonoDelta::FromSeconds(30), "Wait for Transaction to be committed.")); |
280 | 0 | for (int i = 0; i < change_resp.records_size() - 1; i++) { |
281 | 0 | ASSERT_NO_FATALS(CheckIntentRecord(change_resp.records(i), expected_order[i], |
282 | 0 | replicate_intents)); |
283 | 0 | } |
284 | 0 | ASSERT_NO_FATALS(CheckApplyRecord(change_resp.records(change_resp.records_size() - 1), |
285 | 0 | replicate_intents)); |
286 | 0 | } |
287 | 0 | } |
288 | | |
289 | | // Only test 'enable_replicate_intents = true'. |
290 | | class CDCServiceTxnTestEnableReplicateIntents : public CDCServiceTxnTest { |
291 | | }; |
292 | | |
293 | | INSTANTIATE_TEST_CASE_P(EnableIntentReplication, CDCServiceTxnTestEnableReplicateIntents, |
294 | | ::testing::Values(true /* enable_replicate_intents */)); |
295 | | |
296 | 0 | TEST_P(CDCServiceTxnTestEnableReplicateIntents, MetricsTest) { |
297 | 0 | static const int32_t entry_to_add = 100; |
298 | 0 | auto txn = CreateTransaction(); |
299 | 0 | auto session = CreateSession(txn); |
300 | 0 | ASSERT_RESULT(WriteRow(session, entry_to_add /* key */, entry_to_add /* value */, |
301 | 0 | WriteOpType::INSERT, Flush::kTrue)); |
302 | 0 | ASSERT_OK(txn->CommitFuture().get()); |
303 | | |
304 | | // Get tablet ID. |
305 | 0 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
306 | 0 | ASSERT_OK( |
307 | 0 | client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr)); |
308 | 0 | ASSERT_EQ(tablets.size(), 1); |
309 | | |
310 | | // Create CDC stream on table. |
311 | 0 | CDCStreamId stream_id; |
312 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id); |
313 | |
|
314 | 0 | auto tablet_id = tablets.Get(0).tablet_id(); |
315 | |
|
316 | 0 | GetChangesRequestPB change_req; |
317 | 0 | GetChangesResponsePB change_resp; |
318 | |
|
319 | 0 | change_req.set_stream_id(stream_id); |
320 | 0 | change_req.set_tablet_id(tablet_id); |
321 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
322 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
323 | | |
324 | | // Get CDC changes. |
325 | 0 | { |
326 | 0 | RpcController rpc; |
327 | 0 | SCOPED_TRACE(change_req.DebugString()); |
328 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
329 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
330 | 0 | ASSERT_FALSE(change_resp.has_error()); |
331 | |
|
332 | 0 | ASSERT_EQ(change_resp.records_size(), 2); |
333 | 0 | } |
334 | |
|
335 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
336 | 0 | const auto& tserver = cluster_->mini_tablet_server(0)->server(); |
337 | 0 | auto cdc_service = dynamic_cast<CDCServiceImpl*>( |
338 | 0 | tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); |
339 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id}); |
340 | 0 | auto lag = metrics->async_replication_sent_lag_micros->value(); |
341 | 0 | YB_LOG_EVERY_N_SECS(INFO, 1) << "Sent lag: " << lag << "us"; |
342 | | // Only check sent lag, since we're just calling GetChanges once and expect committed lag to be |
343 | | // greater than 0. |
344 | 0 | return lag == 0; |
345 | 0 | }, MonoDelta::FromSeconds(10), "Wait for Sent Lag == 0")); |
346 | | |
347 | |
|
348 | 0 | } |
349 | | |
350 | | } // namespace cdc |
351 | | } // namespace yb |