YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/operations/operation.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tablet/operations/operation.h"
34
35
#include "yb/consensus/consensus_round.h"
36
#include "yb/consensus/consensus.pb.h"
37
38
#include "yb/tablet/tablet.h"
39
40
#include "yb/tserver/tserver_error.h"
41
42
#include "yb/util/async_util.h"
43
#include "yb/util/logging.h"
44
#include "yb/util/size_literals.h"
45
#include "yb/util/trace.h"
46
47
namespace yb {
48
namespace tablet {
49
50
using tserver::TabletServerError;
51
using tserver::TabletServerErrorPB;
52
53
Operation::Operation(OperationType operation_type, Tablet* tablet)
54
16.2M
    : operation_type_(operation_type), tablet_(tablet) {
55
16.2M
}
56
57
16.2M
Operation::~Operation() {}
58
59
5.31k
std::string Operation::LogPrefix() const {
60
5.31k
  return Format("T $0 $1: ", tablet()->tablet_id(), this);
61
5.31k
}
62
63
7
std::string Operation::ToString() const {
64
7
  return Format("{ type: $0 consensus_round: $1 }", operation_type(), consensus_round());
65
7
}
66
67
68
14.3M
Status Operation::Replicated(int64_t leader_term) {
69
14.3M
  Status complete_status = Status::OK();
70
14.3M
  RETURN_NOT_OK(DoReplicated(leader_term, &complete_status));
71
14.3M
  Replicated();
72
14.3M
  Release();
73
14.3M
  CompleteWithStatus(complete_status);
74
14.3M
  return Status::OK();
75
14.3M
}
76
77
612
void Operation::Aborted(const Status& status, bool was_pending) {
78
612
  
VLOG_WITH_PREFIX_AND_FUNC0
(4) << status0
;
79
612
  Aborted(was_pending);
80
612
  Release();
81
612
  CompleteWithStatus(DoAborted(status));
82
612
}
83
84
15.8M
void Operation::CompleteWithStatus(const Status& status) const {
85
15.8M
  bool expected = false;
86
15.8M
  if (!complete_.compare_exchange_strong(expected, true)) {
87
0
    LOG_WITH_PREFIX(DFATAL) << __func__ << " called twice, new status: " << status;
88
0
    return;
89
0
  }
90
15.8M
  if (completion_clbk_) {
91
6.16M
    completion_clbk_(status);
92
6.16M
  }
93
15.8M
}
94
95
14.3M
void Operation::set_consensus_round(const consensus::ConsensusRoundPtr& consensus_round) {
96
14.3M
  consensus_round_ = consensus_round;
97
14.3M
  set_op_id(consensus_round_->id());
98
14.3M
  UpdateRequestFromConsensusRound();
99
14.3M
}
100
101
9.63M
void Operation::set_hybrid_time(const HybridTime& hybrid_time) {
102
  // make sure we set the hybrid_time only once
103
9.63M
  std::lock_guard<simple_spinlock> l(mutex_);
104
9.63M
  DCHECK(!hybrid_time_.is_valid());
105
9.63M
  hybrid_time_ = hybrid_time;
106
9.63M
}
107
108
8.82M
HybridTime Operation::WriteHybridTime() const {
109
8.82M
  return hybrid_time();
110
8.82M
}
111
112
5.04M
void Operation::AddedToLeader(const OpId& op_id, const OpId& committed_op_id) {
113
5.04M
  HybridTime hybrid_time;
114
5.04M
  if (use_mvcc()) {
115
4.52M
    hybrid_time = tablet_->mvcc_manager()->AddLeaderPending(op_id);
116
4.52M
  } else {
117
522k
    hybrid_time = tablet_->clock()->Now();
118
522k
  }
119
120
5.04M
  {
121
5.04M
    std::lock_guard<simple_spinlock> l(mutex_);
122
5.04M
    hybrid_time_ = hybrid_time;
123
5.04M
    op_id_ = op_id;
124
5.04M
    auto* replicate_msg = consensus_round_->replicate_msg().get();
125
5.04M
    op_id.ToPB(replicate_msg->mutable_id());
126
5.04M
    committed_op_id.ToPB(replicate_msg->mutable_committed_op_id());
127
5.04M
    replicate_msg->set_hybrid_time(hybrid_time_.ToUint64());
128
5.04M
    replicate_msg->set_monotonic_counter(*tablet()->monotonic_counter());
129
5.04M
  }
130
131
5.04M
  AddedAsPending();
132
5.04M
}
133
134
9.27M
void Operation::AddedToFollower() {
135
9.27M
  if (use_mvcc()) {
136
8.51M
    tablet()->mvcc_manager()->AddFollowerPending(hybrid_time(), op_id());
137
8.51M
  }
138
139
9.27M
  AddedAsPending();
140
9.27M
}
141
142
612
void Operation::Aborted(bool was_pending) {
143
612
  if (use_mvcc()) {
144
590
    auto hybrid_time = hybrid_time_even_if_unset();
145
590
    if (hybrid_time.is_valid()) {
146
533
      tablet()->mvcc_manager()->Aborted(hybrid_time, op_id());
147
533
    }
148
590
  }
149
150
612
  if (was_pending) {
151
541
    RemovedFromPending();
152
541
  }
153
612
}
154
155
14.3M
void Operation::Replicated() {
156
14.3M
  if (use_mvcc()) {
157
13.0M
    tablet()->mvcc_manager()->Replicated(hybrid_time(), op_id());
158
13.0M
  }
159
160
14.3M
  RemovedFromPending();
161
14.3M
}
162
163
13.2M
void Operation::Release() {
164
13.2M
}
165
166
1.10M
void ExclusiveSchemaOperationBase::ReleasePermitToken() {
167
1.10M
  permit_token_.Reset();
168
1.10M
  TRACE("Released permit token");
169
1.10M
}
170
171
OperationCompletionCallback MakeWeakSynchronizerOperationCompletionCallback(
172
13
    std::weak_ptr<Synchronizer> synchronizer) {
173
13
  return [synchronizer = std::move(synchronizer)](const Status& status) {
174
13
    auto shared_synchronizer = synchronizer.lock();
175
13
    if (shared_synchronizer) {
176
13
      shared_synchronizer->StatusCB(status);
177
13
    }
178
13
  };
179
13
}
180
181
5.04M
consensus::ReplicateMsgPtr CreateReplicateMsg(OperationType op_type) {
182
5.04M
  auto result = std::make_shared<consensus::ReplicateMsg>();
183
5.04M
  result->set_op_type(static_cast<consensus::OperationType>(op_type));
184
5.04M
  return result;
185
5.04M
}
186
187
}  // namespace tablet
188
}  // namespace yb