/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_driver.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tablet/operations/operation_driver.h" |
34 | | |
35 | | #include <atomic> |
36 | | #include <future> |
37 | | #include <map> |
38 | | #include <memory> |
39 | | #include <mutex> |
40 | | #include <string> |
41 | | #include <vector> |
42 | | |
43 | | #include "yb/consensus/consensus_fwd.h" |
44 | | #include "yb/consensus/consensus.h" |
45 | | #include "yb/consensus/consensus.pb.h" |
46 | | |
47 | | #include "yb/gutil/callback.h" |
48 | | #include "yb/gutil/ref_counted.h" |
49 | | #include "yb/gutil/strings/substitute.h" |
50 | | #include "yb/gutil/thread_annotations.h" |
51 | | |
52 | | #include "yb/master/sys_catalog_constants.h" |
53 | | |
54 | | #include "yb/rpc/rpc_fwd.h" |
55 | | |
56 | | #include "yb/tablet/mvcc.h" |
57 | | #include "yb/tablet/operations/operation_tracker.h" |
58 | | #include "yb/tablet/preparer.h" |
59 | | #include "yb/tablet/tablet.h" |
60 | | #include "yb/tablet/tablet_options.h" |
61 | | |
62 | | #include "yb/util/atomic.h" |
63 | | #include "yb/util/debug-util.h" |
64 | | #include "yb/util/debug/trace_event.h" |
65 | | #include "yb/util/flag_tags.h" |
66 | | #include "yb/util/logging.h" |
67 | | #include "yb/util/threadpool.h" |
68 | | #include "yb/util/trace.h" |
69 | | |
70 | | using namespace std::literals; |
71 | | |
72 | | DEFINE_test_flag(int32, delay_execute_async_ms, 0, |
73 | | "Delay execution of ExecuteAsync for specified amount of milliseconds during " |
74 | | "tests"); |
75 | | |
76 | | namespace yb { |
77 | | namespace tablet { |
78 | | |
79 | | using namespace std::placeholders; |
80 | | |
81 | | using consensus::Consensus; |
82 | | using consensus::ConsensusRound; |
83 | | |
84 | | //////////////////////////////////////////////////////////// |
85 | | // OperationDriver |
86 | | //////////////////////////////////////////////////////////// |
87 | | |
88 | | OperationDriver::OperationDriver(OperationTracker *operation_tracker, |
89 | | Consensus* consensus, |
90 | | Preparer* preparer, |
91 | | TableType table_type) |
92 | | : operation_tracker_(operation_tracker), |
93 | | consensus_(consensus), |
94 | | preparer_(preparer), |
95 | | trace_(new Trace()), |
96 | | start_time_(MonoTime::Now()), |
97 | | replication_state_(NOT_REPLICATING), |
98 | | prepare_state_(NOT_PREPARED), |
99 | 13.5M | table_type_(table_type) { |
100 | 13.5M | if (Trace::CurrentTrace()) { |
101 | 1.26M | Trace::CurrentTrace()->AddChildTrace(trace_.get()); |
102 | 1.26M | } |
103 | 13.5M | DCHECK(IsAcceptableAtomicImpl(op_id_copy_)); |
104 | 13.5M | } |
105 | | |
106 | 13.4M | Status OperationDriver::Init(std::unique_ptr<Operation>* operation, int64_t term) { |
107 | 13.4M | if (operation) { |
108 | 7.78M | operation_ = std::move(*operation); |
109 | 7.78M | } |
110 | | |
111 | 13.4M | if (term == OpId::kUnknownTerm) { |
112 | 10.7M | if (operation_) { |
113 | 5.09M | op_id_copy_.store(operation_->op_id(), boost::memory_order_release); |
114 | 5.09M | } |
115 | 10.7M | replication_state_ = REPLICATING; |
116 | 2.69M | } else { |
117 | 2.69M | if (consensus_) { // sometimes NULL in tests |
118 | 2.69M | consensus::ReplicateMsgPtr replicate_msg = operation_->NewReplicateMsg(); |
119 | 2.69M | auto round = make_scoped_refptr<ConsensusRound>(consensus_, std::move(replicate_msg)); |
120 | 2.69M | round->BindToTerm(term); |
121 | 2.69M | round->SetCallback(this); |
122 | 2.69M | mutable_operation()->set_consensus_round(std::move(round)); |
123 | 2.69M | } |
124 | 2.69M | } |
125 | | |
126 | 13.4M | auto result = operation_tracker_->Add(this); |
127 | 13.4M | if (!result.ok() && operation) { |
128 | 2 | *operation = std::move(operation_); |
129 | 2 | } |
130 | | |
131 | 13.4M | if (term == OpId::kUnknownTerm && operation_) { |
132 | 5.09M | operation_->AddedToFollower(); |
133 | 5.09M | } |
134 | | |
135 | 13.4M | return result; |
136 | 13.4M | } |
137 | | |
138 | 24.2M | yb::OpId OperationDriver::GetOpId() { |
139 | 24.2M | return op_id_copy_.load(boost::memory_order_acquire); |
140 | 24.2M | } |
141 | | |
142 | 2 | const Operation* OperationDriver::operation() const { |
143 | 2 | return operation_.get(); |
144 | 2 | } |
145 | | |
146 | 10.6M | Operation* OperationDriver::mutable_operation() { |
147 | 10.6M | return operation_.get(); |
148 | 10.6M | } |
149 | | |
150 | 43.1M | OperationType OperationDriver::operation_type() const { |
151 | 26.0M | return operation_ ? operation_->operation_type() : OperationType::kEmpty; |
152 | 43.1M | } |
153 | | |
154 | 1 | string OperationDriver::ToString() const { |
155 | 1 | std::lock_guard<simple_spinlock> lock(lock_); |
156 | 1 | return ToStringUnlocked(); |
157 | 1 | } |
158 | | |
159 | 1 | string OperationDriver::ToStringUnlocked() const { |
160 | 1 | string ret = StateString(replication_state_, prepare_state_); |
161 | 1 | if (operation_ != nullptr) { |
162 | 1 | ret += " " + operation_->ToString(); |
163 | 0 | } else { |
164 | 0 | ret += "[unknown operation]"; |
165 | 0 | } |
166 | 1 | return ret; |
167 | 1 | } |
168 | | |
169 | | |
170 | 13.4M | void OperationDriver::ExecuteAsync() { |
171 | 7.70k | VLOG_WITH_PREFIX(4) << "ExecuteAsync()"; |
172 | 13.4M | TRACE_EVENT_FLOW_BEGIN0("operation", "ExecuteAsync", this); |
173 | 13.4M | ADOPT_TRACE(trace()); |
174 | 13.4M | TRACE_FUNC(); |
175 | | |
176 | 13.4M | auto delay = GetAtomicFlag(&FLAGS_TEST_delay_execute_async_ms); |
177 | 13.4M | if (delay != 0 && |
178 | 0 | operation_type() == OperationType::kWrite && |
179 | 0 | operation_->tablet()->tablet_id() != master::kSysCatalogTabletId) { |
180 | 0 | LOG(INFO) << "T " << operation_->tablet()->tablet_id() |
181 | 0 | << " Debug sleep for: " << MonoDelta(1ms * delay) << "\n" << GetStackTrace(); |
182 | 0 | std::this_thread::sleep_for(1ms * delay); |
183 | 0 | } |
184 | | |
185 | 13.4M | auto s = preparer_->Submit(this); |
186 | | |
187 | 13.4M | if (operation_) { |
188 | 7.79M | operation_->SubmittedToPreparer(); |
189 | 7.79M | } |
190 | | |
191 | 13.4M | if (!s.ok()) { |
192 | 0 | HandleFailure(s); |
193 | 0 | } |
194 | 13.4M | } |
195 | | |
196 | 2.70M | void OperationDriver::AddedToLeader(const OpId& op_id, const OpId& committed_op_id) { |
197 | 2.70M | ADOPT_TRACE(trace()); |
198 | 2.70M | CHECK(!GetOpId().valid()); |
199 | 2.70M | op_id_copy_.store(op_id, boost::memory_order_release); |
200 | | |
201 | 2.70M | operation_->AddedToLeader(op_id, committed_op_id); |
202 | | |
203 | 2.70M | StartOperation(); |
204 | 2.70M | } |
205 | | |
206 | 10.7M | void OperationDriver::PrepareAndStartTask() { |
207 | 10.7M | TRACE_EVENT_FLOW_END0("operation", "PrepareAndStartTask", this); |
208 | 10.7M | Status prepare_status = PrepareAndStart(); |
209 | 10.7M | if (PREDICT_FALSE(!prepare_status.ok())) { |
210 | 0 | HandleFailure(prepare_status); |
211 | 0 | } |
212 | 10.7M | } |
213 | | |
214 | 13.5M | bool OperationDriver::StartOperation() { |
215 | 13.5M | if (propagated_safe_time_) { |
216 | 10.1M | mvcc_->SetPropagatedSafeTimeOnFollower(propagated_safe_time_); |
217 | 10.1M | } |
218 | 13.5M | if (!operation_) { |
219 | 5.70M | operation_tracker_->Release(this, nullptr /* applied_op_ids */); |
220 | 5.70M | return false; |
221 | 5.70M | } |
222 | 7.80M | return true; |
223 | 7.80M | } |
224 | | |
225 | 13.4M | Status OperationDriver::PrepareAndStart() { |
226 | 13.4M | ADOPT_TRACE(trace()); |
227 | 13.4M | TRACE_EVENT1("operation", "PrepareAndStart", "operation", this); |
228 | 1.95k | VLOG_WITH_PREFIX(4) << "PrepareAndStart()"; |
229 | | // Actually prepare and start the operation. |
230 | 13.4M | prepare_physical_hybrid_time_ = GetMonoTimeMicros(); |
231 | 13.4M | if (operation_) { |
232 | 7.79M | RETURN_NOT_OK(operation_->Prepare()); |
233 | 7.79M | } |
234 | | |
235 | | // Only take the lock long enough to take a local copy of the |
236 | | // replication state and set our prepare state. This ensures that |
237 | | // exactly one of Replicate/Prepare callbacks will trigger the apply |
238 | | // phase. |
239 | 13.4M | ReplicationState repl_state_copy; |
240 | 13.4M | { |
241 | 13.4M | std::lock_guard<simple_spinlock> lock(lock_); |
242 | 13.4M | CHECK_EQ(prepare_state_, NOT_PREPARED); |
243 | 13.4M | repl_state_copy = replication_state_; |
244 | 13.4M | } |
245 | | |
246 | 13.4M | if (repl_state_copy != NOT_REPLICATING) { |
247 | | // We want to call Start() as soon as possible, because the operation already has the |
248 | | // hybrid_time assigned. |
249 | 10.8M | if (!StartOperation()) { |
250 | 5.71M | return Status::OK(); |
251 | 5.71M | } |
252 | 7.78M | } |
253 | | |
254 | 7.78M | { |
255 | 7.78M | std::lock_guard<simple_spinlock> lock(lock_); |
256 | | // No one should have modified prepare_state_ since we've read it under the lock a few lines |
257 | | // above, because PrepareAndStart should only run once per operation. |
258 | 7.78M | CHECK_EQ(prepare_state_, NOT_PREPARED); |
259 | | // After this update, the ReplicationFinished callback will be able to apply this operation. |
260 | | // We can only do this after we've called Start() |
261 | 7.78M | prepare_state_ = PREPARED; |
262 | | |
263 | 7.78M | if (replication_state_ == NOT_REPLICATING) { |
264 | 2.70M | replication_state_ = REPLICATING; |
265 | 2.70M | } |
266 | 7.78M | } |
267 | | |
268 | 7.78M | return Status::OK(); |
269 | 7.78M | } |
270 | | |
271 | 13.5M | OperationDriver::~OperationDriver() { |
272 | 13.5M | } |
273 | | |
274 | 410 | void OperationDriver::HandleFailure(const Status& status) { |
275 | 410 | ReplicationState repl_state_copy; |
276 | | |
277 | 410 | { |
278 | 410 | std::lock_guard<simple_spinlock> lock(lock_); |
279 | 410 | repl_state_copy = replication_state_; |
280 | 410 | } |
281 | | |
282 | 0 | VLOG_WITH_PREFIX(2) << "Failed operation: " << status; |
283 | 410 | CHECK(!status.ok()); |
284 | 410 | ADOPT_TRACE(trace()); |
285 | 410 | TRACE("HandleFailure($0)", status.ToString()); |
286 | | |
287 | 410 | switch (repl_state_copy) { |
288 | 0 | case NOT_REPLICATING: |
289 | 410 | case REPLICATION_FAILED: |
290 | 410 | { |
291 | 0 | VLOG_WITH_PREFIX(1) << "Operation " << ToString() << " failed prior to " |
292 | 0 | "replication success: " << status; |
293 | 410 | operation_->Aborted(status, op_id_copy_.load().valid()); |
294 | 410 | operation_tracker_->Release(this, nullptr /* applied_op_ids */); |
295 | 410 | return; |
296 | 0 | } |
297 | | |
298 | 0 | case REPLICATING: |
299 | 0 | case REPLICATED: |
300 | 0 | { |
301 | 0 | LOG_WITH_PREFIX(FATAL) << "Cannot cancel operations that have already replicated" |
302 | 0 | << ": " << status << " operation: " << ToString(); |
303 | 0 | } |
304 | 410 | } |
305 | 410 | } |
306 | | |
307 | | void OperationDriver::ReplicationFinished( |
308 | 7.79M | const Status& status, int64_t leader_term, OpIds* applied_op_ids) { |
309 | 1.89k | LOG_IF(DFATAL, status.ok() && !GetOpId().valid()) << "Invalid op id after replication"; |
310 | | |
311 | 7.79M | PrepareState prepare_state_copy; |
312 | 7.79M | { |
313 | 7.79M | std::lock_guard<simple_spinlock> lock(lock_); |
314 | 7.79M | if (replication_state_ == REPLICATION_FAILED) { |
315 | 0 | LOG_IF(DFATAL, status.ok()) << "Successfully replicated operation that was previously failed"; |
316 | 0 | return; |
317 | 0 | } |
318 | 7.79M | CHECK_EQ(replication_state_, REPLICATING); |
319 | 7.79M | if (status.ok()) { |
320 | 7.79M | replication_state_ = REPLICATED; |
321 | 18.4E | } else { |
322 | 18.4E | replication_state_ = REPLICATION_FAILED; |
323 | 18.4E | } |
324 | 7.79M | prepare_state_copy = prepare_state_; |
325 | 7.79M | } |
326 | | |
327 | | // If we have prepared and replicated, we're ready to move ahead and apply this operation. |
328 | | // Note that if we set the state to REPLICATION_FAILED above, HandleFailure() below aborts the |
329 | | // operation instead, i.e. ApplyTask() will never be called and the operation will never be |
330 | | // applied to the tablet. |
331 | 7.79M | if (prepare_state_copy != PrepareState::PREPARED) { |
332 | 0 | LOG(DFATAL) << "Replicating an operation that has not been prepared: " << AsString(this); |
333 | | |
334 | 0 | LOG(ERROR) << "Attempting to wait for the operation to be prepared"; |
335 | | |
336 | | // This case should never happen, but if it happens we are trying to survive. |
337 | 0 | for (;;) { |
338 | 0 | std::this_thread::sleep_for(1ms); |
339 | 0 | PrepareState prepare_state; |
340 | 0 | { |
341 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
342 | 0 | prepare_state = prepare_state_; |
343 | 0 | if (prepare_state == PrepareState::PREPARED) { |
344 | 0 | break; |
345 | 0 | } |
346 | 0 | } |
347 | 0 | YB_LOG_EVERY_N_SECS(WARNING, 1) |
348 | 0 | << "Waiting for the operation to be prepared, current state: " << prepare_state; |
349 | 0 | } |
350 | 0 | } |
351 | | |
352 | 7.79M | if (status.ok()) { |
353 | 7.79M | TRACE_EVENT_FLOW_BEGIN0("operation", "ApplyTask", this); |
354 | 7.79M | ApplyTask(leader_term, applied_op_ids); |
355 | 18.4E | } else { |
356 | 18.4E | HandleFailure(status); |
357 | 18.4E | } |
358 | 7.79M | } |
359 | | |
360 | 0 | void OperationDriver::Abort(const Status& status) { |
361 | 0 | CHECK(!status.ok()); |
362 | | |
363 | 0 | ReplicationState repl_state_copy; |
364 | 0 | { |
365 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
366 | 0 | repl_state_copy = replication_state_; |
367 | 0 | } |
368 | | |
369 | | // If the state is NOT_REPLICATING we abort immediately and the operation |
370 | | // will never be replicated. |
371 | | // In any other state this function takes no action: the status is not |
372 | | // recorded here and the operation is not interrupted, so an operation whose |
373 | | // replication has already started is left to finish through the usual path. |
374 | 0 | if (repl_state_copy == NOT_REPLICATING) { |
375 | 0 | HandleFailure(status); |
376 | 0 | } |
377 | 0 | } |
378 | | |
379 | 7.79M | void OperationDriver::ApplyTask(int64_t leader_term, OpIds* applied_op_ids) { |
380 | 7.79M | TRACE_EVENT_FLOW_END0("operation", "ApplyTask", this); |
381 | 7.79M | ADOPT_TRACE(trace()); |
382 | | |
383 | 7.79M | #ifndef NDEBUG |
384 | 7.79M | { |
385 | 7.79M | std::lock_guard<simple_spinlock> lock(lock_); |
386 | 7.79M | DCHECK_EQ(replication_state_, REPLICATED); |
387 | 7.79M | DCHECK_EQ(prepare_state_, PREPARED); |
388 | 7.79M | } |
389 | 7.79M | #endif |
390 | | |
391 | | // We need to ref-count ourself, since Commit() may run very quickly |
392 | | // and end up calling Finalize() while we're still in this code. |
393 | 7.79M | scoped_refptr<OperationDriver> ref(this); |
394 | | |
395 | 7.79M | { |
396 | 7.79M | auto status = operation_->Replicated(leader_term); |
397 | 1.41k | LOG_IF_WITH_PREFIX(FATAL, !status.ok()) |
398 | 1.41k | << "Apply failed: " << status |
399 | 1.41k | << ", request: " << operation_->request()->ShortDebugString(); |
400 | 7.79M | operation_tracker_->Release(this, applied_op_ids); |
401 | 7.79M | } |
402 | 7.79M | } |
403 | | |
404 | | std::string OperationDriver::StateString(ReplicationState repl_state, |
405 | 5 | PrepareState prep_state) { |
406 | 5 | string state_str; |
407 | 5 | switch (repl_state) { |
408 | 0 | case NOT_REPLICATING: |
409 | 0 | StrAppend(&state_str, "NR-"); // For Not Replicating |
410 | 0 | break; |
411 | 1 | case REPLICATING: |
412 | 1 | StrAppend(&state_str, "R-"); // For Replicating |
413 | 1 | break; |
414 | 0 | case REPLICATION_FAILED: |
415 | 0 | StrAppend(&state_str, "RF-"); // For Replication Failed |
416 | 0 | break; |
417 | 4 | case REPLICATED: |
418 | 4 | StrAppend(&state_str, "RD-"); // For Replication Done |
419 | 4 | break; |
420 | 0 | default: |
421 | 0 | LOG(DFATAL) << "Unexpected replication state: " << repl_state; |
422 | 5 | } |
423 | 5 | switch (prep_state) { |
424 | 5 | case PREPARED: |
425 | 5 | StrAppend(&state_str, "P"); |
426 | 5 | break; |
427 | 0 | case NOT_PREPARED: |
428 | 0 | StrAppend(&state_str, "NP"); |
429 | 0 | break; |
430 | 0 | default: |
431 | 0 | LOG(DFATAL) << "Unexpected prepare state: " << prep_state; |
432 | 5 | } |
433 | 5 | return state_str; |
434 | 5 | } |
435 | | |
436 | 4 | std::string OperationDriver::LogPrefix() const { |
437 | 4 | ReplicationState repl_state_copy; |
438 | 4 | PrepareState prep_state_copy; |
439 | 4 | std::string ts_string; |
440 | 4 | OperationType operation_type; |
441 | | |
442 | 4 | { |
443 | 4 | std::lock_guard<simple_spinlock> lock(lock_); |
444 | 4 | repl_state_copy = replication_state_; |
445 | 4 | prep_state_copy = prepare_state_; |
446 | 4 | ts_string = operation_ && operation_->has_hybrid_time() |
447 | 4 | ? operation_->hybrid_time().ToString() : "No hybrid_time"; |
448 | 4 | operation_type = this->operation_type(); |
449 | 4 | } |
450 | | |
451 | 4 | string state_str = StateString(repl_state_copy, prep_state_copy); |
452 | | // We use the tablet and the peer (T, P) to identify ts and tablet and the hybrid_time (Ts) to |
453 | | // (help) identify the operation. The state string (S) describes the state of the operation. |
454 | 4 | return Format("T $0 P $1 S $2 Ts $3 $4: ", |
455 | | // consensus_ is NULL in some unit tests. |
456 | 4 | PREDICT_TRUE(consensus_) ? consensus_->tablet_id() : "(unknown)", |
457 | 4 | PREDICT_TRUE(consensus_) ? consensus_->peer_uuid() : "(unknown)", |
458 | 4 | state_str, ts_string, operation_type); |
459 | 4 | } |
460 | | |
461 | 13.5M | int64_t OperationDriver::SpaceUsed() { |
462 | 13.5M | if (!operation_) { |
463 | 5.70M | return 0; |
464 | 5.70M | } |
465 | 7.79M | auto consensus_round = operation_->consensus_round(); |
466 | 7.79M | if (consensus_round) { |
467 | 7.77M | return consensus_round->replicate_msg()->SpaceUsedLong(); |
468 | 7.77M | } |
469 | 17.5k | return operation()->request()->SpaceUsedLong(); |
470 | 17.5k | } |
471 | | |
472 | | } // namespace tablet |
473 | | } // namespace yb |