YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/priority_thread_pool.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_UTIL_PRIORITY_THREAD_POOL_H
15
#define YB_UTIL_PRIORITY_THREAD_POOL_H
16
17
#include <memory>
18
19
#include <gflags/gflags_declare.h>
20
21
#include "yb/gutil/casts.h"
22
23
#include "yb/util/status_fwd.h"
24
#include "yb/util/status.h"
25
#include "yb/util/metrics.h"
26
27
namespace yb {
28
29
class MetricEntity;
30
31
struct CompactionInfo {
32
  uint64_t file_count;
33
  uint64_t byte_count;
34
};
35
36
const CompactionInfo kNoCompactionInfo = {uint64_t(0), uint64_t(0)};
37
38
39
// PriorityThreadPoolSuspender is provided to task ran by thread pool, task could use it to check
40
// whether is should be preempted in favor of another task with higher priority.
41
class PriorityThreadPoolSuspender {
42
 public:
43
  virtual void PauseIfNecessary() = 0;
44
123
  virtual ~PriorityThreadPoolSuspender() {}
45
};
46
47
class PriorityThreadPoolTask {
48
 public:
49
  PriorityThreadPoolTask();
50
51
20.5k
  virtual ~PriorityThreadPoolTask() = default;
52
53
  // If status is OK - execute this task in the current thread.
54
  // Otherwise - abort task with specified status.
55
  virtual void Run(const Status& status, PriorityThreadPoolSuspender* suspender) = 0;
56
57
  // Returns true if the task belongs to specified key, which was passed to
58
  // PriorityThreadPool::Remove and and should be removed when we remove key.
59
  virtual bool ShouldRemoveWithKey(void* key) = 0;
60
61
  virtual std::string ToString() const = 0;
62
63
  // For compaction tasks, returns the number of files and bytes that the task is compacting.
64
  // For non-compaction tasks, returns a value of 0 for each.
65
0
  virtual CompactionInfo GetFileAndByteInfoIfCompaction() const {
66
0
    return kNoCompactionInfo;
67
0
  }
68
69
21.8k
  size_t SerialNo() const {
70
21.8k
    return serial_no_;
71
21.8k
  }
72
73
 private:
74
  const size_t serial_no_;
75
};
76
77
// Tasks submitted to this pool have assigned priority and are picked from queue using it.
78
class PriorityThreadPool {
79
 public:
80
  explicit PriorityThreadPool(size_t max_running_tasks,
81
    const scoped_refptr<MetricEntity>& metric_entity = nullptr);
82
  ~PriorityThreadPool();
83
84
  // Submit task to the pool.
85
  // On success task ownership is transferred to the pool, i.e. `task` would point to nullptr.
86
  CHECKED_STATUS Submit(int priority, std::unique_ptr<PriorityThreadPoolTask>* task);
87
88
  template <class Task>
89
1.17k
  CHECKED_STATUS Submit(int priority, std::unique_ptr<Task>* task) {
90
1.17k
    std::unique_ptr<PriorityThreadPoolTask> temp_task = std::move(*task);
91
1.17k
    auto result = Submit(priority, &temp_task);
92
1.17k
    task->reset(down_cast<Task*>(temp_task.release()));
93
1.17k
    return result;
94
1.17k
  }
95
96
  // Remove all removable (see PriorityThreadPoolTask::ShouldRemoveWithKey) tasks with provided key
97
  // from the pool.
98
  void Remove(void* key);
99
100
  // Change priority of task with specified serial no.
101
  // Returns true if change was performed.
102
  bool ChangeTaskPriority(size_t serial_no, int priority);
103
104
  void Shutdown() {
105
    StartShutdown();
106
    CompleteShutdown();
107
  }
108
109
  // Two step shutdown paradigm is used to prevent deadlock when shutting down multiple components.
110
  // There could be case when one component wait until other component aborts specific job, but
111
  // it is not done since shutdown of second component is invoked after shutdown of the first one.
112
  // To avoid this case StartShutdown could be invoked on both of them, then CompleteShutdown waits
113
  // until they complete it.
114
115
  // Initiates shutdown of this pool. All new tasks will be aborted after this point.
116
  void StartShutdown();
117
118
  // Completes shutdown of this pool. It is safe to destroy pool after it.
119
  void CompleteShutdown();
120
121
  // Dumps state to string, useful for debugging.
122
  std::string StateToString();
123
124
  void TEST_SetThreadCreationFailureProbability(double probability);
125
126
  size_t TEST_num_tasks_pending();
127
128
 private:
129
  class Impl;
130
  std::unique_ptr<Impl> impl_;
131
};
132
133
} // namespace yb
134
135
#endif // YB_UTIL_PRIORITY_THREAD_POOL_H