YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log-dump.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <algorithm>
#include <map>
#include <set>
#include <vector>
36
37
#include <boost/preprocessor/cat.hpp>
38
#include <boost/preprocessor/stringize.hpp>
39
#include <gflags/gflags.h>
40
#include <glog/logging.h>
41
42
#include "yb/common/schema.h"
43
#include "yb/common/wire_protocol.h"
44
45
#include "yb/consensus/consensus.pb.h"
46
#include "yb/consensus/log.h"
47
#include "yb/consensus/log_index.h"
48
#include "yb/consensus/log_reader.h"
49
50
#include "yb/gutil/stl_util.h"
51
#include "yb/gutil/strings/numbers.h"
52
53
#include "yb/util/atomic.h"
54
#include "yb/util/env.h"
55
#include "yb/util/flags.h"
56
#include "yb/util/logging.h"
57
#include "yb/util/memory/arena.h"
58
#include "yb/util/metric_entity.h"
59
#include "yb/util/monotime.h"
60
#include "yb/util/opid.h"
61
#include "yb/util/pb_util.h"
62
#include "yb/util/result.h"
63
#include "yb/util/size_literals.h"
64
#include "yb/util/status_format.h"
65
66
// Whether segment headers and footers are printed when dumping.
DEFINE_bool(print_headers, true, "print the log segment headers/footers");

// Switches the tool from dumping to rewriting a single segment with some entries omitted.
DEFINE_bool(filter_log_segment, false, "filter the input log segment");

// Output mode for entries; interpreted by ParsePrintType().
DEFINE_string(print_entries, "decoded",
              "How to print entries:\n"
              "  false|0|no = don't print\n"
              "  true|1|yes|decoded = print them decoded\n"
              "  pb = print the raw protobuf\n"
              "  id = print only their ids");

// Applied to entries printed in "pb" mode.
DEFINE_int32(truncate_data, 100,
             "Truncate the data fields to the given number of bytes before printing. "
             "Set to 0 to disable");

// The four flags below describe the inclusive OpId range dropped by --filter_log_segment. Each
// term/index pair defaults to OpId::Invalid(), which leaves that end of the range open.
DEFINE_int64(min_op_term_to_omit, yb::OpId::Invalid().term,
             "Term of first record (inclusive) to omit from the result for --filter_log_segment");
DEFINE_int64(min_op_index_to_omit, yb::OpId::Invalid().index,
             "Index of first record (inclusive) to omit from the result for --filter_log_segment");
DEFINE_int64(max_op_term_to_omit, yb::OpId::Invalid().term,
             "Term of last record (inclusive) to omit from the result for --filter_log_segment");
DEFINE_int64(max_op_index_to_omit, yb::OpId::Invalid().index,
             "Index of last record (inclusive) to omit from the result for --filter_log_segment");

