YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/env_hdfs.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/env.h"
22
#include "yb/rocksdb/hdfs/env_hdfs.h"
23
24
#ifdef USE_HDFS
25
#ifndef ROCKSDB_HDFS_FILE_C
26
#define ROCKSDB_HDFS_FILE_C
27
28
#include <stdio.h>
29
#include <sys/time.h>
30
#include <time.h>
31
32
#include <algorithm>
33
#include <iostream>
34
#include <sstream>
35
36
#include "yb/rocksdb/status.h"
37
#include "yb/util/string_util.h"
38
39
#define HDFS_EXISTS 0
40
#define HDFS_DOESNT_EXIST -1
41
#define HDFS_SUCCESS 0
42
43
//
44
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
45
// api to access HDFS. All HDFS files created by one instance of rocksdb
46
// will reside on the same HDFS cluster.
47
//
48
49
namespace rocksdb {
50
51
namespace {
52
53
// Log error message
54
static Status IOError(const std::string& context, int err_number) {
55
  return STATUS(IOError, context, strerror(err_number));
56
}
57
58
// assume that there is one global logger for now. It is not thread-safe,
59
// but need not be because the logger is initialized at db-open time.
60
static Logger* mylog = nullptr;
61
62
// Used for reading a file from HDFS. It implements both sequential-read
63
// access methods as well as random read access methods.
64
class HdfsReadableFile : virtual public SequentialFile,
65
                         virtual public RandomAccessFile {
66
 private:
67
  hdfsFS fileSys_;
68
  std::string filename_;
69
  hdfsFile hfile_;
70
71
 public:
72
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
73
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
74
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
75
        "[hdfs] HdfsReadableFile opening file %s\n",
76
        filename_.c_str());
77
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
78
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
79
        "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
80
        filename_.c_str(), hfile_);
81
  }
82
83
  virtual ~HdfsReadableFile() {
84
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
85
        "[hdfs] HdfsReadableFile closing file %s\n",
86
        filename_.c_str());
87
    hdfsCloseFile(fileSys_, hfile_);
88
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
89
        "[hdfs] HdfsReadableFile closed file %s\n",
90
        filename_.c_str());
91
    hfile_ = nullptr;
92
  }
93
94
  bool isValid() {
95
    return hfile_ != nullptr;
96
  }
97
98
  // sequential access, read data at current offset in file
99
  virtual Status Read(size_t n, Slice* result, uint8_t* scratch) {
100
    Status s;
101
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
102
        "[hdfs] HdfsReadableFile reading %s %ld\n",
103
        filename_.c_str(), n);
104
105
    char* buffer = scratch;
106
    size_t total_bytes_read = 0;
107
    tSize bytes_read = 0;
108
    tSize remaining_bytes = (tSize)n;
109
110
    // Read a total of n bytes repeatedly until we hit error or eof
111
    while (remaining_bytes > 0) {
112
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
113
      if (bytes_read <= 0) {
114
        break;
115
      }
116
      assert(bytes_read <= remaining_bytes);
117
118
      total_bytes_read += bytes_read;
119
      remaining_bytes -= bytes_read;
120
      buffer += bytes_read;
121
    }
122
    assert(total_bytes_read <= n);
123
124
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
125
        "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());
126
127
    if (bytes_read < 0) {
128
      s = IOError(filename_, errno);
129
    } else {
130
      *result = Slice(scratch, total_bytes_read);
131
    }
132
133
    return s;
134
  }
135
136
  // random access, read data from specified offset in file
137
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
138
                      char* scratch) const {
139
    Status s;
140
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
141
        "[hdfs] HdfsReadableFile preading %s\n", filename_.c_str());
142
    ssize_t bytes_read = hdfsPread(fileSys_,
143
                                   hfile_,
144
                                   offset,
145
                                   static_cast<void*>(scratch),
146
                                   static_cast<tSize>(n));
