/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/tablet_service.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <memory> |
37 | | #include <string> |
38 | | #include <vector> |
39 | | |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/client/forward_rpc.h" |
43 | | #include "yb/client/transaction.h" |
44 | | #include "yb/client/transaction_pool.h" |
45 | | |
46 | | #include "yb/common/ql_rowblock.h" |
47 | | #include "yb/common/ql_value.h" |
48 | | #include "yb/common/row_mark.h" |
49 | | #include "yb/common/schema.h" |
50 | | #include "yb/common/wire_protocol.h" |
51 | | #include "yb/consensus/leader_lease.h" |
52 | | #include "yb/consensus/consensus.pb.h" |
53 | | #include "yb/consensus/raft_consensus.h" |
54 | | |
55 | | #include "yb/docdb/cql_operation.h" |
56 | | #include "yb/docdb/pgsql_operation.h" |
57 | | |
58 | | #include "yb/gutil/bind.h" |
59 | | #include "yb/gutil/casts.h" |
60 | | #include "yb/gutil/stl_util.h" |
61 | | #include "yb/gutil/stringprintf.h" |
62 | | #include "yb/gutil/strings/escaping.h" |
63 | | |
64 | | #include "yb/rpc/thread_pool.h" |
65 | | |
66 | | #include "yb/server/hybrid_clock.h" |
67 | | |
68 | | #include "yb/tablet/abstract_tablet.h" |
69 | | #include "yb/tablet/metadata.pb.h" |
70 | | #include "yb/tablet/operations/change_metadata_operation.h" |
71 | | #include "yb/tablet/operations/split_operation.h" |
72 | | #include "yb/tablet/operations/truncate_operation.h" |
73 | | #include "yb/tablet/operations/update_txn_operation.h" |
74 | | #include "yb/tablet/operations/write_operation.h" |
75 | | #include "yb/tablet/read_result.h" |
76 | | #include "yb/tablet/tablet.h" |
77 | | #include "yb/tablet/tablet_bootstrap_if.h" |
78 | | #include "yb/tablet/tablet_metadata.h" |
79 | | #include "yb/tablet/tablet_metrics.h" |
80 | | #include "yb/tablet/transaction_participant.h" |
81 | | #include "yb/tablet/write_query.h" |
82 | | |
83 | | #include "yb/tserver/read_query.h" |
84 | | #include "yb/tserver/service_util.h" |
85 | | #include "yb/tserver/tablet_server.h" |
86 | | #include "yb/tserver/ts_tablet_manager.h" |
87 | | #include "yb/tserver/tserver_error.h" |
88 | | |
89 | | #include "yb/util/crc.h" |
90 | | #include "yb/util/debug-util.h" |
91 | | #include "yb/util/debug/long_operation_tracker.h" |
92 | | #include "yb/util/debug/trace_event.h" |
93 | | #include "yb/util/faststring.h" |
94 | | #include "yb/util/flag_tags.h" |
95 | | #include "yb/util/format.h" |
96 | | #include "yb/util/logging.h" |
97 | | #include "yb/util/math_util.h" |
98 | | #include "yb/util/mem_tracker.h" |
99 | | #include "yb/util/metrics.h" |
100 | | #include "yb/util/monotime.h" |
101 | | #include "yb/util/random_util.h" |
102 | | #include "yb/util/scope_exit.h" |
103 | | #include "yb/util/size_literals.h" |
104 | | #include "yb/util/status.h" |
105 | | #include "yb/util/status_callback.h" |
106 | | #include "yb/util/status_format.h" |
107 | | #include "yb/util/status_log.h" |
108 | | #include "yb/util/string_util.h" |
109 | | #include "yb/util/trace.h" |
110 | | |
111 | | #include "yb/yql/pgwrapper/ysql_upgrade.h" |
112 | | |
113 | | using namespace std::literals; // NOLINT |
114 | | |
115 | | DEFINE_int32(scanner_default_batch_size_bytes, 64 * 1024, |
116 | | "The default size for batches of scan results"); |
117 | | TAG_FLAG(scanner_default_batch_size_bytes, advanced); |
118 | | TAG_FLAG(scanner_default_batch_size_bytes, runtime); |
119 | | |
120 | | DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024, |
121 | | "The maximum batch size that a client may request for " |
122 | | "scan results."); |
123 | | TAG_FLAG(scanner_max_batch_size_bytes, advanced); |
124 | | TAG_FLAG(scanner_max_batch_size_bytes, runtime); |
125 | | |
126 | | DEFINE_int32(scanner_batch_size_rows, 100, |
127 | | "The number of rows to batch for servicing scan requests."); |
128 | | TAG_FLAG(scanner_batch_size_rows, advanced); |
129 | | TAG_FLAG(scanner_batch_size_rows, runtime); |
130 | | |
131 | | // Fault injection flags. |
132 | | DEFINE_test_flag(int32, scanner_inject_latency_on_each_batch_ms, 0, |
133 | | "If set, the scanner will pause the specified number of milliesconds " |
134 | | "before reading each batch of data on the tablet server."); |
135 | | |
136 | | DEFINE_int32(max_wait_for_safe_time_ms, 5000, |
137 | | "Maximum time in milliseconds to wait for the safe time to advance when trying to " |
138 | | "scan at the given hybrid_time."); |
139 | | |
140 | | DEFINE_int32(num_concurrent_backfills_allowed, -1, |
141 | | "Maximum number of concurrent backfill jobs that is allowed to run."); |
142 | | |
143 | | DEFINE_test_flag(bool, tserver_noop_read_write, false, "Respond NOOP to read/write."); |
144 | | |
145 | | DEFINE_uint64(index_backfill_upperbound_for_user_enforced_txn_duration_ms, 65000, |
146 | | "For Non-Txn tables, it is impossible to know at the tservers " |
147 | | "whether or not an 'old transaction' is still active. To avoid " |
148 | | "having such old transactions, we assume a bound on the duration " |
149 | | "of such transactions (during the backfill process) and wait " |
150 | | "it out. This flag denotes a conservative upper bound on the " |
151 | | "duration of such user enforced transactions."); |
152 | | TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, evolving); |
153 | | TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, runtime); |
154 | | |
155 | | DEFINE_int32(index_backfill_additional_delay_before_backfilling_ms, 0, |
156 | | "Operations that are received by the tserver, and have decided how " |
157 | | "the indexes need to be updated (based on the IndexPermission), will " |
158 | | "not be added to the list of current transactions until they are " |
159 | | "replicated/applied. This delay allows for the GetSafeTime method " |
160 | | "to wait for such operations to be replicated/applied. Ideally, this " |
161 | | "value should be set to be something larger than the raft-heartbeat-interval " |
162 | | "but can be as high as the client_rpc_timeout if we want to be more conservative."); |
163 | | TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, evolving); |
164 | | TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, runtime); |
165 | | |
166 | | DEFINE_int32(index_backfill_wait_for_old_txns_ms, 0, |
167 | | "Index backfill needs to wait for transactions that started before the " |
168 | | "WRITE_AND_DELETE phase to commit or abort before choosing a time for " |
169 | | "backfilling the index. This is the max time that the GetSafeTime call will " |
170 | | "wait for, before it resorts to attempt aborting old transactions. This is " |
171 | | "necessary to guard against the pathological active transaction that never " |
172 | | "commits from blocking the index backfill forever."); |
173 | | TAG_FLAG(index_backfill_wait_for_old_txns_ms, evolving); |
174 | | TAG_FLAG(index_backfill_wait_for_old_txns_ms, runtime); |
175 | | |
176 | | DEFINE_test_flag(double, respond_write_failed_probability, 0.0, |
177 | | "Probability to respond that write request is failed"); |
178 | | |
179 | | DEFINE_test_flag(bool, rpc_delete_tablet_fail, false, "Should delete tablet RPC fail."); |
180 | | |
181 | | DECLARE_bool(disable_alter_vs_write_mutual_exclusion); |
182 | | DECLARE_uint64(max_clock_skew_usec); |
183 | | DECLARE_uint64(transaction_min_running_check_interval_ms); |
184 | | DECLARE_int64(transaction_rpc_timeout_ms); |
185 | | |
186 | | DEFINE_test_flag(int32, txn_status_table_tablet_creation_delay_ms, 0, |
187 | | "Extra delay to slowdown creation of transaction status table tablet."); |
188 | | |
189 | | DEFINE_test_flag(int32, leader_stepdown_delay_ms, 0, |
190 | | "Amount of time to delay before starting a leader stepdown change."); |
191 | | |
192 | | DEFINE_test_flag(int32, alter_schema_delay_ms, 0, "Delay before processing AlterSchema."); |
193 | | |
194 | | DEFINE_test_flag(bool, disable_post_split_tablet_rbs_check, false, |
195 | | "If true, bypass any checks made to reject remote boostrap requests for post " |
196 | | "split tablets whose parent tablets are still present."); |
197 | | |
198 | | DEFINE_test_flag(double, fail_tablet_split_probability, 0.0, |
199 | | "Probability of failing in TabletServiceAdminImpl::SplitTablet."); |
200 | | |
201 | | DEFINE_test_flag(bool, pause_tserver_get_split_key, false, |
202 | | "Pause before processing a GetSplitKey request."); |
203 | | |
204 | | DECLARE_int32(heartbeat_interval_ms); |
205 | | DECLARE_uint64(rocksdb_max_file_size_for_compaction); |
206 | | |
207 | | DECLARE_int32(ysql_transaction_abort_timeout_ms); |
208 | | |
209 | | DEFINE_test_flag(bool, fail_alter_schema_after_abort_transactions, false, |
210 | | "If true, setup an error status in AlterSchema and respond success to rpc call. " |
211 | | "This failure should not cause the TServer to crash but " |
212 | | "instead return an error message on the YSQL connection."); |
213 | | |
214 | | double TEST_delay_create_transaction_probability = 0; |
215 | | |
216 | | namespace yb { |
217 | | namespace tserver { |
218 | | |
219 | | using client::internal::ForwardReadRpc; |
220 | | using client::internal::ForwardWriteRpc; |
221 | | using consensus::ChangeConfigRequestPB; |
222 | | using consensus::ChangeConfigResponsePB; |
223 | | using consensus::Consensus; |
224 | | using consensus::CONSENSUS_CONFIG_ACTIVE; |
225 | | using consensus::CONSENSUS_CONFIG_COMMITTED; |
226 | | using consensus::ConsensusConfigType; |
227 | | using consensus::ConsensusRequestPB; |
228 | | using consensus::ConsensusResponsePB; |
229 | | using consensus::GetLastOpIdRequestPB; |
230 | | using consensus::GetNodeInstanceRequestPB; |
231 | | using consensus::GetNodeInstanceResponsePB; |
232 | | using consensus::LeaderLeaseStatus; |
233 | | using consensus::LeaderStepDownRequestPB; |
234 | | using consensus::LeaderStepDownResponsePB; |
235 | | using consensus::RaftPeerPB; |
236 | | using consensus::RunLeaderElectionRequestPB; |
237 | | using consensus::RunLeaderElectionResponsePB; |
238 | | using consensus::StartRemoteBootstrapRequestPB; |
239 | | using consensus::StartRemoteBootstrapResponsePB; |
240 | | using consensus::UnsafeChangeConfigRequestPB; |
241 | | using consensus::UnsafeChangeConfigResponsePB; |
242 | | using consensus::VoteRequestPB; |
243 | | using consensus::VoteResponsePB; |
244 | | |
245 | | using std::unique_ptr; |
246 | | using google::protobuf::RepeatedPtrField; |
247 | | using rpc::RpcContext; |
248 | | using std::shared_ptr; |
249 | | using std::vector; |
250 | | using std::string; |
251 | | using strings::Substitute; |
252 | | using tablet::ChangeMetadataOperation; |
253 | | using tablet::Tablet; |
254 | | using tablet::TabletPeer; |
255 | | using tablet::TabletPeerPtr; |
256 | | using tablet::TabletStatusPB; |
257 | | using tablet::TruncateOperation; |
258 | | using tablet::OperationCompletionCallback; |
259 | | using tablet::WriteOperation; |
260 | | |
261 | | namespace { |
262 | | |
263 | 27.0M | Result<std::shared_ptr<consensus::RaftConsensus>> GetConsensus(const TabletPeerPtr& tablet_peer) { |
264 | 27.0M | auto result = tablet_peer->shared_raft_consensus(); |
265 | 27.0M | if (!result) { |
266 | 0 | Status s = STATUS(ServiceUnavailable, "Consensus unavailable. Tablet not running"); |
267 | 0 | return s.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::TABLET_NOT_RUNNING)); |
268 | 0 | } |
269 | 27.0M | return result; |
270 | 27.0M | } |
271 | | |
272 | | template<class RespClass> |
273 | | std::shared_ptr<consensus::RaftConsensus> GetConsensusOrRespond(const TabletPeerPtr& tablet_peer, |
274 | | RespClass* resp, |
275 | 556 | rpc::RpcContext* context) { |
276 | 556 | auto result = GetConsensus(tablet_peer); |
277 | 556 | if (!result.ok()) { |
278 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); |
279 | 0 | } |
280 | 556 | return result.get(); |
281 | 556 | } |
282 | | |
283 | | template<class RespClass> |
284 | | bool GetConsensusOrRespond(const TabletPeerPtr& tablet_peer, |
285 | | RespClass* resp, |
286 | | rpc::RpcContext* context, |
287 | 27.0M | shared_ptr<Consensus>* consensus) { |
288 | 27.0M | auto result = GetConsensus(tablet_peer); |
289 | 27.0M | if (!result.ok()) { |
290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); |
291 | 0 | return false; |
292 | 0 | } |
293 | 27.0M | return (*consensus = result.get()) != nullptr; |
294 | 27.0M | } tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::ConsensusResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::ConsensusResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 25.5M | shared_ptr<Consensus>* consensus) { | 288 | 25.5M | auto result = GetConsensus(tablet_peer); | 289 | 25.5M | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 25.5M | return (*consensus = result.get()) != nullptr; | 294 | 25.5M | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::VoteResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::VoteResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 1.36M | shared_ptr<Consensus>* consensus) { | 288 | 1.36M | auto result = GetConsensus(tablet_peer); | 289 | 1.36M | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 1.36M | return (*consensus = result.get()) != nullptr; | 294 | 1.36M | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::ChangeConfigResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::ChangeConfigResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 5.29k | shared_ptr<Consensus>* consensus) { | 288 | 5.29k | auto result = GetConsensus(tablet_peer); | 289 | 5.29k | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 5.29k | return (*consensus = result.get()) != nullptr; | 294 | 5.29k | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::UnsafeChangeConfigResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::UnsafeChangeConfigResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 6 | shared_ptr<Consensus>* consensus) { | 288 | 6 | auto result = GetConsensus(tablet_peer); | 289 | 6 | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 6 | return (*consensus = result.get()) != nullptr; | 294 | 6 | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::RunLeaderElectionResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::RunLeaderElectionResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 57.8k | shared_ptr<Consensus>* consensus) { | 288 | 57.8k | auto result = GetConsensus(tablet_peer); | 289 | 57.8k | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 57.8k | return (*consensus = result.get()) != nullptr; | 294 | 57.8k | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::LeaderElectionLostResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::LeaderElectionLostResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 77 | shared_ptr<Consensus>* consensus) { | 288 | 77 | auto result = GetConsensus(tablet_peer); | 289 | 77 | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 77 | return (*consensus = result.get()) != nullptr; | 294 | 77 | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::LeaderStepDownResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::LeaderStepDownResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 54.0k | shared_ptr<Consensus>* consensus) { | 288 | 54.0k | auto result = GetConsensus(tablet_peer); | 289 | 54.0k | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 54.0k | return (*consensus = result.get()) != nullptr; | 294 | 54.0k | } |
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::GetConsensusStateResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::GetConsensusStateResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*) Line | Count | Source | 287 | 3.88k | shared_ptr<Consensus>* consensus) { | 288 | 3.88k | auto result = GetConsensus(tablet_peer); | 289 | 3.88k | if (!result.ok()) { | 290 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 291 | 0 | return false; | 292 | 0 | } | 293 | 3.88k | return (*consensus = result.get()) != nullptr; | 294 | 3.88k | } |
|
295 | | |
296 | | } // namespace |
297 | | |
298 | | template<class Resp> |
299 | | bool TabletServiceImpl::CheckWriteThrottlingOrRespond( |
300 | 2.56M | double score, tablet::TabletPeer* tablet_peer, Resp* resp, rpc::RpcContext* context) { |
301 | | // Check for memory pressure; don't bother doing any additional work if we've |
302 | | // exceeded the limit. |
303 | 2.56M | auto status = CheckWriteThrottling(score, tablet_peer); |
304 | 2.56M | if (!status.ok()) { |
305 | 15 | SetupErrorAndRespond(resp->mutable_error(), status, context); |
306 | 15 | return false; |
307 | 15 | } |
308 | | |
309 | 2.56M | return true; |
310 | 2.56M | } |
311 | | |
312 | | typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB; |
313 | | |
314 | | class WriteQueryCompletionCallback { |
315 | | public: |
316 | | WriteQueryCompletionCallback( |
317 | | tablet::TabletPeerPtr tablet_peer, |
318 | | std::shared_ptr<rpc::RpcContext> context, |
319 | | WriteResponsePB* response, |
320 | | tablet::WriteQuery* query, |
321 | | const server::ClockPtr& clock, |
322 | | bool trace = false) |
323 | | : tablet_peer_(std::move(tablet_peer)), |
324 | | context_(std::move(context)), |
325 | | response_(response), |
326 | | query_(query), |
327 | | clock_(clock), |
328 | | include_trace_(trace), |
329 | 2.56M | trace_(include_trace_ ? Trace::CurrentTrace() : nullptr) {} |
330 | | |
331 | 2.56M | void operator()(Status status) const { |
332 | 18.4E | VLOG(1) << __PRETTY_FUNCTION__ << " completing with status " << status; |
333 | | // When we don't need to return any data, we could return success on duplicate request. |
334 | 2.56M | if (status.IsAlreadyPresent() && |
335 | 2.56M | query_->ql_write_ops()->empty()1 && |
336 | 2.56M | query_->pgsql_write_ops()->empty()1 && |
337 | 2.56M | query_->client_request()->redis_write_batch().empty()1 ) { |
338 | 1 | status = Status::OK(); |
339 | 1 | } |
340 | | |
341 | 2.56M | TRACE("Write completing with status $0", yb::ToString(status)); |
342 | | |
343 | 2.56M | if (!status.ok()) { |
344 | 90.4k | LOG(INFO) << tablet_peer_->LogPrefix() << "Write failed: " << status; |
345 | 90.4k | if (include_trace_ && trace_0 ) { |
346 | 0 | response_->set_trace_buffer(trace_->DumpToString(true)); |
347 | 0 | } |
348 | 90.4k | SetupErrorAndRespond(get_error(), status, context_.get()); |
349 | 90.4k | return; |
350 | 90.4k | } |
351 | | |
352 | | // Retrieve the rowblocks returned from the QL write operations and return them as RPC |
353 | | // sidecars. Populate the row schema also. |
354 | 2.47M | faststring rows_data; |
355 | 2.47M | for (const auto& ql_write_op : *query_->ql_write_ops()) { |
356 | 273 | const auto& ql_write_req = ql_write_op->request(); |
357 | 273 | auto* ql_write_resp = ql_write_op->response(); |
358 | 273 | const QLRowBlock* rowblock = ql_write_op->rowblock(); |
359 | 273 | SchemaToColumnPBs(rowblock->schema(), ql_write_resp->mutable_column_schemas()); |
360 | 273 | rows_data.clear(); |
361 | 273 | rowblock->Serialize(ql_write_req.client(), &rows_data); |
362 | 273 | ql_write_resp->set_rows_data_sidecar( |
363 | 273 | narrow_cast<int32_t>(context_->AddRpcSidecar(rows_data))); |
364 | 273 | } |
365 | | |
366 | 2.47M | if (!query_->pgsql_write_ops()->empty()) { |
367 | | // Retrieve the resultset returned from the PGSQL write operations and return them as RPC |
368 | | // sidecars. |
369 | | |
370 | 644k | size_t sidecars_size = 0; |
371 | 7.16M | for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) { |
372 | 7.16M | sidecars_size += pgsql_write_op->result_buffer().size(); |
373 | 7.16M | } |
374 | | |
375 | 644k | if (sidecars_size != 0) { |
376 | 633k | context_->ReserveSidecarSpace(sidecars_size); |
377 | 7.14M | for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) { |
378 | 7.14M | auto* pgsql_write_resp = pgsql_write_op->response(); |
379 | 7.14M | const faststring& result_buffer = pgsql_write_op->result_buffer(); |
380 | 7.14M | if (!result_buffer.empty()7.14M ) { |
381 | 7.14M | pgsql_write_resp->set_rows_data_sidecar( |
382 | 7.14M | narrow_cast<int32_t>(context_->AddRpcSidecar(result_buffer))); |
383 | 7.14M | } |
384 | 7.14M | } |
385 | 633k | } |
386 | 644k | } |
387 | | |
388 | 2.47M | if (include_trace_ && trace_0 ) { |
389 | 0 | response_->set_trace_buffer(trace_->DumpToString(true)); |
390 | 0 | } |
391 | 2.47M | response_->set_propagated_hybrid_time(clock_->Now().ToUint64()); |
392 | 2.47M | context_->RespondSuccess(); |
393 | 18.4E | VLOG(1) << __PRETTY_FUNCTION__ << " RespondedSuccess"; |
394 | 2.47M | } |
395 | | |
396 | | private: |
397 | 90.5k | TabletServerErrorPB* get_error() const { |
398 | 90.5k | return response_->mutable_error(); |
399 | 90.5k | } |
400 | | |
401 | | tablet::TabletPeerPtr tablet_peer_; |
402 | | const std::shared_ptr<rpc::RpcContext> context_; |
403 | | WriteResponsePB* const response_; |
404 | | tablet::WriteQuery* const query_; |
405 | | server::ClockPtr clock_; |
406 | | const bool include_trace_; |
407 | | scoped_refptr<Trace> trace_; |
408 | | }; |
409 | | |
410 | | // Checksums the scan result. |
411 | | class ScanResultChecksummer { |
412 | | public: |
413 | 3.23k | ScanResultChecksummer() {} |
414 | | |
415 | 1.60M | void HandleRow(const Schema& schema, const QLTableRow& row) { |
416 | 1.60M | QLValue value; |
417 | 1.60M | buffer_.clear(); |
418 | 7.31M | for (uint32_t col_index = 0; col_index != schema.num_columns(); ++col_index5.71M ) { |
419 | 5.71M | auto status = row.GetValue(schema.column_id(col_index), &value); |
420 | 5.71M | if (!status.ok()) { |
421 | 0 | LOG(WARNING) << "Column " << schema.column_id(col_index) |
422 | 0 | << " not found in " << row.ToString(); |
423 | 0 | continue; |
424 | 0 | } |
425 | 5.71M | buffer_.append(pointer_cast<const char*>(&col_index), sizeof(col_index)); |
426 | 5.71M | if (schema.column(col_index).is_nullable()) { |
427 | 2.98M | uint8_t defined = value.IsNull() ? 02.29k : 12.98M ; |
428 | 2.98M | buffer_.append(pointer_cast<const char*>(&defined), sizeof(defined)); |
429 | 2.98M | } |
430 | 5.71M | if (!value.IsNull()) { |
431 | 5.71M | value.value().AppendToString(&buffer_); |
432 | 5.71M | } |
433 | 5.71M | } |
434 | 1.60M | crc_->Compute(buffer_.c_str(), buffer_.size(), &agg_checksum_, nullptr); |
435 | 1.60M | } |
436 | | |
437 | | // Accessors for initializing / setting the checksum. |
438 | 3.30k | uint64_t agg_checksum() const { return agg_checksum_; } |
439 | | |
440 | | private: |
441 | | crc::Crc* const crc_ = crc::GetCrc32cInstance(); |
442 | | uint64_t agg_checksum_ = 0; |
443 | | std::string buffer_; |
444 | | }; |
445 | | |
446 | | Result<std::shared_ptr<tablet::AbstractTablet>> TabletServiceImpl::GetTabletForRead( |
447 | | const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, |
448 | 7.72M | YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) { |
449 | 7.72M | return GetTablet(server_->tablet_peer_lookup(), tablet_id, std::move(tablet_peer), |
450 | 7.72M | consistency_level, allow_split_tablet); |
451 | 7.72M | } |
452 | | |
453 | | TabletServiceImpl::TabletServiceImpl(TabletServerIf* server) |
454 | | : TabletServerServiceIf(server->MetricEnt()), |
455 | 16.7k | server_(server) { |
456 | 16.7k | } |
457 | | |
458 | | TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server) |
459 | 8.74k | : TabletServerAdminServiceIf(server->MetricEnt()), server_(server) {} |
460 | | |
461 | | void TabletServiceAdminImpl::BackfillDone( |
462 | | const tablet::ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp, |
463 | 3.79k | rpc::RpcContext context) { |
464 | 3.79k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillDone", req, resp, &context)) { |
465 | 0 | return; |
466 | 0 | } |
467 | 3.79k | DVLOG(3) << "Received BackfillDone RPC: " << req->DebugString()27 ; |
468 | | |
469 | 3.79k | server::UpdateClock(*req, server_->Clock()); |
470 | | |
471 | | // For now, we shall only allow this RPC on the leader. |
472 | 3.79k | auto tablet = |
473 | 3.79k | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
474 | 3.79k | if (!tablet) { |
475 | 24 | return; |
476 | 24 | } |
477 | | |
478 | 3.77k | auto operation = std::make_unique<ChangeMetadataOperation>( |
479 | 3.77k | tablet.peer->tablet(), tablet.peer->log(), req); |
480 | | |
481 | 3.77k | operation->set_completion_callback( |
482 | 3.77k | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
483 | | |
484 | | // Submit the alter schema op. The RPC will be responded to asynchronously. |
485 | 3.77k | tablet.peer->Submit(std::move(operation), tablet.leader_term); |
486 | 3.77k | } |
487 | | |
488 | | void TabletServiceAdminImpl::GetSafeTime( |
489 | 4.39k | const GetSafeTimeRequestPB* req, GetSafeTimeResponsePB* resp, rpc::RpcContext context) { |
490 | 4.39k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "GetSafeTime", req, resp, &context)) { |
491 | 0 | return; |
492 | 0 | } |
493 | 4.39k | DVLOG(3) << "Received GetSafeTime RPC: " << req->DebugString()13 ; |
494 | | |
495 | 4.39k | server::UpdateClock(*req, server_->Clock()); |
496 | | |
497 | | // For now, we shall only allow this RPC on the leader. |
498 | 4.39k | auto tablet = |
499 | 4.39k | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
500 | 4.39k | if (!tablet) { |
501 | 0 | return; |
502 | 0 | } |
503 | 4.39k | const CoarseTimePoint& deadline = context.GetClientDeadline(); |
504 | 4.39k | HybridTime min_hybrid_time(HybridTime::kMin); |
505 | 4.39k | if (req->has_min_hybrid_time_for_backfill()) { |
506 | 4.37k | min_hybrid_time = HybridTime(req->min_hybrid_time_for_backfill()); |
507 | | // For Transactional tables, wait until there are no pending transactions that started |
508 | | // prior to min_hybrid_time. These may not have updated the index correctly, if they |
509 | | // happen to commit after the backfill scan, it is possible that they may miss updating |
510 | | // the index because the some operations may have taken place prior to min_hybrid_time. |
511 | | // |
512 | | // For Non-Txn tables, it is impossible to know at the tservers whether or not an "old |
513 | | // transaction" is still active. To avoid having such old transactions, we assume a |
514 | | // bound on the length of such transactions (during the backfill process) and wait it |
515 | | // out. |
516 | 4.37k | if (!tablet.peer->tablet()->transaction_participant()) { |
517 | 510 | min_hybrid_time = min_hybrid_time.AddMilliseconds( |
518 | 510 | FLAGS_index_backfill_upperbound_for_user_enforced_txn_duration_ms); |
519 | 510 | VLOG(2) << "GetSafeTime called on a user enforced transaction tablet " |
520 | 3 | << tablet.peer->tablet_id() << " will wait until " |
521 | 3 | << min_hybrid_time << " is safe."; |
522 | 3.86k | } else { |
523 | | // Add some extra delay to wait for operations being replicated to be |
524 | | // applied. |
525 | 3.86k | SleepFor(MonoDelta::FromMilliseconds( |
526 | 3.86k | FLAGS_index_backfill_additional_delay_before_backfilling_ms)); |
527 | | |
528 | 3.86k | auto txn_particpant = tablet.peer->tablet()->transaction_participant(); |
529 | 3.86k | auto wait_until = CoarseMonoClock::Now() + FLAGS_index_backfill_wait_for_old_txns_ms * 1ms; |
530 | 3.86k | HybridTime min_running_ht; |
531 | 3.86k | for (;;) { |
532 | 3.86k | min_running_ht = txn_particpant->MinRunningHybridTime(); |
533 | 3.87k | if ((3.86k min_running_ht3.86k && min_running_ht >= min_hybrid_time) || |
534 | 3.86k | CoarseMonoClock::Now() > wait_until3 ) { |
535 | 3.86k | break; |
536 | 3.86k | } |
537 | 18.4E | VLOG(2) << "MinRunningHybridTime is " << min_running_ht |
538 | 18.4E | << " need to wait for " << min_hybrid_time; |
539 | 18.4E | SleepFor(MonoDelta::FromMilliseconds(FLAGS_transaction_min_running_check_interval_ms)); |
540 | 18.4E | } |
541 | | |
542 | 18.4E | VLOG(2) << "Finally MinRunningHybridTime is " << min_running_ht; |
543 | 3.86k | if (min_running_ht < min_hybrid_time) { |
544 | 3 | VLOG(2) << "Aborting Txns that started prior to " << min_hybrid_time0 ; |
545 | 3 | auto s = txn_particpant->StopActiveTxnsPriorTo(min_hybrid_time, deadline); |
546 | 3 | if (!s.ok()) { |
547 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
548 | 0 | return; |
549 | 0 | } |
550 | 3 | } |
551 | 3.86k | } |
552 | 4.37k | } |
553 | | |
554 | 4.39k | auto safe_time = tablet.peer->tablet()->SafeTime( |
555 | 4.39k | tablet::RequireLease::kTrue, min_hybrid_time, deadline); |
556 | 4.39k | if (!safe_time.ok()) { |
557 | 3 | SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context); |
558 | 3 | return; |
559 | 3 | } |
560 | | |
561 | 4.39k | resp->set_safe_time(safe_time->ToUint64()); |
562 | 4.39k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); |
563 | 18.4E | VLOG(1) << "Tablet " << tablet.peer->tablet_id() |
564 | 18.4E | << " returning safe time " << yb::ToString(safe_time); |
565 | | |
566 | 4.39k | context.RespondSuccess(); |
567 | 4.39k | } |
568 | | |
569 | | void TabletServiceAdminImpl::BackfillIndex( |
570 | 4.49k | const BackfillIndexRequestPB* req, BackfillIndexResponsePB* resp, rpc::RpcContext context) { |
571 | 4.49k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillIndex", req, resp, &context)) { |
572 | 0 | return; |
573 | 0 | } |
574 | 18.4E | DVLOG(3) << "Received BackfillIndex RPC: " << req->DebugString(); |
575 | | |
576 | 4.49k | server::UpdateClock(*req, server_->Clock()); |
577 | | |
578 | | // For now, we shall only allow this RPC on the leader. |
579 | 4.49k | auto tablet = |
580 | 4.49k | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
581 | 4.49k | if (!tablet) { |
582 | 0 | return; |
583 | 0 | } |
584 | | |
585 | 4.49k | if (req->indexes().empty()) { |
586 | 0 | SetupErrorAndRespond( |
587 | 0 | resp->mutable_error(), |
588 | 0 | STATUS(InvalidArgument, "No indexes given in request"), |
589 | 0 | TabletServerErrorPB::OPERATION_NOT_SUPPORTED, |
590 | 0 | &context); |
591 | 0 | return; |
592 | 0 | } |
593 | | |
594 | 4.49k | const CoarseTimePoint &deadline = context.GetClientDeadline(); |
595 | 4.49k | const auto coarse_start = CoarseMonoClock::Now(); |
596 | 4.49k | { |
597 | 4.49k | std::unique_lock<std::mutex> l(backfill_lock_); |
598 | 6.12k | while (num_tablets_backfilling_ >= FLAGS_num_concurrent_backfills_allowed) { |
599 | 1.62k | if (backfill_cond_.wait_until(l, deadline) == std::cv_status::timeout) { |
600 | 0 | SetupErrorAndRespond( |
601 | 0 | resp->mutable_error(), |
602 | 0 | STATUS_FORMAT(ServiceUnavailable, |
603 | 0 | "Already running $0 backfill requests", |
604 | 0 | num_tablets_backfilling_), |
605 | 0 | &context); |
606 | 0 | return; |
607 | 0 | } |
608 | 1.62k | } |
609 | 4.49k | num_tablets_backfilling_++; |
610 | 4.49k | } |
611 | 4.36k | auto se = ScopeExit([this] { |
612 | 4.36k | std::unique_lock<std::mutex> l(this->backfill_lock_); |
613 | 4.36k | this->num_tablets_backfilling_--; |
614 | 4.36k | this->backfill_cond_.notify_all(); |
615 | 4.36k | }); |
616 | | |
617 | | // Wait for SafeTime to get past read_at; |
618 | 4.49k | const HybridTime read_at(req->read_at_hybrid_time()); |
619 | 18.4E | DVLOG(1) << "Waiting for safe time to be past " << read_at; |
620 | 4.49k | const auto safe_time = |
621 | 4.49k | tablet.peer->tablet()->SafeTime(tablet::RequireLease::kFalse, read_at, deadline); |
622 | 4.49k | DVLOG(1) << "Got safe time " << safe_time.ToString()88 ; |
623 | 4.49k | if (!safe_time.ok()) { |
624 | 0 | LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString(); |
625 | 0 | SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context); |
626 | 0 | return; |
627 | 0 | } |
628 | | |
629 | | // Don't work on the request if we have had to wait more than 50% |
630 | | // of the time allocated to us for the RPC. |
631 | | // Backfill is a costly operation, we do not want to start working |
632 | | // on it if we expect the client (master) to time out the RPC and |
633 | | // force us to redo the work. |
634 | 4.49k | const auto coarse_now = CoarseMonoClock::Now(); |
635 | 4.49k | if (deadline - coarse_now < coarse_now - coarse_start) { |
636 | 0 | SetupErrorAndRespond( |
637 | 0 | resp->mutable_error(), |
638 | 0 | STATUS_FORMAT( |
639 | 0 | ServiceUnavailable, "Not enough time left $0", deadline - coarse_now), |
640 | 0 | &context); |
641 | 0 | return; |
642 | 0 | } |
643 | | |
644 | 4.49k | bool all_at_backfill = true; |
645 | 4.49k | bool all_past_backfill = true; |
646 | 4.49k | bool is_pg_table = tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE; |
647 | 4.49k | const shared_ptr<IndexMap> index_map = tablet.peer->tablet_metadata()->index_map( |
648 | 4.49k | req->indexed_table_id()); |
649 | 4.49k | std::vector<IndexInfo> indexes_to_backfill; |
650 | 4.49k | std::vector<TableId> index_ids; |
651 | 4.49k | for (const auto& idx : req->indexes()) { |
652 | 4.43k | auto result = index_map->FindIndex(idx.table_id()); |
653 | 4.43k | if (result) { |
654 | 4.42k | const IndexInfo* index_info = *result; |
655 | 4.42k | indexes_to_backfill.push_back(*index_info); |
656 | 4.42k | index_ids.push_back(index_info->table_id()); |
657 | | |
658 | 4.42k | IndexInfoPB idx_info_pb; |
659 | 4.42k | index_info->ToPB(&idx_info_pb); |
660 | 4.42k | if (!is_pg_table) { |
661 | 2.45k | all_at_backfill &= |
662 | 2.45k | idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_DO_BACKFILL; |
663 | 2.45k | } else { |
664 | | // YSQL tables don't use all the docdb permissions, so use this approximation. |
665 | | // TODO(jason): change this back to being like YCQL once we bring the docdb permission |
666 | | // DO_BACKFILL back (issue #6218). |
667 | 1.97k | all_at_backfill &= |
668 | 1.97k | idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_WRITE_AND_DELETE; |
669 | 1.97k | } |
670 | 4.42k | all_past_backfill &= |
671 | 4.42k | idx_info_pb.index_permissions() > IndexPermissions::INDEX_PERM_DO_BACKFILL; |
672 | 4.42k | } else { |
673 | 13 | LOG(WARNING) << "index " << idx.table_id() << " not found in tablet metadata"; |
674 | 13 | all_at_backfill = false; |
675 | 13 | all_past_backfill = false; |
676 | 13 | } |
677 | 4.43k | } |
678 | | |
679 | 4.49k | if (!all_at_backfill) { |
680 | 17 | if (all_past_backfill) { |
681 | | // Change this to see if for all indexes: IndexPermission > DO_BACKFILL. |
682 | 8 | LOG(WARNING) << "Received BackfillIndex RPC: " << req->DebugString() |
683 | 8 | << " after all indexes have moved past DO_BACKFILL. IndexMap is " |
684 | 8 | << ToString(index_map); |
685 | | // This is possible if this tablet completed the backfill. But the master failed over before |
686 | | // other tablets could complete. |
687 | | // The new master is redoing the backfill. We are safe to ignore this request. |
688 | 8 | context.RespondSuccess(); |
689 | 8 | return; |
690 | 8 | } |
691 | | |
692 | 9 | uint32_t our_schema_version = tablet.peer->tablet_metadata()->schema_version(); |
693 | 9 | uint32_t their_schema_version = req->schema_version(); |
694 | 9 | DCHECK_NE(our_schema_version, their_schema_version); |
695 | 9 | SetupErrorAndRespond( |
696 | 9 | resp->mutable_error(), |
697 | 9 | STATUS_SUBSTITUTE( |
698 | 9 | InvalidArgument, |
699 | 9 | "Tablet has a different schema $0 vs $1. " |
700 | 9 | "Requested index is not ready to backfill. IndexMap: $2", |
701 | 9 | our_schema_version, their_schema_version, ToString(index_map)), |
702 | 9 | TabletServerErrorPB::MISMATCHED_SCHEMA, &context); |
703 | 9 | return; |
704 | 17 | } |
705 | | |
706 | 4.48k | Status backfill_status; |
707 | 4.48k | std::string backfilled_until; |
708 | 4.48k | std::unordered_set<TableId> failed_indexes; |
709 | 4.48k | size_t number_rows_processed = 0; |
710 | 4.48k | if (is_pg_table) { |
711 | 1.95k | if (!req->has_namespace_name()) { |
712 | 0 | SetupErrorAndRespond( |
713 | 0 | resp->mutable_error(), |
714 | 0 | STATUS( |
715 | 0 | InvalidArgument, |
716 | 0 | "Attempted backfill on YSQL table without supplying database name"), |
717 | 0 | TabletServerErrorPB::OPERATION_NOT_SUPPORTED, |
718 | 0 | &context); |
719 | 0 | return; |
720 | 0 | } |
721 | 1.95k | backfill_status = tablet.peer->tablet()->BackfillIndexesForYsql( |
722 | 1.95k | indexes_to_backfill, |
723 | 1.95k | req->start_key(), |
724 | 1.95k | deadline, |
725 | 1.95k | read_at, |
726 | 1.95k | server_->pgsql_proxy_bind_address(), |
727 | 1.95k | req->namespace_name(), |
728 | 1.95k | server_->GetSharedMemoryPostgresAuthKey(), |
729 | 1.95k | &number_rows_processed, |
730 | 1.95k | &backfilled_until); |
731 | 1.95k | if (backfill_status.IsIllegalState()) { |
732 | 24 | DCHECK_EQ(failed_indexes.size(), 0) << "We don't support batching in YSQL yet"0 ; |
733 | 24 | for (const auto& idx_info : indexes_to_backfill) { |
734 | 24 | failed_indexes.insert(idx_info.table_id()); |
735 | 24 | } |
736 | 24 | DCHECK_EQ(failed_indexes.size(), 1) << "We don't support batching in YSQL yet"0 ; |
737 | 24 | } |
738 | 2.52k | } else if (tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE) { |
739 | 2.43k | backfill_status = tablet.peer->tablet()->BackfillIndexes( |
740 | 2.43k | indexes_to_backfill, |
741 | 2.43k | req->start_key(), |
742 | 2.43k | deadline, |
743 | 2.43k | read_at, |
744 | 2.43k | &number_rows_processed, |
745 | 2.43k | &backfilled_until, |
746 | 2.43k | &failed_indexes); |
747 | 2.43k | } else { |
748 | 89 | SetupErrorAndRespond( |
749 | 89 | resp->mutable_error(), |
750 | 89 | STATUS(InvalidArgument, "Attempted backfill on tablet of invalid table type"), |
751 | 89 | TabletServerErrorPB::OPERATION_NOT_SUPPORTED, |
752 | 89 | &context); |
753 | 89 | return; |
754 | 89 | } |
755 | 4.39k | DVLOG(1) << "Tablet " << tablet.peer->tablet_id() << " backfilled indexes " |
756 | 41 | << yb::ToString(index_ids) << " and got " << backfill_status |
757 | 41 | << " backfilled until : " << backfilled_until; |
758 | | |
759 | 4.39k | resp->set_backfilled_until(backfilled_until); |
760 | 4.39k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); |
761 | 4.39k | resp->set_number_rows_processed(number_rows_processed); |
762 | | |
763 | 4.39k | if (!backfill_status.ok()) { |
764 | 29 | VLOG(2) << " Failed indexes are " << yb::ToString(failed_indexes)0 ; |
765 | 29 | for (const auto& idx : failed_indexes) { |
766 | 29 | *resp->add_failed_index_ids() = idx; |
767 | 29 | } |
768 | 29 | SetupErrorAndRespond( |
769 | 29 | resp->mutable_error(), |
770 | 29 | backfill_status, |
771 | 29 | (backfill_status.IsIllegalState() |
772 | 29 | ? TabletServerErrorPB::OPERATION_NOT_SUPPORTED |
773 | 29 | : TabletServerErrorPB::UNKNOWN_ERROR0 ), |
774 | 29 | &context); |
775 | 29 | return; |
776 | 29 | } |
777 | | |
778 | 4.36k | context.RespondSuccess(); |
779 | 4.36k | } |
780 | | |
781 | | void TabletServiceAdminImpl::AlterSchema(const tablet::ChangeMetadataRequestPB* req, |
782 | | ChangeMetadataResponsePB* resp, |
783 | 28.5k | rpc::RpcContext context) { |
784 | 28.5k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "ChangeMetadata", req, resp, &context)) { |
785 | 0 | return; |
786 | 0 | } |
787 | 28.5k | VLOG(1) << "Received Change Metadata RPC: " << req->DebugString()297 ; |
788 | 28.5k | if (FLAGS_TEST_alter_schema_delay_ms) { |
789 | 54 | LOG(INFO) << __func__ << ": sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms"; |
790 | 54 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_alter_schema_delay_ms)); |
791 | 54 | LOG(INFO) << __func__ << ": done sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms"; |
792 | 54 | } |
793 | | |
794 | 28.5k | server::UpdateClock(*req, server_->Clock()); |
795 | | |
796 | 28.5k | auto tablet = LookupLeaderTabletOrRespond( |
797 | 28.5k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
798 | 28.5k | if (!tablet) { |
799 | 570 | return; |
800 | 570 | } |
801 | | |
802 | 27.9k | tablet::TableInfoPtr table_info; |
803 | 27.9k | if (req->has_alter_table_id()) { |
804 | 27.6k | auto result = tablet.peer->tablet_metadata()->GetTableInfo(req->alter_table_id()); |
805 | 27.6k | if (!result.ok()) { |
806 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), |
807 | 0 | TabletServerErrorPB::INVALID_SCHEMA, &context); |
808 | 0 | return; |
809 | 0 | } |
810 | 27.6k | table_info = *result; |
811 | 27.6k | } else { |
812 | 320 | table_info = tablet.peer->tablet_metadata()->primary_table_info(); |
813 | 320 | } |
814 | 27.9k | const Schema& tablet_schema = *table_info->schema; |
815 | 27.9k | uint32_t schema_version = table_info->schema_version; |
816 | | // Sanity check, to verify that the tablet should have the same schema |
817 | | // specified in the request. |
818 | 27.9k | Schema req_schema; |
819 | 27.9k | Status s = SchemaFromPB(req->schema(), &req_schema); |
820 | 27.9k | if (!s.ok()) { |
821 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::INVALID_SCHEMA, &context); |
822 | 0 | return; |
823 | 0 | } |
824 | | |
825 | | // If the schema was already applied, respond as succeeded. |
826 | 27.9k | if (!req->has_wal_retention_secs() && schema_version == req->schema_version()22.6k ) { |
827 | | |
828 | 401 | if (req_schema.Equals(tablet_schema)398 ) { |
829 | 401 | context.RespondSuccess(); |
830 | 401 | return; |
831 | 401 | } |
832 | | |
833 | 18.4E | schema_version = tablet.peer->tablet_metadata()->schema_version( |
834 | 18.4E | req->has_alter_table_id() ? req->alter_table_id()0 : ""); |
835 | 18.4E | if (schema_version == req->schema_version()) { |
836 | 0 | LOG(ERROR) << "The current schema does not match the request schema." |
837 | 0 | << " version=" << schema_version |
838 | 0 | << " current-schema=" << tablet_schema.ToString() |
839 | 0 | << " request-schema=" << req_schema.ToString() |
840 | 0 | << " (corruption)"; |
841 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
842 | 0 | STATUS(Corruption, "got a different schema for the same version number"), |
843 | 0 | TabletServerErrorPB::MISMATCHED_SCHEMA, &context); |
844 | 0 | return; |
845 | 0 | } |
846 | 18.4E | } |
847 | | |
848 | | // If the current schema is newer than the one in the request reject the request. |
849 | 27.5k | if (schema_version > req->schema_version()) { |
850 | 0 | LOG(ERROR) << "Tablet " << req->tablet_id() << " has a newer schema" |
851 | 0 | << " version=" << schema_version |
852 | 0 | << " req->schema_version()=" << req->schema_version() |
853 | 0 | << "\n current-schema=" << tablet_schema.ToString() |
854 | 0 | << "\n request-schema=" << req_schema.ToString(); |
855 | 0 | SetupErrorAndRespond( |
856 | 0 | resp->mutable_error(), |
857 | 0 | STATUS_SUBSTITUTE( |
858 | 0 | InvalidArgument, "Tablet has a newer schema Tab $0. Req $1 vs Existing version : $2", |
859 | 0 | req->tablet_id(), req->schema_version(), schema_version), |
860 | 0 | TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, &context); |
861 | 0 | return; |
862 | 0 | } |
863 | | |
864 | 18.4E | VLOG(1) << "Tablet updating schema from " |
865 | 18.4E | << " version=" << schema_version << " current-schema=" << tablet_schema.ToString() |
866 | 18.4E | << " to request-schema=" << req_schema.ToString() |
867 | 18.4E | << " for table ID=" << table_info->table_id; |
868 | 27.5k | ScopedRWOperationPause pause_writes; |
869 | 27.5k | if ((tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE && |
870 | 27.5k | !GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)14.1k ) || |
871 | 27.5k | tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE13.1k ) { |
872 | | // For schema change operations we will have to pause the write operations |
873 | | // until the schema change is done. This will be done synchronously. |
874 | 27.4k | pause_writes = tablet.peer->tablet()->PauseWritePermits(context.GetClientDeadline()); |
875 | 27.4k | if (!pause_writes.ok()) { |
876 | 0 | SetupErrorAndRespond( |
877 | 0 | resp->mutable_error(), |
878 | 0 | STATUS( |
879 | 0 | TryAgain, "Could not lock the tablet against write operations for schema change"), |
880 | 0 | &context); |
881 | 0 | return; |
882 | 0 | } |
883 | | |
884 | | // After write operation is paused, active transactions will be aborted for YSQL transactions. |
885 | 27.4k | if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE && |
886 | 27.4k | req->should_abort_active_txns()13.2k ) { |
887 | 1.13k | DCHECK(req->has_transaction_id()); |
888 | 1.13k | if (tablet.peer->tablet()->transaction_participant() == nullptr) { |
889 | 0 | auto status = STATUS( |
890 | 0 | IllegalState, "Transaction participant is null for tablet " + req->tablet_id()); |
891 | 0 | LOG(ERROR) << status; |
892 | 0 | SetupErrorAndRespond( |
893 | 0 | resp->mutable_error(), |
894 | 0 | status, |
895 | 0 | &context); |
896 | 0 | return; |
897 | 0 | } |
898 | 1.13k | HybridTime max_cutoff = HybridTime::kMax; |
899 | 1.13k | CoarseTimePoint deadline = |
900 | 1.13k | CoarseMonoClock::Now() + |
901 | 1.13k | MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms); |
902 | 1.13k | TransactionId txn_id = CHECK_RESULT(TransactionId::FromString(req->transaction_id())); |
903 | 1.13k | LOG(INFO) << "Aborting transactions that started prior to " << max_cutoff |
904 | 1.13k | << " for tablet id " << req->tablet_id() |
905 | 1.13k | << " excluding transaction with id " << txn_id; |
906 | | // There could be a chance where a transaction does not appear by transaction_participant |
907 | | // but has already begun replicating through Raft. Such transactions might succeed rather |
908 | | // than get aborted. This race codnition is dismissable for this intermediate solution. |
909 | 1.13k | Status status = tablet.peer->tablet()->transaction_participant()->StopActiveTxnsPriorTo( |
910 | 1.13k | max_cutoff, deadline, &txn_id); |
911 | 1.13k | if (!status.ok() || PREDICT_FALSE1.13k (FLAGS_TEST_fail_alter_schema_after_abort_transactions)) { |
912 | 0 | auto status = STATUS(TryAgain, "Transaction abort failed for tablet " + req->tablet_id()); |
913 | 0 | LOG(WARNING) << status; |
914 | 0 | SetupErrorAndRespond( |
915 | 0 | resp->mutable_error(), |
916 | 0 | status, |
917 | 0 | &context); |
918 | 0 | return; |
919 | 0 | } |
920 | 1.13k | } |
921 | 27.4k | } |
922 | 27.5k | auto operation = std::make_unique<ChangeMetadataOperation>( |
923 | 27.5k | tablet.peer->tablet(), tablet.peer->log(), req); |
924 | | |
925 | 27.5k | operation->set_completion_callback( |
926 | 27.5k | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
927 | 27.5k | operation->UsePermitToken(std::move(pause_writes)); |
928 | | |
929 | | // Submit the alter schema op. The RPC will be responded to asynchronously. |
930 | 27.5k | tablet.peer->Submit(std::move(operation), tablet.leader_term); |
931 | 27.5k | } |
932 | | |
933 | 28.0M | #define VERIFY_RESULT_OR_RETURN(expr) RESULT_CHECKER_HELPER( \ |
934 | 27.9M | expr, \ |
935 | 27.9M | if (!__result.ok()) { return; }); |
936 | | |
937 | | void TabletServiceImpl::VerifyTableRowRange( |
938 | | const VerifyTableRowRangeRequestPB* req, |
939 | | VerifyTableRowRangeResponsePB* resp, |
940 | 0 | rpc::RpcContext context) { |
941 | 0 | DVLOG(3) << "Received VerifyTableRowRange RPC: " << req->DebugString(); |
942 | |
|
943 | 0 | server::UpdateClock(*req, server_->Clock()); |
944 | |
|
945 | 0 | auto peer_tablet = |
946 | 0 | LookupTabletPeerOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
947 | 0 | if (!peer_tablet) { |
948 | 0 | return; |
949 | 0 | } |
950 | | |
951 | 0 | auto tablet = peer_tablet->tablet; |
952 | 0 | bool is_pg_table = tablet->table_type() == TableType::PGSQL_TABLE_TYPE; |
953 | 0 | if (is_pg_table) { |
954 | 0 | SetupErrorAndRespond( |
955 | 0 | resp->mutable_error(), STATUS(NotFound, "Verify operation not supported for PGSQL tables."), |
956 | 0 | &context); |
957 | 0 | return; |
958 | 0 | } |
959 | | |
960 | 0 | const CoarseTimePoint& deadline = context.GetClientDeadline(); |
961 | | |
962 | | // Wait for SafeTime to get past read_at; |
963 | 0 | const HybridTime read_at(req->read_time()); |
964 | 0 | DVLOG(1) << "Waiting for safe time to be past " << read_at; |
965 | 0 | const auto safe_time = tablet->SafeTime(tablet::RequireLease::kFalse, read_at, deadline); |
966 | 0 | DVLOG(1) << "Got safe time " << safe_time.ToString(); |
967 | 0 | if (!safe_time.ok()) { |
968 | 0 | LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString(); |
969 | 0 | SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context); |
970 | 0 | return; |
971 | 0 | } |
972 | | |
973 | 0 | auto valid_read_at = req->has_read_time() ? read_at : *safe_time; |
974 | 0 | std::string verified_until = ""; |
975 | 0 | std::unordered_map<TableId, uint64> consistency_stats; |
976 | |
|
977 | 0 | if (peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info) { |
978 | 0 | auto index_info = |
979 | 0 | *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info; |
980 | 0 | const auto& table_id = index_info.indexed_table_id(); |
981 | 0 | Status verify_status = tablet->VerifyMainTableConsistencyForCQL( |
982 | 0 | table_id, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats, |
983 | 0 | &verified_until); |
984 | 0 | if (!verify_status.ok()) { |
985 | 0 | SetupErrorAndRespond(resp->mutable_error(), verify_status, &context); |
986 | 0 | return; |
987 | 0 | } |
988 | | |
989 | 0 | (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id]; |
990 | 0 | } else { |
991 | 0 | const IndexMap index_map = |
992 | 0 | *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_map; |
993 | 0 | vector<IndexInfo> indexes; |
994 | 0 | vector<TableId> index_ids; |
995 | 0 | if (req->index_ids().empty()) { |
996 | 0 | for (auto it = index_map.begin(); it != index_map.end(); it++) { |
997 | 0 | indexes.push_back(it->second); |
998 | 0 | } |
999 | 0 | } else { |
1000 | 0 | for (const auto& idx : req->index_ids()) { |
1001 | 0 | auto result = index_map.FindIndex(idx); |
1002 | 0 | if (result) { |
1003 | 0 | const IndexInfo* index_info = *result; |
1004 | 0 | indexes.push_back(*index_info); |
1005 | 0 | index_ids.push_back(index_info->table_id()); |
1006 | 0 | } else { |
1007 | 0 | LOG(WARNING) << "Index " << idx << " not found in tablet metadata"; |
1008 | 0 | } |
1009 | 0 | } |
1010 | 0 | } |
1011 | |
|
1012 | 0 | Status verify_status = tablet->VerifyIndexTableConsistencyForCQL( |
1013 | 0 | indexes, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats, |
1014 | 0 | &verified_until); |
1015 | 0 | if (!verify_status.ok()) { |
1016 | 0 | SetupErrorAndRespond(resp->mutable_error(), verify_status, &context); |
1017 | 0 | return; |
1018 | 0 | } |
1019 | | |
1020 | 0 | for (const IndexInfo& index : indexes) { |
1021 | 0 | const auto& table_id = index.table_id(); |
1022 | 0 | (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id]; |
1023 | 0 | } |
1024 | 0 | } |
1025 | 0 | resp->set_verified_until(verified_until); |
1026 | 0 | context.RespondSuccess(); |
1027 | 0 | } |
1028 | | |
1029 | | void TabletServiceImpl::UpdateTransaction(const UpdateTransactionRequestPB* req, |
1030 | | UpdateTransactionResponsePB* resp, |
1031 | 2.53M | rpc::RpcContext context) { |
1032 | 2.53M | TRACE("UpdateTransaction"); |
1033 | | |
1034 | 2.53M | if (req->state().status() == TransactionStatus::CREATED && |
1035 | 2.53M | RandomActWithProbability(TEST_delay_create_transaction_probability)411k ) { |
1036 | 0 | std::this_thread::sleep_for( |
1037 | 0 | (FLAGS_transaction_rpc_timeout_ms + RandomUniformInt(-200, 200)) * 1ms); |
1038 | 0 | } |
1039 | | |
1040 | 2.53M | VLOG(1) << "UpdateTransaction: " << req->ShortDebugString() |
1041 | 1.83k | << ", context: " << context.ToString(); |
1042 | 2.53M | LOG_IF(DFATAL, !req->has_propagated_hybrid_time()) |
1043 | 3.08k | << __func__ << " missing propagated hybrid time for " |
1044 | 3.08k | << TransactionStatus_Name(req->state().status()); |
1045 | 2.53M | UpdateClock(*req, server_->Clock()); |
1046 | | |
1047 | 2.53M | LeaderTabletPeer tablet; |
1048 | 2.53M | auto txn_status = req->state().status(); |
1049 | 2.53M | auto cleanup = txn_status == TransactionStatus::IMMEDIATE_CLEANUP || |
1050 | 2.53M | txn_status == TransactionStatus::GRACEFUL_CLEANUP1.85M ; |
1051 | 2.53M | if (cleanup) { |
1052 | 927k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1053 | 927k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context)); |
1054 | 927k | tablet.FillTabletPeer(std::move(peer_tablet)); |
1055 | 927k | tablet.leader_term = OpId::kUnknownTerm; |
1056 | 1.60M | } else { |
1057 | 1.60M | tablet = LookupLeaderTabletOrRespond( |
1058 | 1.60M | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1059 | 1.60M | } |
1060 | 2.53M | if (!tablet) { |
1061 | 3.77k | return; |
1062 | 3.77k | } |
1063 | | |
1064 | 2.52M | auto state = std::make_unique<tablet::UpdateTxnOperation>(tablet.tablet.get(), &req->state()); |
1065 | 2.52M | state->set_completion_callback(MakeRpcOperationCompletionCallback( |
1066 | 2.52M | std::move(context), resp, server_->Clock())); |
1067 | | |
1068 | 2.52M | if (req->state().status() == TransactionStatus::APPLYING || cleanup2.09M ) { |
1069 | 1.36M | auto* participant = tablet.tablet->transaction_participant(); |
1070 | 1.36M | if (participant1.36M ) { |
1071 | 1.36M | participant->Handle(std::move(state), tablet.leader_term); |
1072 | 18.4E | } else { |
1073 | 18.4E | state->CompleteWithStatus(STATUS_FORMAT( |
1074 | 18.4E | InvalidArgument, "Does not have transaction participant to process $0", |
1075 | 18.4E | req->state().status())); |
1076 | 18.4E | } |
1077 | 1.36M | } else { |
1078 | 1.16M | auto* coordinator = tablet.tablet->transaction_coordinator(); |
1079 | 1.16M | if (coordinator) { |
1080 | 1.16M | coordinator->Handle(std::move(state), tablet.leader_term); |
1081 | 1.16M | } else { |
1082 | 783 | state->CompleteWithStatus(STATUS_FORMAT( |
1083 | 783 | InvalidArgument, "Does not have transaction coordinator to process $0", |
1084 | 783 | req->state().status())); |
1085 | 783 | } |
1086 | 1.16M | } |
1087 | 2.52M | } |
1088 | | |
1089 | | template <class Req, class Resp, class Action> |
1090 | | void TabletServiceImpl::PerformAtLeader( |
1091 | 296k | const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) { |
1092 | 296k | UpdateClock(*req, server_->Clock()); |
1093 | | |
1094 | 296k | auto tablet_peer = LookupLeaderTabletOrRespond( |
1095 | 296k | server_->tablet_peer_lookup(), req->tablet_id(), resp, context); |
1096 | | |
1097 | 296k | if (!tablet_peer) { |
1098 | 223 | return; |
1099 | 223 | } |
1100 | | |
1101 | 296k | auto status = action(tablet_peer); |
1102 | | |
1103 | 296k | if (*context) { |
1104 | 296k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); |
1105 | 296k | if (status.ok()296k ) { |
1106 | 296k | context->RespondSuccess(); |
1107 | 18.4E | } else { |
1108 | 18.4E | SetupErrorAndRespond(resp->mutable_error(), status, context); |
1109 | 18.4E | } |
1110 | 296k | } |
1111 | 296k | } tablet_service.cc:void yb::tserver::TabletServiceImpl::PerformAtLeader<yb::tserver::GetTransactionStatusRequestPB const*, yb::tserver::GetTransactionStatusResponsePB, yb::tserver::TabletServiceImpl::GetTransactionStatus(yb::tserver::GetTransactionStatusRequestPB const*, yb::tserver::GetTransactionStatusResponsePB*, yb::rpc::RpcContext)::$_1>(yb::tserver::GetTransactionStatusRequestPB const* const&, yb::tserver::GetTransactionStatusResponsePB*, yb::rpc::RpcContext*, yb::tserver::TabletServiceImpl::GetTransactionStatus(yb::tserver::GetTransactionStatusRequestPB const*, yb::tserver::GetTransactionStatusResponsePB*, yb::rpc::RpcContext)::$_1 const&) Line | Count | Source | 1091 | 296k | const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) { | 1092 | 296k | UpdateClock(*req, server_->Clock()); | 1093 | | | 1094 | 296k | auto tablet_peer = LookupLeaderTabletOrRespond( | 1095 | 296k | server_->tablet_peer_lookup(), req->tablet_id(), resp, context); | 1096 | | | 1097 | 296k | if (!tablet_peer) { | 1098 | 223 | return; | 1099 | 223 | } | 1100 | | | 1101 | 296k | auto status = action(tablet_peer); | 1102 | | | 1103 | 296k | if (*context) { | 1104 | 296k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); | 1105 | 296k | if (status.ok()296k ) { | 1106 | 296k | context->RespondSuccess(); | 1107 | 18.4E | } else { | 1108 | 18.4E | SetupErrorAndRespond(resp->mutable_error(), status, context); | 1109 | 18.4E | } | 1110 | 296k | } | 1111 | 296k | } |
Unexecuted instantiation: tablet_service.cc:void yb::tserver::TabletServiceImpl::PerformAtLeader<yb::tserver::GetTransactionStatusAtParticipantRequestPB const*, yb::tserver::GetTransactionStatusAtParticipantResponsePB, yb::tserver::TabletServiceImpl::GetTransactionStatusAtParticipant(yb::tserver::GetTransactionStatusAtParticipantRequestPB const*, yb::tserver::GetTransactionStatusAtParticipantResponsePB*, yb::rpc::RpcContext)::$_2>(yb::tserver::GetTransactionStatusAtParticipantRequestPB const* const&, yb::tserver::GetTransactionStatusAtParticipantResponsePB*, yb::rpc::RpcContext*, yb::tserver::TabletServiceImpl::GetTransactionStatusAtParticipant(yb::tserver::GetTransactionStatusAtParticipantRequestPB const*, yb::tserver::GetTransactionStatusAtParticipantResponsePB*, yb::rpc::RpcContext)::$_2 const&) tablet_service.cc:void yb::tserver::TabletServiceImpl::PerformAtLeader<yb::tserver::GetSplitKeyRequestPB const*, yb::tserver::GetSplitKeyResponsePB, yb::tserver::TabletServiceImpl::GetSplitKey(yb::tserver::GetSplitKeyRequestPB const*, yb::tserver::GetSplitKeyResponsePB*, yb::rpc::RpcContext)::$_4>(yb::tserver::GetSplitKeyRequestPB const* const&, yb::tserver::GetSplitKeyResponsePB*, yb::rpc::RpcContext*, yb::tserver::TabletServiceImpl::GetSplitKey(yb::tserver::GetSplitKeyRequestPB const*, yb::tserver::GetSplitKeyResponsePB*, yb::rpc::RpcContext)::$_4 const&) Line | Count | Source | 1091 | 144 | const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) { | 1092 | 144 | UpdateClock(*req, server_->Clock()); | 1093 | | | 1094 | 144 | auto tablet_peer = LookupLeaderTabletOrRespond( | 1095 | 144 | server_->tablet_peer_lookup(), req->tablet_id(), resp, context); | 1096 | | | 1097 | 144 | if (!tablet_peer) { | 1098 | 0 | return; | 1099 | 0 | } | 1100 | | | 1101 | 144 | auto status = action(tablet_peer); | 1102 | | | 1103 | 144 | if (*context) { | 1104 | 144 | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); | 1105 | 144 | if (status.ok()) { | 1106 | 143 | context->RespondSuccess(); | 1107 | 143 | } else { | 1108 | 1 | SetupErrorAndRespond(resp->mutable_error(), status, context); | 1109 | 1 | } | 1110 | 144 | } | 1111 | 144 | } |
|
1112 | | |
1113 | | void TabletServiceImpl::GetTransactionStatus(const GetTransactionStatusRequestPB* req, |
1114 | | GetTransactionStatusResponsePB* resp, |
1115 | 296k | rpc::RpcContext context) { |
1116 | 296k | TRACE("GetTransactionStatus"); |
1117 | | |
1118 | 296k | PerformAtLeader(req, resp, &context, |
1119 | 296k | [req, resp, &context](const LeaderTabletPeer& tablet_peer) { |
1120 | 296k | auto* transaction_coordinator = tablet_peer.tablet->transaction_coordinator(); |
1121 | 296k | if (!transaction_coordinator) { |
1122 | 0 | return STATUS_FORMAT( |
1123 | 0 | InvalidArgument, "No transaction coordinator at tablet $0", |
1124 | 0 | tablet_peer.peer->tablet_id()); |
1125 | 0 | } |
1126 | 296k | return transaction_coordinator->GetStatus( |
1127 | 296k | req->transaction_id(), context.GetClientDeadline(), resp); |
1128 | 296k | }); |
1129 | 296k | } |
1130 | | |
1131 | | void TabletServiceImpl::GetTransactionStatusAtParticipant( |
1132 | | const GetTransactionStatusAtParticipantRequestPB* req, |
1133 | | GetTransactionStatusAtParticipantResponsePB* resp, |
1134 | 0 | rpc::RpcContext context) { |
1135 | 0 | TRACE("GetTransactionStatusAtParticipant"); |
1136 | |
|
1137 | 0 | PerformAtLeader(req, resp, &context, |
1138 | 0 | [req, resp, &context](const LeaderTabletPeer& tablet_peer) -> Status { |
1139 | 0 | auto* transaction_participant = tablet_peer.peer->tablet()->transaction_participant(); |
1140 | 0 | if (!transaction_participant) { |
1141 | 0 | return STATUS_FORMAT( |
1142 | 0 | InvalidArgument, "No transaction participant at tablet $0", |
1143 | 0 | tablet_peer.peer->tablet_id()); |
1144 | 0 | } |
1145 | | |
1146 | 0 | transaction_participant->GetStatus( |
1147 | 0 | VERIFY_RESULT(FullyDecodeTransactionId(req->transaction_id())), |
1148 | 0 | req->required_num_replicated_batches(), tablet_peer.leader_term, resp, &context); |
1149 | 0 | return Status::OK(); |
1150 | 0 | }); |
1151 | 0 | } |
1152 | | |
1153 | | void TabletServiceImpl::AbortTransaction(const AbortTransactionRequestPB* req, |
1154 | | AbortTransactionResponsePB* resp, |
1155 | 193k | rpc::RpcContext context) { |
1156 | 193k | TRACE("AbortTransaction"); |
1157 | | |
1158 | 193k | UpdateClock(*req, server_->Clock()); |
1159 | | |
1160 | 193k | auto tablet = LookupLeaderTabletOrRespond( |
1161 | 193k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1162 | 193k | if (!tablet) { |
1163 | 6.74k | return; |
1164 | 6.74k | } |
1165 | | |
1166 | 186k | server::ClockPtr clock(server_->Clock()); |
1167 | 186k | auto context_ptr = std::make_shared<rpc::RpcContext>(std::move(context)); |
1168 | 186k | tablet.peer->tablet()->transaction_coordinator()->Abort( |
1169 | 186k | req->transaction_id(), |
1170 | 186k | tablet.leader_term, |
1171 | 186k | [resp, context_ptr, clock, peer = tablet.peer](Result<TransactionStatusResult> result) { |
1172 | 184k | resp->set_propagated_hybrid_time(clock->Now().ToUint64()); |
1173 | 184k | Status status; |
1174 | 184k | if (result.ok()184k ) { |
1175 | 184k | auto leader_safe_time = peer->LeaderSafeTime(); |
1176 | 184k | if (leader_safe_time.ok()) { |
1177 | 184k | resp->set_status(result->status); |
1178 | 184k | if (result->status_time.is_valid()) { |
1179 | 22.5k | resp->set_status_hybrid_time(result->status_time.ToUint64()); |
1180 | 22.5k | } |
1181 | | // See comment above WaitForSafeTime in TransactionStatusCache::DoGetCommitData |
1182 | | // for details. |
1183 | 184k | resp->set_coordinator_safe_time(leader_safe_time->ToUint64()); |
1184 | 184k | context_ptr->RespondSuccess(); |
1185 | 184k | return; |
1186 | 184k | } |
1187 | | |
1188 | 11 | status = leader_safe_time.status(); |
1189 | 18.4E | } else { |
1190 | 18.4E | status = result.status(); |
1191 | 18.4E | } |
1192 | 18.4E | SetupErrorAndRespond(resp->mutable_error(), status, context_ptr.get()); |
1193 | 18.4E | }); |
1194 | 186k | } |
1195 | | |
1196 | | void TabletServiceImpl::Truncate(const TruncateRequestPB* req, |
1197 | | TruncateResponsePB* resp, |
1198 | 57.2k | rpc::RpcContext context) { |
1199 | 57.2k | TRACE("Truncate"); |
1200 | | |
1201 | 57.2k | UpdateClock(*req, server_->Clock()); |
1202 | | |
1203 | 57.2k | auto tablet = LookupLeaderTabletOrRespond( |
1204 | 57.2k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1205 | 57.2k | if (!tablet) { |
1206 | 5 | return; |
1207 | 5 | } |
1208 | | |
1209 | 57.1k | auto operation = std::make_unique<TruncateOperation>(tablet.peer->tablet(), &req->truncate()); |
1210 | | |
1211 | 57.1k | operation->set_completion_callback( |
1212 | 57.1k | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
1213 | | |
1214 | | // Submit the truncate tablet op. The RPC will be responded to asynchronously. |
1215 | 57.1k | tablet.peer->Submit(std::move(operation), tablet.leader_term); |
1216 | 57.1k | } |
1217 | | |
1218 | | void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req, |
1219 | | CreateTabletResponsePB* resp, |
1220 | 139k | rpc::RpcContext context) { |
1221 | 139k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, &context)) { |
1222 | 0 | return; |
1223 | 0 | } |
1224 | 139k | auto status = DoCreateTablet(req, resp); |
1225 | 139k | if (!status.ok()) { |
1226 | 12 | SetupErrorAndRespond(resp->mutable_error(), status, &context); |
1227 | 139k | } else { |
1228 | 139k | context.RespondSuccess(); |
1229 | 139k | } |
1230 | 139k | } |
1231 | | |
1232 | | Status TabletServiceAdminImpl::DoCreateTablet(const CreateTabletRequestPB* req, |
1233 | 138k | CreateTabletResponsePB* resp) { |
1234 | 138k | if (PREDICT_FALSE(FLAGS_TEST_txn_status_table_tablet_creation_delay_ms > 0 && |
1235 | 138k | req->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE)) { |
1236 | 142 | std::this_thread::sleep_for(FLAGS_TEST_txn_status_table_tablet_creation_delay_ms * 1ms); |
1237 | 142 | } |
1238 | | |
1239 | 138k | DVLOG(3) << "Received CreateTablet RPC: " << yb::ToString(*req)15 ; |
1240 | 138k | TRACE_EVENT1("tserver", "CreateTablet", |
1241 | 138k | "tablet_id", req->tablet_id()); |
1242 | | |
1243 | 138k | Schema schema; |
1244 | 138k | PartitionSchema partition_schema; |
1245 | 138k | auto status = SchemaFromPB(req->schema(), &schema); |
1246 | 138k | if (status.ok()) { |
1247 | 138k | DCHECK(schema.has_column_ids()); |
1248 | 138k | status = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema); |
1249 | 138k | } |
1250 | 138k | if (!status.ok()) { |
1251 | 0 | return status.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::INVALID_SCHEMA)); |
1252 | 0 | } |
1253 | | |
1254 | 138k | Partition partition; |
1255 | 138k | Partition::FromPB(req->partition(), &partition); |
1256 | | |
1257 | 138k | LOG(INFO) << "Processing CreateTablet for T " << req->tablet_id() << " P " << req->dest_uuid() |
1258 | 138k | << " (table=" << req->table_name() |
1259 | 138k | << " [id=" << req->table_id() << "]), partition=" |
1260 | 138k | << partition_schema.PartitionDebugString(partition, schema); |
1261 | 18.4E | VLOG(1) << "Full request: " << req->DebugString(); |
1262 | | |
1263 | 138k | auto table_info = std::make_shared<tablet::TableInfo>( |
1264 | 138k | req->table_id(), req->namespace_name(), req->table_name(), req->table_type(), schema, |
1265 | 138k | IndexMap(), |
1266 | 138k | req->has_index_info() ? boost::optional<IndexInfo>(req->index_info())14.7k : boost::none124k , |
1267 | 138k | 0 /* schema_version */, partition_schema); |
1268 | 138k | std::vector<SnapshotScheduleId> snapshot_schedules; |
1269 | 138k | snapshot_schedules.reserve(req->snapshot_schedules().size()); |
1270 | 138k | for (const auto& id : req->snapshot_schedules()) { |
1271 | 71 | snapshot_schedules.push_back(VERIFY_RESULT(FullyDecodeSnapshotScheduleId(id))); |
1272 | 71 | } |
1273 | 138k | status = ResultToStatus(server_->tablet_manager()->CreateNewTablet( |
1274 | 138k | table_info, req->tablet_id(), partition, req->config(), req->colocated(), |
1275 | 138k | snapshot_schedules)); |
1276 | 138k | if (PREDICT_FALSE(!status.ok())) { |
1277 | 12 | return status.IsAlreadyPresent() |
1278 | 12 | ? status.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::TABLET_ALREADY_EXISTS))1 |
1279 | 12 | : status11 ; |
1280 | 12 | } |
1281 | 138k | return Status::OK(); |
1282 | 138k | } |
1283 | | |
1284 | | void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req, |
1285 | | DeleteTabletResponsePB* resp, |
1286 | 77.4k | rpc::RpcContext context) { |
1287 | 77.4k | if (PREDICT_FALSE(FLAGS_TEST_rpc_delete_tablet_fail)) { |
1288 | 0 | context.RespondFailure(STATUS(NetworkError, "Simulating network partition for test")); |
1289 | 0 | return; |
1290 | 0 | } |
1291 | | |
1292 | 77.4k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, &context)) { |
1293 | 1 | return; |
1294 | 1 | } |
1295 | 77.4k | TRACE_EVENT2("tserver", "DeleteTablet", |
1296 | 77.4k | "tablet_id", req->tablet_id(), |
1297 | 77.4k | "reason", req->reason()); |
1298 | | |
1299 | 77.4k | tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN; |
1300 | 77.4k | if (req->has_delete_type()) { |
1301 | 76.9k | delete_type = req->delete_type(); |
1302 | 76.9k | } |
1303 | 77.4k | LOG(INFO) << "T " << req->tablet_id() << " P " << server_->permanent_uuid() |
1304 | 77.4k | << ": Processing DeleteTablet with delete_type " << TabletDataState_Name(delete_type) |
1305 | 77.4k | << (req->has_reason() ? (" (" + req->reason() + ")")77.4k : ""20 ) |
1306 | 77.4k | << (req->hide_only() ? " (Hide only)"36 : ""77.4k ) |
1307 | 77.4k | << " from " << context.requestor_string(); |
1308 | 18.4E | VLOG(1) << "Full request: " << req->DebugString(); |
1309 | | |
1310 | 77.4k | boost::optional<int64_t> cas_config_opid_index_less_or_equal; |
1311 | 77.4k | if (req->has_cas_config_opid_index_less_or_equal()) { |
1312 | 3.47k | cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal(); |
1313 | 3.47k | } |
1314 | 77.4k | boost::optional<TabletServerErrorPB::Code> error_code; |
1315 | 77.4k | Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(), |
1316 | 77.4k | delete_type, |
1317 | 77.4k | cas_config_opid_index_less_or_equal, |
1318 | 77.4k | req->hide_only(), |
1319 | 77.4k | &error_code); |
1320 | 77.4k | if (PREDICT_FALSE(!s.ok())) { |
1321 | 2.00k | HandleErrorResponse(resp, &context, s, error_code); |
1322 | 2.00k | return; |
1323 | 2.00k | } |
1324 | 75.4k | context.RespondSuccess(); |
1325 | 75.4k | } |
1326 | | |
1327 | | // TODO(sagnik): Modify this to actually create a copartitioned table |
1328 | | void TabletServiceAdminImpl::CopartitionTable(const CopartitionTableRequestPB* req, |
1329 | | CopartitionTableResponsePB* resp, |
1330 | 0 | rpc::RpcContext context) { |
1331 | 0 | context.RespondSuccess(); |
1332 | 0 | LOG(INFO) << "tserver doesn't support co-partitioning yet"; |
1333 | 0 | } |
1334 | | |
1335 | | void TabletServiceAdminImpl::FlushTablets(const FlushTabletsRequestPB* req, |
1336 | | FlushTabletsResponsePB* resp, |
1337 | 43 | rpc::RpcContext context) { |
1338 | 43 | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "FlushTablets", req, resp, &context)) { |
1339 | 0 | return; |
1340 | 0 | } |
1341 | | |
1342 | 43 | if (!req->all_tablets() && req->tablet_ids_size() == 035 ) { |
1343 | 0 | const Status s = STATUS(InvalidArgument, "No tablet ids"); |
1344 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1345 | 0 | return; |
1346 | 0 | } |
1347 | | |
1348 | 43 | server::UpdateClock(*req, server_->Clock()); |
1349 | | |
1350 | 43 | TRACE_EVENT1("tserver", "FlushTablets", |
1351 | 43 | "TS: ", req->dest_uuid()); |
1352 | | |
1353 | 43 | LOG(INFO) << "Processing FlushTablets from " << context.requestor_string(); |
1354 | 43 | VLOG(1) << "Full FlushTablets request: " << req->DebugString()0 ; |
1355 | 43 | TabletPeers tablet_peers; |
1356 | 43 | TSTabletManager::TabletPtrs tablet_ptrs; |
1357 | | |
1358 | 43 | if (req->all_tablets()) { |
1359 | 8 | tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs); |
1360 | 35 | } else { |
1361 | 50 | for (const TabletId& id : req->tablet_ids()) { |
1362 | 50 | auto tablet_peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1363 | 50 | server_->tablet_peer_lookup(), id, resp, &context)); |
1364 | 50 | tablet_peers.push_back(std::move(tablet_peer.tablet_peer)); |
1365 | 50 | auto tablet = tablet_peer.tablet; |
1366 | 50 | if (tablet != nullptr) { |
1367 | 50 | tablet_ptrs.push_back(std::move(tablet)); |
1368 | 50 | } |
1369 | 50 | } |
1370 | 35 | } |
1371 | 43 | switch (req->operation()) { |
1372 | 35 | case FlushTabletsRequestPB::FLUSH: |
1373 | 50 | for (const tablet::TabletPtr& tablet : tablet_ptrs) { |
1374 | 50 | resp->set_failed_tablet_id(tablet->tablet_id()); |
1375 | 50 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->Flush(tablet::FlushMode::kAsync), resp, &context); |
1376 | 50 | resp->clear_failed_tablet_id(); |
1377 | 50 | } |
1378 | | |
1379 | | // Wait for end of all flush operations. |
1380 | 50 | for (const tablet::TabletPtr& tablet : tablet_ptrs)35 { |
1381 | 50 | resp->set_failed_tablet_id(tablet->tablet_id()); |
1382 | 50 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->WaitForFlush(), resp, &context); |
1383 | 50 | resp->clear_failed_tablet_id(); |
1384 | 50 | } |
1385 | 35 | break; |
1386 | 35 | case FlushTabletsRequestPB::COMPACT: |
1387 | 0 | RETURN_UNKNOWN_ERROR_IF_NOT_OK( |
1388 | 0 | server_->tablet_manager()->TriggerCompactionAndWait(tablet_ptrs), resp, &context); |
1389 | 0 | break; |
1390 | 8 | case FlushTabletsRequestPB::LOG_GC: |
1391 | 8 | for (const auto& tablet : tablet_peers) { |
1392 | 8 | resp->set_failed_tablet_id(tablet->tablet_id()); |
1393 | 8 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->RunLogGC(), resp, &context); |
1394 | 8 | resp->clear_failed_tablet_id(); |
1395 | 8 | } |
1396 | 8 | break; |
1397 | 43 | } |
1398 | | |
1399 | 43 | context.RespondSuccess(); |
1400 | 43 | } |
1401 | | |
1402 | | void TabletServiceAdminImpl::CountIntents( |
1403 | | const CountIntentsRequestPB* req, |
1404 | | CountIntentsResponsePB* resp, |
1405 | 0 | rpc::RpcContext context) { |
1406 | 0 | TSTabletManager::TabletPtrs tablet_ptrs; |
1407 | 0 | TabletPeers tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs); |
1408 | 0 | int64_t total_intents = 0; |
1409 | | // TODO: do this in parallel. |
1410 | | // TODO: per-tablet intent counts. |
1411 | 0 | for (const auto& tablet : tablet_ptrs) { |
1412 | 0 | auto num_intents = tablet->CountIntents(); |
1413 | 0 | if (!num_intents.ok()) { |
1414 | 0 | SetupErrorAndRespond(resp->mutable_error(), num_intents.status(), &context); |
1415 | 0 | return; |
1416 | 0 | } |
1417 | 0 | total_intents += *num_intents; |
1418 | 0 | } |
1419 | 0 | resp->set_num_intents(total_intents); |
1420 | 0 | context.RespondSuccess(); |
1421 | 0 | } |
1422 | | |
1423 | | void TabletServiceAdminImpl::AddTableToTablet( |
1424 | | const AddTableToTabletRequestPB* req, AddTableToTabletResponsePB* resp, |
1425 | 126 | rpc::RpcContext context) { |
1426 | 126 | auto tablet_id = req->tablet_id(); |
1427 | | |
1428 | 126 | const auto tablet = |
1429 | 126 | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), tablet_id, resp, &context); |
1430 | 126 | if (!tablet) { |
1431 | 0 | return; |
1432 | 0 | } |
1433 | 126 | DVLOG(3) << "Received AddTableToTablet RPC: " << yb::ToString(*req)0 ; |
1434 | | |
1435 | 126 | tablet::ChangeMetadataRequestPB change_req; |
1436 | 126 | *change_req.mutable_add_table() = req->add_table(); |
1437 | 126 | change_req.set_tablet_id(tablet_id); |
1438 | 126 | Status s = tablet::SyncReplicateChangeMetadataOperation( |
1439 | 126 | &change_req, tablet.peer.get(), tablet.leader_term); |
1440 | 126 | if (PREDICT_FALSE(!s.ok())) { |
1441 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1442 | 0 | return; |
1443 | 0 | } |
1444 | 126 | context.RespondSuccess(); |
1445 | 126 | } |
1446 | | |
1447 | | void TabletServiceAdminImpl::RemoveTableFromTablet( |
1448 | | const RemoveTableFromTabletRequestPB* req, |
1449 | | RemoveTableFromTabletResponsePB* resp, |
1450 | 81 | rpc::RpcContext context) { |
1451 | 81 | auto tablet = |
1452 | 81 | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1453 | 81 | if (!tablet) { |
1454 | 1 | return; |
1455 | 1 | } |
1456 | | |
1457 | 80 | tablet::ChangeMetadataRequestPB change_req; |
1458 | 80 | change_req.set_remove_table_id(req->remove_table_id()); |
1459 | 80 | change_req.set_tablet_id(req->tablet_id()); |
1460 | 80 | Status s = tablet::SyncReplicateChangeMetadataOperation( |
1461 | 80 | &change_req, tablet.peer.get(), tablet.leader_term); |
1462 | 80 | if (PREDICT_FALSE(!s.ok())) { |
1463 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1464 | 0 | return; |
1465 | 0 | } |
1466 | 80 | context.RespondSuccess(); |
1467 | 80 | } |
1468 | | |
1469 | | void TabletServiceAdminImpl::SplitTablet( |
1470 | 141 | const tablet::SplitTabletRequestPB* req, SplitTabletResponsePB* resp, rpc::RpcContext context) { |
1471 | 141 | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "SplitTablet", req, resp, &context)) { |
1472 | 0 | return; |
1473 | 0 | } |
1474 | 141 | if (PREDICT_FALSE(FLAGS_TEST_fail_tablet_split_probability > 0) && |
1475 | 141 | RandomActWithProbability(FLAGS_TEST_fail_tablet_split_probability)0 ) { |
1476 | 0 | return SetupErrorAndRespond( |
1477 | 0 | resp->mutable_error(), |
1478 | 0 | STATUS(InvalidArgument, // Use InvalidArgument to hit IsDefinitelyPermanentError(). |
1479 | 0 | "Failing tablet split due to FLAGS_TEST_fail_tablet_split_probability"), |
1480 | 0 | TabletServerErrorPB::UNKNOWN_ERROR, |
1481 | 0 | &context); |
1482 | 0 | } |
1483 | 141 | TRACE_EVENT1("tserver", "SplitTablet", "tablet_id", req->tablet_id()); |
1484 | | |
1485 | 141 | server::UpdateClock(*req, server_->Clock()); |
1486 | 141 | auto leader_tablet_peer = |
1487 | 141 | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1488 | 141 | if (!leader_tablet_peer) { |
1489 | 0 | return; |
1490 | 0 | } |
1491 | | |
1492 | 141 | { |
1493 | 141 | auto tablet_data_state = leader_tablet_peer.peer->data_state(); |
1494 | 141 | if (tablet_data_state != tablet::TABLET_DATA_READY) { |
1495 | 96 | auto s = tablet_data_state == tablet::TABLET_DATA_SPLIT_COMPLETED |
1496 | 96 | ? STATUS_FORMAT(AlreadyPresent, "Tablet $0 is already split.", req->tablet_id()) |
1497 | 96 | : STATUS_FORMAT0 ( |
1498 | 96 | InvalidArgument, "Invalid tablet $0 data state: $1", req->tablet_id(), |
1499 | 96 | tablet_data_state); |
1500 | 96 | SetupErrorAndRespond( |
1501 | 96 | resp->mutable_error(), s, TabletServerErrorPB::TABLET_NOT_RUNNING, &context); |
1502 | 96 | return; |
1503 | 96 | } |
1504 | 141 | } |
1505 | | |
1506 | 45 | auto state = std::make_unique<tablet::SplitOperation>( |
1507 | 45 | leader_tablet_peer.peer->tablet(), server_->tablet_manager(), req); |
1508 | | |
1509 | 45 | state->set_completion_callback( |
1510 | 45 | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
1511 | | |
1512 | 45 | leader_tablet_peer.peer->Submit(std::move(state), leader_tablet_peer.leader_term); |
1513 | 45 | } |
1514 | | |
1515 | | void TabletServiceAdminImpl::UpgradeYsql( |
1516 | | const UpgradeYsqlRequestPB* req, |
1517 | | UpgradeYsqlResponsePB* resp, |
1518 | 4 | rpc::RpcContext context) { |
1519 | 4 | LOG(INFO) << "Starting YSQL upgrade"; |
1520 | | |
1521 | 4 | pgwrapper::YsqlUpgradeHelper upgrade_helper(server_->pgsql_proxy_bind_address(), |
1522 | 4 | server_->GetSharedMemoryPostgresAuthKey(), |
1523 | 4 | FLAGS_heartbeat_interval_ms); |
1524 | 4 | const auto status = upgrade_helper.Upgrade(); |
1525 | 4 | if (!status.ok()) { |
1526 | 0 | LOG(INFO) << "YSQL upgrade failed: " << status; |
1527 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, &context); |
1528 | 0 | return; |
1529 | 0 | } |
1530 | | |
1531 | 4 | LOG(INFO) << "YSQL upgrade done successfully"; |
1532 | 4 | context.RespondSuccess(); |
1533 | 4 | } |
1534 | | |
1535 | | |
1536 | 0 | bool EmptyWriteBatch(const docdb::KeyValueWriteBatchPB& write_batch) { |
1537 | 0 | return write_batch.write_pairs().empty() && write_batch.apply_external_transactions().empty(); |
1538 | 0 | } |
1539 | | |
1540 | | void TabletServiceImpl::Write(const WriteRequestPB* req, |
1541 | | WriteResponsePB* resp, |
1542 | 2.61M | rpc::RpcContext context) { |
1543 | 2.61M | if (FLAGS_TEST_tserver_noop_read_write) { |
1544 | 0 | for (int i = 0; i < req->ql_write_batch_size(); ++i) { |
1545 | 0 | resp->add_ql_response_batch(); |
1546 | 0 | } |
1547 | 0 | context.RespondSuccess(); |
1548 | 0 | return; |
1549 | 0 | } |
1550 | 2.61M | TRACE("Start Write"); |
1551 | 2.61M | TRACE_EVENT1("tserver", "TabletServiceImpl::Write", |
1552 | 2.61M | "tablet_id", req->tablet_id()); |
1553 | 2.61M | VLOG(2) << "Received Write RPC: " << req->DebugString()367 ; |
1554 | 2.61M | UpdateClock(*req, server_->Clock()); |
1555 | | |
1556 | 2.61M | auto tablet = LookupLeaderTabletOrRespond( |
1557 | 2.61M | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1558 | 2.61M | if (!tablet || |
1559 | 2.61M | !CheckWriteThrottlingOrRespond( |
1560 | 2.56M | req->rejection_score(), tablet.peer.get(), resp, &context)) { |
1561 | 51.1k | return; |
1562 | 51.1k | } |
1563 | | |
1564 | 2.56M | if (tablet.peer->tablet()->metadata()->hidden()) { |
1565 | 3 | auto status = STATUS(NotFound, "Tablet not found", req->tablet_id()); |
1566 | 3 | SetupErrorAndRespond( |
1567 | 3 | resp->mutable_error(), status, TabletServerErrorPB::TABLET_NOT_FOUND, &context); |
1568 | 3 | return; |
1569 | 3 | } |
1570 | | |
1571 | | #if defined(DUMP_WRITE) |
1572 | | if (req->has_write_batch() && req->write_batch().has_transaction()) { |
1573 | | VLOG(1) << "Write with transaction: " << req->write_batch().transaction().ShortDebugString(); |
1574 | | if (req->pgsql_write_batch_size() != 0) { |
1575 | | auto txn_id = CHECK_RESULT(FullyDecodeTransactionId( |
1576 | | req->write_batch().transaction().transaction_id())); |
1577 | | for (const auto& entry : req->pgsql_write_batch()) { |
1578 | | if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) { |
1579 | | auto key = entry.column_new_values(0).expr().value().int32_value(); |
1580 | | LOG(INFO) << txn_id << " UPDATE: " << key << " = " |
1581 | | << entry.column_new_values(1).expr().value().string_value(); |
1582 | | } else if ( |
1583 | | entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT || |
1584 | | entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) { |
1585 | | docdb::DocKey doc_key; |
1586 | | CHECK_OK(doc_key.FullyDecodeFrom(entry.ybctid_column_value().value().binary_value())); |
1587 | | LOG(INFO) << txn_id << " INSERT: " << doc_key.hashed_group()[0].GetInt32() << " = " |
1588 | | << entry.column_values(0).expr().value().string_value(); |
1589 | | } else if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_DELETE) { |
1590 | | LOG(INFO) << txn_id << " DELETE: " << entry.ShortDebugString(); |
1591 | | } |
1592 | | } |
1593 | | } |
1594 | | } |
1595 | | #endif |
1596 | | |
1597 | 2.56M | if (PREDICT_FALSE(req->has_write_batch() && !req->has_external_hybrid_time() && |
1598 | 2.56M | (!req->write_batch().write_pairs().empty() || !req->write_batch().read_pairs().empty()))) { |
1599 | 0 | Status s = STATUS(NotSupported, "Write Request contains write batch. This field should be " |
1600 | 0 | "used only for post-processed write requests during " |
1601 | 0 | "Raft replication."); |
1602 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, |
1603 | 0 | TabletServerErrorPB::INVALID_MUTATION, |
1604 | 0 | &context); |
1605 | 0 | return; |
1606 | 0 | } |
1607 | | |
1608 | 2.56M | bool has_operations = req->ql_write_batch_size() != 0 || |
1609 | 2.56M | req->redis_write_batch_size() != 0815k || |
1610 | 2.56M | req->pgsql_write_batch_size() != 0691k || |
1611 | 2.56M | (1 req->has_external_hybrid_time()1 && !EmptyWriteBatch(req->write_batch())0 ); |
1612 | 2.56M | if (!has_operations && tablet.peer->tablet()->table_type() != TableType::REDIS_TABLE_TYPE1 ) { |
1613 | | // An empty request. This is fine, can just exit early with ok status instead of working hard. |
1614 | | // This doesn't need to go to Raft log. |
1615 | 1 | MakeRpcOperationCompletionCallback<WriteResponsePB>( |
1616 | 1 | std::move(context), resp, server_->Clock())(Status::OK()); |
1617 | 1 | return; |
1618 | 1 | } |
1619 | | |
1620 | | // For postgres requests check that the syscatalog version matches. |
1621 | 2.56M | if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) { |
1622 | 514k | uint64_t last_breaking_catalog_version = 0; // unset. |
1623 | 6.43M | for (const auto& pg_req : req->pgsql_write_batch()) { |
1624 | 6.43M | if (pg_req.has_ysql_catalog_version()) { |
1625 | 6.43M | if (last_breaking_catalog_version == 0) { |
1626 | | // Initialize last breaking version if not yet set. |
1627 | 513k | server_->get_ysql_catalog_version(nullptr /* current_version */, |
1628 | 513k | &last_breaking_catalog_version); |
1629 | 513k | } |
1630 | 6.43M | if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) { |
1631 | 5 | SetupErrorAndRespond(resp->mutable_error(), |
1632 | 5 | STATUS_SUBSTITUTE(QLError, "The catalog snapshot used for this " |
1633 | 5 | "transaction has been invalidated."), |
1634 | 5 | TabletServerErrorPB::MISMATCHED_SCHEMA, &context); |
1635 | 5 | return; |
1636 | 5 | } |
1637 | 6.43M | } |
1638 | 6.43M | } |
1639 | 514k | } |
1640 | | |
1641 | 2.56M | auto query = std::make_unique<tablet::WriteQuery>( |
1642 | 2.56M | tablet.leader_term, context.GetClientDeadline(), tablet.peer.get(), |
1643 | 2.56M | tablet.peer->tablet(), resp); |
1644 | 2.56M | query->set_client_request(*req); |
1645 | | |
1646 | 2.56M | auto context_ptr = std::make_shared<RpcContext>(std::move(context)); |
1647 | 2.56M | if (RandomActWithProbability(GetAtomicFlag(&FLAGS_TEST_respond_write_failed_probability))) { |
1648 | 0 | LOG(INFO) << "Responding with a failure to " << req->DebugString(); |
1649 | 0 | SetupErrorAndRespond(resp->mutable_error(), STATUS(LeaderHasNoLease, "TEST: Random failure"), |
1650 | 0 | context_ptr.get()); |
1651 | 2.56M | } else { |
1652 | 2.56M | query->set_callback(WriteQueryCompletionCallback( |
1653 | 2.56M | tablet.peer, context_ptr, resp, query.get(), server_->Clock(), req->include_trace())); |
1654 | 2.56M | } |
1655 | | |
1656 | 2.56M | query->AdjustYsqlQueryTransactionality(req->pgsql_write_batch_size()); |
1657 | | |
1658 | 2.56M | tablet.peer->WriteAsync(std::move(query)); |
1659 | 2.56M | } |
1660 | | |
1661 | | void TabletServiceImpl::Read(const ReadRequestPB* req, |
1662 | | ReadResponsePB* resp, |
1663 | 9.56M | rpc::RpcContext context) { |
1664 | 9.56M | if (FLAGS_TEST_tserver_noop_read_write) { |
1665 | 0 | context.RespondSuccess(); |
1666 | 0 | return; |
1667 | 0 | } |
1668 | | |
1669 | 9.56M | PerformRead(server_, this, req, resp, std::move(context)); |
1670 | 9.56M | } |
1671 | | |
1672 | | ConsensusServiceImpl::ConsensusServiceImpl(const scoped_refptr<MetricEntity>& metric_entity, |
1673 | | TabletPeerLookupIf* tablet_manager) |
1674 | | : ConsensusServiceIf(metric_entity), |
1675 | 16.7k | tablet_manager_(tablet_manager) { |
1676 | 16.7k | } |
1677 | | |
1678 | 182 | ConsensusServiceImpl::~ConsensusServiceImpl() { |
1679 | 182 | } |
1680 | | |
1681 | | void ConsensusServiceImpl::CompleteUpdateConsensusResponse( |
1682 | | std::shared_ptr<tablet::TabletPeer> tablet_peer, |
1683 | 25.5M | consensus::ConsensusResponsePB* resp) { |
1684 | 25.5M | auto tablet = tablet_peer->shared_tablet(); |
1685 | 25.5M | if (tablet25.5M ) { |
1686 | 25.5M | resp->set_num_sst_files(tablet->GetCurrentVersionNumSSTFiles()); |
1687 | 25.5M | } |
1688 | 25.5M | resp->set_propagated_hybrid_time(tablet_peer->clock().Now().ToUint64()); |
1689 | 25.5M | } |
1690 | | |
1691 | | void ConsensusServiceImpl::MultiRaftUpdateConsensus( |
1692 | | const consensus::MultiRaftConsensusRequestPB *req, |
1693 | | consensus::MultiRaftConsensusResponsePB *resp, |
1694 | 0 | rpc::RpcContext context) { |
1695 | 0 | DVLOG(3) << "Received Batch Consensus Update RPC: " << req->ShortDebugString(); |
1696 | | // Effectively performs ConsensusServiceImpl::UpdateConsensus for |
1697 | | // each ConsensusRequestPB in the batch but does not fail the entire |
1698 | | // batch if a single request fails. |
1699 | 0 | for (int i = 0; i < req->consensus_request_size(); i++) { |
1700 | | // Unfortunately, we have to use const_cast here, |
1701 | | // because the protobuf-generated interface only gives us a const request |
1702 | | // but we need to be able to move messages out of the request for efficiency. |
1703 | 0 | auto consensus_req = const_cast<ConsensusRequestPB*>(&req->consensus_request(i)); |
1704 | 0 | auto consensus_resp = resp->add_consensus_response();; |
1705 | |
|
1706 | 0 | auto uuid_match_res = CheckUuidMatch(tablet_manager_, "UpdateConsensus", consensus_req, |
1707 | 0 | context.requestor_string()); |
1708 | 0 | if (!uuid_match_res.ok()) { |
1709 | 0 | SetupError(consensus_resp->mutable_error(), uuid_match_res.status()); |
1710 | 0 | continue; |
1711 | 0 | } |
1712 | | |
1713 | 0 | auto peer_tablet_res = LookupTabletPeer(tablet_manager_, consensus_req->tablet_id()); |
1714 | 0 | if (!peer_tablet_res.ok()) { |
1715 | 0 | SetupError(consensus_resp->mutable_error(), peer_tablet_res.status()); |
1716 | 0 | continue; |
1717 | 0 | } |
1718 | 0 | auto tablet_peer = peer_tablet_res.get().tablet_peer; |
1719 | | |
1720 | | // Submit the update directly to the TabletPeer's Consensus instance. |
1721 | 0 | auto consensus_res = GetConsensus(tablet_peer); |
1722 | 0 | if (!consensus_res.ok()) { |
1723 | 0 | SetupError(consensus_resp->mutable_error(), consensus_res.status()); |
1724 | 0 | continue; |
1725 | 0 | } |
1726 | 0 | auto consensus = *consensus_res; |
1727 | |
|
1728 | 0 | Status s = consensus->Update( |
1729 | 0 | consensus_req, consensus_resp, context.GetClientDeadline()); |
1730 | 0 | if (PREDICT_FALSE(!s.ok())) { |
1731 | | // Clear the response first, since a partially-filled response could |
1732 | | // result in confusing a caller, or in having missing required fields |
1733 | | // in embedded optional messages. |
1734 | 0 | consensus_resp->Clear(); |
1735 | 0 | SetupError(consensus_resp->mutable_error(), s); |
1736 | 0 | continue; |
1737 | 0 | } |
1738 | | |
1739 | 0 | CompleteUpdateConsensusResponse(tablet_peer, consensus_resp); |
1740 | 0 | } |
1741 | 0 | context.RespondSuccess(); |
1742 | 0 | } |
1743 | | |
1744 | | void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, |
1745 | | ConsensusResponsePB* resp, |
1746 | 25.5M | rpc::RpcContext context) { |
1747 | 25.5M | DVLOG(3) << "Received Consensus Update RPC: " << req->ShortDebugString()31.8k ; |
1748 | 25.5M | if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, &context)) { |
1749 | 95 | return; |
1750 | 95 | } |
1751 | 25.5M | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1752 | 25.5M | tablet_manager_, req->tablet_id(), resp, &context)); |
1753 | 25.5M | auto tablet_peer = peer_tablet.tablet_peer; |
1754 | | |
1755 | | // Submit the update directly to the TabletPeer's Consensus instance. |
1756 | 25.5M | shared_ptr<Consensus> consensus; |
1757 | 25.5M | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return0 ; |
1758 | | |
1759 | | // Unfortunately, we have to use const_cast here, because the protobuf-generated interface only |
1760 | | // gives us a const request, but we need to be able to move messages out of the request for |
1761 | | // efficiency. |
1762 | 25.5M | Status s = consensus->Update( |
1763 | 25.5M | const_cast<ConsensusRequestPB*>(req), resp, context.GetClientDeadline()); |
1764 | 25.5M | if (PREDICT_FALSE(!s.ok())) { |
1765 | | // Clear the response first, since a partially-filled response could |
1766 | | // result in confusing a caller, or in having missing required fields |
1767 | | // in embedded optional messages. |
1768 | 1.93k | resp->Clear(); |
1769 | | |
1770 | 1.93k | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1771 | 1.93k | return; |
1772 | 1.93k | } |
1773 | | |
1774 | 25.5M | CompleteUpdateConsensusResponse(tablet_peer, resp); |
1775 | | |
1776 | 25.5M | context.RespondSuccess(); |
1777 | 25.5M | } |
1778 | | |
1779 | | void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, |
1780 | | VoteResponsePB* resp, |
1781 | 1.39M | rpc::RpcContext context) { |
1782 | 1.39M | DVLOG(3) << "Received Consensus Request Vote RPC: " << req->DebugString()513 ; |
1783 | 1.39M | if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, &context)) { |
1784 | 2.26k | return; |
1785 | 2.26k | } |
1786 | 1.39M | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1787 | 1.36M | tablet_manager_, req->tablet_id(), resp, &context)); |
1788 | 1.36M | auto tablet_peer = peer_tablet.tablet_peer; |
1789 | | |
1790 | | // Submit the vote request directly to the consensus instance. |
1791 | 1.36M | shared_ptr<Consensus> consensus; |
1792 | 1.36M | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return0 ; |
1793 | 1.36M | Status s = consensus->RequestVote(req, resp); |
1794 | 1.36M | RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context); |
1795 | 1.36M | context.RespondSuccess(); |
1796 | 1.36M | } |
1797 | | |
1798 | | void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, |
1799 | | ChangeConfigResponsePB* resp, |
1800 | 5.29k | RpcContext context) { |
1801 | 5.29k | VLOG(1) << "Received ChangeConfig RPC: " << req->ShortDebugString()4 ; |
1802 | | // If the destination uuid is empty string, it means the client was retrying after a leader |
1803 | | // stepdown and did not have a chance to update the uuid inside the request. |
1804 | | // TODO: Note that this can be removed once Java YBClient will reset change config's uuid |
1805 | | // correctly after leader step down. |
1806 | 5.29k | if (req->dest_uuid() != "" && |
1807 | 5.29k | !CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, &context)5.15k ) { |
1808 | 0 | return; |
1809 | 0 | } |
1810 | 5.29k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1811 | 5.29k | tablet_manager_, req->tablet_id(), resp, &context)); |
1812 | 5.29k | auto tablet_peer = peer_tablet.tablet_peer; |
1813 | | |
1814 | 5.29k | shared_ptr<Consensus> consensus; |
1815 | 5.29k | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return0 ; |
1816 | 5.29k | boost::optional<TabletServerErrorPB::Code> error_code; |
1817 | 5.29k | std::shared_ptr<RpcContext> context_ptr = std::make_shared<RpcContext>(std::move(context)); |
1818 | 5.29k | Status s = consensus->ChangeConfig(*req, BindHandleResponse(resp, context_ptr), &error_code); |
1819 | 18.4E | VLOG(1) << "Sent ChangeConfig req " << req->ShortDebugString() << " to consensus layer."; |
1820 | 5.29k | if (PREDICT_FALSE(!s.ok())) { |
1821 | 1.67k | HandleErrorResponse(resp, context_ptr.get(), s, error_code); |
1822 | 1.67k | return; |
1823 | 1.67k | } |
1824 | | // The success case is handled when the callback fires. |
1825 | 5.29k | } |
1826 | | |
1827 | | void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req, |
1828 | | UnsafeChangeConfigResponsePB* resp, |
1829 | 6 | RpcContext context) { |
1830 | 6 | VLOG(1) << "Received UnsafeChangeConfig RPC: " << req->ShortDebugString()0 ; |
1831 | 6 | if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, &context)) { |
1832 | 0 | return; |
1833 | 0 | } |
1834 | 6 | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1835 | 6 | tablet_manager_, req->tablet_id(), resp, &context)); |
1836 | 6 | auto tablet_peer = peer_tablet.tablet_peer; |
1837 | | |
1838 | 6 | shared_ptr<Consensus> consensus; |
1839 | 6 | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) { |
1840 | 0 | return; |
1841 | 0 | } |
1842 | 6 | boost::optional<TabletServerErrorPB::Code> error_code; |
1843 | 6 | const Status s = consensus->UnsafeChangeConfig(*req, &error_code); |
1844 | 6 | if (PREDICT_FALSE(!s.ok())) { |
1845 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1846 | 0 | HandleErrorResponse(resp, &context, s, error_code); |
1847 | 0 | return; |
1848 | 0 | } |
1849 | 6 | context.RespondSuccess(); |
1850 | 6 | } |
1851 | | |
1852 | | void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req, |
1853 | | GetNodeInstanceResponsePB* resp, |
1854 | 20.9k | rpc::RpcContext context) { |
1855 | 20.9k | DVLOG(3) << "Received Get Node Instance RPC: " << req->DebugString()6 ; |
1856 | 20.9k | resp->mutable_node_instance()->CopyFrom(tablet_manager_->NodeInstance()); |
1857 | 20.9k | auto status = tablet_manager_->GetRegistration(resp->mutable_registration()); |
1858 | 20.9k | if (!status.ok()) { |
1859 | 0 | context.RespondFailure(status); |
1860 | 20.9k | } else { |
1861 | 20.9k | context.RespondSuccess(); |
1862 | 20.9k | } |
1863 | 20.9k | } |
1864 | | |
1865 | | namespace { |
1866 | | |
1867 | | class RpcScope { |
1868 | | public: |
1869 | | template<class Req, class Resp> |
1870 | | RpcScope(TabletPeerLookupIf* tablet_manager, |
1871 | | const char* method_name, |
1872 | | const Req* req, |
1873 | | Resp* resp, |
1874 | | rpc::RpcContext* context) |
1875 | 154k | : context_(context) { |
1876 | 154k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { |
1877 | 0 | return; |
1878 | 0 | } |
1879 | 154k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1880 | 116k | tablet_manager, req->tablet_id(), resp, context)); |
1881 | 116k | auto tablet_peer = peer_tablet.tablet_peer; |
1882 | | |
1883 | 116k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { |
1884 | 0 | return; |
1885 | 0 | } |
1886 | 116k | responded_ = false; |
1887 | 116k | } tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::RunLeaderElectionRequestPB, yb::consensus::RunLeaderElectionResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::RunLeaderElectionRequestPB const*, yb::consensus::RunLeaderElectionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 1875 | 96.2k | : context_(context) { | 1876 | 96.2k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1877 | 0 | return; | 1878 | 0 | } | 1879 | 96.2k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1880 | 57.9k | tablet_manager, req->tablet_id(), resp, context)); | 1881 | 57.9k | auto tablet_peer = peer_tablet.tablet_peer; | 1882 | | | 1883 | 57.9k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1884 | 0 | return; | 1885 | 0 | } | 1886 | 57.9k | responded_ = false; | 1887 | 57.9k | } |
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::LeaderElectionLostRequestPB, yb::consensus::LeaderElectionLostResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::LeaderElectionLostRequestPB const*, yb::consensus::LeaderElectionLostResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 1875 | 77 | : context_(context) { | 1876 | 77 | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1877 | 0 | return; | 1878 | 0 | } | 1879 | 77 | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1880 | 77 | tablet_manager, req->tablet_id(), resp, context)); | 1881 | 77 | auto tablet_peer = peer_tablet.tablet_peer; | 1882 | | | 1883 | 77 | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1884 | 0 | return; | 1885 | 0 | } | 1886 | 77 | responded_ = false; | 1887 | 77 | } |
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::LeaderStepDownRequestPB, yb::consensus::LeaderStepDownResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::LeaderStepDownRequestPB const*, yb::consensus::LeaderStepDownResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 1875 | 54.1k | : context_(context) { | 1876 | 54.1k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1877 | 0 | return; | 1878 | 0 | } | 1879 | 54.1k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1880 | 54.1k | tablet_manager, req->tablet_id(), resp, context)); | 1881 | 54.1k | auto tablet_peer = peer_tablet.tablet_peer; | 1882 | | | 1883 | 54.1k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1884 | 0 | return; | 1885 | 0 | } | 1886 | 54.1k | responded_ = false; | 1887 | 54.1k | } |
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::GetConsensusStateRequestPB, yb::consensus::GetConsensusStateResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::GetConsensusStateRequestPB const*, yb::consensus::GetConsensusStateResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 1875 | 4.06k | : context_(context) { | 1876 | 4.06k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1877 | 0 | return; | 1878 | 0 | } | 1879 | 4.06k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1880 | 3.88k | tablet_manager, req->tablet_id(), resp, context)); | 1881 | 3.88k | auto tablet_peer = peer_tablet.tablet_peer; | 1882 | | | 1883 | 3.88k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1884 | 0 | return; | 1885 | 0 | } | 1886 | 3.88k | responded_ = false; | 1887 | 3.88k | } |
|
1888 | | |
1889 | 154k | ~RpcScope() { |
1890 | 154k | if (!responded_) { |
1891 | 115k | context_->RespondSuccess(); |
1892 | 115k | } |
1893 | 154k | } |
1894 | | |
1895 | | template<class Resp> |
1896 | 112k | void CheckStatus(const Status& status, Resp* resp) { |
1897 | 112k | if (!status.ok()) { |
1898 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); |
1899 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); |
1900 | 0 | responded_ = true; |
1901 | 0 | } |
1902 | 112k | } tablet_service.cc:void yb::tserver::(anonymous namespace)::RpcScope::CheckStatus<yb::consensus::RunLeaderElectionResponsePB>(yb::Status const&, yb::consensus::RunLeaderElectionResponsePB*) Line | Count | Source | 1896 | 57.8k | void CheckStatus(const Status& status, Resp* resp) { | 1897 | 57.8k | if (!status.ok()) { | 1898 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); | 1899 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); | 1900 | 0 | responded_ = true; | 1901 | 0 | } | 1902 | 57.8k | } |
tablet_service.cc:void yb::tserver::(anonymous namespace)::RpcScope::CheckStatus<yb::consensus::LeaderElectionLostResponsePB>(yb::Status const&, yb::consensus::LeaderElectionLostResponsePB*) Line | Count | Source | 1896 | 77 | void CheckStatus(const Status& status, Resp* resp) { | 1897 | 77 | if (!status.ok()) { | 1898 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); | 1899 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); | 1900 | 0 | responded_ = true; | 1901 | 0 | } | 1902 | 77 | } |
tablet_service.cc:void yb::tserver::(anonymous namespace)::RpcScope::CheckStatus<yb::consensus::LeaderStepDownResponsePB>(yb::Status const&, yb::consensus::LeaderStepDownResponsePB*) Line | Count | Source | 1896 | 54.1k | void CheckStatus(const Status& status, Resp* resp) { | 1897 | 54.1k | if (!status.ok()) { | 1898 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); | 1899 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); | 1900 | 0 | responded_ = true; | 1901 | 0 | } | 1902 | 54.1k | } |
|
1903 | | |
1904 | 115k | Consensus* operator->() { |
1905 | 115k | return consensus_.get(); |
1906 | 115k | } |
1907 | | |
1908 | 154k | explicit operator bool() const { |
1909 | 154k | return !responded_; |
1910 | 154k | } |
1911 | | |
1912 | | private: |
1913 | | rpc::RpcContext* context_; |
1914 | | bool responded_ = true; |
1915 | | shared_ptr<Consensus> consensus_; |
1916 | | }; |
1917 | | |
1918 | | } // namespace |
1919 | | |
1920 | | void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req, |
1921 | | RunLeaderElectionResponsePB* resp, |
1922 | 96.3k | rpc::RpcContext context) { |
1923 | 96.3k | VLOG(1) << "Received Run Leader Election RPC: " << req->DebugString()157 ; |
1924 | 96.3k | RpcScope scope(tablet_manager_, "RunLeaderElection", req, resp, &context); |
1925 | 96.3k | if (!scope) { |
1926 | 38.3k | return; |
1927 | 38.3k | } |
1928 | | |
1929 | 57.9k | Status s = scope->StartElection(consensus::LeaderElectionData { |
1930 | 57.9k | .mode = consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE, |
1931 | 57.9k | .pending_commit = req->has_committed_index(), |
1932 | 57.9k | .must_be_committed_opid = OpId::FromPB(req->committed_index()), |
1933 | 57.9k | .originator_uuid = req->has_originator_uuid() ? req->originator_uuid()9.97k : std::string()47.9k , |
1934 | 57.9k | .suppress_vote_request = consensus::TEST_SuppressVoteRequest(req->suppress_vote_request()), |
1935 | 57.9k | .initial_election = req->initial_election() }); |
1936 | 57.9k | scope.CheckStatus(s, resp); |
1937 | 57.9k | } |
1938 | | |
1939 | | void ConsensusServiceImpl::LeaderElectionLost(const consensus::LeaderElectionLostRequestPB *req, |
1940 | | consensus::LeaderElectionLostResponsePB *resp, |
1941 | 77 | ::yb::rpc::RpcContext context) { |
1942 | 77 | LOG(INFO) << "LeaderElectionLost, req: " << req->ShortDebugString(); |
1943 | 77 | RpcScope scope(tablet_manager_, "LeaderElectionLost", req, resp, &context); |
1944 | 77 | if (!scope) { |
1945 | 0 | return; |
1946 | 0 | } |
1947 | 77 | auto status = scope->ElectionLostByProtege(req->election_lost_by_uuid()); |
1948 | 77 | scope.CheckStatus(status, resp); |
1949 | 77 | LOG(INFO) << "LeaderElectionLost, outcome: " << (scope ? "success" : "failure"0 ) << "req: " |
1950 | 77 | << req->ShortDebugString(); |
1951 | 77 | } |
1952 | | |
1953 | | void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req, |
1954 | | LeaderStepDownResponsePB* resp, |
1955 | 54.1k | RpcContext context) { |
1956 | 54.1k | LOG(INFO) << "Received Leader stepdown RPC: " << req->ShortDebugString(); |
1957 | | |
1958 | 54.1k | if (PREDICT_FALSE(FLAGS_TEST_leader_stepdown_delay_ms > 0)) { |
1959 | 2 | LOG(INFO) << "Delaying leader stepdown for " |
1960 | 2 | << FLAGS_TEST_leader_stepdown_delay_ms << " ms."; |
1961 | 2 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_leader_stepdown_delay_ms)); |
1962 | 2 | } |
1963 | | |
1964 | 54.1k | RpcScope scope(tablet_manager_, "LeaderStepDown", req, resp, &context); |
1965 | 54.1k | if (!scope) { |
1966 | 0 | return; |
1967 | 0 | } |
1968 | 54.1k | Status s = scope->StepDown(req, resp); |
1969 | 54.1k | LOG(INFO) << "Leader stepdown request " << req->ShortDebugString() << " success. Resp code=" |
1970 | 54.1k | << TabletServerErrorPB::Code_Name(resp->error().code()); |
1971 | 54.1k | scope.CheckStatus(s, resp); |
1972 | 54.1k | } |
1973 | | |
1974 | | void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req, |
1975 | | consensus::GetLastOpIdResponsePB *resp, |
1976 | 575 | rpc::RpcContext context) { |
1977 | 575 | DVLOG(3) << "Received GetLastOpId RPC: " << req->DebugString()0 ; |
1978 | | |
1979 | 575 | if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) { |
1980 | 0 | HandleErrorResponse(resp, &context, |
1981 | 0 | STATUS(InvalidArgument, "Invalid opid_type specified to GetLastOpId()")); |
1982 | 0 | return; |
1983 | 0 | } |
1984 | | |
1985 | 575 | if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, &context)) { |
1986 | 0 | return; |
1987 | 0 | } |
1988 | 575 | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1989 | 556 | tablet_manager_, req->tablet_id(), resp, &context)); |
1990 | 556 | auto tablet_peer = peer_tablet.tablet_peer; |
1991 | | |
1992 | 556 | if (tablet_peer->state() != tablet::RUNNING) { |
1993 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1994 | 0 | STATUS(ServiceUnavailable, "Tablet Peer not in RUNNING state"), |
1995 | 0 | TabletServerErrorPB::TABLET_NOT_RUNNING, &context); |
1996 | 0 | return; |
1997 | 0 | } |
1998 | | |
1999 | 556 | auto consensus = GetConsensusOrRespond(tablet_peer, resp, &context); |
2000 | 556 | if (!consensus) return0 ; |
2001 | 556 | auto op_id = req->has_op_type() |
2002 | 556 | ? consensus->TEST_GetLastOpIdWithType(req->opid_type(), req->op_type())3 |
2003 | 556 | : consensus->GetLastOpId(req->opid_type())553 ; |
2004 | | |
2005 | | // RETURN_UNKNOWN_ERROR_IF_NOT_OK does not support Result, so have to add extra check here. |
2006 | 556 | if (!op_id.ok()) { |
2007 | 2 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(op_id.status(), resp, &context); |
2008 | 2 | } |
2009 | 554 | op_id->ToPB(resp->mutable_opid()); |
2010 | 554 | context.RespondSuccess(); |
2011 | 554 | } |
2012 | | |
2013 | | void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB *req, |
2014 | | consensus::GetConsensusStateResponsePB *resp, |
2015 | 4.06k | rpc::RpcContext context) { |
2016 | 18.4E | DVLOG(3) << "Received GetConsensusState RPC: " << req->DebugString(); |
2017 | | |
2018 | 4.06k | RpcScope scope(tablet_manager_, "GetConsensusState", req, resp, &context); |
2019 | 4.06k | if (!scope) { |
2020 | 181 | return; |
2021 | 181 | } |
2022 | 3.88k | ConsensusConfigType type = req->type(); |
2023 | 3.88k | if (PREDICT_FALSE(type != CONSENSUS_CONFIG_ACTIVE && type != CONSENSUS_CONFIG_COMMITTED)) { |
2024 | 0 | HandleErrorResponse(resp, &context, |
2025 | 0 | STATUS(InvalidArgument, Substitute("Unsupported ConsensusConfigType $0 ($1)", |
2026 | 0 | ConsensusConfigType_Name(type), type))); |
2027 | 0 | return; |
2028 | 0 | } |
2029 | 3.88k | LeaderLeaseStatus leader_lease_status; |
2030 | 3.88k | *resp->mutable_cstate() = scope->ConsensusState(req->type(), &leader_lease_status); |
2031 | 3.88k | resp->set_leader_lease_status(leader_lease_status); |
2032 | 3.88k | } |
2033 | | |
2034 | | void ConsensusServiceImpl::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* req, |
2035 | | StartRemoteBootstrapResponsePB* resp, |
2036 | 10.6k | rpc::RpcContext context) { |
2037 | 10.6k | if (!CheckUuidMatchOrRespond(tablet_manager_, "StartRemoteBootstrap", req, resp, &context)) { |
2038 | 0 | return; |
2039 | 0 | } |
2040 | 10.6k | if (req->has_split_parent_tablet_id() |
2041 | 10.6k | && !88 PREDICT_FALSE88 (FLAGS_TEST_disable_post_split_tablet_rbs_check)) { |
2042 | | // For any tablet that was the result of a split, the raft group leader will always send the |
2043 | | // split_parent_tablet_id. However, our local tablet manager should only know about the parent |
2044 | | // if it was part of the raft group which committed the split to the parent, and if the parent |
2045 | | // tablet has yet to be deleted across the cluster. |
2046 | 86 | TabletPeerTablet result; |
2047 | 86 | if (tablet_manager_->GetTabletPeer(req->split_parent_tablet_id(), &result.tablet_peer).ok()) { |
2048 | 85 | YB_LOG_EVERY_N_SECS(WARNING, 30) |
2049 | 2 | << "Start remote bootstrap rejected: parent tablet not yet split."; |
2050 | 85 | SetupErrorAndRespond( |
2051 | 85 | resp->mutable_error(), |
2052 | 85 | STATUS(Incomplete, "Rejecting bootstrap request while parent tablet is present."), |
2053 | 85 | TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE, |
2054 | 85 | &context); |
2055 | 85 | return; |
2056 | 85 | } |
2057 | 86 | } |
2058 | | |
2059 | 10.5k | Status s = tablet_manager_->StartRemoteBootstrap(*req); |
2060 | 10.5k | if (!s.ok()) { |
2061 | | // Using Status::AlreadyPresent for a remote bootstrap operation that is already in progress. |
2062 | 8.54k | if (s.IsAlreadyPresent()) { |
2063 | 8.53k | YB_LOG_EVERY_N_SECS(WARNING, 30) << "Start remote bootstrap failed: " << s67 ; |
2064 | 8.53k | SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::ALREADY_IN_PROGRESS, |
2065 | 8.53k | &context); |
2066 | 8.53k | return; |
2067 | 8.53k | } else { |
2068 | 6 | LOG(WARNING) << "Start remote bootstrap failed: " << s; |
2069 | 6 | } |
2070 | 8.54k | } |
2071 | | |
2072 | 2.02k | RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context); |
2073 | 2.01k | context.RespondSuccess(); |
2074 | 2.01k | } |
2075 | | |
2076 | | void TabletServiceImpl::NoOp(const NoOpRequestPB *req, |
2077 | | NoOpResponsePB *resp, |
2078 | 0 | rpc::RpcContext context) { |
2079 | 0 | context.RespondSuccess(); |
2080 | 0 | } |
2081 | | |
2082 | | void TabletServiceImpl::Publish( |
2083 | 684 | const PublishRequestPB* req, PublishResponsePB* resp, rpc::RpcContext context) { |
2084 | 684 | rpc::Publisher* publisher = server_->GetPublisher(); |
2085 | 684 | resp->set_num_clients_forwarded_to(publisher ? (*publisher)(req->channel(), req->message())239 : 0445 ); |
2086 | 684 | context.RespondSuccess(); |
2087 | 684 | } |
2088 | | |
2089 | | void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, |
2090 | | ListTabletsResponsePB* resp, |
2091 | 217 | rpc::RpcContext context) { |
2092 | 217 | TabletPeers peers = server_->tablet_manager()->GetTabletPeers(); |
2093 | 217 | RepeatedPtrField<StatusAndSchemaPB>* peer_status = resp->mutable_status_and_schema(); |
2094 | 3.19k | for (const TabletPeerPtr& peer : peers) { |
2095 | 3.19k | StatusAndSchemaPB* status = peer_status->Add(); |
2096 | 3.19k | peer->GetTabletStatusPB(status->mutable_tablet_status()); |
2097 | 3.19k | SchemaToPB(*peer->status_listener()->schema(), status->mutable_schema()); |
2098 | 3.19k | peer->tablet_metadata()->partition_schema()->ToPB(status->mutable_partition_schema()); |
2099 | 3.19k | } |
2100 | 217 | context.RespondSuccess(); |
2101 | 217 | } |
2102 | | |
2103 | | void TabletServiceImpl::GetMasterAddresses(const GetMasterAddressesRequestPB* req, |
2104 | | GetMasterAddressesResponsePB* resp, |
2105 | 9 | rpc::RpcContext context) { |
2106 | 9 | resp->set_master_addresses(server::MasterAddressesToString( |
2107 | 9 | *server_->tablet_manager()->server()->options().GetMasterAddresses())); |
2108 | 9 | context.RespondSuccess(); |
2109 | 9 | } |
2110 | | |
2111 | | void TabletServiceImpl::GetLogLocation( |
2112 | | const GetLogLocationRequestPB* req, |
2113 | | GetLogLocationResponsePB* resp, |
2114 | 0 | rpc::RpcContext context) { |
2115 | 0 | resp->set_log_location(FLAGS_log_dir); |
2116 | 0 | context.RespondSuccess(); |
2117 | 0 | } |
2118 | | |
2119 | | void TabletServiceImpl::ListTabletsForTabletServer(const ListTabletsForTabletServerRequestPB* req, |
2120 | | ListTabletsForTabletServerResponsePB* resp, |
2121 | 881 | rpc::RpcContext context) { |
2122 | | // Replicating logic from path-handlers. |
2123 | 881 | TabletPeers peers = server_->tablet_manager()->GetTabletPeers(); |
2124 | 4.41k | for (const TabletPeerPtr& peer : peers) { |
2125 | 4.41k | TabletStatusPB status; |
2126 | 4.41k | peer->GetTabletStatusPB(&status); |
2127 | | |
2128 | 4.41k | ListTabletsForTabletServerResponsePB::Entry* data_entry = resp->add_entries(); |
2129 | 4.41k | data_entry->set_table_name(status.table_name()); |
2130 | 4.41k | data_entry->set_tablet_id(status.tablet_id()); |
2131 | | |
2132 | 4.41k | std::shared_ptr<consensus::Consensus> consensus = peer->shared_consensus(); |
2133 | 4.41k | data_entry->set_is_leader(consensus && consensus->role() == PeerRole::LEADER4.39k ); |
2134 | 4.41k | data_entry->set_state(status.state()); |
2135 | | |
2136 | 4.41k | auto tablet = peer->shared_tablet(); |
2137 | 4.41k | uint64_t num_sst_files = tablet ? tablet->GetCurrentVersionNumSSTFiles()4.39k : 012 ; |
2138 | 4.41k | data_entry->set_num_sst_files(num_sst_files); |
2139 | | |
2140 | 4.41k | uint64_t num_log_segments = peer->GetNumLogSegments(); |
2141 | 4.41k | data_entry->set_num_log_segments(num_log_segments); |
2142 | | |
2143 | 4.41k | auto num_memtables = tablet ? tablet->GetNumMemtables()4.39k : std::make_pair(0, 0)12 ; |
2144 | 4.41k | data_entry->set_num_memtables_intents(num_memtables.first); |
2145 | 4.41k | data_entry->set_num_memtables_regular(num_memtables.second); |
2146 | 4.41k | } |
2147 | | |
2148 | 881 | context.RespondSuccess(); |
2149 | 881 | } |
2150 | | |
2151 | | namespace { |
2152 | | |
2153 | 3.29k | Result<uint64_t> CalcChecksum(tablet::Tablet* tablet, CoarseTimePoint deadline) { |
2154 | 3.29k | const shared_ptr<Schema> schema = tablet->metadata()->schema(); |
2155 | 3.29k | auto client_schema = schema->CopyWithoutColumnIds(); |
2156 | 3.29k | auto iter = tablet->NewRowIterator(client_schema, {}, "", deadline); |
2157 | 3.29k | RETURN_NOT_OK(iter); |
2158 | | |
2159 | 3.29k | QLTableRow value_map; |
2160 | 3.29k | ScanResultChecksummer collector; |
2161 | | |
2162 | 1.62M | while (VERIFY_RESULT((**iter).HasNext())) { |
2163 | 1.62M | RETURN_NOT_OK((**iter).NextRow(&value_map)); |
2164 | 1.62M | collector.HandleRow(*schema, value_map); |
2165 | 1.62M | } |
2166 | | |
2167 | 3.29k | return collector.agg_checksum(); |
2168 | 3.29k | } |
2169 | | |
2170 | | } // namespace |
2171 | | |
2172 | | Result<uint64_t> TabletServiceImpl::DoChecksum( |
2173 | 3.30k | const ChecksumRequestPB* req, CoarseTimePoint deadline) { |
2174 | 3.30k | auto abstract_tablet = VERIFY_RESULT3.30k (GetTablet( |
2175 | 3.30k | server_->tablet_peer_lookup(), req->tablet_id(), /* tablet_peer = */ nullptr, |
2176 | 3.30k | req->consistency_level(), AllowSplitTablet::kTrue)); |
2177 | 0 | return CalcChecksum(down_cast<tablet::Tablet*>(abstract_tablet.get()), deadline); |
2178 | 3.30k | } |
2179 | | |
2180 | | void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, |
2181 | | ChecksumResponsePB* resp, |
2182 | 3.30k | rpc::RpcContext context) { |
2183 | 3.30k | VLOG(1) << "Full request: " << req->DebugString()7 ; |
2184 | | |
2185 | 3.30k | auto checksum = DoChecksum(req, context.GetClientDeadline()); |
2186 | 3.30k | if (!checksum.ok()) { |
2187 | 1 | SetupErrorAndRespond(resp->mutable_error(), checksum.status(), &context); |
2188 | 1 | return; |
2189 | 1 | } |
2190 | | |
2191 | 3.30k | resp->set_checksum(*checksum); |
2192 | 3.30k | context.RespondSuccess(); |
2193 | 3.30k | } |
2194 | | |
2195 | | void TabletServiceImpl::ImportData(const ImportDataRequestPB* req, |
2196 | | ImportDataResponsePB* resp, |
2197 | 0 | rpc::RpcContext context) { |
2198 | 0 | auto peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
2199 | 0 | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context)); |
2200 | |
|
2201 | 0 | auto status = peer.tablet_peer->tablet()->ImportData(req->source_dir()); |
2202 | 0 | if (!status.ok()) { |
2203 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, &context); |
2204 | 0 | return; |
2205 | 0 | } |
2206 | 0 | context.RespondSuccess(); |
2207 | 0 | } |
2208 | | |
2209 | | void TabletServiceImpl::GetTabletStatus(const GetTabletStatusRequestPB* req, |
2210 | | GetTabletStatusResponsePB* resp, |
2211 | 1 | rpc::RpcContext context) { |
2212 | 1 | const Status s = server_->GetTabletStatus(req, resp); |
2213 | 1 | if (!s.ok()) { |
2214 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, |
2215 | 0 | s.IsNotFound() ? TabletServerErrorPB::TABLET_NOT_FOUND |
2216 | 0 | : TabletServerErrorPB::UNKNOWN_ERROR, |
2217 | 0 | &context); |
2218 | 0 | return; |
2219 | 0 | } |
2220 | 1 | context.RespondSuccess(); |
2221 | 1 | } |
2222 | | |
2223 | | void TabletServiceImpl::IsTabletServerReady(const IsTabletServerReadyRequestPB* req, |
2224 | | IsTabletServerReadyResponsePB* resp, |
2225 | 9 | rpc::RpcContext context) { |
2226 | 9 | Status s = server_->tablet_manager()->GetNumTabletsPendingBootstrap(resp); |
2227 | 9 | if (!s.ok()) { |
2228 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
2229 | 0 | return; |
2230 | 0 | } |
2231 | 9 | context.RespondSuccess(); |
2232 | 9 | } |
2233 | | |
2234 | | void TabletServiceImpl::TakeTransaction(const TakeTransactionRequestPB* req, |
2235 | | TakeTransactionResponsePB* resp, |
2236 | 0 | rpc::RpcContext context) { |
2237 | 0 | auto transaction = server_->TransactionPool()->Take( |
2238 | 0 | client::ForceGlobalTransaction(req->has_is_global() && req->is_global()), |
2239 | 0 | context.GetClientDeadline()); |
2240 | 0 | auto metadata = transaction->Release(); |
2241 | 0 | if (!metadata.ok()) { |
2242 | 0 | LOG(INFO) << "Take failed: " << metadata.status(); |
2243 | 0 | context.RespondFailure(metadata.status()); |
2244 | 0 | return; |
2245 | 0 | } |
2246 | 0 | metadata->ForceToPB(resp->mutable_metadata()); |
2247 | 0 | VLOG(2) << "Taken metadata: " << metadata->ToString(); |
2248 | 0 | context.RespondSuccess(); |
2249 | 0 | } |
2250 | | |
2251 | | void TabletServiceImpl::GetSplitKey( |
2252 | 144 | const GetSplitKeyRequestPB* req, GetSplitKeyResponsePB* resp, RpcContext context) { |
2253 | 144 | TEST_PAUSE_IF_FLAG(TEST_pause_tserver_get_split_key); |
2254 | 144 | PerformAtLeader(req, resp, &context, |
2255 | 144 | [resp](const LeaderTabletPeer& leader_tablet_peer) -> Status { |
2256 | 144 | const auto& tablet = leader_tablet_peer.tablet; |
2257 | 144 | if (FLAGS_rocksdb_max_file_size_for_compaction > 0 && |
2258 | 144 | tablet->schema()->table_properties().HasDefaultTimeToLive()0 ) { |
2259 | 0 | auto s = STATUS(NotSupported, "Tablet splitting not supported for TTL tables."); |
2260 | 0 | return s.CloneAndAddErrorCode( |
2261 | 0 | TabletServerError(TabletServerErrorPB::TABLET_SPLIT_DISABLED_TTL_EXPIRY)); |
2262 | 0 | } |
2263 | 144 | if (tablet->MayHaveOrphanedPostSplitData()) { |
2264 | 0 | return STATUS(IllegalState, "Tablet has orphaned post-split data"); |
2265 | 0 | } |
2266 | 144 | const auto split_encoded_key = VERIFY_RESULT143 (tablet->GetEncodedMiddleSplitKey());143 |
2267 | 0 | resp->set_split_encoded_key(split_encoded_key); |
2268 | 143 | const auto doc_key_hash = VERIFY_RESULT(docdb::DecodeDocKeyHash(split_encoded_key)); |
2269 | 143 | if (doc_key_hash.has_value()) { |
2270 | 143 | resp->set_split_partition_key(PartitionSchema::EncodeMultiColumnHashValue( |
2271 | 143 | doc_key_hash.value())); |
2272 | 143 | } else { |
2273 | 0 | resp->set_split_partition_key(split_encoded_key); |
2274 | 0 | } |
2275 | 143 | return Status::OK(); |
2276 | 143 | }); |
2277 | 144 | } |
2278 | | |
2279 | | void TabletServiceImpl::GetSharedData(const GetSharedDataRequestPB* req, |
2280 | | GetSharedDataResponsePB* resp, |
2281 | 8 | rpc::RpcContext context) { |
2282 | 8 | auto& data = server_->SharedObject(); |
2283 | 8 | resp->mutable_data()->assign(pointer_cast<const char*>(&data), sizeof(data)); |
2284 | 8 | context.RespondSuccess(); |
2285 | 8 | } |
2286 | | |
2287 | 188 | void TabletServiceImpl::Shutdown() { |
2288 | 188 | } |
2289 | | |
2290 | | scoped_refptr<Histogram> TabletServer::GetMetricsHistogram( |
2291 | 2.14M | TabletServerServiceRpcMethodIndexes metric) { |
2292 | | // Returns the metric Histogram by holding a lock to make sure tablet_server_service_ remains |
2293 | | // unchanged during the operation. |
2294 | 2.14M | std::lock_guard<simple_spinlock> l(lock_); |
2295 | 2.14M | if (tablet_server_service_) { |
2296 | 2.14M | return tablet_server_service_->GetMetric(metric).handler_latency; |
2297 | 2.14M | } |
2298 | 0 | return nullptr; |
2299 | 2.14M | } |
2300 | | |
2301 | | TabletServerForwardServiceImpl::TabletServerForwardServiceImpl(TabletServiceImpl *impl, |
2302 | | TabletServerIf *server) |
2303 | | : TabletServerForwardServiceIf(server->MetricEnt()), |
2304 | 8.74k | server_(server) { |
2305 | 8.74k | } |
2306 | | |
2307 | | void TabletServerForwardServiceImpl::Write(const WriteRequestPB* req, |
2308 | | WriteResponsePB* resp, |
2309 | 0 | rpc::RpcContext context) { |
2310 | | // Forward the rpc to the required Tserver. |
2311 | 0 | std::shared_ptr<ForwardWriteRpc> forward_rpc = |
2312 | 0 | std::make_shared<ForwardWriteRpc>(req, resp, std::move(context), server_->client()); |
2313 | 0 | forward_rpc->SendRpc(); |
2314 | 0 | } |
2315 | | |
2316 | | void TabletServerForwardServiceImpl::Read(const ReadRequestPB* req, |
2317 | | ReadResponsePB* resp, |
2318 | 0 | rpc::RpcContext context) { |
2319 | 0 | std::shared_ptr<ForwardReadRpc> forward_rpc = |
2320 | 0 | std::make_shared<ForwardReadRpc>(req, resp, std::move(context), server_->client()); |
2321 | 0 | forward_rpc->SendRpc(); |
2322 | 0 | } |
2323 | | |
2324 | | } // namespace tserver |
2325 | | } // namespace yb |