/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/file_reader_writer.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #include "yb/rocksdb/util/file_reader_writer.h" |
25 | | |
26 | | #include <algorithm> |
27 | | |
28 | | #include "yb/rocksdb/port/port.h" |
29 | | #include "yb/rocksdb/rate_limiter.h" |
30 | | #include "yb/rocksdb/util/histogram.h" |
31 | | #include "yb/rocksdb/util/stop_watch.h" |
32 | | #include "yb/rocksdb/util/sync_point.h" |
33 | | |
34 | | #include "yb/util/priority_thread_pool.h" |
35 | | #include "yb/util/stats/iostats_context_imp.h" |
36 | | #include "yb/util/status_log.h" |
37 | | |
38 | | DEFINE_bool(allow_preempting_compactions, true, |
39 | | "Whether a compaction may be preempted in favor of another compaction with higher " |
40 | | "priority"); |
41 | | |
42 | | DEFINE_int32(rocksdb_file_starting_buffer_size, 8192, |
43 | | "Starting buffer size for writable files, grows by 2x every new allocation."); |
44 | | |
45 | | namespace rocksdb { |
46 | | |
47 | 396k | Status SequentialFileReader::Read(size_t n, Slice* result, uint8_t* scratch) { |
48 | 396k | Status s = file_->Read(n, result, scratch); |
49 | 396k | IOSTATS_ADD(bytes_read, result->size()); |
50 | 396k | return s; |
51 | 396k | } |
52 | | |
53 | 12 | Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); } |
54 | | |
55 | | Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, |
56 | 595k | char* scratch) const { |
57 | 595k | Status s; |
58 | 595k | uint64_t elapsed = 0; |
59 | 595k | { |
60 | 595k | StopWatch sw(env_, stats_, hist_type_, |
61 | 595k | (stats_ != nullptr) ? &elapsed : nullptr); |
62 | 595k | IOSTATS_TIMER_GUARD(read_nanos); |
63 | 595k | s = file_->Read(offset, n, result, scratch); |
64 | 595k | IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); |
65 | 595k | } |
66 | 595k | if (stats_ != nullptr && file_read_hist_ != nullptr) { |
67 | 0 | file_read_hist_->Add(elapsed); |
68 | 0 | } |
69 | 595k | return s; |
70 | 595k | } |
71 | | |
72 | | Status RandomAccessFileReader::ReadAndValidate( |
73 | 3.98M | uint64_t offset, size_t n, Slice* result, char* scratch, const yb::ReadValidator& validator) { |
74 | 3.98M | uint64_t elapsed = 0; |
75 | 3.98M | Status s; |
76 | 3.98M | { |
77 | 3.98M | StopWatch sw(env_, stats_, hist_type_, |
78 | 3.69M | (stats_ != nullptr) ? &elapsed : nullptr); |
79 | 3.98M | IOSTATS_TIMER_GUARD(read_nanos); |
80 | 3.98M | s = file_->ReadAndValidate(offset, n, result, scratch, validator); |
81 | 3.98M | IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); |
82 | 3.98M | } |
83 | 3.98M | if (stats_ != nullptr && file_read_hist_ != nullptr) { |
84 | 282k | file_read_hist_->Add(elapsed); |
85 | 282k | } |
86 | 3.98M | return s; |
87 | 3.98M | } |
88 | | |
89 | 1.07M | WritableFileWriter::~WritableFileWriter() { |
90 | 1.07M | WARN_NOT_OK(Close(), "Failed to close file"); |
91 | 1.07M | } |
92 | | |
93 | 48.3M | Status WritableFileWriter::Append(const Slice& data) { |
94 | 48.3M | const char* src = data.cdata(); |
95 | 48.3M | size_t left = data.size(); |
96 | 48.3M | Status s; |
97 | 48.3M | pending_sync_ = true; |
98 | 48.3M | pending_fsync_ = true; |
99 | | |
100 | 48.3M | TEST_KILL_RANDOM("WritableFileWriter::Append:0", |
101 | 48.3M | rocksdb_kill_odds * REDUCE_ODDS2); |
102 | | |
103 | 48.3M | { |
104 | 48.3M | IOSTATS_TIMER_GUARD(prepare_write_nanos); |
105 | 48.3M | TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); |
106 | 48.3M | writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left); |
107 | 48.3M | } |
108 | | |
109 | | // Flush only when I/O is buffered |
110 | 48.3M | if (use_os_buffer_ && |
111 | 48.3M | (buf_.Capacity() - buf_.CurrentSize()) < left) { |
112 | 13.8k | if (buf_.CurrentSize() > 0) { |
113 | 9.37k | s = Flush(); |
114 | 9.37k | if (!s.ok()) { |
115 | 0 | return s; |
116 | 0 | } |
117 | 13.8k | } |
118 | | |
119 | 13.8k | if (buf_.Capacity() < max_buffer_size_) { |
120 | 13.8k | size_t desiredCapacity = buf_.Capacity() * 2; |
121 | 13.8k | desiredCapacity = std::min(desiredCapacity, max_buffer_size_); |
122 | 13.8k | buf_.AllocateNewBuffer(desiredCapacity); |
123 | 13.8k | } |
124 | 13.8k | assert(buf_.CurrentSize() == 0); |
125 | 13.8k | } |
126 | | |
127 | | // With unbuffered I/O we never write directly to disk here; otherwise we |
128 | | // simply use the buffer for its original purpose of accumulating many |
129 | | // small chunks. |
130 | 48.3M | if (!use_os_buffer_ || (buf_.Capacity() >= left)) { |
131 | 96.5M | while (left > 0) { |
132 | 48.2M | size_t appended = buf_.Append(src, left); |
133 | 48.2M | left -= appended; |
134 | 48.2M | src += appended; |
135 | | |
136 | 48.2M | if (left > 0) { |
137 | 0 | s = Flush(); |
138 | 0 | if (!s.ok()) { |
139 | 0 | break; |
140 | 0 | } |
141 | | |
142 | | // We double the buffer here because Flush calls do not keep up |
143 | | // with the incoming bytes. This is the only place where the buffer |
144 | | // is resized when unbuffered I/O is in use. |
145 | 0 | if (buf_.Capacity() < max_buffer_size_) { |
146 | 0 | size_t desiredCapacity = buf_.Capacity() * 2; |
147 | 0 | desiredCapacity = std::min(desiredCapacity, max_buffer_size_); |
148 | 0 | buf_.AllocateNewBuffer(desiredCapacity); |
149 | 0 | } |
150 | 0 | } |
151 | 48.2M | } |
152 | 18.4E | } else { |
153 | | // Writing directly to file bypassing the buffer |
154 | 18.4E | assert(buf_.CurrentSize() == 0); |
155 | 18.4E | s = WriteBuffered(src, left); |
156 | 18.4E | } |
157 | | |
158 | 48.3M | TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds); |
159 | 48.3M | if (s.ok()) { |
160 | 48.3M | filesize_ += data.size(); |
161 | 48.3M | } |
162 | 48.3M | return s; |
163 | 48.3M | } |
164 | | |
165 | 1.19M | Status WritableFileWriter::Close() { |
166 | | |
167 | | // Do not quit immediately on failure; the file MUST be closed. |
168 | 1.19M | Status s; |
169 | | |
170 | | // It is possible to close the file twice now, since we MUST close it |
171 | | // in the destructor; simply flushing is not enough. On Windows, |
172 | | // pre-allocation does not fill the file with zeros, and with |
173 | | // unbuffered access we also need to set the end of data. |
174 | 1.19M | if (!writable_file_) { |
175 | 117k | return s; |
176 | 117k | } |
177 | | |
178 | 1.07M | s = Flush(); // flush cache to OS |
179 | | |
180 | | // In unbuffered mode we write whole pages so |
181 | | // we need to let the file know where data ends. |
182 | 1.07M | Status interim = writable_file_->Truncate(filesize_); |
183 | 1.07M | if (!interim.ok() && s.ok()) { |
184 | 0 | s = interim; |
185 | 0 | } |
186 | | |
187 | 1.07M | TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds); |
188 | 1.07M | interim = writable_file_->Close(); |
189 | 1.07M | if (!interim.ok() && s.ok()) { |
190 | 0 | s = interim; |
191 | 0 | } |
192 | | |
193 | 1.07M | writable_file_.reset(); |
194 | 1.07M | TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds); |
195 | | |
196 | 1.07M | return s; |
197 | 1.07M | } |
198 | | |
199 | | // write out the cached data to the OS cache |
200 | 24.4M | Status WritableFileWriter::Flush() { |
201 | 24.4M | Status s; |
202 | 24.4M | TEST_KILL_RANDOM("WritableFileWriter::Flush:0", |
203 | 24.4M | rocksdb_kill_odds * REDUCE_ODDS2); |
204 | | |
205 | 24.4M | if (buf_.CurrentSize() > 0) { |
206 | 23.2M | if (use_os_buffer_) { |
207 | 23.2M | s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); |
208 | 4 | } else { |
209 | 4 | s = WriteUnbuffered(); |
210 | 4 | } |
211 | 23.2M | if (!s.ok()) { |
212 | 15 | return s; |
213 | 15 | } |
214 | 24.4M | } |
215 | | |
216 | 24.4M | s = writable_file_->Flush(); |
217 | | |
218 | 24.4M | if (!s.ok()) { |
219 | 0 | return s; |
220 | 0 | } |
221 | | |
222 | | // Sync the OS cache to disk after every bytes_per_sync_ bytes. |
223 | | // TODO: give log file and sst file different options (log |
224 | | // files could potentially be cached in the OS for their whole |
225 | | // lifetime, thus we might not want to flush at all). |
226 | |
227 | | // We try to avoid syncing the last 1MB of data, for two reasons: |
228 | | // (1) to avoid rewriting a page that is modified again later; |
229 | | // (2) on older OS versions the write can block while the page is |
230 | | // written out. |
231 | | // XFS flushes neighboring pages outside of the specified range, so we |
232 | | // need to make sure the sync range stays far from the write offset. |
233 | 24.4M | if (!direct_io_ && bytes_per_sync_) { |
234 | 2.04M | const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. |
235 | 2.04M | const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. |
236 | 2.04M | if (filesize_ > kBytesNotSyncRange) { |
237 | 78.7k | uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; |
238 | 78.7k | offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; |
239 | 78.7k | assert(offset_sync_to >= last_sync_size_); |
240 | 78.7k | if (offset_sync_to > 0 && |
241 | 78.5k | offset_sync_to - last_sync_size_ >= bytes_per_sync_) { |
242 | 421 | s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); |
243 | 421 | last_sync_size_ = offset_sync_to; |
244 | 421 | } |
245 | 78.7k | } |
246 | 2.04M | } |
247 | | |
248 | 24.4M | return s; |
249 | 24.4M | } |
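
To make the range-sync bookkeeping in Flush() above concrete, here is a small self-contained sketch (not part of the file) of how the sync boundary is derived from filesize_, the 1MB hold-back, and the 4KB alignment. The helper name is hypothetical; the constants and arithmetic mirror the listing.

#include <cstdint>

// Returns the end offset (exclusive) that should be range-synced, or 0 if no
// sync is due yet. Keeps the most recent 1MB unsynced and aligns the sync
// boundary down to a 4KB page, as in WritableFileWriter::Flush() above.
uint64_t ComputeRangeSyncEnd(uint64_t file_size, uint64_t last_sync_size,
                             uint64_t bytes_per_sync) {
  constexpr uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB is not synced
  constexpr uint64_t kBytesAlignWhenSync = 4 * 1024;    // align to 4KB
  if (file_size <= kBytesNotSyncRange) {
    return 0;  // Not enough data yet to sync anything.
  }
  uint64_t offset_sync_to = file_size - kBytesNotSyncRange;
  offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;  // align down
  if (offset_sync_to > 0 &&
      offset_sync_to - last_sync_size >= bytes_per_sync) {
    // Caller would then issue RangeSync(last_sync_size,
    // offset_sync_to - last_sync_size) and advance last_sync_size.
    return offset_sync_to;
  }
  return 0;
}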
250 | | |
251 | 178k | Status WritableFileWriter::Sync(bool use_fsync) { |
252 | 178k | Status s = Flush(); |
253 | 178k | if (!s.ok()) { |
254 | 0 | return s; |
255 | 0 | } |
256 | 178k | TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds); |
257 | 178k | if (!direct_io_ && pending_sync_) { |
258 | 178k | s = SyncInternal(use_fsync); |
259 | 178k | if (!s.ok()) { |
260 | 1 | return s; |
261 | 1 | } |
262 | 178k | } |
263 | 178k | TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds); |
264 | 178k | pending_sync_ = false; |
265 | 178k | if (use_fsync) { |
266 | 0 | pending_fsync_ = false; |
267 | 0 | } |
268 | 178k | return Status::OK(); |
269 | 178k | } |
270 | | |
271 | 8 | Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) { |
272 | 8 | if (!writable_file_->IsSyncThreadSafe()) { |
273 | 0 | return STATUS(NotSupported, |
274 | 0 | "Can't WritableFileWriter::SyncWithoutFlush() because " |
275 | 0 | "WritableFile::IsSyncThreadSafe() is false"); |
276 | 0 | } |
277 | 8 | TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); |
278 | 8 | Status s = SyncInternal(use_fsync); |
279 | 8 | TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); |
280 | 8 | return s; |
281 | 8 | } |
282 | | |
283 | 9.90k | Status WritableFileWriter::InvalidateCache(size_t offset, size_t length) { |
284 | 9.90k | return writable_file_->InvalidateCache(offset, length); |
285 | 9.90k | } |
286 | | |
287 | 178k | Status WritableFileWriter::SyncInternal(bool use_fsync) { |
288 | 178k | Status s; |
289 | 178k | IOSTATS_TIMER_GUARD(fsync_nanos); |
290 | 178k | TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); |
291 | 178k | if (use_fsync) { |
292 | 0 | s = writable_file_->Fsync(); |
293 | 178k | } else { |
294 | 178k | s = writable_file_->Sync(); |
295 | 178k | } |
296 | 178k | return s; |
297 | 178k | } |
298 | | |
299 | 421 | Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { |
300 | 421 | IOSTATS_TIMER_GUARD(range_sync_nanos); |
301 | 421 | TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); |
302 | 421 | return writable_file_->RangeSync(offset, nbytes); |
303 | 421 | } |
304 | | |
305 | 23.2M | size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { |
306 | 23.2M | if (suspender_ && FLAGS_allow_preempting_compactions) { |
307 | 92.3k | suspender_->PauseIfNecessary(); |
308 | 92.3k | } |
309 | 23.2M | Env::IOPriority io_priority; |
310 | 23.2M | if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) < |
311 | 397k | Env::IO_TOTAL) { |
312 | 397k | bytes = std::min( |
313 | 397k | bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes())); |
314 | | |
315 | 397k | if (align) { |
316 | | // Here we may actually require more than the burst size and block, |
317 | | // but we cannot write less than one page at a time with unbuffered |
318 | | // I/O, so we may prefer not to use the rate limiter at all. |
319 | 0 | size_t alignment = buf_.Alignment(); |
320 | 0 | bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); |
321 | 0 | } |
322 | 397k | rate_limiter_->Request(bytes, io_priority); |
323 | 397k | } |
324 | 23.2M | return bytes; |
325 | 23.2M | } |
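
A minimal sketch (not from the file) of the capping arithmetic in RequestToken(): the chunk is limited by the rate limiter's single-burst size and, for aligned writes, rounded down to a whole number of pages while never going below one page. The helper names are hypothetical, and TruncateToPageBoundary is assumed to behave like RocksDB's aligned-buffer helper, i.e. s - (s % page_size).

#include <algorithm>
#include <cstddef>

// Assumed behavior of RocksDB's TruncateToPageBoundary helper.
size_t TruncateToPageBoundarySketch(size_t page_size, size_t s) {
  return s - (s % page_size);
}

// Cap `bytes` by the limiter's single-burst size and, if `align` is set,
// round down to a whole number of pages but never below a single page.
size_t CapWriteChunk(size_t bytes, size_t single_burst_bytes, bool align,
                     size_t alignment) {
  bytes = std::min(bytes, single_burst_bytes);
  if (align) {
    bytes = std::max(alignment, TruncateToPageBoundarySketch(alignment, bytes));
  }
  return bytes;
}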
326 | | |
327 | | // This method writes to disk the specified data and makes use of the rate |
328 | | // limiter if available |
329 | 23.2M | Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { |
330 | 23.2M | Status s; |
331 | 23.2M | assert(use_os_buffer_); |
332 | 23.2M | const char* src = data; |
333 | 23.2M | size_t left = size; |
334 | | |
335 | 46.5M | while (left > 0) { |
336 | 23.2M | size_t allowed = RequestToken(left, false); |
337 | | |
338 | 23.2M | { |
339 | 23.2M | IOSTATS_TIMER_GUARD(write_nanos); |
340 | 23.2M | TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); |
341 | 23.2M | s = writable_file_->Append(Slice(src, allowed)); |
342 | 23.2M | if (!s.ok()) { |
343 | 15 | return s; |
344 | 15 | } |
345 | 23.2M | } |
346 | | |
347 | 23.2M | IOSTATS_ADD(bytes_written, allowed); |
348 | 23.2M | TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds); |
349 | | |
350 | 23.2M | left -= allowed; |
351 | 23.2M | src += allowed; |
352 | 23.2M | } |
353 | 23.2M | buf_.Size(0); |
354 | 23.2M | return s; |
355 | 23.2M | } |
356 | | |
357 | | |
358 | | // This flushes the accumulated data in the buffer. We pad the data |
359 | | // with zeros up to a whole page if necessary; during automatic |
360 | | // flushes this padding is not needed. |
361 | | // We always use the RateLimiter if available. |
362 | | // We move (Refit) any buffer bytes that are left over beyond a whole |
363 | | // number of pages so that they are written again on the next flush, |
364 | | // because we can only write at |
365 | | // aligned offsets. |
366 | 0 | Status WritableFileWriter::WriteUnbuffered() { |
367 | 0 | Status s; |
368 | |
369 | 0 | assert(!use_os_buffer_); |
370 | 0 | const size_t alignment = buf_.Alignment(); |
371 | 0 | assert((next_write_offset_ % alignment) == 0); |
372 | | |
373 | | // Calculate whole page final file advance if all writes succeed |
374 | 0 | size_t file_advance = |
375 | 0 | TruncateToPageBoundary(alignment, buf_.CurrentSize()); |
376 | | |
377 | | // Calculate the leftover tail; we write it here padded with zeros, |
378 | | // BUT we will write it again |
379 | | // in the future, either on Close() OR when the current whole page |
380 | | // fills out. |
381 | 0 | size_t leftover_tail = buf_.CurrentSize() - file_advance; |
382 | | |
383 | | // Round up and pad |
384 | 0 | buf_.PadToAlignmentWith(0); |
385 | |
386 | 0 | const char* src = buf_.BufferStart(); |
387 | 0 | uint64_t write_offset = next_write_offset_; |
388 | 0 | size_t left = buf_.CurrentSize(); |
389 | |
390 | 0 | while (left > 0) { |
391 | | // Check how much is allowed |
392 | 0 | size_t size = RequestToken(left, true); |
393 | |
394 | 0 | { |
395 | 0 | IOSTATS_TIMER_GUARD(write_nanos); |
396 | 0 | TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); |
397 | | // Unbuffered writes must be positional |
398 | 0 | s = writable_file_->PositionedAppend(Slice(src, size), write_offset); |
399 | 0 | if (!s.ok()) { |
400 | 0 | buf_.Size(file_advance + leftover_tail); |
401 | 0 | return s; |
402 | 0 | } |
403 | 0 | } |
404 | | |
405 | 0 | IOSTATS_ADD(bytes_written, size); |
406 | 0 | left -= size; |
407 | 0 | src += size; |
408 | 0 | write_offset += size; |
409 | 0 | assert((next_write_offset_ % alignment) == 0); |
410 | 0 | } |
411 | |
412 | 0 | if (s.ok()) { |
413 | | // Move the tail to the beginning of the buffer. |
414 | | // This never happens during a normal Append, only during an |
415 | | // explicit call to Flush()/Sync() or Close(). |
416 | 0 | buf_.RefitTail(file_advance, leftover_tail); |
417 | | // This is where we start writing next time, which may or may not be |
418 | | // the actual file size on disk. They match if the buffer size is a |
419 | | // multiple of whole pages; otherwise filesize_ lags behind by |
420 | | // leftover_tail. |
421 | 0 | next_write_offset_ += file_advance; |
422 | 0 | } |
423 | 0 | return s; |
424 | 0 | } |
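
To illustrate the alignment bookkeeping in WriteUnbuffered() above, the hypothetical sketch below splits the buffered byte count into the whole-page portion that advances the file and the leftover tail that is rewritten on the next flush; it only restates the arithmetic, not the I/O.

#include <cstddef>
#include <utility>

// Split a buffer of `current_size` bytes into (file_advance, leftover_tail):
// file_advance is the largest multiple of `alignment` not exceeding
// current_size; leftover_tail is what remains and must be written again
// after the next flush, since writes can only start at aligned offsets.
std::pair<size_t, size_t> SplitAtPageBoundary(size_t current_size,
                                              size_t alignment) {
  size_t file_advance = current_size - (current_size % alignment);
  size_t leftover_tail = current_size - file_advance;
  return {file_advance, leftover_tail};
}
// Example: current_size = 9000, alignment = 4096 -> file_advance = 8192,
// leftover_tail = 808; next_write_offset_ advances by 8192.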
425 | | |
426 | | |
427 | | namespace { |
428 | | |
429 | | class ReadaheadRandomAccessFile : public yb::RandomAccessFileWrapper { |
430 | | public: |
431 | | ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file, |
432 | | size_t readahead_size) |
433 | | : RandomAccessFileWrapper(std::move(file)), |
434 | | readahead_size_(readahead_size), |
435 | | forward_calls_(ShouldForwardRawRequest()), |
436 | | buffer_(), |
437 | | buffer_offset_(0), |
438 | 922 | buffer_len_(0) { |
439 | 922 | if (!forward_calls_) { |
440 | 922 | buffer_.reset(new uint8_t[readahead_size_]); |
441 | 0 | } else if (readahead_size_ > 0) { |
442 | 0 | EnableReadAhead(); |
443 | 0 | } |
444 | 922 | } |
445 | | |
446 | | ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; |
447 | | |
448 | | ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; |
449 | | |
450 | 4.13k | CHECKED_STATUS Read(uint64_t offset, size_t n, Slice* result, uint8_t* scratch) const override { |
451 | 4.13k | if (n >= readahead_size_) { |
452 | 0 | return RandomAccessFileWrapper::Read(offset, n, result, scratch); |
453 | 0 | } |
454 | | |
455 | | // On Windows in unbuffered mode this will lead to double buffering |
456 | | // and double locking so we avoid that. |
457 | | // In normal mode Windows caches so much data from disk that we do |
458 | | // not need readahead. |
459 | 4.13k | if (forward_calls_) { |
460 | 0 | return RandomAccessFileWrapper::Read(offset, n, result, scratch); |
461 | 0 | } |
462 | | |
463 | 4.13k | std::unique_lock<std::mutex> lk(lock_); |
464 | | |
465 | 4.13k | size_t copied = 0; |
466 | 4.13k | // If offset lies within [buffer_offset_, buffer_offset_ + buffer_len_) |
467 | 4.13k | if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { |
468 | 1.37k | uint64_t offset_in_buffer = offset - buffer_offset_; |
469 | 1.37k | copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n); |
470 | 1.37k | memcpy(scratch, buffer_.get() + offset_in_buffer, copied); |
471 | 1.37k | if (copied == n) { |
472 | | // fully cached |
473 | 1.37k | *result = Slice(scratch, n); |
474 | 1.37k | return Status::OK(); |
475 | 1.37k | } |
476 | 2.76k | } |
477 | 2.76k | Slice readahead_result; |
478 | 2.76k | Status s = RandomAccessFileWrapper::Read(offset + copied, readahead_size_, &readahead_result, |
479 | 2.76k | buffer_.get()); |
480 | 2.76k | if (!s.ok()) { |
481 | 0 | return s; |
482 | 0 | } |
483 | | |
484 | 2.76k | auto left_to_copy = std::min(readahead_result.size(), n - copied); |
485 | 2.76k | memcpy(scratch + copied, readahead_result.data(), left_to_copy); |
486 | 2.76k | *result = Slice(scratch, copied + left_to_copy); |
487 | | |
488 | 2.76k | if (readahead_result.data() == buffer_.get()) { |
489 | 2.76k | buffer_offset_ = offset + copied; |
490 | 2.76k | buffer_len_ = readahead_result.size(); |
491 | 0 | } else { |
492 | 0 | buffer_len_ = 0; |
493 | 0 | } |
494 | | |
495 | 2.76k | return Status::OK(); |
496 | 2.76k | } |
497 | | |
498 | | private: |
499 | | size_t readahead_size_; |
500 | | const bool forward_calls_; |
501 | | |
502 | | mutable std::mutex lock_; |
503 | | mutable std::unique_ptr<uint8_t[]> buffer_; |
504 | | mutable uint64_t buffer_offset_; |
505 | | mutable size_t buffer_len_; |
506 | | }; |
507 | | } // namespace |
508 | | |
509 | | std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( |
510 | 922 | std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) { |
511 | 922 | std::unique_ptr<RandomAccessFile> result( |
512 | 922 | new ReadaheadRandomAccessFile(std::move(file), readahead_size)); |
513 | 922 | return result; |
514 | 922 | } |
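
As a usage sketch of NewReadaheadRandomAccessFile (the helper name, header paths, Env call, and readahead size below are assumptions based on the standard RocksDB API, not taken from this file), a caller might wrap a freshly opened file so that small reads are served from the readahead buffer:

#include <memory>
#include <string>

#include "yb/rocksdb/env.h"
#include "yb/rocksdb/util/file_reader_writer.h"

// Hypothetical helper: open `fname` and wrap it with a 256KB readahead buffer.
rocksdb::Status OpenWithReadahead(
    rocksdb::Env* env, const std::string& fname,
    std::unique_ptr<rocksdb::RandomAccessFile>* out) {
  std::unique_ptr<rocksdb::RandomAccessFile> raw_file;
  rocksdb::EnvOptions env_options;
  rocksdb::Status s = env->NewRandomAccessFile(fname, &raw_file, env_options);
  if (!s.ok()) {
    return s;
  }
  // Small reads (n < readahead_size) are then served from the internal buffer;
  // larger reads fall through to the wrapped file.
  *out = rocksdb::NewReadaheadRandomAccessFile(std::move(raw_file), 256 * 1024);
  return rocksdb::Status::OK();
}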
515 | | |
516 | | Status NewWritableFile(Env* env, const std::string& fname, |
517 | | unique_ptr<WritableFile>* result, |
518 | 1.04M | const EnvOptions& options) { |
519 | 1.04M | Status s = env->NewWritableFile(fname, result, options); |
520 | 1.04M | TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2); |
521 | 1.04M | return s; |
522 | 1.04M | } |
523 | | |
524 | | } // namespace rocksdb |