YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/operations/operation.h"
34
35
#include "yb/consensus/consensus_round.h"
36
#include "yb/consensus/consensus.pb.h"
37
38
#include "yb/tablet/tablet.h"
39
40
#include "yb/tserver/tserver_error.h"
41
42
#include "yb/util/async_util.h"
43
#include "yb/util/logging.h"
44
#include "yb/util/size_literals.h"
45
#include "yb/util/trace.h"
46
47
namespace yb {
48
namespace tablet {
49
50
using tserver::TabletServerError;
51
using tserver::TabletServerErrorPB;
52
53
// Records the operation type and the tablet pointer this operation works against.
Operation::Operation(OperationType operation_type, Tablet* tablet)
    : operation_type_(operation_type),
      tablet_(tablet) {}
56
57
9.13M
// Nothing to clean up explicitly; members are destroyed by their own destructors.
Operation::~Operation() = default;
58
59
2.59k
std::string Operation::LogPrefix() const {
60
2.59k
  return Format("T $0 $1: ", tablet()->tablet_id(), this);
61
2.59k
}
62
63
4
std::string Operation::ToString() const {
64
4
  return Format("{ type: $0 consensus_round: $1 }", operation_type(), consensus_round());
65
4
}
66
67
68
7.79M
// Runs the replication path for this operation at the given leader term.
//
// DoReplicated() is invoked first; if it returns a non-OK status, that status is returned
// immediately and none of the remaining steps (Replicated(), Release(), completion) run.
// Otherwise the status DoReplicated() stored through its out-parameter is handed to
// CompleteWithStatus() and OK is returned.
Status Operation::Replicated(int64_t leader_term) {
  auto completion_status = Status::OK();
  RETURN_NOT_OK(DoReplicated(leader_term, &completion_status));

  Replicated();
  Release();
  CompleteWithStatus(completion_status);

  return Status::OK();
}
76
77
416
void Operation::Aborted(const Status& status, bool was_pending) {
78
0
  VLOG_WITH_PREFIX_AND_FUNC(4) << status;
79
416
  Aborted(was_pending);
80
416
  Release();
81
416
  CompleteWithStatus(DoAborted(status));
82
416
}
83
84
8.73M
void Operation::CompleteWithStatus(const Status& status) const {
85
8.73M
  bool expected = false;
86
8.73M
  if (!complete_.compare_exchange_strong(expected, true)) {
87
0
    LOG_WITH_PREFIX(DFATAL) << __func__ << " called twice, new status: " << status;
88
0
    return;
89
0
  }
90
8.73M
  if (completion_clbk_) {
91
3.40M
    completion_clbk_(status);
92
3.40M
  }
93
8.73M
}
94
95
7.79M
// Stores the consensus round, copies its id into this operation's op id, and then calls
// UpdateRequestFromConsensusRound().
void Operation::set_consensus_round(const consensus::ConsensusRoundPtr& consensus_round) {
  consensus_round_ = consensus_round;

  const auto& round_id = consensus_round_->id();
  set_op_id(round_id);

  UpdateRequestFromConsensusRound();
}
100
101
5.47M
void Operation::set_hybrid_time(const HybridTime& hybrid_time) {
102
  // make sure we set the hybrid_time only once
103
5.47M
  std::lock_guard<simple_spinlock> l(mutex_);
104
5.47M
  DCHECK(!hybrid_time_.is_valid());
105
5.47M
  hybrid_time_ = hybrid_time;
106
5.47M
}
107
108
4.75M
// Returns the hybrid time to use for writes; in this implementation that is simply the
// operation's own hybrid time.
HybridTime Operation::WriteHybridTime() const {
  return hybrid_time();
}
111
112
2.69M
void Operation::AddedToLeader(const OpId& op_id, const OpId& committed_op_id) {
113
2.69M
  HybridTime hybrid_time;
114
2.69M
  if (use_mvcc()) {
115
2.41M
    hybrid_time = tablet_->mvcc_manager()->AddLeaderPending(op_id);
116
280k
  } else {
117
280k
    hybrid_time = tablet_->clock()->Now();
118
280k
  }
119
120
2.69M
  {
121
2.69M
    std::lock_guard<simple_spinlock> l(mutex_);
122
2.69M
    hybrid_time_ = hybrid_time;
123
2.69M
    op_id_ = op_id;
124
2.69M
    auto* replicate_msg = consensus_round_->replicate_msg().get();
125
2.69M
    op_id.ToPB(replicate_msg->mutable_id());
126
2.69M
    committed_op_id.ToPB(replicate_msg->mutable_committed_op_id());
127
2.69M
    replicate_msg->set_hybrid_time(hybrid_time_.ToUint64());
128
2.69M
    replicate_msg->set_monotonic_counter(*tablet()->monotonic_counter());
129
2.69M
  }
130
131
2.69M
  AddedAsPending();
132
2.69M
}
133
134
5.09M
void Operation::AddedToFollower() {
135
5.09M
  if (use_mvcc()) {
136
4.66M
    tablet()->mvcc_manager()->AddFollowerPending(hybrid_time(), op_id());
137
4.66M
  }
138
139
5.09M
  AddedAsPending();
140
5.09M
}
141
142
416
void Operation::Aborted(bool was_pending) {
143
416
  if (use_mvcc()) {
144
396
    auto hybrid_time = hybrid_time_even_if_unset();
145
396
    if (hybrid_time.is_valid()) {
146
383
      tablet()->mvcc_manager()->Aborted(hybrid_time, op_id());
147
383
    }
148
396
  }
149
150
416
  if (was_pending) {
151
392
    RemovedFromPending();
152
392
  }
153
416
}
154
155
7.79M
void Operation::Replicated() {
156
7.79M
  if (use_mvcc()) {
157
7.08M
    tablet()->mvcc_manager()->Replicated(hybrid_time(), op_id());
158
7.08M
  }
159
160
7.79M
  RemovedFromPending();
161
7.79M
}
162
163
7.24M
// No-op in this implementation.
// NOTE(review): presumably a hook that subclasses override to release resources — confirm
// against the declaration in operation.h.
void Operation::Release() {}
165
166
546k
void ExclusiveSchemaOperationBase::ReleasePermitToken() {
167
546k
  permit_token_.Reset();
168
546k
  TRACE("Released permit token");
169
546k
}
170
171
// Builds a completion callback that forwards the final status to a Synchronizer held only
// through a weak_ptr.
//
// When the callback runs it tries to lock the weak pointer; if the Synchronizer no longer
// exists, the status is silently dropped.
OperationCompletionCallback MakeWeakSynchronizerOperationCompletionCallback(
    std::weak_ptr<Synchronizer> synchronizer) {
  return [weak_sync = std::move(synchronizer)](const Status& status) {
    if (auto sync = weak_sync.lock()) {
      sync->StatusCB(status);
    }
  };
}
180
181
2.69M
// Allocates a new ReplicateMsg and sets its op_type field to op_type converted to
// consensus::OperationType with a static_cast.
consensus::ReplicateMsgPtr CreateReplicateMsg(OperationType op_type) {
  auto msg = std::make_shared<consensus::ReplicateMsg>();

  const auto consensus_op_type = static_cast<consensus::OperationType>(op_type);
  msg->set_op_type(consensus_op_type);

  return msg;
}
186
187
}  // namespace tablet
188
}  // namespace yb