YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdc_service-txn-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/common/wire_protocol.h"
14
#include "yb/common/wire_protocol-test-util.h"
15
16
#include "yb/cdc/cdc_service.h"
17
#include "yb/cdc/cdc_service.proxy.h"
18
19
#include "yb/client/session.h"
20
#include "yb/client/table.h"
21
#include "yb/client/transaction.h"
22
#include "yb/client/txn-test-base.h"
23
24
#include "yb/docdb/primitive_value.h"
25
#include "yb/docdb/value_type.h"
26
27
#include "yb/integration-tests/cdc_test_util.h"
28
29
#include "yb/master/master_client.pb.h"
30
31
#include "yb/rpc/messenger.h"
32
#include "yb/rpc/rpc_controller.h"
33
34
#include "yb/tablet/tablet.h"
35
36
#include "yb/tserver/mini_tablet_server.h"
37
#include "yb/tserver/tablet_server.h"
38
#include "yb/util/logging.h"
39
40
#include "yb/util/metrics.h"
41
#include "yb/util/slice.h"
42
43
DECLARE_bool(cdc_enable_replicate_intents);
44
45
namespace yb {
46
namespace cdc {
47
48
using client::Flush;
49
using client::TransactionTestBase;
50
using client::WriteOpType;
51
using rpc::RpcController;
52
53
class CDCServiceTxnTest : public TransactionTestBase<MiniCluster>,
54
                          public testing::WithParamInterface<bool /* enable_intents */> {
55
 protected:
56
0
  void SetUp() override {
57
0
    mini_cluster_opt_.num_masters = 1;
58
0
    mini_cluster_opt_.num_tablet_servers = 1;
59
0
    SetAtomicFlag(GetParam(), &FLAGS_cdc_enable_replicate_intents);
60
0
    create_table_ = false;
61
0
    SetIsolationLevel(IsolationLevel::SERIALIZABLE_ISOLATION);
62
0
    SetNumTablets(1);
63
0
    TransactionTestBase::SetUp();
64
0
    CreateTable();
65
66
0
    const auto mini_server = cluster_->mini_tablet_servers().front();
67
0
    cdc_proxy_ = std::make_unique<CDCServiceProxy>(
68
0
        &client_->proxy_cache(), HostPort::FromBoundEndpoint(mini_server->bound_rpc_addr()));
69
0
  }
70
71
  std::unique_ptr<CDCServiceProxy> cdc_proxy_;
72
};
73
74
INSTANTIATE_TEST_CASE_P(EnableIntentReplication, CDCServiceTxnTest, ::testing::Bool());
75
76
// Asserts that `changes` contains exactly one column, keyed "value", whose int32 payload equals
// `expected_value`.
void AssertValue(const google::protobuf::Map<string, QLValuePB>& changes, int32_t expected_value) {
  ASSERT_EQ(changes.size(), 1);
  // Hold the iterator by value rather than extending the lifetime of a temporary via const ref.
  const auto value = changes.find("value");
  ASSERT_NE(value, changes.end());
  ASSERT_EQ(value->second.int32_value(), expected_value);
}
82
83
void CheckIntentRecord(const CDCRecordPB& record, int expected_value, bool replicate_intents) {
84
  ASSERT_EQ(record.changes_size(), 1);
85
  // Check the key.
86
  ASSERT_NO_FATALS(AssertIntKey(record.key(), expected_value));
87
  // Make sure transaction metadata is set.
88
  if (replicate_intents) {
89
    ASSERT_TRUE(record.has_transaction_state());
90
    ASSERT_TRUE(record.has_time());
91
    const auto& transaction_state = record.transaction_state();
92
    ASSERT_TRUE(transaction_state.has_transaction_id());
93
  }
94
}
95
96
void CheckApplyRecord(const CDCRecordPB& apply_record, bool replicate_intents) {
97
  ASSERT_EQ(apply_record.changes_size(), 0);
98
  if (replicate_intents) {
99
    ASSERT_TRUE(apply_record.has_transaction_state());
100
    ASSERT_TRUE(apply_record.has_partition());
101
    const auto& txn_state = apply_record.transaction_state();
102
    ASSERT_TRUE(txn_state.has_transaction_id());
103
    ASSERT_EQ(apply_record.operation(), cdc::CDCRecordPB::APPLY);
104
    ASSERT_TRUE(apply_record.has_time());
105
  }
106
}
107
108
0
void CheckRegularRecord(const CDCRecordPB& record, int expected_value) {
109
0
  ASSERT_EQ(record.changes_size(), 1);
110
0
  ASSERT_NO_FATALS(AssertIntKey(record.key(), expected_value));
111
0
}
112
113
0
// Verifies the order and shape of CDC records for interleaved regular and transactional writes,
// both with and without intent replication.
TEST_P(CDCServiceTxnTest, TestGetChanges) {
  // Consider the following writes:
  // T0: WRITE K0
  // T1: WRITE K1 (TXN1)
  // T2: WRITE K2 (TXN2)
  // T3: WRITE K3
  // T4: APPLYING TXN2
  // T5: APPLYING TXN1
  // T6: WRITE K4
  // Total: 5 WRITE_OP records and 2 UPDATE_TRANSACTION_OP (apply) records.
  constexpr int kNumExpectedRecords = 7;

  const bool replicate_intents = GetParam();
  auto session = CreateSession();
  ASSERT_RESULT(WriteRow(session, 10000 /* key */, 10000 /* value */, WriteOpType::INSERT,
                         Flush::kTrue));

  auto txn1 = CreateTransaction();
  auto session1 = CreateSession(txn1);
  ASSERT_RESULT(WriteRow(session1, 10001 /* key */, 10001 /* value */, WriteOpType::INSERT,
                         Flush::kTrue));

  auto txn2 = CreateTransaction();
  auto session2 = CreateSession(txn2);
  ASSERT_RESULT(WriteRow(session2, 10002 /* key */, 10002 /* value */, WriteOpType::INSERT,
                         Flush::kTrue));

  ASSERT_RESULT(WriteRow(session, 10003 /* key */, 10003 /* value */, WriteOpType::INSERT,
                         Flush::kTrue));

  // Commit TXN2 before TXN1 (T4 before T5 above).
  ASSERT_OK(txn2->CommitFuture().get());
  ASSERT_OK(txn1->CommitFuture().get());

  ASSERT_RESULT(WriteRow(session, 10004 /* key */, 10004 /* value */, WriteOpType::INSERT,
                         Flush::kTrue));

  // Get tablet ID.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(
      client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr));
  ASSERT_EQ(tablets.size(), 1);

