/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/env_hdfs.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rocksdb/env.h"
#include "yb/rocksdb/hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <algorithm>
#include <iostream>
#include <sstream>

#include "yb/rocksdb/status.h"
#include "yb/util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0
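// Note: HDFS_EXISTS and HDFS_DOESNT_EXIST name the return convention of
// hdfsExists(), which returns 0 when the path exists and -1 otherwise; the
// callers below treat any other value as an error.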

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//

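// A minimal usage sketch (illustrative only; the namenode URI is hypothetical,
// and NewHdfsEnv is the factory defined at the bottom of this file):
//
//   rocksdb::Env* hdfs_env = nullptr;
//   rocksdb::Status s = rocksdb::NewHdfsEnv(&hdfs_env, "hdfs://namenode:8020/");
//   rocksdb::Options options;
//   options.env = hdfs_env;  // all files this DB creates now live on HDFS
//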
namespace rocksdb {

namespace {

// Build an IOError status from a context string and an errno value.
static Status IOError(const std::string& context, int err_number) {
  return STATUS(IOError, context, strerror(err_number));
}

// Assume that there is one global logger for now. It is not thread-safe,
// but it need not be, because the logger is initialized at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile opening file %s\n",
        filename_.c_str());
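    // Passing 0 for buffer size, replication, and block size selects the
    // libhdfs defaults.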
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile opened file %s hfile_=%p\n",
        filename_.c_str(), static_cast<void*>(hfile_));
  }

  virtual ~HdfsReadableFile() {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile closing file %s\n",
        filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile closed file %s\n",
        filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() {
    return hfile_ != nullptr;
  }

  // sequential access, read data at current offset in file
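  // A short read with an OK status indicates end-of-file, matching the
  // SequentialFile contract.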
  virtual Status Read(size_t n, Slice* result, uint8_t* scratch) {
    Status s;
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile reading %s %zu\n",
        filename_.c_str(), n);

    // scratch is uint8_t*, so keep the write cursor in the same type.
    uint8_t* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = static_cast<tSize>(n);

    // Read a total of n bytes repeatedly until we hit an error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile preading %s\n", filename_.c_str());
    ssize_t bytes_read = hdfsPread(fileSys_,
                                   hfile_,
                                   offset,
                                   static_cast<void*>(scratch),
                                   static_cast<tSize>(n));
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile pread %s\n", filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile skip %s\n", filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile feof %s\n", filename_.c_str());
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
      return true;
    }
    return false;
  }

  // the current size of the file
  tOffset fileSize() {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsReadableFile fileSize %s\n", filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile opening %s\n", filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile opened %s\n", filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      Log(InfoLogLevel::DEBUG_LEVEL, mylog,
          "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      Log(InfoLogLevel::DEBUG_LEVEL, mylog,
          "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() {
    return hfile_ != nullptr;
  }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() {
    return filename_;
  }

  virtual Status Append(const Slice& data) {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile Append %s\n", filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    // hdfsWrite returns a signed tSize; storing it in a size_t would turn a
    // -1 error into a huge value, so keep the signed type and check both cases.
    tSize ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str());
    if (ret < 0 || static_cast<size_t>(ret) != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile Sync %s\n", filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile Synced %s\n", filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile closing %s\n", filename_.c_str());
    // Clear the handle before closing so the destructor does not attempt a
    // second close if hdfsCloseFile fails.
    hdfsFile h = hfile_;
    hfile_ = nullptr;
    if (hdfsCloseFile(fileSys_, h) != 0) {
      return IOError(filename_, errno);
    }
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsWritableFile closed %s\n", filename_.c_str());
    return Status::OK();
  }
};

// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsLogger opened %s\n",
        file_->getName().c_str());
  }

  virtual ~HdfsLogger() {
    Log(InfoLogLevel::DEBUG_LEVEL, mylog,
        "[hdfs] HdfsLogger closed %s\n",
        file_->getName().c_str());
    delete file_;
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
  }

  virtual void Logv(const char* format, va_list ap) {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900,
                    t.tm_mon + 1,
                    t.tm_mday,
                    t.tm_hour,
                    t.tm_min,
                    t.tm_sec,
                    static_cast<int>(now_tv.tv_usec),
                    static_cast<unsigned long long>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

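  // HDFS exposes no directory-level sync, so Fsync is a no-op.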
  virtual Status Fsync() { return Status::OK(); }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      Log(InfoLogLevel::FATAL_LEVEL,
          mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name +
                               ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return STATUS(NotFound, "");
    default:  // anything else should be an error
      Log(InfoLogLevel::FATAL_LEVEL,
          mylog, "FileExists hdfsExists call failed");
      return STATUS(IOError, "hdfsExists call failed with error " +
                    ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo =
          hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          char* pathname = pHdfsFileInfo[i].mName;
          char* filename = strrchr(pathname, '/');
          if (filename != nullptr) {
            result->push_back(filename + 1);
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates an error
        Log(InfoLogLevel::FATAL_LEVEL, mylog,
            "hdfsListDirectory call failed");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with a negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      break;
    default:  // anything else should be an error
      Log(InfoLogLevel::FATAL_LEVEL, mylog,
          "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic: the state might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      Log(InfoLogLevel::FATAL_LEVEL, mylog,
          "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) {
  return DeleteFile(name);
}

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow renaming if the target
// already exists, so we delete the target before attempting the rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
  // There isn't a good way to atomically check and create a file via libhdfs,
  // so locking is a no-op here.
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* lock) {
  return Status::OK();
}

Status HdfsEnv::NewLogger(const std::string& fname,
                          shared_ptr<Logger>* result) {
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h;  // uncomment this for detailed logging
  }
  return Status::OK();
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}

}  // namespace rocksdb

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace rocksdb {

Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  return STATUS(NotSupported, "Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  return STATUS(NotSupported, "Not compiled with hdfs support");
}

}  // namespace rocksdb

#endif