YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tools/data-patcher.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include <algorithm>
#include <iostream>
#include <random>
#include <regex>

#include "yb/common/doc_hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/consensus/log_index.h"
#include "yb/consensus/log_reader.h"
#include "yb/consensus/log_util.h"

#include "yb/docdb/consensus_frontier.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/docdb_types.h"
#include "yb/docdb/value_type.h"
#include "yb/docdb/kv_debug.h"
#include "yb/docdb/docdb-internal.h"
#include "yb/docdb/value.h"

#include "yb/fs/fs_manager.h"

#include "yb/gutil/stl_util.h"

#include "yb/rocksdb/db/builder.h"
#include "yb/rocksdb/db/dbformat.h"
#include "yb/rocksdb/db/filename.h"
#include "yb/rocksdb/db/version_set.h"
#include "yb/rocksdb/db/writebuffer.h"
#include "yb/rocksdb/table/block_based_table_reader.h"
#include "yb/rocksdb/table/internal_iterator.h"
#include "yb/rocksdb/table/table_builder.h"
#include "yb/rocksdb/util/file_reader_writer.h"

#include "yb/tools/tool_arguments.h"

#include "yb/util/bytes_formatter.h"
#include "yb/util/date_time.h"
#include "yb/util/env.h"
#include "yb/util/env_util.h"
#include "yb/util/enums.h"
#include "yb/util/logging.h"
#include "yb/util/metrics.h"
#include "yb/util/path_util.h"
#include "yb/util/pb_util.h"
#include "yb/util/result.h"
#include "yb/util/scope_exit.h"
#include "yb/util/status.h"
#include "yb/util/status_format.h"
#include "yb/util/threadpool.h"
#include "yb/util/tostring.h"
#include "yb/util/string_util.h"
64
using namespace std::placeholders;
65
namespace po = boost::program_options;
66
67
DECLARE_int32(rocksdb_max_background_flushes);
68
DECLARE_int32(rocksdb_base_background_compactions);
69
DECLARE_int32(rocksdb_max_background_compactions);
70
DECLARE_int32(priority_thread_pool_size);
71
72
namespace yb {
73
namespace tools {
74
75
const std::regex kTabletRegex(".*/tablet-[0-9a-f]{32}");
76
const std::regex kTabletDbRegex(".*/tablet-[0-9a-f]{32}(?:\\.intents)?");
77
const std::regex kManifestRegex("MANIFEST-[0-9]{6}");
78
const std::string kTmpExtension = ".tmp";
79
const std::string kPatchedExtension = ".patched";
80
const std::string kBackupExtension = ".apply-patch-backup";
81
const std::string kIntentsExtension = ".intents";
82
const std::string kToolName = "Data Patcher";
83
const std::string kLogPrefix = kToolName + ": ";
84
85
using docdb::StorageDbType;
86
87
#define DATA_PATCHER_ACTIONS (Help)(AddTime)(SubTime)(ApplyPatch)
88
89
YB_DEFINE_ENUM(DataPatcherAction, DATA_PATCHER_ACTIONS);
90
YB_DEFINE_ENUM(FileType, (kSST)(kWAL));
91
92
const std::string kHelpDescription = "Show help on command";
93
94
// ------------------------------------------------------------------------------------------------
95
// Help command
96
// ------------------------------------------------------------------------------------------------
97
98
// Parsed arguments of the "help" command.
struct HelpArguments {
  // Command to describe; when empty, the list of all commands is shown instead.
  std::string command{};
};
101
102
0
std::unique_ptr<OptionsDescription> HelpOptions() {
103
0
  auto result = std::make_unique<OptionsDescriptionImpl<HelpArguments>>(kHelpDescription);
104
0
  result->positional.add("command", 1);
105
0
  result->hidden.add_options()
106
0
      ("command", po::value(&result->args.command));
107
0
  return result;
108
0
}
109
110
0
// Runs the "help" command: shows help for the named command, or the list of all commands when
// no name was given. Fails if the name does not match any DataPatcherAction.
CHECKED_STATUS HelpExecute(const HelpArguments& args) {
  if (!args.command.empty()) {
    const auto action = VERIFY_RESULT(ActionByName<DataPatcherAction>(args.command));
    ShowHelp(action);
  } else {
    ShowCommands<DataPatcherAction>();
  }
  return Status::OK();
}
119
120
// ------------------------------------------------------------------------------------------------
121
// Utility functions used by the add/subtract commands as well as by the apply patch command
122
// ------------------------------------------------------------------------------------------------
123
124
// ------------------------------------------------------------------------------------------------
125
// Common implementation for "add time" and "subtract time" commands
126
// ------------------------------------------------------------------------------------------------
127
128
struct ChangeTimeArguments {
129
  std::string delta;
130
  std::string bound_time;
131
  std::vector<std::string> data_dirs;
132
  std::vector<std::string> wal_dirs;
133
  // When concurrency is positive, limit number of concurrent jobs to it.
134
  // For zero, do not limit the number of concurrent jobs.
135
  int concurrency = 0;
136
  size_t max_num_old_wal_entries = 0;
137
  bool debug = false;
138
139
0
  std::string ToString() {
140
0
    return YB_STRUCT_TO_STRING(
141
0
        delta, bound_time, data_dirs, wal_dirs, concurrency, max_num_old_wal_entries, debug);
142
0
  }
143
};
144
145
0
std::unique_ptr<OptionsDescription> ChangeTimeOptions(const std::string& caption) {
146
0
  auto result = std::make_unique<OptionsDescriptionImpl<ChangeTimeArguments>>(caption);
147
0
  auto& args = result->args;
148
0
  result->desc.add_options()
149
0
      ("delta", po::value(&args.delta)->required(),
150
0
       "Delta to add/subtract. For instance 2:30, to use two and a half hour as delta.")
151
0
      ("bound-time", po::value(&args.bound_time)->required(),
152
0
       "All new time before this bound will be ordered and assigned new time keeping the order. "
153
0
           "For instance: 2021-12-02T20:24:07.649298.")
154
0
      ("data-dirs", po::value(&args.data_dirs), "TServer data dirs.")
155
0
      ("wal-dirs", po::value(&args.wal_dirs),
156
0
       "TServer WAL dirs. Not recommended to use while node process is running.")
157
0
      ("concurrency", po::value(&args.concurrency)->default_value(-1),
158
0
       "Max number of concurrent jobs, if this number is less or equals to 0 then number of "
159
0
           "concurrent jobs is unlimited.")
160
0
      ("max-num-old-wal-entries",
161
0
       po::value(&args.max_num_old_wal_entries)->default_value(1000000000L),
162
0
       "Maximum number of WAL entries allowed with timestamps that cannot be shifted by the delta, "
163
0
       "and therefore have to be re-mapped differently.")
164
0
      ("debug", po::bool_switch(&args.debug),
165
0
       "Output detailed debug information. Only practical for a small amount of data.");
166
0
  return result;
167
0
}
168
169
class RocksDBHelper {
170
 public:
171
0
  RocksDBHelper() : immutable_cf_options_(options_) {
172
0
    docdb::InitRocksDBOptions(
173
0
        &options_, kLogPrefix, /* statistics= */ nullptr, tablet_options_, table_options_);
174
0
    internal_key_comparator_ = std::make_shared<rocksdb::InternalKeyComparator>(
175
0
        options_.comparator);
176
0
  }
177
178
0
  Result<std::unique_ptr<rocksdb::TableReader>> NewTableReader(const std::string& fname) {
179
0
    uint64_t base_file_size;
180
0
    auto file_reader = VERIFY_RESULT(NewFileReader(fname, &base_file_size));
181
182
0
    std::unique_ptr<rocksdb::TableReader> table_reader;
183
0
    RETURN_NOT_OK(rocksdb::BlockBasedTable::Open(
184
0
      immutable_cf_options_, env_options_, table_options_, internal_key_comparator_,
185
0
      std::move(file_reader), base_file_size, &table_reader
186
0
    ));
187
188
0
    auto data_fname = rocksdb::TableBaseToDataFileName(fname);
189
0
    auto data_file_reader = VERIFY_RESULT(NewFileReader(data_fname));
190
0
    table_reader->SetDataFileReader(std::move(data_file_reader));
191
0
    return table_reader;
192
0
  }
193
194
  std::unique_ptr<rocksdb::TableBuilder> NewTableBuilder(
195
      rocksdb::WritableFileWriter* base_file_writer,
196
0
      rocksdb::WritableFileWriter* data_file_writer) {
197
0
      rocksdb::ImmutableCFOptions immutable_cf_options(options_);
198
0
     return std::unique_ptr<rocksdb::TableBuilder>(rocksdb::NewTableBuilder(
199
0
        immutable_cf_options, internal_key_comparator_, int_tbl_prop_collector_factories_,
200
0
        /* column_family_id= */ 0, base_file_writer, data_file_writer,
201
0
        rocksdb::CompressionType::kSnappyCompression, rocksdb::CompressionOptions()));
202
0
  }
203
204
  Result<std::unique_ptr<rocksdb::RandomAccessFileReader>> NewFileReader(
205
0
      const std::string& fname, uint64_t* file_size = nullptr) {
206
0
    std::unique_ptr<rocksdb::RandomAccessFile> input_file;
207
0
    RETURN_NOT_OK(rocksdb::Env::Default()->NewRandomAccessFile(fname, &input_file, env_options_));
208
0
    if (file_size) {
209
0
      *file_size = VERIFY_RESULT(input_file->Size());
210
0
    }
211
0
    return std::make_unique<rocksdb::RandomAccessFileReader>(std::move(input_file));
212
0
  }
213
214
0
  Result<std::unique_ptr<rocksdb::WritableFileWriter>> NewFileWriter(const std::string& fname) {
215
0
    std::unique_ptr<rocksdb::WritableFile> file;
216
0
    RETURN_NOT_OK(NewWritableFile(rocksdb::Env::Default(), fname, &file, env_options_));
217
0
    return std::make_unique<rocksdb::WritableFileWriter>(std::move(file), env_options_);
218
0
  }
219
220
0
  rocksdb::EnvOptions& env_options() {
221
0
    return env_options_;
222
0
  }
223
224
0
  rocksdb::Options& options() {
225
0
    return options_;
226
0
  }
227
228
 private:
229
  rocksdb::EnvOptions env_options_;
230
  tablet::TabletOptions tablet_options_;
231
232
  rocksdb::BlockBasedTableOptions table_options_;
233
  rocksdb::Options options_;
234
  rocksdb::ImmutableCFOptions immutable_cf_options_;
235
236
  rocksdb::InternalKeyComparatorPtr internal_key_comparator_;
237
  rocksdb::IntTblPropCollectorFactories int_tbl_prop_collector_factories_;
238
};
239
240
struct DeltaData {
241
  MonoDelta delta;
242
  HybridTime bound_time;
243
  size_t max_num_old_wal_entries = 0;
244
245
  std::vector<MicrosTime> early_times;
246
  MicrosTime min_micros_to_keep = 0;
247
  MicrosTime base_micros = 0;
248
  bool time_map_ready = false;
249
250
  DeltaData(MonoDelta delta_, HybridTime bound_time_, size_t max_num_old_wal_entries_)
251
      : delta(delta_),
252
        bound_time(bound_time_),
253
0
        max_num_old_wal_entries(max_num_old_wal_entries_) {
254
0
    CHECK_GE(max_num_old_wal_entries, 0);
255
0
  }
256
257
0
  void AddEarlyTime(HybridTime time) {
258
0
    CHECK(!time_map_ready);
259
0
    if (time <= bound_time) {
260
0
      early_times.push_back(time.GetPhysicalValueMicros());
261
0
    }
262
0
  }
263
264
0
  void FinalizeTimeMap() {
265
0
    std::sort(early_times.begin(), early_times.end());
266
0
    Unique(&early_times);
267
268
    // All old WAL timestamps will be mapped to
269
    // [base_micros, bound_time.GetPhysicalValueMicros()), assuming there are no more
270
    // than max_num_old_wal_entries of them.
271
    //
272
    // Old SST timestamps will be left as is if their microsecond component is min_micros_to_keep
273
    // or less, or mapped to the range [base_micros - early_times.size(), base_micros - 1]
274
    // otherwise, based on their relative position in the early_times array.
275
276
0
    base_micros = bound_time.GetPhysicalValueMicros() - max_num_old_wal_entries;
277
0
    min_micros_to_keep = base_micros - early_times.size() - 1;
278
0
    time_map_ready = true;
279
0
  }
280
281
0
  HybridTime NewEarlyTime(HybridTime ht, FileType file_type) const {
282
0
    CHECK(time_map_ready);
283
0
    if (ht.GetPhysicalValueMicros() <= min_micros_to_keep)
284
0
      return ht;
285
286
0
    const auto micros = ht.GetPhysicalValueMicros();
287
0
    auto it = std::lower_bound(early_times.begin(), early_times.end(), micros);
288
0
    CHECK_EQ(*it, micros);
289
0
    switch (file_type) {
290
0
      case FileType::kSST: {
291
0
        auto distance_from_end = early_times.end() - it;
292
0
        auto new_micros = base_micros - distance_from_end;
293
0
        CHECK_LT(new_micros, base_micros);
294
0
        return HybridTime(new_micros, ht.GetLogicalValue());
295
0
      }
296
0
      case FileType::kWAL: {
297
0
        auto distance_from_start = it - early_times.begin();
298
0
        auto new_micros = base_micros + distance_from_start;
299
0
        CHECK_GE(new_micros, base_micros);
300
0
        CHECK_LE(new_micros, bound_time.GetPhysicalValueMicros());
301
0
        return HybridTime(new_micros, ht.GetLogicalValue());
302
0
      }
303
0
    }
304
0
    FATAL_INVALID_ENUM_VALUE(FileType, file_type);
305
0
  }
306
307
0
  Result<HybridTime> AddDelta(HybridTime ht, FileType file_type) const {
308
0
    int64_t microseconds = ht.GetPhysicalValueMicros();
309
0
    int64_t bound_microseconds = bound_time.GetPhysicalValueMicros();
310
0
    if (microseconds <= bound_microseconds) {
311
0
      return NewEarlyTime(ht, file_type);
312
0
    }
313
314
0
    int64_t delta_microseconds = delta.ToMicroseconds();
315
0
    SCHECK_GE(static_cast<int64_t>(microseconds - kYugaByteMicrosecondEpoch), -delta_microseconds,
316
0
              InvalidArgument, Format("Hybrid time underflow: $0 + $1", ht, delta));
317
0
    int64_t kMaxMilliseconds = HybridTime::kMax.GetPhysicalValueMicros();
318
0
    SCHECK_GT(kMaxMilliseconds - microseconds, delta_microseconds, InvalidArgument,
319
0
              Format("Hybrid time overflow: $0 + $1", ht, delta));
320
0
    microseconds += delta_microseconds;
321
0
    return HybridTime::FromMicrosecondsAndLogicalValue(microseconds, ht.GetLogicalValue());
322
0
  }
323
324
0
  Result<DocHybridTime> AddDelta(DocHybridTime doc_ht, FileType file_type) const {
325
0
    return DocHybridTime(
326
0
        VERIFY_RESULT(AddDelta(doc_ht.hybrid_time(), file_type)), doc_ht.write_id());
327
0
  }
328
329
0
  Result<char*> AddDeltaToSstKey(Slice key, std::vector<char>* buffer) const {
330
0
    auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key));
331
0
    buffer->resize(std::max(buffer->size(), key.size() + kMaxBytesPerEncodedHybridTime));
332
0
    memcpy(buffer->data(), key.data(), key.size());
333
0
    auto new_doc_ht = VERIFY_RESULT(AddDelta(doc_ht, FileType::kSST));
334
0
    return new_doc_ht.EncodedInDocDbFormat(buffer->data() + key.size());
335
0
  }
336
337
};
338
339
// Writes a patched copy of one SST file (base file plus data file) with all hybrid times
// adjusted by `delta`.
//
// The copy is written to "<dir>.patched/<basename>". It goes to ".tmp" files first, which are
// renamed into place only after the table has been built successfully. If anything fails, the
// temporary files are removed on a best-effort basis. When `bound_time` is valid, an initial
// pass collects early times so DeltaData can re-map them in order; the final pass writes the
// output.
//
// Regular-DB keys and ordinary intent keys end with an encoded DocHybridTime, which is
// rewritten. In the intents DB, keys starting with a transaction id are handled differently:
//   - transaction metadata records: only the start time stored in the value changes;
//   - reverse-index entries: only the hybrid time at the end of the value changes.
CHECKED_STATUS AddDeltaToSstFile(
    const std::string& fname, MonoDelta delta, HybridTime bound_time,
    size_t max_num_old_wal_entries, bool debug, RocksDBHelper* helper) {
  LOG(INFO) << "Patching: " << fname;

  // Length of the RocksDB internal-key suffix that follows every user key; its first byte holds
  // the rocksdb::ValueType.
  constexpr size_t kKeySuffixLen = 8;
  // Files under a "*.intents" directory belong to the intents DB.
  const auto storage_db_type =
      boost::ends_with(DirName(fname), kIntentsExtension) ? StorageDbType::kIntents
                                                          : StorageDbType::kRegular;

  auto table_reader = VERIFY_RESULT(helper->NewTableReader(fname));

  rocksdb::ReadOptions read_options;
  std::unique_ptr<rocksdb::InternalIterator> iterator(table_reader->NewIterator(read_options));

  auto out_dir = DirName(fname) + kPatchedExtension;
  auto& env = *Env::Default();
  RETURN_NOT_OK(env.CreateDirs(out_dir));

  const auto new_fname = JoinPathSegments(out_dir, BaseName(fname));
  const auto new_data_fname = rocksdb::TableBaseToDataFileName(new_fname);
  const auto tmp_fname = new_fname + kTmpExtension;
  const auto tmp_data_fname = new_data_fname + kTmpExtension;
  size_t num_entries = 0;
  size_t total_file_size = 0;

  // Set once the output files have been renamed into place. Until then, leaving this function
  // removes any partially written temporary files, so a failed run does not leave stray ".tmp"
  // files in the output directory. Declared before the writer scope below so it runs after the
  // writers have been destroyed.
  bool renamed = false;
  auto remove_tmp_files = ScopeExit([&env, &tmp_fname, &tmp_data_fname, &renamed] {
    if (renamed) {
      return;
    }
    for (const std::string* path : {&tmp_data_fname, &tmp_fname}) {
      if (!env.FileExists(*path)) {
        continue;
      }
      const auto status = env.DeleteFile(*path);
      if (!status.ok()) {
        LOG(WARNING) << "Failed to remove temporary file " << *path << ": "
                     << status.ToString();
      }
    }
  });

  {
    auto base_file_writer = VERIFY_RESULT(helper->NewFileWriter(tmp_fname));
    auto data_file_writer = VERIFY_RESULT(helper->NewFileWriter(tmp_data_fname));

    auto builder = helper->NewTableBuilder(base_file_writer.get(), data_file_writer.get());
    // Adds a key/value pair to the output table. With --debug, it is also logged in a
    // human-readable form.
    const auto add_kv = [&builder, debug, storage_db_type](const Slice& k, const Slice& v) {
      if (debug) {
        const Slice user_key(k.data(), k.size() - kKeySuffixLen);
        auto key_type = docdb::GetKeyType(user_key, storage_db_type);
        auto rocksdb_value_type = static_cast<rocksdb::ValueType>(*(k.end() - kKeySuffixLen));
        LOG(INFO) << "DEBUG: output KV pair "
                  << "(db_type=" << storage_db_type
                  << ", key_type=" << key_type
                  << ", rocksdb_value_type=" << static_cast<uint64_t>(rocksdb_value_type)
                  << "): "
                  << docdb::DocDBKeyToDebugStr(user_key, storage_db_type)
                  << " => "
                  << docdb::DocDBValueToDebugStr(key_type, user_key, v);
      }
      builder->Add(k, v);
    };

    // Abandon the builder unless Finish() has been called.
    bool done = false;
    auto se = ScopeExit([&builder, &done] {
      if (!done) {
        builder->Abandon();
      }
    });

    DeltaData delta_data(delta, bound_time, max_num_old_wal_entries);
    std::vector<char> buffer(0x100);
    faststring txn_metadata_buffer;

    std::string value_buffer;

    // Without a valid bound_time there are no early times to collect, so only the final pass
    // runs.
    for (int is_final_pass = bound_time ? 0 : 1; is_final_pass != 2; ++is_final_pass) {
      LOG(INFO) << "Performing the " << (is_final_pass ? "final" : "initial") << " pass for "
                << "file " << fname;
      for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
        const Slice key = iterator->key();
        const auto rocksdb_value_type =
            static_cast<rocksdb::ValueType>(*(key.end() - kKeySuffixLen));
        if (storage_db_type == StorageDbType::kRegular ||
            key[0] != docdb::ValueTypeAsChar::kTransactionId) {
          // Regular DB entry, or a normal intent entry (not txn metadata or reverse index).
          // Update the timestamp at the end of the key.
          const auto key_without_suffix = key.WithoutSuffix(kKeySuffixLen);

          bool value_updated = false;
          if (storage_db_type == StorageDbType::kRegular) {
            // Regular DB values may also carry an intent hybrid time that must be adjusted.
            docdb::Value docdb_value;
            RETURN_NOT_OK(docdb_value.Decode(iterator->value()));
            if (docdb_value.intent_doc_ht().is_valid()) {
              auto intent_ht = docdb_value.intent_doc_ht().hybrid_time();
              if (is_final_pass) {
                DocHybridTime new_intent_doc_ht(
                    VERIFY_RESULT(delta_data.AddDelta(intent_ht, FileType::kSST)),
                    docdb_value.intent_doc_ht().write_id());
                docdb_value.set_intent_doc_ht(new_intent_doc_ht);
                value_buffer.clear();
                docdb_value.EncodeAndAppend(&value_buffer);
                value_updated = true;
              } else {
                delta_data.AddEarlyTime(intent_ht);
              }
            }
          }

          if (is_final_pass) {
            auto end = VERIFY_RESULT(delta_data.AddDeltaToSstKey(key_without_suffix, &buffer));
            memcpy(end, key_without_suffix.end(), kKeySuffixLen);
            end += kKeySuffixLen;
            add_kv(Slice(buffer.data(), end), value_updated ? value_buffer : iterator->value());
          } else {
            Slice key_without_suffix_copy = key_without_suffix;
            auto doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key_without_suffix_copy));
            delta_data.AddEarlyTime(doc_ht.hybrid_time());
          }
          continue;
        }

        // Now we know this is the intents DB and the key starts with a transaction id.
        // In this case, we only modify the value, and never modify the key.

        if (rocksdb_value_type != rocksdb::ValueType::kTypeValue) {
          if (is_final_pass) {
            add_kv(key, iterator->value());
          }
          continue;
        }

        // Transaction metadata record.
        if (key.size() == 1 + TransactionId::StaticSize() + kKeySuffixLen) {
          // We do not modify the key in this case, only the value.

          // Modify transaction start time stored in metadata.
          TransactionMetadataPB metadata_pb;
          const auto v = iterator->value();
          if (!metadata_pb.ParseFromArray(v.data(), narrow_cast<int>(v.size()))) {
            return STATUS_FORMAT(Corruption, "Bad txn metadata: $0", v.ToDebugHexString());
          }

          if (is_final_pass) {
            const auto new_start_ht = VERIFY_RESULT(delta_data.AddDelta(
                HybridTime(metadata_pb.start_hybrid_time()), FileType::kSST));
            metadata_pb.set_start_hybrid_time(new_start_ht.ToUint64());
            txn_metadata_buffer.clear();
            pb_util::SerializeToString(metadata_pb, &txn_metadata_buffer);
            add_kv(key, txn_metadata_buffer);
          } else {
            delta_data.AddEarlyTime(HybridTime(metadata_pb.start_hybrid_time()));
          }
          continue;
        }

        // Transaction reverse index. We do not modify the timestamp at the end of the key because
        // it does not matter if it is shifted, only the relative order of those timestamps matters.
        // We do modify the timestamp stored at the end of the encoded value, because the value is
        // the intent key.
        if (is_final_pass) {
          auto value_end = VERIFY_RESULT_PREPEND(
              delta_data.AddDeltaToSstKey(iterator->value(), &buffer),
              Format("Intent key $0, value: $1, filename: $2",
                     iterator->key().ToDebugHexString(), iterator->value().ToDebugHexString(),
                     fname));
          add_kv(iterator->key(), Slice(buffer.data(), value_end));
        } else {
          auto value = iterator->value();
          auto doc_ht_result = DocHybridTime::DecodeFromEnd(&value);
          if (!doc_ht_result.ok()) {
            LOG(INFO)
                << "Failed to decode hybrid time from the end of value for "
                << "key " << key.ToDebugHexString() << " (" << FormatSliceAsStr(key) << "), "
                << "value " << value.ToDebugHexString() << " (" << FormatSliceAsStr(value) << "), "
                << "decoded value " << DocDBValueToDebugStr(
                    docdb::KeyType::kReverseTxnKey, iterator->key(), iterator->value());
            return doc_ht_result.status();
          }
          delta_data.AddEarlyTime(doc_ht_result->hybrid_time());
        }
      }

      if (is_final_pass) {
        done = true;
        RETURN_NOT_OK(builder->Finish());
        num_entries = builder->NumEntries();
        total_file_size = builder->TotalFileSize();
      } else {
        delta_data.FinalizeTimeMap();
      }
    }
  }

  RETURN_NOT_OK(env.RenameFile(tmp_data_fname, new_data_fname));
  RETURN_NOT_OK(env.RenameFile(tmp_fname, new_fname));
  renamed = true;
  LOG(INFO) << "Generated: " << new_fname << ", with: " << num_entries
            << " entries and size: " << HumanizeBytes(total_file_size);

  return Status::OK();
}
526
527
// Checks if the given file in the given directory is an SSTable file that we need to process.
528
// That means it belong to a valid tablet directory and that we haven't processed it before.
529
void CheckDataFile(
530
0
    const std::string& dirname, const std::string& fname, std::vector<std::string>* out) {
531
0
  if (!regex_match(dirname, kTabletDbRegex)) {
532
0
    return;
533
0
  }
534
0
  if (!boost::ends_with(fname, ".sst")) {
535
0
    return;
536
0
  }
537
0
  if (Env::Default()->FileExists(JoinPathSegments(dirname + kPatchedExtension, fname))) {
538
0
    return;
539
0
  }
540
0
  auto full_path = JoinPathSegments(dirname, fname);
541
0
  out->push_back(full_path);
542
0
}
543
544
// Walks `dirs` and submits one AddDeltaToSstFile job to `runner` for every SST file that has
// not been patched yet. For each tablet RocksDB directory found, the matching ".patched"
// output directory is created. Returns after submission; the caller waits on `runner`.
CHECKED_STATUS ChangeTimeInDataFiles(
    MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries,
    const std::vector<std::string>& dirs, bool debug, TaskRunner* runner) {
  std::vector<std::string> files_to_process;
  Env* env = Env::Default();
  auto callback = [&files_to_process, env](
      Env::FileType type, const std::string& dirname, const std::string& fname) -> Status {
    if (type == Env::FileType::FILE_TYPE) {
      CheckDataFile(dirname, fname, &files_to_process);
    } else {
      auto full_path = JoinPathSegments(dirname, fname);
      if (regex_match(full_path, kTabletDbRegex)) {
        RETURN_NOT_OK(env->CreateDirs(full_path + kPatchedExtension));
      }
    }
    return Status::OK();
  };
  for (const auto& dir : dirs) {
    RETURN_NOT_OK(env->Walk(dir, Env::DirectoryOrder::POST_ORDER, callback));
  }
  // Process the files in random order. std::random_shuffle is deprecated since C++14 and
  // removed in C++17, so use std::shuffle with an explicitly seeded engine.
  std::mt19937_64 rng(std::random_device{}());
  std::shuffle(files_to_process.begin(), files_to_process.end(), rng);
  for (const auto& fname : files_to_process) {
    runner->Submit([fname, delta, bound_time, max_num_old_wal_entries, debug]() {
      RocksDBHelper helper;
      return AddDeltaToSstFile(fname, delta, bound_time, max_num_old_wal_entries, debug, &helper);
    });
  }
  return Status::OK();
}
573
574
// Rewrites all WAL segments of the tablet WAL directory `dir` into a single new segment,
// "<dir>.patched/<segment 1 file name>", with hybrid times adjusted by `delta`.
//
// The following are adjusted:
//   - replicate hybrid times;
//   - commit hybrid times of APPLYING transaction updates;
//   - history cutoffs.
// When `bound_time` is valid, an initial pass collects early times so DeltaData can re-map them
// in order. The final pass writes the new segment to a ".tmp" file, which is renamed into place
// at the end.
CHECKED_STATUS ChangeTimeInWalDir(
    MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries,
    const std::string& dir) {
  auto env = Env::Default();
  auto log_index = make_scoped_refptr<log::LogIndex>(dir);
  std::unique_ptr<log::LogReader> log_reader;
  RETURN_NOT_OK(log::LogReader::Open(
      env, log_index, kLogPrefix, dir, /* table_metric_entity= */ nullptr,
      /* tablet_metric_entity= */ nullptr, &log_reader));
  log::SegmentSequence segments;
  RETURN_NOT_OK(log_reader->GetSegmentsSnapshot(&segments));
  auto patched_dir = dir + kPatchedExtension;
  RETURN_NOT_OK(env->CreateDirs(patched_dir));
  auto new_segment_path = JoinPathSegments(patched_dir, FsManager::GetWalSegmentFileName(1));
  auto tmp_segment_path = new_segment_path + kTmpExtension;
  std::shared_ptr<WritableFile> new_segment_file;
  {
    std::unique_ptr<WritableFile> writable_file;
    RETURN_NOT_OK(env->NewWritableFile(tmp_segment_path, &writable_file));
    new_segment_file = std::move(writable_file);
  }
  log::WritableLogSegment new_segment(/* path= */ "", new_segment_file);

  // Set up the new header.
  log::LogSegmentHeaderPB header;
  header.set_major_version(log::kLogMajorVersion);
  header.set_minor_version(log::kLogMinorVersion);
  header.set_sequence_number(1);
  header.set_unused_tablet_id("TABLET ID");
  header.mutable_unused_schema();

  RETURN_NOT_OK(new_segment.WriteHeaderAndOpen(header));

  // Footer statistics, maintained as the segment is written.
  size_t num_entries = 0;
  int64_t min_replicate_index = std::numeric_limits<int64_t>::max();
  int64_t max_replicate_index = std::numeric_limits<int64_t>::min();

  faststring buffer;
  DeltaData delta_data(delta, bound_time, max_num_old_wal_entries);
  auto add_delta = [&delta_data](HybridTime ht) -> Result<uint64_t> {
    return VERIFY_RESULT(delta_data.AddDelta(ht, FileType::kWAL)).ToUint64();
  };

  // Without a valid bound_time there are no early times to collect, so only the final step runs.
  for (int is_final_step = bound_time ? 0 : 1; is_final_step != 2; ++is_final_step) {
    for (const auto& segment : segments) {
      // Read the entry batches of a WAL segment and write as few entry batches as possible.
      // Still, never put consecutive entries whose Raft index does not strictly increase into
      // the same write batch; log_reader.cc checks for that.
      auto read_result = segment->ReadEntries();
      if (!read_result.status.ok()) {
        LOG(WARNING) << "Error reading entries from " << segment->path() << ": "
                     << read_result.status.ToString() << " (" << read_result.entries.size()
                     << " entries read)";
      }
      log::LogEntryBatchPB batch;
      OpId committed_op_id;
      int64_t last_index = -1;

      // Writes the accumulated entries as one batch and clears them.
      auto write_entry_batch = [
          &batch, &buffer, &num_entries, &new_segment, &read_result, &committed_op_id](
              bool last_batch_of_segment) -> Status {
        if (last_batch_of_segment) {
          read_result.committed_op_id.ToPB(batch.mutable_committed_op_id());
        } else if (committed_op_id.valid()) {
          committed_op_id.ToPB(batch.mutable_committed_op_id());
        }
        if (!read_result.entry_metadata.empty()) {
          batch.set_mono_time(read_result.entry_metadata.back().entry_time.ToUInt64());
        }
        buffer.clear();
        pb_util::AppendToString(batch, &buffer);
        num_entries += batch.entry().size();
        RETURN_NOT_OK(new_segment.WriteEntryBatch(Slice(buffer)));
        batch.clear_entry();
        return Status::OK();
      };

      // Appends an entry to the current batch. If the entry's Raft index would not strictly
      // increase within the batch, the current batch is flushed first.
      auto add_entry = [&write_entry_batch, &batch, &last_index, &committed_op_id](
          std::unique_ptr<log::LogEntryPB> entry) -> Status {
        if (entry->has_replicate()) {
          const auto& replicate = entry->replicate();
          if (replicate.id().index() <= last_index) {
            RETURN_NOT_OK(write_entry_batch(/* last_batch_of_segment= */ false));
          }
          last_index = replicate.id().index();
          if (replicate.has_committed_op_id()) {
            committed_op_id = OpId::FromPB(replicate.committed_op_id());
          }
        }
        batch.mutable_entry()->AddAllocated(entry.release());
        return Status::OK();
      };

      for (auto& entry : read_result.entries) {
        // Only REPLICATE entries carry hybrid times. Calling mutable_replicate() on any other
        // entry would silently add an empty replicate message to it.
        if (entry->has_replicate()) {
          auto& replicate = *entry->mutable_replicate();
          auto replicate_ht = HybridTime(replicate.hybrid_time());
          if (is_final_step) {
            replicate.set_hybrid_time(VERIFY_RESULT(add_delta(replicate_ht)));
          } else {
            delta_data.AddEarlyTime(replicate_ht);
          }
          if (replicate.has_transaction_state()) {
            auto& state = *replicate.mutable_transaction_state();
            if (state.status() == TransactionStatus::APPLYING) {
              auto commit_ht = HybridTime(state.commit_hybrid_time());
              if (is_final_step) {
                state.set_commit_hybrid_time(VERIFY_RESULT(add_delta(commit_ht)));
              } else {
                delta_data.AddEarlyTime(commit_ht);
              }
            }
          } else if (replicate.has_history_cutoff()) {
            auto& state = *replicate.mutable_history_cutoff();
            auto history_cutoff_ht = HybridTime(state.history_cutoff());
            if (is_final_step) {
              state.set_history_cutoff(VERIFY_RESULT(add_delta(history_cutoff_ht)));
            } else {
              delta_data.AddEarlyTime(history_cutoff_ht);
            }
          }
          if (is_final_step) {
            const auto index = replicate.id().index();
            min_replicate_index = std::min(min_replicate_index, index);
            max_replicate_index = std::max(max_replicate_index, index);
          }
        }
        if (is_final_step) {
          RETURN_NOT_OK(add_entry(std::move(entry)));
        }
      }

      if (is_final_step) {
        RETURN_NOT_OK(write_entry_batch(/* last_batch_of_segment= */ true));
      }
      // `batch` is always empty at this point, so report the number of entries read instead.
      LOG(INFO) << "Step " << is_final_step << " processed " << read_result.entries.size()
                << " entries in " << segment->path();
    }
    if (!is_final_step) {
      delta_data.FinalizeTimeMap();
    }
  }

  log::LogSegmentFooterPB footer;
  footer.set_num_entries(num_entries);
  // Only record the replicate index range if at least one REPLICATE entry was written.
  if (num_entries && min_replicate_index <= max_replicate_index) {
    footer.set_min_replicate_index(min_replicate_index);
    footer.set_max_replicate_index(max_replicate_index);
  }

  RETURN_NOT_OK(new_segment.WriteFooterAndClose(footer));
  return env->RenameFile(tmp_segment_path, new_segment_path);
}
718
719
// Walks `dirs`, collects the tablet WAL directories (".../tablet-<id>") and submits one
// ChangeTimeInWalDir job per directory to `runner`. Returns after submission; the caller waits
// on `runner`.
CHECKED_STATUS ChangeTimeInWalDirs(
    MonoDelta delta, HybridTime bound_time, size_t max_num_old_wal_entries,
    const std::vector<std::string>& dirs, TaskRunner* runner) {
  Env* env = Env::Default();
  std::vector<std::string> wal_dirs;
  auto callback = [&wal_dirs](
      Env::FileType type, const std::string& dirname, const std::string& fname) {
    if (type != Env::FileType::DIRECTORY_TYPE) {
      return Status::OK();
    }
    auto full_path = JoinPathSegments(dirname, fname);
    if (!regex_match(full_path, kTabletRegex)) {
      return Status::OK();
    }
    wal_dirs.push_back(std::move(full_path));
    return Status::OK();
  };
  for (const auto& dir : dirs) {
    RETURN_NOT_OK(env->Walk(dir, Env::DirectoryOrder::POST_ORDER, callback));
  }
  // Process the directories in random order. std::random_shuffle is deprecated since C++14 and
  // removed in C++17, so use std::shuffle with an explicitly seeded engine.
  std::mt19937_64 rng(std::random_device{}());
  std::shuffle(wal_dirs.begin(), wal_dirs.end(), rng);
  for (const auto& dir : wal_dirs) {
    runner->Submit([delta, bound_time, max_num_old_wal_entries, dir] {
      return ChangeTimeInWalDir(delta, bound_time, max_num_old_wal_entries, dir);
    });
  }
  return Status::OK();
}
747
748
0
// Shared implementation of the "add time" (subtract == false) and "subtract time"
// (subtract == true) commands. It submits SST patch jobs for the data dirs and WAL patch jobs
// for the WAL dirs to a TaskRunner initialized with --concurrency, then waits for all of them.
CHECKED_STATUS ChangeTimeExecute(const ChangeTimeArguments& args, bool subtract) {
  auto delta = VERIFY_RESULT(DateTime::IntervalFromString(args.delta));
  if (subtract) {
    delta = -delta;
  }

  HybridTime bound_time;
  if (!args.bound_time.empty()) {
    auto parsed_bound = VERIFY_RESULT(HybridTime::ParseHybridTime(args.bound_time));
    // The bound is given in patched (post-delta) time; convert it back to the files' time.
    bound_time = parsed_bound.AddDelta(-delta);
    if (bound_time < HybridTime::FromMicros(kYugaByteMicrosecondEpoch)) {
      return STATUS_FORMAT(
          InvalidArgument, "Wrong bound-time and delta combination: $0", bound_time);
    }
    LOG(INFO) << "Bound time before adding delta: " << bound_time.ToString();
  }

  TaskRunner runner;
  RETURN_NOT_OK(runner.Init(args.concurrency));

  const auto data_dirs = SplitAndFlatten(args.data_dirs);
  RETURN_NOT_OK(ChangeTimeInDataFiles(
      delta, bound_time, args.max_num_old_wal_entries, data_dirs, args.debug, &runner));

  const auto wal_dirs = SplitAndFlatten(args.wal_dirs);
  RETURN_NOT_OK(ChangeTimeInWalDirs(
      delta, bound_time, args.max_num_old_wal_entries, wal_dirs, &runner));

  return runner.Wait();
}
771
772
// ------------------------------------------------------------------------------------------------
773
// Add time command
774
// ------------------------------------------------------------------------------------------------
775
776
const std::string kAddTimeDescription = "Add time delta to physical time in SST files";
777
778
using AddTimeArguments = ChangeTimeArguments;
779
780
0
std::unique_ptr<OptionsDescription> AddTimeOptions() {
781
0
  return ChangeTimeOptions(kAddTimeDescription);
782
0
}
783
784
0
CHECKED_STATUS AddTimeExecute(const AddTimeArguments& args) {
785
0
  return ChangeTimeExecute(args, /* subtract= */ false);
786
0
}
787
788
// ------------------------------------------------------------------------------------------------
789
// Subtract time command
790
// ------------------------------------------------------------------------------------------------
791
792
const std::string kSubTimeDescription = "Subtract time delta from physical time in SST files";
793
794
using SubTimeArguments = ChangeTimeArguments;
795
796
0
std::unique_ptr<OptionsDescription> SubTimeOptions() {
797
0
  return ChangeTimeOptions(kSubTimeDescription);
798
0
}
799
800
0
CHECKED_STATUS SubTimeExecute(const SubTimeArguments& args) {
801
0
  return ChangeTimeExecute(args, /* subtract= */ true);
802
0
}
803
804
// ------------------------------------------------------------------------------------------------
805
// Apply patch
806
// ------------------------------------------------------------------------------------------------
807
808
const std::string kApplyPatchDescription = "Apply prepared SST files patch";

