YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/env_posix.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
#include <dirent.h>
24
#include <fcntl.h>
25
#if defined(__linux__)
26
#include <linux/fs.h>
27
#endif
28
#include <pthread.h>
29
#include <stdio.h>
30
#include <stdlib.h>
31
#include <string.h>
32
#include <sys/mman.h>
33
#include <sys/stat.h>
34
#ifdef __linux__
35
#include <sys/statfs.h>
36
#include <sys/syscall.h>
37
#endif
38
#include <sys/types.h>
39
#include <time.h>
40
#include <algorithm>
41
// Get nano time includes
42
#if defined(__linux__) || defined(OS_FREEBSD)
43
#elif defined(__MACH__)
44
#include <mach/clock.h>
45
#include <mach/mach.h>
46
#else
47
#include <chrono>
48
#endif
49
#include <deque>
50
#include <set>
51
52
#include "yb/gutil/casts.h"
53
54
#include "yb/rocksdb/port/port.h"
55
#include "yb/rocksdb/options.h"
56
#include "yb/rocksdb/util/io_posix.h"
57
#include "yb/rocksdb/util/thread_posix.h"
58
#include "yb/rocksdb/util/posix_logger.h"
59
#include "yb/rocksdb/util/random.h"
60
#include "yb/rocksdb/util/sync_point.h"
61
#include "yb/rocksdb/util/thread_local.h"
62
63
#include "yb/util/logging.h"
64
#include "yb/util/slice.h"
65
#include "yb/util/stats/iostats_context_imp.h"
66
#include "yb/util/string_util.h"
67
68
#if !defined(TMPFS_MAGIC)
69
#define TMPFS_MAGIC 0x01021994
70
#endif
71
#if !defined(XFS_SUPER_MAGIC)
72
#define XFS_SUPER_MAGIC 0x58465342
73
#endif
74
#if !defined(EXT4_SUPER_MAGIC)
75
#define EXT4_SUPER_MAGIC 0xEF53
76
#endif
77
78
#include "yb/util/file_system_posix.h"
79
80
11.5k
// Expands to STATUS(IOError, ...) with `context` as the first message argument
// and the strerror() text for `err_number` as the second.
#define STATUS_IO_ERROR(context, err_number) STATUS(IOError, (context), strerror(err_number))
81
82
namespace rocksdb {
83
84
namespace {
85
86
// list of pathnames that are locked
87
static std::set<std::string> lockedFiles;
88
static port::Mutex mutex_lockedFiles;
89
90
1.67M
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
91
1.67M
  mutex_lockedFiles.Lock();
92
1.67M
  if (lock) {
93
    // If it already exists in the lockedFiles set, then it is already locked,
94
    // and fail this lock attempt. Otherwise, insert it into lockedFiles.
95
    // This check is needed because fcntl() does not detect lock conflict
96
    // if the fcntl is issued by the same thread that earlier acquired
97
    // this lock.
98
855k
    if (lockedFiles.insert(fname).second == false) {
99
9
      mutex_lockedFiles.Unlock();
100
9
      errno = ENOLCK;
101
9
      return -1;
102
9
    }
103
855k
  } else {
104
    // If we are unlocking, then verify that we had locked it earlier,
105
    // it should already exist in lockedFiles. Remove it from lockedFiles.
106
815k
    if (lockedFiles.erase(fname) != 1) {
107
0
      mutex_lockedFiles.Unlock();
108
0
      errno = ENOLCK;
109
0
      return -1;
110
0
    }
111
815k
  }
112
1.67M
  errno = 0;
113
1.67M
  struct flock f;
114
1.67M
  memset(&f, 0, sizeof(f));
115
1.67M
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
116
1.67M
  f.l_whence = SEEK_SET;
117
1.67M
  f.l_start = 0;
118
1.67M
  f.l_len = 0;        // Lock/unlock entire file
119
1.67M
  int value = fcntl(fd, F_SETLK, &f);
120
1.67M
  if (value == -1 && 
lock0
) {
121
    // if there is an error in locking, then remove the pathname from lockedfiles
122
0
    lockedFiles.erase(fname);
123
0
  }
124
1.67M
  mutex_lockedFiles.Unlock();
125
1.67M
  return value;
126
1.67M
}
127
128
8.96M
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
129
8.96M
  if ((options == nullptr || 
options->set_fd_cloexec8.09M
) &&
fd > 08.96M
) {
130
8.96M
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
131
8.96M
  }
132
8.96M
}
133
134
// FileLock implementation carrying the descriptor and path of a locked file.
class PosixFileLock : public FileLock {
 public:
  // Descriptor on which the lock is held; -1 until a descriptor is assigned.
  int fd_ = -1;
  // Path of the locked file.
  std::string filename;
};
139
140
class PosixEnv : public Env {
141
 public:
142
  PosixEnv();
143
  explicit PosixEnv(std::unique_ptr<RocksDBFileFactory> file_factory);
144
145
1.57k
  virtual ~PosixEnv() {
146
1.57k
    for (const auto tid : threads_to_join_) {
147
415
      pthread_join(tid, nullptr);
148
415
    }
149
4.73k
    for (int pool_id = 0; pool_id < Env::Priority::TOTAL; 
++pool_id3.15k
) {
150
3.15k
      thread_pools_[pool_id].JoinAllThreads();
151
3.15k
    }
152
1.57k
  }
153
154
  virtual Status NewSequentialFile(const std::string& fname,
155
                                   unique_ptr<SequentialFile>* result,
156
2.18M
                                   const EnvOptions& options) override {
157
2.18M
    return file_factory_->NewSequentialFile(fname, result, options);
158
2.18M
  }
159
160
  virtual Status NewRandomAccessFile(const std::string& fname,
161
                                     unique_ptr<RandomAccessFile>* result,
162
173k
                                     const EnvOptions& options) override {
163
173k
    return file_factory_->NewRandomAccessFile(fname, result, options);
164
173k
  }
165
166
  virtual Status NewWritableFile(const std::string& fname,
167
                                 unique_ptr<WritableFile>* result,
168
3.71M
                                 const EnvOptions& options) override {
169
3.71M
    return file_factory_->NewWritableFile(fname, result, options);
170
3.71M
  }
171
172
  virtual Status ReuseWritableFile(const std::string& fname,
173
                                   const std::string& old_fname,
174
                                   unique_ptr<WritableFile>* result,
175
57
                                   const EnvOptions& options) override {
176
57
    return file_factory_->ReuseWritableFile(fname, old_fname, result, options);
177
57
  }
178
179
7.57k
  bool IsPlainText() const override {
180
7.57k
    return file_factory_->IsPlainText();
181
7.57k
  }
182
183
  virtual Status NewDirectory(const std::string& name,
184
437k
                              unique_ptr<Directory>* result) override {
185
437k
    result->reset();
186
437k
    int fd;
187
437k
    {
188
437k
      IOSTATS_TIMER_GUARD(open_nanos);
189
437k
      fd = open(name.c_str(), 0);
190
437k
    }
191
437k
    if (fd < 0) {
192
0
      return STATUS_IO_ERROR(name, errno);
193
437k
    } else {
194
437k
      result->reset(new PosixDirectory(fd));
195
437k
    }
196
437k
    return Status::OK();
197
437k
  }
198
199
2.65M
  Status FileExists(const std::string& fname) override {
200
2.65M
    int result = access(fname.c_str(), F_OK);
201
202
2.65M
    if (result == 0) {
203
501k
      return Status::OK();
204
501k
    }
205
206
2.15M
    switch (errno) {
207
0
      case EACCES:
208
0
      case ELOOP:
209
0
      case ENAMETOOLONG:
210
2.15M
      case ENOENT:
211
2.15M
      case ENOTDIR:
212
2.15M
        return STATUS(NotFound, "");
213
0
      default:
214
0
        assert(result == EIO || result == ENOMEM);
215
0
        return STATUS(IOError, "Unexpected error(" + ToString(result) +
216
2.15M
                               ") accessing file `" + fname + "' ");
217
2.15M
    }
218
2.15M
  }
219
220
  virtual Status GetChildren(const std::string& dir,
221
2.99M
                             std::vector<std::string>* result) override {
222
2.99M
    result->clear();
223
2.99M
    DIR* d = opendir(dir.c_str());
224
2.99M
    if (d == nullptr) {
225
2.67k
      return STATUS_IO_ERROR(dir, errno);
226
2.67k
    }
227
2.99M
    struct dirent* entry;
228
29.2M
    while ((entry = readdir(d)) != nullptr) {
229
26.2M
      result->push_back(entry->d_name);
230
26.2M
    }
231
2.99M
    closedir(d);
232
2.99M
    return Status::OK();
233
2.99M
  }
234
235
3.61M
  Status DeleteFile(const std::string& fname) override {
236
3.61M
    Status result;
237
3.61M
    if (unlink(fname.c_str()) != 0) {
238
4.72k
      result = STATUS_IO_ERROR(fname, errno);
239
4.72k
    }
240
3.61M
    return result;
241
3.61M
  };
242
243
11.2k
  Status CreateDir(const std::string& name) override {
244
11.2k
    Status result;
245
11.2k
    if (mkdir(name.c_str(), 0755) != 0) {
246
3
      result = STATUS_IO_ERROR(name, errno);
247
3
    }
248
11.2k
    return result;
249
11.2k
  };
250
251
1.30M
  Status CreateDirIfMissing(const std::string& name) override {
252
1.30M
    Status result;
253
1.30M
    if (mkdir(name.c_str(), 0755) != 0) {
254
1.30M
      if (errno != EEXIST) {
255
0
        result = STATUS_IO_ERROR(name, errno);
256
0
        LOG(DFATAL) << "Mkdir failed: " << result;
257
1.30M
      } else if (!DirExists(name)) { // Check that name is actually a
258
                                     // directory.
259
        // Message is taken from mkdir
260
0
        result = STATUS(IOError, "`"+name+"' exists but is not a directory");
261
0
      }
262
1.30M
    }
263
1.30M
    return result;
264
1.30M
  };
265
266
421k
  Status DeleteDir(const std::string& name) override {
267
421k
    Status result;
268
421k
    if (rmdir(name.c_str()) != 0) {
269
3.07k
      result = STATUS_IO_ERROR(name, errno);
270
3.07k
    }
271
421k
    return result;
272
421k
  };
273
274
837k
  Status GetFileSize(const std::string& fname, uint64_t* size) override {
275
837k
    return file_factory_->GetFileSize(fname, size);
276
837k
  }
277
278
  virtual Status GetFileModificationTime(const std::string& fname,
279
81
                                         uint64_t* file_mtime) override {
280
81
    struct stat s;
281
81
    if (stat(fname.c_str(), &s) !=0) {
282
0
      return STATUS_IO_ERROR(fname, errno);
283
0
    }
284
81
    *file_mtime = static_cast<uint64_t>(s.st_mtime);
285
81
    return Status::OK();
286
81
  }
287
  virtual Status RenameFile(const std::string& src,
288
2.42M
                            const std::string& target) override {
289
2.42M
    Status result;
290
2.42M
    if (rename(src.c_str(), target.c_str()) != 0) {
291
10
      result = STATUS_IO_ERROR(src, errno);
292
10
    }
293
2.42M
    return result;
294
2.42M
  }
295
296
  virtual Status LinkFile(const std::string& src,
297
16.3k
                          const std::string& target) override {
298
16.3k
    Status result;
299
16.3k
    if (link(src.c_str(), target.c_str()) != 0) {
300
27
      if (errno == EXDEV) {
301
0
        return STATUS(NotSupported, "No cross FS links allowed");
302
0
      }
303
27
      result = STATUS_IO_ERROR(src, errno);
304
27
    }
305
16.3k
    return result;
306
16.3k
  }
307
308
855k
  Status LockFile(const std::string& fname, FileLock** lock) override {
309
855k
    *lock = nullptr;
310
855k
    Status result;
311
855k
    int fd;
312
855k
    {
313
855k
      IOSTATS_TIMER_GUARD(open_nanos);
314
855k
      fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
315
855k
    }
316
855k
    if (fd < 0) {
317
472
      result = STATUS_IO_ERROR(fname, errno);
318
855k
    } else if (LockOrUnlock(fname, fd, true) == -1) {
319
9
      result = STATUS_IO_ERROR("lock " + fname, errno);
320
9
      close(fd);
321
855k
    } else {
322
855k
      SetFD_CLOEXEC(fd, nullptr);
323
855k
      PosixFileLock* my_lock = new PosixFileLock;
324
855k
      my_lock->fd_ = fd;
325
855k
      my_lock->filename = fname;
326
855k
      *lock = my_lock;
327
855k
    }
328
855k
    return result;
329
855k
  }
330
331
816k
  Status UnlockFile(FileLock* lock) override {
332
816k
    PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
333
816k
    Status result;
334
816k
    if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
335
0
      result = STATUS_IO_ERROR("unlock", errno);
336
0
    }
337
816k
    close(my_lock->fd_);
338
816k
    delete my_lock;
339
816k
    return result;
340
816k
  }
