YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tools/data-patcher-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <boost/algorithm/string/join.hpp>
15
16
#include "yb/integration-tests/cql_test_base.h"
17
#include "yb/integration-tests/mini_cluster.h"
18
19
#include "yb/master/mini_master.h"
20
21
#include "yb/server/skewed_clock.h"
22
23
#include "yb/tserver/mini_tablet_server.h"
24
25
#include "yb/util/atomic.h"
26
#include "yb/util/physical_time.h"
27
#include "yb/util/range.h"
28
#include "yb/util/subprocess.h"
29
#include "yb/util/string_util.h"
30
#include "yb/util/test_macros.h"
31
#include "yb/util/timestamp.h"
32
33
#include "yb/gutil/casts.h"
34
#include "yb/gutil/strings/join.h"
35
#include "yb/gutil/strings/split.h"
36
37
using namespace std::literals;
38
39
DECLARE_bool(fail_on_out_of_range_clock_skew);
40
DECLARE_double(TEST_transaction_ignore_applying_probability);
41
DECLARE_string(time_source);
42
DECLARE_uint64(clock_skew_force_crash_bound_usec);
43
44
namespace yb {
45
namespace tools {
46
47
// Test fixture for the data-patcher tool. Runs a CQL mini cluster with the skewed clock selected
// as the time source, and provides a helper for invoking the data-patcher binary.
class DataPatcherTest : public CqlTestBase<MiniCluster> {
 protected:
  // Registers the skewed clock and selects it through the time_source flag before delegating to
  // the base-class SetUp.
  void SetUp() override {
    server::SkewedClock::Register();
    FLAGS_time_source = server::SkewedClock::kName;
    CqlTestBase<MiniCluster>::SetUp();
  }

