YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_thread.cc
Line | Count | Source
  1 |       | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
  2 |       | //  This source code is licensed under the BSD-style license found in the
  3 |       | //  LICENSE file in the root directory of this source tree. An additional grant
  4 |       | //  of patent rights can be found in the PATENTS file in the same directory.
  5 |       | //
  6 |       | // The following only applies to changes made to this file as part of YugaByte development.
  7 |       | //
  8 |       | // Portions Copyright (c) YugaByte, Inc.
  9 |       | //
 10 |       | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 11 |       | // in compliance with the License.  You may obtain a copy of the License at
 12 |       | //
 13 |       | // http://www.apache.org/licenses/LICENSE-2.0
 14 |       | //
 15 |       | // Unless required by applicable law or agreed to in writing, software distributed under the License
 16 |       | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 17 |       | // or implied.  See the License for the specific language governing permissions and limitations
 18 |       | // under the License.
 19 |       | //
 20 |       |
 21 |       | #include "yb/rocksdb/db/write_thread.h"
 22 |       | #include <thread>
 23 |       |
 24 |       | #include <chrono>
 25 |       | #include <limits>
 26 |       |
 27 |       | #include "yb/rocksdb/db/write_batch_internal.h"
 28 |       | #include "yb/rocksdb/port/port.h"
 29 |       | #include "yb/rocksdb/util/random.h"
 30 |       | #include "yb/rocksdb/util/sync_point.h"
 31 |       |
 32 |       | namespace rocksdb {
 33 |       |
 34 |       | WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec)
 35 |       |     : max_yield_usec_(max_yield_usec),
 36 |       |       slow_yield_usec_(slow_yield_usec),
 37 |  341k |       newest_writer_(nullptr) {}
 38 |       |
 39 | 1.52M | uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
 40 |       |   // We're going to block.  Lazily create the mutex.  We guarantee
 41 |       |   // propagation of this construction to the waker via the
 42 |       |   // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
 43 |       |   // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
 44 |       |   // we install below.
 45 | 1.52M |   w->CreateMutex();
 46 |       |
 47 | 1.52M |   auto state = w->state.load(std::memory_order_acquire);
 48 | 1.52M |   assert(state != STATE_LOCKED_WAITING);
 49 | 1.52M |   if ((state & goal_mask) == 0 &&
 50 | 1.54M |       w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
 51 |       |     // we have permission (and an obligation) to use StateMutex
 52 | 1.54M |     std::unique_lock<std::mutex> guard(w->StateMutex());
 53 | 3.04M |     w->StateCV().wait(guard, [w] {
 54 | 3.04M |       return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
 55 | 3.04M |     });
 56 | 1.54M |     state = w->state.load(std::memory_order_relaxed);
 57 | 1.54M |   }
 58 |       |   // else tricky.  Goal is met or CAS failed.  In the latter case the waker
 59 |       |   // must have changed the state, and compare_exchange_strong has updated
 60 |       |   // our local variable with the new one.  At the moment WriteThread never
 61 |       |   // waits for a transition across intermediate states, so we know that
 62 |       |   // since a state change has occurred the goal must have been met.
 63 | 1.52M |   assert((state & goal_mask) != 0);
 64 | 1.52M |   return state;
 65 | 1.52M | }
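
The waiter in BlockingAwaitState and the waker in SetState (further below) cooperate through the STATE_LOCKED_WAITING marker: the waiter only blocks on the condvar after winning a CAS that installs the marker, and the waker only touches the mutex and condvar when it sees that marker. The following is a minimal standalone sketch of the same handshake, not code from the YugabyteDB tree; the lazy mutex creation is omitted and the names (Waiter, kLockedWaiting, BlockingAwait, Wake) are invented for illustration.

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

namespace {

constexpr uint8_t kInit = 1;
constexpr uint8_t kLockedWaiting = 2;
constexpr uint8_t kCompleted = 4;

struct Waiter {
  std::atomic<uint8_t> state{kInit};
  std::mutex mu;                // always present here; lazily created in the real code
  std::condition_variable cv;
};

// Waiter side, analogous to BlockingAwaitState(w, goal_mask).
uint8_t BlockingAwait(Waiter* w, uint8_t goal_mask) {
  uint8_t state = w->state.load(std::memory_order_acquire);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, kLockedWaiting)) {
    // We won the CAS, so the waker is obliged to use the mutex/condvar.
    std::unique_lock<std::mutex> guard(w->mu);
    w->cv.wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != kLockedWaiting;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // Either the goal was already met or the waker changed the state.
  assert((state & goal_mask) != 0);
  return state;
}

// Waker side, analogous to WriteThread::SetState(w, new_state).
void Wake(Waiter* w, uint8_t new_state) {
  uint8_t state = w->state.load(std::memory_order_acquire);
  if (state == kLockedWaiting ||
      !w->state.compare_exchange_strong(state, new_state)) {
    // The waiter installed kLockedWaiting, so publish under the mutex and signal.
    std::lock_guard<std::mutex> guard(w->mu);
    w->state.store(new_state, std::memory_order_relaxed);
    w->cv.notify_one();
  }
}

}  // namespace

int main() {
  Waiter w;
  std::thread waker([&] { Wake(&w, kCompleted); });
  uint8_t final_state = BlockingAwait(&w, kCompleted);
  waker.join();
  std::cout << "final state: " << static_cast<int>(final_state) << "\n";
  return 0;
}
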
 66 |       |
 67 |       | uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
 68 | 1.69M |                                 AdaptationContext* ctx) {
 69 | 1.69M |   uint8_t state;
 70 |       |
 71 |       |   // On a modern Xeon each loop takes about 7 nanoseconds (most of which
 72 |       |   // is the effect of the pause instruction), so 200 iterations is a bit
 73 |       |   // more than a microsecond.  This is long enough that waits longer than
 74 |       |   // this can amortize the cost of accessing the clock and yielding.
 75 |  272M |   for (uint32_t tries = 0; tries < 200; ++tries) {
 76 |  270M |     state = w->state.load(std::memory_order_acquire);
 77 |  270M |     if ((state & goal_mask) != 0) {
 78 |  111k |       return state;
 79 |  111k |     }
 80 |  270M |     port::AsmVolatilePause();
 81 |  270M |   }
 82 |       |
 83 |       |   // If we're only going to end up waiting a short period of time,
 84 |       |   // it can be a lot more efficient to call std::this_thread::yield()
 85 |       |   // in a loop than to block in StateMutex().  For reference, on my 4.0
 86 |       |   // SELinux test server with support for syscall auditing enabled, the
 87 |       |   // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
 88 |       |   // 2.7 usec, and the average is more like 10 usec.  That can be a big
 89 |       |   // drag on RocksDB's single-writer design.  Of course, spinning is a
 90 |       |   // bad idea if other threads are waiting to run or if we're going to
 91 |       |   // wait for a long time.  How do we decide?
 92 |       |   //
 93 |       |   // We break waiting into 3 categories: short-uncontended,
 94 |       |   // short-contended, and long.  If we had an oracle, then we would always
 95 |       |   // spin for short-uncontended, always block for long, and our choice for
 96 |       |   // short-contended might depend on whether we were trying to optimize
 97 |       |   // RocksDB throughput or avoid being greedy with system resources.
 98 |       |   //
 99 |       |   // Bucketing into short or long is easy by measuring elapsed time.
100 |       |   // Differentiating short-uncontended from short-contended is a bit
101 |       |   // trickier, but not too bad.  We could look for involuntary context
102 |       |   // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
103 |       |   // (portability code and CPU) to just look for yield calls that take
104 |       |   // longer than we expect.  sched_yield() doesn't actually result in any
105 |       |   // context switch overhead if there are no other runnable processes
106 |       |   // on the current core, in which case it usually takes less than
107 |       |   // a microsecond.
108 |       |   //
109 |       |   // There are two primary tunables here: the threshold between "short"
110 |       |   // and "long" waits, and the threshold at which we suspect that a yield
111 |       |   // is slow enough to indicate we should probably block.  If these
112 |       |   // thresholds are chosen well then CPU-bound workloads that don't
113 |       |   // have more threads than cores will experience few context switches
114 |       |   // (voluntary or involuntary), and the total number of context switches
115 |       |   // (voluntary and involuntary) will not be dramatically larger (maybe
116 |       |   // 2x) than the number of voluntary context switches that occur when
117 |       |   // --max_yield_wait_micros=0.
118 |       |   //
119 |       |   // There's another constant, which is the number of slow yields we will
120 |       |   // tolerate before reversing our previous decision.  Solitary slow
121 |       |   // yields are pretty common (low-priority small jobs ready to run),
122 |       |   // so this should be at least 2.  We set this conservatively to 3 so
123 |       |   // that we can also immediately schedule a ctx adaptation, rather than
124 |       |   // waiting for the next update_ctx.
125 |       |
126 | 1.57M |   const size_t kMaxSlowYieldsWhileSpinning = 3;
127 |       |
128 | 1.57M |   bool update_ctx = false;
129 | 1.57M |   bool would_spin_again = false;
130 |       |
131 | 1.57M |   if (max_yield_usec_ > 0) {
132 | 56.0k |     update_ctx = Random::GetTLSInstance()->OneIn(256);
133 |       |
134 | 56.0k |     if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) {
135 |       |       // we're updating the adaptation statistics, or spinning has >
136 |       |       // 50% chance of being shorter than max_yield_usec_ and causing no
137 |       |       // involuntary context switches
138 | 1.09k |       auto spin_begin = std::chrono::steady_clock::now();
139 |       |
140 |       |       // this variable doesn't include the final yield (if any) that
141 |       |       // causes the goal to be met
142 | 1.09k |       size_t slow_yield_count = 0;
143 |       |
144 | 1.09k |       auto iter_begin = spin_begin;
145 |  104k |       while ((iter_begin - spin_begin) <=
146 |  103k |              std::chrono::microseconds(max_yield_usec_)) {
147 |  103k |         std::this_thread::yield();
148 |       |
149 |  103k |         state = w->state.load(std::memory_order_acquire);
150 |  103k |         if ((state & goal_mask) != 0) {
151 |       |           // success
152 |   819 |           would_spin_again = true;
153 |   819 |           break;
154 |   819 |         }
155 |       |
156 |  103k |         auto now = std::chrono::steady_clock::now();
157 |  103k |         if (now == iter_begin ||
158 |  103k |             now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
159 |       |           // conservatively count it as a slow yield if our clock isn't
160 |       |           // accurate enough to measure the yield duration
161 |   411 |           ++slow_yield_count;
162 |   411 |           if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
163 |       |             // Not just one ivcsw, but several.  Immediately update ctx
164 |       |             // and fall back to blocking
165 |    35 |             update_ctx = true;
166 |    35 |             break;
167 |    35 |           }
168 |  102k |         }
169 |  102k |         iter_begin = now;
170 |  102k |       }
171 | 1.09k |     }
172 | 56.0k |   }
173 |       |
174 | 1.57M |   if ((state & goal_mask) == 0) {
175 | 1.53M |     state = BlockingAwaitState(w, goal_mask);
176 | 1.53M |   }
177 |       |
178 | 1.57M |   if (update_ctx) {
179 |   235 |     auto v = ctx->value.load(std::memory_order_relaxed);
180 |       |     // fixed point exponential decay with decay constant 1/1024, with +1
181 |       |     // and -1 scaled to avoid overflow for int32_t
182 |   155 |     v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
183 |   235 |     ctx->value.store(v, std::memory_order_relaxed);
184 |   235 |   }
185 |       |
186 | 1.57M |   assert((state & goal_mask) != 0);
187 | 1.57M |   return state;
188 | 1.69M | }
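
AwaitState keeps one shared statistic per call site (AdaptationContext::value): yield phases that reach the goal push it up, wasted ones push it down, and the yield loop is only entered while the value is non-negative (or when the 1-in-256 sampling fires). The following is a rough, illustrative simulation of that update rule, reusing the expression from line 182; it is not part of write_thread.cc and the printed sequence is made up.

#include <cstdint>
#include <iostream>

int32_t Update(int32_t v, bool would_spin_again) {
  // Same expression as write_thread.cc line 182.
  return v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
}

int main() {
  int32_t v = 0;  // AdaptationContext::value starts at zero, so yielding is allowed.
  bool outcomes[] = {true, true, false, false, false, true, false, false};
  for (bool ok : outcomes) {
    v = Update(v, ok);
    std::cout << (ok ? "yield paid off" : "yield wasted  ")
              << " -> value = " << v
              << (v >= 0 ? "  (keep yielding)" : "  (go straight to blocking)")
              << "\n";
  }
  return 0;
}
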
189 |       |
190 | 1.70M | void WriteThread::SetState(Writer* w, uint8_t new_state) {
191 | 1.70M |   auto state = w->state.load(std::memory_order_acquire);
192 | 1.70M |   if (state == STATE_LOCKED_WAITING ||
193 | 1.54M |       !w->state.compare_exchange_strong(state, new_state)) {
194 | 1.54M |     assert(state == STATE_LOCKED_WAITING);
195 |       |
196 | 1.54M |     std::lock_guard<std::mutex> guard(w->StateMutex());
197 | 1.54M |     assert(w->state.load(std::memory_order_relaxed) != new_state);
198 | 1.54M |     w->state.store(new_state, std::memory_order_relaxed);
199 | 1.54M |     w->StateCV().notify_one();
200 | 1.54M |   }
201 | 1.70M | }
202 |       |
203 | 24.7M | void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
204 | 24.7M |   assert(w->state == STATE_INIT);
205 |       |
206 | 24.7M |   Writer* writers = newest_writer_.load(std::memory_order_relaxed);
207 | 24.7M |   while (true) {
208 | 24.7M |     w->link_older = writers;
209 | 24.7M |     if (newest_writer_.compare_exchange_strong(writers, w)) {
210 | 24.7M |       if (writers == nullptr) {
211 |       |         // this isn't part of the WriteThread machinery, but helps with
212 |       |         // debugging and is checked by an assert in WriteImpl
213 | 23.0M |         w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
214 | 23.0M |       }
215 | 24.7M |       *linked_as_leader = (writers == nullptr);
216 | 24.7M |       return;
217 | 24.7M |     }
218 | 24.7M |   }
219 | 24.7M | }
220 |       |
221 | 23.1M | void WriteThread::CreateMissingNewerLinks(Writer* head) {
222 | 24.7M |   while (true) {
223 | 24.7M |     Writer* next = head->link_older;
224 | 24.7M |     if (next == nullptr || next->link_newer != nullptr) {
225 | 23.1M |       assert(next == nullptr || next->link_newer == head);
226 | 23.1M |       break;
227 | 23.1M |     }
228 | 1.64M |     next->link_newer = head;
229 | 1.64M |     head = next;
230 | 1.64M |   }
231 | 23.1M | }
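
LinkOne and CreateMissingNewerLinks together maintain the pending-writer list: a lock-free push onto a newest-at-head stack, followed by a single-threaded pass that back-fills the newer-pointing links so a group can be walked oldest to newest. The following self-contained sketch shows the same idea; it is illustrative only, and names such as Node and the global `newest` are invented.

#include <atomic>
#include <iostream>
#include <string>

struct Node {
  std::string name;
  Node* link_older = nullptr;  // toward older entries (set by the pusher)
  Node* link_newer = nullptr;  // toward newer entries (back-filled later)
  explicit Node(std::string n) : name(std::move(n)) {}
};

std::atomic<Node*> newest{nullptr};

// Analogous to WriteThread::LinkOne: returns true if the list was empty
// when we linked ourselves in, i.e. we became the leader.
bool LinkOne(Node* n) {
  Node* head = newest.load(std::memory_order_relaxed);
  while (true) {
    n->link_older = head;
    if (newest.compare_exchange_strong(head, n)) {
      return head == nullptr;
    }
    // A failed CAS reloads `head`; retry against the new head.
  }
}

// Analogous to CreateMissingNewerLinks: walk link_older from the newest
// node and fill in link_newer until an already-linked node is reached.
void CreateMissingNewerLinks(Node* head) {
  while (true) {
    Node* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) break;
    next->link_newer = head;
    head = next;
  }
}

int main() {
  Node a("leader"), b("follower-1"), c("follower-2");
  bool is_leader = LinkOne(&a);  // empty list: `a` becomes leader
  LinkOne(&b);                   // joins behind the leader
  LinkOne(&c);
  std::cout << "a linked as leader: " << std::boolalpha << is_leader << "\n";
  CreateMissingNewerLinks(newest.load());
  // Walk oldest -> newest, the order EnterAsBatchGroupLeader uses.
  for (Node* n = &a; n != nullptr; n = n->link_newer) {
    std::cout << n->name << "\n";
  }
  return 0;
}
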
232 |       |
233 | 24.0M | void WriteThread::JoinBatchGroup(Writer* w) {
234 | 24.0M |   static AdaptationContext ctx("JoinBatchGroup");
235 |       |
236 | 24.0M |   assert(w->batch != nullptr);
237 | 24.0M |   bool linked_as_leader;
238 | 24.0M |   LinkOne(w, &linked_as_leader);
239 |       |
240 | 24.0M |   TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
241 |       |
242 | 24.0M |   if (!linked_as_leader) {
243 | 1.64M |     AwaitState(w,
244 | 1.64M |                STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
245 | 1.64M |                &ctx);
246 | 1.64M |     TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
247 | 1.64M |   }
248 | 24.0M | }
249 |       |
250 |       | size_t WriteThread::EnterAsBatchGroupLeader(
251 |       |     Writer* leader, WriteThread::Writer** last_writer,
252 | 22.7M |     autovector<WriteThread::Writer*>* write_batch_group) {
253 | 22.7M |   assert(leader->link_older == nullptr);
254 | 22.7M |   assert(leader->batch != nullptr);
255 |       |
256 | 22.7M |   size_t size = WriteBatchInternal::ByteSize(leader->batch);
257 | 22.7M |   write_batch_group->push_back(leader);
258 |       |
259 |       |   // Allow the group to grow up to a maximum size, but if the
260 |       |   // original write is small, limit the growth so we do not slow
261 |       |   // down the small write too much.
262 | 22.7M |   size_t max_size = 1 << 20;
263 | 22.7M |   if (size <= (128 << 10)) {
264 | 22.7M |     max_size = size + (128 << 10);
265 | 22.7M |   }
266 |       |
267 | 22.7M |   *last_writer = leader;
268 |       |
269 | 22.7M |   Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
270 |       |
271 |       |   // This is safe regardless of any db mutex status of the caller. Previous
272 |       |   // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
273 |       |   // (they emptied the list and then we added ourself as leader) or had to
274 |       |   // explicitly wake us up (the list was non-empty when we added ourself,
275 |       |   // so we have already received our MarkJoined).
276 | 22.7M |   CreateMissingNewerLinks(newest_writer);
277 |       |
278 |       |   // Tricky. Iteration start (leader) is exclusive and finish
279 |       |   // (newest_writer) is inclusive. Iteration goes from old to new.
280 | 22.7M |   Writer* w = leader;
281 | 24.0M |   while (w != newest_writer) {
282 | 1.28M |     w = w->link_newer;
283 |       |
284 | 1.28M |     if (w->sync && !leader->sync) {
285 |       |       // Do not include a sync write into a batch handled by a non-sync write.
286 |     0 |       break;
287 |     0 |     }
288 |       |
289 | 1.28M |     if (!w->disableWAL && leader->disableWAL) {
290 |       |       // Do not include a write that needs WAL into a batch that has
291 |       |       // WAL disabled.
292 |     0 |       break;
293 |     0 |     }
294 |       |
295 | 1.28M |     if (w->batch == nullptr) {
296 |       |       // Do not include those writes with nullptr batch. Those are not writes,
297 |       |       // those are something else. They want to be alone
298 |   191 |       break;
299 |   191 |     }
300 |       |
301 | 1.28M |     if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
302 |       |       // don't batch writes that don't want to be batched
303 |    96 |       break;
304 |    96 |     }
305 |       |
306 | 1.28M |     auto batch_size = WriteBatchInternal::ByteSize(w->batch);
307 | 1.28M |     if (size + batch_size > max_size) {
308 |       |       // Do not make batch too big
309 |     0 |       break;
310 |     0 |     }
311 |       |
312 | 1.28M |     size += batch_size;
313 | 1.28M |     write_batch_group->push_back(w);
314 | 1.28M |     w->in_batch_group = true;
315 | 1.28M |     *last_writer = w;
316 | 1.28M |   }
317 | 22.7M |   return size;
318 | 22.7M | }
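
The max_size computation above caps a batch group at 1 MB, but when the leader's own batch is 128 KB or smaller the cap becomes the leader's size plus 128 KB, so a small write is not delayed behind a huge group. A small illustrative helper (MaxBatchGroupSize is an invented name, not part of the API) shows the resulting limits:

#include <cstddef>
#include <iostream>

std::size_t MaxBatchGroupSize(std::size_t leader_batch_size) {
  std::size_t max_size = 1 << 20;               // 1 MB absolute cap
  if (leader_batch_size <= (128 << 10)) {       // leader batch <= 128 KB
    max_size = leader_batch_size + (128 << 10); // small write: leader + 128 KB
  }
  return max_size;
}

int main() {
  std::cout << MaxBatchGroupSize(1 << 10) << "\n";    // 1 KB leader   -> 132096 (~129 KB)
  std::cout << MaxBatchGroupSize(128 << 10) << "\n";  // 128 KB leader -> 262144 (256 KB)
  std::cout << MaxBatchGroupSize(512 << 10) << "\n";  // 512 KB leader -> 1048576 (1 MB)
  return 0;
}
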
319 |       |
320 |       | void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
321 | 12.4k |                                           SequenceNumber sequence) {
322 |       |   // EnterAsBatchGroupLeader already created the links from leader to
323 |       |   // newer writers in the group
324 |       |
325 | 12.4k |   pg->leader->parallel_group = pg;
326 |       |
327 | 12.4k |   Writer* w = pg->leader;
328 | 12.4k |   w->sequence = sequence;
329 |       |
330 | 55.5k |   while (w != pg->last_writer) {
331 |       |     // Writers that won't write don't get sequence allotment
332 | 43.0k |     if (!w->CallbackFailed()) {
333 | 43.0k |       sequence += WriteBatchInternal::Count(w->batch);
334 | 43.0k |     }
335 | 43.0k |     w = w->link_newer;
336 |       |
337 | 43.0k |     w->sequence = sequence;
338 | 43.0k |     w->parallel_group = pg;
339 | 43.0k |     SetState(w, STATE_PARALLEL_FOLLOWER);
340 | 43.0k |   }
341 | 12.4k | }
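
Sequence numbers are handed out cumulatively: each follower starts at the previous writer's starting sequence plus that writer's batch entry count, and writers whose callback failed get no allotment. The following standalone sketch reproduces that arithmetic with invented names (FakeWriter and made-up batch counts); it is illustrative only.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct FakeWriter {
  const char* name;
  uint64_t batch_count;    // entries in this writer's batch
  bool callback_failed;
  uint64_t sequence = 0;   // assigned below
};

int main() {
  std::vector<FakeWriter> group = {
      {"leader", 3, false}, {"f1", 2, true}, {"f2", 5, false}, {"f3", 1, false}};
  uint64_t sequence = 100;  // the group's first sequence number
  group[0].sequence = sequence;
  for (std::size_t i = 1; i < group.size(); ++i) {
    if (!group[i - 1].callback_failed) {
      sequence += group[i - 1].batch_count;  // failed writers get no allotment
    }
    group[i].sequence = sequence;
  }
  for (const auto& w : group) {
    std::cout << w.name << " starts at sequence " << w.sequence << "\n";
  }
  return 0;
}
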
342 |       |
343 | 55.4k | bool WriteThread::CompleteParallelWorker(Writer* w) {
344 | 55.4k |   static AdaptationContext ctx("CompleteParallelWorker");
345 |       |
346 | 55.4k |   auto* pg = w->parallel_group;
347 | 55.4k |   if (!w->status.ok()) {
348 |     0 |     std::lock_guard<std::mutex> guard(w->StateMutex());
349 |     0 |     pg->status = w->status;
350 |     0 |   }
351 |       |
352 | 55.4k |   auto leader = pg->leader;
353 | 55.4k |   auto early_exit_allowed = pg->early_exit_allowed;
354 |       |
355 | 55.4k |   if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
356 |       |     // we're not the last one
357 | 43.0k |     AwaitState(w, STATE_COMPLETED, &ctx);
358 |       |
359 |       |     // Caller only needs to perform exit duties if early exit doesn't
360 |       |     // apply and this is the leader.  Can't touch pg here.  Whoever set
361 |       |     // our state to STATE_COMPLETED copied pg->status to w.status for us.
362 | 43.0k |     return w == leader && !(early_exit_allowed && w->status.ok());
363 | 43.0k |   }
364 |       |   // else we're the last parallel worker
365 |       |
366 | 12.4k |   if (w == leader || (early_exit_allowed && pg->status.ok())) {
367 |       |     // this thread should perform exit duties
368 | 12.4k |     w->status = pg->status;
369 | 12.4k |     return true;
370 | 18.4E |   } else {
371 |       |     // We're the last parallel follower but early commit is not
372 |       |     // applicable.  Wake up the leader and then wait for it to exit.
373 | 18.4E |     assert(w->state == STATE_PARALLEL_FOLLOWER);
374 | 18.4E |     SetState(leader, STATE_COMPLETED);
375 | 18.4E |     AwaitState(w, STATE_COMPLETED, &ctx);
376 | 18.4E |     return false;
377 | 18.4E |   }
378 | 12.3k | }
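
CompleteParallelWorker uses the shared running counter so that exactly one thread, the last one to finish, takes on the group's exit duties. The following simplified standalone sketch shows the "last worker out" pattern using fetch_sub rather than the exact expression above; Group and CompleteWorker are invented names for illustration.

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

struct Group {
  std::atomic<int> running;
  explicit Group(int n) : running(n) {}
};

void CompleteWorker(Group* g, int id) {
  // fetch_sub returns the value before the decrement, so the thread that
  // observes 1 is the last worker still running.
  if (g->running.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    std::printf("worker %d is last: performing exit duties\n", id);
  } else {
    std::printf("worker %d finished early: someone else will clean up\n", id);
  }
}

int main() {
  constexpr int kWorkers = 4;
  Group g(kWorkers);
  std::vector<std::thread> threads;
  for (int i = 0; i < kWorkers; ++i) {
    threads.emplace_back(CompleteWorker, &g, i);
  }
  for (auto& t : threads) t.join();
  return 0;
}
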
379 |       |
380 | 11.8k | void WriteThread::EarlyExitParallelGroup(Writer* w) {
381 | 11.8k |   auto* pg = w->parallel_group;
382 |       |
383 | 11.8k |   assert(w->state == STATE_PARALLEL_FOLLOWER);
384 | 11.8k |   assert(pg->status.ok());
385 | 11.8k |   ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
386 | 11.8k |   assert(w->status.ok());
387 | 11.8k |   assert(w->state == STATE_COMPLETED);
388 | 11.8k |   SetState(pg->leader, STATE_COMPLETED);
389 | 11.8k | }
390 |       |
391 |       | void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
392 | 23.4M |                                          Status status) {
393 | 23.4M |   assert(leader->link_older == nullptr);
394 |       |
395 | 23.4M |   Writer* head = newest_writer_.load(std::memory_order_acquire);
396 | 23.4M |   if (head != last_writer ||
397 | 23.0M |       !newest_writer_.compare_exchange_strong(head, nullptr)) {
398 |       |     // Either w wasn't the head during the load(), or it was the head
399 |       |     // during the load() but somebody else pushed onto the list before
400 |       |     // we did the compare_exchange_strong (causing it to fail).  In the
401 |       |     // latter case compare_exchange_strong has the effect of re-reading
402 |       |     // its first param (head).  No need to retry a failing CAS, because
403 |       |     // only a departing leader (which we are at the moment) can remove
404 |       |     // nodes from the list.
405 |  364k |     assert(head != last_writer);
406 |       |
407 |       |     // After walking link_older starting from head (if not already done)
408 |       |     // we will be able to traverse w->link_newer below. This function
409 |       |     // can only be called from an active leader, only a leader can
410 |       |     // clear newest_writer_, we didn't, and only a clear newest_writer_
411 |       |     // could cause the next leader to start their work without a call
412 |       |     // to MarkJoined, so we can definitely conclude that no other leader
413 |       |     // work is going on here (with or without db mutex).
414 |  364k |     CreateMissingNewerLinks(head);
415 |  364k |     assert(last_writer->link_newer->link_older == last_writer);
416 |  364k |     last_writer->link_newer->link_older = nullptr;
417 |       |
418 |       |     // Next leader didn't self-identify, because newest_writer_ wasn't
419 |       |     // nullptr when they enqueued (we were definitely enqueued before them
420 |       |     // and are still in the list).  That means leader handoff occurs when
421 |       |     // we call MarkJoined
422 |  364k |     SetState(last_writer->link_newer, STATE_GROUP_LEADER);
423 |  364k |   }
424 |       |   // else nobody else was waiting, although there might already be a new
425 |       |   // leader now
426 |       |
427 | 24.7M |   while (last_writer != leader) {
428 | 1.28M |     last_writer->status = status;
429 |       |     // we need to read link_older before calling SetState, because as soon
430 |       |     // as it is marked committed the other thread's Await may return and
431 |       |     // deallocate the Writer.
432 | 1.28M |     auto next = last_writer->link_older;
433 | 1.28M |     SetState(last_writer, STATE_COMPLETED);
434 |       |
435 | 1.28M |     last_writer = next;
436 | 1.28M |   }
437 | 23.4M | }
438 |       |
439 |  680k | void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
440 |  680k |   static AdaptationContext ctx("EnterUnbatched");
441 |       |
442 |  680k |   assert(w->batch == nullptr);
443 |  680k |   bool linked_as_leader;
444 |  680k |   LinkOne(w, &linked_as_leader);
445 |  680k |   if (!linked_as_leader) {
446 | 1.62k |     mu->Unlock();
447 | 1.62k |     TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
448 | 1.62k |     AwaitState(w, STATE_GROUP_LEADER, &ctx);
449 | 1.62k |     mu->Lock();
450 | 1.62k |   }
451 |  680k | }
452 |       |
453 |  680k | void WriteThread::ExitUnbatched(Writer* w) {
454 |  680k |   Status dummy_status;
455 |  680k |   ExitAsBatchGroupLeader(w, w, dummy_status);
456 |  680k | }
457 |       |
458 |       | }  // namespace rocksdb