341
342
  virtual void Schedule(void (*function)(void* arg1), void* arg,
343
                        Priority pri = LOW, void* tag = nullptr,
344
                        void (*unschedFunction)(void* arg) = 0) override;
345
346
  int UnSchedule(void* arg, Priority pri) override;
347
348
  void StartThread(void (*function)(void* arg), void* arg) override;
349
350
  void WaitForJoin() override;
351
352
  unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
353
354
4.22k
  Status GetTestDirectory(std::string* result) override {
355
4.22k
    const char* env = getenv("TEST_TMPDIR");
356
4.22k
    if (env && env[0] != '\0') {
357
4.22k
      *result = env;
358
4.22k
    } else {
359
0
      char buf[100];
360
0
      snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%ud", geteuid());
361
0
      *result = buf;
362
0
    }
363
    // Directory may already exist
364
4.22k
    if (!DirExists(*result)) {
365
0
      RETURN_NOT_OK(CreateDir(*result));
366
0
    }
367
4.22k
    return Status::OK();
368
4.22k
  }
369
370
1.96M
  static uint64_t gettid(pthread_t tid) {
371
1.96M
    uint64_t thread_id = 0;
372
1.96M
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
373
1.96M
    return thread_id;
374
1.96M
  }
375
376
1.95M
  static uint64_t gettid() {
377
1.95M
    pthread_t tid = pthread_self();
378
1.95M
    return gettid(tid);
379
1.95M
  }
