YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_driver.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#ifndef YB_TABLET_OPERATIONS_OPERATION_DRIVER_H
34
#define YB_TABLET_OPERATIONS_OPERATION_DRIVER_H
35
36
#include <condition_variable>
37
#include <string>
38
39
#include <boost/atomic.hpp>
40
41
#include "yb/common/common_types.pb.h"
42
43
#include "yb/consensus/log_fwd.h"
44
#include "yb/consensus/consensus_round.h"
45
46
#include "yb/gutil/ref_counted.h"
47
#include "yb/gutil/walltime.h"
48
49
#include "yb/tablet/operations/operation.h"
50
51
#include "yb/util/status_fwd.h"
52
#include "yb/util/lockfree.h"
53
#include "yb/util/opid.h"
54
#include "yb/util/trace.h"
55
56
namespace yb {
57
class ThreadPool;
58
59
namespace tablet {
60
class MvccManager;
61
class OperationTracker;
62
class OperationDriver;
63
class Preparer;
64
65
// Base class for operation drivers.
66
//
67
// OperationDriver classes encapsulate the logic of coordinating the execution of an operation. The
68
// exact triggering of the methods differs based on whether the operation is being executed on a
69
// leader or replica, but the general flow is:
70
//
71
//  1 - Init() is called on a newly created driver object.  If the driver is instantiated from a
72
//  REPLICA, then we know that the operation is already "REPLICATING" (and thus we don't need to
73
//  trigger replication ourselves later on).
74
//
75
//  2 - ExecuteAsync() is called. This submits the operation driver to the Preparer and returns
76
//      immediately.
77
//
78
//  3 - PrepareAndStartTask() calls Prepare() and Start() on the operation.
79
//
80
//      Once successfully prepared, if we have not yet replicated (i.e we are leader), also triggers
81
//      consensus->Replicate() and changes the replication state to REPLICATING.
82
//
83
//      What happens in reality is more complicated, as Preparer tries to batch leader-side
84
//      operations before submitting them to consensus.
85
//
86
//      On the other hand, if we have already successfully replicated (e.g. we are the follower and
87
//      ConsensusCommitted() has already been called), then we can move on to ApplyOperation().
88
//
89
//  4 - The Consensus implementation calls ConsensusCommitted()
90
//
91
//      This is triggered by consensus when the commit index moves past our own OpId. On followers,
92
//      this can happen before Prepare() finishes, and thus we have to check whether we have already
93
//      done step 3. On leaders, we don't start the consensus round until after Prepare, so this
94
//      check always passes.
95
//
96
//      If Prepare() has already completed, then we trigger ApplyAsync().
97
//
98
//  5 - ApplyOperation() calls ApplyTask(), which then calls operation_->Apply().
99
//
100
//      When operation_->Apply() is called, changes are made to the in-memory data structures. These
101
//      changes are not visible to clients yet.
102
//
103
//  6 - The driver executes Finalize() which, in turn, makes the operation's changes visible
104
//      to other operations.  After this step the driver replies to the client if needed and the
105
//      operation is completed.  In-mem data structures that contain the changes made by the
106
//      operation can now be made durable.
107
//
108
// [1] - see 'Implementation Techniques for Main Memory Database Systems', DeWitt et al.
109
//
110
// This class is thread safe.
111
class OperationDriver : public RefCountedThreadSafe<OperationDriver>,
112
                        public consensus::ConsensusRoundCallback,
113
                        public MPSCQueueEntry<OperationDriver> {
114
115
 public:
116
  // Construct OperationDriver. OperationDriver does not take ownership
117
  // of any of the objects pointed to in the constructor's arguments.
118
  OperationDriver(OperationTracker* operation_tracker,
119
                  consensus::Consensus* consensus,
120
                  Preparer* preparer,
121
                  TableType table_type_);
122
123
  // Perform any non-constructor initialization. Sets the operation
124
  // that will be executed.
125
  // If term == kUnknownTerm then we launch this operation as a replica, otherwise
126
  // we are leader and operation should be bound to this term.
127
  CHECKED_STATUS Init(std::unique_ptr<Operation>* operation, int64_t term);
128
129
  // Returns the OpId of the operation being executed or an uninitialized
130
  // OpId if none has been assigned. Returns a copy and thus should not
131
  // be used in tight loops.
132
  yb::OpId GetOpId();
133
134
  // Submits the operation for execution.
135
  // NOTE: this method returns void, so submission errors are not reported via a return value.
136
  // The operation will be replied to asynchronously.
137
  void ExecuteAsync();
138
139
  // Aborts the operation, if possible. Since operations are executed in
140
  // multiple stages by multiple executors it might not be possible to stop
141
  // the operation immediately, but this will make sure it is aborted
142
  // at the next synchronization point.
143
  void Abort(const Status& status);
144
145
  // Callback from Consensus when replication is complete, and thus the operation
146
  // is considered "committed" from the consensus perspective (i.e. it will be
147
  // applied on every node, and not ever truncated from the state machine history).
148
  // If status is anything different from OK() we don't proceed with the apply.
149
  //
150
  // see comment in the interface for an important TODO.
151
  void ReplicationFinished(
152
      const Status& status, int64_t leader_term, OpIds* applied_op_ids) override;
153
154
  std::string ToString() const;
155
156
  std::string ToStringUnlocked() const;
157
158
  std::string LogPrefix() const;
159
160
  // Returns the type of the operation being executed by this driver.
161
  OperationType operation_type() const;
162
163
  // Returns the state of the operation being executed by this driver.
164
  const Operation* operation() const;
165
166
0
  const MonoTime& start_time() const { return start_time_; }
167
168
82.2M
  Trace* trace() { return trace_.get(); }
169
170
  void AddedToLeader(const OpId& op_id, const OpId& committed_op_id) override;
171
172
36.5M
  bool is_leader_side() {
173
    // TODO: switch state to an atomic.
174
36.5M
    std::lock_guard<simple_spinlock> lock(lock_);
175
36.5M
    return replication_state_ == ReplicationState::NOT_REPLICATING;
176
36.5M
  }
177
178
  // Actually prepare and start. In case of leader-side operations, this stops short of calling
179
  // Consensus::Replicate, which is the responsibility of the caller. This is being done so that
180
  // we can append multiple rounds to the consensus queue together.
181
  CHECKED_STATUS PrepareAndStart();
182
183
  // The task used to be submitted to the prepare threadpool to prepare and start the operation.
184
  // If PrepareAndStart() fails, calls HandleFailure. Since 07/07/2017 this is being used for
185
  // non-leader-side operations from Preparer, and for leader-side operations the handling
186
  // is a bit more complicated due to batching.
187
  void PrepareAndStartTask();
188
189
  // Handle a failure in any of the stages of the operation.
190
  // In some cases, this will end the operation and call its callback.
191
  // In others, where we can't recover, this will FATAL.
192
  void HandleFailure(const Status& status);
193
194
0
  consensus::Consensus* consensus() { return consensus_; }
195
196
14.7M
  consensus::ConsensusRound* consensus_round() {
197
14.7M
    return mutable_operation()->consensus_round();
198
14.7M
  }
199
200
25.2M
  void SetPropagatedSafeTime(HybridTime safe_time, MvccManager* mvcc) {
201
25.2M
    propagated_safe_time_ = safe_time;
202
25.2M
    mvcc_ = mvcc;
203
25.2M
  }
204
205
  int64_t SpaceUsed();
206
207
 private:
208
  friend class RefCountedThreadSafe<OperationDriver>;
209
  enum ReplicationState {
210
    // The operation has not yet been sent to consensus for replication
211
    NOT_REPLICATING,
212
213
    // Replication has been triggered (either because we are the leader and triggered it,
214
    // or because we are a follower and we started this operation in response to a
215
    // leader's call)
216
    REPLICATING,
217
218
    // Replication has failed, and we are certain that no other node may have received the
219
    // operation (i.e. we failed before even sending the request off of our node).
220
    REPLICATION_FAILED,
221
222
    // Replication has succeeded.
223
    REPLICATED
224
  };
225
226
  enum PrepareState {
227
    NOT_PREPARED,
228
    PREPARED
229
  };
230
231
  ~OperationDriver();
232
233
  // Starts operation, returns false if we should NOT continue processing the operation.
234
  bool StartOperation();
235
236
  // Calls Operation::Apply() followed by Consensus::Commit() with the
237
  // results from the Apply().
238
  void ApplyTask(int64_t leader_term, OpIds* applied_op_ids);
239
240
  // Returns the mutable state of the operation being executed by
241
  // this driver.
242
  Operation* mutable_operation();
243
244
  // Return a short string indicating where the operation currently is in the
245
  // state machine.
246
  static std::string StateString(ReplicationState repl_state,
247
                                 PrepareState prep_state);
248
249
  OperationTracker* const operation_tracker_;
250
  consensus::Consensus* const consensus_;
251
  Preparer* const preparer_;
252
253
  // Lock that synchronizes access to the operation's state.
254
  mutable simple_spinlock lock_;
255
256
  // A copy of the operation's OpId, set when the operation first
257
  // receives one from Consensus and uninitialized until then.
258
  // TODO: we have two separate copies of this now -- in Operation, and here... we should be able
259
  // to consolidate!
260
  boost::atomic<yb::OpId> op_id_copy_{yb::OpId::Invalid()};
261
262
  // The operation to be executed by this driver.
263
  std::unique_ptr<Operation> operation_;
264
265
  // Trace object for tracing any operations started by this driver.
266
  scoped_refptr<Trace> trace_;
267
268
  const MonoTime start_time_;
269
270
  ReplicationState replication_state_;
271
  PrepareState prepare_state_;
272
273
  // The system monotonic time when the operation was prepared.
274
  // This is used for debugging only, not any actual operation ordering.
275
  MicrosecondsInt64 prepare_physical_hybrid_time_ = 0;
276
277
  TableType table_type_;
278
279
  MvccManager* mvcc_ = nullptr;
280
  HybridTime propagated_safe_time_;
281
282
  DISALLOW_COPY_AND_ASSIGN(OperationDriver);
283
};
284
285
}  // namespace tablet
286
}  // namespace yb
287
288
#endif // YB_TABLET_OPERATIONS_OPERATION_DRIVER_H