YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/taskstream.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_UTIL_TASKSTREAM_H
15
#define YB_UTIL_TASKSTREAM_H
16
17
#include <atomic>
18
#include <chrono>
19
#include <condition_variable>
20
#include <memory>
21
#include <vector>
22
23
#include <gflags/gflags.h>
24
25
#include "yb/util/status_fwd.h"
26
#include "yb/util/blocking_queue.h"
27
#include "yb/util/status_format.h"
28
#include "yb/util/thread.h"
29
#include "yb/util/threadpool.h"
30
31
using namespace std::chrono_literals;
32
33
using std::vector;
34
35
namespace yb {
36
class ThreadPool;
37
class ThreadPoolToken;
38
39
template <typename T>
40
class TaskStreamImpl;
41
42
YB_DEFINE_ENUM(TaskStreamRunState, (kIdle)(kSubmit)(kDrain)(kProcess)(kComplete)(kFinish));
43
44
template <typename T>
45
// TaskStream has a thread pool token in the given thread pool.
46
// TaskStream does not manage a thread but only submits to the token in the thread pool.
47
// When we submit tasks to the taskstream, it adds tasks to the queue to be processed.
48
// The internal Run function will call the user-provided function,
49
// for each element in the queue in a loop.
50
// Once a drained group has been processed, it calls the user-provided function with nullptr,
51
// to indicate it needs to process the end of the group of tasks processed.
52
// This feature is used for the preparer and appender functionality.
53
class TaskStream {
54
 public:
55
  explicit TaskStream(std::function<void(T*)> process_item,
56
                      ThreadPool* thread_pool,
57
                      int32_t queue_max_size,
58
                      const MonoDelta& queue_max_wait);
59
  ~TaskStream();
60
61
  CHECKED_STATUS Start();
62
  void Stop();
63
64
  CHECKED_STATUS Submit(T* item);
65
66
  CHECKED_STATUS TEST_SubmitFunc(const std::function<void()>& func);
67
68
0
  std::string GetRunThreadStack() {
69
0
    auto result = ThreadStack(run_tid_);
70
0
    if (!result.ok()) {
71
0
      return result.status().ToString();
72
0
    }
73
0
    return (*result).Symbolize();
74
0
  }
75
76
0
  std::string ToString() const {
77
0
    return YB_CLASS_TO_STRING(queue, run_state, stopped, stop_requested);
78
0
  }
79
80
 private:
81
  using RunState = TaskStreamRunState;
82
83
42.0M
  void ChangeRunState(RunState expected_old_state, RunState new_state) {
84
42.0M
    auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel);
85
18.4E
    LOG_IF(DFATAL, old_state != expected_old_state)
86
18.4E
        << "Task stream was in wrong state " << old_state << " while "
87
18.4E
        << expected_old_state << " was expected";
88
42.0M
  }
_ZN2yb10TaskStreamIiE14ChangeRunStateENS_18TaskStreamRunStateES2_
Line
Count
Source
83
18
  void ChangeRunState(RunState expected_old_state, RunState new_state) {
84
18
    auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel);
85
0
    LOG_IF(DFATAL, old_state != expected_old_state)
86
0
        << "Task stream was in wrong state " << old_state << " while "
87
0
        << expected_old_state << " was expected";
88
18
  }
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE14ChangeRunStateENS_18TaskStreamRunStateES4_
Line
Count
Source
83
42.0M
  void ChangeRunState(RunState expected_old_state, RunState new_state) {
84
42.0M
    auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel);
85
18.4E
    LOG_IF(DFATAL, old_state != expected_old_state)
86
18.4E
        << "Task stream was in wrong state " << old_state << " while "
87
18.4E
        << expected_old_state << " was expected";
88
42.0M
  }
89
90
  // We set this to true to tell the Run function to return. No new tasks will be accepted, but
91
  // existing tasks will still be processed.
92
  std::atomic<bool> stop_requested_{false};
93
94
  std::atomic<RunState> run_state_{RunState::kIdle};
95
96
  // Set to true at the end of Stop(), after the run task is idle and the queue is empty.
