/Users/deen/code/yugabyte-db/src/yb/tablet/preparer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/preparer.h" |
15 | | |
16 | | #include <atomic> |
17 | | #include <condition_variable> |
18 | | #include <memory> |
19 | | #include <mutex> |
20 | | #include <thread> |
21 | | #include <vector> |
22 | | |
23 | | #include <boost/range/iterator_range_core.hpp> |
24 | | #include <gflags/gflags.h> |
25 | | |
26 | | #include "yb/consensus/consensus.h" |
27 | | |
28 | | #include "yb/gutil/macros.h" |
29 | | |
30 | | #include "yb/tablet/operations/operation_driver.h" |
31 | | |
32 | | #include "yb/util/flag_tags.h" |
33 | | #include "yb/util/lockfree.h" |
34 | | #include "yb/util/logging.h" |
35 | | #include "yb/util/threadpool.h" |
36 | | |
// Read by PreparerImpl::ProcessItem: once the pending leader-side batch holds at least this many
// operations, it is flushed before the next operation is appended.
37 | | DEFINE_uint64(max_group_replicate_batch_size, 16, |
38 | | "Maximum number of operations to submit to consensus for replication in a batch."); |
39 | | |
// Test flag passed to AtomicFlagSleepMs in PreparerImpl::ReplicateSubBatch just before the batch
// is handed to Consensus::ReplicateBatch.
40 | | DEFINE_test_flag(int32, preparer_batch_inject_latency_ms, 0, |
41 | | "Inject latency before replicating batch."); |
42 | | |
43 | | using namespace std::literals; |
44 | | using std::vector; |
45 | | |
46 | | namespace yb { |
47 | | class ThreadPool; |
48 | | class ThreadPoolToken; |
49 | | |
50 | | namespace tablet { |
51 | | |
52 | | // ------------------------------------------------------------------------------------------------ |
53 | | // PreparerImpl |
54 | | |
// Implementation behind Preparer. Leader-side operations are pushed onto queue_ and drained by
// Run(), which is submitted to a SERIAL thread pool token; Run() groups them into
// leader_side_batch_ and replicates them through consensus_ in batches. Follower-side operations
// are started directly from Submit() via OperationDriver::PrepareAndStartTask().
55 | | class PreparerImpl { |
56 | | public: |
57 | | explicit PreparerImpl(consensus::Consensus* consensus, ThreadPool* tablet_prepare_pool); |
58 | | ~PreparerImpl(); |
59 | | CHECKED_STATUS Start(); |
60 | | void Stop(); |
61 | | |
62 | | CHECKED_STATUS Submit(OperationDriver* operation_driver); |
63 | | |
// Returns the (non-owning) token on which Run() tasks are scheduled.
64 | 0 | ThreadPoolToken* PoolToken() { |
65 | 0 | return tablet_prepare_pool_token_.get(); |
66 | 0 | } |
67 | | |
68 | | private: |
69 | | using OperationDrivers = std::vector<OperationDriver*>; |
70 | | |
71 | | consensus::Consensus* const consensus_; |
72 | | |
73 | | // Set to true by Stop(). From then on Submit() rejects new operations, while operations that |
74 | | // are already queued are still processed by Run(). |
75 | | std::atomic<bool> stop_requested_{false}; |
76 | | |
77 | | // If true, a task is running for this tablet already. |
78 | | // If false, no tasks are running for this tablet, |
79 | | // and we can submit a task to the thread pool token. |
80 | | std::atomic<bool> running_{false}; |
81 | | |
82 | | // Set to true at the end of Stop(), once no task is running and no operations are pending. |
83 | | std::atomic<bool> stopped_{false}; |
84 | | |
85 | | // Number of active tasks. It is incremented before a task is added to the queue and |
86 | | // decremented after the task has been popped, |
87 | | // so it is always greater than or equal to the number of entries in the queue. |
88 | | std::atomic<int64_t> active_tasks_{0}; |
89 | | |
90 | | // This flag is used in a sanity check to ensure that after this server becomes a follower, |
91 | | // the earlier leader-side operations that are still in the preparer's queue should fail to get |
92 | | // prepared due to old term. This sanity check will only be performed when an UpdateConsensus |
93 | | // with follower-side operations is received while earlier leader-side operations still have not |
94 | | // been processed, e.g. in an overloaded tablet server with lots of leader changes. |
95 | | std::atomic<bool> prepare_should_fail_{false}; |
96 | | |
// Leader-side operations pushed by Submit() and popped by Run().
97 | | MPSCQueue<OperationDriver> queue_; |
98 | | |
99 | | // This mutex/condition combination is used by Stop() to wait until no task is running and no |
100 | | // operations are pending. Run() holds the mutex while it clears running_ and, when a stop has |
101 | | // been requested, notifies the condition variable before returning. |
102 | | std::mutex stop_mtx_; |
103 | | std::condition_variable stop_cond_; |
104 | | |
// Leader-side operations accumulated by ProcessItem() and flushed by
// ProcessAndClearLeaderSideBatch().
105 | | OperationDrivers leader_side_batch_; |
106 | | |
107 | | std::unique_ptr<ThreadPoolToken> tablet_prepare_pool_token_; |
108 | | |
109 | | // A temporary buffer of rounds to replicate, used to reduce reallocation. |
110 | | consensus::ConsensusRounds rounds_to_replicate_; |
111 | | |
112 | | void Run(); |
113 | | void ProcessItem(OperationDriver* item); |
114 | | |
115 | | void ProcessAndClearLeaderSideBatch(); |
116 | | |
117 | | // Passes the consensus rounds of the operations in [begin, end) to consensus_ in a single |
118 | | // ReplicateBatch call. Does nothing for an empty range. |
119 | | |
120 | | void ReplicateSubBatch(OperationDrivers::iterator begin, |
121 | | OperationDrivers::iterator end); |
122 | | }; |
123 | | |
// Stores the consensus pointer and creates a SERIAL-mode token on the given thread pool; Run()
// tasks are later submitted to that token.
124 | | PreparerImpl::PreparerImpl(consensus::Consensus* consensus, ThreadPool* tablet_prepare_pool) |
125 | | : consensus_(consensus), |
126 | | tablet_prepare_pool_token_(tablet_prepare_pool |
127 | 150k | ->NewToken(ThreadPool::ExecutionMode::SERIAL)) { |
128 | 150k | } |
129 | | |
// Calls Stop(), which returns immediately if a previous Stop() has already completed.
130 | 75.6k | PreparerImpl::~PreparerImpl() { |
131 | 75.6k | Stop(); |
132 | 75.6k | } |
133 | | |
// Performs no work and always returns OK; Run() tasks are scheduled from Submit().
134 | 150k | Status PreparerImpl::Start() { |
135 | 150k | return Status::OK(); |
136 | 150k | } |
137 | | |
// Requests shutdown and blocks until no Run() task is active and no submitted operation is still
// pending. Returns immediately if a previous Stop() call has already completed.
138 | 151k | void PreparerImpl::Stop() { |
139 | 151k | if (stopped_.load(std::memory_order_acquire)) { |
140 | 75.6k | return; |
141 | 75.6k | } |
// From here on Submit() returns IllegalState instead of accepting operations.
142 | 75.6k | stop_requested_ = true; |
143 | 75.6k | { |
144 | 75.6k | std::unique_lock<std::mutex> stop_lock(stop_mtx_); |
// The predicate is evaluated before blocking, so this returns right away when nothing is
// running. Otherwise Run() calls notify_all() on its way out once it sees stop_requested_.
145 | 75.6k | stop_cond_.wait(stop_lock, [this] { |
146 | 75.6k | return !running_.load(std::memory_order_acquire) && |
147 | 75.6k | active_tasks_.load(std::memory_order_acquire) == 075.5k ; |
148 | 75.6k | }); |
149 | 75.6k | } |
150 | 75.6k | stopped_.store(true, std::memory_order_release); |
151 | 75.6k | } |
152 | | |
// Accepts one operation. Leader-side operations are queued for batched preparation by Run();
// follower-side operations are started immediately on the calling thread. Returns IllegalState
// once Stop() has been requested, otherwise OK or the status of submitting a Run() task to the
// pool token.
153 | 31.4M | Status PreparerImpl::Submit(OperationDriver* operation_driver) { |
154 | 31.4M | if (stop_requested_.load(std::memory_order_acquire)) { |
155 | 0 | return STATUS(IllegalState, "Tablet is shutting down"); |
156 | 0 | } |
157 | | |
158 | 31.4M | const bool leader_side = operation_driver->is_leader_side(); |
159 | | |
160 | | // When a leader becomes a follower, we expect the leader-side operations still in the preparer's |
161 | | // queue to fail to be prepared because their term will be too old as we try to add them to the |
162 | | // Raft queue. |
163 | 31.4M | prepare_should_fail_.store(!leader_side, std::memory_order_release); |
164 | | |
165 | 31.4M | if (leader_side) { |
166 | | // Prepare leader-side operations on the "preparer thread" so we can only acquire the |
167 | | // ReplicaState lock once and append multiple operations. |
// active_tasks_ is bumped before the push so that it never undercounts the queue contents.
168 | 5.04M | active_tasks_.fetch_add(1, std::memory_order_release); |
169 | 5.04M | queue_.Push(operation_driver); |
170 | 26.4M | } else { |
171 | | // For follower-side operations, there would be no benefit in preparing them on the preparer |
172 | | // thread. |
173 | 26.4M | operation_driver->PrepareAndStartTask(); |
174 | 26.4M | } |
175 | | |
// NOTE(review): follower-side submissions also reach this point although they pushed nothing
// onto queue_, so the Run() task scheduled below may find the queue empty. Confirm that this is
// intended.
176 | 31.4M | auto expected = false; |
177 | 31.4M | if (!running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
178 | | // running_ was already true, so we are not creating a task to process operations. |
179 | 800k | return Status::OK(); |
180 | 800k | } |
181 | | // We flipped running_ from 0 to 1. The previously running thread could go back to doing another |
182 | | // iteration, but in that case since we are submitting to a token of a thread pool, only one |
183 | | // such thread will be running, the other will be in the queue. |
184 | 30.6M | return tablet_prepare_pool_token_->SubmitFunc(std::bind(&PreparerImpl::Run, this)); |
185 | 31.4M | } |
186 | | |
// Task body executed on the pool token. Drains queue_, flushes the accumulated leader-side batch,
// then clears running_ and returns unless more operations arrived in the meantime.
187 | 30.6M | void PreparerImpl::Run() { |
188 | 30.6M | VLOG(2) << "Starting prepare task:" << this34 ; |
189 | 30.7M | for (;;) { |
190 | 35.8M | while (OperationDriver *item = queue_.Pop()) { |
191 | 5.04M | active_tasks_.fetch_sub(1, std::memory_order_release); |
192 | 5.04M | ProcessItem(item); |
193 | 5.04M | } |
// Flush whatever ProcessItem left in leader_side_batch_ (no-op when it is empty).
194 | 30.7M | ProcessAndClearLeaderSideBatch(); |
// stop_mtx_ is held from here until the end of the iteration, so the flag updates and the
// notify_all() below are serialized with the predicate check in Stop().
195 | 30.7M | std::unique_lock<std::mutex> stop_lock(stop_mtx_); |
196 | 30.7M | running_.store(false, std::memory_order_release); |
197 | | // Check whether tasks were added while we were setting running to false. |
198 | 30.7M | if (active_tasks_.load(std::memory_order_acquire)) { |
199 | | // Got more operations, try to stay in the loop. |
200 | 76.7k | bool expected = false; |
201 | 76.7k | if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
202 | 76.7k | continue; |
203 | 76.7k | } |
204 | | // If someone else has flipped running_ to true, we can safely exit this function because |
205 | | // another task is already submitted to the same token. |
206 | 76.7k | } |
207 | 30.6M | if (stop_requested_.load(std::memory_order_acquire)) { |
208 | 2 | VLOG(2) << "Prepare task's Run() function is returning because stop is requested."0 ; |
209 | 2 | stop_cond_.notify_all(); |
210 | 2 | } |
211 | 30.6M | VLOG(2) << "Returning from prepare task after inactivity: " << this376 ; |
212 | 30.6M | return; |
213 | 30.7M | } |
214 | 30.6M | } |
215 | | |
216 | | namespace { |
217 | | |
// Returns true for operation types that must be prepared in a batch of their own, and false for
// kWrite and kUpdateTransaction, which may share a batch. Any value outside the listed enumerators
// reaches FATAL_INVALID_ENUM_VALUE.
218 | 5.04M | bool ShouldApplySeparately(OperationType operation_type) { |
219 | 5.04M | switch (operation_type) { |
220 | | // For certain operations types we have to apply them in a batch of their own. |
221 | | // E.g. ChangeMetadataOperation::Prepare calls Tablet::CreatePreparedChangeMetadata, which |
222 | | // acquires the schema lock. Because of this, we must not attempt to process two |
223 | | // ChangeMetadataOperations in one batch, otherwise we'll deadlock. |
224 | | // |
225 | | // Also, for infrequently occurring operations batching has little performance benefit in |
226 | | // general. |
227 | 462k | case OperationType::kChangeMetadata: FALLTHROUGH_INTENDED; |
228 | 462k | case OperationType::kSnapshot: FALLTHROUGH_INTENDED; |
229 | 520k | case OperationType::kTruncate: FALLTHROUGH_INTENDED; |
230 | 520k | case OperationType::kSplit: FALLTHROUGH_INTENDED; |
231 | 520k | case OperationType::kEmpty: FALLTHROUGH_INTENDED; |
232 | 520k | case OperationType::kHistoryCutoff: |
233 | 520k | return true; |
234 | | |
235 | 3.02M | case OperationType::kWrite: FALLTHROUGH_INTENDED; |
236 | 4.52M | case OperationType::kUpdateTransaction: |
237 | 4.52M | return false; |
238 | 5.04M | } |
239 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, operation_type); |
240 | 0 | } |
241 | | |
242 | | } // anonymous namespace |
243 | | |
// Adds one leader-side operation popped from queue_ to leader_side_batch_. The pending batch is
// flushed first when it is full or bound to a different term, and flushed right after the append
// when the operation has to be applied separately.
244 | 5.04M | void PreparerImpl::ProcessItem(OperationDriver* item) { |
245 | 5.04M | CHECK_NOTNULL(item); |
246 | | |
247 | 18.4E | LOG_IF(DFATAL, !item->is_leader_side()) << "Processing follower-side item"; |
248 | | |
249 | 5.04M | auto operation_type = item->operation_type(); |
250 | | |
251 | 5.04M | const bool apply_separately = ShouldApplySeparately(operation_type); |
// Operations that are applied separately use -1 here instead of their round's bound term.
252 | 5.04M | const int64_t bound_term = apply_separately ? -1520k : item->consensus_round()->bound_term()4.52M ; |
253 | | |
254 | | // Don't add more than the max number of operations to a batch, and also don't add |
255 | | // operations bound to different terms, so as not to fail unrelated operations |
256 | | // unnecessarily in case of a bound term mismatch. |
257 | 5.04M | if (leader_side_batch_.size() >= FLAGS_max_group_replicate_batch_size || |
258 | 5.04M | (5.04M !leader_side_batch_.empty()5.04M && |
259 | 5.04M | bound_term != leader_side_batch_.back()->consensus_round()->bound_term()116k )) { |
260 | 3.07k | ProcessAndClearLeaderSideBatch(); |
261 | 3.07k | } |
262 | 5.04M | leader_side_batch_.push_back(item); |
263 | 5.04M | if (apply_separately) { |
264 | 520k | ProcessAndClearLeaderSideBatch(); |
265 | 520k | } |
266 | 5.04M | } |
267 | | |
// Calls PrepareAndStart() on every operation in leader_side_batch_ in order and replicates each
// maximal run of consecutive successes through ReplicateSubBatch(). An operation that fails to
// prepare gets HandleFailure() and splits the run. The batch is empty on return.
268 | 31.2M | void PreparerImpl::ProcessAndClearLeaderSideBatch() { |
269 | 31.2M | if (leader_side_batch_.empty()) { |
270 | 26.3M | return; |
271 | 26.3M | } |
272 | | |
273 | 4.93M | VLOG(2) << "Preparing a batch of " << leader_side_batch_.size() << " leader-side operations"147 ; |
274 | | |
275 | 4.93M | auto iter = leader_side_batch_.begin(); |
// [replication_subbatch_begin, replication_subbatch_end) is the current run of successfully
// prepared operations that have not been replicated yet.
276 | 4.93M | auto replication_subbatch_begin = iter; |
277 | 4.93M | auto replication_subbatch_end = iter; |
278 | | |
279 | | // PrepareAndStart does not call Consensus::Replicate anymore as of 07/07/2017, and it is our |
280 | | // responsibility to do so in case of success. We call Consensus::ReplicateBatch for batches |
281 | | // of consecutive successfully prepared operations. |
282 | | |
283 | 9.97M | while (iter != leader_side_batch_.end()) { |
284 | 5.04M | auto* operation_driver = *iter; |
285 | | |
286 | 5.04M | Status s = operation_driver->PrepareAndStart(); |
287 | | |
288 | 5.04M | if (PREDICT_TRUE(s.ok())) { |
289 | 5.04M | replication_subbatch_end = ++iter; |
290 | 5.04M | } else { |
// Replicate the successes accumulated before this failure; the failed operation is excluded.
291 | 178 | ReplicateSubBatch(replication_subbatch_begin, replication_subbatch_end); |
292 | | |
293 | | // Handle failure for this operation itself. |
294 | 178 | operation_driver->HandleFailure(s); |
295 | | |
296 | | // Now we'll start accumulating a new batch. |
297 | 178 | replication_subbatch_begin = replication_subbatch_end = ++iter; |
298 | 178 | } |
299 | 5.04M | } |
300 | | |
301 | | // Replicate the remaining batch. No-op for an empty batch. |
302 | 4.93M | ReplicateSubBatch(replication_subbatch_begin, replication_subbatch_end); |
303 | | |
304 | 4.93M | leader_side_batch_.clear(); |
305 | 4.93M | } |
306 | | |
// Collects the consensus rounds of the operations in [batch_begin, batch_end) into
// rounds_to_replicate_ and passes them to Consensus::ReplicateBatch in one call. Returns without
// doing anything for an empty range. The returned status is only used for the sanity check
// against prepare_should_fail_ at the end.
307 | | void PreparerImpl::ReplicateSubBatch( |
308 | | OperationDrivers::iterator batch_begin, |
309 | 4.93M | OperationDrivers::iterator batch_end) { |
310 | 4.93M | DCHECK_GE(std::distance(batch_begin, batch_end), 0); |
311 | 4.93M | if (batch_begin == batch_end) { |
312 | 0 | return; |
313 | 0 | } |
314 | 4.93M | VLOG(2) << "Replicating a sub-batch of " << std::distance(batch_begin, batch_end) |
315 | 285 | << " leader-side operations"; |
316 | 4.93M | if (VLOG_IS_ON(3)) { |
317 | 2 | for (auto batch_iter = batch_begin; batch_iter != batch_end; ++batch_iter1 ) { |
318 | 1 | VLOG(3) << "Leader-side operation to be replicated: " << (*batch_iter)->ToString()0 ; |
319 | 1 | } |
320 | 1 | } |
321 | | |
// Reuse the member buffer instead of allocating a new vector for every sub-batch.
322 | 4.93M | rounds_to_replicate_.clear(); |
323 | 4.93M | rounds_to_replicate_.reserve(std::distance(batch_begin, batch_end)); |
324 | 9.97M | for (auto batch_iter = batch_begin; batch_iter != batch_end; ++batch_iter5.04M ) { |
325 | 5.04M | DCHECK_ONLY_NOTNULL(*batch_iter); |
326 | 5.04M | DCHECK_ONLY_NOTNULL((*batch_iter)->consensus_round()); |
327 | 5.04M | rounds_to_replicate_.push_back((*batch_iter)->consensus_round()); |
328 | 5.04M | } |
329 | | |
330 | 4.93M | AtomicFlagSleepMs(&FLAGS_TEST_preparer_batch_inject_latency_ms); |
331 | | // Have to save this value before calling replicate batch. |
332 | | // Because the following scenario is legal: |
333 | | // Operation successfully processed by ReplicateBatch, but ReplicateBatch did not return yet. |
334 | | // Submit of follower side operation is called from another thread. |
335 | 4.93M | bool should_fail = prepare_should_fail_.load(std::memory_order_acquire); |
336 | 4.93M | const Status s = consensus_->ReplicateBatch(rounds_to_replicate_); |
337 | 4.93M | rounds_to_replicate_.clear(); |
338 | | |
// Sanity check only: a batch that replicates successfully after a follower-side Submit() was
// observed is reported via DFATAL.
339 | 4.93M | if (s.ok()4.93M && should_fail) { |
340 | 0 | LOG(DFATAL) << "Operations should fail, but was successfully prepared: " |
341 | 0 | << AsString(boost::make_iterator_range(batch_begin, batch_end)); |
342 | 0 | } |
343 | 4.93M | } |
344 | | |
345 | | // ------------------------------------------------------------------------------------------------ |
346 | | // Preparer |
347 | | |
// Creates the PreparerImpl that every other Preparer method forwards to.
348 | | Preparer::Preparer(consensus::Consensus* consensus, ThreadPool* tablet_prepare_thread) |
349 | 150k | : impl_(std::make_unique<PreparerImpl>(consensus, tablet_prepare_thread)) { |
350 | 150k | } |
351 | | |
// Defaulted; destroying impl_ runs ~PreparerImpl(), which calls PreparerImpl::Stop().
352 | 75.6k | Preparer::~Preparer() = default; |
353 | | |
// Logs at VLOG(1) and forwards to PreparerImpl::Start().
354 | 150k | Status Preparer::Start() { |
355 | 150k | VLOG(1) << "Starting the preparer"42 ; |
356 | 150k | return impl_->Start(); |
357 | 150k | } |
358 | | |
// Forwards to PreparerImpl::Stop(), logging at VLOG(1) before and after the call.
359 | 75.6k | void Preparer::Stop() { |
360 | 75.6k | VLOG(1) << "Stopping the preparer"28 ; |
361 | 75.6k | impl_->Stop(); |
362 | 75.6k | VLOG(1) << "The preparer has stopped"48 ; |
363 | 75.6k | } |
364 | | |
// Forwards the operation to PreparerImpl::Submit() and returns its status.
365 | 31.4M | Status Preparer::Submit(OperationDriver* operation_driver) { |
366 | 31.4M | return impl_->Submit(operation_driver); |
367 | 31.4M | } |
368 | | |
// Returns the implementation's thread pool token (non-owning pointer).
369 | 0 | ThreadPoolToken* Preparer::PoolToken() { |
370 | 0 | return impl_->PoolToken(); |
371 | 0 | } |
372 | | |
373 | | } // namespace tablet |
374 | | } // namespace yb |