  // Create CDC stream on table. ASSERT_NO_FATALS makes a fatal failure inside the helper abort
  // this test instead of continuing with an unset stream id.
  CDCStreamId stream_id;
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id));

  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_stream_id(stream_id);
  change_req.set_tablet_id(tablets.Get(0).tablet_id());
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  // Get CDC changes.
  {
    SCOPED_TRACE(change_req.DebugString());
    // Poll instead of calling GetChanges once: writes return on majority_replicated while CDC
    // waits for fully committed operations, so the latest records (including the transaction
    // apply records) may not be visible immediately.
    ASSERT_OK(WaitFor([&]() -> Result<bool> {
      RpcController rpc;
      change_resp.Clear();
      RETURN_NOT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
      if (change_resp.has_error()) return Result<bool>(StatusFromPB(change_resp.error().status()));
      return change_resp.records_size() >= kNumExpectedRecords;
    }, MonoDelta::FromSeconds(30), "Wait for all CDC records."));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());

    // Expect total 7 records: 5 WRITE_OP records and 2 UPDATE_TRANSACTION_OP records.
    ASSERT_EQ(change_resp.records_size(), kNumExpectedRecords);

    struct Record {
      int32_t value; // 0 if apply record.
      bool is_intent; // Differentiate between intent and regular record.
    };

    const Record expected_records_in_order[kNumExpectedRecords] =
        {{10000, false}, {10001, true}, {10002, true}, {10003, false}, {0, false}, {0, false},
        {10004, false}};
    const Record expected_records_out_of_order[kNumExpectedRecords] =
        {{10000, false}, {10003, false}, {10002, true}, {0, false}, {10001, true}, {0, false},
        {10004, false}};

    // We expect a different order of records based on whether we replicate intents, since with the
    // feature enabled, we will receive intents at write time and not commit time.
    const Record* expected_order = replicate_intents ?
        expected_records_in_order : expected_records_out_of_order;

    for (int i = 0; i < kNumExpectedRecords; i++) {
      const auto& record = expected_order[i];
      if (record.value == 0) {
        // This contains the record for APPLYING transaction.
        ASSERT_NO_FATALS(CheckApplyRecord(change_resp.records(i), replicate_intents));
      } else if (record.is_intent) {
        ASSERT_NO_FATALS(CheckIntentRecord(change_resp.records(i), record.value,
                                           replicate_intents));
      } else {
        ASSERT_NO_FATALS(CheckRegularRecord(change_resp.records(i), record.value));
      }
    }
  }
}
206
207
0
TEST_P(CDCServiceTxnTest, TestGetChangesForPendingTransaction) {
  // If GetChanges is called in the middle of a transaction, ensure that transaction is not
  // incorrectly considered as aborted if we can't find the transaction commit record.
  // A subsequent call to GetChanges after the transaction is committed should get the
  // rows committed by transaction.

  static const int32_t kNumIntentsToWrite = 3;
  static const int32_t kStartKey = 10000;
  const bool replicate_intents = GetParam();

  // Get tablet ID.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(
      client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr));
  ASSERT_EQ(tablets.size(), 1);

  // Create CDC stream on table. ASSERT_NO_FATALS makes a fatal failure inside the helper abort
  // this test instead of continuing with an unset stream id.
  CDCStreamId stream_id;
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id));

  // Write rows kStartKey .. kStartKey + kNumIntentsToWrite - 1 (key == value) inside one
  // transaction that stays uncommitted until later.
  auto txn = CreateTransaction();
  auto session = CreateSession(txn);
  for (int32_t i = 0; i < kNumIntentsToWrite; ++i) {
    ASSERT_RESULT(WriteRow(session, kStartKey + i /* key */, kStartKey + i /* value */,
                           WriteOpType::INSERT, Flush::kTrue));
  }

  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_stream_id(stream_id);
  change_req.set_tablet_id(tablets.Get(0).tablet_id());
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  // Get CDC changes while the transaction is pending.
  // Expect 3 records for the 3 intents if we replicate intents but 0 if we don't.
  const int expected_pending_records = replicate_intents ? kNumIntentsToWrite : 0;
  {
    SCOPED_TRACE(change_req.DebugString());
    // Need to poll because Flush returns on majority_replicated and CDC waits for fully committed.
    ASSERT_OK(WaitFor([&]() -> Result<bool> {
      RpcController rpc;
      change_resp.Clear();
      RETURN_NOT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
      if (change_resp.has_error()) return Result<bool>(StatusFromPB(change_resp.error().status()));
      return change_resp.records_size() >= expected_pending_records;
    }, MonoDelta::FromSeconds(30), "Wait for pending transaction intents."));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());
    ASSERT_EQ(change_resp.records_size(), expected_pending_records);
  }

  // Intent records arrive in write order: record i holds key kStartKey + i.
  for (int i = 0; i < change_resp.records_size(); i++) {
    ASSERT_NO_FATALS(CheckIntentRecord(change_resp.records(i), kStartKey + i, replicate_intents));
  }

  // Commit transaction.
  ASSERT_OK(txn->CommitFuture().get());
  ASSERT_OK(session->Flush());

  // Resume from where the previous GetChanges call left off.
  const auto checkpoint = change_resp.checkpoint();

  // Get CDC changes after the commit.
  // Expect 1 new record (the apply record) if we replicate intents, and all written rows followed
  // by the apply record if we don't.
  const int expected_committed_records = replicate_intents ? 1 : kNumIntentsToWrite + 1;
  {
    // Need to poll because Flush returns on majority_replicated and CDC waits for fully committed.
    ASSERT_OK(WaitFor([&]() -> Result<bool> {
      RpcController rpc;
      *change_req.mutable_from_checkpoint() = checkpoint;
      change_resp.Clear();
      RETURN_NOT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
      if (change_resp.has_error()) return Result<bool>(StatusFromPB(change_resp.error().status()));
      return change_resp.records_size() == expected_committed_records;
    }, MonoDelta::FromSeconds(30), "Wait for Transaction to be committed."));
    SCOPED_TRACE(change_resp.DebugString());
    for (int i = 0; i < change_resp.records_size() - 1; i++) {
      ASSERT_NO_FATALS(CheckIntentRecord(change_resp.records(i), kStartKey + i,
                                         replicate_intents));
    }
    ASSERT_NO_FATALS(CheckApplyRecord(change_resp.records(change_resp.records_size() - 1),
                                      replicate_intents));
  }
}
288
289
// Only test 'enable_replicate_intents = true'.
290
class CDCServiceTxnTestEnableReplicateIntents : public CDCServiceTxnTest {
291
};
292
293
INSTANTIATE_TEST_CASE_P(EnableIntentReplication, CDCServiceTxnTestEnableReplicateIntents,
294
                        ::testing::Values(true /* enable_replicate_intents */));
