YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/maintenance_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/maintenance_manager.h"
34
35
#include <stdint.h>
36
37
#include <memory>
38
#include <string>
39
#include <utility>
40
41
#include "yb/gutil/stringprintf.h"
42
43
#include "yb/util/debug/trace_event.h"
44
#include "yb/util/debug/trace_logging.h"
45
#include "yb/util/flag_tags.h"
46
#include "yb/util/logging.h"
47
#include "yb/util/mem_tracker.h"
48
#include "yb/util/metrics.h"
49
#include "yb/util/status_log.h"
50
#include "yb/util/stopwatch.h"
51
#include "yb/util/thread.h"
52
#include "yb/util/unique_lock.h"
53
54
using std::pair;
55
using std::shared_ptr;
56
using strings::Substitute;
57
58
using namespace std::literals;
59
60
// Thread-pool size used when Options::num_threads is not positive.
DEFINE_int32(maintenance_manager_num_threads, 1,
             "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is "
             "reserved for emergency flushes. For spinning disks, the number of threads should "
             "not be above the number of devices.");
TAG_FLAG(maintenance_manager_num_threads, stable);

// Scheduler wake-up period used when Options::polling_interval_ms is not positive.
DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
             "Polling interval for the maintenance manager scheduler, "
             "in milliseconds.");
TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);

// Length of the completed-op history used when Options::history_size is zero.
DEFINE_int32(maintenance_manager_history_size, 8,
             "Number of completed operations the manager is keeping track of.");
TAG_FLAG(maintenance_manager_history_size, hidden);