147
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
148
        "[hdfs] HdfsReadableFile pread %s\n", filename_.c_str());
149
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
150
    if (bytes_read < 0) {
151
      // An error: return a non-ok status
152
      s = IOError(filename_, errno);
153
    }
154
    return s;
155
  }
156
157
  virtual Status Skip(uint64_t n) {
158
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
159
        "[hdfs] HdfsReadableFile skip %s\n", filename_.c_str());
160
    // get current offset from file
161
    tOffset current = hdfsTell(fileSys_, hfile_);
162
    if (current < 0) {
163
      return IOError(filename_, errno);
164
    }
165
    // seek to new offset in file
166
    tOffset newoffset = current + n;
167
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
168
    if (val < 0) {
169
      return IOError(filename_, errno);
170
    }
171
    return Status::OK();
172
  }
173
174
 private:
175
176
  // returns true if we are at the end of file, false otherwise
177
  bool feof() {
178
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
179
        "[hdfs] HdfsReadableFile feof %s\n", filename_.c_str());
180
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
181
      return true;
182
    }
183
    return false;
184
  }
185
186
  // the current size of the file
187
  tOffset fileSize() {
188
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
189
        "[hdfs] HdfsReadableFile fileSize %s\n", filename_.c_str());
190
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
191
    tOffset size = 0L;
192
    if (pFileInfo != nullptr) {
193
      size = pFileInfo->mSize;
194
      hdfsFreeFileInfo(pFileInfo, 1);
195
    } else {
196
      throw HdfsFatalException("fileSize on unknown file " + filename_);
197
    }
198
    return size;
199
  }
200
};
201
202
// Appends to an existing file in HDFS.
203
class HdfsWritableFile: public WritableFile {
204
 private:
205
  hdfsFS fileSys_;
206
  std::string filename_;
207
  hdfsFile hfile_;
208
209
 public:
210
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
211
      : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) {
212
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
213
        "[hdfs] HdfsWritableFile opening %s\n", filename_.c_str());
214
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
215
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
216
        "[hdfs] HdfsWritableFile opened %s\n", filename_.c_str());
217
    assert(hfile_ != nullptr);
218
  }
219
  virtual ~HdfsWritableFile() {
220
    if (hfile_ != nullptr) {
221
      Log(InfoLogLevel::DEBUG_LEVEL, mylog,
222
          "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
223
      hdfsCloseFile(fileSys_, hfile_);
224
      Log(InfoLogLevel::DEBUG_LEVEL, mylog,
225
          "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
226
      hfile_ = nullptr;
227
    }
228
  }
229
230
  // If the file was successfully created, then this returns true.
231
  // Otherwise returns false.
232
  bool isValid() {
233
    return hfile_ != nullptr;
234
  }
235
236
  // The name of the file, mostly needed for debug logging.
237
  const std::string& getName() {
238
    return filename_;
239
  }
240
241
  virtual Status Append(const Slice& data) {
242
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
243
        "[hdfs] HdfsWritableFile Append %s\n", filename_.c_str());
244
    const char* src = data.data();
245
    size_t left = data.size();
246
    size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
247
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
248
        "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str());
249
    if (ret != left) {
250
      return IOError(filename_, errno);
251
    }
252
    return Status::OK();
253
  }
254
255
  virtual Status Flush() {
256
    return Status::OK();
257
  }
258
259
  virtual Status Sync() {
260
    Status s;
261
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
262
        "[hdfs] HdfsWritableFile Sync %s\n", filename_.c_str());
263
    if (hdfsFlush(fileSys_, hfile_) == -1) {
264
      return IOError(filename_, errno);
265
    }
266
    if (hdfsHSync(fileSys_, hfile_) == -1) {
267
      return IOError(filename_, errno);
268
    }
269
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
270
        "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str());
271
    return Status::OK();
272
  }
273
274
  // This is used by HdfsLogger to write data to the debug log file
