YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/maintenance_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/maintenance_manager.h"
34
35
#include <stdint.h>
36
37
#include <memory>
38
#include <string>
39
#include <utility>
40
41
#include "yb/gutil/stringprintf.h"
42
43
#include "yb/util/debug/trace_event.h"
44
#include "yb/util/debug/trace_logging.h"
45
#include "yb/util/flag_tags.h"
46
#include "yb/util/logging.h"
47
#include "yb/util/mem_tracker.h"
48
#include "yb/util/metrics.h"
49
#include "yb/util/status_log.h"
50
#include "yb/util/stopwatch.h"
51
#include "yb/util/thread.h"
52
#include "yb/util/unique_lock.h"
53
54
using std::pair;
55
using std::shared_ptr;
56
using strings::Substitute;
57
58
using namespace std::literals;
59
60
// Default worker-pool size. The MaintenanceManager constructor falls back to this flag when
// Options::num_threads is <= 0.
DEFINE_int32(maintenance_manager_num_threads, 1,
61
       "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is "
62
       "reserved for emergency flushes. For spinning disks, the number of threads should "
63
       "not be above the number of devices.");
64
TAG_FLAG(maintenance_manager_num_threads, stable);
65
66
// Default scheduler polling period, in milliseconds. Used by the constructor when
// Options::polling_interval_ms is <= 0.
DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
67
       "Polling interval for the maintenance manager scheduler, "
68
       "in milliseconds.");
69
TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
70
71
// Default length of the completed-operation history. Used by the constructor when
// Options::history_size is 0.
DEFINE_int32(maintenance_manager_history_size, 8,
72
       "Number of completed operations the manager is keeping track of.");
