YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log.h"
34
35
#include <algorithm>
36
#include <atomic>
37
#include <chrono>
38
#include <condition_variable>
39
#include <memory>
40
#include <mutex>
41
#include <thread>
42
#include <vector>
43
44
#include <boost/algorithm/string/predicate.hpp>
45
#include <gflags/gflags.h>
46
47
#include "yb/common/schema.h"
48
#include "yb/common/wire_protocol.h"
49
50
#include "yb/consensus/consensus_util.h"
51
#include "yb/consensus/log_index.h"
52
#include "yb/consensus/log_metrics.h"
53
#include "yb/consensus/log_reader.h"
54
#include "yb/consensus/log_util.h"
55
56
#include "yb/fs/fs_manager.h"
57
58
#include "yb/gutil/bind.h"
59
#include "yb/gutil/map-util.h"
60
#include "yb/gutil/ref_counted.h"
61
#include "yb/gutil/stl_util.h"
62
#include "yb/gutil/strings/substitute.h"
63
#include "yb/gutil/walltime.h"
64
65
#include "yb/util/async_util.h"
66
#include "yb/util/countdown_latch.h"
67
#include "yb/util/debug/long_operation_tracker.h"
68
#include "yb/util/debug/trace_event.h"
69
#include "yb/util/env_util.h"
70
#include "yb/util/fault_injection.h"
71
#include "yb/util/file_util.h"
72
#include "yb/util/flag_tags.h"
73
#include "yb/util/format.h"
74
#include "yb/util/logging.h"
75
#include "yb/util/metrics.h"
76
#include "yb/util/opid.h"
77
#include "yb/util/path_util.h"
78
#include "yb/util/pb_util.h"
79
#include "yb/util/random.h"
80
#include "yb/util/scope_exit.h"
81
#include "yb/util/shared_lock.h"
82
#include "yb/util/size_literals.h"
83
#include "yb/util/status.h"
84
#include "yb/util/status_format.h"
85
#include "yb/util/status_log.h"
86
#include "yb/util/stopwatch.h"
87
#include "yb/util/taskstream.h"
88
#include "yb/util/thread.h"
89
#include "yb/util/trace.h"
90
#include "yb/util/tsan_util.h"
91
#include "yb/util/unique_lock.h"
92
93
using namespace yb::size_literals;  // NOLINT.
94
using namespace std::literals;  // NOLINT.
95
using namespace std::placeholders;
96
97
// Log retention configuration.
98
// -----------------------------
99
DEFINE_int32(log_min_segments_to_retain, 2,
100
             "The minimum number of past log segments to keep at all times,"
101
             " regardless of what is required for durability. "
102
             "Must be at least 1.");
103
TAG_FLAG(log_min_segments_to_retain, runtime);
104
TAG_FLAG(log_min_segments_to_retain, advanced);
105
106
DEFINE_int32(log_min_seconds_to_retain, 900,
107
             "The minimum number of seconds for which to keep log segments to keep at all times, "
108
             "regardless of what is required for durability. Logs may be still retained for "
109
             "a longer amount of time if they are necessary for correct restart. This should be "
110
             "set long enough such that a tablet server which has temporarily failed can be "
111
             "restarted within the given time period. If a server is down for longer than this "
112
             "amount of time, it is possible that its tablets will be re-replicated on other "
113
             "machines.");
114
TAG_FLAG(log_min_seconds_to_retain, runtime);
115
TAG_FLAG(log_min_seconds_to_retain, advanced);
116
117
// Flags for controlling kernel watchdog limits.
118
DEFINE_int32(consensus_log_scoped_watch_delay_callback_threshold_ms, 1000,
119
             "If calling consensus log callback(s) take longer than this, the kernel watchdog "
120
             "will print out a stack trace.");
121
TAG_FLAG(consensus_log_scoped_watch_delay_callback_threshold_ms, runtime);
122
TAG_FLAG(consensus_log_scoped_watch_delay_callback_threshold_ms, advanced);
123
DEFINE_int32(consensus_log_scoped_watch_delay_append_threshold_ms, 1000,
124
             "If consensus log append takes longer than this, the kernel watchdog "
125
             "will print out a stack trace.");
126
TAG_FLAG(consensus_log_scoped_watch_delay_append_threshold_ms, runtime);
127
TAG_FLAG(consensus_log_scoped_watch_delay_append_threshold_ms, advanced);
128
129
// Fault/latency injection flags.
130
// -----------------------------
131
DEFINE_bool(log_inject_latency, false,
132
            "If true, injects artificial latency in log sync operations. "
133
            "Advanced option. Use at your own risk -- has a negative effect "
134
            "on performance for obvious reasons!");
135
DEFINE_int32(log_inject_latency_ms_mean, 100,
136
             "The number of milliseconds of latency to inject, on average. "
137
             "Only takes effect if --log_inject_latency is true");
138
DEFINE_int32(log_inject_latency_ms_stddev, 100,
139
             "The standard deviation of latency to inject in before log sync operations. "
140
             "Only takes effect if --log_inject_latency is true");
141
TAG_FLAG(log_inject_latency, unsafe);
142
TAG_FLAG(log_inject_latency_ms_mean, unsafe);
143
TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
144
145
DEFINE_int32(log_inject_append_latency_ms_max, 0,
146
             "The maximum latency to inject before the log append operation.");
147
148
DEFINE_test_flag(bool, log_consider_all_ops_safe, false,
149
            "If true, we consider all operations to be safe and will not wait"
150
            "for the opId to apply to the local log. i.e. WaitForSafeOpIdToApply "
151
            "becomes a noop.");
152
153
DEFINE_test_flag(bool, simulate_abrupt_server_restart, false,
154
                 "If true, don't properly close the log segment.");
155
156
// TaskStream flags.
157
// We have to make the queue length really long.
158
// TODO: Create new flags log_taskstream_queue_max_size and log_taskstream_queue_max_wait_ms
159
// and deprecate these flags.
160
DEFINE_int32(taskstream_queue_max_size, 100000,
161
             "Maximum number of operations waiting in the taskstream queue.");
162
163
DEFINE_int32(taskstream_queue_max_wait_ms, 1000,
164
             "Maximum time in ms to wait for items in the taskstream queue to arrive.");
165
166
DEFINE_int32(wait_for_safe_op_id_to_apply_default_timeout_ms, 15000 * yb::kTimeMultiplier,
167
             "Timeout used by WaitForSafeOpIdToApply when it was not specified by caller.");
168
169
DEFINE_test_flag(int64, log_fault_after_segment_allocation_min_replicate_index, 0,
170
                 "Fault of segment allocation when min replicate index is at least specified. "
171
                 "0 to disable.");
172
173
DEFINE_int64(time_based_wal_gc_clock_delta_usec, 0,
174
             "A delta in microseconds to add to the clock value used to determine if a WAL "
175
             "segment is safe to be garbage collected. This is needed for clusters running with a "
176
             "skewed hybrid clock, because the clock used for time-based WAL GC is the wall clock, "
177
             "not hybrid clock.");
178
179
// Validate that log_min_segments_to_retain >= 1
180
11.2k
static bool ValidateLogsToRetain(const char* flagname, int value) {
181
11.2k
  if (value >= 1) {
182
11.2k
    return true;
183
11.2k
  }
184
0
  LOG(ERROR) << strings::Substitute("$0 must be at least 1, value $1 is invalid",
185
0
                                    flagname, value);
186
0
  return false;
187
0
}
188
static bool dummy = google::RegisterFlagValidator(
189
    &FLAGS_log_min_segments_to_retain, &ValidateLogsToRetain);
190
191
static std::string kSegmentPlaceholderFilePrefix = ".tmp.newsegment";
192
static std::string kSegmentPlaceholderFileTemplate = kSegmentPlaceholderFilePrefix + "XXXXXX";
193
194
namespace yb {
195
namespace log {
196
197
using env_util::OpenFileForRandom;
198
using std::shared_ptr;
199
using std::unique_ptr;
200
using strings::Substitute;
201
202
namespace {
203
204
22.5M
bool IsMarkerType(LogEntryTypePB type) {
205
22.5M
  return type == LogEntryTypePB::ROLLOVER_MARKER ||
206
22.5M
         type == LogEntryTypePB::FLUSH_MARKER;
207
22.5M
}
208
209
} // namespace
210
211
// This class represents a batch of operations to be written and synced to the log. It is opaque to
212
// the user and is managed by the Log class.
213
class LogEntryBatch {
214
 public:
215
  LogEntryBatch(LogEntryTypePB type, LogEntryBatchPB&& entry_batch_pb);
216
  ~LogEntryBatch();
217
218
0
  std::string ToString() const {
219
0
    return Format("{ type: $0 state: $1 max_op_id: $2 }", type_, state_, MaxReplicateOpId());
220
0
  }
221
222
28.7M
  bool HasReplicateEntries() const {
223
28.7M
    return type_ == LogEntryTypePB::REPLICATE && count() > 0;
224
28.7M
  }
225
226
 private:
227
  friend class Log;
228
  friend class MultiThreadedLogTest;
229
230
  // Serializes contents of the entry to an internal buffer.
231
  CHECKED_STATUS Serialize();
232
233
  // Sets the callback that will be invoked after the entry is
234
  // appended and synced to disk
235
13.9M
  void set_callback(const StatusCallback& cb) {
236
13.9M
    callback_ = cb;
237
13.9M
  }
238
239
  // Returns the callback that will be invoked after the entry is
240
  // appended and synced to disk.
241
27.8M
  const StatusCallback& callback() {
242
27.8M
    return callback_;
243
27.8M
  }
244
245
13.9M
  bool failed_to_append() const {
246
13.9M
    return state_ == kEntryFailedToAppend;
247
13.9M
  }
248
249
0
  void set_failed_to_append() {
250
0
    state_ = kEntryFailedToAppend;
251
0
  }
252
253
  // Mark the entry as reserved, but not yet ready to write to the log.
254
  void MarkReserved();
255
256
  // Mark the entry as ready to write to log.
257
  void MarkReady();
258
259
  // Returns a Slice representing the serialized contents of the entry.
260
13.9M
  Slice data() const {
261
13.9M
    DCHECK_EQ(state_, kEntrySerialized);
262
13.9M
    return Slice(buffer_);
263
13.9M
  }
264
265
  bool IsMarker() const;
266
267
  bool IsSingleEntryOfType(LogEntryTypePB type) const;
268
269
71.4M
  size_t count() const { return count_; }
270
271
  // Returns the total size in bytes of the object.
272
27.8M
  size_t total_size_bytes() const {
273
27.8M
    return total_size_bytes_;
274
27.8M
  }
275
276
  // The highest OpId of a REPLICATE message in this batch.
277
16.7M
  OpId MaxReplicateOpId() const {
278
16.7M
    DCHECK_EQ(REPLICATE, type_);
279
16.7M
    int idx = entry_batch_pb_.entry_size() - 1;
280
16.7M
    if (idx < 0) {
281
0
      return OpId::Invalid();
282
0
    }
283
16.7M
    DCHECK(entry_batch_pb_.entry(idx).replicate().IsInitialized());
284
16.7M
    return OpId::FromPB(entry_batch_pb_.entry(idx).replicate().id());
285
16.7M
  }
286
287
13.8M
  void SetReplicates(const ReplicateMsgs& replicates) {
288
13.8M
    replicates_ = replicates;
289
13.8M
  }
290
291
  // The type of entries in this batch.
292
  const LogEntryTypePB type_;
293
294
  // Contents of the log entries that will be written to disk.
295
  LogEntryBatchPB entry_batch_pb_;
296
297
  // Total size in bytes of all entries
298
  uint32_t total_size_bytes_ = 0;
299
300
  // Number of entries in 'entry_batch_pb_'
301
  const size_t count_;
302
303
  // The vector of refcounted replicates.  This makes sure there's at least a reference to each
304
  // replicate message until we're finished appending.
305
  ReplicateMsgs replicates_;
306
307
  // Callback to be invoked upon the entries being written and synced to disk.
308
  StatusCallback callback_;
309
310
  // Buffer into which the entries of 'entry_batch_pb_' are serialized by the call to 'Serialize()'.
311
  faststring buffer_;
312
313
  // Offset into the log file for this entry batch.
314
  int64_t offset_;
315
316
  // Segment sequence number for this entry batch.
317
  uint64_t active_segment_sequence_number_;
318
319
  enum LogEntryState {
320
    kEntryInitialized,
321
    kEntryReserved,
322
    kEntryReady,
323
    kEntrySerialized,
324
    kEntryFailedToAppend
325
  };
326
  LogEntryState state_ = kEntryInitialized;
327
328
  DISALLOW_COPY_AND_ASSIGN(LogEntryBatch);
329
};
330
331
// This class is responsible for managing the task that appends to the log file.
332
// This task runs in a common thread pool with append tasks from other tablets.
333
// A token is used to ensure that only one append task per tablet is executed concurrently.
334
class Log::Appender {
335
 public:
336
  explicit Appender(Log* log, ThreadPool* append_thread_pool);
337
338
  // Initializes the objects and starts the task.
339
  Status Init();
340
341
13.8M
  CHECKED_STATUS Submit(LogEntryBatch* item) {
342
13.8M
    return task_stream_->Submit(item);
343
13.8M
  }
344
345
13
  CHECKED_STATUS TEST_SubmitFunc(const std::function<void()>& func) {
346
13
    return task_stream_->TEST_SubmitFunc(func);
347
13
  }
348
349
  // Waits until the last enqueued elements are processed and sets the appender_ to the closing
350
  // state. If any entries are added to the queue during the process, invokes their callbacks'
351
  // 'OnFailure()' method.
352
  void Shutdown();
353
354
0
  const std::string& LogPrefix() const {
355
0
    return log_->LogPrefix();
356
0
  }
357
358
0
  std::string GetRunThreadStack() const {
359
0
    return task_stream_->GetRunThreadStack();
360
0
  }
361
362
0
  std::string ToString() const {
363
0
    return task_stream_->ToString();
364
0
  }
365
366
 private:
367
  // Processes the given log entry batch, or does a sync if null is passed.
368
  void ProcessBatch(LogEntryBatch* entry_batch);
369
  void GroupWork();
370
371
  Log* const log_;
372
373
  // Lock to protect access to thread_ during shutdown.
374
  mutable std::mutex lock_;
375
  unique_ptr<TaskStream<LogEntryBatch>> task_stream_;
376
377
  // Vector of entry batches in the current group; used to execute callbacks after the call to Sync.
378
  std::vector<std::unique_ptr<LogEntryBatch>> sync_batch_;
379
380
  // Time at which current group was started
381
  MonoTime time_started_;
382
};
383
384
Log::Appender::Appender(Log *log, ThreadPool* append_thread_pool)
385
    : log_(log),
386
      task_stream_(new TaskStream<LogEntryBatch>(
387
          std::bind(&Log::Appender::ProcessBatch, this, _1), append_thread_pool,
388
          FLAGS_taskstream_queue_max_size,
389
89.6k
          MonoDelta::FromMilliseconds(FLAGS_taskstream_queue_max_wait_ms))) {
390
89.6k
  DCHECK(dummy);
391
89.6k
}
392
393
89.6k
Status Log::Appender::Init() {
394
72
  VLOG_WITH_PREFIX(1) << "Starting log task stream";
395
89.6k
  return Status::OK();
396
89.6k
}
397
398
27.7M
void Log::Appender::ProcessBatch(LogEntryBatch* entry_batch) {
399
  // The callback function passed to TaskStream is expected to process the accumulated batch of entries.
400
27.7M
  if (entry_batch == nullptr) {
401
    // Here, we do sync and call callbacks.
402
13.8M
    GroupWork();
403
13.8M
    return;
404
13.8M
  }
405
406
13.9M
  if (sync_batch_.empty()) { // Start of batch.
407
    // Used in tests to delay writing log entries.
408
13.8M
    auto sleep_duration = log_->sleep_duration_.load(std::memory_order_acquire);
409
13.8M
    if (sleep_duration.count() > 0) {
410
1
      std::this_thread::sleep_for(sleep_duration);
411
1
    }
412
13.8M
    time_started_ = MonoTime::Now();
413
13.8M
  }
414
13.9M
  TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
415
13.9M
  Status s = log_->DoAppend(entry_batch);
416
417
13.9M
  if (PREDICT_FALSE(!s.ok())) {
418
0
    LOG_WITH_PREFIX(DFATAL) << "Error appending to the log: " << s;
419
0
    entry_batch->set_failed_to_append();
420
    // TODO If a single operation fails to append, should we abort all subsequent operations
421
    // in this batch or allow them to be appended? What about operations in future batches?
422
0
    if (!entry_batch->callback().is_null()) {
423
0
      entry_batch->callback().Run(s);
424
0
    }
425
0
    return;
426
0
  }
427
13.9M
  if (!log_->sync_disabled_) {
428
13.8M
    bool expected = false;
429
13.8M
    if (log_->periodic_sync_needed_.compare_exchange_strong(expected, true,
430
336k
                                                            std::memory_order_acq_rel)) {
431
336k
      log_->periodic_sync_earliest_unsync_entry_time_ = MonoTime::Now();
432
336k
    }
433
13.8M
    log_->periodic_sync_unsynced_bytes_ += entry_batch->total_size_bytes();
434
13.8M
  }
435
13.9M
  sync_batch_.emplace_back(entry_batch);
436
13.9M
}
437
438
13.8M
void Log::Appender::GroupWork() {
439
13.8M
  if (sync_batch_.empty()) {
440
0
    Status s = log_->Sync();
441
0
    return;
442
0
  }
443
13.8M
  if (log_->metrics_) {
444
13.8M
    log_->metrics_->entry_batches_per_group->Increment(sync_batch_.size());
445
13.8M
  }
446
13.8M
  TRACE_EVENT1("log", "batch", "batch_size", sync_batch_.size());
447
448
13.8M
  auto se = ScopeExit([this] {
449
13.8M
    if (log_->metrics_) {
450
13.8M
      MonoTime time_now = MonoTime::Now();
451
13.8M
      log_->metrics_->group_commit_latency->Increment(
452
13.8M
          time_now.GetDeltaSince(time_started_).ToMicroseconds());
453
13.8M
    }
454
13.8M
    sync_batch_.clear();
455
13.8M
  });
456
457
13.8M
  Status s = log_->Sync();
458
13.8M
  if (PREDICT_FALSE(!s.ok())) {
459
0
    LOG_WITH_PREFIX(DFATAL) << "Error syncing log: " << s;
460
0
    for (std::unique_ptr<LogEntryBatch>& entry_batch : sync_batch_) {
461
0
      if (!entry_batch->callback().is_null()) {
462
0
        entry_batch->callback().Run(s);
463
0
      }
464
0
    }
465
13.8M
  } else {
466
13.8M
    TRACE_EVENT0("log", "Callbacks");
467
1.53k
    VLOG_WITH_PREFIX(2) << "Synchronized " << sync_batch_.size() << " entry batches";
468
13.8M
    LongOperationTracker long_operation_tracker(
469
13.8M
        "Log callback", FLAGS_consensus_log_scoped_watch_delay_callback_threshold_ms * 1ms);
470
13.9M
    for (std::unique_ptr<LogEntryBatch>& entry_batch : sync_batch_) {
471
13.9M
      if (PREDICT_TRUE(!entry_batch->failed_to_append() && !entry_batch->callback().is_null())) {
472
13.9M
        entry_batch->callback().Run(Status::OK());
473
13.9M
      }
474
      // It's important to delete each batch as we see it, because deleting it may free up memory
475
      // from memory trackers, and the callback of a later batch may want to use that memory.
476
13.9M
      entry_batch.reset();
477
13.9M
    }
478
13.8M
    sync_batch_.clear();
479
13.8M
  }
480
18.4E
  VLOG_WITH_PREFIX(1) << "Exiting AppendTask for tablet " << log_->tablet_id();
481
13.8M
}
482
483
95.7k
void Log::Appender::Shutdown() {
484
95.7k
  std::lock_guard<std::mutex> lock_guard(lock_);
485
95.7k
  if (task_stream_) {
486
1
    VLOG_WITH_PREFIX(1) << "Shutting down log task stream";
487
48.6k
    task_stream_->Stop();
488
7
    VLOG_WITH_PREFIX(1) << "Log append task stream is shut down";
489
48.6k
    task_stream_.reset();
490
48.6k
  }
491
95.7k
}
492
493
// This task is submitted to the allocation thread pool in order to asynchronously pre-allocate new log
494
// segments.
495
97.0k
void Log::SegmentAllocationTask() {
496
97.0k
  allocation_status_.Set(PreAllocateNewSegment());
497
97.0k
}
498
499
const Status Log::kLogShutdownStatus(
500
    STATUS(ServiceUnavailable, "WAL is shutting down", "", Errno(ESHUTDOWN)));
501
502
Status Log::Open(const LogOptions &options,
503
                 const std::string& tablet_id,
504
                 const std::string& wal_dir,
505
                 const std::string& peer_uuid,
506
                 const Schema& schema,
507
                 uint32_t schema_version,
508
                 const scoped_refptr<MetricEntity>& table_metric_entity,
509
                 const scoped_refptr<MetricEntity>& tablet_metric_entity,
510
                 ThreadPool* append_thread_pool,
511
                 ThreadPool* allocation_thread_pool,
512
                 int64_t cdc_min_replicated_index,
513
                 scoped_refptr<Log>* log,
514
89.6k
                 CreateNewSegment create_new_segment) {
515
516
89.6k
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options.env, DirName(wal_dir)),
517
89.6k
                        Substitute("Failed to create table wal dir $0", DirName(wal_dir)));
518
519
89.6k
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options.env, wal_dir),
520
89.6k
                        Substitute("Failed to create tablet wal dir $0", wal_dir));
521
522
89.6k
  scoped_refptr<Log> new_log(new Log(options,
523
89.6k
                                     wal_dir,
524
89.6k
                                     tablet_id,
525
89.6k
                                     peer_uuid,
526
89.6k
                                     schema,
527
89.6k
                                     schema_version,
528
89.6k
                                     table_metric_entity,
529
89.6k
                                     tablet_metric_entity,
530
89.6k
                                     append_thread_pool,
531
89.6k
                                     allocation_thread_pool,
532
89.6k
                                     create_new_segment));
533
89.6k
  RETURN_NOT_OK(new_log->Init());
534
89.6k
  log->swap(new_log);
535
89.6k
  return Status::OK();
536
89.6k
}
537
538
Log::Log(
539
    LogOptions options,
540
    string wal_dir,
541
    string tablet_id,
542
    string peer_uuid,
543
    const Schema& schema,
544
    uint32_t schema_version,
545
    const scoped_refptr<MetricEntity>& table_metric_entity,
546
    const scoped_refptr<MetricEntity>& tablet_metric_entity,
547
    ThreadPool* append_thread_pool,
548
    ThreadPool* allocation_thread_pool,
549
    CreateNewSegment create_new_segment)
550
    : options_(std::move(options)),
551
      wal_dir_(std::move(wal_dir)),
552
      tablet_id_(std::move(tablet_id)),
553
      peer_uuid_(std::move(peer_uuid)),
554
      schema_(std::make_unique<Schema>(schema)),
555
      schema_version_(schema_version),
556
      active_segment_sequence_number_(options.initial_active_segment_sequence_number),
557
      log_state_(kLogInitialized),
558
      max_segment_size_(options_.segment_size_bytes),
559
      // We halve the initial log segment size here because we double it for every new segment,
560
      // including the very first segment.
561
      cur_max_segment_size_((options.initial_segment_size_bytes + 1) / 2),
562
      appender_(new Appender(this, append_thread_pool)),
563
      allocation_token_(allocation_thread_pool->NewToken(ThreadPool::ExecutionMode::SERIAL)),
564
      durable_wal_write_(options_.durable_wal_write),
565
      interval_durable_wal_write_(options_.interval_durable_wal_write),
566
      bytes_durable_wal_write_mb_(options_.bytes_durable_wal_write_mb),
567
      sync_disabled_(false),
568
      allocation_state_(SegmentAllocationState::kAllocationNotStarted),
569
      table_metric_entity_(table_metric_entity),
570
      tablet_metric_entity_(tablet_metric_entity),
571
      on_disk_size_(0),
572
      log_prefix_(consensus::MakeTabletLogPrefix(tablet_id_, peer_uuid_)),
573
89.6k
      create_new_segment_at_start_(create_new_segment) {
574
89.6k
  set_wal_retention_secs(options.retention_secs);
575
89.6k
  if (table_metric_entity_ && tablet_metric_entity_) {
576
89.2k
    metrics_.reset(new LogMetrics(table_metric_entity_, tablet_metric_entity_));
577
89.2k
  }
578
89.6k
}
579
580
89.6k
Status Log::Init() {
581
89.6k
  std::lock_guard<percpu_rwlock> write_lock(state_lock_);
582
89.6k
  CHECK_EQ(kLogInitialized, log_state_);
583
  // Init the index
584
89.6k
  log_index_.reset(new LogIndex(wal_dir_));
585
  // Reader for previous segments.
586
89.6k
  RETURN_NOT_OK(LogReader::Open(get_env(),
587
89.6k
                                log_index_,
588
89.6k
                                log_prefix_,
589
89.6k
                                wal_dir_,
590
89.6k
                                table_metric_entity_.get(),
591
89.6k
                                tablet_metric_entity_.get(),
592
89.6k
                                &reader_));
593
594
  // The case where we are continuing an existing log.  We must pick up where the previous WAL left
595
  // off in terms of sequence numbers.
596
89.6k
  if (reader_->num_segments() != 0) {
597
0
    VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
598
0
                        << " segments from path: " << wal_dir_;
599
600
1.73k
    vector<scoped_refptr<ReadableLogSegment> > segments;
601
1.73k
    RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
602
1.73k
    active_segment_sequence_number_ = segments.back()->header().sequence_number();
603
1.73k
    LOG_WITH_PREFIX(INFO) << "Opened existing logs. Last segment is " << segments.back()->path();
604
605
    // In the case where the TServer process reboots, we need to reload the WAL file size into the metric;
606
    // otherwise we do nothing.
607
1.73k
    if (metrics_ && metrics_->wal_size->value() == 0) {
608
1.22k
      std::for_each(segments.begin(), segments.end(),
609
2.22k
                    [this](const auto& segment) {
610
2.22k
                    this->metrics_->wal_size->IncrementBy(segment->file_size());});
611
1.22k
    }
612
613
1.73k
  }
614
615
89.6k
  if (durable_wal_write_) {
616
47
    YB_LOG_FIRST_N(INFO, 1) << "durable_wal_write is turned on.";
617
89.6k
  } else if (interval_durable_wal_write_) {
618
9.53k
    YB_LOG_FIRST_N(INFO, 1) << "interval_durable_wal_write_ms is turned on to sync every "
619
9.53k
                            << interval_durable_wal_write_.ToMilliseconds() << " ms.";
620
176
  } else if (bytes_durable_wal_write_mb_ > 0) {
621
0
    YB_LOG_FIRST_N(INFO, 1) << "bytes_durable_wal_write_mb is turned on to sync every "
622
0
     << bytes_durable_wal_write_mb_ << " MB of data.";
623
176
  } else {
624
0
    YB_LOG_FIRST_N(INFO, 1) << "durable_wal_write is turned off. Buffered IO will be used for WAL.";
625
176
  }
626
627
89.6k
  if (create_new_segment_at_start_) {
628
87.8k
    RETURN_NOT_OK(EnsureInitialNewSegmentAllocated());
629
87.8k
  }
630
89.6k
  return Status::OK();
631
89.6k
}
632
633
96.9k
Status Log::AsyncAllocateSegment() {
634
96.9k
  SCHECK_EQ(
635
96.9k
      allocation_state_.load(std::memory_order_acquire),
636
96.9k
      SegmentAllocationState::kAllocationNotStarted, AlreadyPresent, "Allocation already running");
637
96.9k
  allocation_status_.Reset();
638
96.9k
  allocation_state_.store(SegmentAllocationState::kAllocationInProgress, std::memory_order_release);
639
10
  VLOG_WITH_PREFIX(1) << "Active segment: " << active_segment_sequence_number_
640
10
                      << ". Starting new segment allocation.";
641
96.9k
  return allocation_token_->SubmitClosure(Bind(&Log::SegmentAllocationTask, Unretained(this)));
642
96.9k
}
643
644
56.0k
Status Log::CloseCurrentSegment() {
645
56.0k
  if (!footer_builder_.has_min_replicate_index()) {
646
0
    VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
647
0
                        << active_segment_->path();
648
663
  }
649
5
  VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
650
5
                      << ": " << footer_builder_.ShortDebugString();
651
652
56.0k
  auto close_timestamp_micros = GetCurrentTimeMicros();
653
654
56.0k
  if (FLAGS_time_based_wal_gc_clock_delta_usec != 0) {
655
0
    auto unadjusted_close_timestamp_micros = close_timestamp_micros;
656
0
    close_timestamp_micros += FLAGS_time_based_wal_gc_clock_delta_usec;
657
0
    LOG_WITH_PREFIX(INFO)
658
0
        << "Adjusting log segment closing timestamp by "
659
0
        << FLAGS_time_based_wal_gc_clock_delta_usec << " usec from "
660
0
        << unadjusted_close_timestamp_micros << " usec to " << close_timestamp_micros << " usec";
661
0
  }
662
663
56.0k
  footer_builder_.set_close_timestamp_micros(close_timestamp_micros);
664
665
56.0k
  auto status = active_segment_->WriteFooterAndClose(footer_builder_);
666
667
56.0k
  if (status.ok() && metrics_) {
668
55.4k
      metrics_->wal_size->IncrementBy(active_segment_->Size());
669
55.4k
  }
670
56.0k
  return status;
671
56.0k
}
672
673
7.40k
Status Log::RollOver() {
674
7.40k
  LOG_SLOW_EXECUTION(WARNING, 50, LogPrefix() + "Log roll took a long time") {
675
7.40k
    SCOPED_LATENCY_METRIC(metrics_, roll_latency);
676
7.40k
    RSTATUS_DCHECK(active_segment_, InternalError, "Called RollOver without active segment.");
677
678
    // Check if any errors have occurred during allocation
679
7.40k
    RETURN_NOT_OK(allocation_status_.Get());
680
681
7.40k
    DCHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationFinished);
682
683
7.40k
    LOG_WITH_PREFIX(INFO) << Format(
684
7.40k
        "Last appended OpId in segment $0: $1", active_segment_->path(),
685
7.40k
        last_appended_entry_op_id_.ToString());
686
687
7.40k
    RETURN_NOT_OK(Sync());
688
7.40k
    RETURN_NOT_OK(CloseCurrentSegment());
689
690
7.40k
    RETURN_NOT_OK(SwitchToAllocatedSegment());
691
692
7.40k
    LOG_WITH_PREFIX(INFO) << "Rolled over to a new segment: " << active_segment_->path();
693
7.40k
  }
694
7.40k
  return Status::OK();
695
7.40k
}
696
697
void Log::Reserve(LogEntryTypePB type,
698
                    LogEntryBatchPB* entry_batch,
699
13.8M
                    LogEntryBatch** reserved_entry) {
700
13.8M
  TRACE_EVENT0("log", "Log::Reserve");
701
13.8M
  DCHECK(reserved_entry != nullptr);
702
13.8M
  {
703
13.8M
    SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
704
13.8M
    CHECK_EQ(kLogWriting, log_state_);
705
13.8M
  }
706
707
  // In DEBUG builds, verify that all of the entries in the batch match the specified type.  In
708
  // non-debug builds the foreach loop gets optimized out.
709
13.8M
#ifndef NDEBUG
710
8.67M
  for (const LogEntryPB& entry : entry_batch->entry()) {
711
0
    DCHECK_EQ(entry.type(), type) << "Bad batch: " << entry_batch->DebugString();
712
8.67M
  }
713
13.8M
#endif
714
715
13.8M
  auto new_entry_batch = std::make_unique<LogEntryBatch>(type, std::move(*entry_batch));
716
13.8M
  new_entry_batch->MarkReserved();
717
718
  // Release the memory back to the caller: this will be freed when
719
  // the entry is removed from the queue.
720
  //
721
  // TODO (perf) Use a ring buffer instead of a blocking queue and set
722
  // 'reserved_entry' to a pre-allocated slot in the buffer.
723
13.8M
  *reserved_entry = new_entry_batch.release();
724
13.8M
}
725
726
Status Log::TEST_AsyncAppendWithReplicates(
727
2.00k
    LogEntryBatch* entry, const ReplicateMsgs& replicates, const StatusCallback& callback) {
728
2.00k
  entry->SetReplicates(replicates);
729
2.00k
  return AsyncAppend(entry, callback);
730
2.00k
}
731
732
13.8M
Status Log::AsyncAppend(LogEntryBatch* entry_batch, const StatusCallback& callback) {
733
13.8M
  {
734
13.8M
    SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
735
13.8M
    CHECK_EQ(kLogWriting, log_state_);
736
13.8M
  }
737
738
13.8M
  entry_batch->set_callback(callback);
739
13.8M
  entry_batch->MarkReady();
740
741
13.8M
  if (entry_batch->HasReplicateEntries()) {
742
7.90M
    last_submitted_op_id_ = entry_batch->MaxReplicateOpId();
743
7.90M
  }
744
745
13.8M
  auto submit_status = appender_->Submit(entry_batch);
746
13.8M
  if (PREDICT_FALSE(!submit_status.ok())) {
747
0
    LOG_WITH_PREFIX(WARNING)
748
0
        << "Failed to submit batch " << entry_batch->MaxReplicateOpId() << ": " << submit_status;
749
0
    delete entry_batch;
750
0
    return kLogShutdownStatus;
751
0
  }
752
753
13.8M
  return Status::OK();
754
13.8M
}
755
756
Status Log::AsyncAppendReplicates(const ReplicateMsgs& msgs, const yb::OpId& committed_op_id,
757
                                  RestartSafeCoarseTimePoint batch_mono_time,
758
13.8M
                                  const StatusCallback& callback) {
759
13.8M
  auto batch = CreateBatchFromAllocatedOperations(msgs);
760
13.8M
  if (!committed_op_id.empty()) {
761
13.6M
    committed_op_id.ToPB(batch.mutable_committed_op_id());
762
13.6M
  }
763
  // Set batch mono time if it was specified.
764
13.8M
  if (batch_mono_time != RestartSafeCoarseTimePoint()) {
765
13.8M
    batch.set_mono_time(batch_mono_time.ToUInt64());
766
13.8M
  }
767
768
13.8M
  LogEntryBatch* reserved_entry_batch;
769
13.8M
  Reserve(REPLICATE, &batch, &reserved_entry_batch);
770
771
  // If we're able to reserve, set the vector of replicate shared pointers in the LogEntryBatch.
772
  // This will make sure there's a reference for each replicate while we're appending.
773
13.8M
  reserved_entry_batch->SetReplicates(msgs);
774
775
13.8M
  RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, callback));
776
13.8M
  return Status::OK();
777
13.8M
}
778
779
Status Log::DoAppend(LogEntryBatch* entry_batch,
780
                     bool caller_owns_operation,
781
14.8M
                     bool skip_wal_write) {
782
14.8M
  if (!skip_wal_write) {
783
13.9M
    RETURN_NOT_OK(entry_batch->Serialize());
784
13.9M
    Slice entry_batch_data = entry_batch->data();
785
2.43k
    LOG_IF(DFATAL, entry_batch_data.size() <= 0 && !entry_batch->IsMarker())
786
2.43k
        << "Cannot call DoAppend() with no data";
787
788
13.9M
    if (entry_batch->IsSingleEntryOfType(ROLLOVER_MARKER)) {
789
0
      VLOG_WITH_PREFIX_AND_FUNC(1) << "Got ROLLOVER_MARKER";
790
6.32k
      if (active_segment_ && footer_builder_.IsInitialized() && footer_builder_.num_entries() > 0) {
791
        // Active segment is not empty - rollover.
792
6.27k
        if (allocation_state() == SegmentAllocationState::kAllocationNotStarted) {
793
6.24k
          RETURN_NOT_OK(AsyncAllocateSegment());
794
6.24k
        }
795
6.27k
        return RollOver();
796
13.8M
      }
797
6.32k
    }
798
799
13.8M
    auto entry_batch_bytes = entry_batch->total_size_bytes();
800
    // If there is no data to write return OK.
801
13.8M
    if (PREDICT_FALSE(entry_batch_bytes == 0)) {
802
189
      return Status::OK();
803
189
    }
804
805
    // If the size of this entry overflows the current segment, get a new one.
806
13.8M
    if (allocation_state() == SegmentAllocationState::kAllocationNotStarted) {
807
13.8M
      if (active_segment_->Size() + entry_batch_bytes + kEntryHeaderSize > cur_max_segment_size_) {
808
1.16k
        LOG_WITH_PREFIX(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. "
809
1.16k
                              << "Starting new segment allocation.";
810
1.16k
        RETURN_NOT_OK(AsyncAllocateSegment());
811
1.16k
        if (!options_.async_preallocate_segments) {
812
0
          RETURN_NOT_OK(RollOver());
813
0
        }
814
1.16k
      }
815
3.60k
    } else if (allocation_state() == SegmentAllocationState::kAllocationFinished) {
816
1.12k
      RETURN_NOT_OK(RollOver());
817
2.47k
    } else {
818
2.21k
      VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
819
2.47k
    }
820
821
13.8M
    int64_t start_offset = active_segment_->written_offset();
822
823
13.8M
    LOG_SLOW_EXECUTION(WARNING, 50, "Append to log took a long time") {
824
13.8M
      SCOPED_LATENCY_METRIC(metrics_, append_latency);
825
13.8M
      LongOperationTracker long_operation_tracker(
826
13.8M
          "Log append", FLAGS_consensus_log_scoped_watch_delay_append_threshold_ms * 1ms);
827
828
13.8M
      RETURN_NOT_OK(active_segment_->WriteEntryBatch(entry_batch_data));
829
13.8M
    }
830
831
13.8M
    if (metrics_) {
832
13.8M
      metrics_->bytes_logged->IncrementBy(active_segment_->written_offset() - start_offset);
833
13.8M
    }
834
835
    // Populate the offset and sequence number for the entry batch if we did a WAL write.
836
13.8M
    entry_batch->offset_ = start_offset;
837
13.8M
    entry_batch->active_segment_sequence_number_ = active_segment_sequence_number_;
838
13.8M
  }
839
840
  // We keep track of the last-written OpId here. This is needed to initialize Consensus on
841
  // startup.
842
14.8M
  if (entry_batch->HasReplicateEntries()) {
843
8.88M
    last_appended_entry_op_id_ = entry_batch->MaxReplicateOpId();
844
8.88M
  }
845
846
14.8M
  CHECK_OK(UpdateIndexForBatch(*entry_batch));
847
14.8M
  UpdateFooterForBatch(entry_batch);
848
849
  // We expect the caller to free the actual entries if caller_owns_operation is set.
850
14.8M
  if (caller_owns_operation) {
851
22.5M
    for (int i = 0; i < entry_batch->entry_batch_pb_.entry_size(); i++) {
852
8.67M
      LogEntryPB* entry_pb = entry_batch->entry_batch_pb_.mutable_entry(i);
853
8.67M
      entry_pb->release_replicate();
854
8.67M
    }
855
13.8M
  }
856
857
14.8M
  return Status::OK();
858
14.8M
}
859
860
14.8M
Status Log::UpdateIndexForBatch(const LogEntryBatch& batch) {
861
14.8M
  if (batch.type_ != REPLICATE) {
862
0
    return Status::OK();
863
0
  }
864
865
14.8M
  for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
866
9.65M
    LogIndexEntry index_entry;
867
868
9.65M
    index_entry.op_id = yb::OpId::FromPB(entry_pb.replicate().id());
869
9.65M
    index_entry.segment_sequence_number = batch.active_segment_sequence_number_;
870
9.65M
    index_entry.offset_in_segment = batch.offset_;
871
9.65M
    RETURN_NOT_OK(log_index_->AddEntry(index_entry));
872
9.65M
  }
873
14.8M
  return Status::OK();
874
14.8M
}
875
876
14.8M
void Log::UpdateFooterForBatch(LogEntryBatch* batch) {
877
14.8M
  footer_builder_.set_num_entries(footer_builder_.num_entries() + batch->count());
878
879
  // We keep track of the last-written OpId here.  This is needed to initialize Consensus on
880
  // startup.  We also retrieve the OpId of the first operation in the batch so that, if we roll
881
  // over to a new segment, we set the first operation in the footer immediately.
882
  // Update the index bounds for the current segment.
883
9.65M
  for (const LogEntryPB& entry_pb : batch->entry_batch_pb_.entry()) {
884
9.65M
    int64_t index = entry_pb.replicate().id().index();
885
9.65M
    if (!footer_builder_.has_min_replicate_index() ||
886
9.54M
        index < footer_builder_.min_replicate_index()) {
887
98.6k
      footer_builder_.set_min_replicate_index(index);
888
98.6k
      min_replicate_index_.store(index, std::memory_order_release);
889
98.6k
    }
890
9.65M
    if (!footer_builder_.has_max_replicate_index() ||
891
9.54M
        index > footer_builder_.max_replicate_index()) {
892
9.47M
      footer_builder_.set_max_replicate_index(index);
893
9.47M
    }
894
9.65M
  }
895
14.8M
}
896
897
6.32k
Status Log::AllocateSegmentAndRollOver() {
898
0
  VLOG_WITH_PREFIX_AND_FUNC(1) << "Start";
899
6.32k
  auto* reserved_entry_batch = ReserveMarker(ROLLOVER_MARKER);
900
6.32k
  Synchronizer s;
901
6.32k
  RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, s.AsStatusCallback()));
902
6.32k
  return s.Wait();
903
6.32k
}
904
905
177k
Status Log::EnsureInitialNewSegmentAllocated() {
906
177k
  if (log_state_ == LogState::kLogWriting) {
907
    // New segment already created.
908
87.4k
    return Status::OK();
909
87.4k
  }
910
89.5k
  if (log_state_ != LogState::kLogInitialized) {
911
0
    return STATUS_FORMAT(
912
0
        IllegalState, "Unexpected log state in EnsureInitialNewSegmentAllocated: $0", log_state_);
913
0
  }
914
89.5k
  RETURN_NOT_OK(AsyncAllocateSegment());
915
89.5k
  RETURN_NOT_OK(allocation_status_.Get());
916
89.5k
  RETURN_NOT_OK(SwitchToAllocatedSegment());
917
918
89.5k
  RETURN_NOT_OK(appender_->Init());
919
89.5k
  log_state_ = LogState::kLogWriting;
920
89.5k
  return Status::OK();
921
89.5k
}
922
923
13.9M
Status Log::Sync() {
924
13.9M
  TRACE_EVENT0("log", "Sync");
925
13.9M
  SCOPED_LATENCY_METRIC(metrics_, sync_latency);
926
927
13.9M
  if (!sync_disabled_) {
928
13.9M
    if (PREDICT_FALSE(GetAtomicFlag(&FLAGS_log_inject_latency))) {
929
47.9k
      Random r(static_cast<uint32_t>(GetCurrentTimeMicros()));
930
47.9k
      int sleep_ms = r.Normal(GetAtomicFlag(&FLAGS_log_inject_latency_ms_mean),
931
47.9k
                              GetAtomicFlag(&FLAGS_log_inject_latency_ms_stddev));
932
47.9k
      if (sleep_ms > 0) {
933
47.5k
        LOG_WITH_PREFIX(INFO) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()";
934
47.5k
        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
935
47.5k
      }
936
47.9k
    }
937
938
13.9M
    bool timed_or_data_limit_sync = false;
939
13.9M
    if (!durable_wal_write_ && periodic_sync_needed_.load()) {
940
13.8M
      if (interval_durable_wal_write_) {
941
13.8M
        if (MonoTime::Now() > periodic_sync_earliest_unsync_entry_time_
942
270k
            + interval_durable_wal_write_) {
943
270k
          timed_or_data_limit_sync = true;
944
270k
        }
945
13.8M
      }
946
13.8M
      if (bytes_durable_wal_write_mb_ > 0) {
947
13.8M
        if (periodic_sync_unsynced_bytes_ >= bytes_durable_wal_write_mb_ * 1_MB) {
948
1.92k
          timed_or_data_limit_sync = true;
949
1.92k
        }
950
13.8M
      }
951
13.8M
    }
952
953
13.9M
    if (durable_wal_write_ || timed_or_data_limit_sync) {
954
277k
      periodic_sync_needed_.store(false);
955
277k
      periodic_sync_unsynced_bytes_ = 0;
956
277k
      LOG_SLOW_EXECUTION(WARNING, 50, "Fsync log took a long time") {
957
277k
        RETURN_NOT_OK(active_segment_->Sync());
958
277k
      }
959
277k
    }
960
13.9M
  }
961
962
  // Update the reader on how far it can read the active segment.
963
13.9M
  reader_->UpdateLastSegmentOffset(active_segment_->written_offset());
964
965
13.9M
  {
966
13.9M
    std::lock_guard<std::mutex> write_lock(last_synced_entry_op_id_mutex_);
967
13.9M
    last_synced_entry_op_id_.store(last_appended_entry_op_id_, boost::memory_order_release);
968
13.9M
    last_synced_entry_op_id_cond_.notify_all();
969
13.9M
  }
970
971
13.9M
  return Status::OK();
972
13.9M
}
973
974
6.61M
Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segments_to_gc) const {
975
  // For the lifetime of a Log::CopyTo call, log_copy_min_index_ may be set to something
976
  // other than std::numeric_limits<int64_t>::max(). This value will correspond to the
977
  // minimum op_idx which is currently being copied and must be retained. In order to
978
  // avoid concurrently deleting those ops, we bump min_op_idx here to be at-least as
979
  // low as log_copy_min_index_.
980
6.61M
  min_op_idx = std::min(log_copy_min_index_, min_op_idx);
981
  // Find the prefix of segments in the segment sequence that is guaranteed not to include
982
  // 'min_op_idx'.
983
6.61M
  RETURN_NOT_OK(reader_->GetSegmentPrefixNotIncluding(
984
6.61M
      min_op_idx, cdc_min_replicated_index_.load(std::memory_order_acquire), segments_to_gc));
985
986
6.61M
  auto max_to_delete = std::max<ssize_t>(
987
6.61M
      reader_->num_segments() - FLAGS_log_min_segments_to_retain, 0);
988
6.61M
  ssize_t segments_to_gc_size = segments_to_gc->size();
989
6.61M
  if (segments_to_gc_size > max_to_delete) {
990
0
    VLOG_WITH_PREFIX(2)
991
0
        << "GCing " << segments_to_gc_size << " in " << wal_dir_
992
0
        << " would not leave enough remaining segments to satisfy minimum "
993
0
        << "retention requirement. Only considering "
994
0
        << max_to_delete << "/" << reader_->num_segments();
995
101k
    segments_to_gc->resize(max_to_delete);
996
6.51M
  } else if (segments_to_gc_size < max_to_delete) {
997
73.8k
    auto extra_segments = max_to_delete - segments_to_gc_size;
998
0
    VLOG_WITH_PREFIX(2) << "Too many log segments, need to GC " << extra_segments << " more.";
999
73.8k
  }
1000
1001
  // Don't GC segments that are newer than the configured time-based retention.
1002
6.61M
  int64_t now = GetCurrentTimeMicros() + FLAGS_time_based_wal_gc_clock_delta_usec;
1003
1004
6.61M
  for (size_t i = 0; i < segments_to_gc->size(); i++) {
1005
70.8k
    const scoped_refptr<ReadableLogSegment>& segment = (*segments_to_gc)[i];
1006
1007
    // Segments here will always have a footer, since we don't return the in-progress segment up
1008
    // above. However, segments written by older YB builds may not have the timestamp info (TODO:
1009
    // make sure we indeed care about these old builds). In that case, we're allowed to GC them.
1010
70.8k
    if (!segment->footer().has_close_timestamp_micros()) continue;
1011
1012
70.8k
    int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000;
1013
70.8k
    if (age_seconds < wal_retention_secs()) {
1014
0
      VLOG_WITH_PREFIX(2)
1015
0
          << "Segment " << segment->path() << " is only " << age_seconds << "s old: "
1016
0
          << "cannot GC it yet due to configured time-based retention policy.";
1017
      // Truncate the list of segments to GC here -- if this one is too new, then all later ones are
1018
      // also too new.
1019
70.7k
      segments_to_gc->resize(i);
1020
70.7k
      break;
1021
70.7k
    }
1022
70.8k
  }
1023
1024
6.61M
  return Status::OK();
1025
6.61M
}
1026
1027
Status Log::Append(LogEntryPB* phys_entry,
1028
                   LogEntryMetadata entry_metadata,
1029
975k
                   bool skip_wal_write) {
1030
975k
  LogEntryBatchPB entry_batch_pb;
1031
975k
  if (entry_metadata.entry_time != RestartSafeCoarseTimePoint()) {
1032
975k
    entry_batch_pb.set_mono_time(entry_metadata.entry_time.ToUInt64());
1033
975k
  }
1034
1035
975k
  entry_batch_pb.mutable_entry()->AddAllocated(phys_entry);
1036
975k
  LogEntryBatch entry_batch(phys_entry->type(), std::move(entry_batch_pb));
1037
  // Mark this as reserved, as we're building it from preallocated data.
1038
975k
  entry_batch.state_ = LogEntryBatch::kEntryReserved;
1039
  // Ready assumes the data is reserved before it is ready.
1040
975k
  entry_batch.MarkReady();
1041
975k
  if (skip_wal_write) {
1042
    // Get the LogIndex entry from read path metadata.
1043
975k
    entry_batch.offset_ = entry_metadata.offset;
1044
975k
    entry_batch.active_segment_sequence_number_ = entry_metadata.active_segment_sequence_number;
1045
975k
  }
1046
975k
  Status s = DoAppend(&entry_batch, false, skip_wal_write);
1047
975k
  if (s.ok() && !skip_wal_write) {
1048
    // Only sync if we actually performed a wal write.
1049
0
    s = Sync();
1050
0
  }
1051
975k
  entry_batch.entry_batch_pb_.mutable_entry()->ExtractSubrange(0, 1, nullptr);
1052
975k
  return s;
1053
975k
}
1054
1055
6.46k
LogEntryBatch* Log::ReserveMarker(LogEntryTypePB type) {
1056
6.46k
  LogEntryBatchPB entry_batch;
1057
6.46k
  entry_batch.add_entry()->set_type(type);
1058
6.46k
  LogEntryBatch* reserved_entry_batch;
1059
6.46k
  Reserve(type, &entry_batch, &reserved_entry_batch);
1060
6.46k
  return reserved_entry_batch;
1061
6.46k
}
1062
1063
144
Status Log::WaitUntilAllFlushed() {
1064
  // In order to make sure we empty the queue we need to use the async API.
1065
144
  auto* reserved_entry_batch = ReserveMarker(FLUSH_MARKER);
1066
144
  Synchronizer s;
1067
144
  RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, s.AsStatusCallback()));
1068
144
  return s.Wait();
1069
144
}
1070
1071
181k
void Log::set_wal_retention_secs(uint32_t wal_retention_secs) {
1072
181k
  LOG_WITH_PREFIX(INFO) << "Setting log wal retention time to " << wal_retention_secs << " seconds";
1073
181k
  wal_retention_secs_.store(wal_retention_secs, std::memory_order_release);
1074
181k
}
1075
1076
70.8k
uint32_t Log::wal_retention_secs() const {
1077
70.8k
  uint32_t wal_retention_secs = wal_retention_secs_.load(std::memory_order_acquire);
1078
70.8k
  auto flag_wal_retention = ANNOTATE_UNPROTECTED_READ(FLAGS_log_min_seconds_to_retain);
1079
70.8k
  return flag_wal_retention > 0 ?
1080
70.7k
      std::max(wal_retention_secs, static_cast<uint32_t>(flag_wal_retention)) :
1081
88
      wal_retention_secs;
1082
70.8k
}
1083
1084
6.79M
yb::OpId Log::GetLatestEntryOpId() const {
1085
6.79M
  return last_synced_entry_op_id_.load(boost::memory_order_acquire);
1086
6.79M
}
1087
1088
302
int64_t Log::GetMinReplicateIndex() const {
1089
302
  return min_replicate_index_.load(std::memory_order_acquire);
1090
302
}
1091
1092
7.47M
yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed, MonoDelta duration) {
1093
7.47M
  if (FLAGS_TEST_log_consider_all_ops_safe || all_op_ids_safe_) {
1094
39
    return min_allowed;
1095
39
  }
1096
1097
7.47M
  auto result = last_synced_entry_op_id_.load(boost::memory_order_acquire);
1098
1099
7.47M
  if (result < min_allowed) {
1100
4.40M
    auto start = CoarseMonoClock::Now();
1101
4.40M
    std::unique_lock<std::mutex> lock(last_synced_entry_op_id_mutex_);
1102
4.23M
    auto wait_time = duration ? duration.ToSteadyDuration()
1103
173k
                              : FLAGS_wait_for_safe_op_id_to_apply_default_timeout_ms * 1ms;
1104
4.40M
    for (;;) {
1105
4.40M
      if (last_synced_entry_op_id_cond_.wait_for(
1106
8.81M
              lock, wait_time, [this, min_allowed, &result] {
1107
8.81M
            result = last_synced_entry_op_id_.load(boost::memory_order_acquire);
1108
8.81M
            return result >= min_allowed;
1109
4.40M
      })) {
1110
4.40M
        break;
1111
4.40M
      }
1112
18.4E
      if (duration) {
1113
5
        return yb::OpId();
1114
5
      }
1115
      // TODO(bogdan): If the log is closed at this point, consider refactoring to return status
1116
      // and fail cleanly.
1117
18.4E
      LOG_WITH_PREFIX(ERROR) << "Appender stack: " << appender_->GetRunThreadStack();
1118
18.4E
      LOG_WITH_PREFIX(DFATAL)
1119
18.4E
          << "Long wait for safe op id: " << min_allowed
1120
18.4E
          << ", current: " << GetLatestEntryOpId()
1121
18.4E
          << ", last appended: " << last_appended_entry_op_id_
1122
18.4E
          << ", last submitted: " << last_submitted_op_id_
1123
18.4E
          << ", appender: " << appender_->ToString()
1124
18.4E
          << ", passed: " << MonoDelta(CoarseMonoClock::Now() - start);
1125
18.4E
    }
1126
4.40M
  }
1127
1128
0
  DCHECK_GE(result.term, min_allowed.term)
1129
0
      << "result: " << result << ", min_allowed: " << min_allowed;
1130
7.47M
  return result;
1131
7.47M
}
1132
1133
2.49k
Status Log::GC(int64_t min_op_idx, int32_t* num_gced) {
1134
2.49k
  CHECK_GE(min_op_idx, 0);
1135
1136
2.49k
  LOG_WITH_PREFIX(INFO) << "Running Log GC on " << wal_dir_ << ": retaining ops >= " << min_op_idx
1137
2.49k
                        << ", log segment size = " << options_.segment_size_bytes;
1138
2.49k
  VLOG_TIMING(1, "Log GC") {
1139
2.49k
    SegmentSequence segments_to_delete;
1140
1141
2.49k
    {
1142
2.49k
      std::lock_guard<percpu_rwlock> l(state_lock_);
1143
2.49k
      CHECK_EQ(kLogWriting, log_state_);
1144
1145
2.49k
      RETURN_NOT_OK(GetSegmentsToGCUnlocked(min_op_idx, &segments_to_delete));
1146
1147
2.49k
      if (segments_to_delete.size() == 0) {
1148
0
        VLOG_WITH_PREFIX(1) << "No segments to delete.";
1149
2.45k
        *num_gced = 0;
1150
2.45k
        return Status::OK();
1151
2.45k
      }
1152
      // Trim the prefix of segments from the reader so that they are no longer referenced by the
1153
      // log.
1154
41
      RETURN_NOT_OK(reader_->TrimSegmentsUpToAndIncluding(
1155
41
          segments_to_delete[segments_to_delete.size() - 1]->header().sequence_number()));
1156
41
    }
1157
1158
    // Now that they are no longer referenced by the Log, delete the files.
1159
41
    *num_gced = 0;
1160
87
    for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
1161
87
      LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path()
1162
87
                            << " (GCed ops < " << segment->footer().max_replicate_index() + 1
1163
87
                            << ")";
1164
87
      RETURN_NOT_OK(get_env()->DeleteFile(segment->path()));
1165
87
      (*num_gced)++;
1166
1167
87
      if (metrics_) {
1168
87
        metrics_->wal_size->IncrementBy(-1 * segment->file_size());
1169
87
      }
1170
87
    }
1171
1172
    // Determine the minimum remaining replicate index in order to properly GC the index chunks.
1173
41
    int64_t min_remaining_op_idx = reader_->GetMinReplicateIndex();
1174
41
    if (min_remaining_op_idx > 0) {
1175
41
      log_index_->GC(min_remaining_op_idx);
1176
41
    }
1177
41
  }
1178
41
  return Status::OK();
1179
2.49k
}
1180
1181
6.61M
Status Log::GetGCableDataSize(int64_t min_op_idx, int64_t* total_size) const {
1182
6.61M
  if (min_op_idx < 0) {
1183
0
    return STATUS_FORMAT(InvalidArgument, "Invalid min op index $0", min_op_idx);
1184
0
  }
1185
1186
6.61M
  SegmentSequence segments_to_delete;
1187
6.61M
  *total_size = 0;
1188
6.61M
  {
1189
6.61M
    SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
1190
6.61M
    if (log_state_ != kLogWriting) {
1191
0
      return STATUS_FORMAT(IllegalState, "Invalid log state $0, expected $1",
1192
0
          log_state_, kLogWriting);
1193
0
    }
1194
6.61M
    Status s = GetSegmentsToGCUnlocked(min_op_idx, &segments_to_delete);
1195
1196
6.61M
    if (!s.ok() || segments_to_delete.size() == 0) {
1197
6.61M
      return Status::OK();
1198
6.61M
    }
1199
2
  }
1200
2
  for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
1201
1
    *total_size += segment->file_size();
1202
1
  }
1203
2
  return Status::OK();
1204
2
}
1205
1206
21.4M
LogReader* Log::GetLogReader() const {
1207
21.4M
  return reader_.get();
1208
21.4M
}
1209
1210
3.17k
Status Log::GetSegmentsSnapshot(SegmentSequence* segments) const {
1211
3.17k
  SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
1212
3.17k
  if (!reader_) {
1213
0
    return STATUS(IllegalState, "Log already closed");
1214
0
  }
1215
1216
3.17k
  return reader_->GetSegmentsSnapshot(segments);
1217
3.17k
}
1218
1219
302k
uint64_t Log::OnDiskSize() {
1220
302k
  SegmentSequence segments;
1221
302k
  {
1222
302k
    shared_lock<rw_spinlock> l(state_lock_.get_lock());
1223
    // If the log is closed, the tablet is either being deleted or tombstoned,
1224
    // so we don't count the size of its log anymore as it should be deleted.
1225
302k
    if (log_state_ == kLogClosed || !reader_->GetSegmentsSnapshot(&segments).ok()) {
1226
56
      return on_disk_size_.load();
1227
56
    }
1228
302k
  }
1229
302k
  uint64_t ret = 0;
1230
336k
  for (const auto& segment : segments) {
1231
336k
    ret += segment->file_size();
1232
336k
  }
1233
1234
302k
  on_disk_size_.store(ret, std::memory_order_release);
1235
302k
  return ret;
1236
302k
}
1237
1238
void Log::SetSchemaForNextLogSegment(const Schema& schema,
1239
48.3k
                                     uint32_t version) {
1240
48.3k
  std::lock_guard<rw_spinlock> l(schema_lock_);
1241
48.3k
  *schema_ = schema;
1242
48.3k
  schema_version_ = version;
1243
48.3k
}
1244
1245
95.7k
Status Log::Close() {
1246
95.7k
  if (PREDICT_FALSE(FLAGS_TEST_simulate_abrupt_server_restart)) {
1247
0
    return Status::OK();
1248
0
  }
1249
  // The allocation pool is used from the appender pool, so we should shut down the appender first.
1250
95.7k
  appender_->Shutdown();
1251
95.7k
  allocation_token_.reset();
1252
1253
95.7k
  std::lock_guard<percpu_rwlock> l(state_lock_);
1254
95.7k
  switch (log_state_) {
1255
48.6k
    case kLogWriting:
1256
48.6k
      RETURN_NOT_OK(Sync());
1257
48.6k
      RETURN_NOT_OK(CloseCurrentSegment());
1258
48.6k
      RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked());
1259
48.2k
      log_state_ = kLogClosed;
1260
361
      VLOG_WITH_PREFIX(1) << "Log closed";
1261
1262
      // Release FDs held by these objects.
1263
48.2k
      log_index_.reset();
1264
48.2k
      reader_.reset();
1265
1266
48.2k
      return Status::OK();
1267
1268
47.0k
    case kLogClosed:
1269
6
      VLOG_WITH_PREFIX(1) << "Log already closed";
1270
47.0k
      return Status::OK();
1271
1272
3
    default:
1273
3
      return STATUS(IllegalState, Substitute("Bad state for Close() $0", log_state_));
1274
95.7k
  }
1275
95.7k
}
1276
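
Close() is deliberately idempotent: a second call finds kLogClosed and returns OK, while an unexpected state is reported as an error. A small hypothetical sketch of that state machine, returning an error string in place of Status:

#include <mutex>
#include <string>

enum class LogState { kLogInitialized, kLogWriting, kLogClosed };

class MiniLog {
 public:
  // Returns an empty string on success, an error message otherwise (stand-in for Status).
  std::string Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    switch (state_) {
      case LogState::kLogWriting:
        // The real Close() syncs and seals the active segment here before flipping the state.
        state_ = LogState::kLogClosed;
        return "";
      case LogState::kLogClosed:
        return "";                                // already closed: calling Close() twice is fine
      default:
        return "Bad state for Close()";
    }
  }

 private:
  std::mutex mutex_;
  LogState state_ = LogState::kLogWriting;
};
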
1277
3.75k
size_t Log::num_segments() const {
1278
3.75k
  std::shared_lock<rw_spinlock> read_lock(state_lock_.get_lock());
1279
3.75k
  return reader_ ? reader_->num_segments() : 0;
1280
3.75k
}
1281
1282
2.24k
scoped_refptr<ReadableLogSegment> Log::GetSegmentBySequenceNumber(int64_t seq) const {
1283
2.24k
  SharedLock<rw_spinlock> read_lock(state_lock_.get_lock());
1284
2.24k
  if (!reader_) {
1285
0
    return nullptr;
1286
0
  }
1287
1288
2.24k
  return reader_->GetSegmentBySequenceNumber(seq);
1289
2.24k
}
1290
1291
2
bool Log::HasOnDiskData(FsManager* fs_manager, const string& wal_dir) {
1292
2
  return fs_manager->env()->FileExists(wal_dir);
1293
2
}
1294
1295
Status Log::DeleteOnDiskData(Env* env,
1296
                             const string& tablet_id,
1297
                             const string& wal_dir,
1298
47.7k
                             const string& peer_uuid) {
1299
47.7k
  if (!env->FileExists(wal_dir)) {
1300
120
    return Status::OK();
1301
120
  }
1302
47.6k
  LOG(INFO) << "T " << tablet_id << " P " << peer_uuid
1303
47.6k
            << ": Deleting WAL dir " << wal_dir;
1304
47.6k
  RETURN_NOT_OK_PREPEND(env->DeleteRecursively(wal_dir),
1305
47.6k
                        "Unable to recursively delete WAL dir for tablet " + tablet_id);
1306
47.6k
  return Status::OK();
1307
47.6k
}
1308
1309
44
Status Log::FlushIndex() {
1310
44
  if (!log_index_) {
1311
0
    return Status::OK();
1312
0
  }
1313
44
  return log_index_->Flush();
1314
44
}
1315
1316
117
Status Log::CopyTo(const std::string& dest_wal_dir) {
1317
  // We mainly need log_copy_mutex_ to simplify management of log_copy_min_index_.
1318
117
  std::lock_guard<decltype(log_copy_mutex_)> log_copy_lock(log_copy_mutex_);
1319
117
  auto se = ScopeExit([this]() {
1320
117
    std::lock_guard<percpu_rwlock> l(state_lock_);
1321
117
    log_copy_min_index_ = std::numeric_limits<int64_t>::max();
1322
117
  });
1323
1324
117
  SegmentSequence segments;
1325
117
  scoped_refptr<LogIndex> log_index;
1326
117
  {
1327
117
    UniqueLock<percpu_rwlock> l(state_lock_);
1328
117
    if (log_state_ != kLogInitialized) {
1329
115
      SCHECK_EQ(log_state_, kLogWriting, IllegalState, Format("Invalid log state: $0", log_state_));
1330
115
      ReverseLock<decltype(l)> rlock(l);
1331
      // Rollover current active segment if it is not empty.
1332
115
      RETURN_NOT_OK(AllocateSegmentAndRollOver());
1333
115
    }
1334
1335
117
    SCHECK(
1336
117
        log_state_ == kLogInitialized || log_state_ == kLogWriting, IllegalState,
1337
117
        Format("Invalid log state: $0", log_state_));
1338
    // Remember log_index, because it could be reset if someone closes the log after we release
1339
    // state_lock_.
1340
117
    log_index = log_index_;
1341
117
    RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
1342
1343
    // We skip the last snapshot segment because it might be mutable and is either:
1344
    // 1) A segment that was empty when we tried to roll over at the beginning of the
1345
    // function.
1346
    // 2) A segment that was created after calling AllocateSegmentAndRollOver above (or
1347
    // created even later by concurrent operations).
1348
    // In both cases, the segments in the snapshot prior to the last one contain all operations
1349
    // that were present in the log before calling Log::CopyTo and not yet GCed.
1350
117
    segments.pop_back();
1351
    // At this point all segments in `segments` are closed and immutable.
1352
1353
    // Look for the first non-empty segment.
1354
117
    auto it =
1355
117
        std::find_if(segments.begin(), segments.end(), [](const ReadableLogSegmentPtr& segment) {
1356
          // Check whether segment is not empty.
1357
117
          return segment->readable_up_to() > segment->first_entry_offset();
1358
117
        });
1359
117
    if (it != segments.end()) {
1360
      // We've found the first non-empty segment to copy; set an anchor for Log GC.
1361
117
      log_copy_min_index_ = VERIFY_RESULT((*it)->ReadFirstEntryMetadata()).op_id.index;
1362
117
    }
1363
117
  }
1364
1365
117
  RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options_.env, dest_wal_dir),
1366
117
                        Format("Failed to create tablet WAL dir $0", dest_wal_dir));
1367
1368
117
  RETURN_NOT_OK(log_index->Flush());
1369
1370
117
  auto* const env = options_.env;
1371
1372
117
  {
1373
596
    for (const auto& segment : segments) {
1374
596
      const auto sequence_number = segment->header().sequence_number();
1375
596
      const auto file_name = FsManager::GetWalSegmentFileName(sequence_number);
1376
596
      const auto src_path = JoinPathSegments(wal_dir_, file_name);
1377
596
      SCHECK_EQ(src_path, segment->path(), InternalError, "Log segment path does not match");
1378
596
      const auto dest_path = JoinPathSegments(dest_wal_dir, file_name);
1379
1380
596
      RETURN_NOT_OK(env->LinkFile(src_path, dest_path));
1381
0
      VLOG_WITH_PREFIX(1) << Format("Hard linked $0 to $1", src_path, dest_path);
1382
596
    }
1383
117
  }
1384
1385
117
  const auto files = VERIFY_RESULT(env->GetChildren(wal_dir_, ExcludeDots::kTrue));
1386
1387
830
  for (const auto& file : files) {
1388
830
    const auto src_path = JoinPathSegments(wal_dir_, file);
1389
830
    const auto dest_path = JoinPathSegments(dest_wal_dir, file);
1390
1391
830
    if (FsManager::IsWalSegmentFileName(file)) {
1392
      // Already processed above.
1393
117
    } else if (!boost::starts_with(file, kSegmentPlaceholderFilePrefix)) {
1394
117
      RETURN_NOT_OK_PREPEND(
1395
117
          CopyFile(env, src_path, dest_path),
1396
117
          Format("Failed to copy file $0 to $1", src_path, dest_path));
1397
0
      VLOG_WITH_PREFIX(1) << Format("Copied $0 to $1", src_path, dest_path);
1398
117
    }
1399
830
  }
1400
117
  return Status::OK();
1401
117
}
1402
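
CopyTo() hard-links the sealed WAL segment files into the destination directory, so no data blocks are duplicated, and byte-copies the remaining files such as the index. A standalone sketch of that link-or-copy decision using std::filesystem instead of the Env abstraction; IsWalSegmentFile() is a hypothetical stand-in for FsManager::IsWalSegmentFileName(), and the rollover, placeholder-skipping, and GC-anchor steps are omitted:

#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// Hypothetical predicate; the real code uses FsManager::IsWalSegmentFileName().
bool IsWalSegmentFile(const std::string& name) {
  return name.rfind("wal-", 0) == 0;
}

void CopyWalDir(const fs::path& src_dir, const fs::path& dst_dir) {
  fs::create_directories(dst_dir);
  for (const auto& entry : fs::directory_iterator(src_dir)) {
    if (!entry.is_regular_file()) {
      continue;                                   // only plain files are expected in a WAL dir
    }
    const auto name = entry.path().filename().string();
    const auto dst = dst_dir / name;
    if (IsWalSegmentFile(name)) {
      fs::create_hard_link(entry.path(), dst);    // immutable segment: link instead of copying bytes
    } else {
      fs::copy_file(entry.path(), dst);           // metadata/index files: plain copy
    }
  }
}

Note that hard links only work when source and destination live on the same filesystem, which is the usual setup for copying a WAL directory next to the original.
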
1403
193k
uint64_t Log::NextSegmentDesiredSize() {
1404
193k
  return std::min(cur_max_segment_size_ * 2, max_segment_size_);
1405
193k
}
1406
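
NextSegmentDesiredSize() doubles the current segment size and caps it at max_segment_size_, so early segments stay small while a busy tablet quickly reaches the configured maximum. A tiny illustration of that schedule, assuming a hypothetical 1 MiB starting size and 64 MiB cap:

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical starting point and cap; the real values come from the log's configuration.
  const uint64_t max_segment_size = 64ULL << 20;   // 64 MiB cap
  uint64_t cur = 1ULL << 20;                       // 1 MiB initial segment
  while (cur < max_segment_size) {
    cur = std::min(cur * 2, max_segment_size);     // same formula as NextSegmentDesiredSize()
    std::cout << (cur >> 20) << " MiB\n";          // prints 2, 4, 8, 16, 32, 64
  }
  return 0;
}
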
1407
97.0k
Status Log::PreAllocateNewSegment() {
1408
97.0k
  TRACE_EVENT1("log", "PreAllocateNewSegment", "file", next_segment_path_);
1409
97.0k
  CHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationInProgress);
1410
1411
97.0k
  WritableFileOptions opts;
1412
  // We always want to sync on close: https://github.com/yugabyte/yugabyte-db/issues/3490
1413
97.0k
  opts.sync_on_close = true;
1414
97.0k
  opts.o_direct = durable_wal_write_;
1415
97.0k
  RETURN_NOT_OK(CreatePlaceholderSegment(opts, &next_segment_path_, &next_segment_file_));
1416
1417
97.0k
  if (options_.preallocate_segments) {
1418
97.0k
    uint64_t next_segment_size = NextSegmentDesiredSize();
1419
97.0k
    TRACE("Preallocating $0 byte segment in $1", next_segment_size, next_segment_path_);
1420
    // TODO (perf) zero the new segments -- this could result in additional performance
1421
    // improvements.
1422
97.0k
    RETURN_NOT_OK(next_segment_file_->PreAllocate(next_segment_size));
1423
97.0k
  }
1424
1425
97.0k
  allocation_state_.store(SegmentAllocationState::kAllocationFinished, std::memory_order_release);
1426
97.0k
  return Status::OK();
1427
97.0k
}
1428
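
PreAllocateNewSegment() reserves the next segment's disk space ahead of time so later appends are less likely to stall on block allocation. A rough POSIX-only sketch of the same idea; posix_fallocate is not what the Env/WritableFile layer above actually calls, and it is not available on every platform:

#include <fcntl.h>
#include <unistd.h>

// Returns 0 on success. Reserves `bytes` of disk space for `path` up front.
int PreallocatePlaceholder(const char* path, off_t bytes) {
  int fd = ::open(path, O_CREAT | O_WRONLY, 0644);
  if (fd < 0) {
    return -1;
  }
  int err = ::posix_fallocate(fd, /*offset=*/0, bytes);  // reserve blocks (extends file to `bytes`)
  ::close(fd);
  return err == 0 ? 0 : -1;
}
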
1429
97.0k
Status Log::SwitchToAllocatedSegment() {
1430
97.0k
  CHECK_EQ(allocation_state(), SegmentAllocationState::kAllocationFinished);
1431
1432
  // Increment "next" log segment seqno.
1433
97.0k
  active_segment_sequence_number_++;
1434
97.0k
  const string new_segment_path =
1435
97.0k
      FsManager::GetWalSegmentFilePath(wal_dir_, active_segment_sequence_number_);
1436
1437
97.0k
  RETURN_NOT_OK(get_env()->RenameFile(next_segment_path_, new_segment_path));
1438
97.0k
  RETURN_NOT_OK(get_env()->SyncDir(wal_dir_));
1439
1440
97.0k
  int64_t fault_after_min_replicate_index =
1441
97.0k
      FLAGS_TEST_log_fault_after_segment_allocation_min_replicate_index;
1442
97.0k
  if (PREDICT_FALSE(fault_after_min_replicate_index)) {
1443
11
    if (reader_->GetMinReplicateIndex() >= fault_after_min_replicate_index) {
1444
1
      MAYBE_FAULT(1.0);
1445
1
    }
1446
11
  }
1447
1448
  // Create a new segment.
1449
97.0k
  std::unique_ptr<WritableLogSegment> new_segment(
1450
97.0k
      new WritableLogSegment(new_segment_path, next_segment_file_));
1451
1452
  // Set up the new header and footer.
1453
97.0k
  LogSegmentHeaderPB header;
1454
97.0k
  header.set_major_version(kLogMajorVersion);
1455
97.0k
  header.set_minor_version(kLogMinorVersion);
1456
97.0k
  header.set_sequence_number(active_segment_sequence_number_);
1457
97.0k
  header.set_unused_tablet_id(tablet_id_);
1458
1459
  // Set up the new footer. This will be maintained as the segment is written.
1460
97.0k
  footer_builder_.Clear();
1461
97.0k
  footer_builder_.set_num_entries(0);
1462
1463
  // Set the new segment's schema.
1464
97.0k
  {
1465
97.0k
    SharedLock<decltype(schema_lock_)> l(schema_lock_);
1466
97.0k
    SchemaToPB(*schema_, header.mutable_unused_schema());
1467
97.0k
    header.set_unused_schema_version(schema_version_);
1468
97.0k
  }
1469
1470
97.0k
  RETURN_NOT_OK(new_segment->WriteHeaderAndOpen(header));
1471
  // Transform the currently-active segment into a readable one, since we need to be able to replay
1472
  // the segments for other peers.
1473
97.0k
  {
1474
97.0k
    if (active_segment_.get() != nullptr) {
1475
7.40k
      std::lock_guard<decltype(state_lock_)> l(state_lock_);
1476
7.40k
      CHECK_OK(ReplaceSegmentInReaderUnlocked());
1477
7.40k
    }
1478
97.0k
  }
1479
1480
  // Open the segment we just created in readable form and add it to the reader.
1481
97.0k
  std::unique_ptr<RandomAccessFile> readable_file;
1482
97.0k
  RETURN_NOT_OK(get_env()->NewRandomAccessFile(new_segment_path, &readable_file));
1483
1484
97.0k
  scoped_refptr<ReadableLogSegment> readable_segment(
1485
97.0k
    new ReadableLogSegment(new_segment_path,
1486
97.0k
                           shared_ptr<RandomAccessFile>(readable_file.release())));
1487
97.0k
  RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
1488
97.0k
  RETURN_NOT_OK(reader_->AppendEmptySegment(readable_segment));
1489
1490
  // Now set 'active_segment_' to the new segment.
1491
97.0k
  active_segment_ = std::move(new_segment);
1492
97.0k
  cur_max_segment_size_ = NextSegmentDesiredSize();
1493
1494
97.0k
  allocation_state_.store(
1495
97.0k
      SegmentAllocationState::kAllocationNotStarted, std::memory_order_release);
1496
1497
97.0k
  return Status::OK();
1498
97.0k
}
1499
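
SwitchToAllocatedSegment() makes the new segment visible by renaming the pre-allocated placeholder to its final name and then syncing the WAL directory, so the rename itself survives a crash. A small POSIX sketch of that two-step sequence (the real code goes through Env::RenameFile and Env::SyncDir):

#include <fcntl.h>
#include <unistd.h>
#include <cstdio>

// Returns true if the rename is durably recorded in the directory.
bool RenameAndSyncDir(const char* tmp_path, const char* final_path, const char* dir_path) {
  if (std::rename(tmp_path, final_path) != 0) {
    return false;
  }
  int dir_fd = ::open(dir_path, O_RDONLY);    // fsync on the directory persists the rename entry
  if (dir_fd < 0) {
    return false;
  }
  bool ok = (::fsync(dir_fd) == 0);
  ::close(dir_fd);
  return ok;
}
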
1500
55.9k
Status Log::ReplaceSegmentInReaderUnlocked() {
1501
  // We should never switch to a new segment if we wrote nothing to the old one.
1502
55.9k
  CHECK(active_segment_->IsClosed());
1503
55.9k
  shared_ptr<RandomAccessFile> readable_file;
1504
55.9k
  RETURN_NOT_OK(OpenFileForRandom(
1505
55.9k
      get_env(), active_segment_->path(), &readable_file));
1506
1507
55.5k
  scoped_refptr<ReadableLogSegment> readable_segment(
1508
55.5k
      new ReadableLogSegment(active_segment_->path(), readable_file));
1509
  // Note: active_segment_->header() will only contain an initialized PB if we wrote the header out.
1510
55.5k
  RETURN_NOT_OK(readable_segment->Init(active_segment_->header(),
1511
55.5k
                                       active_segment_->footer(),
1512
55.5k
                                       active_segment_->first_entry_offset()));
1513
1514
55.5k
  return reader_->ReplaceLastSegment(readable_segment);
1515
55.5k
}
1516
1517
Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts,
1518
                                     string* result_path,
1519
97.0k
                                     shared_ptr<WritableFile>* out) {
1520
97.0k
  string path_tmpl = JoinPathSegments(wal_dir_, kSegmentPlaceholderFileTemplate);
1521
9
  VLOG_WITH_PREFIX(2) << "Creating temp. file for placeholder segment, template: " << path_tmpl;
1522
97.0k
  std::unique_ptr<WritableFile> segment_file;
1523
97.0k
  RETURN_NOT_OK(get_env()->NewTempWritableFile(opts,
1524
97.0k
                                               path_tmpl,
1525
97.0k
                                               result_path,
1526
97.0k
                                               &segment_file));
1527
13
  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path;
1528
97.0k
  out->reset(segment_file.release());
1529
97.0k
  return Status::OK();
1530
97.0k
}
1531
1532
2.24k
uint64_t Log::active_segment_sequence_number() const {
1533
2.24k
  return active_segment_sequence_number_;
1534
2.24k
}
1535
1536
13
Status Log::TEST_SubmitFuncToAppendToken(const std::function<void()>& func) {
1537
13
  return appender_->TEST_SubmitFunc(func);
1538
13
}
1539
1540
99
Status Log::ResetLastSyncedEntryOpId(const OpId& op_id) {
1541
99
  RETURN_NOT_OK(WaitUntilAllFlushed());
1542
1543
99
  OpId old_value;
1544
99
  {
1545
99
    std::lock_guard<std::mutex> write_lock(last_synced_entry_op_id_mutex_);
1546
99
    old_value = last_synced_entry_op_id_.load(boost::memory_order_acquire);
1547
99
    last_synced_entry_op_id_.store(op_id, boost::memory_order_release);
1548
99
    last_synced_entry_op_id_cond_.notify_all();
1549
99
  }
1550
99
  LOG_WITH_PREFIX(INFO) << "Reset last synced entry op id from " << old_value << " to " << op_id;
1551
1552
99
  return Status::OK();
1553
99
}
1554
1555
47.9k
Log::~Log() {
1556
47.9k
  WARN_NOT_OK(Close(), "Error closing log");
1557
47.9k
}
1558
1559
// ------------------------------------------------------------------------------------------------
1560
// LogEntryBatch
1561
1562
LogEntryBatch::LogEntryBatch(LogEntryTypePB type, LogEntryBatchPB&& entry_batch_pb)
1563
    : type_(type),
1564
      entry_batch_pb_(std::move(entry_batch_pb)),
1565
14.8M
      count_(entry_batch_pb_.entry().size()) {
1566
14.8M
  if (!IsMarkerType(type_)) {
1567
14.8M
    DCHECK_NE(entry_batch_pb_.mono_time(), 0);
1568
14.8M
  }
1569
14.8M
}
1570
1571
14.8M
LogEntryBatch::~LogEntryBatch() {
1572
  // ReplicateMsg objects are pointed to by LogEntryBatchPB but are really owned by shared pointers
1573
  // in replicates_. To avoid double freeing, release them from the protobuf.
1574
8.68M
  for (auto& entry : *entry_batch_pb_.mutable_entry()) {
1575
8.68M
    if (entry.has_replicate()) {
1576
0
      entry.release_replicate();
1577
0
    }
1578
8.68M
  }
1579
14.8M
}
1580
1581
13.8M
void LogEntryBatch::MarkReserved() {
1582
13.8M
  DCHECK_EQ(state_, kEntryInitialized);
1583
13.8M
  state_ = kEntryReserved;
1584
13.8M
}
1585
1586
13.9M
bool LogEntryBatch::IsMarker() const {
1587
13.9M
  return count() == 1 && IsMarkerType(entry_batch_pb_.entry(0).type());
1588
13.9M
}
1589
1590
13.9M
bool LogEntryBatch::IsSingleEntryOfType(LogEntryTypePB type) const {
1591
13.9M
  return count() == 1 && entry_batch_pb_.entry(0).type() == type;
1592
13.9M
}
1593
1594
13.9M
Status LogEntryBatch::Serialize() {
1595
13.9M
  DCHECK_EQ(state_, kEntryReady);
1596
13.9M
  buffer_.clear();
1597
  // *_MARKER LogEntries are markers and are not serialized.
1598
13.9M
  if (PREDICT_FALSE(IsMarker())) {
1599
6.46k
    total_size_bytes_ = 0;
1600
6.46k
    state_ = kEntrySerialized;
1601
6.46k
    return Status::OK();
1602
6.46k
  }
1603
13.8M
  DCHECK_NE(entry_batch_pb_.mono_time(), 0);
1604
13.8M
  total_size_bytes_ = entry_batch_pb_.ByteSize();
1605
13.8M
  buffer_.reserve(total_size_bytes_);
1606
1607
13.8M
  pb_util::AppendToString(entry_batch_pb_, &buffer_);
1608
1609
13.8M
  state_ = kEntrySerialized;
1610
13.8M
  return Status::OK();
1611
13.8M
}
1612
1613
14.8M
void LogEntryBatch::MarkReady() {
1614
14.8M
  DCHECK_EQ(state_, kEntryReserved);
1615
14.8M
  state_ = kEntryReady;
1616
14.8M
}
1617
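
LogEntryBatch moves through a strict reserved -> ready -> serialized lifecycle, enforced above with DCHECKs on state_. A compact hypothetical sketch of that state machine, using plain asserts and a std::string in place of the serialized protobuf:

#include <cassert>
#include <string>

class Batch {
 public:
  enum State { kInitialized, kReserved, kReady, kSerialized };

  void MarkReserved() { assert(state_ == kInitialized); state_ = kReserved; }
  void MarkReady()    { assert(state_ == kReserved);    state_ = kReady; }

  // Stand-in for Serialize(): requires kReady, fills the buffer, moves to kSerialized.
  void Serialize(const std::string& payload) {
    assert(state_ == kReady);
    buffer_ = payload;              // the real code appends the serialized LogEntryBatchPB here
    state_ = kSerialized;
  }

  const std::string& buffer() const { return buffer_; }

 private:
  State state_ = kInitialized;
  std::string buffer_;
};
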
1618
}  // namespace log
1619
}  // namespace yb