YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log_index.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
// The implementation of the Log Index.
//
// The log index is implemented by a set of on-disk files, each containing a fixed number
// (kEntriesPerIndexChunk) of fixed size entries. Each index chunk is numbered such that,
// for a given log index, we can determine which chunk contains its index entry by a
// simple division operation. Because the entries are fixed size, we can compute the
// entry's offset within its chunk with a modulo operation.
//
// When the log is GCed, we delete the index chunk files that are no longer needed and drop
// them from the set of open chunks; each chunk's memory mapping is released once the last
// reference to it goes away.
43
44
#include "yb/consensus/log_index.h"
45
46
#include <fcntl.h>
47
#include <sys/mman.h>
48
49
#include <mutex>
50
#include <string>
51
#include <vector>
52
53
#include "yb/gutil/map-util.h"
54
55
#include "yb/util/locks.h"
56
57
using std::string;
58
using strings::Substitute;
59
60
300k
// Evaluates 'expr' and stores the result in 'ret', re-evaluating it for as long as it fails
// with EINTR (returns -1 with errno == EINTR). Any other result, success or failure, is left
// in 'ret' for the caller to inspect.
// The expansion is a single statement with no trailing semicolon, so it composes safely with
// unbraced if/else; the arguments are parenthesized to avoid precedence surprises.
#define RETRY_ON_EINTR(ret, expr) do { \
  (ret) = (expr); \
} while (((ret) == -1) && (errno == EINTR))
63
64
namespace yb {
65
namespace log {
66
67
// The actual physical entry in the file.
// This mirrors LogIndexEntry but uses simple primitives only so we can
// read/write it via mmap.
// See LogIndexEntry for docs.
//
// Entries are located inside a chunk purely by byte offset
// (entry_index * sizeof(PhysicalEntry)), so the field order and sizes of this struct are
// part of the on-disk index format and must not change.
struct PhysicalEntry {
  int64_t term;
  uint64_t segment_sequence_number;
  uint64_t offset_in_segment;
} PACKED;

// Lock in the on-disk layout: three 8-byte fields with no padding.
static_assert(sizeof(PhysicalEntry) == 24,
              "PhysicalEntry is an on-disk format; its size must not change");
76
77
// The number of index entries per index chunk.
78
//
79
// **** Note: This number cannot be changed after production!!!!! ***
80
//
81
// Why? Because, given a raft log index, the chunk (i.e. the specific index file) and index
82
// of the entry within the file is determined via simple "/" and "%" calculations respectively
83
// on this value. [Technically, if we did decide to change this number, we would have to
84
// implement some logic to compute the number of entries in each file from its size. But
85
// currently, that's not implemented.]
86
//
87
// On MacOS, ftruncate()'ing a file to its desired size before doing the mmap, immediately uses up
88
// actual disk space, whereas on Linux, it appears to be lazy. Since MacOS is unlikely to be
89
// used for production scenarios, to reduce disk space requirements and testing load (when creating
90
// lots of tables (and therefore tablets)), we set this number to a lower value for MacOS.
91
//
92
#if defined(__APPLE__)
93
static const int64_t kEntriesPerIndexChunk = 16 * 1024;
94
#else
95
static const int64_t kEntriesPerIndexChunk = 1000000;
96
#endif
97
98
static const int64_t kChunkFileSize = kEntriesPerIndexChunk * sizeof(PhysicalEntry);
99
100
////////////////////////////////////////////////////////////
101
// LogIndex::IndexChunk implementation
102
////////////////////////////////////////////////////////////
103
104
// A single chunk of the index, representing a fixed number of entries.
105
// This class maintains the open file descriptor and mapped memory.
106
class LogIndex::IndexChunk : public RefCountedThreadSafe<LogIndex::IndexChunk> {
107
 public:
108
  explicit IndexChunk(string path);
109
  ~IndexChunk();
110
111
  // Open and map the memory.
112
  Status Open();
113
  void GetEntry(int entry_index, PhysicalEntry* ret);
114
  void SetEntry(int entry_index, const PhysicalEntry& entry);
115
116
  // Flush memory-mapped chunk to file.
117
  Status Flush();
118
119
 private:
120
  const string path_;
121
  int fd_;
122
  uint8_t* mapping_;
123
};
124
125
namespace  {
126
300k
Status CheckError(int rc, const char* operation) {
127
300k
  if (PREDICT_FALSE(rc < 0)) {
128
0
    return STATUS(IOError, operation, Errno(errno));
129
0
  }
130
300k
  return Status::OK();
131
300k
}
132
} // anonymous namespace
133
134
LogIndex::IndexChunk::IndexChunk(std::string path)
135
150k
    : path_(std::move(path)), fd_(-1), mapping_(nullptr) {}
136
137
76.3k
// Releases the memory mapping and the file descriptor, whichever of them Open() acquired.
LogIndex::IndexChunk::~IndexChunk() {
  // mapping_ is null if Open() never reached mmap(). mmap() reports failure with MAP_FAILED
  // rather than nullptr, so that sentinel must never be passed to munmap() either.
  if (mapping_ != nullptr && mapping_ != MAP_FAILED) {
    munmap(mapping_, kChunkFileSize);
  }

  // fd_ remains -1 if Open() was never called or open() failed.
  if (fd_ >= 0) {
    close(fd_);
  }
}
146
147
150k
// Opens (creating if needed) the chunk file at path_, grows or shrinks it to exactly
// kChunkFileSize bytes, and maps it read/write and shared so that stores through mapping_
// reach the file.
// Returns an IOError naming the failing step. On failure mapping_ stays nullptr. fd_ may
// already be open at that point; the destructor closes it.
Status LogIndex::IndexChunk::Open() {
  RETRY_ON_EINTR(fd_, open(path_.c_str(), O_CLOEXEC | O_CREAT | O_RDWR, 0666));
  RETURN_NOT_OK(CheckError(fd_, "open"));

  int err;
  RETRY_ON_EINTR(err, ftruncate(fd_, kChunkFileSize));
  // Check ftruncate()'s own result; fd_ is known to be valid here, so checking it would
  // never detect a truncation failure.
  RETURN_NOT_OK(CheckError(err, "truncate"));

  void* const mapping = mmap(nullptr, kChunkFileSize, PROT_READ | PROT_WRITE,
                             MAP_SHARED, fd_, 0);
  // mmap() signals failure with MAP_FAILED (not nullptr) and sets errno.
  if (mapping == MAP_FAILED) {
    return STATUS(IOError, "Unable to mmap()", Errno(errno));
  }
  mapping_ = static_cast<uint8_t*>(mapping);

  return Status::OK();
}
163
164
49.1M
void LogIndex::IndexChunk::GetEntry(int entry_index, PhysicalEntry* ret) {
165
49.1M
  DCHECK_GE
(fd_, 0) << "Must Open() first"0
;
166
49.1M
  DCHECK_LT(entry_index, kEntriesPerIndexChunk);
167
168
49.1M
  memcpy(ret, mapping_ + sizeof(PhysicalEntry) * entry_index, sizeof(PhysicalEntry));
169
49.1M
}
170
171
16.6M
void LogIndex::IndexChunk::SetEntry(int entry_index, const PhysicalEntry& phys) {
172
16.6M
  DCHECK_GE
(fd_, 0) << "Must Open() first"0
;
173
16.6M
  DCHECK_LT(entry_index, kEntriesPerIndexChunk);
174
175
16.6M
  memcpy(mapping_ + sizeof(PhysicalEntry) * entry_index, &phys, sizeof(PhysicalEntry));
176
16.6M
}
177
178
233
// Synchronously writes the whole mapped chunk back to its file (msync with MS_SYNC).
// A chunk that was never mapped has nothing to flush and succeeds trivially.
Status LogIndex::IndexChunk::Flush() {
  if (mapping_ == nullptr) {
    return Status::OK();
  }
  return CheckError(msync(mapping_, kChunkFileSize, MS_SYNC), "msync");
}
185
186
////////////////////////////////////////////////////////////
187
// LogIndex
188
////////////////////////////////////////////////////////////
189
190
151k
LogIndex::LogIndex(std::string base_dir) : base_dir_(std::move(base_dir)) {}
191
192
76.5k
// Defined out of line, where IndexChunk is a complete type, so that the open chunk
// references can be released here.
LogIndex::~LogIndex() = default;
194
195
150k
// Returns "<base_dir_>/index.<chunk_idx>", with the chunk number zero-padded to at least
// nine digits.
string LogIndex::GetChunkPath(int64_t chunk_idx) {
  const char* const dir = base_dir_.c_str();
  return StringPrintf("%s/index.%09" PRId64, dir, chunk_idx);
}
198
199
150k
// Opens the chunk file for 'chunk_idx' and hands it to the caller through '*chunk'.
// '*chunk' is only replaced when Open() succeeds; the chunk is not registered in
// open_chunks_ here.
Status LogIndex::OpenChunk(int64_t chunk_idx, scoped_refptr<IndexChunk>* chunk) {
  scoped_refptr<IndexChunk> opened(new IndexChunk(GetChunkPath(chunk_idx)));
  RETURN_NOT_OK(opened->Open());
  chunk->swap(opened);
  return Status::OK();
}
207
208
// Looks up the chunk that holds 'log_index' (which must be positive).
// If the chunk is not already open: returns NotFound when 'create' is false; otherwise opens
// (creating if needed) the chunk file and registers it in open_chunks_.
Status LogIndex::GetChunkForIndex(int64_t log_index, bool create,
                                  scoped_refptr<IndexChunk>* chunk) {
  CHECK_GT(log_index, 0);
  const int64_t chunk_idx = log_index / kEntriesPerIndexChunk;

  // Fast path: the chunk is already open.
  {
    std::lock_guard<simple_spinlock> guard(open_chunks_lock_);
    if (FindCopy(open_chunks_, chunk_idx, chunk)) {
      return Status::OK();
    }
  }

  if (!create) {
    return STATUS(NotFound, "chunk not found");
  }

  // Slow path: open the chunk file without holding the spinlock, since it performs file I/O.
  RETURN_NOT_OK_PREPEND(OpenChunk(chunk_idx, chunk), "Couldn't open index chunk");

  std::lock_guard<simple_spinlock> guard(open_chunks_lock_);
  // Another thread may have registered the same chunk while we were opening ours; in that
  // case return the registered instance and drop the one we just opened.
  if (PREDICT_FALSE(FindCopy(open_chunks_, chunk_idx, chunk))) {
    return Status::OK();
  }
  InsertOrDie(&open_chunks_, chunk_idx, *chunk);
  return Status::OK();
}
240
241
16.6M
// Records 'entry' in the slot for entry.op_id.index, creating the owning chunk if needed.
// The write goes to the memory mapping only; Flush() is needed to force it to disk.
Status LogIndex::AddEntry(const LogIndexEntry& entry) {
  const auto log_index = entry.op_id.index;

  scoped_refptr<IndexChunk> chunk;
  RETURN_NOT_OK(GetChunkForIndex(log_index, true /* create if not found */, &chunk));

  PhysicalEntry phys;
  phys.term = entry.op_id.term;
  phys.segment_sequence_number = entry.segment_sequence_number;
  phys.offset_in_segment = entry.offset_in_segment;

  const int slot = log_index % kEntriesPerIndexChunk;

  std::lock_guard<simple_spinlock> lock(open_chunks_lock_);
  chunk->SetEntry(slot, phys);
  VLOG(3) << "Added log index entry " << entry.ToString();

  return Status::OK();
}
258
259
49.1M
// Reads the index entry for log index 'index' into '*entry'.
// Returns NotFound if the owning chunk is not open or the slot was never written.
Status LogIndex::GetEntry(int64_t index, LogIndexEntry* entry) {
  scoped_refptr<IndexChunk> chunk;
  RETURN_NOT_OK(GetChunkForIndex(index, false /* do not create */, &chunk));

  const int slot = index % kEntriesPerIndexChunk;
  PhysicalEntry phys;

  std::lock_guard<simple_spinlock> lock(open_chunks_lock_);
  chunk->GetEntry(slot, &phys);

  // Every log segment starts with a header, so no real entry can live at offset 0;
  // a zero offset therefore marks a slot that was never written.
  if (phys.offset_in_segment != 0) {
    entry->op_id = yb::OpId(phys.term, index);
    entry->segment_sequence_number = phys.segment_sequence_number;
    entry->offset_in_segment = phys.offset_in_segment;
    return Status::OK();
  }

  return STATUS(NotFound, "entry not found");
}
279
280
94
void LogIndex::GC(int64_t min_index_to_retain) {
281
94
  auto min_chunk_to_retain = min_index_to_retain / kEntriesPerIndexChunk;
282
283
  // Enumerate which chunks to delete.
284
94
  vector<int64_t> chunks_to_delete;
285
94
  {
286
94
    std::lock_guard<simple_spinlock> l(open_chunks_lock_);
287
94
    for (auto it = open_chunks_.begin();
288
94
         it != open_chunks_.lower_bound(min_chunk_to_retain); 
++it0
) {
289
0
      chunks_to_delete.push_back(it->first);
290
0
    }
291
94
  }
292
293
  // Outside of the lock, try to delete them (avoid holding the lock during IO).
294
94
  for (int64_t chunk_idx : chunks_to_delete) {
295
0
    string path = GetChunkPath(chunk_idx);
296
0
    int rc = unlink(path.c_str());
297
0
    if (rc != 0) {
298
0
      PLOG(WARNING) << "Unable to delete index chunk " << path;
299
0
      continue;
300
0
    }
301
0
    LOG(INFO) << "Deleted log index segment " << path;
302
0
    {
303
0
      std::lock_guard<simple_spinlock> l(open_chunks_lock_);
304
0
      open_chunks_.erase(chunk_idx);
305
0
    }
306
0
  }
307
94
}
308
309
233
// Flushes every currently open chunk to disk and stops at the first failure.
// The set of chunks is snapshotted under the spinlock, and the (blocking) msync calls run
// without holding it.
Status LogIndex::Flush() {
  std::vector<scoped_refptr<IndexChunk>> snapshot;

  {
    std::lock_guard<simple_spinlock> l(open_chunks_lock_);
    snapshot.reserve(open_chunks_.size());
    for (const auto& idx_and_chunk : open_chunks_) {
      snapshot.push_back(idx_and_chunk.second);
    }
  }

  for (size_t i = 0; i < snapshot.size(); ++i) {
    RETURN_NOT_OK(snapshot[i]->Flush());
  }

  return Status::OK();
}
326
327
1
// Human-readable form, e.g. "op_id=<term>.<index> segment_sequence_number=<n> offset=<off>".
string LogIndexEntry::ToString() const {
  const auto term = op_id.term;
  const auto index = op_id.index;
  return Substitute("op_id=$0.$1 segment_sequence_number=$2 offset=$3",
                    term, index, segment_sequence_number, offset_in_segment);
}
333
334
} // namespace log
335
} // namespace yb