YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/delete_scheduler.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include "yb/rocksdb/util/delete_scheduler.h"
22
23
#include <thread>
24
#include <vector>
25
26
#include "yb/rocksdb/port/port.h"
27
#include "yb/rocksdb/env.h"
28
#include "yb/rocksdb/util/sst_file_manager_impl.h"
29
#include "yb/rocksdb/util/mutexlock.h"
30
#include "yb/rocksdb/util/sync_point.h"
31
32
namespace rocksdb {
33
34
DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir,
35
                                 int64_t rate_bytes_per_sec, Logger* info_log,
36
                                 SstFileManagerImpl* sst_file_manager)
37
    : env_(env),
38
      trash_dir_(trash_dir),
39
      rate_bytes_per_sec_(rate_bytes_per_sec),
40
      pending_files_(0),
41
      closing_(false),
42
      cv_(&mu_),
43
      info_log_(info_log),
44
27
      sst_file_manager_(sst_file_manager) {
45
27
  if (rate_bytes_per_sec_ <= 0) {
46
    // Rate limiting is disabled
47
9
    bg_thread_.reset();
48
18
  } else {
49
18
    bg_thread_.reset(
50
18
        new std::thread(&DeleteScheduler::BackgroundEmptyTrash, this));
51
18
  }
52
27
}
53
54
27
DeleteScheduler::~DeleteScheduler() {
55
27
  {
56
27
    MutexLock l(&mu_);
57
27
    closing_ = true;
58
27
    cv_.SignalAll();
59
27
  }
60
27
  if (bg_thread_) {
61
18
    bg_thread_->join();
62
18
  }
63
27
}
64
65
1.38k
Status DeleteScheduler::DeleteFile(const std::string& file_path) {
66
1.38k
  Status s;
67
1.38k
  if (rate_bytes_per_sec_ <= 0) {
68
    // Rate limiting is disabled
69
170
    s = env_->DeleteFile(file_path);
70
170
    if (s.ok() && sst_file_manager_) {
71
160
      RETURN_NOT_OK(sst_file_manager_->OnDeleteFile(file_path));
72
160
    }
73
170
    return s;
74
170
  }
75
76
  // Move file to trash
77
1.21k
  std::string path_in_trash;
78
1.21k
  s = MoveToTrash(file_path, &path_in_trash);
79
1.21k
  if (!s.ok()) {
80
10
    RLOG(InfoLogLevel::ERROR_LEVEL, info_log_,
81
10
        "Failed to move %s to trash directory (%s)", file_path.c_str(),
82
10
        trash_dir_.c_str());
83
10
    s = env_->DeleteFile(file_path);
84
10
    if (s.ok() && sst_file_manager_) {
85
0
      RETURN_NOT_OK(sst_file_manager_->OnDeleteFile(file_path));
86
0
    }
87
10
    return s;
88
10
  }
89
90
  // Add file to delete queue
91
1.20k
  {
92
1.20k
    MutexLock l(&mu_);
93
1.20k
    queue_.push(path_in_trash);
94
1.20k
    pending_files_++;
95
1.20k
    if (pending_files_ == 1) {
96
23
      cv_.SignalAll();
97
23
    }
98
1.20k
  }
99
1.20k
  return s;
100
1.21k
}
101
102
17
std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
103
17
  MutexLock l(&mu_);
104
17
  return bg_errors_;