380
381
4.62k
  uint64_t GetThreadID() const override {
382
4.62k
    return gettid(pthread_self());
383
4.62k
  }
384
385
  virtual Status NewLogger(const std::string& fname,
386
21.0k
                           shared_ptr<Logger>* result) override {
387
21.0k
    FILE* f;
388
21.0k
    {
389
21.0k
      IOSTATS_TIMER_GUARD(open_nanos);
390
21.0k
      f = fopen(fname.c_str(), "w");
391
21.0k
    }
392
21.0k
    if (f == nullptr) {
393
2
      result->reset();
394
2
      return STATUS_IO_ERROR(fname, errno);
395
21.0k
    } else {
396
21.0k
      int fd = fileno(f);
397
#ifdef ROCKSDB_FALLOCATE_PRESENT
398
      fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
399
#endif
400
21.0k
      SetFD_CLOEXEC(fd, nullptr);
401
21.0k
      result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
402
21.0k
      return Status::OK();
403
21.0k
    }
404
21.0k
  }
405
406
31.6M
  uint64_t NowMicros() override {
407
31.6M
    struct timeval tv;
408
31.6M
    gettimeofday(&tv, nullptr);
409
31.6M
    return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
410
31.6M
  }
411
412
3.36M
  uint64_t NowNanos() override {
413
#if defined(__linux__) || defined(OS_FREEBSD)
414
    struct timespec ts;
415
    clock_gettime(CLOCK_MONOTONIC, &ts);
416
    return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
417
#elif defined(__MACH__)
418
3.36M
    clock_serv_t cclock;
419
3.36M
    mach_timespec_t ts;
420
3.36M
    host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
421
3.36M
    clock_get_time(cclock, &ts);
422
3.36M
    mach_port_deallocate(mach_task_self(), cclock);
423
3.36M
    return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
424
#else
425
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
426
       std::chrono::steady_clock::now().time_since_epoch()).count();
