/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_driver.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_TABLET_OPERATIONS_OPERATION_DRIVER_H |
34 | | #define YB_TABLET_OPERATIONS_OPERATION_DRIVER_H |
35 | | |
36 | | #include <condition_variable> |
37 | | #include <string> |
38 | | |
39 | | #include <boost/atomic.hpp> |
40 | | |
41 | | #include "yb/common/common_types.pb.h" |
42 | | |
43 | | #include "yb/consensus/log_fwd.h" |
44 | | #include "yb/consensus/consensus_round.h" |
45 | | |
46 | | #include "yb/gutil/ref_counted.h" |
47 | | #include "yb/gutil/walltime.h" |
48 | | |
49 | | #include "yb/tablet/operations/operation.h" |
50 | | |
51 | | #include "yb/util/status_fwd.h" |
52 | | #include "yb/util/lockfree.h" |
53 | | #include "yb/util/opid.h" |
54 | | #include "yb/util/trace.h" |
55 | | |
56 | | namespace yb { |
57 | | class ThreadPool; |
58 | | |
59 | | namespace tablet { |
60 | | class MvccManager; |
61 | | class OperationTracker; |
62 | | class OperationDriver; |
63 | | class Preparer; |
64 | | |
65 | | // Base class for operation drivers. |
66 | | // |
67 | | // OperationDriver classes encapsulate the logic of coordinating the execution of an operation. The |
68 | | // exact triggering of the methods differs based on whether the operation is being executed on a |
69 | | // leader or replica, but the general flow is: |
70 | | // |
71 | | // 1 - Init() is called on a newly created driver object. If the driver is instantiated from a |
72 | | // REPLICA, then we know that the operation is already "REPLICATING" (and thus we don't need to |
73 | | // trigger replication ourself later on). |
74 | | // |
75 | | // 2 - ExecuteAsync() is called. This submits the operation driver to the Preparer and returns |
76 | | // immediately. |
77 | | // |
78 | | // 3 - PrepareAndStartTask() calls Prepare() and Start() on the operation. |
79 | | // |
80 | | // Once successfully prepared, if we have not yet replicated (i.e we are leader), also triggers |
81 | | // consensus->Replicate() and changes the replication state to REPLICATING. |
82 | | // |
83 | | // What happens in reality is more complicated, as Preparer tries to batch leader-side |
84 | | // operations before submitting them to consensus. |
85 | | // |
86 | | // On the other hand, if we have already successfully replicated (e.g. we are the follower and |
87 | | // ConsensusCommitted() has already been called, then we can move on to ApplyOperation(). |
88 | | // |
89 | | // 4 - The Consensus implementation calls ConsensusCommitted() |
90 | | // |
91 | | // This is triggered by consensus when the commit index moves past our own OpId. On followers, |
92 | | // this can happen before Prepare() finishes, and thus we have to check whether we have already |
93 | | // done step 3. On leaders, we don't start the consensus round until after Prepare, so this |
94 | | // check always passes. |
95 | | // |
96 | | // If Prepare() has already completed, then we trigger ApplyAsync(). |
97 | | // |
98 | | // 5 - ApplyOperation() calls ApplyTask(), which then calls operation_->Apply(). |
99 | | // |
100 | | // When operation_->Apply() is called, changes are made to the in-memory data structures. These |
101 | | // changes are not visible to clients yet. |
102 | | // |
103 | | // 6 - The driver executes Finalize() which, in turn, makes operations make their changes visible |
104 | | // to other operations. After this step the driver replies to the client if needed and the |
105 | | // operation is completed. In-mem data structures that contain the changes made by the |
106 | | // operation can now be made durable. |
107 | | // |
108 | | // [1] - see 'Implementation Techniques for Main Memory Database Systems', DeWitt et. al. |
109 | | // |
110 | | // This class is thread safe. |
111 | | class OperationDriver : public RefCountedThreadSafe<OperationDriver>, |
112 | | public consensus::ConsensusRoundCallback, |
113 | | public MPSCQueueEntry<OperationDriver> { |
114 | | |
115 | | public: |
116 | | // Construct OperationDriver. OperationDriver does not take ownership |
117 | | // of any of the objects pointed to in the constructor's arguments. |
118 | | OperationDriver(OperationTracker* operation_tracker, |
119 | | consensus::Consensus* consensus, |
120 | | Preparer* preparer, |
121 | | TableType table_type_); |
122 | | |
123 | | // Perform any non-constructor initialization. Sets the operation |
124 | | // that will be executed. |
125 | | // if term == kUnknownTerm then we launch this operation as replica, otherwise |
126 | | // we are leader and operation should be bound to this term. |
127 | | CHECKED_STATUS Init(std::unique_ptr<Operation>* operation, int64_t term); |
128 | | |
129 | | // Returns the OpId of the operation being executed or an uninitialized |
130 | | // OpId if none has been assigned. Returns a copy and thus should not |
131 | | // be used in tight loops. |
132 | | yb::OpId GetOpId(); |
133 | | |
134 | | // Submits the operation for execution. |
135 | | // The returned status acknowledges any error on the submission process. |
136 | | // The operation will be replied to asynchronously. |
137 | | void ExecuteAsync(); |
138 | | |
139 | | // Aborts the operation, if possible. Since operations are executed in |
140 | | // multiple stages by multiple executors it might not be possible to stop |
141 | | // the operation immediately, but this will make sure it is aborted |
142 | | // at the next synchronization point. |
143 | | void Abort(const Status& status); |
144 | | |
145 | | // Callback from Consensus when replication is complete, and thus the operation |
146 | | // is considered "committed" from the consensus perspective (ie it will be |
147 | | // applied on every node, and not ever truncated from the state machine history). |
148 | | // If status is anything different from OK() we don't proceed with the apply. |
149 | | // |
150 | | // see comment in the interface for an important TODO. |
151 | | void ReplicationFinished( |
152 | | const Status& status, int64_t leader_term, OpIds* applied_op_ids) override; |
153 | | |
154 | | std::string ToString() const; |
155 | | |
156 | | std::string ToStringUnlocked() const; |
157 | | |
158 | | std::string LogPrefix() const; |
159 | | |
160 | | // Returns the type of the operation being executed by this driver. |
161 | | OperationType operation_type() const; |
162 | | |
163 | | // Returns the state of the operation being executed by this driver. |
164 | | const Operation* operation() const; |
165 | | |
166 | 0 | const MonoTime& start_time() const { return start_time_; } |
167 | | |
168 | 82.2M | Trace* trace() { return trace_.get(); } |
169 | | |
170 | | void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override; |
171 | | |
172 | 36.5M | bool is_leader_side() { |
173 | | // TODO: switch state to an atomic. |
174 | 36.5M | std::lock_guard<simple_spinlock> lock(lock_); |
175 | 36.5M | return replication_state_ == ReplicationState::NOT_REPLICATING; |
176 | 36.5M | } |
177 | | |
178 | | // Actually prepare and start. In case of leader-side operations, this stops short of calling |
179 | | // Consensus::Replicate, which is the responsibility of the caller. This is being done so that |
180 | | // we can append multiple rounds to the consensus queue together. |
181 | | CHECKED_STATUS PrepareAndStart(); |
182 | | |
183 | | // The task used to be submitted to the prepare threadpool to prepare and start the operation. |
184 | | // If PrepareAndStart() fails, calls HandleFailure. Since 07/07/2017 this is being used for |
185 | | // non-leader-side operations from Preparer, and for leader-side operations the handling |
186 | | // is a bit more complicated due to batching. |
187 | | void PrepareAndStartTask(); |
188 | | |
189 | | // Handle a failure in any of the stages of the operation. |
190 | | // In some cases, this will end the operation and call its callback. |
191 | | // In others, where we can't recover, this will FATAL. |
192 | | void HandleFailure(const Status& status); |
193 | | |
194 | 0 | consensus::Consensus* consensus() { return consensus_; } |
195 | | |
196 | 14.7M | consensus::ConsensusRound* consensus_round() { |
197 | 14.7M | return mutable_operation()->consensus_round(); |
198 | 14.7M | } |
199 | | |
200 | 25.2M | void SetPropagatedSafeTime(HybridTime safe_time, MvccManager* mvcc) { |
201 | 25.2M | propagated_safe_time_ = safe_time; |
202 | 25.2M | mvcc_ = mvcc; |
203 | 25.2M | } |
204 | | |
205 | | int64_t SpaceUsed(); |
206 | | |
207 | | private: |
208 | | friend class RefCountedThreadSafe<OperationDriver>; |
209 | | enum ReplicationState { |
210 | | // The operation has not yet been sent to consensus for replication |
211 | | NOT_REPLICATING, |
212 | | |
213 | | // Replication has been triggered (either because we are the leader and triggered it, |
214 | | // or because we are a follower and we started this operation in response to a |
215 | | // leader's call) |
216 | | REPLICATING, |
217 | | |
218 | | // Replication has failed, and we are certain that no other may have received the |
219 | | // operation (ie we failed before even sending the request off of our node). |
220 | | REPLICATION_FAILED, |
221 | | |
222 | | // Replication has succeeded. |
223 | | REPLICATED |
224 | | }; |
225 | | |
226 | | enum PrepareState { |
227 | | NOT_PREPARED, |
228 | | PREPARED |
229 | | }; |
230 | | |
231 | | ~OperationDriver(); |
232 | | |
233 | | // Starts operation, returns false is we should NOT continue processing the operation. |
234 | | bool StartOperation(); |
235 | | |
236 | | // Calls Operation::Apply() followed by Consensus::Commit() with the |
237 | | // results from the Apply(). |
238 | | void ApplyTask(int64_t leader_term, OpIds* applied_op_ids); |
239 | | |
240 | | // Returns the mutable state of the operation being executed by |
241 | | // this driver. |
242 | | Operation* mutable_operation(); |
243 | | |
244 | | // Return a short string indicating where the operation currently is in the |
245 | | // state machine. |
246 | | static std::string StateString(ReplicationState repl_state, |
247 | | PrepareState prep_state); |
248 | | |
249 | | OperationTracker* const operation_tracker_; |
250 | | consensus::Consensus* const consensus_; |
251 | | Preparer* const preparer_; |
252 | | |
253 | | // Lock that synchronizes access to the operation's state. |
254 | | mutable simple_spinlock lock_; |
255 | | |
256 | | // A copy of the operation's OpId, set when the operation first |
257 | | // receives one from Consensus and uninitialized until then. |
258 | | // TODO: we have two separate copies of this now -- in Operation, and here... we should be able |
259 | | // to consolidate! |
260 | | boost::atomic<yb::OpId> op_id_copy_{yb::OpId::Invalid()}; |
261 | | |
262 | | // The operation to be executed by this driver. |
263 | | std::unique_ptr<Operation> operation_; |
264 | | |
265 | | // Trace object for tracing any operations started by this driver. |
266 | | scoped_refptr<Trace> trace_; |
267 | | |
268 | | const MonoTime start_time_; |
269 | | |
270 | | ReplicationState replication_state_; |
271 | | PrepareState prepare_state_; |
272 | | |
273 | | // The system monotonic time when the operation was prepared. |
274 | | // This is used for debugging only, not any actual operation ordering. |
275 | | MicrosecondsInt64 prepare_physical_hybrid_time_ = 0; |
276 | | |
277 | | TableType table_type_; |
278 | | |
279 | | MvccManager* mvcc_ = nullptr; |
280 | | HybridTime propagated_safe_time_; |
281 | | |
282 | | DISALLOW_COPY_AND_ASSIGN(OperationDriver); |
283 | | }; |
284 | | |
285 | | } // namespace tablet |
286 | | } // namespace yb |
287 | | |
288 | | #endif // YB_TABLET_OPERATIONS_OPERATION_DRIVER_H |