YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_driver.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/operations/operation_driver.h"
34
35
#include <atomic>
36
#include <future>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <string>
41
#include <vector>
42
43
#include "yb/consensus/consensus_fwd.h"
44
#include "yb/consensus/consensus.h"
45
#include "yb/consensus/consensus.pb.h"
46
47
#include "yb/gutil/callback.h"
48
#include "yb/gutil/ref_counted.h"
49
#include "yb/gutil/strings/substitute.h"
50
#include "yb/gutil/thread_annotations.h"
51
52
#include "yb/master/sys_catalog_constants.h"
53
54
#include "yb/rpc/rpc_fwd.h"
55
56
#include "yb/tablet/mvcc.h"
57
#include "yb/tablet/operations/operation_tracker.h"
58
#include "yb/tablet/preparer.h"
59
#include "yb/tablet/tablet.h"
60
#include "yb/tablet/tablet_options.h"
61
62
#include "yb/util/atomic.h"
63
#include "yb/util/debug-util.h"
64
#include "yb/util/debug/trace_event.h"
65
#include "yb/util/flag_tags.h"
66
#include "yb/util/logging.h"
67
#include "yb/util/threadpool.h"
68
#include "yb/util/trace.h"
69
70
using namespace std::literals;
71
72
// Test-only knob read by OperationDriver::ExecuteAsync(): when non-zero, write operations on
// tablets other than the sys catalog sleep for this many milliseconds before being submitted
// to the preparer.
DEFINE_test_flag(int32, delay_execute_async_ms, 0,
                 "Delay execution of ExecuteAsync for specified amount of milliseconds "
                 "during tests");
