YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_thread.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rocksdb/db/write_thread.h"

#include <thread>

#include <chrono>
#include <limits>

#include "yb/rocksdb/db/write_batch_internal.h"
#include "yb/rocksdb/port/port.h"
#include "yb/rocksdb/util/random.h"
#include "yb/rocksdb/util/sync_point.h"

namespace rocksdb {

WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec)
    : max_yield_usec_(max_yield_usec),
      slow_yield_usec_(slow_yield_usec),
      newest_writer_(nullptr) {}

uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
  // We're going to block.  Lazily create the mutex.  We guarantee
  // propagation of this construction to the waker via the
  // STATE_LOCKED_WAITING state.  The waker won't try to touch the mutex
  // or the condvar unless they CAS away the STATE_LOCKED_WAITING that
  // we install below.
  w->CreateMutex();

  auto state = w->state.load(std::memory_order_acquire);
  assert(state != STATE_LOCKED_WAITING);
  if ((state & goal_mask) == 0 &&
      w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
    // we have permission (and an obligation) to use StateMutex
    std::unique_lock<std::mutex> guard(w->StateMutex());
    w->StateCV().wait(guard, [w] {
      return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
    });
    state = w->state.load(std::memory_order_relaxed);
  }
  // else tricky.  Goal is met or CAS failed.  In the latter case the waker
  // must have changed the state, and compare_exchange_strong has updated
  // our local variable with the new one.  At the moment WriteThread never
  // waits for a transition across intermediate states, so we know that
  // since a state change has occurred the goal must have been met.
  assert((state & goal_mask) != 0);
  return state;
}
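
// Note: BlockingAwaitState is one half of a two-sided handshake; SetState()
// further below is the other half.  The waiter CASes its state to
// STATE_LOCKED_WAITING and sleeps on StateCV().  A waker that observes
// STATE_LOCKED_WAITING (or whose CAS fails because the waiter installed it
// concurrently) takes StateMutex(), stores the new state, and calls
// notify_one().  Because the store happens while holding the same mutex that
// guards the waiter's predicate check, the waiter cannot miss the wakeup.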

uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
                                AdaptationContext* ctx) {
  uint8_t state;

  // On a modern Xeon each loop takes about 7 nanoseconds (most of which
  // is the effect of the pause instruction), so 200 iterations is a bit
  // more than a microsecond.  This is long enough that waits longer than
  // this can amortize the cost of accessing the clock and yielding.
  for (uint32_t tries = 0; tries < 200; ++tries) {
    state = w->state.load(std::memory_order_acquire);
    if ((state & goal_mask) != 0) {
      return state;
    }
    port::AsmVolatilePause();
  }

  // If we're only going to end up waiting a short period of time,
  // it can be a lot more efficient to call std::this_thread::yield()
  // in a loop than to block in StateMutex().  For reference, on my 4.0
  // SELinux test server with support for syscall auditing enabled, the
  // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
  // 2.7 usec, and the average is more like 10 usec.  That can be a big
  // drag on RocksDB's single-writer design.  Of course, spinning is a
  // bad idea if other threads are waiting to run or if we're going to
  // wait for a long time.  How do we decide?
  //
  // We break waiting into 3 categories: short-uncontended,
  // short-contended, and long.  If we had an oracle, then we would always
  // spin for short-uncontended, always block for long, and our choice for
  // short-contended might depend on whether we were trying to optimize
  // RocksDB throughput or avoid being greedy with system resources.
  //
  // Bucketing into short or long is easy by measuring elapsed time.
  // Differentiating short-uncontended from short-contended is a bit
  // trickier, but not too bad.  We could look for involuntary context
  // switches using getrusage(RUSAGE_THREAD, ..), but it's less work
  // (portability code and CPU) to just look for yield calls that take
  // longer than we expect.  sched_yield() doesn't actually result in any
  // context switch overhead if there are no other runnable processes
  // on the current core, in which case it usually takes less than
  // a microsecond.
  //
  // There are two primary tunables here: the threshold between "short"
  // and "long" waits, and the threshold at which we suspect that a yield
  // is slow enough to indicate we should probably block.  If these
  // thresholds are chosen well then CPU-bound workloads that don't
  // have more threads than cores will experience few context switches
  // (voluntary or involuntary), and the total number of context switches
  // (voluntary and involuntary) will not be dramatically larger (maybe
  // 2x) than the number of voluntary context switches that occur when
  // --max_yield_wait_micros=0.
  //
  // There's another constant, which is the number of slow yields we will
  // tolerate before reversing our previous decision.  Solitary slow
  // yields are pretty common (low-priority small jobs ready to run),
  // so this should be at least 2.  We set this conservatively to 3 so
  // that we can also immediately schedule a ctx adaptation, rather than
  // waiting for the next update_ctx.

  const size_t kMaxSlowYieldsWhileSpinning = 3;

  bool update_ctx = false;
  bool would_spin_again = false;

  if (max_yield_usec_ > 0) {
    update_ctx = Random::GetTLSInstance()->OneIn(256);

    if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) {
      // we're updating the adaptation statistics, or spinning has >
      // 50% chance of being shorter than max_yield_usec_ and causing no
      // involuntary context switches
      auto spin_begin = std::chrono::steady_clock::now();

      // this variable doesn't include the final yield (if any) that
      // causes the goal to be met
      size_t slow_yield_count = 0;

      auto iter_begin = spin_begin;
      while ((iter_begin - spin_begin) <=
             std::chrono::microseconds(max_yield_usec_)) {
        std::this_thread::yield();

        state = w->state.load(std::memory_order_acquire);
        if ((state & goal_mask) != 0) {
          // success
          would_spin_again = true;
          break;
        }

        auto now = std::chrono::steady_clock::now();
        if (now == iter_begin ||
            now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
          // conservatively count it as a slow yield if our clock isn't
          // accurate enough to measure the yield duration
          ++slow_yield_count;
          if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
            // Not just one ivcsw, but several.  Immediately update ctx
            // and fall back to blocking
            update_ctx = true;
            break;
          }
        }
        iter_begin = now;
      }
    }
  }

  if ((state & goal_mask) == 0) {
    state = BlockingAwaitState(w, goal_mask);
  }

  if (update_ctx) {
    auto v = ctx->value.load(std::memory_order_relaxed);
    // fixed point exponential decay with decay constant 1/1024, with +1
    // and -1 scaled to avoid overflow for int32_t
    v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
    ctx->value.store(v, std::memory_order_relaxed);
  }

  assert((state & goal_mask) != 0);
  return state;
}
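
// Note: the AdaptationContext value above behaves like a signed spin credit.
// When max_yield_usec_ > 0, the yield loop is attempted only while the stored
// value is >= 0 (or on the roughly 1-in-256 sampled calls), and each sampled
// outcome nudges the value by +/-16384 plus the v/1024 term.  For example
// (illustrative arithmetic, following the update formula as written),
// starting from v = 0, three consecutive sampled waits that end up blocking
// give approximately:
//   0      -> 0      + (0 / 1024)      - 16384 = -16384
//   -16384 -> -16384 + (-16384 / 1024) - 16384 = -32784
//   -32784 -> -32784 + (-32784 / 1024) - 16384 = -49200
// so a context that keeps having to block goes (and stays) negative, and
// non-sampled callers skip straight to BlockingAwaitState until successful
// spins push the value back toward zero.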

void WriteThread::SetState(Writer* w, uint8_t new_state) {
  auto state = w->state.load(std::memory_order_acquire);
  if (state == STATE_LOCKED_WAITING ||
      !w->state.compare_exchange_strong(state, new_state)) {
    assert(state == STATE_LOCKED_WAITING);

    std::lock_guard<std::mutex> guard(w->StateMutex());
    assert(w->state.load(std::memory_order_relaxed) != new_state);
    w->state.store(new_state, std::memory_order_relaxed);
    w->StateCV().notify_one();
  }
}

void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
  assert(w->state == STATE_INIT);

  Writer* writers = newest_writer_.load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    if (newest_writer_.compare_exchange_strong(writers, w)) {
      if (writers == nullptr) {
        // this isn't part of the WriteThread machinery, but helps with
        // debugging and is checked by an assert in WriteImpl
        w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
      }
      *linked_as_leader = (writers == nullptr);
      return;
    }
  }
}

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    next->link_newer = head;
    head = next;
  }
}
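
// Note: the pending-writer list is a lock-free, singly linked stack.  LinkOne
// pushes at newest_writer_ with a CAS, so the link_older pointers always
// exist:
//
//   newest_writer_ -> W4 -> W3 -> W2 -> W1 (oldest, current group leader)
//                     (link_older points toward older writers)
//
// The link_newer pointers are filled in lazily, and only by the single active
// leader, via CreateMissingNewerLinks: it walks link_older from the newest
// writer and stops as soon as it finds a node whose link_newer is already
// set, since everything older than that was linked by an earlier walk.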

void WriteThread::JoinBatchGroup(Writer* w) {
  static AdaptationContext ctx("JoinBatchGroup");

  assert(w->batch != nullptr);
  bool linked_as_leader;
  LinkOne(w, &linked_as_leader);

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    AwaitState(w,
               STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
               &ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}
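
// Note: a writer returning from JoinBatchGroup is in one of three states:
//  * STATE_GROUP_LEADER      - it must form a group (EnterAsBatchGroupLeader),
//                              perform the write, then ExitAsBatchGroupLeader;
//  * STATE_PARALLEL_FOLLOWER - a leader opted for parallel application of the
//                              group, so this writer applies its own batch and
//                              then calls CompleteParallelWorker;
//  * STATE_COMPLETED         - a leader already wrote this batch on its
//                              behalf, so there is nothing left to do.
// The follow-up logic lives in the caller (the WriteImpl write path), not in
// this file.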

size_t WriteThread::EnterAsBatchGroupLeader(
    Writer* leader, WriteThread::Writer** last_writer,
    autovector<WriteThread::Writer*>* write_batch_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);
  write_batch_group->push_back(leader);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = leader;

  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourselves as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourselves,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (!w->disableWAL && leader->disableWAL) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes,
      // those are something else. They want to be alone.
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    size += batch_size;
    write_batch_group->push_back(w);
    w->in_batch_group = true;
    *last_writer = w;
  }
  return size;
}
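
// Note: the group size cap above keeps a small leader write from being
// delayed behind a huge group.  For example (sizes are illustrative):
//  * a 1 KB leader batch   -> followers are added only while the group stays
//                             within 1 KB + 128 KB = 129 KB;
//  * a 512 KB leader batch -> the cap is the flat maximum of 1 MB (1 << 20).
// A follower whose batch would push the running total past the cap is not
// taken into the group; it stays on the queue for a later group.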

void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
                                          SequenceNumber sequence) {
  // EnterAsBatchGroupLeader already created the links from leader to
  // newer writers in the group

  pg->leader->parallel_group = pg;

  Writer* w = pg->leader;
  w->sequence = sequence;

  while (w != pg->last_writer) {
    // Writers that won't write don't get sequence allotment
    if (!w->CallbackFailed()) {
      sequence += WriteBatchInternal::Count(w->batch);
    }
    w = w->link_newer;

    w->sequence = sequence;
    w->parallel_group = pg;
    SetState(w, STATE_PARALLEL_FOLLOWER);
  }
}
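
// Note: sequence numbers are handed out cumulatively along the group.  The
// leader keeps the group's starting sequence; each follower gets the previous
// writer's sequence plus the entry count of that previous writer's batch
// (writers whose callback failed contribute nothing).  For example, with an
// illustrative starting sequence of 100:
//   leader batch with 3 entries     -> leader.sequence     = 100
//   follower A batch with 2 entries -> follower_A.sequence = 103
//   follower B (any size)           -> follower_B.sequence = 105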

bool WriteThread::CompleteParallelWorker(Writer* w) {
  static AdaptationContext ctx("CompleteParallelWorker");

  auto* pg = w->parallel_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(w->StateMutex());
    pg->status = w->status;
  }

  auto leader = pg->leader;
  auto early_exit_allowed = pg->early_exit_allowed;

  if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &ctx);

    // Caller only needs to perform exit duties if early exit doesn't
    // apply and this is the leader.  Can't touch pg here.  Whoever set
    // our state to STATE_COMPLETED copied pg->status to w.status for us.
    return w == leader && !(early_exit_allowed && w->status.ok());
  }
  // else we're the last parallel worker

  if (w == leader || (early_exit_allowed && pg->status.ok())) {
    // this thread should perform exit duties
    w->status = pg->status;
    return true;
  } else {
    // We're the last parallel follower but early commit is not
    // applicable.  Wake up the leader and then wait for it to exit.
    assert(w->state == STATE_PARALLEL_FOLLOWER);
    SetState(leader, STATE_COMPLETED);
    AwaitState(w, STATE_COMPLETED, &ctx);
    return false;
  }
}

void WriteThread::EarlyExitParallelGroup(Writer* w) {
  auto* pg = w->parallel_group;

  assert(w->state == STATE_PARALLEL_FOLLOWER);
  assert(pg->status.ok());
  ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
  assert(w->status.ok());
  assert(w->state == STATE_COMPLETED);
  SetState(pg->leader, STATE_COMPLETED);
}

void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
                                         Status status) {
  assert(leader->link_older == nullptr);

  Writer* head = newest_writer_.load(std::memory_order_acquire);
  if (head != last_writer ||
      !newest_writer_.compare_exchange_strong(head, nullptr)) {
    // Either w wasn't the head during the load(), or it was the head
    // during the load() but somebody else pushed onto the list before
    // we did the compare_exchange_strong (causing it to fail).  In the
    // latter case compare_exchange_strong has the effect of re-reading
    // its first param (head).  No need to retry a failing CAS, because
    // only a departing leader (which we are at the moment) can remove
    // nodes from the list.
    assert(head != last_writer);

    // After walking link_older starting from head (if not already done)
    // we will be able to traverse w->link_newer below. This function
    // can only be called from an active leader, only a leader can
    // clear newest_writer_, we didn't, and only a clear newest_writer_
    // could cause the next leader to start their work without a call
    // to MarkJoined, so we can definitely conclude that no other leader
    // work is going on here (with or without db mutex).
    CreateMissingNewerLinks(head);
    assert(last_writer->link_newer->link_older == last_writer);
    last_writer->link_newer->link_older = nullptr;

    // Next leader didn't self-identify, because newest_writer_ wasn't
    // nullptr when they enqueued (we were definitely enqueued before them
    // and are still in the list).  That means leader handoff occurs when
    // we call MarkJoined
    SetState(last_writer->link_newer, STATE_GROUP_LEADER);
  }
  // else nobody else was waiting, although there might already be a new
  // leader now

  while (last_writer != leader) {
    last_writer->status = status;
    // we need to read link_older before calling SetState, because as soon
    // as it is marked committed the other thread's Await may return and
    // deallocate the Writer.
    auto next = last_writer->link_older;
    SetState(last_writer, STATE_COMPLETED);

    last_writer = next;
  }
}
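
// Note: a simplified, illustrative sketch of how the write path drives this
// class from the leader's side (the real caller, WriteImpl, also handles
// parallel followers, callbacks, and the unbatched path below):
//
//   WriteThread::Writer w;                 // holds this thread's batch
//   write_thread_.JoinBatchGroup(&w);
//   if (w.state == WriteThread::STATE_GROUP_LEADER) {
//     autovector<WriteThread::Writer*> group;
//     WriteThread::Writer* last_writer = &w;
//     write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &group);
//     // ... write the combined batch to the WAL / memtable ...
//     write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
//   }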

void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
  static AdaptationContext ctx("EnterUnbatched");

  assert(w->batch == nullptr);
  bool linked_as_leader;
  LinkOne(w, &linked_as_leader);
  if (!linked_as_leader) {
    mu->Unlock();
    TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
    AwaitState(w, STATE_GROUP_LEADER, &ctx);
    mu->Lock();
  }
}

void WriteThread::ExitUnbatched(Writer* w) {
  Status dummy_status;
  ExitAsBatchGroupLeader(w, w, dummy_status);
}

}  // namespace rocksdb