YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/tablet_bootstrap.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/tablet_bootstrap.h"
34
35
#include <map>
36
#include <set>
37
38
#include <boost/preprocessor/cat.hpp>
39
#include <boost/preprocessor/stringize.hpp>
40
41
#include "yb/common/common_fwd.h"
42
#include "yb/common/schema.h"
43
#include "yb/common/wire_protocol.h"
44
45
#include "yb/consensus/consensus.h"
46
#include "yb/consensus/consensus.pb.h"
47
#include "yb/consensus/consensus_meta.h"
48
#include "yb/consensus/consensus_util.h"
49
#include "yb/consensus/log.h"
50
#include "yb/consensus/log_anchor_registry.h"
51
#include "yb/consensus/log_index.h"
52
#include "yb/consensus/log_reader.h"
53
#include "yb/consensus/log_util.h"
54
#include "yb/consensus/opid_util.h"
55
#include "yb/consensus/retryable_requests.h"
56
57
#include "yb/docdb/consensus_frontier.h"
58
#include "yb/docdb/value_type.h"
59
60
#include "yb/gutil/casts.h"
61
#include "yb/gutil/ref_counted.h"
62
#include "yb/gutil/strings/substitute.h"
63
#include "yb/gutil/thread_annotations.h"
64
65
#include "yb/rpc/rpc_fwd.h"
66
67
#include "yb/tablet/tablet_fwd.h"
68
#include "yb/tablet/mvcc.h"
69
#include "yb/tablet/operations/change_metadata_operation.h"
70
#include "yb/tablet/operations/history_cutoff_operation.h"
71
#include "yb/tablet/operations/snapshot_operation.h"
72
#include "yb/tablet/operations/split_operation.h"
73
#include "yb/tablet/operations/truncate_operation.h"
74
#include "yb/tablet/operations/update_txn_operation.h"
75
#include "yb/tablet/operations/write_operation.h"
76
#include "yb/tablet/snapshot_coordinator.h"
77
#include "yb/tablet/tablet.h"
78
#include "yb/tablet/tablet_metadata.h"
79
#include "yb/tablet/tablet_options.h"
80
#include "yb/tablet/tablet_snapshots.h"
81
#include "yb/tablet/tablet_splitter.h"
82
#include "yb/tablet/transaction_coordinator.h"
83
#include "yb/tablet/transaction_participant.h"
84
85
#include "yb/tserver/backup.pb.h"
86
87
#include "yb/util/atomic.h"
88
#include "yb/util/env_util.h"
89
#include "yb/util/fault_injection.h"
90
#include "yb/util/flag_tags.h"
91
#include "yb/util/format.h"
92
#include "yb/util/logging.h"
93
#include "yb/util/metric_entity.h"
94
#include "yb/util/monotime.h"
95
#include "yb/util/opid.h"
96
#include "yb/util/scope_exit.h"
97
#include "yb/util/status.h"
98
#include "yb/util/status_format.h"
99
#include "yb/util/stopwatch.h"
100
101
DEFINE_bool(skip_remove_old_recovery_dir, false,
102
            "Skip removing WAL recovery dir after startup. (useful for debugging)");
103
TAG_FLAG(skip_remove_old_recovery_dir, hidden);
104
105
DEFINE_bool(skip_wal_rewrite, true,
106
            "Skip rewriting WAL files during bootstrap.");
107
TAG_FLAG(skip_wal_rewrite, experimental);
108
TAG_FLAG(skip_wal_rewrite, runtime);
109
110
DEFINE_test_flag(double, fault_crash_during_log_replay, 0.0,
111
                 "Fraction of the time when the tablet will crash immediately "
112
                 "after processing a log entry during log replay.");
113
114
DECLARE_uint64(max_clock_sync_error_usec);
115
116
DEFINE_bool(force_recover_flushed_frontier, false,
117
            "Could be used to ignore the flushed frontier metadata from RocksDB manifest and "
118
            "recover it from the log instead.");
119
TAG_FLAG(force_recover_flushed_frontier, hidden);
120
TAG_FLAG(force_recover_flushed_frontier, advanced);
121
122
DEFINE_bool(skip_flushed_entries, true,
123
            "Only replay WAL entries that are not flushed to RocksDB or within the retryable "
124
            "request timeout.");
125
126
DECLARE_int32(retryable_request_timeout_secs);
127
128
DEFINE_uint64(transaction_status_tablet_log_segment_size_bytes, 4_MB,
129
              "The segment size for transaction status tablet log roll-overs, in bytes.");
130
DEFINE_test_flag(int32, tablet_bootstrap_delay_ms, 0,
131
                 "Time (in ms) to delay tablet bootstrap by.");
132
133
DEFINE_test_flag(bool, dump_docdb_before_tablet_bootstrap, false,
134
                 "Dump the contents of DocDB before tablet bootstrap. Should only be used when "
135
                 "data is small.")
136
137
DEFINE_test_flag(bool, dump_docdb_after_tablet_bootstrap, false,
138
                 "Dump the contents of DocDB after tablet bootstrap. Should only be used when "
139
                 "data is small.")
140
141
namespace yb {
142
namespace tablet {
143
144
using namespace std::literals; // NOLINT
145
using namespace std::placeholders;
146
using std::shared_ptr;
147
148
using log::Log;
149
using log::LogEntryPB;
150
using log::LogOptions;
151
using log::LogReader;
152
using log::ReadableLogSegment;
153
using log::LogEntryMetadata;
154
using log::LogIndex;
155
using log::CreateNewSegment;
156
using log::SegmentSequence;
157
using consensus::ChangeConfigRecordPB;
158
using consensus::RaftConfigPB;
159
using consensus::ConsensusBootstrapInfo;
160
using consensus::ConsensusMetadata;
161
using consensus::MinimumOpId;
162
using consensus::OpIdEquals;
163
using consensus::OpIdToString;
164
using consensus::ReplicateMsg;
165
using consensus::MakeOpIdPB;
166
using strings::Substitute;
167
using tserver::WriteRequestPB;
168
using tserver::TabletSnapshotOpRequestPB;
169
170
static string DebugInfo(const string& tablet_id,
171
                        uint64_t segment_seqno,
172
                        size_t entry_idx,
173
                        const string& segment_path,
174
0
                        const LogEntryPB* entry) {
175
  // Truncate the debug string to a reasonable length for logging.  Otherwise, glog will truncate
176
  // for us and we may miss important information which came after this long string.
177
0
  string debug_str = entry ? entry->ShortDebugString() : "<nullptr>"s;
178
0
  if (debug_str.size() > 500) {
179
0
    debug_str.resize(500);
180
0
    debug_str.append("...");
181
0
  }
182
0
  return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. "
183
0
                    "Segment path: $3. Entry: $4", entry_idx, segment_seqno, tablet_id,
184
0
                    segment_path, debug_str);
185
0
}
186
187
// ================================================================================================
188
// Class ReplayState.
189
// ================================================================================================
190
191
struct Entry {
192
  std::unique_ptr<log::LogEntryPB> entry;
193
  RestartSafeCoarseTimePoint entry_time;
194
195
0
  std::string ToString() const {
196
0
    return Format("{ entry: $0 entry_time: $1 }", entry, entry_time);
197
0
  }
198
};
199
200
typedef std::map<int64_t, Entry> OpIndexToEntryMap;
201
202
// State kept during replay.
203
struct ReplayState {
204
  ReplayState(
205
      const DocDbOpIds& op_ids_,
206
      const std::string& log_prefix_)
207
      : stored_op_ids(op_ids_),
208
2.79k
        log_prefix(log_prefix_) {
209
2.79k
  }
210
211
  // Return true if 'b' is allowed to immediately follow 'a' in the log.
212
  static bool IsValidSequence(const OpId& a, const OpId& b);
213
214
  // Return a Corruption status if 'id' seems to be out-of-sequence in the log.
215
  Status CheckSequentialReplicateId(const consensus::ReplicateMsg& msg);
216
217
  void UpdateCommittedOpId(const OpId& id);
218
219
  // half_limit is half the limit on the number of entries added
220
  void AddEntriesToStrings(
221
      const OpIndexToEntryMap& entries, std::vector<std::string>* strings, size_t half_limit) const;
222
223
  // half_limit is half the limit on the number of entries to be dumped
224
  void DumpReplayStateToStrings(std::vector<std::string>* strings, int half_limit) const;
225
226
  bool CanApply(log::LogEntryPB* entry);
227
228
2.79k
  const std::string& LogPrefix() const { return log_prefix; }
229
230
  void UpdateCommittedFromStored();
231
232
  // Determines the lowest possible OpId we have to replay. This is based on OpIds of operations
233
  // flushed to regular and intents RocksDBs. Also logs some diagnostics.
234
  OpId GetLowestOpIdToReplay(bool has_intents_db, const char* extra_log_prefix) const;
235
236
  // ----------------------------------------------------------------------------------------------
237
  // ReplayState member fields
238
  // ----------------------------------------------------------------------------------------------
239
240
  // The last replicate message's ID.
241
  OpId prev_op_id;
242
243
  // The last operation known to be committed. All other operations with lower IDs are also
244
  // committed.
245
  OpId committed_op_id;
246
247
  // All REPLICATE entries that have not been applied to RocksDB yet. We decide what entries are
248
  // safe to apply and delete from this map based on the commit index included into each REPLICATE
249
  // message.
250
  //
251
  // The key in this map is the Raft index.
252
  OpIndexToEntryMap pending_replicates;
253
254
  // ----------------------------------------------------------------------------------------------
255
  // State specific to RocksDB-backed tables (not transaction status table)
256
257
  const DocDbOpIds stored_op_ids;
258
259
  // Total number of log entries applied to RocksDB.
260
  int64_t num_entries_applied_to_rocksdb = 0;
261
262
  // If we encounter the last entry flushed to a RocksDB SSTable (as identified by the max
263
  // persistent sequence number), we remember the hybrid time of that entry in this field.
264
  // We guarantee that we'll either see that entry or a later entry we know is committed into Raft
265
  // during log replay. This is crucial for properly setting safe time at bootstrap.
266
  HybridTime max_committed_hybrid_time = HybridTime::kMin;
267
268
  const std::string log_prefix;
269
};
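
The pending_replicates map above is the core replay bookkeeping: REPLICATE entries are buffered keyed by Raft index and only applied once the committed OpId has caught up to them. Below is a minimal, self-contained sketch of that idea using simplified stand-in types and invented values; it is not the actual ReplayState code, which also seeds committed_op_id from the flushed RocksDB OpIds (UpdateCommittedFromStored) before replay begins.

#include <cstdint>
#include <iostream>
#include <map>

// Simplified stand-in for the real OpId (term, index) pair.
struct SimpleOpId {
  int64_t term = 0;
  int64_t index = 0;
  bool operator<=(const SimpleOpId& other) const {
    return term < other.term || (term == other.term && index <= other.index);
  }
};

int main() {
  // Entries buffered during replay, keyed by Raft index (analogue of pending_replicates).
  std::map<int64_t, SimpleOpId> pending = {{5, {2, 5}}, {6, {2, 6}}, {7, {3, 7}}};
  SimpleOpId committed{2, 6};  // Analogue of committed_op_id.

  // Apply buffered entries in index order while they are covered by the committed OpId.
  for (auto it = pending.begin(); it != pending.end();) {
    if (!(it->second <= committed)) {
      break;  // Entry 7 (term 3) stays pending until a later entry advances committed_op_id.
    }
    std::cout << "apply index " << it->first << "\n";  // Prints indexes 5 and 6.
    it = pending.erase(it);
  }
}
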
270
271
2.79k
void ReplayState::UpdateCommittedFromStored() {
272
2.79k
  if (stored_op_ids.regular > committed_op_id) {
273
206
    committed_op_id = stored_op_ids.regular;
274
206
  }
275
276
2.79k
  if (stored_op_ids.intents > committed_op_id) {
277
0
    committed_op_id = stored_op_ids.intents;
278
0
  }
279
2.79k
}
280
281
// Return true if 'b' is allowed to immediately follow 'a' in the log.
282
1.37M
bool ReplayState::IsValidSequence(const OpId& a, const OpId& b) {
283
1.37M
  if (a.term == 0 && a.index == 0) {
284
    // Not initialized - can start with any opid.
285
2.79k
    return true;
286
2.79k
  }
287
288
  // Within the same term, we should never skip entries.
289
  // We can, however, go backwards (see KUDU-783 for an example)
290
1.36M
  if (b.term == a.term && b.index > a.index + 1) {
291
0
    return false;
292
0
  }
293
294
  // TODO: check that the term does not decrease.
295
  // https://github.com/yugabyte/yugabyte-db/issues/5115
296
297
1.36M
  return true;
298
1.36M
}
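
For illustration, here is the same rule exercised on a few invented OpId pairs with a simplified OpId type; the predicate body is copied from IsValidSequence above.

#include <cassert>
#include <cstdint>

struct SimpleOpId {
  int64_t term = 0;
  int64_t index = 0;
};

// Same checks as ReplayState::IsValidSequence: an uninitialized (0, 0) id accepts anything,
// and within one term the index may repeat or go backwards but must not skip ahead.
bool IsValidSequenceSketch(const SimpleOpId& a, const SimpleOpId& b) {
  if (a.term == 0 && a.index == 0) {
    return true;
  }
  if (b.term == a.term && b.index > a.index + 1) {
    return false;
  }
  return true;
}

int main() {
  assert(IsValidSequenceSketch({0, 0}, {3, 7}));   // Not initialized: any opid may follow.
  assert(IsValidSequenceSketch({1, 5}, {1, 6}));   // Next index in the same term.
  assert(IsValidSequenceSketch({1, 5}, {1, 3}));   // Going backwards is tolerated (KUDU-783).
  assert(!IsValidSequenceSketch({1, 5}, {1, 7}));  // Skipping index 6 in the same term is not.
  assert(IsValidSequenceSketch({1, 5}, {2, 9}));   // Term changes are not validated here (see TODO).
  return 0;
}
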
299
300
// Return a Corruption status if 'id' seems to be out-of-sequence in the log.
301
1.37M
Status ReplayState::CheckSequentialReplicateId(const ReplicateMsg& msg) {
302
1.37M
  SCHECK(msg.has_id(), Corruption, "A REPLICATE message must have an id");
303
1.37M
  const auto msg_op_id = OpId::FromPB(msg.id());
304
1.37M
  if (PREDICT_FALSE(!IsValidSequence(prev_op_id, msg_op_id))) {
305
0
    string op_desc = Format(
306
0
        "$0 REPLICATE (Type: $1)", msg_op_id, OperationType_Name(msg.op_type()));
307
0
    return STATUS_FORMAT(Corruption,
308
0
                         "Unexpected op id following op id $0. Operation: $1",
309
0
                         prev_op_id, op_desc);
310
0
  }
311
312
1.37M
  prev_op_id = msg_op_id;
313
1.37M
  return Status::OK();
314
1.37M
}
315
316
1.37M
void ReplayState::UpdateCommittedOpId(const OpId& id) {
317
1.37M
  if (committed_op_id < id) {
318
1.06M
    VLOG_WITH_PREFIX(1) << "Updating committed op id to " << id;
319
1.06M
    committed_op_id = id;
320
1.06M
  }
321
1.37M
}
322
323
void ReplayState::AddEntriesToStrings(const OpIndexToEntryMap& entries,
324
                                      std::vector<std::string>* strings,
325
394
                                      size_t half_limit) const {
326
394
  const auto n = entries.size();
327
394
  const bool overflow = n > 2 * half_limit;
328
394
  size_t index = 0;
329
1.22k
  for (const auto& entry : entries) {
330
1.22k
    if (!overflow || (index < half_limit || index >= n - half_limit)) {
331
1.22k
      const auto& replicate = entry.second.entry.get()->replicate();
332
1.22k
      strings->push_back(Format(
333
1.22k
          "    [$0] op_id: $1 hybrid_time: $2 op_type: $3 committed_op_id: $4",
334
1.22k
          index + 1,
335
1.22k
          OpId::FromPB(replicate.id()),
336
1.22k
          replicate.hybrid_time(),
337
1.22k
          replicate.op_type(),
338
1.22k
          OpId::FromPB(replicate.committed_op_id())));
339
1.22k
    }
340
1.22k
    if (overflow && index == half_limit - 1) {
341
0
      strings->push_back(Format("($0 lines skipped)", n - 2 * half_limit));
342
0
    }
343
1.22k
    index++;
344
1.22k
  }
345
394
}
346
347
void ReplayState::DumpReplayStateToStrings(
348
    std::vector<std::string>* strings,
349
2.79k
    int half_limit) const {
350
2.79k
  strings->push_back(Format(
351
2.79k
      "ReplayState: "
352
2.79k
      "Previous OpId: $0, "
353
2.79k
      "Committed OpId: $1, "
354
2.79k
      "Pending Replicates: $2, "
355
2.79k
      "Flushed Regular: $3, "
356
2.79k
      "Flushed Intents: $4",
357
2.79k
      prev_op_id,
358
2.79k
      committed_op_id,
359
2.79k
      pending_replicates.size(),
360
2.79k
      stored_op_ids.regular,
361
2.79k
      stored_op_ids.intents));
362
2.79k
  if (num_entries_applied_to_rocksdb > 0) {
363
2.79k
    strings->push_back(Substitute("Log entries applied to RocksDB: $0",
364
2.79k
                                  num_entries_applied_to_rocksdb));
365
2.79k
  }
366
2.79k
  if (!pending_replicates.empty()) {
367
394
    strings->push_back(Substitute("Dumping REPLICATES ($0 items):", pending_replicates.size()));
368
394
    AddEntriesToStrings(pending_replicates, strings, half_limit);
369
394
  }
370
2.79k
}
371
372
2.64M
bool ReplayState::CanApply(LogEntryPB* entry) {
373
2.64M
  return OpId::FromPB(entry->replicate().id()) <= committed_op_id;
374
2.64M
}
375
376
2.79k
OpId ReplayState::GetLowestOpIdToReplay(bool has_intents_db, const char* extra_log_prefix) const {
377
2.79k
  const auto op_id_replay_lowest =
378
2.79k
      has_intents_db ? std::min(stored_op_ids.regular, stored_op_ids.intents)
379
2.79k
                     : stored_op_ids.regular;
380
2.79k
  LOG_WITH_PREFIX(INFO)
381
2.79k
      << extra_log_prefix
382
2.79k
      << "op_id_replay_lowest=" << op_id_replay_lowest
383
2.79k
      << " (regular_op_id=" << stored_op_ids.regular
384
2.79k
      << ", intents_op_id=" << stored_op_ids.intents
385
2.79k
      << ", has_intents_db=" << has_intents_db << ")";
386
2.79k
  return op_id_replay_lowest;
387
2.79k
}
388
389
// ================================================================================================
390
// Class TabletBootstrap.
391
// ================================================================================================
392
393
namespace {
394
395
struct ReplayDecision {
396
  bool should_replay = false;
397
398
  // This is true for transaction update operations that have already been applied to the regular
399
  // RocksDB but not to the intents RocksDB.
400
  AlreadyAppliedToRegularDB already_applied_to_regular_db = AlreadyAppliedToRegularDB::kFalse;
401
402
0
  std::string ToString() const {
403
0
    return YB_STRUCT_TO_STRING(should_replay, already_applied_to_regular_db);
404
0
  }
405
};
406
407
ReplayDecision ShouldReplayOperation(
408
    consensus::OperationType op_type,
409
    const int64_t index,
410
    const int64_t regular_flushed_index,
411
    const int64_t intents_flushed_index,
412
    TransactionStatus txn_status,
413
1.30M
    bool write_op_has_transaction) {
414
  // In most cases we assume that intents_flushed_index <= regular_flushed_index but here we are
415
  // trying to be resilient to violations of that assumption.
416
1.30M
  if (index <= std::min(regular_flushed_index, intents_flushed_index)) {
417
    // Never replay anything that is flushed to both regular and intents RocksDBs in a transactional
418
    // table.
419
18.4E
    VLOG_WITH_FUNC(3) << "index: " << index << " "
420
18.4E
                      << "regular_flushed_index: " << regular_flushed_index
421
18.4E
                      << " intents_flushed_index: " << intents_flushed_index;
422
145k
    return {false};
423
145k
  }
424
425
1.16M
  if (op_type == consensus::UPDATE_TRANSACTION_OP) {
426
33.7k
    if (txn_status == TransactionStatus::APPLYING &&
427
33.7k
        intents_flushed_index < index && index <= regular_flushed_index) {
428
      // Intents were applied/flushed to regular RocksDB, but not flushed into the intents RocksDB.
429
4.81k
      VLOG_WITH_FUNC(3) << "index: " << index << " "
430
0
                        << "regular_flushed_index: " << regular_flushed_index
431
0
                        << " intents_flushed_index: " << intents_flushed_index;
432
4.81k
      return {true, AlreadyAppliedToRegularDB::kTrue};
433
4.81k
    }
434
    // For other types of transaction updates, we ignore them if they have been flushed to the
435
    // regular RocksDB.
436
28.9k
    VLOG_WITH_FUNC(3) << "index: " << index << " > "
437
0
                      << "regular_flushed_index: " << regular_flushed_index;
438
28.9k
    return {index > regular_flushed_index};
439
33.7k
  }
440
441
1.12M
  if (op_type == consensus::WRITE_OP && write_op_has_transaction) {
442
    // Write intents that have not been flushed into the intents DB.
443
46.9k
    VLOG_WITH_FUNC(3) << "index: " << index << " > "
444
0
                      << "intents_flushed_index: " << intents_flushed_index;
445
46.9k
    return {index > intents_flushed_index};
446
46.9k
  }
447
448
18.4E
  VLOG_WITH_FUNC(3) << "index: " << index << " > "
449
18.4E
                    << "regular_flushed_index: " << regular_flushed_index;
450
1.08M
  return {index > regular_flushed_index};
451
1.12M
}
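
As a worked example of the rules above, take hypothetical flushed indexes regular_flushed_index = 10 and intents_flushed_index = 7 on a transactional table. The sketch below mirrors only the WRITE_OP branches (the UPDATE_TRANSACTION_OP cases are omitted); the helper name and the numbers are invented for the example.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Simplified replay decision for a write operation.
bool ShouldReplayWriteSketch(int64_t index, int64_t regular_flushed, int64_t intents_flushed,
                             bool has_transaction) {
  if (index <= std::min(regular_flushed, intents_flushed)) {
    return false;  // Already flushed to both RocksDBs: never replay.
  }
  if (has_transaction) {
    return index > intents_flushed;  // Intent writes are gated on the intents RocksDB.
  }
  return index > regular_flushed;    // Plain writes are gated on the regular RocksDB.
}

int main() {
  const int64_t regular = 10, intents = 7;
  std::cout << ShouldReplayWriteSketch(5, regular, intents, false) << "\n";   // 0: flushed everywhere.
  std::cout << ShouldReplayWriteSketch(8, regular, intents, true) << "\n";    // 1: not yet in intents RocksDB.
  std::cout << ShouldReplayWriteSketch(8, regular, intents, false) << "\n";   // 0: regular RocksDB already has it.
  std::cout << ShouldReplayWriteSketch(12, regular, intents, false) << "\n";  // 1: beyond both flushed indexes.
}
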
452
453
1.30M
bool WriteOpHasTransaction(const ReplicateMsg& replicate) {
454
1.30M
  if (!replicate.has_write()) {
455
17.2k
    return false;
456
17.2k
  }
457
1.29M
  const auto& write_request = replicate.write();
458
1.29M
  if (!write_request.has_write_batch()) {
459
34.7k
    return false;
460
34.7k
  }
461
1.25M
  const auto& write_batch = write_request.write_batch();
462
1.25M
  if (write_batch.has_transaction()) {
463
53.2k
    return true;
464
53.2k
  }
465
7.15M
  for (const auto& pair : write_batch.write_pairs()) {
466
7.15M
    if (!pair.key().empty() && pair.key()[0] == docdb::ValueTypeAsChar::kExternalTransactionId) {
467
0
      return true;
468
0
    }
469
7.15M
  }
470
1.20M
  return false;
471
1.20M
}
472
473
}  // anonymous namespace
474
475
YB_STRONGLY_TYPED_BOOL(NeedsRecovery);
476
477
// Bootstraps an existing tablet by opening the metadata from disk, and rebuilding soft state by
478
// playing log segments. A bootstrapped tablet can then be added to an existing consensus
479
// configuration as a LEARNER, which will bring its state up to date with the rest of the consensus
480
// configuration, or it can start serving the data itself, after it has been appointed LEADER of
481
// that particular consensus configuration.
482
//
483
// NOTE: this does not handle pulling data from other replicas in the cluster. That is handled by
484
// the 'RemoteBootstrap' classes, which copy blocks and metadata locally before invoking this local
485
// bootstrap functionality.
486
//
487
// This class is not thread-safe.
488
class TabletBootstrap {
489
 public:
490
  explicit TabletBootstrap(const BootstrapTabletData& data)
491
      : data_(data),
492
        meta_(data.tablet_init_data.metadata),
493
        mem_tracker_(data.tablet_init_data.parent_mem_tracker),
494
        listener_(data.listener),
495
        append_pool_(data.append_pool),
496
        allocation_pool_(data.allocation_pool),
497
        skip_wal_rewrite_(FLAGS_skip_wal_rewrite),
498
150k
        test_hooks_(data.test_hooks) {
499
150k
  }
500
501
150k
  ~TabletBootstrap() {}
502
503
  CHECKED_STATUS Bootstrap(
504
      TabletPtr* rebuilt_tablet,
505
      scoped_refptr<log::Log>* rebuilt_log,
506
150k
      consensus::ConsensusBootstrapInfo* consensus_info) {
507
150k
    const string tablet_id = meta_->raft_group_id();
508
509
    // Replay requires a valid Consensus metadata file to exist in order to compare the committed
510
    // consensus configuration seqno with the log entries and also to persist committed but
511
    // unpersisted changes.
512
150k
    RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id,
513
150k
                                                  meta_->fs_manager()->uuid(), &cmeta_),
514
150k
                          "Unable to load Consensus metadata");
515
516
    // Make sure we don't try to locally bootstrap a tablet that was in the middle of a remote
517
    // bootstrap. It's likely that not all files were copied over successfully.
518
150k
    TabletDataState tablet_data_state = meta_->tablet_data_state();
519
150k
    if (!CanServeTabletData(tablet_data_state)) {
520
1
      return STATUS(Corruption, "Unable to locally bootstrap tablet " + tablet_id + ": " +
521
1
                                "RaftGroupMetadata bootstrap state is " +
522
1
                                TabletDataState_Name(tablet_data_state));
523
1
    }
524
525
150k
    listener_->StatusMessage("Bootstrap starting.");
526
527
150k
    if (VLOG_IS_ON(1)) {
528
0
      RaftGroupReplicaSuperBlockPB super_block;
529
0
      meta_->ToSuperBlock(&super_block);
530
0
      VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << super_block.DebugString();
531
0
    }
532
533
150k
    const bool has_blocks = VERIFY_RESULT(OpenTablet());
534
535
150k
    if (FLAGS_TEST_dump_docdb_before_tablet_bootstrap) {
536
0
      LOG_WITH_PREFIX(INFO) << "DEBUG: DocDB dump before tablet bootstrap:";
537
0
      tablet_->TEST_DocDBDumpToLog(IncludeIntents::kTrue);
538
0
    }
539
540
150k
    const auto needs_recovery = VERIFY_RESULT(PrepareToReplay());
541
150k
    if (needs_recovery && !skip_wal_rewrite_) {
542
0
      RETURN_NOT_OK(OpenLogReader());
543
0
    }
544
545
    // This is a new tablet, nothing left to do.
546
150k
    if (!has_blocks && !needs_recovery) {
547
147k
      LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
548
147k
      RETURN_NOT_OK_PREPEND(OpenNewLog(CreateNewSegment::kTrue), "Failed to open new log");
549
147k
      RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
550
147k
                                    rebuilt_log,
551
147k
                                    rebuilt_tablet));
552
147k
      consensus_info->last_id = MinimumOpId();
553
147k
      consensus_info->last_committed_id = MinimumOpId();
554
147k
      return Status::OK();
555
147k
    }
556
557
    // Only sleep if this isn't a new tablet, since we only want to delay on restart when testing.
558
2.48k
    if (PREDICT_FALSE(FLAGS_TEST_tablet_bootstrap_delay_ms > 0)) {
559
0
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_tablet_bootstrap_delay_ms));
560
0
    }
561
562
    // If there were blocks, there must be segments to replay. This is required by Raft, since we
563
    // always need to know the term and index of the last logged op in order to vote, know how to
564
    // respond to AppendEntries(), etc.
565
2.48k
    if (has_blocks && !needs_recovery) {
566
0
      return STATUS(IllegalState, Substitute("Tablet $0: Found rowsets but no log "
567
0
                                            "segments could be found.",
568
0
                                            tablet_id));
569
0
    }
570
571
2.48k
    RETURN_NOT_OK_PREPEND(PlaySegments(consensus_info), "Failed log replay. Reason");
572
573
2.48k
    if (cmeta_->current_term() < consensus_info->last_id.term()) {
574
407
      cmeta_->set_current_term(consensus_info->last_id.term());
575
407
    }
576
577
    // Flush the consensus metadata once at the end to persist our changes, if any.
578
2.48k
    RETURN_NOT_OK(cmeta_->Flush());
579
580
2.48k
    RETURN_NOT_OK(RemoveRecoveryDir());
581
582
2.48k
    if (FLAGS_force_recover_flushed_frontier) {
583
0
      RETURN_NOT_OK(tablet_->Flush(FlushMode::kSync));
584
0
      docdb::ConsensusFrontier new_consensus_frontier;
585
0
      new_consensus_frontier.set_op_id(consensus_info->last_committed_id);
586
0
      new_consensus_frontier.set_hybrid_time(tablet_->mvcc_manager()->LastReplicatedHybridTime());
587
      // We don't attempt to recover the history cutoff here because it will be recovered
588
      // automatically on the first compaction, and this is a special mode for manual
589
      // troubleshooting.
590
0
      LOG_WITH_PREFIX(WARNING)
591
0
          << "--force_recover_flushed_frontier specified, forcefully setting "
592
0
          << "flushed frontier after bootstrap: " << new_consensus_frontier.ToString();
593
0
      RETURN_NOT_OK(tablet_->ModifyFlushedFrontier(
594
0
          new_consensus_frontier, rocksdb::FrontierModificationMode::kForce));
595
0
    }
596
597
2.48k
    RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet));
598
599
2.48k
    return Status::OK();
600
2.48k
  }
601
602
 private:
603
  // Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'.
604
  CHECKED_STATUS FinishBootstrap(
605
      const std::string& message,
606
      scoped_refptr<log::Log>* rebuilt_log,
607
150k
      TabletPtr* rebuilt_tablet) {
608
150k
    tablet_->MarkFinishedBootstrapping();
609
150k
    listener_->StatusMessage(message);
610
150k
    if (FLAGS_TEST_dump_docdb_after_tablet_bootstrap) {
611
0
      LOG_WITH_PREFIX(INFO) << "DEBUG: DocDB debug dump after tablet bootstrap:\n";
612
0
      tablet_->TEST_DocDBDumpToLog(IncludeIntents::kTrue);
613
0
    }
614
615
150k
    *rebuilt_tablet = std::move(tablet_);
616
150k
    RETURN_NOT_OK(log_->EnsureInitialNewSegmentAllocated());
617
150k
    rebuilt_log->swap(log_);
618
150k
    return Status::OK();
619
150k
  }
620
621
  // Sets result to true if there was any data on disk for this tablet.
622
150k
  Result<bool> OpenTablet() {
623
150k
    CleanupSnapshots();
624
625
150k
    auto tablet = std::make_shared<Tablet>(data_.tablet_init_data);
626
    // Doing nothing for now except opening a tablet locally.
627
150k
    LOG_TIMING_PREFIX(INFO, LogPrefix(), "opening tablet") {
628
150k
      RETURN_NOT_OK(tablet->Open());
629
150k
    }
630
631
    // In theory, an error can happen in case of tablet Shutdown or in RocksDB object replacement
632
    // operation like RestoreSnapshot or Truncate. However, those operations can't really be
633
    // happening concurrently as we haven't opened the tablet yet.
634
150k
    const bool has_ss_tables = VERIFY_RESULT(tablet->HasSSTables());
635
636
0
    tablet_ = std::move(tablet);
637
150k
    return has_ss_tables;
638
150k
  }
639
640
  // Checks if a previous log recovery directory exists. If so, it deletes any files in the log dir
641
  // and sets 'needs_recovery' to true, meaning that the previous recovery attempt should be retried
642
  // from the recovery dir.
643
  //
644
  // Otherwise, if there is a log directory with log files in it, renames that log dir to the log
645
  // recovery dir and creates a new, empty log dir so that log replay can proceed. 'needs_recovery'
646
  // is also returned as true in this case.
647
  //
648
  // If no log segments are found, 'needs_recovery' is set to false.
649
150k
  Result<NeedsRecovery> PrepareToReplay() {
650
150k
    const string& log_dir = tablet_->metadata()->wal_dir();
651
652
    // If the recovery directory exists, then we crashed mid-recovery.  Throw away any logs from the
653
    // previous recovery attempt and restart the log replay process from the beginning using the
654
    // same recovery dir as last time.
655
150k
    const string recovery_path = FsManager::GetTabletWalRecoveryDir(log_dir);
656
150k
    if (GetEnv()->FileExists(recovery_path)) {
657
0
      LOG_WITH_PREFIX(INFO) << "Previous recovery directory found at " << recovery_path << ": "
658
0
                            << "Replaying log files from this location instead of " << log_dir;
659
660
      // Since we have a recovery directory, clear out the log_dir by recursively deleting it and
661
      // creating a new one so that we don't end up with remnants of old WAL segments or indexes
662
      // after replay.
663
0
      if (GetEnv()->FileExists(log_dir)) {
664
0
        LOG_WITH_PREFIX(INFO) << "Deleting old log files from previous recovery attempt in "
665
0
                              << log_dir;
666
0
        RETURN_NOT_OK_PREPEND(GetEnv()->DeleteRecursively(log_dir),
667
0
                              "Could not recursively delete old log dir " + log_dir);
668
0
      }
669
670
0
      RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), DirName(log_dir)),
671
0
                            "Failed to create table log directory " + DirName(log_dir));
672
673
0
      RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), log_dir),
674
0
                            "Failed to create tablet log directory " + log_dir);
675
676
0
      return NeedsRecovery::kTrue;
677
0
    }
678
679
    // If we made it here, there was no pre-existing recovery dir.  Now we look for log files in
680
    // log_dir, and if we find any then we rename the whole log_dir to a recovery dir and return
681
    // needs_recovery = true.
682
150k
    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), DirName(log_dir)),
683
150k
                          "Failed to create table log directory " + DirName(log_dir));
684
685
150k
    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(GetEnv(), log_dir),
686
150k
                          "Failed to create tablet log directory " + log_dir);
687
688
150k
    vector<string> log_dir_children = VERIFY_RESULT_PREPEND(
689
150k
        GetEnv()->GetChildren(log_dir, ExcludeDots::kTrue), "Couldn't list log segments.");
690
691
    // To ensure consistent order of log messages. Note: this does not affect the replay order
692
    // of segments, only the order of INFO log messages below.
693
0
    sort(log_dir_children.begin(), log_dir_children.end());
694
695
150k
    bool needs_recovery = false;
696
150k
    for (const string& log_dir_child : log_dir_children) {
697
11.1k
      if (!log::IsLogFileName(log_dir_child)) {
698
778
        continue;
699
778
      }
700
701
10.3k
      needs_recovery = true;
702
10.3k
      string source_path = JoinPathSegments(log_dir, log_dir_child);
703
10.3k
      if (skip_wal_rewrite_) {
704
10.3k
        LOG_WITH_PREFIX(INFO) << "Will attempt to recover log segment " << source_path;
705
10.3k
        continue;
706
10.3k
      }
707
708
18.4E
      string dest_path = JoinPathSegments(recovery_path, log_dir_child);
709
18.4E
      LOG_WITH_PREFIX(INFO) << "Will attempt to recover log segment " << source_path
710
18.4E
                            << " to " << dest_path;
711
18.4E
    }
712
713
150k
    if (!skip_wal_rewrite_ && needs_recovery) {
714
      // Atomically rename the log directory to the recovery directory and then re-create the log
715
      // directory.
716
0
      LOG_WITH_PREFIX(INFO) << "Moving log directory " << log_dir << " to recovery directory "
717
0
                            << recovery_path << " in preparation for log replay";
718
0
      RETURN_NOT_OK_PREPEND(GetEnv()->RenameFile(log_dir, recovery_path),
719
0
                            Substitute("Could not move log directory $0 to recovery dir $1",
720
0
                                      log_dir, recovery_path));
721
0
      RETURN_NOT_OK_PREPEND(GetEnv()->CreateDir(log_dir),
722
0
                            "Failed to recreate log directory " + log_dir);
723
0
    }
724
150k
    return NeedsRecovery(needs_recovery);
725
150k
  }
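
For clarity, here is a condensed sketch of the three cases PrepareToReplay distinguishes. It uses std::filesystem, invented paths, and a crude "wal-" filename prefix check purely for illustration; the real code goes through the Env abstraction, uses log::IsLogFileName(), and also covers the legacy (non-skip_wal_rewrite) path that renames the whole log directory.

#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

// Illustrative only: returns true when WAL replay ("recovery") is needed.
bool NeedsRecoverySketch(const fs::path& log_dir, const fs::path& recovery_dir) {
  if (fs::exists(recovery_dir)) {
    // Case 1: we crashed mid-recovery before. Clear log_dir and replay from recovery_dir again.
    fs::remove_all(log_dir);
    fs::create_directories(log_dir);
    return true;
  }
  fs::create_directories(log_dir);
  for (const auto& child : fs::directory_iterator(log_dir)) {
    if (child.path().filename().string().rfind("wal-", 0) == 0) {
      return true;  // Case 2: existing WAL segments found; they must be replayed.
    }
  }
  return false;  // Case 3: no recovery dir and no segments; this is a brand new log.
}

int main() {
  std::cout << NeedsRecoverySketch("/tmp/demo-tablet/wals", "/tmp/demo-tablet/wals.recovery") << "\n";
}
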
726
727
  // Opens the latest log segments for the Tablet that will allow us to rebuild the tablet's soft
728
  // state. If there are existing log segments in the tablet's log directory, they are moved to a
729
  // "log-recovery" directory which is deleted when the replay process is completed (as they have
730
  // been duplicated in the current log directory).
731
  //
732
  // If a "log-recovery" directory is already present, we will continue to replay from the
733
  // "log-recovery" directory. Tablet metadata is updated once replay has finished from the
734
  // "log-recovery" directory.
735
0
  Status OpenLogReader() {
736
0
    auto wal_dir = tablet_->metadata()->wal_dir();
737
0
    auto wal_path = skip_wal_rewrite_ ? wal_dir :
738
0
        meta_->fs_manager()->GetTabletWalRecoveryDir(wal_dir);
739
0
    VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir " << wal_path;
740
    // Open the reader.
741
0
    scoped_refptr<LogIndex> index(nullptr);
742
0
    RETURN_NOT_OK_PREPEND(
743
0
        LogReader::Open(
744
0
            GetEnv(),
745
0
            index,
746
0
            LogPrefix(),
747
0
            wal_path,
748
0
            tablet_->GetTableMetricsEntity().get(),
749
0
            tablet_->GetTabletMetricsEntity().get(),
750
0
            &log_reader_),
751
0
        "Could not open LogReader. Reason");
752
0
    return Status::OK();
753
0
  }
754
755
  // Removes the recovery directory and all files contained therein.  Intended to be invoked after
756
  // log replay successfully completes.
757
2.79k
  CHECKED_STATUS RemoveRecoveryDir() {
758
2.79k
    const string recovery_path = FsManager::GetTabletWalRecoveryDir(tablet_->metadata()->wal_dir());
759
2.79k
    if (!GetEnv()->FileExists(recovery_path)) {
760
2.79k
      VLOG(1) << "Tablet WAL recovery dir " << recovery_path << " does not exist.";
761
2.79k
      if (!skip_wal_rewrite_) {
762
0
        return STATUS(IllegalState, "Expected recovery dir, none found.");
763
0
      }
764
2.79k
      return Status::OK();
765
2.79k
    }
766
767
0
    LOG_WITH_PREFIX(INFO) << "Preparing to delete log recovery files and directory "
768
0
                          << recovery_path;
769
770
0
    string tmp_path = Substitute("$0-$1", recovery_path, GetCurrentTimeMicros());
771
0
    LOG_WITH_PREFIX(INFO) << "Renaming log recovery dir from "  << recovery_path
772
0
                          << " to " << tmp_path;
773
0
    RETURN_NOT_OK_PREPEND(GetEnv()->RenameFile(recovery_path, tmp_path),
774
0
                          Substitute("Could not rename old recovery dir from: $0 to: $1",
775
0
                                    recovery_path, tmp_path));
776
777
0
    if (FLAGS_skip_remove_old_recovery_dir) {
778
0
      LOG_WITH_PREFIX(INFO) << "--skip_remove_old_recovery_dir enabled. NOT deleting " << tmp_path;
779
0
      return Status::OK();
780
0
    }
781
0
    LOG_WITH_PREFIX(INFO) << "Deleting all files from renamed log recovery directory " << tmp_path;
782
0
    RETURN_NOT_OK_PREPEND(GetEnv()->DeleteRecursively(tmp_path),
783
0
                          "Could not remove renamed recovery dir " + tmp_path);
784
0
    LOG_WITH_PREFIX(INFO) << "Completed deletion of old log recovery files and directory "
785
0
                          << tmp_path;
786
0
    return Status::OK();
787
0
  }
788
789
  // Opens a new log in the tablet's log directory.  The directory is expected to be clean.
790
150k
  CHECKED_STATUS OpenNewLog(log::CreateNewSegment create_new_segment) {
791
150k
    auto log_options = LogOptions();
792
150k
    const auto& metadata = *tablet_->metadata();
793
150k
    log_options.retention_secs = metadata.wal_retention_secs();
794
150k
    log_options.env = GetEnv();
795
150k
    if (tablet_->metadata()->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
796
48.0k
      auto log_segment_size = FLAGS_transaction_status_tablet_log_segment_size_bytes;
797
48.0k
      if (log_segment_size) {
798
48.0k
        log_options.segment_size_bytes = log_segment_size;
799
48.0k
      }
800
48.0k
    }
801
150k
    RETURN_NOT_OK(Log::Open(
802
150k
        log_options,
803
150k
        tablet_->tablet_id(),
804
150k
        metadata.wal_dir(),
805
150k
        metadata.fs_manager()->uuid(),
806
150k
        *tablet_->schema(),
807
150k
        metadata.schema_version(),
808
150k
        tablet_->GetTableMetricsEntity(),
809
150k
        tablet_->GetTabletMetricsEntity(),
810
150k
        append_pool_,
811
150k
        allocation_pool_,
812
150k
        metadata.cdc_min_replicated_index(),
813
150k
        &log_,
814
150k
        create_new_segment));
815
    // Disable sync temporarily in order to speed up appends during the bootstrap process.
816
150k
    log_->DisableSync();
817
150k
    return Status::OK();
818
150k
  }
819
820
  // Handle the given log entry. Validates entry.type() (it can only be REPLICATE), optionally
821
  // injects latency in tests, and delegates to HandleReplicateMessage.
822
  CHECKED_STATUS HandleEntry(
823
1.37M
      yb::log::LogEntryMetadata entry_metadata, std::unique_ptr<log::LogEntryPB>* entry_ptr) {
824
1.37M
    auto& entry = **entry_ptr;
825
1.37M
    
    VLOG_WITH_PREFIX(2) << "Handling entry: " << entry.ShortDebugString();
826
827
1.37M
    switch (entry.type()) {
828
1.37M
      case log::REPLICATE:
829
1.37M
        RETURN_NOT_OK(HandleReplicateMessage(entry_metadata, entry_ptr));
830
1.37M
        break;
831
1.37M
      default:
832
0
        return STATUS(Corruption, Substitute("Unexpected log entry type: $0", entry.type()));
833
1.37M
    }
834
1.37M
    MAYBE_FAULT(FLAGS_TEST_fault_crash_during_log_replay);
835
1.37M
    return Status::OK();
836
1.37M
  }
837
838
  // HandleReplicateMessage implements these important pieces of logic:
839
  //   - Removes the "tail" of pending_replicates overwritten by a new leader's operations when
840
  //     encountering an entry with an index lower than or equal to the index of an operation that
841
  //     is already present in pending_replicates.
842
  //   - Ignores entries that have already been flushed into regular and intents RocksDBs.
843
  //   - Updates committed OpId based on the committed OpId from the entry and calls
844
  //     ApplyCommittedPendingReplicates.
845
  //   - Updates the "monotonic counter" used for assigning internal keys in YCQL arrays.
846
  CHECKED_STATUS HandleReplicateMessage(
847
1.37M
      LogEntryMetadata entry_metadata, std::unique_ptr<log::LogEntryPB>* replicate_entry_ptr) {
848
1.37M
    auto& replicate_entry = **replicate_entry_ptr;
849
1.37M
    stats_.ops_read++;
850
851
1.37M
    const ReplicateMsg& replicate = replicate_entry.replicate();
852
18.4E
    VLOG_WITH_PREFIX(1) << "HandleReplicateMessage: " << entry_metadata.ToString()
853
18.4E
                        << ", op id: " << replicate.id()
854
18.4E
                        << ", committed op id: " << replicate.committed_op_id();
855
1.37M
    RETURN_NOT_OK(replay_state_->CheckSequentialReplicateId(replicate));
856
1.37M
    SCHECK(replicate.has_hybrid_time(), Corruption, "A REPLICATE message must have a hybrid time");
857
1.37M
    UpdateClock(replicate.hybrid_time());
858
859
    // This sets the monotonic counter to at least replicate.monotonic_counter() atomically.
860
1.37M
    tablet_->UpdateMonotonicCounter(replicate.monotonic_counter());
861
862
1.37M
    const auto op_id = OpId::FromPB(replicate_entry.replicate().id());
863
864
    // Append the replicate message to the log as is if we are not skipping wal rewrite. If we are
865
    // skipping, set consensus_state_only to true.
866
1.37M
    RETURN_NOT_OK(log_->Append(replicate_entry_ptr->get(), entry_metadata, skip_wal_rewrite_));
867
868
1.37M
    auto iter = replay_state_->pending_replicates.lower_bound(op_id.index);
869
870
    // If there was an entry with the same or higher index as the entry we're adding, then we need
871
    // to delete that entry and all entries with higher indexes.
872
1.37M
    if (iter != replay_state_->pending_replicates.end()) {
873
10.5k
      auto& existing_entry = iter->second;
874
10.5k
      auto& last_entry = replay_state_->pending_replicates.rbegin()->second;
875
876
10.5k
      LOG_WITH_PREFIX(INFO) << "Overwriting operations starting at: "
877
10.5k
                            << existing_entry.entry->replicate().id()
878
10.5k
                            << " up to: " << last_entry.entry->replicate().id()
879
10.5k
                            << " with operation: " << replicate.id();
880
10.5k
      stats_.ops_overwritten += std::distance(iter, replay_state_->pending_replicates.end());
881
10.5k
      if (test_hooks_) {
882
        // Tell the test framework about overwritten OpIds.
883
10.5k
        for (auto callback_iter = iter;
884
73.3k
             callback_iter != replay_state_->pending_replicates.end();
885
62.7k
             callback_iter++) {
886
62.7k
          test_hooks_->Overwritten(
887
62.7k
              yb::OpId::FromPB(callback_iter->second.entry->replicate().id()));
888
62.7k
        }
889
10.5k
      }
890
10.5k
      replay_state_->pending_replicates.erase(iter, replay_state_->pending_replicates.end());
891
10.5k
    }
892
893
    // We expect entry_metadata.entry_time to always be set for newly written WAL entries. However,
894
    // for some very old WALs, it might be missing.
895
18.4E
    LOG_IF_WITH_PREFIX(DFATAL, entry_metadata.entry_time == RestartSafeCoarseTimePoint())
896
18.4E
        << "Entry metadata must have a restart-safe time. OpId: " << OpId::FromPB(replicate.id());
897
898
1.37M
    CHECK(replay_state_->pending_replicates.emplace(
899
1.37M
        op_id.index, Entry{std::move(*replicate_entry_ptr), entry_metadata.entry_time}).second);
900
901
18.4E
    CHECK(replicate.has_committed_op_id())
902
18.4E
        << "Replicate message has no committed_op_id for table type "
903
18.4E
        << TableType_Name(tablet_->table_type()) << ". Replicate message:\n"
904
18.4E
        << replicate.DebugString();
905
906
    // We include the commit index as of the time a REPLICATE entry was added to the leader's log
907
    // into that entry. This allows us to decide when we can replay a REPLICATE entry during
908
    // bootstrap.
909
1.37M
    replay_state_->UpdateCommittedOpId(OpId::FromPB(replicate.committed_op_id()));
910
911
1.37M
    return ApplyCommittedPendingReplicates();
912
1.37M
  }
913
914
  // Replays the given committed operation.
915
  CHECKED_STATUS PlayAnyRequest(
916
270k
      ReplicateMsg* replicate, AlreadyAppliedToRegularDB already_applied_to_regular_db) {
917
270k
    const auto op_type = replicate->op_type();
918
270k
    if (test_hooks_) {
919
243k
      test_hooks_->Replayed(yb::OpId::FromPB(replicate->id()), already_applied_to_regular_db);
920
243k
    }
921
270k
    switch (op_type) {
922
227k
      case consensus::WRITE_OP:
923
227k
        return PlayWriteRequest(replicate, already_applied_to_regular_db);
924
925
2.75k
      case consensus::CHANGE_METADATA_OP:
926
2.75k
        return PlayChangeMetadataRequest(replicate);
927
928
4.50k
      case consensus::CHANGE_CONFIG_OP:
929
4.50k
        return PlayChangeConfigRequest(replicate);
930
931
0
      case consensus::TRUNCATE_OP:
932
0
        return PlayTruncateRequest(replicate);
933
934
2.53k
      case consensus::NO_OP:
935
2.53k
        return Status::OK();  // This is why it is a no-op!
936
937
33.7k
      case consensus::UPDATE_TRANSACTION_OP:
938
33.7k
        return PlayUpdateTransactionRequest(replicate, already_applied_to_regular_db);
939
940
0
      case consensus::SNAPSHOT_OP:
941
0
        return PlayTabletSnapshotRequest(replicate);
942
943
0
      case consensus::HISTORY_CUTOFF_OP:
944
0
        return PlayHistoryCutoffRequest(replicate);
945
946
1
      case consensus::SPLIT_OP:
947
1
        return PlaySplitOpRequest(replicate);
948
949
      // Unexpected cases:
950
0
      case consensus::UNKNOWN_OP:
951
0
        return STATUS(IllegalState, Substitute("Unsupported operation type: $0", op_type));
952
270k
    }
953
954
0
    LOG_WITH_PREFIX(DFATAL) << "Invalid operation type " << op_type
955
0
                            << "for a REPLICATE operation: " << replicate->ShortDebugString();
956
0
    return STATUS_FORMAT(Corruption, "Invalid operation type: $0", op_type);
957
270k
  }
958
959
0
  CHECKED_STATUS PlayTabletSnapshotRequest(ReplicateMsg* replicate_msg) {
960
0
    TabletSnapshotOpRequestPB* const snapshot = replicate_msg->mutable_snapshot_request();
961
962
0
    SnapshotOperation operation(tablet_.get(), snapshot);
963
0
    operation.set_hybrid_time(HybridTime(replicate_msg->hybrid_time()));
964
965
0
    return operation.Replicated(/* leader_term= */ yb::OpId::kUnknownTerm);
966
0
  }
967
968
0
  CHECKED_STATUS PlayHistoryCutoffRequest(ReplicateMsg* replicate_msg) {
969
0
    HistoryCutoffOperation operation(
970
0
        tablet_.get(), replicate_msg->mutable_history_cutoff());
971
972
0
    return operation.Apply(/* leader_term= */ yb::OpId::kUnknownTerm);
973
0
  }
974
975
1
  CHECKED_STATUS PlaySplitOpRequest(ReplicateMsg* replicate_msg) {
976
1
    SplitTabletRequestPB* const split_request = replicate_msg->mutable_split_request();
977
    // We might be asked to replay SPLIT_OP even if it was applied and flushed when
978
    // FLAGS_force_recover_flushed_frontier is set.
979
1
    if (split_request->tablet_id() != tablet_->tablet_id()) {
980
      // Ignore SPLIT_OP designated for ancestor tablet(s).
981
0
      return Status::OK();
982
0
    }
983
984
1
    if (tablet_->metadata()->tablet_data_state() == TabletDataState::TABLET_DATA_SPLIT_COMPLETED) {
985
      // Ignore SPLIT_OP if tablet has been already split.
986
1
      
      VLOG_WITH_PREFIX_AND_FUNC(1) << "Tablet has been already split.";
987
1
      return Status::OK();
988
1
    }
989
990
0
    SplitOperation operation(tablet_.get(), data_.tablet_init_data.tablet_splitter, split_request);
991
0
    operation.set_hybrid_time(HybridTime(replicate_msg->hybrid_time()));
992
0
    return data_.tablet_init_data.tablet_splitter->ApplyTabletSplit(&operation, log_.get());
993
994
    // TODO(tsplit): In scope of https://github.com/yugabyte/yugabyte-db/issues/1461 add integration
995
    // tests for:
996
    // - tablet bootstrap of original tablet which hasn't been yet split and replaying split
997
    // operation.
998
    // - tablet bootstrap of original tablet which has been already successfully split and replaying
999
    // split operation.
1000
    // - tablet bootstrap of new after-split tablet replaying split operation.
1001
1
  }
1002
1003
  void HandleRetryableRequest(
1004
1.30M
      const ReplicateMsg& replicate, RestartSafeCoarseTimePoint entry_time) {
1005
1.30M
    if (!replicate.has_write())
1006
17.2k
      return;
1007
1008
1.29M
    if (data_.retryable_requests) {
1009
1.00M
      data_.retryable_requests->Bootstrap(replicate, entry_time);
1010
1.00M
    }
1011
1012
    // In a test, we might not have data_.retryable_requests, but we still want to tell the test
1013
    // that we would submit this OpId to retryable_requests.
1014
1.29M
    if (test_hooks_) {
1015
282k
      test_hooks_->RetryableRequest(OpId::FromPB(replicate.id()));
1016
282k
    }
1017
1.29M
  }
1018
1019
  // Performs various checks based on the OpId, and decides whether to replay the given operation.
1020
  // If so, calls PlayAnyRequest, or sometimes calls PlayUpdateTransactionRequest directly.
1021
  CHECKED_STATUS MaybeReplayCommittedEntry(
1022
1.30M
      LogEntryPB* replicate_entry, RestartSafeCoarseTimePoint entry_time) {
1023
1.30M
    ReplicateMsg* const replicate = replicate_entry->mutable_replicate();
1024
1.30M
    const auto op_type = replicate->op_type();
1025
1.30M
    const auto decision = ShouldReplayOperation(
1026
1.30M
        op_type,
1027
1.30M
        replicate->id().index(),
1028
1.30M
        replay_state_->stored_op_ids.regular.index,
1029
1.30M
        replay_state_->stored_op_ids.intents.index,
1030
        // txn_status
1031
1.30M
        replicate->has_transaction_state()
1032
1.30M
            ? replicate->transaction_state().status()
1033
1.30M
            : TransactionStatus::ABORTED, // should not be used
1034
        // write_op_has_transaction
1035
1.30M
        WriteOpHasTransaction(*replicate));
1036
1037
1.30M
    HandleRetryableRequest(*replicate, entry_time);
1038
18.4E
    VLOG_WITH_PREFIX_AND_FUNC(3) << "decision: " << AsString(decision);
1039
1.30M
    if (decision.should_replay) {
1040
270k
      const auto status = PlayAnyRequest(replicate, decision.already_applied_to_regular_db);
1041
270k
      if (!status.ok()) {
1042
0
        return status.CloneAndAppend(Format(
1043
0
            "Failed to play $0 request. ReplicateMsg: { $1 }",
1044
0
            OperationType_Name(op_type), *replicate));
1045
0
      }
1046
270k
      replay_state_->max_committed_hybrid_time.MakeAtLeast(HybridTime(replicate->hybrid_time()));
1047
270k
    }
1048
1049
1.30M
    return Status::OK();
1050
1.30M
  }
1051
1052
2.79k
  void DumpReplayStateToLog() {
1053
    // Dump the replay state, this will log the pending replicates, which might be useful for
1054
    // debugging.
1055
2.79k
    vector<string> state_dump;
1056
2.79k
    constexpr int kMaxLinesToDump = 1000;
1057
2.79k
    replay_state_->DumpReplayStateToStrings(&state_dump, kMaxLinesToDump / 2);
1058
7.20k
    for (const string& line : state_dump) {
1059
7.20k
      LOG_WITH_PREFIX(INFO) << line;
1060
7.20k
    }
1061
2.79k
  }
1062
1063
2.79k
  Result<DocDbOpIds> GetFlushedOpIds() {
1064
2.79k
    const auto flushed_op_ids = VERIFY_RESULT(tablet_->MaxPersistentOpId());
1065
1066
2.79k
    if (FLAGS_force_recover_flushed_frontier) {
1067
      // This is used very rarely to replay all log entries and recover RocksDB flushed OpId
1068
      // metadata.
1069
0
      LOG_WITH_PREFIX(WARNING)
1070
0
          << "--force_recover_flushed_frontier specified, ignoring existing flushed frontiers "
1071
0
          << "from RocksDB metadata (will replay all log records): " << flushed_op_ids.ToString();
1072
0
      return DocDbOpIds();
1073
0
    }
1074
1075
2.79k
    if (test_hooks_) {
1076
407
      const auto docdb_flushed_op_ids_override = test_hooks_->GetFlushedOpIdsOverride();
1077
407
      if (docdb_flushed_op_ids_override.is_initialized()) {
1078
401
        LOG_WITH_PREFIX(INFO) << "Using test values of flushed DocDB OpIds: "
1079
401
                              << docdb_flushed_op_ids_override->ToString();
1080
401
        return *docdb_flushed_op_ids_override;
1081
401
      }
1082
407
    }
1083
1084
    // Production codepath.
1085
2.39k
    LOG_WITH_PREFIX(INFO) << "Flushed DocDB OpIds: " << flushed_op_ids.ToString();
1086
2.39k
    return flushed_op_ids;
1087
2.79k
  }
1088
1089
  // Determines the first segment to replay based on two criteria:
1090
  // - The first OpId of the segment must be less than or equal to (in terms of OpId comparison
1091
  //   where term is compared first and index second) the "flushed OpId". This "flushed OpId" is
1092
  //   determined as the minimum of intents and regular RocksDBs' flushed OpIds for transactional
1093
  //   tables, and just the regular RocksDB's flushed OpId for non-transactional tables. Note that
1094
  //   in practice the flushed OpId of the intents RocksDB should also be less than or equal to the
1095
  //   flushed OpId of the regular RocksDB or this would be an invariant violation.
1096
  //
1097
  // - The "restart safe time" of the first operation in the segment that we choose to start the
1098
  //   replay with must be such that we guarantee that at least FLAGS_retryable_request_timeout_secs
1099
  //   seconds worth of latest log records are replayed. This is needed to allow deduplicating
1100
  //   automatic retries from YCQL and YSQL query layer and avoid Jepsen-type consistency
1101
  //   violations. We satisfy this constraint by taking the last segment's first operation's
1102
  //   restart-safe time, subtracting FLAGS_retryable_request_timeout_secs seconds from it, and
1103
  //   finding a segment that has that time or earlier as its first operation's restart-safe time.
1104
  //   This also means we are never allowed to start replay with the last segment, as long as
1105
  //   FLAGS_retryable_request_timeout_secs is greater than 0.
1106
  //
1107
  //   This "restart safe time" is similar to the regular Linux monotonic clock time, but is
1108
  //   maintained across tablet server restarts. See RestartSafeCoarseMonoClock for details.
1109
  //
1110
  //   See https://github.com/yugabyte/yugabyte-db/commit/5cf01889a1b4589a82085e578b5f4746c6614a5d
1111
  //   and the Git history of retryable_requests.cc for more context on this requirement.
1112
  //
1113
  // As long as the two conditions above are satisfied, it is advantageous to us to pick the latest
1114
  // possible segment to start replay with. That way we can skip the maximum number of segments.
1115
  //
1116
  // Returns the iterator pointing to the first segment to start replay with. Also produces a number
1117
  // of diagnostic log messages.
1118
  //
1119
  // This functionality was originally introduced in
1120
  // https://github.com/yugabyte/yugabyte-db/commit/41ef3f75e3c68686595c7613f53b649823b84fed
1121
2.79k
  SegmentSequence::iterator SkipFlushedEntries(SegmentSequence* segments_ptr) {
1122
2.79k
    static const char* kBootstrapOptimizerLogPrefix =
1123
2.79k
        "Bootstrap optimizer (skip_flushed_entries): ";
1124
1125
    // Lower bound on op IDs that need to be replayed. This is the "flushed OpId" that this
1126
    // function's comment mentions.
1127
2.79k
    const auto op_id_replay_lowest = replay_state_->GetLowestOpIdToReplay(
1128
        // Determine whether we have an intents DB.
1129
2.79k
        tablet_->doc_db().intents || (test_hooks_ && test_hooks_->HasIntentsDB()),
1130
2.79k
        kBootstrapOptimizerLogPrefix);
1131
1132
2.79k
    SegmentSequence& segments = *segments_ptr;
1133
1134
    // Time point of the first entry of the last WAL segment, and how far back in time from it we
1135
    // should retain other entries.
1136
2.79k
    boost::optional<RestartSafeCoarseTimePoint> replay_from_this_or_earlier_time;
1137
2.79k
    const RestartSafeCoarseDuration min_seconds_to_retain_logs =
1138
2.79k
        std::chrono::seconds(GetAtomicFlag(&FLAGS_retryable_request_timeout_secs));
1139
1140
2.79k
    auto iter = segments.end();
1141
9.81k
    while (iter != segments.begin()) {
1142
7.48k
      --iter;
1143
7.48k
      ReadableLogSegment& segment = **iter;
1144
7.48k
      const std::string& segment_path = segment.path();
1145
1146
7.48k
      const auto first_op_metadata_result = segment.ReadFirstEntryMetadata();
1147
7.48k
      if (!first_op_metadata_result.ok()) {
1148
35
        if (test_hooks_) {
1149
0
          test_hooks_->FirstOpIdOfSegment(segment_path, OpId::Invalid());
1150
0
        }
1151
35
        LOG_WITH_PREFIX(WARNING)
1152
35
            << kBootstrapOptimizerLogPrefix
1153
35
            << "Could not read the first entry's metadata of log segment " << segment_path << ". "
1154
35
            << "Simply continuing to earlier segments to determine the first segment "
1155
35
            << "to start the replay at. The error was: " << first_op_metadata_result.status();
1156
35
        continue;
1157
35
      }
1158
7.44k
      const auto& first_op_metadata = *first_op_metadata_result;
1159
1160
7.44k
      const auto op_id = first_op_metadata.op_id;
1161
7.44k
      if (test_hooks_) {
1162
3.54k
        test_hooks_->FirstOpIdOfSegment(segment_path, op_id);
1163
3.54k
      }
1164
7.44k
      const RestartSafeCoarseTimePoint first_op_time = first_op_metadata.entry_time;
1165
1166
7.44k
      if (!replay_from_this_or_earlier_time.is_initialized()) {
1167
2.79k
        replay_from_this_or_earlier_time = first_op_time - min_seconds_to_retain_logs;
1168
2.79k
      }
1169
1170
7.44k
      const auto is_first_op_id_low_enough = op_id <= op_id_replay_lowest;
1171
7.44k
      const auto is_first_op_time_early_enough = first_op_time <= replay_from_this_or_earlier_time;
1172
1173
7.44k
      const auto common_details_str = [&]() {
1174
7.44k
        std::ostringstream ss;
1175
7.44k
        ss << EXPR_VALUE_FOR_LOG(first_op_time) << ", "
1176
7.44k
           << EXPR_VALUE_FOR_LOG(min_seconds_to_retain_logs) << ", "
1177
7.44k
           << EXPR_VALUE_FOR_LOG(*replay_from_this_or_earlier_time);
1178
7.44k
        return ss.str();
1179
7.44k
      };
1180
1181
7.44k
      if (is_first_op_id_low_enough && is_first_op_time_early_enough) {
1182
465
        LOG_WITH_PREFIX(INFO)
1183
465
            << kBootstrapOptimizerLogPrefix
1184
465
            << "found first mandatory segment op id: " << op_id
1185
465
            << common_details_str() << ", "
1186
465
            << "number of segments to be skipped: " << (iter - segments.begin());
1187
465
        return iter;
1188
465
      }
1189
1190
6.98k
      LOG_WITH_PREFIX(INFO)
1191
6.98k
          << "Segment " << segment_path << " cannot be used as the first segment to start replay "
1192
6.98k
          << "with according to our OpId and retention criteria. "
1193
6.98k
          << (iter == segments.begin()
1194
6.98k
                  ? "However, this is already the earliest segment so we have to start replay "
1195
2.33k
                    "here. We should probably investigate how we got into this situation. "
1196
6.98k
                  : "Continuing to earlier segments.")
1197
6.98k
          << EXPR_VALUE_FOR_LOG(op_id) << ", "
1198
6.98k
          << common_details_str() << ", "
1199
6.98k
          << EXPR_VALUE_FOR_LOG(is_first_op_id_low_enough) << ", "
1200
6.98k
          << EXPR_VALUE_FOR_LOG(is_first_op_time_early_enough);
1201
6.98k
    }
1202
1203
2.32k
    LOG_WITH_PREFIX(INFO)
1204
2.32k
        << kBootstrapOptimizerLogPrefix
1205
2.32k
        << "will replay all segments starting from the very first one.";
1206
1207
2.32k
    return iter;
1208
2.79k
  }
1209
1210
  // Plays the log segments into the tablet being built.  The process of playing the segments can
1211
  // work in two modes:
1212
  //
1213
  // - With skip_wal_rewrite enabled (default mode):
1214
  //   Reuses existing segments of the log, rebuilding log segment footers when necessary.
1215
  //
1216
  // - With skip_wal_rewrite disabled (legacy mode):
1217
  //   Moves the old log to a "recovery directory" and replays entries from the old into a new log.
1218
  //   This is very I/O-intensive. We should probably get rid of this mode eventually.
1219
  //
1220
  // The resulting log can be continued later on when the tablet is rebuilt and starts accepting
1221
  // writes from clients.
1222
2.79k
  CHECKED_STATUS PlaySegments(ConsensusBootstrapInfo* consensus_info) {
1223
2.79k
    const auto flushed_op_ids = VERIFY_RESULT(GetFlushedOpIds());
1224
1225
2.79k
    if (tablet_->snapshot_coordinator()) {
1226
      // We should load transaction aware snapshots before replaying logs, because we need them
1227
      // during this replay.
1228
89
      RETURN_NOT_OK(tablet_->snapshot_coordinator()->Load(tablet_.get()));
1229
89
    }
1230
1231
2.79k
    replay_state_ = std::make_unique<ReplayState>(flushed_op_ids, LogPrefix());
1232
2.79k
    replay_state_->max_committed_hybrid_time = VERIFY_RESULT(tablet_->MaxPersistentHybridTime());
1233
1234
2.79k
    if (FLAGS_force_recover_flushed_frontier) {
1235
0
      LOG_WITH_PREFIX(WARNING)
1236
0
          << "--force_recover_flushed_frontier specified, ignoring max committed hybrid time from  "
1237
0
          << "RocksDB metadata (will replay all log records): "
1238
0
          << replay_state_->max_committed_hybrid_time;
1239
0
      replay_state_->max_committed_hybrid_time = HybridTime::kMin;
1240
2.79k
    } else {
1241
2.79k
      LOG_WITH_PREFIX(INFO) << "Max persistent index in RocksDB's SSTables before bootstrap: "
1242
2.79k
                            << "regular RocksDB: "
1243
2.79k
                            << replay_state_->stored_op_ids.regular << "; "
1244
2.79k
                            << "intents RocksDB: "
1245
2.79k
                            << replay_state_->stored_op_ids.intents;
1246
2.79k
    }
1247
1248
    // Open the log.
1249
    //
1250
    // If skip_wal_rewrite is true (default case), defer appending to this log until bootstrap is
1251
    // finished to preserve the state of old log. In that case we don't need to create a new
1252
    // segment until bootstrap is done.
1253
    //
1254
    // If skip_wal_rewrite is false, create a new segment and append each replayed entry to this
1255
    // new log.
1256
2.79k
    RETURN_NOT_OK_PREPEND(
1257
2.79k
        OpenNewLog(log::CreateNewSegment(!FLAGS_skip_wal_rewrite)), "Failed to open new log");
1258
1259
2.79k
    log::SegmentSequence segments;
1260
2.79k
    RETURN_NOT_OK(log_->GetSegmentsSnapshot(&segments));
1261
1262
    // Find the earliest log segment we need to read, so the rest can be ignored.
1263
2.79k
    auto iter = FLAGS_skip_flushed_entries ? SkipFlushedEntries(&segments) : segments.begin();
1264
1265
2.79k
    yb::OpId last_committed_op_id;
1266
2.79k
    yb::OpId last_read_entry_op_id;
1267
2.79k
    RestartSafeCoarseTimePoint last_entry_time;
1268
10.2k
    for (; iter != segments.end(); ++iter) {
1269
7.48k
      const scoped_refptr<ReadableLogSegment>& segment = *iter;
1270
1271
7.48k
      auto read_result = segment->ReadEntries();
1272
7.48k
      last_committed_op_id = std::max(last_committed_op_id, read_result.committed_op_id);
1273
7.48k
      if (!read_result.entries.empty()) {
1274
7.44k
        last_read_entry_op_id = yb::OpId::FromPB(read_result.entries.back()->replicate().id());
1275
7.44k
      }
1276
1.37M
      for (size_t entry_idx = 0; entry_idx < read_result.entries.size(); ++entry_idx) {
1277
1.37M
        const Status s = HandleEntry(
1278
1.37M
            read_result.entry_metadata[entry_idx], &read_result.entries[entry_idx]);
1279
1.37M
        if (!s.ok()) {
1280
0
          LOG_WITH_PREFIX(INFO) << "Dumping replay state to log: " << s;
1281
0
          DumpReplayStateToLog();
1282
0
          RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
1283
0
                                            segment->header().sequence_number(),
1284
0
                                            entry_idx, segment->path(),
1285
0
                                            read_result.entries[entry_idx].get()));
1286
0
        }
1287
1.37M
      }
1288
7.48k
      if (!read_result.entry_metadata.empty()) {
1289
7.44k
        last_entry_time = read_result.entry_metadata.back().entry_time;
1290
7.44k
      }
1291
1292
      // If the LogReader failed to read for some reason, we'll still try to replay as many entries
1293
      // as possible, and then fail with Corruption.
1294
7.48k
      if (PREDICT_FALSE(!read_result.status.ok())) {
1295
1
        return STATUS_FORMAT(Corruption,
1296
1
                            "Error reading Log Segment of tablet $0: $1 "
1297
1
                                "(Read up to entry $2 of segment $3, in path $4)",
1298
1
                            tablet_->tablet_id(),
1299
1
                            read_result.status,
1300
1
                            read_result.entries.size(),
1301
1
                            segment->header().sequence_number(),
1302
1
                            segment->path());
1303
1
      }
1304
1305
      // TODO: could be more granular here and also log progress within each segment, plus give
1306
      // info about the number of MB processed, but this is better than nothing.
1307
7.48k
      auto status = Format(
1308
7.48k
          "Bootstrap replayed $0/$1 log segments. $2. Pending: $3 replicates. "
1309
7.48k
              "Last read committed op id: $4",
1310
7.48k
          (iter - segments.begin()) + 1, segments.size(), stats_,
1311
7.48k
          replay_state_->pending_replicates.size(), read_result.committed_op_id);
1312
7.48k
      if (read_result.entry_metadata.empty()) {
1313
35
        status += ", no entries in last segment";
1314
7.44k
      } else {
1315
7.44k
        status += ", last entry metadata: " + read_result.entry_metadata.back().ToString() +
1316
7.44k
                  ", last read entry op id: " + last_read_entry_op_id.ToString();
1317
7.44k
      }
1318
7.48k
      listener_->StatusMessage(status);
1319
7.48k
    }
1320
1321
2.79k
    replay_state_->UpdateCommittedFromStored();
1322
2.79k
    RETURN_NOT_OK(ApplyCommittedPendingReplicates());
1323
1324
2.79k
    if (last_committed_op_id.index > replay_state_->committed_op_id.index) {
1325
2.14k
      auto it = replay_state_->pending_replicates.find(last_committed_op_id.index);
1326
2.14k
      if (it != replay_state_->pending_replicates.end()) {
1327
        // This should be guaranteed by the Raft protocol: if a record is committed, it cannot
1328
        // be overridden by a new leader.
1329
2.14k
        if (last_committed_op_id.term == it->second.entry->replicate().id().term()) {
1330
2.14k
          replay_state_->UpdateCommittedOpId(last_committed_op_id);
1331
2.14k
          RETURN_NOT_OK(ApplyCommittedPendingReplicates());
1332
18.4E
        } else {
1333
18.4E
          DumpReplayStateToLog();
1334
18.4E
          LOG_WITH_PREFIX(DFATAL)
1335
18.4E
              << "Invalid last committed op id: " << last_committed_op_id
1336
18.4E
              << ", record with this index has another term: "
1337
18.4E
              << it->second.entry->replicate().id();
1338
18.4E
        }
1339
2.14k
      } else {
1340
0
        DumpReplayStateToLog();
1341
0
        LOG_WITH_PREFIX(DFATAL)
1342
0
            << "Does not have an entry for the last committed index: " << last_committed_op_id
1343
0
            << ", entries: " << yb::ToString(replay_state_->pending_replicates);
1344
0
      }
1345
2.14k
    }
1346
1347
2.79k
    LOG_WITH_PREFIX(INFO) << "Dumping replay state to log at the end of " << __FUNCTION__;
1348
2.79k
    DumpReplayStateToLog();
1349
1350
    // Set up the ConsensusBootstrapInfo structure for the caller.
1351
2.79k
    for (auto& e : replay_state_->pending_replicates) {
1352
      // We only allow log entries with an index later than the index of the last log entry already
1353
      // applied to RocksDB to be passed to the tablet as "orphaned replicates". This will make sure
1354
      // we don't try to write to RocksDB with non-monotonic sequence ids, but still create
1355
      // ConsensusRound instances for writes that have not been persisted into RocksDB.
1356
1.22k
      consensus_info->orphaned_replicates.emplace_back(e.second.entry->release_replicate());
1357
1.22k
    }
1358
2.79k
    LOG_WITH_PREFIX(INFO)
1359
2.79k
        << "Number of orphaned replicates: " << consensus_info->orphaned_replicates.size()
1360
2.79k
        << ", last id: " << replay_state_->prev_op_id
1361
2.79k
        << ", committed id: " << replay_state_->committed_op_id;
1362
1363
2.79k
    SCHECK_FORMAT(
1364
2.79k
        replay_state_->prev_op_id.term >= replay_state_->committed_op_id.term &&
1365
2.79k
            replay_state_->prev_op_id.index >= replay_state_->committed_op_id.index,
1366
2.79k
        IllegalState,
1367
2.79k
        "WAL files missing, or committed op id is incorrect. Expected both term and index "
1368
2.79k
            "of prev_op_id to be greater than or equal to the corresponding components of "
1369
2.79k
            "committed_op_id. prev_op_id=$0, committed_op_id=$1",
1370
2.79k
        replay_state_->prev_op_id, replay_state_->committed_op_id);
1371
1372
2.79k
    tablet_->mvcc_manager()->SetLastReplicated(replay_state_->max_committed_hybrid_time);
1373
2.79k
    consensus_info->last_id = MakeOpIdPB(replay_state_->prev_op_id);
1374
2.79k
    consensus_info->last_committed_id = MakeOpIdPB(replay_state_->committed_op_id);
1375
1376
2.79k
    if (data_.retryable_requests) {
1377
2.29k
      data_.retryable_requests->Clock().Adjust(last_entry_time);
1378
2.29k
    }
1379
1380
2.79k
    return Status::OK();
1381
2.79k
  }
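  // --------------------------------------------------------------------------------------------
  // Illustrative sketch (added for exposition; not part of the reported source). It shows, over
  // simplified types, how the replay above partitions WAL entries: everything at or below the
  // committed index has been applied, while the remaining pending entries become "orphaned
  // replicates" that are handed back to consensus. BootstrapSummary and SummarizeReplay are
  // hypothetical names; a standalone build would need <cstdint>, <map>, <string>, and <vector>.
  struct BootstrapSummary {
    int64_t last_index = 0;             // last op index read from the WAL
    int64_t last_committed_index = 0;   // last op index known to be committed
    std::vector<std::string> orphaned;  // replicated but not yet committed payloads
  };

  static BootstrapSummary SummarizeReplay(const std::map<int64_t, std::string>& pending,
                                          int64_t last_index,
                                          int64_t committed_index) {
    BootstrapSummary summary;
    summary.last_index = last_index;
    summary.last_committed_index = committed_index;
    for (const auto& entry : pending) {
      if (entry.first > committed_index) {
        // Not known to be committed at the end of replay: keep it pending for consensus.
        summary.orphaned.push_back(entry.second);
      }
    }
    return summary;
  }
  // --------------------------------------------------------------------------------------------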
1382
1383
  CHECKED_STATUS PlayWriteRequest(
1384
227k
      ReplicateMsg* replicate_msg, AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1385
227k
    SCHECK(replicate_msg->has_hybrid_time(), IllegalState,
1386
227k
           "A write operation with no hybrid time");
1387
1388
227k
    auto* write = replicate_msg->mutable_write();
1389
1390
227k
    SCHECK(write->has_write_batch(), Corruption, "A write request must have a write batch");
1391
1392
227k
    WriteOperation operation(tablet_.get(), write);
1393
227k
    operation.set_op_id(OpId::FromPB(replicate_msg->id()));
1394
227k
    HybridTime hybrid_time(replicate_msg->hybrid_time());
1395
227k
    operation.set_hybrid_time(hybrid_time);
1396
1397
227k
    auto op_id = operation.op_id();
1398
227k
    tablet_->mvcc_manager()->AddFollowerPending(hybrid_time, op_id);
1399
1400
227k
    if (test_hooks_ &&
1401
227k
        replicate_msg->has_write() &&
1402
227k
        replicate_msg->write().has_write_batch() &&
1403
227k
        replicate_msg->write().write_batch().has_transaction() &&
1404
227k
        test_hooks_->ShouldSkipWritingIntents()) {
1405
      // Used in unit tests to avoid instantiating the entire transactional subsystem.
1406
46.9k
      tablet_->mvcc_manager()->Replicated(hybrid_time, op_id);
1407
46.9k
      return Status::OK();
1408
46.9k
    }
1409
1410
180k
    auto apply_status = tablet_->ApplyRowOperations(
1411
180k
        &operation, already_applied_to_regular_db);
1412
    // Failure is a regular case here: it can happen when the transaction was aborted while
1414
    // its intents were being replicated.
1414
18.4E
    LOG_IF(INFO, !apply_status.ok()) << "Apply operation failed: " << apply_status;
1415
1416
180k
    tablet_->mvcc_manager()->Replicated(hybrid_time, op_id);
1417
180k
    return Status::OK();
1418
227k
  }
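  // --------------------------------------------------------------------------------------------
  // Illustrative sketch (added for exposition; not part of the reported source). PlayWriteRequest
  // above pairs AddFollowerPending() with Replicated() manually on both return paths, while
  // PlayUpdateTransactionRequest below relies on ScopeExit to guarantee the pairing. This is a
  // minimal scope guard of that shape; ReplicateGuard, MvccLike, and ReplayOneOperationSketch are
  // hypothetical names, and a standalone build would need <functional> and <utility>.
  class ReplicateGuard {
   public:
    explicit ReplicateGuard(std::function<void()> on_exit) : on_exit_(std::move(on_exit)) {}
    ~ReplicateGuard() { on_exit_(); }  // runs on every exit path, including early returns
    ReplicateGuard(const ReplicateGuard&) = delete;
    ReplicateGuard& operator=(const ReplicateGuard&) = delete;
   private:
    std::function<void()> on_exit_;
  };

  struct MvccLike {
    void AddFollowerPending() {}
    void Replicated() {}
  };

  static void ReplayOneOperationSketch(MvccLike* mvcc) {
    mvcc->AddFollowerPending();
    ReplicateGuard guard([mvcc] { mvcc->Replicated(); });  // always balances the call above
    // ... apply the operation; any early return still triggers Replicated() ...
  }
  // --------------------------------------------------------------------------------------------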
1419
1420
2.75k
  CHECKED_STATUS PlayChangeMetadataRequest(ReplicateMsg* replicate_msg) {
1421
2.75k
    ChangeMetadataRequestPB* request = replicate_msg->mutable_change_metadata_request();
1422
1423
    // Decode schema
1424
2.75k
    Schema schema;
1425
2.75k
    if (request->has_schema()) {
1426
1.49k
      RETURN_NOT_OK(SchemaFromPB(request->schema(), &schema));
1427
1.49k
    }
1428
1429
2.75k
    ChangeMetadataOperation operation(request);
1430
1431
    // If the table ID isn't in the metadata, skip this replay, as the table might have been dropped.
1432
2.75k
    auto table_info = meta_->GetTableInfo(operation.table_id());
1433
2.75k
    if (!table_info.ok()) {
1434
2
      LOG_WITH_PREFIX(WARNING) << "Table ID " << operation.table_id()
1435
2
          << " not found in metadata, skipping this ChangeMetadataRequest";
1436
2
      return Status::OK();
1437
2
    }
1438
1439
2.75k
    RETURN_NOT_OK(tablet_->CreatePreparedChangeMetadata(
1440
2.75k
        &operation, request->has_schema() ? &schema : nullptr));
1441
1442
2.75k
    if (request->has_schema()) {
1443
      // Apply the alter schema to the tablet.
1444
1.48k
      RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&operation), "Failed to AlterSchema:");
1445
1446
      // Also update the log information. Normally, the AlterSchema() call above takes care of this,
1447
      // but our new log isn't hooked up to the tablet yet.
1448
1.48k
      log_->SetSchemaForNextLogSegment(schema, operation.schema_version());
1449
1.48k
    }
1450
1451
2.75k
    if (request->has_wal_retention_secs()) {
1452
0
      RETURN_NOT_OK_PREPEND(tablet_->AlterWalRetentionSecs(&operation),
1453
0
                            "Failed to alter wal retention secs");
1454
0
      log_->set_wal_retention_secs(request->wal_retention_secs());
1455
0
    }
1456
1457
2.75k
    return Status::OK();
1458
2.75k
  }
1459
1460
4.50k
  CHECKED_STATUS PlayChangeConfigRequest(ReplicateMsg* replicate_msg) {
1461
4.50k
    ChangeConfigRecordPB* change_config = replicate_msg->mutable_change_config_record();
1462
4.50k
    RaftConfigPB config = change_config->new_config();
1463
1464
4.50k
    int64_t cmeta_opid_index =  cmeta_->committed_config().opid_index();
1465
4.50k
    if (replicate_msg->id().index() > cmeta_opid_index) {
1466
34
      SCHECK(!config.has_opid_index(),
1467
34
             Corruption,
1468
34
             "A config change record must have an opid_index");
1469
34
      config.set_opid_index(replicate_msg->id().index());
1470
34
      VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
1471
0
                          << config.opid_index()
1472
0
                          << " that is greater than the committed config's index "
1473
0
                          << cmeta_opid_index
1474
0
                          << ". Applying this configuration change.";
1475
34
      cmeta_->set_committed_config(config);
1476
      // We flush once at the end of bootstrap.
1477
4.46k
    } else {
1478
4.46k
      VLOG_WITH_PREFIX(1) << "WAL replay found Raft configuration with log index "
1479
0
                          << replicate_msg->id().index()
1480
0
                          << ", which is less than or equal to the committed "
1481
0
                          << "config's index " << cmeta_opid_index << ". "
1482
0
                          << "Skipping application of this config change.";
1483
4.46k
    }
1484
1485
4.50k
    return Status::OK();
1486
4.50k
  }
1487
1488
0
  CHECKED_STATUS PlayTruncateRequest(ReplicateMsg* replicate_msg) {
1489
0
    auto* req = replicate_msg->mutable_truncate();
1490
1491
0
    TruncateOperation operation(tablet_.get(), req);
1492
1493
0
    Status s = tablet_->Truncate(&operation);
1494
1495
0
    RETURN_NOT_OK_PREPEND(s, "Failed to Truncate:");
1496
1497
0
    return Status::OK();
1498
0
  }
1499
1500
  CHECKED_STATUS PlayUpdateTransactionRequest(
1501
33.7k
      ReplicateMsg* replicate_msg, AlreadyAppliedToRegularDB already_applied_to_regular_db) {
1502
33.7k
    SCHECK(replicate_msg->has_hybrid_time(),
1503
33.7k
           Corruption, "A transaction update request must have a hybrid time");
1504
1505
33.7k
    UpdateTxnOperation operation(
1506
33.7k
        /* tablet */ nullptr, replicate_msg->mutable_transaction_state());
1507
33.7k
    operation.set_op_id(OpId::FromPB(replicate_msg->id()));
1508
33.7k
    HybridTime hybrid_time(replicate_msg->hybrid_time());
1509
33.7k
    operation.set_hybrid_time(hybrid_time);
1510
1511
33.7k
    auto op_id = OpId::FromPB(replicate_msg->id());
1512
33.7k
    tablet_->mvcc_manager()->AddFollowerPending(hybrid_time, op_id);
1513
33.7k
    auto scope_exit = ScopeExit([this, hybrid_time, op_id] {
1514
33.7k
      tablet_->mvcc_manager()->Replicated(hybrid_time, op_id);
1515
33.7k
    });
1516
1517
33.7k
    if (test_hooks_ && test_hooks_->ShouldSkipTransactionUpdates()) {
1518
      // Used in tests where we don't have a transaction participant instantiated.
1519
31.0k
      return Status::OK();
1520
31.0k
    }
1521
1522
2.66k
    auto transaction_participant = tablet_->transaction_participant();
1523
2.66k
    if (transaction_participant) {
1524
4
      TransactionParticipant::ReplicatedData replicated_data = {
1525
4
        .leader_term = yb::OpId::kUnknownTerm,
1526
4
        .state = *operation.request(),
1527
4
        .op_id = operation.op_id(),
1528
4
        .hybrid_time = operation.hybrid_time(),
1529
4
        .sealed = operation.request()->sealed(),
1530
4
        .already_applied_to_regular_db = already_applied_to_regular_db
1531
4
      };
1532
4
      return transaction_participant->ProcessReplicated(replicated_data);
1533
4
    }
1534
1535
2.66k
    auto transaction_coordinator = tablet_->transaction_coordinator();
1536
2.66k
    if (!transaction_coordinator) {
1537
0
      return STATUS(
1538
0
          IllegalState,
1539
0
          "No transaction coordinator or participant, cannot process a transaction update request");
1540
0
    }
1541
2.66k
    TransactionCoordinator::ReplicatedData replicated_data = {
1542
2.66k
        .leader_term = yb::OpId::kUnknownTerm,
1543
2.66k
        .state = *operation.request(),
1544
2.66k
        .op_id = operation.op_id(),
1545
2.66k
        .hybrid_time = operation.hybrid_time(),
1546
2.66k
    };
1547
2.66k
    return transaction_coordinator->ProcessReplicated(replicated_data);
1548
2.66k
  }
1549
1550
  // Constructs a HybridTime from the provided raw value and updates the clock with it.
1551
1.37M
  void UpdateClock(uint64_t hybrid_time) {
1552
1.37M
    data_.tablet_init_data.clock->Update(HybridTime(hybrid_time));
1553
1.37M
  }
1554
1555
  // Return a log prefix string in the standard "T xxx P yyy" format.
1556
350k
  std::string LogPrefix() const {
1557
350k
    return consensus::MakeTabletLogPrefix(meta_->raft_group_id(), meta_->fs_manager()->uuid());
1558
350k
  }
1559
1560
755k
  Env* GetEnv() {
1561
755k
    if (data_.tablet_init_data.tablet_options.env) {
1562
755k
      return data_.tablet_init_data.tablet_options.env;
1563
755k
    }
1564
4
    return meta_->fs_manager()->env();
1565
755k
  }
1566
1567
150k
  void CleanupSnapshots() {
1568
    // Disk clean-up: deleting temporary/incomplete snapshots.
1569
150k
    const string top_snapshots_dir = TabletSnapshots::SnapshotsDirName(meta_->rocksdb_dir());
1570
1571
150k
    if (meta_->fs_manager()->env()->FileExists(top_snapshots_dir)) {
1572
2.23k
      vector<string> snapshot_dirs;
1573
2.23k
      Status s = meta_->fs_manager()->env()->GetChildren(
1574
2.23k
          top_snapshots_dir, ExcludeDots::kTrue, &snapshot_dirs);
1575
1576
2.23k
      if (!s.ok()) {
1577
0
        LOG_WITH_PREFIX(WARNING) << "Cannot get list of snapshot directories in "
1578
0
                                 << top_snapshots_dir << ": " << s;
1579
2.23k
      } else {
1580
2.23k
        for (const string& dir_name : snapshot_dirs) {
1581
0
          const string snapshot_dir = JoinPathSegments(top_snapshots_dir, dir_name);
1582
1583
0
          if (TabletSnapshots::IsTempSnapshotDir(snapshot_dir)) {
1584
0
            LOG_WITH_PREFIX(INFO) << "Deleting old temporary snapshot directory " << snapshot_dir;
1585
1586
0
            s = meta_->fs_manager()->env()->DeleteRecursively(snapshot_dir);
1587
0
            if (!s.ok()) {
1588
0
              LOG_WITH_PREFIX(WARNING) << "Cannot delete old temporary snapshot directory "
1589
0
                                       << snapshot_dir << ": " << s;
1590
0
            }
1591
1592
0
            s = meta_->fs_manager()->env()->SyncDir(top_snapshots_dir);
1593
0
            if (!s.ok()) {
1594
0
              LOG_WITH_PREFIX(WARNING) << "Cannot sync top snapshots dir " << top_snapshots_dir
1595
0
                                       << ": " << s;
1596
0
            }
1597
0
          }
1598
0
        }
1599
2.23k
      }
1600
2.23k
    }
1601
150k
  }
1602
1603
  // Goes through the contiguous prefix of pending_replicates and applies those that are committed
1604
  // by calling MaybeReplayCommittedEntry.
1605
1.37M
  CHECKED_STATUS ApplyCommittedPendingReplicates() {
1606
1.37M
    auto& pending_replicates = replay_state_->pending_replicates;
1607
1.37M
    auto iter = pending_replicates.begin();
1608
2.68M
    while (iter != pending_replicates.end() && replay_state_->CanApply(iter->second.entry.get())) {
1609
18.4E
      VLOG_WITH_PREFIX(1) << "Applying committed pending replicate "
1610
18.4E
                          << iter->second.entry->replicate().id();
1611
1.30M
      auto op_id = iter->second.entry->replicate().id();
1612
1.30M
      RETURN_NOT_OK(MaybeReplayCommittedEntry(iter->second.entry.get(), iter->second.entry_time));
1613
1.30M
      iter = pending_replicates.erase(iter);  // erase and advance the iterator (C++11)
1614
1.30M
      ++replay_state_->num_entries_applied_to_rocksdb;
1615
1.30M
    }
1616
1.37M
    return Status::OK();
1617
1.37M
  }
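  // --------------------------------------------------------------------------------------------
  // Illustrative sketch (added for exposition; not part of the reported source). It mirrors the
  // loop above over a plain std::map keyed by op index: entries are applied and erased from the
  // front, in ascending index order, until the first one that is not yet committed.
  // ApplyCommittedPrefix is a hypothetical name; a standalone build would need <cstdint>,
  // <functional>, <map>, and <string>.
  static size_t ApplyCommittedPrefix(std::map<int64_t, std::string>* pending,
                                     int64_t committed_index,
                                     const std::function<void(const std::string&)>& apply) {
    size_t applied = 0;
    auto iter = pending->begin();
    while (iter != pending->end() && iter->first <= committed_index) {
      apply(iter->second);
      iter = pending->erase(iter);  // erase() returns the iterator to the next element
      ++applied;
    }
    return applied;
  }
  // --------------------------------------------------------------------------------------------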
1618
1619
  // ----------------------------------------------------------------------------------------------
1620
  // Member fields
1621
  // ----------------------------------------------------------------------------------------------
1622
1623
  BootstrapTabletData data_;
1624
  RaftGroupMetadataPtr meta_;
1625
  std::shared_ptr<MemTracker> mem_tracker_;
1626
  TabletStatusListener* listener_;
1627
  TabletPtr tablet_;
1628
  scoped_refptr<log::Log> log_;
1629
  std::unique_ptr<log::LogReader> log_reader_;
1630
  std::unique_ptr<ReplayState> replay_state_;
1631
1632
  std::unique_ptr<consensus::ConsensusMetadata> cmeta_;
1633
1634
  // Thread pool for append task for bootstrap.
1635
  ThreadPool* append_pool_;
1636
1637
  ThreadPool* allocation_pool_;
1638
1639
  // Statistics on the replay of entries in the log.
1640
  struct Stats {
1641
    std::string ToString() const;
1642
1643
    // Number of REPLICATE messages read from the log
1644
    int ops_read = 0;
1645
1646
    // Number of REPLICATE messages which were overwritten by later entries.
1647
    int ops_overwritten = 0;
1648
  } stats_;
1649
1650
  HybridTime rocksdb_last_entry_hybrid_time_ = HybridTime::kMin;
1651
1652
  bool skip_wal_rewrite_;
1653
1654
  // A way to inject flushed OpIds for regular and intents RocksDBs.
1655
  boost::optional<DocDbOpIds> TEST_docdb_flushed_op_ids_;
1656
1657
  bool TEST_collect_replayed_op_ids_;
1658
1659
  // This is populated if TEST_collect_replayed_op_ids is true.
1660
  std::vector<yb::OpId> TEST_replayed_op_ids_;
1661
1662
  std::shared_ptr<TabletBootstrapTestHooksIf> test_hooks_;
1663
1664
  DISALLOW_COPY_AND_ASSIGN(TabletBootstrap);
1665
};
1666
1667
// ============================================================================
1668
//  Class TabletBootstrap::Stats.
1669
// ============================================================================
1670
1671
7.46k
string TabletBootstrap::Stats::ToString() const {
1672
7.46k
  return Format("Read operations: $0, overwritten operations: $1",
1673
7.46k
                ops_read, ops_overwritten);
1674
7.46k
}
1675
1676
CHECKED_STATUS BootstrapTabletImpl(
1677
    const BootstrapTabletData& data,
1678
    TabletPtr* rebuilt_tablet,
1679
    scoped_refptr<log::Log>* rebuilt_log,
1680
150k
    consensus::ConsensusBootstrapInfo* results) {
1681
150k
  TabletBootstrap tablet_bootstrap(data);
1682
150k
  auto bootstrap_status = tablet_bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, results);
1683
150k
  if (!bootstrap_status.ok()) {
1684
15
    LOG(WARNING) << "T " << (*rebuilt_tablet ? 
(*rebuilt_tablet)->tablet_id()0
: "N/A")
1685
15
                 << " Tablet bootstrap failed: " << bootstrap_status;
1686
15
  }
1687
150k
  return bootstrap_status;
1688
150k
}
1689
1690
} // namespace tablet
1691
} // namespace yb