/Users/deen/code/yugabyte-db/src/yb/tools/data-patcher.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
#include <algorithm>
#include <iostream>
#include <random>
#include <regex>
15 | | |
16 | | #include "yb/common/doc_hybrid_time.h" |
17 | | #include "yb/common/transaction.h" |
18 | | |
19 | | #include "yb/consensus/log_index.h" |
20 | | #include "yb/consensus/log_reader.h" |
21 | | #include "yb/consensus/log_util.h" |
22 | | |
23 | | #include "yb/docdb/consensus_frontier.h" |
24 | | #include "yb/docdb/docdb_rocksdb_util.h" |
25 | | #include "yb/docdb/docdb_types.h" |
26 | | #include "yb/docdb/value_type.h" |
27 | | #include "yb/docdb/kv_debug.h" |
28 | | #include "yb/docdb/docdb-internal.h" |
29 | | #include "yb/docdb/value.h" |
30 | | |
31 | | #include "yb/fs/fs_manager.h" |
32 | | |
33 | | #include "yb/gutil/stl_util.h" |
34 | | |
35 | | #include "yb/rocksdb/db/builder.h" |
36 | | #include "yb/rocksdb/db/dbformat.h" |
37 | | #include "yb/rocksdb/db/filename.h" |
38 | | #include "yb/rocksdb/db/version_set.h" |
39 | | #include "yb/rocksdb/db/writebuffer.h" |
40 | | #include "yb/rocksdb/table/block_based_table_reader.h" |
41 | | #include "yb/rocksdb/table/internal_iterator.h" |
42 | | #include "yb/rocksdb/table/table_builder.h" |
43 | | #include "yb/rocksdb/util/file_reader_writer.h" |
44 | | |
45 | | #include "yb/tools/tool_arguments.h" |
46 | | |
47 | | #include "yb/util/bytes_formatter.h" |
48 | | #include "yb/util/date_time.h" |
49 | | #include "yb/util/env.h" |
50 | | #include "yb/util/env_util.h" |
51 | | #include "yb/util/enums.h" |
52 | | #include "yb/util/logging.h" |
53 | | #include "yb/util/metrics.h" |
54 | | #include "yb/util/path_util.h" |
55 | | #include "yb/util/pb_util.h" |
56 | | #include "yb/util/result.h" |
57 | | #include "yb/util/scope_exit.h" |
58 | | #include "yb/util/status.h" |
59 | | #include "yb/util/status_format.h" |
60 | | #include "yb/util/threadpool.h" |
61 | | #include "yb/util/tostring.h" |
62 | | #include "yb/util/string_util.h" |
63 | | |
64 | | using namespace std::placeholders; |
65 | | namespace po = boost::program_options; |
66 | | |
67 | | DECLARE_int32(rocksdb_max_background_flushes); |
68 | | DECLARE_int32(rocksdb_base_background_compactions); |
69 | | DECLARE_int32(rocksdb_max_background_compactions); |
70 | | DECLARE_int32(priority_thread_pool_size); |
71 | | |
72 | | namespace yb { |
73 | | namespace tools { |
74 | | |
// Matches a path whose last component is "tablet-" followed by 32 lowercase hex digits.
75 | | const std::regex kTabletRegex(".*/tablet-[0-9a-f]{32}"); |
// Same as kTabletRegex, but the last component may additionally carry the ".intents" suffix.
76 | | const std::regex kTabletDbRegex(".*/tablet-[0-9a-f]{32}(?:\\.intents)?"); |
// Matches "MANIFEST-" followed by exactly six decimal digits.
77 | | const std::regex kManifestRegex("MANIFEST-[0-9]{6}"); |
// Suffix of files that are still being written; they are renamed once complete.
78 | | const std::string kTmpExtension = ".tmp"; |
// Suffix appended to a directory name to get the directory that receives patched files.
79 | | const std::string kPatchedExtension = ".patched"; |
// NOTE(review): not referenced in the visible part of the file; presumably used by the apply
// patch command - confirm.
80 | | const std::string kBackupExtension = ".apply-patch-backup"; |
// Directory name suffix used to recognize an intents DB directory.
81 | | const std::string kIntentsExtension = ".intents"; |
82 | | const std::string kToolName = "Data Patcher"; |
// Log prefix handed to the RocksDB options initialization and to the WAL reader.
83 | | const std::string kLogPrefix = kToolName + ": "; |
84 | | |
85 | | using docdb::StorageDbType; |
86 | | |
// List of the actions of this tool, in the sequence form expected by YB_DEFINE_ENUM.
87 | | #define DATA_PATCHER_ACTIONS (Help)(AddTime)(SubTime)(ApplyPatch) |
88 | | |
89 | | YB_DEFINE_ENUM(DataPatcherAction, DATA_PATCHER_ACTIONS); |
// Kind of file a timestamp comes from; DeltaData::NewEarlyTime maps early timestamps differently
// for SST files and for WAL files.
90 | | YB_DEFINE_ENUM(FileType, (kSST)(kWAL)); |
91 | | |
// Caption passed to the options description of the help command.
92 | | const std::string kHelpDescription = "Show help on command"; |
93 | | |
94 | | // ------------------------------------------------------------------------------------------------ |
95 | | // Help command |
96 | | // ------------------------------------------------------------------------------------------------ |
97 | | |
// Parsed arguments of the help command.
98 | | struct HelpArguments { |
// Name of the command to show help for; when empty, HelpExecute lists all commands instead.
99 | | std::string command; |
100 | | }; |
101 | | |
102 | 0 | std::unique_ptr<OptionsDescription> HelpOptions() { |
103 | 0 | auto result = std::make_unique<OptionsDescriptionImpl<HelpArguments>>(kHelpDescription); |
104 | 0 | result->positional.add("command", 1); |
105 | 0 | result->hidden.add_options() |
106 | 0 | ("command", po::value(&result->args.command)); |
107 | 0 | return result; |
108 | 0 | } |
109 | | |
110 | 0 | CHECKED_STATUS HelpExecute(const HelpArguments& args) { |
111 | 0 | if (args.command.empty()) { |
112 | 0 | ShowCommands<DataPatcherAction>(); |
113 | 0 | return Status::OK(); |
114 | 0 | } |
115 | | |
116 | 0 | ShowHelp(VERIFY_RESULT(ActionByName<DataPatcherAction>(args.command))); |
117 | 0 | return Status::OK(); |
118 | 0 | } |
119 | | |
120 | | // ------------------------------------------------------------------------------------------------ |
121 | | // Utility functions used in the add/subtract commands as well as in the apply patch command |
122 | | // ------------------------------------------------------------------------------------------------ |
123 | | |
124 | | // ------------------------------------------------------------------------------------------------ |
125 | | // Common implementation for "add time" and "subtract time" commands |
126 | | // ------------------------------------------------------------------------------------------------ |
127 | | |
128 | | struct ChangeTimeArguments { |
129 | | std::string delta; |
130 | | std::string bound_time; |
131 | | std::vector<std::string> data_dirs; |
132 | | std::vector<std::string> wal_dirs; |
133 | | // When concurrency is positive, limit number of concurrent jobs to it. |
134 | | // For zero, do not limit the number of concurrent jobs. |
135 | | int concurrency = 0; |
136 | | size_t max_num_old_wal_entries = 0; |
137 | | bool debug = false; |
138 | | |
139 | 0 | std::string ToString() { |
140 | 0 | return YB_STRUCT_TO_STRING( |
141 | 0 | delta, bound_time, data_dirs, wal_dirs, concurrency, max_num_old_wal_entries, debug); |
142 | 0 | } |
143 | | }; |
144 | | |
145 | 0 | std::unique_ptr<OptionsDescription> ChangeTimeOptions(const std::string& caption) { |
146 | 0 | auto result = std::make_unique<OptionsDescriptionImpl<ChangeTimeArguments>>(caption); |
147 | 0 | auto& args = result->args; |
148 | 0 | result->desc.add_options() |
149 | 0 | ("delta", po::value(&args.delta)->required(), |
150 | 0 | "Delta to add/subtract. For instance 2:30, to use two and a half hour as delta.") |
151 | 0 | ("bound-time", po::value(&args.bound_time)->required(), |
152 | 0 | "All new time before this bound will be ordered and assigned new time keeping the order. " |
153 | 0 | "For instance: 2021-12-02T20:24:07.649298.") |
154 | 0 | ("data-dirs", po::value(&args.data_dirs), "TServer data dirs.") |
155 | 0 | ("wal-dirs", po::value(&args.wal_dirs), |
156 | 0 | "TServer WAL dirs. Not recommended to use while node process is running.") |
157 | 0 | ("concurrency", po::value(&args.concurrency)->default_value(-1), |
158 | 0 | "Max number of concurrent jobs, if this number is less or equals to 0 then number of " |
159 | 0 | "concurrent jobs is unlimited.") |
160 | 0 | ("max-num-old-wal-entries", |
161 | 0 | po::value(&args.max_num_old_wal_entries)->default_value(1000000000L), |
162 | 0 | "Maximum number of WAL entries allowed with timestamps that cannot be shifted by the delta, " |
163 | 0 | "and therefore have to be re-mapped differently.") |
164 | 0 | ("debug", po::bool_switch(&args.debug), |
165 | 0 | "Output detailed debug information. Only practical for a small amount of data."); |
166 | 0 | return result; |
167 | 0 | } |
168 | | |
169 | | class RocksDBHelper { |
170 | | public: |
171 | 0 | RocksDBHelper() : immutable_cf_options_(options_) { |
172 | 0 | docdb::InitRocksDBOptions( |
173 | 0 | &options_, kLogPrefix, /* statistics= */ nullptr, tablet_options_, table_options_); |
174 | 0 | internal_key_comparator_ = std::make_shared<rocksdb::InternalKeyComparator>( |
175 | 0 | options_.comparator); |
176 | 0 | } |
177 | | |
178 | 0 | Result<std::unique_ptr<rocksdb::TableReader>> NewTableReader(const std::string& fname) { |
179 | 0 | uint64_t base_file_size; |
180 | 0 | auto file_reader = VERIFY_RESULT(NewFileReader(fname, &base_file_size)); |
181 | |
|
182 | 0 | std::unique_ptr<rocksdb::TableReader> table_reader; |
183 | 0 | RETURN_NOT_OK(rocksdb::BlockBasedTable::Open( |
184 | 0 | immutable_cf_options_, env_options_, table_options_, internal_key_comparator_, |
185 | 0 | std::move(file_reader), base_file_size, &table_reader |
186 | 0 | )); |
187 | |
|
188 | 0 | auto data_fname = rocksdb::TableBaseToDataFileName(fname); |
189 | 0 | auto data_file_reader = VERIFY_RESULT(NewFileReader(data_fname)); |
190 | 0 | table_reader->SetDataFileReader(std::move(data_file_reader)); |
191 | 0 | return table_reader; |
192 | 0 | } |
193 | | |
194 | | std::unique_ptr<rocksdb::TableBuilder> NewTableBuilder( |
195 | | rocksdb::WritableFileWriter* base_file_writer, |
196 | 0 | rocksdb::WritableFileWriter* data_file_writer) { |
197 | 0 | rocksdb::ImmutableCFOptions immutable_cf_options(options_); |
198 | 0 | return std::unique_ptr<rocksdb::TableBuilder>(rocksdb::NewTableBuilder( |
199 | 0 | immutable_cf_options, internal_key_comparator_, int_tbl_prop_collector_factories_, |
200 | 0 | /* column_family_id= */ 0, base_file_writer, data_file_writer, |
201 | 0 | rocksdb::CompressionType::kSnappyCompression, rocksdb::CompressionOptions())); |
202 | 0 | } |
203 | | |
204 | | Result<std::unique_ptr<rocksdb::RandomAccessFileReader>> NewFileReader( |
205 | 0 | const std::string& fname, uint64_t* file_size = nullptr) { |
206 | 0 | std::unique_ptr<rocksdb::RandomAccessFile> input_file; |
207 | 0 | RETURN_NOT_OK(rocksdb::Env::Default()->NewRandomAccessFile(fname, &input_file, env_options_)); |
208 | 0 | if (file_size) { |
209 | 0 | *file_size = VERIFY_RESULT(input_file->Size()); |
210 | 0 | } |
211 | 0 | return std::make_unique<rocksdb::RandomAccessFileReader>(std::move(input_file)); |
212 | 0 | } |
213 | | |
214 | 0 | Result<std::unique_ptr<rocksdb::WritableFileWriter>> NewFileWriter(const std::string& fname) { |
215 | 0 | std::unique_ptr<rocksdb::WritableFile> file; |
216 | 0 | RETURN_NOT_OK(NewWritableFile(rocksdb::Env::Default(), fname, &file, env_options_)); |
217 | 0 | return std::make_unique<rocksdb::WritableFileWriter>(std::move(file), env_options_); |
218 | 0 | } |
219 | | |
220 | 0 | rocksdb::EnvOptions& env_options() { |
221 | 0 | return env_options_; |
222 | 0 | } |
223 | | |
224 | 0 | rocksdb::Options& options() { |
225 | 0 | return options_; |
226 | 0 | } |
227 | | |
228 | | private: |
229 | | rocksdb::EnvOptions env_options_; |
230 | | tablet::TabletOptions tablet_options_; |
231 | | |
232 | | rocksdb::BlockBasedTableOptions table_options_; |
233 | | rocksdb::Options options_; |
234 | | rocksdb::ImmutableCFOptions immutable_cf_options_; |
235 | | |
236 | | rocksdb::InternalKeyComparatorPtr internal_key_comparator_; |
237 | | rocksdb::IntTblPropCollectorFactories int_tbl_prop_collector_factories_; |
238 | | }; |
239 | | |
240 | | struct DeltaData { |
241 | | MonoDelta delta; |
242 | | HybridTime bound_time; |
243 | | size_t max_num_old_wal_entries = 0; |
244 | | |
245 | | std::vector<MicrosTime> early_times; |
246 | | MicrosTime min_micros_to_keep = 0; |
247 | | MicrosTime base_micros = 0; |
248 | | bool time_map_ready = false; |
249 | | |
250 | | DeltaData(MonoDelta delta_, HybridTime bound_time_, size_t max_num_old_wal_entries_) |
251 | | : delta(delta_), |
252 | | bound_time(bound_time_), |
253 | 0 | max_num_old_wal_entries(max_num_old_wal_entries_) { |
254 | 0 | CHECK_GE(max_num_old_wal_entries, 0); |
255 | 0 | } |
256 | | |
257 | 0 | void AddEarlyTime(HybridTime time) { |
258 | 0 | CHECK(!time_map_ready); |
259 | 0 | if (time <= bound_time) { |
260 | 0 | early_times.push_back(time.GetPhysicalValueMicros()); |
261 | 0 | } |
262 | 0 | } |
263 | | |
264 | 0 | void FinalizeTimeMap() { |
265 | 0 | std::sort(early_times.begin(), early_times.end()); |
266 | 0 | Unique(&early_times); |
267 | | |
268 | | // All old WAL timestamps will be mapped to |
269 | | // [base_micros, bound_time.GetPhysicalValueMicros()), assuming there are no more |
270 | | // than max_num_old_wal_entries of them. |
271 | | // |
272 | | // Old SST timestamps will be left as is if their microsecond component is min_micros_to_keep |
273 | | // or less, or mapped to the range [base_micros - early_times.size(), base_micros - 1] |
274 | | // otherwise, based on their relative position in the early_times array. |
275 | |
|
276 | 0 | base_micros = bound_time.GetPhysicalValueMicros() - max_num_old_wal_entries; |
277 | 0 | min_micros_to_keep = base_micros - early_times.size() - 1; |
278 | 0 | time_map_ready = true; |
279 | 0 | } |
280 | | |
281 | 0 | HybridTime NewEarlyTime(HybridTime ht, FileType file_type) const { |
282 | 0 | CHECK(time_map_ready); |
283 | 0 | if (ht.GetPhysicalValueMicros() <= min_micros_to_keep) |
284 | 0 | return ht; |
285 | | |
286 | 0 | const auto micros = ht.GetPhysicalValueMicros(); |
287 | 0 | auto it = std::lower_bound(early_times.begin(), early_times.end(), micros); |
288 | 0 | CHECK_EQ(*it, micros); |
289 | 0 | switch (file_type) { |
290 | 0 | case FileType::kSST: { |
291 | 0 | auto distance_from_end = early_times.end() - it; |
292 | 0 | auto new_micros = base_micros - distance_from_end; |
293 | 0 | CHECK_LT(new_micros, base_micros); |
294 | 0 | return HybridTime(new_micros, ht.GetLogicalValue()); |
295 | 0 | } |
296 | 0 | case FileType::kWAL: { |
297 | 0 | auto distance_from_start = it - early_times.begin(); |
298 | 0 | auto new_micros = base_micros + distance_from_start; |
299 | 0 | CHECK_GE(new_micros, base_micros); |
300 | 0 | CHECK_LE(new_micros, bound_time.GetPhysicalValueMicros()); |
301 | 0 | return HybridTime(new_micros, ht.GetLogicalValue()); |
302 | 0 | } |
303 | 0 | } |
304 | 0 | FATAL_INVALID_ENUM_VALUE(FileType, file_type); |
305 | 0 | } |
306 | | |
307 | 0 | Result<HybridTime> AddDelta(HybridTime ht, FileType file_type) const { |
308 | 0 | int64_t microseconds = ht.GetPhysicalValueMicros(); |
309 | 0 | int64_t bound_microseconds = bound_time.GetPhysicalValueMicros(); |
310 | 0 | if (microseconds <= bound_microseconds) { |
311 | 0 | return NewEarlyTime(ht, file_type); |
312 | 0 | } |
313 | | |
314 | 0 | int64_t delta_microseconds = delta.ToMicroseconds(); |
315 | 0 | SCHECK_GE(static_cast<int64_t>(microseconds - kYugaByteMicrosecondEpoch), -delta_microseconds, |
316 | 0 | InvalidArgument, Format("Hybrid time underflow: $0 + $1", ht, delta)); |
317 | 0 | int64_t kMaxMilliseconds = HybridTime::kMax.GetPhysicalValueMicros(); |
318 | 0 | SCHECK_GT(kMaxMilliseconds - microseconds, delta_microseconds, InvalidArgument, |
319 | 0 | Format("Hybrid time overflow: $0 + $1", ht, delta)); |
320 | 0 | microseconds += delta_microseconds; |
321 | 0 | return HybridTime::FromMicrosecondsAndLogicalValue(microseconds, ht.GetLogicalValue()); |
322 | 0 | } |
323 | | |
324 | 0 | Result<DocHybridTime> AddDelta(DocHybridTime doc_ht, FileType file_type) const { |
325 | 0 | return DocHybridTime( |
326 | 0 | VERIFY_RESULT(AddDelta(doc_ht.hybrid_time(), file_type)), doc_ht.write_id()); |
327 | 0 | } |
328 | | |
329 | 0 | Result<char*> AddDeltaToSstKey(Slice key, std::vector<char>* buffer) const { |
330 | 0 | auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key)); |
331 | 0 | buffer->resize(std::max(buffer->size(), key.size() + kMaxBytesPerEncodedHybridTime)); |
332 | 0 | memcpy(buffer->data(), key.data(), key.size()); |
333 | 0 | auto new_doc_ht = VERIFY_RESULT(AddDelta(doc_ht, FileType::kSST)); |
334 | 0 | return new_doc_ht.EncodedInDocDbFormat(buffer->data() + key.size()); |
335 | 0 | } |
336 | | |
337 | | }; |
338 | | |
339 | | CHECKED_STATUS AddDeltaToSstFile( |
340 | | const std::string& fname, MonoDelta delta, HybridTime bound_time, |
341 | 0 | size_t max_num_old_wal_entries, bool debug, RocksDBHelper* helper) { |
342 | 0 | LOG(INFO) << "Patching: " << fname << ", " << static_cast<const void*>(&fname); |
343 | |
|
344 | 0 | constexpr size_t kKeySuffixLen = 8; |
345 | 0 | const auto storage_db_type = |
346 | 0 | boost::ends_with(DirName(fname), kIntentsExtension) ? StorageDbType::kIntents |
347 | 0 | : StorageDbType::kRegular; |
348 | |
|
349 | 0 | auto table_reader = VERIFY_RESULT(helper->NewTableReader(fname)); |
350 | |
|
351 | 0 | rocksdb::ReadOptions read_options; |
352 | 0 | std::unique_ptr<rocksdb::InternalIterator> iterator(table_reader->NewIterator(read_options)); |
353 | 0 | auto data_fname = rocksdb::TableBaseToDataFileName(fname); |
354 | |
|
355 | 0 | auto out_dir = DirName(fname) + kPatchedExtension; |
356 | 0 | auto& env = *Env::Default(); |
357 | 0 | RETURN_NOT_OK(env.CreateDirs(out_dir)); |
358 | |
|
359 | 0 | const auto new_fname = JoinPathSegments(out_dir, BaseName(fname)); |
360 | 0 | const auto new_data_fname = rocksdb::TableBaseToDataFileName(new_fname); |
361 | 0 | const auto tmp_fname = new_fname + kTmpExtension; |
362 | 0 | const auto tmp_data_fname = new_data_fname + kTmpExtension; |
363 | 0 | size_t num_entries = 0; |
364 | 0 | size_t total_file_size = 0; |
365 | |
|
366 | 0 | { |
367 | 0 | auto base_file_writer = VERIFY_RESULT(helper->NewFileWriter(tmp_fname)); |
368 | 0 | auto data_file_writer = VERIFY_RESULT(helper->NewFileWriter(tmp_data_fname)); |
369 | |
|
370 | 0 | auto builder = helper->NewTableBuilder(base_file_writer.get(), data_file_writer.get()); |
371 | 0 | const auto add_kv = [&builder, debug, storage_db_type](const Slice& k, const Slice& v) { |
372 | 0 | if (debug) { |
373 | 0 | const Slice user_key(k.data(), k.size() - kKeySuffixLen); |
374 | 0 | auto key_type = docdb::GetKeyType(user_key, storage_db_type); |
375 | 0 | auto rocksdb_value_type = static_cast<rocksdb::ValueType>(*(k.end() - kKeySuffixLen)); |
376 | 0 | LOG(INFO) << "DEBUG: output KV pair " |
377 | 0 | << "(db_type=" << storage_db_type |
378 | 0 | << ", key_type=" << key_type |
379 | 0 | << ", rocksdb_value_type=" << static_cast<uint64_t>(rocksdb_value_type) |
380 | 0 | << "): " |
381 | 0 | << docdb::DocDBKeyToDebugStr(user_key, storage_db_type) |
382 | 0 | << " => " |
383 | 0 | << docdb::DocDBValueToDebugStr(key_type, user_key, v); |
384 | 0 | } |
385 | 0 | builder->Add(k, v); |
386 | 0 | }; |
387 | |
|
388 | 0 | bool done = false; |
389 | 0 | auto se = ScopeExit([&builder, &done] { |
390 | 0 | if (!done) { |
391 | 0 | builder->Abandon(); |
392 | 0 | } |
393 | 0 | }); |
394 | |
|
395 | 0 | DeltaData delta_data(delta, bound_time, max_num_old_wal_entries); |
396 | 0 | std::vector<char> buffer(0x100); |
397 | 0 | faststring txn_metadata_buffer; |
398 | |
|
399 | 0 | std::string value_buffer; |
400 | |
|
401 | 0 | for (int is_final_pass = bound_time ? 0 : 1; is_final_pass != 2; ++is_final_pass) { |
402 | 0 | LOG(INFO) << "Performing the " << (is_final_pass ? "final" : "initial") << " pass for " |
403 | 0 | << "file " << fname; |
404 | 0 | for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { |
405 | 0 | const Slice key = iterator->key(); |
406 | 0 | const auto rocksdb_value_type = |
407 | 0 | static_cast<rocksdb::ValueType>(*(key.end() - kKeySuffixLen)); |
408 | 0 | if (storage_db_type == StorageDbType::kRegular || |
409 | 0 | key[0] != docdb::ValueTypeAsChar::kTransactionId) { |
410 | | // Regular DB entry, or a normal intent entry (not txn metadata or reverse index). |
411 | | // Update the timestamp at the end of the key. |
412 | 0 | const auto key_without_suffix = key.WithoutSuffix(kKeySuffixLen); |
413 | |
|
414 | 0 | bool value_updated = false; |
415 | 0 | if (storage_db_type == StorageDbType::kRegular) { |
416 | 0 | docdb::Value docdb_value; |
417 | 0 | RETURN_NOT_OK(docdb_value.Decode(iterator->value())); |
418 | 0 | if (docdb_value.intent_doc_ht().is_valid()) { |
419 | 0 | auto intent_ht = docdb_value.intent_doc_ht().hybrid_time(); |
420 | 0 | if (is_final_pass) { |
421 | 0 | DocHybridTime new_intent_doc_ht( |
422 | 0 | VERIFY_RESULT(delta_data.AddDelta(intent_ht, FileType::kSST)), |
423 | 0 | docdb_value.intent_doc_ht().write_id()); |
424 | 0 | docdb_value.set_intent_doc_ht(new_intent_doc_ht); |
425 | 0 | value_buffer.clear(); |
426 | 0 | docdb_value.EncodeAndAppend(&value_buffer); |
427 | 0 | value_updated = true; |
428 | 0 | } else { |
429 | 0 | delta_data.AddEarlyTime(intent_ht); |
430 | 0 | } |
431 | 0 | } |
432 | 0 | } |
433 | |
|
434 | 0 | if (is_final_pass) { |
435 | 0 | auto end = VERIFY_RESULT(delta_data.AddDeltaToSstKey(key_without_suffix, &buffer)); |
436 | 0 | memcpy(end, key_without_suffix.end(), kKeySuffixLen); |
437 | 0 | end += kKeySuffixLen; |
438 | 0 | add_kv(Slice(buffer.data(), end), value_updated ? value_buffer : iterator->value()); |
439 | 0 | } else { |
440 | 0 | Slice key_without_suffix_copy = key_without_suffix; |
441 | 0 | auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key_without_suffix_copy)); |
442 | 0 | delta_data.AddEarlyTime(doc_ht.hybrid_time()); |
443 | 0 | } |
444 | 0 | continue; |
445 | 0 | } |
446 | | |
447 | | // Now we know this is the intents DB and the key starts with a transaction id. |
448 | | // In this case, we only modify the value, and never modify the key. |
449 | | |
450 | 0 | if (rocksdb_value_type != rocksdb::ValueType::kTypeValue) { |
451 | 0 | if (is_final_pass) { |
452 | 0 | add_kv(key, iterator->value()); |
453 | 0 | } |
454 | 0 | continue; |
455 | 0 | } |
456 | | |
457 | | // Transaction metadata record. |
458 | 0 | if (key.size() == 1 + TransactionId::StaticSize() + kKeySuffixLen) { |
459 | | // We do not modify the key in this case, only the value. |
460 | | |
461 | | // Modify transaction start time stored in metadata. |
462 | 0 | TransactionMetadataPB metadata_pb; |
463 | 0 | const auto v = iterator->value(); |
464 | 0 | if (!metadata_pb.ParseFromArray(v.data(), narrow_cast<int>(v.size()))) { |
465 | 0 | return STATUS_FORMAT(Corruption, "Bad txn metadata: $0", v.ToDebugHexString()); |
466 | 0 | } |
467 | | |
468 | 0 | if (is_final_pass) { |
469 | 0 | const auto new_start_ht = VERIFY_RESULT(delta_data.AddDelta( |
470 | 0 | HybridTime(metadata_pb.start_hybrid_time()), FileType::kSST)); |
471 | 0 | metadata_pb.set_start_hybrid_time(new_start_ht.ToUint64()); |
472 | 0 | txn_metadata_buffer.clear(); |
473 | 0 | pb_util::SerializeToString(metadata_pb, &txn_metadata_buffer); |
474 | 0 | add_kv(key, txn_metadata_buffer); |
475 | 0 | } else { |
476 | 0 | delta_data.AddEarlyTime(HybridTime(metadata_pb.start_hybrid_time())); |
477 | 0 | } |
478 | 0 | continue; |
479 | 0 | } |
480 | | |
481 | | // Transaction reverse index. We do not modify the timestamp at the end of the key because |
482 | | // it does not matter if it is shifted, only the relative order of those timestamps matters. |
483 | | // We do modify the timestamp stored at the end of the encoded value, because the value is |
484 | | // the intent key. |
485 | 0 | if (is_final_pass) { |
486 | 0 | auto value_end = VERIFY_RESULT_PREPEND( |
487 | 0 | delta_data.AddDeltaToSstKey(iterator->value(), &buffer), |
488 | 0 | Format("Intent key $0, value: $1, filename: $2", |
489 | 0 | iterator->key().ToDebugHexString(), iterator->value().ToDebugHexString(), |
490 | 0 | fname)); |
491 | 0 | add_kv(iterator->key(), Slice(buffer.data(), value_end)); |
492 | 0 | } else { |
493 | 0 | auto value = iterator->value(); |
494 | 0 | auto doc_ht_result = DocHybridTime::DecodeFromEnd(&value); |
495 | 0 | if (!doc_ht_result.ok()) { |
496 | 0 | LOG(INFO) |
497 | 0 | << "Failed to decode hybrid time from the end of value for " |
498 | 0 | << "key " << key.ToDebugHexString() << " (" << FormatSliceAsStr(key) << "), " |
499 | 0 | << "value " << value.ToDebugHexString() << " (" << FormatSliceAsStr(value) << "), " |
500 | 0 | << "decoded value " << DocDBValueToDebugStr( |
501 | 0 | docdb::KeyType::kReverseTxnKey, iterator->key(), iterator->value()); |
502 | 0 | return doc_ht_result.status(); |
503 | 0 | } |
504 | 0 | delta_data.AddEarlyTime(doc_ht_result->hybrid_time()); |
505 | 0 | } |
506 | 0 | } |
507 | |
|
508 | 0 | if (is_final_pass) { |
509 | 0 | done = true; |
510 | 0 | RETURN_NOT_OK(builder->Finish()); |
511 | 0 | num_entries = builder->NumEntries(); |
512 | 0 | total_file_size = builder->TotalFileSize(); |
513 | 0 | } else { |
514 | 0 | delta_data.FinalizeTimeMap(); |
515 | 0 | } |
516 | 0 | } |
517 | 0 | } |
518 | |
|
519 | 0 | RETURN_NOT_OK(env.RenameFile(tmp_data_fname, new_data_fname)); |
520 | 0 | RETURN_NOT_OK(env.RenameFile(tmp_fname, new_fname)); |
521 | 0 | LOG(INFO) << "Generated: " << new_fname << ", with: " << num_entries |
522 | 0 | << " entries and size: " << HumanizeBytes(total_file_size); |
523 | |
|
524 | 0 | return Status::OK(); |
525 | 0 | } |
526 | | |
527 | | // Checks if the given file in the given directory is an SSTable file that we need to process. |
528 | | // That means it belong to a valid tablet directory and that we haven't processed it before. |
529 | | void CheckDataFile( |
530 | 0 | const std::string& dirname, const std::string& fname, std::vector<std::string>* out) { |
531 | 0 | if (!regex_match(dirname, kTabletDbRegex)) { |
532 | 0 | return; |
533 | 0 | } |
534 | 0 | if (!boost::ends_with(fname, ".sst")) { |
535 | 0 | return; |
536 | 0 | } |
537 | 0 | if (Env::Default()->FileExists(JoinPathSegments(dirname + kPatchedExtension, fname))) { |
538 | 0 | return; |
539 | 0 | } |
540 | 0 | auto full_path = JoinPathSegments(dirname, fname); |
541 | 0 | out->push_back(full_path); |
542 | 0 | } |
543 | | |
544 | | CHECKED_STATUS ChangeTimeInDataFiles( |
545 | | MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries, |
546 | 0 | const std::vector<std::string>& dirs, bool debug, TaskRunner* runner) { |
547 | 0 | std::vector<std::string> files_to_process; |
548 | 0 | Env* env = Env::Default(); |
549 | 0 | auto callback = [&files_to_process, env]( |
550 | 0 | Env::FileType type, const std::string& dirname, const std::string& fname) -> Status { |
551 | 0 | if (type == Env::FileType::FILE_TYPE) { |
552 | 0 | CheckDataFile(dirname, fname, &files_to_process); |
553 | 0 | } else { |
554 | 0 | auto full_path = JoinPathSegments(dirname, fname); |
555 | 0 | if (regex_match(full_path, kTabletDbRegex)) { |
556 | 0 | RETURN_NOT_OK(env->CreateDirs(full_path + kPatchedExtension)); |
557 | 0 | } |
558 | 0 | } |
559 | 0 | return Status::OK(); |
560 | 0 | }; |
561 | 0 | for (const auto& dir : dirs) { |
562 | 0 | RETURN_NOT_OK(env->Walk(dir, Env::DirectoryOrder::POST_ORDER, callback)); |
563 | 0 | } |
564 | 0 | std::random_shuffle(files_to_process.begin(), files_to_process.end()); |
565 | 0 | for (const auto& fname : files_to_process) { |
566 | 0 | runner->Submit([fname, delta, bound_time, max_num_old_wal_entries, debug]() { |
567 | 0 | RocksDBHelper helper; |
568 | 0 | return AddDeltaToSstFile(fname, delta, bound_time, max_num_old_wal_entries, debug, &helper); |
569 | 0 | }); |
570 | 0 | } |
571 | 0 | return Status::OK(); |
572 | 0 | } |
573 | | |
574 | | CHECKED_STATUS ChangeTimeInWalDir( |
575 | | MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries, |
576 | 0 | const std::string& dir) { |
577 | 0 | auto env = Env::Default(); |
578 | 0 | auto log_index = make_scoped_refptr<log::LogIndex>(dir); |
579 | 0 | std::unique_ptr<log::LogReader> log_reader; |
580 | 0 | RETURN_NOT_OK(log::LogReader::Open( |
581 | 0 | env, log_index, kLogPrefix, dir, /* table_metric_entity= */ nullptr, |
582 | 0 | /* tablet_metric_entity= */ nullptr, &log_reader)); |
583 | 0 | log::SegmentSequence segments; |
584 | 0 | RETURN_NOT_OK(log_reader->GetSegmentsSnapshot(&segments)); |
585 | 0 | auto patched_dir = dir + kPatchedExtension; |
586 | 0 | RETURN_NOT_OK(env->CreateDirs(patched_dir)); |
587 | 0 | auto new_segment_path = JoinPathSegments(patched_dir, FsManager::GetWalSegmentFileName(1)); |
588 | 0 | auto tmp_segment_path = new_segment_path + kTmpExtension; |
589 | 0 | std::shared_ptr<WritableFile> new_segment_file; |
590 | 0 | { |
591 | 0 | std::unique_ptr<WritableFile> writable_file; |
592 | 0 | RETURN_NOT_OK(env->NewWritableFile(tmp_segment_path, &writable_file)); |
593 | 0 | new_segment_file = std::move(writable_file); |
594 | 0 | } |
595 | 0 | log::WritableLogSegment new_segment(/* path= */ "", new_segment_file); |
596 | | |
597 | | // Set up the new header and footer. |
598 | 0 | log::LogSegmentHeaderPB header; |
599 | 0 | header.set_major_version(log::kLogMajorVersion); |
600 | 0 | header.set_minor_version(log::kLogMinorVersion); |
601 | 0 | header.set_sequence_number(1); |
602 | 0 | header.set_unused_tablet_id("TABLET ID"); |
603 | 0 | header.mutable_unused_schema(); |
604 | |
|
605 | 0 | RETURN_NOT_OK(new_segment.WriteHeaderAndOpen(header)); |
606 | | |
607 | | // Set up the new footer. This will be maintained as the segment is written. |
608 | 0 | size_t num_entries = 0; |
609 | 0 | int64_t min_replicate_index = std::numeric_limits<int64_t>::max(); |
610 | 0 | int64_t max_replicate_index = std::numeric_limits<int64_t>::min(); |
611 | |
|
612 | 0 | faststring buffer; |
613 | 0 | DeltaData delta_data(delta, bound_time, max_num_old_wal_entries); |
614 | 0 | auto add_delta = [&delta_data](HybridTime ht) -> Result<uint64_t> { |
615 | 0 | return VERIFY_RESULT(delta_data.AddDelta(ht, FileType::kWAL)).ToUint64(); |
616 | 0 | }; |
617 | |
|
618 | 0 | for (int is_final_step = bound_time ? 0 : 1; is_final_step != 2; ++is_final_step) { |
619 | 0 | for (const auto& segment : segments) { |
620 | | // Read entry batches of a WAL segment, and write as few entry batches as possible, but still |
621 | | // make sure that we don't create consecutive entries in the same write batch where the Raft |
622 | | // index does not strictly increase. We have a check in log_reader.cc for that. |
623 | | // batch only increasing Raft operation indexes. |
624 | 0 | auto read_result = segment->ReadEntries(); |
625 | 0 | log::LogEntryBatchPB batch; |
626 | 0 | OpId committed_op_id; |
627 | 0 | int64_t last_index = -1; |
628 | |
|
629 | 0 | auto write_entry_batch = [ |
630 | 0 | &batch, &buffer, &num_entries, &new_segment, &read_result, &committed_op_id]( |
631 | 0 | bool last_batch_of_segment) -> Status { |
632 | 0 | if (last_batch_of_segment) { |
633 | 0 | read_result.committed_op_id.ToPB(batch.mutable_committed_op_id()); |
634 | 0 | } else if (committed_op_id.valid()) { |
635 | 0 | committed_op_id.ToPB(batch.mutable_committed_op_id()); |
636 | 0 | } |
637 | 0 | if (!read_result.entry_metadata.empty()) { |
638 | 0 | batch.set_mono_time(read_result.entry_metadata.back().entry_time.ToUInt64()); |
639 | 0 | } |
640 | 0 | buffer.clear(); |
641 | 0 | pb_util::AppendToString(batch, &buffer); |
642 | 0 | num_entries += batch.entry().size(); |
643 | 0 | RETURN_NOT_OK(new_segment.WriteEntryBatch(Slice(buffer))); |
644 | 0 | batch.clear_entry(); |
645 | 0 | return Status::OK(); |
646 | 0 | }; |
647 | |
|
648 | 0 | auto add_entry = [&write_entry_batch, &batch, &last_index, &committed_op_id]( |
649 | 0 | std::unique_ptr<log::LogEntryPB> entry) -> Status { |
650 | 0 | if (entry->has_replicate() && entry->replicate().id().index() <= last_index) { |
651 | 0 | RETURN_NOT_OK(write_entry_batch(/* last_batch_of_segment= */ false)); |
652 | 0 | } |
653 | 0 | last_index = entry->replicate().id().index(); |
654 | 0 | if (entry->has_replicate() && |
655 | 0 | entry->replicate().has_committed_op_id()) { |
656 | 0 | committed_op_id = OpId::FromPB(entry->replicate().committed_op_id()); |
657 | 0 | } |
658 | 0 | batch.mutable_entry()->AddAllocated(entry.release()); |
659 | 0 | return Status::OK(); |
660 | 0 | }; |
661 | |
|
662 | 0 | for (auto& entry : read_result.entries) { |
663 | 0 | auto& replicate = *entry->mutable_replicate(); |
664 | 0 | auto replicate_ht = HybridTime(replicate.hybrid_time()); |
665 | 0 | if (is_final_step) { |
666 | 0 | replicate.set_hybrid_time(VERIFY_RESULT(add_delta(replicate_ht))); |
667 | 0 | } else { |
668 | 0 | delta_data.AddEarlyTime(replicate_ht); |
669 | 0 | } |
670 | 0 | if (replicate.has_transaction_state()) { |
671 | 0 | auto& state = *replicate.mutable_transaction_state(); |
672 | 0 | if (state.status() == TransactionStatus::APPLYING) { |
673 | 0 | auto commit_ht = HybridTime(state.commit_hybrid_time()); |
674 | 0 | if (is_final_step) { |
675 | 0 | state.set_commit_hybrid_time(VERIFY_RESULT(add_delta(commit_ht))); |
676 | 0 | } else { |
677 | 0 | delta_data.AddEarlyTime(commit_ht); |
678 | 0 | } |
679 | 0 | } |
680 | 0 | } else if (replicate.has_history_cutoff()) { |
681 | 0 | auto& state = *replicate.mutable_history_cutoff(); |
682 | 0 | auto history_cutoff_ht = HybridTime(state.history_cutoff()); |
683 | 0 | if (is_final_step) { |
684 | 0 | state.set_history_cutoff(VERIFY_RESULT(add_delta(history_cutoff_ht))); |
685 | 0 | } else { |
686 | 0 | delta_data.AddEarlyTime(history_cutoff_ht); |
687 | 0 | } |
688 | 0 | } |
689 | 0 | if (is_final_step) { |
690 | 0 | auto index = entry->replicate().id().index(); |
691 | 0 | min_replicate_index = std::min(min_replicate_index, index); |
692 | 0 | max_replicate_index = std::max(max_replicate_index, index); |
693 | 0 | RETURN_NOT_OK(add_entry(std::move(entry))); |
694 | 0 | } |
695 | 0 | } |
696 | |
|
697 | 0 | if (is_final_step) { |
698 | 0 | RETURN_NOT_OK(write_entry_batch(/* last_batch_of_segment= */ true)); |
699 | 0 | } |
700 | 0 | LOG(INFO) << "Step " << is_final_step << " processed " << batch.entry().size() |
701 | 0 | << " entries in " << segment->path(); |
702 | 0 | } |
703 | 0 | if (!is_final_step) { |
704 | 0 | delta_data.FinalizeTimeMap(); |
705 | 0 | } |
706 | 0 | } |
707 | |
|
708 | 0 | log::LogSegmentFooterPB footer; |
709 | 0 | footer.set_num_entries(num_entries); |
710 | 0 | if (num_entries) { |
711 | 0 | footer.set_min_replicate_index(min_replicate_index); |
712 | 0 | footer.set_max_replicate_index(max_replicate_index); |
713 | 0 | } |
714 | |
|
715 | 0 | RETURN_NOT_OK(new_segment.WriteFooterAndClose(footer)); |
716 | 0 | return Env::Default()->RenameFile(tmp_segment_path, new_segment_path); |
717 | 0 | } |
718 | | |
719 | | CHECKED_STATUS ChangeTimeInWalDirs( |
720 | | MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries, |
721 | 0 | const std::vector<std::string>& dirs, TaskRunner* runner) { |
722 | 0 | Env* env = Env::Default(); |
723 | 0 | std::vector<std::string> wal_dirs; |
724 | 0 | auto callback = [&wal_dirs]( |
725 | 0 | Env::FileType type, const std::string& dirname, const std::string& fname) { |
726 | 0 | if (type != Env::FileType::DIRECTORY_TYPE) { |
727 | 0 | return Status::OK(); |
728 | 0 | } |
729 | 0 | auto full_path = JoinPathSegments(dirname, fname); |
730 | 0 | if (!regex_match(full_path, kTabletRegex)) { |
731 | 0 | return Status::OK(); |
732 | 0 | } |
733 | 0 | wal_dirs.push_back(full_path); |
734 | 0 | return Status::OK(); |
735 | 0 | }; |
736 | 0 | for (const auto& dir : dirs) { |
737 | 0 | RETURN_NOT_OK(env->Walk(dir, Env::DirectoryOrder::POST_ORDER, callback)); |
738 | 0 | } |
739 | 0 | std::random_shuffle(wal_dirs.begin(), wal_dirs.end()); |
740 | 0 | for (const auto& dir : wal_dirs) { |
741 | 0 | runner->Submit([delta, bound_time, max_num_old_wal_entries, dir] { |
742 | 0 | return ChangeTimeInWalDir(delta, bound_time, max_num_old_wal_entries, dir); |
743 | 0 | }); |
744 | 0 | } |
745 | 0 | return Status::OK(); |
746 | 0 | } |
747 | | |
748 | 0 | CHECKED_STATUS ChangeTimeExecute(const ChangeTimeArguments& args, bool subtract) { |
749 | 0 | auto delta = VERIFY_RESULT(DateTime::IntervalFromString(args.delta)); |
750 | 0 | if (subtract) { |
751 | 0 | delta = -delta; |
752 | 0 | } |
753 | 0 | HybridTime bound_time; |
754 | 0 | if (!args.bound_time.empty()) { |
755 | 0 | bound_time = VERIFY_RESULT(HybridTime::ParseHybridTime(args.bound_time)).AddDelta(-delta); |
756 | 0 | if (bound_time < HybridTime::FromMicros(kYugaByteMicrosecondEpoch)) { |
757 | 0 | return STATUS_FORMAT( |
758 | 0 | InvalidArgument, "Wrong bound-time and delta combination: $0", bound_time); |
759 | 0 | } |
760 | 0 | LOG(INFO) << "Bound time before adding delta: " << bound_time.ToString(); |
761 | 0 | } |
762 | 0 | TaskRunner runner; |
763 | 0 | RETURN_NOT_OK(runner.Init(args.concurrency)); |
764 | 0 | RETURN_NOT_OK(ChangeTimeInDataFiles( |
765 | 0 | delta, bound_time, args.max_num_old_wal_entries, SplitAndFlatten(args.data_dirs), args.debug, |
766 | 0 | &runner)); |
767 | 0 | RETURN_NOT_OK(ChangeTimeInWalDirs( |
768 | 0 | delta, bound_time, args.max_num_old_wal_entries, SplitAndFlatten(args.wal_dirs), &runner)); |
769 | 0 | return runner.Wait(); |
770 | 0 | } |
771 | | |
772 | | // ------------------------------------------------------------------------------------------------ |
773 | | // Add time command |
774 | | // ------------------------------------------------------------------------------------------------ |
775 | | |
776 | | const std::string kAddTimeDescription = "Add time delta to physical time in SST files"; |
777 | | |
778 | | using AddTimeArguments = ChangeTimeArguments; |
779 | | |
780 | 0 | std::unique_ptr<OptionsDescription> AddTimeOptions() { |
781 | 0 | return ChangeTimeOptions(kAddTimeDescription); |
782 | 0 | } |
783 | | |
784 | 0 | CHECKED_STATUS AddTimeExecute(const AddTimeArguments& args) { |
785 | 0 | return ChangeTimeExecute(args, /* subtract= */ false); |
786 | 0 | } |
787 | | |
788 | | // ------------------------------------------------------------------------------------------------ |
789 | | // Subtract time command |
790 | | // ------------------------------------------------------------------------------------------------ |
791 | | |
792 | | const std::string kSubTimeDescription = "Subtract time delta from physical time in SST files"; |
793 | | |
794 | | using SubTimeArguments = ChangeTimeArguments; |
795 | | |
796 | 0 | std::unique_ptr<OptionsDescription> SubTimeOptions() { |
797 | 0 | return ChangeTimeOptions(kSubTimeDescription); |
798 | 0 | } |
799 | | |
800 | 0 | CHECKED_STATUS SubTimeExecute(const SubTimeArguments& args) { |
801 | 0 | return ChangeTimeExecute(args, /* subtract= */ true); |
802 | 0 | } |
803 | | |
804 | | // ------------------------------------------------------------------------------------------------ |
805 | | // Apply patch |
806 | | // ------------------------------------------------------------------------------------------------ |
807 | | |
// Description string used when building the apply-patch option description.
const std::string kApplyPatchDescription = "Apply prepared SST files patch";