427
#endif
428
3.36M
  }
429
430
60.8k
  void SleepForMicroseconds(int micros) override { usleep(micros); }
431
432
0
  Status GetHostName(char* name, uint64_t len) override {
433
0
    int ret = gethostname(name, static_cast<size_t>(len));
434
0
    if (ret < 0) {
435
0
      if (errno == EFAULT || errno == EINVAL)
436
0
        return STATUS(InvalidArgument, strerror(errno));
437
0
      else
438
0
        return STATUS_IO_ERROR("GetHostName", errno);
439
0
    }
440
0
    return Status::OK();
441
0
  }
442
443
285k
  Status GetCurrentTime(int64_t* unix_time) override {
444
285k
    time_t ret = time(nullptr);
445
285k
    if (ret == (time_t) -1) {
446
0
      return STATUS_IO_ERROR("GetCurrentTime", errno);
447
0
    }
448
285k
    *unix_time = (int64_t) ret;
449
285k
    return Status::OK();
450
285k
  }
451
452
  virtual Status GetAbsolutePath(const std::string& db_path,
453
456k
                                 std::string* output_path) override {
454
456k
    if (db_path.find('/') == 0) {
455
456k
      *output_path = db_path;
456
456k
      return Status::OK();
457
456k
    }
458
459
192
    char the_path[256];
460
192
    char* ret = getcwd(the_path, 256);
461
192
    if (ret == nullptr) {
462
0
      return STATUS(IOError, strerror(errno));
463
0
    }
464
465
192
    *output_path = ret;
466
192
    return Status::OK();
467
192
  }
