/Users/deen/code/yugabyte-db/src/yb/client/backup-txn-test.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/client/error.h"
#include "yb/client/session.h"
#include "yb/client/snapshot_test_util.h"
#include "yb/client/table.h"
#include "yb/client/transaction.h"
#include "yb/client/yb_table_name.h"

#include "yb/common/transaction_error.h"

#include "yb/master/master_backup.proxy.h"

#include "yb/rocksdb/db.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/tablet_retention_policy.h"
#include "yb/tablet/tablet_snapshots.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"

#include "yb/util/test_thread_holder.h"

using namespace std::literals;
using yb::master::SysSnapshotEntryPB;

DECLARE_bool(enable_history_cutoff_propagation);
DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_int32(TEST_inject_status_resolver_complete_delay_ms);
DECLARE_int32(history_cutoff_propagation_interval_ms);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_int32(timestamp_history_retention_interval_sec);
DECLARE_int32(unresponsive_ts_rpc_timeout_ms);
DECLARE_uint64(max_clock_skew_usec);
DECLARE_uint64(snapshot_coordinator_cleanup_delay_ms);
DECLARE_uint64(snapshot_coordinator_poll_interval_ms);

namespace yb {
namespace client {

using ImportedSnapshotData = google::protobuf::RepeatedPtrField<
    master::ImportSnapshotMetaResponsePB::TableMetaPB>;

class BackupTxnTest : public TransactionTestBase<MiniCluster> {
 protected:
  void SetUp() override {
    FLAGS_enable_history_cutoff_propagation = true;
    SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
    mini_cluster_opt_.num_masters = 3;
    TransactionTestBase::SetUp();
    snapshot_util_ = std::make_unique<SnapshotTestUtil>();
    snapshot_util_->SetProxy(&client_->proxy_cache());
    snapshot_util_->SetCluster(cluster_.get());
  }

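  // If the test passed, restart the whole cluster before tear-down with RocksDB flush on
  // shutdown disabled, so that recovery of snapshot state from disk is exercised as well.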
  void DoBeforeTearDown() override {
    if (!testing::Test::HasFailure()) {
      FLAGS_flush_rocksdb_on_shutdown = false;
      ASSERT_OK(cluster_->RestartSync());
    }

    TransactionTestBase::DoBeforeTearDown();
  }

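  // Wait until the master reports every snapshot as deleted, then verify that the snapshots
  // directory of each tablet's regular RocksDB is empty, i.e. deletion reached DocDB.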
  CHECKED_STATUS WaitAllSnapshotsDeleted() {
    RETURN_NOT_OK(snapshot_util_->WaitAllSnapshotsDeleted());
    // Check if deleted in DocDB.
    return WaitFor([this]() -> Result<bool> {
      auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
      for (const auto& peer : peers) {
        auto db = peer->tablet()->doc_db().regular;
        if (!db) {
          continue;
        }
        auto dir = tablet::TabletSnapshots::SnapshotsDirName(db->GetName());
        auto children = VERIFY_RESULT(Env::Default()->GetChildren(dir, ExcludeDots::kTrue));
        if (!children.empty()) {
          LOG(INFO) << peer->LogPrefix() << "Children: " << AsString(children);
          return false;
        }
      }
      return true;
    }, kWaitTimeout * kTimeMultiplier, "Delete on tablets");
  }

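  // Import is considered done once every table recreated by the import can be opened.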
  Result<bool> IsSnapshotImportDone(const ImportedSnapshotData& data) {
    for (const auto& table : data) {
      RETURN_NOT_OK(client_->OpenTable(table.table_ids().new_id()));
    }

    return true;
  }

  void TestDeleteTable(bool restart_masters);

  std::unique_ptr<SnapshotTestUtil> snapshot_util_;
};

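// Basic snapshot lifecycle: write data, take a snapshot and observe it pass through a pending
// state, overwrite the data, then restore and verify that the original data is back.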
TEST_F(BackupTxnTest, Simple) {
  SetAtomicFlag(
      std::chrono::duration_cast<std::chrono::microseconds>(1s).count() * kTimeMultiplier,
      &FLAGS_max_clock_skew_usec);
  ASSERT_NO_FATALS(WriteData());

  TxnSnapshotId snapshot_id = ASSERT_RESULT(snapshot_util_->StartSnapshot(table_));

  bool has_pending = false;
  ASSERT_OK(WaitFor([this, &snapshot_id, &has_pending]() -> Result<bool> {
    if (!VERIFY_RESULT(snapshot_util_->IsSnapshotDone(snapshot_id))) {
      has_pending = true;
      return false;
    }
    return true;
  }, 10s, "Snapshot done"));

  ASSERT_TRUE(has_pending);

  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));

  ASSERT_NO_FATALS(WriteData(WriteOpType::UPDATE));
  ASSERT_NO_FATALS(VerifyData(1, WriteOpType::UPDATE));

  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id));

  ASSERT_NO_FATALS(VerifyData(/* num_transactions=*/ 1, WriteOpType::INSERT));
}

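// Restore a snapshot to a hybrid time captured between the initial inserts and the subsequent
// updates, and verify that only the inserts are visible afterwards.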
TEST_F(BackupTxnTest, PointInTimeRestore) {
  ASSERT_NO_FATALS(WriteData());
  auto hybrid_time = cluster_->mini_tablet_server(0)->server()->Clock()->Now();
  ASSERT_NO_FATALS(WriteData(WriteOpType::UPDATE));

  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));

  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id, hybrid_time));

  ASSERT_NO_FATALS(VerifyData(/* num_transactions=*/ 1, WriteOpType::INSERT));
}

// This test writes a lot of updates to the same key, then takes a snapshot and restores it to
// a time before those writes. This exercises the filtering iterator in the case where a lot of
// records should be skipped.
TEST_F(BackupTxnTest, PointInTimeBigSkipRestore) {
  constexpr int kNumWrites = RegularBuildVsSanitizers(100000, 100);
  constexpr int kKey = 123;

  std::vector<std::future<FlushStatus>> futures;
  auto session = CreateSession();
  ASSERT_OK(WriteRow(session, kKey, 0));
  auto hybrid_time = cluster_->mini_tablet_server(0)->server()->Clock()->Now();
  for (int r = 1; r <= kNumWrites; ++r) {
    ASSERT_OK(WriteRow(session, kKey, r, WriteOpType::INSERT, client::Flush::kFalse));
    futures.push_back(session->FlushFuture());
  }

  int good = 0;
  for (auto& future : futures) {
    if (future.get().status.ok()) {
      ++good;
    }
  }

  LOG(INFO) << "Total good: " << good;

  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));

  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id, hybrid_time));

  auto value = ASSERT_RESULT(SelectRow(session, kKey));
  ASSERT_EQ(value, 0);
}

// Restore to a time before the current history cutoff.
TEST_F(BackupTxnTest, PointInTimeRestoreBeforeHistoryCutoff) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_history_cutoff_propagation_interval_ms) = 1;

  ASSERT_NO_FATALS(WriteData());
  auto hybrid_time = cluster_->mini_tablet_server(0)->server()->Clock()->Now();
  ASSERT_NO_FATALS(WriteData(WriteOpType::UPDATE));

  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));

  ANNOTATE_UNPROTECTED_WRITE(FLAGS_timestamp_history_retention_interval_sec) = 0;

  ASSERT_OK(WaitFor([this, hybrid_time]() -> Result<bool> {
    auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
    for (const auto& peer : peers) {
      // History cutoff is not moved for tablets w/o writes after current history cutoff.
      // So just ignore such tablets.
      if (peer->tablet()->mvcc_manager()->LastReplicatedHybridTime() < hybrid_time) {
        continue;
      }
      // Check that history cutoff is after hybrid_time.
      auto read_operation = tablet::ScopedReadOperation::Create(
          peer->tablet(), tablet::RequireLease::kTrue, ReadHybridTime::SingleTime(hybrid_time));
      if (read_operation.ok()) {
        auto policy = peer->tablet()->RetentionPolicy();
        LOG(INFO) << "Pending history cutoff, tablet: " << peer->tablet_id()
                  << ", current: " << policy->GetRetentionDirective().history_cutoff
                  << ", desired: " << hybrid_time;
        return false;
      }
      if (!read_operation.status().IsSnapshotTooOld()) {
        return read_operation.status();
      }
    }
    return true;
  }, kWaitTimeout, "History retention move past hybrid time"));

  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id, hybrid_time));

  ASSERT_NO_FATALS(VerifyData(/* num_transactions=*/ 1, WriteOpType::INSERT));
}

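// Verify that snapshot state survives master restarts, both when it is replayed from the
// master tablet's log and when it is loaded from a flushed sys catalog.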
TEST_F(BackupTxnTest, Persistence) {
  LOG(INFO) << "Write data";

  ASSERT_NO_FATALS(WriteData());

  LOG(INFO) << "Create snapshot";

  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));

  LOG(INFO) << "First restart";

  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Restart());
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));

  LOG(INFO) << "Create namespace";

  // Create namespace and flush, to avoid replaying logs in the master tablet containing the
  // CREATE_ON_MASTER operation for the snapshot.
  ASSERT_OK(client_->CreateNamespaceIfNotExists(kTableName.namespace_name() + "_Test",
                                                kTableName.namespace_type()));

  LOG(INFO) << "Flush";

  auto tablet_peer = ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->tablet_peer();
  ASSERT_OK(tablet_peer->tablet()->Flush(tablet::FlushMode::kSync));

  LOG(INFO) << "Second restart";

  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Restart());

  LOG(INFO) << "Verify";

  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));
}

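// Delete a completed snapshot and verify it is removed from the tablets, then cleaned up by
// the snapshot coordinator once the cleanup delay elapses.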
TEST_F(BackupTxnTest, Delete) {
  ASSERT_NO_FATALS(WriteData());
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));
  ASSERT_OK(snapshot_util_->DeleteSnapshot(snapshot_id));
  ASSERT_OK(snapshot_util_->WaitAllSnapshotsDeleted());

  SetAtomicFlag(1000, &FLAGS_snapshot_coordinator_cleanup_delay_ms);

  ASSERT_OK(snapshot_util_->WaitAllSnapshotsCleaned());
}

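// A deleted snapshot is still listed while the cleanup delay is large; after lowering the
// delay and restarting the master leader, it should get cleaned up.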
TEST_F(BackupTxnTest, CleanupAfterRestart) {
  SetAtomicFlag(300000, &FLAGS_snapshot_coordinator_cleanup_delay_ms);

  ASSERT_NO_FATALS(WriteData());
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));
  ASSERT_OK(snapshot_util_->DeleteSnapshot(snapshot_id));
  ASSERT_OK(snapshot_util_->WaitAllSnapshotsDeleted());

  ASSERT_FALSE(ASSERT_RESULT(snapshot_util_->ListSnapshots()).empty());

  SetAtomicFlag(1000, &FLAGS_snapshot_coordinator_cleanup_delay_ms);
  ASSERT_OK(ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->Restart());

  ASSERT_OK(snapshot_util_->WaitAllSnapshotsCleaned());
}

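// Export snapshot metadata, drop the table and namespace, then import the metadata back and
// verify that the recreated table accepts writes.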
TEST_F(BackupTxnTest, ImportMeta) {
  ASSERT_NO_FATALS(WriteData());
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::COMPLETE,
                                           table_.table()->GetPartitionCount()));

  ASSERT_OK(client_->DeleteTable(kTableName));
  ASSERT_OK(client_->DeleteNamespace(kTableName.namespace_name()));

  auto snapshots = ASSERT_RESULT(snapshot_util_->ListSnapshots(
      snapshot_id, ListDeleted::kFalse, PrepareForBackup::kTrue));
  ASSERT_EQ(snapshots.size(), 1);

  auto import_data = ASSERT_RESULT(snapshot_util_->StartImportSnapshot(snapshots[0]));

  ASSERT_OK(WaitFor([this, import_data] {
    return IsSnapshotImportDone(import_data);
  }, kWaitTimeout * kTimeMultiplier, "Complete import snapshot"));

  ASSERT_OK(table_.Open(kTableName, client_.get()));

  ASSERT_NO_FATALS(WriteData());
}

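// Snapshot creation should stay in CREATING state while all tservers are down, and complete
// via retries once they come back.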
TEST_F(BackupTxnTest, Retry) {
  FLAGS_unresponsive_ts_rpc_timeout_ms = 1000;
  FLAGS_snapshot_coordinator_poll_interval_ms = 1000;

  ASSERT_NO_FATALS(WriteData());

  ShutdownAllTServers(cluster_.get());

  TxnSnapshotId snapshot_id = ASSERT_RESULT(snapshot_util_->StartSnapshot(table_));

  std::this_thread::sleep_for(FLAGS_unresponsive_ts_rpc_timeout_ms * 1ms + 1s);

  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::CREATING,
                                           table_.table()->GetPartitionCount()));

  ASSERT_OK(StartAllTServers(cluster_.get()));

  ASSERT_OK(snapshot_util_->WaitSnapshotDone(snapshot_id, 15s));

  ASSERT_NO_FATALS(VerifyData());

  ASSERT_NO_FATALS(WriteData(WriteOpType::UPDATE));
  ASSERT_NO_FATALS(VerifyData(WriteOpType::UPDATE));

  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id));

  ASSERT_NO_FATALS(VerifyData());
}

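// If history cutoff advances past the snapshot hybrid time while the snapshot is still being
// created, the snapshot should end up FAILED.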
TEST_F(BackupTxnTest, Failure) {
  FLAGS_timestamp_history_retention_interval_sec = 0;
  FLAGS_history_cutoff_propagation_interval_ms = 1;

  ASSERT_NO_FATALS(WriteData());

  ShutdownAllTServers(cluster_.get());

  TxnSnapshotId snapshot_id = ASSERT_RESULT(snapshot_util_->StartSnapshot(table_));

  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::CREATING,
                                           table_.table()->GetPartitionCount()));

  ShutdownAllMasters(cluster_.get());

  ASSERT_OK(StartAllTServers(cluster_.get()));

  // Wait 2 heartbeat rounds to be sure that a very recent history cutoff has been committed.
  std::this_thread::sleep_for(FLAGS_raft_heartbeat_interval_ms * 2ms * kTimeMultiplier);

  ASSERT_OK(StartAllMasters(cluster_.get()));

  ASSERT_OK(snapshot_util_->WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::FAILED, 30s));
}

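// A snapshot that completed just before the masters were shut down should still be COMPLETE
// after they restart, even though RocksDB was not flushed on shutdown.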
TEST_F(BackupTxnTest, Restart) {
  FLAGS_timestamp_history_retention_interval_sec =
      std::chrono::duration_cast<std::chrono::seconds>(kWaitTimeout).count() *
      kTimeMultiplier;
  FLAGS_history_cutoff_propagation_interval_ms = 1;
  FLAGS_flush_rocksdb_on_shutdown = false;

  ASSERT_NO_FATALS(WriteData());
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));

  ShutdownAllMasters(cluster_.get());

  // Wait long enough for a very recent history cutoff to be committed.
  std::this_thread::sleep_for((FLAGS_timestamp_history_retention_interval_sec + 1) * 1s);

  ASSERT_OK(StartAllMasters(cluster_.get()));

  ASSERT_OK(snapshot_util_->WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::COMPLETE, 1s));
}

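// Complete a snapshot, drop the table, and bounce the master leader; the snapshot should be
// reported as COMPLETE by the new leader.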
TEST_F(BackupTxnTest, CompleteAndBounceMaster) {
  ASSERT_NO_FATALS(WriteData());
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));

  std::this_thread::sleep_for(1s);

  ASSERT_OK(client_->DeleteTable(kTableName));

  auto leader = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
  leader->Shutdown();

  ASSERT_OK(snapshot_util_->WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::COMPLETE, 1s));

  ASSERT_OK(leader->Start());
}

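// Flush the sys catalog on all masters, issue snapshot deletion while the tservers are down,
// then restart everything without flushing RocksDB; the deletion should still be applied.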
TEST_F(BackupTxnTest, FlushSysCatalogAndDelete) {
  ASSERT_NO_FATALS(WriteData());
  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));

  for (size_t i = 0; i != cluster_->num_masters(); ++i) {
    auto tablet_peer = cluster_->mini_master(i)->tablet_peer();
    ASSERT_OK(tablet_peer->tablet()->Flush(tablet::FlushMode::kSync));
  }

  ShutdownAllTServers(cluster_.get());
  ASSERT_OK(snapshot_util_->DeleteSnapshot(snapshot_id));

  FLAGS_flush_rocksdb_on_shutdown = false;
  ShutdownAllMasters(cluster_.get());

  LOG(INFO) << "Start masters";

  ASSERT_OK(StartAllMasters(cluster_.get()));
  ASSERT_OK(StartAllTServers(cluster_.get()));

  ASSERT_OK(snapshot_util_->WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::DELETED, 30s));
}

// The workload writes the same value across all keys in a transaction, using several
// transactions concurrently. Checks that after restore, all keys/tablets report the same value.
TEST_F(BackupTxnTest, Consistency) {
  constexpr int kThreads = 5;
  constexpr int kKeys = 10;

  FLAGS_TEST_inject_status_resolver_complete_delay_ms = 100;

  TestThreadHolder thread_holder;
  std::atomic<int> value(0);

  for (int i = 0; i != kThreads; ++i) {
    thread_holder.AddThreadFunctor([this, &stop = thread_holder.stop_flag(), &value] {
      auto session = CreateSession();
      while (!stop.load(std::memory_order_acquire)) {
        auto txn = CreateTransaction();
        session->SetTransaction(txn);
        auto v = value.fetch_add(1, std::memory_order_acq_rel);
        for (int j = 0; j != kKeys; ++j) {
          ASSERT_OK(WriteRow(session, j, v, WriteOpType::INSERT, Flush::kFalse));
        }
        auto status = session->Flush();
        if (status.ok()) {
          status = txn->CommitFuture().get();
        }
        if (!status.ok()) {
          TransactionError txn_error(status);
          ASSERT_TRUE(txn_error == TransactionErrorCode::kConflict ||
                      txn_error == TransactionErrorCode::kAborted) << status;
        } else {
          LOG(INFO) << "Committed: " << txn->id() << ", written: " << v;
        }
      }
    });
  }

  while (value.load(std::memory_order_acquire) < 100) {
    std::this_thread::sleep_for(5ms);
  }

  auto snapshot_id = ASSERT_RESULT(snapshot_util_->CreateSnapshot(table_));

  thread_holder.Stop();

  ASSERT_OK(snapshot_util_->RestoreSnapshot(snapshot_id));

  auto session = CreateSession();
  int restored_value = -1;
  for (int j = 0; j != kKeys; ++j) {
    auto current_value = ASSERT_RESULT(SelectRow(session, j));
    LOG(INFO) << "Key: " << j << ", value: " << current_value;
    if (restored_value == -1) {
      restored_value = current_value;
    } else {
      ASSERT_EQ(restored_value, current_value);
    }
  }

  LOG(INFO) << "Value: " << restored_value;
}

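// Deleting a table while its snapshot is still being created should cause the snapshot to
// fail, with or without a master restart in between.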
void BackupTxnTest::TestDeleteTable(bool restart_masters) {
  FLAGS_unresponsive_ts_rpc_timeout_ms = 1000;
  FLAGS_snapshot_coordinator_poll_interval_ms = 2500 * kTimeMultiplier;

  ASSERT_NO_FATALS(WriteData());

  ShutdownAllTServers(cluster_.get());
  TxnSnapshotId snapshot_id = ASSERT_RESULT(snapshot_util_->StartSnapshot(table_));

  std::this_thread::sleep_for(FLAGS_unresponsive_ts_rpc_timeout_ms * 1ms + 1s);
  ASSERT_OK(snapshot_util_->VerifySnapshot(snapshot_id, SysSnapshotEntryPB::CREATING,
                                           table_.table()->GetPartitionCount()));

  ASSERT_OK(client_->DeleteTable(kTableName, false));

  if (restart_masters) {
    ShutdownAllMasters(cluster_.get());
  }

  ASSERT_OK(StartAllTServers(cluster_.get()));

  if (restart_masters) {
    ASSERT_OK(StartAllMasters(cluster_.get()));
    ASSERT_OK(WaitUntilMasterHasLeader(cluster_.get(), 5s));
  }

  ASSERT_OK(snapshot_util_->WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::FAILED,
                                                5s * kTimeMultiplier));
}

TEST_F(BackupTxnTest, DeleteTable) {
  TestDeleteTable(/* restart_masters= */ false);
}

TEST_F(BackupTxnTest, DeleteTableWithMastersRestart) {
  TestDeleteTable(/* restart_masters= */ true);
}

}  // namespace client
}  // namespace yb