// Destination for --filter_log_segment; must not exist yet.
DEFINE_string(output_wal_dir, "", "WAL directory for the output of --filter_log_segment");
91
92
namespace yb {
93
namespace log {
94
95
using consensus::OperationType;
96
using consensus::ReplicateMsg;
97
using std::string;
98
using std::vector;
99
using std::cout;
100
using std::endl;
101
102
// Output mode selected by --print_entries (see ParsePrintType()).
enum PrintEntryType {
  DONT_PRINT = 0,     // Entries are read but not printed.
  PRINT_PB = 1,       // Each entry's protobuf DebugString().
  PRINT_DECODED = 2,  // Id line plus a decoded description (PrintDecoded()).
  PRINT_ID = 3,       // Only the id line (PrintIdOnly()).
};
108
109
0
// Maps --print_entries to a PrintEntryType.
//
// A value ParseLeadingBoolValue() reads as false disables printing. A value it reads as true, or
// the literal "decoded", selects decoded output. "pb" and "id" select the raw-protobuf and
// id-only modes. Anything else is a fatal error.
static PrintEntryType ParsePrintType() {
  const string& mode = FLAGS_print_entries;
  const char* const raw_mode = mode.c_str();

  // With `true` as the fallback, only an explicit boolean "false" value lands here.
  const bool printing_enabled = ParseLeadingBoolValue(raw_mode, true);
  if (!printing_enabled) return DONT_PRINT;

  // With `false` as the fallback, only an explicit boolean "true" value is reported as true.
  const bool explicitly_true = ParseLeadingBoolValue(raw_mode, false);
  if (explicitly_true || mode == "decoded") return PRINT_DECODED;

  if (mode == "pb") return PRINT_PB;
  if (mode == "id") return PRINT_ID;

  LOG(FATAL) << "Unknown value for --print_entries: " << FLAGS_print_entries;
}
129
130
0
void PrintIdOnly(const LogEntryPB& entry) {
131
0
  switch (entry.type()) {
132
0
    case log::REPLICATE:
133
0
    {
134
0
      cout << entry.replicate().id().term() << "." << entry.replicate().id().index()
135
0
           << "@" << entry.replicate().hybrid_time() << "\t";
136
0
      cout << "REPLICATE "
137
0
           << OperationType_Name(entry.replicate().op_type());
138
0
      break;
139
0
    }
140
0
    default:
141
0
      cout << "UNKNOWN: " << entry.ShortDebugString();
142
0
  }
143
144
0
  cout << endl;
145
0
}
146
147
// Placeholder for decoding a WRITE_OP payload. It prints nothing and always returns OK, so in
// --print_entries=decoded mode a write operation shows only the id line printed by its caller.
// NOTE(review): the parameters are accepted for a future decoder but are not used yet.
Status PrintDecodedWriteRequestPB(const string& indent,
                                  const Schema& tablet_schema,
                                  const tablet::WritePB& write) {
  (void)indent;
  (void)tablet_schema;
  (void)write;
  return Status::OK();
}
152
153
0
// Prints `entry` in "decoded" form: its id line (PrintIdOnly()), followed for entries carrying a
// REPLICATE message by a tab-indented description of that message. WRITE_OP payloads are handed
// to PrintDecodedWriteRequestPB(); other operations are shown via ShortDebugString().
//
// Returns the status of PrintDecodedWriteRequestPB() for write operations, otherwise OK.
Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
  PrintIdOnly(entry);

  // Only REPLICATE messages carry a payload this tool knows how to decode.
  if (!entry.has_replicate()) {
    return Status::OK();
  }

  const string indent = "\t";
  const ReplicateMsg& msg = entry.replicate();
  if (msg.op_type() == consensus::WRITE_OP) {
    return PrintDecodedWriteRequestPB(indent, tablet_schema, msg.write());
  }

  cout << indent << msg.ShortDebugString() << endl;
  return Status::OK();
}
170
171
0
// Prints one WAL segment to stdout, honoring --print_headers, --print_entries and --truncate_data.
//
// With --print_headers, the segment header is printed first and the footer (if the segment has
// one) last. The footer is printed even when --print_entries disables entry output, so headers
// and footers are always shown or hidden together.
//
// Entries are always read, so an unreadable segment is reported as an error even when they are
// not printed. The tablet schema in the segment header is parsed only for
// --print_entries=decoded, the only mode that uses it.
//
// Returns the first error from reading the entries, parsing the schema, or decoding an entry.
Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
  const PrintEntryType print_type = ParsePrintType();
  if (FLAGS_print_headers) {
    cout << "Header:\n" << segment->header().DebugString();
  }

  auto read_entries = segment->ReadEntries();
  RETURN_NOT_OK(read_entries.status);

