/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/env_posix.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | #include <dirent.h> |
24 | | #include <fcntl.h> |
25 | | #if defined(__linux__) |
26 | | #include <linux/fs.h> |
27 | | #endif |
28 | | #include <pthread.h> |
29 | | #include <stdio.h> |
30 | | #include <stdlib.h> |
31 | | #include <string.h> |
32 | | #include <sys/mman.h> |
33 | | #include <sys/stat.h> |
34 | | #ifdef __linux__ |
35 | | #include <sys/statfs.h> |
36 | | #include <sys/syscall.h> |
37 | | #endif |
38 | | #include <sys/types.h> |
39 | | #include <time.h> |
40 | | #include <algorithm> |
41 | | // Get nano time includes |
42 | | #if defined(__linux__) || defined(OS_FREEBSD) |
43 | | #elif defined(__MACH__) |
44 | | #include <mach/clock.h> |
45 | | #include <mach/mach.h> |
46 | | #else |
47 | | #include <chrono> |
48 | | #endif |
49 | | #include <deque> |
50 | | #include <set> |
51 | | |
52 | | #include "yb/gutil/casts.h" |
53 | | |
54 | | #include "yb/rocksdb/port/port.h" |
55 | | #include "yb/rocksdb/options.h" |
56 | | #include "yb/rocksdb/util/io_posix.h" |
57 | | #include "yb/rocksdb/util/thread_posix.h" |
58 | | #include "yb/rocksdb/util/posix_logger.h" |
59 | | #include "yb/rocksdb/util/random.h" |
60 | | #include "yb/rocksdb/util/sync_point.h" |
61 | | #include "yb/rocksdb/util/thread_local.h" |
62 | | |
63 | | #include "yb/util/logging.h" |
64 | | #include "yb/util/slice.h" |
65 | | #include "yb/util/stats/iostats_context_imp.h" |
66 | | #include "yb/util/string_util.h" |
67 | | |
68 | | #if !defined(TMPFS_MAGIC) |
69 | | #define TMPFS_MAGIC 0x01021994 |
70 | | #endif |
71 | | #if !defined(XFS_SUPER_MAGIC) |
72 | | #define XFS_SUPER_MAGIC 0x58465342 |
73 | | #endif |
74 | | #if !defined(EXT4_SUPER_MAGIC) |
75 | | #define EXT4_SUPER_MAGIC 0xEF53 |
76 | | #endif |
77 | | |
78 | | #include "yb/util/file_system_posix.h" |
79 | | |
// Builds an IOError Status from `context` and the strerror() text of `err_number`.
#define STATUS_IO_ERROR(context, err_number) \
  STATUS(IOError, (context), strerror(err_number))
