YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_thread.h
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef YB_ROCKSDB_DB_WRITE_THREAD_H
#define YB_ROCKSDB_DB_WRITE_THREAD_H

#pragma once

#include <assert.h>
#include <stdint.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stack>
#include <string>
#include <type_traits>
#include <vector>

#include "yb/rocksdb/db/write_callback.h"
#include "yb/rocksdb/status.h"
#include "yb/rocksdb/types.h"
#include "yb/rocksdb/util/autovector.h"
#include "yb/rocksdb/util/instrumented_mutex.h"
#include "yb/rocksdb/write_batch_base.h"

namespace rocksdb {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer.  This is a Writer that is
    // waiting in JoinBatchGroup.  This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel needs this Writer to apply its batch
    // (-> STATE_PARALLEL_FOLLOWER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group.  Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case.  This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // A Writer that has returned as a follower in a parallel group.
    // It should apply its batch to the memtable and then call
    // CompleteParallelWorker.  When someone calls ExitAsBatchGroupLeader
    // or EarlyExitParallelGroup this state will get transitioned to
    // STATE_COMPLETED.
    STATE_PARALLEL_FOLLOWER = 4,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work.  This is a terminal
    // state.
    STATE_COMPLETED = 8,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCV().
    STATE_LOCKED_WAITING = 16,
  };
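
  // In summary, the transitions described above are:
  //   STATE_INIT              -> STATE_GROUP_LEADER, STATE_PARALLEL_FOLLOWER,
  //                              or STATE_COMPLETED
  //   STATE_GROUP_LEADER      -> STATE_COMPLETED (parallel batches only;
  //                              otherwise terminal)
  //   STATE_PARALLEL_FOLLOWER -> STATE_COMPLETED
  //   STATE_LOCKED_WAITING marks a writer that is blocked on StateMutex() /
  //   StateCV() while it waits for one of the transitions above.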

  struct Writer;

  struct ParallelGroup {
    Writer* leader;
    Writer* last_writer;
    SequenceNumber last_sequence;
    bool early_exit_allowed;
    // Writes to status must hold leader->StateMutex() until running
    // has dropped to zero.
    Status status;
    std::atomic<uint32_t> running;
  };

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool disableWAL;
    bool in_batch_group;
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    ParallelGroup* parallel_group;
    SequenceNumber sequence;  // the sequence number to use
    Status status;            // status of memtable inserter
    Status callback_status;   // status returned by callback->Callback()
    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader

    Writer()
        : batch(nullptr),
          sync(false),
          disableWAL(false),
          in_batch_group(false),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          parallel_group(nullptr),
          link_older(nullptr),
          link_newer(nullptr) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }
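
    // CreateMutex() above placement-constructs a std::mutex and a
    // std::condition_variable into the aligned_storage fields; StateMutex()
    // and StateCV() below reinterpret that storage, and ~Writer() destroys
    // the objects manually. That is why all three are gated on made_waitable.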

    // returns the aggregate status of this Writer
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes precedence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    // No other mutexes may be acquired while holding StateMutex(), it is
    // always last in the order
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
                 static_cast<void*>(&state_cv_bytes));
    }
  };

  WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec);

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer.  If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER.  If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_COMPLETED.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_FOLLOWER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w:        Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);

  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader:         Writer that is STATE_GROUP_LEADER
  // Writer** last_writer:   Out-param that identifies the last follower
  // autovector<WriteThread::Writer*>* write_batch_group:
  //                         Out-param of group members
  // returns:                Total batch group byte size
  size_t EnterAsBatchGroupLeader(
      Writer* leader, Writer** last_writer,
      autovector<WriteThread::Writer*>* write_batch_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
  // non-leader members of this write batch group.  Sets Writer::sequence
  // before waking them up.
  //
  // ParallelGroup* pg:       Extra state used to coordinate the parallel add
  // SequenceNumber sequence: Starting sequence number to assign to Writer-s
  void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete.  Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then call EarlyExitParallelGroup, false if
  // someone else has already taken responsibility for that.
  bool CompleteParallelWorker(Writer* w);

  // This method performs an early completion of a parallel write group,
  // where the cleanup work of the leader is performed by a follower who
  // happens to be the last parallel worker to complete.
  void EarlyExitParallelGroup(Writer* w);

  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // Writer* leader:         From EnterAsBatchGroupLeader
  // Writer* last_writer:    Value of out-param of EnterAsBatchGroupLeader
  // Status status:          Status of write operation
  void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
                              Status status);
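
  // A simplified, illustrative sketch of the intended call pattern for the
  // non-parallel path, pieced together from the contracts above (the real
  // caller lives in the DB write path; names below are placeholders):
  //
  //   WriteThread::Writer w;
  //   w.batch = batch;                  // caller-supplied WriteBatch*
  //   write_thread.JoinBatchGroup(&w);  // may block
  //   if (w.state == WriteThread::STATE_GROUP_LEADER) {
  //     WriteThread::Writer* last_writer;
  //     autovector<WriteThread::Writer*> group;
  //     write_thread.EnterAsBatchGroupLeader(&w, &last_writer, &group);
  //     Status s;  // ... write the grouped batches to the WAL/memtable ...
  //     write_thread.ExitAsBatchGroupLeader(&w, last_writer, s);
  //   }
  //   // Followers wake up in STATE_COMPLETED; every writer reads its
  //   // outcome through w.FinalStatus().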

  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w:              A Writer not eligible for batching
  // InstrumentedMutex* mu:  The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);

  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

 private:
  uint64_t max_yield_usec_;
  uint64_t slow_yield_usec_;

  // Points to the newest pending Writer.  Only the leader can remove
  // elements; adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Waits for w->state & goal_mask using w->StateMutex().  Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate.  Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads.  ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
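
  // Illustrative only: each wait site is expected to keep its own static
  // context so the adaptive yielding statistics stay per-call-site, and the
  // goal_mask ORs together the acceptable states, e.g.
  //   static AdaptationContext ctx("JoinBatchGroup");
  //   AwaitState(w, STATE_GROUP_LEADER | STATE_COMPLETED, &ctx);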

  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer_ list. Sets *linked_as_leader to
  // true if w was linked directly into the leader position.  Safe to
  // call from multiple threads without external locking.
  void LinkOne(Writer* w, bool* linked_as_leader);

  // Computes any missing link_newer links.  Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);
};

}  // namespace rocksdb

#endif // YB_ROCKSDB_DB_WRITE_THREAD_H