75
76
namespace yb {
77
namespace tablet {
78
79
using namespace std::placeholders;
80
81
using consensus::Consensus;
82
using consensus::ConsensusRound;
83
84
////////////////////////////////////////////////////////////
85
// OperationDriver
86
////////////////////////////////////////////////////////////
87
88
// Builds a driver in the NOT_REPLICATING / NOT_PREPARED state with its own Trace.
// If the constructing thread already has a current trace, the driver's trace is attached to it
// as a child so the operation's events show up under the caller's trace.
OperationDriver::OperationDriver(OperationTracker *operation_tracker,
                                 Consensus* consensus,
                                 Preparer* preparer,
                                 TableType table_type)
    : operation_tracker_(operation_tracker),
      consensus_(consensus),
      preparer_(preparer),
      trace_(new Trace()),
      start_time_(MonoTime::Now()),
      replication_state_(NOT_REPLICATING),
      prepare_state_(NOT_PREPARED),
      table_type_(table_type) {
  if (auto parent_trace = Trace::CurrentTrace()) {
    parent_trace->AddChildTrace(trace_.get());
  }
  // op_id_copy_ is loaded/stored without holding lock_ (see GetOpId() and AddedToLeader()),
  // so sanity-check the atomic wrapper implementation backing it.
  DCHECK(IsAcceptableAtomicImpl(op_id_copy_));
}
105
106
31.4M
// Takes ownership of *operation (when provided) and registers the driver with the tracker.
//
// term == OpId::kUnknownTerm: the operation already carries its op id; the driver goes straight
//   to REPLICATING and, once registered, the operation is told it was added to a follower.
// Any other term: a ConsensusRound bound to `term` is created for the operation (skipped when
//   consensus_ is null, which happens in some tests).
//
// If registration with the tracker fails, ownership of the operation is handed back through
// *operation. Returns the tracker's Add() status.
Status OperationDriver::Init(std::unique_ptr<Operation>* operation, int64_t term) {
  if (operation) {
    operation_ = std::move(*operation);
  }

  const bool term_unknown = term == OpId::kUnknownTerm;
  if (term_unknown) {
    if (operation_) {
      op_id_copy_.store(operation_->op_id(), boost::memory_order_release);
    }
    replication_state_ = REPLICATING;
  } else if (consensus_) {  // consensus_ is sometimes NULL in tests.
    consensus::ReplicateMsgPtr msg = operation_->NewReplicateMsg();
    auto consensus_round = make_scoped_refptr<ConsensusRound>(consensus_, std::move(msg));
    consensus_round->BindToTerm(term);
    consensus_round->SetCallback(this);
    mutable_operation()->set_consensus_round(std::move(consensus_round));
  }

  auto add_status = operation_tracker_->Add(this);
  if (!add_status.ok() && operation) {
    // Give the operation back to the caller instead of keeping it in a driver that was rejected.
    *operation = std::move(operation_);
  }

  if (term_unknown && operation_) {
    operation_->AddedToFollower();
  }

  return add_status;
}
137
138
51.8M
// Returns the op id currently assigned to this driver. The acquire load pairs with the release
// stores in Init() and AddedToLeader(); AddedToLeader() expects it to still be invalid before
// it assigns one.
yb::OpId OperationDriver::GetOpId() {
  const yb::OpId current = op_id_copy_.load(boost::memory_order_acquire);
  return current;
}
141
142
4
// Read-only, non-owning access to the driven operation; null for drivers without an operation.
const Operation* OperationDriver::operation() const {
  const Operation* const op = operation_.get();
  return op;
}
145
146
19.7M
// Mutable, non-owning access to the driven operation; null for drivers without an operation.
Operation* OperationDriver::mutable_operation() {
  Operation* const op = operation_.get();
  return op;
}
149
150
99.4M
// Type of the driven operation, or OperationType::kEmpty when the driver has no operation.
OperationType OperationDriver::operation_type() const {
  if (!operation_) {
    return OperationType::kEmpty;
  }
  return operation_->operation_type();
}
153
154
0
// Thread-safe description of the driver: takes lock_ and delegates to ToStringUnlocked().
string OperationDriver::ToString() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  return ToStringUnlocked();
}
158
159
0
// Describes the driver as "<state> <operation>", e.g. "RD-P <op description>", where <state> is
// produced by StateString(). Reads replication/prepare state without locking; ToString() calls
// it with lock_ held.
string OperationDriver::ToStringUnlocked() const {
  string ret = StateString(replication_state_, prepare_state_);
  // Both branches use the same single-space separator after the state string.
  ret += ' ';
  if (operation_ != nullptr) {
    ret += operation_->ToString();
  } else {
    ret += "[unknown operation]";
  }
  return ret;
}
168
169
170
31.4M
void OperationDriver::ExecuteAsync() {
171
31.4M
  
VLOG_WITH_PREFIX25.8k
(4) << "ExecuteAsync()"25.8k
;
172
31.4M
  TRACE_EVENT_FLOW_BEGIN0("operation", "ExecuteAsync", this);
173
31.4M
  ADOPT_TRACE(trace());
174
31.4M
  TRACE_FUNC();
175
176
31.4M
  auto delay = GetAtomicFlag(&FLAGS_TEST_delay_execute_async_ms);
177
31.4M
  if (delay != 0 &&
178
31.4M
      
operation_type() == OperationType::kWrite0
&&
179
31.4M
      
operation_->tablet()->tablet_id() != master::kSysCatalogTabletId0
) {
180
0
    LOG(INFO) << "T " << operation_->tablet()->tablet_id()
181
0
              << " Debug sleep for: " << MonoDelta(1ms * delay) << "\n" << GetStackTrace();
182
0
    std::this_thread::sleep_for(1ms * delay);
183
0
  }
184
185
31.4M
  auto s = preparer_->Submit(this);
186
187
31.4M
  if (operation_) {
188
14.3M
    operation_->SubmittedToPreparer();
189
14.3M
  }
190
191
31.4M
  if (!s.ok()) {
192
0
    HandleFailure(s);
193
0
  }
194
31.4M
}
195
196
5.04M
void OperationDriver::AddedToLeader(const OpId& op_id, const OpId& committed_op_id) {
197
5.04M
  ADOPT_TRACE(trace());
198
5.04M
  CHECK(!GetOpId().valid());
199
5.04M
  op_id_copy_.store(op_id, boost::memory_order_release);
200
201
5.04M
  operation_->AddedToLeader(op_id, committed_op_id);
202
203
5.04M
  StartOperation();
204
5.04M
}
205
206
26.4M
void OperationDriver::PrepareAndStartTask() {
207
26.4M
  TRACE_EVENT_FLOW_END0("operation", "PrepareAndStartTask", this);
208
26.4M
  Status prepare_status = PrepareAndStart();
209
26.4M
  if (PREDICT_FALSE(!prepare_status.ok())) {
210
0
    HandleFailure(prepare_status);
211
0
  }
212
26.4M
}
213
214
31.4M
bool OperationDriver::StartOperation() {
215
31.4M
  if (propagated_safe_time_) {
216
25.2M
    mvcc_->SetPropagatedSafeTimeOnFollower(propagated_safe_time_);
217
25.2M
  }
218
31.4M
  if (!operation_) {
219
17.1M
    operation_tracker_->Release(this, nullptr /* applied_op_ids */);
220
17.1M
    return false;
221
17.1M
  }
222
14.3M
  return true;
223
31.4M
}
224
225
31.4M
// Prepares the operation (if any), starts it early when it is already replicating, and then
// marks the driver PREPARED. Must run at most once per driver (checked via prepare_state_).
// Returns the error from Operation::Prepare(), otherwise OK.
Status OperationDriver::PrepareAndStart() {
  ADOPT_TRACE(trace());
  TRACE_EVENT1("operation", "PrepareAndStart", "operation", this);
  VLOG_WITH_PREFIX(4) << "PrepareAndStart()";

  prepare_physical_hybrid_time_ = GetMonoTimeMicros();
  if (operation_) {
    RETURN_NOT_OK(operation_->Prepare());
  }

  // Hold the lock only long enough to snapshot the replication state; this keeps exactly one of
  // the Replicate/Prepare callbacks responsible for triggering the apply phase.
  ReplicationState replication_snapshot;
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    CHECK_EQ(prepare_state_, NOT_PREPARED);
    replication_snapshot = replication_state_;
  }

