/Users/deen/code/yugabyte-db/src/yb/tablet/maintenance_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/maintenance_manager.h" |
34 | | |
35 | | #include <stdint.h> |
36 | | |
37 | | #include <memory> |
38 | | #include <string> |
39 | | #include <utility> |
40 | | |
41 | | #include "yb/gutil/stringprintf.h" |
42 | | |
43 | | #include "yb/util/debug/trace_event.h" |
44 | | #include "yb/util/debug/trace_logging.h" |
45 | | #include "yb/util/flag_tags.h" |
46 | | #include "yb/util/logging.h" |
47 | | #include "yb/util/mem_tracker.h" |
48 | | #include "yb/util/metrics.h" |
49 | | #include "yb/util/status_log.h" |
50 | | #include "yb/util/stopwatch.h" |
51 | | #include "yb/util/thread.h" |
52 | | #include "yb/util/unique_lock.h" |
53 | | |
54 | | using std::pair; |
55 | | using std::shared_ptr; |
56 | | using strings::Substitute; |
57 | | |
58 | | using namespace std::literals; |
59 | | |
60 | | DEFINE_int32(maintenance_manager_num_threads, 1, |
61 | | "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is " |
62 | | "reserved for emergency flushes. For spinning disks, the number of threads should " |
63 | | "not be above the number of devices."); |
64 | | TAG_FLAG(maintenance_manager_num_threads, stable); |
65 | | |
66 | | DEFINE_int32(maintenance_manager_polling_interval_ms, 250, |
67 | | "Polling interval for the maintenance manager scheduler, " |
68 | | "in milliseconds."); |
69 | | TAG_FLAG(maintenance_manager_polling_interval_ms, hidden); |
70 | | |
71 | | DEFINE_int32(maintenance_manager_history_size, 8, |
72 | | "Number of completed operations the manager is keeping track of."); |
73 | | TAG_FLAG(maintenance_manager_history_size, hidden); |
74 | | |
75 | | DEFINE_bool(enable_maintenance_manager, true, |
76 | | "Enable the maintenance manager, runs compaction and tablet cleaning tasks."); |
77 | | TAG_FLAG(enable_maintenance_manager, unsafe); |
78 | | |
79 | | namespace yb { |
80 | | |
81 | | using yb::tablet::MaintenanceManagerStatusPB; |
82 | | using yb::tablet::MaintenanceManagerStatusPB_CompletedOpPB; |
83 | | using yb::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB; |
84 | | |
85 | 88.7k | MaintenanceOpStats::MaintenanceOpStats() { |
86 | 88.7k | Clear(); |
87 | 88.7k | } |
88 | | |
89 | 6.70M | void MaintenanceOpStats::Clear() { |
90 | 6.70M | valid_ = false; |
91 | 6.70M | runnable_ = false; |
92 | 6.70M | ram_anchored_ = 0; |
93 | 6.70M | logs_retained_bytes_ = 0; |
94 | 6.70M | perf_improvement_ = 0; |
95 | 6.70M | } |
96 | | |
97 | | MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage) |
98 | 88.7k | : name_(std::move(name)), io_usage_(io_usage) {} |
99 | | |
100 | 47.7k | MaintenanceOp::~MaintenanceOp() { |
101 | 0 | CHECK(!manager_.get()) << "You must unregister the " << name_ |
102 | 0 | << " Op before destroying it."; |
103 | 47.7k | CHECK_EQ(running_, 0); |
104 | 47.7k | } |
105 | | |
106 | 47.7k | void MaintenanceOp::Unregister() { |
107 | 2 | CHECK(manager_.get()) << "Op " << name_ << " was never registered."; |
108 | 47.7k | manager_->UnregisterOp(this); |
109 | 47.7k | } |
110 | | |
111 | | const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = { |
112 | | 0, |
113 | | 0, |
114 | | 0, |
115 | | shared_ptr<MemTracker>(), |
116 | | }; |
117 | | |
118 | | MaintenanceManager::MaintenanceManager(const Options& options) |
119 | | : num_threads_(options.num_threads <= 0 ? |
120 | | FLAGS_maintenance_manager_num_threads : options.num_threads), |
121 | | polling_interval_ms_(options.polling_interval_ms <= 0 ? |
122 | | FLAGS_maintenance_manager_polling_interval_ms : |
123 | | options.polling_interval_ms), |
124 | | parent_mem_tracker_(!options.parent_mem_tracker ? |
125 | 11.5k | MemTracker::GetRootTracker() : options.parent_mem_tracker) { |
126 | 11.5k | CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_) |
127 | 11.5k | .set_max_threads(num_threads_).Build(&thread_pool_)); |
128 | 11.5k | uint32_t history_size = options.history_size == 0 ? |
129 | 11.5k | FLAGS_maintenance_manager_history_size : |
130 | 5 | options.history_size; |
131 | 11.5k | completed_ops_.resize(history_size); |
132 | 11.5k | } |
133 | | |
134 | 210 | MaintenanceManager::~MaintenanceManager() { |
135 | 210 | Shutdown(); |
136 | 210 | } |
137 | | |
138 | 11.2k | Status MaintenanceManager::Init() { |
139 | 11.2k | RETURN_NOT_OK(Thread::Create( |
140 | 11.2k | "maintenance", "maintenance_scheduler", |
141 | 11.2k | std::bind(&MaintenanceManager::RunSchedulerThread, this), &monitor_thread_)); |
142 | 11.2k | return Status::OK(); |
143 | 11.2k | } |
144 | | |
145 | 406 | void MaintenanceManager::Shutdown() { |
146 | 406 | { |
147 | 406 | std::lock_guard<std::mutex> guard(mutex_); |
148 | 406 | if (shutdown_) { |
149 | 164 | return; |
150 | 164 | } |
151 | 242 | shutdown_ = true; |
152 | 242 | cond_.notify_all(); |
153 | 242 | } |
154 | 242 | if (monitor_thread_.get()) { |
155 | 192 | CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join()); |
156 | 192 | monitor_thread_.reset(); |
157 | 192 | thread_pool_->Shutdown(); |
158 | 192 | } |
159 | 242 | } |
160 | | |
161 | 88.7k | void MaintenanceManager::RegisterOp(MaintenanceOp* op) { |
162 | 88.7k | std::lock_guard<std::mutex> lock(mutex_); |
163 | 18.4E | CHECK(!op->manager_.get()) << "Tried to register " << op->name() |
164 | 18.4E | << ", but it was already registered."; |
165 | 88.7k | auto inserted = ops_.emplace(op, MaintenanceOpStats()).second; |
166 | 18.4E | CHECK(inserted) |
167 | 18.4E | << "Tried to register " << op->name() |
168 | 18.4E | << ", but it already exists in ops_."; |
169 | 88.7k | op->manager_ = shared_from_this(); |
170 | 18.4E | VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name(); |
171 | 88.7k | } |
172 | | |
173 | 47.7k | void MaintenanceManager::UnregisterOp(MaintenanceOp* op) { |
174 | 47.7k | { |
175 | 47.7k | UNIQUE_LOCK(lock, mutex_); |
176 | | |
177 | 18.4E | CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name() |
178 | 18.4E | << ", but it is not currently registered with this maintenance manager."; |
179 | | // While the op is running, wait for it to be finished. |
180 | 47.7k | if (op->running_ > 0) { |
181 | 5 | VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so " |
182 | 0 | << "we can unregister it."; |
183 | 5 | for (;;) { |
184 | 5 | auto wait_status = op->cond_.wait_for(GetLockForCondition(&lock), 15s); |
185 | 5 | if (op->running_ == 0) { |
186 | 5 | break; |
187 | 5 | } |
188 | 0 | LOG_IF(DFATAL, wait_status == std::cv_status::timeout) |
189 | 0 | << "Op " << op->name() << " running for too long: " << op->running_; |
190 | 0 | } |
191 | 5 | } |
192 | 47.7k | auto iter = ops_.find(op); |
193 | 18.4E | CHECK(iter != ops_.end()) |
194 | 18.4E | << "Tried to unregister " << op->name() << ", but it was never registered"; |
195 | 47.7k | CHECK_EQ(iter->first, op); |
196 | 47.7k | ops_.erase(iter); |
197 | 47.7k | } |
198 | 47.7k | LOG(INFO) << "Unregistered op " << op->name(); |
199 | | // Remove the op's shared_ptr reference to us. This might 'delete this'. |
200 | 47.7k | op->manager_ = nullptr; |
201 | 47.7k | } |
202 | | |
203 | 11.2k | void MaintenanceManager::RunSchedulerThread() { |
204 | 11.2k | auto polling_interval = polling_interval_ms_ * 1ms; |
205 | | |
206 | 11.2k | UNIQUE_LOCK(lock, mutex_); |
207 | 1.21M | for (;;) { |
208 | | // Loop until we are shutting down or it is time to run another op. |
209 | 1.21M | cond_.wait_for(GetLockForCondition(&lock), polling_interval); |
210 | 1.21M | if (shutdown_) { |
211 | 192 | VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager."; |
212 | 192 | return; |
213 | 192 | } |
214 | | |
215 | | // Find the best op. |
216 | 1.21M | MaintenanceOp* op = FindBestOp(); |
217 | 1.21M | if (!op) { |
218 | 1.20M | VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing."; |
219 | 1.20M | continue; |
220 | 1.20M | } |
221 | | |
222 | | // Prepare the maintenance operation. |
223 | 11.0k | ScopedMaintenanceOpRun scoped_run(op); |
224 | 11.0k | bool ready; |
225 | 11.0k | { |
226 | 11.0k | ReverseLock<decltype(lock)> rlock(lock); |
227 | 11.0k | ready = op->Prepare(); |
228 | 11.0k | if (!ready) { |
229 | 0 | LOG(INFO) << "Prepare failed for " << op->name() << ". Re-running scheduler."; |
230 | 0 | continue; |
231 | 0 | } |
232 | | |
233 | | // Run the maintenance operation. |
234 | 11.0k | Status s = thread_pool_->SubmitFunc( |
235 | 11.0k | std::bind(&MaintenanceManager::LaunchOp, this, std::move(scoped_run))); |
236 | 11.0k | CHECK(s.ok()); |
237 | 11.0k | } |
238 | 11.0k | } |
239 | 11.2k | } |
240 | | |
241 | | // Finding the best operation goes through four filters: |
242 | | // - If there's an Op that we can run quickly that frees log retention, we run it. |
243 | | // - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot |
244 | | // free), we run the Op with the highest RAM usage. |
245 | | // - If there are Ops that retain logs, we run the one that has the highest retention (and if many |
246 | | // qualify, then we run the one that also frees up the most RAM). |
247 | | // - Finally, if there's nothing else that we really need to do, we run the Op that will improve |
248 | | // performance the most. |
249 | | // |
250 | | // The reason it's done this way is that we want to prioritize limiting the amount of resources we |
251 | | // hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage. |
252 | | // Reversing those can starve the low IO Ops when the system is under intense memory pressure. |
253 | | // |
254 | | // In the third priority we're at a point where nothing's urgent and there's nothing we can run |
255 | | // quickly. |
256 | | // TODO We currently optimize for freeing log retention but we could consider having some sort of |
257 | | // sliding priority between log retention and RAM usage. For example, is an Op that frees |
258 | | // 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention |
259 | | // and 128MB of RAM? Maybe a more holistic approach would be better. |
260 | 1.20M | MaintenanceOp* MaintenanceManager::FindBestOp() { |
261 | 1.20M | TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp"); |
262 | 1.20M | if (!FLAGS_enable_maintenance_manager) { |
263 | 32 | VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing"; |
264 | 32 | return nullptr; |
265 | 32 | } |
266 | 1.20M | size_t free_threads = num_threads_ - running_ops_; |
267 | 1.20M | if (free_threads == 0) { |
268 | 0 | VLOG_AND_TRACE("maintenance", 1) << "There are no free threads, so we can't run anything."; |
269 | 0 | return nullptr; |
270 | 0 | } |
271 | | |
272 | 1.20M | int64_t low_io_most_logs_retained_bytes = 0; |
273 | 1.20M | MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr; |
274 | | |
275 | 1.20M | uint64_t most_mem_anchored = 0; |
276 | 1.20M | MaintenanceOp* most_mem_anchored_op = nullptr; |
277 | | |
278 | 1.20M | int64_t most_logs_retained_bytes = 0; |
279 | 1.20M | uint64_t most_logs_retained_bytes_ram_anchored = 0; |
280 | 1.20M | MaintenanceOp* most_logs_retained_bytes_op = nullptr; |
281 | | |
282 | 1.20M | double best_perf_improvement = 0; |
283 | 1.20M | MaintenanceOp* best_perf_improvement_op = nullptr; |
284 | 6.61M | for (OpMapTy::value_type &val : ops_) { |
285 | 6.61M | MaintenanceOp* op(val.first); |
286 | 6.61M | MaintenanceOpStats& stats(val.second); |
287 | | // Update op stats. |
288 | 6.61M | stats.Clear(); |
289 | 6.61M | op->UpdateStats(&stats); |
290 | 6.61M | if (!stats.valid() || !stats.runnable()) { |
291 | 938 | continue; |
292 | 938 | } |
293 | 6.61M | if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes && |
294 | 5 | op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) { |
295 | 2 | low_io_most_logs_retained_bytes_op = op; |
296 | 2 | low_io_most_logs_retained_bytes = stats.logs_retained_bytes(); |
297 | 2 | } |
298 | | |
299 | 6.61M | if (stats.ram_anchored() > most_mem_anchored) { |
300 | 30 | most_mem_anchored_op = op; |
301 | 30 | most_mem_anchored = stats.ram_anchored(); |
302 | 30 | } |
303 | | // We prioritize ops that can free more logs, but when it's the same we pick the one that |
304 | | // also frees up the most memory. |
305 | 6.61M | if (stats.logs_retained_bytes() > 0 && |
306 | 7 | (stats.logs_retained_bytes() > most_logs_retained_bytes || |
307 | 3 | (stats.logs_retained_bytes() == most_logs_retained_bytes && |
308 | 7 | stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) { |
309 | 7 | most_logs_retained_bytes_op = op; |
310 | 7 | most_logs_retained_bytes = stats.logs_retained_bytes(); |
311 | 7 | most_logs_retained_bytes_ram_anchored = stats.ram_anchored(); |
312 | 7 | } |
313 | 6.61M | if ((!best_perf_improvement_op) || |
314 | 5.73M | (stats.perf_improvement() > best_perf_improvement)) { |
315 | 879k | best_perf_improvement_op = op; |
316 | 879k | best_perf_improvement = stats.perf_improvement(); |
317 | 879k | } |
318 | 6.61M | } |
319 | | |
320 | | // Look at ops that we can run quickly that free up log retention. |
321 | 1.20M | if (low_io_most_logs_retained_bytes_op) { |
322 | 2 | if (low_io_most_logs_retained_bytes > 0) { |
323 | 2 | VLOG_AND_TRACE("maintenance", 1) |
324 | 0 | << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", " |
325 | 0 | << "because it can free up more logs " |
326 | 0 | << "at " << low_io_most_logs_retained_bytes |
327 | 0 | << " bytes with a low IO cost"; |
328 | 2 | return low_io_most_logs_retained_bytes_op; |
329 | 2 | } |
330 | 1.20M | } |
331 | | |
332 | | // Look at free memory. If it is dangerously low, we must select something |
333 | | // that frees memory-- the op with the most anchored memory. |
334 | 1.20M | auto soft_limit_exceeded_result = parent_mem_tracker_->AnySoftLimitExceeded(0.0 /* score */); |
335 | 1.20M | if (soft_limit_exceeded_result.exceeded) { |
336 | 49 | if (!most_mem_anchored_op) { |
337 | 47 | string msg = StringPrintf("we have exceeded our soft memory limit " |
338 | 47 | "(current capacity is %.2f%%). However, there are no ops currently " |
339 | 47 | "runnable which would free memory.", soft_limit_exceeded_result.current_capacity_pct); |
340 | 47 | YB_LOG_EVERY_N_SECS(INFO, 5) << msg; |
341 | 47 | return nullptr; |
342 | 47 | } |
343 | 2 | VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit " |
344 | 0 | << "(current capacity is " << soft_limit_exceeded_result.current_capacity_pct << "%). " |
345 | 0 | << "Running the op which anchors the most memory: " << most_mem_anchored_op->name(); |
346 | 2 | return most_mem_anchored_op; |
347 | 2 | } |
348 | | |
349 | 1.20M | if (most_logs_retained_bytes_op) { |
350 | 2 | VLOG_AND_TRACE("maintenance", 1) |
351 | 0 | << "Performing " << most_logs_retained_bytes_op->name() << ", " |
352 | 0 | << "because it can free up more logs " << "at " << most_logs_retained_bytes |
353 | 0 | << " bytes"; |
354 | 2 | return most_logs_retained_bytes_op; |
355 | 2 | } |
356 | | |
357 | 1.20M | if (best_perf_improvement_op) { |
358 | 879k | if (best_perf_improvement > 0) { |
359 | 5 | VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", " |
360 | 0 | << "because it had the best perf_improvement score, " |
361 | 0 | << "at " << best_perf_improvement; |
362 | 5 | return best_perf_improvement_op; |
363 | 5 | } |
364 | 1.20M | } |
365 | 1.20M | return nullptr; |
366 | 1.20M | } |
367 | | |
368 | 8 | void MaintenanceManager::LaunchOp(const ScopedMaintenanceOpRun& run) { |
369 | 8 | auto op = run.get(); |
370 | 8 | MonoTime start_time(MonoTime::Now()); |
371 | 8 | op->RunningGauge()->Increment(); |
372 | 8 | LOG_TIMING(INFO, Substitute("running $0", op->name())) { |
373 | 8 | TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp", |
374 | 8 | "name", op->name()); |
375 | 8 | op->Perform(); |
376 | 8 | } |
377 | 8 | op->RunningGauge()->Decrement(); |
378 | 8 | MonoTime end_time(MonoTime::Now()); |
379 | 8 | MonoDelta delta(end_time.GetDeltaSince(start_time)); |
380 | 8 | std::lock_guard<std::mutex> lock(mutex_); |
381 | | |
382 | 8 | CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()]; |
383 | 8 | completed_op.name = op->name(); |
384 | 8 | completed_op.duration = delta; |
385 | 8 | completed_op.start_mono_time = start_time; |
386 | 8 | completed_ops_count_++; |
387 | | |
388 | 8 | op->DurationHistogram()->Increment(delta.ToMilliseconds()); |
389 | 8 | } |
390 | | |
391 | 5 | void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) { |
392 | 5 | DCHECK(out_pb != nullptr); |
393 | 5 | std::lock_guard<std::mutex> lock(mutex_); |
394 | 5 | MaintenanceOp* best_op = FindBestOp(); |
395 | 0 | for (MaintenanceManager::OpMapTy::value_type& val : ops_) { |
396 | 0 | MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations(); |
397 | 0 | MaintenanceOp* op(val.first); |
398 | 0 | MaintenanceOpStats& stat(val.second); |
399 | 0 | op_pb->set_name(op->name()); |
400 | 0 | op_pb->set_running(op->running()); |
401 | 0 | if (stat.valid()) { |
402 | 0 | op_pb->set_runnable(stat.runnable()); |
403 | 0 | op_pb->set_ram_anchored_bytes(stat.ram_anchored()); |
404 | 0 | op_pb->set_logs_retained_bytes(stat.logs_retained_bytes()); |
405 | 0 | op_pb->set_perf_improvement(stat.perf_improvement()); |
406 | 0 | } else { |
407 | 0 | op_pb->set_runnable(false); |
408 | 0 | op_pb->set_ram_anchored_bytes(0); |
409 | 0 | op_pb->set_logs_retained_bytes(0); |
410 | 0 | op_pb->set_perf_improvement(0); |
411 | 0 | } |
412 | |
|
413 | 0 | if (best_op == op) { |
414 | 0 | out_pb->mutable_best_op()->CopyFrom(*op_pb); |
415 | 0 | } |
416 | 0 | } |
417 | | |
418 | 20 | for (const CompletedOp& completed_op : completed_ops_) { |
419 | 20 | if (!completed_op.name.empty()) { |
420 | 14 | MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = out_pb->add_completed_operations(); |
421 | 14 | completed_pb->set_name(completed_op.name); |
422 | 14 | completed_pb->set_duration_millis( |
423 | 14 | narrow_cast<int32_t>(completed_op.duration.ToMilliseconds())); |
424 | | |
425 | 14 | MonoDelta delta(MonoTime::Now().GetDeltaSince(completed_op.start_mono_time)); |
426 | 14 | completed_pb->set_secs_since_start(delta.ToSeconds()); |
427 | 14 | } |
428 | 20 | } |
429 | 5 | } |
430 | | |
431 | 8 | ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(MaintenanceOp* op) : op_(op) { |
432 | 8 | ++op->running_; |
433 | 8 | ++op->manager_->running_ops_; |
434 | 8 | } |
435 | | |
436 | 0 | ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(const ScopedMaintenanceOpRun& rhs) : op_(rhs.op_) { |
437 | 0 | Assign(rhs.op_); |
438 | 0 | } |
439 | | |
440 | 0 | void ScopedMaintenanceOpRun::operator=(const ScopedMaintenanceOpRun& rhs) { |
441 | 0 | Reset(); |
442 | 0 | Assign(rhs.op_); |
443 | 0 | } |
444 | | |
445 | 16 | ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(ScopedMaintenanceOpRun&& rhs) : op_(rhs.op_) { |
446 | 16 | rhs.op_ = nullptr; |
447 | 16 | } |
448 | | |
449 | 0 | void ScopedMaintenanceOpRun::operator=(ScopedMaintenanceOpRun&& rhs) { |
450 | 0 | Reset(); |
451 | 0 | op_ = rhs.op_; |
452 | 0 | rhs.op_ = nullptr; |
453 | 0 | } |
454 | | |
455 | 24 | ScopedMaintenanceOpRun::~ScopedMaintenanceOpRun() { |
456 | 24 | Reset(); |
457 | 24 | } |
458 | | |
459 | 24 | void ScopedMaintenanceOpRun::Reset() { |
460 | 24 | if (!op_) { |
461 | 16 | return; |
462 | 16 | } |
463 | 8 | std::lock_guard<std::mutex> lock(op_->manager_->mutex_); |
464 | 8 | if (--op_->running_ == 0) { |
465 | 8 | op_->cond_.notify_all(); |
466 | 8 | } |
467 | 8 | --op_->manager_->running_ops_; |
468 | 8 | op_ = nullptr; |
469 | 8 | } |
470 | | |
471 | 8 | MaintenanceOp* ScopedMaintenanceOpRun::get() const { |
472 | 8 | return op_; |
473 | 8 | } |
474 | | |
475 | 0 | void ScopedMaintenanceOpRun::Assign(MaintenanceOp* op) { |
476 | 0 | op_ = op; |
477 | 0 | std::lock_guard<std::mutex> lock(op_->manager_->mutex_); |
478 | 0 | ++op->running_; |
479 | 0 | ++op->manager_->running_ops_; |
480 | 0 | } |
481 | | |
482 | | } // namespace yb |