  if (print_type != DONT_PRINT) {
    Schema tablet_schema;
    if (print_type == PRINT_DECODED) {
      RETURN_NOT_OK(SchemaFromPB(segment->header().unused_schema(), &tablet_schema));
    }

    for (const auto& entry : read_entries.entries) {
      if (print_type == PRINT_PB) {
        if (FLAGS_truncate_data > 0) {
          // Truncate data fields in place (per --truncate_data) before printing.
          pb_util::TruncateFields(entry.get(), FLAGS_truncate_data);
        }
        cout << "Entry:\n" << entry->DebugString();
      } else if (print_type == PRINT_DECODED) {
        RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema));
      } else if (print_type == PRINT_ID) {
        PrintIdOnly(*entry);
      }
    }
  }

  // Printed regardless of --print_entries so the footer is never silently dropped.
  if (FLAGS_print_headers && segment->HasFooter()) {
    cout << "Footer:\n" << segment->footer().DebugString();
  }

  return Status::OK();
}
204
205
0
// Prints every segment of the WAL in `tablet_wal_path` (via PrintSegment()), in the order
// returned by LogReader::GetSegmentsSnapshot(). Stops at and returns the first error.
// NOTE(review): `tablet_id` is currently unused; the WAL is located only by `tablet_wal_path`.
Status DumpLog(const string& tablet_id, const string& tablet_wal_path) {
  Env* default_env = Env::Default();

  // Opened read-only. Nothing below uses fs_manager directly; presumably this validates the
  // configured data dirs. TODO confirm.
  FsManagerOpts read_only_opts;
  read_only_opts.read_only = true;
  FsManager fs_manager(default_env, read_only_opts);
  RETURN_NOT_OK(fs_manager.Open());

  std::unique_ptr<LogReader> log_reader;
  RETURN_NOT_OK(LogReader::Open(
      default_env,
      scoped_refptr<LogIndex>(),
      "Log reader: ",
      tablet_wal_path,
      scoped_refptr<MetricEntity>(),
      scoped_refptr<MetricEntity>(),
      &log_reader));

  SegmentSequence snapshot;
  RETURN_NOT_OK(log_reader->GetSegmentsSnapshot(&snapshot));

  for (const scoped_refptr<ReadableLogSegment>& seg : snapshot) {
    RETURN_NOT_OK(PrintSegment(seg));
  }
  return Status::OK();
}
230
231
0
// Opens the single WAL segment file at `segment_path` and prints it with PrintSegment().
// Returns the error from opening the segment, or PrintSegment()'s status.
Status DumpSegment(const string &segment_path) {
  auto opened_segment = VERIFY_RESULT(ReadableLogSegment::Open(Env::Default(), segment_path));
  return PrintSegment(opened_segment);
}
238
239
0
// Validates one end of the OpId range to omit, built from a term/index flag pair.
//
// Accepts a valid OpId, or exactly OpId::Invalid() (both flags left at their defaults, meaning
// "unbounded"). Anything else, e.g. only one flag of the pair set, is rejected with
// InvalidArgument; `flag_names` names the pair in the error message.
static Status CheckOpIdToOmit(const OpId& op_id, const char* flag_names) {
  if (!op_id.valid() && op_id != OpId::Invalid()) {
    return STATUS_FORMAT(InvalidArgument, "$0 can only be specified together", flag_names);
  }
  return Status::OK();
}