// Global switch consulted by FindBestOp(); when false no op is ever selected.
DEFINE_bool(enable_maintenance_manager, true,
            "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
TAG_FLAG(enable_maintenance_manager, unsafe);
78
79
namespace yb {
80
81
using yb::tablet::MaintenanceManagerStatusPB;
82
using yb::tablet::MaintenanceManagerStatusPB_CompletedOpPB;
83
using yb::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB;
84
85
150k
// Creates a stats object in the cleared state (invalid, not runnable, all counters zero).
MaintenanceOpStats::MaintenanceOpStats() {
  // Clear() is the single place that defines the "empty" state.
  this->Clear();
}
88
89
50.5M
void MaintenanceOpStats::Clear() {
90
50.5M
  valid_ = false;
91
50.5M
  runnable_ = false;
92
50.5M
  ram_anchored_ = 0;
93
50.5M
  logs_retained_bytes_ = 0;
94
50.5M
  perf_improvement_ = 0;
95
50.5M
}
96
97
// Builds an op with the given display name and IO usage class.
// `name` is taken by value and moved into the member to avoid an extra copy.
MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
    : name_(std::move(name)),
      io_usage_(io_usage) {
}
99
100
75.6k
// Destroys the op. The op must already be unregistered (no manager attached) and
// must not be running; both conditions are enforced with fatal checks.
MaintenanceOp::~MaintenanceOp() {
  CHECK(!manager_.get())
      << "You must unregister the " << name_ << " Op before destroying it.";
  CHECK_EQ(running_, 0);
}
105
106
75.6k
void MaintenanceOp::Unregister() {
107
75.6k
  CHECK
(manager_.get()) << "Op " << name_ << " was never registered."2
;
108
75.6k
  manager_->UnregisterOp(this);
109
75.6k
}
110
111
// Options that make the constructor fall back to its defaults for every setting:
// non-positive / zero values select the corresponding flag, and an empty tracker
// selects the root MemTracker.
// NOTE(review): field order comes from the Options declaration in the header;
// presumably num_threads, polling_interval_ms, history_size, parent_mem_tracker — confirm.
const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
  0,
  0,
  0,
  nullptr,
};
117
118
// Builds a manager from `options`, substituting defaults for unset values:
//  - num_threads <= 0          -> FLAGS_maintenance_manager_num_threads
//  - polling_interval_ms <= 0  -> FLAGS_maintenance_manager_polling_interval_ms
//  - history_size == 0         -> FLAGS_maintenance_manager_history_size
//  - empty parent_mem_tracker  -> MemTracker::GetRootTracker()
// Also creates a fixed-size thread pool (min == max == num_threads_). The scheduler
// thread itself is started later, by Init().
MaintenanceManager::MaintenanceManager(const Options& options)
  : num_threads_(options.num_threads > 0 ?
      options.num_threads : FLAGS_maintenance_manager_num_threads),
    polling_interval_ms_(options.polling_interval_ms > 0 ?
          options.polling_interval_ms :
          FLAGS_maintenance_manager_polling_interval_ms),
    parent_mem_tracker_(options.parent_mem_tracker ?
        options.parent_mem_tracker : MemTracker::GetRootTracker()) {
  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr")
               .set_min_threads(num_threads_)
               .set_max_threads(num_threads_)
               .Build(&thread_pool_));

  // Ring buffer of recently completed ops; its length is the history size.
  uint32_t history_size = options.history_size;
  if (history_size == 0) {
    history_size = FLAGS_maintenance_manager_history_size;
  }
  completed_ops_.resize(history_size);
}
133
134
263
// Shuts the manager down on destruction. Shutdown() returns early if it already ran.
MaintenanceManager::~MaintenanceManager() {
  this->Shutdown();
}
137
138
16.6k
// Starts the scheduler thread, which runs RunSchedulerThread() until shutdown.
// Returns the thread-creation error, or OK on success.
Status MaintenanceManager::Init() {
  auto scheduler_loop = std::bind(&MaintenanceManager::RunSchedulerThread, this);
  RETURN_NOT_OK(Thread::Create(
      "maintenance", "maintenance_scheduler", scheduler_loop, &monitor_thread_));
  return Status::OK();
}
144
145
596
void MaintenanceManager::Shutdown() {
146
596
  {
147
596
    std::lock_guard<std::mutex> guard(mutex_);
148
596
    if (shutdown_) {
149
181
      return;
150
181
    }
151
415
    shutdown_ = true;
152
415
    cond_.notify_all();
153
415
  }
154
415
  if (monitor_thread_.get()) {
155
238
    CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
156
238
    monitor_thread_.reset();
157
238
    thread_pool_->Shutdown();
158
238
  }
159
415
}
160
161
150k
// Adds `op` to the set of ops the scheduler considers and gives the op a shared
// reference to this manager. Fatal if the op is already registered anywhere or is
// already present in ops_.
void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
  std::lock_guard<std::mutex> lock(mutex_);

  CHECK(!op->manager_.get())
      << "Tried to register " << op->name() << ", but it was already registered.";

  auto inserted = ops_.emplace(op, MaintenanceOpStats()).second;
  CHECK(inserted)
      << "Tried to register " << op->name() << ", but it already exists in ops_.";

  // The op keeps the manager alive for as long as it stays registered.
  op->manager_ = shared_from_this();
  VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
}
172
173
75.6k
// Removes `op` from this manager. If the op is currently running, blocks until all of
// its runs have finished, logging at DFATAL each time a 15 second wait times out.
// Fatal if the op is not registered with this manager.
void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
  {
    UNIQUE_LOCK(lock, mutex_);

    CHECK(op->manager_.get() == this)
        << "Tried to unregister " << op->name()
        << ", but it is not currently registered with this maintenance manager.";

    // While the op is running, wait for it to be finished.
    if (op->running_ > 0) {
      VLOG_AND_TRACE("maintenance", 1)
          << "Waiting for op " << op->name() << " to finish so "
          << "we can unregister it.";
      bool still_running = true;
      while (still_running) {
        // ScopedMaintenanceOpRun::Reset() notifies op->cond_ when running_ reaches 0.
        auto wait_status = op->cond_.wait_for(GetLockForCondition(&lock), 15s);
        still_running = op->running_ != 0;
        if (still_running) {
          LOG_IF(DFATAL, wait_status == std::cv_status::timeout)
              << "Op " << op->name() << " running for too long: " << op->running_;
        }
      }
    }

    auto iter = ops_.find(op);
    CHECK(iter != ops_.end())
        << "Tried to unregister " << op->name() << ", but it was never registered";
    CHECK_EQ(iter->first, op);
    ops_.erase(iter);
  }

  LOG(INFO) << "Unregistered op " << op->name();
  // Remove the op's shared_ptr reference to us.  This might 'delete this', so it must
  // be the last statement and must run after mutex_ has been released.
  op->manager_ = nullptr;
}
202
203
16.6k
void MaintenanceManager::RunSchedulerThread() {
204
16.6k
  auto polling_interval = polling_interval_ms_ * 1ms;
205
206
16.6k
  UNIQUE_LOCK(lock, mutex_);
207
29.8M
  for (;;) {
208
    // Loop until we are shutting down or it is time to run another op.
209
29.8M
    cond_.wait_for(GetLockForCondition(&lock), polling_interval);
210
29.8M
    if (shutdown_) {
211
237
      VLOG_AND_TRACE
("maintenance", 1) << "Shutting down maintenance manager."0
;
212
237
      return;
213
237
    }
214
215
    // Find the best op.
216
29.8M
    MaintenanceOp* op = FindBestOp();
217
29.8M
    if (!op) {
218
29.8M
      VLOG_AND_TRACE
("maintenance", 2) << "No maintenance operations look worth doing."0
;
219
29.8M
      continue;
220
29.8M
    }
221
222
    // Prepare the maintenance operation.
223
16.4k
    ScopedMaintenanceOpRun scoped_run(op);
224
16.4k
    bool ready;
225
16.4k
    {
226
16.4k
      ReverseLock<decltype(lock)> rlock(lock);
227
16.4k
      ready = op->Prepare();
228
16.4k
      if (!ready) {
229
0
        LOG(INFO) << "Prepare failed for " << op->name() << ".  Re-running scheduler.";
230
0
        continue;
231
0
      }
232
233
      // Run the maintenance operation.
234
16.4k
      Status s = thread_pool_->SubmitFunc(
235
16.4k
          std::bind(&MaintenanceManager::LaunchOp, this, std::move(scoped_run)));
236
16.4k
      CHECK(s.ok());
237
16.4k
    }
238
16.4k
  }
239
16.6k
}
240
241
// Finding the best operation goes through four filters:
242
// - If there's an Op that we can run quickly that frees log retention, we run it.
243
// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
244
//   free), we run the Op with the highest RAM usage.
245
// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
246
//   qualify, then we run the one that also frees up the most RAM).
247
// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
248
//   performance the most.
249
//
250
// The reason it's done this way is that we want to prioritize limiting the amount of resources we
251
// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
252
// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
253
//
254
// In the third priority we're at a point where nothing's urgent and there's nothing we can run
255
// quickly.
256
// TODO We currently optimize for freeing log retention but we could consider having some sort of
257
// sliding priority between log retention and RAM usage. For example, is an Op that frees
258
// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
259
// and 128MB of RAM? Maybe a more holistic approach would be better.
260
29.8M
// Refreshes the stats of every registered op and returns the op that should run next,
// or nullptr when the manager is disabled, no worker thread is free, or nothing is
// worth running. Candidates are considered in this order:
//   1. the low-IO op retaining the most log bytes;
//   2. if a memory soft limit is exceeded, the op anchoring the most RAM;
//   3. the op retaining the most log bytes (ties broken by anchored RAM);
//   4. the op with the highest positive perf improvement.
// Both callers in this file hold mutex_ when calling this function.
MaintenanceOp* MaintenanceManager::FindBestOp() {
  TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
  if (!FLAGS_enable_maintenance_manager) {
    VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing";
    return nullptr;
  }
  size_t free_threads = num_threads_ - running_ops_;
  if (free_threads == 0) {
    VLOG_AND_TRACE("maintenance", 1) << "There are no free threads, so we can't run anything.";
    return nullptr;
  }

  // Best candidate seen so far for each of the four criteria.
  MaintenanceOp* low_io_op = nullptr;
  int64_t low_io_logs_bytes = 0;

  MaintenanceOp* mem_op = nullptr;
  uint64_t mem_op_ram = 0;

  MaintenanceOp* logs_op = nullptr;
  int64_t logs_op_bytes = 0;
  uint64_t logs_op_ram = 0;

  MaintenanceOp* perf_op = nullptr;
  double perf_op_score = 0;

  for (auto& entry : ops_) {
    MaintenanceOp* const op = entry.first;
    MaintenanceOpStats& stats = entry.second;

    // Update op stats.
    stats.Clear();
    op->UpdateStats(&stats);
    if (!stats.valid() || !stats.runnable()) {
      continue;
    }

    const auto logs_bytes = stats.logs_retained_bytes();
    const auto ram_bytes = stats.ram_anchored();
    const auto perf_score = stats.perf_improvement();

    if (logs_bytes > low_io_logs_bytes && op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
      low_io_op = op;
      low_io_logs_bytes = logs_bytes;
    }

    if (ram_bytes > mem_op_ram) {
      mem_op = op;
      mem_op_ram = ram_bytes;
    }

    // We prioritize ops that can free more logs, but when it's the same we pick the one that
    // also frees up the most memory.
    const bool more_logs = logs_bytes > logs_op_bytes;
    const bool same_logs_more_ram = logs_bytes == logs_op_bytes && ram_bytes > logs_op_ram;
    if (logs_bytes > 0 && (more_logs || same_logs_more_ram)) {
      logs_op = op;
      logs_op_bytes = logs_bytes;
      logs_op_ram = ram_bytes;
    }

    // The first runnable op always seeds the perf candidate, whatever its score.
    if (!perf_op || perf_score > perf_op_score) {
      perf_op = op;
      perf_op_score = perf_score;
    }
  }

  // Look at ops that we can run quickly that free up log retention.
  if (low_io_op && low_io_logs_bytes > 0) {
    VLOG_AND_TRACE("maintenance", 1)
        << "Performing " << low_io_op->name() << ", "
        << "because it can free up more logs "
        << "at " << low_io_logs_bytes
        << " bytes with a low IO cost";
    return low_io_op;
  }

  // Look at free memory. If it is dangerously low, we must select something
  // that frees memory-- the op with the most anchored memory.
  auto soft_limit_exceeded_result = parent_mem_tracker_->AnySoftLimitExceeded(0.0 /* score */);
  if (soft_limit_exceeded_result.exceeded) {
    if (mem_op) {
      VLOG_AND_TRACE("maintenance", 1)
          << "we have exceeded our soft memory limit "
          << "(current capacity is " << soft_limit_exceeded_result.current_capacity_pct << "%). "
          << "Running the op which anchors the most memory: " << mem_op->name();
      return mem_op;
    }
    string msg = StringPrintf("we have exceeded our soft memory limit "
        "(current capacity is %.2f%%).  However, there are no ops currently "
        "runnable which would free memory.", soft_limit_exceeded_result.current_capacity_pct);
    YB_LOG_EVERY_N_SECS(INFO, 5) << msg;
    return nullptr;
  }

  if (logs_op) {
    VLOG_AND_TRACE("maintenance", 1)
        << "Performing " << logs_op->name() << ", "
        << "because it can free up more logs " << "at " << logs_op_bytes
        << " bytes";
    return logs_op;
  }

  if (perf_op && perf_op_score > 0) {
    VLOG_AND_TRACE("maintenance", 1)
        << "Performing " << perf_op->name() << ", "
        << "because it had the best perf_improvement score, "
        << "at " << perf_op_score;
    return perf_op;
  }
  return nullptr;
}
367
368
18
void MaintenanceManager::LaunchOp(const ScopedMaintenanceOpRun& run) {
369
18
  auto op = run.get();
370
18
  MonoTime start_time(MonoTime::Now());
371
18
  op->RunningGauge()->Increment();
372
18
  LOG_TIMING(INFO, Substitute("running $0", op->name())) {
373
18
    TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
374
18
                 "name", op->name());
375
18
    op->Perform();
376
18
  }
