YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/hdfs/env_hdfs.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
#ifndef ROCKSDB_HDFS_ENV_HDFS_H
21
#define ROCKSDB_HDFS_ENV_HDFS_H
22
23
#pragma once
24
#include <stdio.h>
25
#include <time.h>
26
27
#include <algorithm>
28
#include <iostream>
29
30
#include "yb/rocksdb/port/sys_time.h"
31
#include "yb/rocksdb/env.h"
32
#include "yb/rocksdb/status.h"
33
34
#ifdef USE_HDFS
35
#include <hdfs.h>
36
37
namespace rocksdb {
38
39
// Signals, at run time, that the arguments handed to the HDFS environment
// were unusable. It adds no state of its own: what() is the one inherited
// from std::exception.
class HdfsUsageException : public std::exception {
};
42
43
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and the process to terminate.
class HdfsFatalException : public std::exception {
 public:
  // @param s  message later returned verbatim by what().
  explicit HdfsFatalException(const std::string& s) : what_(s) {}

  // `noexcept` replaces the dynamic exception specification `throw()`, which
  // is deprecated and no longer accepted from C++20 on.
  ~HdfsFatalException() override = default;

  // Returns the message given at construction. The pointer remains valid for
  // as long as this exception object is alive.
  const char* what() const noexcept override {
    return what_.c_str();
  }

 private:
  const std::string what_;
};
56
57
//
58
// The HDFS environment for rocksdb. This class overrides all the
59
// file/dir access methods and delegates the thread-mgmt methods to the
60
// default posix environment.
61
//
62
class HdfsEnv : public Env {
63
 public:
64
  explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
65
    posixEnv = Env::Default();
66
    fileSys_ = connectToPath(fsname_);
67
  }
68
69
  virtual ~HdfsEnv() {
70
    fprintf(stderr, "Destroying HdfsEnv::Default()\n");
71
    hdfsDisconnect(fileSys_);
72
  }
73
74
  virtual Status NewSequentialFile(const std::string& fname,
75
                                   std::unique_ptr<SequentialFile>* result,
76
                                   const EnvOptions& options);
77
78
  virtual Status NewRandomAccessFile(const std::string& fname,
79
                                     std::unique_ptr<RandomAccessFile>* result,
80
                                     const EnvOptions& options);
81
82
  virtual Status NewWritableFile(const std::string& fname,
83
                                 std::unique_ptr<WritableFile>* result,
84
                                 const EnvOptions& options);
85
86
  virtual Status NewDirectory(const std::string& name,
87
                              std::unique_ptr<Directory>* result);
88
89
  virtual Status FileExists(const std::string& fname);
90
91
  virtual Status GetChildren(const std::string& path,
92
                             std::vector<std::string>* result);
93
94
  virtual Status DeleteFile(const std::string& fname);
95
96
  virtual Status CreateDir(const std::string& name);
97
98
  virtual Status CreateDirIfMissing(const std::string& name);
99
100
  virtual Status DeleteDir(const std::string& name);
101
102
  virtual Status GetFileSize(const std::string& fname, uint64_t* size);
103
104
  virtual Status GetFileModificationTime(const std::string& fname,
105
                                         uint64_t* file_mtime);
106
107
  virtual Status RenameFile(const std::string& src, const std::string& target);
108
109
  virtual Status LinkFile(const std::string& src, const std::string& target) {
110
    return STATUS(NotSupported, ""); // not supported
111
  }
112
113
  virtual Status LockFile(const std::string& fname, FileLock** lock);
114
115
  virtual Status UnlockFile(FileLock* lock);
116
117
  virtual Status NewLogger(const std::string& fname,
118
                           std::shared_ptr<Logger>* result);
119
120
  virtual void Schedule(void (*function)(void* arg),
121
                        void* arg,
122
                        Priority pri = LOW,
123
                        void* tag = nullptr,
124
                        void (*unschedFunction)(void* arg) = 0) {
125
    posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
126
  }
127
128
  virtual int UnSchedule(void* tag, Priority pri) {
129
    posixEnv->UnSchedule(tag, pri);
130
  }
131
132
  virtual void StartThread(void (*function)(void* arg), void* arg) {
133
    posixEnv->StartThread(function, arg);
134
  }
135
136
  virtual void WaitForJoin() { posixEnv->WaitForJoin(); }
137
138
  virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const
139
      override {
140
    return posixEnv->GetThreadPoolQueueLen(pri);
141
  }
142
143
  virtual Status GetTestDirectory(std::string* path) {
144
    return posixEnv->GetTestDirectory(path);
145
  }
146
147
  virtual uint64_t NowMicros() {
148
    return posixEnv->NowMicros();
149
  }
150
151
  virtual void SleepForMicroseconds(int micros) {
152
    posixEnv->SleepForMicroseconds(micros);
153
  }
154
155
  virtual Status GetHostName(char* name, uint64_t len) {
156
    return posixEnv->GetHostName(name, len);
157
  }
158
159
  virtual Status GetCurrentTime(int64_t* unix_time) {
160
    return posixEnv->GetCurrentTime(unix_time);
161
  }
162
163
  virtual Status GetAbsolutePath(const std::string& db_path,
164
      std::string* output_path) {
165
    return posixEnv->GetAbsolutePath(db_path, output_path);
166
  }
167
168
  virtual void SetBackgroundThreads(int number, Priority pri = LOW) {
169
    posixEnv->SetBackgroundThreads(number, pri);
170
  }
171
172
  virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
173
    posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
174
  }