// Command-line arguments of the apply-patch command (filled in by ApplyPatchOptions).
struct ApplyPatchArguments {
  // --data-dirs: TServer data dirs.
  std::vector<std::string> data_dirs;

  // --wal-dirs: TServer WAL dirs.
  std::vector<std::string> wal_dirs;

  // --dry-run: only check that apply-patch would work, without changing live data.
  bool dry_run{false};

  // --revert: undo a previous apply-patch operation.
  bool revert{false};
};
816 | | |
817 | 0 | std::unique_ptr<OptionsDescription> ApplyPatchOptions() { |
818 | 0 | auto result = std::make_unique<OptionsDescriptionImpl<ApplyPatchArguments>>( |
819 | 0 | kApplyPatchDescription); |
820 | 0 | auto& args = result->args; |
821 | 0 | result->desc.add_options() |
822 | 0 | ("data-dirs", po::value(&args.data_dirs)->required(), "TServer data dirs") |
823 | 0 | ("wal-dirs", po::value(&args.wal_dirs)->required(), "TServer WAL dirs") |
824 | 0 | ("dry-run", po::bool_switch(&args.dry_run), |
825 | 0 | "Do not make any changes to live data, only check that apply-patch would work. " |
826 | 0 | "This might still involve making changes to files in .patched directories.") |
827 | 0 | ("revert", po::bool_switch(&args.revert), |
828 | 0 | "Revert a previous apply-patch operation. This will move backup RocksDB and WAL " |
829 | 0 | "directories back to their live locations, and the live locations to .patched locations."); |
830 | 0 | return result; |
831 | 0 | } |
832 | | |
833 | | class ApplyPatch { |
834 | | public: |
835 | 0 | CHECKED_STATUS Execute(const ApplyPatchArguments& args) { |
836 | 0 | dry_run_ = args.dry_run; |
837 | 0 | revert_ = args.revert; |
838 | 0 | LOG(INFO) << "Running the ApplyPatch command"; |
839 | 0 | LOG(INFO) << " data_dirs=" << yb::ToString(args.data_dirs); |
840 | 0 | LOG(INFO) << " wal_dirs=" << yb::ToString(args.data_dirs); |
841 | 0 | LOG(INFO) << " dry_run=" << args.dry_run; |
842 | 0 | LOG(INFO) << " revert=" << args.revert; |
843 | 0 | for (const auto& dir : SplitAndFlatten(args.data_dirs)) { |
844 | 0 | RETURN_NOT_OK(env_->Walk( |
845 | 0 | dir, Env::DirectoryOrder::POST_ORDER, |
846 | 0 | std::bind(&ApplyPatch::WalkDataCallback, this, _1, _2, _3))); |
847 | 0 | } |
848 | |
|
849 | 0 | for (const auto& dir : SplitAndFlatten(args.wal_dirs)) { |
850 | 0 | RETURN_NOT_OK(env_->Walk( |
851 | 0 | dir, Env::DirectoryOrder::POST_ORDER, |
852 | 0 | std::bind(&ApplyPatch::WalkWalCallback, this, _1, _2, _3))); |
853 | 0 | } |
854 | |
|
855 | 0 | RocksDBHelper helper; |
856 | 0 | auto options = helper.options(); |
857 | 0 | options.skip_stats_update_on_db_open = true; |
858 | |
|
859 | 0 | int num_revert_errors = 0; |
860 | |
|
861 | 0 | int num_dirs_handled = 0; |
862 | 0 | for (const auto* dirs : {&data_dirs_, &wal_dirs_}) { |
863 | 0 | for (const auto& dir : *dirs) { |
864 | 0 | auto backup_path = dir + kBackupExtension; |
865 | 0 | auto patched_path = dir + kPatchedExtension; |
866 | |
|
867 | 0 | if (revert_) { |
868 | 0 | bool backup_dir_exists = env_->FileExists(backup_path); |
869 | 0 | bool patched_path_exists = env_->FileExists(patched_path); |
870 | 0 | if (backup_dir_exists && !patched_path_exists) { |
871 | 0 | auto rename_status = ChainRename(backup_path, dir, patched_path); |
872 | 0 | if (!rename_status.ok()) { |
873 | 0 | LOG(INFO) << "Error during revert: " << rename_status; |
874 | 0 | num_revert_errors++; |
875 | 0 | } |
876 | 0 | } else { |
877 | 0 | LOG(INFO) |
878 | 0 | << "Not attempting to restore " << backup_path << " to " << dir |
879 | 0 | << " after moving " << dir << " back to " << patched_path |
880 | 0 | << ": " |
881 | 0 | << (backup_dir_exists ? "" : "backup path does not exist; ") |
882 | 0 | << (patched_path_exists ? "patched path already exists" : ""); |
883 | 0 | num_revert_errors++; |
884 | 0 | } |
885 | 0 | continue; |
886 | 0 | } |
887 | | |
888 | 0 | if (dirs == &data_dirs_) { |
889 | 0 | if (valid_rocksdb_dirs_.count(dir)) { |
890 | 0 | LOG(INFO) << "Patching non-live RocksDB metadata in " << patched_path; |
891 | 0 | docdb::RocksDBPatcher patcher(patched_path, options); |
892 | 0 | RETURN_NOT_OK(patcher.Load()); |
893 | 0 | RETURN_NOT_OK(patcher.UpdateFileSizes()); |
894 | 0 | docdb::ConsensusFrontier frontier; |
895 | 0 | frontier.set_hybrid_time(HybridTime::kMin); |
896 | 0 | frontier.set_history_cutoff(HybridTime::FromMicros(kYugaByteMicrosecondEpoch)); |
897 | 0 | RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier)); |
898 | 0 | } else { |
899 | 0 | LOG(INFO) << "We did not see RocksDB CURRENT or MANIFEST-... files in " |
900 | 0 | << dir << ", skipping applying " << patched_path; |
901 | 0 | continue; |
902 | 0 | } |
903 | 0 | } |
904 | | |
905 | 0 | RETURN_NOT_OK(ChainRename(patched_path, dir, backup_path)); |
906 | 0 | num_dirs_handled++; |
907 | 0 | } |
908 | 0 | } |
909 | |
|
910 | 0 | LOG(INFO) << "Processed " << num_dirs_handled << " directories (two renames per each)"; |
911 | 0 | if (num_revert_errors) { |
912 | 0 | return STATUS_FORMAT( |
913 | 0 | IOError, |
914 | 0 | "Encountered $0 errors when trying to revert an applied patch. " |
915 | 0 | "Check the log above for details.", |
916 | 0 | num_revert_errors); |
917 | 0 | } |
918 | 0 | return Status::OK(); |
919 | 0 | } |
920 | | |
921 | | private: |
922 | | |
923 | | // ---------------------------------------------------------------------------------------------- |
924 | | // Functions for traversing RocksDB data directories |
925 | | // ---------------------------------------------------------------------------------------------- |
926 | | |
927 | | CHECKED_STATUS WalkDataCallback( |
928 | 0 | Env::FileType type, const std::string& dirname, const std::string& fname) { |
929 | 0 | switch (type) { |
930 | 0 | case Env::FileType::FILE_TYPE: |
931 | 0 | return HandleDataFile(dirname, fname); |
932 | 0 | case Env::FileType::DIRECTORY_TYPE: |
933 | 0 | CheckDirectory(dirname, fname, &data_dirs_); |
934 | 0 | return Status::OK(); |
935 | 0 | } |
936 | 0 | FATAL_INVALID_ENUM_VALUE(Env::FileType, type); |
937 | 0 | } |
938 | | |
939 | | // Handles a file found during walking through the a data (RocksDB) directory tree. Looks for |
940 | | // CURRENT and MANIFEST files and copies them to the corresponding .patched directory. Does not |
941 | | // modify live data of the cluster. |
942 | 0 | CHECKED_STATUS HandleDataFile(const std::string& dirname, const std::string& fname) { |
943 | 0 | if (revert_) { |
944 | | // We don't look at any of the manifest files during the revert operation. |
945 | 0 | return Status::OK(); |
946 | 0 | } |
947 | | |
948 | 0 | if (!regex_match(dirname, kTabletDbRegex)) { |
949 | 0 | return Status::OK(); |
950 | 0 | } |
951 | 0 | if (fname != "CURRENT" && !regex_match(fname, kManifestRegex)) { |
952 | 0 | return Status::OK(); |
953 | 0 | } |
954 | 0 | auto patched_dirname = dirname + kPatchedExtension; |
955 | 0 | if (env_->DirExists(patched_dirname)) { |
956 | 0 | valid_rocksdb_dirs_.insert(dirname); |
957 | 0 | auto full_src_path = JoinPathSegments(dirname, fname); |
958 | 0 | auto full_dst_path = JoinPathSegments(patched_dirname, fname); |
959 | 0 | LOG(INFO) << "Copying file " << full_src_path << " to " << full_dst_path; |
960 | 0 | Status copy_status = env_util::CopyFile(env_, full_src_path, full_dst_path); |
961 | 0 | if (!copy_status.ok()) { |
962 | 0 | LOG(INFO) << "Error copying file " << full_src_path << " to " << full_dst_path << ": " |
963 | 0 | << copy_status; |
964 | 0 | } |
965 | 0 | return copy_status; |
966 | 0 | } |
967 | | |
968 | 0 | LOG(INFO) << "Directory " << patched_dirname << " does not exist, not copying " |
969 | 0 | << "the file " << fname << " there (this is not an error)"; |
970 | 0 | return Status::OK(); |
971 | 0 | } |
972 | | |
973 | | // ---------------------------------------------------------------------------------------------- |
974 | | // Traversing WAL directories |
975 | | // ---------------------------------------------------------------------------------------------- |
976 | | |
977 | | CHECKED_STATUS WalkWalCallback( |
978 | 0 | Env::FileType type, const std::string& dirname, const std::string& fname) { |
979 | 0 | if (type != Env::FileType::DIRECTORY_TYPE) { |
980 | 0 | return Status::OK(); |
981 | 0 | } |
982 | 0 | CheckDirectory(dirname, fname, &wal_dirs_); |
983 | 0 | return Status::OK(); |
984 | 0 | } |
985 | | |
986 | | // ---------------------------------------------------------------------------------------------- |
987 | | // Functions used for both RocksDB data and WALs |
988 | | // ---------------------------------------------------------------------------------------------- |
989 | | |
990 | | // Look at the given directory, and if it is a patched directory (or a backup directory if we are |
991 | | // doing a revert operation), strip off the suffix and add the corresponding live directory to |
992 | | // the given vector. |
993 | | void CheckDirectory( |
994 | 0 | const std::string& dirname, const std::string& fname, std::vector<std::string>* dirs) { |
995 | 0 | const std::string& needed_extension = revert_ ? kBackupExtension : kPatchedExtension; |
996 | 0 | if (!boost::ends_with(fname, needed_extension)) { |
997 | 0 | return; |
998 | 0 | } |
999 | 0 | auto patched_path = JoinPathSegments(dirname, fname); |
1000 | 0 | auto full_path = patched_path.substr(0, patched_path.size() - needed_extension.size()); |
1001 | 0 | if (!regex_match(full_path, kTabletDbRegex)) { |
1002 | 0 | return; |
1003 | 0 | } |
1004 | 0 | dirs->push_back(full_path); |
1005 | 0 | return; |
1006 | 0 | } |
1007 | | |
1008 | | // Renames dir1 -> dir2 -> dir3, starting from the end of the chain. |
1009 | | CHECKED_STATUS ChainRename( |
1010 | 0 | const std::string& dir1, const std::string& dir2, const std::string& dir3) { |
1011 | 0 | RETURN_NOT_OK(SafeRename(dir2, dir3, /* check_dst_collision= */ true)); |
1012 | | |
1013 | | // Don't check that dir2 does not exist, because we haven't actually moved dir2 to dir3 in the |
1014 | | // dry run mode. |
1015 | 0 | return SafeRename(dir1, dir2, /* check_dst_collision= */ false); |
1016 | 0 | } |
1017 | | |
1018 | | // A logging wrapper over directory renaming. In dry-run mode, checks for some errors, but |
1019 | | // check_dst_collision=false allows to skip ensuring that the destination does not exist. |
1020 | | CHECKED_STATUS SafeRename( |
1021 | 0 | const std::string& src, const std::string& dst, bool check_dst_collision) { |
1022 | 0 | if (dry_run_) { |
1023 | 0 | if (!env_->FileExists(src)) { |
1024 | 0 | return STATUS_FORMAT( |
1025 | 0 | IOError, "Would fail to rename $0 to $1, source does not exist", src, dst); |
1026 | 0 | } |
1027 | 0 | if (check_dst_collision && env_->FileExists(dst)) { |
1028 | 0 | return STATUS_FORMAT( |
1029 | 0 | IOError, "Would fail to rename $0 to $1, destination already exists", src, dst); |
1030 | 0 | } |
1031 | 0 | LOG(INFO) << "Would rename " << src << " to " << dst; |
1032 | 0 | return Status::OK(); |
1033 | 0 | } |
1034 | 0 | LOG(INFO) << "Renaming " << src << " to " << dst; |
1035 | 0 | Status s = env_->RenameFile(src, dst); |
1036 | 0 | if (!s.ok()) { |
1037 | 0 | LOG(ERROR) << "Error renaming " << src << " to " << dst << ": " << s; |
1038 | 0 | } |
1039 | 0 | return s; |
1040 | 0 | } |
1041 | | |
1042 | | Env* env_ = Env::Default(); |
1043 | | std::vector<std::string> data_dirs_; |
1044 | | std::vector<std::string> wal_dirs_; |
1045 | | |
1046 | | // The set of tablet RocksDB directories where we found CURRENT and MANIFEST-... files, indicating |
1047 | | // that there is a valid RocksDB database present. |
1048 | | std::set<std::string> valid_rocksdb_dirs_; |
1049 | | |
1050 | | bool dry_run_ = false; |
1051 | | bool revert_ = false; |
1052 | | }; |
1053 | | |
1054 | 0 | CHECKED_STATUS ApplyPatchExecute(const ApplyPatchArguments& args) { |
1055 | 0 | ApplyPatch apply_patch; |
1056 | 0 | return apply_patch.Execute(args); |
1057 | 0 | } |
1058 | | |
1059 | | YB_TOOL_ARGUMENTS(DataPatcherAction, DATA_PATCHER_ACTIONS); |
1060 | | |
1061 | | } // namespace tools |
1062 | | } // namespace yb |
1063 | | |
1064 | 13.2k | int main(int argc, char** argv) { |
1065 | 13.2k | yb::HybridTime::TEST_SetPrettyToString(true); |
1066 | | |
1067 | | // Setup flags to avoid unnecessary logging |
1068 | 13.2k | FLAGS_rocksdb_max_background_flushes = 1; |
1069 | 13.2k | FLAGS_rocksdb_max_background_compactions = 1; |
1070 | 13.2k | FLAGS_rocksdb_base_background_compactions = 1; |
1071 | 13.2k | FLAGS_priority_thread_pool_size = 1; |
1072 | | |
1073 | 13.2k | yb::InitGoogleLoggingSafeBasic(argv[0]); |
1074 | 13.2k | return yb::tools::ExecuteTool<yb::tools::DataPatcherAction>(argc, argv); |
1075 | 13.2k | } |