275
  virtual Status Append(const char* src, size_t size) {
276
    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
277
      return IOError(filename_, errno);
278
    }
279
    return Status::OK();
280
  }
281
282
  virtual Status Close() {
283
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
284
        "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
285
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
286
      return IOError(filename_, errno);
287
    }
288
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
289
        "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
290
    hfile_ = nullptr;
291
    return Status::OK();
292
  }
293
};
294
295
// The object that implements the debug logs to reside in HDFS.
296
class HdfsLogger : public Logger {
297
 private:
298
  HdfsWritableFile* file_;
299
  uint64_t (*gettid_)();  // Return the thread id for the current thread
300
301
 public:
302
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
303
      : file_(f), gettid_(gettid) {
304
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
305
        "[hdfs] HdfsLogger opened %s\n",
306
        file_->getName().c_str());
307
  }
308
309
  virtual ~HdfsLogger() {
310
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
311
        "[hdfs] HdfsLogger closed %s\n",
312
        file_->getName().c_str());
313
    delete file_;
314
    if (mylog != nullptr && mylog == this) {
315
      mylog = nullptr;
316
    }
317
  }
318
319
  virtual void Logv(const char* format, va_list ap) {
320
    const uint64_t thread_id = (*gettid_)();
321
322
    // We try twice: the first time with a fixed-size stack allocated buffer,
323
    // and the second time with a much larger dynamically allocated buffer.
324
    char buffer[500];
325
    for (int iter = 0; iter < 2; iter++) {
326
      char* base;
327
      int bufsize;
328
      if (iter == 0) {
329
        bufsize = sizeof(buffer);
330
        base = buffer;
331
      } else {
332
        bufsize = 30000;
333
        base = new char[bufsize];
334
      }
335
      char* p = base;
336
      char* limit = base + bufsize;
337
338
      struct timeval now_tv;
339
      gettimeofday(&now_tv, nullptr);
340
      const time_t seconds = now_tv.tv_sec;
341
      struct tm t;
342
      localtime_r(&seconds, &t);
343
      p += snprintf(p, limit - p,
344
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
345
                    t.tm_year + 1900,
346
                    t.tm_mon + 1,
347
                    t.tm_mday,
348
                    t.tm_hour,
349
                    t.tm_min,
350
                    t.tm_sec,
351
                    static_cast<int>(now_tv.tv_usec),
352
                    static_cast<uint64_t>(thread_id));
353
354
      // Print the message
355
      if (p < limit) {
356
        va_list backup_ap;
357
        va_copy(backup_ap, ap);
358
        p += vsnprintf(p, limit - p, format, backup_ap);
359
        va_end(backup_ap);
360
      }
361
362
      // Truncate to available space if necessary
363
      if (p >= limit) {
364
        if (iter == 0) {
365
          continue;       // Try again with larger buffer
366
        } else {
367
          p = limit - 1;
368
        }
369
      }
370
371
      // Add newline if necessary
372
      if (p == base || p[-1] != '\n') {
373
        *p++ = '\n';
374
      }
375
376
      assert(p <= limit);
377
      file_->Append(base, p-base);
378
      file_->Flush();
379
      if (base != buffer) {
380
        delete[] base;
381
      }
382
      break;
383
    }
384
  }
385
};
386
387
}  // namespace
388
389
// Finally, the hdfs environment
390
391
const std::string HdfsEnv::kProto = "hdfs://";
392
const std::string HdfsEnv::pathsep = "/";
393
394
// open a file for sequential reading
395
Status HdfsEnv::NewSequentialFile(const std::string& fname,
396
                                  unique_ptr<SequentialFile>* result,
397
                                  const EnvOptions& options) {
398
  result->reset();
399
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
400
  if (f == nullptr || !f->isValid()) {
401
    delete f;
402
    *result = nullptr;
403
    return IOError(fname, errno);
404
  }
405
  result->reset(dynamic_cast<SequentialFile*>(f));
406
  return Status::OK();
407
}
408
409
// open a file for random reading
410
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
411
                                    unique_ptr<RandomAccessFile>* result,