175
176
  virtual std::string TimeToString(uint64_t number) {
177
    return posixEnv->TimeToString(number);
178
  }
179
180
  static uint64_t gettid() {
181
    assert(sizeof(pthread_t) <= sizeof(uint64_t));
182
    return (uint64_t)pthread_self();
183
  }
184
185
  virtual uint64_t GetThreadID() const override {
186
    return HdfsEnv::gettid();
187
  }
188
189
 private:
190
  std::string fsname_;  // string of the form "hdfs://hostname:port/"
191
  hdfsFS fileSys_;      //  a single FileSystem object for all files
192
  Env*  posixEnv;       // This object is derived from Env, but not from
193
                        // posixEnv. We have posixnv as an encapsulated
194
                        // object here so that we can use posix timers,
195
                        // posix threads, etc.
196
197
  static const std::string kProto;
198
  static const std::string pathsep;
199
200
  /**
201
   * If the URI is specified of the form hdfs://server:port/path,
202
   * then connect to the specified cluster
203
   * else connect to default.
204
   */
205
  hdfsFS connectToPath(const std::string& uri) {
206
    if (uri.empty()) {
207
      return nullptr;
208
    }
209
    if (uri.find(kProto) != 0) {
210
      // uri doesn't start with hdfs:// -> use default:0, which is special
211
      // to libhdfs.
212
      return hdfsConnectNewInstance("default", 0);
213
    }
214
    const std::string hostport = uri.substr(kProto.length());
215
216
    std::vector <std::string> parts;
217
    split(hostport, ':', parts);
218
    if (parts.size() != 2) {
219
      throw HdfsFatalException("Bad uri for hdfs " + uri);
220
    }
221
    // parts[0] = hosts, parts[1] = port/xxx/yyy
222
    std::string host(parts[0]);
223
    std::string remaining(parts[1]);
224
225
    int rem = remaining.find(pathsep);
226
    std::string portStr = (rem == 0 ? remaining :
227
                           remaining.substr(0, rem));
228
229
    tPort port;
230
    port = atoi(portStr.c_str());
231
    if (port == 0) {
232
      throw HdfsFatalException("Bad host-port for hdfs " + uri);
233
    }
234
    hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
235
    return fs;
236
  }
237
238
  void split(const std::string &s,
239
             char delim,
240
             std::vector<std::string> *elems) {
241
    elems.clear();
242
    size_t prev = 0;
243
    size_t pos = s.find(delim);
244
    while (pos != std::string::npos) {
245
      elems->push_back(s.substr(prev, pos));
246
      prev = pos + 1;
247
      pos = s.find(delim, prev);
248
    }
249
    elems->push_back(s.substr(prev, s.size()));
250
  }
