YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/taskstream.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_UTIL_TASKSTREAM_H
15
#define YB_UTIL_TASKSTREAM_H
16
17
#include <atomic>
18
#include <chrono>
19
#include <condition_variable>
20
#include <memory>
21
#include <vector>
22
23
#include <gflags/gflags.h>
24
25
#include "yb/util/status_fwd.h"
26
#include "yb/util/blocking_queue.h"
27
#include "yb/util/status_format.h"
28
#include "yb/util/thread.h"
29
#include "yb/util/threadpool.h"
30
31
using namespace std::chrono_literals;
32
33
using std::vector;
34
35
namespace yb {
36
class ThreadPool;
37
class ThreadPoolToken;
38
39
template <typename T>
40
class TaskStreamImpl;
41
42
YB_DEFINE_ENUM(TaskStreamRunState, (kIdle)(kSubmit)(kDrain)(kProcess)(kComplete)(kFinish));
43
44
template <typename T>
45
// TaskStream has a thread pool token in the given thread pool.
46
// TaskStream does not manage a thread but only submits to the token in the thread pool.
47
// When we submit tasks to the taskstream, it adds tasks to the queue to be processed.
48
// The internal Run function will call the user-provided function,
49
// for each element in the queue in a loop.
50
// When the queue is empty, it calls the user-provided function with no parameter,
51
// to indicate it needs to process the end of the group of tasks processed.
52
// This feature is used for the preparer and appender functionality.
53
class TaskStream {
54
 public:
55
  explicit TaskStream(std::function<void(T*)> process_item,
56
                      ThreadPool* thread_pool,
57
                      int32_t queue_max_size,
58
                      const MonoDelta& queue_max_wait);
59
  ~TaskStream();
60
61
  CHECKED_STATUS Start();
62
  void Stop();
63
64
  CHECKED_STATUS Submit(T* item);
65
66
  CHECKED_STATUS TEST_SubmitFunc(const std::function<void()>& func);
67
68
0
  std::string GetRunThreadStack() {
69
0
    auto result = ThreadStack(run_tid_);
70
0
    if (!result.ok()) {
71
0
      return result.status().ToString();
72
0
    }
73
0
    return (*result).Symbolize();
74
0
  }
75
76
0
  std::string ToString() const {
77
0
    return YB_CLASS_TO_STRING(queue, run_state, stopped, stop_requested);
78
0
  }
79
80
 private:
81
  using RunState = TaskStreamRunState;
82
83
75.7M
  void ChangeRunState(RunState expected_old_state, RunState new_state) {
84
75.7M
    auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel);
85
18.4E
    LOG_IF(DFATAL, old_state != expected_old_state)
86
18.4E
        << "Task stream was in wrong state " << old_state << " while "
87
18.4E
        << expected_old_state << " was expected";
88
75.7M
  }
89
90
  // We set this to true to tell the Run function to return. No new tasks will be accepted, but
91
  // existing tasks will still be processed.
92
  std::atomic<bool> stop_requested_{false};
93
94
  std::atomic<RunState> run_state_{RunState::kIdle};
95
96
  // This is set to true immediately before the thread exits.
97
  std::atomic<bool> stopped_{false};
98
99
  // The objects in the queue are owned by the queue and ownership gets tranferred to ProcessItem.
100
  BlockingQueue<T*> queue_;
101
102
  // This mutex/condition combination is used in Stop() in case multiple threads are calling that
103
  // function concurrently. One of them will ask the taskstream thread to stop and wait for it, and
104
  // then will notify other threads that have called Stop().
105
  std::mutex stop_mtx_;
106
  std::condition_variable stop_cond_;
107
108
  std::unique_ptr<ThreadPoolToken> taskstream_pool_token_;
109
  std::function<void(T*)> process_item_;
110
  ThreadIdForStack run_tid_ = 0;
111
112
  // Maximum time to wait for the queue to become non-empty.
113
  const MonoDelta queue_max_wait_;
114
115
  void Run();
116
  void ProcessItem(T* item);
117
};
118
119
template <typename T>
120
TaskStream<T>::TaskStream(std::function<void(T*)> process_item,
121
                          ThreadPool* thread_pool,
122
                          int32_t queue_max_size,
123
                          const MonoDelta& queue_max_wait)
124
    : queue_(queue_max_size),
125
      // run_state_ is responsible for serial execution, so we use concurrent here to avoid
126
      // unnecessary checks in thread pool.
127
      taskstream_pool_token_(thread_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT)),
128
      process_item_(process_item),