// Parsed command-line arguments of the apply-patch command (bound in ApplyPatchOptions()).
struct ApplyPatchArguments {
  // TServer data directories (--data-dirs).
  std::vector<std::string> data_dirs;
  // TServer WAL directories (--wal-dirs).
  std::vector<std::string> wal_dirs;
  // Validate and log the renames instead of performing them (--dry-run).
  bool dry_run{false};
  // Undo a previous apply-patch instead of applying one (--revert).
  bool revert{false};
};
816
817
0
std::unique_ptr<OptionsDescription> ApplyPatchOptions() {
818
0
  auto result = std::make_unique<OptionsDescriptionImpl<ApplyPatchArguments>>(
819
0
      kApplyPatchDescription);
820
0
  auto& args = result->args;
821
0
  result->desc.add_options()
822
0
      ("data-dirs", po::value(&args.data_dirs)->required(), "TServer data dirs")
823
0
      ("wal-dirs", po::value(&args.wal_dirs)->required(), "TServer WAL dirs")
824
0
      ("dry-run", po::bool_switch(&args.dry_run),
825
0
       "Do not make any changes to live data, only check that apply-patch would work. "
826
0
       "This might still involve making changes to files in .patched directories.")
827
0
      ("revert", po::bool_switch(&args.revert),
828
0
       "Revert a previous apply-patch operation. This will move backup RocksDB and WAL "
829
0
       "directories back to their live locations, and the live locations to .patched locations.");
830
0
  return result;
831
0
}
832
833
class ApplyPatch {
834
 public:
835
0
  CHECKED_STATUS Execute(const ApplyPatchArguments& args) {
836
0
    dry_run_ = args.dry_run;
837
0
    revert_ = args.revert;
838
0
    LOG(INFO) << "Running the ApplyPatch command";
839
0
    LOG(INFO) << "    data_dirs=" << yb::ToString(args.data_dirs);
840
0
    LOG(INFO) << "    wal_dirs=" << yb::ToString(args.data_dirs);
841
0
    LOG(INFO) << "    dry_run=" << args.dry_run;
842
0
    LOG(INFO) << "    revert=" << args.revert;
843
0
    for (const auto& dir : SplitAndFlatten(args.data_dirs)) {
844
0
      RETURN_NOT_OK(env_->Walk(
845
0
          dir, Env::DirectoryOrder::POST_ORDER,
846
0
          std::bind(&ApplyPatch::WalkDataCallback, this, _1, _2, _3)));
847
0
    }
848
849
0
    for (const auto& dir : SplitAndFlatten(args.wal_dirs)) {
850
0
      RETURN_NOT_OK(env_->Walk(
851
0
          dir, Env::DirectoryOrder::POST_ORDER,
852
0
          std::bind(&ApplyPatch::WalkWalCallback, this, _1, _2, _3)));
853
0
    }
854
855
0
    RocksDBHelper helper;
856
0
    auto options = helper.options();
857
0
    options.skip_stats_update_on_db_open = true;
858
859
0
    int num_revert_errors = 0;
860
861
0
    int num_dirs_handled = 0;
862
0
    for (const auto* dirs : {&data_dirs_, &wal_dirs_}) {
863
0
      for (const auto& dir : *dirs) {
864
0
        auto backup_path = dir + kBackupExtension;
865
0
        auto patched_path = dir + kPatchedExtension;
866
867
0
        if (revert_) {
868
0
          bool backup_dir_exists = env_->FileExists(backup_path);
869
0
          bool patched_path_exists = env_->FileExists(patched_path);
870
0
          if (backup_dir_exists && !patched_path_exists) {
871
0
            auto rename_status = ChainRename(backup_path, dir, patched_path);
872
0
            if (!rename_status.ok()) {
873
0
              LOG(INFO) << "Error during revert: " << rename_status;
874
0
              num_revert_errors++;
875
0
            }
876
0
          } else {
877
0
            LOG(INFO)
878
0
                << "Not attempting to restore " << backup_path << " to " << dir
879
0
                << " after moving " << dir << " back to " << patched_path
880
0
                << ": "
881
0
                << (backup_dir_exists ? "" : "backup path does not exist; ")
882
0
                << (patched_path_exists ? "patched path already exists" : "");
883
0
            num_revert_errors++;
884
0
          }
885
0
          continue;
886
0
        }
887
888
0
        if (dirs == &data_dirs_) {
889
0
          if (valid_rocksdb_dirs_.count(dir)) {
890
0
            LOG(INFO) << "Patching non-live RocksDB metadata in " << patched_path;
891
0
            docdb::RocksDBPatcher patcher(patched_path, options);
892
0
            RETURN_NOT_OK(patcher.Load());
893
0
            RETURN_NOT_OK(patcher.UpdateFileSizes());
894
0
            docdb::ConsensusFrontier frontier;
895
0
            frontier.set_hybrid_time(HybridTime::kMin);
896
0
            frontier.set_history_cutoff(HybridTime::FromMicros(kYugaByteMicrosecondEpoch));
897
0
            RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
898
0
          } else {
899
0
            LOG(INFO) << "We did not see RocksDB CURRENT or MANIFEST-... files in "
900
0
                       << dir << ", skipping applying " << patched_path;
901
0
            continue;
902
0
          }
903
0
        }
904
905
0
        RETURN_NOT_OK(ChainRename(patched_path, dir, backup_path));
906
0
        num_dirs_handled++;
907
0
      }
908
0
    }
909
910
0
    LOG(INFO) << "Processed " << num_dirs_handled << " directories (two renames per each)";
911
0
    if (num_revert_errors) {
912
0
      return STATUS_FORMAT(
913
0
          IOError,
914
0
          "Encountered $0 errors when trying to revert an applied patch. "
915
0
          "Check the log above for details.",
916
0
          num_revert_errors);
917
0
    }
918
0
    return Status::OK();
919
0
  }
920
921
 private:
922
923
  // ----------------------------------------------------------------------------------------------
924
  // Functions for traversing RocksDB data directories
925
  // ----------------------------------------------------------------------------------------------
926
927
  CHECKED_STATUS WalkDataCallback(
928
0
      Env::FileType type, const std::string& dirname, const std::string& fname) {
929
0
    switch (type) {
930
0
      case Env::FileType::FILE_TYPE:
931
0
        return HandleDataFile(dirname, fname);
932
0
      case Env::FileType::DIRECTORY_TYPE:
933
0
        CheckDirectory(dirname, fname, &data_dirs_);
934
0
        return Status::OK();
935
0
    }
936
0
    FATAL_INVALID_ENUM_VALUE(Env::FileType, type);
937
0
  }
938
939
  // Handles a file found during walking through the a data (RocksDB) directory tree. Looks for
940
  // CURRENT and MANIFEST files and copies them to the corresponding .patched directory. Does not
941
  // modify live data of the cluster.
942
0
  CHECKED_STATUS HandleDataFile(const std::string& dirname, const std::string& fname) {
943
0
    if (revert_) {
944
      // We don't look at any of the manifest files during the revert operation.
945
0
      return Status::OK();
946
0
    }
947
948
0
    if (!regex_match(dirname, kTabletDbRegex)) {
949
0
      return Status::OK();
950
0
    }
951
0
    if (fname != "CURRENT" && !regex_match(fname, kManifestRegex)) {
952
0
      return Status::OK();
953
0
    }
954
0
    auto patched_dirname = dirname + kPatchedExtension;
955
0
    if (env_->DirExists(patched_dirname)) {
956
0
      valid_rocksdb_dirs_.insert(dirname);
957
0
      auto full_src_path = JoinPathSegments(dirname, fname);
958
0
      auto full_dst_path = JoinPathSegments(patched_dirname, fname);
959
0
      LOG(INFO) << "Copying file " << full_src_path << " to " << full_dst_path;
960
0
      Status copy_status = env_util::CopyFile(env_, full_src_path, full_dst_path);
961
0
      if (!copy_status.ok()) {
962
0
        LOG(INFO) << "Error copying file " << full_src_path << " to " << full_dst_path << ": "
963
0
                    << copy_status;
964
0
      }
965
0
      return copy_status;
966
0
    }
967
968
0
    LOG(INFO) << "Directory " << patched_dirname << " does not exist, not copying "
969
0
                << "the file " << fname << " there (this is not an error)";
970
0
    return Status::OK();
971
0
  }
972
973
  // ----------------------------------------------------------------------------------------------
974
  // Traversing WAL directories
975
  // ----------------------------------------------------------------------------------------------
976
977
  CHECKED_STATUS WalkWalCallback(
978
0
      Env::FileType type, const std::string& dirname, const std::string& fname) {
979
0
    if (type != Env::FileType::DIRECTORY_TYPE) {
980
0
      return Status::OK();
981
0
    }
982
0
    CheckDirectory(dirname, fname, &wal_dirs_);
983
0
    return Status::OK();
984
0
  }
985
986
  // ----------------------------------------------------------------------------------------------
987
  // Functions used for both RocksDB data and WALs
988
  // ----------------------------------------------------------------------------------------------
989
990
  // Look at the given directory, and if it is a patched directory (or a backup directory if we are
991
  // doing a revert operation), strip off the suffix and add the corresponding live directory to
992
  // the given vector.
993
  void CheckDirectory(
994
0
      const std::string& dirname, const std::string& fname, std::vector<std::string>* dirs) {
995
0
    const std::string& needed_extension = revert_ ? kBackupExtension : kPatchedExtension;
996
0
    if (!boost::ends_with(fname, needed_extension)) {
997
0
      return;
998
0
    }
999
0
    auto patched_path = JoinPathSegments(dirname, fname);
1000
0
    auto full_path = patched_path.substr(0, patched_path.size() - needed_extension.size());
1001
0
    if (!regex_match(full_path, kTabletDbRegex)) {
1002
0
      return;
1003
0
    }
1004
0
    dirs->push_back(full_path);
1005
0
    return;
1006
0
  }
1007
1008
  // Renames dir1 -> dir2 -> dir3, starting from the end of the chain.
1009
  CHECKED_STATUS ChainRename(
1010
0
      const std::string& dir1, const std::string& dir2, const std::string& dir3) {
1011
0
    RETURN_NOT_OK(SafeRename(dir2, dir3, /* check_dst_collision= */ true));
1012
1013
    // Don't check that dir2 does not exist, because we haven't actually moved dir2 to dir3 in the
1014
    // dry run mode.
1015
0
    return SafeRename(dir1, dir2, /* check_dst_collision= */ false);
1016
0
  }
1017
1018
  // A logging wrapper over directory renaming. In dry-run mode, checks for some errors, but
1019
  // check_dst_collision=false allows to skip ensuring that the destination does not exist.
1020
  CHECKED_STATUS SafeRename(
1021
0
      const std::string& src, const std::string& dst, bool check_dst_collision) {
1022
0
    if (dry_run_) {
1023
0
      if (!env_->FileExists(src)) {
1024
0
        return STATUS_FORMAT(
1025
0
            IOError, "Would fail to rename $0 to $1, source does not exist", src, dst);
1026
0
      }
1027
0
      if (check_dst_collision && env_->FileExists(dst)) {
1028
0
        return STATUS_FORMAT(
1029
0
            IOError, "Would fail to rename $0 to $1, destination already exists", src, dst);
1030
0
      }
1031
0
      LOG(INFO) << "Would rename " << src << " to " << dst;
1032
0
      return Status::OK();
1033
0
    }
1034
0
    LOG(INFO) << "Renaming " << src << " to " << dst;
1035
0
    Status s = env_->RenameFile(src, dst);
1036
0
    if (!s.ok()) {
1037
0
      LOG(ERROR) << "Error renaming " << src << " to " << dst << ": " << s;
1038
0
    }
1039
0
    return s;
1040
0
  }
1041
1042
  Env* env_ = Env::Default();
1043
  std::vector<std::string> data_dirs_;
1044
  std::vector<std::string> wal_dirs_;
1045
1046
  // The set of tablet RocksDB directories where we found CURRENT and MANIFEST-... files, indicating
1047
  // that there is a valid RocksDB database present.
1048
  std::set<std::string> valid_rocksdb_dirs_;
1049
1050
  bool dry_run_ = false;
1051
  bool revert_ = false;
1052
};
1053
1054
0
// Entry point of the apply-patch action: runs a fresh ApplyPatch instance over `args` and
// returns its status.
CHECKED_STATUS ApplyPatchExecute(const ApplyPatchArguments& args) {
  return ApplyPatch().Execute(args);
}
1058
1059
// Generates the tool-argument plumbing for the actions listed in DATA_PATCHER_ACTIONS under the
// name DataPatcherAction, which main() passes to ExecuteTool.
// NOTE(review): the exact expansion lives in yb/tools/tool_arguments.h. Presumably it ties each
// action to its *Options()/*Execute() pair defined above; confirm there.
YB_TOOL_ARGUMENTS(DataPatcherAction, DATA_PATCHER_ACTIONS);
1060
1061
} // namespace tools
1062
} // namespace yb
1063
1064
13.2k
int main(int argc, char** argv) {
1065
13.2k
  yb::HybridTime::TEST_SetPrettyToString(true);
1066
1067
  // Setup flags to avoid unnecessary logging
1068
13.2k
  FLAGS_rocksdb_max_background_flushes = 1;
1069
13.2k
  FLAGS_rocksdb_max_background_compactions = 1;
1070
13.2k
  FLAGS_rocksdb_base_background_compactions = 1;
1071
13.2k
  FLAGS_priority_thread_pool_size = 1;
1072
1073
13.2k
  yb::InitGoogleLoggingSafeBasic(argv[0]);
1074
13.2k
  return yb::tools::ExecuteTool<yb::tools::DataPatcherAction>(argc, argv);
1075
13.2k
}