251
};
252
253
}  // namespace rocksdb
254
255
#else // USE_HDFS
256
257
258
namespace rocksdb {
259
260
// Status returned by the file-system stubs of the placeholder HdfsEnv below.
// A default-constructed Status is OK, which would make those stubs report
// success; an explicit NotSupported status is constructed instead.
static const Status notsup = STATUS(NotSupported, "Not compiled with HDFS support");
261
262
class HdfsEnv : public Env {
263
264
 public:
265
0
  explicit HdfsEnv(const std::string& fsname) {
266
0
    fprintf(stderr, "You have not build rocksdb with HDFS support\n");
267
0
    fprintf(stderr, "Please see hdfs/README for details\n");
268
0
    abort();
269
0
  }
270
271
0
  virtual ~HdfsEnv() {
272
0
  }
273
274
  virtual Status NewSequentialFile(const std::string& fname,
275
                                   unique_ptr<SequentialFile>* result,
276
                                   const EnvOptions& options) override;
277
278
  virtual Status NewRandomAccessFile(const std::string& fname,
279
                                     unique_ptr<RandomAccessFile>* result,
280
0
                                     const EnvOptions& options) override {
281
0
    return notsup;
282
0
  }
283
284
  virtual Status NewWritableFile(const std::string& fname,
285
                                 unique_ptr<WritableFile>* result,
286
0
                                 const EnvOptions& options) override {
287
0
    return notsup;
288
0
  }
289
290
  virtual Status NewDirectory(const std::string& name,
291
0
                              unique_ptr<Directory>* result) override {
292
0
    return notsup;
293
0
  }
294
295
0
  virtual Status FileExists(const std::string& fname) override {
296
0
    return notsup;
297
0
  }
298
299
  virtual Status GetChildren(const std::string& path,
300
0
                             std::vector<std::string>* result) override {
301
0
    return notsup;
302
0
  }
303
304
0
  virtual Status DeleteFile(const std::string& fname) override {
305
0
    return notsup;
306
0
  }
307
308
0
  virtual Status CreateDir(const std::string& name) override { return notsup; }
309
310
0
  virtual Status CreateDirIfMissing(const std::string& name) override {
311
0
    return notsup;
312
0
  }
313
314
0
  virtual Status DeleteDir(const std::string& name) override { return notsup; }
315
316
  virtual Status GetFileSize(const std::string& fname,
317
0
                             uint64_t* size) override {
318
0
    return notsup;
319
0
  }
320
321
  virtual Status GetFileModificationTime(const std::string& fname,
322
0
                                         uint64_t* time) override {
323
0
    return notsup;
324
0
  }
325
326
  virtual Status RenameFile(const std::string& src,
327
0
                            const std::string& target) override {
328
0
    return notsup;
329
0
  }
330
331
  virtual Status LinkFile(const std::string& src,
332
0
                          const std::string& target) override {
333
0
    return notsup;
334
0
  }
335
336
0
  virtual Status LockFile(const std::string& fname, FileLock** lock) override {
337
0
    return notsup;
338
0
  }
339
340
0
  virtual Status UnlockFile(FileLock* lock) override { return notsup; }
341
342
  virtual Status NewLogger(const std::string& fname,
343
0
                           shared_ptr<Logger>* result) override {
344
0
    return notsup;
345
0
  }
346
347
  virtual void Schedule(void (*function)(void* arg), void* arg,
348
                        Priority pri = LOW, void* tag = nullptr,
349
0
                        void (*unschedFunction)(void* arg) = 0) override {}
350
351
0
  virtual int UnSchedule(void* tag, Priority pri) override { return 0; }
352
353
0
  virtual void StartThread(void (*function)(void* arg), void* arg) override {}
354
355
0
  virtual void WaitForJoin() override {}
356
357
  virtual unsigned int GetThreadPoolQueueLen(
358
0
      Priority pri = LOW) const override {
359
0
    return 0;
360
0
  }
361
362
0
  virtual Status GetTestDirectory(std::string* path) override { return notsup; }
363
364
0
  virtual uint64_t NowMicros() override { return 0; }
365
366
0
  virtual void SleepForMicroseconds(int micros) override {}
367
368
0
  virtual Status GetHostName(char* name, uint64_t len) override {
369
0
    return notsup;
370
0
  }
371
372
0
  virtual Status GetCurrentTime(int64_t* unix_time) override { return notsup; }
373
374
  virtual Status GetAbsolutePath(const std::string& db_path,
375
0
                                 std::string* outputpath) override {
376
0
    return notsup;
377
0
  }
378
379
0
  virtual void SetBackgroundThreads(int number, Priority pri = LOW) override {}
380
0
  virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
381
0
  }
382
0
  virtual std::string TimeToString(uint64_t number) override { return ""; }
383
384
0
  virtual uint64_t GetThreadID() const override {
385
0
    return 0;
386
0
  }
387
};
388
} // namespace rocksdb
389
390
#endif // USE_HDFS
391
392
#endif // ROCKSDB_HDFS_ENV_HDFS_H