468
469
  // Allow increasing the number of worker threads.
470
1.17k
  void SetBackgroundThreads(int num, Priority pri) override {
471
1.17k
    assert(pri >= Priority::LOW && pri <= Priority::HIGH);
472
0
    thread_pools_[pri].SetBackgroundThreads(num);
473
1.17k
  }
474
475
  // Allow increasing the number of worker threads.
476
1.71M
  void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
477
1.71M
    assert(pri >= Priority::LOW && pri <= Priority::HIGH);
478
0
    thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
479
1.71M
  }
480
481
0
  void LowerThreadPoolIOPriority(Priority pool = LOW) override {
482
0
    assert(pool >= Priority::LOW && pool <= Priority::HIGH);
483
#ifdef __linux__
484
    thread_pools_[pool].LowerIOPriority();
485
#endif
486
0
  }
487
488
0
  std::string TimeToString(uint64_t secondsSince1970) override {
489
0
    const time_t seconds = (time_t)secondsSince1970;
490
0
    struct tm t;
491
0
    int maxsize = 64;
492
0
    std::string dummy;
493
0
    dummy.reserve(maxsize);
494
0
    dummy.resize(maxsize);
495
0
    char* p = &dummy[0];
496
0
    localtime_r(&seconds, &t);
497
0
    snprintf(p, maxsize,
498
0
             "%04d/%02d/%02d-%02d:%02d:%02d ",
499
0
             t.tm_year + 1900,
500
0
             t.tm_mon + 1,
501
0
             t.tm_mday,
502
0
             t.tm_hour,
503
0
             t.tm_min,
504
0
             t.tm_sec);
505
0
    return dummy;
506
0
  }
507
508
  EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
509
427k
                                 const DBOptions& db_options) const override {
510
427k
    EnvOptions optimized = env_options;
511
427k
    optimized.use_mmap_writes = false;
512
427k
    optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
513
    // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
514
    // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
515
    // test and make this false
516
427k
    optimized.fallocate_with_keep_size = true;
517
427k
    return optimized;
518
427k
  }
519
520
  EnvOptions OptimizeForManifestWrite(
521
691k
      const EnvOptions& env_options) const override {
522
691k
    EnvOptions optimized = env_options;
523
691k
    optimized.use_mmap_writes = false;
524
691k
    optimized.fallocate_with_keep_size = true;
525
691k
    return optimized;
526
691k
  }
527
528
9.28k
  RocksDBFileFactory* GetFileFactory() {
529
9.28k
    return file_factory_.get();
530
9.28k
  }
531
532
 private:
533
  std::unique_ptr<RocksDBFileFactory> file_factory_;
534
  // Returns true iff the named directory exists and is a directory.
535
1.33M
  bool DirExists(const std::string& dname) override {
536
1.33M
    struct stat statbuf;
537
1.33M
    if (stat(dname.c_str(), &statbuf) == 0) {
538
1.33M
      return S_ISDIR(statbuf.st_mode);
539
1.33M
    }
540
7.49k
    return false; // stat() failed return false
541
1.33M
  }
542
543
0
  bool SupportsFastAllocate(const std::string& path) {
544
0
#ifdef ROCKSDB_FALLOCATE_PRESENT
545
0
    struct statfs s;
546
0
    if (statfs(path.c_str(), &s)) {
547
0
      return false;
548
0
    }
549
0
    switch (s.f_type) {
550
0
      case EXT4_SUPER_MAGIC:
551
0
        return true;
552
0
      case XFS_SUPER_MAGIC:
553
0
        return true;
554
0
      case TMPFS_MAGIC:
555
0
        return true;
556
0
      default:
557
0
        return false;
558
0
    }
559
0
#else
560
0
    return false;
561
0
#endif
562
0
  }
563
564
  std::vector<ThreadPool> thread_pools_;
565
  pthread_mutex_t mu_;
566
  std::vector<pthread_t> threads_to_join_;