73
TAG_FLAG(maintenance_manager_history_size, hidden);
74
75
// Consulted at the top of FindBestOp(); while false, FindBestOp() returns nullptr and therefore
// no operation is selected.
DEFINE_bool(enable_maintenance_manager, true,
76
       "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
77
TAG_FLAG(enable_maintenance_manager, unsafe);
78
79
namespace yb {
80
81
using yb::tablet::MaintenanceManagerStatusPB;
82
using yb::tablet::MaintenanceManagerStatusPB_CompletedOpPB;
83
using yb::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB;
84
85
88.7k
// Creates a stats record in its cleared state; all field initialization is delegated to Clear().
MaintenanceOpStats::MaintenanceOpStats() {
  this->Clear();
}
88
89
6.70M
void MaintenanceOpStats::Clear() {
90
6.70M
  valid_ = false;
91
6.70M
  runnable_ = false;
92
6.70M
  ram_anchored_ = 0;
93
6.70M
  logs_retained_bytes_ = 0;
94
6.70M
  perf_improvement_ = 0;
95
6.70M
}
96
97
MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
98
88.7k
    : name_(std::move(name)), io_usage_(io_usage) {}
99
100
47.7k
// Destroying an op is only legal once it has been unregistered and is not running; both
// conditions are enforced with fatal checks.
MaintenanceOp::~MaintenanceOp() {
  // A non-null manager_ means Unregister() was never called for this op.
  CHECK(!manager_.get())
      << "You must unregister the " << name_ << " Op before destroying it.";

  // No scheduled run of this op may still be outstanding.
  CHECK_EQ(running_, 0);
}
105
106
47.7k
void MaintenanceOp::Unregister() {
107
2
  CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
108
47.7k
  manager_->UnregisterOp(this);
109
47.7k
}
110
111
// Options in which the three numeric fields are zero and no parent MemTracker is given.
// The constructor replaces non-positive num_threads / polling_interval_ms and a zero
// history_size with the corresponding gflags, and an empty tracker with the root MemTracker.
// NOTE(review): the field order of Options is not visible here -- the three zeros are
// presumably num_threads, polling_interval_ms and history_size; confirm against the header.
const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
  0, 0, 0,
  shared_ptr<MemTracker>(),
};
117
118
// Builds a manager from `options`, substituting defaults for unset fields:
//  - num_threads / polling_interval_ms <= 0  -> the matching gflag;
//  - empty parent_mem_tracker                -> MemTracker::GetRootTracker();
//  - history_size == 0                       -> FLAGS_maintenance_manager_history_size.
// The worker thread pool is created here with min == max == num_threads_; the scheduler thread
// is not started until Init().
MaintenanceManager::MaintenanceManager(const Options& options)
    : num_threads_(options.num_threads > 0
                       ? options.num_threads
                       : FLAGS_maintenance_manager_num_threads),
      polling_interval_ms_(options.polling_interval_ms > 0
                               ? options.polling_interval_ms
                               : FLAGS_maintenance_manager_polling_interval_ms),
      parent_mem_tracker_(options.parent_mem_tracker
                              ? options.parent_mem_tracker
                              : MemTracker::GetRootTracker()) {
  // Failure to build the pool is fatal.
  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
               .set_max_threads(num_threads_).Build(&thread_pool_));

  // Size the ring buffer of completed operations.
  uint32_t history_size;
  if (options.history_size == 0) {
    history_size = FLAGS_maintenance_manager_history_size;
  } else {
    history_size = options.history_size;
  }
  completed_ops_.resize(history_size);
}
133
134
210
// Tears the manager down via Shutdown(), which returns early if shutdown already happened.
MaintenanceManager::~MaintenanceManager() {
  this->Shutdown();
}
137
138
11.2k
// Starts the scheduler thread, which runs RunSchedulerThread() on this manager.
// Returns the error from Thread::Create() if the thread cannot be created, OK otherwise.
Status MaintenanceManager::Init() {
  auto scheduler_loop = std::bind(&MaintenanceManager::RunSchedulerThread, this);
  RETURN_NOT_OK(Thread::Create(
      "maintenance", "maintenance_scheduler", scheduler_loop, &monitor_thread_));
  return Status::OK();
}
144
145
406
void MaintenanceManager::Shutdown() {
146
406
  {
147
406
    std::lock_guard<std::mutex> guard(mutex_);
148
406
    if (shutdown_) {
149
164
      return;
150
164
    }
151
242
    shutdown_ = true;
152
242
    cond_.notify_all();
153
242
  }
154
242
  if (monitor_thread_.get()) {
155
192
    CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
156
192
    monitor_thread_.reset();
157
192
    thread_pool_->Shutdown();
158
192
  }
159
242
}
160
161
88.7k
// Adds `op` to the set of ops considered by the scheduler and makes the op hold a shared
// reference to this manager. Registering an op that already has a manager, or that is already
// present in ops_, is fatal.
void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
  std::lock_guard<std::mutex> lock(mutex_);

  CHECK(!op->manager_.get())
      << "Tried to register " << op->name() << ", but it was already registered.";

  // Each op starts with a freshly cleared stats record.
  const bool inserted = ops_.emplace(op, MaintenanceOpStats()).second;
  CHECK(inserted)
      << "Tried to register " << op->name() << ", but it already exists in ops_.";

  op->manager_ = shared_from_this();
  VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
}
172
173
47.7k
// Removes `op` from this manager. If the op is currently running, blocks (holding the manager
// mutex through the op's condition variable) until its running count drops to zero, logging at
// DFATAL every time a 15 second wait times out. Finally drops the op's reference to the manager.
void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
  {
    UNIQUE_LOCK(lock, mutex_);

    CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
          << ", but it is not currently registered with this maintenance manager.";

    // An op that is still running must be allowed to finish first.
    if (op->running_ > 0) {
      VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
            << "we can unregister it.";
      bool still_running;
      do {
        const auto wait_status = op->cond_.wait_for(GetLockForCondition(&lock), 15s);
        still_running = op->running_ != 0;
        // Only complain when the op is still busy and the wait actually timed out.
        LOG_IF(DFATAL, still_running && wait_status == std::cv_status::timeout)
            << "Op " << op->name() << " running for too long: " << op->running_;
      } while (still_running);
    }

    const auto iter = ops_.find(op);
    CHECK(iter != ops_.end())
        << "Tried to unregister " << op->name() << ", but it was never registered";
    CHECK_EQ(iter->first, op);
    ops_.erase(iter);
  }

  LOG(INFO) << "Unregistered op " << op->name();

  // Drop the op's shared_ptr to this manager. This may destroy the manager ('delete this'),
  // so nothing may touch members after this line.
  op->manager_.reset();
}
202
203
11.2k
void MaintenanceManager::RunSchedulerThread() {
204
11.2k
  auto polling_interval = polling_interval_ms_ * 1ms;
205
206
11.2k
  UNIQUE_LOCK(lock, mutex_);
207
1.21M
  for (;;) {
208
    // Loop until we are shutting down or it is time to run another op.
209
1.21M
    cond_.wait_for(GetLockForCondition(&lock), polling_interval);
210
1.21M
    if (shutdown_) {
211
192
      VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
212
192
      return;
213
192
    }
214
215
    // Find the best op.
216
1.21M
    MaintenanceOp* op = FindBestOp();
217
1.21M
    if (!op) {
218
1.20M
      VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
219
1.20M
      continue;
220
1.20M
    }
221
222
    // Prepare the maintenance operation.
223
11.0k
    ScopedMaintenanceOpRun scoped_run(op);
224
11.0k
    bool ready;
225
11.0k
    {
226
11.0k
      ReverseLock<decltype(lock)> rlock(lock);
227
11.0k
      ready = op->Prepare();
228
11.0k
      if (!ready) {
229
0
        LOG(INFO) << "Prepare failed for " << op->name() << ".  Re-running scheduler.";
230
0
        continue;
231
0
      }
232
233
      // Run the maintenance operation.
234
11.0k
      Status s = thread_pool_->SubmitFunc(
235
11.0k
          std::bind(&MaintenanceManager::LaunchOp, this, std::move(scoped_run)));
236
11.0k
      CHECK(s.ok());
237
11.0k
    }
238
11.0k
  }
239
11.2k
}
240
241
// Finding the best operation goes through four filters:
242
// - If there's an Op that we can run quickly that frees log retention, we run it.
243
// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
244
//   free), we run the Op with the highest RAM usage.
245
// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
246
//   qualify, then we run the one that also frees up the most RAM).
247
// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
248
//   performance the most.
249
//
250
// The reason it's done this way is that we want to prioritize limiting the amount of resources we
251
// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
252
// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
253
//
254
// In the third priority we're at a point where nothing's urgent and there's nothing we can run
255
// quickly.
256
// TODO We currently optimize for freeing log retention but we could consider having some sort of
257
// sliding priority between log retention and RAM usage. For example, is an Op that frees
258
// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
259
// and 128MB of RAM? Maybe a more holistic approach would be better.
260
1.20M
// Refreshes the stats of every registered op and picks the one to run next, or nullptr when
// nothing should run. Callers in this file hold mutex_ while calling.
//
// Selection order once stats are refreshed:
//   1. the LOW_IO_USAGE op retaining the most log bytes, if any retains more than zero;
//   2. if the memory tracker reports a soft limit exceeded: the op anchoring the most RAM
//      (or nullptr when no runnable op anchors any);
//   3. the op retaining the most log bytes, ties broken by anchored RAM;
//   4. the op with the highest perf improvement, provided it is above zero.
MaintenanceOp* MaintenanceManager::FindBestOp() {
  TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
  if (!FLAGS_enable_maintenance_manager) {
    VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing";
    return nullptr;
  }

  const size_t idle_threads = num_threads_ - running_ops_;
  if (idle_threads == 0) {
    VLOG_AND_TRACE("maintenance", 1) << "There are no free threads, so we can't run anything.";
    return nullptr;
  }

  // Best candidate found so far for each of the four criteria.
  MaintenanceOp* cheap_log_op = nullptr;
  int64_t cheap_log_bytes = 0;

  MaintenanceOp* ram_op = nullptr;
  uint64_t ram_bytes = 0;

  MaintenanceOp* log_op = nullptr;
  int64_t log_bytes = 0;
  uint64_t log_op_ram = 0;

  MaintenanceOp* perf_op = nullptr;
  double perf_score = 0;

  for (auto& entry : ops_) {
    MaintenanceOp* const candidate = entry.first;
    MaintenanceOpStats& stats = entry.second;

    // Update op stats.
    stats.Clear();
    candidate->UpdateStats(&stats);
    if (!(stats.valid() && stats.runnable())) {
      continue;
    }

    const auto retained = stats.logs_retained_bytes();
    const auto anchored = stats.ram_anchored();
    const auto improvement = stats.perf_improvement();

    if (retained > cheap_log_bytes &&
        candidate->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
      cheap_log_op = candidate;
      cheap_log_bytes = retained;
    }

    if (anchored > ram_bytes) {
      ram_op = candidate;
      ram_bytes = anchored;
    }

    // We prioritize ops that can free more logs, but when it's the same we pick the one that
    // also frees up the most memory.
    const bool more_logs = retained > log_bytes;
    const bool same_logs_more_ram = retained == log_bytes && anchored > log_op_ram;
    if (retained > 0 && (more_logs || same_logs_more_ram)) {
      log_op = candidate;
      log_bytes = retained;
      log_op_ram = anchored;
    }

    // The first runnable op always seeds the perf candidate, even with a score of zero.
    if (perf_op == nullptr || improvement > perf_score) {
      perf_op = candidate;
      perf_score = improvement;
    }
  }

  // Look at ops that we can run quickly that free up log retention.
  if (cheap_log_op != nullptr && cheap_log_bytes > 0) {
    VLOG_AND_TRACE("maintenance", 1)
        << "Performing " << cheap_log_op->name() << ", "
        << "because it can free up more logs "
        << "at " << cheap_log_bytes
        << " bytes with a low IO cost";
    return cheap_log_op;
  }

  // Look at free memory. If it is dangerously low, we must select something
  // that frees memory-- the op with the most anchored memory.
  const auto mem_state = parent_mem_tracker_->AnySoftLimitExceeded(0.0 /* score */);
  if (mem_state.exceeded) {
    if (ram_op == nullptr) {
      std::string msg = StringPrintf("we have exceeded our soft memory limit "
          "(current capacity is %.2f%%).  However, there are no ops currently "
          "runnable which would free memory.", mem_state.current_capacity_pct);
      YB_LOG_EVERY_N_SECS(INFO, 5) << msg;
      return nullptr;
    }
    VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
        << "(current capacity is " << mem_state.current_capacity_pct << "%). "
        << "Running the op which anchors the most memory: " << ram_op->name();
    return ram_op;
  }

  if (log_op != nullptr) {
    VLOG_AND_TRACE("maintenance", 1)
        << "Performing " << log_op->name() << ", "
        << "because it can free up more logs " << "at " << log_bytes
        << " bytes";
    return log_op;
  }

  if (perf_op != nullptr && perf_score > 0) {
    VLOG_AND_TRACE("maintenance", 1) << "Performing " << perf_op->name() << ", "
        << "because it had the best perf_improvement score, "
        << "at " << perf_score;
    return perf_op;
  }

  return nullptr;
}
367
368
8
void MaintenanceManager::LaunchOp(const ScopedMaintenanceOpRun& run) {
369
8
  auto op = run.get();
370
8
  MonoTime start_time(MonoTime::Now());
371
8
  op->RunningGauge()->Increment();
372
8
  LOG_TIMING(INFO, Substitute("running $0", op->name())) {
373
8
    TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
374
8
                 "name", op->name());
375
8
    op->Perform();
376
8
  }
