YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_CONSENSUS_LOG_H_
34
#define YB_CONSENSUS_LOG_H_
35
36
#include <pthread.h>
37
#include <sys/types.h>
38
39
#include <atomic>
40
#include <condition_variable>
41
#include <map>
42
#include <memory>
43
#include <mutex>
44
#include <string>
45
#include <vector>
46
47
#include <boost/atomic.hpp>
48
#include <glog/logging.h>
49
50
#include "yb/common/common_fwd.h"
51
52
#include "yb/consensus/consensus_fwd.h"
53
#include "yb/consensus/log_util.h"
54
55
#include "yb/fs/fs_manager.h"
56
57
#include "yb/gutil/macros.h"
58
#include "yb/gutil/ref_counted.h"
59
#include "yb/gutil/spinlock.h"
60
61
#include "yb/util/status_fwd.h"
62
#include "yb/util/locks.h"
63
#include "yb/util/monotime.h"
64
#include "yb/util/mutex.h"
65
#include "yb/util/opid.h"
66
#include "yb/util/promise.h"
67
#include "yb/util/shared_lock.h"
68
#include "yb/util/status_callback.h"
69
#include "yb/util/threadpool.h"
70
71
namespace yb {
72
73
class MetricEntity;
74
class ThreadPool;
75
76
namespace cdc {
77
class CDCServiceTestMaxRentionTime_TestLogRetentionByOpId_MaxRentionTime_Test;
78
class CDCServiceTestMinSpace_TestLogRetentionByOpId_MinSpace_Test;
79
}
80
81
namespace log {
82
83
YB_STRONGLY_TYPED_BOOL(CreateNewSegment);
84
YB_DEFINE_ENUM(
85
    SegmentAllocationState,
86
    (kAllocationNotStarted)  // No segment allocation requested
87
    (kAllocationInProgress)  // Next segment allocation started
88
    (kAllocationFinished)    // Next segment ready
89
);
90
91
// Log interface, inspired by Raft's (logcabin) Log. Provides durability to YugaByte as a normal
92
// Write Ahead Log and also plays the role of persistent storage for the consensus state machine.
93
//
94
// Note: This class is not thread safe, the caller is expected to synchronize Log::Reserve() and
95
// Log::Append() calls.
96
//
97
// Log uses group commit to improve write throughput and latency without compromising ordering and
98
// durability guarantees.
99
//
100
// To add operations to the log, the caller must obtain the lock and call Reserve() with the
101
// collection of operations to be added. Then, the caller may release the lock and call
102
// AsyncAppend(). Reserve() reserves a slot on a queue for the log entry; AsyncAppend() indicates
103
// that the entry in the slot is safe to write to disk and adds a callback that will be invoked once
104
// the entry is written and synchronized to disk.
105
//
106
// For sample usage see mt-log-test.cc
107
//
108
// Methods on this class are _not_ thread-safe and must be externally synchronized unless otherwise
109
// noted.
110
//
111
// Note: The Log needs to be Close()d before any log-writing class is destroyed, otherwise the Log
112
// might hold references to these classes to execute the callbacks after each write.
113
class Log : public RefCountedThreadSafe<Log> {
114
 public:
115
  static const Status kLogShutdownStatus;
116
117
  // Opens or continues a log and sets 'log' to the newly built Log.
118
  // After a successful Open() the Log is ready to receive entries, if create_new_segment is true.
119
  static CHECKED_STATUS Open(const LogOptions &options,
120
                             const std::string& tablet_id,
121
                             const std::string& wal_dir,
122
                             const std::string& peer_uuid,
123
                             const Schema& schema,
124
                             uint32_t schema_version,
125
                             const scoped_refptr<MetricEntity>& table_metric_entity,
126
                             const scoped_refptr<MetricEntity>& tablet_metric_entity,
127
                             ThreadPool *append_thread_pool,
128
                             ThreadPool* allocation_thread_pool,
129
                             int64_t cdc_min_replicated_index,
130
                             scoped_refptr<Log> *log,
131
                             CreateNewSegment create_new_segment = CreateNewSegment::kTrue);
132
133
  ~Log();
134
135
  // Reserves a spot in the log's queue for 'entry_batch'.
136
  //
137
  // 'reserved_entry' is initialized by this method and any resources associated with it will be
138
  // released in AsyncAppend().  In order to ensure correct ordering of operations across multiple
139
  // threads, calls to this method must be externally synchronized.
140
  //
141
  // WARNING: the caller _must_ call AsyncAppend() or else the log will "stall" and will never be
142
  // able to make forward progress.
143
  void Reserve(LogEntryTypePB type, LogEntryBatchPB* entry_batch, LogEntryBatch** reserved_entry);
144
145
  // Asynchronously appends 'entry' to the log. Once the append completes and is synced, 'callback'
146
  // will be invoked.
147
  CHECKED_STATUS AsyncAppend(LogEntryBatch* entry,
148
                             const StatusCallback& callback);
149
150
  CHECKED_STATUS TEST_AsyncAppendWithReplicates(
151
      LogEntryBatch* entry, const ReplicateMsgs& replicates, const StatusCallback& callback);
152
153
  // Synchronously append a new entry to the log.  Log does not take ownership of the passed
154
  // 'entry'. If skip_wal_write is true, only update consensus metadata and LogIndex, skip write
155
  // to wal.
156
  // TODO get rid of this method, transition to the asynchronous API.
157
  CHECKED_STATUS Append(LogEntryPB* entry,
158
                        LogEntryMetadata entry_metadata,
159
                        bool skip_wal_write = false);
160
161
  // Append the given set of replicate messages, asynchronously.  This requires that the replicates
162
  // have already been assigned OpIds.
163
  CHECKED_STATUS AsyncAppendReplicates(const ReplicateMsgs& replicates, const OpId& committed_op_id,
164
                                       RestartSafeCoarseTimePoint batch_mono_time,
165
                                       const StatusCallback& callback);
166
167
  // Blocks the current thread until all the entries in the log queue are flushed and fsynced (if
168
  // fsync of log entries is enabled).
169
  CHECKED_STATUS WaitUntilAllFlushed();
170
171
  // The closure submitted to allocation_pool_ to allocate a new segment.
172
  void SegmentAllocationTask();
173
174
  // Syncs all state and closes the log.
175
  CHECKED_STATUS Close();
176
177
  // Return true if there is any on-disk data for the given tablet.
178
  static bool HasOnDiskData(FsManager* fs_manager, const std::string& tablet_id);
179
180
  // Delete all WAL data from the log associated with this tablet.
181
  // REQUIRES: The Log must be closed.
182
  static CHECKED_STATUS DeleteOnDiskData(Env* env,
183
                                         const std::string& tablet_id,
184
                                         const std::string& wal_dir,
185
                                         const std::string& peer_uuid);
186
187
  // Returns a reader that is able to read through the previous segments. The reader pointer is
188
  // guaranteed to be live as long as the log itself is initialized and live.
189
  LogReader* GetLogReader() const;
190
191
  CHECKED_STATUS GetSegmentsSnapshot(SegmentSequence* segments) const;
192
193
  void SetMaxSegmentSizeForTests(uint64_t max_segment_size) {
194
    max_segment_size_ = max_segment_size;
195
  }
196
197
150k
  void DisableSync() {
198
150k
    sync_disabled_ = true;
199
150k
  }
200
201
  // If we previous called DisableSync(), we should restore the default behavior and then call
202
  // Sync() which will perform the actual syncing if required.
203
150k
  CHECKED_STATUS ReEnableSyncIfRequired() {
204
150k
    sync_disabled_ = false;
205
150k
    return Sync();
206
150k
  }
207
208
  // Get ID of tablet.
209
151k
  const std::string& tablet_id() const {
210
151k
    return tablet_id_;
211
151k
  }
212
213
  // Gets the last-used OpId written to the log.  If no entry has ever been written to the log,
214
  // returns (0, 0)
215
  yb::OpId GetLatestEntryOpId() const;
216
217
  int64_t GetMinReplicateIndex() const;
218
219
  // Runs the garbage collector on the set of previous segments. Segments that only refer to in-mem
220
  // state that has been flushed are candidates for garbage collection.
221
  //
222
  // 'min_op_idx' is the minimum operation index required to be retained.  If successful, num_gced
223
  // is set to the number of deleted log segments.
224
  //
225
  // This method is thread-safe.
226
  CHECKED_STATUS GC(int64_t min_op_idx, int* num_gced);
227
228
  // Computes the amount of bytes that would have been GC'd if Log::GC had been called.
229
  CHECKED_STATUS GetGCableDataSize(int64_t min_op_idx, int64_t* total_size) const;
230
231
  // Returns the file system location of the currently active WAL segment.
232
  const WritableLogSegment* ActiveSegmentForTests() const {
233
    return active_segment_.get();
234
  }
235
236
  // If active segment is not empty, forces the Log to allocate a new segment and roll over.
237
  // This can be used to make sure all entries appended up to this point are available in closed,
238
  // readable segments. Note that this assumes there is already a valid active_segment_.
239
  CHECKED_STATUS AllocateSegmentAndRollOver();
240
241
  // For a log created with CreateNewSegment::kFalse, this is used to finish log initialization by
242
  // allocating a new segment.
243
  CHECKED_STATUS EnsureInitialNewSegmentAllocated();
244
245
  // Returns the total size of the current segments, in bytes.
246
  // Returns 0 if the log is shut down.
247
  uint64_t OnDiskSize();
248
249
  // Set the schema for the _next_ log segment.
250
  //
251
  // This method is thread-safe.
252
  void SetSchemaForNextLogSegment(const Schema& schema, uint32_t version);
253
254
  void set_wal_retention_secs(uint32_t wal_retention_secs);
255
256
  uint32_t wal_retention_secs() const;
257
258
  // Waits until specified op id is added to log.
259
  // Returns current op id after waiting, which could be greater than or equal to specified op id.
260
  //
261
  // On timeout returns default constructed OpId.
262
  yb::OpId WaitForSafeOpIdToApply(const yb::OpId& op_id, MonoDelta duration = MonoDelta());
263
264
  // Return a readable segment with the given sequence number, or nullptr if it
265
  // cannot be found (e.g. if it has already been GCed).
266
  scoped_refptr<ReadableLogSegment> GetSegmentBySequenceNumber(int64_t seq) const;
267
268
  void TEST_SetSleepDuration(const std::chrono::nanoseconds& duration) {
269
    sleep_duration_.store(duration, std::memory_order_release);
270
  }
271
272
  void TEST_SetAllOpIdsSafe(bool value) {
273
    all_op_ids_safe_ = value;
274
  }
275
276
  uint64_t active_segment_sequence_number() const;
277
278
  CHECKED_STATUS TEST_SubmitFuncToAppendToken(const std::function<void()>& func);
279
280
  // Returns the number of segments.
281
  size_t num_segments() const;
282
283
443k
  const std::string& LogPrefix() const {
284
443k
    return log_prefix_;
285
443k
  }
286
287
0
  std::string wal_dir() const {
288
0
    return wal_dir_;
289
0
  }
290
291
301k
  void set_cdc_min_replicated_index(int64_t cdc_min_replicated_index) {
292
    // TODO: check that the passed index is greater than the current index.
293
301k
    cdc_min_replicated_index_.store(cdc_min_replicated_index, std::memory_order_release);
294
301k
  }
295
296
0
  int64_t cdc_min_replicated_index() {
297
0
    return cdc_min_replicated_index_.load(std::memory_order_acquire);
298
0
  }
299
300
  CHECKED_STATUS FlushIndex();
301
302
  // Copies log to a new dir.
303
  // Flushes necessary files and uses hard links where it is safe.
304
  CHECKED_STATUS CopyTo(const std::string& dest_wal_dir);
305
306
  // Waits until all entries flushed, then reset last received op id to specified one.
307
  CHECKED_STATUS ResetLastSyncedEntryOpId(const OpId& op_id);
308
309
 private:
310
  friend class LogTest;
311
  friend class LogTestBase;
312
313
  FRIEND_TEST(LogTest, TestMultipleEntriesInABatch);
314
  FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates);
315
  FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
316
  FRIEND_TEST(LogTest, TestLogMetrics);
317
318
  FRIEND_TEST(cdc::CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime);
319
  FRIEND_TEST(cdc::CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace);
320
321
  class Appender;
322
323
  // Log state.
324
  enum LogState {
325
    kLogInitialized,
326
    kLogWriting,
327
    kLogClosed
328
  };
329
330
  Log(LogOptions options,
331
      std::string wal_dir,
332
      std::string tablet_id,
333
      std::string peer_uuid,
334
      const Schema& schema,
335
      uint32_t schema_version,
336
      const scoped_refptr<MetricEntity>& table_metric_entity,
337
      const scoped_refptr<MetricEntity>& tablet_metric_entity,
338
      ThreadPool* append_thread_pool,
339
      ThreadPool* allocation_thread_pool,
340
      CreateNewSegment create_new_segment = CreateNewSegment::kTrue);
341
342
876k
  Env* get_env() {
343
876k
    return options_.env;
344
876k
  }
345
346
  // Initializes a new one or continues an existing log.
347
  CHECKED_STATUS Init();
348
349
  // Make segments roll over. Note this assumes there was an existing valid active_segment_ we are
350
  // rolling over from.
351
  CHECKED_STATUS RollOver();
352
353
  // Writes the footer and closes the current segment.
354
  CHECKED_STATUS CloseCurrentSegment();
355
356
  // Sets 'out' to a newly created temporary file (see Env::NewTempWritableFile()) for a placeholder
357
  // segment. Sets 'result_path' to the fully qualified path to the unique filename created for the
358
  // segment.
359
  CHECKED_STATUS CreatePlaceholderSegment(const WritableFileOptions& opts,
360
                                          std::string* result_path,
361
                                          std::shared_ptr<WritableFile>* out);
362
363
  // Creates a new WAL segment on disk, writes the next_segment_header_ to disk as the header, and
364
  // sets active_segment_ to point to this new segment.
365
  CHECKED_STATUS SwitchToAllocatedSegment();
366
367
  // Preallocates the space for a new segment.
368
  CHECKED_STATUS PreAllocateNewSegment();
369
370
  // Returns the desired size for the next log segment to be created.
371
  uint64_t NextSegmentDesiredSize();
372
373
  // Writes serialized contents of 'entry' to the log. Called inside AppenderThread. If
374
  // 'caller_owns_operation' is true, then the 'operation' field of the entry will be released after
375
  // the entry is appended. If skip_wal_write is true, only update consensus metadata and LogIndex,
376
  // skip WAL write.
377
  //
378
  // TODO once Append() is removed, 'caller_owns_operation' and associated logic will no longer be
379
  // needed.
380
  CHECKED_STATUS DoAppend(
381
      LogEntryBatch* entry, bool caller_owns_operation = true, bool skip_wal_write = false);
382
383
  // Update footer_builder_ to reflect the log indexes seen in 'batch'.
384
  void UpdateFooterForBatch(LogEntryBatch* batch);
385
386
  // Update the LogIndex to include entries for the replicate messages found in 'batch'. The index
387
  // entry points to the offset 'start_offset' in the current log segment.
388
  CHECKED_STATUS UpdateIndexForBatch(const LogEntryBatch& batch);
389
390
  // Replaces the last "empty" segment in 'log_reader_', i.e. the one currently being written to, by
391
  // the same segment once properly closed.
392
  CHECKED_STATUS ReplaceSegmentInReaderUnlocked();
393
394
  CHECKED_STATUS Sync();
395
396
  // Helper method to get the segment sequence to GC based on the provided min_op_idx.
397
  CHECKED_STATUS GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segments_to_gc) const;
398
399
  // Kick off an asynchronous task that pre-allocates a new log-segment, setting
400
  // 'allocation_status_'. To wait for the result of the task, use allocation_status_.Get().
401
  CHECKED_STATUS AsyncAllocateSegment();
402
403
25.4M
  SegmentAllocationState allocation_state() {
404
25.4M
    return allocation_state_.load(std::memory_order_acquire);
405
25.4M
  }
406
407
  LogEntryBatch* ReserveMarker(LogEntryTypePB type);
408
409
  LogOptions options_;
410
411
  // The dir path where the write-ahead log for this tablet is stored.
412
  std::string wal_dir_;
413
414
  // The ID of the tablet this log is dedicated to.
415
  std::string tablet_id_;
416
417
  // Peer this log is dedicated to.
418
  std::string peer_uuid_;
419
420
  // Lock to protect modifications to schema_ and schema_version_.
421
  mutable rw_spinlock schema_lock_;
422
423
  // The current schema of the tablet this log is dedicated to.
424
  std::unique_ptr<Schema> schema_;
425
426
  // The schema version
427
  uint32_t schema_version_;
428
429
  // The currently active segment being written.
430
  std::unique_ptr<WritableLogSegment> active_segment_;
431
432
  // The current (active) segment sequence number. Initialized in the Log constructor based on
433
  // LogOptions.
434
  std::atomic<uint64_t> active_segment_sequence_number_;
435
436
  // The writable file for the next allocated segment
437
  std::shared_ptr<WritableFile> next_segment_file_;
438
439
  // The path for the next allocated segment.
440
  std::string next_segment_path_;
441
442
  // Lock to protect mutations to log_state_ and other shared state variables.
443
  mutable percpu_rwlock state_lock_;
444
445
  LogState log_state_;
446
447
  // A reader for the previous segments that were not yet GC'd.
448
  std::unique_ptr<LogReader> reader_;
449
450
  // Index which translates between operation indexes and the position of the operation in the log.
451
  scoped_refptr<LogIndex> log_index_;
452
453
  // Lock for notification of last_synced_entry_op_id_ changes.
454
  mutable std::mutex last_synced_entry_op_id_mutex_;
455
  mutable std::condition_variable last_synced_entry_op_id_cond_;
456
457
  // The last known OpId for a REPLICATE message appended and synced to this log (any segment).
458
  // NOTE: this op is not necessarily durable unless gflag durable_wal_write is true.
459
  boost::atomic<yb::OpId> last_synced_entry_op_id_{yb::OpId()};
460
461
  // The last know OpId for a REPLICATE message appended to this log (any segment).
462
  // This variable is not accessed concurrently.
463
  yb::OpId last_appended_entry_op_id_;
464
465
  yb::OpId last_submitted_op_id_;
466
467
  // A footer being prepared for the current segment.  When the segment is closed, it will be
468
  // written.
469
  LogSegmentFooterPB footer_builder_;
470
471
  // The maximum segment size, in bytes.
472
  uint64_t max_segment_size_;
473
474
  // The maximum segment size we want for the current WAL segment, in bytes.  This value keeps
475
  // doubling (for each subsequent WAL segment) till it gets to max_segment_size_.
476
  uint64_t cur_max_segment_size_;
477
478
  // Appender manages a TaskStream writing to the log. We will use one taskstream per tablet.
479
  std::unique_ptr<Appender> appender_;
480
481
  // A thread pool for asynchronously pre-allocating new log segments.
482
  std::unique_ptr<ThreadPoolToken> allocation_token_;
483
484
  // If true, sync on all appends.
485
  bool durable_wal_write_;
486
487
  // If non-zero, sync every interval of time.
488
  MonoDelta interval_durable_wal_write_;
489
490
  // If non-zero, sync if more than given amount of data to sync.
491
  int32_t bytes_durable_wal_write_mb_;
492
493
  // Keeps track of oldest entry which needs to be synced.
494
  MonoTime periodic_sync_earliest_unsync_entry_time_ = MonoTime::kMin;
495
496
  // For periodic sync, indicates if there are entries to be sync'ed.
497
  std::atomic<bool> periodic_sync_needed_ = {false};
498
499
  // For periodic sync, indicates number of bytes which need to be sync'ed.
500
  size_t periodic_sync_unsynced_bytes_ = 0;
501
502
  // If true, ignore the 'durable_wal_write_' flags above.  This is used to disable fsync during
503
  // bootstrap.
504
  bool sync_disabled_;
505
506
  // The status of the most recent log-allocation action.
507
  Promise<Status> allocation_status_;
508
509
  std::atomic<SegmentAllocationState> allocation_state_;
510
511
  scoped_refptr<MetricEntity> table_metric_entity_;
512
  scoped_refptr<MetricEntity> tablet_metric_entity_;
513
  std::unique_ptr<LogMetrics> metrics_;
514
515
  // The cached on-disk size of the log, used to track its size even if it has been closed.
516
  std::atomic<uint64_t> on_disk_size_;
517
518
  // Listener that will be invoked after new entry was appended to the log.
519
  std::function<void()> post_append_listener_;
520
521
  // Used in tests delay writing log entries.
522
  std::atomic<std::chrono::nanoseconds> sleep_duration_{std::chrono::nanoseconds(0)};
523
524
  // Used in tests to declare all operations as safe.
525
  bool all_op_ids_safe_ = false;
526
527
  const std::string log_prefix_;
528
529
  std::atomic<uint32_t> wal_retention_secs_{0};
530
531
  // Minimum replicate index for the current log being written. Used for CDC read initialization.
532
  std::atomic<int64_t> min_replicate_index_{-1};
533
534
  // The current replicated index that CDC has read.  Used for CDC read cache optimization.
535
  std::atomic<int64_t> cdc_min_replicated_index_{std::numeric_limits<int64_t>::max()};
536
537
  std::mutex log_copy_mutex_;
538
539
  // Used by GetSegmentsToGCUnlocked() as an anchor.
540
  int64_t log_copy_min_index_ GUARDED_BY(state_lock_) = std::numeric_limits<int64_t>::max();
541
542
  CreateNewSegment create_new_segment_at_start_;
543
544
  DISALLOW_COPY_AND_ASSIGN(Log);
545
};
546
547
}  // namespace log
548
}  // namespace yb
549
#endif /* YB_CONSENSUS_LOG_H_ */