/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #ifndef YB_TABLET_OPERATIONS_OPERATION_H |
34 | | #define YB_TABLET_OPERATIONS_OPERATION_H |
35 | | |
36 | | #include <mutex> |
37 | | #include <string> |
38 | | |
39 | | #include <boost/optional/optional.hpp> |
40 | | |
41 | | #include "yb/common/hybrid_time.h" |
42 | | |
43 | | #include "yb/consensus/consensus_fwd.h" |
44 | | #include "yb/consensus/consensus_round.h" |
45 | | #include "yb/consensus/consensus_types.pb.h" |
46 | | |
47 | | #include "yb/tablet/tablet_fwd.h" |
48 | | |
49 | | #include "yb/util/status_fwd.h" |
50 | | #include "yb/util/locks.h" |
51 | | #include "yb/util/operation_counter.h" |
52 | | #include "yb/util/opid.h" |
53 | | |
54 | | namespace yb { |
55 | | |
56 | | class Synchronizer; |
57 | | |
58 | | namespace tablet { |
59 | | |
60 | | using OperationCompletionCallback = std::function<void(const Status&)>; |
61 | | |
62 | | YB_DEFINE_ENUM( |
63 | | OperationType, |
64 | | ((kWrite, consensus::WRITE_OP)) |
65 | | ((kChangeMetadata, consensus::CHANGE_METADATA_OP)) |
66 | | ((kUpdateTransaction, consensus::UPDATE_TRANSACTION_OP)) |
67 | | ((kSnapshot, consensus::SNAPSHOT_OP)) |
68 | | ((kTruncate, consensus::TRUNCATE_OP)) |
69 | | ((kEmpty, consensus::UNKNOWN_OP)) |
70 | | ((kHistoryCutoff, consensus::HISTORY_CUTOFF_OP)) |
71 | | ((kSplit, consensus::SPLIT_OP))); |
72 | | |
73 | | // Base class for transactions. There are different implementations for different types (Write, |
74 | | // AlterSchema, etc.) OperationDriver implementations use Operations along with Consensus to execute |
75 | | // and replicate operations in a consensus configuration. |
76 | | class Operation { |
77 | | public: |
78 | | enum TraceType { |
79 | | NO_TRACE_TXNS = 0, |
80 | | TRACE_TXNS = 1 |
81 | | }; |
82 | | |
83 | | explicit Operation(OperationType operation_type, Tablet* tablet); |
84 | | |
85 | | // Returns this transaction's type. |
86 | 48.0M | OperationType operation_type() const { return operation_type_; } |
87 | | |
88 | | // Builds the ReplicateMsg for this transaction. |
89 | | virtual consensus::ReplicateMsgPtr NewReplicateMsg() = 0; |
90 | | |
91 | | // Executes the prepare phase of this transaction. The actual actions of this phase depend on the |
92 | | // transaction type, but usually are limited to what can be done without actually changing shared |
93 | | // data structures (such as the RocksDB memtable) and without side-effects. |
94 | | virtual CHECKED_STATUS Prepare() = 0; |
95 | | |
96 | | // Applies replicated operation, the actual actions of this phase depend on the |
97 | | // operation type, but usually this is the method where data-structures are changed. |
98 | | // Also it should notify callback if necessary. |
99 | | CHECKED_STATUS Replicated(int64_t leader_term); |
100 | | |
101 | | // Abort operation. Release resources and notify callbacks. |
102 | | void Aborted(const Status& status, bool was_pending); |
103 | | |
104 | | // Each implementation should have its own ToString() method. |
105 | | virtual std::string ToString() const; |
106 | | |
107 | | std::string LogPrefix() const; |
108 | | |
109 | 3.16M | void set_preparing_token(ScopedOperation&& preparing_token) { |
110 | 3.16M | preparing_token_ = std::move(preparing_token); |
111 | 3.16M | } |
112 | | |
113 | 14.3M | void SubmittedToPreparer() { |
114 | 14.3M | preparing_token_ = ScopedOperation(); |
115 | 14.3M | } |
116 | | |
117 | | // Returns the request PB associated with this transaction. May be NULL if the transaction's state |
118 | | // has been reset. |
119 | 0 | virtual const google::protobuf::Message* request() const { return nullptr; } |
120 | | |
121 | | // Sets the ConsensusRound for this transaction, if this transaction is being executed through the |
122 | | // consensus system. |
123 | | void set_consensus_round(const scoped_refptr<consensus::ConsensusRound>& consensus_round); |
124 | | |
125 | | // Each subclass should provide a way to update the internal reference to the Message* request, so |
126 | | // we can avoid copying the request object all the time. |
127 | | virtual void UpdateRequestFromConsensusRound() = 0; |
128 | | |
129 | | // Returns the ConsensusRound being used, if this transaction is being executed through the |
130 | | // consensus system or NULL if it's not. |
131 | 79.6M | consensus::ConsensusRound* consensus_round() { |
132 | 79.6M | return consensus_round_.get(); |
133 | 79.6M | } |
134 | | |
135 | 7 | const consensus::ConsensusRound* consensus_round() const { |
136 | 7 | return consensus_round_.get(); |
137 | 7 | } |
138 | | |
139 | 105M | Tablet* tablet() const { |
140 | 105M | return tablet_; |
141 | 105M | } |
142 | | |
143 | | virtual void Release(); |
144 | | |
145 | 52 | void SetTablet(Tablet* tablet) { |
146 | 52 | tablet_ = tablet; |
147 | 52 | } |
148 | | |
149 | | template <class F> |
150 | | void set_completion_callback(const F& completion_clbk) { |
151 | | completion_clbk_ = completion_clbk; |
152 | | } |
153 | | |
154 | | template <class F> |
155 | 6.16M | void set_completion_callback(F&& completion_clbk) { |
156 | 6.16M | completion_clbk_ = std::move(completion_clbk); |
157 | 6.16M | } void yb::tablet::Operation::set_completion_callback<auto yb::tserver::MakeRpcOperationCompletionCallback<yb::tserver::ChangeMetadataResponsePB>(yb::rpc::RpcContext, yb::tserver::ChangeMetadataResponsePB*, scoped_refptr<yb::server::Clock> const&)::'lambda'(yb::Status const&)>(yb::tserver::ChangeMetadataResponsePB*&&) Line | Count | Source | 155 | 31.2k | void set_completion_callback(F&& completion_clbk) { | 156 | 31.2k | completion_clbk_ = std::move(completion_clbk); | 157 | 31.2k | } |
void yb::tablet::Operation::set_completion_callback<auto yb::tserver::MakeRpcOperationCompletionCallback<yb::tserver::UpdateTransactionResponsePB>(yb::rpc::RpcContext, yb::tserver::UpdateTransactionResponsePB*, scoped_refptr<yb::server::Clock> const&)::'lambda'(yb::Status const&)>(yb::tserver::UpdateTransactionResponsePB*&&) Line | Count | Source | 155 | 2.52M | void set_completion_callback(F&& completion_clbk) { | 156 | 2.52M | completion_clbk_ = std::move(completion_clbk); | 157 | 2.52M | } |
void yb::tablet::Operation::set_completion_callback<auto yb::tserver::MakeRpcOperationCompletionCallback<yb::tserver::TruncateResponsePB>(yb::rpc::RpcContext, yb::tserver::TruncateResponsePB*, scoped_refptr<yb::server::Clock> const&)::'lambda'(yb::Status const&)>(yb::tserver::TruncateResponsePB*&&) Line | Count | Source | 155 | 57.0k | void set_completion_callback(F&& completion_clbk) { | 156 | 57.0k | completion_clbk_ = std::move(completion_clbk); | 157 | 57.0k | } |
void yb::tablet::Operation::set_completion_callback<auto yb::tserver::MakeRpcOperationCompletionCallback<yb::tserver::SplitTabletResponsePB>(yb::rpc::RpcContext, yb::tserver::SplitTabletResponsePB*, scoped_refptr<yb::server::Clock> const&)::'lambda'(yb::Status const&)>(yb::tserver::SplitTabletResponsePB*&&) Line | Count | Source | 155 | 45 | void set_completion_callback(F&& completion_clbk) { | 156 | 45 | completion_clbk_ = std::move(completion_clbk); | 157 | 45 | } |
void yb::tablet::Operation::set_completion_callback<auto yb::tserver::MakeRpcOperationCompletionCallback<yb::tserver::TabletSnapshotOpResponsePB>(yb::rpc::RpcContext, yb::tserver::TabletSnapshotOpResponsePB*, scoped_refptr<yb::server::Clock> const&)::'lambda'(yb::Status const&)>(yb::tserver::TabletSnapshotOpResponsePB*&&) Line | Count | Source | 155 | 48 | void set_completion_callback(F&& completion_clbk) { | 156 | 48 | completion_clbk_ = std::move(completion_clbk); | 157 | 48 | } |
void yb::tablet::Operation::set_completion_callback<std::__1::function<void (yb::Status const&)> >(std::__1::function<void (yb::Status const&)>&&) Line | Count | Source | 155 | 430k | void set_completion_callback(F&& completion_clbk) { | 156 | 430k | completion_clbk_ = std::move(completion_clbk); | 157 | 430k | } |
write_query.cc:void yb::tablet::Operation::set_completion_callback<yb::tablet::WriteQuery::PrepareSubmit()::$_0>(yb::tablet::WriteQuery::PrepareSubmit()::$_0&&) Line | Count | Source | 155 | 3.11M | void set_completion_callback(F&& completion_clbk) { | 156 | 3.11M | completion_clbk_ = std::move(completion_clbk); | 157 | 3.11M | } |
Unexecuted instantiation: void yb::tablet::Operation::set_completion_callback<yb::master::MasterSnapshotCoordinator::Impl::SubmitDelete(yb::StronglyTypedUuid<yb::TxnSnapshotId_Tag> const&, long long, std::__1::shared_ptr<yb::Synchronizer> const&)::'lambda'(yb::Status const&)>(yb::master::MasterSnapshotCoordinator::Impl::SubmitDelete(yb::StronglyTypedUuid<yb::TxnSnapshotId_Tag> const&, long long, std::__1::shared_ptr<yb::Synchronizer> const&)::'lambda'(yb::Status const&)&&) void yb::tablet::Operation::set_completion_callback<auto yb::tablet::MakeLatchOperationCompletionCallback<yb::CountDownLatch*, yb::tserver::TabletSnapshotOpResponsePB*>(yb::CountDownLatch*, yb::tserver::TabletSnapshotOpResponsePB*)::'lambda'(yb::Status const&)>(yb::tserver::TabletSnapshotOpResponsePB*&&) Line | Count | Source | 155 | 746 | void set_completion_callback(F&& completion_clbk) { | 156 | 746 | completion_clbk_ = std::move(completion_clbk); | 157 | 746 | } |
|
158 | | |
159 | | // Sets the hybrid_time for the transaction |
160 | | void set_hybrid_time(const HybridTime& hybrid_time); |
161 | | |
162 | 52.7M | HybridTime hybrid_time() const { |
163 | 52.7M | std::lock_guard<simple_spinlock> l(mutex_); |
164 | 52.7M | DCHECK(hybrid_time_.is_valid()); |
165 | 52.7M | return hybrid_time_; |
166 | 52.7M | } |
167 | | |
168 | 54.1k | HybridTime hybrid_time_even_if_unset() const { |
169 | 54.1k | std::lock_guard<simple_spinlock> l(mutex_); |
170 | 54.1k | return hybrid_time_; |
171 | 54.1k | } |
172 | | |
173 | 8 | bool has_hybrid_time() const { |
174 | 8 | std::lock_guard<simple_spinlock> l(mutex_); |
175 | 8 | return hybrid_time_.is_valid(); |
176 | 8 | } |
177 | | |
178 | | // Returns hybrid time that should be used for storing this operation result in RocksDB. |
179 | | // For instance it could be different from hybrid_time() for CDC. |
180 | | virtual HybridTime WriteHybridTime() const; |
181 | | |
182 | 14.6M | void set_op_id(const OpId& op_id) { |
183 | 14.6M | std::lock_guard<simple_spinlock> l(mutex_); |
184 | 14.6M | op_id_ = op_id; |
185 | 14.6M | } |
186 | | |
187 | 44.5M | const OpId& op_id() const { |
188 | 44.5M | return op_id_; |
189 | 44.5M | } |
190 | | |
191 | 0 | bool has_completion_callback() const { |
192 | 0 | return completion_clbk_ != nullptr; |
193 | 0 | } |
194 | | |
195 | | void CompleteWithStatus(const Status& status) const; |
196 | | |
197 | | // Whether we should use MVCC Manager to track this operation. |
198 | 2.56M | virtual bool use_mvcc() const { |
199 | 2.56M | return false; |
200 | 2.56M | } |
201 | | |
202 | | // Initialize operation at leader side. |
203 | | // op_id - operation id. |
204 | | // committed_op_id - current committed operation id. |
205 | | void AddedToLeader(const OpId& op_id, const OpId& committed_op_id); |
206 | | void AddedToFollower(); |
207 | | |
208 | | void Aborted(bool was_pending); |
209 | | void Replicated(); |
210 | | |
211 | | virtual ~Operation(); |
212 | | |
213 | | private: |
214 | | // Actual implementation of Replicated. |
215 | | // complete_status could be used to change completion status, i.e. callback will be invoked |
216 | | // with this status. |
217 | | virtual CHECKED_STATUS DoReplicated(int64_t leader_term, Status* complete_status) = 0; |
218 | | |
219 | | // Actual implementation of Aborted, should return status that should be passed to callback. |
220 | | virtual CHECKED_STATUS DoAborted(const Status& status) = 0; |
221 | | |
222 | | // A private version of this transaction's transaction state so that we can use base |
223 | | // Operation methods on destructors. |
224 | | const OperationType operation_type_; |
225 | | |
226 | 14.3M | virtual void AddedAsPending() {} |
227 | 14.3M | virtual void RemovedFromPending() {} |
228 | | |
229 | | // The tablet peer that is coordinating this transaction. |
230 | | Tablet* tablet_; |
231 | | |
232 | | // Optional callback to be called once the transaction completes. |
233 | | OperationCompletionCallback completion_clbk_; |
234 | | |
235 | | mutable std::atomic<bool> complete_{false}; |
236 | | |
237 | | mutable simple_spinlock mutex_; |
238 | | |
239 | | // This transaction's hybrid_time. Protected by mutex_. |
240 | | HybridTime hybrid_time_ GUARDED_BY(mutex_); |
241 | | |
242 | | // This OpId stores the canonical "anchor" OpId for this transaction. |
243 | | OpId op_id_ GUARDED_BY(mutex_); |
244 | | |
245 | | scoped_refptr<consensus::ConsensusRound> consensus_round_; |
246 | | |
247 | | ScopedOperation preparing_token_; |
248 | | }; |
249 | | |
250 | | template <class Request> |
251 | | struct RequestTraits { |
252 | | static void SetAllocatedRequest( |
253 | | consensus::ReplicateMsg* replicate, Request* request); |
254 | | |
255 | | static Request* MutableRequest(consensus::ReplicateMsg* replicate); |
256 | | }; |
257 | | |
258 | | consensus::ReplicateMsgPtr CreateReplicateMsg(OperationType op_type); |
259 | | |
260 | | template <OperationType op_type, class Request, class Base = Operation> |
261 | | class OperationBase : public Base { |
262 | | public: |
263 | | explicit OperationBase(Tablet* tablet, const Request* request = nullptr) |
264 | 16.2M | : Base(op_type, tablet), request_(request) {} yb::tablet::OperationBase<(yb::tablet::OperationType)6, yb::tablet::TransactionStatePB, yb::tablet::Operation>::OperationBase(yb::tablet::Tablet*, yb::tablet::TransactionStatePB const*) Line | Count | Source | 264 | 5.95M | : Base(op_type, tablet), request_(request) {} |
yb::tablet::OperationBase<(yb::tablet::OperationType)8, yb::tablet::TruncatePB, yb::tablet::Operation>::OperationBase(yb::tablet::Tablet*, yb::tablet::TruncatePB const*) Line | Count | Source | 264 | 170k | : Base(op_type, tablet), request_(request) {} |
yb::tablet::OperationBase<(yb::tablet::OperationType)10, yb::tablet::SplitTabletRequestPB, yb::tablet::Operation>::OperationBase(yb::tablet::Tablet*, yb::tablet::SplitTabletRequestPB const*) Line | Count | Source | 264 | 71 | : Base(op_type, tablet), request_(request) {} |
yb::tablet::OperationBase<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::OperationBase(yb::tablet::Tablet*, yb::tserver::TabletSnapshotOpRequestPB const*) Line | Count | Source | 264 | 1.98k | : Base(op_type, tablet), request_(request) {} |
yb::tablet::OperationBase<(yb::tablet::OperationType)3, yb::tablet::WritePB, yb::tablet::Operation>::OperationBase(yb::tablet::Tablet*, yb::tablet::WritePB const*) Line | Count | Source | 264 | 9.01M | : Base(op_type, tablet), request_(request) {} |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)9, yb::consensus::HistoryCutoffPB, yb::tablet::Operation>::OperationBase(yb::tablet::Tablet*, yb::consensus::HistoryCutoffPB const*) yb::tablet::OperationBase<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::OperationBase(yb::tablet::Tablet*, yb::tablet::ChangeMetadataRequestPB const*) Line | Count | Source | 264 | 1.10M | : Base(op_type, tablet), request_(request) {} |
|
265 | | |
266 | 31.2M | const Request* request() const override { |
267 | 31.2M | return request_.load(std::memory_order_acquire); |
268 | 31.2M | } yb::tablet::OperationBase<(yb::tablet::OperationType)6, yb::tablet::TransactionStatePB, yb::tablet::Operation>::request() const Line | Count | Source | 266 | 12.0M | const Request* request() const override { | 267 | 12.0M | return request_.load(std::memory_order_acquire); | 268 | 12.0M | } |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)8, yb::tablet::TruncatePB, yb::tablet::Operation>::request() const yb::tablet::OperationBase<(yb::tablet::OperationType)10, yb::tablet::SplitTabletRequestPB, yb::tablet::Operation>::request() const Line | Count | Source | 266 | 72 | const Request* request() const override { | 267 | 72 | return request_.load(std::memory_order_acquire); | 268 | 72 | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::request() const Line | Count | Source | 266 | 10.4k | const Request* request() const override { | 267 | 10.4k | return request_.load(std::memory_order_acquire); | 268 | 10.4k | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::request() const Line | Count | Source | 266 | 9.78M | const Request* request() const override { | 267 | 9.78M | return request_.load(std::memory_order_acquire); | 268 | 9.78M | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)3, yb::tablet::WritePB, yb::tablet::Operation>::request() const Line | Count | Source | 266 | 9.42M | const Request* request() const override { | 267 | 9.42M | return request_.load(std::memory_order_acquire); | 268 | 9.42M | } |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)9, yb::consensus::HistoryCutoffPB, yb::tablet::Operation>::request() const |
269 | | |
270 | 3.24M | Request* AllocateRequest() { |
271 | 3.24M | request_holder_ = std::make_unique<Request>(); |
272 | 3.24M | request_.store(request_holder_.get(), std::memory_order_release); |
273 | 3.24M | return request_holder_.get(); |
274 | 3.24M | } yb::tablet::OperationBase<(yb::tablet::OperationType)3, yb::tablet::WritePB, yb::tablet::Operation>::AllocateRequest() Line | Count | Source | 270 | 3.24M | Request* AllocateRequest() { | 271 | 3.24M | request_holder_ = std::make_unique<Request>(); | 272 | 3.24M | request_.store(request_holder_.get(), std::memory_order_release); | 273 | 3.24M | return request_holder_.get(); | 274 | 3.24M | } |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)9, yb::consensus::HistoryCutoffPB, yb::tablet::Operation>::AllocateRequest() yb::tablet::OperationBase<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::AllocateRequest() Line | Count | Source | 270 | 24 | Request* AllocateRequest() { | 271 | 24 | request_holder_ = std::make_unique<Request>(); | 272 | 24 | request_.store(request_holder_.get(), std::memory_order_release); | 273 | 24 | return request_holder_.get(); | 274 | 24 | } |
|
275 | | |
276 | 11.9M | Request* mutable_request() { |
277 | 11.9M | return request_holder_.get(); |
278 | 11.9M | } |
279 | | |
280 | | Request* ReleaseRequest() { |
281 | | return request_holder_.release(); |
282 | | } |
283 | | |
284 | 406k | void TakeRequest(Request* request) { |
285 | 406k | request_holder_.reset(new Request); |
286 | 406k | request_.store(request_holder_.get(), std::memory_order_release); |
287 | 406k | request_holder_->Swap(request); |
288 | 406k | } |
289 | | |
290 | 5.04M | consensus::ReplicateMsgPtr NewReplicateMsg() override { |
291 | 5.04M | auto result = CreateReplicateMsg(op_type); |
292 | 5.04M | auto* request = request_holder_.release(); |
293 | 5.04M | if (request) { |
294 | 3.42M | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); |
295 | 3.42M | } else { |
296 | 1.62M | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; |
297 | 1.62M | } |
298 | 5.04M | return result; |
299 | 5.04M | } yb::tablet::OperationBase<(yb::tablet::OperationType)6, yb::tablet::TransactionStatePB, yb::tablet::Operation>::NewReplicateMsg() Line | Count | Source | 290 | 1.50M | consensus::ReplicateMsgPtr NewReplicateMsg() override { | 291 | 1.50M | auto result = CreateReplicateMsg(op_type); | 292 | 1.50M | auto* request = request_holder_.release(); | 293 | 1.50M | if (request) { | 294 | 406k | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); | 295 | 1.09M | } else { | 296 | 1.09M | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; | 297 | 1.09M | } | 298 | 1.50M | return result; | 299 | 1.50M | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)8, yb::tablet::TruncatePB, yb::tablet::Operation>::NewReplicateMsg() Line | Count | Source | 290 | 57.1k | consensus::ReplicateMsgPtr NewReplicateMsg() override { | 291 | 57.1k | auto result = CreateReplicateMsg(op_type); | 292 | 57.1k | auto* request = request_holder_.release(); | 293 | 57.1k | if (request) { | 294 | 0 | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); | 295 | 57.1k | } else { | 296 | 57.1k | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; | 297 | 57.1k | } | 298 | 57.1k | return result; | 299 | 57.1k | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)10, yb::tablet::SplitTabletRequestPB, yb::tablet::Operation>::NewReplicateMsg() Line | Count | Source | 290 | 45 | consensus::ReplicateMsgPtr NewReplicateMsg() override { | 291 | 45 | auto result = CreateReplicateMsg(op_type); | 292 | 45 | auto* request = request_holder_.release(); | 293 | 45 | if (request) { | 294 | 0 | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); | 295 | 45 | } else { | 296 | 45 | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; | 297 | 45 | } | 298 | 45 | return result; | 299 | 45 | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::NewReplicateMsg() Line | Count | Source | 290 | 818 | consensus::ReplicateMsgPtr NewReplicateMsg() override { | 291 | 818 | auto result = CreateReplicateMsg(op_type); | 292 | 818 | auto* request = request_holder_.release(); | 293 | 818 | if (request) { | 294 | 24 | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); | 295 | 794 | } else { | 296 | 794 | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; | 297 | 794 | } | 298 | 818 | return result; | 299 | 818 | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)3, yb::tablet::WritePB, yb::tablet::Operation>::NewReplicateMsg() Line | Count | Source | 290 | 3.01M | consensus::ReplicateMsgPtr NewReplicateMsg() override { | 291 | 3.01M | auto result = CreateReplicateMsg(op_type); | 292 | 3.01M | auto* request = request_holder_.release(); | 293 | 3.01M | if (request) { | 294 | 3.01M | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); | 295 | 3.01M | } else { | 296 | 4.03k | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; | 297 | 4.03k | } | 298 | 3.01M | return result; | 299 | 3.01M | } |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)9, yb::consensus::HistoryCutoffPB, yb::tablet::Operation>::NewReplicateMsg() yb::tablet::OperationBase<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::NewReplicateMsg() Line | Count | Source | 290 | 462k | consensus::ReplicateMsgPtr NewReplicateMsg() override { | 291 | 462k | auto result = CreateReplicateMsg(op_type); | 292 | 462k | auto* request = request_holder_.release(); | 293 | 462k | if (request) { | 294 | 0 | RequestTraits<Request>::SetAllocatedRequest(result.get(), request); | 295 | 462k | } else { | 296 | 462k | *RequestTraits<Request>::MutableRequest(result.get()) = *request_; | 297 | 462k | } | 298 | 462k | return result; | 299 | 462k | } |
|
300 | | |
301 | 14.3M | void UpdateRequestFromConsensusRound() override { |
302 | 14.3M | UseRequest(RequestTraits<Request>::MutableRequest( |
303 | 14.3M | Base::consensus_round()->replicate_msg().get())); |
304 | 14.3M | } yb::tablet::OperationBase<(yb::tablet::OperationType)6, yb::tablet::TransactionStatePB, yb::tablet::Operation>::UpdateRequestFromConsensusRound() Line | Count | Source | 301 | 4.49M | void UpdateRequestFromConsensusRound() override { | 302 | 4.49M | UseRequest(RequestTraits<Request>::MutableRequest( | 303 | 4.49M | Base::consensus_round()->replicate_msg().get())); | 304 | 4.49M | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)8, yb::tablet::TruncatePB, yb::tablet::Operation>::UpdateRequestFromConsensusRound() Line | Count | Source | 301 | 171k | void UpdateRequestFromConsensusRound() override { | 302 | 171k | UseRequest(RequestTraits<Request>::MutableRequest( | 303 | 171k | Base::consensus_round()->replicate_msg().get())); | 304 | 171k | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)10, yb::tablet::SplitTabletRequestPB, yb::tablet::Operation>::UpdateRequestFromConsensusRound() Line | Count | Source | 301 | 71 | void UpdateRequestFromConsensusRound() override { | 302 | 71 | UseRequest(RequestTraits<Request>::MutableRequest( | 303 | 71 | Base::consensus_round()->replicate_msg().get())); | 304 | 71 | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::UpdateRequestFromConsensusRound() Line | Count | Source | 301 | 1.97k | void UpdateRequestFromConsensusRound() override { | 302 | 1.97k | UseRequest(RequestTraits<Request>::MutableRequest( | 303 | 1.97k | Base::consensus_round()->replicate_msg().get())); | 304 | 1.97k | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)3, yb::tablet::WritePB, yb::tablet::Operation>::UpdateRequestFromConsensusRound() Line | Count | Source | 301 | 8.55M | void UpdateRequestFromConsensusRound() override { | 302 | 8.55M | UseRequest(RequestTraits<Request>::MutableRequest( | 303 | 8.55M | Base::consensus_round()->replicate_msg().get())); | 304 | 8.55M | } |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)9, yb::consensus::HistoryCutoffPB, yb::tablet::Operation>::UpdateRequestFromConsensusRound() yb::tablet::OperationBase<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::UpdateRequestFromConsensusRound() Line | Count | Source | 301 | 1.10M | void UpdateRequestFromConsensusRound() override { | 302 | 1.10M | UseRequest(RequestTraits<Request>::MutableRequest( | 303 | 1.10M | Base::consensus_round()->replicate_msg().get())); | 304 | 1.10M | } |
|
305 | | |
306 | | protected: |
307 | 15.4M | void UseRequest(const Request* request) { |
308 | 15.4M | request_.store(request, std::memory_order_release); |
309 | 15.4M | } yb::tablet::OperationBase<(yb::tablet::OperationType)6, yb::tablet::TransactionStatePB, yb::tablet::Operation>::UseRequest(yb::tablet::TransactionStatePB const*) Line | Count | Source | 307 | 4.49M | void UseRequest(const Request* request) { | 308 | 4.49M | request_.store(request, std::memory_order_release); | 309 | 4.49M | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)8, yb::tablet::TruncatePB, yb::tablet::Operation>::UseRequest(yb::tablet::TruncatePB const*) Line | Count | Source | 307 | 171k | void UseRequest(const Request* request) { | 308 | 171k | request_.store(request, std::memory_order_release); | 309 | 171k | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)10, yb::tablet::SplitTabletRequestPB, yb::tablet::Operation>::UseRequest(yb::tablet::SplitTabletRequestPB const*) Line | Count | Source | 307 | 71 | void UseRequest(const Request* request) { | 308 | 71 | request_.store(request, std::memory_order_release); | 309 | 71 | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::UseRequest(yb::tserver::TabletSnapshotOpRequestPB const*) Line | Count | Source | 307 | 3.93k | void UseRequest(const Request* request) { | 308 | 3.93k | request_.store(request, std::memory_order_release); | 309 | 3.93k | } |
yb::tablet::OperationBase<(yb::tablet::OperationType)3, yb::tablet::WritePB, yb::tablet::Operation>::UseRequest(yb::tablet::WritePB const*) Line | Count | Source | 307 | 8.55M | void UseRequest(const Request* request) { | 308 | 8.55M | request_.store(request, std::memory_order_release); | 309 | 8.55M | } |
Unexecuted instantiation: yb::tablet::OperationBase<(yb::tablet::OperationType)9, yb::consensus::HistoryCutoffPB, yb::tablet::Operation>::UseRequest(yb::consensus::HistoryCutoffPB const*) yb::tablet::OperationBase<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB, yb::tablet::ExclusiveSchemaOperationBase>::UseRequest(yb::tablet::ChangeMetadataRequestPB const*) Line | Count | Source | 307 | 2.21M | void UseRequest(const Request* request) { | 308 | 2.21M | request_.store(request, std::memory_order_release); | 309 | 2.21M | } |
|
310 | | |
311 | | private: |
312 | | std::unique_ptr<Request> request_holder_; |
313 | | std::atomic<const Request*> request_; |
314 | | }; |
315 | | |
316 | | class ExclusiveSchemaOperationBase : public Operation { |
317 | | public: |
318 | | template <class... Args> |
319 | | explicit ExclusiveSchemaOperationBase(Args&&... args) |
320 | 1.11M | : Operation(std::forward<Args>(args)...) {} |
321 | | |
322 | | // Release the acquired schema lock. |
323 | | void ReleasePermitToken(); |
324 | | |
325 | 27.5k | void UsePermitToken(ScopedRWOperationPause&& token) { |
326 | 27.5k | permit_token_ = std::move(token); |
327 | 27.5k | } |
328 | | |
329 | | private: |
330 | | // Used to pause write operations from being accepted while alter is in progress. |
331 | | ScopedRWOperationPause permit_token_; |
332 | | }; |
333 | | |
334 | | template <OperationType operation_type, class Request> |
335 | | class ExclusiveSchemaOperation |
336 | | : public OperationBase<operation_type, Request, ExclusiveSchemaOperationBase> { |
337 | | public: |
338 | | template <class... Args> |
339 | | explicit ExclusiveSchemaOperation(Args&&... args) |
340 | | : OperationBase<operation_type, Request, ExclusiveSchemaOperationBase>( |
341 | 1.11M | std::forward<Args>(args)...) {} yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB>::ExclusiveSchemaOperation<yb::tablet::Tablet*, yb::tserver::TabletSnapshotOpRequestPB const*&>(yb::tablet::Tablet*&&, yb::tserver::TabletSnapshotOpRequestPB const*&) Line | Count | Source | 341 | 48 | std::forward<Args>(args)...) {} |
Unexecuted instantiation: yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB>::ExclusiveSchemaOperation<yb::tablet::Tablet*, yb::tserver::TabletSnapshotOpRequestPB* const&>(yb::tablet::Tablet*&&, yb::tserver::TabletSnapshotOpRequestPB* const&) yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB>::ExclusiveSchemaOperation<yb::tablet::Tablet*>(yb::tablet::Tablet*&&) Line | Count | Source | 341 | 1.15k | std::forward<Args>(args)...) {} |
yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB>::ExclusiveSchemaOperation<yb::tablet::Tablet*&, yb::tablet::ChangeMetadataRequestPB const*&>(yb::tablet::Tablet*&, yb::tablet::ChangeMetadataRequestPB const*&) Line | Count | Source | 341 | 1.10M | std::forward<Args>(args)...) {} |
yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB>::ExclusiveSchemaOperation<std::nullptr_t>(std::nullptr_t&&) Line | Count | Source | 341 | 24 | std::forward<Args>(args)...) {} |
yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB>::ExclusiveSchemaOperation<yb::tablet::Tablet*, yb::tserver::TabletSnapshotOpRequestPB*>(yb::tablet::Tablet*&&, yb::tserver::TabletSnapshotOpRequestPB*&&) Line | Count | Source | 341 | 758 | std::forward<Args>(args)...) {} |
|
342 | | |
343 | 1.10M | void Release() override { |
344 | 1.10M | ExclusiveSchemaOperationBase::ReleasePermitToken(); |
345 | | |
346 | | // Make the request NULL since after this operation commits |
347 | | // the request may be deleted at any moment. |
348 | 1.10M | OperationBase<operation_type, Request, ExclusiveSchemaOperationBase>::UseRequest(nullptr); |
349 | 1.10M | } yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)7, yb::tserver::TabletSnapshotOpRequestPB>::Release() Line | Count | Source | 343 | 1.96k | void Release() override { | 344 | 1.96k | ExclusiveSchemaOperationBase::ReleasePermitToken(); | 345 | | | 346 | | // Make the request NULL since after this operation commits | 347 | | // the request may be deleted at any moment. | 348 | 1.96k | OperationBase<operation_type, Request, ExclusiveSchemaOperationBase>::UseRequest(nullptr); | 349 | 1.96k | } |
yb::tablet::ExclusiveSchemaOperation<(yb::tablet::OperationType)4, yb::tablet::ChangeMetadataRequestPB>::Release() Line | Count | Source | 343 | 1.10M | void Release() override { | 344 | 1.10M | ExclusiveSchemaOperationBase::ReleasePermitToken(); | 345 | | | 346 | | // Make the request NULL since after this operation commits | 347 | | // the request may be deleted at any moment. | 348 | 1.10M | OperationBase<operation_type, Request, ExclusiveSchemaOperationBase>::UseRequest(nullptr); | 349 | 1.10M | } |
|
350 | | }; |
351 | | |
352 | | template<class LatchPtr, class ResponsePBPtr> |
353 | 290k | auto MakeLatchOperationCompletionCallback(LatchPtr latch, ResponsePBPtr response) { |
354 | 290k | return [latch, response](const Status& status) { |
355 | 290k | if (!status.ok()) { |
356 | 28 | StatusToPB(status, response->mutable_error()->mutable_status()); |
357 | 28 | } |
358 | 290k | latch->CountDown(); |
359 | 290k | }; auto yb::tablet::MakeLatchOperationCompletionCallback<yb::CountDownLatch*, yb::tserver::WriteResponsePB*>(yb::CountDownLatch*, yb::tserver::WriteResponsePB*)::'lambda'(yb::Status const&)::operator()('lambda'(yb::Status const&)) const Line | Count | Source | 354 | 4.01k | return [latch, response](const Status& status) { | 355 | 4.01k | if (!status.ok()) { | 356 | 0 | StatusToPB(status, response->mutable_error()->mutable_status()); | 357 | 0 | } | 358 | 4.01k | latch->CountDown(); | 359 | 4.01k | }; |
auto yb::tablet::MakeLatchOperationCompletionCallback<std::__1::shared_ptr<yb::CountDownLatch>, std::__1::shared_ptr<yb::tserver::WriteResponsePB> >(std::__1::shared_ptr<yb::CountDownLatch>, std::__1::shared_ptr<yb::tserver::WriteResponsePB>)::'lambda'(yb::Status const&)::operator()('lambda'(yb::Status const&)) const Line | Count | Source | 354 | 285k | return [latch, response](const Status& status) { | 355 | 285k | if (!status.ok()) { | 356 | 28 | StatusToPB(status, response->mutable_error()->mutable_status()); | 357 | 28 | } | 358 | 285k | latch->CountDown(); | 359 | 285k | }; |
auto yb::tablet::MakeLatchOperationCompletionCallback<yb::CountDownLatch*, yb::tserver::TabletSnapshotOpResponsePB*>(yb::CountDownLatch*, yb::tserver::TabletSnapshotOpResponsePB*)::'lambda'(yb::Status const&)::operator()('lambda'(yb::Status const&)) const Line | Count | Source | 354 | 745 | return [latch, response](const Status& status) { | 355 | 745 | if (!status.ok()) { | 356 | 0 | StatusToPB(status, response->mutable_error()->mutable_status()); | 357 | 0 | } | 358 | 745 | latch->CountDown(); | 359 | 745 | }; |
|
360 | 290k | } auto yb::tablet::MakeLatchOperationCompletionCallback<yb::CountDownLatch*, yb::tserver::WriteResponsePB*>(yb::CountDownLatch*, yb::tserver::WriteResponsePB*) Line | Count | Source | 353 | 4.01k | auto MakeLatchOperationCompletionCallback(LatchPtr latch, ResponsePBPtr response) { | 354 | 4.01k | return [latch, response](const Status& status) { | 355 | 4.01k | if (!status.ok()) { | 356 | 4.01k | StatusToPB(status, response->mutable_error()->mutable_status()); | 357 | 4.01k | } | 358 | 4.01k | latch->CountDown(); | 359 | 4.01k | }; | 360 | 4.01k | } |
auto yb::tablet::MakeLatchOperationCompletionCallback<std::__1::shared_ptr<yb::CountDownLatch>, std::__1::shared_ptr<yb::tserver::WriteResponsePB> >(std::__1::shared_ptr<yb::CountDownLatch>, std::__1::shared_ptr<yb::tserver::WriteResponsePB>) Line | Count | Source | 353 | 285k | auto MakeLatchOperationCompletionCallback(LatchPtr latch, ResponsePBPtr response) { | 354 | 285k | return [latch, response](const Status& status) { | 355 | 285k | if (!status.ok()) { | 356 | 285k | StatusToPB(status, response->mutable_error()->mutable_status()); | 357 | 285k | } | 358 | 285k | latch->CountDown(); | 359 | 285k | }; | 360 | 285k | } |
auto yb::tablet::MakeLatchOperationCompletionCallback<yb::CountDownLatch*, yb::tserver::TabletSnapshotOpResponsePB*>(yb::CountDownLatch*, yb::tserver::TabletSnapshotOpResponsePB*) Line | Count | Source | 353 | 746 | auto MakeLatchOperationCompletionCallback(LatchPtr latch, ResponsePBPtr response) { | 354 | 746 | return [latch, response](const Status& status) { | 355 | 746 | if (!status.ok()) { | 356 | 746 | StatusToPB(status, response->mutable_error()->mutable_status()); | 357 | 746 | } | 358 | 746 | latch->CountDown(); | 359 | 746 | }; | 360 | 746 | } |
|
361 | | |
362 | | OperationCompletionCallback MakeWeakSynchronizerOperationCompletionCallback( |
363 | | std::weak_ptr<Synchronizer> synchronizer); |
364 | | |
365 | | } // namespace tablet |
366 | | } // namespace yb |
367 | | |
368 | | #endif // YB_TABLET_OPERATIONS_OPERATION_H |