97
  std::atomic<bool> stopped_{false};
98
99
  // The objects in the queue are owned by the queue; ownership is transferred to ProcessItem.
100
  BlockingQueue<T*> queue_;
101
102
  // This mutex/condition combination is used in Stop() in case multiple threads are calling that
103
  // function concurrently. One of them will ask the taskstream thread to stop and wait for it, and
104
  // then will notify other threads that have called Stop().
105
  std::mutex stop_mtx_;
106
  std::condition_variable stop_cond_;
107
108
  std::unique_ptr<ThreadPoolToken> taskstream_pool_token_;
109
  std::function<void(T*)> process_item_;
110
  ThreadIdForStack run_tid_ = 0;
111
112
  // Maximum time to wait for the queue to become non-empty.
113
  const MonoDelta queue_max_wait_;
114
115
  void Run();
116
  void ProcessItem(T* item);
117
};
118
119
template <typename T>
120
TaskStream<T>::TaskStream(std::function<void(T*)> process_item,
121
                          ThreadPool* thread_pool,
122
                          int32_t queue_max_size,
123
                          const MonoDelta& queue_max_wait)
124
    : queue_(queue_max_size),
125
      // run_state_ is responsible for serial execution, so we use concurrent here to avoid
126
      // unnecessary checks in thread pool.
127
      taskstream_pool_token_(thread_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT)),
128
      process_item_(process_item),
129
89.5k
      queue_max_wait_(queue_max_wait) {
130
89.5k
}
_ZN2yb10TaskStreamIiEC2ENSt3__18functionIFvPiEEEPNS_10ThreadPoolEiRKNS_9MonoDeltaE
Line
Count
Source
129
3
      queue_max_wait_(queue_max_wait) {
130
3
}
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEEC2ENSt3__18functionIFvPS2_EEEPNS_10ThreadPoolEiRKNS_9MonoDeltaE
Line
Count
Source
129
89.5k
      queue_max_wait_(queue_max_wait) {
130
89.5k
}
131
132
template <typename T>
133
48.6k
TaskStream<T>::~TaskStream() {
134
48.6k
  Stop();
135
48.6k
}
_ZN2yb10TaskStreamIiED2Ev
Line
Count
Source
133
3
TaskStream<T>::~TaskStream() {
134
3
  Stop();
135
3
}
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEED2Ev
Line
Count
Source
133
48.6k
TaskStream<T>::~TaskStream() {
134
48.6k
  Stop();
135
48.6k
}
136
137
template <typename T>
138
3
Status TaskStream<T>::Start() {
139
0
  VLOG(1) << "Starting the TaskStream";
140
3
  return Status::OK();
141
3
}
142
143
template <typename T>
144
97.3k
void TaskStream<T>::Stop() {
145
18.4E
  VLOG(1) << "Stopping the TaskStream";
146
97.3k
  auto scope_exit = ScopeExit([] {
147
18.4E
    VLOG(1) << "The TaskStream has stopped";
148
97.3k
  });
_ZZN2yb10TaskStreamIiE4StopEvENKUlvE_clEv
Line
Count
Source
146
6
  auto scope_exit = ScopeExit([] {
147
0
    VLOG(1) << "The TaskStream has stopped";
148
6
  });
_ZZN2yb10TaskStreamINS_3log13LogEntryBatchEE4StopEvENKUlvE_clEv
Line
Count
Source
146
97.3k
  auto scope_exit = ScopeExit([] {
147
18.4E
    VLOG(1) << "The TaskStream has stopped";
148
97.3k
  });
149
97.3k
  queue_.Shutdown();
150
97.3k
  if (stopped_.load(std::memory_order_acquire)) {
151
48.6k
    return;
152
48.6k
  }
153
48.6k
  stop_requested_ = true;
154
48.6k
  {
155
48.6k
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
156
68.9k
    stop_cond_.wait(stop_lock, [this] {
157
68.9k
      return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty());
158
68.9k
    });
_ZZN2yb10TaskStreamIiE4StopEvENKUlvE0_clEv
Line
Count
Source
156
3
    stop_cond_.wait(stop_lock, [this] {
157
3
      return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty());
158
3
    });