  // Runs the data-patcher tool with the given arguments.
  //
  // The full command line and every line of the tool's output are written to the INFO log.
  // Returns the raw tool output, or the error status reported by Subprocess::Call.
  Result<std::string> RunDataPatcher(const std::vector<std::string>& args) {
    // Command line = tool path followed by the caller-supplied arguments, in order.
    std::vector<std::string> command;
    command.reserve(args.size() + 1);
    command.push_back(GetToolPath("data-patcher"));
    command.insert(command.end(), args.begin(), args.end());

    LOG(INFO) << "Run tool: " << JoinStrings(command, " ");
    std::string tool_output;
    RETURN_NOT_OK(Subprocess::Call(command, &tool_output));

    // Log the output one line at a time so that each line gets its own log prefix.
    LOG(INFO) << "Standard output from tool:";
    std::vector<std::string> lines;
    SplitStringUsing(tool_output, "\n", &lines);
    for (const auto& output_line : lines) {
      LOG(INFO) << output_line;
    }
    return tool_output;
  }
};
72
73
// Checks that the values in the table as as expected, and return a vector of their write times.
74
void CheckAndGetWriteTimes(
75
    CassandraSession* session, size_t total_values, std::vector<int64_t>* write_times,
76
0
    const char* step_description) {
77
0
  auto result = ASSERT_RESULT(session->ExecuteWithResult("SELECT i, writetime(j) FROM t"));
78
0
  auto iterator = result.CreateIterator();
79
0
  std::vector<int32_t> values;
80
0
  write_times->clear();
81
0
  write_times->resize(total_values);
82
0
  std::map<int32_t, int64_t> write_time_by_key;
83
0
  while (iterator.Next()) {
84
0
    auto key = iterator.Row().Value(0).As<int32_t>();
85
0
    values.push_back(key);
86
0
    auto time = iterator.Row().Value(1).As<int64_t>();
87
0
    ASSERT_GE(key, 0);
88
0
    ASSERT_LT(key, total_values);
89
0
    if (0 <= key && key < narrow_cast<int32_t>(total_values)) {
90
0
      (*write_times)[key] = time;
91
0
    }
92
0
  }
93
0
  for (size_t k = 0; k < total_values; ++k) {
94
0
    if ((*write_times)[k] == 0) {
95
0
      LOG(WARNING) << "Missing value (" << step_description << "): " << k;
96
0
    }
97
0
  }
98
99
0
  std::sort(values.begin(), values.end());
100
0
  ASSERT_EQ(AsString(values), AsString(Range(total_values)));
101
0
  ASSERT_EQ(total_values, write_times->size());
102
0
}
103
104
template <class Range>
105
0
CHECKED_STATUS InsertValues(CassandraSession* session, const Range& range) {
106
0
  std::vector<CassandraFuture> futures;
107
0
  for (auto i : range) {
108
0
    auto expr = Format("INSERT INTO t (i, j) VALUES ($0, $0);", i);
109
0
    if (i & 1) {
110
0
      expr = "BEGIN TRANSACTION " + expr + " END TRANSACTION;";
111
0
    }
112
0
    futures.push_back(session->ExecuteGetFuture(expr));
113
0
  }
114
0
  for (auto& future : futures) {
115
0
    RETURN_NOT_OK(future.Wait());
116
0
  }
117
0
  return Status::OK();
118
0
}
119
120
0
auto NextValueRange(int group_size, size_t* last_value) {
121
0
  *last_value += group_size;
122
0
  return Range(*last_value);
123
0
}
124
125
0
std::vector<int32_t> GetKeyOrder(const std::vector<int64_t>& write_times) {
126
0
  std::vector<int32_t> key_order;
127
0
  key_order.reserve(write_times.size());
128
0
  for (size_t i = 0; i < write_times.size(); ++i) {
129
0
    key_order.push_back(narrow_cast<int32_t>(i));
130
0
  }
131
0
  std::sort(
132
0
      key_order.begin(), key_order.end(), [&write_times](int32_t k1, int32_t k2) {
133
0
        return write_times[k1] < write_times[k2];
134
0
      });
135
0
  return key_order;
136
0
}
137
138
void CheckWriteTimeConsistency(
139
    const std::vector<int64_t>& old_write_times,
140
0
    const std::vector<int64_t>& new_write_times) {
141
0
  ASSERT_EQ(old_write_times.size(), new_write_times.size());
142
0
  const size_t n = old_write_times.size();
143
144
  // Check that old and new keys, when ordered by their write times, are in the same order.
145
0
  const auto old_order = GetKeyOrder(old_write_times);
146
0
  const auto new_order = GetKeyOrder(old_write_times);
147
0
  ASSERT_VECTORS_EQ(old_order, new_order);
148
0
  const auto& key_order = old_order;
149
150
0
  size_t k;
151
0
  for (k = 0; k < n && old_write_times[key_order[k]] == new_write_times[key_order[k]]; ++k) {}
152
0
  LOG(INFO) << "Out of " << n << " keys, the first " << k << " write times were preserved";
153
0
}
154
155
// Write values with normal time.
156
// Jump time to a long delta in future.
157
// Write another set of values.
158
// Stop cluster.
159
// Patch SST files to subtract time delta.
160
// Start cluster.
161
// Write more values.
162
// Check that all write times are before current time.
163
0
TEST_F(DataPatcherTest, AddTimeDelta) {
164
0
  constexpr int kValueGroupSize = 10;
165
0
  constexpr int kDataPatcherConcurrency = 2;
166
0
  const MonoDelta kMonoDelta(60min * 24 * 365 * 80);
167
168
0
  FLAGS_fail_on_out_of_range_clock_skew = false;
169
0
  FLAGS_TEST_transaction_ignore_applying_probability = 0.8;
170
171
0
  auto session = ASSERT_RESULT(EstablishSession(driver_.get()));
172
0
  ASSERT_OK(session.ExecuteQuery(
173
0
      "CREATE TABLE t (i INT PRIMARY KEY, j INT) WITH transactions = { 'enabled' : true }"));
174
0
  size_t last_value = 0;
175
176
0
  ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value)));
