YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tools/data-patcher.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include <iostream>
14
#include <regex>
15
16
#include "yb/common/doc_hybrid_time.h"
17
#include "yb/common/transaction.h"
18
19
#include "yb/consensus/log_index.h"
20
#include "yb/consensus/log_reader.h"
21
#include "yb/consensus/log_util.h"
22
23
#include "yb/docdb/consensus_frontier.h"
24
#include "yb/docdb/docdb_rocksdb_util.h"
25
#include "yb/docdb/docdb_types.h"
26
#include "yb/docdb/value_type.h"
27
#include "yb/docdb/kv_debug.h"
28
#include "yb/docdb/docdb-internal.h"
29
#include "yb/docdb/value.h"
30
31
#include "yb/fs/fs_manager.h"
32
33
#include "yb/gutil/stl_util.h"
34
35
#include "yb/rocksdb/db/builder.h"
36
#include "yb/rocksdb/db/dbformat.h"
37
#include "yb/rocksdb/db/filename.h"
38
#include "yb/rocksdb/db/version_set.h"
39
#include "yb/rocksdb/db/writebuffer.h"
40
#include "yb/rocksdb/table/block_based_table_reader.h"
41
#include "yb/rocksdb/table/internal_iterator.h"
42
#include "yb/rocksdb/table/table_builder.h"
43
#include "yb/rocksdb/util/file_reader_writer.h"
44
45
#include "yb/tools/tool_arguments.h"
46
47
#include "yb/util/bytes_formatter.h"
48
#include "yb/util/date_time.h"
49
#include "yb/util/env.h"
50
#include "yb/util/env_util.h"
51
#include "yb/util/enums.h"
52
#include "yb/util/logging.h"
53
#include "yb/util/metrics.h"
54
#include "yb/util/path_util.h"
55
#include "yb/util/pb_util.h"
56
#include "yb/util/result.h"
57
#include "yb/util/scope_exit.h"
58
#include "yb/util/status.h"
59
#include "yb/util/status_format.h"
60
#include "yb/util/threadpool.h"
61
#include "yb/util/tostring.h"
62
#include "yb/util/string_util.h"
63
64
using namespace std::placeholders;
65
namespace po = boost::program_options;
66
67
DECLARE_int32(rocksdb_max_background_flushes);
68
DECLARE_int32(rocksdb_base_background_compactions);
69
DECLARE_int32(rocksdb_max_background_compactions);
70
DECLARE_int32(priority_thread_pool_size);
71
72
namespace yb {
73
namespace tools {
74
75
// Full-match pattern for a path whose last component is "tablet-" followed by 32 lowercase hex
// digits. Used below to select per-tablet WAL directories.
const std::regex kTabletRegex(".*/tablet-[0-9a-f]{32}");
76
// Same as kTabletRegex, but the last component may also carry the ".intents" suffix. Used below to
// select per-tablet data (RocksDB) directories.
const std::regex kTabletDbRegex(".*/tablet-[0-9a-f]{32}(?:\\.intents)?");
77
// Matches names of the form "MANIFEST-" followed by six decimal digits.
// NOTE(review): not referenced in the part of the file visible here.
const std::regex kManifestRegex("MANIFEST-[0-9]{6}");
78
// Suffix of files that are still being written; they are renamed to the final name once complete.
const std::string kTmpExtension = ".tmp";
79
// Suffix appended to a tablet directory name to get the directory that receives patched files.
const std::string kPatchedExtension = ".patched";
80
// NOTE(review): presumably the suffix used for backups by the apply-patch command; its use is not
// visible in this part of the file.
const std::string kBackupExtension = ".apply-patch-backup";
81
// A data directory whose name ends with this suffix is treated as an intents DB.
const std::string kIntentsExtension = ".intents";
82
const std::string kToolName = "Data Patcher";
83
// Log prefix handed to the RocksDB options initialization and to the WAL reader.
const std::string kLogPrefix = kToolName + ": ";
84
85
using docdb::StorageDbType;
86
87
// List of the commands supported by the tool, in the sequence form expected by YB_DEFINE_ENUM.
#define DATA_PATCHER_ACTIONS (Help)(AddTime)(SubTime)(ApplyPatch)
88
89
YB_DEFINE_ENUM(DataPatcherAction, DATA_PATCHER_ACTIONS);
90
// Kind of file being patched; DeltaData uses it to choose how early timestamps are re-mapped.
YB_DEFINE_ENUM(FileType, (kSST)(kWAL));
91
92
const std::string kHelpDescription = "Show help on command";
93
94
// ------------------------------------------------------------------------------------------------
95
// Help command
96
// ------------------------------------------------------------------------------------------------
97
98
// Arguments of the "help" command.
struct HelpArguments {
  // Name of the command to describe. When empty, the list of all commands is shown instead.
  std::string command;
};
101
102
0
std::unique_ptr<OptionsDescription> HelpOptions() {
103
0
  auto result = std::make_unique<OptionsDescriptionImpl<HelpArguments>>(kHelpDescription);
104
0
  result->positional.add("command", 1);
105
0
  result->hidden.add_options()
106
0
      ("command", po::value(&result->args.command));
107
0
  return result;
108
0
}
109
110
0
// Executes the "help" command. Without a command name it lists all commands; otherwise it shows
// the help of the named command, returning an error status if the name cannot be resolved.
CHECKED_STATUS HelpExecute(const HelpArguments& args) {
  if (!args.command.empty()) {
    const auto action = VERIFY_RESULT(ActionByName<DataPatcherAction>(args.command));
    ShowHelp(action);
  } else {
    ShowCommands<DataPatcherAction>();
  }
  return Status::OK();
}
119
120
// ------------------------------------------------------------------------------------------------
121
// Utility functions used by the add/subtract commands as well as by the apply patch command
122
// ------------------------------------------------------------------------------------------------
123
124
// ------------------------------------------------------------------------------------------------
125
// Common implementation for "add time" and "subtract time" commands
126
// ------------------------------------------------------------------------------------------------
127
128
struct ChangeTimeArguments {
129
  std::string delta;
130
  std::string bound_time;
131
  std::vector<std::string> data_dirs;
132
  std::vector<std::string> wal_dirs;
133
  // When concurrency is positive, limit number of concurrent jobs to it.
134
  // For zero, do not limit the number of concurrent jobs.
135
  int concurrency = 0;
136
  size_t max_num_old_wal_entries = 0;
137
  bool debug = false;
138
139
0
  std::string ToString() {
140
0
    return YB_STRUCT_TO_STRING(
141
0
        delta, bound_time, data_dirs, wal_dirs, concurrency, max_num_old_wal_entries, debug);
142
0
  }
143
};
144
145
0
std::unique_ptr<OptionsDescription> ChangeTimeOptions(const std::string& caption) {
146
0
  auto result = std::make_unique<OptionsDescriptionImpl<ChangeTimeArguments>>(caption);
147
0
  auto& args = result->args;
148
0
  result->desc.add_options()
149
0
      ("delta", po::value(&args.delta)->required(),
150
0
       "Delta to add/subtract. For instance 2:30, to use two and a half hour as delta.")
151
0
      ("bound-time", po::value(&args.bound_time)->required(),
152
0
       "All new time before this bound will be ordered and assigned new time keeping the order. "
153
0
           "For instance: 2021-12-02T20:24:07.649298.")
154
0
      ("data-dirs", po::value(&args.data_dirs), "TServer data dirs.")
155
0
      ("wal-dirs", po::value(&args.wal_dirs),
156
0
       "TServer WAL dirs. Not recommended to use while node process is running.")
157
0
      ("concurrency", po::value(&args.concurrency)->default_value(-1),
158
0
       "Max number of concurrent jobs, if this number is less or equals to 0 then number of "
159
0
           "concurrent jobs is unlimited.")
160
0
      ("max-num-old-wal-entries",
161
0
       po::value(&args.max_num_old_wal_entries)->default_value(1000000000L),
162
0
       "Maximum number of WAL entries allowed with timestamps that cannot be shifted by the delta, "
163
0
       "and therefore have to be re-mapped differently.")
164
0
      ("debug", po::bool_switch(&args.debug),
165
0
       "Output detailed debug information. Only practical for a small amount of data.");
166
0
  return result;
167
0
}
168
169
class RocksDBHelper {
170
 public:
171
0
  RocksDBHelper() : immutable_cf_options_(options_) {
172
0
    docdb::InitRocksDBOptions(
173
0
        &options_, kLogPrefix, /* statistics= */ nullptr, tablet_options_, table_options_);
174
0
    internal_key_comparator_ = std::make_shared<rocksdb::InternalKeyComparator>(
175
0
        options_.comparator);
176
0
  }
177
178
0
  Result<std::unique_ptr<rocksdb::TableReader>> NewTableReader(const std::string& fname) {
179
0
    uint64_t base_file_size = 0;
180
0
    auto file_reader = VERIFY_RESULT(NewFileReader(fname, &base_file_size));
181
182
0
    std::unique_ptr<rocksdb::TableReader> table_reader;
183
0
    RETURN_NOT_OK(rocksdb::BlockBasedTable::Open(
184
0
      immutable_cf_options_, env_options_, table_options_, internal_key_comparator_,
185
0
      std::move(file_reader), base_file_size, &table_reader
186
0
    ));
187
188
0
    auto data_fname = rocksdb::TableBaseToDataFileName(fname);
189
0
    auto data_file_reader = VERIFY_RESULT(NewFileReader(data_fname));
190
0
    table_reader->SetDataFileReader(std::move(data_file_reader));
191
0
    return table_reader;
192
0
  }
193
194
  std::unique_ptr<rocksdb::TableBuilder> NewTableBuilder(
195
      rocksdb::WritableFileWriter* base_file_writer,
196
0
      rocksdb::WritableFileWriter* data_file_writer) {
197
0
      rocksdb::ImmutableCFOptions immutable_cf_options(options_);
198
0
     return std::unique_ptr<rocksdb::TableBuilder>(rocksdb::NewTableBuilder(
199
0
        immutable_cf_options, internal_key_comparator_, int_tbl_prop_collector_factories_,
200
0
        /* column_family_id= */ 0, base_file_writer, data_file_writer,
201
0
        rocksdb::CompressionType::kSnappyCompression, rocksdb::CompressionOptions()));
202
0
  }
203
204
  Result<std::unique_ptr<rocksdb::RandomAccessFileReader>> NewFileReader(
205
0
      const std::string& fname, uint64_t* file_size = nullptr) {
206
0
    std::unique_ptr<rocksdb::RandomAccessFile> input_file;
207
0
    RETURN_NOT_OK(rocksdb::Env::Default()->NewRandomAccessFile(fname, &input_file, env_options_));
208
0
    if (file_size) {
209
0
      *file_size = VERIFY_RESULT(input_file->Size());
210
0
    }
211
0
    return std::make_unique<rocksdb::RandomAccessFileReader>(std::move(input_file));
212
0
  }
213
214
0
  Result<std::unique_ptr<rocksdb::WritableFileWriter>> NewFileWriter(const std::string& fname) {
215
0
    std::unique_ptr<rocksdb::WritableFile> file;
216
0
    RETURN_NOT_OK(NewWritableFile(rocksdb::Env::Default(), fname, &file, env_options_));
217
0
    return std::make_unique<rocksdb::WritableFileWriter>(std::move(file), env_options_);
218
0
  }
219
220
0
  rocksdb::EnvOptions& env_options() {
221
0
    return env_options_;
222
0
  }
223
224
0
  rocksdb::Options& options() {
225
0
    return options_;
226
0
  }
227
228
 private:
229
  rocksdb::EnvOptions env_options_;
230
  tablet::TabletOptions tablet_options_;
231
232
  rocksdb::BlockBasedTableOptions table_options_;
233
  rocksdb::Options options_;
234
  rocksdb::ImmutableCFOptions immutable_cf_options_;
235
236
  rocksdb::InternalKeyComparatorPtr internal_key_comparator_;
237
  rocksdb::IntTblPropCollectorFactories int_tbl_prop_collector_factories_;
238
};
239
240
struct DeltaData {
241
  MonoDelta delta;
242
  HybridTime bound_time;
243
  size_t max_num_old_wal_entries = 0;
244
245
  std::vector<MicrosTime> early_times;
246
  MicrosTime min_micros_to_keep = 0;
247
  MicrosTime base_micros = 0;
248
  bool time_map_ready = false;
249
250
  DeltaData(MonoDelta delta_, HybridTime bound_time_, size_t max_num_old_wal_entries_)
251
      : delta(delta_),
252
        bound_time(bound_time_),
253
0
        max_num_old_wal_entries(max_num_old_wal_entries_) {
254
0
    CHECK_GE(max_num_old_wal_entries, 0);
255
0
  }
256
257
0
  void AddEarlyTime(HybridTime time) {
258
0
    CHECK(!time_map_ready);
259
0
    if (time <= bound_time) {
260
0
      early_times.push_back(time.GetPhysicalValueMicros());
261
0
    }
262
0
  }
263
264
0
  void FinalizeTimeMap() {
265
0
    std::sort(early_times.begin(), early_times.end());
266
0
    Unique(&early_times);
267
268
    // All old WAL timestamps will be mapped to
269
    // [base_micros, bound_time.GetPhysicalValueMicros()), assuming there are no more
270
    // than max_num_old_wal_entries of them.
271
    //
272
    // Old SST timestamps will be left as is if their microsecond component is min_micros_to_keep
273
    // or less, or mapped to the range [base_micros - early_times.size(), base_micros - 1]
274
    // otherwise, based on their relative position in the early_times array.
275
276
0
    base_micros = bound_time.GetPhysicalValueMicros() - max_num_old_wal_entries;
277
0
    min_micros_to_keep = base_micros - early_times.size() - 1;
278
0
    time_map_ready = true;
279
0
  }
280
281
0
  HybridTime NewEarlyTime(HybridTime ht, FileType file_type) const {
282
0
    CHECK(time_map_ready);
283
0
    if (ht.GetPhysicalValueMicros() <= min_micros_to_keep)
284
0
      return ht;
285
286
0
    const auto micros = ht.GetPhysicalValueMicros();
287
0
    auto it = std::lower_bound(early_times.begin(), early_times.end(), micros);
288
0
    CHECK_EQ(*it, micros);
289
0
    switch (file_type) {
290
0
      case FileType::kSST: {
291
0
        auto distance_from_end = early_times.end() - it;
292
0
        auto new_micros = base_micros - distance_from_end;
293
0
        CHECK_LT(new_micros, base_micros);
294
0
        return HybridTime(new_micros, ht.GetLogicalValue());
295
0
      }
296
0
      case FileType::kWAL: {
297
0
        auto distance_from_start = it - early_times.begin();
298
0
        auto new_micros = base_micros + distance_from_start;
299
0
        CHECK_GE(new_micros, base_micros);
300
0
        CHECK_LE(new_micros, bound_time.GetPhysicalValueMicros());
301
0
        return HybridTime(new_micros, ht.GetLogicalValue());
302
0
      }
303
0
    }
304
0
    FATAL_INVALID_ENUM_VALUE(FileType, file_type);
305
0
  }
306
307
0
  Result<HybridTime> AddDelta(HybridTime ht, FileType file_type) const {
308
0
    int64_t microseconds = ht.GetPhysicalValueMicros();
309
0
    int64_t bound_microseconds = bound_time.GetPhysicalValueMicros();
310
0
    if (microseconds <= bound_microseconds) {
311
0
      return NewEarlyTime(ht, file_type);
312
0
    }
313
314
0
    int64_t delta_microseconds = delta.ToMicroseconds();
315
0
    SCHECK_GE(static_cast<int64_t>(microseconds - kYugaByteMicrosecondEpoch), -delta_microseconds,
316
0
              InvalidArgument, Format("Hybrid time underflow: $0 + $1", ht, delta));
317
0
    int64_t kMaxMilliseconds = HybridTime::kMax.GetPhysicalValueMicros();
318
0
    SCHECK_GT(kMaxMilliseconds - microseconds, delta_microseconds, InvalidArgument,
319
0
              Format("Hybrid time overflow: $0 + $1", ht, delta));
320
0
    microseconds += delta_microseconds;
321
0
    return HybridTime::FromMicrosecondsAndLogicalValue(microseconds, ht.GetLogicalValue());
322
0
  }
323
324
0
  Result<DocHybridTime> AddDelta(DocHybridTime doc_ht, FileType file_type) const {
325
0
    return DocHybridTime(
326
0
        VERIFY_RESULT(AddDelta(doc_ht.hybrid_time(), file_type)), doc_ht.write_id());
327
0
  }
328
329
0
  Result<char*> AddDeltaToSstKey(Slice key, std::vector<char>* buffer) const {
330
0
    auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key));
331
0
    buffer->resize(std::max(buffer->size(), key.size() + kMaxBytesPerEncodedHybridTime));
332
0
    memcpy(buffer->data(), key.data(), key.size());
333
0
    auto new_doc_ht = VERIFY_RESULT(AddDelta(doc_ht, FileType::kSST));
334
0
    return new_doc_ht.EncodedInDocDbFormat(buffer->data() + key.size());
335
0
  }
336
337
};
338
339
// Writes a patched copy of the SST file `fname` (base file plus data file) into the sibling
// directory "<dir of fname>.patched", with every hybrid time changed as defined by DeltaData.
//
// When bound_time is set, the file is read twice: the initial pass only collects the early times,
// the final pass writes the output. Without bound_time only the final pass is performed.
// The output is written to temporary files that are renamed once the table has been finished.
//
// delta, bound_time, max_num_old_wal_entries - parameters of the time mapping, see DeltaData.
// debug - log every output key/value pair.
// helper - provides readers, writers and builders configured with the tool's RocksDB options.
CHECKED_STATUS AddDeltaToSstFile(
    const std::string& fname, MonoDelta delta, HybridTime bound_time,
    size_t max_num_old_wal_entries, bool debug, RocksDBHelper* helper) {
  LOG(INFO) << "Patching: " << fname << ", " << static_cast<const void*>(&fname);

  // Number of trailing bytes of every key that are copied to the output unchanged. The first of
  // those bytes is interpreted as the rocksdb::ValueType of the record.
  constexpr size_t kKeySuffixLen = 8;
  const auto storage_db_type =
      boost::ends_with(DirName(fname), kIntentsExtension) ? StorageDbType::kIntents
                                                          : StorageDbType::kRegular;

  auto table_reader = VERIFY_RESULT(helper->NewTableReader(fname));

  rocksdb::ReadOptions read_options;
  std::unique_ptr<rocksdb::InternalIterator> iterator(table_reader->NewIterator(read_options));

  auto out_dir = DirName(fname) + kPatchedExtension;
  auto& env = *Env::Default();
  RETURN_NOT_OK(env.CreateDirs(out_dir));

  const auto new_fname = JoinPathSegments(out_dir, BaseName(fname));
  const auto new_data_fname = rocksdb::TableBaseToDataFileName(new_fname);
  const auto tmp_fname = new_fname + kTmpExtension;
  const auto tmp_data_fname = new_data_fname + kTmpExtension;
  size_t num_entries = 0;
  size_t total_file_size = 0;

  {
    auto base_file_writer = VERIFY_RESULT(helper->NewFileWriter(tmp_fname));
    auto data_file_writer = VERIFY_RESULT(helper->NewFileWriter(tmp_data_fname));

    auto builder = helper->NewTableBuilder(base_file_writer.get(), data_file_writer.get());
    // Adds a key/value pair to the output table, optionally logging it in decoded form.
    const auto add_kv = [&builder, debug, storage_db_type](const Slice& k, const Slice& v) {
      if (debug) {
        const Slice user_key(k.data(), k.size() - kKeySuffixLen);
        auto key_type = docdb::GetKeyType(user_key, storage_db_type);
        auto rocksdb_value_type = static_cast<rocksdb::ValueType>(*(k.end() - kKeySuffixLen));
        LOG(INFO) << "DEBUG: output KV pair "
                  << "(db_type=" << storage_db_type
                  << ", key_type=" << key_type
                  << ", rocksdb_value_type=" << static_cast<uint64_t>(rocksdb_value_type)
                  << "): "
                  << docdb::DocDBKeyToDebugStr(user_key, storage_db_type)
                  << " => "
                  << docdb::DocDBValueToDebugStr(key_type, user_key, v);
      }
      builder->Add(k, v);
    };

    // Abandon the builder on every exit path that is taken before Finish() has been attempted.
    bool done = false;
    auto se = ScopeExit([&builder, &done] {
      if (!done) {
        builder->Abandon();
      }
    });

    DeltaData delta_data(delta, bound_time, max_num_old_wal_entries);
    std::vector<char> buffer(0x100);
    faststring txn_metadata_buffer;

    std::string value_buffer;

    for (int is_final_pass = bound_time ? 0 : 1; is_final_pass != 2; ++is_final_pass) {
      LOG(INFO) << "Performing the " << (is_final_pass ? "final" : "initial") << " pass for "
                << "file " << fname;
      for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
        const Slice key = iterator->key();
        const auto rocksdb_value_type =
            static_cast<rocksdb::ValueType>(*(key.end() - kKeySuffixLen));
        if (storage_db_type == StorageDbType::kRegular ||
            key[0] != docdb::ValueTypeAsChar::kTransactionId) {
          // Regular DB entry, or a normal intent entry (not txn metadata or reverse index).
          // Update the timestamp at the end of the key.
          const auto key_without_suffix = key.WithoutSuffix(kKeySuffixLen);

          bool value_updated = false;
          if (storage_db_type == StorageDbType::kRegular) {
            // A regular DB value may carry the hybrid time of the intent it was applied from in
            // its control fields; that time has to be changed as well.
            auto value_slice = iterator->value();
            auto control_fields = VERIFY_RESULT(docdb::ValueControlFields::Decode(&value_slice));
            if (control_fields.intent_doc_ht.is_valid()) {
              const auto intent_ht = control_fields.intent_doc_ht.hybrid_time();
              if (is_final_pass) {
                // Take the write id from the decoded control fields, so that only the time
                // component of the intent hybrid time changes.
                const auto intent_write_id = control_fields.intent_doc_ht.write_id();
                control_fields.intent_doc_ht = DocHybridTime(
                    VERIFY_RESULT(delta_data.AddDelta(intent_ht, FileType::kSST)),
                    intent_write_id);
                value_buffer.clear();
                control_fields.AppendEncoded(&value_buffer);
                // value_slice now points past the control fields, i.e. to the rest of the value.
                value_buffer.append(value_slice.cdata(), value_slice.size());
                value_updated = true;
              } else {
                delta_data.AddEarlyTime(intent_ht);
              }
            }
          }

          if (is_final_pass) {
            auto end = VERIFY_RESULT(delta_data.AddDeltaToSstKey(key_without_suffix, &buffer));
            // Re-attach the unchanged key suffix after the new encoded hybrid time.
            memcpy(end, key_without_suffix.end(), kKeySuffixLen);
            end += kKeySuffixLen;
            add_kv(Slice(buffer.data(), end), value_updated ? value_buffer : iterator->value());
          } else {
            Slice key_without_suffix_copy = key_without_suffix;
            auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key_without_suffix_copy));
            delta_data.AddEarlyTime(doc_ht.hybrid_time());
          }
          continue;
        }

        // Now we know this is the intents DB and the key starts with a transaction id.
        // In this case, we only modify the value, and never modify the key.

        if (rocksdb_value_type != rocksdb::ValueType::kTypeValue) {
          // Not a plain value record: copy it to the output as is.
          if (is_final_pass) {
            add_kv(key, iterator->value());
          }
          continue;
        }

        // Transaction metadata record.
        if (key.size() == 1 + TransactionId::StaticSize() + kKeySuffixLen) {
          // We do not modify the key in this case, only the value.

          // Modify transaction start time stored in metadata.
          TransactionMetadataPB metadata_pb;
          const auto v = iterator->value();
          if (!metadata_pb.ParseFromArray(v.data(), narrow_cast<int>(v.size()))) {
            return STATUS_FORMAT(Corruption, "Bad txn metadata: $0", v.ToDebugHexString());
          }

          if (is_final_pass) {
            const auto new_start_ht = VERIFY_RESULT(delta_data.AddDelta(
                HybridTime(metadata_pb.start_hybrid_time()), FileType::kSST));
            metadata_pb.set_start_hybrid_time(new_start_ht.ToUint64());
            txn_metadata_buffer.clear();
            pb_util::SerializeToString(metadata_pb, &txn_metadata_buffer);
            add_kv(key, txn_metadata_buffer);
          } else {
            delta_data.AddEarlyTime(HybridTime(metadata_pb.start_hybrid_time()));
          }
          continue;
        }

        // Transaction reverse index. We do not modify the timestamp at the end of the key because
        // it does not matter if it is shifted, only the relative order of those timestamps matters.
        // We do modify the timestamp stored at the end of the encoded value, because the value is
        // the intent key.
        if (is_final_pass) {
          auto value_end = VERIFY_RESULT_PREPEND(
              delta_data.AddDeltaToSstKey(iterator->value(), &buffer),
              Format("Intent key $0, value: $1, filename: $2",
                      iterator->key().ToDebugHexString(), iterator->value().ToDebugHexString(),
                      fname));
          add_kv(iterator->key(), Slice(buffer.data(), value_end));
        } else {
          auto value = iterator->value();
          auto doc_ht_result = DocHybridTime::DecodeFromEnd(&value);
          if (!doc_ht_result.ok()) {
            LOG(INFO)
                << "Failed to decode hybrid time from the end of value for "
                << "key " << key.ToDebugHexString() << " (" << FormatSliceAsStr(key) << "), "
                << "value " << value.ToDebugHexString() << " (" << FormatSliceAsStr(value) << "), "
                << "decoded value " << DocDBValueToDebugStr(
                    docdb::KeyType::kReverseTxnKey, iterator->key(), iterator->value());
            return doc_ht_result.status();
          }
          delta_data.AddEarlyTime(doc_ht_result->hybrid_time());
        }
      }

      if (is_final_pass) {
        // Set before Finish() so that the scope exit does not abandon a builder on which Finish()
        // has already been called.
        done = true;
        RETURN_NOT_OK(builder->Finish());
        num_entries = builder->NumEntries();
        total_file_size = builder->TotalFileSize();
      } else {
        delta_data.FinalizeTimeMap();
      }
    }
  }

  // The data file is renamed first; the base file under its final name marks the SST as done
  // (see CheckDataFile).
  RETURN_NOT_OK(env.RenameFile(tmp_data_fname, new_data_fname));
  RETURN_NOT_OK(env.RenameFile(tmp_fname, new_fname));
  LOG(INFO) << "Generated: " << new_fname << ", with: " << num_entries
            << " entries and size: " << HumanizeBytes(total_file_size);

  return Status::OK();
}
528
529
// Checks if the given file in the given directory is an SSTable file that we need to process.
530
// That means it belongs to a valid tablet directory and that we haven't processed it before.
531
void CheckDataFile(
532
0
    const std::string& dirname, const std::string& fname, std::vector<std::string>* out) {
533
0
  if (!regex_match(dirname, kTabletDbRegex)) {
534
0
    return;
535
0
  }
536
0
  if (!boost::ends_with(fname, ".sst")) {
537
0
    return;
538
0
  }
539
0
  if (Env::Default()->FileExists(JoinPathSegments(dirname + kPatchedExtension, fname))) {
540
0
    return;
541
0
  }
542
0
  auto full_path = JoinPathSegments(dirname, fname);
543
0
  out->push_back(full_path);
544
0
}
545
546
// Walks the given data directories, creates a ".patched" directory next to every tablet DB
// directory, and submits one AddDeltaToSstFile task to `runner` for every SST file that has not
// been patched yet. Does not wait for the submitted tasks.
CHECKED_STATUS ChangeTimeInDataFiles(
    MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries,
    const std::vector<std::string>& dirs, bool debug, TaskRunner* runner) {
  Env* env = Env::Default();
  std::vector<std::string> sst_files;

  auto visit = [&sst_files, env](
      Env::FileType type, const std::string& dirname, const std::string& fname) -> Status {
    if (type == Env::FileType::FILE_TYPE) {
      CheckDataFile(dirname, fname, &sst_files);
      return Status::OK();
    }
    // Not a regular file: make sure that the output directory of a tablet DB directory exists.
    const auto path = JoinPathSegments(dirname, fname);
    if (regex_match(path, kTabletDbRegex)) {
      RETURN_NOT_OK(env->CreateDirs(path + kPatchedExtension));
    }
    return Status::OK();
  };

  for (const auto& root : dirs) {
    RETURN_NOT_OK(env->Walk(root, Env::DirectoryOrder::POST_ORDER, visit));
  }

  // NOTE(review): std::random_shuffle is deprecated since C++14 and removed in C++17; consider
  // std::shuffle with an explicit random engine.
  std::random_shuffle(sst_files.begin(), sst_files.end());

  for (const auto& sst_file : sst_files) {
    // Every task owns a copy of its parameters and its own RocksDBHelper.
    runner->Submit([sst_file, delta, bound_time, max_num_old_wal_entries, debug]() {
      RocksDBHelper helper;
      return AddDeltaToSstFile(
          sst_file, delta, bound_time, max_num_old_wal_entries, debug, &helper);
    });
  }
  return Status::OK();
}
575
576
// Reads all WAL segments of the tablet WAL directory `dir` and writes a single new segment with
// sequence number 1 into "<dir>.patched", with the hybrid times of the entries changed as defined
// by DeltaData.
//
// When bound_time is set, the segments are processed twice: the initial step only collects the
// early times, the final step writes the output. Without bound_time only the final step is
// performed. The segment is written to a temporary file that is renamed once the footer has been
// written.
CHECKED_STATUS ChangeTimeInWalDir(
    MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries,
    const std::string& dir) {
  auto env = Env::Default();
  auto log_index = make_scoped_refptr<log::LogIndex>(dir);
  std::unique_ptr<log::LogReader> log_reader;
  RETURN_NOT_OK(log::LogReader::Open(
      env, log_index, kLogPrefix, dir, /* table_metric_entity= */ nullptr,
      /* tablet_metric_entity= */ nullptr, &log_reader));
  log::SegmentSequence segments;
  RETURN_NOT_OK(log_reader->GetSegmentsSnapshot(&segments));
  auto patched_dir = dir + kPatchedExtension;
  RETURN_NOT_OK(env->CreateDirs(patched_dir));
  auto new_segment_path = JoinPathSegments(patched_dir, FsManager::GetWalSegmentFileName(1));
  auto tmp_segment_path = new_segment_path + kTmpExtension;
  std::shared_ptr<WritableFile> new_segment_file;
  {
    std::unique_ptr<WritableFile> writable_file;
    RETURN_NOT_OK(env->NewWritableFile(tmp_segment_path, &writable_file));
    new_segment_file = std::move(writable_file);
  }
  log::WritableLogSegment new_segment(/* path= */ "", new_segment_file);

  // Set up the new header.
  log::LogSegmentHeaderPB header;
  header.set_major_version(log::kLogMajorVersion);
  header.set_minor_version(log::kLogMinorVersion);
  header.set_sequence_number(1);
  header.set_unused_tablet_id("TABLET ID");
  header.mutable_unused_schema();

  RETURN_NOT_OK(new_segment.WriteHeaderAndOpen(header));

  // Values for the new footer. They are maintained as the segment is written.
  size_t num_entries = 0;
  int64_t min_replicate_index = std::numeric_limits<int64_t>::max();
  int64_t max_replicate_index = std::numeric_limits<int64_t>::min();

  faststring buffer;
  DeltaData delta_data(delta, bound_time, max_num_old_wal_entries);
  auto add_delta = [&delta_data](HybridTime ht) -> Result<uint64_t> {
    return VERIFY_RESULT(delta_data.AddDelta(ht, FileType::kWAL)).ToUint64();
  };

  for (int is_final_step = bound_time ? 0 : 1; is_final_step != 2; ++is_final_step) {
    for (const auto& segment : segments) {
      // Read entry batches of a WAL segment, and write as few entry batches as possible, but still
      // make sure that we don't create consecutive entries in the same write batch where the Raft
      // index does not strictly increase. We have a check in log_reader.cc for that.
      auto read_result = segment->ReadEntries();
      // Taken before the loop below moves the entries out; the number of elements stays the same,
      // but this keeps the intent obvious.
      const auto num_segment_entries = read_result.entries.size();
      log::LogEntryBatchPB batch;
      OpId committed_op_id;
      int64_t last_index = -1;

      // Serializes the accumulated batch into the new segment and empties it.
      auto write_entry_batch = [
          &batch, &buffer, &num_entries, &new_segment, &read_result, &committed_op_id](
              bool last_batch_of_segment) -> Status {
        if (last_batch_of_segment) {
          read_result.committed_op_id.ToPB(batch.mutable_committed_op_id());
        } else if (committed_op_id.valid()) {
          committed_op_id.ToPB(batch.mutable_committed_op_id());
        }
        if (!read_result.entry_metadata.empty()) {
          batch.set_mono_time(read_result.entry_metadata.back().entry_time.ToUInt64());
        }
        buffer.clear();
        pb_util::AppendToString(batch, &buffer);
        num_entries += batch.entry().size();
        RETURN_NOT_OK(new_segment.WriteEntryBatch(Slice(buffer)));
        batch.clear_entry();
        return Status::OK();
      };

      // Appends an entry to the current batch. The batch is flushed first when the Raft index of
      // the entry would not be greater than the index of the previously added entry.
      auto add_entry = [&write_entry_batch, &batch, &last_index, &committed_op_id](
          std::unique_ptr<log::LogEntryPB> entry) -> Status {
        if (entry->has_replicate() && entry->replicate().id().index() <= last_index) {
          RETURN_NOT_OK(write_entry_batch(/* last_batch_of_segment= */ false));
        }
        last_index = entry->replicate().id().index();
        if (entry->has_replicate() &&
            entry->replicate().has_committed_op_id()) {
          committed_op_id = OpId::FromPB(entry->replicate().committed_op_id());
        }
        batch.mutable_entry()->AddAllocated(entry.release());
        return Status::OK();
      };

      for (auto& entry : read_result.entries) {
        auto& replicate = *entry->mutable_replicate();
        auto replicate_ht = HybridTime(replicate.hybrid_time());
        if (is_final_step) {
          replicate.set_hybrid_time(VERIFY_RESULT(add_delta(replicate_ht)));
        } else {
          delta_data.AddEarlyTime(replicate_ht);
        }
        if (replicate.has_transaction_state()) {
          // Only the APPLYING state has its commit hybrid time patched.
          auto& state = *replicate.mutable_transaction_state();
          if (state.status() == TransactionStatus::APPLYING) {
            auto commit_ht = HybridTime(state.commit_hybrid_time());
            if (is_final_step) {
              state.set_commit_hybrid_time(VERIFY_RESULT(add_delta(commit_ht)));
            } else {
              delta_data.AddEarlyTime(commit_ht);
            }
          }
        } else if (replicate.has_history_cutoff()) {
          auto& state = *replicate.mutable_history_cutoff();
          auto history_cutoff_ht = HybridTime(state.history_cutoff());
          if (is_final_step) {
            state.set_history_cutoff(VERIFY_RESULT(add_delta(history_cutoff_ht)));
          } else {
            delta_data.AddEarlyTime(history_cutoff_ht);
          }
        }
        if (is_final_step) {
          auto index = entry->replicate().id().index();
          min_replicate_index = std::min(min_replicate_index, index);
          max_replicate_index = std::max(max_replicate_index, index);
          RETURN_NOT_OK(add_entry(std::move(entry)));
        }
      }

      if (is_final_step) {
        RETURN_NOT_OK(write_entry_batch(/* last_batch_of_segment= */ true));
      }
      // Report the number of entries read from the segment. The batch cannot be used for this:
      // it is cleared by every flush and is not filled at all during the initial step.
      LOG(INFO) << "Step " << is_final_step << " processed " << num_segment_entries
                << " entries in " << segment->path();
    }
    if (!is_final_step) {
      delta_data.FinalizeTimeMap();
    }
  }

  log::LogSegmentFooterPB footer;
  footer.set_num_entries(num_entries);
  if (num_entries) {
    footer.set_min_replicate_index(min_replicate_index);
    footer.set_max_replicate_index(max_replicate_index);
  }

  RETURN_NOT_OK(new_segment.WriteFooterAndClose(footer));
  return Env::Default()->RenameFile(tmp_segment_path, new_segment_path);
}
720
721
// Walks the given WAL directories and submits one ChangeTimeInWalDir task to `runner` for every
// directory whose path matches kTabletRegex. Does not wait for the submitted tasks.
CHECKED_STATUS ChangeTimeInWalDirs(
    MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries,
    const std::vector<std::string>& dirs, TaskRunner* runner) {
  std::vector<std::string> tablet_wal_dirs;
  auto collect = [&tablet_wal_dirs](
      Env::FileType type, const std::string& dirname, const std::string& fname) {
    if (type == Env::FileType::DIRECTORY_TYPE) {
      auto path = JoinPathSegments(dirname, fname);
      if (regex_match(path, kTabletRegex)) {
        tablet_wal_dirs.push_back(path);
      }
    }
    return Status::OK();
  };

  Env* env = Env::Default();
  for (const auto& root : dirs) {
    RETURN_NOT_OK(env->Walk(root, Env::DirectoryOrder::POST_ORDER, collect));
  }

  // NOTE(review): std::random_shuffle is deprecated since C++14 and removed in C++17; consider
  // std::shuffle with an explicit random engine.
  std::random_shuffle(tablet_wal_dirs.begin(), tablet_wal_dirs.end());

  for (const auto& wal_dir : tablet_wal_dirs) {
    // The task keeps its own copy of the directory name under the name `dir`.
    runner->Submit([delta, bound_time, max_num_old_wal_entries, dir = wal_dir] {
      return ChangeTimeInWalDir(delta, bound_time, max_num_old_wal_entries, dir);
    });
  }
  return Status::OK();
}
749
750
0
// Common implementation of the "add time" and "subtract time" commands.
// Parses the delta (negated when `subtract` is set) and the bound time, schedules the patching
// of all SST files and WAL directories on a task runner, and waits for the tasks to finish.
CHECKED_STATUS ChangeTimeExecute(const ChangeTimeArguments& args, bool subtract) {
  auto delta = VERIFY_RESULT(DateTime::IntervalFromString(args.delta));
  if (subtract) {
    delta = -delta;
  }

  HybridTime bound_time;
  if (!args.bound_time.empty()) {
    // The bound is specified in terms of the new time; move it back by delta to get the bound
    // that applies to the times currently stored in the files.
    const auto parsed_bound = VERIFY_RESULT(HybridTime::ParseHybridTime(args.bound_time));
    bound_time = parsed_bound.AddDelta(-delta);
    if (bound_time < HybridTime::FromMicros(kYugaByteMicrosecondEpoch)) {
      return STATUS_FORMAT(
          InvalidArgument, "Wrong bound-time and delta combination: $0", bound_time);
    }
    LOG(INFO) << "Bound time before adding delta: " << bound_time.ToString();
  }

  TaskRunner runner;
  RETURN_NOT_OK(runner.Init(args.concurrency));

  const auto max_old_entries = args.max_num_old_wal_entries;
  RETURN_NOT_OK(ChangeTimeInDataFiles(
      delta, bound_time, max_old_entries, SplitAndFlatten(args.data_dirs), args.debug, &runner));
  RETURN_NOT_OK(ChangeTimeInWalDirs(
      delta, bound_time, max_old_entries, SplitAndFlatten(args.wal_dirs), &runner));
  return runner.Wait();
}
773
774
// ------------------------------------------------------------------------------------------------
775
// Add time command
776
// ------------------------------------------------------------------------------------------------
777
778
// Description text for the add-time command; AddTimeOptions() passes it to ChangeTimeOptions().
const std::string kAddTimeDescription = "Add time delta to physical time in SST files";
779
780
// The add-time command takes exactly the same arguments as the generic change-time operation.
using AddTimeArguments = ChangeTimeArguments;
781
782
0
std::unique_ptr<OptionsDescription> AddTimeOptions() {
783
0
  return ChangeTimeOptions(kAddTimeDescription);
784
0
}
785
786
0
// Entry point of the add-time command: runs the shared change-time logic without negating the
// delta.
CHECKED_STATUS AddTimeExecute(const AddTimeArguments& args) {
  constexpr bool kSubtract = false;
  return ChangeTimeExecute(args, kSubtract);
}
789
790
// ------------------------------------------------------------------------------------------------
791
// Subtract time command
792
// ------------------------------------------------------------------------------------------------
793
794
// Description text for the sub-time command; SubTimeOptions() passes it to ChangeTimeOptions().
const std::string kSubTimeDescription = "Subtract time delta from physical time in SST files";
795
796
// The sub-time command takes exactly the same arguments as the generic change-time operation.
using SubTimeArguments = ChangeTimeArguments;
797
798
0
std::unique_ptr<OptionsDescription> SubTimeOptions() {
799
0
  return ChangeTimeOptions(kSubTimeDescription);
800
0
}
801
802
0
// Entry point of the sub-time command: runs the shared change-time logic with the delta negated.
CHECKED_STATUS SubTimeExecute(const SubTimeArguments& args) {
  constexpr bool kSubtract = true;
  return ChangeTimeExecute(args, kSubtract);
}
805
806
// ------------------------------------------------------------------------------------------------
807
// Apply patch
808
// ------------------------------------------------------------------------------------------------
809
810
// Description text for the apply-patch command; used by ApplyPatchOptions().
const std::string kApplyPatchDescription = "Apply prepared SST files patch";
811
812
// Parsed command-line arguments of the apply-patch command (populated by ApplyPatchOptions()).
struct ApplyPatchArguments {
  // Values of --data-dirs (TServer data dirs).
  std::vector<std::string> data_dirs;
  // Values of --wal-dirs (TServer WAL dirs).
  std::vector<std::string> wal_dirs;
  // --dry-run: only check that apply-patch would work, without touching live data.
  bool dry_run{false};
  // --revert: undo a previous apply-patch operation.
  bool revert{false};
};
818
819
0
std::unique_ptr<OptionsDescription> ApplyPatchOptions() {
820
0
  auto result = std::make_unique<OptionsDescriptionImpl<ApplyPatchArguments>>(
821
0
      kApplyPatchDescription);
822
0
  auto& args = result->args;
823
0
  result->desc.add_options()
824
0
      ("data-dirs", po::value(&args.data_dirs)->required(), "TServer data dirs")
825
0
      ("wal-dirs", po::value(&args.wal_dirs)->required(), "TServer WAL dirs")
826
0
      ("dry-run", po::bool_switch(&args.dry_run),
827
0
       "Do not make any changes to live data, only check that apply-patch would work. "
828
0
       "This might still involve making changes to files in .patched directories.")
829
0
      ("revert", po::bool_switch(&args.revert),
830
0
       "Revert a previous apply-patch operation. This will move backup RocksDB and WAL "
831
0
       "directories back to their live locations, and the live locations to .patched locations.");
832
0
  return result;
833
0
}
834
835
class ApplyPatch {
836
 public:
837
0
  CHECKED_STATUS Execute(const ApplyPatchArguments& args) {
838
0
    dry_run_ = args.dry_run;
839
0
    revert_ = args.revert;
840
0
    LOG(INFO) << "Running the ApplyPatch command";
841
0
    LOG(INFO) << "    data_dirs=" << yb::ToString(args.data_dirs);
842
0
    LOG(INFO) << "    wal_dirs=" << yb::ToString(args.data_dirs);
843
0
    LOG(INFO) << "    dry_run=" << args.dry_run;
844
0
    LOG(INFO) << "    revert=" << args.revert;
845
0
    for (const auto& dir : SplitAndFlatten(args.data_dirs)) {
846
0
      RETURN_NOT_OK(env_->Walk(
847
0
          dir, Env::DirectoryOrder::POST_ORDER,
848
0
          std::bind(&ApplyPatch::WalkDataCallback, this, _1, _2, _3)));
849
0
    }
850
851
0
    for (const auto& dir : SplitAndFlatten(args.wal_dirs)) {
852
0
      RETURN_NOT_OK(env_->Walk(
853
0
          dir, Env::DirectoryOrder::POST_ORDER,
854
0
          std::bind(&ApplyPatch::WalkWalCallback, this, _1, _2, _3)));
855
0
    }
856
857
0
    RocksDBHelper helper;
858
0
    auto options = helper.options();
859
0
    options.skip_stats_update_on_db_open = true;
860
861
0
    int num_revert_errors = 0;
862
863
0
    int num_dirs_handled = 0;
864
0
    for (const auto* dirs : {&data_dirs_, &wal_dirs_}) {
865
0
      for (const auto& dir : *dirs) {
866
0
        auto backup_path = dir + kBackupExtension;
867
0
        auto patched_path = dir + kPatchedExtension;
868
869
0
        if (revert_) {
870
0
          bool backup_dir_exists = env_->FileExists(backup_path);
871
0
          bool patched_path_exists = env_->FileExists(patched_path);
872
0
          if (backup_dir_exists && !patched_path_exists) {
873
0
            auto rename_status = ChainRename(backup_path, dir, patched_path);
874
0
            if (!rename_status.ok()) {
875
0
              LOG(INFO) << "Error during revert: " << rename_status;
876
0
              num_revert_errors++;
877
0
            }
878
0
          } else {
879
0
            LOG(INFO)
880
0
                << "Not attempting to restore " << backup_path << " to " << dir
881
0
                << " after moving " << dir << " back to " << patched_path
882
0
                << ": "
883
0
                << (backup_dir_exists ? "" : "backup path does not exist; ")
884
0
                << (patched_path_exists ? "patched path already exists" : "");
885
0
            num_revert_errors++;
886
0
          }
887
0
          continue;
888
0
        }
889
890
0
        if (dirs == &data_dirs_) {
891
0
          if (valid_rocksdb_dirs_.count(dir)) {
892
0
            LOG(INFO) << "Patching non-live RocksDB metadata in " << patched_path;
893
0
            docdb::RocksDBPatcher patcher(patched_path, options);
894
0
            RETURN_NOT_OK(patcher.Load());
895
0
            RETURN_NOT_OK(patcher.UpdateFileSizes());
896
0
            docdb::ConsensusFrontier frontier;
897
0
            frontier.set_hybrid_time(HybridTime::kMin);
898
0
            frontier.set_history_cutoff(HybridTime::FromMicros(kYugaByteMicrosecondEpoch));
899
0
            RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
900
0
          } else {
901
0
            LOG(INFO) << "We did not see RocksDB CURRENT or MANIFEST-... files in "
902
0
                       << dir << ", skipping applying " << patched_path;
903
0
            continue;
904
0
          }
905
0
        }
906
907
0
        RETURN_NOT_OK(ChainRename(patched_path, dir, backup_path));
908
0
        num_dirs_handled++;
909
0
      }
910
0
    }
911
912
0
    LOG(INFO) << "Processed " << num_dirs_handled << " directories (two renames per each)";
913
0
    if (num_revert_errors) {
914
0
      return STATUS_FORMAT(
915
0
          IOError,
916
0
          "Encountered $0 errors when trying to revert an applied patch. "
917
0
          "Check the log above for details.",
918
0
          num_revert_errors);
919
0
    }
920
0
    return Status::OK();
921
0
  }
922
923
 private:
924
925
  // ----------------------------------------------------------------------------------------------
926
  // Functions for traversing RocksDB data directories
927
  // ----------------------------------------------------------------------------------------------
928
929
  CHECKED_STATUS WalkDataCallback(
930
0
      Env::FileType type, const std::string& dirname, const std::string& fname) {
931
0
    switch (type) {
932
0
      case Env::FileType::FILE_TYPE:
933
0
        return HandleDataFile(dirname, fname);
934
0
      case Env::FileType::DIRECTORY_TYPE:
935
0
        CheckDirectory(dirname, fname, &data_dirs_);
936
0
        return Status::OK();
937
0
    }
938
0
    FATAL_INVALID_ENUM_VALUE(Env::FileType, type);
939
0
  }
940
941
  // Handles a file found during walking through the a data (RocksDB) directory tree. Looks for
942
  // CURRENT and MANIFEST files and copies them to the corresponding .patched directory. Does not
943
  // modify live data of the cluster.
944
0
  CHECKED_STATUS HandleDataFile(const std::string& dirname, const std::string& fname) {
945
0
    if (revert_) {
946
      // We don't look at any of the manifest files during the revert operation.
947
0
      return Status::OK();
948
0
    }
949
950
0
    if (!regex_match(dirname, kTabletDbRegex)) {
951
0
      return Status::OK();
952
0
    }
953
0
    if (fname != "CURRENT" && !regex_match(fname, kManifestRegex)) {
954
0
      return Status::OK();
955
0
    }
956
0
    auto patched_dirname = dirname + kPatchedExtension;
957
0
    if (env_->DirExists(patched_dirname)) {
958
0
      valid_rocksdb_dirs_.insert(dirname);
959
0
      auto full_src_path = JoinPathSegments(dirname, fname);
960
0
      auto full_dst_path = JoinPathSegments(patched_dirname, fname);
961
0
      LOG(INFO) << "Copying file " << full_src_path << " to " << full_dst_path;
962
0
      Status copy_status = env_util::CopyFile(env_, full_src_path, full_dst_path);
963
0
      if (!copy_status.ok()) {
964
0
        LOG(INFO) << "Error copying file " << full_src_path << " to " << full_dst_path << ": "
965
0
                    << copy_status;
966
0
      }
967
0
      return copy_status;
968
0
    }
969
970
0
    LOG(INFO) << "Directory " << patched_dirname << " does not exist, not copying "
971
0
                << "the file " << fname << " there (this is not an error)";
972
0
    return Status::OK();
973
0
  }
974
975
  // ----------------------------------------------------------------------------------------------
976
  // Traversing WAL directories
977
  // ----------------------------------------------------------------------------------------------
978
979
  CHECKED_STATUS WalkWalCallback(
980
0
      Env::FileType type, const std::string& dirname, const std::string& fname) {
981
0
    if (type != Env::FileType::DIRECTORY_TYPE) {
982
0
      return Status::OK();
983
0
    }
984
0
    CheckDirectory(dirname, fname, &wal_dirs_);
985
0
    return Status::OK();
986
0
  }
987
988
  // ----------------------------------------------------------------------------------------------
989
  // Functions used for both RocksDB data and WALs
990
  // ----------------------------------------------------------------------------------------------
991
992
  // Look at the given directory, and if it is a patched directory (or a backup directory if we are
993
  // doing a revert operation), strip off the suffix and add the corresponding live directory to
994
  // the given vector.
995
  void CheckDirectory(
996
0
      const std::string& dirname, const std::string& fname, std::vector<std::string>* dirs) {
997
0
    const std::string& needed_extension = revert_ ? kBackupExtension : kPatchedExtension;
998
0
    if (!boost::ends_with(fname, needed_extension)) {
999
0
      return;
1000
0
    }
1001
0
    auto patched_path = JoinPathSegments(dirname, fname);
1002
0
    auto full_path = patched_path.substr(0, patched_path.size() - needed_extension.size());
1003
0
    if (!regex_match(full_path, kTabletDbRegex)) {
1004
0
      return;
1005
0
    }
1006
0
    dirs->push_back(full_path);
1007
0
    return;
1008
0
  }
1009
1010
  // Renames dir1 -> dir2 -> dir3, starting from the end of the chain.
1011
  CHECKED_STATUS ChainRename(
1012
0
      const std::string& dir1, const std::string& dir2, const std::string& dir3) {
1013
0
    RETURN_NOT_OK(SafeRename(dir2, dir3, /* check_dst_collision= */ true));
1014
1015
    // Don't check that dir2 does not exist, because we haven't actually moved dir2 to dir3 in the
1016
    // dry run mode.
1017
0
    return SafeRename(dir1, dir2, /* check_dst_collision= */ false);
1018
0
  }
1019
1020
  // A logging wrapper over directory renaming. In dry-run mode, checks for some errors, but
1021
  // check_dst_collision=false allows to skip ensuring that the destination does not exist.
1022
  CHECKED_STATUS SafeRename(
1023
0
      const std::string& src, const std::string& dst, bool check_dst_collision) {
1024
0
    if (dry_run_) {
1025
0
      if (!env_->FileExists(src)) {
1026
0
        return STATUS_FORMAT(
1027
0
            IOError, "Would fail to rename $0 to $1, source does not exist", src, dst);
1028
0
      }
1029
0
      if (check_dst_collision && env_->FileExists(dst)) {
1030
0
        return STATUS_FORMAT(
1031
0
            IOError, "Would fail to rename $0 to $1, destination already exists", src, dst);
1032
0
      }
1033
0
      LOG(INFO) << "Would rename " << src << " to " << dst;
1034
0
      return Status::OK();
1035
0
    }
1036
0
    LOG(INFO) << "Renaming " << src << " to " << dst;
1037
0
    Status s = env_->RenameFile(src, dst);
1038
0
    if (!s.ok()) {
1039
0
      LOG(ERROR) << "Error renaming " << src << " to " << dst << ": " << s;
1040
0
    }
1041
0
    return s;
1042
0
  }
1043
1044
  Env* env_ = Env::Default();
1045
  std::vector<std::string> data_dirs_;
1046
  std::vector<std::string> wal_dirs_;
1047
1048
  // The set of tablet RocksDB directories where we found CURRENT and MANIFEST-... files, indicating
1049
  // that there is a valid RocksDB database present.
1050
  std::set<std::string> valid_rocksdb_dirs_;
1051
1052
  bool dry_run_ = false;
1053
  bool revert_ = false;
1054
};
1055
1056
0
// Entry point of the apply-patch command: runs a fresh ApplyPatch instance on the given arguments
// and returns its status.
CHECKED_STATUS ApplyPatchExecute(const ApplyPatchArguments& args) {
  return ApplyPatch().Execute(args);
}
1060
1061
// Instantiates the YB_TOOL_ARGUMENTS macro for DataPatcherAction / DATA_PATCHER_ACTIONS.
// NOTE(review): presumably this generates the per-action option/execute glue consumed by
// ExecuteTool<DataPatcherAction>() in main() -- confirm against yb/tools/tool_arguments.h.
YB_TOOL_ARGUMENTS(DataPatcherAction, DATA_PATCHER_ACTIONS);
1062
1063
} // namespace tools
1064
} // namespace yb
1065
1066
18.6k
int main(int argc, char** argv) {
1067
18.6k
  yb::HybridTime::TEST_SetPrettyToString(true);
1068
1069
  // Setup flags to avoid unnecessary logging
1070
18.6k
  FLAGS_rocksdb_max_background_flushes = 1;
1071
18.6k
  FLAGS_rocksdb_max_background_compactions = 1;
1072
18.6k
  FLAGS_rocksdb_base_background_compactions = 1;
1073
18.6k
  FLAGS_priority_thread_pool_size = 1;
1074
1075
18.6k
  yb::InitGoogleLoggingSafeBasic(argv[0]);
1076
18.6k
  return yb::tools::ExecuteTool<yb::tools::DataPatcherAction>(argc, argv);
1077
18.6k
}