/Users/deen/code/yugabyte-db/src/yb/consensus/log_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_CONSENSUS_LOG_READER_H |
33 | | #define YB_CONSENSUS_LOG_READER_H |
34 | | |
35 | | #include <map> |
36 | | #include <string> |
37 | | #include <utility> |
38 | | #include <vector> |
39 | | |
40 | | #include <gtest/gtest.h> |
41 | | |
42 | | #include "yb/consensus/consensus_fwd.h" |
43 | | #include "yb/consensus/log_metrics.h" |
44 | | #include "yb/consensus/log_fwd.h" |
45 | | |
46 | | #include "yb/gutil/ref_counted.h" |
47 | | #include "yb/gutil/spinlock.h" |
48 | | |
49 | | #include "yb/util/locks.h" |
50 | | #include "yb/util/monotime.h" |
51 | | |
52 | | namespace yb { |
53 | | |
54 | | class Env; |
55 | | struct OpId; |
56 | | |
57 | | namespace cdc { |
58 | | class CDCServiceTestMaxRentionTime_TestLogRetentionByOpId_MaxRentionTime_Test; |
59 | | class CDCServiceTestMinSpace_TestLogRetentionByOpId_MinSpace_Test; |
60 | | } |
61 | | |
62 | | namespace log { |
63 | | |
64 | | // Reads a set of segments from a given path. Segment headers and footers |
65 | | // are read and parsed, but entries are not. |
66 | | // This class is thread safe. |
67 | | class LogReader { |
68 | | public: |
69 | | ~LogReader(); |
70 | | |
71 | | // Opens a LogReader on a specific log directory, and sets 'reader' to the newly created |
72 | | // LogReader. |
73 | | // |
74 | | // 'index' may be nullptr, but if it is, ReadReplicatesInRange() may not be used. |
75 | | static CHECKED_STATUS Open(Env *env, |
76 | | const scoped_refptr<LogIndex>& index, |
77 | | std::string log_prefix, |
78 | | const std::string& tablet_wal_path, |
79 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
80 | | const scoped_refptr<MetricEntity>& tablet_metric_entity, |
81 | | std::unique_ptr<LogReader> *reader); |
82 | | |
83 | | // Returns the biggest prefix of segments, from the current sequence, guaranteed |
84 | | // not to include any replicate messages with indexes >= 'index'. |
85 | | CHECKED_STATUS GetSegmentPrefixNotIncluding(int64_t index, SegmentSequence* segments) const; |
86 | | |
87 | | CHECKED_STATUS GetSegmentPrefixNotIncluding(int64_t index, int64_t cdc_replicated_index, |
88 | | SegmentSequence* segments) const; |
89 | | |
90 | | // Return the minimum replicate index that is retained in the currently available |
91 | | // logs. May return -1 if no replicates have been logged. |
92 | | int64_t GetMinReplicateIndex() const; |
93 | | |
94 | | // Return a readable segment with the given sequence number, or nullptr if it |
95 | | // cannot be found (e.g. if it has already been GCed). |
96 | | scoped_refptr<ReadableLogSegment> GetSegmentBySequenceNumber(int64_t seq) const; |
97 | | |
98 | | // Copies a snapshot of the current sequence of segments into 'segments'. |
99 | | // 'segments' will be cleared first. |
100 | | CHECKED_STATUS GetSegmentsSnapshot(SegmentSequence* segments) const; |
101 | | |
102 | | // Reads all ReplicateMsgs from 'starting_at' to 'up_to' both inclusive. |
103 | | // The caller takes ownership of the returned ReplicateMsg objects. |
104 | | // |
105 | | // Will attempt to read no more than 'max_bytes_to_read', unless it is set to |
106 | | // LogReader::kNoSizeLimit. If the size limit would prevent reading any operations at |
107 | | // all, then will read exactly one operation. |
108 | | // |
109 | | // Requires that a LogIndex was passed into LogReader::Open(). |
110 | | // The parameters starting_op_segment_seq_num, modified_schema, schema_version are used to read |
111 | | // the appropriate schema corresponding to the from_op_id in the segment header or from the segment |
112 | | // itself if there is a DDL log. |
113 | | CHECKED_STATUS ReadReplicatesInRange( |
114 | | const int64_t starting_at, |
115 | | const int64_t up_to, |
116 | | int64_t max_bytes_to_read, |
117 | | consensus::ReplicateMsgs* replicates, |
118 | | int64_t *starting_op_segment_seq_num, |
119 | | yb::SchemaPB* modified_schema, |
120 | | uint32_t *schema_version, |
121 | | CoarseTimePoint deadline = CoarseTimePoint::max()) const; |
122 | | |
123 | | static const int64_t kNoSizeLimit; |
124 | | |
125 | | // Look up the OpId for the given operation index. |
126 | | // Returns a bad Status if the log index fails to load (e.g. due to an I/O error). |
127 | | Result<yb::OpId> LookupOpId(int64_t op_index) const; |
128 | | Result<int64_t> LookupHeader(int64_t op_index) const; |
129 | | |
130 | | // Returns the number of segments. |
131 | | size_t num_segments() const; |
132 | | |
133 | | std::string ToString() const; |
134 | | |
135 | 2.38k | const std::string& LogPrefix() const { |
136 | 2.38k | return log_prefix_; |
137 | 2.38k | } |
138 | | |
139 | | private: |
140 | | FRIEND_TEST(cdc::CDCServiceTestMaxRentionTime, TestLogRetentionByOpId_MaxRentionTime); |
141 | | FRIEND_TEST(cdc::CDCServiceTestMinSpace, TestLogRetentionByOpId_MinSpace); |
142 | | FRIEND_TEST(LogTest, TestLogReader); |
143 | | FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates); |
144 | | friend class Log; |
145 | | friend class LogTest; |
146 | | |
147 | | enum State { |
148 | | kLogReaderInitialized, |
149 | | kLogReaderReading, |
150 | | kLogReaderClosed |
151 | | }; |
152 | | |
153 | | // Appends 'segment' to the segments available for read by this reader. |
154 | | // Index entries in 'segment's footer will be added to the index. |
155 | | // If the segment has no footer it will be scanned so this should not be used |
156 | | // for new segments. |
157 | | CHECKED_STATUS AppendSegment(const scoped_refptr<ReadableLogSegment>& segment); |
158 | | |
159 | | // Same as above but for segments without any entries. |
160 | | // Used by the Log to add "empty" segments. |
161 | | CHECKED_STATUS AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment); |
162 | | |
163 | | // Removes segments with sequence numbers less than or equal to 'seg_seqno' from this reader. |
164 | | CHECKED_STATUS TrimSegmentsUpToAndIncluding(uint64_t seg_seqno); |
165 | | |
166 | | // Replaces the last segment in the reader with 'segment'. |
167 | | // Used to replace a segment that was still in the process of being written |
168 | | // with its complete version which has a footer and index entries. |
169 | | // Requires that the last segment in 'segments_' has the same sequence |
170 | | // number as 'segment'. |
171 | | // Expects 'segment' to be properly closed and to have footer. |
172 | | CHECKED_STATUS ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment); |
173 | | |
174 | | // Appends 'segment' to the segment sequence. |
175 | | // Assumes that the segment was scanned, if no footer was found. |
176 | | // To be used only internally, clients of this class with private access (i.e. friends) |
177 | | // should use the thread safe version, AppendSegment(), which will also scan the segment |
178 | | // if no footer is present. |
179 | | CHECKED_STATUS AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment); |
180 | | |
181 | | // Used by Log to update its LogReader on how far it is possible to read |
182 | | // the current segment. Requires that the reader has at least one segment |
183 | | // and that the last segment has no footer, meaning it is currently being |
184 | | // written to. |
185 | | void UpdateLastSegmentOffset(int64_t readable_to_offset); |
186 | | |
187 | | // Read the LogEntryBatch pointed to by the provided index entry. |
188 | | // 'tmp_buf' is used as scratch space to avoid extra allocation. |
189 | | CHECKED_STATUS ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry, |
190 | | faststring* tmp_buf, |
191 | | LogEntryBatchPB* batch) const; |
192 | | |
193 | | LogReader(Env* env, const scoped_refptr<LogIndex>& index, |
194 | | std::string log_prefix, |
195 | | const scoped_refptr<MetricEntity>& table_metric_entity, |
196 | | const scoped_refptr<MetricEntity>& tablet_metric_entity); |
197 | | |
198 | | // Reads the headers of all segments in 'path'. |
199 | | CHECKED_STATUS Init(const std::string& path); |
200 | | |
201 | | // Initializes an 'empty' reader for tests, i.e. does not scan a path looking for segments. |
202 | | CHECKED_STATUS InitEmptyReaderForTests(); |
203 | | |
204 | | // Determines if a file is older than the time specified by FLAGS_log_max_seconds_to_retain. |
205 | | bool ViolatesMaxTimePolicy(const scoped_refptr<ReadableLogSegment>& segment) const; |
206 | | |
207 | | // Return true if by keeping this log segment, we would violate the required minimum free space. |
208 | | // 'potential_reclaimed_space' is used for the calculation of free space. If this method returns |
209 | | // true, it adds the size of 'segment' to it. (Comment previously said "NotEnoughSpace" - TODO confirm.) |
210 | | bool ViolatesMinSpacePolicy(const scoped_refptr<ReadableLogSegment>& segment, |
211 | | int64_t *potential_reclaimed_space) const; |
212 | | |
213 | | Env *env_; |
214 | | |
215 | | const scoped_refptr<LogIndex> log_index_; |
216 | | const std::string log_prefix_; |
217 | | |
218 | | // Metrics |
219 | | scoped_refptr<Counter> bytes_read_; |
220 | | scoped_refptr<Counter> entries_read_; |
221 | | scoped_refptr<Histogram> read_batch_latency_; |
222 | | |
223 | | // The sequence of all current log segments in increasing sequence number |
224 | | // order. |
225 | | SegmentSequence segments_; |
226 | | |
227 | | mutable simple_spinlock lock_; |
228 | | |
229 | | State state_; |
230 | | |
231 | | // Used for test only. |
232 | | mutable std::unique_ptr<SegmentSequence> segments_violate_max_time_policy_; |
233 | | mutable std::unique_ptr<SegmentSequence> segments_violate_min_space_policy_; |
234 | | |
235 | | DISALLOW_COPY_AND_ASSIGN(LogReader); |
236 | | }; |
237 | | |
238 | | } // namespace log |
239 | | } // namespace yb |
240 | | |
241 | | #endif // YB_CONSENSUS_LOG_READER_H |