412
                                    const EnvOptions& options) {
413
  result->reset();
414
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
415
  if (f == nullptr || !f->isValid()) {
416
    delete f;
417
    *result = nullptr;
418
    return IOError(fname, errno);
419
  }
420
  result->reset(dynamic_cast<RandomAccessFile*>(f));
421
  return Status::OK();
422
}
423
424
// create a new file for writing
425
Status HdfsEnv::NewWritableFile(const std::string& fname,
426
                                unique_ptr<WritableFile>* result,
427
                                const EnvOptions& options) {
428
  result->reset();
429
  Status s;
430
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
431
  if (f == nullptr || !f->isValid()) {
432
    delete f;
433
    *result = nullptr;
434
    return IOError(fname, errno);
435
  }
436
  result->reset(dynamic_cast<WritableFile*>(f));
437
  return Status::OK();
438
}
439
440
class HdfsDirectory : public Directory {
441
 public:
442
  explicit HdfsDirectory(int fd) : fd_(fd) {}
443
  ~HdfsDirectory() {}
444
445
  virtual Status Fsync() { return Status::OK(); }
446
447
 private:
448
  int fd_;
449
};
450
451
Status HdfsEnv::NewDirectory(const std::string& name,
452
                             unique_ptr<Directory>* result) {
453
  int value = hdfsExists(fileSys_, name.c_str());
454
  switch (value) {
455
    case HDFS_EXISTS:
456
      result->reset(new HdfsDirectory(0));
457
      return Status::OK();
458
    default:  // fail if the directory doesn't exist
459
      Log(InfoLogLevel::FATAL_LEVEL,
460
          mylog, "NewDirectory hdfsExists call failed");
461
      throw HdfsFatalException("hdfsExists call failed with error " +
462
                               ToString(value) + " on path " + name +
463
                               ".\n");
464
  }
465
}
466
467
Status HdfsEnv::FileExists(const std::string& fname) {
468
  int value = hdfsExists(fileSys_, fname.c_str());
469
  switch (value) {
470
    case HDFS_EXISTS:
471
      return Status::OK();
472
    case HDFS_DOESNT_EXIST:
473
      return STATUS(NotFound, "");
474
    default:  // anything else should be an error
475
      Log(InfoLogLevel::FATAL_LEVEL,
476
          mylog, "FileExists hdfsExists call failed");
477
      return STATUS(IOError, "hdfsExists call failed with error " +
478
                             ToString(value) + " on path " + fname + ".\n");
479
  }
480
}
481
482
Status HdfsEnv::GetChildren(const std::string& path,
483
                            std::vector<std::string>* result) {
484
  int value = hdfsExists(fileSys_, path.c_str());
485
  switch (value) {
486
    case HDFS_EXISTS: {  // directory exists
487
    int numEntries = 0;
488
    hdfsFileInfo* pHdfsFileInfo = 0;
489
    pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
490
    if (numEntries >= 0) {
491
      for(int i = 0; i < numEntries; i++) {
492
        char* pathname = pHdfsFileInfo[i].mName;
493
        char* filename = rindex(pathname, '/');
494
        if (filename != nullptr) {
495
          result->push_back(filename+1);
496
        }
497
      }
498
      if (pHdfsFileInfo != nullptr) {
499
        hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
500
      }
501
    } else {
502
      // numEntries < 0 indicates error
503
      Log(InfoLogLevel::FATAL_LEVEL, mylog,
504
          "hdfsListDirectory call failed with error ");
505
      throw HdfsFatalException(
506
          "hdfsListDirectory call failed negative error.\n");
507
    }
508
    break;
509
  }
510
  case HDFS_DOESNT_EXIST:  // directory does not exist, exit
511
    break;
512
  default:          // anything else should be an error
513
    Log(InfoLogLevel::FATAL_LEVEL, mylog,
514
        "GetChildren hdfsExists call failed");
515
    throw HdfsFatalException("hdfsExists call failed with error " +
516
                             ToString(value) + ".\n");
517
  }
518
  return Status::OK();
519
}
520
521
Status HdfsEnv::DeleteFile(const std::string& fname) {
522
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
523
    return Status::OK();
524
  }
