YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation_driver.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/operations/operation_driver.h"
34
35
#include <atomic>
36
#include <future>
37
#include <map>
38
#include <memory>
39
#include <mutex>
40
#include <string>
41
#include <vector>
42
43
#include "yb/consensus/consensus_fwd.h"
44
#include "yb/consensus/consensus.h"
45
#include "yb/consensus/consensus.pb.h"
46
47
#include "yb/gutil/callback.h"
48
#include "yb/gutil/ref_counted.h"
49
#include "yb/gutil/strings/substitute.h"
50
#include "yb/gutil/thread_annotations.h"
51
52
#include "yb/master/sys_catalog_constants.h"
53
54
#include "yb/rpc/rpc_fwd.h"
55
56
#include "yb/tablet/mvcc.h"
57
#include "yb/tablet/operations/operation_tracker.h"
58
#include "yb/tablet/preparer.h"
59
#include "yb/tablet/tablet.h"
60
#include "yb/tablet/tablet_options.h"
61
62
#include "yb/util/atomic.h"
63
#include "yb/util/debug-util.h"
64
#include "yb/util/debug/trace_event.h"
65
#include "yb/util/flag_tags.h"
66
#include "yb/util/logging.h"
67
#include "yb/util/threadpool.h"
68
#include "yb/util/trace.h"
69
70
using namespace std::literals;
71
72
// Test-only knob read by OperationDriver::ExecuteAsync(): when non-zero, write operations on any
// tablet other than the sys catalog sleep for this many milliseconds before being submitted.
DEFINE_test_flag(int32, delay_execute_async_ms, 0,
                 "Delay execution of ExecuteAsync for specified amount of milliseconds during "
                 "tests");
