/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_bootstrap.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/tablet_bootstrap.h" |
34 | | |
35 | | #include <map> |
36 | | #include <set> |
37 | | |
38 | | #include <boost/preprocessor/cat.hpp> |
39 | | #include <boost/preprocessor/stringize.hpp> |
40 | | |
41 | | #include "yb/common/common_fwd.h" |
42 | | #include "yb/common/schema.h" |
43 | | #include "yb/common/wire_protocol.h" |
44 | | |
45 | | #include "yb/consensus/consensus.h" |
46 | | #include "yb/consensus/consensus.pb.h" |
47 | | #include "yb/consensus/consensus_meta.h" |
48 | | #include "yb/consensus/consensus_util.h" |
49 | | #include "yb/consensus/log.h" |
50 | | #include "yb/consensus/log_anchor_registry.h" |
51 | | #include "yb/consensus/log_index.h" |
52 | | #include "yb/consensus/log_reader.h" |
53 | | #include "yb/consensus/log_util.h" |
54 | | #include "yb/consensus/opid_util.h" |
55 | | #include "yb/consensus/retryable_requests.h" |
56 | | |
57 | | #include "yb/docdb/consensus_frontier.h" |
58 | | #include "yb/docdb/value_type.h" |
59 | | |
60 | | #include "yb/gutil/casts.h" |
61 | | #include "yb/gutil/ref_counted.h" |
62 | | #include "yb/gutil/strings/substitute.h" |
63 | | #include "yb/gutil/thread_annotations.h" |
64 | | |
65 | | #include "yb/rpc/rpc_fwd.h" |
66 | | |
67 | | #include "yb/tablet/tablet_fwd.h" |
68 | | #include "yb/tablet/mvcc.h" |
69 | | #include "yb/tablet/operations/change_metadata_operation.h" |
70 | | #include "yb/tablet/operations/history_cutoff_operation.h" |
71 | | #include "yb/tablet/operations/snapshot_operation.h" |
72 | | #include "yb/tablet/operations/split_operation.h" |
73 | | #include "yb/tablet/operations/truncate_operation.h" |
74 | | #include "yb/tablet/operations/update_txn_operation.h" |
75 | | #include "yb/tablet/operations/write_operation.h" |
76 | | #include "yb/tablet/snapshot_coordinator.h" |
77 | | #include "yb/tablet/tablet.h" |
78 | | #include "yb/tablet/tablet_metadata.h" |
79 | | #include "yb/tablet/tablet_options.h" |
80 | | #include "yb/tablet/tablet_snapshots.h" |
81 | | #include "yb/tablet/tablet_splitter.h" |
82 | | #include "yb/tablet/transaction_coordinator.h" |
83 | | #include "yb/tablet/transaction_participant.h" |
84 | | |
85 | | #include "yb/tserver/backup.pb.h" |
86 | | |
87 | | #include "yb/util/atomic.h" |
88 | | #include "yb/util/env_util.h" |
89 | | #include "yb/util/fault_injection.h" |
90 | | #include "yb/util/flag_tags.h" |
91 | | #include "yb/util/format.h" |
92 | | #include "yb/util/logging.h" |
93 | | #include "yb/util/metric_entity.h" |
94 | | #include "yb/util/monotime.h" |
95 | | #include "yb/util/opid.h" |
96 | | #include "yb/util/scope_exit.h" |
97 | | #include "yb/util/status.h" |
98 | | #include "yb/util/status_format.h" |
99 | | #include "yb/util/stopwatch.h" |
100 | | |
101 | | DEFINE_bool(skip_remove_old_recovery_dir, false, |
102 | | "Skip removing WAL recovery dir after startup. (useful for debugging)"); |
103 | | TAG_FLAG(skip_remove_old_recovery_dir, hidden); |
104 | | |
105 | | DEFINE_bool(skip_wal_rewrite, true, |
106 | | "Skip rewriting WAL files during bootstrap."); |
107 | | TAG_FLAG(skip_wal_rewrite, experimental); |
108 | | TAG_FLAG(skip_wal_rewrite, runtime); |
109 | | |
110 | | DEFINE_test_flag(double, fault_crash_during_log_replay, 0.0, |
111 | | "Fraction of the time when the tablet will crash immediately " |
112 | | "after processing a log entry during log replay."); |
113 | | |
114 | | DECLARE_uint64(max_clock_sync_error_usec); |
115 | | |
116 | | DEFINE_bool(force_recover_flushed_frontier, false, |
117 | | "Could be used to ignore the flushed frontier metadata from RocksDB manifest and " |
118 | | "recover it from the log instead."); |
119 | | TAG_FLAG(force_recover_flushed_frontier, hidden); |
120 | | TAG_FLAG(force_recover_flushed_frontier, advanced); |
121 | | |
122 | | DEFINE_bool(skip_flushed_entries, true, |
123 | | "Only replay WAL entries that are not flushed to RocksDB or within the retryable " |
124 | | "request timeout."); |
125 | | |
126 | | DECLARE_int32(retryable_request_timeout_secs); |
127 | | |
128 | | DEFINE_uint64(transaction_status_tablet_log_segment_size_bytes, 4_MB, |
129 | | "The segment size for transaction status tablet log roll-overs, in bytes."); |
130 | | DEFINE_test_flag(int32, tablet_bootstrap_delay_ms, 0, |
131 | | "Time (in ms) to delay tablet bootstrap by."); |
132 | | |
133 | | DEFINE_test_flag(bool, dump_docdb_before_tablet_bootstrap, false, |
134 | | "Dump the contents of DocDB before tablet bootstrap. Should only be used when " |
135 | | "data is small.") |
136 | | |
137 | | DEFINE_test_flag(bool, dump_docdb_after_tablet_bootstrap, false, |
138 | | "Dump the contents of DocDB after tablet bootstrap. Should only be used when " |
139 | | "data is small.") |
140 | | |
141 | | namespace yb { |
142 | | namespace tablet { |
143 | | |
144 | | using namespace std::literals; // NOLINT |
145 | | using namespace std::placeholders; |
146 | | using std::shared_ptr; |
147 | | |
148 | | using log::Log; |
149 | | using log::LogEntryPB; |
150 | | using log::LogOptions; |
151 | | using log::LogReader; |
152 | | using log::ReadableLogSegment; |
153 | | using log::LogEntryMetadata; |
154 | | using log::LogIndex; |
155 | | using log::CreateNewSegment; |
156 | | using log::SegmentSequence; |
157 | | using consensus::ChangeConfigRecordPB; |
158 | | using consensus::RaftConfigPB; |
159 | | using consensus::ConsensusBootstrapInfo; |
160 | | using consensus::ConsensusMetadata; |
161 | | using consensus::MinimumOpId; |
162 | | using consensus::OpIdEquals; |
163 | | using consensus::OpIdToString; |
164 | | using consensus::ReplicateMsg; |
165 | | using consensus::MakeOpIdPB; |
166 | | using strings::Substitute; |
167 | | using tserver::WriteRequestPB; |
168 | | using tserver::TabletSnapshotOpRequestPB; |
169 | | |
170 | | static string DebugInfo(const string& tablet_id, |
171 | | uint64_t segment_seqno, |
172 | | size_t entry_idx, |
173 | | const string& segment_path, |
174 | 0 | const LogEntryPB* entry) { |
175 | | // Truncate the debug string to a reasonable length for logging. Otherwise, glog will truncate |
176 | | // for us and we may miss important information which came after this long string. |
177 | 0 | string debug_str = entry ? entry->ShortDebugString() : "<nullptr>"s; |
178 | 0 | if (debug_str.size() > 500) { |
179 | 0 | debug_str.resize(500); |
180 | 0 | debug_str.append("..."); |
181 | 0 | } |
182 | 0 | return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. " |
183 | 0 | "Segment path: $3. Entry: $4", entry_idx, segment_seqno, tablet_id, |
184 | 0 | segment_path, debug_str); |
185 | 0 | } |
186 | | |
187 | | // ================================================================================================ |
188 | | // Class ReplayState. |
189 | | // ================================================================================================ |
190 | | |
191 | | struct Entry { |
192 | | std::unique_ptr<log::LogEntryPB> entry; |
193 | | RestartSafeCoarseTimePoint entry_time; |
194 | | |
195 | 0 | std::string ToString() const { |
196 | 0 | return Format("{ entry: $0 entry_time: $1 }", entry, entry_time); |
197 | 0 | } |
198 | | }; |
199 | | |
200 | | typedef std::map<int64_t, Entry> OpIndexToEntryMap; |
201 | | |
202 | | // State kept during replay. |
203 | | struct ReplayState { |
204 | | ReplayState( |
205 | | const DocDbOpIds& op_ids_, |
206 | | const std::string& log_prefix_) |
207 | | : stored_op_ids(op_ids_), |
208 | 2.79k | log_prefix(log_prefix_) { |
209 | 2.79k | } |
210 | | |
211 | | // Return true if 'b' is allowed to immediately follow 'a' in the log. |
212 | | static bool IsValidSequence(const OpId& a, const OpId& b); |
213 | | |
214 | | // Return a Corruption status if 'id' seems to be out-of-sequence in the log. |
215 | | Status CheckSequentialReplicateId(const consensus::ReplicateMsg& msg); |
216 | | |
217 | | void UpdateCommittedOpId(const OpId& id); |
218 | | |
219 | | // half_limit is half the limit on the number of entries added |
220 | | void AddEntriesToStrings( |
221 | | const OpIndexToEntryMap& entries, std::vector<std::string>* strings, size_t half_limit) const; |
222 | | |
223 | | // half_limit is half the limit on the number of entries to be dumped |
224 | | void DumpReplayStateToStrings(std::vector<std::string>* strings, int half_limit) const; |
225 | | |
226 | | bool CanApply(log::LogEntryPB* entry); |
227 | | |
228 | 2.79k | const std::string& LogPrefix() const { return log_prefix; } |
229 | | |
230 | | void UpdateCommittedFromStored(); |
231 | | |
232 | | // Determines the lowest possible OpId we have to replay. This is based on OpIds of operations |
233 | | // flushed to regular and intents RocksDBs. Also logs some diagnostics. |
234 | | OpId GetLowestOpIdToReplay(bool has_intents_db, const char* extra_log_prefix) const; |
235 | | |
236 | | // ---------------------------------------------------------------------------------------------- |
237 | | // ReplayState member fields |
238 | | // ---------------------------------------------------------------------------------------------- |
239 | | |
240 | | // The last replicate message's ID. |
241 | | OpId prev_op_id; |
242 | | |
243 | | // The last operation known to be committed. All other operations with lower IDs are also |
244 | | // committed. |
245 | | OpId committed_op_id; |
246 | | |
247 | | // All REPLICATE entries that have not been applied to RocksDB yet. We decide what entries are |
248 | | // safe to apply and delete from this map based on the commit index included into each REPLICATE |
249 | | // message. |
250 | | // |
251 | | // The key in this map is the Raft index. |
252 | | OpIndexToEntryMap pending_replicates; |
253 | | |
254 | | // ---------------------------------------------------------------------------------------------- |
255 | | // State specific to RocksDB-backed tables (not transaction status table) |
256 | | |
257 | | const DocDbOpIds stored_op_ids; |
258 | | |
259 | | // Total number of log entries applied to RocksDB. |
260 | | int64_t num_entries_applied_to_rocksdb = 0; |
261 | | |
262 | | // If we encounter the last entry flushed to a RocksDB SSTable (as identified by the max |
263 | | // persistent sequence number), we remember the hybrid time of that entry in this field. |
264 | | // We guarantee that we'll either see that entry or a latter entry we know is committed into Raft |
265 | | // during log replay. This is crucial for properly setting safe time at bootstrap. |
266 | | HybridTime max_committed_hybrid_time = HybridTime::kMin; |
267 | | |
268 | | const std::string log_prefix; |
269 | | }; |
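A pending REPLICATE entry stays in this map until the committed OpId catches up to it; entries are then applied in Raft-index order and removed. A minimal standalone sketch of that draining step, assuming a hypothetical std::string stands in for a log entry (an illustration, not the real ApplyCommittedPendingReplicates):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Pending replicates keyed by Raft index, like OpIndexToEntryMap above.
using PendingMap = std::map<int64_t, std::string>;

// Apply (here: just print) every pending entry whose index is at or below the
// committed index, in index order, and drop it from the map.
void ApplyCommittedSketch(PendingMap* pending, int64_t committed_index) {
  while (!pending->empty() && pending->begin()->first <= committed_index) {
    std::cout << "applying index " << pending->begin()->first
              << ": " << pending->begin()->second << "\n";
    pending->erase(pending->begin());
  }
}

int main() {
  PendingMap pending{{7, "write"}, {8, "change metadata"}, {9, "write"}};
  ApplyCommittedSketch(&pending, 8);                         // applies 7 and 8
  std::cout << "still pending: " << pending.size() << "\n";  // prints 1
  return 0;
}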
270 | | |
271 | 2.79k | void ReplayState::UpdateCommittedFromStored() { |
272 | 2.79k | if (stored_op_ids.regular > committed_op_id) { |
273 | 206 | committed_op_id = stored_op_ids.regular; |
274 | 206 | } |
275 | | |
276 | 2.79k | if (stored_op_ids.intents > committed_op_id) { |
277 | 0 | committed_op_id = stored_op_ids.intents; |
278 | 0 | } |
279 | 2.79k | } |
280 | | |
281 | | // Return true if 'b' is allowed to immediately follow 'a' in the log. |
282 | 1.37M | bool ReplayState::IsValidSequence(const OpId& a, const OpId& b) { |
283 | 1.37M | if (a.term == 0 && a.index == 0) {
284 | | // Not initialized - can start with any opid. |
285 | 2.79k | return true; |
286 | 2.79k | } |
287 | | |
288 | | // Within the same term, we should never skip entries. |
289 | | // We can, however go backwards (see KUDU-783 for an example) |
290 | 1.36M | if (b.term == a.term && b.index > a.index + 1) {
291 | 0 | return false; |
292 | 0 | } |
293 | | |
294 | | // TODO: check that the term does not decrease. |
295 | | // https://github.com/yugabyte/yugabyte-db/issues/5115 |
296 | | |
297 | 1.36M | return true; |
298 | 1.36M | } |
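A minimal standalone sketch of the sequencing rule enforced above, assuming a simplified OpId struct in place of the real yb::OpId:

#include <cassert>
#include <cstdint>

struct SimpleOpId {
  int64_t term = 0;
  int64_t index = 0;
};

// Mirrors ReplayState::IsValidSequence: an uninitialized predecessor accepts any OpId;
// within the same term the index may repeat or go backwards, but must never skip ahead
// by more than one.
bool IsValidSequenceSketch(const SimpleOpId& a, const SimpleOpId& b) {
  if (a.term == 0 && a.index == 0) {
    return true;   // Not initialized - can start with any OpId.
  }
  if (b.term == a.term && b.index > a.index + 1) {
    return false;  // Skipped an index within the same term.
  }
  return true;
}

int main() {
  assert(IsValidSequenceSketch({0, 0}, {5, 100}));  // uninitialized predecessor
  assert(IsValidSequenceSketch({1, 5}, {1, 6}));    // next index in the same term
  assert(IsValidSequenceSketch({1, 5}, {1, 3}));    // going backwards is allowed (KUDU-783)
  assert(!IsValidSequenceSketch({1, 5}, {1, 7}));   // skipping an index is not
  return 0;
}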
299 | | |
300 | | // Return a Corruption status if 'id' seems to be out-of-sequence in the log. |
301 | 1.37M | Status ReplayState::CheckSequentialReplicateId(const ReplicateMsg& msg) { |
302 | 1.37M | SCHECK(msg.has_id(), Corruption, "A REPLICATE message must have an id"); |
303 | 1.37M | const auto msg_op_id = OpId::FromPB(msg.id()); |
304 | 1.37M | if (PREDICT_FALSE(!IsValidSequence(prev_op_id, msg_op_id))) { |
305 | 0 | string op_desc = Format( |
306 | 0 | "$0 REPLICATE (Type: $1)", msg_op_id, OperationType_Name(msg.op_type())); |
307 | 0 | return STATUS_FORMAT(Corruption, |
308 | 0 | "Unexpected op id following op id $0. Operation: $1", |
309 | 0 | prev_op_id, op_desc); |
310 | 0 | } |
311 | | |
312 | 1.37M | prev_op_id = msg_op_id; |
313 | 1.37M | return Status::OK(); |
314 | 1.37M | } |
315 | | |
316 | 1.37M | void ReplayState::UpdateCommittedOpId(const OpId& id) { |
317 | 1.37M | if (committed_op_id < id) { |
318 | 1.06M | VLOG_WITH_PREFIX(1) << "Updating committed op id to " << id;
319 | 1.06M | committed_op_id = id; |
320 | 1.06M | } |
321 | 1.37M | } |
322 | | |
323 | | void ReplayState::AddEntriesToStrings(const OpIndexToEntryMap& entries, |
324 | | std::vector<std::string>* strings, |
325 | 394 | size_t half_limit) const { |
326 | 394 | const auto n = entries.size(); |
327 | 394 | const bool overflow = n > 2 * half_limit; |
328 | 394 | size_t index = 0; |
329 | 1.22k | for (const auto& entry : entries) { |
330 | 1.22k | if (!overflow || (index < half_limit || index >= n - half_limit)) {
331 | 1.22k | const auto& replicate = entry.second.entry.get()->replicate(); |
332 | 1.22k | strings->push_back(Format( |
333 | 1.22k | " [$0] op_id: $1 hybrid_time: $2 op_type: $3 committed_op_id: $4", |
334 | 1.22k | index + 1, |
335 | 1.22k | OpId::FromPB(replicate.id()), |
336 | 1.22k | replicate.hybrid_time(), |
337 | 1.22k | replicate.op_type(), |
338 | 1.22k | OpId::FromPB(replicate.committed_op_id()))); |
339 | 1.22k | } |
340 | 1.22k | if (overflow && index == half_limit - 1) {
341 | 0 | strings->push_back(Format("($0 lines skipped)", n - 2 * half_limit)); |
342 | 0 | } |
343 | 1.22k | index++; |
344 | 1.22k | } |
345 | 394 | } |
346 | | |
347 | | void ReplayState::DumpReplayStateToStrings( |
348 | | std::vector<std::string>* strings, |
349 | 2.79k | int half_limit) const { |
350 | 2.79k | strings->push_back(Format( |
351 | 2.79k | "ReplayState: " |
352 | 2.79k | "Previous OpId: $0, " |
353 | 2.79k | "Committed OpId: $1, " |
354 | 2.79k | "Pending Replicates: $2, " |
355 | 2.79k | "Flushed Regular: $3, " |
356 | 2.79k | "Flushed Intents: $4", |
357 | 2.79k | prev_op_id, |
358 | 2.79k | committed_op_id, |
359 | 2.79k | pending_replicates.size(), |
360 | 2.79k | stored_op_ids.regular, |
361 | 2.79k | stored_op_ids.intents)); |
362 | 2.79k | if (num_entries_applied_to_rocksdb > 0) { |
363 | 2.79k | strings->push_back(Substitute("Log entries applied to RocksDB: $0", |
364 | 2.79k | num_entries_applied_to_rocksdb)); |
365 | 2.79k | } |
366 | 2.79k | if (!pending_replicates.empty()) { |
367 | 394 | strings->push_back(Substitute("Dumping REPLICATES ($0 items):", pending_replicates.size())); |
368 | 394 | AddEntriesToStrings(pending_replicates, strings, half_limit); |
369 | 394 | } |
370 | 2.79k | } |
371 | | |
372 | 2.64M | bool ReplayState::CanApply(LogEntryPB* entry) { |
373 | 2.64M | return OpId::FromPB(entry->replicate().id()) <= committed_op_id; |
374 | 2.64M | } |
375 | | |
376 | 2.79k | OpId ReplayState::GetLowestOpIdToReplay(bool has_intents_db, const char* extra_log_prefix) const { |
377 | 2.79k | const auto op_id_replay_lowest = |
378 | 2.79k | has_intents_db ? std::min(stored_op_ids.regular, stored_op_ids.intents)
379 | 2.79k | : stored_op_ids.regular;
380 | 2.79k | LOG_WITH_PREFIX(INFO) |
381 | 2.79k | << extra_log_prefix |
382 | 2.79k | << "op_id_replay_lowest=" << op_id_replay_lowest |
383 | 2.79k | << " (regular_op_id=" << stored_op_ids.regular |
384 | 2.79k | << ", intents_op_id=" << stored_op_ids.intents |
385 | 2.79k | << ", has_intents_db=" << has_intents_db << ")"; |
386 | 2.79k | return op_id_replay_lowest; |
387 | 2.79k | } |
388 | | |
389 | | // ================================================================================================ |
390 | | // Class TabletBootstrap. |
391 | | // ================================================================================================ |
392 | | |
393 | | namespace { |
394 | | |
395 | | struct ReplayDecision { |
396 | | bool should_replay = false; |
397 | | |
398 | | // This is true for transaction update operations that have already been applied to the regular |
399 | | // RocksDB but not to the intents RocksDB. |
400 | | AlreadyAppliedToRegularDB already_applied_to_regular_db = AlreadyAppliedToRegularDB::kFalse; |
401 | | |
402 | 0 | std::string ToString() const { |
403 | 0 | return YB_STRUCT_TO_STRING(should_replay, already_applied_to_regular_db); |
404 | 0 | } |
405 | | }; |
406 | | |
407 | | ReplayDecision ShouldReplayOperation( |
408 | | consensus::OperationType op_type, |
409 | | const int64_t index, |
410 | | const int64_t regular_flushed_index, |
411 | | const int64_t intents_flushed_index, |
412 | | TransactionStatus txn_status, |
413 | 1.30M | bool write_op_has_transaction) { |
414 | | // In most cases we assume that intents_flushed_index <= regular_flushed_index but here we are |
415 | | // trying to be resilient to violations of that assumption. |
416 | 1.30M | if (index <= std::min(regular_flushed_index, intents_flushed_index)) { |
417 | | // Never replay anything that is flushed to both regular and intents RocksDBs in a transactional
418 | | // table. |
419 | 18.4E | VLOG_WITH_FUNC(3) << "index: " << index << " " |
420 | 18.4E | << "regular_flushed_index: " << regular_flushed_index |
421 | 18.4E | << " intents_flushed_index: " << intents_flushed_index; |
422 | 145k | return {false}; |
423 | 145k | } |
424 | | |
425 | 1.16M | if (op_type == consensus::UPDATE_TRANSACTION_OP) { |
426 | 33.7k | if (txn_status == TransactionStatus::APPLYING && |
427 | 33.7k | intents_flushed_index < index && index <= regular_flushed_index) {
428 | | // Intents were applied/flushed to regular RocksDB, but not flushed into the intents RocksDB. |
429 | 4.81k | VLOG_WITH_FUNC(3) << "index: " << index << " "
430 | 0 | << "regular_flushed_index: " << regular_flushed_index |
431 | 0 | << " intents_flushed_index: " << intents_flushed_index; |
432 | 4.81k | return {true, AlreadyAppliedToRegularDB::kTrue}; |
433 | 4.81k | } |
434 | | // For other types of transaction updates, we ignore them if they have been flushed to the |
435 | | // regular RocksDB. |
436 | 28.9k | VLOG_WITH_FUNC(3) << "index: " << index << " > "
437 | 0 | << "regular_flushed_index: " << regular_flushed_index; |
438 | 28.9k | return {index > regular_flushed_index}; |
439 | 33.7k | } |
440 | | |
441 | 1.12M | if (op_type == consensus::WRITE_OP && write_op_has_transaction) {
442 | | // Write intents that have not been flushed into the intents DB. |
443 | 46.9k | VLOG_WITH_FUNC(3) << "index: " << index << " > "
444 | 0 | << "intents_flushed_index: " << intents_flushed_index; |
445 | 46.9k | return {index > intents_flushed_index}; |
446 | 46.9k | } |
447 | | |
448 | 18.4E | VLOG_WITH_FUNC(3) << "index: " << index << " > " |
449 | 18.4E | << "regular_flushed_index: " << regular_flushed_index; |
450 | 1.08M | return {index > regular_flushed_index}; |
451 | 1.12M | } |
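A simplified standalone sketch of this replay decision for the common cases, assuming intents_flushed <= regular_flushed; the OpKind enum and function name are hypothetical, and the real code additionally records whether an APPLYING update was already applied to the regular DB:

#include <algorithm>
#include <cassert>
#include <cstdint>

enum class OpKind { kRegularWrite, kTransactionalWrite, kApplyingTxnUpdate };

bool ShouldReplaySketch(OpKind kind, int64_t index,
                        int64_t regular_flushed, int64_t intents_flushed) {
  if (index <= std::min(regular_flushed, intents_flushed)) {
    return false;  // Flushed to both RocksDBs - never replay.
  }
  switch (kind) {
    case OpKind::kTransactionalWrite:
      return index > intents_flushed;   // Intent writes live in the intents RocksDB.
    case OpKind::kApplyingTxnUpdate:
      return true;                      // Past the minimum, so at least one DB is missing it.
    case OpKind::kRegularWrite:
      return index > regular_flushed;   // Non-transactional data lives in the regular RocksDB.
  }
  return true;
}

int main() {
  // Regular RocksDB flushed up to index 20, intents RocksDB up to index 15.
  assert(!ShouldReplaySketch(OpKind::kRegularWrite, 10, 20, 15));       // below both
  assert(!ShouldReplaySketch(OpKind::kRegularWrite, 18, 20, 15));       // regular already has it
  assert(ShouldReplaySketch(OpKind::kTransactionalWrite, 18, 20, 15));  // intents is missing it
  assert(ShouldReplaySketch(OpKind::kRegularWrite, 25, 20, 15));        // neither has it
  return 0;
}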
452 | | |
453 | 1.30M | bool WriteOpHasTransaction(const ReplicateMsg& replicate) { |
454 | 1.30M | if (!replicate.has_write()) { |
455 | 17.2k | return false; |
456 | 17.2k | } |
457 | 1.29M | const auto& write_request = replicate.write(); |
458 | 1.29M | if (!write_request.has_write_batch()) { |
459 | 34.7k | return false; |
460 | 34.7k | } |
461 | 1.25M | const auto& write_batch = write_request.write_batch(); |
462 | 1.25M | if (write_batch.has_transaction()) { |
463 | 53.2k | return true; |
464 | 53.2k | } |
465 | 7.15M | for (const auto& pair : write_batch.write_pairs()) {
466 | 7.15M | if (!pair.key().empty() && pair.key()[0] == docdb::ValueTypeAsChar::kExternalTransactionId) {
467 | 0 | return true; |
468 | 0 | } |
469 | 7.15M | } |
470 | 1.20M | return false; |
471 | 1.20M | } |
472 | | |
473 | | } // anonymous namespace |
474 | | |
475 | | YB_STRONGLY_TYPED_BOOL(NeedsRecovery); |
476 | | |
477 | | // Bootstraps an existing tablet by opening the metadata from disk, and rebuilding soft state by |
478 | | // playing log segments. A bootstrapped tablet can then be added to an existing consensus |
479 | | // configuration as a LEARNER, which will bring its state up to date with the rest of the consensus |
480 | | // configuration, or it can start serving the data itself, after it has been appointed LEADER of |
481 | | // that particular consensus configuration. |
482 | | // |
483 | | // NOTE: this does not handle pulling data from other replicas in the cluster. That is handled by |
484 | | // the 'RemoteBootstrap' classes, which copy blocks and metadata locally before invoking this local |
485 | | // bootstrap functionality. |
486 | | // |
487 | | // This class is not thread-safe. |
488 | | class TabletBootstrap { |
489 | | public: |
490 | | explicit TabletBootstrap(const BootstrapTabletData& data) |
491 | | : data_(data), |
492 | | meta_(data.tablet_init_data.metadata), |
493 | | mem_tracker_(data.tablet_init_data.parent_mem_tracker), |
494 | | listener_(data.listener), |
495 | | append_pool_(data.append_pool), |
496 | | allocation_pool_(data.allocation_pool), |
497 | | skip_wal_rewrite_(FLAGS_skip_wal_rewrite),
498 | 150k | test_hooks_(data.test_hooks) { |
499 | 150k | } |
500 | | |
501 | 150k | ~TabletBootstrap() {} |
502 | | |
503 | | CHECKED_STATUS Bootstrap( |
504 | | TabletPtr* rebuilt_tablet, |
505 | | scoped_refptr<log::Log>* rebuilt_log, |
506 | 150k | consensus::ConsensusBootstrapInfo* consensus_info) { |
507 | 150k | const string tablet_id = meta_->raft_group_id(); |
508 | | |
509 | | // Replay requires a valid Consensus metadata file to exist in order to compare the committed |
510 | | // consensus configuration seqno with the log entries and also to persist committed but |
511 | | // unpersisted changes. |
512 | 150k | RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id, |
513 | 150k | meta_->fs_manager()->uuid(), &cmeta_), |
514 | 150k | "Unable to load Consensus metadata"); |
515 | | |
516 | | // Make sure we don't try to locally bootstrap a tablet that was in the middle of a remote |
517 | | // bootstrap. It's likely that not all files were copied over successfully. |
518 | 150k | TabletDataState tablet_data_state = meta_->tablet_data_state(); |
519 | 150k | if (!CanServeTabletData(tablet_data_state)) { |
520 | 1 | return STATUS(Corruption, "Unable to locally bootstrap tablet " + tablet_id + ": " + |
521 | 1 | "RaftGroupMetadata bootstrap state is " + |
522 | 1 | TabletDataState_Name(tablet_data_state)); |
523 | 1 | } |
524 | | |
525 | 150k | listener_->StatusMessage("Bootstrap starting."); |
526 | | |
527 | 150k | if (VLOG_IS_ON(1)) { |
528 | 0 | RaftGroupReplicaSuperBlockPB super_block; |
529 | 0 | meta_->ToSuperBlock(&super_block); |
530 | 0 | VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString(); |
531 | 0 | } |
532 | | |
533 | 150k | const bool has_blocks = VERIFY_RESULT(OpenTablet());
534 | | |
535 | 150k | if (FLAGS_TEST_dump_docdb_before_tablet_bootstrap) { |
536 | 0 | LOG_WITH_PREFIX(INFO) << "DEBUG: DocDB dump before tablet bootstrap:"; |
537 | 0 | tablet_->TEST_DocDBDumpToLog(IncludeIntents::kTrue); |
538 | 0 | } |
539 | | |
540 | 150k | const auto needs_recovery = VERIFY_RESULT(PrepareToReplay()); |
541 | 150k | if (needs_recovery && !skip_wal_rewrite_) {
542 | 0 | RETURN_NOT_OK(OpenLogReader()); |
543 | 0 | } |
544 | | |
545 | | // This is a new tablet, nothing left to do. |
546 | 150k | if (!has_blocks && !needs_recovery) {
547 | 147k | LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log."; |
548 | 147k | RETURN_NOT_OK_PREPEND(OpenNewLog(CreateNewSegment::kTrue), "Failed to open new log"); |
549 | 147k | RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log", |
550 | 147k | rebuilt_log, |
551 | 147k | rebuilt_tablet)); |
552 | 147k | consensus_info->last_id = MinimumOpId(); |
553 | 147k | consensus_info->last_committed_id = MinimumOpId(); |
554 | 147k | return Status::OK(); |
555 | 147k | } |
556 | | |
557 | | // Only sleep if this isn't a new tablet, since we only want to delay on restart when testing. |
558 | 2.48k | if (PREDICT_FALSE(FLAGS_TEST_tablet_bootstrap_delay_ms > 0)) { |
559 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_tablet_bootstrap_delay_ms)); |
560 | 0 | } |
561 | | |
562 | | // If there were blocks, there must be segments to replay. This is required by Raft, since we |
563 | | // always need to know the term and index of the last logged op in order to vote, know how to |
564 | | // respond to AppendEntries(), etc. |
565 | 2.48k | if (has_blocks && !needs_recovery) {
566 | 0 | return STATUS(IllegalState, Substitute("Tablet $0: Found rowsets but no log " |
567 | 0 | "segments could be found.", |
568 | 0 | tablet_id)); |
569 | 0 | } |
570 | | |
571 | 2.48k | RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason"); |
572 | | |
573 | 2.48k | if (cmeta_->current_term() < consensus_info->last_id.term()) { |
574 | 407 | cmeta_->set_current_term(consensus_info->last_id.term()); |
575 | 407 | } |
576 | | |
577 | | // Flush the consensus metadata once at the end to persist our changes, if any. |
578 | 2.48k | RETURN_NOT_OK(cmeta_->Flush()); |
579 | | |
580 | 2.48k | RETURN_NOT_OK(RemoveRecoveryDir()); |
581 | | |
582 | 2.48k | if (FLAGS_force_recover_flushed_frontier) { |
583 | 0 | RETURN_NOT_OK(tablet_->Flush(FlushMode::kSync)); |
584 | 0 | docdb::ConsensusFrontier new_consensus_frontier; |
585 | 0 | new_consensus_frontier.set_op_id(consensus_info->last_committed_id); |
586 | 0 | new_consensus_frontier.set_hybrid_time(tablet_->mvcc_manager()->LastReplicatedHybridTime()); |
587 | | // We don't attempt to recover the history cutoff here because it will be recovered |
588 | | // automatically on the first compaction, and this is a special mode for manual |
589 | | // troubleshooting. |
590 | 0 | LOG_WITH_PREFIX(WARNING) |
591 | 0 | << "--force_recover_flushed_frontier specified, forcefully setting " |
592 | 0 | << "flushed frontier after bootstrap: " << new_consensus_frontier.ToString(); |
593 | 0 | RETURN_NOT_OK(tablet_->ModifyFlushedFrontier( |
594 | 0 | new_consensus_frontier, rocksdb::FrontierModificationMode::kForce)); |
595 | 0 | } |
596 | | |
597 | 2.48k | RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet)); |
598 | | |
599 | 2.48k | return Status::OK(); |
600 | 2.48k | } |
601 | | |
602 | | private: |
603 | | // Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'. |
604 | | CHECKED_STATUS FinishBootstrap( |
605 | | const std::string& message, |
606 | | scoped_refptr<log::Log>* rebuilt_log, |
607 | 150k | TabletPtr* rebuilt_tablet) { |
608 | 150k | tablet_->MarkFinishedBootstrapping(); |
609 | 150k | listener_->StatusMessage(message); |
610 | 150k | if (FLAGS_TEST_dump_docdb_after_tablet_bootstrap) { |
611 | 0 | LOG_WITH_PREFIX(INFO) << "DEBUG: DocDB debug dump after tablet bootstrap:\n"; |
612 | 0 | tablet_->TEST_DocDBDumpToLog(IncludeIntents::kTrue); |
613 | 0 | } |
614 | | |
615 | 150k | *rebuilt_tablet = std::move(tablet_); |
616 | 150k | RETURN_NOT_OK(log_->EnsureInitialNewSegmentAllocated()); |
617 | 150k | rebuilt_log->swap(log_); |
618 | 150k | return Status::OK(); |
619 | 150k | } |
620 | | |
621 | | // Sets result to true if there was any data on disk for this tablet. |
622 | 150k | Result<bool> OpenTablet() { |
623 | 150k | CleanupSnapshots(); |
624 | | |
625 | 150k | auto tablet = std::make_shared<Tablet>(data_.tablet_init_data); |
626 | | // Doing nothing for now except opening a tablet locally. |
627 | 150k | LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") { |
628 | 150k | RETURN_NOT_OK(tablet->Open()); |
629 | 150k | } |
630 | | |
631 | | // In theory, an error can happen in case of tablet Shutdown or in RocksDB object replacement |
632 | | // operation like RestoreSnapshot or Truncate. However, those operations can't really be |
633 | | // happening concurrently as we haven't opened the tablet yet. |
634 | 150k | const bool has_ss_tables = VERIFY_RESULT(tablet->HasSSTables()); |
635 | | |
636 | 0 | tablet_ = std::move(tablet); |
637 | 150k | return has_ss_tables; |
638 | 150k | } |
639 | | |
640 | | // Checks if a previous log recovery directory exists. If so, it deletes any files in the log dir |
641 | | // and sets 'needs_recovery' to true, meaning that the previous recovery attempt should be retried |
642 | | // from the recovery dir. |
643 | | // |
644 | | // Otherwise, if there is a log directory with log files in it, renames that log dir to the log |
645 | | // recovery dir and creates a new, empty log dir so that log replay can proceed. 'needs_recovery' |
646 | | // is also returned as true in this case. |
647 | | // |
648 | | // If no log segments are found, 'needs_recovery' is set to false. |
649 | 150k | Result<NeedsRecovery> PrepareToReplay() { |
650 | 150k | const string& log_dir = tablet_->metadata()->wal_dir(); |
651 | | |
652 | | // If the recovery directory exists, then we crashed mid-recovery. Throw away any logs from the |
653 | | // previous recovery attempt and restart the log replay process from the beginning using the |
654 | | // same recovery dir as last time. |
655 | 150k | const string recovery_path = FsManager::GetTabletWalRecoveryDir(log_dir); |
656 | 150k | if (GetEnv()->FileExists(recovery_path)) { |
657 | 0 | LOG_WITH_PREFIX(INFO) << "Previous recovery directory found at " << recovery_path << ": " |
658 | 0 | << "Replaying log files from this location instead of " << log_dir; |
659 | | |
660 | | // Since we have a recovery directory, clear out the log_dir by recursively deleting it and |
661 | | // creating a new one so that we don't end up with remnants of old WAL segments or indexes |
662 | | // after replay. |
663 | 0 | if (GetEnv()->FileExists(log_dir)) { |
664 | 0 | LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in " |
665 | 0 | << log_dir; |
666 | 0 | RETURN_NOT_OK_PREPEND(GetEnv()->DeleteRecursively(log_dir), |
667 | 0 | "Could not recursively delete old log dir " + log_dir); |
668 | 0 | } |
669 | | |
670 | 0 | RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), DirName(log_dir)), |
671 | 0 | "Failed to create table log directory " + DirName(log_dir)); |
672 | |
673 | 0 | RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), log_dir), |
674 | 0 | "Failed to create tablet log directory " + log_dir); |
675 | |
676 | 0 | return NeedsRecovery::kTrue; |
677 | 0 | } |
678 | | |
679 | | // If we made it here, there was no pre-existing recovery dir. Now we look for log files in |
680 | | // log_dir, and if we find any then we rename the whole log_dir to a recovery dir and return |
681 | | // needs_recovery = true. |
682 | 150k | RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), DirName(log_dir)), |
683 | 150k | "Failed to create table log directory " + DirName(log_dir)); |
684 | | |
685 | 150k | RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), log_dir), |
686 | 150k | "Failed to create tablet log directory " + log_dir); |
687 | | |
688 | 150k | vector<string> log_dir_children = VERIFY_RESULT_PREPEND( |
689 | 150k | GetEnv()->GetChildren(log_dir, ExcludeDots::kTrue), "Couldn't list log segments."); |
690 | | |
691 | | // To ensure consistent order of log messages. Note: this does not affect the replay order |
692 | | // of segments, only the order of INFO log messages below. |
693 | 0 | sort(log_dir_children.begin(), log_dir_children.end()); |
694 | | |
695 | 150k | bool needs_recovery = false; |
696 | 150k | for (const string& log_dir_child : log_dir_children) { |
697 | 11.1k | if (!log::IsLogFileName(log_dir_child)) { |
698 | 778 | continue; |
699 | 778 | } |
700 | | |
701 | 10.3k | needs_recovery = true; |
702 | 10.3k | string source_path = JoinPathSegments(log_dir, log_dir_child); |
703 | 10.3k | if (skip_wal_rewrite_) {
704 | 10.3k | LOG_WITH_PREFIX(INFO) << "Will attempt to recover log segment " << source_path; |
705 | 10.3k | continue; |
706 | 10.3k | } |
707 | | |
708 | 18.4E | string dest_path = JoinPathSegments(recovery_path, log_dir_child); |
709 | 18.4E | LOG_WITH_PREFIX(INFO) << "Will attempt to recover log segment " << source_path |
710 | 18.4E | << " to " << dest_path; |
711 | 18.4E | } |
712 | | |
713 | 150k | if (!skip_wal_rewrite_ && needs_recovery) {
714 | | // Atomically rename the log directory to the recovery directory and then re-create the log |
715 | | // directory. |
716 | 0 | LOG_WITH_PREFIX(INFO) << "Moving log directory " << log_dir << " to recovery directory " |
717 | 0 | << recovery_path << " in preparation for log replay"; |
718 | 0 | RETURN_NOT_OK_PREPEND(GetEnv()->RenameFile(log_dir, recovery_path), |
719 | 0 | Substitute("Could not move log directory $0 to recovery dir $1", |
720 | 0 | log_dir, recovery_path)); |
721 | 0 | RETURN_NOT_OK_PREPEND(GetEnv()->CreateDir(log_dir), |
722 | 0 | "Failed to recreate log directory " + log_dir); |
723 | 0 | } |
724 | 150k | return NeedsRecovery(needs_recovery); |
725 | 150k | } |
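A hypothetical sketch of the same recovery-directory protocol using std::filesystem in place of the Env abstraction; the "wal-" prefix check stands in for log::IsLogFileName, and the paths in main() are illustrative only:

#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

// Returns true if log replay is needed, following the same decision order as
// PrepareToReplay: a leftover recovery dir wins, otherwise the presence of log
// segments in the log dir decides, and the log dir is only moved aside when rewriting.
bool PrepareToReplaySketch(const fs::path& log_dir, const fs::path& recovery_dir,
                           bool skip_wal_rewrite) {
  if (fs::exists(recovery_dir)) {
    // Crashed mid-recovery: wipe the partially rewritten log dir and replay
    // from the recovery dir again.
    fs::remove_all(log_dir);
    fs::create_directories(log_dir);
    return true;
  }
  fs::create_directories(log_dir);
  bool has_segments = false;
  for (const auto& child : fs::directory_iterator(log_dir)) {
    if (child.path().filename().string().rfind("wal-", 0) == 0) {
      has_segments = true;  // stand-in for log::IsLogFileName()
    }
  }
  if (!skip_wal_rewrite && has_segments) {
    // Move the whole log dir aside and recreate it, so replay reads from the
    // recovery dir while rewritten entries go into a fresh log dir.
    fs::rename(log_dir, recovery_dir);
    fs::create_directories(log_dir);
  }
  return has_segments;
}

int main() {
  const fs::path log_dir = fs::temp_directory_path() / "sketch_wal";
  const fs::path recovery_dir = fs::temp_directory_path() / "sketch_wal.recovery";
  std::cout << "needs recovery: "
            << PrepareToReplaySketch(log_dir, recovery_dir, /* skip_wal_rewrite= */ true)
            << "\n";
  fs::remove_all(log_dir);
  fs::remove_all(recovery_dir);
  return 0;
}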
726 | | |
727 | | // Opens the latest log segments for the Tablet that will allow to rebuild the tablet's soft |
728 | | // state. If there are existing log segments in the tablet's log directory, they are moved to a
729 | | // "log-recovery" directory which is deleted when the replay process is completed (as they have |
730 | | // been duplicated in the current log directory). |
731 | | // |
732 | | // If a "log-recovery" directory is already present, we will continue to replay from the |
733 | | // "log-recovery" directory. Tablet metadata is updated once replay has finished from the |
734 | | // "log-recovery" directory. |
735 | 0 | Status OpenLogReader() { |
736 | 0 | auto wal_dir = tablet_->metadata()->wal_dir(); |
737 | 0 | auto wal_path = skip_wal_rewrite_ ? wal_dir : |
738 | 0 | meta_->fs_manager()->GetTabletWalRecoveryDir(wal_dir); |
739 | 0 | VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir " << wal_path; |
740 | | // Open the reader. |
741 | 0 | scoped_refptr<LogIndex> index(nullptr); |
742 | 0 | RETURN_NOT_OK_PREPEND( |
743 | 0 | LogReader::Open( |
744 | 0 | GetEnv(), |
745 | 0 | index, |
746 | 0 | LogPrefix(), |
747 | 0 | wal_path, |
748 | 0 | tablet_->GetTableMetricsEntity().get(), |
749 | 0 | tablet_->GetTabletMetricsEntity().get(), |
750 | 0 | &log_reader_), |
751 | 0 | "Could not open LogReader. Reason"); |
752 | 0 | return Status::OK(); |
753 | 0 | } |
754 | | |
755 | | // Removes the recovery directory and all files contained therein. Intended to be invoked after |
756 | | // log replay successfully completes. |
757 | 2.79k | CHECKED_STATUS RemoveRecoveryDir() { |
758 | 2.79k | const string recovery_path = FsManager::GetTabletWalRecoveryDir(tablet_->metadata()->wal_dir()); |
759 | 2.79k | if (!GetEnv()->FileExists(recovery_path)) { |
760 | 2.79k | VLOG(1) << "Tablet WAL recovery dir " << recovery_path << " does not exist.";
761 | 2.79k | if (!skip_wal_rewrite_) { |
762 | 0 | return STATUS(IllegalState, "Expected recovery dir, none found."); |
763 | 0 | } |
764 | 2.79k | return Status::OK(); |
765 | 2.79k | } |
766 | | |
767 | 0 | LOG_WITH_PREFIX(INFO) << "Preparing to delete log recovery files and directory " |
768 | 0 | << recovery_path; |
769 | |
770 | 0 | string tmp_path = Substitute("$0-$1", recovery_path, GetCurrentTimeMicros()); |
771 | 0 | LOG_WITH_PREFIX(INFO) << "Renaming log recovery dir from " << recovery_path |
772 | 0 | << " to " << tmp_path; |
773 | 0 | RETURN_NOT_OK_PREPEND(GetEnv()->RenameFile(recovery_path, tmp_path), |
774 | 0 | Substitute("Could not rename old recovery dir from: $0 to: $1", |
775 | 0 | recovery_path, tmp_path)); |
776 | |
777 | 0 | if (FLAGS_skip_remove_old_recovery_dir) { |
778 | 0 | LOG_WITH_PREFIX(INFO) << "--skip_remove_old_recovery_dir enabled. NOT deleting " << tmp_path; |
779 | 0 | return Status::OK(); |
780 | 0 | } |
781 | 0 | LOG_WITH_PREFIX(INFO) << "Deleting all files from renamed log recovery directory " << tmp_path; |
782 | 0 | RETURN_NOT_OK_PREPEND(GetEnv()->DeleteRecursively(tmp_path), |
783 | 0 | "Could not remove renamed recovery dir " + tmp_path); |
784 | 0 | LOG_WITH_PREFIX(INFO) << "Completed deletion of old log recovery files and directory " |
785 | 0 | << tmp_path; |
786 | 0 | return Status::OK(); |
787 | 0 | } |
788 | | |
789 | | // Opens a new log in the tablet's log directory. The directory is expected to be clean. |
790 | 150k | CHECKED_STATUS OpenNewLog(log::CreateNewSegment create_new_segment) { |
791 | 150k | auto log_options = LogOptions(); |
792 | 150k | const auto& metadata = *tablet_->metadata(); |
793 | 150k | log_options.retention_secs = metadata.wal_retention_secs(); |
794 | 150k | log_options.env = GetEnv(); |
795 | 150k | if (tablet_->metadata()->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) { |
796 | 48.0k | auto log_segment_size = FLAGS_transaction_status_tablet_log_segment_size_bytes; |
797 | 48.0k | if (log_segment_size) { |
798 | 48.0k | log_options.segment_size_bytes = log_segment_size; |
799 | 48.0k | } |
800 | 48.0k | } |
801 | 150k | RETURN_NOT_OK(Log::Open( |
802 | 150k | log_options, |
803 | 150k | tablet_->tablet_id(), |
804 | 150k | metadata.wal_dir(), |
805 | 150k | metadata.fs_manager()->uuid(), |
806 | 150k | *tablet_->schema(), |
807 | 150k | metadata.schema_version(), |
808 | 150k | tablet_->GetTableMetricsEntity(), |
809 | 150k | tablet_->GetTabletMetricsEntity(), |
810 | 150k | append_pool_, |
811 | 150k | allocation_pool_, |
812 | 150k | metadata.cdc_min_replicated_index(), |
813 | 150k | &log_, |
814 | 150k | create_new_segment)); |
815 | | // Disable sync temporarily in order to speed up appends during the bootstrap process. |
816 | 150k | log_->DisableSync(); |
817 | 150k | return Status::OK(); |
818 | 150k | } |
819 | | |
820 | | // Handle the given log entry. Validates entry.type() (it can only be REPLICATE), optionally |
821 | | // injects latency in tests, and delegates to HandleReplicateMessage. |
822 | | CHECKED_STATUS HandleEntry( |
823 | 1.37M | yb::log::LogEntryMetadata entry_metadata, std::unique_ptr<log::LogEntryPB>* entry_ptr) { |
824 | 1.37M | auto& entry = **entry_ptr; |
825 | 1.37M | VLOG_WITH_PREFIX(2) << "Handling entry: " << entry.ShortDebugString();
826 | | |
827 | 1.37M | switch (entry.type()) { |
828 | 1.37M | case log::REPLICATE: |
829 | 1.37M | RETURN_NOT_OK(HandleReplicateMessage(entry_metadata, entry_ptr)); |
830 | 1.37M | break; |
831 | 1.37M | default: |
832 | 0 | return STATUS(Corruption, Substitute("Unexpected log entry type: $0", entry.type())); |
833 | 1.37M | } |
834 | 1.37M | MAYBE_FAULT(FLAGS_TEST_fault_crash_during_log_replay); |
835 | 1.37M | return Status::OK(); |
836 | 1.37M | } |
837 | | |
838 | | // HandleReplicateMessage implements these important pieces of logic: |
839 | | // - Removes the "tail" of pending_replicates overwritten by a new leader's operations when |
840 | | // encountering an entry with an index lower than or equal to the index of an operation that |
841 | | // is already present in pending_replicates. |
842 | | // - Ignores entries that have already been flushed into regular and intents RocksDBs. |
843 | | // - Updates committed OpId based on the committed OpId from the entry and calls
844 | | // ApplyCommittedPendingReplicates. |
845 | | // - Updates the "monotonic counter" used for assigning internal keys in YCQL arrays. |
846 | | CHECKED_STATUS HandleReplicateMessage( |
847 | 1.37M | LogEntryMetadata entry_metadata, std::unique_ptr<log::LogEntryPB>* replicate_entry_ptr) { |
848 | 1.37M | auto& replicate_entry = **replicate_entry_ptr; |
849 | 1.37M | stats_.ops_read++; |
850 | | |
851 | 1.37M | const ReplicateMsg& replicate = replicate_entry.replicate(); |
852 | 18.4E | VLOG_WITH_PREFIX(1) << "HandleReplicateMessage: " << entry_metadata.ToString() |
853 | 18.4E | << ", op id: " << replicate.id() |
854 | 18.4E | << ", committed op id: " << replicate.committed_op_id(); |
855 | 1.37M | RETURN_NOT_OK(replay_state_->CheckSequentialReplicateId(replicate)); |
856 | 1.37M | SCHECK(replicate.has_hybrid_time(), Corruption, "A REPLICATE message must have a hybrid time"); |
857 | 1.37M | UpdateClock(replicate.hybrid_time()); |
858 | | |
859 | | // This sets the monotonic counter to at least replicate.monotonic_counter() atomically. |
860 | 1.37M | tablet_->UpdateMonotonicCounter(replicate.monotonic_counter()); |
861 | | |
862 | 1.37M | const auto op_id = OpId::FromPB(replicate_entry.replicate().id()); |
863 | | |
864 | | // Append the replicate message to the log as is if we are not skipping wal rewrite. If we are |
865 | | // skipping, set consensus_state_only to true. |
866 | 1.37M | RETURN_NOT_OK(log_->Append(replicate_entry_ptr->get(), entry_metadata, skip_wal_rewrite_)); |
867 | | |
868 | 1.37M | auto iter = replay_state_->pending_replicates.lower_bound(op_id.index); |
869 | | |
870 | | // If there was an entry with the same or higher index as the entry we're adding, then we need |
871 | | // to delete that entry and all entries with higher indexes. |
872 | 1.37M | if (iter != replay_state_->pending_replicates.end()) { |
873 | 10.5k | auto& existing_entry = iter->second; |
874 | 10.5k | auto& last_entry = replay_state_->pending_replicates.rbegin()->second; |
875 | | |
876 | 10.5k | LOG_WITH_PREFIX(INFO) << "Overwriting operations starting at: " |
877 | 10.5k | << existing_entry.entry->replicate().id() |
878 | 10.5k | << " up to: " << last_entry.entry->replicate().id() |
879 | 10.5k | << " with operation: " << replicate.id(); |
880 | 10.5k | stats_.ops_overwritten += std::distance(iter, replay_state_->pending_replicates.end()); |
881 | 10.5k | if (test_hooks_) { |
882 | | // Tell the test framework about overwritten OpIds. |
883 | 10.5k | for (auto callback_iter = iter; |
884 | 73.3k | callback_iter != replay_state_->pending_replicates.end(); |
885 | 62.7k | callback_iter++) { |
886 | 62.7k | test_hooks_->Overwritten( |
887 | 62.7k | yb::OpId::FromPB(callback_iter->second.entry->replicate().id())); |
888 | 62.7k | } |
889 | 10.5k | } |
890 | 10.5k | replay_state_->pending_replicates.erase(iter, replay_state_->pending_replicates.end()); |
891 | 10.5k | } |
892 | | |
893 | | // We expect entry_metadata.entry_time to always be set for newly written WAL entries. However, |
894 | | // for some very old WALs, it might be missing. |
895 | 18.4E | LOG_IF_WITH_PREFIX(DFATAL, entry_metadata.entry_time == RestartSafeCoarseTimePoint()) |
896 | 18.4E | << "Entry metadata must have a restart-safe time. OpId: " << OpId::FromPB(replicate.id()); |
897 | | |
898 | 1.37M | CHECK(replay_state_->pending_replicates.emplace( |
899 | 1.37M | op_id.index, Entry{std::move(*replicate_entry_ptr), entry_metadata.entry_time}).second); |
900 | | |
901 | 18.4E | CHECK(replicate.has_committed_op_id()) |
902 | 18.4E | << "Replicate message has no committed_op_id for table type " |
903 | 18.4E | << TableType_Name(tablet_->table_type()) << ". Replicate message:\n" |
904 | 18.4E | << replicate.DebugString(); |
905 | | |
906 | | // We include the commit index as of the time a REPLICATE entry was added to the leader's log |
907 | | // into that entry. This allows us to decide when we can replay a REPLICATE entry during |
908 | | // bootstrap. |
909 | 1.37M | replay_state_->UpdateCommittedOpId(OpId::FromPB(replicate.committed_op_id())); |
910 | | |
911 | 1.37M | return ApplyCommittedPendingReplicates(); |
912 | 1.37M | } |
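The tail-overwrite step above can be shown with a small standalone sketch: a new REPLICATE at a given index discards every pending entry at that index or higher (they were overwritten by a new leader) before being recorded. Names are hypothetical and std::string stands in for a log entry:

#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using PendingEntries = std::map<int64_t, std::string>;

// Erase the overwritten tail, then record the new entry, mirroring the
// lower_bound/erase sequence in HandleReplicateMessage.
void AddReplicateSketch(PendingEntries* pending, int64_t index, std::string entry) {
  auto iter = pending->lower_bound(index);
  pending->erase(iter, pending->end());
  pending->emplace(index, std::move(entry));
}

int main() {
  PendingEntries pending{{5, "a"}, {6, "b"}, {7, "c"}};
  AddReplicateSketch(&pending, 6, "b'");  // erases 6 and 7, then inserts the new 6
  assert(pending.size() == 2);
  assert(pending.at(6) == "b'");
  return 0;
}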
913 | | |
914 | | // Replays the given committed operation. |
915 | | CHECKED_STATUS PlayAnyRequest( |
916 | 270k | ReplicateMsg* replicate, AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
917 | 270k | const auto op_type = replicate->op_type(); |
918 | 270k | if (test_hooks_) { |
919 | 243k | test_hooks_->Replayed(yb::OpId::FromPB(replicate->id()), already_applied_to_regular_db); |
920 | 243k | } |
921 | 270k | switch (op_type) { |
922 | 227k | case consensus::WRITE_OP: |
923 | 227k | return PlayWriteRequest(replicate, already_applied_to_regular_db); |
924 | | |
925 | 2.75k | case consensus::CHANGE_METADATA_OP: |
926 | 2.75k | return PlayChangeMetadataRequest(replicate); |
927 | | |
928 | 4.50k | case consensus::CHANGE_CONFIG_OP: |
929 | 4.50k | return PlayChangeConfigRequest(replicate); |
930 | | |
931 | 0 | case consensus::TRUNCATE_OP: |
932 | 0 | return PlayTruncateRequest(replicate); |
933 | | |
934 | 2.53k | case consensus::NO_OP: |
935 | 2.53k | return Status::OK(); // This is why it is a no-op! |
936 | | |
937 | 33.7k | case consensus::UPDATE_TRANSACTION_OP: |
938 | 33.7k | return PlayUpdateTransactionRequest(replicate, already_applied_to_regular_db); |
939 | | |
940 | 0 | case consensus::SNAPSHOT_OP: |
941 | 0 | return PlayTabletSnapshotRequest(replicate); |
942 | | |
943 | 0 | case consensus::HISTORY_CUTOFF_OP: |
944 | 0 | return PlayHistoryCutoffRequest(replicate); |
945 | | |
946 | 1 | case consensus::SPLIT_OP: |
947 | 1 | return PlaySplitOpRequest(replicate); |
948 | | |
949 | | // Unexpected cases: |
950 | 0 | case consensus::UNKNOWN_OP: |
951 | 0 | return STATUS(IllegalState, Substitute("Unsupported operation type: $0", op_type)); |
952 | 270k | } |
953 | | |
954 | 0 | LOG_WITH_PREFIX(DFATAL) << "Invalid operation type " << op_type |
955 | 0 | << " for a REPLICATE operation: " << replicate->ShortDebugString();
956 | 0 | return STATUS_FORMAT(Corruption, "Invalid operation type: $0", op_type); |
957 | 270k | } |
958 | | |
959 | 0 | CHECKED_STATUS PlayTabletSnapshotRequest(ReplicateMsg* replicate_msg) { |
960 | 0 | TabletSnapshotOpRequestPB* const snapshot = replicate_msg->mutable_snapshot_request(); |
961 | |
962 | 0 | SnapshotOperation operation(tablet_.get(), snapshot); |
963 | 0 | operation.set_hybrid_time(HybridTime(replicate_msg->hybrid_time())); |
964 | |
965 | 0 | return operation.Replicated(/* leader_term= */ yb::OpId::kUnknownTerm); |
966 | 0 | } |
967 | | |
968 | 0 | CHECKED_STATUS PlayHistoryCutoffRequest(ReplicateMsg* replicate_msg) { |
969 | 0 | HistoryCutoffOperation operation( |
970 | 0 | tablet_.get(), replicate_msg->mutable_history_cutoff()); |
971 | |
972 | 0 | return operation.Apply(/* leader_term= */ yb::OpId::kUnknownTerm); |
973 | 0 | } |
974 | | |
975 | 1 | CHECKED_STATUS PlaySplitOpRequest(ReplicateMsg* replicate_msg) { |
976 | 1 | SplitTabletRequestPB* const split_request = replicate_msg->mutable_split_request(); |
977 | | // We might be asked to replay SPLIT_OP even if it was applied and flushed when |
978 | | // FLAGS_force_recover_flushed_frontier is set. |
979 | 1 | if (split_request->tablet_id() != tablet_->tablet_id()) { |
980 | | // Ignore SPLIT_OP designated for ancestor tablet(s). |
981 | 0 | return Status::OK(); |
982 | 0 | } |
983 | | |
984 | 1 | if (tablet_->metadata()->tablet_data_state() == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) { |
985 | | // Ignore SPLIT_OP if tablet has been already split. |
986 | 1 | VLOG_WITH_PREFIX_AND_FUNC(1) << "Tablet has been already split.";
987 | 1 | return Status::OK(); |
988 | 1 | } |
989 | | |
990 | 0 | SplitOperation operation(tablet_.get(), data_.tablet_init_data.tablet_splitter, split_request); |
991 | 0 | operation.set_hybrid_time(HybridTime(replicate_msg->hybrid_time())); |
992 | 0 | return data_.tablet_init_data.tablet_splitter->ApplyTabletSplit(&operation, log_.get()); |
993 | | |
994 | | // TODO(tsplit): In scope of https://github.com/yugabyte/yugabyte-db/issues/1461 add integration |
995 | | // tests for: |
996 | | // - tablet bootstrap of original tablet which hasn't been yet split and replaying split |
997 | | // operation. |
998 | | // - tablet bootstrap of original tablet which has been already successfully split and replaying |
999 | | // split operation. |
1000 | | // - tablet bootstrap of new after-split tablet replaying split operation. |
1001 | 1 | } |
1002 | | |
1003 | | void HandleRetryableRequest( |
1004 | 1.30M | const ReplicateMsg& replicate, RestartSafeCoarseTimePoint entry_time) { |
1005 | 1.30M | if (!replicate.has_write()) |
1006 | 17.2k | return; |
1007 | | |
1008 | 1.29M | if (data_.retryable_requests) { |
1009 | 1.00M | data_.retryable_requests->Bootstrap(replicate, entry_time); |
1010 | 1.00M | } |
1011 | | |
1012 | | // In a test, we might not have data_.retryable_requests, but we still want to tell the test |
1013 | | // that we would submit this OpId to retryable_requests. |
1014 | 1.29M | if (test_hooks_) { |
1015 | 282k | test_hooks_->RetryableRequest(OpId::FromPB(replicate.id())); |
1016 | 282k | } |
1017 | 1.29M | } |
1018 | | |
1019 | | // Performs various checks based on the OpId, and decides whether to replay the given operation. |
1020 | | // If so, calls PlayAnyRequest, or sometimes calls PlayUpdateTransactionRequest directly. |
1021 | | CHECKED_STATUS MaybeReplayCommittedEntry( |
1022 | 1.30M | LogEntryPB* replicate_entry, RestartSafeCoarseTimePoint entry_time) { |
1023 | 1.30M | ReplicateMsg* const replicate = replicate_entry->mutable_replicate(); |
1024 | 1.30M | const auto op_type = replicate->op_type(); |
1025 | 1.30M | const auto decision = ShouldReplayOperation( |
1026 | 1.30M | op_type, |
1027 | 1.30M | replicate->id().index(), |
1028 | 1.30M | replay_state_->stored_op_ids.regular.index, |
1029 | 1.30M | replay_state_->stored_op_ids.intents.index, |
1030 | | // txn_status |
1031 | 1.30M | replicate->has_transaction_state() |
1032 | 1.30M | ? replicate->transaction_state().status()
1033 | 1.30M | : TransactionStatus::ABORTED, // should not be used
1034 | | // write_op_has_transaction |
1035 | 1.30M | WriteOpHasTransaction(*replicate)); |
1036 | | |
1037 | 1.30M | HandleRetryableRequest(*replicate, entry_time); |
1038 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(3) << "decision: " << AsString(decision); |
1039 | 1.30M | if (decision.should_replay) { |
1040 | 270k | const auto status = PlayAnyRequest(replicate, decision.already_applied_to_regular_db); |
1041 | 270k | if (!status.ok()) { |
1042 | 0 | return status.CloneAndAppend(Format( |
1043 | 0 | "Failed to play $0 request. ReplicateMsg: { $1 }", |
1044 | 0 | OperationType_Name(op_type), *replicate)); |
1045 | 0 | } |
1046 | 270k | replay_state_->max_committed_hybrid_time.MakeAtLeast(HybridTime(replicate->hybrid_time())); |
1047 | 270k | } |
1048 | | |
1049 | 1.30M | return Status::OK(); |
1050 | 1.30M | } |
1051 | | |
1052 | 2.79k | void DumpReplayStateToLog() { |
1053 | | // Dump the replay state, this will log the pending replicates, which might be useful for |
1054 | | // debugging. |
1055 | 2.79k | vector<string> state_dump; |
1056 | 2.79k | constexpr int kMaxLinesToDump = 1000; |
1057 | 2.79k | replay_state_->DumpReplayStateToStrings(&state_dump, kMaxLinesToDump / 2); |
1058 | 7.20k | for (const string& line : state_dump) { |
1059 | 7.20k | LOG_WITH_PREFIX(INFO) << line; |
1060 | 7.20k | } |
1061 | 2.79k | } |
1062 | | |
1063 | 2.79k | Result<DocDbOpIds> GetFlushedOpIds() { |
1064 | 2.79k | const auto flushed_op_ids = VERIFY_RESULT(tablet_->MaxPersistentOpId()); |
1065 | | |
1066 | 2.79k | if (FLAGS_force_recover_flushed_frontier) { |
1067 | | // This is used very rarely to replay all log entries and recover RocksDB flushed OpId |
1068 | | // metadata. |
1069 | 0 | LOG_WITH_PREFIX(WARNING) |
1070 | 0 | << "--force_recover_flushed_frontier specified, ignoring existing flushed frontiers " |
1071 | 0 | << "from RocksDB metadata (will replay all log records): " << flushed_op_ids.ToString(); |
1072 | 0 | return DocDbOpIds(); |
1073 | 0 | } |
1074 | | |
1075 | 2.79k | if (test_hooks_) { |
1076 | 407 | const auto docdb_flushed_op_ids_override = test_hooks_->GetFlushedOpIdsOverride(); |
1077 | 407 | if (docdb_flushed_op_ids_override.is_initialized()) { |
1078 | 401 | LOG_WITH_PREFIX(INFO) << "Using test values of flushed DocDB OpIds: " |
1079 | 401 | << docdb_flushed_op_ids_override->ToString(); |
1080 | 401 | return *docdb_flushed_op_ids_override; |
1081 | 401 | } |
1082 | 407 | } |
1083 | | |
1084 | | // Production codepath. |
1085 | 2.39k | LOG_WITH_PREFIX(INFO) << "Flushed DocDB OpIds: " << flushed_op_ids.ToString(); |
1086 | 2.39k | return flushed_op_ids; |
1087 | 2.79k | } |
1088 | | |
1089 | | // Determines the first segment to replay based on two criteria:
1090 | | // - The first OpId of the segment must be less than or equal to (in terms of OpId comparison |
1091 | | // where term is compared first and index second) the "flushed OpId". This "flushed OpId" is |
1092 | | // determined as the minimum of intents and regular RocksDBs' flushed OpIds for transactional |
1093 | | // tables, and just the regular RocksDB's flushed OpId for non-transactional tables. Note that |
1094 | | // in practice the flushed OpId of the intents RocksDB should also be less than or equal to the |
1095 | | // flushed OpId of the regular RocksDB or this would be an invariant violation. |
1096 | | // |
1097 | | // - The "restart safe time" of the first operation in the segment that we choose to start the |
1098 | | // replay with must be such that we guarantee that at least FLAGS_retryable_request_timeout_secs |
1099 | | // seconds worth of latest log records are replayed. This is needed to allow deduplicating |
1100 | | // automatic retries from YCQL and YSQL query layer and avoid Jepsen-type consistency |
1101 | | // violations. We satisfy this constraint by taking the last segment's first operation's |
1102 | | // restart-safe time, subtracting FLAGS_retryable_request_timeout_secs seconds from it, and |
1103 | | // finding a segment that has that time or earlier as its first operation's restart-safe time. |
1104 | | // This also means we are never allowed to start replay with the last segment, as long as |
1105 | | // FLAGS_retryable_request_timeout_secs is greater than 0. |
1106 | | // |
1107 | | // This "restart safe time" is similar to the regular Linux monotonic clock time, but is |
1108 | | // maintained across tablet server restarts. See RestartSafeCoarseMonoClock for details. |
1109 | | // |
1110 | | // See https://github.com/yugabyte/yugabyte-db/commit/5cf01889a1b4589a82085e578b5f4746c6614a5d |
1111 | | // and the Git history of retryable_requests.cc for more context on this requirement. |
1112 | | // |
1113 | | // As long as the two conditions above are satisfied, it is advantageous to us to pick the latest |
1114 | | // possible segment to start replay with. That way we can skip the maximum number of segments. |
1115 | | // |
1116 | | // Returns the iterator pointing to the first segment to start replay with. Also produces a number |
1117 | | // of diagnostic log messages. |
1118 | | // |
1119 | | // This functionality was originally introduced in |
1120 | | // https://github.com/yugabyte/yugabyte-db/commit/41ef3f75e3c68686595c7613f53b649823b84fed |
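A simplified standalone sketch of the two selection criteria just described, assuming a hypothetical SegmentSketch descriptor in place of ReadableLogSegment metadata and plain integers for the OpId index and restart-safe time:

#include <cstdint>
#include <iostream>
#include <vector>

struct SegmentSketch {
  int64_t first_op_index;     // index of the segment's first operation
  int64_t first_op_time_sec;  // restart-safe time of that operation, in seconds
};

// Walks segments from newest to oldest and returns the position of the latest segment
// whose first OpId is at or below the flushed OpId and whose first entry is old enough
// to keep at least retention_sec worth of recent entries for retryable requests.
size_t FirstSegmentToReplaySketch(const std::vector<SegmentSketch>& segments,
                                  int64_t flushed_op_index, int64_t retention_sec) {
  const int64_t replay_from_this_or_earlier =
      segments.back().first_op_time_sec - retention_sec;
  for (size_t i = segments.size(); i-- > 0;) {
    if (segments[i].first_op_index <= flushed_op_index &&
        segments[i].first_op_time_sec <= replay_from_this_or_earlier) {
      return i;  // latest segment satisfying both criteria - everything before it is skipped
    }
  }
  return 0;  // no segment qualifies, so replay from the very beginning
}

int main() {
  // Segments starting at indexes 1, 100 and 200; flushed up to index 150;
  // 60 seconds of retryable-request history must be retained.
  std::vector<SegmentSketch> segments{{1, 0}, {100, 300}, {200, 600}};
  std::cout << FirstSegmentToReplaySketch(segments, 150, 60) << "\n";  // prints 1
  return 0;
}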
1121 | 2.79k | SegmentSequence::iterator SkipFlushedEntries(SegmentSequence* segments_ptr) { |
1122 | 2.79k | static const char* kBootstrapOptimizerLogPrefix = |
1123 | 2.79k | "Bootstrap optimizer (skip_flushed_entries): "; |
1124 | | |
1125 | | // Lower bound on op IDs that need to be replayed. This is the "flushed OpId" that this |
1126 | | // function's comment mentions. |
1127 | 2.79k | const auto op_id_replay_lowest = replay_state_->GetLowestOpIdToReplay( |
1128 | | // Determine whether we have an intents DB. |
1129 | 2.79k | tablet_->doc_db().intents || (test_hooks_ && test_hooks_->HasIntentsDB()),
1130 | 2.79k | kBootstrapOptimizerLogPrefix); |
1131 | | |
1132 | 2.79k | SegmentSequence& segments = *segments_ptr; |
1133 | | |
1134 | | // Time point of the first entry of the last WAL segment, and how far back in time from it we |
1135 | | // should retain other entries. |
1136 | 2.79k | boost::optional<RestartSafeCoarseTimePoint> replay_from_this_or_earlier_time; |
1137 | 2.79k | const RestartSafeCoarseDuration min_seconds_to_retain_logs = |
1138 | 2.79k | std::chrono::seconds(GetAtomicFlag(&FLAGS_retryable_request_timeout_secs)); |
1139 | | |
1140 | 2.79k | auto iter = segments.end(); |
1141 | 9.81k | while (iter != segments.begin()) { |
1142 | 7.48k | --iter; |
1143 | 7.48k | ReadableLogSegment& segment = **iter; |
1144 | 7.48k | const std::string& segment_path = segment.path(); |
1145 | | |
1146 | 7.48k | const auto first_op_metadata_result = segment.ReadFirstEntryMetadata(); |
1147 | 7.48k | if (!first_op_metadata_result.ok()) { |
1148 | 35 | if (test_hooks_) { |
1149 | 0 | test_hooks_->FirstOpIdOfSegment(segment_path, OpId::Invalid()); |
1150 | 0 | } |
1151 | 35 | LOG_WITH_PREFIX(WARNING) |
1152 | 35 | << kBootstrapOptimizerLogPrefix |
1153 | 35 | << "Could not read the first entry's metadata of log segment " << segment_path << ". " |
1154 | 35 | << "Simply continuing to earlier segments to determine the first segment " |
1155 | 35 | << "to start the replay at. The error was: " << first_op_metadata_result.status(); |
1156 | 35 | continue; |
1157 | 35 | } |
1158 | 7.44k | const auto& first_op_metadata = *first_op_metadata_result; |
1159 | | |
1160 | 7.44k | const auto op_id = first_op_metadata.op_id; |
1161 | 7.44k | if (test_hooks_) { |
1162 | 3.54k | test_hooks_->FirstOpIdOfSegment(segment_path, op_id); |
1163 | 3.54k | } |
1164 | 7.44k | const RestartSafeCoarseTimePoint first_op_time = first_op_metadata.entry_time; |
1165 | | |
1166 | 7.44k | if (!replay_from_this_or_earlier_time.is_initialized()) { |
1167 | 2.79k | replay_from_this_or_earlier_time = first_op_time - min_seconds_to_retain_logs; |
1168 | 2.79k | } |
1169 | | |
1170 | 7.44k | const auto is_first_op_id_low_enough = op_id <= op_id_replay_lowest; |
1171 | 7.44k | const auto is_first_op_time_early_enough = first_op_time <= replay_from_this_or_earlier_time; |
1172 | | |
1173 | 7.44k | const auto common_details_str = [&]() { |
1174 | 7.44k | std::ostringstream ss; |
1175 | 7.44k | ss << EXPR_VALUE_FOR_LOG(first_op_time) << ", " |
1176 | 7.44k | << EXPR_VALUE_FOR_LOG(min_seconds_to_retain_logs) << ", " |
1177 | 7.44k | << EXPR_VALUE_FOR_LOG(*replay_from_this_or_earlier_time); |
1178 | 7.44k | return ss.str(); |
1179 | 7.44k | }; |
1180 | | |
1181 | 7.44k | if (is_first_op_id_low_enough && is_first_op_time_early_enough) {
1182 | 465 | LOG_WITH_PREFIX(INFO) |
1183 | 465 | << kBootstrapOptimizerLogPrefix |
1184 | 465 | << "found first mandatory segment op id: " << op_id |
1185 | 465 | << common_details_str() << ", " |
1186 | 465 | << "number of segments to be skipped: " << (iter - segments.begin()); |
1187 | 465 | return iter; |
1188 | 465 | } |
1189 | | |
1190 | 6.98k | LOG_WITH_PREFIX(INFO) |
1191 | 6.98k | << "Segment " << segment_path << " cannot be used as the first segment to start replay " |
1192 | 6.98k | << "with according to our OpId and retention criteria. " |
1193 | 6.98k | << (iter == segments.begin() |
1194 | 6.98k | ? "However, this is already the earliest segment so we have to start replay " |
1195 | 2.33k | "here. We should probably investigate how we got into this situation. " |
1196 | 6.98k | : "Continuing to earlier segments."4.65k ) |
1197 | 6.98k | << EXPR_VALUE_FOR_LOG(op_id) << ", " |
1198 | 6.98k | << common_details_str() << ", " |
1199 | 6.98k | << EXPR_VALUE_FOR_LOG(is_first_op_id_low_enough) << ", " |
1200 | 6.98k | << EXPR_VALUE_FOR_LOG(is_first_op_time_early_enough); |
1201 | 6.98k | } |
1202 | | |
1203 | 2.32k | LOG_WITH_PREFIX(INFO) |
1204 | 2.32k | << kBootstrapOptimizerLogPrefix |
1205 | 2.32k | << "will replay all segments starting from the very first one."; |
1206 | | |
1207 | 2.32k | return iter; |
1208 | 2.79k | } |
1209 | | |
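
As a reading aid (not part of the source file): a minimal sketch of the segment-selection rule documented above, using hypothetical stand-in types rather than the real ReadableLogSegment / OpId / RestartSafeCoarseTimePoint classes. It picks the latest segment whose first OpId is at or below the flushed OpId and whose first entry is old enough to preserve the retryable-request retention window.

  #include <chrono>
  #include <cstdint>
  #include <vector>

  struct SketchOpId {
    int64_t term = 0;
    int64_t index = 0;
    // OpIds compare by term first and index second, matching the comparison described above.
    bool operator<=(const SketchOpId& other) const {
      return term < other.term || (term == other.term && index <= other.index);
    }
  };

  struct SegmentInfo {
    SketchOpId first_op_id;
    std::chrono::steady_clock::time_point first_op_time;
  };

  // Returns the index of the segment to start replay from; 0 means everything is replayed.
  size_t PickFirstSegmentToReplay(const std::vector<SegmentInfo>& segments,
                                  const SketchOpId& flushed_op_id,
                                  std::chrono::seconds retention) {
    if (segments.empty()) {
      return 0;
    }
    // Entries newer than this point must be replayed regardless of the flushed OpId.
    const auto replay_from_this_or_earlier = segments.back().first_op_time - retention;
    for (size_t i = segments.size(); i-- > 0;) {
      const SegmentInfo& seg = segments[i];
      if (seg.first_op_id <= flushed_op_id && seg.first_op_time <= replay_from_this_or_earlier) {
        return i;  // Latest segment satisfying both criteria; earlier segments can be skipped.
      }
    }
    return 0;  // No segment qualifies, so replay starts from the very first one.
  }
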
1210 | | // Plays the log segments into the tablet being built. The process of playing the segments can |
1211 | | // work in two modes: |
1212 | | // |
1213 | | // - With skip_wal_rewrite enabled (default mode): |
1214 | | // Reuses existing segments of the log, rebuilding log segment footers when necessary. |
1215 | | // |
1216 | | // - With skip_wal_rewrite disabled (legacy mode): |
1217 | | // Moves the old log to a "recovery directory" and replays entries from the old log into a new log.
1218 | | // This is very I/O-intensive. We should probably get rid of this mode eventually. |
1219 | | // |
1220 | | // The resulting log can be continued later on when the tablet is rebuilt and starts accepting
1221 | | // writes from clients. |
1222 | 2.79k | CHECKED_STATUS PlaySegments(ConsensusBootstrapInfo* consensus_info) { |
1223 | 2.79k | const auto flushed_op_ids = VERIFY_RESULT(GetFlushedOpIds()); |
1224 | | |
1225 | 2.79k | if (tablet_->snapshot_coordinator()) { |
1226 | | // We should load transaction-aware snapshots before replaying logs, because we need them
1227 | | // during this replay. |
1228 | 89 | RETURN_NOT_OK(tablet_->snapshot_coordinator()->Load(tablet_.get())); |
1229 | 89 | } |
1230 | | |
1231 | 2.79k | replay_state_ = std::make_unique<ReplayState>(flushed_op_ids, LogPrefix()); |
1232 | 2.79k | replay_state_->max_committed_hybrid_time = VERIFY_RESULT(tablet_->MaxPersistentHybridTime()); |
1233 | | |
1234 | 2.79k | if (FLAGS_force_recover_flushed_frontier) { |
1235 | 0 | LOG_WITH_PREFIX(WARNING) |
1236 | 0 | << "--force_recover_flushed_frontier specified, ignoring max committed hybrid time from " |
1237 | 0 | << "RocksDB metadata (will replay all log records): " |
1238 | 0 | << replay_state_->max_committed_hybrid_time; |
1239 | 0 | replay_state_->max_committed_hybrid_time = HybridTime::kMin; |
1240 | 2.79k | } else { |
1241 | 2.79k | LOG_WITH_PREFIX(INFO) << "Max persistent index in RocksDB's SSTables before bootstrap: " |
1242 | 2.79k | << "regular RocksDB: " |
1243 | 2.79k | << replay_state_->stored_op_ids.regular << "; " |
1244 | 2.79k | << "intents RocksDB: " |
1245 | 2.79k | << replay_state_->stored_op_ids.intents; |
1246 | 2.79k | } |
1247 | | |
1248 | | // Open the log. |
1249 | | // |
1250 | | // If skip_wal_rewrite is true (default case), defer appending to this log until bootstrap is |
1251 | | // finished to preserve the state of the old log. In that case we don't need to create a new
1252 | | // segment until bootstrap is done. |
1253 | | // |
1254 | | // If skip_wal_rewrite is false, create a new segment and append each replayed entry to this |
1255 | | // new log. |
1256 | 2.79k | RETURN_NOT_OK_PREPEND( |
1257 | 2.79k | OpenNewLog(log::CreateNewSegment(!FLAGS_skip_wal_rewrite)), "Failed to open new log"); |
1258 | | |
1259 | 2.79k | log::SegmentSequence segments; |
1260 | 2.79k | RETURN_NOT_OK(log_->GetSegmentsSnapshot(&segments)); |
1261 | | |
1262 | | // Find the earliest log segment we need to read, so the rest can be ignored. |
1263 | 2.79k | auto iter = FLAGS_skip_flushed_entries ? SkipFlushedEntries(&segments) : segments.begin();
1264 | | |
1265 | 2.79k | yb::OpId last_committed_op_id; |
1266 | 2.79k | yb::OpId last_read_entry_op_id; |
1267 | 2.79k | RestartSafeCoarseTimePoint last_entry_time; |
1268 | 10.2k | for (; iter != segments.end(); ++iter) {
1269 | 7.48k | const scoped_refptr<ReadableLogSegment>& segment = *iter; |
1270 | | |
1271 | 7.48k | auto read_result = segment->ReadEntries(); |
1272 | 7.48k | last_committed_op_id = std::max(last_committed_op_id, read_result.committed_op_id); |
1273 | 7.48k | if (!read_result.entries.empty()) { |
1274 | 7.44k | last_read_entry_op_id = yb::OpId::FromPB(read_result.entries.back()->replicate().id()); |
1275 | 7.44k | } |
1276 | 1.37M | for (size_t entry_idx = 0; entry_idx < read_result.entries.size(); ++entry_idx) {
1277 | 1.37M | const Status s = HandleEntry( |
1278 | 1.37M | read_result.entry_metadata[entry_idx], &read_result.entries[entry_idx]); |
1279 | 1.37M | if (!s.ok()) { |
1280 | 0 | LOG_WITH_PREFIX(INFO) << "Dumping replay state to log: " << s; |
1281 | 0 | DumpReplayStateToLog(); |
1282 | 0 | RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(), |
1283 | 0 | segment->header().sequence_number(), |
1284 | 0 | entry_idx, segment->path(), |
1285 | 0 | read_result.entries[entry_idx].get())); |
1286 | 0 | } |
1287 | 1.37M | } |
1288 | 7.48k | if (!read_result.entry_metadata.empty()) { |
1289 | 7.44k | last_entry_time = read_result.entry_metadata.back().entry_time; |
1290 | 7.44k | } |
1291 | | |
1292 | | // If the LogReader failed to read for some reason, we'll still try to replay as many entries |
1293 | | // as possible, and then fail with Corruption. |
1294 | 7.48k | if (PREDICT_FALSE(!read_result.status.ok())) { |
1295 | 1 | return STATUS_FORMAT(Corruption, |
1296 | 1 | "Error reading Log Segment of tablet $0: $1 " |
1297 | 1 | "(Read up to entry $2 of segment $3, in path $4)", |
1298 | 1 | tablet_->tablet_id(), |
1299 | 1 | read_result.status, |
1300 | 1 | read_result.entries.size(), |
1301 | 1 | segment->header().sequence_number(), |
1302 | 1 | segment->path()); |
1303 | 1 | } |
1304 | | |
1305 | | // TODO: could be more granular here and log progress within segments as well, plus give info
1306 | | // about the number of MB processed, but this is better than nothing.
1307 | 7.48k | auto status = Format( |
1308 | 7.48k | "Bootstrap replayed $0/$1 log segments. $2. Pending: $3 replicates. " |
1309 | 7.48k | "Last read committed op id: $4", |
1310 | 7.48k | (iter - segments.begin()) + 1, segments.size(), stats_, |
1311 | 7.48k | replay_state_->pending_replicates.size(), read_result.committed_op_id); |
1312 | 7.48k | if (read_result.entry_metadata.empty()) { |
1313 | 35 | status += ", no entries in last segment"; |
1314 | 7.44k | } else { |
1315 | 7.44k | status += ", last entry metadata: " + read_result.entry_metadata.back().ToString() + |
1316 | 7.44k | ", last read entry op id: " + last_read_entry_op_id.ToString(); |
1317 | 7.44k | } |
1318 | 7.48k | listener_->StatusMessage(status); |
1319 | 7.48k | } |
1320 | | |
1321 | 2.79k | replay_state_->UpdateCommittedFromStored(); |
1322 | 2.79k | RETURN_NOT_OK(ApplyCommittedPendingReplicates()); |
1323 | | |
1324 | 2.79k | if (last_committed_op_id.index > replay_state_->committed_op_id.index) { |
1325 | 2.14k | auto it = replay_state_->pending_replicates.find(last_committed_op_id.index); |
1326 | 2.14k | if (it != replay_state_->pending_replicates.end()) { |
1327 | | // That is guaranteed by the Raft protocol: if a record is committed, it cannot
1328 | | // be overridden by a new leader.
1329 | 2.14k | if (last_committed_op_id.term == it->second.entry->replicate().id().term()) {
1330 | 2.14k | replay_state_->UpdateCommittedOpId(last_committed_op_id); |
1331 | 2.14k | RETURN_NOT_OK(ApplyCommittedPendingReplicates()); |
1332 | 18.4E | } else { |
1333 | 18.4E | DumpReplayStateToLog(); |
1334 | 18.4E | LOG_WITH_PREFIX(DFATAL) |
1335 | 18.4E | << "Invalid last committed op id: " << last_committed_op_id |
1336 | 18.4E | << ", record with this index has another term: " |
1337 | 18.4E | << it->second.entry->replicate().id(); |
1338 | 18.4E | } |
1339 | 2.14k | } else { |
1340 | 0 | DumpReplayStateToLog(); |
1341 | 0 | LOG_WITH_PREFIX(DFATAL) |
1342 | 0 | << "Does not have an entry for the last committed index: " << last_committed_op_id |
1343 | 0 | << ", entries: " << yb::ToString(replay_state_->pending_replicates); |
1344 | 0 | } |
1345 | 2.14k | } |
1346 | | |
1347 | 2.79k | LOG_WITH_PREFIX(INFO) << "Dumping replay state to log at the end of " << __FUNCTION__; |
1348 | 2.79k | DumpReplayStateToLog(); |
1349 | | |
1350 | | // Set up the ConsensusBootstrapInfo structure for the caller. |
1351 | 2.79k | for (auto& e : replay_state_->pending_replicates) { |
1352 | | // We only allow log entries with an index later than the index of the last log entry already |
1353 | | // applied to RocksDB to be passed to the tablet as "orphaned replicates". This will make sure |
1354 | | // we don't try to write to RocksDB with non-monotonic sequence ids, but still create |
1355 | | // ConsensusRound instances for writes that have not been persisted into RocksDB. |
1356 | 1.22k | consensus_info->orphaned_replicates.emplace_back(e.second.entry->release_replicate()); |
1357 | 1.22k | } |
1358 | 2.79k | LOG_WITH_PREFIX(INFO) |
1359 | 2.79k | << "Number of orphaned replicates: " << consensus_info->orphaned_replicates.size() |
1360 | 2.79k | << ", last id: " << replay_state_->prev_op_id |
1361 | 2.79k | << ", committed id: " << replay_state_->committed_op_id; |
1362 | | |
1363 | 2.79k | SCHECK_FORMAT( |
1364 | 2.79k | replay_state_->prev_op_id.term >= replay_state_->committed_op_id.term && |
1365 | 2.79k | replay_state_->prev_op_id.index >= replay_state_->committed_op_id.index, |
1366 | 2.79k | IllegalState, |
1367 | 2.79k | "WAL files missing, or committed op id is incorrect. Expected both term and index " |
1368 | 2.79k | "of prev_op_id to be greater than or equal to the corresponding components of " |
1369 | 2.79k | "committed_op_id. prev_op_id=$0, committed_op_id=$1", |
1370 | 2.79k | replay_state_->prev_op_id, replay_state_->committed_op_id); |
1371 | | |
1372 | 2.79k | tablet_->mvcc_manager()->SetLastReplicated(replay_state_->max_committed_hybrid_time); |
1373 | 2.79k | consensus_info->last_id = MakeOpIdPB(replay_state_->prev_op_id); |
1374 | 2.79k | consensus_info->last_committed_id = MakeOpIdPB(replay_state_->committed_op_id); |
1375 | | |
1376 | 2.79k | if (data_.retryable_requests) { |
1377 | 2.29k | data_.retryable_requests->Clock().Adjust(last_entry_time); |
1378 | 2.29k | } |
1379 | | |
1380 | 2.79k | return Status::OK(); |
1381 | 2.79k | } |
1382 | | |
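
One step above that is easy to miss is the final reconciliation of the committed OpId read from the log with the pending replicates. Below is a hedged sketch of just that check, with a simplified map type standing in for ReplayState::pending_replicates.

  #include <cstdint>
  #include <map>

  struct PendingEntrySketch {
    int64_t term = 0;
  };

  // Returns true if the committed OpId we read from the log can be applied: the entry at that
  // index must still be pending and must carry the same term, because Raft does not allow a
  // committed record to be overwritten by a later leader.
  bool CanAdvanceCommittedIndex(const std::map<int64_t, PendingEntrySketch>& pending_replicates,
                                int64_t committed_index, int64_t committed_term) {
    const auto it = pending_replicates.find(committed_index);
    if (it == pending_replicates.end()) {
      return false;  // Missing entry: WAL files are likely missing, treated as a fatal condition.
    }
    return it->second.term == committed_term;  // A term mismatch would violate the Raft invariant.
  }
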
1383 | | CHECKED_STATUS PlayWriteRequest( |
1384 | 227k | ReplicateMsg* replicate_msg, AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1385 | 227k | SCHECK(replicate_msg->has_hybrid_time(), IllegalState, |
1386 | 227k | "A write operation with no hybrid time"); |
1387 | | |
1388 | 227k | auto* write = replicate_msg->mutable_write(); |
1389 | | |
1390 | 227k | SCHECK(write->has_write_batch(), Corruption, "A write request must have a write batch"); |
1391 | | |
1392 | 227k | WriteOperation operation(tablet_.get(), write); |
1393 | 227k | operation.set_op_id(OpId::FromPB(replicate_msg->id())); |
1394 | 227k | HybridTime hybrid_time(replicate_msg->hybrid_time()); |
1395 | 227k | operation.set_hybrid_time(hybrid_time); |
1396 | | |
1397 | 227k | auto op_id = operation.op_id(); |
1398 | 227k | tablet_->mvcc_manager()->AddFollowerPending(hybrid_time, op_id); |
1399 | | |
1400 | 227k | if (test_hooks_ && |
1401 | 227k | replicate_msg->has_write() &&
1402 | 227k | replicate_msg->write().has_write_batch() &&
1403 | 227k | replicate_msg->write().write_batch().has_transaction() &&
1404 | 227k | test_hooks_->ShouldSkipWritingIntents()) {
1405 | | // Used in unit tests to avoid instantiating the entire transactional subsystem. |
1406 | 46.9k | tablet_->mvcc_manager()->Replicated(hybrid_time, op_id); |
1407 | 46.9k | return Status::OK(); |
1408 | 46.9k | } |
1409 | | |
1410 | 180k | auto apply_status = tablet_->ApplyRowOperations( |
1411 | 180k | &operation, already_applied_to_regular_db); |
1412 | | // Failure is a regular case here, since it can happen when the transaction was aborted while
1413 | | // its intents were being replicated.
1414 | 18.4E | LOG_IF(INFO, !apply_status.ok()) << "Apply operation failed: " << apply_status; |
1415 | | |
1416 | 180k | tablet_->mvcc_manager()->Replicated(hybrid_time, op_id); |
1417 | 180k | return Status::OK(); |
1418 | 227k | } |
1419 | | |
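
A sketch of the MVCC bracketing pattern used while replaying a write above: the hybrid time is registered as a pending follower operation before row operations are applied and is marked replicated afterwards, even when the apply fails. MvccSketch and the callback are illustrative placeholders, not the real MvccManager interface.

  #include <cstdint>
  #include <functional>

  struct MvccSketch {
    void AddFollowerPending(uint64_t hybrid_time) { /* track the in-flight hybrid time */ }
    void Replicated(uint64_t hybrid_time) { /* release it so safe time can advance */ }
  };

  void ReplayOneWrite(MvccSketch* mvcc, uint64_t hybrid_time,
                      const std::function<bool()>& apply_row_operations) {
    mvcc->AddFollowerPending(hybrid_time);
    if (!apply_row_operations()) {
      // Apply failure is tolerated: for example, the transaction may have been aborted while its
      // intents were being replicated. The real code only logs this.
    }
    // The operation must be released on every path, otherwise safe time would stall.
    mvcc->Replicated(hybrid_time);
  }
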
1420 | 2.75k | CHECKED_STATUS PlayChangeMetadataRequest(ReplicateMsg* replicate_msg) { |
1421 | 2.75k | ChangeMetadataRequestPB* request = replicate_msg->mutable_change_metadata_request(); |
1422 | | |
1423 | | // Decode schema |
1424 | 2.75k | Schema schema; |
1425 | 2.75k | if (request->has_schema()) { |
1426 | 1.49k | RETURN_NOT_OK(SchemaFromPB(request->schema(), &schema)); |
1427 | 1.49k | } |
1428 | | |
1429 | 2.75k | ChangeMetadataOperation operation(request); |
1430 | | |
1431 | | // If table id isn't in metadata, ignore the replay as the table might've been dropped. |
1432 | 2.75k | auto table_info = meta_->GetTableInfo(operation.table_id()); |
1433 | 2.75k | if (!table_info.ok()) { |
1434 | 2 | LOG_WITH_PREFIX(WARNING) << "Table ID " << operation.table_id() |
1435 | 2 | << " not found in metadata, skipping this ChangeMetadataRequest"; |
1436 | 2 | return Status::OK(); |
1437 | 2 | } |
1438 | | |
1439 | 2.75k | RETURN_NOT_OK(tablet_->CreatePreparedChangeMetadata( |
1440 | 2.75k | &operation, request->has_schema() ? &schema : nullptr)); |
1441 | | |
1442 | 2.75k | if (request->has_schema()) { |
1443 | | // Apply the alter schema to the tablet. |
1444 | 1.48k | RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&operation), "Failed to AlterSchema:"); |
1445 | | |
1446 | | // Also update the log information. Normally, the AlterSchema() call above takes care of this, |
1447 | | // but our new log isn't hooked up to the tablet yet. |
1448 | 1.48k | log_->SetSchemaForNextLogSegment(schema, operation.schema_version()); |
1449 | 1.48k | } |
1450 | | |
1451 | 2.75k | if (request->has_wal_retention_secs()) { |
1452 | 0 | RETURN_NOT_OK_PREPEND(tablet_->AlterWalRetentionSecs(&operation), |
1453 | 0 | "Failed to alter wal retention secs"); |
1454 | 0 | log_->set_wal_retention_secs(request->wal_retention_secs()); |
1455 | 0 | } |
1456 | | |
1457 | 2.75k | return Status::OK(); |
1458 | 2.75k | } |
1459 | | |
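
The replay of a metadata change above follows a small decision ladder: skip the record if the table is gone, apply the schema change if one is present, and mirror the schema onto the new log, which is not yet hooked up to the tablet. A compressed sketch under assumed placeholder types (not the real ChangeMetadataOperation API):

  #include <optional>
  #include <string>

  struct SchemaSketch { int version = 0; };

  struct MetadataChangeSketch {
    std::string table_id;
    std::optional<SchemaSketch> schema;     // present only for schema changes
    std::optional<int> wal_retention_secs;  // present only for WAL retention changes
  };

  struct TabletSketch {
    bool HasTable(const std::string& table_id) const { return true; }  // placeholder lookup
    void AlterSchema(const SchemaSketch&) {}
    void AlterWalRetentionSecs(int) {}
  };

  struct LogSketch {
    void SetSchemaForNextLogSegment(const SchemaSketch&) {}
    void set_wal_retention_secs(int) {}
  };

  // Returns false when the change is skipped because the table is no longer in the metadata.
  bool ReplayMetadataChange(TabletSketch* tablet, LogSketch* log,
                            const MetadataChangeSketch& change) {
    if (!tablet->HasTable(change.table_id)) {
      return false;  // Table was dropped after this record was written; nothing to do.
    }
    if (change.schema) {
      tablet->AlterSchema(*change.schema);
      // The new log is not hooked up to the tablet yet, so it has to be told explicitly.
      log->SetSchemaForNextLogSegment(*change.schema);
    }
    if (change.wal_retention_secs) {
      tablet->AlterWalRetentionSecs(*change.wal_retention_secs);
      log->set_wal_retention_secs(*change.wal_retention_secs);
    }
    return true;
  }
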
1460 | 4.50k | CHECKED_STATUS PlayChangeConfigRequest(ReplicateMsg* replicate_msg) { |
1461 | 4.50k | ChangeConfigRecordPB* change_config = replicate_msg->mutable_change_config_record(); |
1462 | 4.50k | RaftConfigPB config = change_config->new_config(); |
1463 | | |
1464 | 4.50k | int64_t cmeta_opid_index = cmeta_->committed_config().opid_index(); |
1465 | 4.50k | if (replicate_msg->id().index() > cmeta_opid_index) { |
1466 | 34 | SCHECK(!config.has_opid_index(), |
1467 | 34 | Corruption, |
1468 | 34 | "A config change record must have an opid_index"); |
1469 | 34 | config.set_opid_index(replicate_msg->id().index()); |
1470 | 34 | VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
1471 | 0 | << config.opid_index() |
1472 | 0 | << " that is greater than the committed config's index " |
1473 | 0 | << cmeta_opid_index |
1474 | 0 | << ". Applying this configuration change."; |
1475 | 34 | cmeta_->set_committed_config(config); |
1476 | | // We flush once at the end of bootstrap. |
1477 | 4.46k | } else { |
1478 | 4.46k | VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
1479 | 0 | << replicate_msg->id().index() |
1480 | 0 | << ", which is less than or equal to the committed " |
1481 | 0 | << "config's index " << cmeta_opid_index << ". " |
1482 | 0 | << "Skipping application of this config change."; |
1483 | 4.46k | } |
1484 | | |
1485 | 4.50k | return Status::OK(); |
1486 | 4.50k | } |
1487 | | |
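
The decision above reduces to a single comparison: a replayed config change is applied in memory only if its log index is newer than the opid_index of the already-committed config (the flush to disk happens once, at the end of bootstrap). A tiny sketch with a hypothetical struct in place of RaftConfigPB:

  #include <cstdint>

  struct ReplayedConfigChangeSketch {
    int64_t log_index = 0;
  };

  // True if the replayed change should replace the committed config held in consensus metadata.
  bool ShouldApplyConfigChange(const ReplayedConfigChangeSketch& change,
                               int64_t committed_opid_index) {
    return change.log_index > committed_opid_index;
  }
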
1488 | 0 | CHECKED_STATUS PlayTruncateRequest(ReplicateMsg* replicate_msg) { |
1489 | 0 | auto* req = replicate_msg->mutable_truncate(); |
1490 | |
1491 | 0 | TruncateOperation operation(tablet_.get(), req); |
1492 | |
1493 | 0 | Status s = tablet_->Truncate(&operation); |
1494 | |
1495 | 0 | RETURN_NOT_OK_PREPEND(s, "Failed to Truncate:"); |
1496 | |
1497 | 0 | return Status::OK(); |
1498 | 0 | } |
1499 | | |
1500 | | CHECKED_STATUS PlayUpdateTransactionRequest( |
1501 | 33.7k | ReplicateMsg* replicate_msg, AlreadyAppliedToRegularDB already_applied_to_regular_db) { |
1502 | 33.7k | SCHECK(replicate_msg->has_hybrid_time(), |
1503 | 33.7k | Corruption, "A transaction update request must have a hybrid time"); |
1504 | | |
1505 | 33.7k | UpdateTxnOperation operation( |
1506 | 33.7k | /* tablet */ nullptr, replicate_msg->mutable_transaction_state()); |
1507 | 33.7k | operation.set_op_id(OpId::FromPB(replicate_msg->id())); |
1508 | 33.7k | HybridTime hybrid_time(replicate_msg->hybrid_time()); |
1509 | 33.7k | operation.set_hybrid_time(hybrid_time); |
1510 | | |
1511 | 33.7k | auto op_id = OpId::FromPB(replicate_msg->id()); |
1512 | 33.7k | tablet_->mvcc_manager()->AddFollowerPending(hybrid_time, op_id); |
1513 | 33.7k | auto scope_exit = ScopeExit([this, hybrid_time, op_id] { |
1514 | 33.7k | tablet_->mvcc_manager()->Replicated(hybrid_time, op_id); |
1515 | 33.7k | }); |
1516 | | |
1517 | 33.7k | if (test_hooks_ && test_hooks_->ShouldSkipTransactionUpdates()) {
1518 | | // Used in tests where we don't have a transaction participant instantiated.
1519 | 31.0k | return Status::OK(); |
1520 | 31.0k | } |
1521 | | |
1522 | 2.66k | auto transaction_participant = tablet_->transaction_participant(); |
1523 | 2.66k | if (transaction_participant) { |
1524 | 4 | TransactionParticipant::ReplicatedData replicated_data = { |
1525 | 4 | .leader_term = yb::OpId::kUnknownTerm, |
1526 | 4 | .state = *operation.request(), |
1527 | 4 | .op_id = operation.op_id(), |
1528 | 4 | .hybrid_time = operation.hybrid_time(), |
1529 | 4 | .sealed = operation.request()->sealed(), |
1530 | 4 | .already_applied_to_regular_db = already_applied_to_regular_db |
1531 | 4 | }; |
1532 | 4 | return transaction_participant->ProcessReplicated(replicated_data); |
1533 | 4 | } |
1534 | | |
1535 | 2.66k | auto transaction_coordinator = tablet_->transaction_coordinator(); |
1536 | 2.66k | if (!transaction_coordinator) { |
1537 | 0 | return STATUS( |
1538 | 0 | IllegalState, |
1539 | 0 | "No transaction coordinator or participant, cannot process a transaction update request"); |
1540 | 0 | } |
1541 | 2.66k | TransactionCoordinator::ReplicatedData replicated_data = { |
1542 | 2.66k | .leader_term = yb::OpId::kUnknownTerm, |
1543 | 2.66k | .state = *operation.request(), |
1544 | 2.66k | .op_id = operation.op_id(), |
1545 | 2.66k | .hybrid_time = operation.hybrid_time(), |
1546 | 2.66k | }; |
1547 | 2.66k | return transaction_coordinator->ProcessReplicated(replicated_data); |
1548 | 2.66k | } |
1549 | | |
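
Routing in the function above can be summarized as: prefer the transaction participant when the tablet has one, fall back to the transaction coordinator, and treat having neither as an error. A compact sketch with placeholder names (not the real tablet accessors):

  enum class TxnUpdateRoute { kParticipant, kCoordinator, kNone };

  // kNone corresponds to the IllegalState status returned by the real code.
  TxnUpdateRoute RouteTransactionUpdate(bool has_participant, bool has_coordinator) {
    if (has_participant) {
      return TxnUpdateRoute::kParticipant;
    }
    if (has_coordinator) {
      return TxnUpdateRoute::kCoordinator;
    }
    return TxnUpdateRoute::kNone;
  }
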
1550 | | // Constructs a HybridTime from the provided raw value and updates the clock with it.
1551 | 1.37M | void UpdateClock(uint64_t hybrid_time) { |
1552 | 1.37M | data_.tablet_init_data.clock->Update(HybridTime(hybrid_time)); |
1553 | 1.37M | } |
1554 | | |
1555 | | // Return a log prefix string in the standard "T xxx P yyy" format. |
1556 | 350k | std::string LogPrefix() const { |
1557 | 350k | return consensus::MakeTabletLogPrefix(meta_->raft_group_id(), meta_->fs_manager()->uuid()); |
1558 | 350k | } |
1559 | | |
1560 | 755k | Env* GetEnv() { |
1561 | 755k | if (data_.tablet_init_data.tablet_options.env) { |
1562 | 755k | return data_.tablet_init_data.tablet_options.env; |
1563 | 755k | } |
1564 | 4 | return meta_->fs_manager()->env(); |
1565 | 755k | } |
1566 | | |
1567 | 150k | void CleanupSnapshots() { |
1568 | | // Disk clean-up: deleting temporary/incomplete snapshots. |
1569 | 150k | const string top_snapshots_dir = TabletSnapshots::SnapshotsDirName(meta_->rocksdb_dir()); |
1570 | | |
1571 | 150k | if (meta_->fs_manager()->env()->FileExists(top_snapshots_dir)) { |
1572 | 2.23k | vector<string> snapshot_dirs; |
1573 | 2.23k | Status s = meta_->fs_manager()->env()->GetChildren( |
1574 | 2.23k | top_snapshots_dir, ExcludeDots::kTrue, &snapshot_dirs); |
1575 | | |
1576 | 2.23k | if (!s.ok()) { |
1577 | 0 | LOG_WITH_PREFIX(WARNING) << "Cannot get list of snapshot directories in " |
1578 | 0 | << top_snapshots_dir << ": " << s; |
1579 | 2.23k | } else { |
1580 | 2.23k | for (const string& dir_name : snapshot_dirs) { |
1581 | 0 | const string snapshot_dir = JoinPathSegments(top_snapshots_dir, dir_name); |
1582 | |
1583 | 0 | if (TabletSnapshots::IsTempSnapshotDir(snapshot_dir)) { |
1584 | 0 | LOG_WITH_PREFIX(INFO) << "Deleting old temporary snapshot directory " << snapshot_dir; |
1585 | |
1586 | 0 | s = meta_->fs_manager()->env()->DeleteRecursively(snapshot_dir); |
1587 | 0 | if (!s.ok()) { |
1588 | 0 | LOG_WITH_PREFIX(WARNING) << "Cannot delete old temporary snapshot directory " |
1589 | 0 | << snapshot_dir << ": " << s; |
1590 | 0 | } |
1591 | |
1592 | 0 | s = meta_->fs_manager()->env()->SyncDir(top_snapshots_dir); |
1593 | 0 | if (!s.ok()) { |
1594 | 0 | LOG_WITH_PREFIX(WARNING) << "Cannot sync top snapshots dir " << top_snapshots_dir |
1595 | 0 | << ": " << s; |
1596 | 0 | } |
1597 | 0 | } |
1598 | 0 | } |
1599 | 2.23k | } |
1600 | 2.23k | } |
1601 | 150k | } |
1602 | | |
1603 | | // Goes through the contiguous prefix of pending_replicates and applies those that are committed |
1604 | | // by calling MaybeReplayCommittedEntry. |
1605 | 1.37M | CHECKED_STATUS ApplyCommittedPendingReplicates() { |
1606 | 1.37M | auto& pending_replicates = replay_state_->pending_replicates; |
1607 | 1.37M | auto iter = pending_replicates.begin(); |
1608 | 2.68M | while (iter != pending_replicates.end() && replay_state_->CanApply(iter->second.entry.get())) {
1609 | 18.4E | VLOG_WITH_PREFIX(1) << "Applying committed pending replicate " |
1610 | 18.4E | << iter->second.entry->replicate().id(); |
1611 | 1.30M | auto op_id = iter->second.entry->replicate().id(); |
1612 | 1.30M | RETURN_NOT_OK(MaybeReplayCommittedEntry(iter->second.entry.get(), iter->second.entry_time)); |
1613 | 1.30M | iter = pending_replicates.erase(iter); // erase and advance the iterator (C++11) |
1614 | 1.30M | ++replay_state_->num_entries_applied_to_rocksdb; |
1615 | 1.30M | } |
1616 | 1.37M | return Status::OK(); |
1617 | 1.37M | } |
1618 | | |
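
For reference, a distilled sketch of the contiguous-prefix application above: pending replicates live in a map ordered by index, and only the leading run of entries that the replay state says can be applied is consumed. The callbacks are stand-ins for CanApply and MaybeReplayCommittedEntry.

  #include <cstdint>
  #include <functional>
  #include <map>

  struct PendingReplicateSketch {
    int64_t index = 0;
  };

  // Applies the leading committed prefix and returns how many entries were consumed.
  size_t ApplyCommittedPrefix(std::map<int64_t, PendingReplicateSketch>* pending,
                              const std::function<bool(const PendingReplicateSketch&)>& can_apply,
                              const std::function<void(const PendingReplicateSketch&)>& apply) {
    size_t applied = 0;
    auto iter = pending->begin();
    while (iter != pending->end() && can_apply(iter->second)) {
      apply(iter->second);
      iter = pending->erase(iter);  // erase() returns the next iterator, advancing the scan
      ++applied;
    }
    return applied;
  }
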
1619 | | // ---------------------------------------------------------------------------------------------- |
1620 | | // Member fields |
1621 | | // ---------------------------------------------------------------------------------------------- |
1622 | | |
1623 | | BootstrapTabletData data_; |
1624 | | RaftGroupMetadataPtr meta_; |
1625 | | std::shared_ptr<MemTracker> mem_tracker_; |
1626 | | TabletStatusListener* listener_; |
1627 | | TabletPtr tablet_; |
1628 | | scoped_refptr<log::Log> log_; |
1629 | | std::unique_ptr<log::LogReader> log_reader_; |
1630 | | std::unique_ptr<ReplayState> replay_state_; |
1631 | | |
1632 | | std::unique_ptr<consensus::ConsensusMetadata> cmeta_; |
1633 | | |
1634 | | // Thread pool for append task for bootstrap. |
1635 | | ThreadPool* append_pool_; |
1636 | | |
1637 | | ThreadPool* allocation_pool_; |
1638 | | |
1639 | | // Statistics on the replay of entries in the log. |
1640 | | struct Stats { |
1641 | | std::string ToString() const; |
1642 | | |
1643 | | // Number of REPLICATE messages read from the log |
1644 | | int ops_read = 0; |
1645 | | |
1646 | | // Number of REPLICATE messages which were overwritten by later entries. |
1647 | | int ops_overwritten = 0; |
1648 | | } stats_; |
1649 | | |
1650 | | HybridTime rocksdb_last_entry_hybrid_time_ = HybridTime::kMin; |
1651 | | |
1652 | | bool skip_wal_rewrite_; |
1653 | | |
1654 | | // A way to inject flushed OpIds for regular and intents RocksDBs. |
1655 | | boost::optional<DocDbOpIds> TEST_docdb_flushed_op_ids_; |
1656 | | |
1657 | | bool TEST_collect_replayed_op_ids_; |
1658 | | |
1659 | | // This is populated if TEST_collect_replayed_op_ids is true. |
1660 | | std::vector<yb::OpId> TEST_replayed_op_ids_; |
1661 | | |
1662 | | std::shared_ptr<TabletBootstrapTestHooksIf> test_hooks_; |
1663 | | |
1664 | | DISALLOW_COPY_AND_ASSIGN(TabletBootstrap); |
1665 | | }; |
1666 | | |
1667 | | // ============================================================================ |
1668 | | // Class TabletBootstrap::Stats. |
1669 | | // ============================================================================ |
1670 | | |
1671 | 7.46k | string TabletBootstrap::Stats::ToString() const { |
1672 | 7.46k | return Format("Read operations: $0, overwritten operations: $1", |
1673 | 7.46k | ops_read, ops_overwritten); |
1674 | 7.46k | } |
1675 | | |
1676 | | CHECKED_STATUS BootstrapTabletImpl( |
1677 | | const BootstrapTabletData& data, |
1678 | | TabletPtr* rebuilt_tablet, |
1679 | | scoped_refptr<log::Log>* rebuilt_log, |
1680 | 150k | consensus::ConsensusBootstrapInfo* results) { |
1681 | 150k | TabletBootstrap tablet_bootstrap(data); |
1682 | 150k | auto bootstrap_status = tablet_bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, results); |
1683 | 150k | if (!bootstrap_status.ok()) { |
1684 | 15 | LOG(WARNING) << "T " << (*rebuilt_tablet ? (*rebuilt_tablet)->tablet_id() : "N/A")
1685 | 15 | << " Tablet bootstrap failed: " << bootstrap_status; |
1686 | 15 | } |
1687 | 150k | return bootstrap_status; |
1688 | 150k | } |
1689 | | |
1690 | | } // namespace tablet |
1691 | | } // namespace yb |