YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log.h"
34
35
#include <algorithm>
36
#include <atomic>
37
#include <chrono>
38
#include <condition_variable>
39
#include <memory>
40
#include <mutex>
41
#include <thread>
42
#include <vector>
43
44
#include <boost/algorithm/string/predicate.hpp>
45
#include <gflags/gflags.h>
46
47
#include "yb/common/schema.h"
48
#include "yb/common/wire_protocol.h"
49
50
#include "yb/consensus/consensus_util.h"
51
#include "yb/consensus/log_index.h"
52
#include "yb/consensus/log_metrics.h"
53
#include "yb/consensus/log_reader.h"
54
#include "yb/consensus/log_util.h"
55
56
#include "yb/fs/fs_manager.h"
57
58
#include "yb/gutil/bind.h"
59
#include "yb/gutil/map-util.h"
60
#include "yb/gutil/ref_counted.h"
61
#include "yb/gutil/stl_util.h"
62
#include "yb/gutil/strings/substitute.h"
63
#include "yb/gutil/walltime.h"
64
65
#include "yb/util/async_util.h"
66
#include "yb/util/countdown_latch.h"
67
#include "yb/util/debug/long_operation_tracker.h"
68
#include "yb/util/debug/trace_event.h"
69
#include "yb/util/env_util.h"
70
#include "yb/util/fault_injection.h"
71
#include "yb/util/file_util.h"
72
#include "yb/util/flag_tags.h"
73
#include "yb/util/format.h"
74
#include "yb/util/logging.h"
75
#include "yb/util/metrics.h"
76
#include "yb/util/operation_counter.h"
77
#include "yb/util/opid.h"
78
#include "yb/util/path_util.h"
79
#include "yb/util/pb_util.h"
80
#include "yb/util/random.h"
81
#include "yb/util/scope_exit.h"
82
#include "yb/util/shared_lock.h"
83
#include "yb/util/size_literals.h"
84
#include "yb/util/status.h"
85
#include "yb/util/status_format.h"
86
#include "yb/util/status_log.h"
87
#include "yb/util/stopwatch.h"
88
#include "yb/util/taskstream.h"
89
#include "yb/util/thread.h"
90
#include "yb/util/trace.h"
91
#include "yb/util/tsan_util.h"
92
#include "yb/util/unique_lock.h"
93
94
using namespace yb::size_literals;  // NOLINT.
95
using namespace std::literals;  // NOLINT.
96
using namespace std::placeholders;
97
98
// Log retention configuration.
99
// -----------------------------
100
DEFINE_int32(log_min_segments_to_retain, 2,
101
             "The minimum number of past log segments to keep at all times,"
102
             " regardless of what is required for durability. "
103
             "Must be at least 1.");
104
TAG_FLAG(log_min_segments_to_retain, runtime);
105
TAG_FLAG(log_min_segments_to_retain, advanced);
106
107
DEFINE_int32(log_min_seconds_to_retain, 900,
108
             "The minimum number of seconds for which to keep log segments at all times, "
109
             "regardless of what is required for durability. Logs may still be retained for "
110
             "a longer amount of time if they are necessary for correct restart. This should be "
111
             "set long enough such that a tablet server which has temporarily failed can be "
112
             "restarted within the given time period. If a server is down for longer than this "
113
             "amount of time, it is possible that its tablets will be re-replicated on other "
114
             "machines.");
115
TAG_FLAG(log_min_seconds_to_retain, runtime);
116
TAG_FLAG(log_min_seconds_to_retain, advanced);
117
118
// Flags for controlling kernel watchdog limits.
119
DEFINE_int32(consensus_log_scoped_watch_delay_callback_threshold_ms, 1000,
120
             "If calling consensus log callback(s) takes longer than this, the kernel watchdog "
121
             "will print out a stack trace.");
122
TAG_FLAG(consensus_log_scoped_watch_delay_callback_threshold_ms, runtime);
123
TAG_FLAG(consensus_log_scoped_watch_delay_callback_threshold_ms, advanced);
124
DEFINE_int32(consensus_log_scoped_watch_delay_append_threshold_ms, 1000,
125
             "If consensus log append takes longer than this, the kernel watchdog "
126
             "will print out a stack trace.");
127
TAG_FLAG(consensus_log_scoped_watch_delay_append_threshold_ms, runtime);
128
TAG_FLAG(consensus_log_scoped_watch_delay_append_threshold_ms, advanced);
129
130
// Fault/latency injection flags.
131
// -----------------------------
132
DEFINE_bool(log_inject_latency, false,
133
            "If true, injects artificial latency in log sync operations. "
134
            "Advanced option. Use at your own risk -- has a negative effect "
135
            "on performance for obvious reasons!");
136
DEFINE_int32(log_inject_latency_ms_mean, 100,
137
             "The number of milliseconds of latency to inject, on average. "
138
             "Only takes effect if --log_inject_latency is true");
139
DEFINE_int32(log_inject_latency_ms_stddev, 100,
140
             "The standard deviation of latency to inject before log sync operations. "
141
             "Only takes effect if --log_inject_latency is true");
142
TAG_FLAG(log_inject_latency, unsafe);
143
TAG_FLAG(log_inject_latency_ms_mean, unsafe);
144
TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
145
146
DEFINE_int32(log_inject_append_latency_ms_max, 0,
147
             "The maximum latency to inject before the log append operation.");
148
149
DEFINE_test_flag(bool, log_consider_all_ops_safe, false,
150
            "If true, we consider all operations to be safe and will not wait "
151
            "for the opId to apply to the local log. i.e. WaitForSafeOpIdToApply "
152
            "becomes a noop.");
153
154
DEFINE_test_flag(bool, simulate_abrupt_server_restart, false,
155
                 "If true, don't properly close the log segment.");
156
157
// TaskStream flags.
158
// We have to make the queue length really long.
159
// TODO: Create new flags log_taskstream_queue_max_size and log_taskstream_queue_max_wait_ms
160
// and deprecate these flags.
161
DEFINE_int32(taskstream_queue_max_size, 100000,
162
             "Maximum number of operations waiting in the taskstream queue.");
163
164
DEFINE_int32(taskstream_queue_max_wait_ms, 1000,
165
             "Maximum time in ms to wait for items in the taskstream queue to arrive.");
166
167
DEFINE_int32(wait_for_safe_op_id_to_apply_default_timeout_ms, 15000 * yb::kTimeMultiplier,
168
             "Timeout used by WaitForSafeOpIdToApply when it was not specified by caller.");
169
170
DEFINE_test_flag(int64, log_fault_after_segment_allocation_min_replicate_index, 0,
171
                 "Fault segment allocation when the min replicate index is at least the specified value. "
172
                 "0 to disable.");
173
174
DEFINE_int64(time_based_wal_gc_clock_delta_usec, 0,
175
             "A delta in microseconds to add to the clock value used to determine if a WAL "
176
             "segment is safe to be garbage collected. This is needed for clusters running with a "
177
             "skewed hybrid clock, because the clock used for time-based WAL GC is the wall clock, "
178
             "not hybrid clock.");
179
180
// Validate that log_min_segments_to_retain >= 1
181
16.5k
static bool ValidateLogsToRetain(const char* flagname, int value) {
182
16.5k
  if (value >= 1) {
183
16.5k
    return true;
184
16.5k
  }
185
0
  LOG(ERROR) << strings::Substitute("$0 must be at least 1, value $1 is invalid",
186
0
                                    flagname, value);
187
0
  return false;
188
16.5k
}
189
static bool dummy = google::RegisterFlagValidator(
190
    &FLAGS_log_min_segments_to_retain, &ValidateLogsToRetain);
191
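For reference, the validator-registration pattern used by ValidateLogsToRetain above can be reproduced as a standalone sketch; the flag and function names below are illustrative and are not part of log.cc:

#include <gflags/gflags.h>
#include <glog/logging.h>

DEFINE_int32(example_min_segments, 2, "Illustrative flag mirroring log_min_segments_to_retain.");

// The validator runs at startup and on runtime flag updates; returning false rejects the value.
static bool ValidateAtLeastOne(const char* flagname, int value) {
  if (value >= 1) {
    return true;
  }
  LOG(ERROR) << flagname << " must be at least 1, value " << value << " is invalid";
  return false;
}

// Registration happens from a static initializer, exactly like the 'dummy' variable above.
static const bool example_validator_registered =
    google::RegisterFlagValidator(&FLAGS_example_min_segments, &ValidateAtLeastOne);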
192
static std::string kSegmentPlaceholderFilePrefix = ".tmp.newsegment";
193
static std::string kSegmentPlaceholderFileTemplate = kSegmentPlaceholderFilePrefix + "XXXXXX";
194
195
namespace yb {
196
namespace log {
197
198
using env_util::OpenFileForRandom;
199
using std::shared_ptr;
200
using std::unique_ptr;
201
using strings::Substitute;
202
203
namespace {
204
205
39.9M
bool IsMarkerType(LogEntryTypePB type) {
206
39.9M
  return type == LogEntryTypePB::ROLLOVER_MARKER ||
207
39.9M
         type == LogEntryTypePB::FLUSH_MARKER;
208
39.9M
}
209
210
} // namespace
211
212
// This class represents a batch of operations to be written and synced to the log. It is opaque to
213
// the user and is managed by the Log class.
214
class LogEntryBatch {
215
 public:
216
  LogEntryBatch(LogEntryTypePB type, LogEntryBatchPB&& entry_batch_pb);
217
  ~LogEntryBatch();
218
219
0
  std::string ToString() const {
220
0
    return Format("{ type: $0 state: $1 max_op_id: $2 }", type_, state_, MaxReplicateOpId());
221
0
  }
222
223
51.4M
  bool HasReplicateEntries() const {
224
51.4M
    return type_ == LogEntryTypePB::REPLICATE && count() > 0;
225
51.4M
  }
226
227
 private:
228
  friend class Log;
229
  friend class MultiThreadedLogTest;
230
231
  // Serializes contents of the entry to an internal buffer.
232
  CHECKED_STATUS Serialize();
233
234
  // Sets the callback that will be invoked after the entry is
235
  // appended and synced to disk
236
25.0M
  void set_callback(const StatusCallback& cb) {
237
25.0M
    callback_ = cb;
238
25.0M
  }
239
240
  // Returns the callback that will be invoked after the entry is
241
  // appended and synced to disk.
242
50.1M
  const StatusCallback& callback() {
243
50.1M
    return callback_;
244
50.1M
  }
245
246
25.0M
  bool failed_to_append() const {
247
25.0M
    return state_ == kEntryFailedToAppend;
248
25.0M
  }
249
250
0
  void set_failed_to_append() {
251
0
    state_ = kEntryFailedToAppend;
252
0
  }
253
254
  // Mark the entry as reserved, but not yet ready to write to the log.
255
  void MarkReserved();
256
257
  // Mark the entry as ready to write to log.
258
  void MarkReady();
259
260
  // Returns a Slice representing the serialized contents of the entry.
261
25.0M
  Slice data() const {
262
25.0M
    DCHECK_EQ(state_, kEntrySerialized);
263
25.0M
    return Slice(buffer_);
264
25.0M
  }
265
266
  bool IsMarker() const;
267
268
  bool IsSingleEntryOfType(LogEntryTypePB type) const;
269
270
128M
  size_t count() const { return count_; }
271
272
  // Returns the total size in bytes of the object.
273
50.1M
  size_t total_size_bytes() const {
274
50.1M
    return total_size_bytes_;
275
50.1M
  }
276
277
  // The highest OpId of a REPLICATE message in this batch.
278
29.4M
  OpId MaxReplicateOpId() const {
279
29.4M
    DCHECK_EQ(REPLICATE, type_);
280
29.4M
    int idx = entry_batch_pb_.entry_size() - 1;
281
29.4M
    if (idx < 0) {
282
0
      return OpId::Invalid();
283
0
    }
284
29.4M
    DCHECK(entry_batch_pb_.entry(idx).replicate().IsInitialized());
285
29.4M
    return OpId::FromPB(entry_batch_pb_.entry(idx).replicate().id());
286
29.4M
  }
287
288
25.0M
  void SetReplicates(const ReplicateMsgs& replicates) {
289
25.0M
    replicates_ = replicates;
290
25.0M
  }
291
292
  // The type of entries in this batch.
293
  const LogEntryTypePB type_;
294
295
  // Contents of the log entries that will be written to disk.
296
  LogEntryBatchPB entry_batch_pb_;
297
298
  // Total size in bytes of all entries
299
  uint32_t total_size_bytes_ = 0;
300
301
  // Number of entries in 'entry_batch_pb_'
302
  const size_t count_;
303
304
  // The vector of refcounted replicates.  This makes sure there's at least a reference to each
305
  // replicate message until we're finished appending.
306
  ReplicateMsgs replicates_;
307
308
  // Callback to be invoked upon the entries being written and synced to disk.
309
  StatusCallback callback_;
310
311
  // Buffer to which 'phys_entries_' are serialized by call to 'Serialize()'
312
  faststring buffer_;
313
314
  // Offset into the log file for this entry batch.
315
  int64_t offset_;
316
317
  // Segment sequence number for this entry batch.
318
  uint64_t active_segment_sequence_number_;
319
320
  enum LogEntryState {
321
    kEntryInitialized,
322
    kEntryReserved,
323
    kEntryReady,
324
    kEntrySerialized,
325
    kEntryFailedToAppend
326
  };
327
  LogEntryState state_ = kEntryInitialized;
328
329
  DISALLOW_COPY_AND_ASSIGN(LogEntryBatch);
330
};
331
332
// This class is responsible for managing the task that appends to the log file.
333
// This task runs in a common thread pool with append tasks from other tablets.
334
// A token is used to ensure that only one append task per tablet is executed concurrently.
335
class Log::Appender {
336
 public:
337
  explicit Appender(Log* log, ThreadPool* append_thread_pool);
338
339
  // Initializes the objects and starts the task.
340
  Status Init();
341
342
25.0M
  CHECKED_STATUS Submit(LogEntryBatch* item) {
343
25.0M
    ScopedRWOperation operation(&task_stream_counter_);
344
25.0M
    RETURN_NOT_OK(operation);
345
25.0M
    if (!task_stream_) {
346
0
      return STATUS(IllegalState, "Appender stopped");
347
0
    }
348
25.0M
    return task_stream_->Submit(item);
349
25.0M
  }
350
351
13
  CHECKED_STATUS TEST_SubmitFunc(const std::function<void()>& func) {
352
13
    return task_stream_->TEST_SubmitFunc(func);
353
13
  }
354
355
  // Waits until the last enqueued elements are processed, sets the appender_ to closing
356
  // state. If any entries are added to the queue during the process, invoke their callbacks'
357
  // 'OnFailure()' method.
358
  void Shutdown();
359
360
3
  const std::string& LogPrefix() const {
361
3
    return log_->LogPrefix();
362
3
  }
363
364
0
  std::string GetRunThreadStack() const {
365
0
    return task_stream_->GetRunThreadStack();
366
0
  }
367
368
0
  std::string ToString() const {
369
0
    return task_stream_->ToString();
370
0
  }
371
372
 private:
373
  // Processes the given log entry batch, or does a sync if null is passed.
374
  void ProcessBatch(LogEntryBatch* entry_batch);
375
  void GroupWork();
376
377
  Log* const log_;
378
379
  // Lock to protect access to thread_ during shutdown.
380
  RWOperationCounter task_stream_counter_;
381
  unique_ptr<TaskStream<LogEntryBatch>> task_stream_;
382
383
  // vector of entry batches in group, to execute callbacks after call to Sync.
384
  std::vector<std::unique_ptr<LogEntryBatch>> sync_batch_;
385
386
  // Time at which current group was started
387
  MonoTime time_started_;
388
};
389
390
Log::Appender::Appender(Log *log, ThreadPool* append_thread_pool)
391
    : log_(log),
392
      task_stream_counter_(Format("Appender for $0", log->tablet_id())),
393
      task_stream_(new TaskStream<LogEntryBatch>(
394
          std::bind(&Log::Appender::ProcessBatch, this, _1), append_thread_pool,
395
          FLAGS_taskstream_queue_max_size,
396
151k
          MonoDelta::FromMilliseconds(FLAGS_taskstream_queue_max_wait_ms))) {
397
151k
  DCHECK(dummy);
398
151k
}
399
400
150k
Status Log::Appender::Init() {
401
18.4E
  VLOG_WITH_PREFIX(1) << "Starting log task stream";
402
150k
  return Status::OK();
403
150k
}
404
405
50.0M
void Log::Appender::ProcessBatch(LogEntryBatch* entry_batch) {
406
  // A callback function to TaskStream is expected to process the accumulated batch of entries.
407
50.0M
  if (entry_batch == nullptr) {
408
    // Here, we do sync and call callbacks.
409
24.9M
    GroupWork();
410
24.9M
    return;
411
24.9M
  }
412
413
25.0M
  if (sync_batch_.empty()) { // Start of batch.
414
    // Used in tests to delay writing log entries.
415
24.9M
    auto sleep_duration = log_->sleep_duration_.load(std::memory_order_acquire);
416
24.9M
    if (sleep_duration.count() > 0) {
417
1
      std::this_thread::sleep_for(sleep_duration);
418
1
    }
419
24.9M
    time_started_ = MonoTime::Now();
420
24.9M
  }
421
25.0M
  TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
422
25.0M
  Status s = log_->DoAppend(entry_batch);
423
424
25.0M
  if (PREDICT_FALSE(!s.ok())) {
425
0
    LOG_WITH_PREFIX(DFATAL) << "Error appending to the log: " << s;
426
0
    entry_batch->set_failed_to_append();
427
    // TODO If a single operation fails to append, should we abort all subsequent operations
428
    // in this batch or allow them to be appended? What about operations in future batches?
429
0
    if (!entry_batch->callback().is_null()) {
430
0
      entry_batch->callback().Run(s);
431
0
    }
432
0
    return;
433
0
  }
434
25.0M
  if (!log_->sync_disabled_) {
435
25.0M
    bool expected = false;
436
25.0M
    if (log_->periodic_sync_needed_.compare_exchange_strong(expected, true,
437
25.0M
                                                            std::memory_order_acq_rel)) {
438
647k
      log_->periodic_sync_earliest_unsync_entry_time_ = MonoTime::Now();
439
647k
    }
440
25.0M
    log_->periodic_sync_unsynced_bytes_ += entry_batch->total_size_bytes();
441
25.0M
  }
442
25.0M
  sync_batch_.emplace_back(entry_batch);
443
25.0M
}
444
445
24.9M
void Log::Appender::GroupWork() {
446
24.9M
  if (sync_batch_.empty()) {
447
0
    Status s = log_->Sync();
448
0
    return;
449
0
  }
450
24.9M
  if (log_->metrics_) {
451
24.9M
    log_->metrics_->entry_batches_per_group->Increment(sync_batch_.size());
452
24.9M
  }
453
24.9M
  TRACE_EVENT1("log", "batch", "batch_size", sync_batch_.size());
454
455
24.9M
  auto se = ScopeExit([this] {
456
24.9M
    if (log_->metrics_) {
457
24.9M
      MonoTime time_now = MonoTime::Now();
458
24.9M
      log_->metrics_->group_commit_latency->Increment(
459
24.9M
          time_now.GetDeltaSince(time_started_).ToMicroseconds());
460
24.9M
    }
461
24.9M
    sync_batch_.clear();
462
24.9M
  });
463
464
24.9M
  Status s = log_->Sync();
465
24.9M
  if (PREDICT_FALSE(!s.ok())) {
466
0
    LOG_WITH_PREFIX(DFATAL) << "Error syncing log: " << s;
467
0
    for (std::unique_ptr<LogEntryBatch>& entry_batch : sync_batch_) {
468
0
      if (!entry_batch->callback().is_null()) {
469
0
        entry_batch->callback().Run(s);
470
0
      }
471
0
    }
472
24.9M
  } else {
473
24.9M
    TRACE_EVENT0("log", "Callbacks");
474
24.9M
    VLOG_WITH_PREFIX(2) << "Synchronized " << sync_batch_.size() << " entry batches";
475
24.9M
    LongOperationTracker long_operation_tracker(
476
24.9M
        "Log callback", FLAGS_consensus_log_scoped_watch_delay_callback_threshold_ms * 1ms);
477
25.0M
    for (std::unique_ptr<LogEntryBatch>& entry_batch : sync_batch_) {
478
25.0M
      if (PREDICT_TRUE(!entry_batch->failed_to_append() && !entry_batch->callback().is_null())) {
479
25.0M
        entry_batch->callback().Run(Status::OK());
480
25.0M
      }
481
      // It's important to delete each batch as we see it, because deleting it may free up memory
482
      // from memory trackers, and the callback of a later batch may want to use that memory.
483
25.0M
      entry_batch.reset();
484
25.0M
    }
485
24.9M
    sync_batch_.clear();
486
24.9M
  }
487
18.4E
  VLOG_WITH_PREFIX(1) << "Exiting AppendTask for tablet " << log_->tablet_id();
488
24.9M
}
489
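GroupWork above leans on a scope-exit guard so that sync_batch_ is cleared and the group commit latency metric is recorded on every exit path. A generic sketch of that idiom, not the yb::ScopeExit implementation itself, looks like this (C++17):

#include <utility>

// Runs the stored callable when the guard goes out of scope, whether the enclosing function
// returns normally, returns early, or unwinds due to an exception.
template <typename F>
class ScopeExitGuard {
 public:
  explicit ScopeExitGuard(F f) : f_(std::move(f)) {}
  ~ScopeExitGuard() { f_(); }
  ScopeExitGuard(const ScopeExitGuard&) = delete;
  ScopeExitGuard& operator=(const ScopeExitGuard&) = delete;

 private:
  F f_;
};

// Usage mirroring GroupWork: the batch is cleared even if an early return is taken.
//   ScopeExitGuard cleanup([&] { sync_batch.clear(); });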
490
150k
void Log::Appender::Shutdown() {
491
150k
  ScopedRWOperationPause pause(&task_stream_counter_, CoarseMonoClock::now() + 15s, Stop::kTrue);
492
150k
  if (!pause.ok()) {
493
0
    LOG(DFATAL) << "Failed to stop appender";
494
0
    return;
495
0
  }
496
150k
  if (task_stream_) {
497
76.4k
    VLOG_WITH_PREFIX(1) << "Shutting down log task stream";
498
76.4k
    task_stream_->Stop();
499
18.4E
    VLOG_WITH_PREFIX(1) << "Log append task stream is shut down";
500
76.4k
    task_stream_.reset();
501
76.4k
  }
502
150k
}
503
504
// This task is submitted to allocation_pool_ in order to asynchronously pre-allocate new log
505
// segments.
506
160k
void Log::SegmentAllocationTask() {
507
160k
  allocation_status_.Set(PreAllocateNewSegment());
508
160k
}
509
510
const Status Log::kLogShutdownStatus(
511
    STATUS(ServiceUnavailable, "WAL is shutting down", "", Errno(ESHUTDOWN)));
512
513
Status Log::Open(const LogOptions &options,
514
                 const std::string& tablet_id,
515
                 const std::string& wal_dir,
516
                 const std::string& peer_uuid,
517
                 const Schema& schema,
518
                 uint32_t schema_version,
519
                 const scoped_refptr<MetricEntity>& table_metric_entity,
520
                 const scoped_refptr<MetricEntity>& tablet_metric_entity,
521
                 ThreadPool* append_thread_pool,
522
                 ThreadPool* allocation_thread_pool,
523
                 int64_t cdc_min_replicated_index,
524
                 scoped_refptr<Log>* log,
525
151k
                 CreateNewSegment create_new_segment) {
526
527
151k
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options.env, DirName(wal_dir)),
528
151k
                        Substitute("Failed to create table wal dir $0", DirName(wal_dir)));
529
530
151k
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options.env, wal_dir),
531
151k
                        Substitute("Failed to create tablet wal dir $0", wal_dir));
532
533
151k
  scoped_refptr<Log> new_log(new Log(options,
534
151k
                                     wal_dir,
535
151k
                                     tablet_id,
536
151k
                                     peer_uuid,
537
151k
                                     schema,
538
151k
                                     schema_version,
539
151k
                                     table_metric_entity,
540
151k
                                     tablet_metric_entity,
541
151k
                                     append_thread_pool,
542
151k
                                     allocation_thread_pool,
543
151k
                                     create_new_segment));
544
151k
  RETURN_NOT_OK(new_log->Init());
545
151k
  log->swap(new_log);
546
151k
  return Status::OK();
547
151k
}
548
549
Log::Log(
550
    LogOptions options,
551
    string wal_dir,
552
    string tablet_id,
553
    string peer_uuid,
554
    const Schema& schema,
555
    uint32_t schema_version,
556
    const scoped_refptr<MetricEntity>& table_metric_entity,
557
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
558
    ThreadPool* append_thread_pool,
559
    ThreadPool* allocation_thread_pool,
560
    CreateNewSegment create_new_segment)
561
    : options_(std::move(options)),
562
      wal_dir_(std::move(wal_dir)),
563
      tablet_id_(std::move(tablet_id)),
564
      peer_uuid_(std::move(peer_uuid)),
565
      schema_(std::make_unique<Schema>(schema)),
566
      schema_version_(schema_version),
567
      active_segment_sequence_number_(options.initial_active_segment_sequence_number),
568
      log_state_(kLogInitialized),
569
      max_segment_size_(options_.segment_size_bytes),
570
      // We halve the initial log segment size here because we double it for every new segment,
571
      // including the very first segment.
572
      cur_max_segment_size_((options.initial_segment_size_bytes + 1) / 2),
573
      appender_(new Appender(this, append_thread_pool)),
574
      allocation_token_(allocation_thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL)),
575
      durable_wal_write_(options_.durable_wal_write),
576
      interval_durable_wal_write_(options_.interval_durable_wal_write),
577
      bytes_durable_wal_write_mb_(options_.bytes_durable_wal_write_mb),
578
      sync_disabled_(false),
579
      allocation_state_(SegmentAllocationState::kAllocationNotStarted),
580
      table_metric_entity_(table_metric_entity),
581
      tablet_metric_entity_(tablet_metric_entity),
582
      on_disk_size_(0),
583
      log_prefix_(consensus::MakeTabletLogPrefix(tablet_id_, peer_uuid_)),
584
151k
      create_new_segment_at_start_(create_new_segment) {
585
151k
  set_wal_retention_secs(options.retention_secs);
586
151k
  if (table_metric_entity_ && tablet_metric_entity_) {
587
150k
    metrics_.reset(new LogMetrics(table_metric_entity_, tablet_metric_entity_));
588
150k
  }
589
151k
}
590
591
151k
Status Log::Init() {
592
151k
  std::lock_guard<percpu_rwlock> write_lock(state_lock_);
593
151k
  CHECK_EQ(kLogInitialized, log_state_);
594
  // Init the index
595
151k
  log_index_.reset(new LogIndex(wal_dir_));
596
  // Reader for previous segments.
597
151k
  RETURN_NOT_OK(LogReader::Open(get_env(),
598
151k
                                log_index_,
599
151k
                                log_prefix_,
600
151k
                                wal_dir_,
601
151k
                                table_metric_entity_.get(),
602
151k
                                tablet_metric_entity_.get(),
603
151k
                                &reader_));
604
605
  // The case where we are continuing an existing log.  We must pick up where the previous WAL left
606
  // off in terms of sequence numbers.
607
151k
  if (reader_->num_segments() != 0) {
608
2.79k
    VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
609
0
                        << " segments from path: " << wal_dir_;
610
611
2.79k
    vector<scoped_refptr<ReadableLogSegment> > segments;
612
2.79k
    RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
613
2.79k
    active_segment_sequence_number_ = segments.back()->header().sequence_number();
614
2.79k
    LOG_WITH_PREFIX(INFO) << "Opened existing logs. Last segment is " << segments.back()->path();
615
616
    // In the case where the TServer process reboots, we need to reload the WAL file size into the metric,
617
    // otherwise we do nothing
618
2.79k
    if (metrics_ && metrics_->wal_size->value() == 0) {
619
2.24k
      std::for_each(segments.begin(), segments.end(),
620
3.69k
                    [this](const auto& segment) {
621
3.69k
                    this->metrics_->wal_size->IncrementBy(segment->file_size());});
622
2.24k
    }
623
624
2.79k
  }
625
626
151k
  if (durable_wal_write_) {
627
63
    YB_LOG_FIRST_N(INFO, 1) << "durable_wal_write is turned on.";
628
151k
  } else if (interval_durable_wal_write_) {
629
150k
    YB_LOG_FIRST_N(INFO, 1) << "interval_durable_wal_write_ms is turned on to sync every "
630
14.3k
                            << interval_durable_wal_write_.ToMilliseconds() << " ms.";
631
150k
  } else if (bytes_durable_wal_write_mb_ > 0) {
632
0
    YB_LOG_FIRST_N(INFO, 1) << "bytes_durable_wal_write_mb is turned on to sync every "
633
0
     << bytes_durable_wal_write_mb_ << " MB of data.";
634
306
  } else {
635
306
    YB_LOG_FIRST_N(INFO, 1) << "durable_wal_write is turned off. Buffered IO will be used for WAL.";
636
306
  }
637
638
151k
  if (create_new_segment_at_start_) {
639
148k
    RETURN_NOT_OK(EnsureInitialNewSegmentAllocated());
640
148k
  }
641
151k
  return Status::OK();
642
151k
}
643
644
159k
Status Log::AsyncAllocateSegment() {
645
159k
  SCHECK_EQ(
646
159k
      allocation_state_.load(std::memory_order_acquire),
647
159k
      SegmentAllocationState::kAllocationNotStarted, AlreadyPresent, "Allocation already running");
648
159k
  allocation_status_.Reset();
649
159k
  allocation_state_.store(SegmentAllocationState::kAllocationInProgress, std::memory_order_release);
650
159k
  VLOG_WITH_PREFIX(1) << "Active segment: " << active_segment_sequence_number_
651
76
                      << ". Starting new segment allocation.";
652
159k
  return allocation_token_->SubmitClosure(Bind(&Log::SegmentAllocationTask, Unretained(this)));
653
159k
}
654
655
85.4k
Status Log::CloseCurrentSegment() {
656
85.4k
  if (!footer_builder_.has_min_replicate_index()) {
657
741
    VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
658
0
                        << active_segment_->path();
659
741
  }
660
85.4k
  VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
661
35
                      << ": " << footer_builder_.ShortDebugString();
662
663
85.4k
  auto close_timestamp_micros = GetCurrentTimeMicros();
664
665
85.4k
  if (FLAGS_time_based_wal_gc_clock_delta_usec != 0) {
666
0
    auto unadjusted_close_timestamp_micros = close_timestamp_micros;
667
0
    close_timestamp_micros += FLAGS_time_based_wal_gc_clock_delta_usec;
668
0
    LOG_WITH_PREFIX(INFO)
669
0
        << "Adjusting log segment closing timestamp by "
670
0
        << FLAGS_time_based_wal_gc_clock_delta_usec << " usec from "
671
0
        << unadjusted_close_timestamp_micros << " usec to " << close_timestamp_micros << " usec";
672
0
  }
673
674
85.4k
  footer_builder_.set_close_timestamp_micros(close_timestamp_micros);
675
676
85.4k
  auto status = active_segment_->WriteFooterAndClose(footer_builder_);
677
678
85.4k
  if (status.ok() && metrics_) {
679
84.7k
      metrics_->wal_size->IncrementBy(active_segment_->Size());
680
84.7k
  }
681
85.4k
  return status;
682
85.4k
}
683
684
8.94k
Status Log::RollOver() {
685
8.94k
  LOG_SLOW_EXECUTION(WARNING, 50, LogPrefix() + "Log roll took a long time") {
686
8.94k
    SCOPED_LATENCY_METRIC(metrics_, roll_latency);
687
8.94k
    RSTATUS_DCHECK(active_segment_, InternalError, "Called RollOver without active segment.");
688
689
    // Check if any errors have occurred during allocation
690
8.94k
    RETURN_NOT_OK(allocation_status_.Get());
691
692
8.94k
    DCHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationFinished);
693
694
8.94k
    LOG_WITH_PREFIX(INFO) << Format(
695
8.94k
        "Last appended OpId in segment $0: $1", active_segment_->path(),
696
8.94k
        last_appended_entry_op_id_.ToString());
697
698
8.94k
    RETURN_NOT_OK(Sync());
699
8.94k
    RETURN_NOT_OK(CloseCurrentSegment());
700
701
8.94k
    RETURN_NOT_OK(SwitchToAllocatedSegment());
702
703
8.94k
    LOG_WITH_PREFIX(INFO) << "Rolled over to a new segment: " << active_segment_->path();
704
8.94k
  }
705
8.94k
  return Status::OK();
706
8.94k
}
707
708
void Log::Reserve(LogEntryTypePB type,
709
                    LogEntryBatchPB* entry_batch,
710
25.0M
                    LogEntryBatch** reserved_entry) {
711
25.0M
  TRACE_EVENT0("log", "Log::Reserve");
712
25.0M
  DCHECK(reserved_entry != nullptr);
713
25.0M
  {
714
25.0M
    SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
715
25.0M
    CHECK_EQ(kLogWriting, log_state_);
716
25.0M
  }
717
718
  // In DEBUG builds, verify that all of the entries in the batch match the specified type.  In
719
  // non-debug builds the foreach loop gets optimized out.
720
25.0M
#ifndef NDEBUG
721
25.0M
  for (const LogEntryPB& entry : entry_batch->entry()) {
722
15.3M
    DCHECK_EQ(entry.type(), type) << "Bad batch: " << entry_batch->DebugString();
723
15.3M
  }
724
25.0M
#endif
725
726
25.0M
  auto new_entry_batch = std::make_unique<LogEntryBatch>(type, std::move(*entry_batch));
727
25.0M
  new_entry_batch->MarkReserved();
728
729
  // Release the memory back to the caller: this will be freed when
730
  // the entry is removed from the queue.
731
  //
732
  // TODO (perf) Use a ring buffer instead of a blocking queue and set
733
  // 'reserved_entry' to a pre-allocated slot in the buffer.
734
25.0M
  *reserved_entry = new_entry_batch.release();
735
25.0M
}
736
737
Status Log::TEST_AsyncAppendWithReplicates(
738
2.00k
    LogEntryBatch* entry, const ReplicateMsgs& replicates, const StatusCallback& callback) {
739
2.00k
  entry->SetReplicates(replicates);
740
2.00k
  return AsyncAppend(entry, callback);
741
2.00k
}
742
743
25.0M
Status Log::AsyncAppend(LogEntryBatch* entry_batch, const StatusCallback& callback) {
744
25.0M
  {
745
25.0M
    SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
746
25.0M
    CHECK_EQ(kLogWriting, log_state_);
747
25.0M
  }
748
749
25.0M
  entry_batch->set_callback(callback);
750
25.0M
  entry_batch->MarkReady();
751
752
25.0M
  if (entry_batch->HasReplicateEntries()) {
753
14.0M
    last_submitted_op_id_ = entry_batch->MaxReplicateOpId();
754
14.0M
  }
755
756
25.0M
  auto submit_status = appender_->Submit(entry_batch);
757
25.0M
  if (PREDICT_FALSE(!submit_status.ok())) {
758
0
    LOG_WITH_PREFIX(WARNING)
759
0
        << "Failed to submit batch " << entry_batch->MaxReplicateOpId() << ": " << submit_status;
760
0
    delete entry_batch;
761
0
    return kLogShutdownStatus;
762
0
  }
763
764
25.0M
  return Status::OK();
765
25.0M
}
766
767
Status Log::AsyncAppendReplicates(const ReplicateMsgs& msgs, const yb::OpId& committed_op_id,
768
                                  RestartSafeCoarseTimePoint batch_mono_time,
769
25.0M
                                  const StatusCallback& callback) {
770
25.0M
  auto batch = CreateBatchFromAllocatedOperations(msgs);
771
25.0M
  if (!committed_op_id.empty()) {
772
24.7M
    committed_op_id.ToPB(batch.mutable_committed_op_id());
773
24.7M
  }
774
  // Set batch mono time if it was specified.
775
25.0M
  if (batch_mono_time != RestartSafeCoarseTimePoint()) {
776
25.0M
    batch.set_mono_time(batch_mono_time.ToUInt64());
777
25.0M
  }
778
779
25.0M
  LogEntryBatch* reserved_entry_batch;
780
25.0M
  Reserve(REPLICATE, &batch, &reserved_entry_batch);
781
782
  // If we're able to reserve, set the vector of replicate shared pointers in the LogEntryBatch.
783
  // This will make sure there's a reference for each replicate while we're appending.
784
25.0M
  reserved_entry_batch->SetReplicates(msgs);
785
786
25.0M
  RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, callback));
787
25.0M
  return Status::OK();
788
25.0M
}
789
790
Status Log::DoAppend(LogEntryBatch* entry_batch,
791
                     bool caller_owns_operation,
792
26.4M
                     bool skip_wal_write) {
793
26.4M
  if (!skip_wal_write) {
794
25.0M
    RETURN_NOT_OK(entry_batch->Serialize());
795
25.0M
    Slice entry_batch_data = entry_batch->data();
796
25.0M
    LOG_IF(DFATAL, entry_batch_data.size() <= 0 && !entry_batch->IsMarker())
797
3.62k
        << "Cannot call DoAppend() with no data";
798
799
25.0M
    if (entry_batch->IsSingleEntryOfType(ROLLOVER_MARKER)) {
800
6.28k
      VLOG_WITH_PREFIX_AND_FUNC(1) << "Got ROLLOVER_MARKER";
801
6.28k
      if (active_segment_ && footer_builder_.IsInitialized() && footer_builder_.num_entries() > 0) {
802
        // Active segment is not empty - rollover.
803
6.21k
        if (allocation_state() == SegmentAllocationState::kAllocationNotStarted) {
804
6.18k
          RETURN_NOT_OK(AsyncAllocateSegment());
805
6.18k
        }
806
6.21k
        return RollOver();
807
6.21k
      }
808
6.28k
    }
809
810
25.0M
    auto entry_batch_bytes = entry_batch->total_size_bytes();
811
    // If there is no data to write return OK.
812
25.0M
    if (PREDICT_FALSE(entry_batch_bytes == 0)) {
813
218
      return Status::OK();
814
218
    }
815
816
    // If the size of this entry overflows the current segment, get a new one.
817
25.0M
    if (allocation_state() == SegmentAllocationState::kAllocationNotStarted) {
818
25.0M
      if (active_segment_->Size() + entry_batch_bytes + kEntryHeaderSize > cur_max_segment_size_) {
819
2.77k
        LOG_WITH_PREFIX(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. "
820
2.77k
                              << "Starting new segment allocation.";
821
2.77k
        RETURN_NOT_OK(AsyncAllocateSegment());
822
2.77k
        if (!options_.async_preallocate_segments) {
823
0
          RETURN_NOT_OK(RollOver());
824
0
        }
825
2.77k
      }
826
25.0M
    } else if (allocation_state() == SegmentAllocationState::kAllocationFinished) {
827
2.73k
      RETURN_NOT_OK(RollOver());
828
4.20k
    } else {
829
4.20k
      VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
830
4.20k
    }
831
832
25.0M
    int64_t start_offset = active_segment_->written_offset();
833
834
25.0M
    LOG_SLOW_EXECUTION(WARNING, 50, "Append to log took a long time") {
835
25.0M
      SCOPED_LATENCY_METRIC(metrics_, append_latency);
836
25.0M
      LongOperationTracker long_operation_tracker(
837
25.0M
          "Log append", FLAGS_consensus_log_scoped_watch_delay_append_threshold_ms * 1ms);
838
839
25.0M
      RETURN_NOT_OK(active_segment_->WriteEntryBatch(entry_batch_data));
840
25.0M
    }
841
842
25.0M
    if (metrics_) {
843
25.0M
      metrics_->bytes_logged->IncrementBy(active_segment_->written_offset() - start_offset);
844
25.0M
    }
845
846
    // Populate the offset and sequence number for the entry batch if we did a WAL write.
847
25.0M
    entry_batch->offset_ = start_offset;
848
25.0M
    entry_batch->active_segment_sequence_number_ = active_segment_sequence_number_;
849
25.0M
  }
850
851
  // We keep track of the last-written OpId here. This is needed to initialize Consensus on
852
  // startup.
853
26.4M
  if (entry_batch->HasReplicateEntries()) {
854
15.4M
    last_appended_entry_op_id_ = entry_batch->MaxReplicateOpId();
855
15.4M
  }
856
857
26.4M
  CHECK_OK(UpdateIndexForBatch(*entry_batch));
858
26.4M
  UpdateFooterForBatch(entry_batch);
859
860
  // We expect the caller to free the actual entries if caller_owns_operation is set.
861
26.4M
  if (caller_owns_operation) {
862
40.3M
    for (int i = 0; i < entry_batch->entry_batch_pb_.entry_size(); i++) {
863
15.2M
      LogEntryPB* entry_pb = entry_batch->entry_batch_pb_.mutable_entry(i);
864
15.2M
      entry_pb->release_replicate();
865
15.2M
    }
866
25.0M
  }
867
868
26.4M
  return Status::OK();
869
26.4M
}
870
871
26.4M
Status Log::UpdateIndexForBatch(const LogEntryBatch& batch) {
872
26.4M
  if (batch.type_ != REPLICATE) {
873
0
    return Status::OK();
874
0
  }
875
876
26.4M
  for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
877
16.6M
    LogIndexEntry index_entry;
878
879
16.6M
    index_entry.op_id = yb::OpId::FromPB(entry_pb.replicate().id());
880
16.6M
    index_entry.segment_sequence_number = batch.active_segment_sequence_number_;
881
16.6M
    index_entry.offset_in_segment = batch.offset_;
882
16.6M
    RETURN_NOT_OK(log_index_->AddEntry(index_entry));
883
16.6M
  }
884
26.4M
  return Status::OK();
885
26.4M
}
886
887
26.4M
void Log::UpdateFooterForBatch(LogEntryBatch* batch) {
888
26.4M
  footer_builder_.set_num_entries(footer_builder_.num_entries() + batch->count());
889
890
  // We keep track of the last-written OpId here.  This is needed to initialize Consensus on
891
  // startup.  We also retrieve the OpId of the first operation in the batch so that, if we roll
892
  // over to a new segment, we set the first operation in the footer immediately.
893
  // Update the index bounds for the current segment.
894
26.4M
  for (const LogEntryPB& entry_pb : batch->entry_batch_pb_.entry()) {
895
16.6M
    int64_t index = entry_pb.replicate().id().index();
896
16.6M
    if (!footer_builder_.has_min_replicate_index() ||
897
16.6M
        index < footer_builder_.min_replicate_index()) {
898
161k
      footer_builder_.set_min_replicate_index(index);
899
161k
      min_replicate_index_.store(index, std::memory_order_release);
900
161k
    }
901
16.6M
    if (!footer_builder_.has_max_replicate_index() ||
902
16.6M
        index > footer_builder_.max_replicate_index()) {
903
16.4M
      footer_builder_.set_max_replicate_index(index);
904
16.4M
    }
905
16.6M
  }
906
26.4M
}
907
908
6.28k
Status Log::AllocateSegmentAndRollOver() {
909
6.28k
  VLOG_WITH_PREFIX_AND_FUNC(1) << "Start";
910
6.28k
  auto* reserved_entry_batch = ReserveMarker(ROLLOVER_MARKER);
911
6.28k
  Synchronizer s;
912
6.28k
  RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, s.AsStatusCallback()));
913
6.28k
  return s.Wait();
914
6.28k
}
915
916
298k
Status Log::EnsureInitialNewSegmentAllocated() {
917
298k
  if (log_state_ == LogState::kLogWriting) {
918
    // New segment already created.
919
147k
    return Status::OK();
920
147k
  }
921
150k
  if (log_state_ != LogState::kLogInitialized) {
922
0
    return STATUS_FORMAT(
923
0
        IllegalState, "Unexpected log state in EnsureInitialNewSegmentAllocated: $0", log_state_);
924
0
  }
925
150k
  RETURN_NOT_OK(AsyncAllocateSegment());
926
150k
  RETURN_NOT_OK(allocation_status_.Get());
927
150k
  RETURN_NOT_OK(SwitchToAllocatedSegment());
928
929
150k
  RETURN_NOT_OK(appender_->Init());
930
150k
  log_state_ = LogState::kLogWriting;
931
150k
  return Status::OK();
932
150k
}
933
934
25.1M
Status Log::Sync() {
935
25.1M
  TRACE_EVENT0("log", "Sync");
936
25.1M
  SCOPED_LATENCY_METRIC(metrics_, sync_latency);
937
938
25.1M
  if (!sync_disabled_) {
939
25.1M
    if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_log_inject_latency))) {
940
102k
      Random r(static_cast<uint32_t>(GetCurrentTimeMicros()));
941
102k
      int sleep_ms = r.Normal(GetAtomicFlag(&FLAGS_log_inject_latency_ms_mean),
942
102k
                              GetAtomicFlag(&FLAGS_log_inject_latency_ms_stddev));
943
102k
      if (sleep_ms > 0) {
944
101k
        LOG_WITH_PREFIX(INFO) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()";
945
101k
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
946
101k
      }
947
102k
    }
948
949
25.1M
    bool timed_or_data_limit_sync = false;
950
25.1M
    if (!durable_wal_write_ && periodic_sync_needed_.load()) {
951
25.0M
      if (interval_durable_wal_write_) {
952
25.0M
        if (MonoTime::Now() > periodic_sync_earliest_unsync_entry_time_
953
25.0M
            + interval_durable_wal_write_) {
954
533k
          timed_or_data_limit_sync = true;
955
533k
        }
956
25.0M
      }
957
25.0M
      if (bytes_durable_wal_write_mb_ > 0) {
958
25.0M
        if (periodic_sync_unsynced_bytes_ >= bytes_durable_wal_write_mb_ * 1_MB) {
959
5.41k
          timed_or_data_limit_sync = true;
960
5.41k
        }
961
25.0M
      }
962
25.0M
    }
963
964
25.1M
    if (durable_wal_write_ || timed_or_data_limit_sync) {
965
547k
      periodic_sync_needed_.store(false);
966
547k
      periodic_sync_unsynced_bytes_ = 0;
967
547k
      LOG_SLOW_EXECUTION(WARNING, 50, "Fsync log took a long time") {
968
547k
        RETURN_NOT_OK(active_segment_->Sync());
969
547k
      }
970
547k
    }
971
25.1M
  }
972
973
  // Update the reader on how far it can read the active segment.
974
25.1M
  reader_->UpdateLastSegmentOffset(active_segment_->written_offset());
975
976
25.1M
  {
977
25.1M
    std::lock_guard<std::mutex> write_lock(last_synced_entry_op_id_mutex_);
978
25.1M
    last_synced_entry_op_id_.store(last_appended_entry_op_id_, boost::memory_order_release);
979
25.1M
    last_synced_entry_op_id_cond_.notify_all();
980
25.1M
  }
981
982
25.1M
  return Status::OK();
983
25.1M
}
984
985
50.4M
Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segments_to_gc) const {
986
  // For the lifetime of a Log::CopyTo call, log_copy_min_index_ may be set to something
987
  // other than std::numeric_limits<int64_t>::max(). This value will correspond to the
988
  // minimum op_idx which is currently being copied and must be retained. In order to
989
  // avoid concurrently deleting those ops, we bump min_op_idx here to be at-least as
990
  // low as log_copy_min_index_.
991
50.4M
  min_op_idx = std::min(log_copy_min_index_, min_op_idx);
992
  // Find the prefix of segments in the segment sequence that is guaranteed not to include
993
  // 'min_op_idx'.
994
50.4M
  RETURN_NOT_OK(reader_->GetSegmentPrefixNotIncluding(
995
50.4M
      min_op_idx, cdc_min_replicated_index_.load(std::memory_order_acquire), segments_to_gc));
996
997
50.4M
  auto max_to_delete = std::max<ssize_t>(
998
50.4M
      reader_->num_segments() - FLAGS_log_min_segments_to_retain, 0);
999
50.4M
  ssize_t segments_to_gc_size = segments_to_gc->size();
1000
50.4M
  if (segments_to_gc_size > max_to_delete) {
1001
2.38M
    VLOG_WITH_PREFIX(2)
1002
0
        << "GCing " << segments_to_gc_size << " in " << wal_dir_
1003
0
        << " would not leave enough remaining segments to satisfy minimum "
1004
0
        << "retention requirement. Only considering "
1005
0
        << max_to_delete << "/" << reader_->num_segments();
1006
2.38M
    segments_to_gc->resize(max_to_delete);
1007
48.0M
  } else if (segments_to_gc_size < max_to_delete) {
1008
1.33M
    auto extra_segments = max_to_delete - segments_to_gc_size;
1009
1.33M
    VLOG_WITH_PREFIX(2) << "Too many log segments, need to GC " << extra_segments << " more.";
1010
1.33M
  }
1011
1012
  // Don't GC segments that are newer than the configured time-based retention.
1013
50.4M
  int64_t now = GetCurrentTimeMicros() + FLAGS_time_based_wal_gc_clock_delta_usec;
1014
1015
50.4M
  for (size_t i = 0; i < segments_to_gc->size(); i++) {
1016
228k
    const scoped_refptr<ReadableLogSegment>& segment = (*segments_to_gc)[i];
1017
1018
    // Segments here will always have a footer, since we don't return the in-progress segment up
1019
    // above. However, segments written by older YB builds may not have the timestamp info (TODO:
1020
    // make sure we indeed care about these old builds). In that case, we're allowed to GC them.
1021
228k
    if (!segment->footer().has_close_timestamp_micros()) continue;
1022
1023
228k
    int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000;
1024
228k
    if (age_seconds < wal_retention_secs()) {
1025
228k
      VLOG_WITH_PREFIX(2)
1026
0
          << "Segment " << segment->path() << " is only " << age_seconds << "s old: "
1027
0
          << "cannot GC it yet due to configured time-based retention policy.";
1028
      // Truncate the list of segments to GC here -- if this one is too new, then all later ones are
1029
      // also too new.
1030
228k
      segments_to_gc->resize(i);
1031
228k
      break;
1032
228k
    }
1033
228k
  }
1034
1035
50.4M
  return Status::OK();
1036
50.4M
}
1037
1038
Status Log::Append(LogEntryPB* phys_entry,
1039
                   LogEntryMetadata entry_metadata,
1040
1.37M
                   bool skip_wal_write) {
1041
1.37M
  LogEntryBatchPB entry_batch_pb;
1042
1.37M
  if (entry_metadata.entry_time != RestartSafeCoarseTimePoint()) {
1043
1.37M
    entry_batch_pb.set_mono_time(entry_metadata.entry_time.ToUInt64());
1044
1.37M
  }
1045
1046
1.37M
  entry_batch_pb.mutable_entry()->AddAllocated(phys_entry);
1047
1.37M
  LogEntryBatch entry_batch(phys_entry->type(), std::move(entry_batch_pb));
1048
  // Mark this as reserved, as we're building it from preallocated data.
1049
1.37M
  entry_batch.state_ = LogEntryBatch::kEntryReserved;
1050
  // Ready assumes the data is reserved before it is ready.
1051
1.37M
  entry_batch.MarkReady();
1052
1.37M
  if (skip_wal_write) {
1053
    // Get the LogIndex entry from read path metadata.
1054
1.37M
    entry_batch.offset_ = entry_metadata.offset;
1055
1.37M
    entry_batch.active_segment_sequence_number_ = entry_metadata.active_segment_sequence_number;
1056
1.37M
  }
1057
1.37M
  Status s = DoAppend(&entry_batch, false, skip_wal_write);
1058
1.37M
  if (s.ok() && !skip_wal_write) {
1059
    // Only sync if we actually performed a wal write.
1060
0
    s = Sync();
1061
0
  }
1062
1.37M
  entry_batch.entry_batch_pb_.mutable_entry()->ExtractSubrange(0, 1, nullptr);
1063
1.37M
  return s;
1064
1.37M
}
1065
1066
6.43k
LogEntryBatch* Log::ReserveMarker(LogEntryTypePB type) {
1067
6.43k
  LogEntryBatchPB entry_batch;
1068
6.43k
  entry_batch.add_entry()->set_type(type);
1069
6.43k
  LogEntryBatch* reserved_entry_batch;
1070
6.43k
  Reserve(type, &entry_batch, &reserved_entry_batch);
1071
6.43k
  return reserved_entry_batch;
1072
6.43k
}
1073
1074
148
Status Log::WaitUntilAllFlushed() {
1075
  // In order to make sure we empty the queue we need to use the async API.
1076
148
  auto* reserved_entry_batch = ReserveMarker(FLUSH_MARKER);
1077
148
  Synchronizer s;
1078
148
  RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, s.AsStatusCallback()));
1079
148
  return s.Wait();
1080
148
}
1081
1082
306k
void Log::set_wal_retention_secs(uint32_t wal_retention_secs) {
1083
306k
  LOG_WITH_PREFIX(INFO) << "Setting log wal retention time to " << wal_retention_secs << " seconds";
1084
306k
  wal_retention_secs_.store(wal_retention_secs, std::memory_order_release);
1085
306k
}
1086
1087
228k
uint32_t Log::wal_retention_secs() const {
1088
228k
  uint32_t wal_retention_secs = wal_retention_secs_.load(std::memory_order_acquire);
1089
228k
  auto flag_wal_retention = ANNOTATE_UNPROTECTED_READ(FLAGS_log_min_seconds_to_retain);
1090
228k
  return flag_wal_retention > 0 ?
1091
228k
      std::max(wal_retention_secs, static_cast<uint32_t>(flag_wal_retention)) :
1092
228k
      wal_retention_secs;
1093
228k
}
1094
1095
51.6M
yb::OpId Log::GetLatestEntryOpId() const {
1096
51.6M
  return last_synced_entry_op_id_.load(boost::memory_order_acquire);
1097
51.6M
}
1098
1099
606
int64_t Log::GetMinReplicateIndex() const {
1100
606
  return min_replicate_index_.load(std::memory_order_acquire);
1101
606
}
1102
1103
13.6M
yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed, MonoDelta duration) {
1104
13.6M
  if (FLAGS_TEST_log_consider_all_ops_safe || all_op_ids_safe_) {
1105
39
    return min_allowed;
1106
39
  }
1107
1108
13.6M
  auto result = last_synced_entry_op_id_.load(boost::memory_order_acquire);
1109
1110
13.6M
  if (result < min_allowed) {
1111
8.10M
    auto start = CoarseMonoClock::Now();
1112
8.10M
    std::unique_lock<std::mutex> lock(last_synced_entry_op_id_mutex_);
1113
8.10M
    auto wait_time = duration ? duration.ToSteadyDuration()
1114
8.10M
                              : FLAGS_wait_for_safe_op_id_to_apply_default_timeout_ms * 1ms;
1115
8.10M
    for (;;) {
1116
8.10M
      if (last_synced_entry_op_id_cond_.wait_for(
1117
16.2M
              lock, wait_time, [this, min_allowed, &result] {
1118
16.2M
            result = last_synced_entry_op_id_.load(boost::memory_order_acquire);
1119
16.2M
            return result >= min_allowed;
1120
16.2M
      })) {
1121
8.10M
        break;
1122
8.10M
      }
1123
18.4E
      if (duration) {
1124
16
        return yb::OpId();
1125
16
      }
1126
      // TODO(bogdan): If the log is closed at this point, consider refactoring to return status
1127
      // and fail cleanly.
1128
18.4E
      LOG_WITH_PREFIX(ERROR) << "Appender stack: " << appender_->GetRunThreadStack();
1129
18.4E
      LOG_WITH_PREFIX(DFATAL)
1130
18.4E
          << "Long wait for safe op id: " << min_allowed
1131
18.4E
          << ", current: " << GetLatestEntryOpId()
1132
18.4E
          << ", last appended: " << last_appended_entry_op_id_
1133
18.4E
          << ", last submitted: " << last_submitted_op_id_
1134
18.4E
          << ", appender: " << appender_->ToString()
1135
18.4E
          << ", passed: " << MonoDelta(CoarseMonoClock::Now() - start);
1136
18.4E
    }
1137
8.10M
  }
1138
1139
13.6M
  DCHECK_GE(result.term, min_allowed.term)
1140
0
      << "result: " << result << ", min_allowed: " << min_allowed;
1141
13.6M
  return result;
1142
13.6M
}
1143
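WaitForSafeOpIdToApply above is an instance of the standard wait-with-predicate-and-timeout pattern on a condition variable: the waiter re-reads the shared value inside the predicate, and the sync path updates it and notifies. Stripped of the OpId and flag machinery, a self-contained sketch of the same pattern (illustrative names, not YugabyteDB code) is:

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct SyncPointWaiter {
  std::mutex mutex;
  std::condition_variable cond;
  int64_t last_synced = 0;  // stands in for last_synced_entry_op_id_

  // Returns true if last_synced reached min_allowed before the timeout expired.
  bool WaitFor(int64_t min_allowed, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex);
    return cond.wait_for(lock, timeout, [this, min_allowed] {
      return last_synced >= min_allowed;
    });
  }

  // Called by the sync path, analogous to Log::Sync notifying waiters after an fsync.
  void Advance(int64_t value) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      last_synced = value;
    }
    cond.notify_all();
  }
};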
1144
2.60k
Status Log::GC(int64_t min_op_idx, int32_t* num_gced) {
1145
2.60k
  CHECK_GE(min_op_idx, 0);
1146
1147
2.60k
  LOG_WITH_PREFIX(INFO) << "Running Log GC on " << wal_dir_ << ": retaining ops >= " << min_op_idx
1148
2.60k
                        << ", log segment size = " << options_.segment_size_bytes;
1149
2.60k
  VLOG_TIMING(1, "Log GC") {
1150
2.60k
    SegmentSequence segments_to_delete;
1151
1152
2.60k
    {
1153
2.60k
      std::lock_guard<percpu_rwlock> l(state_lock_);
1154
2.60k
      CHECK_EQ(kLogWriting, log_state_);
1155
1156
2.60k
      RETURN_NOT_OK(GetSegmentsToGCUnlocked(min_op_idx, &segments_to_delete));
1157
1158
2.60k
      if (segments_to_delete.size() == 0) {
1159
2.51k
        VLOG_WITH_PREFIX(1) << "No segments to delete.";
1160
2.51k
        *num_gced = 0;
1161
2.51k
        return Status::OK();
1162
2.51k
      }
1163
      // Trim the prefix of segments from the reader so that they are no longer referenced by the
1164
      // log.
1165
94
      RETURN_NOT_OK(reader_->TrimSegmentsUpToAndIncluding(
1166
94
          segments_to_delete[segments_to_delete.size() - 1]->header().sequence_number()));
1167
94
    }
1168
1169
    // Now that they are no longer referenced by the Log, delete the files.
1170
94
    *num_gced = 0;
1171
104
    for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
1172
104
      LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path()
1173
104
                            << " (GCed ops < " << segment->footer().max_replicate_index() + 1
1174
104
                            << ")";
1175
104
      RETURN_NOT_OK(get_env()->DeleteFile(segment->path()));
1176
104
      (*num_gced)++;
1177
1178
104
      if (metrics_) {
1179
104
        metrics_->wal_size->IncrementBy(-1 * segment->file_size());
1180
104
      }
1181
104
    }
1182
1183
    // Determine the minimum remaining replicate index in order to properly GC the index chunks.
1184
94
    int64_t min_remaining_op_idx = reader_->GetMinReplicateIndex();
1185
94
    if (min_remaining_op_idx > 0) {
1186
94
      log_index_->GC(min_remaining_op_idx);
1187
94
    }
1188
94
  }
1189
94
  return Status::OK();
1190
2.60k
}
1191
1192
50.4M
Status Log::GetGCableDataSize(int64_t min_op_idx, int64_t* total_size) const {
1193
50.4M
  if (min_op_idx < 0) {
1194
0
    return STATUS_FORMAT(InvalidArgument, "Invalid min op index $0", min_op_idx);
1195
0
  }
1196
1197
50.4M
  SegmentSequence segments_to_delete;
1198
50.4M
  *total_size = 0;
1199
50.4M
  {
1200
50.4M
    SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
1201
50.4M
    if (log_state_ != kLogWriting) {
1202
0
      return STATUS_FORMAT(IllegalState, "Invalid log state $0, expected $1",
1203
0
          log_state_, kLogWriting);
1204
0
    }
1205
50.4M
    Status s = GetSegmentsToGCUnlocked(min_op_idx, &segments_to_delete);
1206
1207
50.4M
    if (!s.ok() || segments_to_delete.size() == 0) {
1208
50.4M
      return Status::OK();
1209
50.4M
    }
1210
50.4M
  }
1211
13
  for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
1212
11
    *total_size += segment->file_size();
1213
11
  }
1214
13
  return Status::OK();
1215
50.4M
}
1216
1217
53.9M
LogReader* Log::GetLogReader() const {
1218
53.9M
  return reader_.get();
1219
53.9M
}
1220
1221
4.83k
Status Log::GetSegmentsSnapshot(SegmentSequence* segments) const {
1222
4.83k
  SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
1223
4.83k
  if (!reader_) {
1224
0
    return STATUS(IllegalState, "Log already closed");
1225
0
  }
1226
1227
4.83k
  return reader_->GetSegmentsSnapshot(segments);
1228
4.83k
}
1229
1230
2.36M
uint64_t Log::OnDiskSize() {
1231
2.36M
  SegmentSequence segments;
1232
2.36M
  {
1233
2.36M
    shared_lock<rw_spinlock> l(state_lock_.get_lock());
1234
    // If the log is closed, the tablet is either being deleted or tombstoned,
1235
    // so we don't count the size of its log anymore as it should be deleted.
1236
2.36M
    if (log_state_ == kLogClosed || !reader_->GetSegmentsSnapshot(&segments).ok()) {
1237
86
      return on_disk_size_.load();
1238
86
    }
1239
2.36M
  }
1240
2.36M
  uint64_t ret = 0;
1241
2.97M
  for (const auto& segment : segments) {
1242
2.97M
    ret += segment->file_size();
1243
2.97M
  }
1244
1245
2.36M
  on_disk_size_.store(ret, std::memory_order_release);
1246
2.36M
  return ret;
1247
2.36M
}
1248
1249
void Log::SetSchemaForNextLogSegment(const Schema& schema,
1250
68.3k
                                     uint32_t version) {
1251
68.3k
  std::lock_guard<rw_spinlock> l(schema_lock_);
1252
68.3k
  *schema_ = schema;
1253
68.3k
  schema_version_ = version;
1254
68.3k
}
1255
1256
150k
Status Log::Close() {
1257
150k
  if (PREDICT_FALSE(FLAGS_TEST_simulate_abrupt_server_restart)) {
1258
0
    return Status::OK();
1259
0
  }
1260
  // Allocation pool is used from appender pool, so we should shutdown appender first.
1261
150k
  appender_->Shutdown();
1262
150k
  allocation_token_.reset();
1263
1264
150k
  std::lock_guard<percpu_rwlock> l(state_lock_);
1265
150k
  switch (log_state_) {
1266
76.5k
    case kLogWriting:
1267
76.5k
      RETURN_NOT_OK(Sync());
1268
76.5k
      RETURN_NOT_OK(CloseCurrentSegment());
1269
76.5k
      RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked());
1270
76.1k
      log_state_ = kLogClosed;
1271
76.1k
      VLOG_WITH_PREFIX(1) << "Log closed";
1272
1273
      // Release FDs held by these objects.
1274
76.1k
      log_index_.reset();
1275
76.1k
      reader_.reset();
1276
1277
76.1k
      return Status::OK();
1278
1279
74.1k
    case kLogClosed:
1280
74.1k
      VLOG_WITH_PREFIX(1) << "Log already closed";
1281
74.1k
      return Status::OK();
1282
1283
7
    default:
1284
7
      return STATUS(IllegalState, Substitute("Bad state for Close() $0", log_state_));
1285
150k
  }
1286
150k
}
1287
1288
4.40k
size_t Log::num_segments() const {
1289
4.40k
  std::shared_lock<rw_spinlock> read_lock(state_lock_.get_lock());
1290
4.40k
  return reader_ ? reader_->num_segments() : 0;
1291
4.40k
}
1292
1293
4.71k
scoped_refptr<ReadableLogSegment> Log::GetSegmentBySequenceNumber(int64_t seq) const {
1294
4.71k
  SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
1295
4.71k
  if (!reader_) {
1296
0
    return nullptr;
1297
0
  }
1298
1299
4.71k
  return reader_->GetSegmentBySequenceNumber(seq);
1300
4.71k
}
1301
1302
2
bool Log::HasOnDiskData(FsManager* fs_manager, const string& wal_dir) {
1303
2
  return fs_manager->env()->FileExists(wal_dir);
1304
2
}
1305
1306
Status Log::DeleteOnDiskData(Env* env,
1307
                             const string& tablet_id,
1308
                             const string& wal_dir,
1309
75.6k
                             const string& peer_uuid) {
1310
75.6k
  if (!env->FileExists(wal_dir)) {
1311
240
    return Status::OK();
1312
240
  }
1313
75.4k
  LOG(INFO) << "T " << tablet_id << " P " << peer_uuid
1314
75.4k
            << ": Deleting WAL dir " << wal_dir;
1315
75.4k
  RETURN_NOT_OK_PREPEND(env->DeleteRecursively(wal_dir),
1316
75.4k
                        "Unable to recursively delete WAL dir for tablet " + tablet_id);
1317
75.4k
  return Status::OK();
1318
75.4k
}
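
// Illustrative sketch (not part of log.cc): the DeleteOnDiskData() flow expressed with
// std::filesystem instead of the Env abstraction. A missing directory is treated as success,
// mirroring the early return above.
#include <filesystem>
#include <iostream>
#include <string>
#include <system_error>

bool DeleteWalDir(const std::string& wal_dir) {
  std::error_code ec;
  if (!std::filesystem::exists(wal_dir, ec)) {
    return true;  // nothing to delete
  }
  std::cout << "Deleting WAL dir " << wal_dir << std::endl;
  std::filesystem::remove_all(wal_dir, ec);  // recursive delete
  return !ec;
}

int main() {
  return DeleteWalDir("/tmp/example-wal-dir") ? 0 : 1;
}
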
1319
1320
68
Status Log::FlushIndex() {
1321
68
  if (!log_index_) {
1322
0
    return Status::OK();
1323
0
  }
1324
68
  return log_index_->Flush();
1325
68
}
1326
1327
165
Status Log::CopyTo(const std::string& dest_wal_dir) {
1328
  // We mainly need log_copy_mutex_ to simplify managing of log_copy_min_index_.
1329
165
  std::lock_guard<decltype(log_copy_mutex_)> log_copy_lock(log_copy_mutex_);
1330
165
  auto se = ScopeExit([this]() {
1331
165
    std::lock_guard<percpu_rwlock> l(state_lock_);
1332
165
    log_copy_min_index_ = std::numeric_limits<int64_t>::max();
1333
165
  });
1334
1335
165
  SegmentSequence segments;
1336
165
  scoped_refptr<LogIndex> log_index;
1337
165
  {
1338
165
    UniqueLock<percpu_rwlock> l(state_lock_);
1339
165
    if (log_state_ != kLogInitialized) {
1340
165
      SCHECK_EQ(log_state_, kLogWriting, IllegalState, Format("Invalid log state: $0", log_state_));
1341
165
      ReverseLock<decltype(l)> rlock(l);
1342
      // Rollover current active segment if it is not empty.
1343
165
      RETURN_NOT_OK(AllocateSegmentAndRollOver());
1344
165
    }
1345
1346
165
    SCHECK(
1347
165
        log_state_ == kLogInitialized || log_state_ == kLogWriting, IllegalState,
1348
165
        Format("Invalid log state: $0", log_state_));
1349
    // Remember log_index, because it could be reset if someone closes the log after we release
1350
    // state_lock_.
1351
165
    log_index = log_index_;
1352
165
    RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
1353
1354
    // We skip the last snapshot segment because it might be mutable and is either:
1355
    // 1) A segment that was empty when we tried to rollover at the beginning of the
1356
    // function.
1357
    // 2) A segment that was created after calling AllocateSegmentAndRollOver above (or
1358
    // created even after by concurrent operations).
1359
    // In both cases segments in snapshot prior to the last one contain all operations that
1360
    // were present in log before calling Log::CopyTo and not yet GCed.
1361
165
    segments.pop_back();
1362
    // At this point all segments in `segments` are closed and immutable.
1363
1364
    // Looking for first non-empty segment.
1365
165
    auto it =
1366
165
        std::find_if(segments.begin(), segments.end(), [](const ReadableLogSegmentPtr& segment) {
1367
          // Check whether segment is not empty.
1368
165
          return segment->readable_up_to() > segment->first_entry_offset();
1369
165
        });
1370
165
    if (it != segments.end()) {
1371
      // We've found first non-empty segment to copy, set an anchor for Log GC.
1372
165
      log_copy_min_index_ = VERIFY_RESULT((*it)->ReadFirstEntryMetadata()).op_id.index;
1373
165
    }
1374
165
  }
1375
1376
165
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options_.env, dest_wal_dir),
1377
165
                        Format("Failed to create tablet WAL dir $0", dest_wal_dir));
1378
1379
165
  RETURN_NOT_OK(log_index->Flush());
1380
1381
165
  auto* const env = options_.env;
1382
1383
165
  {
1384
636
    for (const auto& segment : segments) {
1385
636
      const auto sequence_number = segment->header().sequence_number();
1386
636
      const auto file_name = FsManager::GetWalSegmentFileName(sequence_number);
1387
636
      const auto src_path = JoinPathSegments(wal_dir_, file_name);
1388
636
      SCHECK_EQ(src_path, segment->path(), InternalError, "Log segment path does not match");
1389
636
      const auto dest_path = JoinPathSegments(dest_wal_dir, file_name);
1390
1391
636
      RETURN_NOT_OK(env->LinkFile(src_path, dest_path));
1392
636
      VLOG_WITH_PREFIX(1) << Format("Hard linked $0 to $1", src_path, dest_path);
1393
636
    }
1394
165
  }
1395
1396
165
  const auto files = VERIFY_RESULT(env->GetChildren(wal_dir_, ExcludeDots::kTrue));
1397
1398
966
  for (const auto& file : files) {
1399
966
    const auto src_path = JoinPathSegments(wal_dir_, file);
1400
966
    const auto dest_path = JoinPathSegments(dest_wal_dir, file);
1401
1402
966
    if (FsManager::IsWalSegmentFileName(file)) {
1403
      // Already processed above.
1404
801
    } else if (!boost::starts_with(file, kSegmentPlaceholderFilePrefix)) {
1405
165
      RETURN_NOT_OK_PREPEND(
1406
165
          CopyFile(env, src_path, dest_path),
1407
165
          Format("Failed to copy file $0 to $1", src_path, dest_path));
1408
165
      VLOG_WITH_PREFIX(1) << Format("Copied $0 to $1", src_path, dest_path);
1409
165
    }
1410
966
  }
1411
165
  return Status::OK();
1412
165
}
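
// Illustrative sketch (not part of log.cc): the CopyTo() strategy of hard-linking the
// immutable, closed WAL segments and byte-copying everything else, written with
// std::filesystem. The ".wal" suffix check and the paths are assumptions for the sketch,
// not the actual FsManager naming rules.
#include <filesystem>
#include <system_error>

namespace fs = std::filesystem;

std::error_code CopyWalDir(const fs::path& src_dir, const fs::path& dest_dir) {
  std::error_code ec;
  fs::create_directories(dest_dir, ec);
  if (ec) return ec;
  for (const auto& entry : fs::directory_iterator(src_dir, ec)) {
    const fs::path dest = dest_dir / entry.path().filename();
    if (entry.path().extension() == ".wal") {
      // Closed segments are immutable, so a hard link is enough and costs no extra space.
      fs::create_hard_link(entry.path(), dest, ec);
    } else {
      // Index files and other metadata are copied byte-for-byte.
      fs::copy_file(entry.path(), dest, fs::copy_options::overwrite_existing, ec);
    }
    if (ec) return ec;
  }
  return {};
}

int main() {
  std::error_code ec = CopyWalDir("/tmp/src-wal", "/tmp/dest-wal");
  return ec ? 1 : 0;
}
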
1413
1414
319k
uint64_t Log::NextSegmentDesiredSize() {
1415
319k
  return std::min(cur_max_segment_size_ * 2, max_segment_size_);
1416
319k
}
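
// Illustrative sketch (not part of log.cc): the exponential segment-growth policy from
// NextSegmentDesiredSize(). Each new segment is allowed to be twice the size of the current
// one, capped at a configured maximum; the starting size and cap below are made up.
#include <algorithm>
#include <cstdint>
#include <cstdio>

uint64_t NextSegmentDesiredSize(uint64_t cur_max_segment_size, uint64_t max_segment_size) {
  return std::min(cur_max_segment_size * 2, max_segment_size);
}

int main() {
  // Starting from 1 MiB with a 64 MiB cap, segments grow 1 -> 2 -> 4 -> ... until the cap.
  const uint64_t kMax = 64ULL << 20;
  for (uint64_t cur = 1ULL << 20; cur < kMax; cur = NextSegmentDesiredSize(cur, kMax)) {
    std::printf("%llu MiB\n", (unsigned long long)(cur >> 20));
  }
  return 0;
}
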
1417
1418
160k
Status Log::PreAllocateNewSegment() {
1419
160k
  TRACE_EVENT1("log", "PreAllocateNewSegment", "file", next_segment_path_);
1420
160k
  CHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationInProgress);
1421
1422
160k
  WritableFileOptions opts;
1423
  // We always want to sync on close: https://github.com/yugabyte/yugabyte-db/issues/3490
1424
160k
  opts.sync_on_close = true;
1425
160k
  opts.o_direct = durable_wal_write_;
1426
160k
  RETURN_NOT_OK(CreatePlaceholderSegment(opts, &next_segment_path_, &next_segment_file_));
1427
1428
160k
  if (options_.preallocate_segments) {
1429
159k
    uint64_t next_segment_size = NextSegmentDesiredSize();
1430
159k
    TRACE("Preallocating $0 byte segment in $1", next_segment_size, next_segment_path_);
1431
    // TODO (perf) zero the new segments -- this could result in additional performance
1432
    // improvements.
1433
159k
    RETURN_NOT_OK(next_segment_file_->PreAllocate(next_segment_size));
1434
159k
  }
1435
1436
160k
  allocation_state_.store(SegmentAllocationState::kAllocationFinished, std::memory_order_release);
1437
160k
  return Status::OK();
1438
160k
}
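
// Illustrative sketch (not part of log.cc): preallocating file space for the next WAL segment,
// assuming a Linux/POSIX system with posix_fallocate(). Reserving the blocks up front avoids
// repeated metadata updates during appends and surfaces ENOSPC early; the path below is made up.
#include <fcntl.h>
#include <unistd.h>

int PreallocateSegment(const char* path, off_t bytes) {
  int fd = open(path, O_CREAT | O_WRONLY, 0644);
  if (fd < 0) return -1;
  // posix_fallocate returns 0 on success, an errno value otherwise.
  int err = posix_fallocate(fd, /*offset=*/0, bytes);
  if (fsync(fd) != 0 && err == 0) err = -1;
  close(fd);
  return err;
}

int main() {
  return PreallocateSegment("/tmp/example-segment.tmp", 1 << 20) == 0 ? 0 : 1;
}
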
1439
1440
160k
Status Log::SwitchToAllocatedSegment() {
1441
160k
  CHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationFinished);
1442
1443
  // Increment "next" log segment seqno.
1444
160k
  active_segment_sequence_number_++;
1445
160k
  const string new_segment_path =
1446
160k
      FsManager::GetWalSegmentFilePath(wal_dir_, active_segment_sequence_number_);
1447
1448
160k
  RETURN_NOT_OK(get_env()->RenameFile(next_segment_path_, new_segment_path));
1449
160k
  RETURN_NOT_OK(get_env()->SyncDir(wal_dir_));
1450
1451
160k
  int64_t fault_after_min_replicate_index =
1452
160k
      FLAGS_TEST_log_fault_after_segment_allocation_min_replicate_index;
1453
160k
  if (PREDICT_FALSE(fault_after_min_replicate_index)) {
1454
11
    if (reader_->GetMinReplicateIndex() >= fault_after_min_replicate_index) {
1455
1
      MAYBE_FAULT(1.0);
1456
1
    }
1457
11
  }
1458
1459
  // Create a new segment.
1460
160k
  std::unique_ptr<WritableLogSegment> new_segment(
1461
160k
      new WritableLogSegment(new_segment_path, next_segment_file_));
1462
1463
  // Set up the new header and footer.
1464
160k
  LogSegmentHeaderPB header;
1465
160k
  header.set_major_version(kLogMajorVersion);
1466
160k
  header.set_minor_version(kLogMinorVersion);
1467
160k
  header.set_sequence_number(active_segment_sequence_number_);
1468
160k
  header.set_unused_tablet_id(tablet_id_);
1469
1470
  // Set up the new footer. This will be maintained as the segment is written.
1471
160k
  footer_builder_.Clear();
1472
160k
  footer_builder_.set_num_entries(0);
1473
1474
  // Set the new segment's schema.
1475
160k
  {
1476
160k
    SharedLock<decltype(schema_lock_)> l(schema_lock_);
1477
160k
    SchemaToPB(*schema_, header.mutable_unused_schema());
1478
160k
    header.set_unused_schema_version(schema_version_);
1479
160k
  }
1480
1481
160k
  RETURN_NOT_OK(new_segment->WriteHeaderAndOpen(header));
1482
  // Transform the currently-active segment into a readable one, since we need to be able to replay
1483
  // the segments for other peers.
1484
160k
  {
1485
160k
    if (active_segment_.get() != nullptr) {
1486
8.94k
      std::lock_guard<decltype(state_lock_)> l(state_lock_);
1487
8.94k
      CHECK_OK(ReplaceSegmentInReaderUnlocked());
1488
8.94k
    }
1489
160k
  }
1490
1491
  // Open the segment we just created in readable form and add it to the reader.
1492
160k
  std::unique_ptr<RandomAccessFile> readable_file;
1493
160k
  RETURN_NOT_OK(get_env()->NewRandomAccessFile(new_segment_path, &readable_file));
1494
1495
160k
  scoped_refptr<ReadableLogSegment> readable_segment(
1496
160k
    new ReadableLogSegment(new_segment_path,
1497
160k
                           shared_ptr<RandomAccessFile>(readable_file.release())));
1498
160k
  RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
1499
160k
  RETURN_NOT_OK(reader_->AppendEmptySegment(readable_segment));
1500
1501
  // Now set 'active_segment_' to the new segment.
1502
160k
  active_segment_ = std::move(new_segment);
1503
160k
  cur_max_segment_size_ = NextSegmentDesiredSize();
1504
1505
160k
  allocation_state_.store(
1506
160k
      SegmentAllocationState::kAllocationNotStarted, std::memory_order_release);
1507
1508
160k
  return Status::OK();
1509
160k
}
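
// Illustrative sketch (not part of log.cc): the durability step at the start of
// SwitchToAllocatedSegment(), i.e. renaming the preallocated placeholder to its final segment
// name and then fsync()ing the WAL directory so the rename itself is durable. Plain POSIX calls
// stand in for the Env abstraction, and the paths are made up.
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>

int ActivateSegment(const char* placeholder_path, const char* final_path, const char* wal_dir) {
  if (std::rename(placeholder_path, final_path) != 0) return -1;
  int dir_fd = open(wal_dir, O_RDONLY);  // need a descriptor to sync the directory
  if (dir_fd < 0) return -1;
  int rc = fsync(dir_fd);                // persist the directory entry created by the rename
  close(dir_fd);
  return rc;
}

int main() {
  return ActivateSegment("/tmp/wal/placeholder.tmp", "/tmp/wal/wal-000000001", "/tmp/wal");
}
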
1510
1511
85.2k
Status Log::ReplaceSegmentInReaderUnlocked() {
1512
  // We should never switch to a new segment if we wrote nothing to the old one.
1513
85.2k
  CHECK(active_segment_->IsClosed());
1514
85.2k
  shared_ptr<RandomAccessFile> readable_file;
1515
85.2k
  RETURN_NOT_OK(OpenFileForRandom(
1516
85.2k
      get_env(), active_segment_->path(), &readable_file));
1517
1518
84.8k
  scoped_refptr<ReadableLogSegment> readable_segment(
1519
84.8k
      new ReadableLogSegment(active_segment_->path(), readable_file));
1520
  // Note: active_segment_->header() will only contain an initialized PB if we wrote the header out.
1521
84.8k
  RETURN_NOT_OK(readable_segment->Init(active_segment_->header(),
1522
84.8k
                                       active_segment_->footer(),
1523
84.8k
                                       active_segment_->first_entry_offset()));
1524
1525
84.8k
  return reader_->ReplaceLastSegment(readable_segment);
1526
84.8k
}
1527
1528
Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts,
1529
                                     string* result_path,
1530
160k
                                     shared_ptr<WritableFile>* out) {
1531
160k
  string path_tmpl = JoinPathSegments(wal_dir_, kSegmentPlaceholderFileTemplate);
1532
160k
  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
1533
160k
  std::unique_ptr<WritableFile> segment_file;
1534
160k
  RETURN_NOT_OK(get_env()->NewTempWritableFile(opts,
1535
160k
                                               path_tmpl,
1536
160k
                                               result_path,
1537
160k
                                               &segment_file));
1538
160k
  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path;
1539
160k
  out->reset(segment_file.release());
1540
160k
  return Status::OK();
1541
160k
}
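
// Illustrative sketch (not part of log.cc): creating a uniquely named placeholder file from a
// template, similar in spirit to NewTempWritableFile() above, using POSIX mkstemp(). The
// template string and directory are assumptions; mkstemp replaces the trailing "XXXXXX" with a
// unique suffix and returns an open descriptor.
#include <stdlib.h>
#include <unistd.h>
#include <string>

// Returns the opened file descriptor, or -1 on error; the chosen path is written back
// into 'result_path'.
int CreatePlaceholder(const std::string& wal_dir, std::string* result_path) {
  std::string templ = wal_dir + "/wal-placeholder.XXXXXX";
  int fd = mkstemp(templ.data());  // mutates the buffer in place with the unique name
  if (fd >= 0) *result_path = templ;
  return fd;
}

int main() {
  std::string path;
  int fd = CreatePlaceholder("/tmp", &path);
  if (fd < 0) return 1;
  close(fd);
  return 0;
}
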
1542
1543
4.71k
uint64_t Log::active_segment_sequence_number() const {
1544
4.71k
  return active_segment_sequence_number_;
1545
4.71k
}
1546
1547
13
Status Log::TEST_SubmitFuncToAppendToken(const std::function<void()>& func) {
1548
13
  return appender_->TEST_SubmitFunc(func);
1549
13
}
1550
1551
103
Status Log::ResetLastSyncedEntryOpId(const OpId& op_id) {
1552
103
  RETURN_NOT_OK(WaitUntilAllFlushed());
1553
1554
103
  OpId old_value;
1555
103
  {
1556
103
    std::lock_guard<std::mutex> write_lock(last_synced_entry_op_id_mutex_);
1557
103
    old_value = last_synced_entry_op_id_.load(boost::memory_order_acquire);
1558
103
    last_synced_entry_op_id_.store(op_id, boost::memory_order_release);
1559
103
    last_synced_entry_op_id_cond_.notify_all();
1560
103
  }
1561
103
  LOG_WITH_PREFIX(INFO) << "Reset last synced entry op id from " << old_value << " to " << op_id;
1562
1563
103
  return Status::OK();
1564
103
}
1565
1566
75.0k
Log::~Log() {
1567
75.0k
  WARN_NOT_OK(Close(), "Error closing log");
1568
75.0k
}
1569
1570
// ------------------------------------------------------------------------------------------------
1571
// LogEntryBatch
1572
1573
LogEntryBatch::LogEntryBatch(LogEntryTypePB type, LogEntryBatchPB&& entry_batch_pb)
1574
    : type_(type),
1575
      entry_batch_pb_(std::move(entry_batch_pb)),
1576
26.4M
      count_(entry_batch_pb_.entry().size()) {
1577
26.4M
  if (!IsMarkerType(type_)) {
1578
26.4M
    DCHECK_NE(entry_batch_pb_.mono_time(), 0);
1579
26.4M
  }
1580
26.4M
}
1581
1582
26.4M
LogEntryBatch::~LogEntryBatch() {
1583
  // ReplicateMsg objects are pointed to by LogEntryBatchPB but are really owned by shared pointers
1584
  // in replicates_. To avoid double freeing, release them from the protobuf.
1585
26.4M
  for (auto& entry : *entry_batch_pb_.mutable_entry()) {
1586
15.3M
    if (entry.has_replicate()) {
1587
0
      entry.release_replicate();
1588
0
    }
1589
15.3M
  }
1590
26.4M
}
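
// Illustrative sketch (not part of log.cc): the ownership problem the destructor above solves,
// reduced to plain C++ with hypothetical Batch/Entry types. The batch temporarily holds owning
// pointers to messages that are really owned elsewhere via shared_ptr; before the batch is
// destroyed it releases those pointers (the protobuf equivalent is release_replicate()), so it
// never frees memory it does not own.
#include <memory>
#include <string>
#include <vector>

struct ReplicateMsg { std::string payload; };

struct Entry {
  // Owning pointer: when Entry is destroyed it would delete the message, just like a set
  // protobuf submessage is deleted by its parent message.
  std::unique_ptr<ReplicateMsg> replicate;
};

struct Batch {
  std::vector<Entry> entries;
  ~Batch() {
    // The messages are really owned by shared_ptrs elsewhere, so hand the raw pointers back
    // before the unique_ptrs get a chance to delete them.
    for (auto& entry : entries) {
      (void)entry.replicate.release();
    }
  }
};

int main() {
  std::vector<std::shared_ptr<ReplicateMsg>> owners = {
      std::make_shared<ReplicateMsg>(ReplicateMsg{"op1"}),
      std::make_shared<ReplicateMsg>(ReplicateMsg{"op2"}),
  };
  {
    Batch batch;
    for (auto& msg : owners) {
      Entry e;
      e.replicate.reset(msg.get());  // borrow: the batch temporarily "owns" the same object
      batch.entries.push_back(std::move(e));
    }
  }  // ~Batch releases the borrowed pointers, so the shared_ptrs can safely delete later
  return owners.size() == 2 ? 0 : 1;
}
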
1591
1592
25.0M
void LogEntryBatch::MarkReserved() {
1593
25.0M
  DCHECK_EQ(state_, kEntryInitialized);
1594
25.0M
  state_ = kEntryReserved;
1595
25.0M
}
1596
1597
25.0M
bool LogEntryBatch::IsMarker() const {
1598
25.0M
  return count() == 1 && IsMarkerType(entry_batch_pb_.entry(0).type());
1599
25.0M
}
1600
1601
25.0M
bool LogEntryBatch::IsSingleEntryOfType(LogEntryTypePB type) const {
1602
25.0M
  return count() == 1 && entry_batch_pb_.entry(0).type() == type;
1603
25.0M
}
1604
1605
25.0M
Status LogEntryBatch::Serialize() {
1606
25.0M
  DCHECK_EQ(state_, kEntryReady);
1607
25.0M
  buffer_.clear();
1608
  // *_MARKER LogEntries are markers and are not serialized.
1609
25.0M
  if (PREDICT_FALSE(IsMarker())) {
1610
6.43k
    total_size_bytes_ = 0;
1611
6.43k
    state_ = kEntrySerialized;
1612
6.43k
    return Status::OK();
1613
6.43k
  }
1614
25.0M
  DCHECK_NE(entry_batch_pb_.mono_time(), 0);
1615
25.0M
  total_size_bytes_ = entry_batch_pb_.ByteSize();
1616
25.0M
  buffer_.reserve(total_size_bytes_);
1617
1618
25.0M
  pb_util::AppendToString(entry_batch_pb_, &buffer_);
1619
1620
25.0M
  state_ = kEntrySerialized;
1621
25.0M
  return Status::OK();
1622
25.0M
}
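
// Illustrative sketch (not part of log.cc): the Serialize() pattern of sizing the output buffer
// before appending a protobuf message, shown with the well-known google.protobuf.Duration type
// (the real code serializes LogEntryBatchPB via yb::pb_util helpers). Requires linking against
// libprotobuf.
#include <google/protobuf/duration.pb.h>
#include <string>

std::string SerializeWithReserve(const google::protobuf::Duration& msg) {
  std::string buffer;
  const size_t total_size_bytes = msg.ByteSizeLong();  // size the buffer once, up front
  buffer.reserve(total_size_bytes);
  msg.AppendToString(&buffer);  // append the wire-format bytes
  return buffer;
}

int main() {
  google::protobuf::Duration d;
  d.set_seconds(42);
  return SerializeWithReserve(d).size() == d.ByteSizeLong() ? 0 : 1;
}
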
1623
1624
26.4M
void LogEntryBatch::MarkReady() {
1625
26.4M
  DCHECK_EQ(state_, kEntryReserved);
1626
26.4M
  state_ = kEntryReady;
1627
26.4M
}
1628
1629
}  // namespace log
1630
}  // namespace yb