/Users/deen/code/yugabyte-db/src/yb/util/taskstream.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #ifndef YB_UTIL_TASKSTREAM_H |
15 | | #define YB_UTIL_TASKSTREAM_H |
16 | | |
17 | | #include <atomic> |
18 | | #include <chrono> |
19 | | #include <condition_variable> |
20 | | #include <memory> |
21 | | #include <vector> |
22 | | |
23 | | #include <gflags/gflags.h> |
24 | | |
25 | | #include "yb/util/status_fwd.h" |
26 | | #include "yb/util/blocking_queue.h" |
27 | | #include "yb/util/status_format.h" |
28 | | #include "yb/util/thread.h" |
29 | | #include "yb/util/threadpool.h" |
30 | | |
31 | | using namespace std::chrono_literals; |
32 | | |
33 | | using std::vector; |
34 | | |
35 | | namespace yb { |
36 | | class ThreadPool; |
37 | | class ThreadPoolToken; |
38 | | |
39 | | template <typename T> |
40 | | class TaskStreamImpl; |
41 | | |
42 | | YB_DEFINE_ENUM(TaskStreamRunState, (kIdle)(kSubmit)(kDrain)(kProcess)(kComplete)(kFinish)); |
43 | | |
44 | | template <typename T> |
45 | | // TaskStream has a thread pool token in the given thread pool. |
46 | | // TaskStream does not manage a thread but only submits to the token in the thread pool. |
47 | | // When we submit tasks to the taskstream, it adds tasks to the queue to be processed. |
48 | | // The internal Run function will call the user-provided function, |
49 | | // for each element in the queue in a loop. |
50 | | // When the queue is empty, it calls the user-provided function with no parameter, |
51 | | // to indicate it needs to process the end of the group of tasks processed. |
52 | | // This feature is used for the preparer and appender functionality. |
53 | | class TaskStream { |
54 | | public: |
55 | | explicit TaskStream(std::function<void(T*)> process_item, |
56 | | ThreadPool* thread_pool, |
57 | | int32_t queue_max_size, |
58 | | const MonoDelta& queue_max_wait); |
59 | | ~TaskStream(); |
60 | | |
61 | | CHECKED_STATUS Start(); |
62 | | void Stop(); |
63 | | |
64 | | CHECKED_STATUS Submit(T* item); |
65 | | |
66 | | CHECKED_STATUS TEST_SubmitFunc(const std::function<void()>& func); |
67 | | |
68 | 0 | std::string GetRunThreadStack() { |
69 | 0 | auto result = ThreadStack(run_tid_); |
70 | 0 | if (!result.ok()) { |
71 | 0 | return result.status().ToString(); |
72 | 0 | } |
73 | 0 | return (*result).Symbolize(); |
74 | 0 | } |
75 | | |
76 | 0 | std::string ToString() const { |
77 | 0 | return YB_CLASS_TO_STRING(queue, run_state, stopped, stop_requested); |
78 | 0 | } |
79 | | |
80 | | private: |
81 | | using RunState = TaskStreamRunState; |
82 | | |
83 | 75.7M | void ChangeRunState(RunState expected_old_state, RunState new_state) { |
84 | 75.7M | auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel); |
85 | 18.4E | LOG_IF(DFATAL, old_state != expected_old_state) |
86 | 18.4E | << "Task stream was in wrong state " << old_state << " while " |
87 | 18.4E | << expected_old_state << " was expected"; |
88 | 75.7M | } |
89 | | |
90 | | // We set this to true to tell the Run function to return. No new tasks will be accepted, but |
91 | | // existing tasks will still be processed. |
92 | | std::atomic<bool> stop_requested_{false}; |
93 | | |
94 | | std::atomic<RunState> run_state_{RunState::kIdle}; |
95 | | |
96 | | // This is set to true immediately before the thread exits. |
97 | | std::atomic<bool> stopped_{false}; |
98 | | |
99 | | // The objects in the queue are owned by the queue and ownership gets tranferred to ProcessItem. |
100 | | BlockingQueue<T*> queue_; |
101 | | |
102 | | // This mutex/condition combination is used in Stop() in case multiple threads are calling that |
103 | | // function concurrently. One of them will ask the taskstream thread to stop and wait for it, and |
104 | | // then will notify other threads that have called Stop(). |
105 | | std::mutex stop_mtx_; |
106 | | std::condition_variable stop_cond_; |
107 | | |
108 | | std::unique_ptr<ThreadPoolToken> taskstream_pool_token_; |
109 | | std::function<void(T*)> process_item_; |
110 | | ThreadIdForStack run_tid_ = 0; |
111 | | |
112 | | // Maximum time to wait for the queue to become non-empty. |
113 | | const MonoDelta queue_max_wait_; |
114 | | |
115 | | void Run(); |
116 | | void ProcessItem(T* item); |
117 | | }; |
118 | | |
119 | | template <typename T> |
120 | | TaskStream<T>::TaskStream(std::function<void(T*)> process_item, |
121 | | ThreadPool* thread_pool, |
122 | | int32_t queue_max_size, |
123 | | const MonoDelta& queue_max_wait) |
124 | | : queue_(queue_max_size), |
125 | | // run_state_ is responsible for serial execution, so we use concurrent here to avoid |
126 | | // unnecessary checks in thread pool. |
127 | | taskstream_pool_token_(thread_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT)), |
128 | | process_item_(process_item), |
129 | 150k | queue_max_wait_(queue_max_wait) { |
130 | 150k | } |
131 | | |
132 | | template <typename T> |
133 | 76.4k | TaskStream<T>::~TaskStream() { |
134 | 76.4k | Stop(); |
135 | 76.4k | } |
136 | | |
137 | | template <typename T> |
138 | | Status TaskStream<T>::Start() { |
139 | | VLOG(1) << "Starting the TaskStream"; |
140 | | return Status::OK(); |
141 | | } |
142 | | |
143 | | template <typename T> |
144 | 152k | void TaskStream<T>::Stop() { |
145 | 18.4E | VLOG(1) << "Stopping the TaskStream"; |
146 | 152k | auto scope_exit = ScopeExit([] { |
147 | 152k | VLOG(1) << "The TaskStream has stopped"1 ; |
148 | 152k | }); |
149 | 152k | queue_.Shutdown(); |
150 | 152k | if (stopped_.load(std::memory_order_acquire)) { |
151 | 76.5k | return; |
152 | 76.5k | } |
153 | 76.4k | stop_requested_ = true; |
154 | 76.4k | { |
155 | 76.4k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); |
156 | 107k | stop_cond_.wait(stop_lock, [this] { |
157 | 107k | return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty()76.4k ); |
158 | 107k | }); |
159 | 76.4k | } |
160 | 76.4k | stopped_.store(true, std::memory_order_release); |
161 | 76.4k | } |
162 | | |
163 | | template <typename T> |
164 | 25.0M | Status TaskStream<T>::Submit(T *task) { |
165 | 25.0M | if (stop_requested_.load(std::memory_order_acquire)) { |
166 | 0 | return STATUS(IllegalState, "Tablet is shutting down"); |
167 | 0 | } |
168 | 25.0M | if (!queue_.BlockingPut(task)) { |
169 | 0 | return STATUS_FORMAT(ServiceUnavailable, |
170 | 0 | "TaskStream queue is full (max capacity $0)", |
171 | 0 | queue_.max_size()); |
172 | 0 | } |
173 | | |
174 | 25.0M | RunState expected = RunState::kIdle; |
175 | 25.0M | if (!run_state_.compare_exchange_strong( |
176 | 25.0M | expected, RunState::kSubmit, std::memory_order_acq_rel)) { |
177 | | // run_state_ was not idle, so we are not creating a task to process operations. |
178 | 24.7M | return Status::OK(); |
179 | 24.7M | } |
180 | 333k | return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this)); |
181 | 25.0M | } |
182 | | |
183 | | template <typename T> |
184 | 13 | Status TaskStream<T>::TEST_SubmitFunc(const std::function<void()>& func) { |
185 | 13 | return taskstream_pool_token_->SubmitFunc(func); |
186 | 13 | } |
187 | | |
188 | | template <typename T> |
189 | 333k | void TaskStream<T>::Run() { |
190 | 333k | VLOG(1) << "Starting taskstream task:" << this6 ; |
191 | 333k | ChangeRunState(RunState::kSubmit, RunState::kDrain); |
192 | 333k | run_tid_ = Thread::CurrentThreadIdForStack(); |
193 | 25.2M | for (;;) { |
194 | 25.2M | MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_; |
195 | 25.2M | std::vector<T *> group; |
196 | 25.2M | queue_.BlockingDrainTo(&group, wait_timeout_deadline); |
197 | 25.2M | if (!group.empty()) { |
198 | 24.9M | ChangeRunState(RunState::kDrain, RunState::kProcess); |
199 | 25.0M | for (T* item : group) { |
200 | 25.0M | ProcessItem(item); |
201 | 25.0M | } |
202 | 24.9M | ChangeRunState(RunState::kProcess, RunState::kComplete); |
203 | 24.9M | ProcessItem(nullptr); |
204 | 24.9M | group.clear(); |
205 | 24.9M | ChangeRunState(RunState::kComplete, RunState::kDrain); |
206 | 24.9M | continue; |
207 | 24.9M | } |
208 | 333k | ChangeRunState(RunState::kDrain, RunState::kFinish); |
209 | | // Not processing and queue empty, return from task. |
210 | 333k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); |
211 | 333k | ChangeRunState(RunState::kFinish, RunState::kIdle); |
212 | 333k | if (!queue_.empty()) { |
213 | | // Got more operations, try stay in the loop. |
214 | 5 | RunState expected = RunState::kIdle; |
215 | 5 | if (run_state_.compare_exchange_strong( |
216 | 5 | expected, RunState::kDrain, std::memory_order_acq_rel)) { |
217 | 5 | continue; |
218 | 5 | } |
219 | 5 | } |
220 | 333k | if (stop_requested_.load(std::memory_order_acquire)) { |
221 | 18.4E | VLOG(1) << "TaskStream task's Run() function is returning because stop is requested."; |
222 | 30.6k | stop_cond_.notify_all(); |
223 | 30.6k | return; |
224 | 30.6k | } |
225 | 303k | VLOG(1) << "Returning from TaskStream task after inactivity:" << this21.2k ; |
226 | 303k | return; |
227 | 333k | } |
228 | 333k | } |
229 | | |
230 | | template <typename T> |
231 | 50.0M | void TaskStream<T>::ProcessItem(T* item) { |
232 | 50.0M | process_item_(item); |
233 | 50.0M | } |
234 | | |
235 | | } // namespace yb |
236 | | |
237 | | #endif // YB_UTIL_TASKSTREAM_H |