// Implements --filter_log_segment.
//
// Copies the entries of the WAL segment at `segment_path` into a new WAL written under
// --output_wal_dir, leaving out every entry whose OpId lies in the inclusive range
// [--min_op_*_to_omit, --max_op_*_to_omit]. Either end of the range may be left unset, which
// leaves that side open. If neither end is set, every entry is copied.
//
// All flag validation, and opening the source segment, happen before --output_wal_dir is created.
// A rejected invocation therefore does not leave behind a directory that would make a retry fail
// with "already exists".
//
// Returns:
// - InvalidArgument for missing or inconsistent flags, including a first OpId greater than the
//   last one.
// - IllegalState if --output_wal_dir already exists.
// - Any error from reading the source segment or writing the new log.
Status FilterLogSegment(const string& segment_path) {
  Env* const env = Env::Default();

  // ---- Validate arguments before touching the filesystem. ----
  auto output_wal_dir = FLAGS_output_wal_dir;
  if (output_wal_dir.empty()) {
    return STATUS(InvalidArgument, "--output_wal_dir not specified");
  }

  const OpId first_op_id_to_omit = { FLAGS_min_op_term_to_omit, FLAGS_min_op_index_to_omit };
  RETURN_NOT_OK(CheckOpIdToOmit(
      first_op_id_to_omit, "--min_op_term_to_omit / --min_op_index_to_omit"));

  const OpId last_op_id_to_omit = { FLAGS_max_op_term_to_omit, FLAGS_max_op_index_to_omit };
  RETURN_NOT_OK(CheckOpIdToOmit(
      last_op_id_to_omit, "--max_op_term_to_omit / --max_op_index_to_omit"));

  const bool first_op_id_to_omit_valid = first_op_id_to_omit.valid();
  const bool last_op_id_to_omit_valid = last_op_id_to_omit.valid();

  // An inverted range would silently omit nothing; reject it instead.
  if (first_op_id_to_omit_valid && last_op_id_to_omit_valid &&
      !(first_op_id_to_omit <= last_op_id_to_omit)) {
    return STATUS_FORMAT(
        InvalidArgument, "First OpId to omit ($0) is greater than last OpId to omit ($1)",
        first_op_id_to_omit, last_op_id_to_omit);
  }

  // If neither first/last OpId to omit are specified, we will just copy all operations to the
  // output file. This might be useful in some kinds of testing or troubleshooting.
  const bool omit_something = first_op_id_to_omit_valid || last_op_id_to_omit_valid;

  // An unspecified (invalid) end leaves that side of the omitted range open.
  const auto should_omit = [&](const OpId& op_id) {
    return omit_something &&
           (!first_op_id_to_omit_valid || op_id >= first_op_id_to_omit) &&
           (!last_op_id_to_omit_valid || op_id <= last_op_id_to_omit);
  };

  if (env->DirExists(output_wal_dir)) {
    return STATUS_FORMAT(IllegalState, "Directory '$0' already exists", output_wal_dir);
  }

  // ---- Open the source segment. ----
  auto segment = VERIFY_RESULT(ReadableLogSegment::Open(env, segment_path));
  const auto& segment_header = segment->header();
  Schema tablet_schema;
  RETURN_NOT_OK(SchemaFromPB(segment_header.unused_schema(), &tablet_schema));
  const auto source_segment_size_bytes = VERIFY_RESULT(env->GetFileSize(segment_path));

  // ---- Everything checked; create the output directory. ----
  RETURN_NOT_OK(env->CreateDir(output_wal_dir));
  output_wal_dir = VERIFY_RESULT(env->Canonicalize(output_wal_dir));
  LOG(INFO) << "Created directory " << output_wal_dir;

  auto log_options = LogOptions();
  log_options.env = env;

  // We have to subtract one here because the Log implementation will add one for the new segment.
  log_options.initial_active_segment_sequence_number = segment_header.sequence_number() - 1;

  // Make the target segment 10% larger than the source so all the data fits, rounded up to a
  // whole number of megabytes.
  const auto target_segment_size_bytes =
      (static_cast<size_t>(source_segment_size_bytes * 1.1) + 1_MB - 1) / 1_MB * 1_MB;
  log_options.initial_segment_size_bytes = target_segment_size_bytes;
  log_options.segment_size_bytes = target_segment_size_bytes;
  LOG(INFO) << "Source segment size " << segment_path
            << ": " << source_segment_size_bytes << " bytes";
  LOG(INFO) << "Target segment size: "
            << target_segment_size_bytes << " bytes";

  std::unique_ptr<ThreadPool> log_thread_pool;
  RETURN_NOT_OK(ThreadPoolBuilder("log").unlimited_threads().Build(&log_thread_pool));

  if (omit_something) {
    if (first_op_id_to_omit_valid && last_op_id_to_omit_valid) {
      LOG(INFO) << "Will omit records between OpIds " << first_op_id_to_omit << " and "
                << last_op_id_to_omit << " (including the exact OpId matches).";
    } else if (first_op_id_to_omit_valid) {
      LOG(INFO) << "Will omit records with OpId greater than or equal to " << first_op_id_to_omit;
    } else {
      LOG(INFO) << "Will omit records with OpId less than or equal to " << last_op_id_to_omit;
    }
  } else {
    LOG(INFO) << "Will include all records of the source WAL in the output";
  }

  // ---- Copy the entries that are not omitted into a new log. ----
  scoped_refptr<Log> log;
  RETURN_NOT_OK(Log::Open(
      log_options,
      segment_header.unused_tablet_id(),
      output_wal_dir,
      "log-dump-tool",
      tablet_schema,
      segment_header.unused_schema_version(),
      /* table_metric_entity */ nullptr,
      /* tablet_metric_entity */ nullptr,
      log_thread_pool.get(),
      log_thread_pool.get(),
      /* cdc_min_replicated_index */ 0,
      &log));

  auto read_entries = segment->ReadEntries();
  RETURN_NOT_OK(read_entries.status);
  CHECK_EQ(read_entries.entries.size(), read_entries.entry_metadata.size());

  uint64_t num_omitted = 0;
  uint64_t num_included = 0;
  for (size_t i = 0; i < read_entries.entries.size(); ++i) {
    auto& entry_ptr = read_entries.entries[i];
    if (should_omit(OpId::FromPB(entry_ptr->replicate().id()))) {
      ++num_omitted;
      continue;
    }
    // The entry is released from entry_ptr; ownership presumably passes to the log.
    RETURN_NOT_OK(log->Append(
        entry_ptr.release(), read_entries.entry_metadata[i],
        /* skip_wal_rewrite */ false));
    ++num_included;
  }
  LOG(INFO) << "Included " << num_included << " entries, omitted " << num_omitted << " entries";
  RETURN_NOT_OK(log->Close());

  auto resulting_files = VERIFY_RESULT(
      env->GetChildren(output_wal_dir, ExcludeDots::kTrue));
  std::sort(resulting_files.begin(), resulting_files.end());
  for (const auto& resulting_file_name : resulting_files) {
    LOG(INFO) << "Generated file " << JoinPathSegments(output_wal_dir, resulting_file_name);
  }

  return Status::OK();
}
362
363
} // namespace log
364
} // namespace yb
365
366
int main(int argc, char **argv) {
367
  yb::ParseCommandLineFlags(&argc, &argv, true);
368
  using yb::Status;
369
370
  if (argc != 2 && argc != 3) {
371
    std::cerr << "usage: " << argv[0]
372
              << " --fs_data_dirs <dirs>"
373
              << " {<tablet_name> <log path>} | <log segment path>"
374
              << " [--filter_log_segment --output_wal_dir <dest_dir>]"
375
              << std::endl;
376
    return 1;
377
  }
378
379
  yb::Status status;
380
  yb::InitGoogleLoggingSafeBasic(argv[0]);
381
  if (argc == 2) {
382
    if (FLAGS_filter_log_segment) {
383
      status = yb::log::FilterLogSegment(argv[1]);
384
    } else {
385
      status = yb::log::DumpSegment(argv[1]);
386
    }
387
  } else {
388
    if (FLAGS_filter_log_segment) {
389
      status = STATUS(
390
          InvalidArgument,
391
          "--filter_log_segment is only allowed when a single segment file is specified");
392
    } else {
393
      status = yb::log::DumpLog(argv[1], argv[2]);
394
    }
395
  }
396
397
  if (status.ok()) {
398
    return 0;
399
  }
400
  std::cerr << "Error: " << status.ToString() << std::endl;
401
  return 1;
402
}