105
17
}
106
107
Status DeleteScheduler::MoveToTrash(const std::string& file_path,
108
1.21k
                                    std::string* path_in_trash) {
109
1.21k
  Status s;
110
  // Figure out the name of the file in trash folder
111
1.21k
  size_t idx = file_path.rfind("/");
112
1.21k
  if (idx == std::string::npos || idx == file_path.size() - 1) {
113
0
    return STATUS(InvalidArgument, "file_path is corrupted");
114
0
  }
115
1.21k
  *path_in_trash = trash_dir_ + file_path.substr(idx);
116
1.21k
  std::string unique_suffix = "";
117
118
1.21k
  if (*path_in_trash == file_path) {
119
    // This file is already in trash
120
0
    return s;
121
0
  }
122
123
  // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
124
  //             file_move_mu mutex.
125
1.21k
  MutexLock l(&file_move_mu_);
126
1.22k
  while (true) {
127
1.22k
    s = env_->FileExists(*path_in_trash + unique_suffix);
128
1.22k
    if (s.IsNotFound()) {
129
      // We found a path for our file in trash
130
1.21k
      *path_in_trash += unique_suffix;
131
1.21k
      s = env_->RenameFile(file_path, *path_in_trash);
132
1.21k
      break;
133
1.21k
    } else 
if (9
s.ok()9
) {
134
      // Name conflict, generate new random suffix
135
9
      unique_suffix = env_->GenerateUniqueId();
136
9
    } else {
137
      // Error during FileExists call, we cannot continue
138
0
      break;
139
0
    }
140
1.22k
  }
141
1.21k
  if (s.ok() && 
sst_file_manager_1.20k
) {
142
32
    RETURN_NOT_OK(sst_file_manager_->OnMoveFile(file_path, *path_in_trash));
143
32
  }
144
1.21k
  return s;
145
1.21k
}
146
147
18
void DeleteScheduler::BackgroundEmptyTrash() {
148
18
  TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
149
150
41
  while (true) {
151
41
    MutexLock l(&mu_);
152
69
    while (queue_.empty() && 
!closing_45
) {
153
28
      cv_.Wait();
154
28
    }
155
156
41
    if (closing_) {
157
18
      return;
158
18
    }
159
160
    // Delete all files in queue_
161
23
    uint64_t start_time = env_->NowMicros();
162
23
    uint64_t total_deleted_bytes = 0;
163
1.12k
    while (!queue_.empty() && 
!closing_1.10k
) {
164
1.10k
      std::string path_in_trash = queue_.front();
165
1.10k
      queue_.pop();
166
167
      // We dont need to hold the lock while deleting the file
168
1.10k
      mu_.Unlock();
169
1.10k
      uint64_t deleted_bytes = 0;
170
      // Delete file from trash and update total_penlty value
171
1.10k
      Status s = DeleteTrashFile(path_in_trash,  &deleted_bytes);
172
1.10k
      total_deleted_bytes += deleted_bytes;
173
1.10k
      mu_.Lock();
174
175
1.10k
      if (!s.ok()) {
176
10
        bg_errors_[path_in_trash] = s;
177
10
      }
178
179
      // Apply penlty if necessary
180
1.10k
      uint64_t total_penlty =
181
1.10k
          ((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_);
182
1.10k
      while (!closing_ && 
!cv_.TimedWait(start_time + total_penlty)1.10k
)
{}1
183
1.10k
      TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
184
1.10k
                               &total_penlty);
185
186
1.10k
      pending_files_--;
187
1.10k
      if (pending_files_ == 0) {
188
        // Unblock WaitForEmptyTrash since there are no more files waiting
189
        // to be deleted
190
22
        cv_.SignalAll();
191
22
      }
192
1.10k
    }
193
23
  }
194
18
}
195
196
Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
197
1.10k
                                        uint64_t* deleted_bytes) {
198
1.10k
  uint64_t file_size;
199
1.10k
  Status s = env_->GetFileSize(path_in_trash, &file_size);
200
1.10k
  if (s.ok()) {
201
1.09k
    TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
202
1.09k
    s = env_->DeleteFile(path_in_trash);
203
1.09k
  }
204
205
1.10k
  if (!s.ok()) {
206
    // Error while getting file size or while deleting
207
10
    RLOG(InfoLogLevel::ERROR_LEVEL, info_log_,
208
10
        "Failed to delete %s from trash -- %s", path_in_trash.c_str(),
209
10
        s.ToString().c_str());
210
10
    *deleted_bytes = 0;
211
1.09k
  } else {
212
1.09k
    *deleted_bytes = file_size;
213
1.09k
    if (sst_file_manager_) {
214
32
      RETURN_NOT_OK(sst_file_manager_->OnDeleteFile(path_in_trash));
215
32
    }
216
1.09k
  }
217
218
1.10k
  return s;
219
1.10k
}
220
221
21
void DeleteScheduler::WaitForEmptyTrash() {
222
21
  MutexLock l(&mu_);
223
41
  while (pending_files_ > 0 && 
!closing_20
) {
224
20
    cv_.Wait();
225
20
  }
226
21
}
227
228
}  // namespace rocksdb