377
8
  op->RunningGauge()->Decrement();
378
8
  MonoTime end_time(MonoTime::Now());
379
8
  MonoDelta delta(end_time.GetDeltaSince(start_time));
380
8
  std::lock_guard<std::mutex> lock(mutex_);
381
382
8
  CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
383
8
  completed_op.name = op->name();
384
8
  completed_op.duration = delta;
385
8
  completed_op.start_mono_time = start_time;
386
8
  completed_ops_count_++;
387
388
8
  op->DurationHistogram()->Increment(delta.ToMilliseconds());
389
8
}
390
391
5
// Fills `out_pb` with a snapshot of the manager, taken under mutex_:
//  - one registered_operations entry per registered op (name, running count and its stats,
//    reported as not runnable / all zero when the stats are not valid);
//  - best_op, a copy of the entry for whichever op FindBestOp() currently selects, if any;
//  - one completed_operations entry per non-empty slot of the completed-ops history.
// Calling FindBestOp() first also refreshes every op's stats before they are reported.
void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
  DCHECK(out_pb != nullptr);
  std::lock_guard<std::mutex> lock(mutex_);
  MaintenanceOp* const best_op = FindBestOp();

  for (auto& entry : ops_) {
    auto* op_pb = out_pb->add_registered_operations();
    MaintenanceOp* const op = entry.first;
    MaintenanceOpStats& stat = entry.second;

    op_pb->set_name(op->name());
    op_pb->set_running(op->running());

    // Invalid stats are reported as "not runnable" with every metric at zero.
    const bool has_stats = stat.valid();
    op_pb->set_runnable(has_stats && stat.runnable());
    op_pb->set_ram_anchored_bytes(has_stats ? stat.ram_anchored() : 0);
    op_pb->set_logs_retained_bytes(has_stats ? stat.logs_retained_bytes() : 0);
    op_pb->set_perf_improvement(has_stats ? stat.perf_improvement() : 0);

    if (best_op == op) {
      out_pb->mutable_best_op()->CopyFrom(*op_pb);
    }
  }

  for (const CompletedOp& completed_op : completed_ops_) {
    // Slots that have never been written still have an empty name.
    if (completed_op.name.empty()) {
      continue;
    }
    auto* completed_pb = out_pb->add_completed_operations();
    completed_pb->set_name(completed_op.name);
    completed_pb->set_duration_millis(
        narrow_cast<int32_t>(completed_op.duration.ToMilliseconds()));

    const MonoDelta since_start(MonoTime::Now().GetDeltaSince(completed_op.start_mono_time));
    completed_pb->set_secs_since_start(since_start.ToSeconds());
  }
}
430
431
8
// Marks `op` as running: increments the op's own running count and its manager's count of
// running ops. No lock is taken here; the scheduler constructs this while holding the manager
// mutex.
ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(MaintenanceOp* op)
    : op_(op) {
  ++op_->running_;
  ++op_->manager_->running_ops_;
}
435
436
0
// Copy constructor: the copy takes an additional running reference on the same op.
// Copying an empty (e.g. moved-from or reset) run yields another empty run instead of
// dereferencing a null op inside Assign().
ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(const ScopedMaintenanceOpRun& rhs)
    : op_(nullptr) {
  if (rhs.op_ != nullptr) {
    // Assign() stores the op and bumps the counters under the manager mutex.
    Assign(rhs.op_);
  }
}
439
440
0
// Copy assignment: releases whatever run this object currently holds, then takes a running
// reference on rhs's op.
//  - Self-assignment is a no-op. Without the guard, Reset() would clear op_ and the following
//    Assign() would then be handed this object's own, now null, op_.
//  - Assigning from an empty run simply leaves this object empty.
void ScopedMaintenanceOpRun::operator=(const ScopedMaintenanceOpRun& rhs) {
  if (this == &rhs) {
    return;
  }
  Reset();
  if (rhs.op_ != nullptr) {
    Assign(rhs.op_);
  }
}
444
445
16
// Move constructor: takes over rhs's run without touching any counters and leaves rhs empty,
// so rhs's destructor becomes a no-op.
ScopedMaintenanceOpRun::ScopedMaintenanceOpRun(ScopedMaintenanceOpRun&& rhs)
    : op_(std::exchange(rhs.op_, nullptr)) {
}
448
449
0
// Move assignment: releases the run currently held (if any), then takes over rhs's run and
// leaves rhs empty. The counters of the transferred run are not modified.
void ScopedMaintenanceOpRun::operator=(ScopedMaintenanceOpRun&& rhs) {
  Reset();
  op_ = std::exchange(rhs.op_, nullptr);
}
454
455
24
// Releases the run, if one is still held. Reset() locks the manager mutex in that case, so a
// non-empty run must not be destroyed by a thread that already holds that mutex.
ScopedMaintenanceOpRun::~ScopedMaintenanceOpRun() {
  this->Reset();
}
458
459
24
void ScopedMaintenanceOpRun::Reset() {
460
24
  if (!op_) {
461
16
    return;
462
16
  }
463
8
  std::lock_guard<std::mutex> lock(op_->manager_->mutex_);
464
8
  if (--op_->running_ == 0) {
465
8
    op_->cond_.notify_all();
466
8
  }
467
8
  --op_->manager_->running_ops_;
468
8
  op_ = nullptr;
469
8
}
470
471
8
// Returns the op whose run this object holds, or nullptr if it holds none (after a move or
// Reset()).
MaintenanceOp* ScopedMaintenanceOpRun::get() const {
  MaintenanceOp* const held = op_;
  return held;
}
474
475
0
// Makes this object hold a run of `op`: stores the pointer and, under the manager mutex,
// increments the op's running count and the manager's count of running ops.
// A null `op` leaves the object empty; previously it was dereferenced unconditionally, which
// crashed when the copy operations were given an empty source.
// Does not release a previously held run -- callers call Reset() first where needed.
void ScopedMaintenanceOpRun::Assign(MaintenanceOp* op) {
  op_ = op;
  if (op_ == nullptr) {
    return;
  }
  std::lock_guard<std::mutex> lock(op_->manager_->mutex_);
  ++op->running_;
  ++op->manager_->running_ops_;
}
481
482
} // namespace yb