YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_CONSENSUS_LOG_READER_H
33
#define YB_CONSENSUS_LOG_READER_H
34
35
#include <map>
36
#include <string>
37
#include <utility>
38
#include <vector>
39
40
#include <gtest/gtest.h>
41
42
#include "yb/consensus/consensus_fwd.h"
43
#include "yb/consensus/log_metrics.h"
44
#include "yb/consensus/log_fwd.h"
45
46
#include "yb/gutil/ref_counted.h"
47
#include "yb/gutil/spinlock.h"
48
49
#include "yb/util/locks.h"
50
#include "yb/util/monotime.h"
51
52
namespace yb {
53
54
// Forward declarations of types that the declarations in this header refer to.
class Env;
55
struct OpId;
56
57
// Forward declarations of the CDC test classes that LogReader befriends through FRIEND_TEST.
// FRIEND_TEST(Fixture, Name) names the class Fixture_Name_Test, so these identifiers must
// match the test names exactly (including the "Rention" spelling).
namespace cdc {
58
class CDCServiceTestMaxRentionTime_TestLogRetentionByOpId_MaxRentionTime_Test;
59
class CDCServiceTestMinSpace_TestLogRetentionByOpId_MinSpace_Test;
60
}  // namespace cdc
61
62
namespace log {
63
64
// Reads a set of segments from a given path. Segment headers and footers
65
// are read and parsed, but entries are not.
66
// This class is thread safe.
//
// Instances are created through the static Open() factory (the constructor is private).
// Operations that change the set of segments (append, trim, replace) are private and are
// reachable only by the friends declared in the private section (Log, LogTest and the
// listed tests).
67
class LogReader {
68
 public:
69
  ~LogReader();
70
71
  // Opens a LogReader on a specific log directory, and sets 'reader' to the newly created
72
  // LogReader.
73
  //
74
  // 'index' may be nullptr, but if it is, ReadReplicatesInRange() may not be used.
  // NOTE(review): 'tablet_wal_path' is presumably the directory that Init() scans for
  // segments, and 'reader' is presumably left untouched on failure -- confirm in log_reader.cc.
75
  static CHECKED_STATUS Open(Env *env,
76
                             const scoped_refptr<LogIndex>& index,
77
                             std::string log_prefix,
78
                             const std::string& tablet_wal_path,
79
                             const scoped_refptr<MetricEntity>& table_metric_entity,
80
                             const scoped_refptr<MetricEntity>& tablet_metric_entity,
81
                             std::unique_ptr<LogReader> *reader);
82
83
  // Returns the biggest prefix of segments, from the current sequence, guaranteed
84
  // not to include any replicate messages with indexes >= 'index'.
  // The prefix is handed back through 'segments'; the return value is only a status.
85
  CHECKED_STATUS GetSegmentPrefixNotIncluding(int64_t index, SegmentSequence* segments) const;
86
87
  // Overload of the above that additionally takes 'cdc_replicated_index'.
  // NOTE(review): how 'cdc_replicated_index' affects the returned prefix is not visible in this
  // header (presumably it accounts for what CDC consumers still need) -- confirm in
  // log_reader.cc.
  CHECKED_STATUS GetSegmentPrefixNotIncluding(int64_t index, int64_t cdc_replicated_index,
88
                                              SegmentSequence* segments) const;
89
90
  // Return the minimum replicate index that is retained in the currently available
91
  // logs. May return -1 if no replicates have been logged.
92
  int64_t GetMinReplicateIndex() const;
93
94
  // Return a readable segment with the given sequence number, or nullptr if it
95
  // cannot be found (e.g. if it has already been GCed).
96
  scoped_refptr<ReadableLogSegment> GetSegmentBySequenceNumber(int64_t seq) const;
97
98
  // Copies a snapshot of the current sequence of segments into 'segments'.
99
  // 'segments' will be cleared first.
100
  CHECKED_STATUS GetSegmentsSnapshot(SegmentSequence* segments) const;
101
102
  // Reads all ReplicateMsgs from 'starting_at' to 'up_to' both inclusive.
103
  // The caller takes ownership of the returned ReplicateMsg objects.
104
  //
105
  // Will attempt to read no more than 'max_bytes_to_read', unless it is set to
106
  // LogReader::kNoSizeLimit. If the size limit would prevent reading any operations at
107
  // all, then will read exactly one operation.
108
  //
109
  // Requires that a LogIndex was passed into LogReader::Open().
110
  // The parameters starting_op_segment_seq_num, modified_schema, schema_version are used to read
111
  // appropriate schema corresponding to the from_op_id in the segment header or from the segment
112
  // itself if there is a DDL log.
  //
  // 'deadline' defaults to CoarseTimePoint::max().
  // NOTE(review): presumably the read gives up once 'deadline' has passed -- confirm in
  // log_reader.cc.
113
  CHECKED_STATUS ReadReplicatesInRange(
114
      const int64_t starting_at,
115
      const int64_t up_to,
116
      int64_t max_bytes_to_read,
117
      consensus::ReplicateMsgs* replicates,
118
      int64_t *starting_op_segment_seq_num,
119
      yb::SchemaPB* modified_schema,
120
      uint32_t *schema_version,
121
      CoarseTimePoint deadline = CoarseTimePoint::max()) const;
122
123
  // Value of 'max_bytes_to_read' that disables the size limit in ReadReplicatesInRange().
  // Only declared here; the definition (and the actual value) lives outside this header.
  static const int64_t kNoSizeLimit;
124
125
  // Look up the OpId for the given operation index.
126
  // Returns a bad Status if the log index fails to load (e.g. due to an IO error).
127
  Result<yb::OpId> LookupOpId(int64_t op_index) const;
128
  // Looks up a header-related int64_t value for the given operation index.
  // NOTE(review): what the returned value denotes (presumably related to the segment header
  // holding the schema for 'op_index') is not visible in this header -- confirm in
  // log_reader.cc.
  Result<int64_t> LookupHeader(int64_t op_index) const;
129
130
  // Returns the number of segments.
131
  size_t num_segments() const;
132
133
  // Returns a string describing this reader; the format is defined in the implementation file.
  std::string ToString() const;
134
135
2.38k
  // Returns a reference to the 'log_prefix_' member. The reference is valid only as long as
  // this LogReader is alive.
  const std::string& LogPrefix() const {
136
2.38k
    return log_prefix_;
137
2.38k
  }
138
139
 private:
140
  // Tests and classes that are granted access to the private interface below.
  FRIEND_TEST(cdc::CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime);
141
  FRIEND_TEST(cdc::CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace);
142
  FRIEND_TEST(LogTest, TestLogReader);
143
  FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates);
144
  friend class Log;
145
  friend class LogTest;
146
147
  // States of the reader; the current one is stored in 'state_'.
  // NOTE(review): the transitions between these states are implemented outside this header.
  enum State {
148
    kLogReaderInitialized,
149
    kLogReaderReading,
150
    kLogReaderClosed
151
  };
152
153
  // Appends 'segment' to the segments available for read by this reader.
154
  // Index entries in 'segment's footer will be added to the index.
155
  // If the segment has no footer it will be scanned so this should not be used
156
  // for new segments.
157
  CHECKED_STATUS AppendSegment(const scoped_refptr<ReadableLogSegment>& segment);
158
159
  // Same as above but for segments without any entries.
160
  // Used by the Log to add "empty" segments.
161
  CHECKED_STATUS AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment);
162
163
  // Removes segments with sequence numbers less than or equal to 'seg_seqno' from this reader.
  // NOTE(review): 'seg_seqno' is uint64_t here while GetSegmentBySequenceNumber() takes an
  // int64_t sequence number; callers should be aware of the signedness difference.
164
  CHECKED_STATUS TrimSegmentsUpToAndIncluding(uint64_t seg_seqno);
165
166
  // Replaces the last segment in the reader with 'segment'.
167
  // Used to replace a segment that was still in the process of being written
168
  // with its complete version which has a footer and index entries.
169
  // Requires that the last segment in 'segments_' has the same sequence
170
  // number as 'segment'.
171
  // Expects 'segment' to be properly closed and to have footer.
172
  CHECKED_STATUS ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment);