129
150k
      queue_max_wait_(queue_max_wait) {
130
150k
}
131
132
template <typename T>
133
76.4k
TaskStream<T>::~TaskStream() {
134
76.4k
  Stop();
135
76.4k
}
136
137
template <typename T>
138
Status TaskStream<T>::Start() {
139
  VLOG(1) << "Starting the TaskStream";
140
  return Status::OK();
141
}
142
143
template <typename T>
144
152k
void TaskStream<T>::Stop() {
145
18.4E
  VLOG(1) << "Stopping the TaskStream";
146
152k
  auto scope_exit = ScopeExit([] {
147
152k
    VLOG
(1) << "The TaskStream has stopped"1
;
148
152k
  });
149
152k
  queue_.Shutdown();
150
152k
  if (stopped_.load(std::memory_order_acquire)) {
151
76.5k
    return;
152
76.5k
  }
153
76.4k
  stop_requested_ = true;
154
76.4k
  {
155
76.4k
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
156
107k
    stop_cond_.wait(stop_lock, [this] {
157
107k
      return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && 
queue_.empty()76.4k
);
158
107k
    });
159
76.4k
  }
160
76.4k
  stopped_.store(true, std::memory_order_release);
161
76.4k
}
162
163
template <typename T>
164
25.0M
Status TaskStream<T>::Submit(T *task) {
165
25.0M
  if (stop_requested_.load(std::memory_order_acquire)) {
166
0
    return STATUS(IllegalState, "Tablet is shutting down");
167
0
  }
168
25.0M
  if (!queue_.BlockingPut(task)) {
169
0
    return STATUS_FORMAT(ServiceUnavailable,
170
0
                         "TaskStream queue is full (max capacity $0)",
171
0
                         queue_.max_size());
172
0
  }
173
174
25.0M
  RunState expected = RunState::kIdle;
175
25.0M
  if (!run_state_.compare_exchange_strong(
176
25.0M
          expected, RunState::kSubmit, std::memory_order_acq_rel)) {
177
    // run_state_ was not idle, so we are not creating a task to process operations.
178
24.7M
    return Status::OK();
179
24.7M
  }
180
333k
  return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this));
181
25.0M
}
182
183
template <typename T>
184
13
Status TaskStream<T>::TEST_SubmitFunc(const std::function<void()>& func) {
185
13
  return taskstream_pool_token_->SubmitFunc(func);
186
13
}
187
188
template <typename T>
189
333k
void TaskStream<T>::Run() {
190
333k
  VLOG
(1) << "Starting taskstream task:" << this6
;
191
333k
  ChangeRunState(RunState::kSubmit, RunState::kDrain);
192
333k
  run_tid_ = Thread::CurrentThreadIdForStack();
193
25.2M
  for (;;) {
194
25.2M
    MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_;
195
25.2M
    std::vector<T *> group;
196
25.2M
    queue_.BlockingDrainTo(&group, wait_timeout_deadline);
197
25.2M
    if (!group.empty()) {
198
24.9M
      ChangeRunState(RunState::kDrain, RunState::kProcess);
199
25.0M
      for (T* item : group) {
200
25.0M
        ProcessItem(item);
201
25.0M
      }
202
24.9M
      ChangeRunState(RunState::kProcess, RunState::kComplete);
203
24.9M
      ProcessItem(nullptr);
204
24.9M
      group.clear();
205
24.9M
      ChangeRunState(RunState::kComplete, RunState::kDrain);
206
24.9M
      continue;
207
24.9M
    }
208
333k
    ChangeRunState(RunState::kDrain, RunState::kFinish);
209
    // Not processing and queue empty, return from task.
210
333k
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
211
333k
    ChangeRunState(RunState::kFinish, RunState::kIdle);
212
333k
    if (!queue_.empty()) {
213
      // Got more operations, try stay in the loop.
214
5
      RunState expected = RunState::kIdle;
215
5
      if (run_state_.compare_exchange_strong(
216
5
              expected, RunState::kDrain, std::memory_order_acq_rel)) {
217
5
        continue;
218
5
      }
219
5
    }
220
333k
    if (stop_requested_.load(std::memory_order_acquire)) {
221
18.4E
      VLOG(1) << "TaskStream task's Run() function is returning because stop is requested.";
222
30.6k
      stop_cond_.notify_all();
223
30.6k
      return;
224
30.6k
    }
225
303k
    VLOG
(1) << "Returning from TaskStream task after inactivity:" << this21.2k
;
226
303k
    return;
227
333k
  }
228
333k
}
229
230
template <typename T>
231
50.0M
void TaskStream<T>::ProcessItem(T* item) {
232
50.0M
  process_item_(item);
233
50.0M
}
234
235
}  // namespace yb
236
237
#endif  // YB_UTIL_TASKSTREAM_H