/Users/deen/code/yugabyte-db/src/yb/rocksdb/db/write_thread.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | |
21 | | #ifndef YB_ROCKSDB_DB_WRITE_THREAD_H |
22 | | #define YB_ROCKSDB_DB_WRITE_THREAD_H |
23 | | |
24 | | #pragma once |
25 | | |
26 | | #include <assert.h> |
27 | | #include <stdint.h> |
28 | | |
29 | | #include <atomic> |
30 | | #include <chrono> |
31 | | #include <condition_variable> |
32 | | #include <mutex> |
33 | | #include <stack> |
34 | | #include <string> |
35 | | #include <type_traits> |
36 | | #include <vector> |
37 | | |
38 | | #include "yb/rocksdb/db/write_callback.h" |
39 | | #include "yb/rocksdb/status.h" |
40 | | #include "yb/rocksdb/types.h" |
41 | | #include "yb/rocksdb/util/autovector.h" |
42 | | #include "yb/rocksdb/util/instrumented_mutex.h" |
43 | | #include "yb/rocksdb/write_batch_base.h" |
44 | | |
45 | | namespace rocksdb { |
46 | | |
47 | | class WriteThread { |
48 | | public: |
49 | | enum State : uint8_t { |
50 | | // The initial state of a writer. This is a Writer that is |
51 | | // waiting in JoinBatchGroup. This state can be left when another |
52 | | // thread informs the waiter that it has become a group leader |
53 | | // (-> STATE_GROUP_LEADER), when a leader that has chosen to be |
54 | | // non-parallel informs a follower that its writes have been committed |
55 | | // (-> STATE_COMPLETED), or when a leader that has chosen to perform |
56 | | // updates in parallel and needs this Writer to apply its batch (-> |
57 | | // STATE_PARALLEL_FOLLOWER). |
58 | | STATE_INIT = 1, |
59 | | |
60 | | // The state used to inform a waiting Writer that it has become the |
61 | | // leader, and it should now build a write batch group. Tricky: |
62 | | // this state is not used if newest_writer_ is empty when a writer |
63 | | // enqueues itself, because there is no need to wait (or even to |
64 | | // create the mutex and condvar used to wait) in that case. This is |
65 | | // a terminal state unless the leader chooses to make this a parallel |
66 | | // batch, in which case the last parallel worker to finish will move |
67 | | // the leader to STATE_COMPLETED. |
68 | | STATE_GROUP_LEADER = 2, |
69 | | |
70 | | // A Writer that has returned as a follower in a parallel group. |
71 | | // It should apply its batch to the memtable and then call |
72 | | // CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader |
73 | | // or EarlyExitParallelGroup this state will get transitioned to |
74 | | // STATE_COMPLETED. |
75 | | STATE_PARALLEL_FOLLOWER = 4, |
76 | | |
77 | | // A follower whose writes have been applied, or a parallel leader |
78 | | // whose followers have all finished their work. This is a terminal |
79 | | // state. |
80 | | STATE_COMPLETED = 8, |
81 | | |
82 | | // A state indicating that the thread may be waiting using StateMutex() |
83 | | // and StateCV() |
84 | | STATE_LOCKED_WAITING = 16, |
85 | | }; |
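 | | // Because the states are distinct powers of two, several of them can be
 | | // OR-ed into a single goal_mask for AwaitState. A hedged sketch of how a
 | | // joining writer might wait for any of its possible outcomes (the mask
 | | // mirrors what JoinBatchGroup needs; the exact call site shown here is
 | | // an assumption, not a quote from the implementation):
 | | //
 | | //   uint8_t state = AwaitState(
 | | //       w,
 | | //       STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
 | | //       &jbg_ctx);  // jbg_ctx: a static AdaptationContext (assumed name)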
86 | | |
87 | | struct Writer; |
88 | | |
89 | | struct ParallelGroup { |
90 | | Writer* leader; |
91 | | Writer* last_writer; |
92 | | SequenceNumber last_sequence; |
93 | | bool early_exit_allowed; |
94 | | // until running reaches zero, access to status requires leader->StateMutex() |
95 | | Status status; |
96 | | std::atomic<uint32_t> running; |
97 | | }; |
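 | | // A minimal sketch of how a leader could populate a ParallelGroup before
 | | // handing it to LaunchParallelFollowers; the field values and the
 | | // group_size / current_sequence variables are illustrative assumptions:
 | | //
 | | //   WriteThread::ParallelGroup pg;
 | | //   pg.leader = leader;
 | | //   pg.last_writer = last_writer;  // out-param of EnterAsBatchGroupLeader
 | | //   pg.early_exit_allowed = false;
 | | //   pg.running.store(group_size, std::memory_order_relaxed);
 | | //   write_thread.LaunchParallelFollowers(&pg, current_sequence + 1);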
98 | | |
99 | | // Information kept for every waiting writer. |
100 | | struct Writer { |
101 | | WriteBatch* batch; |
102 | | bool sync; |
103 | | bool disableWAL; |
104 | | bool in_batch_group; |
105 | | WriteCallback* callback; |
106 | | bool made_waitable; // records lazy construction of mutex and cv |
107 | | std::atomic<uint8_t> state; // write under StateMutex() or pre-link |
108 | | ParallelGroup* parallel_group; |
109 | | SequenceNumber sequence; // the sequence number to use |
110 | | Status status; // status of memtable inserter |
111 | | Status callback_status; // status returned by callback->Callback() |
112 | | std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes; |
113 | | std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes; |
114 | | Writer* link_older; // read/write only before linking, or as leader |
115 | | Writer* link_newer; // lazy, read/write only before linking, or as leader |
116 | | |
117 | | Writer() |
118 | | : batch(nullptr), |
119 | | sync(false), |
120 | | disableWAL(false), |
121 | | in_batch_group(false), |
122 | | callback(nullptr), |
123 | | made_waitable(false), |
124 | | state(STATE_INIT), |
125 | | parallel_group(nullptr), |
126 | | link_older(nullptr), |
127 | 30.9M | link_newer(nullptr) {} |
128 | | |
129 | 30.8M | ~Writer() { |
130 | 30.8M | if (made_waitable) { |
131 | 1.50M | StateMutex().~mutex(); |
132 | 1.50M | StateCV().~condition_variable(); |
133 | 1.50M | } |
134 | 30.8M | } |
135 | | |
136 | 29.1M | bool CheckCallback(DB* db) { |
137 | 29.1M | if (callback != nullptr) { |
138 | 353 | callback_status = callback->Callback(db); |
139 | 353 | } |
140 | 29.1M | return callback_status.ok(); |
141 | 29.1M | } |
142 | | |
143 | 1.51M | void CreateMutex() { |
144 | 1.51M | if (!made_waitable) { |
145 | | // Note that made_waitable is tracked separately from state |
146 | | // transitions, because we can't atomically create the mutex and |
147 | | // link into the list. |
148 | 1.50M | made_waitable = true; |
149 | 1.50M | new (&state_mutex_bytes) std::mutex; |
150 | 1.50M | new (&state_cv_bytes) std::condition_variable; |
151 | 1.50M | } |
152 | 1.51M | } |
153 | | |
154 | | // returns the aggregate status of this Writer |
155 | 29.1M | Status FinalStatus() { |
156 | 29.1M | if (!status.ok()) { |
157 | | // a non-ok memtable write status takes precedence |
158 | 0 | assert(callback == nullptr || callback_status.ok()); |
159 | 0 | return status; |
160 | 29.1M | } else if (!callback_status.ok()) { |
161 | | // if the callback failed then that is the status we want |
162 | | // because a memtable insert should not have been attempted |
163 | 155 | assert(callback != nullptr); |
164 | 0 | assert(status.ok()); |
165 | 0 | return callback_status; |
166 | 29.1M | } else { |
167 | | // if there is no callback then we only care about |
168 | | // the memtable insert status |
169 | 29.1M | assert(callback == nullptr || callback_status.ok()); |
170 | 0 | return status; |
171 | 29.1M | } |
172 | 29.1M | } |
173 | | |
174 | 46.6M | bool CallbackFailed() { |
175 | 46.6M | return (callback != nullptr) && !callback_status.ok(); |
176 | 46.6M | } |
177 | | |
178 | | // No other mutexes may be acquired while holding StateMutex(); it is |
179 | | // always last in the lock order |
180 | 4.54M | std::mutex& StateMutex() { |
181 | 4.54M | assert(made_waitable); |
182 | 0 | return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes)); |
183 | 4.54M | } |
184 | | |
185 | 4.52M | std::condition_variable& StateCV() { |
186 | 4.52M | assert(made_waitable); |
187 | 0 | return *static_cast<std::condition_variable*>( |
188 | 4.52M | static_cast<void*>(&state_cv_bytes)); |
189 | 4.52M | } |
190 | | }; |
191 | | |
192 | | WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec); |
193 | | |
194 | | // IMPORTANT: None of the methods in this class rely on the db mutex |
195 | | // for correctness. All of the methods except JoinBatchGroup and |
196 | | // EnterUnbatched may be called either with or without the db mutex held. |
197 | | // Correctness is maintained by ensuring that only a single thread is |
198 | | // a leader at a time. |
199 | | |
200 | | // Registers w as ready to become part of a batch group, waits until the |
201 | | // caller should perform some work, and returns the current state of the |
202 | | // writer. If w has become the leader of a write batch group, returns |
203 | | // STATE_GROUP_LEADER. If w has been made part of a sequential batch |
204 | | // group and the leader has performed the write, returns STATE_COMPLETED. |
205 | | // If w has been made part of a parallel batch group and is responsible |
206 | | // for updating the memtable, returns STATE_PARALLEL_FOLLOWER. |
207 | | // |
208 | | // The db mutex SHOULD NOT be held when calling this function, because |
209 | | // it will block. |
210 | | // |
211 | | // Writer* w: Writer to be executed as part of a batch group |
212 | | void JoinBatchGroup(Writer* w); |
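 | | // A hedged sketch of the caller-side flow, modeled on how a write path
 | | // would typically drive this class; "write_thread" and the memtable /
 | | // WAL steps are placeholders, not the actual DBImpl code:
 | | //
 | | //   WriteThread::Writer w;
 | | //   w.batch = my_batch;
 | | //   write_thread.JoinBatchGroup(&w);
 | | //   if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
 | | //     // apply w.batch to the memtable at w.sequence, then
 | | //     // CompleteParallelWorker (see the sketch further below)
 | | //   }
 | | //   if (w.state == WriteThread::STATE_COMPLETED) {
 | | //     return w.FinalStatus();  // another thread performed our write
 | | //   }
 | | //   assert(w.state == WriteThread::STATE_GROUP_LEADER);
 | | //   // build the group with EnterAsBatchGroupLeader, write the WAL,
 | | //   // and finish with ExitAsBatchGroupLeader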
213 | | |
214 | | // Constructs a write batch group led by leader, which should be a |
215 | | // Writer passed to JoinBatchGroup on the current thread. |
216 | | // |
217 | | // Writer* leader: Writer that is STATE_GROUP_LEADER |
218 | | // Writer** last_writer: Out-param that identifies the last follower |
219 | | // autovector<WriteBatch*>* write_batch_group: Out-param of group members |
220 | | // returns: Total batch group byte size |
221 | | size_t EnterAsBatchGroupLeader( |
222 | | Writer* leader, Writer** last_writer, |
223 | | autovector<WriteThread::Writer*>* write_batch_group); |
224 | | |
225 | | // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the |
226 | | // non-leader members of this write batch group. Sets Writer::sequence |
227 | | // before waking them up. |
228 | | // |
229 | | // ParallelGroup* pg: Extra state used to coordinate the parallel add |
230 | | // SequenceNumber sequence: Starting sequence number to assign to Writer-s |
231 | | void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence); |
232 | | |
233 | | // Reports the completion of w's batch to the parallel group leader, and |
234 | | // waits for the rest of the parallel batch to complete. Returns true |
235 | | // if this thread is the last to complete, and hence should advance |
236 | | // the sequence number and then call EarlyExitParallelGroup, false if |
237 | | // someone else has already taken responsibility for that. |
238 | | bool CompleteParallelWorker(Writer* w); |
239 | | |
240 | | // This method performs an early completion of a parallel write group, |
241 | | // where the cleanup work of the leader is performed by a follower who |
242 | | // happens to be the last parallel worker to complete. |
243 | | void EarlyExitParallelGroup(Writer* w); |
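 | | // A sketch of the follower-side completion protocol described above;
 | | // InsertIntoMemtable and SetLastSequence are assumed helpers standing in
 | | // for the real memtable/VersionSet calls:
 | | //
 | | //   w.status = InsertIntoMemtable(w.batch, w.sequence);
 | | //   if (write_thread.CompleteParallelWorker(&w)) {
 | | //     // we were the last worker to finish: do the leader's cleanup
 | | //     SetLastSequence(w.parallel_group->last_sequence);
 | | //     write_thread.EarlyExitParallelGroup(&w);
 | | //   }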
244 | | |
245 | | // Unlinks the Writer-s in a batch group, wakes up the non-leaders, |
246 | | // and wakes up the next leader (if any). |
247 | | // |
248 | | // Writer* leader: From EnterAsBatchGroupLeader |
249 | | // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader |
250 | | // Status status: Status of write operation |
251 | | void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, |
252 | | Status status); |
253 | | |
254 | | // Waits for all preceding writers (unlocking mu while waiting), then |
255 | | // registers w as the currently proceeding writer. |
256 | | // |
257 | | // Writer* w: A Writer not eligible for batching |
258 | | // InstrumentedMutex* mu: The db mutex, to unlock while waiting |
259 | | // REQUIRES: db mutex held |
260 | | void EnterUnbatched(Writer* w, InstrumentedMutex* mu); |
261 | | |
262 | | // Completes a Writer begun with EnterUnbatched, unblocking subsequent |
263 | | // writers. |
264 | | void ExitUnbatched(Writer* w); |
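 | | // A minimal usage sketch for the unbatched path, e.g. an operation that
 | | // must exclude all concurrent writers (assumes mutex_ is the db mutex):
 | | //
 | | //   WriteThread::Writer w;
 | | //   write_thread.EnterUnbatched(&w, &mutex_);  // REQUIRES: mutex_ held
 | | //   // ... work that must not run concurrently with other writers ...
 | | //   write_thread.ExitUnbatched(&w);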
265 | | |
266 | | struct AdaptationContext { |
267 | | const char* name; |
268 | | std::atomic<int32_t> value; |
269 | | |
270 | 26.9k | explicit AdaptationContext(const char* name0) : name(name0), value(0) {} |
271 | | }; |
272 | | |
273 | | private: |
274 | | uint64_t max_yield_usec_; |
275 | | uint64_t slow_yield_usec_; |
276 | | |
277 | | // Points to the newest pending Writer. Only leader can remove |
278 | | // elements, adding can be done lock-free by anybody |
279 | | std::atomic<Writer*> newest_writer_; |
280 | | |
281 | | // Waits for w->state & goal_mask using w->StateMutex(). Returns |
282 | | // the state that satisfies goal_mask. |
283 | | uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); |
284 | | |
285 | | // Blocks until w->state & goal_mask, returning the state value |
286 | | // that satisfied the predicate. Uses ctx to adaptively use |
287 | | // std::this_thread::yield() to avoid mutex overheads. ctx should be |
288 | | // a context-dependent static. |
289 | | uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); |
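 | | // ctx is typically a per-call-site static, so the adaptive yield credit
 | | // persists across calls from the same site. A sketch (the context name
 | | // is an assumed example):
 | | //
 | | //   static AdaptationContext cpw_ctx("CompleteParallelWorker");
 | | //   uint8_t state = AwaitState(w, STATE_COMPLETED, &cpw_ctx);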
290 | | |
291 | | void SetState(Writer* w, uint8_t new_state); |
292 | | |
293 | | // Links w into the newest_writer_ list. Sets *linked_as_leader to |
294 | | // true if w was linked directly into the leader position. Safe to |
295 | | // call from multiple threads without external locking. |
296 | | void LinkOne(Writer* w, bool* linked_as_leader); |
297 | | |
298 | | // Computes any missing link_newer links. Should not be called |
299 | | // concurrently with itself. |
300 | | void CreateMissingNewerLinks(Writer* head); |
301 | | }; |
302 | | |
303 | | } // namespace rocksdb |
304 | | |
305 | | #endif // YB_ROCKSDB_DB_WRITE_THREAD_H |