377
18
  op->RunningGauge()->Decrement();
378
18
  MonoTime end_time(MonoTime::Now());
379
18
  MonoDelta delta(end_time.GetDeltaSince(start_time));
380
18
  std::lock_guard<std::mutex> lock(mutex_);
381
382
18
  CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
383
18
  completed_op.name = op->name();
384
18
  completed_op.duration = delta;
385
18
  completed_op.start_mono_time = start_time;
386
18
  completed_ops_count_++;
387
388
18
  op->DurationHistogram()->Increment(delta.ToMilliseconds());
389
18
}
390
391
5
// Fills `out_pb` with a snapshot of the manager: one entry per registered op (with its
// latest stats, zeroed when the stats are invalid), the op FindBestOp() would pick
// right now (if any), and every non-empty slot of the completed-op history.
// `out_pb` must not be null. Note that FindBestOp() refreshes the stats of all ops.
void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
  DCHECK(out_pb != nullptr);
  std::lock_guard<std::mutex> lock(mutex_);

  MaintenanceOp* const best_op = FindBestOp();
  for (auto& entry : ops_) {
    MaintenanceOp* const op = entry.first;
    MaintenanceOpStats& stat = entry.second;

    auto* op_pb = out_pb->add_registered_operations();
    op_pb->set_name(op->name());
    op_pb->set_running(op->running());

    // Stats accessors are only consulted when the stats are valid.
    const bool has_stats = stat.valid();
    op_pb->set_runnable(has_stats && stat.runnable());
    op_pb->set_ram_anchored_bytes(has_stats ? stat.ram_anchored() : 0);
    op_pb->set_logs_retained_bytes(has_stats ? stat.logs_retained_bytes() : 0);
    op_pb->set_perf_improvement(has_stats ? stat.perf_improvement() : 0);

    if (op == best_op) {
      out_pb->mutable_best_op()->CopyFrom(*op_pb);
    }
  }

  for (const CompletedOp& completed_op : completed_ops_) {
    // Slots that were never written still have an empty name.
    if (completed_op.name.empty()) {
      continue;
    }
    auto* completed_pb = out_pb->add_completed_operations();
    completed_pb->set_name(completed_op.name);
    completed_pb->set_duration_millis(
        narrow_cast<int32_t>(completed_op.duration.ToMilliseconds()));

    MonoDelta delta(MonoTime::Now().GetDeltaSince(completed_op.start_mono_time));
    completed_pb->set_secs_since_start(delta.ToSeconds());
  }
}
430
431
18
// Marks `op` as running: increments both the op's run count and its manager's count of
// running ops. No lock is taken here; the only caller in this file
// (RunSchedulerThread) already holds the manager's mutex_.
ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(MaintenanceOp* op) : op_(op) {
  ++op->manager_->running_ops_;
  ++op->running_;
}
435
436
0
// Copy constructor: the copy registers an additional run of the same op, so the op
// stays "running" until every copy has been reset or destroyed.
// Copying an empty (reset or moved-from) run yields another empty run.
ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(const ScopedMaintenanceOpRun& rhs) : op_(rhs.op_) {
  // Assign() dereferences the op to reach the manager's mutex, so it must not be
  // called for an empty source.
  if (rhs.op_) {
    Assign(rhs.op_);
  }
}
439
440
0
// Copy assignment: releases the run currently held (if any) and registers a new run of
// the op held by `rhs`. Self-assignment is a no-op, and assigning from an empty run
// leaves this object empty.
void ScopedMaintenanceOpRun::operator=(const ScopedMaintenanceOpRun& rhs) {
  if (this == &rhs) {
    // Without this guard Reset() would clear rhs.op_ too, and Assign() would then
    // dereference a null op.
    return;
  }
  Reset();
  if (rhs.op_) {
    Assign(rhs.op_);
  }
}
444
445
36
// Move constructor: takes over the run held by `rhs` without touching any counters and
// leaves `rhs` empty, so destroying `rhs` afterwards is a no-op.
ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(ScopedMaintenanceOpRun&& rhs)
    : op_(std::exchange(rhs.op_, nullptr)) {
}
448
449
0
// Move assignment: releases the run currently held (if any), then takes over the run
// held by `rhs` and leaves `rhs` empty.
void ScopedMaintenanceOpRun::operator=(ScopedMaintenanceOpRun&& rhs) {
  Reset();
  op_ = std::exchange(rhs.op_, nullptr);
}
454
455
54
// Releases the run, if one is still held. Reset() takes the manager's mutex_, so the
// destructor must not run on a non-empty object while that mutex is held by this thread.
ScopedMaintenanceOpRun::~ScopedMaintenanceOpRun() {
  this->Reset();
}
458
459
54
void ScopedMaintenanceOpRun::Reset() {
460
54
  if (!op_) {
461
36
    return;
462
36
  }
463
18
  std::lock_guard<std::mutex> lock(op_->manager_->mutex_);
464
18
  if (--op_->running_ == 0) {
465
18
    op_->cond_.notify_all();
466
18
  }
467
18
  --op_->manager_->running_ops_;
468
18
  op_ = nullptr;
469
18
}
470
471
18
// Returns the op whose run this object holds, or nullptr if it is empty
// (after Reset() or after being moved from).
MaintenanceOp* ScopedMaintenanceOpRun::get() const {
  MaintenanceOp* const held = op_;
  return held;
}
474
475
0
// Makes this object hold a new run of `op`: stores the pointer and, under the manager's
// mutex_, increments the op's run count and the manager's count of running ops.
// A null `op` simply leaves this object empty. Any run held before the call is NOT
// released here; callers in this file call Reset() first when needed.
void ScopedMaintenanceOpRun::Assign(MaintenanceOp* op) {
  op_ = op;
  if (!op_) {
    // Nothing to register; dereferencing a null op to reach the mutex would crash.
    return;
  }
  std::lock_guard<std::mutex> lock(op_->manager_->mutex_);
  ++op_->running_;
  ++op_->manager_->running_ops_;
}
481
482
} // namespace yb