_ZZN2yb10TaskStreamINS_3log13LogEntryBatchEE4StopEvENKUlvE0_clEv
Line
Count
Source
156
68.9k
    stop_cond_.wait(stop_lock, [this] {
157
68.9k
      return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty());
158
68.9k
    });
159
48.6k
  }
160
48.6k
  stopped_.store(true, std::memory_order_release);
161
48.6k
}
_ZN2yb10TaskStreamIiE4StopEv
Line
Count
Source
144
6
void TaskStream<T>::Stop() {
145
0
  VLOG(1) << "Stopping the TaskStream";
146
6
  auto scope_exit = ScopeExit([] {
147
6
    VLOG(1) << "The TaskStream has stopped";
148
6
  });
149
6
  queue_.Shutdown();
150
6
  if (stopped_.load(std::memory_order_acquire)) {
151
3
    return;
152
3
  }
153
3
  stop_requested_ = true;
154
3
  {
155
3
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
156
3
    stop_cond_.wait(stop_lock, [this] {
157
3
      return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty());
158
3
    });
159
3
  }
160
3
  stopped_.store(true, std::memory_order_release);
161
3
}
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE4StopEv
Line
Count
Source
144
97.3k
void TaskStream<T>::Stop() {
145
18.4E
  VLOG(1) << "Stopping the TaskStream";
146
97.3k
  auto scope_exit = ScopeExit([] {
147
97.3k
    VLOG(1) << "The TaskStream has stopped";
148
97.3k
  });
149
97.3k
  queue_.Shutdown();
150
97.3k
  if (stopped_.load(std::memory_order_acquire)) {
151
48.6k
    return;
152
48.6k
  }
153
48.6k
  stop_requested_ = true;
154
48.6k
  {
155
48.6k
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
156
48.6k
    stop_cond_.wait(stop_lock, [this] {
157
48.6k
      return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty());
158
48.6k
    });
159
48.6k
  }
160
48.6k
  stopped_.store(true, std::memory_order_release);
161
48.6k
}
162
163
template <typename T>
164
13.8M
Status TaskStream<T>::Submit(T *task) {
165
13.8M
  if (stop_requested_.load(std::memory_order_acquire)) {
166
0
    return STATUS(IllegalState, "Tablet is shutting down");
167
0
  }
168
13.8M
  if (!queue_.BlockingPut(task)) {
169
0
    return STATUS_FORMAT(ServiceUnavailable,
170
0
                         "TaskStream queue is full (max capacity $0)",
171
0
                         queue_.max_size());
172
0
  }
173
174
13.8M
  RunState expected = RunState::kIdle;
175
13.8M
  if (!run_state_.compare_exchange_strong(
176
13.7M
          expected, RunState::kSubmit, std::memory_order_acq_rel)) {
177
    // run_state_ was not idle, so we are not creating a task to process operations.
178
13.7M
    return Status::OK();
179
13.7M
  }
180
167k
  return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this));
181
167k
}
_ZN2yb10TaskStreamIiE6SubmitEPi
Line
Count
Source
164
13
Status TaskStream<T>::Submit(T *task) {
165
13
  if (stop_requested_.load(std::memory_order_acquire)) {
166
0
    return STATUS(IllegalState, "Tablet is shutting down");
167
0
  }
168
13
  if (!queue_.BlockingPut(task)) {
169
0
    return STATUS_FORMAT(ServiceUnavailable,
170
0
                         "TaskStream queue is full (max capacity $0)",
171
0
                         queue_.max_size());
172
0
  }
173
174
13
  RunState expected = RunState::kIdle;
175
13
  if (!run_state_.compare_exchange_strong(
176
10
          expected, RunState::kSubmit, std::memory_order_acq_rel)) {
177
    // run_state_ was not idle, so we are not creating a task to process operations.
178
10
    return Status::OK();
179
10
  }
180
3
  return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this));
