/Users/deen/code/yugabyte-db/src/yb/util/taskstream.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #ifndef YB_UTIL_TASKSTREAM_H |
15 | | #define YB_UTIL_TASKSTREAM_H |
16 | | |
17 | | #include <atomic> |
18 | | #include <chrono> |
19 | | #include <condition_variable> |
20 | | #include <memory> |
21 | | #include <vector> |
22 | | |
23 | | #include <gflags/gflags.h> |
24 | | |
25 | | #include "yb/util/status_fwd.h" |
26 | | #include "yb/util/blocking_queue.h" |
27 | | #include "yb/util/status_format.h" |
28 | | #include "yb/util/thread.h" |
29 | | #include "yb/util/threadpool.h" |
30 | | |
31 | | using namespace std::chrono_literals; |
32 | | |
33 | | using std::vector; |
34 | | |
35 | | namespace yb { |
36 | | class ThreadPool; |
37 | | class ThreadPoolToken; |
38 | | |
39 | | template <typename T> |
40 | | class TaskStreamImpl; |
41 | | |
42 | | YB_DEFINE_ENUM(TaskStreamRunState, (kIdle)(kSubmit)(kDrain)(kProcess)(kComplete)(kFinish)); |
43 | | |
44 | | template <typename T> |
45 | | // TaskStream has a thread pool token in the given thread pool. |
46 | | // TaskStream does not manage a thread but only submits to the token in the thread pool. |
47 | | // When we submit tasks to the taskstream, it adds tasks to the queue to be processed. |
48 | | // The internal Run function will call the user-provided function, |
49 | | // for each element in the queue in a loop. |
50 | | // When the queue is empty, it calls the user-provided function with no parameter, |
51 | | // to indicate it needs to process the end of the group of tasks processed. |
52 | | // This feature is used for the preparer and appender functionality. |
53 | | class TaskStream { |
54 | | public: |
55 | | explicit TaskStream(std::function<void(T*)> process_item, |
56 | | ThreadPool* thread_pool, |
57 | | int32_t queue_max_size, |
58 | | const MonoDelta& queue_max_wait); |
59 | | ~TaskStream(); |
60 | | |
61 | | CHECKED_STATUS Start(); |
62 | | void Stop(); |
63 | | |
64 | | CHECKED_STATUS Submit(T* item); |
65 | | |
66 | | CHECKED_STATUS TEST_SubmitFunc(const std::function<void()>& func); |
67 | | |
68 | 0 | std::string GetRunThreadStack() { |
69 | 0 | auto result = ThreadStack(run_tid_); |
70 | 0 | if (!result.ok()) { |
71 | 0 | return result.status().ToString(); |
72 | 0 | } |
73 | 0 | return (*result).Symbolize(); |
74 | 0 | } |
75 | | |
76 | 0 | std::string ToString() const { |
77 | 0 | return YB_CLASS_TO_STRING(queue, run_state, stopped, stop_requested); |
78 | 0 | } |
79 | | |
80 | | private: |
81 | | using RunState = TaskStreamRunState; |
82 | | |
83 | 42.0M | void ChangeRunState(RunState expected_old_state, RunState new_state) { |
84 | 42.0M | auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel); |
85 | 18.4E | LOG_IF(DFATAL, old_state != expected_old_state) |
86 | 18.4E | << "Task stream was in wrong state " << old_state << " while " |
87 | 18.4E | << expected_old_state << " was expected"; |
88 | 42.0M | } _ZN2yb10TaskStreamIiE14ChangeRunStateENS_18TaskStreamRunStateES2_ Line | Count | Source | 83 | 18 | void ChangeRunState(RunState expected_old_state, RunState new_state) { | 84 | 18 | auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel); | 85 | 0 | LOG_IF(DFATAL, old_state != expected_old_state) | 86 | 0 | << "Task stream was in wrong state " << old_state << " while " | 87 | 0 | << expected_old_state << " was expected"; | 88 | 18 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE14ChangeRunStateENS_18TaskStreamRunStateES4_ Line | Count | Source | 83 | 42.0M | void ChangeRunState(RunState expected_old_state, RunState new_state) { | 84 | 42.0M | auto old_state = run_state_.exchange(new_state, std::memory_order_acq_rel); | 85 | 18.4E | LOG_IF(DFATAL, old_state != expected_old_state) | 86 | 18.4E | << "Task stream was in wrong state " << old_state << " while " | 87 | 18.4E | << expected_old_state << " was expected"; | 88 | 42.0M | } |
|
89 | | |
90 | | // We set this to true to tell the Run function to return. No new tasks will be accepted, but |
91 | | // existing tasks will still be processed. |
92 | | std::atomic<bool> stop_requested_{false}; |
93 | | |
94 | | std::atomic<RunState> run_state_{RunState::kIdle}; |
95 | | |
96 | | // This is set to true immediately before the thread exits. |
97 | | std::atomic<bool> stopped_{false}; |
98 | | |
99 | | // The objects in the queue are owned by the queue and ownership gets tranferred to ProcessItem. |
100 | | BlockingQueue<T*> queue_; |
101 | | |
102 | | // This mutex/condition combination is used in Stop() in case multiple threads are calling that |
103 | | // function concurrently. One of them will ask the taskstream thread to stop and wait for it, and |
104 | | // then will notify other threads that have called Stop(). |
105 | | std::mutex stop_mtx_; |
106 | | std::condition_variable stop_cond_; |
107 | | |
108 | | std::unique_ptr<ThreadPoolToken> taskstream_pool_token_; |
109 | | std::function<void(T*)> process_item_; |
110 | | ThreadIdForStack run_tid_ = 0; |
111 | | |
112 | | // Maximum time to wait for the queue to become non-empty. |
113 | | const MonoDelta queue_max_wait_; |
114 | | |
115 | | void Run(); |
116 | | void ProcessItem(T* item); |
117 | | }; |
118 | | |
119 | | template <typename T> |
120 | | TaskStream<T>::TaskStream(std::function<void(T*)> process_item, |
121 | | ThreadPool* thread_pool, |
122 | | int32_t queue_max_size, |
123 | | const MonoDelta& queue_max_wait) |
124 | | : queue_(queue_max_size), |
125 | | // run_state_ is responsible for serial execution, so we use concurrent here to avoid |
126 | | // unnecessary checks in thread pool. |
127 | | taskstream_pool_token_(thread_pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT)), |
128 | | process_item_(process_item), |
129 | 89.5k | queue_max_wait_(queue_max_wait) { |
130 | 89.5k | } _ZN2yb10TaskStreamIiEC2ENSt3__18functionIFvPiEEEPNS_10ThreadPoolEiRKNS_9MonoDeltaE Line | Count | Source | 129 | 3 | queue_max_wait_(queue_max_wait) { | 130 | 3 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEEC2ENSt3__18functionIFvPS2_EEEPNS_10ThreadPoolEiRKNS_9MonoDeltaE Line | Count | Source | 129 | 89.5k | queue_max_wait_(queue_max_wait) { | 130 | 89.5k | } |
|
131 | | |
132 | | template <typename T> |
133 | 48.6k | TaskStream<T>::~TaskStream() { |
134 | 48.6k | Stop(); |
135 | 48.6k | } _ZN2yb10TaskStreamIiED2Ev Line | Count | Source | 133 | 3 | TaskStream<T>::~TaskStream() { | 134 | 3 | Stop(); | 135 | 3 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEED2Ev Line | Count | Source | 133 | 48.6k | TaskStream<T>::~TaskStream() { | 134 | 48.6k | Stop(); | 135 | 48.6k | } |
|
136 | | |
137 | | template <typename T> |
138 | 3 | Status TaskStream<T>::Start() { |
139 | 0 | VLOG(1) << "Starting the TaskStream"; |
140 | 3 | return Status::OK(); |
141 | 3 | } |
142 | | |
143 | | template <typename T> |
144 | 97.3k | void TaskStream<T>::Stop() { |
145 | 18.4E | VLOG(1) << "Stopping the TaskStream"; |
146 | 97.3k | auto scope_exit = ScopeExit([] { |
147 | 18.4E | VLOG(1) << "The TaskStream has stopped"; |
148 | 97.3k | }); _ZZN2yb10TaskStreamIiE4StopEvENKUlvE_clEv Line | Count | Source | 146 | 6 | auto scope_exit = ScopeExit([] { | 147 | 0 | VLOG(1) << "The TaskStream has stopped"; | 148 | 6 | }); |
_ZZN2yb10TaskStreamINS_3log13LogEntryBatchEE4StopEvENKUlvE_clEv Line | Count | Source | 146 | 97.3k | auto scope_exit = ScopeExit([] { | 147 | 18.4E | VLOG(1) << "The TaskStream has stopped"; | 148 | 97.3k | }); |
|
149 | 97.3k | queue_.Shutdown(); |
150 | 97.3k | if (stopped_.load(std::memory_order_acquire)) { |
151 | 48.6k | return; |
152 | 48.6k | } |
153 | 48.6k | stop_requested_ = true; |
154 | 48.6k | { |
155 | 48.6k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); |
156 | 68.9k | stop_cond_.wait(stop_lock, [this] { |
157 | 68.9k | return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty()); |
158 | 68.9k | }); _ZZN2yb10TaskStreamIiE4StopEvENKUlvE0_clEv Line | Count | Source | 156 | 3 | stop_cond_.wait(stop_lock, [this] { | 157 | 3 | return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty()); | 158 | 3 | }); |
_ZZN2yb10TaskStreamINS_3log13LogEntryBatchEE4StopEvENKUlvE0_clEv Line | Count | Source | 156 | 68.9k | stop_cond_.wait(stop_lock, [this] { | 157 | 68.9k | return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty()); | 158 | 68.9k | }); |
|
159 | 48.6k | } |
160 | 48.6k | stopped_.store(true, std::memory_order_release); |
161 | 48.6k | } _ZN2yb10TaskStreamIiE4StopEv Line | Count | Source | 144 | 6 | void TaskStream<T>::Stop() { | 145 | 0 | VLOG(1) << "Stopping the TaskStream"; | 146 | 6 | auto scope_exit = ScopeExit([] { | 147 | 6 | VLOG(1) << "The TaskStream has stopped"; | 148 | 6 | }); | 149 | 6 | queue_.Shutdown(); | 150 | 6 | if (stopped_.load(std::memory_order_acquire)) { | 151 | 3 | return; | 152 | 3 | } | 153 | 3 | stop_requested_ = true; | 154 | 3 | { | 155 | 3 | std::unique_lock<std::mutex> stop_lock(stop_mtx_); | 156 | 3 | stop_cond_.wait(stop_lock, [this] { | 157 | 3 | return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty()); | 158 | 3 | }); | 159 | 3 | } | 160 | 3 | stopped_.store(true, std::memory_order_release); | 161 | 3 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE4StopEv Line | Count | Source | 144 | 97.3k | void TaskStream<T>::Stop() { | 145 | 18.4E | VLOG(1) << "Stopping the TaskStream"; | 146 | 97.3k | auto scope_exit = ScopeExit([] { | 147 | 97.3k | VLOG(1) << "The TaskStream has stopped"; | 148 | 97.3k | }); | 149 | 97.3k | queue_.Shutdown(); | 150 | 97.3k | if (stopped_.load(std::memory_order_acquire)) { | 151 | 48.6k | return; | 152 | 48.6k | } | 153 | 48.6k | stop_requested_ = true; | 154 | 48.6k | { | 155 | 48.6k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); | 156 | 48.6k | stop_cond_.wait(stop_lock, [this] { | 157 | 48.6k | return (run_state_.load(std::memory_order_acquire) == RunState::kIdle && queue_.empty()); | 158 | 48.6k | }); | 159 | 48.6k | } | 160 | 48.6k | stopped_.store(true, std::memory_order_release); | 161 | 48.6k | } |
|
162 | | |
163 | | template <typename T> |
164 | 13.8M | Status TaskStream<T>::Submit(T *task) { |
165 | 13.8M | if (stop_requested_.load(std::memory_order_acquire)) { |
166 | 0 | return STATUS(IllegalState, "Tablet is shutting down"); |
167 | 0 | } |
168 | 13.8M | if (!queue_.BlockingPut(task)) { |
169 | 0 | return STATUS_FORMAT(ServiceUnavailable, |
170 | 0 | "TaskStream queue is full (max capacity $0)", |
171 | 0 | queue_.max_size()); |
172 | 0 | } |
173 | | |
174 | 13.8M | RunState expected = RunState::kIdle; |
175 | 13.8M | if (!run_state_.compare_exchange_strong( |
176 | 13.7M | expected, RunState::kSubmit, std::memory_order_acq_rel)) { |
177 | | // run_state_ was not idle, so we are not creating a task to process operations. |
178 | 13.7M | return Status::OK(); |
179 | 13.7M | } |
180 | 167k | return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this)); |
181 | 167k | } _ZN2yb10TaskStreamIiE6SubmitEPi Line | Count | Source | 164 | 13 | Status TaskStream<T>::Submit(T *task) { | 165 | 13 | if (stop_requested_.load(std::memory_order_acquire)) { | 166 | 0 | return STATUS(IllegalState, "Tablet is shutting down"); | 167 | 0 | } | 168 | 13 | if (!queue_.BlockingPut(task)) { | 169 | 0 | return STATUS_FORMAT(ServiceUnavailable, | 170 | 0 | "TaskStream queue is full (max capacity $0)", | 171 | 0 | queue_.max_size()); | 172 | 0 | } | 173 | | | 174 | 13 | RunState expected = RunState::kIdle; | 175 | 13 | if (!run_state_.compare_exchange_strong( | 176 | 10 | expected, RunState::kSubmit, std::memory_order_acq_rel)) { | 177 | | // run_state_ was not idle, so we are not creating a task to process operations. | 178 | 10 | return Status::OK(); | 179 | 10 | } | 180 | 3 | return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this)); | 181 | 3 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE6SubmitEPS2_ Line | Count | Source | 164 | 13.8M | Status TaskStream<T>::Submit(T *task) { | 165 | 13.8M | if (stop_requested_.load(std::memory_order_acquire)) { | 166 | 0 | return STATUS(IllegalState, "Tablet is shutting down"); | 167 | 0 | } | 168 | 13.8M | if (!queue_.BlockingPut(task)) { | 169 | 0 | return STATUS_FORMAT(ServiceUnavailable, | 170 | 0 | "TaskStream queue is full (max capacity $0)", | 171 | 0 | queue_.max_size()); | 172 | 0 | } | 173 | | | 174 | 13.8M | RunState expected = RunState::kIdle; | 175 | 13.8M | if (!run_state_.compare_exchange_strong( | 176 | 13.7M | expected, RunState::kSubmit, std::memory_order_acq_rel)) { | 177 | | // run_state_ was not idle, so we are not creating a task to process operations. | 178 | 13.7M | return Status::OK(); | 179 | 13.7M | } | 180 | 167k | return taskstream_pool_token_->SubmitFunc(std::bind(&TaskStream::Run, this)); | 181 | 167k | } |
|
182 | | |
183 | | template <typename T> |
184 | 13 | Status TaskStream<T>::TEST_SubmitFunc(const std::function<void()>& func) { |
185 | 13 | return taskstream_pool_token_->SubmitFunc(func); |
186 | 13 | } |
187 | | |
188 | | template <typename T> |
189 | 180k | void TaskStream<T>::Run() { |
190 | 1 | VLOG(1) << "Starting taskstream task:" << this; |
191 | 180k | ChangeRunState(RunState::kSubmit, RunState::kDrain); |
192 | 180k | run_tid_ = Thread::CurrentThreadIdForStack(); |
193 | 14.0M | for (;;) { |
194 | 14.0M | MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_; |
195 | 14.0M | std::vector<T *> group; |
196 | 14.0M | queue_.BlockingDrainTo(&group, wait_timeout_deadline); |
197 | 14.0M | if (!group.empty()) { |
198 | 13.8M | ChangeRunState(RunState::kDrain, RunState::kProcess); |
199 | 13.9M | for (T* item : group) { |
200 | 13.9M | ProcessItem(item); |
201 | 13.9M | } |
202 | 13.8M | ChangeRunState(RunState::kProcess, RunState::kComplete); |
203 | 13.8M | ProcessItem(nullptr); |
204 | 13.8M | group.clear(); |
205 | 13.8M | ChangeRunState(RunState::kComplete, RunState::kDrain); |
206 | 13.8M | continue; |
207 | 13.8M | } |
208 | 179k | ChangeRunState(RunState::kDrain, RunState::kFinish); |
209 | | // Not processing and queue empty, return from task. |
210 | 179k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); |
211 | 179k | ChangeRunState(RunState::kFinish, RunState::kIdle); |
212 | 179k | if (!queue_.empty()) { |
213 | | // Got more operations, try stay in the loop. |
214 | 0 | RunState expected = RunState::kIdle; |
215 | 0 | if (run_state_.compare_exchange_strong( |
216 | 0 | expected, RunState::kDrain, std::memory_order_acq_rel)) { |
217 | 0 | continue; |
218 | 0 | } |
219 | 179k | } |
220 | 179k | if (stop_requested_.load(std::memory_order_acquire)) { |
221 | 18.4E | VLOG(1) << "TaskStream task's Run() function is returning because stop is requested."; |
222 | 20.3k | stop_cond_.notify_all(); |
223 | 20.3k | return; |
224 | 20.3k | } |
225 | 8.67k | VLOG(1) << "Returning from TaskStream task after inactivity:" << this; |
226 | 159k | return; |
227 | 159k | } |
228 | 180k | } _ZN2yb10TaskStreamIiE3RunEv Line | Count | Source | 189 | 3 | void TaskStream<T>::Run() { | 190 | 0 | VLOG(1) << "Starting taskstream task:" << this; | 191 | 3 | ChangeRunState(RunState::kSubmit, RunState::kDrain); | 192 | 3 | run_tid_ = Thread::CurrentThreadIdForStack(); | 193 | 6 | for (;;) { | 194 | 6 | MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_; | 195 | 6 | std::vector<T *> group; | 196 | 6 | queue_.BlockingDrainTo(&group, wait_timeout_deadline); | 197 | 6 | if (!group.empty()) { | 198 | 3 | ChangeRunState(RunState::kDrain, RunState::kProcess); | 199 | 13 | for (T* item : group) { | 200 | 13 | ProcessItem(item); | 201 | 13 | } | 202 | 3 | ChangeRunState(RunState::kProcess, RunState::kComplete); | 203 | 3 | ProcessItem(nullptr); | 204 | 3 | group.clear(); | 205 | 3 | ChangeRunState(RunState::kComplete, RunState::kDrain); | 206 | 3 | continue; | 207 | 3 | } | 208 | 3 | ChangeRunState(RunState::kDrain, RunState::kFinish); | 209 | | // Not processing and queue empty, return from task. | 210 | 3 | std::unique_lock<std::mutex> stop_lock(stop_mtx_); | 211 | 3 | ChangeRunState(RunState::kFinish, RunState::kIdle); | 212 | 3 | if (!queue_.empty()) { | 213 | | // Got more operations, try stay in the loop. | 214 | 0 | RunState expected = RunState::kIdle; | 215 | 0 | if (run_state_.compare_exchange_strong( | 216 | 0 | expected, RunState::kDrain, std::memory_order_acq_rel)) { | 217 | 0 | continue; | 218 | 0 | } | 219 | 3 | } | 220 | 3 | if (stop_requested_.load(std::memory_order_acquire)) { | 221 | 0 | VLOG(1) << "TaskStream task's Run() function is returning because stop is requested."; | 222 | 0 | stop_cond_.notify_all(); | 223 | 0 | return; | 224 | 0 | } | 225 | 0 | VLOG(1) << "Returning from TaskStream task after inactivity:" << this; | 226 | 3 | return; | 227 | 3 | } | 228 | 3 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE3RunEv Line | Count | Source | 189 | 180k | void TaskStream<T>::Run() { | 190 | 1 | VLOG(1) << "Starting taskstream task:" << this; | 191 | 180k | ChangeRunState(RunState::kSubmit, RunState::kDrain); | 192 | 180k | run_tid_ = Thread::CurrentThreadIdForStack(); | 193 | 14.0M | for (;;) { | 194 | 14.0M | MonoTime wait_timeout_deadline = MonoTime::Now() + queue_max_wait_; | 195 | 14.0M | std::vector<T *> group; | 196 | 14.0M | queue_.BlockingDrainTo(&group, wait_timeout_deadline); | 197 | 14.0M | if (!group.empty()) { | 198 | 13.8M | ChangeRunState(RunState::kDrain, RunState::kProcess); | 199 | 13.9M | for (T* item : group) { | 200 | 13.9M | ProcessItem(item); | 201 | 13.9M | } | 202 | 13.8M | ChangeRunState(RunState::kProcess, RunState::kComplete); | 203 | 13.8M | ProcessItem(nullptr); | 204 | 13.8M | group.clear(); | 205 | 13.8M | ChangeRunState(RunState::kComplete, RunState::kDrain); | 206 | 13.8M | continue; | 207 | 13.8M | } | 208 | 179k | ChangeRunState(RunState::kDrain, RunState::kFinish); | 209 | | // Not processing and queue empty, return from task. | 210 | 179k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); | 211 | 179k | ChangeRunState(RunState::kFinish, RunState::kIdle); | 212 | 179k | if (!queue_.empty()) { | 213 | | // Got more operations, try stay in the loop. | 214 | 0 | RunState expected = RunState::kIdle; | 215 | 0 | if (run_state_.compare_exchange_strong( | 216 | 0 | expected, RunState::kDrain, std::memory_order_acq_rel)) { | 217 | 0 | continue; | 218 | 0 | } | 219 | 179k | } | 220 | 179k | if (stop_requested_.load(std::memory_order_acquire)) { | 221 | 18.4E | VLOG(1) << "TaskStream task's Run() function is returning because stop is requested."; | 222 | 20.3k | stop_cond_.notify_all(); | 223 | 20.3k | return; | 224 | 20.3k | } | 225 | 8.67k | VLOG(1) << "Returning from TaskStream task after inactivity:" << this; | 226 | 159k | return; | 227 | 159k | } | 228 | 180k | } |
|
229 | | |
230 | | template <typename T> |
231 | 27.7M | void TaskStream<T>::ProcessItem(T* item) { |
232 | 27.7M | process_item_(item); |
233 | 27.7M | } _ZN2yb10TaskStreamIiE11ProcessItemEPi Line | Count | Source | 231 | 16 | void TaskStream<T>::ProcessItem(T* item) { | 232 | 16 | process_item_(item); | 233 | 16 | } |
_ZN2yb10TaskStreamINS_3log13LogEntryBatchEE11ProcessItemEPS2_ Line | Count | Source | 231 | 27.7M | void TaskStream<T>::ProcessItem(T* item) { | 232 | 27.7M | process_item_(item); | 233 | 27.7M | } |
|
234 | | |
235 | | } // namespace yb |
236 | | |
237 | | #endif // YB_UTIL_TASKSTREAM_H |