567
};
568
569
class PosixRocksDBFileFactory : public RocksDBFileFactory {
570
 public:
571
17.0k
  PosixRocksDBFileFactory() {}
572
573
1.41k
  ~PosixRocksDBFileFactory() {}
574
575
  Status NewSequentialFile(const std::string& fname,
576
                           unique_ptr<SequentialFile>* result,
577
2.19M
                           const EnvOptions& options) override {
578
2.19M
    result->reset();
579
2.19M
    FILE* f = nullptr;
580
2.19M
    do {
581
2.19M
      IOSTATS_TIMER_GUARD(open_nanos);
582
2.19M
      f = fopen(fname.c_str(), "r");
583
2.19M
    } while (f == nullptr && errno
== EINTR2
);
584
2.19M
    if (f == nullptr) {
585
2
      *result = nullptr;
586
2
      return STATUS_IO_ERROR(fname, errno);
587
2.19M
    } else {
588
2.19M
      int fd = fileno(f);
589
2.19M
      SetFD_CLOEXEC(fd, &options);
590
2.19M
      *result = std::make_unique<yb::PosixSequentialFile>(fname, f, options);
591
2.19M
      return Status::OK();
592
2.19M
    }
593
2.19M
  }
594
595
  Status NewRandomAccessFile(const std::string& fname,
596
                             unique_ptr<RandomAccessFile>* result,
597
2.18M
                             const EnvOptions& options) override {
598
2.18M
    result->reset();
599
2.18M
    Status s;
600
2.18M
    int fd;
601
2.18M
    {
602
2.18M
      IOSTATS_TIMER_GUARD(open_nanos);
603
2.18M
      fd = open(fname.c_str(), O_RDONLY);
604
2.18M
    }
605
2.18M
    SetFD_CLOEXEC(fd, &options);
606
2.18M
    if (fd < 0) {
607
549
      s = STATUS_IO_ERROR(fname, errno);
608
2.18M
    } else if (options.use_mmap_reads && 
sizeof(void*) >= 84.74k
) {
609
      // Use of mmap for random reads has been removed because it
610
      // kills performance when storage is fast.
611
      // Use mmap when virtual address-space is plentiful.
612
4.74k
      uint64_t size;
613
4.74k
      s = GetFileSize(fname, &size);
614
4.74k
      if (s.ok()) {
615
4.74k
        void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
616
4.74k
        if (base != MAP_FAILED) {
617
4.74k
          *result = std::make_unique<PosixMmapReadableFile>(fd, fname, base, size, options);
618
4.74k
        } else {
619
0
          s = STATUS_IO_ERROR(fname, errno);
620
0
        }
621
4.74k
      }
622
4.74k
      close(fd);
623
2.17M
    } else {
624
2.17M
      *result = std::make_unique<yb::PosixRandomAccessFile>(fname, fd, options);
625
2.17M
    }
626
2.18M
    return s;
627
2.18M
  }
628
629
  Status NewWritableFile(const std::string& fname,
630
                         unique_ptr<WritableFile>* result,
631
3.71M
                         const EnvOptions& options) override {
632
3.71M
    result->reset();
633
3.71M
    Status s;
634
3.71M
    int fd = -1;
635
3.71M
    do {
636
3.71M
      IOSTATS_TIMER_GUARD(open_nanos);
637
3.71M
      fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
638
3.71M
    } while (fd < 0 && errno
== EINTR28
);
639
3.71M
    if (fd < 0) {
640
28
      s = STATUS_IO_ERROR(fname, errno);
641
3.71M
    } else {
642
3.71M
      SetFD_CLOEXEC(fd, &options);
643
3.71M
      if (options.use_mmap_writes) {
644
1.14k
        if (!checkedDiskForMmap_) {
645
          // this will be executed once in the program's lifetime.
646
          // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
647
39
          if (!SupportsFastAllocate(fname)) {
648
39
            forceMmapOff = true;
649
39
          }
650
39
          checkedDiskForMmap_ = true;
651
39
        }
652
1.14k
      }
653
3.71M
      if (options.use_mmap_writes && 
!forceMmapOff1.14k
) {
654
0
        *result = std::make_unique<PosixMmapFile>(fname, fd, page_size_, options);
655
3.71M
      } else {
656
        // disable mmap writes
657
3.71M
        EnvOptions no_mmap_writes_options = options;
658
3.71M
        no_mmap_writes_options.use_mmap_writes = false;
659
3.71M
        *result = std::make_unique<PosixWritableFile>(fname, fd, no_mmap_writes_options);
660
3.71M
      }
661
3.71M
    }
662
3.71M
    return s;
663
3.71M
  }
664
665
  Status ReuseWritableFile(const std::string& fname,
666
                           const std::string& old_fname,
667
                           unique_ptr<WritableFile>* result,
668
57
                           const EnvOptions& options) override {
669
57
    result->reset();
670
57
    Status s;
671
57
    int fd = -1;
672
57
    do {
673
57
      IOSTATS_TIMER_GUARD(open_nanos);
674
57
      fd = open(old_fname.c_str(), O_RDWR, 0644);
675
57
    } while (fd < 0 && errno
== EINTR0
);
676
57
    if (fd < 0) {
677
0
      s = STATUS_IO_ERROR(fname, errno);
678
57
    } else {
679
57
      SetFD_CLOEXEC(fd, &options);
680
      // rename into place
681
57
      if (rename(old_fname.c_str(), fname.c_str()) != 0) {
682
0
        Status r = STATUS_IO_ERROR(old_fname, errno);
683
0
        close(fd);
684
0
        return r;
685
0
      }
686
57
      if (options.use_mmap_writes) {
687
0
        if (!checkedDiskForMmap_) {
688
          // this will be executed once in the program's lifetime.
689
          // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
690
0
          if (!SupportsFastAllocate(fname)) {
691
0
            forceMmapOff = true;
692
0
          }
693
0
          checkedDiskForMmap_ = true;
694
0
        }
695
0
      }
696
57
      if (options.use_mmap_writes && 
!forceMmapOff0
) {
697
0
        *result = std::make_unique<PosixMmapFile>(fname, fd, page_size_, options);
698
57
      } else {
699
        // disable mmap writes
700
57
        EnvOptions no_mmap_writes_options = options;
701
57
        no_mmap_writes_options.use_mmap_writes = false;
702
703
57
        *result = std::make_unique<PosixWritableFile>(fname, fd, no_mmap_writes_options);
704
57
      }
705
57
    }
706
57
    return s;
707
57
  }
708
709
841k
  Status GetFileSize(const std::string& fname, uint64_t* size) override {
710
841k
    Status s;
711
841k
    struct stat sbuf;
712
841k
    if (stat(fname.c_str(), &sbuf) != 0) {
713
20
      *size = 0;
714
20
      s = STATUS_IO_ERROR(fname, errno);
715
841k
    } else {
716
841k
      *size = sbuf.st_size;
717
841k
    }
718
841k
    return s;
719
841k
  }
720
721
7.57k
  bool IsPlainText() const override {
722
7.57k
    return true;
723
7.57k
  }
724
725
 private:
726
  bool checkedDiskForMmap_ = false;
727
  bool forceMmapOff = false;
728
  size_t page_size_ = getpagesize();
729
730
39
  bool SupportsFastAllocate(const std::string& path) {
731
#ifdef ROCKSDB_FALLOCATE_PRESENT
732
    struct statfs s;
733
    if (statfs(path.c_str(), &s)) {
734
      return false;
735
    }
736
    switch (s.f_type) {
737
      case EXT4_SUPER_MAGIC:
738
        return true;
739
      case XFS_SUPER_MAGIC:
740
        return true;
741
      case TMPFS_MAGIC:
742
        return true;
743
      default:
744
        return false;
745
    }
746
#else
747
39
    return false;
748
39
#endif
749
39
  }
750
751
};
752
753
// Creates an environment backed by a fresh PosixRocksDBFileFactory. Mutex and
// thread-pool setup is done by the factory-taking constructor this delegates
// to, so the two constructors cannot drift apart.
PosixEnv::PosixEnv()
    : PosixEnv(std::make_unique<PosixRocksDBFileFactory>()) {}