  // An operation that is already replicating has its hybrid time assigned, so start it as soon
  // as possible. A driver without an operation is released by StartOperation() and stops here.
  if (replication_snapshot != NOT_REPLICATING && !StartOperation()) {
    return Status::OK();
  }

  {
    std::lock_guard<simple_spinlock> guard(lock_);
    // PrepareAndStart runs once per operation, so nobody else may have touched prepare_state_.
    CHECK_EQ(prepare_state_, NOT_PREPARED);
    // From here on ReplicationFinished() may apply the operation; this must follow the start.
    prepare_state_ = PREPARED;
    if (replication_state_ == NOT_REPLICATING) {
      replication_state_ = REPLICATING;
    }
  }

  return Status::OK();
}
270
271
31.4M
// Nothing beyond member destruction is required.
OperationDriver::~OperationDriver() = default;
273
274
605
void OperationDriver::HandleFailure(const Status& status) {
275
605
  ReplicationState repl_state_copy;
276
277
605
  {
278
605
    std::lock_guard<simple_spinlock> lock(lock_);
279
605
    repl_state_copy = replication_state_;
280
605
  }
281
282
605
  
VLOG_WITH_PREFIX0
(2) << "Failed operation: " << status0
;
283
605
  CHECK(!status.ok());
284
605
  ADOPT_TRACE(trace());
285
605
  TRACE("HandleFailure($0)", status.ToString());
286
287
605
  switch (repl_state_copy) {
288
0
    case NOT_REPLICATING:
289
605
    case REPLICATION_FAILED:
290
605
    {
291
605
      
VLOG_WITH_PREFIX0
(1) << "Operation " << ToString() << " failed prior to "
292
0
          "replication success: " << status;
293
605
      operation_->Aborted(status, op_id_copy_.load().valid());
294
605
      operation_tracker_->Release(this, nullptr /* applied_op_ids */);
295
605
      return;
296
0
    }
297
298
0
    case REPLICATING:
299
0
    case REPLICATED:
300
0
    {
301
0
      LOG_WITH_PREFIX(FATAL) << "Cannot cancel operations that have already replicated"
302
0
                             << ": " << status << " operation: " << ToString();
303
0
    }
304
605
  }
305
605
}
306
307
void OperationDriver::ReplicationFinished(
308
14.3M
    const Status& status, int64_t leader_term, OpIds* applied_op_ids) {
309
14.3M
  LOG_IF
(DFATAL, status.ok() && !GetOpId().valid()) << "Invalid op id after replication"3.26k
;
310
311
14.3M
  PrepareState prepare_state_copy;
312
14.3M
  {
313
14.3M
    std::lock_guard<simple_spinlock> lock(lock_);
314
14.3M
    if (replication_state_ == REPLICATION_FAILED) {
315
0
      LOG_IF(DFATAL, status.ok()) << "Successfully replicated operation that was previously failed";
316
0
      return;
317
0
    }
318
14.3M
    CHECK_EQ(replication_state_, REPLICATING);
319
14.3M
    if (status.ok()) {
320
14.3M
      replication_state_ = REPLICATED;
321
14.3M
    } else {
322
404
      replication_state_ = REPLICATION_FAILED;
323
404
    }
324
14.3M
    prepare_state_copy = prepare_state_;
325
14.3M
  }
326
327
  // If we have prepared and replicated, we're ready to move ahead and apply this operation.
328
  // Note that if we set the state to REPLICATION_FAILED above, ApplyOperation() will actually abort
329
  // the operation, i.e. ApplyTask() will never be called and the operation will never be applied to
330
  // the tablet.
331
14.3M
  if (prepare_state_copy != PrepareState::PREPARED) {
332
0
    LOG(DFATAL) << "Replicating an operation that has not been prepared: " << AsString(this);
333
334
0
    LOG(ERROR) << "Attempting to wait for the operation to be prepared";
335
336
    // This case should never happen, but if it happens we are trying to survive.
337
0
    for (;;) {
338
0
      std::this_thread::sleep_for(1ms);
339
0
      PrepareState prepare_state;
340
0
      {
341
0
        std::lock_guard<simple_spinlock> lock(lock_);
342
0
        prepare_state = prepare_state_;
343
0
        if (prepare_state == PrepareState::PREPARED) {
344
0
          break;
345
0
        }
346
0
      }
347
0
      YB_LOG_EVERY_N_SECS(WARNING, 1)
348
0
          << "Waiting for the operation to be prepared, current state: " << prepare_state;
349
0
    }
350
0
  }
351
352
14.3M
  if (
status.ok()14.3M
) {
353
14.3M
    TRACE_EVENT_FLOW_BEGIN0("operation", "ApplyTask", this);
354
14.3M
    ApplyTask(leader_term, applied_op_ids);
355
18.4E
  } else {
356
18.4E
    HandleFailure(status);
357
18.4E
  }
358
14.3M
}
359
360
0
void OperationDriver::Abort(const Status& status) {
361
0
  CHECK(!status.ok());
362
363
0
  ReplicationState repl_state_copy;
364
0
  {
365
0
    std::lock_guard<simple_spinlock> lock(lock_);
366
0
    repl_state_copy = replication_state_;
367
0
  }
368
369
  // If the state is not NOT_REPLICATING we abort immediately and the operation
370
  // will never be replicated.
371
  // In any other state we just set the operation status, if the operation's
372
  // Apply hasn't started yet this prevents it from starting, but if it has then
373
  // the operation runs to completion.
374
0
  if (repl_state_copy == NOT_REPLICATING) {
375
0
    HandleFailure(status);
376
0
  }
377
0
}
378
379
14.3M
void OperationDriver::ApplyTask(int64_t leader_term, OpIds* applied_op_ids) {
380
14.3M
  TRACE_EVENT_FLOW_END0("operation", "ApplyTask", this);
381
14.3M
  ADOPT_TRACE(trace());
382
383
14.3M
#ifndef NDEBUG
384
14.3M
  {
385
14.3M
    std::lock_guard<simple_spinlock> lock(lock_);
386
14.3M
    DCHECK_EQ(replication_state_, REPLICATED);
387
14.3M
    DCHECK_EQ(prepare_state_, PREPARED);
388
14.3M
  }
389
14.3M
#endif
390
391
  // We need to ref-count ourself, since Commit() may run very quickly
392
  // and end up calling Finalize() while we're still in this code.
393
14.3M
  scoped_refptr<OperationDriver> ref(this);
394
395
14.3M
  {
396
14.3M
    auto status = operation_->Replicated(leader_term);
397
14.3M
    
LOG_IF_WITH_PREFIX1.23k
(FATAL, !status.ok())
398
1.23k
        << "Apply failed: " << status
399
1.23k
        << ", request: " << operation_->request()->ShortDebugString();
400
14.3M
    operation_tracker_->Release(this, applied_op_ids);
401
14.3M
  }
402
14.3M
}
403
404
std::string OperationDriver::StateString(ReplicationState repl_state,
405
8
                                           PrepareState prep_state) {
406
8
  string state_str;
407
8
  switch (repl_state) {
408
0
    case NOT_REPLICATING:
409
0
      StrAppend(&state_str, "NR-");  // For Not Replicating
410
0
      break;
411
0
    case REPLICATING:
412
0
      StrAppend(&state_str, "R-");  // For Replicating
413
0
      break;
414
0
    case REPLICATION_FAILED:
415
0
      StrAppend(&state_str, "RF-");  // For Replication Failed
416
0
      break;
417
8
    case REPLICATED:
418
8
      StrAppend(&state_str, "RD-");  // For Replication Done
419
8
      break;
420
0
    default:
421
0
      LOG(DFATAL) << "Unexpected replication state: " << repl_state;
422
8
  }
423
8
  switch (prep_state) {
424
8
    case PREPARED:
425
8
      StrAppend(&state_str, "P");
426
8
      break;
427
0
    case NOT_PREPARED:
428
0
      StrAppend(&state_str, "NP");
429
0
      break;
430
0
    default:
431
0
      LOG(DFATAL) << "Unexpected prepare state: " << prep_state;
432
8
  }
433
8
  return state_str;
434
8
}
435
436
8
std::string OperationDriver::LogPrefix() const {
437
8
  ReplicationState repl_state_copy;
438
8
  PrepareState prep_state_copy;
439
8
  std::string ts_string;
440
8
  OperationType operation_type;
441
442
8
  {
443
8
    std::lock_guard<simple_spinlock> lock(lock_);
444
8
    repl_state_copy = replication_state_;
445
8
    prep_state_copy = prepare_state_;
446
8
    ts_string = operation_ && operation_->has_hybrid_time()
447
8
        ? operation_->hybrid_time().ToString() : 
"No hybrid_time"0
;
448
8
    operation_type = this->operation_type();
449
8
  }
450
451
8
  string state_str = StateString(repl_state_copy, prep_state_copy);
452
  // We use the tablet and the peer (T, P) to identify ts and tablet and the hybrid_time (Ts) to
453
  // (help) identify the operation. The state string (S) describes the state of the operation.
454
8
  return Format("T $0 P $1 S $2 Ts $3 $4: ",
455
                // consensus_ is NULL in some unit tests.
456
8
                PREDICT_TRUE(consensus_) ? consensus_->tablet_id() : 
"(unknown)"0
,
457
8
                PREDICT_TRUE(consensus_) ? consensus_->peer_uuid() : 
"(unknown)"0
,
458
8
                state_str, ts_string, operation_type);
459
8
}
460
461
31.4M
int64_t OperationDriver::SpaceUsed() {
462
31.4M
  if (!operation_) {
463
17.1M
    return 0;
464
17.1M
  }
465
14.3M
  auto consensus_round = operation_->consensus_round();
466
14.3M
  if (consensus_round) {
467
14.3M
    return consensus_round->replicate_msg()->SpaceUsedLong();
468
14.3M
  }
469
27.4k
  return operation()->request()->SpaceUsedLong();
470
14.3M
}
471
472
}  // namespace tablet
473
}  // namespace yb