81 | | |
82 | | namespace rocksdb { |
83 | | |
84 | | namespace { |
85 | | |
86 | | // list of pathnames that are locked |
87 | | static std::set<std::string> lockedFiles; |
88 | | static port::Mutex mutex_lockedFiles; |
89 | | |
90 | 1.35M | static int LockOrUnlock(const std::string& fname, int fd, bool lock) { |
91 | 1.35M | mutex_lockedFiles.Lock(); |
92 | 1.35M | if (lock) { |
93 | | // If it already exists in the lockedFiles set, then it is already locked, |
94 | | // and fail this lock attempt. Otherwise, insert it into lockedFiles. |
95 | | // This check is needed because fcntl() does not detect lock conflict |
96 | | // if the fcntl is issued by the same thread that earlier acquired |
97 | | // this lock. |
98 | 687k | if (lockedFiles.insert(fname).second == false) { |
99 | 9 | mutex_lockedFiles.Unlock(); |
100 | 9 | errno = ENOLCK; |
101 | 9 | return -1; |
102 | 9 | } |
103 | 668k | } else { |
104 | | // If we are unlocking, then verify that we had locked it earlier, |
105 | | // it should already exist in lockedFiles. Remove it from lockedFiles. |
106 | 668k | if (lockedFiles.erase(fname) != 1) { |
107 | 0 | mutex_lockedFiles.Unlock(); |
108 | 0 | errno = ENOLCK; |
109 | 0 | return -1; |
110 | 0 | } |
111 | 1.35M | } |
112 | 1.35M | errno = 0; |
113 | 1.35M | struct flock f; |
114 | 1.35M | memset(&f, 0, sizeof(f)); |
115 | 1.35M | f.l_type = (lock ? F_WRLCK : F_UNLCK); |
116 | 1.35M | f.l_whence = SEEK_SET; |
117 | 1.35M | f.l_start = 0; |
118 | 1.35M | f.l_len = 0; // Lock/unlock entire file |
119 | 1.35M | int value = fcntl(fd, F_SETLK, &f); |
120 | 1.35M | if (value == -1 && lock) { |
121 | | // if there is an error in locking, then remove the pathname from lockedfiles |
122 | 0 | lockedFiles.erase(fname); |
123 | 0 | } |
124 | 1.35M | mutex_lockedFiles.Unlock(); |
125 | 1.35M | return value; |
126 | 1.35M | } |
127 | | |
128 | 7.17M | void SetFD_CLOEXEC(int fd, const EnvOptions* options) { |
129 | 7.17M | if ((options == nullptr || options->set_fd_cloexec) && fd > 0) { |
130 | 7.17M | fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC); |
131 | 7.17M | } |
132 | 7.17M | } |
133 | | |
134 | | class PosixFileLock : public FileLock { |
135 | | public: |
136 | | int fd_; |
137 | | std::string filename; |
138 | | }; |
139 | | |
140 | | class PosixEnv : public Env { |
141 | | public: |
142 | | PosixEnv(); |
143 | | explicit PosixEnv(std::unique_ptr<RocksDBFileFactory> file_factory); |
144 | | |
145 | 1.39k | virtual ~PosixEnv() { |
146 | 415 | for (const auto tid : threads_to_join_) { |
147 | 415 | pthread_join(tid, nullptr); |
148 | 415 | } |
149 | 4.19k | for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { |
150 | 2.79k | thread_pools_[pool_id].JoinAllThreads(); |
151 | 2.79k | } |
152 | 1.39k | } |
153 | | |
154 | | virtual Status NewSequentialFile(const std::string& fname, |
155 | | unique_ptr<SequentialFile>* result, |
156 | 1.71M | const EnvOptions& options) override { |
157 | 1.71M | return file_factory_->NewSequentialFile(fname, result, options); |
158 | 1.71M | } |
159 | | |
160 | | virtual Status NewRandomAccessFile(const std::string& fname, |
161 | | unique_ptr<RandomAccessFile>* result, |
162 | 155k | const EnvOptions& options) override { |
163 | 155k | return file_factory_->NewRandomAccessFile(fname, result, options); |
164 | 155k | } |
165 | | |
166 | | virtual Status NewWritableFile(const std::string& fname, |
167 | | unique_ptr<WritableFile>* result, |
168 | 3.00M | const EnvOptions& options) override { |
169 | 3.00M | return file_factory_->NewWritableFile(fname, result, options); |
170 | 3.00M | } |
171 | | |
172 | | virtual Status ReuseWritableFile(const std::string& fname, |
173 | | const std::string& old_fname, |
174 | | unique_ptr<WritableFile>* result, |
175 | 57 | const EnvOptions& options) override { |
176 | 57 | return file_factory_->ReuseWritableFile(fname, old_fname, result, options); |
177 | 57 | } |
178 | | |
179 | 7.00k | bool IsPlainText() const override { |
180 | 7.00k | return file_factory_->IsPlainText(); |
181 | 7.00k | } |
182 | | |
183 | | virtual Status NewDirectory(const std::string& name, |
184 | 343k | unique_ptr<Directory>* result) override { |
185 | 343k | result->reset(); |
186 | 343k | int fd; |
187 | 343k | { |
188 | 343k | IOSTATS_TIMER_GUARD(open_nanos); |
189 | 343k | fd = open(name.c_str(), 0); |
190 | 343k | } |
191 | 343k | if (fd < 0) { |
192 | 0 | return STATUS_IO_ERROR(name, errno); |
193 | 343k | } else { |
194 | 343k | result->reset(new PosixDirectory(fd)); |
195 | 343k | } |
196 | 343k | return Status::OK(); |
197 | 343k | } |
198 | | |
199 | 2.14M | Status FileExists(const std::string& fname) override { |
200 | 2.14M | int result = access(fname.c_str(), F_OK); |
201 | | |
202 | 2.14M | if (result == 0) { |
203 | 419k | return Status::OK(); |
204 | 419k | } |
205 | | |
206 | 1.72M | switch (errno) { |
207 | 0 | case EACCES: |
208 | 0 | case ELOOP: |
209 | 0 | case ENAMETOOLONG: |
210 | 1.72M | case ENOENT: |
211 | 1.72M | case ENOTDIR: |
212 | 1.72M | return STATUS(NotFound, ""); |
213 | 0 | default: |
214 | 0 | assert(result == EIO || result == ENOMEM); |
215 | 0 | return STATUS(IOError, "Unexpected error(" + ToString(result) + |
216 | 1.72M | ") accessing file `" + fname + "' "); |
217 | 1.72M | } |
218 | 1.72M | } |
219 | | |
220 | | virtual Status GetChildren(const std::string& dir, |
221 | 2.72M | std::vector<std::string>* result) override { |
222 | 2.72M | result->clear(); |
223 | 2.72M | DIR* d = opendir(dir.c_str()); |
224 | 2.72M | if (d == nullptr) { |
225 | 348k | return STATUS_IO_ERROR(dir, errno); |
226 | 348k | } |
227 | 2.37M | struct dirent* entry; |
228 | 23.4M | while ((entry = readdir(d)) != nullptr) { |
229 | 21.0M | result->push_back(entry->d_name); |
230 | 21.0M | } |
231 | 2.37M | closedir(d); |
232 | 2.37M | return Status::OK(); |
233 | 2.37M | } |
234 | | |
235 | 2.98M | Status DeleteFile(const std::string& fname) override { |
236 | 2.98M | Status result; |
237 | 2.98M | if (unlink(fname.c_str()) != 0) { |
238 | 4.14k | result = STATUS_IO_ERROR(fname, errno); |
239 | 4.14k | } |
240 | 2.98M | return result; |
241 | 2.98M | }; |
242 | | |
243 | 9.73k | Status CreateDir(const std::string& name) override { |
244 | 9.73k | Status result; |
245 | 9.73k | if (mkdir(name.c_str(), 0755) != 0) { |
246 | 3 | result = STATUS_IO_ERROR(name, errno); |
247 | 3 | } |
248 | 9.73k | return result; |
249 | 9.73k | }; |
250 | | |
251 | 1.02M | Status CreateDirIfMissing(const std::string& name) override { |
252 | 1.02M | Status result; |
253 | 1.02M | if (mkdir(name.c_str(), 0755) != 0) { |
254 | 1.02M | if (errno != EEXIST) { |
255 | 0 | result = STATUS_IO_ERROR(name, errno); |
256 | 0 | LOG(DFATAL) << "Mkdir failed: " << result; |
257 | 1.02M | } else if (!DirExists(name)) { // Check that name is actually a |
258 | | // directory. |
259 | | // Message is taken from mkdir |
260 | 0 | result = STATUS(IOError, "`"+name+"' exists but is not a directory"); |
261 | 0 | } |
262 | 1.02M | } |
263 | 1.02M | return result; |
264 | 1.02M | }; |
265 | | |
266 | 347k | Status DeleteDir(const std::string& name) override { |
267 | 347k | Status result; |
268 | 347k | if (rmdir(name.c_str()) != 0) { |
269 | 2.31k | result = STATUS_IO_ERROR(name, errno); |
270 | 2.31k | } |
271 | 347k | return result; |
272 | 347k | }; |
273 | | |
274 | 710k | Status GetFileSize(const std::string& fname, uint64_t* size) override { |
275 | 710k | return file_factory_->GetFileSize(fname, size); |
276 | 710k | } |
277 | | |
278 | | virtual Status GetFileModificationTime(const std::string& fname, |
279 | 81 | uint64_t* file_mtime) override { |
280 | 81 | struct stat s; |
281 | 81 | if (stat(fname.c_str(), &s) !=0) { |
282 | 0 | return STATUS_IO_ERROR(fname, errno); |
283 | 0 | } |
284 | 81 | *file_mtime = static_cast<uint64_t>(s.st_mtime); |
285 | 81 | return Status::OK(); |
286 | 81 | } |
287 | | virtual Status RenameFile(const std::string& src, |
288 | 1.93M | const std::string& target) override { |
289 | 1.93M | Status result; |
290 | 1.93M | if (rename(src.c_str(), target.c_str()) != 0) { |
291 | 10 | result = STATUS_IO_ERROR(src, errno); |
292 | 10 | } |
293 | 1.93M | return result; |
294 | 1.93M | } |
295 | | |
296 | | virtual Status LinkFile(const std::string& src, |
297 | 8.44k | const std::string& target) override { |
298 | 8.44k | Status result; |
299 | 8.44k | if (link(src.c_str(), target.c_str()) != 0) { |
300 | 0 | if (errno == EXDEV) { |
301 | 0 | return STATUS(NotSupported, "No cross FS links allowed"); |
302 | 0 | } |
303 | 0 | result = STATUS_IO_ERROR(src, errno); |
304 | 0 | } |
305 | 8.44k | return result; |
306 | 8.44k | } |
307 | | |
308 | 687k | Status LockFile(const std::string& fname, FileLock** lock) override { |
309 | 687k | *lock = nullptr; |
310 | 687k | Status result; |
311 | 687k | int fd; |
312 | 687k | { |
313 | 687k | IOSTATS_TIMER_GUARD(open_nanos); |
314 | 687k | fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); |
315 | 687k | } |
316 | 687k | if (fd < 0) { |
317 | 158 | result = STATUS_IO_ERROR(fname, errno); |
318 | 687k | } else if (LockOrUnlock(fname, fd, true) == -1) { |
319 | 9 | result = STATUS_IO_ERROR("lock " + fname, errno); |
320 | 9 | close(fd); |
321 | 687k | } else { |
322 | 687k | SetFD_CLOEXEC(fd, nullptr); |
323 | 687k | PosixFileLock* my_lock = new PosixFileLock; |
324 | 687k | my_lock->fd_ = fd; |
325 | 687k | my_lock->filename = fname; |
326 | 687k | *lock = my_lock; |
327 | 687k | } |
328 | 687k | return result; |
329 | 687k | } |
330 | | |
331 | 668k | Status UnlockFile(FileLock* lock) override { |
332 | 668k | PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock); |
333 | 668k | Status result; |
334 | 668k | if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) { |
335 | 0 | result = STATUS_IO_ERROR("unlock", errno); |
336 | 0 | } |
337 | 668k | close(my_lock->fd_); |
338 | 668k | delete my_lock; |
339 | 668k | return result; |
340 | 668k | } |
341 | | |
342 | | virtual void Schedule(void (*function)(void* arg1), void* arg, |
343 | | Priority pri = LOW, void* tag = nullptr, |
344 | | void (*unschedFunction)(void* arg) = 0) override; |
345 | | |
346 | | int UnSchedule(void* arg, Priority pri) override; |
347 | | |
348 | | void StartThread(void (*function)(void* arg), void* arg) override; |
349 | | |
350 | | void WaitForJoin() override; |
351 | | |
352 | | unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; |
353 | | |
354 | 4.22k | Status GetTestDirectory(std::string* result) override { |
355 | 4.22k | const char* env = getenv("TEST_TMPDIR"); |
356 | 4.22k | if (env && env[0] != '\0') { |
357 | 4.22k | *result = env; |
358 | 0 | } else { |
359 | 0 | char buf[100]; |
360 | 0 | snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%ud", geteuid()); |
361 | 0 | *result = buf; |
362 | 0 | } |
363 | | // Directory may already exist |
364 | 4.22k | if (!DirExists(*result)) { |
365 | 0 | RETURN_NOT_OK(CreateDir(*result)); |
366 | 0 | } |
367 | 4.22k | return Status::OK(); |
368 | 4.22k | } |
369 | | |
370 | 1.94M | static uint64_t gettid(pthread_t tid) { |
371 | 1.94M | uint64_t thread_id = 0; |
372 | 1.94M | memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); |
373 | 1.94M | return thread_id; |
374 | 1.94M | } |
375 | | |
376 | 1.93M | static uint64_t gettid() { |
377 | 1.93M | pthread_t tid = pthread_self(); |
378 | 1.93M | return gettid(tid); |
379 | 1.93M | } |
380 | | |
381 | 3.62k | uint64_t GetThreadID() const override { |
382 | 3.62k | return gettid(pthread_self()); |
383 | 3.62k | } |
384 | | |
385 | | virtual Status NewLogger(const std::string& fname, |
386 | 21.0k | shared_ptr<Logger>* result) override { |
387 | 21.0k | FILE* f; |
388 | 21.0k | { |
389 | 21.0k | IOSTATS_TIMER_GUARD(open_nanos); |
390 | 21.0k | f = fopen(fname.c_str(), "w"); |
391 | 21.0k | } |
392 | 21.0k | if (f == nullptr) { |
393 | 2 | result->reset(); |
394 | 2 | return STATUS_IO_ERROR(fname, errno); |
395 | 21.0k | } else { |
396 | 21.0k | int fd = fileno(f); |
397 | | #ifdef ROCKSDB_FALLOCATE_PRESENT |
398 | | fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024); |
399 | | #endif |
400 | 21.0k | SetFD_CLOEXEC(fd, nullptr); |
401 | 21.0k | result->reset(new PosixLogger(f, &PosixEnv::gettid, this)); |
402 | 21.0k | return Status::OK(); |
403 | 21.0k | } |
404 | 21.0k | } |
405 | | |
406 | 17.0M | uint64_t NowMicros() override { |
407 | 17.0M | struct timeval tv; |
408 | 17.0M | gettimeofday(&tv, nullptr); |
409 | 17.0M | return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; |
410 | 17.0M | } |
411 | | |
412 | 3.27M | uint64_t NowNanos() override { |
413 | | #if defined(__linux__) || defined(OS_FREEBSD) |
414 | | struct timespec ts; |
415 | | clock_gettime(CLOCK_MONOTONIC, &ts); |
416 | | return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; |
417 | | #elif defined(__MACH__) |
418 | 3.27M | clock_serv_t cclock; |
419 | 3.27M | mach_timespec_t ts; |
420 | 3.27M | host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); |
421 | 3.27M | clock_get_time(cclock, &ts); |
422 | 3.27M | mach_port_deallocate(mach_task_self(), cclock); |
423 | 3.27M | return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec; |
424 | | #else |
425 | | return std::chrono::duration_cast<std::chrono::nanoseconds>( |
426 | | std::chrono::steady_clock::now().time_since_epoch()).count(); |
427 | | #endif |
428 | 3.27M | } |
429 | | |
430 | 61.6k | void SleepForMicroseconds(int micros) override { usleep(micros); } |
431 | | |
432 | 0 | Status GetHostName(char* name, uint64_t len) override { |
433 | 0 | int ret = gethostname(name, static_cast<size_t>(len)); |
434 | 0 | if (ret < 0) { |
435 | 0 | if (errno == EFAULT || errno == EINVAL) |
436 | 0 | return STATUS(InvalidArgument, strerror(errno)); |
437 | 0 | else |
438 | 0 | return STATUS_IO_ERROR("GetHostName", errno); |
439 | 0 | } |
440 | 0 | return Status::OK(); |
441 | 0 | } |
442 | | |
443 | 285k | Status GetCurrentTime(int64_t* unix_time) override { |
444 | 285k | time_t ret = time(nullptr); |
445 | 285k | if (ret == (time_t) -1) { |
446 | 0 | return STATUS_IO_ERROR("GetCurrentTime", errno); |
447 | 0 | } |
448 | 285k | *unix_time = (int64_t) ret; |
449 | 285k | return Status::OK(); |
450 | 285k | } |
451 | | |
452 | | virtual Status GetAbsolutePath(const std::string& db_path, |
453 | 363k | std::string* output_path) override { |
454 | 363k | if (db_path.find('/') == 0) { |
455 | 362k | *output_path = db_path; |
456 | 362k | return Status::OK(); |
457 | 362k | } |
458 | | |
459 | 180 | char the_path[256]; |
460 | 180 | char* ret = getcwd(the_path, 256); |
461 | 180 | if (ret == nullptr) { |
462 | 0 | return STATUS(IOError, strerror(errno)); |
463 | 0 | } |
464 | | |
465 | 180 | *output_path = ret; |
466 | 180 | return Status::OK(); |
467 | 180 | } |
468 | | |
469 | | // Allow increasing the number of worker threads. |
470 | 1.18k | void SetBackgroundThreads(int num, Priority pri) override { |
471 | 1.18k | assert(pri >= Priority::LOW && pri <= Priority::HIGH); |
472 | 1.18k | thread_pools_[pri].SetBackgroundThreads(num); |
473 | 1.18k | } |
474 | | |
475 | | // Allow increasing the number of worker threads. |
476 | 1.37M | void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { |
477 | 1.37M | assert(pri >= Priority::LOW && pri <= Priority::HIGH); |
478 | 1.37M | thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); |
479 | 1.37M | } |
480 | | |
481 | 0 | void LowerThreadPoolIOPriority(Priority pool = LOW) override { |
482 | 0 | assert(pool >= Priority::LOW && pool <= Priority::HIGH); |
483 | | #ifdef __linux__ |
484 | | thread_pools_[pool].LowerIOPriority(); |
485 | | #endif |
486 | 0 | } |
487 | | |
488 | 0 | std::string TimeToString(uint64_t secondsSince1970) override { |
489 | 0 | const time_t seconds = (time_t)secondsSince1970; |
490 | 0 | struct tm t; |
491 | 0 | int maxsize = 64; |
492 | 0 | std::string dummy; |
493 | 0 | dummy.reserve(maxsize); |
494 | 0 | dummy.resize(maxsize); |
495 | 0 | char* p = &dummy[0]; |
496 | 0 | localtime_r(&seconds, &t); |
497 | 0 | snprintf(p, maxsize, |
498 | 0 | "%04d/%02d/%02d-%02d:%02d:%02d ", |
499 | 0 | t.tm_year + 1900, |
500 | 0 | t.tm_mon + 1, |
501 | 0 | t.tm_mday, |
502 | 0 | t.tm_hour, |
503 | 0 | t.tm_min, |
504 | 0 | t.tm_sec); |
505 | 0 | return dummy; |
506 | 0 | } |
507 | | |
508 | | EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, |
509 | 333k | const DBOptions& db_options) const override { |
510 | 333k | EnvOptions optimized = env_options; |
511 | 333k | optimized.use_mmap_writes = false; |
512 | 333k | optimized.bytes_per_sync = db_options.wal_bytes_per_sync; |
513 | | // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it |
514 | | // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit |
515 | | // test and make this false |
516 | 333k | optimized.fallocate_with_keep_size = true; |
517 | 333k | return optimized; |
518 | 333k | } |
519 | | |
520 | | EnvOptions OptimizeForManifestWrite( |
521 | 577k | const EnvOptions& env_options) const override { |
522 | 577k | EnvOptions optimized = env_options; |
523 | 577k | optimized.use_mmap_writes = false; |
524 | 577k | optimized.fallocate_with_keep_size = true; |
525 | 577k | return optimized; |
526 | 577k | } |
527 | | |
528 | 6.11k | RocksDBFileFactory* GetFileFactory() { |
529 | 6.11k | return file_factory_.get(); |
530 | 6.11k | } |
531 | | |
532 | | private: |
533 | | std::unique_ptr<RocksDBFileFactory> file_factory_; |
534 | | // Returns true iff the named directory exists and is a directory. |
535 | 1.05M | bool DirExists(const std::string& dname) override { |
536 | 1.05M | struct stat statbuf; |
537 | 1.05M | if (stat(dname.c_str(), &statbuf) == 0) { |
538 | 1.05M | return S_ISDIR(statbuf.st_mode); |
539 | 1.05M | } |
540 | 7.46k | return false; // stat() failed return false |
541 | 7.46k | } |
542 | | |
543 | 0 | bool SupportsFastAllocate(const std::string& path) { |
544 | 0 | #ifdef ROCKSDB_FALLOCATE_PRESENT |
545 | 0 | struct statfs s; |
546 | 0 | if (statfs(path.c_str(), &s)) { |
547 | 0 | return false; |
548 | 0 | } |
549 | 0 | switch (s.f_type) { |
550 | 0 | case EXT4_SUPER_MAGIC: |
551 | 0 | return true; |
552 | 0 | case XFS_SUPER_MAGIC: |
553 | 0 | return true; |
554 | 0 | case TMPFS_MAGIC: |
555 | 0 | return true; |
556 | 0 | default: |
557 | 0 | return false; |
558 | 0 | } |
559 | 0 | #else |
560 | 0 | return false; |
561 | 0 | #endif |
562 | 0 | } |
563 | | |
564 | | std::vector<ThreadPool> thread_pools_; |
565 | | pthread_mutex_t mu_; |
566 | | std::vector<pthread_t> threads_to_join_; |
567 | | }; |
568 | | |
569 | | class PosixRocksDBFileFactory : public RocksDBFileFactory { |
570 | | public: |
571 | 11.9k | PosixRocksDBFileFactory() {} |
572 | | |
573 | 1.27k | ~PosixRocksDBFileFactory() {} |
574 | | |
575 | | Status NewSequentialFile(const std::string& fname, |
576 | | unique_ptr<SequentialFile>* result, |
577 | 1.72M | const EnvOptions& options) override { |
578 | 1.72M | result->reset(); |
579 | 1.72M | FILE* f = nullptr; |
580 | 1.72M | do { |
581 | 1.72M | IOSTATS_TIMER_GUARD(open_nanos); |
582 | 1.72M | f = fopen(fname.c_str(), "r"); |
583 | 1.72M | } while (f == nullptr && errno == EINTR); |
584 | 1.72M | if (f == nullptr) { |
585 | 0 | *result = nullptr; |
586 | 0 | return STATUS_IO_ERROR(fname, errno); |
587 | 1.72M | } else { |
588 | 1.72M | int fd = fileno(f); |
589 | 1.72M | SetFD_CLOEXEC(fd, &options); |
590 | 1.72M | *result = std::make_unique<yb::PosixSequentialFile>(fname, f, options); |
591 | 1.72M | return Status::OK(); |
592 | 1.72M | } |
593 | 1.72M | } |
594 | | |
595 | | Status NewRandomAccessFile(const std::string& fname, |
596 | | unique_ptr<RandomAccessFile>* result, |
597 | 1.73M | const EnvOptions& options) override { |
598 | 1.73M | result->reset(); |
599 | 1.73M | Status s; |
600 | 1.73M | int fd; |
601 | 1.73M | { |
602 | 1.73M | IOSTATS_TIMER_GUARD(open_nanos); |
603 | 1.73M | fd = open(fname.c_str(), O_RDONLY); |
604 | 1.73M | } |
605 | 1.73M | SetFD_CLOEXEC(fd, &options); |
606 | 1.73M | if (fd < 0) { |
607 | 528 | s = STATUS_IO_ERROR(fname, errno); |
608 | 1.73M | } else if (options.use_mmap_reads && sizeof(void*) >= 8) { |
609 | | // Use of mmap for random reads has been removed because it |
610 | | // kills performance when storage is fast. |
611 | | // Use mmap when virtual address-space is plentiful. |
612 | 4.74k | uint64_t size; |
613 | 4.74k | s = GetFileSize(fname, &size); |
614 | 4.74k | if (s.ok()) { |
615 | 4.74k | void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0); |
616 | 4.74k | if (base != MAP_FAILED) { |
617 | 4.74k | *result = std::make_unique<PosixMmapReadableFile>(fd, fname, base, size, options); |
618 | 0 | } else { |
619 | 0 | s = STATUS_IO_ERROR(fname, errno); |
620 | 0 | } |
621 | 4.74k | } |
622 | 4.74k | close(fd); |
623 | 1.73M | } else { |
624 | 1.73M | *result = std::make_unique<yb::PosixRandomAccessFile>(fname, fd, options); |
625 | 1.73M | } |
626 | 1.73M | return s; |
627 | 1.73M | } |
628 | | |
629 | | Status NewWritableFile(const std::string& fname, |
630 | | unique_ptr<WritableFile>* result, |
631 | 3.00M | const EnvOptions& options) override { |
632 | 3.00M | result->reset(); |
633 | 3.00M | Status s; |
634 | 3.00M | int fd = -1; |
635 | 3.00M | do { |
636 | 3.00M | IOSTATS_TIMER_GUARD(open_nanos); |
637 | 3.00M | fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); |
638 | 3.00M | } while (fd < 0 && errno == EINTR); |
639 | 3.00M | if (fd < 0) { |
640 | 25 | s = STATUS_IO_ERROR(fname, errno); |
641 | 3.00M | } else { |
642 | 3.00M | SetFD_CLOEXEC(fd, &options); |
643 | 3.00M | if (options.use_mmap_writes) { |
644 | 1.18k | if (!checkedDiskForMmap_) { |
645 | | // this will be executed once in the program's lifetime. |
646 | | // do not use mmapWrite on non ext-3/xfs/tmpfs systems. |
647 | 39 | if (!SupportsFastAllocate(fname)) { |
648 | 39 | forceMmapOff = true; |
649 | 39 | } |
650 | 39 | checkedDiskForMmap_ = true; |
651 | 39 | } |
652 | 1.18k | } |
653 | 3.00M | if (options.use_mmap_writes && !forceMmapOff) { |
654 | 0 | *result = std::make_unique<PosixMmapFile>(fname, fd, page_size_, options); |
655 | 3.00M | } else { |
656 | | // disable mmap writes |
657 | 3.00M | EnvOptions no_mmap_writes_options = options; |
658 | 3.00M | no_mmap_writes_options.use_mmap_writes = false; |
659 | 3.00M | *result = std::make_unique<PosixWritableFile>(fname, fd, no_mmap_writes_options); |
660 | 3.00M | } |
661 | 3.00M | } |
662 | 3.00M | return s; |
663 | 3.00M | } |
664 | | |
665 | | Status ReuseWritableFile(const std::string& fname, |
666 | | const std::string& old_fname, |
667 | | unique_ptr<WritableFile>* result, |
668 | 57 | const EnvOptions& options) override { |
669 | 57 | result->reset(); |
670 | 57 | Status s; |
671 | 57 | int fd = -1; |
672 | 57 | do { |
673 | 57 | IOSTATS_TIMER_GUARD(open_nanos); |
674 | 57 | fd = open(old_fname.c_str(), O_RDWR, 0644); |
675 | 57 | } while (fd < 0 && errno == EINTR); |
676 | 57 | if (fd < 0) { |
677 | 0 | s = STATUS_IO_ERROR(fname, errno); |
678 | 57 | } else { |
679 | 57 | SetFD_CLOEXEC(fd, &options); |
680 | | // rename into place |
681 | 57 | if (rename(old_fname.c_str(), fname.c_str()) != 0) { |
682 | 0 | Status r = STATUS_IO_ERROR(old_fname, errno); |
683 | 0 | close(fd); |
684 | 0 | return r; |
685 | 0 | } |
686 | 57 | if (options.use_mmap_writes) { |
687 | 0 | if (!checkedDiskForMmap_) { |
688 | | // this will be executed once in the program's lifetime. |
689 | | // do not use mmapWrite on non ext-3/xfs/tmpfs systems. |
690 | 0 | if (!SupportsFastAllocate(fname)) { |
691 | 0 | forceMmapOff = true; |
692 | 0 | } |
693 | 0 | checkedDiskForMmap_ = true; |
694 | 0 | } |
695 | 0 | } |
696 | 57 | if (options.use_mmap_writes && !forceMmapOff) { |
697 | 0 | *result = std::make_unique<PosixMmapFile>(fname, fd, page_size_, options); |
698 | 57 | } else { |
699 | | // disable mmap writes |
700 | 57 | EnvOptions no_mmap_writes_options = options; |
701 | 57 | no_mmap_writes_options.use_mmap_writes = false; |
702 | | |
703 | 57 | *result = std::make_unique<PosixWritableFile>(fname, fd, no_mmap_writes_options); |
704 | 57 | } |
705 | 57 | } |
706 | 57 | return s; |
707 | 57 | } |
708 | | |
709 | 714k | Status GetFileSize(const std::string& fname, uint64_t* size) override { |
710 | 714k | Status s; |
711 | 714k | struct stat sbuf; |
712 | 714k | if (stat(fname.c_str(), &sbuf) != 0) { |
713 | 20 | *size = 0; |
714 | 20 | s = STATUS_IO_ERROR(fname, errno); |
715 | 714k | } else { |
716 | 714k | *size = sbuf.st_size; |
717 | 714k | } |
718 | 714k | return s; |
719 | 714k | } |
720 | | |
721 | 7.00k | bool IsPlainText() const override { |
722 | 7.00k | return true; |
723 | 7.00k | } |
724 | | |
725 | | private: |
726 | | bool checkedDiskForMmap_ = false; |
727 | | bool forceMmapOff = false; |
728 | | size_t page_size_ = getpagesize(); |
729 | | |
730 | 39 | bool SupportsFastAllocate(const std::string& path) { |
731 | | #ifdef ROCKSDB_FALLOCATE_PRESENT |
732 | | struct statfs s; |
733 | | if (statfs(path.c_str(), &s)) { |
734 | | return false; |
735 | | } |
736 | | switch (s.f_type) { |
737 | | case EXT4_SUPER_MAGIC: |
738 | | return true; |
739 | | case XFS_SUPER_MAGIC: |
740 | | return true; |
741 | | case TMPFS_MAGIC: |
742 | | return true; |
743 | | default: |
744 | | return false; |
745 | | } |
746 | | #else |
747 | 39 | return false; |
748 | 39 | #endif |
749 | 39 | } |
750 | | |
751 | | }; |
752 | | |
753 | | PosixEnv::PosixEnv() |
754 | 11.9k | : file_factory_(std::make_unique<PosixRocksDBFileFactory>()), thread_pools_(Priority::TOTAL) { |
755 | 11.9k | ThreadPool::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); |
756 | 35.8k | for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { |
757 | 23.9k | thread_pools_[pool_id].SetThreadPriority( |
758 | 23.9k | static_cast<Env::Priority>(pool_id)); |
759 | | // This allows later initializing the thread-local-env of each thread. |
760 | 23.9k | thread_pools_[pool_id].SetHostEnv(this); |
761 | 23.9k | } |
762 | 11.9k | } |
763 | | |
764 | | PosixEnv::PosixEnv(std::unique_ptr<RocksDBFileFactory> file_factory) : |
765 | 6.11k | file_factory_(std::move(file_factory)), thread_pools_(Priority::TOTAL) { |
766 | 6.11k | ThreadPool::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); |
767 | 18.3k | for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { |
768 | 12.2k | thread_pools_[pool_id].SetThreadPriority( |
769 | 12.2k | static_cast<Env::Priority>(pool_id)); |
770 | | // This allows later initializing the thread-local-env of each thread. |
771 | 12.2k | thread_pools_[pool_id].SetHostEnv(this); |
772 | 12.2k | } |
773 | 6.11k | } |
774 | | |
775 | | void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, |
776 | 124k | void* tag, void (*unschedFunction)(void* arg)) { |
777 | 124k | assert(pri >= Priority::LOW && pri <= Priority::HIGH); |
778 | 124k | thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); |
779 | 124k | } |
780 | | |
781 | 647k | int PosixEnv::UnSchedule(void* arg, Priority pri) { |
782 | 647k | return thread_pools_[pri].UnSchedule(arg); |
783 | 647k | } |
784 | | |
785 | 33 | unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { |
786 | 33 | assert(pri >= Priority::LOW && pri <= Priority::HIGH); |
787 | 33 | return thread_pools_[pri].GetQueueLen(); |
788 | 33 | } |
789 | | |
// Heap-allocated hand-off from StartThread() to StartThreadWrapper(): the user's entry point and
// the argument to call it with.
struct StartThreadState {
  void (*user_function)(void*);  // entry point supplied by the caller of StartThread()
  void* arg;                     // passed to user_function unchanged
};
794 | | |
795 | 2.48k | static void* StartThreadWrapper(void* arg) { |
796 | 2.48k | StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); |
797 | 2.48k | state->user_function(state->arg); |
798 | 2.48k | delete state; |
799 | 2.48k | return nullptr; |
800 | 2.48k | } |
801 | | |
802 | 2.49k | void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { |
803 | 2.49k | pthread_t t; |
804 | 2.49k | StartThreadState* state = new StartThreadState; |
805 | 2.49k | state->user_function = function; |
806 | 2.49k | state->arg = arg; |
807 | 2.49k | ThreadPool::PthreadCall( |
808 | 2.49k | "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); |
809 | 2.49k | ThreadPool::PthreadCall("lock", pthread_mutex_lock(&mu_)); |
810 | 2.49k | threads_to_join_.push_back(t); |
811 | 2.49k | ThreadPool::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); |
812 | 2.49k | } |
813 | | |
814 | 32 | void PosixEnv::WaitForJoin() { |
815 | 2.07k | for (const auto tid : threads_to_join_) { |
816 | 2.07k | pthread_join(tid, nullptr); |
817 | 2.07k | } |
818 | 32 | threads_to_join_.clear(); |
819 | 32 | } |
820 | | |
821 | | } // namespace |
822 | | |
823 | 344k | std::string Env::GenerateUniqueId() { |
824 | 344k | std::string uuid_file = "/proc/sys/kernel/random/uuid"; |
825 | | |
826 | 344k | Status s = FileExists(uuid_file); |
827 | 344k | if (s.ok()) { |
828 | 0 | std::string uuid; |
829 | 0 | s = ReadFileToString(this, uuid_file, &uuid); |
830 | 0 | if (s.ok()) { |
831 | 0 | return uuid; |
832 | 0 | } |
833 | 344k | } |
834 | | // Could not read uuid_file - generate uuid using "nanos-random" |
835 | 344k | Random64 r(time(nullptr)); |
836 | 344k | uint64_t random_uuid_portion = |
837 | 344k | r.Uniform(std::numeric_limits<uint64_t>::max()); |
838 | 344k | uint64_t nanos_uuid_portion = NowNanos(); |
839 | 344k | char uuid2[200]; |
840 | 344k | snprintf(uuid2, |
841 | 344k | 200, |
842 | 344k | "%" PRIu64 "x-%" PRIu64 "x", |
843 | 344k | nanos_uuid_portion, |
844 | 344k | random_uuid_portion); |
845 | 344k | return uuid2; |
846 | 344k | } |
847 | | |
848 | 9.20M | Env* Env::Default() { |
849 | | // The following function call initializes the singletons of ThreadLocalPtr |
850 | | // right before the static default_env. This guarantees default_env will |
851 | | // always being destructed before the ThreadLocalPtr singletons get |
852 | | // destructed as C++ guarantees that the destructions of static variables |
853 | | // is in the reverse order of their constructions. |
854 | | // |
855 | | // Since static members are destructed in the reverse order |
856 | | // of their construction, having this call here guarantees that |
857 | | // the destructor of static PosixEnv will go first, then the |
858 | | // the singletons of ThreadLocalPtr. |
859 | 9.20M | ThreadLocalPtr::InitSingletons(); |
860 | | // Make sure that SyncPoint inited before PosixEnv, to be deleted after it. |
861 | 9.20M | #ifndef NDEBUG |
862 | 9.20M | SyncPoint::GetInstance(); |
863 | 9.20M | #endif |
864 | 9.20M | static PosixEnv default_env; |
865 | 9.20M | return &default_env; |
866 | 9.20M | } |
867 | | |
868 | 6.11k | RocksDBFileFactory* Env::DefaultFileFactory() { |
869 | 6.11k | return down_cast<PosixEnv*>(Env::Default())->GetFileFactory(); |
870 | 6.11k | } |
871 | | |
872 | | std::unique_ptr<Env> Env::NewRocksDBDefaultEnv( |
873 | 6.11k | std::unique_ptr<RocksDBFileFactory> file_factory) { |
874 | 6.11k | return std::make_unique<PosixEnv>(std::move(file_factory)); |
875 | 6.11k | } |
876 | | |
877 | | } // namespace rocksdb |