/Users/deen/code/yugabyte-db/src/yb/consensus/log-dump.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <map> |
34 | | #include <set> |
35 | | #include <vector> |
36 | | |
37 | | #include <boost/preprocessor/cat.hpp> |
38 | | #include <boost/preprocessor/stringize.hpp> |
39 | | #include <gflags/gflags.h> |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/common/schema.h" |
43 | | #include "yb/common/wire_protocol.h" |
44 | | |
45 | | #include "yb/consensus/consensus.pb.h" |
46 | | #include "yb/consensus/log.h" |
47 | | #include "yb/consensus/log_index.h" |
48 | | #include "yb/consensus/log_reader.h" |
49 | | |
50 | | #include "yb/gutil/stl_util.h" |
51 | | #include "yb/gutil/strings/numbers.h" |
52 | | |
53 | | #include "yb/util/atomic.h" |
54 | | #include "yb/util/env.h" |
55 | | #include "yb/util/flags.h" |
56 | | #include "yb/util/logging.h" |
57 | | #include "yb/util/memory/arena.h" |
58 | | #include "yb/util/metric_entity.h" |
59 | | #include "yb/util/monotime.h" |
60 | | #include "yb/util/opid.h" |
61 | | #include "yb/util/pb_util.h" |
62 | | #include "yb/util/result.h" |
63 | | #include "yb/util/size_literals.h" |
64 | | #include "yb/util/status_format.h" |
65 | | |
66 | | DEFINE_bool(print_headers, true, "print the log segment headers/footers"); |
67 | | DEFINE_bool(filter_log_segment, false, "filter the input log segment"); |
68 | | DEFINE_string(print_entries, "decoded", |
69 | | "How to print entries:\n" |
70 | | " false|0|no = don't print\n" |
71 | | " true|1|yes|decoded = print them decoded\n" |
72 | | " pb = print the raw protobuf\n" |
73 | | " id = print only their ids"); |
74 | | DEFINE_int32(truncate_data, 100, |
75 | | "Truncate the data fields to the given number of bytes " |
76 | | "before printing. Set to 0 to disable"); |
77 | | |
78 | | DEFINE_int64(min_op_term_to_omit, yb::OpId::Invalid().term, |
79 | | "Term of first record (inclusive) to omit from the result for --filter_log_segment"); |
80 | | |
81 | | DEFINE_int64(min_op_index_to_omit, yb::OpId::Invalid().index, |
82 | | "Index of first record (inclusive) to omit from the result for --filter_log_segment"); |
83 | | |
84 | | DEFINE_int64(max_op_term_to_omit, yb::OpId::Invalid().term, |
85 | | "Term of last record (inclusive) to omit from the result for --filter_log_segment"); |
86 | | |
87 | | DEFINE_int64(max_op_index_to_omit, yb::OpId::Invalid().index, |
88 | | "Index of last record (inclusive) to omit from the result for --filter_log_segment"); |
89 | | |
90 | | DEFINE_string(output_wal_dir, "", "WAL directory for the output of --filter_log_segment"); |
91 | | |
92 | | namespace yb { |
93 | | namespace log { |
94 | | |
95 | | using consensus::OperationType; |
96 | | using consensus::ReplicateMsg; |
97 | | using std::string; |
98 | | using std::vector; |
99 | | using std::cout; |
100 | | using std::endl; |
101 | | |
// How log entries should be rendered on stdout; derived from the
// --print_entries flag by ParsePrintType().
enum PrintEntryType {
  DONT_PRINT,     // suppress entry output entirely ("false|0|no")
  PRINT_PB,       // dump the raw protobuf debug string ("pb")
  PRINT_DECODED,  // print decoded entries ("true|1|yes|decoded", the default)
  PRINT_ID        // print only the OpId / hybrid time line ("id")
};
108 | | |
109 | 0 | static PrintEntryType ParsePrintType() { |
110 | 0 | if (!ParseLeadingBoolValue(FLAGS_print_entries.c_str(), true)) { |
111 | 0 | return DONT_PRINT; |
112 | 0 | } |
113 | | |
114 | 0 | if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), false) || |
115 | 0 | FLAGS_print_entries == "decoded") { |
116 | 0 | return PRINT_DECODED; |
117 | 0 | } |
118 | | |
119 | 0 | if (FLAGS_print_entries == "pb") { |
120 | 0 | return PRINT_PB; |
121 | 0 | } |
122 | | |
123 | 0 | if (FLAGS_print_entries == "id") { |
124 | 0 | return PRINT_ID; |
125 | 0 | } |
126 | | |
127 | 0 | LOG(FATAL) << "Unknown value for --print_entries: " << FLAGS_print_entries; |
128 | 0 | } |
129 | | |
130 | 0 | void PrintIdOnly(const LogEntryPB& entry) { |
131 | 0 | switch (entry.type()) { |
132 | 0 | case log::REPLICATE: |
133 | 0 | { |
134 | 0 | cout << entry.replicate().id().term() << "." << entry.replicate().id().index() |
135 | 0 | << "@" << entry.replicate().hybrid_time() << "\t"; |
136 | 0 | cout << "REPLICATE " |
137 | 0 | << OperationType_Name(entry.replicate().op_type()); |
138 | 0 | break; |
139 | 0 | } |
140 | 0 | default: |
141 | 0 | cout << "UNKNOWN: " << entry.ShortDebugString(); |
142 | 0 | } |
143 | |
|
144 | 0 | cout << endl; |
145 | 0 | } |
146 | | |
// Stub for decoding a WRITE_OP payload. Currently prints nothing and always
// succeeds, so WRITE_OP entries produce only the id line from the caller.
// NOTE(review): all parameters are intentionally unused until decoding is
// implemented here.
Status PrintDecodedWriteRequestPB(const string& indent,
                                  const Schema& tablet_schema,
                                  const tablet::WritePB& write) {
  return Status::OK();
}
152 | | |
153 | 0 | Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) { |
154 | 0 | PrintIdOnly(entry); |
155 | |
|
156 | 0 | const string indent = "\t"; |
157 | 0 | if (entry.has_replicate()) { |
158 | | // We can actually decode REPLICATE messages. |
159 | |
|
160 | 0 | const ReplicateMsg& replicate = entry.replicate(); |
161 | 0 | if (replicate.op_type() == consensus::WRITE_OP) { |
162 | 0 | RETURN_NOT_OK(PrintDecodedWriteRequestPB(indent, tablet_schema, replicate.write())); |
163 | 0 | } else { |
164 | 0 | cout << indent << replicate.ShortDebugString() << endl; |
165 | 0 | } |
166 | 0 | } |
167 | |
|
168 | 0 | return Status::OK(); |
169 | 0 | } |
170 | | |
171 | 0 | Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) { |
172 | 0 | PrintEntryType print_type = ParsePrintType(); |
173 | 0 | if (FLAGS_print_headers) { |
174 | 0 | cout << "Header:\n" << segment->header().DebugString(); |
175 | 0 | } |
176 | 0 | auto read_entries = segment->ReadEntries(); |
177 | 0 | RETURN_NOT_OK(read_entries.status); |
178 | |
|
179 | 0 | if (print_type == DONT_PRINT) return Status::OK(); |
180 | | |
181 | 0 | Schema tablet_schema; |
182 | 0 | RETURN_NOT_OK(SchemaFromPB(segment->header().unused_schema(), &tablet_schema)); |
183 | |
|
184 | 0 | for (const auto& entry : read_entries.entries) { |
185 | |
|
186 | 0 | if (print_type == PRINT_PB) { |
187 | 0 | if (FLAGS_truncate_data > 0) { |
188 | 0 | pb_util::TruncateFields(entry.get(), FLAGS_truncate_data); |
189 | 0 | } |
190 | |
|
191 | 0 | cout << "Entry:\n" << entry->DebugString(); |
192 | 0 | } else if (print_type == PRINT_DECODED) { |
193 | 0 | RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema)); |
194 | 0 | } else if (print_type == PRINT_ID) { |
195 | 0 | PrintIdOnly(*entry); |
196 | 0 | } |
197 | 0 | } |
198 | 0 | if (FLAGS_print_headers && segment->HasFooter()) { |
199 | 0 | cout << "Footer:\n" << segment->footer().DebugString(); |
200 | 0 | } |
201 | |
|
202 | 0 | return Status::OK(); |
203 | 0 | } |
204 | | |
205 | 0 | Status DumpLog(const string& tablet_id, const string& tablet_wal_path) { |
206 | 0 | Env *env = Env::Default(); |
207 | 0 | FsManagerOpts fs_opts; |
208 | 0 | fs_opts.read_only = true; |
209 | 0 | FsManager fs_manager(env, fs_opts); |
210 | |
|
211 | 0 | RETURN_NOT_OK(fs_manager.Open()); |
212 | 0 | std::unique_ptr<LogReader> reader; |
213 | 0 | RETURN_NOT_OK(LogReader::Open(env, |
214 | 0 | scoped_refptr<LogIndex>(), |
215 | 0 | "Log reader: ", |
216 | 0 | tablet_wal_path, |
217 | 0 | scoped_refptr<MetricEntity>(), |
218 | 0 | scoped_refptr<MetricEntity>(), |
219 | 0 | &reader)); |
220 | |
|
221 | 0 | SegmentSequence segments; |
222 | 0 | RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segments)); |
223 | |
|
224 | 0 | for (const scoped_refptr<ReadableLogSegment>& segment : segments) { |
225 | 0 | RETURN_NOT_OK(PrintSegment(segment)); |
226 | 0 | } |
227 | |
|
228 | 0 | return Status::OK(); |
229 | 0 | } |
230 | | |
231 | 0 | Status DumpSegment(const string &segment_path) { |
232 | 0 | Env *env = Env::Default(); |
233 | 0 | auto segment = VERIFY_RESULT(ReadableLogSegment::Open(env, segment_path)); |
234 | 0 | RETURN_NOT_OK(PrintSegment(segment)); |
235 | |
|
236 | 0 | return Status::OK(); |
237 | 0 | } |
238 | | |
// Copies the WAL segment at segment_path into a fresh log under
// --output_wal_dir, omitting every record whose OpId falls in the inclusive
// range [--min_op_{term,index}_to_omit, --max_op_{term,index}_to_omit].
// Leaving the min (resp. max) pair unset makes the omitted range open on that
// side; leaving both unset copies every record verbatim.
// Returns InvalidArgument for missing/half-specified flags, IllegalState if
// the output directory already exists.
Status FilterLogSegment(const string& segment_path) {
  Env *const env = Env::Default();

  auto output_wal_dir = FLAGS_output_wal_dir;
  if (output_wal_dir.empty()) {
    return STATUS(InvalidArgument, "--output_wal_dir not specified");
  }

  // Refuse to write into a pre-existing directory: we want a clean output.
  if (env->DirExists(output_wal_dir)) {
    return STATUS_FORMAT(IllegalState, "Directory '$0' already exists", output_wal_dir);
  }
  RETURN_NOT_OK(env->CreateDir(output_wal_dir));
  output_wal_dir = VERIFY_RESULT(env->Canonicalize(output_wal_dir));
  LOG(INFO) << "Created directory " << output_wal_dir;

  auto segment = VERIFY_RESULT(ReadableLogSegment::Open(env, segment_path));
  Schema tablet_schema;
  const auto& segment_header = segment->header();

  RETURN_NOT_OK(SchemaFromPB(segment->header().unused_schema(), &tablet_schema));

  auto log_options = LogOptions();
  log_options.env = env;

  // We have to subtract one here because the Log implementation will add one for the new segment.
  log_options.initial_active_segment_sequence_number = segment_header.sequence_number() - 1;
  const auto source_segment_size_bytes = VERIFY_RESULT(env->GetFileSize(segment_path));
  // Set the target segment size slightly larger to make sure all the data fits. Also round it up
  // to the nearest 1 MB.
  const auto target_segment_size_bytes = (
      static_cast<size_t>(source_segment_size_bytes * 1.1) + 1_MB - 1) / 1_MB * 1_MB;
  log_options.initial_segment_size_bytes = target_segment_size_bytes;
  log_options.segment_size_bytes = target_segment_size_bytes;
  LOG(INFO) << "Source segment size " << segment_path
            << ": " << source_segment_size_bytes << " bytes";
  LOG(INFO) << "Target segment size: "
            << target_segment_size_bytes << " bytes";
  std::unique_ptr<ThreadPool> log_thread_pool;
  RETURN_NOT_OK(ThreadPoolBuilder("log").unlimited_threads().Build(&log_thread_pool));

  // Each bound's term/index flags must be set together; a half-specified
  // OpId is neither valid nor the Invalid() sentinel, which we reject here.
  const OpId first_op_id_to_omit = { FLAGS_min_op_term_to_omit, FLAGS_min_op_index_to_omit };
  const auto first_op_id_to_omit_valid = first_op_id_to_omit.valid();
  if (!first_op_id_to_omit_valid && first_op_id_to_omit != OpId::Invalid()) {
    return STATUS(InvalidArgument,
                  "--min_op_term_to_omit / --min_op_index_to_omit can only be specified together");
  }

  const OpId last_op_id_to_omit = { FLAGS_max_op_term_to_omit, FLAGS_max_op_index_to_omit };
  const auto last_op_id_to_omit_valid = last_op_id_to_omit.valid();
  if (!last_op_id_to_omit_valid && last_op_id_to_omit != OpId::Invalid()) {
    return STATUS(InvalidArgument,
                  "--max_op_term_to_omit / --max_op_index_to_omit can only be specified together");
  }

  // If neither first/last OpId to omit are specified, we will just copy all operations to the
  // output file. This might be useful in some kinds of testing or troubleshooting.
  const bool omit_something = first_op_id_to_omit_valid || last_op_id_to_omit_valid;

  // Invalid first/last OpId to omit indicate an open range of OpIds to omit.
  const bool omit_starting_with_earliest_op_id = omit_something && !first_op_id_to_omit_valid;
  const bool omit_to_infinite_op_id = omit_something && !last_op_id_to_omit_valid;

  if (omit_something) {
    if (first_op_id_to_omit_valid && last_op_id_to_omit_valid) {
      LOG(INFO) << "Will omit records between OpIds " << first_op_id_to_omit << " and "
                << last_op_id_to_omit << " (including the exact OpId matches).";
    } else if (first_op_id_to_omit_valid) {
      LOG(INFO) << "Will omit records with OpId greater than or equal to " << first_op_id_to_omit;
    } else {
      LOG(INFO) << "Will omit records with OpId less than or equal to " << last_op_id_to_omit;
    }
  } else {
    LOG(INFO) << "Will include all records of the source WAL in the output";
  }

  // Open the destination log with metadata carried over from the source
  // segment's header (tablet id, schema, schema version).
  scoped_refptr<Log> log;
  RETURN_NOT_OK(Log::Open(
      log_options,
      segment_header.unused_tablet_id(),
      output_wal_dir,
      "log-dump-tool",
      tablet_schema,
      segment_header.unused_schema_version(),
      /* table_metric_entity */ nullptr,
      /* tablet_metric_entity */ nullptr,
      log_thread_pool.get(),
      log_thread_pool.get(),
      /* cdc_min_replicated_index */ 0,
      &log));

  auto read_entries = segment->ReadEntries();
  RETURN_NOT_OK(read_entries.status);
  uint64_t num_omitted = 0;
  uint64_t num_included = 0;
  // Entries and their metadata are parallel arrays; iterate them in lockstep.
  CHECK_EQ(read_entries.entries.size(), read_entries.entry_metadata.size());
  for (size_t i = 0; i < read_entries.entries.size(); ++i) {
    auto& entry_ptr = read_entries.entries[i];
    const OpId op_id = OpId::FromPB(entry_ptr->replicate().id());
    // Skip the entry when it falls inside the (possibly open-ended) range.
    if (omit_something &&
        (omit_starting_with_earliest_op_id ||
         (first_op_id_to_omit_valid && op_id >= first_op_id_to_omit)) &&
        (omit_to_infinite_op_id ||
         (last_op_id_to_omit_valid && op_id <= last_op_id_to_omit))) {
      num_omitted++;
      continue;
    }
    // release(): ownership of the entry transfers to the Log on Append.
    RETURN_NOT_OK(log->Append(
        entry_ptr.release(), read_entries.entry_metadata[i],
        /* skip_wal_rewrite */ false));
    num_included++;
  }
  LOG(INFO) << "Included " << num_included << " entries, omitted " << num_omitted << " entries";
  RETURN_NOT_OK(log->Close());

  // Report the generated files in a deterministic (sorted) order.
  auto resulting_files = VERIFY_RESULT(
      env->GetChildren(output_wal_dir, ExcludeDots::kTrue));
  sort(resulting_files.begin(), resulting_files.end());
  for (const auto& resulting_file_name : resulting_files) {
    LOG(INFO) << "Generated file " << JoinPathSegments(output_wal_dir, resulting_file_name);
  }

  return Status::OK();
}
362 | | |
363 | | } // namespace log |
364 | | } // namespace yb |
365 | | |
366 | | int main(int argc, char **argv) { |
367 | | yb::ParseCommandLineFlags(&argc, &argv, true); |
368 | | using yb::Status; |
369 | | |
370 | | if (argc != 2 && argc != 3) { |
371 | | std::cerr << "usage: " << argv[0] |
372 | | << " --fs_data_dirs <dirs>" |
373 | | << " {<tablet_name> <log path>} | <log segment path>" |
374 | | << " [--filter_log_segment --output_wal_dir <dest_dir>]" |
375 | | << std::endl; |
376 | | return 1; |
377 | | } |
378 | | |
379 | | yb::Status status; |
380 | | yb::InitGoogleLoggingSafeBasic(argv[0]); |
381 | | if (argc == 2) { |
382 | | if (FLAGS_filter_log_segment) { |
383 | | status = yb::log::FilterLogSegment(argv[1]); |
384 | | } else { |
385 | | status = yb::log::DumpSegment(argv[1]); |
386 | | } |
387 | | } else { |
388 | | if (FLAGS_filter_log_segment) { |
389 | | status = STATUS( |
390 | | InvalidArgument, |
391 | | "--filter_log_segment is only allowed when a single segment file is specified"); |
392 | | } else { |
393 | | status = yb::log::DumpLog(argv[1], argv[2]); |
394 | | } |
395 | | } |
396 | | |
397 | | if (status.ok()) { |
398 | | return 0; |
399 | | } |
400 | | std::cerr << "Error: " << status.ToString() << std::endl; |
401 | | return 1; |
402 | | } |