525
  return IOError(fname, errno);
526
}
527
528
Status HdfsEnv::CreateDir(const std::string& name) {
529
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
530
    return Status::OK();
531
  }
532
  return IOError(name, errno);
533
}
534
535
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
536
  const int value = hdfsExists(fileSys_, name.c_str());
537
  //  Not atomic. state might change b/w hdfsExists and CreateDir.
538
  switch (value) {
539
    case HDFS_EXISTS:
540
    return Status::OK();
541
    case HDFS_DOESNT_EXIST:
542
    return CreateDir(name);
543
    default:  // anything else should be an error
544
      Log(InfoLogLevel::FATAL_LEVEL, mylog,
545
          "CreateDirIfMissing hdfsExists call failed");
546
      throw HdfsFatalException("hdfsExists call failed with error " +
547
                               ToString(value) + ".\n");
548
  }
549
}
550
551
Status HdfsEnv::DeleteDir(const std::string& name) {
552
  return DeleteFile(name);
553
}
554
555
Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
556
  *size = 0L;
557
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
558
  if (pFileInfo != nullptr) {
559
    *size = pFileInfo->mSize;
560
    hdfsFreeFileInfo(pFileInfo, 1);
561
    return Status::OK();
562
  }
563
  return IOError(fname, errno);
564
}
565
566
Status HdfsEnv::GetFileModificationTime(const std::string& fname,
567
                                        uint64_t* time) {
568
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
569
  if (pFileInfo != nullptr) {
570
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
571
    hdfsFreeFileInfo(pFileInfo, 1);
572
    return Status::OK();
573
  }
574
  return IOError(fname, errno);
575
}
576
577
// The rename is not atomic. HDFS does not allow a renaming if the
578
// target already exists. So, we delete the target before attempting the
579
// rename.
580
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
581
  hdfsDelete(fileSys_, target.c_str(), 1);
582
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
583
    return Status::OK();
584
  }
585
  return IOError(src, errno);
586
}
587
588
Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
589
  // there isn's a very good way to atomically check and create
590
  // a file via libhdfs
591
  *lock = nullptr;
592
  return Status::OK();
593
}
594
595
Status HdfsEnv::UnlockFile(FileLock* lock) {
596
  return Status::OK();
597
}
598
599
Status HdfsEnv::NewLogger(const std::string& fname,
600
                          shared_ptr<Logger>* result) {
601
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
602
  if (f == nullptr || !f->isValid()) {
603
    delete f;
604
    *result = nullptr;
605
    return IOError(fname, errno);
606
  }
607
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
608
  result->reset(h);
609
  if (mylog == nullptr) {
610
    // mylog = h; // uncomment this for detailed logging
611
  }
612
  return Status::OK();
613
}
614
615
// The factory method for creating an HDFS Env
616
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
617
  *hdfs_env = new HdfsEnv(fsname);
618
  return Status::OK();
619
}
620
}  // namespace rocksdb
621
622
#endif // ROCKSDB_HDFS_FILE_C
623
624
#else // USE_HDFS
625
626
// dummy placeholders used when HDFS is not available
627
namespace rocksdb {
628
  Status HdfsEnv::NewSequentialFile(const std::string& fname,
629
                                    std::unique_ptr<SequentialFile>* result,
630
0
                                    const EnvOptions& options) {
631
0
    return STATUS(NotSupported, "Not compiled with hdfs support");
632
0
  }
633
634
0
  Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
635
0
    return STATUS(NotSupported, "Not compiled with hdfs support");
636
0
  }
637
} // namespace rocksdb
638
639
#endif