181
3
}
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE6SubmitEPS2_
Line
Count
Source
164
13.8M
Status TaskStream<T>::Submit(T *task) {
165
13.8M
  if (stop_requested_.load(std::memory_order_acquire)) {
166
0
    return STATUS(IllegalState, "Tablet is shutting down");
167
0
  }
168
13.8M
  if (!queue_.BlockingPut(task)) {
169
0
    return STATUS_FORMAT(ServiceUnavailable,
170
0
                         "TaskStream queue is full (max capacity $0)",
171
0
                         queue_.max_size());
172
0
  }
173
174
13.8M
  RunState expected = RunState::kIdle;
175
13.8M
  if (!run_state_.compare_exchange_strong(
176
13.7M
          expected, RunState::kSubmit, std::memory_order_acq_rel)) {
177
    // run_state_ was not idle, so we are not creating a task to process operations.
178
13.7M
    return Status::OK();
179
13.7M
  }
180
167k
  return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this));
181
167k
}
182
183
template <typename T>
184
13
Status TaskStream<T>::TEST_SubmitFunc(const std::function<void()>& func) {
185
13
  return taskstream_pool_token_->SubmitFunc(func);
186
13
}
187
188
template <typename T>
189
180k
void TaskStream<T>::Run() {
190
1
  VLOG(1) << "Starting taskstream task:" << this;
191
180k
  ChangeRunState(RunState::kSubmit, RunState::kDrain);
192
180k
  run_tid_ = Thread::CurrentThreadIdForStack();
193
14.0M
  for (;;) {
194
14.0M
    MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_;
195
14.0M
    std::vector<T *> group;
196
14.0M
    queue_.BlockingDrainTo(&group, wait_timeout_deadline);
197
14.0M
    if (!group.empty()) {
198
13.8M
      ChangeRunState(RunState::kDrain, RunState::kProcess);
199
13.9M
      for (T* item : group) {
200
13.9M
        ProcessItem(item);
201
13.9M
      }
202
13.8M
      ChangeRunState(RunState::kProcess, RunState::kComplete);
203
13.8M
      ProcessItem(nullptr);
204
13.8M
      group.clear();
205
13.8M
      ChangeRunState(RunState::kComplete, RunState::kDrain);
206
13.8M
      continue;
207
13.8M
    }
208
179k
    ChangeRunState(RunState::kDrain, RunState::kFinish);
209
    // Not processing and queue empty, return from task.
210
179k
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
211
179k
    ChangeRunState(RunState::kFinish, RunState::kIdle);
212
179k
    if (!queue_.empty()) {
213
      // Got more operations, try to stay in the loop.
214
0
      RunState expected = RunState::kIdle;
215
0
      if (run_state_.compare_exchange_strong(
216
0
              expected, RunState::kDrain, std::memory_order_acq_rel)) {
217
0
        continue;
218
0
      }
219
179k
    }
220
179k
    if (stop_requested_.load(std::memory_order_acquire)) {
221
18.4E
      VLOG(1) << "TaskStream task's Run() function is returning because stop is requested.";
222
20.3k
      stop_cond_.notify_all();
223
20.3k
      return;
224
20.3k
    }
225
8.67k
    VLOG(1) << "Returning from TaskStream task after inactivity:" << this;
226
159k
    return;
227
159k
  }
228
180k
}
_ZN2yb10TaskStreamIiE3RunEv
Line
Count
Source
189
3
void TaskStream<T>::Run() {
190
0
  VLOG(1) << "Starting taskstream task:" << this;
191
3
  ChangeRunState(RunState::kSubmit, RunState::kDrain);
192
3
  run_tid_ = Thread::CurrentThreadIdForStack();
193
6
  for (;;) {
194
6
    MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_;
195
6
    std::vector<T *> group;
196
6
    queue_.BlockingDrainTo(&group, wait_timeout_deadline);
197
6
    if (!group.empty()) {
198
3
      ChangeRunState(RunState::kDrain, RunState::kProcess);
199
13
      for (T* item : group) {
200
13
        ProcessItem(item);
201
13
      }
202
3
      ChangeRunState(RunState::kProcess, RunState::kComplete);
203
3
      ProcessItem(nullptr);
204
3
      group.clear();
205
3
      ChangeRunState(RunState::kComplete, RunState::kDrain);
206
3
      continue;
207
3
    }
208
3
    ChangeRunState(RunState::kDrain, RunState::kFinish);
209
    // Not processing and queue empty, return from task.
210
3
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
211
3
    ChangeRunState(RunState::kFinish, RunState::kIdle);
212
3
    if (!queue_.empty()) {
213
      // Got more operations, try to stay in the loop.
214
0
      RunState expected = RunState::kIdle;
215
0
      if (run_state_.compare_exchange_strong(
216
0
              expected, RunState::kDrain, std::memory_order_acq_rel)) {
217
0
        continue;
218
0
      }
219
3
    }
220
3
    if (stop_requested_.load(std::memory_order_acquire)) {
221
0
      VLOG(1) << "TaskStream task's Run() function is returning because stop is requested.";
222
0
      stop_cond_.notify_all();
223
0
      return;
224
0
    }
225
0
    VLOG(1) << "Returning from TaskStream task after inactivity:" << this;
226
3
    return;
227
3
  }
228
3
}
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE3RunEv
Line
Count
Source
189
180k
void TaskStream<T>::Run() {
190
1
  VLOG(1) << "Starting taskstream task:" << this;
191
180k
  ChangeRunState(RunState::kSubmit, RunState::kDrain);
192
180k
  run_tid_ = Thread::CurrentThreadIdForStack();
193
14.0M
  for (;;) {
194
14.0M
    MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_;
195
14.0M
    std::vector<T *> group;
196
14.0M
    queue_.BlockingDrainTo(&group, wait_timeout_deadline);
197
14.0M
    if (!group.empty()) {
198
13.8M
      ChangeRunState(RunState::kDrain, RunState::kProcess);
199
13.9M
      for (T* item : group) {
200
13.9M
        ProcessItem(item);
201
13.9M
      }
202
13.8M
      ChangeRunState(RunState::kProcess, RunState::kComplete);
203
13.8M
      ProcessItem(nullptr);
204
13.8M
      group.clear();
205
13.8M
      ChangeRunState(RunState::kComplete, RunState::kDrain);
206
13.8M
      continue;
207
13.8M
    }
208
179k
    ChangeRunState(RunState::kDrain, RunState::kFinish);
209
    // Not processing and queue empty, return from task.
210
179k
    std::unique_lock<std::mutex> stop_lock(stop_mtx_);
211
179k
    ChangeRunState(RunState::kFinish, RunState::kIdle);
212
179k
    if (!queue_.empty()) {
213
      // Got more operations, try to stay in the loop.
214
0
      RunState expected = RunState::kIdle;
215
0
      if (run_state_.compare_exchange_strong(
216
0
              expected, RunState::kDrain, std::memory_order_acq_rel)) {
217
0
        continue;
218
0
      }
219
179k
    }
220
179k
    if (stop_requested_.load(std::memory_order_acquire)) {
221
18.4E
      VLOG(1) << "TaskStream task's Run() function is returning because stop is requested.";
222
20.3k
      stop_cond_.notify_all();
223
20.3k
      return;
224
20.3k
    }
225
8.67k
    VLOG(1) << "Returning from TaskStream task after inactivity:" << this;
226
159k
    return;
227
159k
  }
228
180k
}
229
230
template <typename T>
231
27.7M
void TaskStream<T>::ProcessItem(T* item) {
232
27.7M
  process_item_(item);
233
27.7M
}
_ZN2yb10TaskStreamIiE11ProcessItemEPi
Line
Count
Source
231
16
void TaskStream<T>::ProcessItem(T* item) {
232
16
  process_item_(item);
233
16
}
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE11ProcessItemEPS2_
Line
Count
Source
231
27.7M
void TaskStream<T>::ProcessItem(T* item) {
232
27.7M
  process_item_(item);
233
27.7M
}
234
235
}  // namespace yb
236
237
#endif  // YB_UTIL_TASKSTREAM_H