/Users/deen/code/yugabyte-db/ent/src/yb/integration-tests/cdc_service-int-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | |
3 | | #include <gflags/gflags_declare.h> |
4 | | |
5 | | #include "yb/common/wire_protocol.h" |
6 | | #include "yb/common/wire_protocol-test-util.h" |
7 | | #include "yb/common/ql_value.h" |
8 | | |
9 | | #include "yb/consensus/log.h" |
10 | | #include "yb/consensus/log_reader.h" |
11 | | |
12 | | #include "yb/cdc/cdc_service.h" |
13 | | #include "yb/cdc/cdc_service.proxy.h" |
14 | | #include "yb/client/error.h" |
15 | | #include "yb/client/schema.h" |
16 | | #include "yb/client/session.h" |
17 | | #include "yb/client/table.h" |
18 | | #include "yb/client/table_handle.h" |
19 | | #include "yb/client/yb_op.h" |
20 | | #include "yb/client/yb_table_name.h" |
21 | | #include "yb/client/client-test-util.h" |
22 | | #include "yb/docdb/primitive_value.h" |
23 | | #include "yb/docdb/value_type.h" |
24 | | |
25 | | #include "yb/gutil/casts.h" |
26 | | #include "yb/gutil/walltime.h" |
27 | | |
28 | | #include "yb/integration-tests/cdc_test_util.h" |
29 | | #include "yb/integration-tests/mini_cluster.h" |
30 | | #include "yb/integration-tests/yb_mini_cluster_test_base.h" |
31 | | |
32 | | #include "yb/master/master_defaults.h" |
33 | | #include "yb/master/mini_master.h" |
34 | | #include "yb/rpc/messenger.h" |
35 | | #include "yb/rpc/rpc_controller.h" |
36 | | #include "yb/tablet/tablet.h" |
37 | | #include "yb/tablet/tablet_metadata.h" |
38 | | #include "yb/tablet/tablet_peer.h" |
39 | | |
40 | | #include "yb/tserver/mini_tablet_server.h" |
41 | | #include "yb/tserver/tablet_server.h" |
42 | | #include "yb/tserver/ts_tablet_manager.h" |
43 | | #include "yb/tserver/tserver_service.proxy.h" |
44 | | |
45 | | #include "yb/util/format.h" |
46 | | #include "yb/util/metrics.h" |
47 | | #include "yb/util/monotime.h" |
48 | | #include "yb/util/slice.h" |
49 | | #include "yb/util/status_format.h" |
50 | | #include "yb/util/tsan_util.h" |
51 | | #include "yb/yql/cql/ql/util/errcodes.h" |
52 | | #include "yb/yql/cql/ql/util/statement_result.h" |
53 | | |
54 | | DECLARE_bool(TEST_record_segments_violate_max_time_policy); |
55 | | DECLARE_bool(TEST_record_segments_violate_min_space_policy); |
56 | | DECLARE_bool(enable_load_balancing); |
57 | | DECLARE_bool(enable_log_retention_by_op_idx); |
58 | | DECLARE_bool(enable_ysql); |
59 | | DECLARE_double(leader_failure_max_missed_heartbeat_periods); |
60 | | DECLARE_int32(cdc_min_replicated_index_considered_stale_secs); |
61 | | DECLARE_int32(cdc_state_checkpoint_update_interval_ms); |
62 | | DECLARE_int32(cdc_wal_retention_time_secs); |
63 | | DECLARE_int32(client_read_write_timeout_ms); |
64 | | DECLARE_int32(follower_unavailable_considered_failed_sec); |
65 | | DECLARE_int32(log_max_seconds_to_retain); |
66 | | DECLARE_int32(log_min_seconds_to_retain); |
67 | | DECLARE_int32(log_min_segments_to_retain); |
68 | | DECLARE_int32(update_min_cdc_indices_interval_secs); |
69 | | DECLARE_int64(TEST_simulate_free_space_bytes); |
70 | | DECLARE_int64(log_stop_retaining_min_disk_mb); |
71 | | DECLARE_uint64(log_segment_size_bytes); |
72 | | DECLARE_int32(update_metrics_interval_ms); |
73 | | DECLARE_bool(enable_collect_cdc_metrics); |
74 | | DECLARE_bool(cdc_enable_replicate_intents); |
75 | | DECLARE_bool(get_changes_honor_deadline); |
76 | | DECLARE_int32(cdc_read_rpc_timeout_ms); |
77 | | DECLARE_int32(TEST_get_changes_read_loop_delay_ms); |
78 | | DECLARE_double(cdc_read_safe_deadline_ratio); |
79 | | |
80 | | METRIC_DECLARE_entity(cdc); |
81 | | METRIC_DECLARE_gauge_int64(last_read_opid_index); |
82 | | |
83 | | namespace yb { |
84 | | |
85 | | namespace log { |
86 | | class LogReader; |
87 | | } |
88 | | |
89 | | namespace cdc { |
90 | | |
91 | | using client::TableHandle; |
92 | | using client::YBSessionPtr; |
93 | | using master::MiniMaster; |
94 | | using rpc::RpcController; |
95 | | |
96 | | const std::string kCDCTestKeyspace = "my_keyspace"; |
97 | | const std::string kCDCTestTableName = "cdc_test_table"; |
98 | | const client::YBTableName kTableName(YQL_DATABASE_CQL, kCDCTestKeyspace, kCDCTestTableName); |
99 | | const client::YBTableName kCdcStateTableName( |
100 | | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
101 | | |
102 | 0 | CDCServiceImpl* CDCService(tserver::TabletServer* tserver) { |
103 | 0 | return down_cast<CDCServiceImpl*>( |
104 | 0 | tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); |
105 | 0 | } |
106 | | |
107 | | class CDCServiceTest : public YBMiniClusterTestBase<MiniCluster>, |
108 | | public testing::WithParamInterface<bool> { |
109 | | protected: |
110 | 0 | void SetUp() override { |
111 | 0 | YBMiniClusterTestBase::SetUp(); |
112 | |
|
113 | 0 | MiniClusterOptions opts; |
114 | 0 | SetAtomicFlag(false, &FLAGS_enable_ysql); |
115 | 0 | SetAtomicFlag(GetParam(), &FLAGS_cdc_enable_replicate_intents); |
116 | 0 | SetAtomicFlag(1000, &FLAGS_update_metrics_interval_ms); |
117 | 0 | opts.num_tablet_servers = server_count(); |
118 | 0 | opts.num_masters = 1; |
119 | 0 | cluster_.reset(new MiniCluster(opts)); |
120 | 0 | ASSERT_OK(cluster_->Start()); |
121 | |
|
122 | 0 | client_ = ASSERT_RESULT(cluster_->CreateClient()); |
123 | 0 | cdc_proxy_ = std::make_unique<CDCServiceProxy>( |
124 | 0 | &client_->proxy_cache(), |
125 | 0 | HostPort::FromBoundEndpoint(cluster_->mini_tablet_server(0)->bound_rpc_addr())); |
126 | |
|
127 | 0 | CreateTable(tablet_count(), &table_); |
128 | 0 | } |
129 | | |
130 | 0 | void DoTearDown() override { |
131 | 0 | if (!(stream_id_.empty())) { |
132 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_, |
133 | 0 | false /*force_delete*/, |
134 | 0 | true /*ignore_errors*/)); |
135 | | // TODO: wait for the stream to completely finish deleting (no wait is performed here). |
136 | 0 | } |
137 | 0 | Result<bool> exist = client_->TableExists(kTableName); |
138 | 0 | ASSERT_OK(exist); |
139 | |
|
140 | 0 | if (exist.get()) { |
141 | 0 | ASSERT_OK(client_->DeleteTable(kTableName)); |
142 | 0 | } |
143 | |
|
144 | 0 | client_.reset(); |
145 | |
|
146 | 0 | if (cluster_) { |
147 | 0 | cluster_->Shutdown(); |
148 | 0 | cluster_.reset(); |
149 | 0 | } |
150 | |
|
151 | 0 | YBMiniClusterTestBase::DoTearDown(); |
152 | 0 | } |
153 | | |
154 | | void CreateTable(int num_tablets, TableHandle* table); |
155 | | void GetTablets(std::vector<TabletId>* tablet_ids, |
156 | | const client::YBTableName& table_name = kTableName); |
157 | | void GetTablet(std::string* tablet_id, |
158 | | const client::YBTableName& table_name = kTableName); |
159 | | void GetChanges(const TabletId& tablet_id, const CDCStreamId& stream_id, |
160 | | int64_t term, int64_t index, bool* has_error = nullptr); |
161 | | void WriteTestRow(int32_t key, int32_t int_val, const string& string_val, |
162 | | const TabletId& tablet_id, const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy); |
163 | | CHECKED_STATUS WriteToProxyWithRetries( |
164 | | const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy, |
165 | | const tserver::WriteRequestPB& req, tserver::WriteResponsePB* resp, RpcController* rpc); |
166 | | CHECKED_STATUS GetChangesWithRetries( |
167 | | const GetChangesRequestPB& change_req, GetChangesResponsePB* change_resp, |
168 | | int timeout_ms, int max_attempts = 3); |
169 | | tserver::MiniTabletServer* GetLeaderForTablet(const std::string& tablet_id); |
170 | 0 | virtual int server_count() { return 1; } |
171 | 0 | virtual int tablet_count() { return 1; } |
172 | | |
173 | | std::unique_ptr<CDCServiceProxy> cdc_proxy_; |
174 | | std::unique_ptr<client::YBClient> client_; |
175 | | CDCStreamId stream_id_; |
176 | | TableHandle table_; |
177 | | }; |
178 | | |
179 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTest, ::testing::Bool()); |
180 | | |
181 | 0 | void CDCServiceTest::CreateTable(int num_tablets, TableHandle* table) { |
182 | 0 | ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name(), |
183 | 0 | kTableName.namespace_type())); |
184 | |
|
185 | 0 | client::YBSchemaBuilder builder; |
186 | 0 | builder.AddColumn("key")->Type(INT32)->HashPrimaryKey()->NotNull(); |
187 | 0 | builder.AddColumn("int_val")->Type(INT32); |
188 | 0 | builder.AddColumn("string_val")->Type(STRING); |
189 | |
|
190 | 0 | TableProperties table_properties; |
191 | 0 | table_properties.SetTransactional(true); |
192 | 0 | builder.SetTableProperties(table_properties); |
193 | |
|
194 | 0 | ASSERT_OK(table->Create(kTableName, num_tablets, client_.get(), &builder)); |
195 | 0 | } |
196 | | |
197 | | void AssertChangeRecords(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& changes, |
198 | | int32_t expected_int, std::string expected_str) { |
199 | | ASSERT_EQ(changes.size(), 2); |
200 | | ASSERT_EQ(changes[0].key(), "int_val"); |
201 | | ASSERT_EQ(changes[0].value().int32_value(), expected_int); |
202 | | ASSERT_EQ(changes[1].key(), "string_val"); |
203 | | ASSERT_EQ(changes[1].value().string_value(), expected_str); |
204 | | } |
205 | | |
206 | 0 | void VerifyCdcStateNotEmpty(client::YBClient* client) { |
207 | 0 | client::TableHandle table; |
208 | 0 | client::YBTableName cdc_state_table( |
209 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
210 | 0 | ASSERT_OK(table.Open(cdc_state_table, client)); |
211 | 0 | ASSERT_EQ(1, boost::size(client::TableRange(table))); |
212 | 0 | const auto& row = client::TableRange(table).begin(); |
213 | 0 | string checkpoint = row->column(master::kCdcCheckpointIdx).string_value(); |
214 | 0 | auto result = OpId::FromString(checkpoint); |
215 | 0 | ASSERT_OK(result); |
216 | 0 | OpId op_id = *result; |
217 | | // Verify that op id index has been advanced and is not 0. |
218 | 0 | ASSERT_GT(op_id.index, 0); |
219 | 0 | } |
220 | | |
221 | | void VerifyCdcStateMatches(client::YBClient* client, |
222 | | const CDCStreamId& stream_id, |
223 | | const TabletId& tablet_id, |
224 | | uint64_t term, |
225 | 0 | uint64_t index) { |
226 | 0 | client::TableHandle table; |
227 | 0 | client::YBTableName cdc_state_table( |
228 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
229 | 0 | ASSERT_OK(table.Open(cdc_state_table, client)); |
230 | 0 | const auto op = table.NewReadOp(); |
231 | 0 | auto* const req = op->mutable_request(); |
232 | 0 | QLAddStringHashValue(req, tablet_id); |
233 | 0 | auto cond = req->mutable_where_expr()->mutable_condition(); |
234 | 0 | cond->set_op(QLOperator::QL_OP_AND); |
235 | 0 | QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, |
236 | 0 | stream_id); |
237 | 0 | table.AddColumns({master::kCdcCheckpoint}, req); |
238 | |
|
239 | 0 | auto session = client->NewSession(); |
240 | 0 | ASSERT_OK(session->ApplyAndFlush(op)); |
241 | |
|
242 | 0 | LOG(INFO) << strings::Substitute("Verifying tablet: $0, stream: $1, op_id: $2", |
243 | 0 | tablet_id, stream_id, OpId(term, index).ToString()); |
244 | |
|
245 | 0 | auto row_block = ql::RowsResult(op.get()).GetRowBlock(); |
246 | 0 | ASSERT_EQ(row_block->row_count(), 1); |
247 | |
|
248 | 0 | string checkpoint = row_block->row(0).column(0).string_value(); |
249 | 0 | auto result = OpId::FromString(checkpoint); |
250 | 0 | ASSERT_OK(result); |
251 | 0 | OpId op_id = *result; |
252 | |
|
253 | 0 | ASSERT_EQ(op_id.term, term); |
254 | 0 | ASSERT_EQ(op_id.index, index); |
255 | 0 | } |
256 | | |
257 | | void VerifyStreamDeletedFromCdcState(client::YBClient* client, |
258 | | const CDCStreamId& stream_id, |
259 | | const TabletId& tablet_id, |
260 | 0 | int timeout_secs = 10) { |
261 | 0 | client::TableHandle table; |
262 | 0 | const client::YBTableName cdc_state_table( |
263 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
264 | 0 | ASSERT_OK(table.Open(cdc_state_table, client)); |
265 | |
|
266 | 0 | const auto op = table.NewReadOp(); |
267 | 0 | auto* const req = op->mutable_request(); |
268 | 0 | QLAddStringHashValue(req, tablet_id); |
269 | |
|
270 | 0 | auto cond = req->mutable_where_expr()->mutable_condition(); |
271 | 0 | cond->set_op(QLOperator::QL_OP_AND); |
272 | 0 | QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, |
273 | 0 | stream_id); |
274 | |
|
275 | 0 | table.AddColumns({master::kCdcCheckpoint}, req); |
276 | 0 | auto session = client->NewSession(); |
277 | | |
278 | | // The deletion of cdc_state rows for the specified stream happens in an asynchronous thread, |
279 | | // so even after the request has returned, the rows may not have been deleted yet. |
280 | 0 | ASSERT_OK(WaitFor([&](){ |
281 | 0 | EXPECT_OK(session->ApplyAndFlush(op)); |
282 | 0 | auto row_block = ql::RowsResult(op.get()).GetRowBlock(); |
283 | 0 | if (row_block->row_count() == 0) { |
284 | 0 | return true; |
285 | 0 | } |
286 | 0 | return false; |
287 | 0 | }, MonoDelta::FromSeconds(timeout_secs) * kTimeMultiplier, |
288 | 0 | "Stream rows in cdc_state have been deleted.")); |
289 | 0 | } |
290 | | |
291 | | void CDCServiceTest::GetTablets(std::vector<TabletId>* tablet_ids, |
292 | 0 | const client::YBTableName& table_name) { |
293 | 0 | std::vector<std::string> ranges; |
294 | 0 | ASSERT_OK(client_->GetTablets( |
295 | 0 | table_name, 0 /* max_tablets */, tablet_ids, &ranges, nullptr /* locations */, |
296 | 0 | RequireTabletsRunning::kFalse, master::IncludeInactive::kTrue)); |
297 | 0 | ASSERT_EQ(tablet_ids->size(), tablet_count()); |
298 | 0 | } |
299 | | |
300 | | void CDCServiceTest::GetTablet(std::string* tablet_id, |
301 | 0 | const client::YBTableName& table_name) { |
302 | 0 | std::vector<TabletId> tablet_ids; |
303 | 0 | GetTablets(&tablet_ids, table_name); |
304 | 0 | *tablet_id = tablet_ids[0]; |
305 | 0 | } |
306 | | |
307 | | void CDCServiceTest::GetChanges(const TabletId& tablet_id, const CDCStreamId& stream_id, |
308 | 0 | int64_t term, int64_t index, bool* has_error) { |
309 | 0 | GetChangesRequestPB change_req; |
310 | 0 | GetChangesResponsePB change_resp; |
311 | |
|
312 | 0 | change_req.set_tablet_id(tablet_id); |
313 | 0 | change_req.set_stream_id(stream_id); |
314 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(term); |
315 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(index); |
316 | 0 | change_req.set_serve_as_proxy(true); |
317 | |
|
318 | 0 | { |
319 | 0 | RpcController rpc; |
320 | 0 | rpc.set_timeout(MonoDelta::FromSeconds(10.0) * kTimeMultiplier); |
321 | 0 | SCOPED_TRACE(change_req.DebugString()); |
322 | 0 | auto s = cdc_proxy_->GetChanges(change_req, &change_resp, &rpc); |
323 | 0 | if (!has_error) { |
324 | 0 | ASSERT_OK(s); |
325 | 0 | ASSERT_FALSE(change_resp.has_error()); |
326 | 0 | } else if (!s.ok() || change_resp.has_error()) { |
327 | 0 | *has_error = true; |
328 | 0 | return; |
329 | 0 | } |
330 | 0 | } |
331 | 0 | } |
332 | | |
333 | | void CDCServiceTest::WriteTestRow(int32_t key, |
334 | | int32_t int_val, |
335 | | const string& string_val, |
336 | | const TabletId& tablet_id, |
337 | 0 | const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy) { |
338 | 0 | tserver::WriteRequestPB write_req; |
339 | 0 | tserver::WriteResponsePB write_resp; |
340 | 0 | write_req.set_tablet_id(tablet_id); |
341 | |
|
342 | 0 | RpcController rpc; |
343 | 0 | AddTestRowInsert(key, int_val, string_val, &write_req); |
344 | 0 | SCOPED_TRACE(write_req.DebugString()); |
345 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
346 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
347 | 0 | ASSERT_FALSE(write_resp.has_error()); |
348 | 0 | } |
349 | | |
350 | | Status CDCServiceTest::WriteToProxyWithRetries( |
351 | | const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy, |
352 | | const tserver::WriteRequestPB& req, |
353 | | tserver::WriteResponsePB* resp, |
354 | 0 | RpcController* rpc) { |
355 | 0 | return LoggedWaitFor( |
356 | 0 | [&req, resp, rpc, proxy]() -> Result<bool> { |
357 | 0 | auto s = proxy->Write(req, resp, rpc); |
358 | 0 | if (s.IsTryAgain() || |
359 | 0 | (resp->has_error() && StatusFromPB(resp->error().status()).IsTryAgain())) { |
360 | 0 | rpc->Reset(); |
361 | 0 | return false; |
362 | 0 | } |
363 | 0 | RETURN_NOT_OK(s); |
364 | 0 | return true; |
365 | 0 | }, |
366 | 0 | MonoDelta::FromSeconds(10) * kTimeMultiplier, "Write test row"); |
367 | 0 | } |
368 | | |
369 | | Status CDCServiceTest::GetChangesWithRetries( |
370 | | const GetChangesRequestPB& req, |
371 | | GetChangesResponsePB* resp, |
372 | | int timeout_ms, |
373 | 0 | int max_attempts) { |
374 | | // Keep track of number of attempts. |
375 | 0 | int num_attempts = 0; |
376 | |
|
377 | 0 | auto return_status = LoggedWaitFor( |
378 | 0 | [&]() -> Result<bool> { |
379 | | // RpcController needs to be rebuilt every time since the deadline is constant. |
380 | 0 | RpcController rpc; |
381 | 0 | rpc.set_timeout(MonoDelta::FromMilliseconds(timeout_ms)); |
382 | | |
383 | | // Issue the RPC; `s` holds the status of this (latest) attempt. |
384 | 0 | auto s = cdc_proxy_->GetChanges(req, resp, &rpc); |
385 | 0 | ++num_attempts; |
386 | | |
387 | | // Exit if we exhausted the number of attempts. |
388 | 0 | if (num_attempts >= max_attempts) { |
389 | 0 | return s.ok() ? Status::OK() : STATUS_FORMAT( |
390 | 0 | TimedOut, "Tried calling GetChanges $0 times with no success.", num_attempts); |
391 | 0 | } |
392 | | |
393 | | // Try again if we still have attempts left. |
394 | 0 | if (!s.ok()) { |
395 | 0 | return false; |
396 | 0 | } |
397 | | |
398 | 0 | return true; |
399 | 0 | }, |
400 | 0 | MonoDelta::FromSeconds(5) * max_attempts * kTimeMultiplier, "Call GetChanges"); |
401 | |
|
402 | 0 | return return_status; |
403 | 0 | } |
404 | | |
405 | 0 | tserver::MiniTabletServer* CDCServiceTest::GetLeaderForTablet(const std::string& tablet_id) { |
406 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
407 | 0 | if (cluster_->mini_tablet_server(i)->server()->LeaderAndReady(tablet_id)) { |
408 | 0 | return cluster_->mini_tablet_server(i); |
409 | 0 | } |
410 | 0 | } |
411 | 0 | return nullptr; |
412 | 0 | } |
413 | | |
414 | 0 | TEST_P(CDCServiceTest, TestCompoundKey) { |
415 | | // Create a table with a compound primary key. |
416 | 0 | static const std::string kCDCTestTableCompoundKeyName = "cdc_test_table_compound_key"; |
417 | 0 | static const client::YBTableName kTableNameCompoundKey( |
418 | 0 | YQL_DATABASE_CQL, kCDCTestKeyspace, kCDCTestTableCompoundKeyName); |
419 | |
|
420 | 0 | client::YBSchemaBuilder builder; |
421 | 0 | builder.AddColumn("hash_key")->Type(STRING)->HashPrimaryKey()->NotNull(); |
422 | 0 | builder.AddColumn("range_key")->Type(STRING)->PrimaryKey()->NotNull(); |
423 | 0 | builder.AddColumn("val")->Type(INT32); |
424 | |
|
425 | 0 | TableProperties table_properties; |
426 | 0 | table_properties.SetTransactional(true); |
427 | 0 | builder.SetTableProperties(table_properties); |
428 | |
|
429 | 0 | TableHandle table; |
430 | 0 | ASSERT_OK(table.Create(kTableNameCompoundKey, tablet_count(), client_.get(), &builder)); |
431 | | |
432 | | // Create a stream on the table |
433 | 0 | CreateCDCStream(cdc_proxy_, table.table()->id(), &stream_id_); |
434 | |
|
435 | 0 | std::string tablet_id; |
436 | 0 | GetTablet(&tablet_id, table.name()); |
437 | | |
438 | | // Now apply two ops with the same hash key but different range keys in a batch. |
439 | 0 | auto session = client_->NewSession(); |
440 | 0 | for (int i = 0; i < 2; i++) { |
441 | 0 | const auto op = table.NewUpdateOp(); |
442 | 0 | auto* const req = op->mutable_request(); |
443 | 0 | QLAddStringHashValue(req, "hk"); |
444 | 0 | QLAddStringRangeValue(req, Format("rk_$0", i)); |
445 | 0 | table.AddInt32ColumnValue(req, "val", i); |
446 | 0 | session->Apply(op); |
447 | 0 | } |
448 | 0 | ASSERT_OK(session->Flush()); |
449 | | |
450 | | // Get CDC changes. |
451 | 0 | GetChangesRequestPB change_req; |
452 | 0 | GetChangesResponsePB change_resp; |
453 | |
|
454 | 0 | change_req.set_tablet_id(tablet_id); |
455 | 0 | change_req.set_stream_id(stream_id_); |
456 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
457 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
458 | |
|
459 | 0 | { |
460 | 0 | RpcController rpc; |
461 | 0 | SCOPED_TRACE(change_req.DebugString()); |
462 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
463 | 0 | ASSERT_FALSE(change_resp.has_error()); |
464 | 0 | ASSERT_EQ(change_resp.records_size(), 2); |
465 | 0 | } |
466 | | |
467 | | // Verify the results. |
468 | 0 | for (int i = 0; i < change_resp.records_size(); i++) { |
469 | 0 | ASSERT_EQ(change_resp.records(i).operation(), CDCRecordPB::WRITE); |
470 | |
|
471 | 0 | ASSERT_EQ(change_resp.records(i).key_size(), 2); |
472 | | // Check the key. |
473 | 0 | ASSERT_EQ(change_resp.records(i).key(0).value().string_value(), "hk"); |
474 | 0 | ASSERT_EQ(change_resp.records(i).key(1).value().string_value(), Format("rk_$0", i)); |
475 | |
|
476 | 0 | ASSERT_EQ(change_resp.records(i).changes_size(), 1); |
477 | 0 | ASSERT_EQ(change_resp.records(i).changes(0).value().int32_value(), i); |
478 | 0 | } |
479 | 0 | } |
480 | | |
481 | 0 | TEST_P(CDCServiceTest, TestCreateCDCStream) { |
482 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
483 | |
|
484 | 0 | NamespaceId ns_id; |
485 | 0 | std::vector<TableId> table_ids; |
486 | 0 | std::unordered_map<std::string, std::string> options; |
487 | 0 | ASSERT_OK(client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options)); |
488 | 0 | ASSERT_EQ(table_ids.front(), table_.table()->id()); |
489 | 0 | } |
490 | | |
491 | 0 | TEST_P(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) { |
492 | | // Set default WAL retention time to 10 hours. |
493 | 0 | FLAGS_cdc_wal_retention_time_secs = 36000; |
494 | |
|
495 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
496 | |
|
497 | 0 | NamespaceId ns_id; |
498 | 0 | std::vector<TableId> table_ids; |
499 | 0 | std::unordered_map<std::string, std::string> options; |
500 | 0 | ASSERT_OK(client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options)); |
501 | | |
502 | | |
503 | | // Verify that the wal retention time was set at the tablet level. |
504 | 0 | VerifyWalRetentionTime(cluster_.get(), kCDCTestTableName, FLAGS_cdc_wal_retention_time_secs); |
505 | 0 | } |
506 | | |
507 | 0 | TEST_P(CDCServiceTest, TestDeleteCDCStream) { |
508 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
509 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
510 | |
|
511 | 0 | NamespaceId ns_id; |
512 | 0 | std::vector<TableId> table_ids; |
513 | 0 | std::unordered_map<std::string, std::string> options; |
514 | 0 | ASSERT_OK(client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options)); |
515 | 0 | ASSERT_EQ(table_ids.front(), table_.table()->id()); |
516 | | |
517 | |
|
518 | 0 | std::vector<std::string> tablet_ids; |
519 | 0 | std::vector<std::string> ranges; |
520 | 0 | ASSERT_OK(client_->GetTablets(table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges)); |
521 | |
|
522 | 0 | for (const auto& tablet_id : tablet_ids) { |
523 | 0 | VerifyCdcStateMatches(client_.get(), stream_id_, tablet_id, 0, 0); |
524 | 0 | } |
525 | |
|
526 | 0 | { |
527 | 0 | const auto& tserver = cluster_->mini_tablet_server(0)->server(); |
528 | 0 | std::string tablet_id; |
529 | 0 | GetTablet(&tablet_id, table_.name()); |
530 | 0 | ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, tserver->proxy())); |
531 | 0 | } |
532 | |
|
533 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
534 | | |
535 | | // Check that the stream no longer exists. |
536 | 0 | ns_id.clear(); |
537 | 0 | table_ids.clear(); |
538 | 0 | options.clear(); |
539 | 0 | Status s = client_->GetCDCStream(stream_id_, &ns_id, &table_ids, &options); |
540 | 0 | ASSERT_TRUE(s.IsNotFound()); |
541 | |
|
542 | 0 | for (const auto& tablet_id : tablet_ids) { |
543 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
544 | 0 | } |
545 | | |
546 | | // Once the CatalogManager has cleaned up cdc_state, make sure a subsequent call to GetChanges |
547 | | // doesn't re-populate cdc_state with cleaned up entries. UpdateCheckpoint will be called since |
548 | | // this is the first time we're calling GetChanges. |
549 | 0 | bool get_changes_error = false; |
550 | 0 | for (const auto& tablet_id : tablet_ids) { |
551 | 0 | GetChanges(tablet_id, stream_id_, 0, 0, &get_changes_error); |
552 | 0 | ASSERT_FALSE(get_changes_error); |
553 | 0 | } |
554 | |
|
555 | 0 | for (const auto& tablet_id : tablet_ids) { |
556 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
557 | 0 | } |
558 | 0 | } |
559 | | |
560 | 0 | TEST_P(CDCServiceTest, TestMetricsOnDeletedReplication) { |
561 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
562 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true; |
563 | |
|
564 | 0 | std::string tablet_id; |
565 | 0 | GetTablet(&tablet_id); |
566 | |
|
567 | 0 | const auto& tserver = cluster_->mini_tablet_server(0)->server(); |
568 | | // Use the proxy to most accurately simulate normal requests. |
569 | 0 | const auto& proxy = tserver->proxy(); |
570 | |
|
571 | 0 | GetChangesRequestPB change_req; |
572 | 0 | GetChangesResponsePB change_resp; |
573 | 0 | change_req.set_tablet_id(tablet_id); |
574 | 0 | change_req.set_stream_id(stream_id_); |
575 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
576 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
577 | 0 | { |
578 | 0 | RpcController rpc; |
579 | 0 | SCOPED_TRACE(change_req.DebugString()); |
580 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
581 | 0 | } |
582 | | |
583 | | // Add two test rows to a single write request and send them in one write. |
584 | 0 | tserver::WriteRequestPB write_req; |
585 | 0 | tserver::WriteResponsePB write_resp; |
586 | 0 | write_req.set_tablet_id(tablet_id); |
587 | 0 | { |
588 | 0 | RpcController rpc; |
589 | 0 | AddTestRowInsert(1, 11, "key1", &write_req); |
590 | 0 | AddTestRowInsert(2, 22, "key2", &write_req); |
591 | 0 | SCOPED_TRACE(write_req.DebugString()); |
592 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
593 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
594 | 0 | ASSERT_FALSE(write_resp.has_error()); |
595 | 0 | } |
596 | |
|
597 | 0 | auto cdc_service = CDCService(tserver); |
598 | | // Assert that leader lag > 0. |
599 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
600 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
601 | 0 | return metrics->async_replication_sent_lag_micros->value() > 0 && |
602 | 0 | metrics->async_replication_committed_lag_micros->value() > 0; |
603 | 0 | }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Lag > 0")); |
604 | | |
605 | | // Now, delete the replication stream and assert that lag is 0. |
606 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
607 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
608 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
609 | 0 | return metrics->async_replication_sent_lag_micros->value() == 0 && |
610 | 0 | metrics->async_replication_committed_lag_micros->value() == 0; |
611 | 0 | }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Lag = 0")); |
612 | 0 | } |
613 | | |
614 | | |
615 | 0 | TEST_P(CDCServiceTest, TestGetChanges) { |
616 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
617 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true; |
618 | |
|
619 | 0 | std::string tablet_id; |
620 | 0 | GetTablet(&tablet_id); |
621 | |
|
622 | 0 | const auto& tserver = cluster_->mini_tablet_server(0)->server(); |
623 | | // Use the proxy to most accurately simulate normal requests. |
624 | 0 | const auto& proxy = tserver->proxy(); |
625 | | |
626 | | // Add two test rows to a single write request and send them in one write. |
627 | 0 | tserver::WriteRequestPB write_req; |
628 | 0 | tserver::WriteResponsePB write_resp; |
629 | 0 | write_req.set_tablet_id(tablet_id); |
630 | 0 | { |
631 | 0 | RpcController rpc; |
632 | 0 | AddTestRowInsert(1, 11, "key1", &write_req); |
633 | 0 | AddTestRowInsert(2, 22, "key2", &write_req); |
634 | 0 | SCOPED_TRACE(write_req.DebugString()); |
635 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
636 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
637 | 0 | ASSERT_FALSE(write_resp.has_error()); |
638 | 0 | } |
639 | | |
640 | | // Get CDC changes. |
641 | 0 | GetChangesRequestPB change_req; |
642 | 0 | GetChangesResponsePB change_resp; |
643 | |
|
644 | 0 | change_req.set_tablet_id(tablet_id); |
645 | 0 | change_req.set_stream_id(stream_id_); |
646 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
647 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
648 | |
|
649 | 0 | { |
650 | 0 | RpcController rpc; |
651 | 0 | SCOPED_TRACE(change_req.DebugString()); |
652 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
653 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
654 | 0 | ASSERT_FALSE(change_resp.has_error()); |
655 | 0 | ASSERT_EQ(change_resp.records_size(), 2); |
656 | |
|
657 | 0 | std::pair<int, std::string> expected_results[2] = |
658 | 0 | {std::make_pair(11, "key1"), std::make_pair(22, "key2")}; |
659 | 0 | for (int i = 0; i < change_resp.records_size(); i++) { |
660 | 0 | ASSERT_EQ(change_resp.records(i).operation(), CDCRecordPB::WRITE); |
661 | | |
662 | | // Check the key. |
663 | 0 | ASSERT_NO_FATALS(AssertIntKey(change_resp.records(i).key(), i + 1)); |
664 | | |
665 | | // Check the change records. |
666 | 0 | ASSERT_NO_FATALS(AssertChangeRecords(change_resp.records(i).changes(), |
667 | 0 | expected_results[i].first, |
668 | 0 | expected_results[i].second)); |
669 | 0 | } |
670 | | |
671 | | // Verify the CDC Service-level metrics match what we just did. |
672 | 0 | auto cdc_service = CDCService(tserver); |
673 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
674 | 0 | ASSERT_EQ(metrics->last_read_opid_index->value(), metrics->last_readable_opid_index->value()); |
675 | 0 | ASSERT_EQ(metrics->last_read_opid_index->value(), change_resp.records_size() + 1 /* checkpt */); |
676 | 0 | ASSERT_EQ(metrics->rpc_payload_bytes_responded->TotalCount(), 1); |
677 | 0 | } |
678 | | |
679 | | // Insert another row. |
680 | 0 | { |
681 | 0 | write_req.Clear(); |
682 | 0 | write_req.set_tablet_id(tablet_id); |
683 | 0 | AddTestRowInsert(3, 33, "key3", &write_req); |
684 | |
|
685 | 0 | RpcController rpc; |
686 | 0 | SCOPED_TRACE(write_req.DebugString()); |
687 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
688 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
689 | 0 | ASSERT_FALSE(write_resp.has_error()); |
690 | 0 | } |
691 | | |
692 | | // Get next set of changes. |
693 | | // Copy checkpoint received from previous GetChanges CDC request. |
694 | 0 | change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint()); |
695 | 0 | change_resp.Clear(); |
696 | 0 | { |
697 | 0 | RpcController rpc; |
698 | 0 | SCOPED_TRACE(change_req.DebugString()); |
699 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
700 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
701 | 0 | ASSERT_FALSE(change_resp.has_error()); |
702 | 0 | ASSERT_EQ(change_resp.records_size(), 1); |
703 | 0 | ASSERT_EQ(change_resp.records(0).operation(), CDCRecordPB_OperationType_WRITE); |
704 | | |
705 | | // Check the key. |
706 | 0 | ASSERT_NO_FATALS(AssertIntKey(change_resp.records(0).key(), 3)); |
707 | | |
708 | | // Check the change records. |
709 | 0 | ASSERT_NO_FATALS(AssertChangeRecords(change_resp.records(0).changes(), 33, "key3")); |
710 | 0 | } |
711 | | |
712 | | // Delete a row. |
713 | 0 | { |
714 | 0 | write_req.Clear(); |
715 | 0 | write_req.set_tablet_id(tablet_id); |
716 | 0 | AddTestRowDelete(1, &write_req); |
717 | |
|
718 | 0 | RpcController rpc; |
719 | 0 | SCOPED_TRACE(write_req.DebugString()); |
720 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
721 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
722 | 0 | ASSERT_FALSE(write_resp.has_error()); |
723 | 0 | } |
724 | | |
725 | | // Get next set of changes. |
726 | | // Copy checkpoint received from previous GetChanges CDC request. |
727 | 0 | change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint()); |
728 | 0 | change_resp.Clear(); |
729 | 0 | { |
730 | 0 | RpcController rpc; |
731 | 0 | SCOPED_TRACE(change_req.DebugString()); |
732 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
733 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
734 | 0 | ASSERT_FALSE(change_resp.has_error()); |
735 | 0 | ASSERT_EQ(change_resp.records_size(), 1); |
736 | 0 | ASSERT_EQ(change_resp.records(0).operation(), CDCRecordPB_OperationType_DELETE); |
737 | | |
738 | | // Check the key deleted. |
739 | 0 | ASSERT_NO_FATALS(AssertIntKey(change_resp.records(0).key(), 1)); |
740 | 0 | } |
741 | | |
742 | | // Cleanup stream before shutdown. |
743 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
744 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
745 | 0 | } |
746 | | |
// Exercises GetChanges against an RPC deadline. The test writes 500 rows and then reads from
// checkpoint 0.0 twice: first with a short read timeout plus a per-iteration read delay, where
// it expects a non-empty but incomplete batch; then with the delay removed and a long timeout,
// where it expects all 500 records. The stream is deleted at the end.
747 | 0 | TEST_P(CDCServiceTest, TestGetChangesWithDeadline) { |
748 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
// Flag setup for the "partial read" phase. All time-based values scale with kTimeMultiplier.
// NOTE(review): the tiny log segment size presumably forces many small WAL segments so the
// read loop iterates often enough to hit the deadline -- confirm against the CDC read path.
749 | 0 | FLAGS_TEST_get_changes_read_loop_delay_ms = kTimeMultiplier; |
750 | 0 | FLAGS_log_segment_size_bytes = 100; |
751 | 0 | FLAGS_cdc_read_rpc_timeout_ms = 100 * kTimeMultiplier; |
752 | 0 | FLAGS_get_changes_honor_deadline = true; |
753 | 0 | FLAGS_cdc_read_safe_deadline_ratio = 0.30; |
754 | |
|
755 | 0 | std::string tablet_id; |
756 | 0 | GetTablet(&tablet_id); |
757 | |
|
758 | 0 | const auto& tserver = cluster_->mini_tablet_server(0)->server(); |
759 | | // Use the tablet server's proxy to most accurately simulate normal requests. |
760 | 0 | const auto& proxy = tserver->proxy(); |
761 | | |
762 | | // Insert <num_records> test rows. |
763 | 0 | const int num_records = 500; |
764 | 0 | for (int i = 0; i < num_records; i++) { |
765 | 0 | WriteTestRow(i, i, Format("key$0", i), tablet_id, proxy); |
766 | 0 | } |
767 | | |
768 | | // Get CDC changes. Note that the timeout value and read delay |
769 | | // should ensure that some, but not all records are read. |
770 | 0 | GetChangesRequestPB change_req; |
771 | 0 | GetChangesResponsePB change_resp; |
772 | |
|
773 | 0 | change_req.set_tablet_id(tablet_id); |
774 | 0 | change_req.set_stream_id(stream_id_); |
775 | 0 | change_req.set_max_records(num_records); |
776 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
777 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
778 | |
|
779 | 0 | SCOPED_TRACE(change_req.DebugString()); |
780 | 0 | ASSERT_OK(GetChangesWithRetries(change_req, &change_resp, |
781 | 0 | FLAGS_cdc_read_rpc_timeout_ms)); |
782 | |
|
783 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
784 | 0 | ASSERT_FALSE(change_resp.has_error()); |
785 | | |
786 | | // Ensure that partial results are returned: more than zero, fewer than num_records. |
787 | 0 | ASSERT_GT(change_resp.records_size(), 0); |
788 | 0 | ASSERT_LT(change_resp.records_size(), num_records); |
789 | | |
790 | | // Try again with the same request (still from checkpoint 0.0), but use a timeout value |
791 | | // large enough that all records should be read before the timeout. |
792 | 0 | FLAGS_TEST_get_changes_read_loop_delay_ms = 0; |
793 | 0 | FLAGS_cdc_read_rpc_timeout_ms = 30 * 1000 * kTimeMultiplier; |
794 | |
|
795 | 0 | ASSERT_OK(GetChangesWithRetries(change_req, &change_resp, |
796 | 0 | FLAGS_cdc_read_rpc_timeout_ms)); |
797 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
798 | 0 | ASSERT_FALSE(change_resp.has_error()); |
799 | | |
800 | | // This time, we expect all records to be read. |
801 | 0 | ASSERT_EQ(change_resp.records_size(), num_records); |
802 | | |
803 | | // Cleanup stream before shutdown. |
804 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
805 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
806 | 0 | } |
807 | | |
// Negative test: no CDC stream is created in this test body. GetChanges is called with the
// literal stream id "InvalidStreamId"; the RPC itself must succeed while the response carries
// an error.
808 | 0 | TEST_P(CDCServiceTest, TestGetChangesInvalidStream) { |
809 | 0 | std::string tablet_id; |
810 | 0 | GetTablet(&tablet_id); |
811 | | |
812 | | // Get CDC changes for a non-existent stream. |
813 | 0 | GetChangesRequestPB change_req; |
814 | 0 | GetChangesResponsePB change_resp; |
815 | |
|
816 | 0 | change_req.set_tablet_id(tablet_id); |
817 | 0 | change_req.set_stream_id("InvalidStreamId"); |
818 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
819 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
820 | |
|
// The failure is reported inside the response, not as an RPC status failure.
821 | 0 | RpcController rpc; |
822 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
823 | 0 | ASSERT_TRUE(change_resp.has_error()); |
824 | 0 | } |
825 | | |
// Creates a stream and immediately asks for its checkpoint on one tablet. With no GetChanges
// call issued in this test, the reported checkpoint is expected to be term 0, index 0.
826 | 0 | TEST_P(CDCServiceTest, TestGetCheckpoint) { |
827 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
828 | |
|
829 | 0 | std::string tablet_id; |
830 | 0 | GetTablet(&tablet_id); |
831 | |
|
832 | 0 | GetCheckpointRequestPB req; |
833 | 0 | GetCheckpointResponsePB resp; |
834 | |
|
835 | 0 | req.set_tablet_id(tablet_id); |
836 | 0 | req.set_stream_id(stream_id_); |
837 | |
|
838 | 0 | { |
839 | 0 | RpcController rpc; |
840 | 0 | SCOPED_TRACE(req.DebugString()); |
841 | 0 | ASSERT_OK(cdc_proxy_->GetCheckpoint(req, &resp, &rpc)); |
842 | 0 | SCOPED_TRACE(resp.DebugString()); |
843 | 0 | ASSERT_FALSE(resp.has_error()); |
844 | 0 | ASSERT_EQ(resp.checkpoint().op_id().term(), 0); |
845 | 0 | ASSERT_EQ(resp.checkpoint().op_id().index(), 0); |
846 | 0 | } |
847 | 0 | } |
848 | | |
// Fixture variant of CDCServiceTest that reports three servers and one tablet through the
// server_count()/tablet_count() overrides.
// NOTE(review): unlike CDCServiceTestMultipleServers, this class has no `public:` label, so the
// overrides are private here -- presumably harmless because they are reached through the base
// class, but confirm and consider aligning the two fixtures.
849 | | class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest { |
850 | 0 | virtual int server_count() override { return 3; } |
851 | 0 | virtual int tablet_count() override { return 1; } |
852 | | }; |
853 | | |
// Instantiate the fixture once per boolean parameter value (false and true).
854 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMultipleServersOneTablet, |
855 | | ::testing::Bool()); |
856 | | |
// Writes one row and polls it via GetChanges, restarts the tablet server that was leader, waits
// for a leader again, and then checks that the committed-lag metric on that leader's CDC service
// lies in [0, elapsed wall time since just before the write].
857 | 0 | TEST_P(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure) { |
858 | | // Test that the metric value is not time since epoch after a leadership change. |
859 | 0 | SetAtomicFlag(0, &FLAGS_cdc_state_checkpoint_update_interval_ms); |
860 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
// Metric collection is disabled via the flag; the test calls UpdateLagMetrics() explicitly below.
861 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = false; |
862 | |
|
863 | 0 | std::string tablet_id; |
864 | 0 | GetTablet(&tablet_id); |
865 | |
|
// Initialized to nullptr (as in TestUpdateLagMetrics) so the pointer never holds an
// indeterminate value before the first WaitFor iteration assigns it.
866 | 0 | tserver::MiniTabletServer* leader_mini_tserver = nullptr; |
867 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
868 | 0 | leader_mini_tserver = GetLeaderForTablet(tablet_id); |
869 | 0 | return leader_mini_tserver != nullptr; |
870 | 0 | }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.")); |
// Upper bound for the lag assertion: the timestamp is taken before the row is written.
871 | 0 | auto timestamp_before_write = GetCurrentTimeMicros(); |
872 | 0 | ASSERT_NO_FATALS(WriteTestRow(0, 10, "key0", tablet_id, leader_mini_tserver->server()->proxy())); |
873 | 0 | ASSERT_NO_FATALS(GetChanges(tablet_id, stream_id_, 0, 0)); |
// Restart the current leader, then wait until some server reports as leader for the tablet.
874 | 0 | ASSERT_OK(leader_mini_tserver->Restart()); |
875 | 0 | leader_mini_tserver = nullptr; |
876 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
877 | 0 | leader_mini_tserver = GetLeaderForTablet(tablet_id); |
878 | 0 | return leader_mini_tserver != nullptr; |
879 | 0 | }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.")); |
880 | |
|
// The previously constructed (and never used) CDCServiceProxy for the leader was removed; the
// metrics are read directly from the leader's in-process CDC service.
881 | 0 | auto leader_tserver = leader_mini_tserver->server(); |
885 | 0 | auto cdc_service = CDCService(leader_tserver); |
886 | 0 | cdc_service->UpdateLagMetrics(); |
887 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
888 | 0 | auto timestamp_after_write = GetCurrentTimeMicros(); |
889 | 0 | auto value = metrics->async_replication_committed_lag_micros->value(); |
// A value near "time since epoch" would exceed the elapsed interval and fail the upper bound.
890 | 0 | ASSERT_GE(value, 0); |
891 | 0 | ASSERT_LE(value, timestamp_after_write - timestamp_before_write); |
892 | 0 | } |
893 | | |
// Tracks the sent/committed lag metrics of one tablet on both its leader and a follower:
//   1. both servers start at zero lag;
//   2. after two separate writes, the leader reports both lags > 0 while the follower stays at 0;
//   3. after one leader GetChanges, sent lag is 0 and committed lag is still > 0;
//   4. after a second leader GetChanges using the returned checkpoint, both lags are 0.
894 | 0 | TEST_P(CDCServiceTestMultipleServersOneTablet, TestUpdateLagMetrics) { |
895 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
896 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_collect_cdc_metrics) = true; |
897 | |
|
898 | 0 | std::string tablet_id; |
899 | 0 | GetTablet(&tablet_id); |
900 | | |
901 | | // Get the leader and a follower for the tablet. |
902 | 0 | tserver::MiniTabletServer* leader_mini_tserver = nullptr; |
903 | 0 | tserver::MiniTabletServer* follower_mini_tserver = nullptr; |
904 | |
|
// Both pointers are reset on every attempt so a stale result from an earlier iteration is never
// kept. Any peer that is not LEADER_AND_READY counts as a follower; the last one seen wins.
905 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
906 | 0 | leader_mini_tserver = nullptr; |
907 | 0 | follower_mini_tserver = nullptr; |
908 | 0 | for (size_t i = 0; i < cluster_->num_tablet_servers(); i++) { |
909 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
910 | 0 | Status s = cluster_->mini_tablet_server(i)->server()->tablet_manager()-> |
911 | 0 | GetTabletPeer(tablet_id, &tablet_peer); |
912 | 0 | if (!s.ok()) { |
913 | 0 | continue; |
914 | 0 | } |
915 | 0 | if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
916 | 0 | leader_mini_tserver = cluster_->mini_tablet_server(i); |
917 | 0 | } else { |
918 | 0 | follower_mini_tserver = cluster_->mini_tablet_server(i); |
919 | 0 | } |
920 | 0 | } |
921 | 0 | return leader_mini_tserver != nullptr && follower_mini_tserver != nullptr; |
922 | 0 | }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.")); |
923 | |
|
// CDC RPC proxies addressed at the leader's and the follower's bound RPC endpoints.
924 | 0 | auto leader_proxy = std::make_unique<CDCServiceProxy>( |
925 | 0 | &client_->proxy_cache(), |
926 | 0 | HostPort::FromBoundEndpoint(leader_mini_tserver->bound_rpc_addr())); |
927 | |
|
928 | 0 | auto follower_proxy = std::make_unique<CDCServiceProxy>( |
929 | 0 | &client_->proxy_cache(), |
930 | 0 | HostPort::FromBoundEndpoint(follower_mini_tserver->bound_rpc_addr())); |
931 | |
|
932 | 0 | auto leader_tserver = leader_mini_tserver->server(); |
933 | 0 | auto follower_tserver = follower_mini_tserver->server(); |
934 | | // Use the leader's tserver proxy to most accurately simulate normal requests. |
935 | 0 | const auto& proxy = leader_tserver->proxy(); |
936 | |
|
937 | 0 | auto cdc_service = CDCService(leader_tserver); |
938 | 0 | auto cdc_service_follower = CDCService(follower_tserver); |
939 | | |
940 | | // At the start of time, assert both leader and follower at 0 lag. |
941 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
942 | 0 | { |
943 | | // Leader metrics |
944 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
945 | 0 | if (!(metrics->async_replication_sent_lag_micros->value() == 0 && |
946 | 0 | metrics->async_replication_committed_lag_micros->value() == 0)) { |
947 | 0 | return false; |
948 | 0 | } |
949 | 0 | } |
950 | 0 | { |
951 | | // Follower metrics |
952 | 0 | auto follower_metrics = |
953 | 0 | cdc_service_follower->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
954 | 0 | return follower_metrics->async_replication_sent_lag_micros->value() == 0 && |
955 | 0 | follower_metrics->async_replication_committed_lag_micros->value() == 0; |
956 | 0 | } |
957 | 0 | }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "At start, wait for Lag = 0")); |
958 | | |
959 | | |
960 | | // Create the in-memory structures for both follower and leader by polling for the tablet. |
961 | 0 | GetChangesRequestPB change_req; |
962 | 0 | GetChangesResponsePB change_resp; |
963 | 0 | change_req.set_tablet_id(tablet_id); |
964 | 0 | change_req.set_stream_id(stream_id_); |
965 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
966 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
// Only the RPC status is checked for these two polls; response-level errors are not inspected.
// The same RpcController is reused after Reset(), and change_resp ends up holding the
// follower's reply.
967 | 0 | { |
968 | 0 | RpcController rpc; |
969 | 0 | SCOPED_TRACE(change_req.DebugString()); |
970 | 0 | ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc)); |
971 | 0 | change_resp.Clear(); |
972 | 0 | rpc.Reset(); |
973 | 0 | ASSERT_OK(follower_proxy->GetChanges(change_req, &change_resp, &rpc)); |
974 | 0 | } |
975 | | |
976 | | // Insert test rows, one at a time so they have different hybrid times. |
977 | 0 | tserver::WriteRequestPB write_req; |
978 | 0 | tserver::WriteResponsePB write_resp; |
979 | 0 | write_req.set_tablet_id(tablet_id); |
980 | 0 | { |
981 | 0 | RpcController rpc; |
982 | 0 | AddTestRowInsert(1, 11, "key1", &write_req); |
983 | 0 | SCOPED_TRACE(write_req.DebugString()); |
984 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
985 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
986 | 0 | ASSERT_FALSE(write_resp.has_error()); |
987 | 0 | } |
988 | |
|
989 | 0 | { |
990 | 0 | write_req.Clear(); |
991 | 0 | write_req.set_tablet_id(tablet_id); |
992 | 0 | RpcController rpc; |
993 | 0 | AddTestRowInsert(2, 22, "key2", &write_req); |
994 | 0 | SCOPED_TRACE(write_req.DebugString()); |
995 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
996 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
997 | 0 | ASSERT_FALSE(write_resp.has_error()); |
998 | 0 | } |
999 | | |
1000 | | // Assert that leader lag > 0. |
1001 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
1002 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
1003 | 0 | return metrics->async_replication_sent_lag_micros->value() > 0 && |
1004 | 0 | metrics->async_replication_committed_lag_micros->value() > 0; |
1005 | 0 | }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Lag > 0")); |
1006 | |
|
1007 | 0 | { |
1008 | | // Make sure we wait for follower update thread to run at least once. |
1009 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_update_metrics_interval_ms)); |
1010 | | // On the follower, we shouldn't create metrics for tablets that we're not leader for, so these |
1011 | | // should be 0 even if there are records that have not been polled yet. |
1012 | 0 | auto metrics_follower = cdc_service_follower-> |
1013 | 0 | GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
1014 | 0 | ASSERT_TRUE(metrics_follower->async_replication_sent_lag_micros->value() == 0 && |
1015 | 0 | metrics_follower->async_replication_committed_lag_micros->value() == 0); |
1016 | 0 | } |
1017 | |
|
// NOTE(review): change_resp still holds the *follower's* GetChanges reply from the polling block
// above (the leader's reply was cleared before the follower call), so this checkpoint comes from
// the follower response -- presumably empty/0.0 if the follower rejected the request. Confirm
// that reading from that checkpoint is the intent.
1018 | 0 | change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint()); |
1019 | 0 | change_resp.Clear(); |
1020 | 0 | { |
1021 | 0 | RpcController rpc; |
1022 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1023 | 0 | ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc)); |
1024 | 0 | } |
1025 | | |
1026 | | // When we GetChanges the first time, only the read (sent) lag metric should be 0. |
1027 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
1028 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
1029 | 0 | return metrics->async_replication_sent_lag_micros->value() == 0 && |
1030 | 0 | metrics->async_replication_committed_lag_micros->value() > 0; |
1031 | 0 | }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for Read Lag = 0")); |
1032 | |
|
// Second poll: pass back the checkpoint returned by the leader's previous GetChanges reply.
1033 | 0 | change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint()); |
1034 | 0 | change_resp.Clear(); |
1035 | 0 | { |
1036 | 0 | RpcController rpc; |
1037 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1038 | 0 | ASSERT_OK(leader_proxy->GetChanges(change_req, &change_resp, &rpc)); |
1039 | 0 | } |
1040 | | |
1041 | | // When we GetChanges the second time, both the lag metrics should be 0. |
1042 | 0 | ASSERT_OK(WaitFor([&]() -> Result<bool> { |
1043 | 0 | auto metrics = cdc_service->GetCDCTabletMetrics({"" /* UUID */, stream_id_, tablet_id}); |
1044 | 0 | return metrics->async_replication_sent_lag_micros->value() == 0 && |
1045 | 0 | metrics->async_replication_committed_lag_micros->value() == 0; |
1046 | 0 | }, MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for All Lag = 0")); |
1047 | 0 | } |
1048 | | |
// Fixture variant of CDCServiceTest that reports two servers and four tablets through the
// server_count()/tablet_count() overrides.
1049 | | class CDCServiceTestMultipleServers : public CDCServiceTest { |
1050 | | public: |
1051 | 0 | virtual int server_count() override { return 2; } |
1052 | 0 | virtual int tablet_count() override { return 4; } |
1053 | | }; |
1054 | | |
// Instantiate the fixture once per boolean parameter value (false and true).
1055 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMultipleServers, ::testing::Bool()); |
1056 | | |
// Checks ListTablets in two modes. An unrestricted query must return tablet_count() tablets.
// A local_only query must return exactly as many tablets as the unrestricted reply attributed
// to server 0 (matched by the first broadcast address of each tablet's first tserver entry).
1057 | 0 | TEST_P(CDCServiceTestMultipleServers, TestListTablets) { |
1058 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1059 | |
|
1060 | 0 | std::string tablet_id; |
1061 | 0 | GetTablet(&tablet_id); |
1062 | |
|
1063 | 0 | ListTabletsRequestPB req; |
1064 | 0 | ListTabletsResponsePB resp; |
1065 | |
|
1066 | 0 | req.set_stream_id(stream_id_); |
1067 | |
|
// Reference address of mini tablet server 0, plus a counter of the tablets whose first listed
// tserver advertises that address.
1068 | 0 | auto cdc_proxy_bcast_addr = cluster_->mini_tablet_server(0)->options()->broadcast_addresses[0]; |
1069 | 0 | int cdc_proxy_count = 0; |
1070 | | |
1071 | | // Test a simple query for all tablets. |
1072 | 0 | { |
1073 | 0 | RpcController rpc; |
1074 | 0 | SCOPED_TRACE(req.DebugString()); |
1075 | 0 | ASSERT_OK(cdc_proxy_->ListTablets(req, &resp, &rpc)); |
1076 | 0 | SCOPED_TRACE(resp.DebugString()); |
1077 | 0 | ASSERT_FALSE(resp.has_error()); |
1078 | |
|
1079 | 0 | ASSERT_EQ(resp.tablets_size(), tablet_count()); |
1080 | 0 | ASSERT_EQ(resp.tablets(0).tablet_id(), tablet_id); |
1081 | |
|
1082 | 0 | for (auto& tablet : resp.tablets()) { |
1083 | 0 | auto owner_tserver = HostPort::FromPB(tablet.tservers(0).broadcast_addresses(0)); |
1084 | 0 | if (owner_tserver == cdc_proxy_bcast_addr) { |
1085 | 0 | ++cdc_proxy_count; |
1086 | 0 | } |
1087 | 0 | } |
1088 | 0 | } |
1089 | | |
1090 | | // Query for tablets only on the first server. We should only get a subset. |
// NOTE(review): resp is reused here without an explicit Clear(); this relies on the RPC layer
// overwriting the previous reply -- confirm.
1091 | 0 | { |
1092 | 0 | req.set_local_only(true); |
1093 | 0 | RpcController rpc; |
1094 | 0 | SCOPED_TRACE(req.DebugString()); |
1095 | 0 | ASSERT_OK(cdc_proxy_->ListTablets(req, &resp, &rpc)); |
1096 | 0 | SCOPED_TRACE(resp.DebugString()); |
1097 | 0 | ASSERT_FALSE(resp.has_error()); |
1098 | 0 | ASSERT_EQ(resp.tablets_size(), cdc_proxy_count); |
1099 | 0 | } |
1100 | 0 | } |
1101 | | |
// Verifies GetChanges routing when the queried server does not host the tablet:
//   - tablets local to the first server are readable directly;
//   - remote tablets return an error unless serve_as_proxy is set on the request;
//   - the server's cdc_rpc_proxy_count metric ends up equal to the number of remote tablets.
1102 | 0 | TEST_P(CDCServiceTestMultipleServers, TestGetChangesProxyRouting) { |
1103 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1104 | | |
1105 | | // Figure out [1] all tablets and [2] which ones are local to the first server. |
// Both lists are sorted because the set algorithms below require sorted ranges.
1106 | 0 | std::vector<std::string> local_tablets, all_tablets; |
1107 | 0 | for (bool is_local : {true, false}) { |
1108 | 0 | RpcController rpc; |
1109 | 0 | ListTabletsRequestPB req; |
1110 | 0 | ListTabletsResponsePB resp; |
1111 | 0 | req.set_stream_id(stream_id_); |
1112 | 0 | req.set_local_only(is_local); |
1113 | 0 | ASSERT_OK(cdc_proxy_->ListTablets(req, &resp, &rpc)); |
1114 | 0 | ASSERT_FALSE(resp.has_error()); |
1115 | 0 | auto& cur_tablets = is_local ? local_tablets : all_tablets; |
1116 | 0 | for (int i = 0; i < resp.tablets_size(); ++i) { |
1117 | 0 | cur_tablets.push_back(resp.tablets(i).tablet_id()); |
1118 | 0 | } |
1119 | 0 | std::sort(cur_tablets.begin(), cur_tablets.end()); |
1120 | 0 | } |
1121 | 0 | ASSERT_LT(local_tablets.size(), all_tablets.size()); |
1122 | 0 | ASSERT_LT(0, local_tablets.size()); |
1123 | 0 | { |
1124 | | // Overlap between these two lists should be all the local tablets |
1125 | 0 | std::vector<std::string> tablet_intersection; |
1126 | 0 | std::set_intersection(local_tablets.begin(), local_tablets.end(), |
1127 | 0 | all_tablets.begin(), all_tablets.end(), |
1128 | 0 | std::back_inserter(tablet_intersection)); |
// Use the four-iterator std::equal: it also compares lengths, so an intersection shorter than
// local_tablets fails the assertion instead of being read past its end.
1129 | 0 | ASSERT_TRUE(std::equal(local_tablets.begin(), local_tablets.end(), |
1130 | 0 | tablet_intersection.begin(), tablet_intersection.end())); |
1131 | 0 | } |
1132 | | // Difference should be all tablets on the other server. |
1133 | 0 | std::vector<std::string> remote_tablets; |
1134 | 0 | std::set_difference(all_tablets.begin(), all_tablets.end(), |
1135 | 0 | local_tablets.begin(), local_tablets.end(), |
1136 | 0 | std::back_inserter(remote_tablets)); |
1137 | 0 | ASSERT_LT(0, remote_tablets.size()); |
1138 | 0 | ASSERT_EQ(all_tablets.size() - local_tablets.size(), remote_tablets.size()); |
1139 | | |
1140 | | // Insert test rows, equal amount per tablet. |
// Local tablets are written through server 0 and remote tablets through server 1; cur_row keeps
// keys unique across all tablets.
1141 | 0 | int cur_row = 1; |
1142 | 0 | int to_write = 2; |
1143 | 0 | for (bool is_local : {true, false}) { |
1144 | 0 | const auto& tserver = cluster_->mini_tablet_server(is_local?0:1)->server(); |
1145 | | // Use the tserver proxy to most accurately simulate normal requests. |
1146 | 0 | const auto& proxy = tserver->proxy(); |
1147 | 0 | auto& cur_tablets = is_local ? local_tablets : remote_tablets; |
1148 | 0 | for (auto& tablet_id : cur_tablets) { |
1149 | 0 | tserver::WriteRequestPB write_req; |
1150 | 0 | tserver::WriteResponsePB write_resp; |
1151 | 0 | write_req.set_tablet_id(tablet_id); |
1152 | 0 | RpcController rpc; |
1153 | 0 | for (int i = 1; i <= to_write; ++i) { |
1154 | 0 | AddTestRowInsert(cur_row, 11 * cur_row, "key" + std::to_string(cur_row), &write_req); |
1155 | 0 | ++cur_row; |
1156 | 0 | } |
1157 | |
|
1158 | 0 | SCOPED_TRACE(write_req.DebugString()); |
1159 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
1160 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
1161 | 0 | ASSERT_FALSE(write_resp.has_error()); |
1162 | 0 | } |
1163 | 0 | } |
1164 | | |
1165 | | // Query for all tablets on the first server. Ensure the non-local ones have errors. |
1166 | 0 | for (bool is_local : {true, false}) { |
1167 | 0 | auto& cur_tablets = is_local ? local_tablets : remote_tablets; |
// Iterate by const reference; the tablet id is only read, so no per-iteration string copy.
1168 | 0 | for (const auto& tablet_id : cur_tablets) { |
1169 | 0 | std::vector<bool> proxy_options{false}; |
1170 | | // Verify that remote tablet queries work only when proxy forwarding is enabled. |
1171 | 0 | if (!is_local) proxy_options.push_back(true); |
1172 | 0 | for (auto use_proxy : proxy_options) { |
1173 | 0 | GetChangesRequestPB change_req; |
1174 | 0 | GetChangesResponsePB change_resp; |
1175 | 0 | change_req.set_tablet_id(tablet_id); |
1176 | 0 | change_req.set_stream_id(stream_id_); |
1177 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
1178 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
1179 | 0 | change_req.set_serve_as_proxy(use_proxy); |
1180 | 0 | RpcController rpc; |
1181 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1182 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
1183 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
// Only a remote tablet queried without proxying is expected to fail.
1184 | 0 | bool should_error = !(is_local || use_proxy); |
1185 | 0 | ASSERT_EQ(change_resp.has_error(), should_error); |
1186 | 0 | if (!should_error) { |
1187 | 0 | ASSERT_EQ(to_write, change_resp.records_size()); |
1188 | 0 | } |
1189 | 0 | } |
1190 | 0 | } |
1191 | 0 | } |
1192 | | |
1193 | | // Verify the CDC metrics match what we just did. |
// Exactly one proxied GetChanges was issued per remote tablet in the loop above.
1194 | 0 | const auto& tserver = cluster_->mini_tablet_server(0)->server(); |
1195 | 0 | auto cdc_service = CDCService(tserver); |
1196 | 0 | auto server_metrics = cdc_service->GetCDCServerMetrics(); |
1197 | 0 | ASSERT_EQ(server_metrics->cdc_rpc_proxy_count->value(), remote_tablets.size()); |
1198 | 0 | } |
1199 | | |
// Writes two ordinary rows and two rows carrying an external hybrid time, then checks that
// GetChanges returns only the two ordinary rows while a full table scan shows three rows
// (key1, key2, key3_ext). The same check is repeated after a synchronous cluster restart.
1200 | 0 | TEST_P(CDCServiceTest, TestOnlyGetLocalChanges) { |
1201 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1202 | |
|
1203 | 0 | std::string tablet_id; |
1204 | 0 | GetTablet(&tablet_id); |
1205 | |
|
1206 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1207 | |
|
1208 | 0 | { |
1209 | | // Insert local test rows. |
1210 | 0 | tserver::WriteRequestPB write_req; |
1211 | 0 | tserver::WriteResponsePB write_resp; |
1212 | 0 | write_req.set_tablet_id(tablet_id); |
1213 | 0 | RpcController rpc; |
1214 | 0 | AddTestRowInsert(1, 11, "key1", &write_req); |
1215 | 0 | AddTestRowInsert(2, 22, "key2", &write_req); |
1216 | |
|
1217 | 0 | SCOPED_TRACE(write_req.DebugString()); |
1218 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
1219 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
1220 | 0 | ASSERT_FALSE(write_resp.has_error()); |
1221 | 0 | } |
1222 | |
|
1223 | 0 | { |
1224 | | // Insert remote test rows (marked as remote by setting an external hybrid time). |
1225 | 0 | tserver::WriteRequestPB write_req; |
1226 | 0 | tserver::WriteResponsePB write_resp; |
1227 | 0 | write_req.set_tablet_id(tablet_id); |
1228 | | // Apply at the lowest possible hybrid time. |
1229 | 0 | write_req.set_external_hybrid_time(yb::kInitialHybridTimeValue); |
1230 | |
|
// Key 1 collides with the local row written above; key 3 exists only as a remote row.
1231 | 0 | RpcController rpc; |
1232 | 0 | AddTestRowInsert(1, 11, "key1_ext", &write_req); |
1233 | 0 | AddTestRowInsert(3, 33, "key3_ext", &write_req); |
1234 | |
|
1235 | 0 | SCOPED_TRACE(write_req.DebugString()); |
1236 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
1237 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
1238 | 0 | ASSERT_FALSE(write_resp.has_error()); |
1239 | 0 | } |
1240 | |
|
// Shared verification step, run once before and once after the cluster restart. It uses fatal
// assertions, so callers wrap it in ASSERT_NO_FATALS.
1241 | 0 | auto CheckChangesAndTable = [&]() { |
1242 | | // Get CDC changes. |
1243 | 0 | GetChangesRequestPB change_req; |
1244 | 0 | GetChangesResponsePB change_resp; |
1245 | |
|
1246 | 0 | change_req.set_tablet_id(tablet_id); |
1247 | 0 | change_req.set_stream_id(stream_id_); |
1248 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
1249 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
1250 | |
|
1251 | 0 | { |
1252 | | // Make sure only the two local test rows show up. |
1253 | 0 | RpcController rpc; |
1254 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1255 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
1256 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
1257 | 0 | ASSERT_FALSE(change_resp.has_error()); |
1258 | 0 | ASSERT_EQ(change_resp.records_size(), 2); |
1259 | |
|
1260 | 0 | std::pair<int, std::string> expected_results[2] = |
1261 | 0 | {std::make_pair(11, "key1"), std::make_pair(22, "key2")}; |
1262 | 0 | for (int i = 0; i < change_resp.records_size(); i++) { |
1263 | 0 | ASSERT_EQ(change_resp.records(i).operation(), CDCRecordPB::WRITE); |
1264 | | |
1265 | | // Check the key (record i is expected to carry key i + 1). |
1266 | 0 | ASSERT_NO_FATALS(AssertIntKey(change_resp.records(i).key(), i + 1)); |
1267 | | |
1268 | | // Check the change records. |
1269 | 0 | ASSERT_NO_FATALS(AssertChangeRecords(change_resp.records(i).changes(), |
1270 | 0 | expected_results[i].first, |
1271 | 0 | expected_results[i].second)); |
1272 | 0 | } |
1273 | 0 | } |
1274 | | |
1275 | | // Now, fetch the entire table and ensure that we fetch all the keys inserted. |
1276 | 0 | client::TableHandle table; |
1277 | 0 | EXPECT_OK(table.Open(table_.table()->name(), client_.get())); |
1278 | 0 | auto result = ScanTableToStrings(table); |
1279 | 0 | std::sort(result.begin(), result.end()); |
1280 | |
|
1281 | 0 | ASSERT_EQ(3, result.size()); |
1282 | | |
1283 | | // Make sure that key1 and not key1_ext shows up, since we applied key1_ext at a lower hybrid |
1284 | | // time. |
1285 | 0 | ASSERT_EQ("{ int32:1, int32:11, string:\"key1\" }", result[0]); |
1286 | 0 | ASSERT_EQ("{ int32:2, int32:22, string:\"key2\" }", result[1]); |
1287 | 0 | ASSERT_EQ("{ int32:3, int32:33, string:\"key3_ext\" }", result[2]); |
1288 | 0 | }; |
1289 | |
|
1290 | 0 | ASSERT_NO_FATALS(CheckChangesAndTable()); |
1291 | |
|
1292 | 0 | ASSERT_OK(cluster_->RestartSync()); |
1293 | |
|
// After the restart, wait until server 0 hosts the tablet and reports LEADER_AND_READY for it.
1294 | 0 | ASSERT_OK(WaitFor([&](){ |
1295 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1296 | 0 | if (!cluster_->mini_tablet_server(0)->server()->tablet_manager()-> |
1297 | 0 | LookupTablet(tablet_id, &tablet_peer)) { |
1298 | 0 | return false; |
1299 | 0 | } |
1300 | 0 | return tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; |
1301 | 0 | }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait until tablet has a leader.")); |
1302 | |
|
1303 | 0 | ASSERT_NO_FATALS(CheckChangesAndTable()); |
1304 | | |
1305 | | // Cleanup stream before shutdown. |
1306 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
1307 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
1308 | 0 | } |
1309 | | |
// Writes only rows that carry an external hybrid time, then checks that GetChanges from
// checkpoint 0.0 returns zero records but still reports a checkpoint with index > 0.
1310 | 0 | TEST_P(CDCServiceTest, TestCheckpointUpdatedForRemoteRows) { |
1311 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1312 | |
|
1313 | 0 | std::string tablet_id; |
1314 | 0 | GetTablet(&tablet_id); |
1315 | |
|
1316 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1317 | |
|
1318 | 0 | { |
1319 | | // Insert remote test rows (marked as remote by setting an external hybrid time). |
1320 | 0 | tserver::WriteRequestPB write_req; |
1321 | 0 | tserver::WriteResponsePB write_resp; |
1322 | 0 | write_req.set_tablet_id(tablet_id); |
1323 | | // Apply at the lowest possible hybrid time. |
1324 | 0 | write_req.set_external_hybrid_time(yb::kInitialHybridTimeValue); |
1325 | |
|
1326 | 0 | RpcController rpc; |
1327 | 0 | AddTestRowInsert(1, 11, "key1_ext", &write_req); |
1328 | 0 | AddTestRowInsert(3, 33, "key3_ext", &write_req); |
1329 | |
|
1330 | 0 | SCOPED_TRACE(write_req.DebugString()); |
1331 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
1332 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
1333 | 0 | ASSERT_FALSE(write_resp.has_error()); |
1334 | 0 | } |
1335 | |
|
// Verification step. It uses fatal assertions, so the caller wraps it in ASSERT_NO_FATALS.
1336 | 0 | auto CheckChanges = [&]() { |
1337 | | // Get CDC changes. |
1338 | 0 | GetChangesRequestPB change_req; |
1339 | 0 | GetChangesResponsePB change_resp; |
1340 | |
|
1341 | 0 | change_req.set_tablet_id(tablet_id); |
1342 | 0 | change_req.set_stream_id(stream_id_); |
1343 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
1344 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
1345 | |
|
1346 | 0 | { |
1347 | | // Make sure that the checkpoint is updated even when there are no CDC records. |
1348 | 0 | RpcController rpc; |
1349 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1350 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
1351 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
1352 | 0 | ASSERT_FALSE(change_resp.has_error()); |
1353 | 0 | ASSERT_EQ(change_resp.records_size(), 0); |
1354 | 0 | ASSERT_GT(change_resp.checkpoint().op_id().index(), 0); |
1355 | 0 | } |
1356 | 0 | }; |
1357 | |
|
1358 | 0 | ASSERT_NO_FATALS(CheckChanges()); |
1359 | | |
1360 | | // Cleanup stream before shutdown. |
1361 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
1362 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
1363 | 0 | } |
1364 | | |
1365 | | // Test to ensure that cdc_state table's checkpoint is updated as expected. |
1366 | | // This also tests for #2897 to ensure that cdc_state table checkpoint is not overwritten to 0.0 |
1367 | | // in case the consumer does not send from checkpoint. |
// Steps: create a stream, write two rows, read them back starting at op id 0.0, acknowledge the
// returned checkpoint with a second GetChanges, and finally call GetChanges with no
// from_checkpoint at all. Every call after the first must return zero records.
1368 | 0 | TEST_P(CDCServiceTest, TestCheckpointUpdate) { |
// Other fixtures in this file describe this setting as "immediately write any index provided by
// a GetChanges request to cdc_state table".
1369 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
1370 | |
|
1371 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1372 | |
|
1373 | 0 | std::string tablet_id; |
1374 | 0 | GetTablet(&tablet_id); |
1375 | |
|
// All writes in this test go through the proxy of tablet server 0.
1376 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1377 | | |
1378 | | // Insert test rows. |
1379 | 0 | tserver::WriteRequestPB write_req; |
1380 | 0 | tserver::WriteResponsePB write_resp; |
1381 | 0 | write_req.set_tablet_id(tablet_id); |
1382 | 0 | { |
1383 | 0 | RpcController rpc; |
1384 | 0 | AddTestRowInsert(1, 11, "key1", &write_req); |
1385 | 0 | AddTestRowInsert(2, 22, "key2", &write_req); |
1386 | |
|
1387 | 0 | SCOPED_TRACE(write_req.DebugString()); |
1388 | 0 | ASSERT_OK(WriteToProxyWithRetries(proxy, write_req, &write_resp, &rpc)); |
1389 | 0 | SCOPED_TRACE(write_resp.DebugString()); |
1390 | 0 | ASSERT_FALSE(write_resp.has_error()); |
1391 | 0 | } |
1392 | | |
1393 | | // Get CDC changes. |
1394 | 0 | GetChangesRequestPB change_req; |
1395 | 0 | GetChangesResponsePB change_resp; |
1396 | |
|
// First read starts explicitly at op id 0.0 and is expected to return both inserted rows.
1397 | 0 | change_req.set_tablet_id(tablet_id); |
1398 | 0 | change_req.set_stream_id(stream_id_); |
1399 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); |
1400 | 0 | change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); |
1401 | |
|
1402 | 0 | { |
1403 | 0 | RpcController rpc; |
1404 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1405 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
1406 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
1407 | 0 | ASSERT_FALSE(change_resp.has_error()); |
1408 | 0 | ASSERT_EQ(change_resp.records_size(), 2); |
1409 | 0 | } |
1410 | | |
1411 | | // Call GetChanges again and pass in checkpoint that producer can mark as committed. |
// The checkpoint is copied out of the previous response before that response is cleared.
1412 | 0 | change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint()); |
1413 | 0 | change_resp.Clear(); |
1414 | 0 | { |
1415 | 0 | RpcController rpc; |
1416 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1417 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
1418 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
1419 | 0 | ASSERT_FALSE(change_resp.has_error()); |
1420 | | // No more changes, so 0 records should be received. |
1421 | 0 | ASSERT_EQ(change_resp.records_size(), 0); |
1422 | 0 | } |
1423 | | |
1424 | | // Verify that cdc_state table has correct checkpoint. |
1425 | 0 | ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get())); |
1426 | | |
1427 | | // Call GetChanges again but without any from checkpoint. |
// Clear() drops the from_checkpoint field; only tablet id and stream id are set again.
1428 | 0 | change_req.Clear(); |
1429 | 0 | change_req.set_tablet_id(tablet_id); |
1430 | 0 | change_req.set_stream_id(stream_id_); |
1431 | 0 | change_resp.Clear(); |
1432 | 0 | { |
1433 | 0 | RpcController rpc; |
1434 | 0 | SCOPED_TRACE(change_req.DebugString()); |
1435 | 0 | ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); |
1436 | 0 | SCOPED_TRACE(change_resp.DebugString()); |
1437 | 0 | ASSERT_FALSE(change_resp.has_error()); |
1438 | | // Verify that producer uses the "from_checkpoint" from cdc_state table and does not send back |
1439 | | // any records. |
1440 | 0 | ASSERT_EQ(change_resp.records_size(), 0); |
1441 | 0 | } |
1442 | | |
1443 | | // Verify that cdc_state table's checkpoint is unaffected. |
1444 | 0 | ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get())); |
1445 | | |
1446 | | // Cleanup stream before shutdown. |
1447 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id_)); |
1448 | 0 | VerifyStreamDeletedFromCdcState(client_.get(), stream_id_, tablet_id); |
1449 | 0 | } |
1450 | | |
1451 | | namespace { |
// Polls until tablet_peer's log is available and both the log and the tablet metadata report a
// cdc_min_replicated_index equal to expected_index.
// tablet_peer: peer whose log and metadata are inspected.
// expected_index: value both indices must reach.
// timeout_secs: wait budget in seconds; it is multiplied by kTimeMultiplier before use.
// A WaitFor failure is reported through ASSERT_OK, which only returns from this helper.
1452 | | void WaitForCDCIndex(const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
1453 | | int64_t expected_index, |
1454 | 0 | int timeout_secs) { |
1455 | 0 | LOG(INFO) << "Waiting until index equals " << expected_index |
1456 | 0 | << ". Timeout: " << timeout_secs; |
1457 | 0 | ASSERT_OK(WaitFor([&](){ |
// log_available() is checked first so log() is only dereferenced once the log exists.
1458 | 0 | if (tablet_peer->log_available() && |
1459 | 0 | tablet_peer->log()->cdc_min_replicated_index() == expected_index && |
1460 | 0 | tablet_peer->tablet_metadata()->cdc_min_replicated_index() == expected_index) { |
1461 | 0 | return true; |
1462 | 0 | } |
1463 | 0 | return false; |
1464 | 0 | }, MonoDelta::FromSeconds(timeout_secs) * kTimeMultiplier, |
1465 | 0 | "Wait until cdc min replicated index.")); |
1466 | 0 | LOG(INFO) << "Done waiting"; |
1467 | 0 | } |
1468 | | } // namespace |
1469 | | |
// Fixture for the max-retention-time test. SetUp overrides the retention flags below before
// delegating to CDCServiceTest::SetUp, so the overrides are in place when the base SetUp runs.
1470 | | class CDCServiceTestMaxRentionTime : public CDCServiceTest { |
1471 | | public: |
1472 | 0 | void SetUp() override { |
1473 | | // Immediately write any index provided by a GetChanges request to cdc_state table. |
1474 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
1475 | 0 | FLAGS_log_min_segments_to_retain = 1; |
1476 | 0 | FLAGS_log_min_seconds_to_retain = 1; |
1477 | 0 | FLAGS_cdc_wal_retention_time_secs = 1; |
1478 | 0 | FLAGS_enable_log_retention_by_op_idx = true; |
// The maximum retention is tied to kMaxSecondsToRetain (30) so the test can wait it out.
1479 | 0 | FLAGS_log_max_seconds_to_retain = kMaxSecondsToRetain; |
// The test later reads segments_violate_max_time_policy_ from the log reader.
1480 | 0 | FLAGS_TEST_record_segments_violate_max_time_policy = true; |
1481 | 0 | FLAGS_update_min_cdc_indices_interval_secs = 1; |
1482 | | |
1483 | | // This will rollover log segments a lot faster. |
1484 | 0 | FLAGS_log_segment_size_bytes = 100; |
1485 | 0 | CDCServiceTest::SetUp(); |
1486 | 0 | } |
// Value assigned to FLAGS_log_max_seconds_to_retain above and used by the test for its timing.
1487 | | const int kMaxSecondsToRetain = 30; |
1488 | | }; |
1489 | | |
// Runs every TEST_P of this fixture once per value of ::testing::Bool().
1490 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMaxRentionTime, ::testing::Bool()); |
1491 | | |
// Checks the maximum-retention-time policy: with the CDC min index held at 0, no segment is
// offered for GC while less than kMaxSecondsToRetain seconds have passed; after sleeping past
// that limit, the GC candidates must match segments_violate_max_time_policy_ one by one.
1492 | 0 | TEST_P(CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime) { |
1493 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1494 | |
|
1495 | 0 | std::string tablet_id; |
1496 | 0 | GetTablet(&tablet_id); |
1497 | |
|
1498 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1499 | |
|
1500 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1501 | 0 | ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id, |
1502 | 0 | &tablet_peer)); |
1503 | | |
1504 | | // Write a row so that the next GetChanges request doesn't fail. |
1505 | 0 | WriteTestRow(0, 10, "key0", tablet_id, proxy); |
1506 | | |
1507 | | // Get CDC changes. |
1508 | 0 | GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0); |
1509 | |
|
// Wait for index 0 to show up in both the log and the tablet metadata.
1510 | 0 | WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1511 | |
|
1512 | 0 | MonoTime start = MonoTime::Now(); |
1513 | | // Write a lot more data to generate many log files that can be GCed. This should take less |
1514 | | // than kMaxSecondsToRetain for the next check to succeed. |
1515 | 0 | for (int i = 1; i <= 100; i++) { |
1516 | 0 | WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); |
1517 | 0 | } |
1518 | 0 | MonoDelta elapsed = MonoTime::Now().GetDeltaSince(start); |
1519 | 0 | ASSERT_LT(elapsed.ToSeconds(), kMaxSecondsToRetain); |
// Sleep budget: kMaxSecondsToRetain plus 10 seconds, minus the time already spent writing.
1520 | 0 | MonoDelta time_to_sleep = MonoDelta::FromSeconds(kMaxSecondsToRetain + 10) - elapsed; |
1521 | | |
1522 | | // Since we haven't updated the minimum cdc index, and the elapsed time is less than |
1523 | | // kMaxSecondsToRetain, no log files should be returned. |
1524 | 0 | log::SegmentSequence segment_sequence; |
1525 | 0 | ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(), |
1526 | 0 | &segment_sequence)); |
1527 | 0 | ASSERT_EQ(segment_sequence.size(), 0); |
1528 | 0 | LOG(INFO) << "No segments to be GCed because less than " << kMaxSecondsToRetain |
1529 | 0 | << " seconds have elapsed"; |
1530 | |
|
1531 | 0 | SleepFor(time_to_sleep); |
1532 | |
|
// Same query as before the sleep; segment_sequence was asserted empty, so it is reused as-is.
1533 | 0 | ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(), |
1534 | 0 | &segment_sequence)); |
1535 | 0 | ASSERT_GT(segment_sequence.size(), 0); |
1536 | 0 | ASSERT_EQ(segment_sequence.size(), |
1537 | 0 | tablet_peer->log()->reader_->segments_violate_max_time_policy_->size()); |
1538 | |
|
// Compare the two sequences by path, position by position.
1539 | 0 | for (size_t i = 0; i < segment_sequence.size(); i++) { |
1540 | 0 | ASSERT_EQ(segment_sequence[i]->path(), |
1541 | 0 | (*tablet_peer->log()->reader_->segments_violate_max_time_policy_)[i]->path()); |
1542 | 0 | LOG(INFO) << "Segment " << segment_sequence[i]->path() << " to be GCed"; |
1543 | 0 | } |
1544 | 0 | } |
1545 | | |
// Fixture for the tests that check cdc_min_replicated_index after bootstrap and across a
// cluster restart. Flags are overridden before CDCServiceTest::SetUp is called.
1546 | | class CDCServiceTestDurableMinReplicatedIndex : public CDCServiceTest { |
1547 | | public: |
1548 | 0 | void SetUp() override { |
1549 | | // Immediately write any index provided by a GetChanges request to cdc_state table. |
1550 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
// The tests size their WaitForCDCIndex timeouts as a multiple of this interval.
1551 | 0 | FLAGS_update_min_cdc_indices_interval_secs = 1; |
1552 | 0 | FLAGS_enable_log_retention_by_op_idx = true; |
1553 | 0 | CDCServiceTest::SetUp(); |
1554 | 0 | } |
1555 | | }; |
1556 | | |
// Runs every TEST_P of this fixture once per value of ::testing::Bool().
1557 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestDurableMinReplicatedIndex, |
1558 | | ::testing::Bool()); |
1559 | | |
// Writes kNRows rows, calls BootstrapProducer for the test table, and checks that cdc_state
// holds exactly one row whose stream id equals the returned bootstrap id and whose checkpoint
// index is 2 + kNRows. It then waits for cdc_min_replicated_index to reach the index of the
// log's latest entry.
1560 | 0 | TEST_P(CDCServiceTestDurableMinReplicatedIndex, TestBootstrapProducer) { |
1561 | 0 | constexpr int kNRows = 100; |
1562 | |
|
1563 | 0 | std::string tablet_id; |
1564 | 0 | GetTablet(&tablet_id); |
1565 | |
|
1566 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1567 | 0 | for (int i = 0; i < kNRows; i++) { |
1568 | 0 | WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); |
1569 | 0 | } |
1570 | |
|
// Bootstrap is requested for a single table, so a single bootstrap id is expected back.
1571 | 0 | BootstrapProducerRequestPB req; |
1572 | 0 | BootstrapProducerResponsePB resp; |
1573 | 0 | req.add_table_ids(table_.table()->id()); |
1574 | 0 | rpc::RpcController rpc; |
1575 | 0 | ASSERT_OK(cdc_proxy_->BootstrapProducer(req, &resp, &rpc)); |
1576 | 0 | ASSERT_FALSE(resp.has_error()); |
1577 | |
|
1578 | 0 | ASSERT_EQ(resp.cdc_bootstrap_ids().size(), 1); |
1579 | |
|
1580 | 0 | string bootstrap_id = resp.cdc_bootstrap_ids(0); |
1581 | | |
1582 | | // Verify that for each of the table's tablets, a new row in cdc_state table with the returned |
1583 | | // id was inserted. |
1584 | 0 | client::TableHandle table; |
1585 | 0 | client::YBTableName cdc_state_table( |
1586 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
1587 | 0 | ASSERT_OK(table.Open(cdc_state_table, client_.get())); |
1588 | 0 | ASSERT_EQ(1, boost::size(client::TableRange(table))); |
1589 | 0 | int nrows = 0; |
1590 | 0 | for (const auto& row : client::TableRange(table)) { |
1591 | 0 | nrows++; |
// The fixture's stream_id_ is overwritten with the id read from the cdc_state row.
1592 | 0 | stream_id_ = row.column(master::kCdcStreamIdIdx).string_value(); |
1593 | 0 | ASSERT_EQ(stream_id_, bootstrap_id); |
1594 | |
|
// The checkpoint column is a string that is parsed back into an OpId.
1595 | 0 | string checkpoint = row.column(master::kCdcCheckpointIdx).string_value(); |
1596 | 0 | auto s = OpId::FromString(checkpoint); |
1597 | 0 | ASSERT_OK(s); |
1598 | 0 | OpId op_id = *s; |
1599 | | // When no writes are present, the checkpoint's index is 1. Plus one for the ALTER WAL RETENTION |
1600 | | // TIME that we issue when cdc is enabled on a table. |
1601 | 0 | ASSERT_EQ(op_id.index, 2 + kNRows); |
1602 | 0 | } |
1603 | | |
1604 | | // This table only has one tablet. |
1605 | 0 | ASSERT_EQ(nrows, 1); |
1606 | | |
1607 | | // Ensure that cdc_min_replicated_index is set to the correct value after Bootstrap. |
1608 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1609 | 0 | ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id, |
1610 | 0 | &tablet_peer)); |
1611 | |
|
1612 | 0 | auto latest_opid = tablet_peer->log()->GetLatestEntryOpId(); |
1613 | 0 | WaitForCDCIndex(tablet_peer, latest_opid.index, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1614 | 0 | } |
1615 | | |
// Sends a GetChanges request with index 10, waits until the log and the tablet metadata both
// report cdc_min_replicated_index 10, restarts the cluster, and checks that both values are
// still 10 once the tablet on tablet server 0 is a ready leader again.
1616 | 0 | TEST_P(CDCServiceTestDurableMinReplicatedIndex, TestLogCDCMinReplicatedIndexIsDurable) { |
1617 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1618 | |
|
1619 | 0 | std::string tablet_id; |
1620 | 0 | GetTablet(&tablet_id); |
1621 | |
|
1622 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1623 | |
|
1624 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1625 | 0 | ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id, |
1626 | 0 | &tablet_peer)); |
1627 | | // Write a row so that the next GetChanges request doesn't fail. |
1628 | 0 | WriteTestRow(0, 10, "key0", tablet_id, proxy); |
1629 | | |
1630 | | // Get CDC changes. |
1631 | 0 | GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 10); |
1632 | |
|
1633 | 0 | WaitForCDCIndex(tablet_peer, 10, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1634 | | |
1635 | | // Restart the entire cluster to verify that the CDC tablet metadata got loaded from disk. |
1636 | 0 | ASSERT_OK(cluster_->RestartSync()); |
1637 | |
|
// tablet_peer is captured by reference and reassigned by LookupTablet, so the assertions after
// this wait inspect the peer found after the restart.
1638 | 0 | ASSERT_OK(WaitFor([&](){ |
1639 | 0 | if (cluster_->mini_tablet_server(0)->server()->tablet_manager()-> |
1640 | 0 | LookupTablet(tablet_id, &tablet_peer)) { |
1641 | 0 | if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY && |
1642 | 0 | tablet_peer->log() != nullptr) { |
1643 | 0 | LOG(INFO) << "TServer is ready "; |
1644 | 0 | return true; |
1645 | 0 | } |
1646 | 0 | } |
1647 | 0 | return false; |
1648 | 0 | }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait until tablet has a leader.")); |
1649 | | |
1650 | | // Verify the log and meta min replicated index was loaded correctly from disk. |
1651 | 0 | ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), 10); |
1652 | 0 | ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), 10); |
1653 | 0 | } |
1654 | | |
// Fixture for the minimum-free-space retention test. SetUp overrides the retention flags below
// before delegating to CDCServiceTest::SetUp.
1655 | | class CDCServiceTestMinSpace : public CDCServiceTest { |
1656 | | public: |
1657 | 0 | void SetUp() override { |
1658 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
1659 | 0 | FLAGS_log_min_segments_to_retain = 1; |
1660 | 0 | FLAGS_log_min_seconds_to_retain = 1; |
1661 | 0 | FLAGS_cdc_wal_retention_time_secs = 1; |
1662 | 0 | FLAGS_enable_log_retention_by_op_idx = true; |
1663 | | // We want the logs to be GCed because of space, not because they exceeded the maximum time to |
1664 | | // be retained. |
1665 | 0 | FLAGS_log_max_seconds_to_retain = 10 * 3600; // 10 hours. |
1666 | 0 | FLAGS_log_stop_retaining_min_disk_mb = 1; |
// The test later reads segments_violate_min_space_policy_ from the log reader.
1667 | 0 | FLAGS_TEST_record_segments_violate_min_space_policy = true; |
1668 | | |
1669 | | // This will rollover log segments a lot faster. |
1670 | 0 | FLAGS_log_segment_size_bytes = 500; |
1671 | 0 | CDCServiceTest::SetUp(); |
1672 | 0 | } |
1673 | | }; |
1674 | | |
// A TEST_P only runs for fixtures that are instantiated. Every other parameterized fixture in
// this file has an instantiation like this one; this fixture had none, so its test never ran.
| | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestMinSpace, ::testing::Bool()); |
| | |
1674 | | |
// Checks the minimum-free-space policy: with the CDC min index held at 0, no segment is offered
// for GC; once FLAGS_TEST_simulate_free_space_bytes is set to 128, the GC candidates must match
// segments_violate_min_space_policy_ one by one and GC must remove that many segments.
1675 | 0 | TEST_P(CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace) { |
1676 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1677 | |
|
1678 | 0 | std::string tablet_id; |
1679 | 0 | GetTablet(&tablet_id); |
1680 | |
|
1681 | 0 | const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1682 | |
|
1683 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1684 | 0 | ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id, |
1685 | 0 | &tablet_peer)); |
1686 | | // Write a row so that the next GetChanges request doesn't fail. |
1687 | 0 | WriteTestRow(0, 10, "key0", tablet_id, proxy); |
1688 | | |
1689 | | // Get CDC changes. |
1690 | 0 | GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0); |
1691 | |
|
1692 | 0 | WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1693 | | |
1694 | | // Write a lot more data to generate many log files that can be GCed. The fixture sets |
1695 | | // log_max_seconds_to_retain to 10 hours, so age alone should not make them GC candidates. |
1696 | 0 | for (int i = 1; i <= 5000; i++) { |
1697 | 0 | WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); |
1698 | 0 | } |
1699 | |
|
// Before free space is simulated as low, nothing is expected to be eligible for GC.
1700 | 0 | log::SegmentSequence segment_sequence; |
1701 | 0 | ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(), |
1702 | 0 | &segment_sequence)); |
1703 | 0 | ASSERT_EQ(segment_sequence.size(), 0); |
1704 | |
|
// NOTE(review): 128 bytes is presumably below the 1 MB FLAGS_log_stop_retaining_min_disk_mb
// threshold set by the fixture - confirm against the log GC implementation.
1705 | 0 | FLAGS_TEST_simulate_free_space_bytes = 128; |
1706 | |
|
1707 | 0 | ASSERT_OK(tablet_peer->log()->GetSegmentsToGCUnlocked(std::numeric_limits<int64_t>::max(), |
1708 | 0 | &segment_sequence)); |
1709 | 0 | ASSERT_GT(segment_sequence.size(), 0); |
1710 | 0 | ASSERT_EQ(segment_sequence.size(), |
1711 | 0 | tablet_peer->log()->reader_->segments_violate_min_space_policy_->size()); |
1712 | |
|
// Compare the two sequences by path, position by position.
1713 | 0 | for (size_t i = 0; i < segment_sequence.size(); i++) { |
1714 | 0 | ASSERT_EQ(segment_sequence[i]->path(), |
1715 | 0 | (*tablet_peer->log()->reader_->segments_violate_min_space_policy_)[i]->path()); |
1716 | 0 | LOG(INFO) << "Segment " << segment_sequence[i]->path() << " to be GCed"; |
1717 | 0 | } |
1718 | |
|
// Run the actual GC and check it removed as many segments as were listed above.
1719 | 0 | int32_t num_gced(0); |
1720 | 0 | ASSERT_OK(tablet_peer->log()->GC(std::numeric_limits<int64_t>::max(), &num_gced)); |
1721 | 0 | ASSERT_EQ(num_gced, segment_sequence.size()); |
1722 | | |
1723 | | // Read from 0.0. This should start reading from the beginning of the logs. |
1724 | 0 | GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 0); |
1725 | 0 | } |
1726 | | |
// Fixture for the tests that track cdc_min_replicated_index in the log and tablet metadata
// across several streams. Flags are overridden before CDCServiceTest::SetUp is called.
1727 | | class CDCLogAndMetaIndex : public CDCServiceTest { |
1728 | | public: |
1729 | 0 | void SetUp() override { |
1730 | | // Immediately write any index provided by a GetChanges request to cdc_state table. |
1731 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
1732 | 0 | FLAGS_update_min_cdc_indices_interval_secs = 1; |
// The reset test in this file sleeps one second longer than this value before running log GC.
1733 | 0 | FLAGS_cdc_min_replicated_index_considered_stale_secs = 5; |
1734 | 0 | FLAGS_enable_log_retention_by_op_idx = true; |
1735 | 0 | CDCServiceTest::SetUp(); |
1736 | 0 | } |
1737 | | }; |
1738 | | |
// Runs every TEST_P of this fixture once per value of ::testing::Bool().
1739 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCLogAndMetaIndex, ::testing::Bool()); |
1740 | | |
// Creates kNStreams streams on one tablet, has stream i request index i, and checks that the
// log and metadata cdc_min_replicated_index follow the smallest requested index: 0 at first,
// then 1 after stream 0 moves to index 4.
1741 | 0 | TEST_P(CDCLogAndMetaIndex, TestLogAndMetaCdcIndex) { |
1742 | 0 | constexpr int kNStreams = 5; |
1743 | | |
1744 | | // This will rollover log segments a lot faster. |
1745 | 0 | FLAGS_log_segment_size_bytes = 100; |
1746 | |
|
1747 | 0 | CDCStreamId stream_id[kNStreams]; |
1748 | |
|
1749 | 0 | for (int i = 0; i < kNStreams; i++) { |
1750 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id[i]); |
1751 | 0 | } |
1752 | |
|
1753 | 0 | std::string tablet_id; |
1754 | 0 | GetTablet(&tablet_id); |
1755 | |
|
1756 | 0 | const auto &proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1757 | | |
1758 | | // Insert test rows. |
1759 | 0 | for (int i = 1; i <= kNStreams; i++) { |
1760 | 0 | WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); |
1761 | 0 | } |
1762 | |
|
1763 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1764 | 0 | ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id, |
1765 | 0 | &tablet_peer)); |
1766 | | |
1767 | | // Before any cdc request, the min index should be max value. |
1768 | 0 | ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64_t>::max()); |
1769 | 0 | ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), |
1770 | 0 | std::numeric_limits<int64_t>::max()); |
1771 | |
|
// Stream i asks for index i, so the requested indices are 0 through kNStreams - 1.
1772 | 0 | for (int i = 0; i < kNStreams; i++) { |
1773 | | // Get CDC changes. |
1774 | 0 | GetChanges(tablet_id, stream_id[i], /* term */ 0, /* index */ i); |
1775 | 0 | } |
1776 | | |
1777 | | // After the request succeeded, verify that the min cdc limit was set correctly. In this case |
1778 | | // it belongs to stream_id[0] with index 0. |
1779 | 0 | WaitForCDCIndex(tablet_peer, 0, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1780 | | |
1781 | | // Changing the lowest index from all the streams should also be reflected in the log object. |
1782 | 0 | GetChanges(tablet_id, stream_id[0], /* term */ 0, /* index */ 4); |
1783 | | |
1784 | | // After the request succeeded, verify that the min cdc limit was set correctly. In this case |
1785 | | // it belongs to stream_id[1] with index 1. |
1786 | 0 | WaitForCDCIndex(tablet_peer, 1, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1787 | |
|
// Cleanup: delete every stream, passing true for both force_delete and ignore_errors.
1788 | 0 | for (int i = 0; i < kNStreams; i++) { |
1789 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id[i], |
1790 | 0 | true /*force_delete*/, |
1791 | 0 | true /*ignore_errors*/)); |
1792 | 0 | } |
1793 | 0 | } |
1794 | | |
// Fixture for the index-reset test. It layers a small log segment size on top of
// CDCLogAndMetaIndex::SetUp, which is called last.
1795 | | class CDCLogAndMetaIndexReset : public CDCLogAndMetaIndex { |
1796 | | public: |
1797 | 0 | void SetUp() override { |
// CDCLogAndMetaIndex::SetUp assigns the same value (5) to this flag again.
1798 | 0 | FLAGS_cdc_min_replicated_index_considered_stale_secs = 5; |
1799 | | // This will rollover log segments a lot faster. |
1800 | 0 | FLAGS_log_segment_size_bytes = 100; |
1801 | 0 | CDCLogAndMetaIndex::SetUp(); |
1802 | 0 | } |
1803 | | }; |
1804 | | |
// Runs every TEST_P of this fixture once per value of ::testing::Bool().
1805 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCLogAndMetaIndexReset, ::testing::Bool()); |
1806 | | |
1807 | | // Test that when all the streams for a specific tablet have been deleted, the log and meta |
1808 | | // cdc min replicated index is reset to max int64. |
// Steps: create kNStreams streams, have each request index 5, wait for the min index to become
// 5, delete the streams' rows directly from cdc_state, sleep past the staleness window, run log
// GC, and check that both indices are back at int64 max.
1809 | 0 | TEST_P(CDCLogAndMetaIndexReset, TestLogAndMetaCdcIndexAreReset) { |
1810 | 0 | constexpr int kNStreams = 5; |
1811 | | |
1812 | | // This will rollover log segments a lot faster. |
1813 | 0 | FLAGS_log_segment_size_bytes = 100; |
1814 | |
|
1815 | 0 | CDCStreamId stream_id[kNStreams]; |
1816 | |
|
1817 | 0 | for (int i = 0; i < kNStreams; i++) { |
1818 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id[i]); |
1819 | 0 | } |
1820 | |
|
1821 | 0 | std::string tablet_id; |
1822 | 0 | GetTablet(&tablet_id); |
1823 | |
|
1824 | 0 | const auto &proxy = cluster_->mini_tablet_server(0)->server()->proxy(); |
1825 | | |
1826 | | // Insert test rows. |
1827 | 0 | for (int i = 1; i <= kNStreams; i++) { |
1828 | 0 | WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); |
1829 | 0 | } |
1830 | |
|
1831 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1832 | 0 | ASSERT_TRUE(cluster_->mini_tablet_server(0)->server()->tablet_manager()->LookupTablet(tablet_id, |
1833 | 0 | &tablet_peer)); |
1834 | | |
1835 | | // Before any cdc request, the min index should be max value. |
1836 | 0 | ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64_t>::max()); |
1837 | 0 | ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), |
1838 | 0 | std::numeric_limits<int64_t>::max()); |
1839 | | |
1840 | |
|
1841 | 0 | for (int i = 0; i < kNStreams; i++) { |
1842 | | // Get CDC changes. |
1843 | 0 | GetChanges(tablet_id, stream_id[i], /* term */ 0, /* index */ 5); |
1844 | 0 | } |
1845 | | |
1846 | | // After the request succeeded, verify that the min cdc limit was set correctly. In this case |
1847 | | // all the streams have index 5. |
1848 | 0 | WaitForCDCIndex(tablet_peer, 5, 4 * FLAGS_update_min_cdc_indices_interval_secs); |
1849 | |
|
// The stream rows are removed straight from the cdc_state table with delete ops, not through
// DeleteCDCStream.
1850 | 0 | client::TableHandle table; |
1851 | 0 | client::YBTableName cdc_state_table( |
1852 | 0 | YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); |
1853 | 0 | ASSERT_OK(table.Open(cdc_state_table, client_.get())); |
1854 | |
|
1855 | 0 | auto session = client_->NewSession(); |
1856 | 0 | for (int i = 0; i < kNStreams; i++) { |
// Each delete is keyed by the tablet id (hash value) and the stream id (range value).
1857 | 0 | const auto delete_op = table.NewDeleteOp(); |
1858 | 0 | auto* delete_req = delete_op->mutable_request(); |
1859 | 0 | QLAddStringHashValue(delete_req, tablet_id); |
1860 | 0 | QLAddStringRangeValue(delete_req, stream_id[i]); |
1861 | 0 | session->Apply(delete_op); |
1862 | 0 | } |
1863 | 0 | ASSERT_OK(session->Flush()); |
1864 | 0 | LOG(INFO) << "Successfully deleted all streams from cdc_state"; |
1865 | |
|
// Sleep one second longer than the staleness threshold before running log GC.
1866 | 0 | SleepFor(MonoDelta::FromSeconds(FLAGS_cdc_min_replicated_index_considered_stale_secs + 1)); |
1867 | |
|
1868 | 0 | LOG(INFO) << "Done sleeping"; |
1869 | | // RunLogGC should reset cdc min replicated index to max int64 because more than |
1870 | | // FLAGS_cdc_min_replicated_index_considered_stale_secs seconds have elapsed since the index |
1871 | | // was last updated. |
1872 | 0 | ASSERT_OK(tablet_peer->RunLogGC()); |
1873 | 0 | LOG(INFO) << "GC done running"; |
1874 | 0 | ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64_t>::max()); |
1875 | 0 | ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), |
1876 | 0 | std::numeric_limits<int64_t>::max()); |
1877 | |
|
// Cleanup: delete every stream, passing true for both force_delete and ignore_errors.
1878 | 0 | for (int i = 0; i < kNStreams; i++) { |
1879 | 0 | ASSERT_OK(client_->DeleteCDCStream(stream_id[i], |
1880 | 0 | true /*force_delete*/, |
1881 | 0 | true /*ignore_errors*/)); |
1882 | 0 | } |
1883 | 0 | } |
1884 | | |
// Fixture that reports three tablet servers and three tablets through server_count() and
// tablet_count(). Flags are overridden before CDCServiceTest::SetUp is called.
1885 | | class CDCServiceTestThreeServers : public CDCServiceTest { |
1886 | | public: |
1887 | 0 | void SetUp() override { |
1888 | | // We don't want the tablets to move in the middle of the test. |
1889 | 0 | FLAGS_enable_load_balancing = false; |
1890 | 0 | FLAGS_leader_failure_max_missed_heartbeat_periods = 12.0; |
1891 | 0 | FLAGS_update_min_cdc_indices_interval_secs = 5; |
1892 | 0 | FLAGS_enable_log_retention_by_op_idx = true; |
// Both timeouts below are 20 units scaled by kTimeMultiplier (milliseconds * 1000, and seconds).
1893 | 0 | FLAGS_client_read_write_timeout_ms = 20 * 1000 * kTimeMultiplier; |
1894 | | |
1895 | | // Always update cdc_state table. |
1896 | 0 | FLAGS_cdc_state_checkpoint_update_interval_ms = 0; |
1897 | |
|
1898 | 0 | FLAGS_follower_unavailable_considered_failed_sec = 20 * kTimeMultiplier; |
1899 | |
|
1900 | 0 | CDCServiceTest::SetUp(); |
1901 | 0 | } |
1902 | | |
// Calls YBMiniClusterTestBase::DoTearDown directly.
// NOTE(review): presumably this bypasses any DoTearDown defined by CDCServiceTest - confirm.
1903 | 0 | void DoTearDown() override { |
1904 | 0 | YBMiniClusterTestBase::DoTearDown(); |
1905 | 0 | } |
1906 | | |
1907 | 0 | virtual int server_count() override { return 3; } |
1908 | 0 | virtual int tablet_count() override { return 3; } |
1909 | | |
1910 | | // Get the first tablet_id for which any peer is a leader. |
1911 | | void GetFirstTabletIdAndLeaderPeer(TabletId* tablet_id, ssize_t* leader_idx, int timeout_secs); |
1912 | | }; |
1913 | | |
// Runs every TEST_P of this fixture once per value of ::testing::Bool().
1914 | | INSTANTIATE_TEST_CASE_P(EnableReplicateIntents, CDCServiceTestThreeServers, ::testing::Bool()); |
1915 | | |
1916 | | // Sometimes leadership takes a while. Keep retrying until timeout_secs seconds have elapsed. |
// Scans the tablet servers for a peer that belongs to this test's table and reports
// LEADER_AND_READY.
// tablet_id: out-parameter that receives the selected tablet id; callers pass it in empty, and
//     it is left empty when no leader is found before the deadline.
// leader_idx: out-parameter that receives the index of the tablet server hosting that leader.
// timeout_secs: how long to keep rescanning, in seconds.
// The scan stops at the first match, so a later server can no longer overwrite the selection.
1917 | | void CDCServiceTestThreeServers::GetFirstTabletIdAndLeaderPeer(TabletId* tablet_id, |
1918 | | ssize_t* leader_idx, |
1919 | 0 | int timeout_secs) { |
1920 | 0 | std::vector<TabletId> tablet_ids; |
1921 | | // Verify that we are only returning a tablet that belongs to the table created for this test. |
1922 | 0 | GetTablets(&tablet_ids); |
1923 | 0 | ASSERT_EQ(tablet_ids.size(), tablet_count()); |
1924 | |
|
1925 | 0 | MonoTime now = MonoTime::Now(); |
1926 | 0 | MonoTime deadline = now + MonoDelta::FromSeconds(timeout_secs); |
1927 | 0 | while(now.ComesBefore(deadline) && (!tablet_id || tablet_id->empty())) { |
// The server loop uses the same "nothing selected yet" test as the retry loop above, so it ends
// as soon as the inner loop has picked a tablet.
1928 | 0 | for (size_t idx = 0; idx < cluster_->num_tablet_servers() && (!tablet_id || tablet_id->empty()); idx++) { |
1929 | 0 | auto peers = cluster_->mini_tablet_server(idx)->server()->tablet_manager()->GetTabletPeers(); |
1930 | 0 | ASSERT_GT(peers.size(), 0); |
1931 | |
|
1932 | 0 | for (const auto &peer : peers) { |
1933 | 0 | auto it = std::find(tablet_ids.begin(), tablet_ids.end(), peer->tablet_id()); |
1934 | 0 | if (it != tablet_ids.end() && |
1935 | 0 | peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
1936 | 0 | *tablet_id = peer->tablet_id(); |
1937 | 0 | *leader_idx = idx; |
// Stream the id itself; streaming the pointer would log an address.
1938 | 0 | LOG(INFO) << "Selected tablet " << *tablet_id << " for tablet server " << idx; |
1939 | 0 | break; |
1940 | 0 | } |
1941 | 0 | } |
1942 | 0 | } |
1943 | 0 | now = MonoTime::Now(); |
1944 | 0 | } |
1945 | 0 | } |
1946 | | |
1947 | | |
1948 | | // Test that whenever a leader change happens (forced here by shutting down the tablet leader), |
1949 | | // next leader correctly reads the minimum applied cdc index by reading the cdc_state table. |
// Steps: pick a tablet with a ready leader, write kNRecords rows, create a stream, shut the
// leader's tablet server down, send GetChanges with index 5 until it succeeds, wait for a new
// leader, and check that the new leader's log and metadata report index 5.
1950 | 0 | TEST_P(CDCServiceTestThreeServers, TestNewLeaderUpdatesLogCDCAppliedIndex) { |
1951 | 0 | constexpr int kNRecords = 30; |
1952 | 0 | constexpr int kGettingLeaderTimeoutSecs = 20; |
1953 | |
|
1954 | 0 | TabletId tablet_id; |
1955 | | // Index of the TS that is the leader for the selected tablet_id. |
1956 | 0 | ssize_t leader_idx = -1; |
1957 | |
|
1958 | 0 | GetFirstTabletIdAndLeaderPeer(&tablet_id, &leader_idx, kGettingLeaderTimeoutSecs); |
1959 | 0 | ASSERT_FALSE(tablet_id.empty()); |
1960 | 0 | ASSERT_GE(leader_idx, 0); |
1961 | |
|
1962 | 0 | const auto &proxy = cluster_->mini_tablet_server(leader_idx)->server()->proxy(); |
1963 | 0 | for (int i = 0; i < kNRecords; i++) { |
1964 | 0 | WriteTestRow(i, 10 + i, "key" + std::to_string(i), tablet_id, proxy); |
1965 | 0 | } |
1966 | 0 | LOG(INFO) << "Inserted " << kNRecords << " records"; |
1967 | |
|
1968 | 0 | CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id_); |
1969 | 0 | LOG(INFO) << "Created cdc stream " << stream_id_; |
1970 | |
|
1971 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
1972 | | // Check that the index hasn't been updated in any of the peers. |
1973 | 0 | for (int idx = 0; idx < server_count(); idx++) { |
1974 | 0 | if (cluster_->mini_tablet_server(idx)->server()->tablet_manager()-> |
1975 | 0 | LookupTablet(tablet_id, &tablet_peer)) { |
1976 | 0 | ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), std::numeric_limits<int64>::max()); |
1977 | 0 | ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), |
1978 | 0 | std::numeric_limits<int64>::max()); |
1979 | |
|
1980 | 0 | } |
1981 | 0 | } |
1982 | | |
1983 | | // Kill the tablet leader tserver so that another tserver becomes the leader. |
1984 | 0 | cluster_->mini_tablet_server(leader_idx)->Shutdown(); |
1985 | 0 | LOG(INFO) << "tserver " << leader_idx << " was shutdown"; |
1986 | | |
1987 | | // CDC Proxy is pinned to the first TServer, so we need to update the proxy if we kill that one. |
1988 | 0 | if (leader_idx == 0) { |
1989 | 0 | cdc_proxy_ = std::make_unique<CDCServiceProxy>( |
1990 | 0 | &client_->proxy_cache(), |
1991 | 0 | HostPort::FromBoundEndpoint(cluster_->mini_tablet_server(1)->bound_rpc_addr())); |
1992 | 0 | } |
1993 | | |
1994 | | // Wait until GetChanges doesn't return any errors. This means that we are able to write to |
1995 | | // the cdc_state table. |
1996 | 0 | ASSERT_OK(WaitFor([&](){ |
1997 | 0 | bool has_error = false; |
1998 | 0 | GetChanges(tablet_id, stream_id_, /* term */ 0, /* index */ 5, &has_error); |
1999 | 0 | return !has_error; |
2000 | 0 | }, MonoDelta::FromSeconds(180) * kTimeMultiplier, "Wait until cdc state table can take writes.")); |
2001 | |
|
// tablet_peer is captured by reference, so when this wait succeeds it refers to the new leader
// and the assertions below inspect that peer.
2003 | 0 | ASSERT_OK(WaitFor([&](){ |
2004 | 0 | for (int idx = 0; idx < server_count(); idx++) { |
2005 | 0 | if (idx == leader_idx) { |
2006 | | // This TServer is shutdown for now. |
2007 | 0 | continue; |
2008 | 0 | } |
2009 | 0 | if (cluster_->mini_tablet_server(idx)->server()->tablet_manager()-> |
2010 | 0 | LookupTablet(tablet_id, &tablet_peer)) { |
2011 | 0 | if (tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY) { |
2012 | 0 | LOG(INFO) << "Found new leader for tablet " << tablet_id << " in TS " << idx; |
2013 | 0 | return true; |
2014 | 0 | } |
2015 | 0 | } |
2016 | 0 | } |
2017 | 0 | return false; |
2018 | 0 | }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait until tablet has a leader.")); |
2019 | |
|
// Sleep for three min-cdc-index update intervals before checking the new leader.
2020 | 0 | SleepFor(MonoDelta::FromSeconds((FLAGS_update_min_cdc_indices_interval_secs * 3))); |
2021 | 0 | LOG(INFO) << "Done sleeping"; |
2022 | |
|
2023 | 0 | ASSERT_EQ(tablet_peer->log()->cdc_min_replicated_index(), 5); |
2024 | 0 | ASSERT_EQ(tablet_peer->tablet_metadata()->cdc_min_replicated_index(), 5); |
2025 | |
|
// Bring the stopped tablet server back and wait for it to finish starting.
2026 | 0 | ASSERT_OK(cluster_->mini_tablet_server(leader_idx)->Start()); |
2027 | 0 | ASSERT_OK(cluster_->mini_tablet_server(leader_idx)->WaitStarted()); |
2028 | 0 | } |
2029 | | |
2030 | | } // namespace cdc |
2031 | | } // namespace yb |