75
76
namespace yb {
77
namespace tablet {
78
79
using namespace std::placeholders;
80
81
using consensus::Consensus;
82
using consensus::ConsensusRound;
83
84
////////////////////////////////////////////////////////////
85
// OperationDriver
86
////////////////////////////////////////////////////////////
87
88
// Creates a driver that has neither been prepared nor handed to replication yet. A fresh Trace
// is allocated for the driver and, when the calling context already has a current trace, it is
// attached to that trace as a child.
OperationDriver::OperationDriver(OperationTracker* operation_tracker,
                                 Consensus* consensus,
                                 Preparer* preparer,
                                 TableType table_type)
    : operation_tracker_(operation_tracker),
      consensus_(consensus),
      preparer_(preparer),
      trace_(new Trace()),
      start_time_(MonoTime::Now()),
      replication_state_(NOT_REPLICATING),
      prepare_state_(NOT_PREPARED),
      table_type_(table_type) {
  if (auto parent_trace = Trace::CurrentTrace()) {
    parent_trace->AddChildTrace(trace_.get());
  }
  // op_id_copy_ is read/written without lock_, so its atomic implementation must be acceptable.
  DCHECK(IsAcceptableAtomicImpl(op_id_copy_));
}
105
106
13.4M
// Binds the driver to its operation and registers the driver with the operation tracker.
//
// `operation` (optional): when non-null, ownership of *operation moves into the driver.
// `term`: when OpId::kUnknownTerm, the operation's op id (if there is an operation) is cached in
//   op_id_copy_ and the driver starts in the REPLICATING state. Otherwise, when consensus_ is
//   available, a ConsensusRound bound to `term` (with this driver as its callback) is attached to
//   the operation.
// Returns the status of OperationTracker::Add(). On failure, ownership of the operation is handed
// back to the caller through `operation` (when one was supplied).
Status OperationDriver::Init(std::unique_ptr<Operation>* operation, int64_t term) {
  if (operation != nullptr) {
    operation_ = std::move(*operation);
  }

  const bool term_is_unknown = (term == OpId::kUnknownTerm);
  if (term_is_unknown) {
    if (operation_) {
      op_id_copy_.store(operation_->op_id(), boost::memory_order_release);
    }
    replication_state_ = REPLICATING;
  } else if (consensus_) {  // consensus_ is NULL in some tests.
    consensus::ReplicateMsgPtr msg = operation_->NewReplicateMsg();
    auto consensus_round = make_scoped_refptr<ConsensusRound>(consensus_, std::move(msg));
    consensus_round->BindToTerm(term);
    consensus_round->SetCallback(this);
    mutable_operation()->set_consensus_round(std::move(consensus_round));
  }

  auto add_status = operation_tracker_->Add(this);
  if (!add_status.ok() && operation != nullptr) {
    // Registration failed: give the operation back to the caller.
    *operation = std::move(operation_);
  }

  if (term_is_unknown && operation_) {
    operation_->AddedToFollower();
  }

  return add_status;
}
137
138
24.2M
// Returns the cached op id. The acquire load pairs with the release stores made when the op id is
// assigned (in Init() and AddedToLeader()); the result is invalid until one of those has run.
yb::OpId OperationDriver::GetOpId() {
  const yb::OpId cached_op_id = op_id_copy_.load(boost::memory_order_acquire);
  return cached_op_id;
}
141
142
2
// Read-only view of the owned operation; null when the driver carries no operation.
const Operation* OperationDriver::operation() const {
  const Operation* owned = operation_.get();
  return owned;
}
145
146
10.6M
// Mutable view of the owned operation; null when the driver carries no operation.
Operation* OperationDriver::mutable_operation() {
  Operation* owned = operation_.get();
  return owned;
}
149
150
43.1M
// Type of the owned operation, or OperationType::kEmpty when the driver has no operation.
OperationType OperationDriver::operation_type() const {
  if (!operation_) {
    return OperationType::kEmpty;
  }
  return operation_->operation_type();
}
153
154
1
string OperationDriver::ToString() const {
155
1
  std::lock_guard<simple_spinlock> lock(lock_);
156
1
  return ToStringUnlocked();
157
1
}
158
159
1
string OperationDriver::ToStringUnlocked() const {
160
1
  string ret = StateString(replication_state_, prepare_state_);
161
1
  if (operation_ != nullptr) {
162
1
    ret += " " + operation_->ToString();
163
0
  } else {
164
0
    ret += "[unknown operation]";
165
0
  }
166
1
  return ret;
167
1
}
168
169
170
13.4M
void OperationDriver::ExecuteAsync() {
171
7.70k
  VLOG_WITH_PREFIX(4) << "ExecuteAsync()";
172
13.4M
  TRACE_EVENT_FLOW_BEGIN0("operation", "ExecuteAsync", this);
173
13.4M
  ADOPT_TRACE(trace());
174
13.4M
  TRACE_FUNC();
175
176
13.4M
  auto delay = GetAtomicFlag(&FLAGS_TEST_delay_execute_async_ms);
177
13.4M
  if (delay != 0 &&
178
0
      operation_type() == OperationType::kWrite &&
179
0
      operation_->tablet()->tablet_id() != master::kSysCatalogTabletId) {
180
0
    LOG(INFO) << "T " << operation_->tablet()->tablet_id()
181
0
              << " Debug sleep for: " << MonoDelta(1ms * delay) << "\n" << GetStackTrace();
182
0
    std::this_thread::sleep_for(1ms * delay);
183
0
  }
184
185
13.4M
  auto s = preparer_->Submit(this);
186
187
13.4M
  if (operation_) {
188
7.79M
    operation_->SubmittedToPreparer();
189
7.79M
  }
190
191
13.4M
  if (!s.ok()) {
192
0
    HandleFailure(s);
193
0
  }
194
13.4M
}
195
196
2.70M
void OperationDriver::AddedToLeader(const OpId& op_id, const OpId& committed_op_id) {
197
2.70M
  ADOPT_TRACE(trace());
198
2.70M
  CHECK(!GetOpId().valid());
199
2.70M
  op_id_copy_.store(op_id, boost::memory_order_release);
200
201
2.70M
  operation_->AddedToLeader(op_id, committed_op_id);
202
203
2.70M
  StartOperation();
204
2.70M
}
205
206
10.7M
void OperationDriver::PrepareAndStartTask() {
207
10.7M
  TRACE_EVENT_FLOW_END0("operation", "PrepareAndStartTask", this);
208
10.7M
  Status prepare_status = PrepareAndStart();
209
10.7M
  if (PREDICT_FALSE(!prepare_status.ok())) {
210
0
    HandleFailure(prepare_status);
211
0
  }
212
10.7M
}
213
214
13.5M
bool OperationDriver::StartOperation() {
215
13.5M
  if (propagated_safe_time_) {
216
10.1M
    mvcc_->SetPropagatedSafeTimeOnFollower(propagated_safe_time_);
217
10.1M
  }
218
13.5M
  if (!operation_) {
219
5.70M
    operation_tracker_->Release(this, nullptr /* applied_op_ids */);
220
5.70M
    return false;
221
5.70M
  }
222
7.80M
  return true;
223
7.80M
}
224
225
13.4M
// Prepares the operation (if any), starts it when replication is already under way, and then
// marks the driver PREPARED. A NOT_REPLICATING driver is moved to REPLICATING.
// Must run at most once per driver; the CHECKs below enforce that prepare_state_ is still
// NOT_PREPARED.
Status OperationDriver::PrepareAndStart() {
  ADOPT_TRACE(trace());
  TRACE_EVENT1("operation", "PrepareAndStart", "operation", this);
  VLOG_WITH_PREFIX(4) << "PrepareAndStart()";

  prepare_physical_hybrid_time_ = GetMonoTimeMicros();
  if (operation_) {
    RETURN_NOT_OK(operation_->Prepare());
  }

  // Hold lock_ only long enough to snapshot the replication state (and verify we have not been
  // prepared yet). Together with the later PREPARED update, this ensures exactly one of the
  // Replicate/Prepare paths triggers the apply phase.
  const ReplicationState replication_snapshot = [this] {
    std::lock_guard<simple_spinlock> guard(lock_);
    CHECK_EQ(prepare_state_, NOT_PREPARED);
    return replication_state_;
  }();

  // Already replicating: start as soon as possible, since the hybrid time is already assigned.
  // StartOperation() returning false means there is no operation and the driver was released.
  if (replication_snapshot != NOT_REPLICATING && !StartOperation()) {
    return Status::OK();
  }

  {
    std::lock_guard<simple_spinlock> guard(lock_);
    // PrepareAndStart runs only once per operation, so nobody may have changed prepare_state_
    // since the snapshot above.
    CHECK_EQ(prepare_state_, NOT_PREPARED);
    // From here on ReplicationFinished() may apply the operation; this must follow Start().
    prepare_state_ = PREPARED;
    if (replication_state_ == NOT_REPLICATING) {
      replication_state_ = REPLICATING;
    }
  }

  return Status::OK();
}
270
271
13.5M
// Members (trace_, operation_, ...) clean themselves up.
OperationDriver::~OperationDriver() = default;
273
274
410
void OperationDriver::HandleFailure(const Status& status) {
275
410
  ReplicationState repl_state_copy;
276
277
410
  {
278
410
    std::lock_guard<simple_spinlock> lock(lock_);
279
410
    repl_state_copy = replication_state_;
280
410
  }
281
282
0
  VLOG_WITH_PREFIX(2) << "Failed operation: " << status;
283
410
  CHECK(!status.ok());
284
410
  ADOPT_TRACE(trace());
285
410
  TRACE("HandleFailure($0)", status.ToString());
286
287
410
  switch (repl_state_copy) {
288
0
    case NOT_REPLICATING:
289
410
    case REPLICATION_FAILED:
290
410
    {
291
0
      VLOG_WITH_PREFIX(1) << "Operation " << ToString() << " failed prior to "
292
0
          "replication success: " << status;
293
410
      operation_->Aborted(status, op_id_copy_.load().valid());
294
410
      operation_tracker_->Release(this, nullptr /* applied_op_ids */);
295
410
      return;
296
0
    }
297
298
0
    case REPLICATING:
299
0
    case REPLICATED:
300
0
    {
301
0
      LOG_WITH_PREFIX(FATAL) << "Cannot cancel operations that have already replicated"
302
0
                             << ": " << status << " operation: " << ToString();
303
0
    }
304
410
  }
305
410
}
306
307
void OperationDriver::ReplicationFinished(
308
7.79M
    const Status& status, int64_t leader_term, OpIds* applied_op_ids) {
309
1.89k
  LOG_IF(DFATAL, status.ok() && !GetOpId().valid()) << "Invalid op id after replication";
310
311
7.79M
  PrepareState prepare_state_copy;
312
7.79M
  {
313
7.79M
    std::lock_guard<simple_spinlock> lock(lock_);
314
7.79M
    if (replication_state_ == REPLICATION_FAILED) {
315
0
      LOG_IF(DFATAL, status.ok()) << "Successfully replicated operation that was previously failed";
316
0
      return;
317
0
    }
318
7.79M
    CHECK_EQ(replication_state_, REPLICATING);
319
7.79M
    if (status.ok()) {
320
7.79M
      replication_state_ = REPLICATED;
321
18.4E
    } else {
322
18.4E
      replication_state_ = REPLICATION_FAILED;
323
18.4E
    }
324
7.79M
    prepare_state_copy = prepare_state_;
325
7.79M
  }
326
327
  // If we have prepared and replicated, we're ready to move ahead and apply this operation.
328
  // Note that if we set the state to REPLICATION_FAILED above, ApplyOperation() will actually abort
329
  // the operation, i.e. ApplyTask() will never be called and the operation will never be applied to
330
  // the tablet.
331
7.79M
  if (prepare_state_copy != PrepareState::PREPARED) {
332
0
    LOG(DFATAL) << "Replicating an operation that has not been prepared: " << AsString(this);
333
334
0
    LOG(ERROR) << "Attempting to wait for the operation to be prepared";
335
336
    // This case should never happen, but if it happens we are trying to survive.
337
0
    for (;;) {
338
0
      std::this_thread::sleep_for(1ms);
339
0
      PrepareState prepare_state;
340
0
      {
341
0
        std::lock_guard<simple_spinlock> lock(lock_);
342
0
        prepare_state = prepare_state_;
343
0
        if (prepare_state == PrepareState::PREPARED) {
344
0
          break;
345
0
        }
346
0
      }
347
0
      YB_LOG_EVERY_N_SECS(WARNING, 1)
348
0
          << "Waiting for the operation to be prepared, current state: " << prepare_state;
349
0
    }
350
0
  }
351
352
7.79M
  if (status.ok()) {
353
7.79M
    TRACE_EVENT_FLOW_BEGIN0("operation", "ApplyTask", this);
354
7.79M
    ApplyTask(leader_term, applied_op_ids);
355
18.4E
  } else {
356
18.4E
    HandleFailure(status);
357
18.4E
  }
358
7.79M
}
359
360
0
void OperationDriver::Abort(const Status& status) {
361
0
  CHECK(!status.ok());
362
363
0
  ReplicationState repl_state_copy;
364
0
  {
365
0
    std::lock_guard<simple_spinlock> lock(lock_);
366
0
    repl_state_copy = replication_state_;
367
0
  }
368
369
  // If the state is not NOT_REPLICATING we abort immediately and the operation
370
  // will never be replicated.
371
  // In any other state we just set the operation status, if the operation's
372
  // Apply hasn't started yet this prevents it from starting, but if it has then
373
  // the operation runs to completion.
374
0
  if (repl_state_copy == NOT_REPLICATING) {
375
0
    HandleFailure(status);
376
0
  }
377
0
}
378
379
7.79M
void OperationDriver::ApplyTask(int64_t leader_term, OpIds* applied_op_ids) {
380
7.79M
  TRACE_EVENT_FLOW_END0("operation", "ApplyTask", this);
381
7.79M
  ADOPT_TRACE(trace());
382
383
7.79M
#ifndef NDEBUG
384
7.79M
  {
385
7.79M
    std::lock_guard<simple_spinlock> lock(lock_);
386
7.79M
    DCHECK_EQ(replication_state_, REPLICATED);
387
7.79M
    DCHECK_EQ(prepare_state_, PREPARED);
388
7.79M
  }
389
7.79M
#endif
390
391
  // We need to ref-count ourself, since Commit() may run very quickly
392
  // and end up calling Finalize() while we're still in this code.
393
7.79M
  scoped_refptr<OperationDriver> ref(this);
394
395
7.79M
  {
396
7.79M
    auto status = operation_->Replicated(leader_term);
397
1.41k
    LOG_IF_WITH_PREFIX(FATAL, !status.ok())
398
1.41k
        << "Apply failed: " << status
399
1.41k
        << ", request: " << operation_->request()->ShortDebugString();
400
7.79M
    operation_tracker_->Release(this, applied_op_ids);
401
7.79M
  }
402
7.79M
}
403
404
std::string OperationDriver::StateString(ReplicationState repl_state,
405
5
                                           PrepareState prep_state) {
406
5
  string state_str;
407
5
  switch (repl_state) {
408
0
    case NOT_REPLICATING:
409
0
      StrAppend(&state_str, "NR-");  // For Not Replicating
410
0
      break;
411
1
    case REPLICATING:
412
1
      StrAppend(&state_str, "R-");  // For Replicating
413
1
      break;
414
0
    case REPLICATION_FAILED:
415
0
      StrAppend(&state_str, "RF-");  // For Replication Failed
416
0
      break;
417
4
    case REPLICATED:
418
4
      StrAppend(&state_str, "RD-");  // For Replication Done
419
4
      break;
420
0
    default:
421
0
      LOG(DFATAL) << "Unexpected replication state: " << repl_state;
422
5
  }
423
5
  switch (prep_state) {
424
5
    case PREPARED:
425
5
      StrAppend(&state_str, "P");
426
5
      break;
427
0
    case NOT_PREPARED:
428
0
      StrAppend(&state_str, "NP");
429
0
      break;
430
0
    default:
431
0
      LOG(DFATAL) << "Unexpected prepare state: " << prep_state;
432
5
  }
433
5
  return state_str;
434
5
}
435
436
4
std::string OperationDriver::LogPrefix() const {
437
4
  ReplicationState repl_state_copy;
438
4
  PrepareState prep_state_copy;
439
4
  std::string ts_string;
440
4
  OperationType operation_type;
441
442
4
  {
443
4
    std::lock_guard<simple_spinlock> lock(lock_);
444
4
    repl_state_copy = replication_state_;
445
4
    prep_state_copy = prepare_state_;
446
4
    ts_string = operation_ && operation_->has_hybrid_time()
447
4
        ? operation_->hybrid_time().ToString() : "No hybrid_time";
448
4
    operation_type = this->operation_type();
449
4
  }
450
451
4
  string state_str = StateString(repl_state_copy, prep_state_copy);
452
  // We use the tablet and the peer (T, P) to identify ts and tablet and the hybrid_time (Ts) to
453
  // (help) identify the operation. The state string (S) describes the state of the operation.
454
4
  return Format("T $0 P $1 S $2 Ts $3 $4: ",
455
                // consensus_ is NULL in some unit tests.
456
4
                PREDICT_TRUE(consensus_) ? consensus_->tablet_id() : "(unknown)",
457
4
                PREDICT_TRUE(consensus_) ? consensus_->peer_uuid() : "(unknown)",
458
4
                state_str, ts_string, operation_type);
459
4
}
460
461
13.5M
int64_t OperationDriver::SpaceUsed() {
462
13.5M
  if (!operation_) {
463
5.70M
    return 0;
464
5.70M
  }
465
7.79M
  auto consensus_round = operation_->consensus_round();
466
7.79M
  if (consensus_round) {
467
7.77M
    return consensus_round->replicate_msg()->SpaceUsedLong();
468
7.77M
  }
469
17.5k
  return operation()->request()->SpaceUsedLong();
470
17.5k
}
471
472
}  // namespace tablet
473
}  // namespace yb