/Users/deen/code/yugabyte-db/src/yb/tools/data-patcher-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <boost/algorithm/string/join.hpp> |
15 | | |
16 | | #include "yb/integration-tests/cql_test_base.h" |
17 | | #include "yb/integration-tests/mini_cluster.h" |
18 | | |
19 | | #include "yb/master/mini_master.h" |
20 | | |
21 | | #include "yb/server/skewed_clock.h" |
22 | | |
23 | | #include "yb/tserver/mini_tablet_server.h" |
24 | | |
25 | | #include "yb/util/atomic.h" |
26 | | #include "yb/util/physical_time.h" |
27 | | #include "yb/util/range.h" |
28 | | #include "yb/util/subprocess.h" |
29 | | #include "yb/util/string_util.h" |
30 | | #include "yb/util/test_macros.h" |
31 | | #include "yb/util/timestamp.h" |
32 | | |
33 | | #include "yb/gutil/casts.h" |
34 | | #include "yb/gutil/strings/join.h" |
35 | | #include "yb/gutil/strings/split.h" |
36 | | |
37 | | using namespace std::literals; |
38 | | |
39 | | DECLARE_bool(fail_on_out_of_range_clock_skew); |
40 | | DECLARE_double(TEST_transaction_ignore_applying_probability); |
41 | | DECLARE_string(time_source); |
42 | | DECLARE_uint64(clock_skew_force_crash_bound_usec); |
43 | | |
44 | | namespace yb { |
45 | | namespace tools { |
46 | | |
47 | | class DataPatcherTest : public CqlTestBase<MiniCluster> { |
48 | | protected: |
49 | 0 | void SetUp() override { |
50 | 0 | server::SkewedClock::Register(); |
51 | 0 | FLAGS_time_source = server::SkewedClock::kName; |
52 | 0 | CqlTestBase<MiniCluster>::SetUp(); |
53 | 0 | } |
54 | | |
55 | 0 | Result<std::string> RunDataPatcher(const std::vector<std::string>& args) { |
56 | 0 | std::vector<std::string> command{GetToolPath("data-patcher")}; |
57 | 0 | for (const auto& arg : args) { |
58 | 0 | command.push_back(arg); |
59 | 0 | } |
60 | 0 | std::string result; |
61 | 0 | LOG(INFO) << "Run tool: " << JoinStrings(command, " "); |
62 | 0 | RETURN_NOT_OK(Subprocess::Call(command, &result)); |
63 | 0 | std::vector<std::string> output_lines; |
64 | 0 | SplitStringUsing(result, "\n", &output_lines); |
65 | 0 | LOG(INFO) << "Standard output from tool:"; |
66 | 0 | for (const auto& line : output_lines) { |
67 | 0 | LOG(INFO) << line; |
68 | 0 | } |
69 | 0 | return result; |
70 | 0 | } |
71 | | }; |
72 | | |
73 | | // Checks that the values in the table as as expected, and return a vector of their write times. |
74 | | void CheckAndGetWriteTimes( |
75 | | CassandraSession* session, size_t total_values, std::vector<int64_t>* write_times, |
76 | 0 | const char* step_description) { |
77 | 0 | auto result = ASSERT_RESULT(session->ExecuteWithResult("SELECT i, writetime(j) FROM t")); |
78 | 0 | auto iterator = result.CreateIterator(); |
79 | 0 | std::vector<int32_t> values; |
80 | 0 | write_times->clear(); |
81 | 0 | write_times->resize(total_values); |
82 | 0 | std::map<int32_t, int64_t> write_time_by_key; |
83 | 0 | while (iterator.Next()) { |
84 | 0 | auto key = iterator.Row().Value(0).As<int32_t>(); |
85 | 0 | values.push_back(key); |
86 | 0 | auto time = iterator.Row().Value(1).As<int64_t>(); |
87 | 0 | ASSERT_GE(key, 0); |
88 | 0 | ASSERT_LT(key, total_values); |
89 | 0 | if (0 <= key && key < narrow_cast<int32_t>(total_values)) { |
90 | 0 | (*write_times)[key] = time; |
91 | 0 | } |
92 | 0 | } |
93 | 0 | for (size_t k = 0; k < total_values; ++k) { |
94 | 0 | if ((*write_times)[k] == 0) { |
95 | 0 | LOG(WARNING) << "Missing value (" << step_description << "): " << k; |
96 | 0 | } |
97 | 0 | } |
98 | |
|
99 | 0 | std::sort(values.begin(), values.end()); |
100 | 0 | ASSERT_EQ(AsString(values), AsString(Range(total_values))); |
101 | 0 | ASSERT_EQ(total_values, write_times->size()); |
102 | 0 | } |
103 | | |
104 | | template <class Range> |
105 | 0 | CHECKED_STATUS InsertValues(CassandraSession* session, const Range& range) { |
106 | 0 | std::vector<CassandraFuture> futures; |
107 | 0 | for (auto i : range) { |
108 | 0 | auto expr = Format("INSERT INTO t (i, j) VALUES ($0, $0);", i); |
109 | 0 | if (i & 1) { |
110 | 0 | expr = "BEGIN TRANSACTION " + expr + " END TRANSACTION;"; |
111 | 0 | } |
112 | 0 | futures.push_back(session->ExecuteGetFuture(expr)); |
113 | 0 | } |
114 | 0 | for (auto& future : futures) { |
115 | 0 | RETURN_NOT_OK(future.Wait()); |
116 | 0 | } |
117 | 0 | return Status::OK(); |
118 | 0 | } |
119 | | |
120 | 0 | auto NextValueRange(int group_size, size_t* last_value) { |
121 | 0 | *last_value += group_size; |
122 | 0 | return Range(*last_value); |
123 | 0 | } |
124 | | |
125 | 0 | std::vector<int32_t> GetKeyOrder(const std::vector<int64_t>& write_times) { |
126 | 0 | std::vector<int32_t> key_order; |
127 | 0 | key_order.reserve(write_times.size()); |
128 | 0 | for (size_t i = 0; i < write_times.size(); ++i) { |
129 | 0 | key_order.push_back(narrow_cast<int32_t>(i)); |
130 | 0 | } |
131 | 0 | std::sort( |
132 | 0 | key_order.begin(), key_order.end(), [&write_times](int32_t k1, int32_t k2) { |
133 | 0 | return write_times[k1] < write_times[k2]; |
134 | 0 | }); |
135 | 0 | return key_order; |
136 | 0 | } |
137 | | |
138 | | void CheckWriteTimeConsistency( |
139 | | const std::vector<int64_t>& old_write_times, |
140 | 0 | const std::vector<int64_t>& new_write_times) { |
141 | 0 | ASSERT_EQ(old_write_times.size(), new_write_times.size()); |
142 | 0 | const size_t n = old_write_times.size(); |
143 | | |
144 | | // Check that old and new keys, when ordered by their write times, are in the same order. |
145 | 0 | const auto old_order = GetKeyOrder(old_write_times); |
146 | 0 | const auto new_order = GetKeyOrder(old_write_times); |
147 | 0 | ASSERT_VECTORS_EQ(old_order, new_order); |
148 | 0 | const auto& key_order = old_order; |
149 | |
|
150 | 0 | size_t k; |
151 | 0 | for (k = 0; k < n && old_write_times[key_order[k]] == new_write_times[key_order[k]]; ++k) {} |
152 | 0 | LOG(INFO) << "Out of " << n << " keys, the first " << k << " write times were preserved"; |
153 | 0 | } |
154 | | |
155 | | // Write values with normal time. |
156 | | // Jump time to a long delta in future. |
157 | | // Write another set of values. |
158 | | // Stop cluster. |
159 | | // Patch SST files to subtract time delta. |
160 | | // Start cluster. |
161 | | // Write more values. |
162 | | // Check that all write times are before current time. |
163 | 0 | TEST_F(DataPatcherTest, AddTimeDelta) { |
164 | 0 | constexpr int kValueGroupSize = 10; |
165 | 0 | constexpr int kDataPatcherConcurrency = 2; |
166 | 0 | const MonoDelta kMonoDelta(60min * 24 * 365 * 80); |
167 | |
|
168 | 0 | FLAGS_fail_on_out_of_range_clock_skew = false; |
169 | 0 | FLAGS_TEST_transaction_ignore_applying_probability = 0.8; |
170 | |
|
171 | 0 | auto session = ASSERT_RESULT(EstablishSession(driver_.get())); |
172 | 0 | ASSERT_OK(session.ExecuteQuery( |
173 | 0 | "CREATE TABLE t (i INT PRIMARY KEY, j INT) WITH transactions = { 'enabled' : true }")); |
174 | 0 | size_t last_value = 0; |
175 | |
|
176 | 0 | ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value))); |
177 | |
|
178 | 0 | std::vector<int64_t> original_write_times; |
179 | 0 | ASSERT_NO_FATALS(CheckAndGetWriteTimes( |
180 | 0 | &session, last_value, &original_write_times, |
181 | 0 | "after inserting initial rows without clock skew")); |
182 | 0 | for (size_t k = 0; k < original_write_times.size(); ++k) { |
183 | 0 | LOG(INFO) << "Write time before clock jump for " << k << ": " << original_write_times[k]; |
184 | 0 | } |
185 | | |
186 | | // Cassandra writetime and PhysicalTime are both in microseconds. |
187 | 0 | const Timestamp time_before_clock_jump(ASSERT_RESULT(WallClock()->Now()).time_point); |
188 | 0 | for (int64_t write_time : original_write_times) { |
189 | 0 | ASSERT_LT(write_time, time_before_clock_jump.value()); |
190 | 0 | } |
191 | |
|
192 | 0 | const auto jump_clocks = [&]() { |
193 | 0 | return JumpClocks(cluster_.get(), kMonoDelta.ToChronoMilliseconds()); |
194 | 0 | }; |
195 | |
|
196 | 0 | auto delta_changers = jump_clocks(); |
197 | 0 | ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value))); |
198 | |
|
199 | 0 | ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value))); |
200 | |
|
201 | 0 | ASSERT_OK(cluster_->FlushTablets()); |
202 | 0 | ASSERT_OK(cluster_->CompactTablets()); |
203 | | |
204 | | // Insert more values after flushing / compacting data, to make sure there is some data that is |
205 | | // only available in WAL files, not in SSTables. |
206 | 0 | ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value))); |
207 | | |
208 | | // When we undo the clock jump, all times should be less than this. |
209 | 0 | const Timestamp time_after_clock_jump(ASSERT_RESULT(WallClock()->Now()).time_point); |
210 | |
|
211 | 0 | std::vector<int64_t> write_times_after_jump; |
212 | 0 | ASSERT_NO_FATALS(CheckAndGetWriteTimes( |
213 | 0 | &session, last_value, &write_times_after_jump, "after inserting some rows with clock skew")); |
214 | 0 | for (size_t k = 0; k < write_times_after_jump.size(); ++k) { |
215 | 0 | LOG(INFO) << "Write time after clock jump for " << k << ": " << write_times_after_jump[k]; |
216 | 0 | } |
217 | |
|
218 | 0 | std::vector<FsManager*> fs_managers; |
219 | 0 | for (size_t i = 0; i != cluster_->num_masters(); ++i) { |
220 | 0 | fs_managers.push_back(&cluster_->mini_master(i)->fs_manager()); |
221 | 0 | } |
222 | 0 | for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) { |
223 | 0 | fs_managers.push_back(&cluster_->mini_tablet_server(i)->fs_manager()); |
224 | 0 | } |
225 | |
|
226 | 0 | std::vector<std::string> data_root_dirs_vec; |
227 | 0 | std::vector<std::string> wal_root_dirs_vec; |
228 | 0 | for (auto* fs_manager : fs_managers) { |
229 | 0 | auto dirs = fs_manager->GetDataRootDirs(); |
230 | 0 | data_root_dirs_vec.insert(data_root_dirs_vec.end(), dirs.begin(), dirs.end()); |
231 | 0 | dirs = fs_manager->GetWalRootDirs(); |
232 | 0 | wal_root_dirs_vec.insert(wal_root_dirs_vec.end(), dirs.begin(), dirs.end()); |
233 | 0 | } |
234 | 0 | auto data_root_dirs = boost::join(data_root_dirs_vec, ","); |
235 | 0 | auto wal_root_dirs = boost::join(wal_root_dirs_vec, ","); |
236 | 0 | LOG(INFO) << "Data dirs: " << data_root_dirs; |
237 | 0 | LOG(INFO) << "WAL dirs: " << wal_root_dirs; |
238 | |
|
239 | 0 | LOG(INFO) << "Running data-patcher sub-time before shutting down the cluster, without WAL dirs"; |
240 | 0 | std::vector<std::string> args{ |
241 | 0 | "sub-time", "--delta", Format("$0s", static_cast<int64_t>(kMonoDelta.ToSeconds())), |
242 | 0 | "--bound-time", time_before_clock_jump.ToFormattedString(), |
243 | 0 | "--data-dirs", data_root_dirs, |
244 | 0 | "--concurrency", Format("$0", kDataPatcherConcurrency), |
245 | 0 | "--debug" |
246 | 0 | }; |
247 | 0 | ASSERT_OK(RunDataPatcher(args)); |
248 | |
|
249 | 0 | LOG(INFO) << "Shutting down the cluster"; |
250 | 0 | ShutdownCluster(); |
251 | |
|
252 | 0 | args.push_back("--wal-dirs"); |
253 | 0 | args.push_back(wal_root_dirs); |
254 | 0 | LOG(INFO) << "Running data-patcher sub-time after shutting down the cluster, with WAL dirs"; |
255 | 0 | ASSERT_OK(RunDataPatcher(args)); |
256 | |
|
257 | 0 | delta_changers.clear(); |
258 | 0 | LOG(INFO) << "Running data-patcher apply-patch with --dry-run"; |
259 | 0 | args = {"apply-patch", "--data-dirs", data_root_dirs, "--wal-dirs", wal_root_dirs, "--dry-run"}; |
260 | 0 | ASSERT_OK(RunDataPatcher(args)); |
261 | |
|
262 | 0 | args.resize(args.size() - 1); |
263 | 0 | LOG(INFO) << "Running data-patcher apply-patch without --dry-run"; |
264 | 0 | ASSERT_OK(RunDataPatcher(args)); |
265 | |
|
266 | 0 | const auto start_cluster = [&]() { |
267 | 0 | ASSERT_OK(StartCluster()); |
268 | 0 | session = ASSERT_RESULT(EstablishSession(driver_.get())); |
269 | 0 | }; |
270 | 0 | start_cluster(); |
271 | | |
272 | | // Check that we have the same values as before and their timestamps are in the right order. |
273 | 0 | std::vector<int64_t> patched_write_times; |
274 | 0 | ASSERT_NO_FATALS(CheckAndGetWriteTimes( |
275 | 0 | &session, last_value, &patched_write_times, "after using data-patcher to undo clock skew")); |
276 | 0 | ASSERT_NO_FATALS(CheckWriteTimeConsistency(write_times_after_jump, patched_write_times)); |
277 | 0 | for (auto write_time : patched_write_times) { |
278 | 0 | ASSERT_LT(write_time, time_after_clock_jump.value()); |
279 | 0 | } |
280 | |
|
281 | 0 | const auto original_last_value = last_value; |
282 | 0 | ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value))); |
283 | 0 | std::vector<int64_t> patched_write_times_with_extra_rows; |
284 | 0 | ASSERT_NO_FATALS(CheckAndGetWriteTimes( |
285 | 0 | &session, last_value, &patched_write_times_with_extra_rows, |
286 | 0 | "after undoing clock skew and inserting more rows")); |
287 | |
|
288 | 0 | LOG(INFO) << "Shutting down cluster before reverting the data-patcher changes"; |
289 | 0 | ShutdownCluster(); |
290 | |
|
291 | 0 | LOG(INFO) << "Running revert with --dry-run"; |
292 | 0 | args.push_back("--revert"); |
293 | 0 | args.push_back("--dry-run"); |
294 | 0 | ASSERT_OK(RunDataPatcher(args)); |
295 | |
|
296 | 0 | LOG(INFO) << "Running revert without --dry-run"; |
297 | 0 | args.resize(args.size() - 1); |
298 | 0 | ASSERT_OK(RunDataPatcher(args)); |
299 | |
|
300 | 0 | LOG(INFO) << "Turning off clock skew checking and restarting the cluster"; |
301 | 0 | SetAtomicFlag(0, &FLAGS_clock_skew_force_crash_bound_usec); |
302 | 0 | SetAtomicFlag(false, &FLAGS_fail_on_out_of_range_clock_skew); |
303 | 0 | start_cluster(); |
304 | 0 | delta_changers = jump_clocks(); |
305 | |
|
306 | 0 | std::vector<int64_t> write_times_after_revert; |
307 | 0 | ASSERT_NO_FATALS(CheckAndGetWriteTimes( |
308 | 0 | &session, original_last_value, &write_times_after_revert, |
309 | 0 | "after reverting the effect of data-patcher, going back to clock skew")); |
310 | 0 | ASSERT_VECTORS_EQ(write_times_after_jump, write_times_after_revert); |
311 | 0 | } |
312 | | |
313 | | } // namespace tools |
314 | | } // namespace yb |