173
174
  // Appends 'segment' to the segment sequence.
175
  // Assumes that the segment was scanned, if no footer was found.
176
  // To be used only internally, clients of this class with private access (i.e. friends)
177
  // should use the thread safe version, AppendSegment(), which will also scan the segment
178
  // if no footer is present.
179
  CHECKED_STATUS AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment);
180
181
  // Used by Log to update its LogReader on how far it is possible to read
182
  // the current segment. Requires that the reader has at least one segment
183
  // and that the last segment has no footer, meaning it is currently being
184
  // written to.
185
  void UpdateLastSegmentOffset(int64_t readable_to_offset);
186
187
  // Read the LogEntryBatch pointed to by the provided index entry.
188
  // 'tmp_buf' is used as scratch space to avoid extra allocation.
189
  CHECKED_STATUS ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
190
                                          faststring* tmp_buf,
191
                                          LogEntryBatchPB* batch) const;
192
193
  // Private constructor; external code obtains a LogReader through Open().
  // NOTE(review): presumably the metric entities are used to set up the counters and histogram
  // declared below -- confirm in log_reader.cc.
  LogReader(Env* env, const scoped_refptr<LogIndex>& index,
194
            std::string log_prefix,
195
            const scoped_refptr<MetricEntity>& table_metric_entity,
196
            const scoped_refptr<MetricEntity>& tablet_metric_entity);