295
296
0
// Verifies that after a GetChanges call has returned all records for a committed transaction, the
// tablet's async_replication_sent_lag_micros metric drops to 0.
TEST_P(CDCServiceTxnTestEnableReplicateIntents, MetricsTest) {
  static const int32_t entry_to_add = 100;
  auto txn = CreateTransaction();
  auto session = CreateSession(txn);
  ASSERT_RESULT(WriteRow(session, entry_to_add /* key */, entry_to_add /* value */,
                         WriteOpType::INSERT, Flush::kTrue));
  ASSERT_OK(txn->CommitFuture().get());

  // Get tablet ID.
  google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
  ASSERT_OK(
      client_->GetTablets(table_->name(), 0, &tablets, /* partition_list_version =*/ nullptr));
  ASSERT_EQ(tablets.size(), 1);

  // Create CDC stream on table. ASSERT_NO_FATALS makes a fatal failure inside the helper abort
  // this test instead of continuing with an unset stream id.
  CDCStreamId stream_id;
  ASSERT_NO_FATALS(CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id));

  const auto tablet_id = tablets.Get(0).tablet_id();

  GetChangesRequestPB change_req;
  GetChangesResponsePB change_resp;

  change_req.set_stream_id(stream_id);
  change_req.set_tablet_id(tablet_id);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
  change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

  // Get CDC changes. 2 records are expected for the single transactional write (presumably its
  // intent record plus the transaction's apply record).
  constexpr int kNumExpectedRecords = 2;
  {
    SCOPED_TRACE(change_req.DebugString());
    // Poll because writes return on majority_replicated while CDC waits for fully committed ops.
    ASSERT_OK(WaitFor([&]() -> Result<bool> {
      RpcController rpc;
      change_resp.Clear();
      RETURN_NOT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
      if (change_resp.has_error()) return Result<bool>(StatusFromPB(change_resp.error().status()));
      return change_resp.records_size() >= kNumExpectedRecords;
    }, MonoDelta::FromSeconds(30), "Wait for all CDC records."));
    SCOPED_TRACE(change_resp.DebugString());
    ASSERT_FALSE(change_resp.has_error());

    ASSERT_EQ(change_resp.records_size(), kNumExpectedRecords);
  }

  // Resolve the CDC service once, outside the polling loop, and fail cleanly (instead of
  // dereferencing null) if the registered service is not a CDCServiceImpl.
  const auto& tserver = cluster_->mini_tablet_server(0)->server();
  auto* cdc_service = dynamic_cast<CDCServiceImpl*>(
      tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get());
  ASSERT_NE(cdc_service, nullptr);

  ASSERT_OK(WaitFor([&]() -> Result<bool> {
    auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id, tablet_id});
    if (!metrics) {
      // Metrics not available for this tablet/stream yet; keep waiting rather than crashing.
      return false;
    }
    auto lag = metrics->async_replication_sent_lag_micros->value();
    YB_LOG_EVERY_N_SECS(INFO, 1) << "Sent lag: " << lag << "us";
    // Only check sent lag, since we're just calling GetChanges once and expect committed lag to be
    // greater than 0.
    return lag == 0;
  }, MonoDelta::FromSeconds(10), "Wait for Sent Lag == 0"));
}
349
350
} // namespace cdc
351
} // namespace yb