/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_thread.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #include "yb/rocksdb/db/write_thread.h" |
22 | | #include <thread> |
23 | | |
24 | | #include <chrono> |
25 | | #include <limits> |
26 | | |
27 | | #include "yb/rocksdb/db/write_batch_internal.h" |
28 | | #include "yb/rocksdb/port/port.h" |
29 | | #include "yb/rocksdb/util/random.h" |
30 | | #include "yb/rocksdb/util/sync_point.h" |
31 | | |
32 | | namespace rocksdb { |
33 | | |
34 | | WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec) |
35 | | : max_yield_usec_(max_yield_usec), |
36 | | slow_yield_usec_(slow_yield_usec), |
37 | 435k | newest_writer_(nullptr) {} |
38 | | |
39 | 1.51M | uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { |
40 | | // We're going to block. Lazily create the mutex. We guarantee |
41 | | // propagation of this construction to the waker via the |
42 | | // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex |
43 | | // or the condvar unless they CAS away the STATE_LOCKED_WAITING that |
44 | | // we install below. |
45 | 1.51M | w->CreateMutex(); |
46 | | |
47 | 1.51M | auto state = w->state.load(std::memory_order_acquire); |
48 | 1.51M | assert(state != STATE_LOCKED_WAITING); |
49 | 1.51M | if ((state & goal_mask) == 0 && |
50 | 1.52M | w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) { |
51 | | // we have permission (and an obligation) to use StateMutex |
52 | 1.52M | std::unique_lock<std::mutex> guard(w->StateMutex()); |
53 | 3.01M | w->StateCV().wait(guard, [w] { |
54 | 3.01M | return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING; |
55 | 3.01M | }); |
56 | 1.52M | state = w->state.load(std::memory_order_relaxed); |
57 | 1.52M | } |
58 | | // else tricky. Goal is met or CAS failed. In the latter case the waker |
59 | | // must have changed the state, and compare_exchange_strong has updated |
60 | | // our local variable with the new one. At the moment WriteThread never |
61 | | // waits for a transition across intermediate states, so we know that |
62 | | // since a state change has occurred the goal must have been met. |
63 | 1.51M | assert((state & goal_mask) != 0); |
64 | 0 | return state; |
65 | 1.51M | } |
66 | | |
67 | | uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask, |
68 | 1.66M | AdaptationContext* ctx) { |
69 | 1.66M | uint8_t state; |
70 | | |
71 | | // On a modern Xeon each loop takes about 7 nanoseconds (most of which |
72 | | // is the effect of the pause instruction), so 200 iterations is a bit |
73 | | // more than a microsecond. This is long enough that waits longer than |
74 | | // this can amortize the cost of accessing the clock and yielding. |
75 | 269M | for (uint32_t tries = 0; tries < 200; ++tries) { |
76 | 268M | state = w->state.load(std::memory_order_acquire); |
77 | 268M | if ((state & goal_mask) != 0) { |
78 | 110k | return state; |
79 | 110k | } |
80 | 268M | port::AsmVolatilePause(); |
81 | 268M | } |
82 | | |
83 | | // If we're only going to end up waiting a short period of time, |
84 | | // it can be a lot more efficient to call std::this_thread::yield() |
85 | | // in a loop than to block in StateMutex(). For reference, on my 4.0 |
86 | | // SELinux test server with support for syscall auditing enabled, the |
87 | | // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is |
88 | | // 2.7 usec, and the average is more like 10 usec. That can be a big |
89 | | // drag on RocksDB's single-writer design. Of course, spinning is a |
90 | | // bad idea if other threads are waiting to run or if we're going to |
91 | | // wait for a long time. How do we decide? |
92 | | // |
93 | | // We break waiting into 3 categories: short-uncontended, |
94 | | // short-contended, and long. If we had an oracle, then we would always |
95 | | // spin for short-uncontended, always block for long, and our choice for |
96 | | // short-contended might depend on whether we were trying to optimize |
97 | | // RocksDB throughput or avoid being greedy with system resources. |
98 | | // |
99 | | // Bucketing into short or long is easy by measuring elapsed time. |
100 | | // Differentiating short-uncontended from short-contended is a bit |
101 | | // trickier, but not too bad. We could look for involuntary context |
102 | | // switches using getrusage(RUSAGE_THREAD, ..), but it's less work |
103 | | // (portability code and CPU) to just look for yield calls that take |
104 | | // longer than we expect. sched_yield() doesn't actually result in any |
105 | | // context switch overhead if there are no other runnable processes |
106 | | // on the current core, in which case it usually takes less than |
107 | | // a microsecond. |
108 | | // |
109 | | // There are two primary tunables here: the threshold between "short" |
110 | | // and "long" waits, and the threshold at which we suspect that a yield |
111 | | // is slow enough to indicate we should probably block. If these |
112 | | // thresholds are chosen well then CPU-bound workloads that don't |
113 | | // have more threads than cores will experience few context switches |
114 | | // (voluntary or involuntary), and the total number of context switches |
115 | | // (voluntary and involuntary) will not be dramatically larger (maybe |
116 | | // 2x) than the number of voluntary context switches that occur when |
117 | | // --max_yield_wait_micros=0. |
118 | | // |
119 | | // There's another constant, which is the number of slow yields we will |
120 | | // tolerate before reversing our previous decision. Solitary slow |
121 | | // yields are pretty common (low-priority small jobs ready to run), |
122 | | // so this should be at least 2. We set this conservatively to 3 so |
123 | | // that we can also immediately schedule a ctx adaptation, rather than |
124 | | // waiting for the next update_ctx. |
125 | | |
126 | 1.55M | const size_t kMaxSlowYieldsWhileSpinning = 3; |
127 | | |
128 | 1.55M | bool update_ctx = false; |
129 | 1.55M | bool would_spin_again = false; |
130 | | |
131 | 1.55M | if (max_yield_usec_ > 0) { |
132 | 63.0k | update_ctx = Random::GetTLSInstance()->OneIn(256); |
133 | | |
134 | 63.0k | if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) { |
135 | | // we're updating the adaptation statistics, or spinning has > |
136 | | // 50% chance of being shorter than max_yield_usec_ and causing no |
137 | | // involuntary context switches |
138 | 340 | auto spin_begin = std::chrono::steady_clock::now(); |
139 | | |
140 | | // this variable doesn't include the final yield (if any) that |
141 | | // causes the goal to be met |
142 | 340 | size_t slow_yield_count = 0; |
143 | | |
144 | 340 | auto iter_begin = spin_begin; |
145 | 36.5k | while ((iter_begin - spin_begin) <= |
146 | 36.5k | std::chrono::microseconds(max_yield_usec_)) { |
147 | 36.4k | std::this_thread::yield(); |
148 | | |
149 | 36.4k | state = w->state.load(std::memory_order_acquire); |
150 | 36.4k | if ((state & goal_mask) != 0) { |
151 | | // success |
152 | 102 | would_spin_again = true; |
153 | 102 | break; |
154 | 102 | } |
155 | | |
156 | 36.3k | auto now = std::chrono::steady_clock::now(); |
157 | 36.3k | if (now == iter_begin || |
158 | 36.3k | now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { |
159 | | // conservatively count it as a slow yield if our clock isn't |
160 | | // accurate enough to measure the yield duration |
161 | 303 | ++slow_yield_count; |
162 | 303 | if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) { |
163 | | // Not just one ivcsw, but several. Immediately update ctx |
164 | | // and fall back to blocking |
165 | 49 | update_ctx = true; |
166 | 49 | break; |
167 | 49 | } |
168 | 303 | } |
169 | 36.2k | iter_begin = now; |
170 | 36.2k | } |
171 | 340 | } |
172 | 63.0k | } |
173 | | |
174 | 1.55M | if ((state & goal_mask) == 0) { |
175 | 1.51M | state = BlockingAwaitState(w, goal_mask); |
176 | 1.51M | } |
177 | | |
178 | 1.55M | if (update_ctx) { |
179 | 287 | auto v = ctx->value.load(std::memory_order_relaxed); |
180 | | // fixed point exponential decay with decay constant 1/1024, with +1 |
181 | | // and -1 scaled to avoid overflow for int32_t |
182 | 287 | v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384; |
183 | 287 | ctx->value.store(v, std::memory_order_relaxed); |
184 | 287 | } |
185 | | |
186 | 1.55M | assert((state & goal_mask) != 0); |
187 | 0 | return state; |
188 | 1.66M | } |
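
Note on the adaptation logic above: the AdaptationContext consulted here holds a signed fixed-point credit per call site. When the credit is non-negative (or the 1-in-256 sampling selects this call for a statistics update), AwaitState is willing to yield-spin for up to max_yield_usec_; otherwise it falls through to BlockingAwaitState right away. The snippet below simply replays the update formula from source line 182 on a few hypothetical samples to show how the credit drifts; it is an illustration of the arithmetic, not RocksDB code.

    // Replays the fixed-point credit update used in AwaitState:
    //   v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
    // A sampled wait that finished while yielding pushes the credit up by
    // 16384; one that had to fall back to blocking pushes it down by 16384.
    #include <cstdint>
    #include <cstdio>

    int32_t UpdateCredit(int32_t v, bool would_spin_again) {
      return v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
    }

    int main() {
      int32_t v = 0;
      bool samples[] = {true, true, false, false, false};  // hypothetical outcomes
      for (bool ok : samples) {
        v = UpdateCredit(v, ok);
        std::printf("credit=%d -> %s\n", static_cast<int>(v),
                    v >= 0 ? "try yield-spinning" : "block immediately");
      }
      return 0;
    }

After two successful samples and three unsuccessful ones the credit goes negative, so subsequent (unsampled) waits at this call site skip the yield loop and block directly.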
189 | | |
190 | 1.67M | void WriteThread::SetState(Writer* w, uint8_t new_state) { |
191 | 1.67M | auto state = w->state.load(std::memory_order_acquire); |
192 | 1.67M | if (state == STATE_LOCKED_WAITING || |
193 | 1.67M | !w->state.compare_exchange_strong(state, new_state)) { |
194 | 1.52M | assert(state == STATE_LOCKED_WAITING); |
195 | | |
196 | 0 | std::lock_guard<std::mutex> guard(w->StateMutex()); |
197 | 1.52M | assert(w->state.load(std::memory_order_relaxed) != new_state); |
198 | 0 | w->state.store(new_state, std::memory_order_relaxed); |
199 | 1.52M | w->StateCV().notify_one(); |
200 | 1.52M | } |
201 | 1.67M | } |
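
The handshake implemented by BlockingAwaitState and SetState above can be illustrated in isolation: the waiter CASes the atomic state to the STATE_LOCKED_WAITING sentinel and then sleeps on a condition variable, while the waker either CASes the state directly or, if it finds the sentinel, updates the state under the mutex and notifies. The following is a minimal, self-contained sketch of that pattern; the DemoWaiter type and its state values are invented for illustration and are not part of the RocksDB code.

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <thread>

    constexpr uint8_t STATE_INIT = 1;
    constexpr uint8_t STATE_DONE = 2;
    constexpr uint8_t STATE_LOCKED_WAITING = 128;  // sentinel, never a goal state

    struct DemoWaiter {
      std::atomic<uint8_t> state{STATE_INIT};
      std::mutex mu;
      std::condition_variable cv;

      uint8_t BlockingAwait(uint8_t goal_mask) {
        uint8_t s = state.load(std::memory_order_acquire);
        if ((s & goal_mask) == 0 &&
            state.compare_exchange_strong(s, STATE_LOCKED_WAITING)) {
          // We installed the sentinel, so the waker must go through mu/cv.
          std::unique_lock<std::mutex> guard(mu);
          cv.wait(guard, [this] {
            return state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
          });
          s = state.load(std::memory_order_relaxed);
        }
        return s;  // either the goal was already met or the CAS lost the race
      }

      void SetState(uint8_t new_state) {
        uint8_t s = state.load(std::memory_order_acquire);
        if (s == STATE_LOCKED_WAITING ||
            !state.compare_exchange_strong(s, new_state)) {
          // Waiter is (or is about to be) asleep: update under the mutex.
          std::lock_guard<std::mutex> guard(mu);
          state.store(new_state, std::memory_order_relaxed);
          cv.notify_one();
        }
      }
    };

    int main() {
      DemoWaiter w;
      std::thread waker([&] { w.SetState(STATE_DONE); });
      w.BlockingAwait(STATE_DONE);
      waker.join();
      return 0;
    }

Storing the new state while holding the mutex is what prevents a lost wakeup: the waiter re-checks the sentinel under that same mutex before it actually sleeps.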
202 | | |
203 | 30.8M | void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) { |
204 | 30.8M | assert(w->state == STATE_INIT); |
205 | | |
206 | 0 | Writer* writers = newest_writer_.load(std::memory_order_relaxed); |
207 | 30.8M | while (true) { |
208 | 30.8M | w->link_older = writers; |
209 | 30.9M | if (newest_writer_.compare_exchange_strong(writers, w)) { |
210 | 30.9M | if (writers == nullptr) { |
211 | | // this isn't part of the WriteThread machinery, but helps with |
212 | | // debugging and is checked by an assert in WriteImpl |
213 | 29.2M | w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); |
214 | 29.2M | } |
215 | 30.9M | *linked_as_leader = (writers == nullptr); |
216 | 30.9M | return; |
217 | 30.9M | } |
218 | 30.8M | } |
219 | 30.8M | } |
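
LinkOne above is a lock-free prepend onto a singly linked list: point the new writer's link_older at the current head and CAS newest_writer_; on failure the CAS reloads the head and the loop retries. Whoever links onto an empty list becomes the group leader. A stripped-down sketch of the same pattern, using a hypothetical DemoNode type rather than the real Writer:

    #include <atomic>

    struct DemoNode {
      DemoNode* link_older = nullptr;  // next-older node in the queue
    };

    // Returns true if the list was empty, i.e. this node became the leader.
    bool Push(std::atomic<DemoNode*>& newest, DemoNode* node) {
      DemoNode* old_head = newest.load(std::memory_order_relaxed);
      while (true) {
        node->link_older = old_head;
        // On failure, compare_exchange_strong re-reads the current head into
        // old_head, so the loop simply retries with the fresh value.
        if (newest.compare_exchange_strong(old_head, node)) {
          return old_head == nullptr;
        }
      }
    }

    int main() {
      std::atomic<DemoNode*> newest{nullptr};
      DemoNode a, b;
      bool a_leader = Push(newest, &a);  // true: list was empty
      bool b_leader = Push(newest, &b);  // false: a is already queued
      return (a_leader && !b_leader) ? 0 : 1;
    }

The old_head == nullptr check plays the same role as the writers == nullptr check in LinkOne: it tells the caller whether it self-identified as the new group leader.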
220 | | |
221 | 29.1M | void WriteThread::CreateMissingNewerLinks(Writer* head) { |
222 | 30.7M | while (true) { |
223 | 30.7M | Writer* next = head->link_older; |
224 | 30.7M | if (next == nullptr || next->link_newer != nullptr) { |
225 | 29.1M | assert(next == nullptr || next->link_newer == head); |
226 | 0 | break; |
227 | 29.1M | } |
228 | 1.62M | next->link_newer = head; |
229 | 1.62M | head = next; |
230 | 1.62M | } |
231 | 29.1M | } |
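
Because LinkOne only writes link_older, the queue is singly linked (newest to oldest) until a leader needs to walk it the other way; CreateMissingNewerLinks backfills the link_newer pointers, stopping as soon as it meets a node that an earlier pass already linked. A simplified, self-contained demonstration (hypothetical DemoWriter type) of why the backfill is needed before the leader's forward iteration in EnterAsBatchGroupLeader:

    #include <cassert>

    struct DemoWriter {
      DemoWriter* link_older = nullptr;  // written by the lock-free push
      DemoWriter* link_newer = nullptr;  // filled in lazily by the leader
    };

    void BackfillNewerLinks(DemoWriter* head) {
      while (true) {
        DemoWriter* next = head->link_older;
        if (next == nullptr || next->link_newer != nullptr) break;
        next->link_newer = head;
        head = next;
      }
    }

    int main() {
      DemoWriter leader, mid, newest;   // enqueued oldest -> newest
      mid.link_older = &leader;
      newest.link_older = &mid;
      BackfillNewerLinks(&newest);      // now the leader can walk forward
      assert(leader.link_newer == &mid && mid.link_newer == &newest);
      int group_size = 1;
      for (DemoWriter* w = &leader; w != &newest; w = w->link_newer) ++group_size;
      return group_size == 3 ? 0 : 1;
    }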
232 | | |
233 | 30.0M | void WriteThread::JoinBatchGroup(Writer* w) { |
234 | 30.0M | static AdaptationContext ctx("JoinBatchGroup"); |
235 | | |
236 | 30.0M | assert(w->batch != nullptr); |
237 | 0 | bool linked_as_leader; |
238 | 30.0M | LinkOne(w, &linked_as_leader); |
239 | | |
240 | 30.0M | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); |
241 | | |
242 | 30.0M | if (!linked_as_leader) { |
243 | 1.62M | AwaitState(w, |
244 | 1.62M | STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, |
245 | 1.62M | &ctx); |
246 | 1.62M | TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); |
247 | 1.62M | } |
248 | 30.0M | } |
249 | | |
250 | | size_t WriteThread::EnterAsBatchGroupLeader( |
251 | | Writer* leader, WriteThread::Writer** last_writer, |
252 | 28.7M | autovector<WriteThread::Writer*>* write_batch_group) { |
253 | 28.7M | assert(leader->link_older == nullptr); |
254 | 0 | assert(leader->batch != nullptr); |
255 | | |
256 | 0 | size_t size = WriteBatchInternal::ByteSize(leader->batch); |
257 | 28.7M | write_batch_group->push_back(leader); |
258 | | |
259 | | // Allow the group to grow up to a maximum size, but if the |
260 | | // original write is small, limit the growth so we do not slow |
261 | | // down the small write too much. |
262 | 28.7M | size_t max_size = 1 << 20; |
263 | 28.7M | if (size <= (128 << 10)) { |
264 | 28.7M | max_size = size + (128 << 10); |
265 | 28.7M | } |
266 | | |
267 | 28.7M | *last_writer = leader; |
268 | | |
269 | 28.7M | Writer* newest_writer = newest_writer_.load(std::memory_order_acquire); |
270 | | |
271 | | // This is safe regardless of any db mutex status of the caller. Previous |
272 | | // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks |
273 | | // (they emptied the list and then we added ourself as leader) or had to |
274 | | // explicitly wake us up (the list was non-empty when we added ourself, |
275 | | // so we have already received our MarkJoined). |
276 | 28.7M | CreateMissingNewerLinks(newest_writer); |
277 | | |
278 | | // Tricky. Iteration start (leader) is exclusive and finish |
279 | | // (newest_writer) is inclusive. Iteration goes from old to new. |
280 | 28.7M | Writer* w = leader; |
281 | 30.0M | while (w != newest_writer) { |
282 | 1.25M | w = w->link_newer; |
283 | | |
284 | 1.25M | if (w->sync && !leader->sync) { |
285 | | // Do not include a sync write into a batch handled by a non-sync write. |
286 | 0 | break; |
287 | 0 | } |
288 | | |
289 | 1.25M | if (!w->disableWAL && leader->disableWAL) { |
290 | | // Do not include a write that needs WAL into a batch that has |
291 | | // WAL disabled. |
292 | 8 | break; |
293 | 8 | } |
294 | | |
295 | 1.25M | if (w->batch == nullptr) { |
296 | | // Do not include those writes with nullptr batch. Those are not writes, |
297 | | // those are something else. They want to be alone |
298 | 164 | break; |
299 | 164 | } |
300 | | |
301 | 1.25M | if (w->callback != nullptr && !w->callback->AllowWriteBatching()) { |
302 | | // don't batch writes that don't want to be batched |
303 | 96 | break; |
304 | 96 | } |
305 | | |
306 | 1.25M | auto batch_size = WriteBatchInternal::ByteSize(w->batch); |
307 | 1.25M | if (size + batch_size > max_size) { |
308 | | // Do not make batch too big |
309 | 0 | break; |
310 | 0 | } |
311 | | |
312 | 1.25M | size += batch_size; |
313 | 1.25M | write_batch_group->push_back(w); |
314 | 1.25M | w->in_batch_group = true; |
315 | 1.25M | *last_writer = w; |
316 | 1.25M | } |
317 | 28.7M | return size; |
318 | 28.7M | } |
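
The growth cap computed at the top of EnterAsBatchGroupLeader means a leader whose own batch is at most 128 KB only lets the group grow by a further 128 KB, while a larger leader allows the group to reach the fixed 1 MB ceiling. A small illustration of that arithmetic (the helper name is hypothetical, not RocksDB API):

    #include <cstddef>
    #include <cstdio>

    // Mirrors the max_size rule in EnterAsBatchGroupLeader.
    size_t MaxGroupSize(size_t leader_batch_size) {
      size_t max_size = 1 << 20;                     // 1 MB ceiling
      if (leader_batch_size <= (128 << 10)) {        // leader batch <= 128 KB
        max_size = leader_batch_size + (128 << 10);  // grow by at most 128 KB
      }
      return max_size;
    }

    int main() {
      std::printf("1 KB leader   -> cap %zu bytes\n", MaxGroupSize(1 << 10));    // 132096
      std::printf("128 KB leader -> cap %zu bytes\n", MaxGroupSize(128 << 10));  // 262144
      std::printf("512 KB leader -> cap %zu bytes\n", MaxGroupSize(512 << 10));  // 1048576
      return 0;
    }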
319 | | |
320 | | void WriteThread::LaunchParallelFollowers(ParallelGroup* pg, |
321 | 12.2k | SequenceNumber sequence) { |
322 | | // EnterAsBatchGroupLeader already created the links from leader to |
323 | | // newer writers in the group |
324 | | |
325 | 12.2k | pg->leader->parallel_group = pg; |
326 | | |
327 | 12.2k | Writer* w = pg->leader; |
328 | 12.2k | w->sequence = sequence; |
329 | | |
330 | 56.0k | while (w != pg->last_writer) { |
331 | | // Writers that won't write don't get sequence allotment |
332 | 43.7k | if (!w->CallbackFailed()) { |
333 | 43.7k | sequence += WriteBatchInternal::Count(w->batch); |
334 | 43.7k | } |
335 | 43.7k | w = w->link_newer; |
336 | | |
337 | 43.7k | w->sequence = sequence; |
338 | 43.7k | w->parallel_group = pg; |
339 | 43.7k | SetState(w, STATE_PARALLEL_FOLLOWER); |
340 | 43.7k | } |
341 | 12.2k | } |
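
LaunchParallelFollowers gives each writer in the group a starting sequence number equal to the previous writer's starting sequence plus the number of operations in the previous writer's batch, and writers whose callback already failed consume no sequence numbers. A worked example with hypothetical batch sizes (illustration only, not RocksDB code):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    int main() {
      using SequenceNumber = uint64_t;
      // Number of operations in each writer's batch, leader first.
      std::vector<uint32_t> batch_counts = {3, 1, 5};
      SequenceNumber sequence = 100;  // sequence assigned to the leader

      int writer_index = 0;
      for (uint32_t count : batch_counts) {
        std::printf("writer %d starts at sequence %llu\n", writer_index++,
                    static_cast<unsigned long long>(sequence));
        // The next writer's batch starts right after this writer's operations.
        sequence += count;
      }
      // Prints 100, 103, 104; writer 2's five operations occupy 104..108.
      return 0;
    }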
342 | | |
343 | 55.9k | bool WriteThread::CompleteParallelWorker(Writer* w) { |
344 | 55.9k | static AdaptationContext ctx("CompleteParallelWorker"); |
345 | | |
346 | 55.9k | auto* pg = w->parallel_group; |
347 | 55.9k | if (!w->status.ok()) { |
348 | 0 | std::lock_guard<std::mutex> guard(w->StateMutex()); |
349 | 0 | pg->status = w->status; |
350 | 0 | } |
351 | | |
352 | 55.9k | auto leader = pg->leader; |
353 | 55.9k | auto early_exit_allowed = pg->early_exit_allowed; |
354 | | |
355 | 55.9k | if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { |
356 | | // we're not the last one |
357 | 43.7k | AwaitState(w, STATE_COMPLETED, &ctx); |
358 | | |
359 | | // Caller only needs to perform exit duties if early exit doesn't |
360 | | // apply and this is the leader. Can't touch pg here. Whoever set |
361 | | // our state to STATE_COMPLETED copied pg->status to w.status for us. |
362 | 43.7k | return w == leader && !(early_exit_allowed && w->status.ok()); |
363 | 43.7k | } |
364 | | // else we're the last parallel worker |
365 | | |
366 | 12.1k | if (w == leader || (early_exit_allowed && pg->status.ok())) { |
367 | | // this thread should perform exit duties |
368 | 12.2k | w->status = pg->status; |
369 | 12.2k | return true; |
370 | 18.4E | } else { |
371 | | // We're the last parallel follower but early commit is not |
372 | | // applicable. Wake up the leader and then wait for it to exit. |
373 | 18.4E | assert(w->state == STATE_PARALLEL_FOLLOWER); |
374 | 0 | SetState(leader, STATE_COMPLETED); |
375 | 18.4E | AwaitState(w, STATE_COMPLETED, &ctx); |
376 | 18.4E | return false; |
377 | 18.4E | } |
378 | 12.1k | } |
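
CompleteParallelWorker reduces to a small decision about who performs the group's exit duties: the last worker to finish does them itself if it is the leader or if early exit is allowed and the whole group succeeded; otherwise it wakes the leader, and a worker that had to wait only owes exit duties if it is the leader and early exit did not already cover them. The two pure helpers below mirror those branches for illustration (hypothetical names, not RocksDB API):

    #include <cstdio>

    // Does the *last* parallel worker to finish perform the group's exit
    // duties itself?
    bool LastFinisherExits(bool is_leader, bool early_exit_allowed,
                           bool group_status_ok) {
      return is_leader || (early_exit_allowed && group_status_ok);
    }

    // Does a worker that waited for STATE_COMPLETED still owe exit duties
    // afterwards? Only the leader, and only when early exit did not already
    // let the last finisher clean up.
    bool WaiterExits(bool is_leader, bool early_exit_allowed, bool status_ok) {
      return is_leader && !(early_exit_allowed && status_ok);
    }

    int main() {
      std::printf("%d\n", LastFinisherExits(false, true, true));   // 1: early exit path
      std::printf("%d\n", LastFinisherExits(false, false, true));  // 0: must wake the leader
      std::printf("%d\n", WaiterExits(true, true, true));          // 0: a follower already exited
      std::printf("%d\n", WaiterExits(true, false, true));         // 1: leader exits itself
      return 0;
    }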
379 | | |
380 | 11.9k | void WriteThread::EarlyExitParallelGroup(Writer* w) { |
381 | 11.9k | auto* pg = w->parallel_group; |
382 | | |
383 | 11.9k | assert(w->state == STATE_PARALLEL_FOLLOWER); |
384 | 0 | assert(pg->status.ok()); |
385 | 0 | ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status); |
386 | 11.9k | assert(w->status.ok()); |
387 | 0 | assert(w->state == STATE_COMPLETED); |
388 | 0 | SetState(pg->leader, STATE_COMPLETED); |
389 | 11.9k | } |
390 | | |
391 | | void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, |
392 | 29.6M | Status status) { |
393 | 29.6M | assert(leader->link_older == nullptr); |
394 | | |
395 | 0 | Writer* head = newest_writer_.load(std::memory_order_acquire); |
396 | 29.6M | if (head != last_writer || |
397 | 29.6M | !newest_writer_.compare_exchange_strong(head, nullptr)) { |
398 | | // Either w wasn't the head during the load(), or it was the head |
399 | | // during the load() but somebody else pushed onto the list before |
400 | | // we did the compare_exchange_strong (causing it to fail). In the |
401 | | // latter case compare_exchange_strong has the effect of re-reading |
402 | | // its first param (head). No need to retry a failing CAS, because |
403 | | // only a departing leader (which we are at the moment) can remove |
404 | | // nodes from the list. |
405 | 365k | assert(head != last_writer); |
406 | | |
407 | | // After walking link_older starting from head (if not already done) |
408 | | // we will be able to traverse w->link_newer below. This function |
409 | | // can only be called from an active leader, only a leader can |
410 | | // clear newest_writer_, we didn't, and only a clear newest_writer_ |
411 | | // could cause the next leader to start their work without a call |
412 | | // to MarkJoined, so we can definitely conclude that no other leader |
413 | | // work is going on here (with or without db mutex). |
414 | 0 | CreateMissingNewerLinks(head); |
415 | 365k | assert(last_writer->link_newer->link_older == last_writer); |
416 | 0 | last_writer->link_newer->link_older = nullptr; |
417 | | |
418 | | // Next leader didn't self-identify, because newest_writer_ wasn't |
419 | | // nullptr when they enqueued (we were definitely enqueued before them |
420 | | // and are still in the list). That means leader handoff occurs when |
421 | | // we call MarkJoined |
422 | 365k | SetState(last_writer->link_newer, STATE_GROUP_LEADER); |
423 | 365k | } |
424 | | // else nobody else was waiting, although there might already be a new |
425 | | // leader now |
426 | | |
427 | 30.9M | while (last_writer != leader) { |
428 | 1.25M | last_writer->status = status; |
429 | | // we need to read link_older before calling SetState, because as soon |
430 | | // as it is marked committed the other thread's Await may return and |
431 | | // deallocate the Writer. |
432 | 1.25M | auto next = last_writer->link_older; |
433 | 1.25M | SetState(last_writer, STATE_COMPLETED); |
434 | | |
435 | 1.25M | last_writer = next; |
436 | 1.25M | } |
437 | 29.6M | } |
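
When the departing leader walks the group at the end of ExitAsBatchGroupLeader it must read link_older before calling SetState: the moment a follower observes STATE_COMPLETED it can return from its write, and its (typically stack-allocated) Writer may be destroyed immediately. A minimal sketch of that save-then-signal traversal, with a hypothetical DemoWriter type and the signalling reduced to a plain atomic store:

    #include <atomic>
    #include <cstdint>

    constexpr uint8_t STATE_COMPLETED = 4;

    struct DemoWriter {
      DemoWriter* link_older = nullptr;
      std::atomic<uint8_t> state{0};
    };

    // Mark everyone from last_writer back to (but excluding) the leader as
    // completed. The next pointer is captured BEFORE the state store, because
    // after the store the pointed-to writer may no longer exist.
    void MarkGroupCompleted(DemoWriter* leader, DemoWriter* last_writer) {
      while (last_writer != leader) {
        DemoWriter* next = last_writer->link_older;           // save first...
        last_writer->state.store(STATE_COMPLETED,             // ...then signal
                                 std::memory_order_release);
        last_writer = next;
      }
    }

    int main() {
      DemoWriter leader, w1, w2;   // enqueued oldest -> newest
      w1.link_older = &leader;
      w2.link_older = &w1;
      MarkGroupCompleted(&leader, &w2);
      return (w1.state.load() == STATE_COMPLETED &&
              w2.state.load() == STATE_COMPLETED &&
              leader.state.load() == 0) ? 0 : 1;
    }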
438 | | |
439 | 869k | void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { |
440 | 869k | static AdaptationContext ctx("EnterUnbatched"); |
441 | | |
442 | 869k | assert(w->batch == nullptr); |
443 | 0 | bool linked_as_leader; |
444 | 869k | LinkOne(w, &linked_as_leader); |
445 | 869k | if (!linked_as_leader) { |
446 | 1.72k | mu->Unlock(); |
447 | 1.72k | TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); |
448 | 1.72k | AwaitState(w, STATE_GROUP_LEADER, &ctx); |
449 | 1.72k | mu->Lock(); |
450 | 1.72k | } |
451 | 869k | } |
452 | | |
453 | 869k | void WriteThread::ExitUnbatched(Writer* w) { |
454 | 869k | Status dummy_status; |
455 | 869k | ExitAsBatchGroupLeader(w, w, dummy_status); |
456 | 869k | } |
457 | | |
458 | | } // namespace rocksdb |