197
198
  // Reads the headers of all segments in 'path'.
199
  CHECKED_STATUS Init(const std::string& path);
200
201
  // Initializes an 'empty' reader for tests, i.e. does not scan a path looking for segments.
202
  CHECKED_STATUS InitEmptyReaderForTests();
203
204
  // Determines if a file is older than the time specified by FLAGS_log_max_seconds_to_retain.
205
  bool ViolatesMaxTimePolicy(const scoped_refptr<ReadableLogSegment>& segment) const;
206
207
  // Return true if by keeping this log segment, we would violate the required minimum free space.
208
  // potential_reclaimed_space is used for the calculation of free space. If NotEnoughSpace returns
209
  // true, it will add the size of segment to potential_reclaimed_space.
  // NOTE(review): no 'NotEnoughSpace' is declared in this class; the sentence above presumably
  // refers to this function returning true -- confirm and update the wording.
210
  bool ViolatesMinSpacePolicy(const scoped_refptr<ReadableLogSegment>& segment,
211
                              int64_t *potential_reclaimed_space) const;
212
213
  // Raw Env pointer. NOTE(review): presumably the 'env' given to Open() and not owned by this
  // class -- confirm in log_reader.cc.
  Env *env_;
214
215
  // Log index handle. NOTE(review): presumably the 'index' given to Open(), in which case it
  // may be null (see the comment on Open()).
  const scoped_refptr<LogIndex> log_index_;
  // Prefix string returned by LogPrefix().
216
  const std::string log_prefix_;
217
218
  // Metrics
219
  scoped_refptr<Counter> bytes_read_;
220
  scoped_refptr<Counter> entries_read_;
221
  scoped_refptr<Histogram> read_batch_latency_;
222
223
  // The sequence of all current log segments in increasing sequence number
224
  // order.
225
  SegmentSequence segments_;
226
227
  // Declared mutable so that const member functions can acquire it.
  // NOTE(review): presumably guards 'segments_' and 'state_' -- confirm in log_reader.cc.
  mutable simple_spinlock lock_;
228
229
  // Current state of the reader (see State).
  State state_;
230
231
  // Used for test only.
232
  mutable std::unique_ptr<SegmentSequence> segments_violate_max_time_policy_;
233
  mutable std::unique_ptr<SegmentSequence> segments_violate_min_space_policy_;
234
235
  // LogReader objects cannot be copied or assigned.
  DISALLOW_COPY_AND_ASSIGN(LogReader);
236
};
237
238
}  // namespace log
239
}  // namespace yb
240
241
#endif // YB_CONSENSUS_LOG_READER_H