763
764
// Creates an environment that builds files through `file_factory`. Allocates
// one thread pool per priority, initializes mu_, and tells each pool its
// priority and owning Env.
PosixEnv::PosixEnv(std::unique_ptr<RocksDBFileFactory> file_factory)
    : file_factory_(std::move(file_factory)),
      thread_pools_(Priority::TOTAL) {
  ThreadPool::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));

  int priority_index = 0;
  for (ThreadPool& pool : thread_pools_) {
    pool.SetThreadPriority(static_cast<Env::Priority>(priority_index));
    // This allows later initializing the thread-local-env of each thread.
    pool.SetHostEnv(this);
    ++priority_index;
  }
}
774
775
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
776
170k
                        void* tag, void (*unschedFunction)(void* arg)) {
777
170k
  assert(pri >= Priority::LOW && pri <= Priority::HIGH);
778
0
  thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
779
170k
}
780
781
792k
int PosixEnv::UnSchedule(void* arg, Priority pri) {
782
792k
  return thread_pools_[pri].UnSchedule(arg);
783
792k
}
784
785
33
// Returns ThreadPool::GetQueueLen() for the pool serving `pri`.
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
  assert(pri >= Priority::LOW && pri <= Priority::HIGH);
  const ThreadPool& queried_pool = thread_pools_[pri];
  return queried_pool.GetQueueLen();
}
789
790
// Bundles a thread entry function with its argument so both can travel
// through the single void* that pthread_create() passes to a new thread.
struct StartThreadState {
  // Function the new thread runs.
  void (*user_function)(void*) = nullptr;
  // Argument passed to user_function.
  void* arg = nullptr;
};
794
795
2.48k
static void* StartThreadWrapper(void* arg) {
796
2.48k
  StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
797
2.48k
  state->user_function(state->arg);
798
2.48k
  delete state;
799
2.48k
  return nullptr;
800
2.48k
}
801
802
2.49k
void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
803
2.49k
  pthread_t t;