177
178
0
  std::vector<int64_t> original_write_times;
179
0
  ASSERT_NO_FATALS(CheckAndGetWriteTimes(
180
0
      &session, last_value, &original_write_times,
181
0
      "after inserting initial rows without clock skew"));
182
0
  for (size_t k = 0; k < original_write_times.size(); ++k) {
183
0
    LOG(INFO) << "Write time before clock jump for " << k << ": " << original_write_times[k];
184
0
  }
185
186
  // Cassandra writetime and PhysicalTime are both in microseconds.
187
0
  const Timestamp time_before_clock_jump(ASSERT_RESULT(WallClock()->Now()).time_point);
188
0
  for (int64_t write_time : original_write_times) {
189
0
    ASSERT_LT(write_time, time_before_clock_jump.value());
190
0
  }
191
192
0
  const auto jump_clocks = [&]() {
193
0
    return JumpClocks(cluster_.get(), kMonoDelta.ToChronoMilliseconds());
194
0
  };
195
196
0
  auto delta_changers = jump_clocks();
197
0
  ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value)));
198
199
0
  ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value)));
200
201
0
  ASSERT_OK(cluster_->FlushTablets());
202
0
  ASSERT_OK(cluster_->CompactTablets());
203
204
  // Insert more values after flushing / compacting data, to make sure there is some data that is
205
  // only available in WAL files, not in SSTables.
206
0
  ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value)));
207
208
  // When we undo the clock jump, all times should be less than this.
209
0
  const Timestamp time_after_clock_jump(ASSERT_RESULT(WallClock()->Now()).time_point);
210
211
0
  std::vector<int64_t> write_times_after_jump;
212
0
  ASSERT_NO_FATALS(CheckAndGetWriteTimes(
213
0
      &session, last_value, &write_times_after_jump, "after inserting some rows with clock skew"));
214
0
  for (size_t k = 0; k < write_times_after_jump.size(); ++k) {
215
0
    LOG(INFO) << "Write time after clock jump for " << k << ": " << write_times_after_jump[k];
216
0
  }
217
218
0
  std::vector<FsManager*> fs_managers;
219
0
  for (size_t i = 0; i != cluster_->num_masters(); ++i) {
220
0
    fs_managers.push_back(&cluster_->mini_master(i)->fs_manager());
221
0
  }
222
0
  for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
223
0
    fs_managers.push_back(&cluster_->mini_tablet_server(i)->fs_manager());
224
0
  }
225
226
0
  std::vector<std::string> data_root_dirs_vec;
227
0
  std::vector<std::string> wal_root_dirs_vec;
228
0
  for (auto* fs_manager : fs_managers) {
229
0
    auto dirs = fs_manager->GetDataRootDirs();
230
0
    data_root_dirs_vec.insert(data_root_dirs_vec.end(), dirs.begin(), dirs.end());
231
0
    dirs = fs_manager->GetWalRootDirs();
232
0
    wal_root_dirs_vec.insert(wal_root_dirs_vec.end(), dirs.begin(), dirs.end());
233
0
  }
234
0
  auto data_root_dirs = boost::join(data_root_dirs_vec, ",");
235
0
  auto wal_root_dirs = boost::join(wal_root_dirs_vec, ",");
236
0
  LOG(INFO) << "Data dirs: " << data_root_dirs;
237
0
  LOG(INFO) << "WAL dirs: " << wal_root_dirs;
238
239
0
  LOG(INFO) << "Running data-patcher sub-time before shutting down the cluster, without WAL dirs";
240
0
  std::vector<std::string> args{
241
0
      "sub-time", "--delta", Format("$0s", static_cast<int64_t>(kMonoDelta.ToSeconds())),
242
0
      "--bound-time", time_before_clock_jump.ToFormattedString(),
243
0
      "--data-dirs", data_root_dirs,
244
0
      "--concurrency", Format("$0", kDataPatcherConcurrency),
245
0
      "--debug"
246
0
  };
