YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/file_reader_writer.cc
Line |  Count | Source
   1 |        | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2 |        | //  This source code is licensed under the BSD-style license found in the
   3 |        | //  LICENSE file in the root directory of this source tree. An additional grant
   4 |        | //  of patent rights can be found in the PATENTS file in the same directory.
   5 |        | //
   6 |        | // The following only applies to changes made to this file as part of YugaByte development.
   7 |        | //
   8 |        | // Portions Copyright (c) YugaByte, Inc.
   9 |        | //
  10 |        | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  11 |        | // in compliance with the License.  You may obtain a copy of the License at
  12 |        | //
  13 |        | // http://www.apache.org/licenses/LICENSE-2.0
  14 |        | //
  15 |        | // Unless required by applicable law or agreed to in writing, software distributed under the License
  16 |        | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  17 |        | // or implied.  See the License for the specific language governing permissions and limitations
  18 |        | // under the License.
  19 |        | //
  20 |        | // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  21 |        | // Use of this source code is governed by a BSD-style license that can be
  22 |        | // found in the LICENSE file. See the AUTHORS file for names of contributors.
  23 |        |
  24 |        | #include "yb/rocksdb/util/file_reader_writer.h"
  25 |        |
  26 |        | #include <algorithm>
  27 |        |
  28 |        | #include "yb/rocksdb/port/port.h"
  29 |        | #include "yb/rocksdb/rate_limiter.h"
  30 |        | #include "yb/rocksdb/util/histogram.h"
  31 |        | #include "yb/rocksdb/util/stop_watch.h"
  32 |        | #include "yb/rocksdb/util/sync_point.h"
  33 |        |
  34 |        | #include "yb/util/priority_thread_pool.h"
  35 |        | #include "yb/util/stats/iostats_context_imp.h"
  36 |        | #include "yb/util/status_log.h"
  37 |        |
  38 |        | DEFINE_bool(allow_preempting_compactions, true,
  39 |        |             "Whether a compaction may be preempted in favor of another compaction with higher "
  40 |        |             "priority");
  41 |        |
  42 |        | DEFINE_int32(rocksdb_file_starting_buffer_size, 8192,
  43 |        |              "Starting buffer size for writable files, grows by 2x every new allocation.");
  44 |        |
  45 |        | namespace rocksdb {
  46 |        |
  47 |   396k | Status SequentialFileReader::Read(size_t n, Slice* result, uint8_t* scratch) {
  48 |   396k |   Status s = file_->Read(n, result, scratch);
  49 |   396k |   IOSTATS_ADD(bytes_read, result->size());
  50 |   396k |   return s;
  51 |   396k | }
  52 |        |
  53 |     12 | Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); }
  54 |        |
  55 |        | Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
  56 |   595k |                                     char* scratch) const {
  57 |   595k |   Status s;
  58 |   595k |   uint64_t elapsed = 0;
  59 |   595k |   {
  60 |   595k |     StopWatch sw(env_, stats_, hist_type_,
  61 |   595k |                  (stats_ != nullptr) ? &elapsed : nullptr);
  62 |   595k |     IOSTATS_TIMER_GUARD(read_nanos);
  63 |   595k |     s = file_->Read(offset, n, result, scratch);
  64 |   595k |     IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
  65 |   595k |   }
  66 |   595k |   if (stats_ != nullptr && file_read_hist_ != nullptr) {
  67 |      0 |     file_read_hist_->Add(elapsed);
  68 |      0 |   }
  69 |   595k |   return s;
  70 |   595k | }
  71 |        |
  72 |        | Status RandomAccessFileReader::ReadAndValidate(
  73 |  3.98M |     uint64_t offset, size_t n, Slice* result, char* scratch, const yb::ReadValidator& validator) {
  74 |  3.98M |   uint64_t elapsed = 0;
  75 |  3.98M |   Status s;
  76 |  3.98M |   {
  77 |  3.98M |     StopWatch sw(env_, stats_, hist_type_,
  78 |  3.69M |                  (stats_ != nullptr) ? &elapsed : nullptr);
  79 |  3.98M |     IOSTATS_TIMER_GUARD(read_nanos);
  80 |  3.98M |     s = file_->ReadAndValidate(offset, n, result, scratch, validator);
  81 |  3.98M |     IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
  82 |  3.98M |   }
  83 |  3.98M |   if (stats_ != nullptr && file_read_hist_ != nullptr) {
  84 |   282k |     file_read_hist_->Add(elapsed);
  85 |   282k |   }
  86 |  3.98M |   return s;
  87 |  3.98M | }
  88 |        |
  89 |  1.07M | WritableFileWriter::~WritableFileWriter() {
  90 |  1.07M |   WARN_NOT_OK(Close(), "Failed to close file");
  91 |  1.07M | }
  92 |        |
  93 |  48.3M | Status WritableFileWriter::Append(const Slice& data) {
  94 |  48.3M |   const char* src = data.cdata();
  95 |  48.3M |   size_t left = data.size();
  96 |  48.3M |   Status s;
  97 |  48.3M |   pending_sync_ = true;
  98 |  48.3M |   pending_fsync_ = true;
  99 |        |
 100 |  48.3M |   TEST_KILL_RANDOM("WritableFileWriter::Append:0",
 101 |  48.3M |                    rocksdb_kill_odds * REDUCE_ODDS2);
 102 |        |
 103 |  48.3M |   {
 104 |  48.3M |     IOSTATS_TIMER_GUARD(prepare_write_nanos);
 105 |  48.3M |     TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
 106 |  48.3M |     writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
 107 |  48.3M |   }
 108 |        |
 109 |        |   // Flush only when I/O is buffered
 110 |  48.3M |   if (use_os_buffer_ &&
 111 |  48.3M |     (buf_.Capacity() - buf_.CurrentSize()) < left) {
 112 |  13.8k |     if (buf_.CurrentSize() > 0) {
 113 |  9.37k |       s = Flush();
 114 |  9.37k |       if (!s.ok()) {
 115 |      0 |         return s;
 116 |      0 |       }
 117 |  13.8k |     }
 118 |        |
 119 |  13.8k |     if (buf_.Capacity() < max_buffer_size_) {
 120 |  13.8k |       size_t desiredCapacity = buf_.Capacity() * 2;
 121 |  13.8k |       desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
 122 |  13.8k |       buf_.AllocateNewBuffer(desiredCapacity);
 123 |  13.8k |     }
 124 |  13.8k |     assert(buf_.CurrentSize() == 0);
 125 |  13.8k |   }
 126 |        |
 127 |        |   // We never write directly to disk with unbuffered I/O on;
 128 |        |   // we simply use the buffer for its original purpose: to accumulate many
 129 |        |   // small chunks.
 130 |  48.3M |   if (!use_os_buffer_ || (buf_.Capacity() >= left)) {
 131 |  96.5M |     while (left > 0) {
 132 |  48.2M |       size_t appended = buf_.Append(src, left);
 133 |  48.2M |       left -= appended;
 134 |  48.2M |       src += appended;
 135 |        |
 136 |  48.2M |       if (left > 0) {
 137 |      0 |         s = Flush();
 138 |      0 |         if (!s.ok()) {
 139 |      0 |           break;
 140 |      0 |         }
 141 |        |
 142 |        |         // We double the buffer here because
 143 |        |         // Flush calls do not keep up with the incoming bytes.
 144 |        |         // This is the only place where the buffer is grown with unbuffered I/O.
 145 |      0 |         if (buf_.Capacity() < max_buffer_size_) {
 146 |      0 |           size_t desiredCapacity = buf_.Capacity() * 2;
 147 |      0 |           desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
 148 |      0 |           buf_.AllocateNewBuffer(desiredCapacity);
 149 |      0 |         }
 150 |      0 |       }
 151 |  48.2M |     }
 152 |  18.4E |   } else {
 153 |        |     // Writing directly to the file, bypassing the buffer
 154 |  18.4E |     assert(buf_.CurrentSize() == 0);
 155 |  18.4E |     s = WriteBuffered(src, left);
 156 |  18.4E |   }
 157 |        |
 158 |  48.3M |   TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
 159 |  48.3M |   if (s.ok()) {
 160 |  48.3M |     filesize_ += data.size();
 161 |  48.3M |   }
 162 |  48.3M |   return s;
 163 |  48.3M | }
 164 |        |
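Append() grows its write buffer geometrically: when the remaining capacity cannot hold the incoming chunk, the buffer is flushed and its capacity doubled, capped at max_buffer_size_ (the starting size comes from the rocksdb_file_starting_buffer_size flag, 8192 by default). A minimal standalone sketch of that growth schedule, assuming an illustrative 1MB cap; this snippet is not part of the covered file.

// Standalone illustration (not part of the covered file): the doubling growth
// used by WritableFileWriter::Append(), capped at an assumed 1MB max_buffer_size.
#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  size_t capacity = 8192;                  // rocksdb_file_starting_buffer_size default
  const size_t max_buffer_size = 1 << 20;  // assumed illustrative cap
  while (capacity < max_buffer_size) {
    size_t desired = std::min(capacity * 2, max_buffer_size);  // same doubling rule as Append()
    std::cout << capacity << " -> " << desired << "\n";
    capacity = desired;
  }
  return 0;
}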
 165 |  1.19M | Status WritableFileWriter::Close() {
 166 |        |
 167 |        |   // Do not quit immediately on failure; the file MUST be closed.
 168 |  1.19M |   Status s;
 169 |        |
 170 |        |   // It is possible to close the file twice now, as we MUST close it
 171 |        |   // in the destructor; simply flushing is not enough.
 172 |        |   // On Windows, pre-allocating does not fill with zeros, and with
 173 |        |   // unbuffered access we also need to set the end of data.
 174 |  1.19M |   if (!writable_file_) {
 175 |   117k |     return s;
 176 |   117k |   }
 177 |        |
 178 |  1.07M |   s = Flush();  // flush cache to OS
 179 |        |
 180 |        |   // In unbuffered mode we write whole pages, so
 181 |        |   // we need to let the file know where the data ends.
 182 |  1.07M |   Status interim = writable_file_->Truncate(filesize_);
 183 |  1.07M |   if (!interim.ok() && s.ok()) {
 184 |      0 |     s = interim;
 185 |      0 |   }
 186 |        |
 187 |  1.07M |   TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
 188 |  1.07M |   interim = writable_file_->Close();
 189 |  1.07M |   if (!interim.ok() && s.ok()) {
 190 |      0 |     s = interim;
 191 |      0 |   }
 192 |        |
 193 |  1.07M |   writable_file_.reset();
 194 |  1.07M |   TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);
 195 |        |
 196 |  1.07M |   return s;
 197 |  1.07M | }
 198 |        |
 199 |        | // write out the cached data to the OS cache
 200 |  24.4M | Status WritableFileWriter::Flush() {
 201 |  24.4M |   Status s;
 202 |  24.4M |   TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
 203 |  24.4M |                    rocksdb_kill_odds * REDUCE_ODDS2);
 204 |        |
 205 |  24.4M |   if (buf_.CurrentSize() > 0) {
 206 |  23.2M |     if (use_os_buffer_) {
 207 |  23.2M |       s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
 208 |      4 |     } else {
 209 |      4 |       s = WriteUnbuffered();
 210 |      4 |     }
 211 |  23.2M |     if (!s.ok()) {
 212 |     15 |       return s;
 213 |     15 |     }
 214 |  24.4M |   }
 215 |        |
 216 |  24.4M |   s = writable_file_->Flush();
 217 |        |
 218 |  24.4M |   if (!s.ok()) {
 219 |      0 |     return s;
 220 |      0 |   }
 221 |        |
 222 |        |   // Sync the OS cache to disk every bytes_per_sync_ bytes.
 223 |        |   // TODO: give log files and sst files different options (log
 224 |        |   // files could potentially be cached in the OS for their whole
 225 |        |   // lifetime, thus we might not want to flush at all).
 226 |        |
 227 |        |   // We try to avoid syncing the last 1MB of data, for two reasons:
 228 |        |   // (1) avoid rewriting the same page that is modified later.
 229 |        |   // (2) on older OS versions, a write can block while writing out
 230 |        |   //     the page.
 231 |        |   // XFS flushes neighboring pages outside of the specified ranges, so we
 232 |        |   // need to make sure the sync range is far from the write offset.
 233 |  24.4M |   if (!direct_io_ && bytes_per_sync_) {
 234 |  2.04M |     const uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB is not synced.
 235 |  2.04M |     const uint64_t kBytesAlignWhenSync = 4 * 1024;    // Align 4KB.
 236 |  2.04M |     if (filesize_ > kBytesNotSyncRange) {
 237 |  78.7k |       uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
 238 |  78.7k |       offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
 239 |  78.7k |       assert(offset_sync_to >= last_sync_size_);
 240 |  78.7k |       if (offset_sync_to > 0 &&
 241 |  78.5k |           offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
 242 |    421 |         s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
 243 |    421 |         last_sync_size_ = offset_sync_to;
 244 |    421 |       }
 245 |  78.7k |     }
 246 |  2.04M |   }
 247 |        |
 248 |  24.4M |   return s;
 249 |  24.4M | }
 250 |        |
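The comment block above source lines 222-232 explains why Flush() range-syncs only up to 1MB behind the current file size, rounded down to a 4KB boundary. A minimal standalone sketch of that arithmetic, assuming an illustrative bytes_per_sync value of 512KB and a hypothetical file size (neither value is taken from this report):

// Standalone illustration (not part of the covered file) of the Flush()
// range-sync arithmetic above; bytes_per_sync and filesize are assumed values.
#include <cstdint>
#include <iostream>

int main() {
  const uint64_t kBytesNotSyncRange = 1024 * 1024;   // trailing 1MB is left unsynced
  const uint64_t kBytesAlignWhenSync = 4 * 1024;     // sync offset aligned down to 4KB
  const uint64_t bytes_per_sync = 512 * 1024;        // assumed option value
  uint64_t last_sync_size = 0;                       // nothing range-synced yet
  const uint64_t filesize = 3 * 1024 * 1024 + 4321;  // hypothetical current file size

  if (filesize > kBytesNotSyncRange) {
    uint64_t offset_sync_to = filesize - kBytesNotSyncRange;
    offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;  // round down to 4KB boundary
    if (offset_sync_to > 0 && offset_sync_to - last_sync_size >= bytes_per_sync) {
      // The covered code calls RangeSync(last_sync_size_, offset_sync_to - last_sync_size_) here.
      std::cout << "range-sync [" << last_sync_size << ", " << offset_sync_to << ")\n";
      last_sync_size = offset_sync_to;
    }
  }
  return 0;
}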
 251 |   178k | Status WritableFileWriter::Sync(bool use_fsync) {
 252 |   178k |   Status s = Flush();
 253 |   178k |   if (!s.ok()) {
 254 |      0 |     return s;
 255 |      0 |   }
 256 |   178k |   TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
 257 |   178k |   if (!direct_io_ && pending_sync_) {
 258 |   178k |     s = SyncInternal(use_fsync);
 259 |   178k |     if (!s.ok()) {
 260 |      1 |       return s;
 261 |      1 |     }
 262 |   178k |   }
 263 |   178k |   TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
 264 |   178k |   pending_sync_ = false;
 265 |   178k |   if (use_fsync) {
 266 |      0 |     pending_fsync_ = false;
 267 |      0 |   }
 268 |   178k |   return Status::OK();
 269 |   178k | }
 270 |        |
 271 |      8 | Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
 272 |      8 |   if (!writable_file_->IsSyncThreadSafe()) {
 273 |      0 |     return STATUS(NotSupported,
 274 |      0 |       "Can't WritableFileWriter::SyncWithoutFlush() because "
 275 |      0 |       "WritableFile::IsSyncThreadSafe() is false");
 276 |      0 |   }
 277 |      8 |   TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
 278 |      8 |   Status s = SyncInternal(use_fsync);
 279 |      8 |   TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
 280 |      8 |   return s;
 281 |      8 | }
 282 |        |
 283 |  9.90k | Status WritableFileWriter::InvalidateCache(size_t offset, size_t length) {
 284 |  9.90k |   return writable_file_->InvalidateCache(offset, length);
 285 |  9.90k | }
 286 |        |
 287 |   178k | Status WritableFileWriter::SyncInternal(bool use_fsync) {
 288 |   178k |   Status s;
 289 |   178k |   IOSTATS_TIMER_GUARD(fsync_nanos);
 290 |   178k |   TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
 291 |   178k |   if (use_fsync) {
 292 |      0 |     s = writable_file_->Fsync();
 293 |   178k |   } else {
 294 |   178k |     s = writable_file_->Sync();
 295 |   178k |   }
 296 |   178k |   return s;
 297 |   178k | }
 298 |        |
 299 |    421 | Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
 300 |    421 |   IOSTATS_TIMER_GUARD(range_sync_nanos);
 301 |    421 |   TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
 302 |    421 |   return writable_file_->RangeSync(offset, nbytes);
 303 |    421 | }
 304 |        |
 305 |  23.2M | size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
 306 |  23.2M |   if (suspender_ && FLAGS_allow_preempting_compactions) {
 307 |  92.3k |     suspender_->PauseIfNecessary();
 308 |  92.3k |   }
 309 |  23.2M |   Env::IOPriority io_priority;
 310 |  23.2M |   if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) <
 311 |   397k |       Env::IO_TOTAL) {
 312 |   397k |     bytes = std::min(
 313 |   397k |       bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
 314 |        |
 315 |   397k |     if (align) {
 316 |        |       // Here we may actually require more than burst and block,
 317 |        |       // but we cannot write less than one page at a time with unbuffered I/O,
 318 |        |       // thus we may want not to use the rate limiter.
 319 |      0 |       size_t alignment = buf_.Alignment();
 320 |      0 |       bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
 321 |      0 |     }
 322 |   397k |     rate_limiter_->Request(bytes, io_priority);
 323 |   397k |   }
 324 |  23.2M |   return bytes;
 325 |  23.2M | }
 326 |        |
 327 |        | // This method writes the specified data to disk and makes use of the rate
 328 |        | // limiter if available.
 329 |  23.2M | Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
 330 |  23.2M |   Status s;
 331 |  23.2M |   assert(use_os_buffer_);
 332 |  23.2M |   const char* src = data;
 333 |  23.2M |   size_t left = size;
 334 |        |
 335 |  46.5M |   while (left > 0) {
 336 |  23.2M |     size_t allowed = RequestToken(left, false);
 337 |        |
 338 |  23.2M |     {
 339 |  23.2M |       IOSTATS_TIMER_GUARD(write_nanos);
 340 |  23.2M |       TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
 341 |  23.2M |       s = writable_file_->Append(Slice(src, allowed));
 342 |  23.2M |       if (!s.ok()) {
 343 |     15 |         return s;
 344 |     15 |       }
 345 |  23.2M |     }
 346 |        |
 347 |  23.2M |     IOSTATS_ADD(bytes_written, allowed);
 348 |  23.2M |     TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);
 349 |        |
 350 |  23.2M |     left -= allowed;
 351 |  23.2M |     src += allowed;
 352 |  23.2M |   }
 353 |  23.2M |   buf_.Size(0);
 354 |  23.2M |   return s;
 355 |  23.2M | }
 356 |        |
 357 |        |
 358 |        | // This flushes the accumulated data in the buffer. We pad the data with
 359 |        | // zeros, if necessary, up to a whole page.
 360 |        | // However, during automatic flushes padding would not be necessary.
 361 |        | // We always use the RateLimiter if available. We move (Refit) any buffer
 362 |        | // bytes that are left over beyond the
 363 |        | // whole number of pages, to be written again on the next flush, because we
 364 |        | // can only write at aligned
 365 |        | // offsets.
 366 |      0 | Status WritableFileWriter::WriteUnbuffered() {
 367 |      0 |   Status s;
 368 |        |
 369 |      0 |   assert(!use_os_buffer_);
 370 |      0 |   const size_t alignment = buf_.Alignment();
 371 |      0 |   assert((next_write_offset_ % alignment) == 0);
 372 |        |
 373 |        |   // Calculate the whole-page final file advance if all writes succeed.
 374 |      0 |   size_t file_advance =
 375 |      0 |     TruncateToPageBoundary(alignment, buf_.CurrentSize());
 376 |        |
 377 |        |   // Calculate the leftover tail; we write it here padded with zeros, BUT we
 378 |        |   // will write
 379 |        |   // it again in the future, either on Close() OR when the current whole page
 380 |        |   // fills out.
 381 |      0 |   size_t leftover_tail = buf_.CurrentSize() - file_advance;
 382 |        |
 383 |        |   // Round up and pad
 384 |      0 |   buf_.PadToAlignmentWith(0);
 385 |        |
 386 |      0 |   const char* src = buf_.BufferStart();
 387 |      0 |   uint64_t write_offset = next_write_offset_;
 388 |      0 |   size_t left = buf_.CurrentSize();
 389 |        |
 390 |      0 |   while (left > 0) {
 391 |        |     // Check how much is allowed
 392 |      0 |     size_t size = RequestToken(left, true);
 393 |        |
 394 |      0 |     {
 395 |      0 |       IOSTATS_TIMER_GUARD(write_nanos);
 396 |      0 |       TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
 397 |        |       // Unbuffered writes must be positional
 398 |      0 |       s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
 399 |      0 |       if (!s.ok()) {
 400 |      0 |         buf_.Size(file_advance + leftover_tail);
 401 |      0 |         return s;
 402 |      0 |       }
 403 |      0 |     }
 404 |        |
 405 |      0 |     IOSTATS_ADD(bytes_written, size);
 406 |      0 |     left -= size;
 407 |      0 |     src += size;
 408 |      0 |     write_offset += size;
 409 |      0 |     assert((next_write_offset_ % alignment) == 0);
 410 |      0 |   }
 411 |        |
 412 |      0 |   if (s.ok()) {
 413 |        |     // Move the tail to the beginning of the buffer.
 414 |        |     // This never happens during normal Append, but rather during
 415 |        |     // explicit calls to Flush()/Sync() or Close().
 416 |      0 |     buf_.RefitTail(file_advance, leftover_tail);
 417 |        |     // This is where we start writing next time, which may or may not be
 418 |        |     // the actual file size on disk. They match if the buffer size
 419 |        |     // is a multiple of whole pages; otherwise filesize_ is leftover_tail
 420 |        |     // behind.
 421 |      0 |     next_write_offset_ += file_advance;
 422 |      0 |   }
 423 |      0 |   return s;
 424 |      0 | }
 425 |        |
 426 |        |
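WriteUnbuffered() splits the buffered bytes into a whole number of aligned pages (file_advance) and a leftover tail that is zero-padded, written now, and rewritten on the next flush. A minimal standalone sketch of that split, assuming a 4096-byte alignment and a hypothetical buffer fill; the TruncateToPageBoundary helper below is a stand-in for the one used by the covered code:

// Standalone illustration (not part of the covered file): the file_advance /
// leftover_tail split performed by WriteUnbuffered() above.
#include <cstddef>
#include <iostream>

// Stand-in for the TruncateToPageBoundary helper used by the covered code:
// round s down to a multiple of page_size.
size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  return s - (s % page_size);
}

int main() {
  const size_t alignment = 4096;  // assumed direct-I/O alignment
  const size_t buffered = 10000;  // hypothetical bytes currently in the buffer
  const size_t file_advance = TruncateToPageBoundary(alignment, buffered);  // 8192
  const size_t leftover_tail = buffered - file_advance;                     // 1808
  // The zero-padded buffer is written in full, but only file_advance bytes count
  // as progress; the tail is refitted to the front of the buffer and written
  // again on the next flush.
  std::cout << "file_advance=" << file_advance
            << " leftover_tail=" << leftover_tail << "\n";
  return 0;
}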
 427 |        | namespace {
 428 |        |
 429 |        | class ReadaheadRandomAccessFile : public yb::RandomAccessFileWrapper {
 430 |        |  public:
 431 |        |   ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
 432 |        |                             size_t readahead_size)
 433 |        |       : RandomAccessFileWrapper(std::move(file)),
 434 |        |         readahead_size_(readahead_size),
 435 |        |         forward_calls_(ShouldForwardRawRequest()),
 436 |        |         buffer_(),
 437 |        |         buffer_offset_(0),
 438 |    922 |         buffer_len_(0) {
 439 |    922 |     if (!forward_calls_) {
 440 |    922 |       buffer_.reset(new uint8_t[readahead_size_]);
 441 |      0 |     } else if (readahead_size_ > 0) {
 442 |      0 |       EnableReadAhead();
 443 |      0 |     }
 444 |    922 |   }
 445 |        |
 446 |        |   ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
 447 |        |
 448 |        |   ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete;
 449 |        |
 450 |  4.13k |   CHECKED_STATUS Read(uint64_t offset, size_t n, Slice* result, uint8_t* scratch) const override {
 451 |  4.13k |     if (n >= readahead_size_) {
 452 |      0 |       return RandomAccessFileWrapper::Read(offset, n, result, scratch);
 453 |      0 |     }
 454 |        |
 455 |        |     // On Windows in unbuffered mode this will lead to double buffering
 456 |        |     // and double locking, so we avoid that.
 457 |        |     // In normal mode Windows caches so much data from disk that we do
 458 |        |     // not need readahead.
 459 |  4.13k |     if (forward_calls_) {
 460 |      0 |       return RandomAccessFileWrapper::Read(offset, n, result, scratch);
 461 |      0 |     }
 462 |        |
 463 |  4.13k |     std::unique_lock<std::mutex> lk(lock_);
 464 |        |
 465 |  4.13k |     size_t copied = 0;
 466 |        |     // if offset is within [buffer_offset_, buffer_offset_ + buffer_len_)
 467 |  4.13k |     if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
 468 |  1.37k |       uint64_t offset_in_buffer = offset - buffer_offset_;
 469 |  1.37k |       copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
 470 |  1.37k |       memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
 471 |  1.37k |       if (copied == n) {
 472 |        |         // fully cached
 473 |  1.37k |         *result = Slice(scratch, n);
 474 |  1.37k |         return Status::OK();
 475 |  1.37k |       }
 476 |  2.76k |     }
 477 |  2.76k |     Slice readahead_result;
 478 |  2.76k |     Status s = RandomAccessFileWrapper::Read(offset + copied, readahead_size_, &readahead_result,
 479 |  2.76k |       buffer_.get());
 480 |  2.76k |     if (!s.ok()) {
 481 |      0 |       return s;
 482 |      0 |     }
 483 |        |
 484 |  2.76k |     auto left_to_copy = std::min(readahead_result.size(), n - copied);
 485 |  2.76k |     memcpy(scratch + copied, readahead_result.data(), left_to_copy);
 486 |  2.76k |     *result = Slice(scratch, copied + left_to_copy);
 487 |        |
 488 |  2.76k |     if (readahead_result.data() == buffer_.get()) {
 489 |  2.76k |       buffer_offset_ = offset + copied;
 490 |  2.76k |       buffer_len_ = readahead_result.size();
 491 |      0 |     } else {
 492 |      0 |       buffer_len_ = 0;
 493 |      0 |     }
 494 |        |
 495 |  2.76k |     return Status::OK();
 496 |  2.76k |   }
 497 |        |
 498 |        |  private:
 499 |        |   size_t               readahead_size_;
 500 |        |   const bool           forward_calls_;
 501 |        |
 502 |        |   mutable std::mutex   lock_;
 503 |        |   mutable std::unique_ptr<uint8_t[]> buffer_;
 504 |        |   mutable uint64_t     buffer_offset_;
 505 |        |   mutable size_t       buffer_len_;
 506 |        | };
 507 |        | }  // namespace
 508 |        |
 509 |        | std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
 510 |    922 |     std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
 511 |    922 |   std::unique_ptr<RandomAccessFile> result(
 512 |    922 |     new ReadaheadRandomAccessFile(std::move(file), readahead_size));
 513 |    922 |   return result;
 514 |    922 | }
 515 |        |
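ReadaheadRandomAccessFile::Read() above first checks whether the requested range falls inside the cached window [buffer_offset_, buffer_offset_ + buffer_len_); only the uncached remainder triggers a new readahead_size_ read. A minimal standalone sketch of that window check, with an assumed 64KB readahead buffer and hypothetical offsets (not part of the covered file):

// Standalone illustration (not part of the covered file): the cached-window
// test performed by ReadaheadRandomAccessFile::Read() before issuing a real read.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
  const size_t readahead_size = 64 * 1024;    // assumed readahead buffer size
  const uint64_t buffer_offset = 128 * 1024;  // hypothetical start of the cached window
  const size_t buffer_len = readahead_size;   // window fully populated

  const uint64_t offset = 130 * 1024;  // requested read position
  const size_t n = 4 * 1024;           // requested read size (must be < readahead_size)

  size_t copied = 0;
  if (offset >= buffer_offset && offset < buffer_offset + buffer_len) {
    const uint64_t offset_in_buffer = offset - buffer_offset;
    copied = std::min(buffer_len - static_cast<size_t>(offset_in_buffer), n);
  }
  // copied == n: served entirely from the readahead buffer.
  // copied <  n: the remainder triggers a readahead_size read at offset + copied.
  std::cout << (copied == n ? "hit" : "partial/miss") << ", copied=" << copied << "\n";
  return 0;
}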
 516 |        | Status NewWritableFile(Env* env, const std::string& fname,
 517 |        |                        unique_ptr<WritableFile>* result,
 518 |  1.04M |                        const EnvOptions& options) {
 519 |  1.04M |   Status s = env->NewWritableFile(fname, result, options);
 520 |  1.04M |   TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
 521 |  1.04M |   return s;
 522 |  1.04M | }
 523 |        |
 524 |        | }  // namespace rocksdb