804
2.49k
  StartThreadState* state = new StartThreadState;
805
2.49k
  state->user_function = function;
806
2.49k
  state->arg = arg;
807
2.49k
  ThreadPool::PthreadCall(
808
2.49k
      "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
809
2.49k
  ThreadPool::PthreadCall("lock", pthread_mutex_lock(&mu_));
810
2.49k
  threads_to_join_.push_back(t);
811
2.49k
  ThreadPool::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
812
2.49k
}
813
814
32
void PosixEnv::WaitForJoin() {
815
2.07k
  for (const auto tid : threads_to_join_) {
816
2.07k
    pthread_join(tid, nullptr);
817
2.07k
  }
818
32
  threads_to_join_.clear();
819
32
}
820
821
}  // namespace
822
823
437k
std::string Env::GenerateUniqueId() {
824
437k
  std::string uuid_file = "/proc/sys/kernel/random/uuid";
825
826
437k
  Status s = FileExists(uuid_file);
827
437k
  if (s.ok()) {
828
0
    std::string uuid;
829
0
    s = ReadFileToString(this, uuid_file, &uuid);
830
0
    if (s.ok()) {
831
0
      return uuid;
832
0
    }
833
0
  }
834
  // Could not read uuid_file - generate uuid using "nanos-random"
835
437k
  Random64 r(time(nullptr));
836
437k
  uint64_t random_uuid_portion =
837
437k
    r.Uniform(std::numeric_limits<uint64_t>::max());
838
437k
  uint64_t nanos_uuid_portion = NowNanos();
839
437k
  char uuid2[200];
840
437k
  snprintf(uuid2,
841
437k
           200,
842
437k
           "%" PRIu64 "x-%" PRIu64 "x",
843
437k
           nanos_uuid_portion,
844
437k
           random_uuid_portion);
845
437k
  return uuid2;
846
437k
}
847
848
11.4M
// Returns the process-wide PosixEnv, constructing it on first use.
Env* Env::Default() {
  // Order matters here. Statics are destroyed in the reverse of the order in
  // which they were constructed, so everything the PosixEnv must outlive it
  // is touched first:
  //
  //  1. The ThreadLocalPtr singletons, so that the static PosixEnv below is
  //     destructed before they are.
  ThreadLocalPtr::InitSingletons();
  //  2. SyncPoint (debug builds only), so it is inited before PosixEnv and
  //     deleted after it.
#ifndef NDEBUG
  SyncPoint::GetInstance();
#endif
  //  3. Finally the environment itself.
  static PosixEnv posix_env_singleton;
  return &posix_env_singleton;
}
867
868
9.28k
// Returns the file factory owned by the default environment.
RocksDBFileFactory* Env::DefaultFileFactory() {
  PosixEnv* default_posix_env = down_cast<PosixEnv*>(Env::Default());
  return default_posix_env->GetFileFactory();
}
871
872
std::unique_ptr<Env> Env::NewRocksDBDefaultEnv(
873
9.28k
    std::unique_ptr<RocksDBFileFactory> file_factory) {
874
9.28k
  return std::make_unique<PosixEnv>(std::move(file_factory));
875
9.28k
}
876
877
}  // namespace rocksdb