247
0
  ASSERT_OK(RunDataPatcher(args));
248
249
0
  LOG(INFO) << "Shutting down the cluster";
250
0
  ShutdownCluster();
251
252
0
  args.push_back("--wal-dirs");
253
0
  args.push_back(wal_root_dirs);
254
0
  LOG(INFO) << "Running data-patcher sub-time after shutting down the cluster, with WAL dirs";
255
0
  ASSERT_OK(RunDataPatcher(args));
256
257
0
  delta_changers.clear();
258
0
  LOG(INFO) << "Running data-patcher apply-patch with --dry-run";
259
0
  args = {"apply-patch", "--data-dirs", data_root_dirs, "--wal-dirs", wal_root_dirs, "--dry-run"};
260
0
  ASSERT_OK(RunDataPatcher(args));
261
262
0
  args.resize(args.size() - 1);
263
0
  LOG(INFO) << "Running data-patcher apply-patch without --dry-run";
264
0
  ASSERT_OK(RunDataPatcher(args));
265
266
0
  const auto start_cluster = [&]() {
267
0
    ASSERT_OK(StartCluster());
268
0
    session = ASSERT_RESULT(EstablishSession(driver_.get()));
269
0
  };
270
0
  start_cluster();
271
272
  // Check that we have the same values as before and their timestamps are in the right order.
273
0
  std::vector<int64_t> patched_write_times;
274
0
  ASSERT_NO_FATALS(CheckAndGetWriteTimes(
275
0
      &session, last_value, &patched_write_times, "after using data-patcher to undo clock skew"));
276
0
  ASSERT_NO_FATALS(CheckWriteTimeConsistency(write_times_after_jump, patched_write_times));
277
0
  for (auto write_time : patched_write_times) {
278
0
    ASSERT_LT(write_time, time_after_clock_jump.value());
279
0
  }
280
281
0
  const auto original_last_value = last_value;
282
0
  ASSERT_OK(InsertValues(&session, NextValueRange(kValueGroupSize, &last_value)));
283
0
  std::vector<int64_t> patched_write_times_with_extra_rows;
284
0
  ASSERT_NO_FATALS(CheckAndGetWriteTimes(
285
0
      &session, last_value, &patched_write_times_with_extra_rows,
286
0
      "after undoing clock skew and inserting more rows"));
287
288
0
  LOG(INFO) << "Shutting down cluster before reverting the data-patcher changes";
289
0
  ShutdownCluster();
290
291
0
  LOG(INFO) << "Running revert with --dry-run";
292
0
  args.push_back("--revert");
293
0
  args.push_back("--dry-run");
294
0
  ASSERT_OK(RunDataPatcher(args));
295
296
0
  LOG(INFO) << "Running revert without --dry-run";
297
0
  args.resize(args.size() - 1);
298
0
  ASSERT_OK(RunDataPatcher(args));
299
300
0
  LOG(INFO) << "Turning off clock skew checking and restarting the cluster";
301
0
  SetAtomicFlag(0, &FLAGS_clock_skew_force_crash_bound_usec);
302
0
  SetAtomicFlag(false, &FLAGS_fail_on_out_of_range_clock_skew);
303
0
  start_cluster();
304
0
  delta_changers = jump_clocks();
305
306
0
  std::vector<int64_t> write_times_after_revert;
307
0
  ASSERT_NO_FATALS(CheckAndGetWriteTimes(
308
0
      &session, original_last_value, &write_times_after_revert,
309
0
      "after reverting the effect of data-patcher, going back to clock skew"));
310
0
  ASSERT_VECTORS_EQ(write_times_after_jump, write_times_after_revert);
311
0
}
312
313
} // namespace tools
314
} // namespace yb