/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/delete_scheduler.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/util/delete_scheduler.h" |
22 | | |
23 | | #include <thread> |
24 | | #include <vector> |
25 | | |
26 | | #include "yb/rocksdb/port/port.h" |
27 | | #include "yb/rocksdb/env.h" |
28 | | #include "yb/rocksdb/util/sst_file_manager_impl.h" |
29 | | #include "yb/rocksdb/util/mutexlock.h" |
30 | | #include "yb/rocksdb/util/sync_point.h" |
31 | | |
32 | | namespace rocksdb { |
33 | | |
34 | | DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir, |
35 | | int64_t rate_bytes_per_sec, Logger* info_log, |
36 | | SstFileManagerImpl* sst_file_manager) |
37 | | : env_(env), |
38 | | trash_dir_(trash_dir), |
39 | | rate_bytes_per_sec_(rate_bytes_per_sec), |
40 | | pending_files_(0), |
41 | | closing_(false), |
42 | | cv_(&mu_), |
43 | | info_log_(info_log), |
44 | 27 | sst_file_manager_(sst_file_manager) { |
45 | 27 | if (rate_bytes_per_sec_ <= 0) { |
46 | | // Rate limiting is disabled |
47 | 9 | bg_thread_.reset(); |
48 | 18 | } else { |
49 | 18 | bg_thread_.reset( |
50 | 18 | new std::thread(&DeleteScheduler::BackgroundEmptyTrash, this)); |
51 | 18 | } |
52 | 27 | } |
53 | | |
54 | 27 | DeleteScheduler::~DeleteScheduler() { |
55 | 27 | { |
56 | 27 | MutexLock l(&mu_); |
57 | 27 | closing_ = true; |
58 | 27 | cv_.SignalAll(); |
59 | 27 | } |
60 | 27 | if (bg_thread_) { |
61 | 18 | bg_thread_->join(); |
62 | 18 | } |
63 | 27 | } |
64 | | |
65 | 1.38k | Status DeleteScheduler::DeleteFile(const std::string& file_path) { |
66 | 1.38k | Status s; |
67 | 1.38k | if (rate_bytes_per_sec_ <= 0) { |
68 | | // Rate limiting is disabled |
69 | 170 | s = env_->DeleteFile(file_path); |
70 | 170 | if (s.ok() && sst_file_manager_) { |
71 | 160 | RETURN_NOT_OK(sst_file_manager_->OnDeleteFile(file_path)); |
72 | 160 | } |
73 | 170 | return s; |
74 | 1.21k | } |
75 | | |
76 | | // Move file to trash |
77 | 1.21k | std::string path_in_trash; |
78 | 1.21k | s = MoveToTrash(file_path, &path_in_trash); |
79 | 1.21k | if (!s.ok()) { |
80 | 10 | RLOG(InfoLogLevel::ERROR_LEVEL, info_log_, |
81 | 10 | "Failed to move %s to trash directory (%s)", file_path.c_str(), |
82 | 10 | trash_dir_.c_str()); |
83 | 10 | s = env_->DeleteFile(file_path); |
84 | 10 | if (s.ok() && sst_file_manager_) { |
85 | 0 | RETURN_NOT_OK(sst_file_manager_->OnDeleteFile(file_path)); |
86 | 0 | } |
87 | 10 | return s; |
88 | 1.20k | } |
89 | | |
90 | | // Add file to delete queue |
91 | 1.20k | { |
92 | 1.20k | MutexLock l(&mu_); |
93 | 1.20k | queue_.push(path_in_trash); |
94 | 1.20k | pending_files_++; |
95 | 1.20k | if (pending_files_ == 1) { |
96 | 22 | cv_.SignalAll(); |
97 | 22 | } |
98 | 1.20k | } |
99 | 1.20k | return s; |
100 | 1.20k | } |
101 | | |
102 | 17 | std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() { |
103 | 17 | MutexLock l(&mu_); |
104 | 17 | return bg_errors_; |
105 | 17 | } |
106 | | |
107 | | Status DeleteScheduler::MoveToTrash(const std::string& file_path, |
108 | 1.21k | std::string* path_in_trash) { |
109 | 1.21k | Status s; |
110 | | // Figure out the name of the file in trash folder |
111 | 1.21k | size_t idx = file_path.rfind("/"); |
112 | 1.21k | if (idx == std::string::npos || idx == file_path.size() - 1) { |
113 | 0 | return STATUS(InvalidArgument, "file_path is corrupted"); |
114 | 0 | } |
115 | 1.21k | *path_in_trash = trash_dir_ + file_path.substr(idx); |
116 | 1.21k | std::string unique_suffix = ""; |
117 | | |
118 | 1.21k | if (*path_in_trash == file_path) { |
119 | | // This file is already in trash |
120 | 0 | return s; |
121 | 0 | } |
122 | | |
123 | | // TODO(tec) : Implement Env::RenameFileIfNotExist and remove |
124 | | // file_move_mu mutex. |
125 | 1.21k | MutexLock l(&file_move_mu_); |
126 | 1.22k | while (true) { |
127 | 1.22k | s = env_->FileExists(*path_in_trash + unique_suffix); |
128 | 1.22k | if (s.IsNotFound()) { |
129 | | // We found a path for our file in trash |
130 | 1.21k | *path_in_trash += unique_suffix; |
131 | 1.21k | s = env_->RenameFile(file_path, *path_in_trash); |
132 | 1.21k | break; |
133 | 9 | } else if (s.ok()) { |
134 | | // Name conflict, generate new random suffix |
135 | 9 | unique_suffix = env_->GenerateUniqueId(); |
136 | 0 | } else { |
137 | | // Error during FileExists call, we cannot continue |
138 | 0 | break; |
139 | 0 | } |
140 | 1.22k | } |
141 | 1.21k | if (s.ok() && sst_file_manager_) { |
142 | 32 | RETURN_NOT_OK(sst_file_manager_->OnMoveFile(file_path, *path_in_trash)); |
143 | 32 | } |
144 | 1.21k | return s; |
145 | 1.21k | } |
146 | | |
147 | 18 | void DeleteScheduler::BackgroundEmptyTrash() { |
148 | 18 | TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); |
149 | | |
150 | 40 | while (true) { |
151 | 40 | MutexLock l(&mu_); |
152 | 67 | while (queue_.empty() && !closing_) { |
153 | 27 | cv_.Wait(); |
154 | 27 | } |
155 | | |
156 | 40 | if (closing_) { |
157 | 18 | return; |
158 | 18 | } |
159 | | |
160 | | // Delete all files in queue_ |
161 | 22 | uint64_t start_time = env_->NowMicros(); |
162 | 22 | uint64_t total_deleted_bytes = 0; |
163 | 1.12k | while (!queue_.empty() && !closing_) { |
164 | 1.10k | std::string path_in_trash = queue_.front(); |
165 | 1.10k | queue_.pop(); |
166 | | |
167 | | // We dont need to hold the lock while deleting the file |
168 | 1.10k | mu_.Unlock(); |
169 | 1.10k | uint64_t deleted_bytes = 0; |
170 | | // Delete file from trash and update total_penlty value |
171 | 1.10k | Status s = DeleteTrashFile(path_in_trash, &deleted_bytes); |
172 | 1.10k | total_deleted_bytes += deleted_bytes; |
173 | 1.10k | mu_.Lock(); |
174 | | |
175 | 1.10k | if (!s.ok()) { |
176 | 10 | bg_errors_[path_in_trash] = s; |
177 | 10 | } |
178 | | |
179 | | // Apply penlty if necessary |
180 | 1.10k | uint64_t total_penlty = |
181 | 1.10k | ((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_); |
182 | 1.10k | while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} |
183 | 1.10k | TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", |
184 | 1.10k | &total_penlty); |
185 | | |
186 | 1.10k | pending_files_--; |
187 | 1.10k | if (pending_files_ == 0) { |
188 | | // Unblock WaitForEmptyTrash since there are no more files waiting |
189 | | // to be deleted |
190 | 21 | cv_.SignalAll(); |
191 | 21 | } |
192 | 1.10k | } |
193 | 22 | } |
194 | 18 | } |
195 | | |
196 | | Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, |
197 | 1.10k | uint64_t* deleted_bytes) { |
198 | 1.10k | uint64_t file_size; |
199 | 1.10k | Status s = env_->GetFileSize(path_in_trash, &file_size); |
200 | 1.10k | if (s.ok()) { |
201 | 1.09k | TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile"); |
202 | 1.09k | s = env_->DeleteFile(path_in_trash); |
203 | 1.09k | } |
204 | | |
205 | 1.10k | if (!s.ok()) { |
206 | | // Error while getting file size or while deleting |
207 | 10 | RLOG(InfoLogLevel::ERROR_LEVEL, info_log_, |
208 | 10 | "Failed to delete %s from trash -- %s", path_in_trash.c_str(), |
209 | 10 | s.ToString().c_str()); |
210 | 10 | *deleted_bytes = 0; |
211 | 1.09k | } else { |
212 | 1.09k | *deleted_bytes = file_size; |
213 | 1.09k | if (sst_file_manager_) { |
214 | 32 | RETURN_NOT_OK(sst_file_manager_->OnDeleteFile(path_in_trash)); |
215 | 32 | } |
216 | 1.09k | } |
217 | | |
218 | 1.10k | return s; |
219 | 1.10k | } |
220 | | |
221 | 21 | void DeleteScheduler::WaitForEmptyTrash() { |
222 | 21 | MutexLock l(&mu_); |
223 | 41 | while (pending_files_ > 0 && !closing_) { |
224 | 20 | cv_.Wait(); |
225 | 20 | } |
226 | 21 | } |
227 | | |
228 | | } // namespace rocksdb |