/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/tserver/tablet_service.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <memory> |
37 | | #include <string> |
38 | | #include <vector> |
39 | | |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/client/forward_rpc.h" |
43 | | #include "yb/client/transaction.h" |
44 | | #include "yb/client/transaction_pool.h" |
45 | | |
46 | | #include "yb/common/ql_rowblock.h" |
47 | | #include "yb/common/ql_value.h" |
48 | | #include "yb/common/row_mark.h" |
49 | | #include "yb/common/schema.h" |
50 | | #include "yb/common/wire_protocol.h" |
51 | | #include "yb/consensus/leader_lease.h" |
52 | | #include "yb/consensus/consensus.pb.h" |
53 | | #include "yb/consensus/raft_consensus.h" |
54 | | |
55 | | #include "yb/docdb/cql_operation.h" |
56 | | #include "yb/docdb/pgsql_operation.h" |
57 | | |
58 | | #include "yb/gutil/bind.h" |
59 | | #include "yb/gutil/casts.h" |
60 | | #include "yb/gutil/stl_util.h" |
61 | | #include "yb/gutil/stringprintf.h" |
62 | | #include "yb/gutil/strings/escaping.h" |
63 | | |
64 | | #include "yb/rpc/thread_pool.h" |
65 | | |
66 | | #include "yb/server/hybrid_clock.h" |
67 | | |
68 | | #include "yb/tablet/abstract_tablet.h" |
69 | | #include "yb/tablet/metadata.pb.h" |
70 | | #include "yb/tablet/operations/change_metadata_operation.h" |
71 | | #include "yb/tablet/operations/split_operation.h" |
72 | | #include "yb/tablet/operations/truncate_operation.h" |
73 | | #include "yb/tablet/operations/update_txn_operation.h" |
74 | | #include "yb/tablet/operations/write_operation.h" |
75 | | #include "yb/tablet/read_result.h" |
76 | | #include "yb/tablet/tablet.h" |
77 | | #include "yb/tablet/tablet_bootstrap_if.h" |
78 | | #include "yb/tablet/tablet_metadata.h" |
79 | | #include "yb/tablet/tablet_metrics.h" |
80 | | #include "yb/tablet/transaction_participant.h" |
81 | | #include "yb/tablet/write_query.h" |
82 | | |
83 | | #include "yb/tserver/read_query.h" |
84 | | #include "yb/tserver/service_util.h" |
85 | | #include "yb/tserver/tablet_server.h" |
86 | | #include "yb/tserver/ts_tablet_manager.h" |
87 | | #include "yb/tserver/tserver_error.h" |
88 | | |
89 | | #include "yb/util/crc.h" |
90 | | #include "yb/util/debug-util.h" |
91 | | #include "yb/util/debug/long_operation_tracker.h" |
92 | | #include "yb/util/debug/trace_event.h" |
93 | | #include "yb/util/faststring.h" |
94 | | #include "yb/util/flag_tags.h" |
95 | | #include "yb/util/format.h" |
96 | | #include "yb/util/logging.h" |
97 | | #include "yb/util/math_util.h" |
98 | | #include "yb/util/mem_tracker.h" |
99 | | #include "yb/util/metrics.h" |
100 | | #include "yb/util/monotime.h" |
101 | | #include "yb/util/random_util.h" |
102 | | #include "yb/util/scope_exit.h" |
103 | | #include "yb/util/size_literals.h" |
104 | | #include "yb/util/status.h" |
105 | | #include "yb/util/status_callback.h" |
106 | | #include "yb/util/status_format.h" |
107 | | #include "yb/util/status_log.h" |
108 | | #include "yb/util/string_util.h" |
109 | | #include "yb/util/trace.h" |
110 | | |
111 | | #include "yb/yql/pgwrapper/ysql_upgrade.h" |
112 | | |
113 | | using namespace std::literals; // NOLINT |
114 | | |
115 | | DEFINE_int32(scanner_default_batch_size_bytes, 64 * 1024, |
116 | | "The default size for batches of scan results"); |
117 | | TAG_FLAG(scanner_default_batch_size_bytes, advanced); |
118 | | TAG_FLAG(scanner_default_batch_size_bytes, runtime); |
119 | | |
120 | | DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024, |
121 | | "The maximum batch size that a client may request for " |
122 | | "scan results."); |
123 | | TAG_FLAG(scanner_max_batch_size_bytes, advanced); |
124 | | TAG_FLAG(scanner_max_batch_size_bytes, runtime); |
125 | | |
126 | | DEFINE_int32(scanner_batch_size_rows, 100, |
127 | | "The number of rows to batch for servicing scan requests."); |
128 | | TAG_FLAG(scanner_batch_size_rows, advanced); |
129 | | TAG_FLAG(scanner_batch_size_rows, runtime); |
130 | | |
131 | | // Fault injection flags. |
132 | | DEFINE_test_flag(int32, scanner_inject_latency_on_each_batch_ms, 0, |
133 | | "If set, the scanner will pause the specified number of milliesconds " |
134 | | "before reading each batch of data on the tablet server."); |
135 | | |
136 | | DEFINE_int32(max_wait_for_safe_time_ms, 5000, |
137 | | "Maximum time in milliseconds to wait for the safe time to advance when trying to " |
138 | | "scan at the given hybrid_time."); |
139 | | |
140 | | DEFINE_int32(num_concurrent_backfills_allowed, -1, |
141 | | "Maximum number of concurrent backfill jobs that is allowed to run."); |
142 | | |
143 | | DEFINE_test_flag(bool, tserver_noop_read_write, false, "Respond NOOP to read/write."); |
144 | | |
145 | | DEFINE_uint64(index_backfill_upperbound_for_user_enforced_txn_duration_ms, 65000, |
146 | | "For Non-Txn tables, it is impossible to know at the tservers " |
147 | | "whether or not an 'old transaction' is still active. To avoid " |
148 | | "having such old transactions, we assume a bound on the duration " |
149 | | "of such transactions (during the backfill process) and wait " |
150 | | "it out. This flag denotes a conservative upper bound on the " |
151 | | "duration of such user enforced transactions."); |
152 | | TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, evolving); |
153 | | TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, runtime); |
154 | | |
155 | | DEFINE_int32(index_backfill_additional_delay_before_backfilling_ms, 0, |
156 | | "Operations that are received by the tserver, and have decided how " |
157 | | "the indexes need to be updated (based on the IndexPermission), will " |
158 | | "not be added to the list of current transactions until they are " |
159 | | "replicated/applied. This delay allows for the GetSafeTime method " |
160 | | "to wait for such operations to be replicated/applied. Ideally, this " |
161 | | "value should be set to be something larger than the raft-heartbeat-interval " |
162 | | "but can be as high as the client_rpc_timeout if we want to be more conservative."); |
163 | | TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, evolving); |
164 | | TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, runtime); |
165 | | |
166 | | DEFINE_int32(index_backfill_wait_for_old_txns_ms, 0, |
167 | | "Index backfill needs to wait for transactions that started before the " |
168 | | "WRITE_AND_DELETE phase to commit or abort before choosing a time for " |
169 | | "backfilling the index. This is the max time that the GetSafeTime call will " |
170 | | "wait for, before it resorts to attempt aborting old transactions. This is " |
171 | | "necessary to guard against the pathological active transaction that never " |
172 | | "commits from blocking the index backfill forever."); |
173 | | TAG_FLAG(index_backfill_wait_for_old_txns_ms, evolving); |
174 | | TAG_FLAG(index_backfill_wait_for_old_txns_ms, runtime); |
175 | | |
176 | | DEFINE_test_flag(double, respond_write_failed_probability, 0.0, |
177 | | "Probability to respond that write request is failed"); |
178 | | |
179 | | DEFINE_test_flag(bool, rpc_delete_tablet_fail, false, "Should delete tablet RPC fail."); |
180 | | |
181 | | DECLARE_bool(disable_alter_vs_write_mutual_exclusion); |
182 | | DECLARE_uint64(max_clock_skew_usec); |
183 | | DECLARE_uint64(transaction_min_running_check_interval_ms); |
184 | | DECLARE_int64(transaction_rpc_timeout_ms); |
185 | | |
186 | | DEFINE_test_flag(int32, txn_status_table_tablet_creation_delay_ms, 0, |
187 | | "Extra delay to slowdown creation of transaction status table tablet."); |
188 | | |
189 | | DEFINE_test_flag(int32, leader_stepdown_delay_ms, 0, |
190 | | "Amount of time to delay before starting a leader stepdown change."); |
191 | | |
192 | | DEFINE_test_flag(int32, alter_schema_delay_ms, 0, "Delay before processing AlterSchema."); |
193 | | |
194 | | DEFINE_test_flag(bool, disable_post_split_tablet_rbs_check, false, |
195 | | "If true, bypass any checks made to reject remote boostrap requests for post " |
196 | | "split tablets whose parent tablets are still present."); |
197 | | |
198 | | DEFINE_test_flag(double, fail_tablet_split_probability, 0.0, |
199 | | "Probability of failing in TabletServiceAdminImpl::SplitTablet."); |
200 | | |
201 | | DEFINE_test_flag(bool, pause_tserver_get_split_key, false, |
202 | | "Pause before processing a GetSplitKey request."); |
203 | | |
204 | | DECLARE_int32(heartbeat_interval_ms); |
205 | | |
206 | | DECLARE_int32(ysql_transaction_abort_timeout_ms); |
207 | | |
208 | | DEFINE_test_flag(bool, fail_alter_schema_after_abort_transactions, false, |
209 | | "If true, setup an error status in AlterSchema and respond success to rpc call. " |
210 | | "This failure should not cause the TServer to crash but " |
211 | | "instead return an error message on the YSQL connection."); |
212 | | |
213 | | double TEST_delay_create_transaction_probability = 0; |
214 | | |
215 | | namespace yb { |
216 | | namespace tserver { |
217 | | |
218 | | using client::internal::ForwardReadRpc; |
219 | | using client::internal::ForwardWriteRpc; |
220 | | using consensus::ChangeConfigRequestPB; |
221 | | using consensus::ChangeConfigResponsePB; |
222 | | using consensus::Consensus; |
223 | | using consensus::CONSENSUS_CONFIG_ACTIVE; |
224 | | using consensus::CONSENSUS_CONFIG_COMMITTED; |
225 | | using consensus::ConsensusConfigType; |
226 | | using consensus::ConsensusRequestPB; |
227 | | using consensus::ConsensusResponsePB; |
228 | | using consensus::GetLastOpIdRequestPB; |
229 | | using consensus::GetNodeInstanceRequestPB; |
230 | | using consensus::GetNodeInstanceResponsePB; |
231 | | using consensus::LeaderLeaseStatus; |
232 | | using consensus::LeaderStepDownRequestPB; |
233 | | using consensus::LeaderStepDownResponsePB; |
234 | | using consensus::RaftPeerPB; |
235 | | using consensus::RunLeaderElectionRequestPB; |
236 | | using consensus::RunLeaderElectionResponsePB; |
237 | | using consensus::StartRemoteBootstrapRequestPB; |
238 | | using consensus::StartRemoteBootstrapResponsePB; |
239 | | using consensus::UnsafeChangeConfigRequestPB; |
240 | | using consensus::UnsafeChangeConfigResponsePB; |
241 | | using consensus::VoteRequestPB; |
242 | | using consensus::VoteResponsePB; |
243 | | |
244 | | using std::unique_ptr; |
245 | | using google::protobuf::RepeatedPtrField; |
246 | | using rpc::RpcContext; |
247 | | using std::shared_ptr; |
248 | | using std::vector; |
249 | | using std::string; |
250 | | using strings::Substitute; |
251 | | using tablet::ChangeMetadataOperation; |
252 | | using tablet::Tablet; |
253 | | using tablet::TabletPeer; |
254 | | using tablet::TabletPeerPtr; |
255 | | using tablet::TabletStatusPB; |
256 | | using tablet::TruncateOperation; |
257 | | using tablet::OperationCompletionCallback; |
258 | | using tablet::WriteOperation; |
259 | | |
260 | | namespace { |
261 | | |
262 | 10.4M | Result<std::shared_ptr<consensus::RaftConsensus>> GetConsensus(const TabletPeerPtr& tablet_peer) { |
263 | 10.4M | auto result = tablet_peer->shared_raft_consensus(); |
264 | 10.4M | if (!result) { |
265 | 0 | Status s = STATUS(ServiceUnavailable, "Consensus unavailable. Tablet not running"); |
266 | 0 | return s.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::TABLET_NOT_RUNNING)); |
267 | 0 | } |
268 | 10.4M | return result; |
269 | 10.4M | } |
270 | | |
271 | | template<class RespClass> |
272 | | std::shared_ptr<consensus::RaftConsensus> GetConsensusOrRespond(const TabletPeerPtr& tablet_peer, |
273 | | RespClass* resp, |
274 | 557 | rpc::RpcContext* context) { |
275 | 557 | auto result = GetConsensus(tablet_peer); |
276 | 557 | if (!result.ok()) { |
277 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); |
278 | 0 | } |
279 | 557 | return result.get(); |
280 | 557 | } |
281 | | |
282 | | template<class RespClass> |
283 | | bool GetConsensusOrRespond(const TabletPeerPtr& tablet_peer, |
284 | | RespClass* resp, |
285 | | rpc::RpcContext* context, |
286 | 10.4M | shared_ptr<Consensus>* consensus) { |
287 | 10.4M | auto result = GetConsensus(tablet_peer); |
288 | 10.4M | if (!result.ok()) { |
289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); |
290 | 0 | return false; |
291 | 0 | } |
292 | 10.4M | return (*consensus = result.get()) != nullptr; |
293 | 10.4M | } tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus19ConsensusResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 10.2M | shared_ptr<Consensus>* consensus) { | 287 | 10.2M | auto result = GetConsensus(tablet_peer); | 288 | 10.2M | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 10.2M | return (*consensus = result.get()) != nullptr; | 293 | 10.2M | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus14VoteResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 125k | shared_ptr<Consensus>* consensus) { | 287 | 125k | auto result = GetConsensus(tablet_peer); | 288 | 125k | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 125k | return (*consensus = result.get()) != nullptr; | 293 | 125k | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus22ChangeConfigResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 2.50k | shared_ptr<Consensus>* consensus) { | 287 | 2.50k | auto result = GetConsensus(tablet_peer); | 288 | 2.50k | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 2.50k | return (*consensus = result.get()) != nullptr; | 293 | 2.50k | } |
Unexecuted instantiation: tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus28UnsafeChangeConfigResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus27RunLeaderElectionResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 32.8k | shared_ptr<Consensus>* consensus) { | 287 | 32.8k | auto result = GetConsensus(tablet_peer); | 288 | 32.8k | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 32.8k | return (*consensus = result.get()) != nullptr; | 293 | 32.8k | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus28LeaderElectionLostResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 44 | shared_ptr<Consensus>* consensus) { | 287 | 44 | auto result = GetConsensus(tablet_peer); | 288 | 44 | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 44 | return (*consensus = result.get()) != nullptr; | 293 | 44 | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus24LeaderStepDownResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 5.87k | shared_ptr<Consensus>* consensus) { | 287 | 5.87k | auto result = GetConsensus(tablet_peer); | 288 | 5.87k | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 5.87k | return (*consensus = result.get()) != nullptr; | 293 | 5.87k | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus27GetConsensusStateResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE Line | Count | Source | 286 | 3.68k | shared_ptr<Consensus>* consensus) { | 287 | 3.68k | auto result = GetConsensus(tablet_peer); | 288 | 3.68k | if (!result.ok()) { | 289 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), context); | 290 | 0 | return false; | 291 | 0 | } | 292 | 3.68k | return (*consensus = result.get()) != nullptr; | 293 | 3.68k | } |
|
294 | | |
295 | | } // namespace |
296 | | |
297 | | template<class Resp> |
298 | | bool TabletServiceImpl::CheckWriteThrottlingOrRespond( |
299 | 1.34M | double score, tablet::TabletPeer* tablet_peer, Resp* resp, rpc::RpcContext* context) { |
300 | | // Check for memory pressure; don't bother doing any additional work if we've |
301 | | // exceeded the limit. |
302 | 1.34M | auto status = CheckWriteThrottling(score, tablet_peer); |
303 | 1.34M | if (!status.ok()) { |
304 | 22 | SetupErrorAndRespond(resp->mutable_error(), status, context); |
305 | 22 | return false; |
306 | 22 | } |
307 | | |
308 | 1.34M | return true; |
309 | 1.34M | } |
310 | | |
311 | | typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB; |
312 | | |
313 | | class WriteQueryCompletionCallback { |
314 | | public: |
315 | | WriteQueryCompletionCallback( |
316 | | tablet::TabletPeerPtr tablet_peer, |
317 | | std::shared_ptr<rpc::RpcContext> context, |
318 | | WriteResponsePB* response, |
319 | | tablet::WriteQuery* query, |
320 | | const server::ClockPtr& clock, |
321 | | bool trace = false) |
322 | | : tablet_peer_(std::move(tablet_peer)), |
323 | | context_(std::move(context)), |
324 | | response_(response), |
325 | | query_(query), |
326 | | clock_(clock), |
327 | | include_trace_(trace), |
328 | 1.34M | trace_(include_trace_ ? Trace::CurrentTrace() : nullptr) {} |
329 | | |
330 | 1.34M | void operator()(Status status) const { |
331 | 444 | VLOG(1) << __PRETTY_FUNCTION__ << " completing with status " << status; |
332 | | // When we don't need to return any data, we could return success on duplicate request. |
333 | 1.34M | if (status.IsAlreadyPresent() && |
334 | 1 | query_->ql_write_ops()->empty() && |
335 | 1 | query_->pgsql_write_ops()->empty() && |
336 | 1 | query_->client_request()->redis_write_batch().empty()) { |
337 | 1 | status = Status::OK(); |
338 | 1 | } |
339 | | |
340 | 1.34M | TRACE("Write completing with status $0", yb::ToString(status)); |
341 | | |
342 | 1.34M | if (!status.ok()) { |
343 | 80.5k | LOG(INFO) << tablet_peer_->LogPrefix() << "Write failed: " << status; |
344 | 80.5k | if (include_trace_ && trace_) { |
345 | 0 | response_->set_trace_buffer(trace_->DumpToString(true)); |
346 | 0 | } |
347 | 80.5k | SetupErrorAndRespond(get_error(), status, context_.get()); |
348 | 80.5k | return; |
349 | 80.5k | } |
350 | | |
351 | | // Retrieve the rowblocks returned from the QL write operations and return them as RPC |
352 | | // sidecars. Populate the row schema also. |
353 | 1.26M | faststring rows_data; |
354 | 273 | for (const auto& ql_write_op : *query_->ql_write_ops()) { |
355 | 273 | const auto& ql_write_req = ql_write_op->request(); |
356 | 273 | auto* ql_write_resp = ql_write_op->response(); |
357 | 273 | const QLRowBlock* rowblock = ql_write_op->rowblock(); |
358 | 273 | SchemaToColumnPBs(rowblock->schema(), ql_write_resp->mutable_column_schemas()); |
359 | 273 | rows_data.clear(); |
360 | 273 | rowblock->Serialize(ql_write_req.client(), &rows_data); |
361 | 273 | ql_write_resp->set_rows_data_sidecar( |
362 | 273 | narrow_cast<int32_t>(context_->AddRpcSidecar(rows_data))); |
363 | 273 | } |
364 | | |
365 | 1.26M | if (!query_->pgsql_write_ops()->empty()) { |
366 | | // Retrieve the resultset returned from the PGSQL write operations and return them as RPC |
367 | | // sidecars. |
368 | | |
369 | 237k | size_t sidecars_size = 0; |
370 | 2.10M | for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) { |
371 | 2.10M | sidecars_size += pgsql_write_op->result_buffer().size(); |
372 | 2.10M | } |
373 | | |
374 | 237k | if (sidecars_size != 0) { |
375 | 236k | context_->ReserveSidecarSpace(sidecars_size); |
376 | 2.10M | for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) { |
377 | 2.10M | auto* pgsql_write_resp = pgsql_write_op->response(); |
378 | 2.10M | const faststring& result_buffer = pgsql_write_op->result_buffer(); |
379 | 2.10M | if (!result_buffer.empty()) { |
380 | 2.10M | pgsql_write_resp->set_rows_data_sidecar( |
381 | 2.10M | narrow_cast<int32_t>(context_->AddRpcSidecar(result_buffer))); |
382 | 2.10M | } |
383 | 2.10M | } |
384 | 236k | } |
385 | 237k | } |
386 | | |
387 | 1.26M | if (include_trace_ && trace_) { |
388 | 0 | response_->set_trace_buffer(trace_->DumpToString(true)); |
389 | 0 | } |
390 | 1.26M | response_->set_propagated_hybrid_time(clock_->Now().ToUint64()); |
391 | 1.26M | context_->RespondSuccess(); |
392 | 18.4E | VLOG(1) << __PRETTY_FUNCTION__ << " RespondedSuccess"; |
393 | 1.26M | } |
394 | | |
395 | | private: |
396 | 80.6k | TabletServerErrorPB* get_error() const { |
397 | 80.6k | return response_->mutable_error(); |
398 | 80.6k | } |
399 | | |
400 | | tablet::TabletPeerPtr tablet_peer_; |
401 | | const std::shared_ptr<rpc::RpcContext> context_; |
402 | | WriteResponsePB* const response_; |
403 | | tablet::WriteQuery* const query_; |
404 | | server::ClockPtr clock_; |
405 | | const bool include_trace_; |
406 | | scoped_refptr<Trace> trace_; |
407 | | }; |
408 | | |
409 | | // Checksums the scan result. |
410 | | class ScanResultChecksummer { |
411 | | public: |
412 | 2.81k | ScanResultChecksummer() {} |
413 | | |
414 | 1.72M | void HandleRow(const Schema& schema, const QLTableRow& row) { |
415 | 1.72M | QLValue value; |
416 | 1.72M | buffer_.clear(); |
417 | 7.81M | for (uint32_t col_index = 0; col_index != schema.num_columns(); ++col_index) { |
418 | 6.08M | auto status = row.GetValue(schema.column_id(col_index), &value); |
419 | 6.08M | if (!status.ok()) { |
420 | 0 | LOG(WARNING) << "Column " << schema.column_id(col_index) |
421 | 0 | << " not found in " << row.ToString(); |
422 | 0 | continue; |
423 | 0 | } |
424 | 6.08M | buffer_.append(pointer_cast<const char*>(&col_index), sizeof(col_index)); |
425 | 6.08M | if (schema.column(col_index).is_nullable()) { |
426 | 3.04M | uint8_t defined = value.IsNull() ? 0 : 1; |
427 | 3.04M | buffer_.append(pointer_cast<const char*>(&defined), sizeof(defined)); |
428 | 3.04M | } |
429 | 6.08M | if (!value.IsNull()) { |
430 | 6.03M | value.value().AppendToString(&buffer_); |
431 | 6.03M | } |
432 | 6.08M | } |
433 | 1.72M | crc_->Compute(buffer_.c_str(), buffer_.size(), &agg_checksum_, nullptr); |
434 | 1.72M | } |
435 | | |
436 | | // Read accessor for the aggregated checksum computed so far. |
437 | 2.90k | uint64_t agg_checksum() const { return agg_checksum_; } |
438 | | |
439 | | private: |
440 | | crc::Crc* const crc_ = crc::GetCrc32cInstance(); |
441 | | uint64_t agg_checksum_ = 0; |
442 | | std::string buffer_; |
443 | | }; |
444 | | |
445 | | Result<std::shared_ptr<tablet::AbstractTablet>> TabletServiceImpl::GetTabletForRead( |
446 | | const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, |
447 | 3.91M | YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) { |
448 | 3.91M | return GetTablet(server_->tablet_peer_lookup(), tablet_id, std::move(tablet_peer), |
449 | 3.91M | consistency_level, allow_split_tablet); |
450 | 3.91M | } |
451 | | |
452 | | TabletServiceImpl::TabletServiceImpl(TabletServerIf* server) |
453 | | : TabletServerServiceIf(server->MetricEnt()), |
454 | 11.2k | server_(server) { |
455 | 11.2k | } |
456 | | |
457 | | TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server) |
458 | 5.81k | : TabletServerAdminServiceIf(server->MetricEnt()), server_(server) {} |
459 | | |
460 | | void TabletServiceAdminImpl::BackfillDone( |
461 | | const tablet::ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp, |
462 | 2.47k | rpc::RpcContext context) { |
463 | 2.47k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillDone", req, resp, &context)) { |
464 | 0 | return; |
465 | 0 | } |
466 | 23 | DVLOG(3) << "Received BackfillDone RPC: " << req->DebugString(); |
467 | | |
468 | 2.47k | server::UpdateClock(*req, server_->Clock()); |
469 | | |
470 | | // For now, we shall only allow this RPC on the leader. |
471 | 2.47k | auto tablet = |
472 | 2.47k | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
473 | 2.47k | if (!tablet) { |
474 | 11 | return; |
475 | 11 | } |
476 | | |
477 | 2.45k | auto operation = std::make_unique<ChangeMetadataOperation>( |
478 | 2.45k | tablet.peer->tablet(), tablet.peer->log(), req); |
479 | | |
480 | 2.45k | operation->set_completion_callback( |
481 | 2.45k | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
482 | | |
483 | | // Submit the change metadata op. The RPC will be responded to asynchronously. |
484 | 2.45k | tablet.peer->Submit(std::move(operation), tablet.leader_term); |
485 | 2.45k | } |
486 | | |
487 | | void TabletServiceAdminImpl::GetSafeTime( |
488 | 2.65k | const GetSafeTimeRequestPB* req, GetSafeTimeResponsePB* resp, rpc::RpcContext context) { |
489 | 2.65k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "GetSafeTime", req, resp, &context)) { |
490 | 0 | return; |
491 | 0 | } |
492 | 7 | DVLOG(3) << "Received GetSafeTime RPC: " << req->DebugString(); |
493 | | |
494 | 2.65k | server::UpdateClock(*req, server_->Clock()); |
495 | | |
496 | | // For now, we shall only allow this RPC on the leader. |
497 | 2.65k | auto tablet = |
498 | 2.65k | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
499 | 2.65k | if (!tablet) { |
500 | 0 | return; |
501 | 0 | } |
502 | 2.65k | const CoarseTimePoint& deadline = context.GetClientDeadline(); |
503 | 2.65k | HybridTime min_hybrid_time(HybridTime::kMin); |
504 | 2.65k | if (req->has_min_hybrid_time_for_backfill()) { |
505 | 2.64k | min_hybrid_time = HybridTime(req->min_hybrid_time_for_backfill()); |
506 | | // For Transactional tables, wait until there are no pending transactions that started |
507 | | // prior to min_hybrid_time. These may not have updated the index correctly: if they |
508 | | // happen to commit after the backfill scan, it is possible that they may miss updating |
509 | | // the index because some operations may have taken place prior to min_hybrid_time. |
510 | | // |
511 | | // For Non-Txn tables, it is impossible to know at the tservers whether or not an "old |
512 | | // transaction" is still active. To avoid having such old transactions, we assume a |
513 | | // bound on the length of such transactions (during the backfill process) and wait it |
514 | | // out. |
515 | 2.64k | if (!tablet.peer->tablet()->transaction_participant()) { |
516 | 498 | min_hybrid_time = min_hybrid_time.AddMilliseconds( |
517 | 498 | FLAGS_index_backfill_upperbound_for_user_enforced_txn_duration_ms); |
518 | 0 | VLOG(2) << "GetSafeTime called on a user enforced transaction tablet " |
519 | 0 | << tablet.peer->tablet_id() << " will wait until " |
520 | 0 | << min_hybrid_time << " is safe."; |
521 | 2.14k | } else { |
522 | | // Add some extra delay to wait for operations being replicated to be |
523 | | // applied. |
524 | 2.14k | SleepFor(MonoDelta::FromMilliseconds( |
525 | 2.14k | FLAGS_index_backfill_additional_delay_before_backfilling_ms)); |
526 | | |
527 | 2.14k | auto txn_particpant = tablet.peer->tablet()->transaction_participant(); |
528 | 2.14k | auto wait_until = CoarseMonoClock::Now() + FLAGS_index_backfill_wait_for_old_txns_ms * 1ms; |
529 | 2.14k | HybridTime min_running_ht; |
530 | 2.13k | for (;;) { |
531 | 2.13k | min_running_ht = txn_particpant->MinRunningHybridTime(); |
532 | 2.15k | if ((min_running_ht && min_running_ht >= min_hybrid_time) || |
533 | 2.14k | CoarseMonoClock::Now() > wait_until) { |
534 | 2.14k | break; |
535 | 2.14k | } |
536 | 18.4E | VLOG(2) << "MinRunningHybridTime is " << min_running_ht |
537 | 18.4E | << " need to wait for " << min_hybrid_time; |
538 | 18.4E | SleepFor(MonoDelta::FromMilliseconds(FLAGS_transaction_min_running_check_interval_ms)); |
539 | 18.4E | } |
540 | | |
541 | 18.4E | VLOG(2) << "Finally MinRunningHybridTime is " << min_running_ht; |
542 | 2.14k | if (min_running_ht < min_hybrid_time) { |
543 | 0 | VLOG(2) << "Aborting Txns that started prior to " << min_hybrid_time; |
544 | 1 | auto s = txn_particpant->StopActiveTxnsPriorTo(min_hybrid_time, deadline); |
545 | 1 | if (!s.ok()) { |
546 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
547 | 0 | return; |
548 | 0 | } |
549 | 2.65k | } |
550 | 2.14k | } |
551 | 2.64k | } |
552 | | |
553 | 2.65k | auto safe_time = tablet.peer->tablet()->SafeTime( |
554 | 2.65k | tablet::RequireLease::kTrue, min_hybrid_time, deadline); |
555 | 2.65k | if (!safe_time.ok()) { |
556 | 5 | SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context); |
557 | 5 | return; |
558 | 5 | } |
559 | | |
560 | 2.64k | resp->set_safe_time(safe_time->ToUint64()); |
561 | 2.64k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); |
562 | 18.4E | VLOG(1) << "Tablet " << tablet.peer->tablet_id() |
563 | 18.4E | << " returning safe time " << yb::ToString(safe_time); |
564 | | |
565 | 2.64k | context.RespondSuccess(); |
566 | 2.64k | } |
567 | | |
568 | | void TabletServiceAdminImpl::BackfillIndex( |
569 | 2.70k | const BackfillIndexRequestPB* req, BackfillIndexResponsePB* resp, rpc::RpcContext context) { |
570 | 2.70k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillIndex", req, resp, &context)) { |
571 | 0 | return; |
572 | 0 | } |
573 | 1 | DVLOG(3) << "Received BackfillIndex RPC: " << req->DebugString(); |
574 | | |
575 | 2.70k | server::UpdateClock(*req, server_->Clock()); |
576 | | |
577 | | // For now, we shall only allow this RPC on the leader. |
578 | 2.70k | auto tablet = |
579 | 2.70k | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
580 | 2.70k | if (!tablet) { |
581 | 0 | return; |
582 | 0 | } |
583 | | |
584 | 2.70k | if (req->indexes().empty()) { |
585 | 0 | SetupErrorAndRespond( |
586 | 0 | resp->mutable_error(), |
587 | 0 | STATUS(InvalidArgument, "No indexes given in request"), |
588 | 0 | TabletServerErrorPB::OPERATION_NOT_SUPPORTED, |
589 | 0 | &context); |
590 | 0 | return; |
591 | 0 | } |
592 | | |
593 | 2.70k | const CoarseTimePoint &deadline = context.GetClientDeadline(); |
594 | 2.70k | const auto coarse_start = CoarseMonoClock::Now(); |
595 | 2.70k | { |
596 | 2.70k | std::unique_lock<std::mutex> l(backfill_lock_); |
597 | 3.63k | while (num_tablets_backfilling_ >= FLAGS_num_concurrent_backfills_allowed) { |
598 | 934 | if (backfill_cond_.wait_until(l, deadline) == std::cv_status::timeout) { |
599 | 0 | SetupErrorAndRespond( |
600 | 0 | resp->mutable_error(), |
601 | 0 | STATUS_FORMAT(ServiceUnavailable, |
602 | 0 | "Already running $0 backfill requests", |
603 | 0 | num_tablets_backfilling_), |
604 | 0 | &context); |
605 | 0 | return; |
606 | 0 | } |
607 | 934 | } |
608 | 2.70k | num_tablets_backfilling_++; |
609 | 2.70k | } |
610 | 2.62k | auto se = ScopeExit([this] { |
611 | 2.62k | std::unique_lock<std::mutex> l(this->backfill_lock_); |
612 | 2.62k | this->num_tablets_backfilling_--; |
613 | 2.62k | this->backfill_cond_.notify_all(); |
614 | 2.62k | }); |
615 | | |
616 | | // Wait for SafeTime to get past read_at; |
617 | 2.70k | const HybridTime read_at(req->read_at_hybrid_time()); |
618 | 18.4E | DVLOG(1) << "Waiting for safe time to be past " << read_at; |
619 | 2.70k | const auto safe_time = |
620 | 2.70k | tablet.peer->tablet()->SafeTime(tablet::RequireLease::kFalse, read_at, deadline); |
621 | 78 | DVLOG(1) << "Got safe time " << safe_time.ToString(); |
622 | 2.70k | if (!safe_time.ok()) { |
623 | 0 | LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString(); |
624 | 0 | SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context); |
625 | 0 | return; |
626 | 0 | } |
627 | | |
628 | | // Don't work on the request if we have had to wait more than 50% |
629 | | // of the time allocated to us for the RPC. |
630 | | // Backfill is a costly operation, we do not want to start working |
631 | | // on it if we expect the client (master) to time out the RPC and |
632 | | // force us to redo the work. |
633 | 2.70k | const auto coarse_now = CoarseMonoClock::Now(); |
634 | 2.70k | if (deadline - coarse_now < coarse_now - coarse_start) { |
635 | 0 | SetupErrorAndRespond( |
636 | 0 | resp->mutable_error(), |
637 | 0 | STATUS_FORMAT( |
638 | 0 | ServiceUnavailable, "Not enough time left $0", deadline - coarse_now), |
639 | 0 | &context); |
640 | 0 | return; |
641 | 0 | } |
642 | | |
643 | 2.70k | bool all_at_backfill = true; |
644 | 2.70k | bool all_past_backfill = true; |
645 | 2.70k | bool is_pg_table = tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE; |
646 | 2.70k | const shared_ptr<IndexMap> index_map = tablet.peer->tablet_metadata()->index_map( |
647 | 2.70k | req->indexed_table_id()); |
648 | 2.70k | std::vector<IndexInfo> indexes_to_backfill; |
649 | 2.70k | std::vector<TableId> index_ids; |
650 | 2.65k | for (const auto& idx : req->indexes()) { |
651 | 2.65k | auto result = index_map->FindIndex(idx.table_id()); |
652 | 2.65k | if (result) { |
653 | 2.64k | const IndexInfo* index_info = *result; |
654 | 2.64k | indexes_to_backfill.push_back(*index_info); |
655 | 2.64k | index_ids.push_back(index_info->table_id()); |
656 | | |
657 | 2.64k | IndexInfoPB idx_info_pb; |
658 | 2.64k | index_info->ToPB(&idx_info_pb); |
659 | 2.64k | if (!is_pg_table) { |
660 | 2.40k | all_at_backfill &= |
661 | 2.40k | idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_DO_BACKFILL; |
662 | 239 | } else { |
663 | | // YSQL tables don't use all the docdb permissions, so use this approximation. |
664 | | // TODO(jason): change this back to being like YCQL once we bring the docdb permission |
665 | | // DO_BACKFILL back (issue #6218). |
666 | 239 | all_at_backfill &= |
667 | 239 | idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_WRITE_AND_DELETE; |
668 | 239 | } |
669 | 2.64k | all_past_backfill &= |
670 | 2.64k | idx_info_pb.index_permissions() > IndexPermissions::INDEX_PERM_DO_BACKFILL; |
671 | 6 | } else { |
672 | 6 | LOG(WARNING) << "index " << idx.table_id() << " not found in tablet metadata"; |
673 | 6 | all_at_backfill = false; |
674 | 6 | all_past_backfill = false; |
675 | 6 | } |
676 | 2.65k | } |
677 | | |
678 | 2.70k | if (!all_at_backfill) { |
679 | 0 | if (all_past_backfill) { |
680 | | // Change this to see if for all indexes: IndexPermission > DO_BACKFILL. |
681 | 0 | LOG(WARNING) << "Received BackfillIndex RPC: " << req->DebugString() |
682 | 0 | << " after all indexes have moved past DO_BACKFILL. IndexMap is " |
683 | 0 | << ToString(index_map); |
684 | | // This is possible if this tablet completed the backfill. But the master failed over before |
685 | | // other tablets could complete. |
686 | | // The new master is redoing the backfill. We are safe to ignore this request. |
687 | 0 | context.RespondSuccess(); |
688 | 0 | return; |
689 | 0 | } |
690 | | |
691 | 0 | uint32_t our_schema_version = tablet.peer->tablet_metadata()->schema_version(); |
692 | 0 | uint32_t their_schema_version = req->schema_version(); |
693 | 0 | DCHECK_NE(our_schema_version, their_schema_version); |
694 | 0 | SetupErrorAndRespond( |
695 | 0 | resp->mutable_error(), |
696 | 0 | STATUS_SUBSTITUTE( |
697 | 0 | InvalidArgument, |
698 | 0 | "Tablet has a different schema $0 vs $1. " |
699 | 0 | "Requested index is not ready to backfill. IndexMap: $2", |
700 | 0 | our_schema_version, their_schema_version, ToString(index_map)), |
701 | 0 | TabletServerErrorPB::MISMATCHED_SCHEMA, &context); |
702 | 0 | return; |
703 | 0 | } |
704 | | |
705 | 2.70k | Status backfill_status; |
706 | 2.70k | std::string backfilled_until; |
707 | 2.70k | std::unordered_set<TableId> failed_indexes; |
708 | 2.70k | size_t number_rows_processed = 0; |
709 | 2.70k | if (is_pg_table) { |
710 | 235 | if (!req->has_namespace_name()) { |
711 | 0 | SetupErrorAndRespond( |
712 | 0 | resp->mutable_error(), |
713 | 0 | STATUS( |
714 | 0 | InvalidArgument, |
715 | 0 | "Attempted backfill on YSQL table without supplying database name"), |
716 | 0 | TabletServerErrorPB::OPERATION_NOT_SUPPORTED, |
717 | 0 | &context); |
718 | 0 | return; |
719 | 0 | } |
720 | 235 | backfill_status = tablet.peer->tablet()->BackfillIndexesForYsql( |
721 | 235 | indexes_to_backfill, |
722 | 235 | req->start_key(), |
723 | 235 | deadline, |
724 | 235 | read_at, |
725 | 235 | server_->pgsql_proxy_bind_address(), |
726 | 235 | req->namespace_name(), |
727 | 235 | server_->GetSharedMemoryPostgresAuthKey(), |
728 | 235 | &number_rows_processed, |
729 | 235 | &backfilled_until); |
730 | 235 | if (backfill_status.IsIllegalState()) { |
731 | 0 | DCHECK_EQ(failed_indexes.size(), 0) << "We don't support batching in YSQL yet"; |
732 | 1 | for (const auto& idx_info : indexes_to_backfill) { |
733 | 1 | failed_indexes.insert(idx_info.table_id()); |
734 | 1 | } |
735 | 0 | DCHECK_EQ(failed_indexes.size(), 1) << "We don't support batching in YSQL yet"; |
736 | 1 | } |
737 | 2.46k | } else if (tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE) { |
738 | 2.39k | backfill_status = tablet.peer->tablet()->BackfillIndexes( |
739 | 2.39k | indexes_to_backfill, |
740 | 2.39k | req->start_key(), |
741 | 2.39k | deadline, |
742 | 2.39k | read_at, |
743 | 2.39k | &number_rows_processed, |
744 | 2.39k | &backfilled_until, |
745 | 2.39k | &failed_indexes); |
746 | 76 | } else { |
747 | 76 | SetupErrorAndRespond( |
748 | 76 | resp->mutable_error(), |
749 | 76 | STATUS(InvalidArgument, "Attempted backfill on tablet of invalid table type"), |
750 | 76 | TabletServerErrorPB::OPERATION_NOT_SUPPORTED, |
751 | 76 | &context); |
752 | 76 | return; |
753 | 76 | } |
754 | 0 | DVLOG(1) << "Tablet " << tablet.peer->tablet_id() << " backfilled indexes " |
755 | 0 | << yb::ToString(index_ids) << " and got " << backfill_status |
756 | 0 | << " backfilled until : " << backfilled_until; |
757 | | |
758 | 2.62k | resp->set_backfilled_until(backfilled_until); |
759 | 2.62k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); |
760 | 2.62k | resp->set_number_rows_processed(number_rows_processed); |
761 | | |
762 | 2.62k | if (!backfill_status.ok()) { |
763 | 0 | VLOG(2) << " Failed indexes are " << yb::ToString(failed_indexes); |
764 | 6 | for (const auto& idx : failed_indexes) { |
765 | 6 | *resp->add_failed_index_ids() = idx; |
766 | 6 | } |
767 | 6 | SetupErrorAndRespond( |
768 | 6 | resp->mutable_error(), |
769 | 6 | backfill_status, |
770 | 6 | (backfill_status.IsIllegalState() |
771 | 6 | ? TabletServerErrorPB::OPERATION_NOT_SUPPORTED |
772 | 0 | : TabletServerErrorPB::UNKNOWN_ERROR), |
773 | 6 | &context); |
774 | 6 | return; |
775 | 6 | } |
776 | | |
777 | 2.62k | context.RespondSuccess(); |
778 | 2.62k | } |
779 | | |
780 | | void TabletServiceAdminImpl::AlterSchema(const tablet::ChangeMetadataRequestPB* req, |
781 | | ChangeMetadataResponsePB* resp, |
782 | 18.2k | rpc::RpcContext context) { |
783 | 18.2k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "ChangeMetadata", req, resp, &context)) { |
784 | 0 | return; |
785 | 0 | } |
786 | 221 | VLOG(1) << "Received Change Metadata RPC: " << req->DebugString(); |
787 | 18.2k | if (FLAGS_TEST_alter_schema_delay_ms) { |
788 | 0 | LOG(INFO) << __func__ << ": sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms"; |
789 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_alter_schema_delay_ms)); |
790 | 0 | LOG(INFO) << __func__ << ": done sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms"; |
791 | 0 | } |
792 | | |
793 | 18.2k | server::UpdateClock(*req, server_->Clock()); |
794 | | |
795 | 18.2k | auto tablet = LookupLeaderTabletOrRespond( |
796 | 18.2k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
797 | 18.2k | if (!tablet) { |
798 | 2 | return; |
799 | 2 | } |
800 | | |
801 | 18.2k | tablet::TableInfoPtr table_info; |
802 | 18.2k | if (req->has_alter_table_id()) { |
803 | 17.9k | auto result = tablet.peer->tablet_metadata()->GetTableInfo(req->alter_table_id()); |
804 | 17.9k | if (!result.ok()) { |
805 | 0 | SetupErrorAndRespond(resp->mutable_error(), result.status(), |
806 | 0 | TabletServerErrorPB::INVALID_SCHEMA, &context); |
807 | 0 | return; |
808 | 0 | } |
809 | 17.9k | table_info = *result; |
810 | 249 | } else { |
811 | 249 | table_info = tablet.peer->tablet_metadata()->primary_table_info(); |
812 | 249 | } |
813 | 18.2k | const Schema& tablet_schema = *table_info->schema; |
814 | 18.2k | uint32_t schema_version = table_info->schema_version; |
815 | | // Sanity check, to verify that the tablet should have the same schema |
816 | | // specified in the request. |
817 | 18.2k | Schema req_schema; |
818 | 18.2k | Status s = SchemaFromPB(req->schema(), &req_schema); |
819 | 18.2k | if (!s.ok()) { |
820 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::INVALID_SCHEMA, &context); |
821 | 0 | return; |
822 | 0 | } |
823 | | |
824 | | // If the schema was already applied, respond as succeeded. |
825 | 18.2k | if (!req->has_wal_retention_secs() && schema_version == req->schema_version()) { |
826 | | |
827 | 74 | if (req_schema.Equals(tablet_schema)) { |
828 | 74 | context.RespondSuccess(); |
829 | 74 | return; |
830 | 74 | } |
831 | | |
832 | 0 | schema_version = tablet.peer->tablet_metadata()->schema_version( |
833 | 0 | req->has_alter_table_id() ? req->alter_table_id() : ""); |
834 | 0 | if (schema_version == req->schema_version()) { |
835 | 0 | LOG(ERROR) << "The current schema does not match the request schema." |
836 | 0 | << " version=" << schema_version |
837 | 0 | << " current-schema=" << tablet_schema.ToString() |
838 | 0 | << " request-schema=" << req_schema.ToString() |
839 | 0 | << " (corruption)"; |
840 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
841 | 0 | STATUS(Corruption, "got a different schema for the same version number"), |
842 | 0 | TabletServerErrorPB::MISMATCHED_SCHEMA, &context); |
843 | 0 | return; |
844 | 0 | } |
845 | 18.1k | } |
846 | | |
847 | | // If the current schema is newer than the one in the request reject the request. |
848 | 18.1k | if (schema_version > req->schema_version()) { |
849 | 0 | LOG(ERROR) << "Tablet " << req->tablet_id() << " has a newer schema" |
850 | 0 | << " version=" << schema_version |
851 | 0 | << " req->schema_version()=" << req->schema_version() |
852 | 0 | << "\n current-schema=" << tablet_schema.ToString() |
853 | 0 | << "\n request-schema=" << req_schema.ToString(); |
854 | 0 | SetupErrorAndRespond( |
855 | 0 | resp->mutable_error(), |
856 | 0 | STATUS_SUBSTITUTE( |
857 | 0 | InvalidArgument, "Tablet has a newer schema Tab $0. Req $1 vs Existing version : $2", |
858 | 0 | req->tablet_id(), req->schema_version(), schema_version), |
859 | 0 | TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, &context); |
860 | 0 | return; |
861 | 0 | } |
862 | | |
863 | 153 | VLOG(1) << "Tablet updating schema from " |
864 | 153 | << " version=" << schema_version << " current-schema=" << tablet_schema.ToString() |
865 | 153 | << " to request-schema=" << req_schema.ToString() |
866 | 153 | << " for table ID=" << table_info->table_id; |
867 | 18.1k | ScopedRWOperationPause pause_writes; |
868 | 18.1k | if ((tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE && |
869 | 13.8k | !GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) || |
870 | 17.7k | tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) { |
871 | | // For schema change operations we will have to pause the write operations |
872 | | // until the schema change is done. This will be done synchronously. |
873 | 17.7k | pause_writes = tablet.peer->tablet()->PauseWritePermits(context.GetClientDeadline()); |
874 | 17.7k | if (!pause_writes.ok()) { |
875 | 0 | SetupErrorAndRespond( |
876 | 0 | resp->mutable_error(), |
877 | 0 | STATUS( |
878 | 0 | TryAgain, "Could not lock the tablet against write operations for schema change"), |
879 | 0 | &context); |
880 | 0 | return; |
881 | 0 | } |
882 | | |
883 | | // After write operation is paused, active transactions will be aborted for YSQL transactions. |
884 | 17.7k | if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE && |
885 | 4.00k | req->should_abort_active_txns()) { |
886 | 317 | DCHECK(req->has_transaction_id()); |
887 | 317 | if (tablet.peer->tablet()->transaction_participant() == nullptr) { |
888 | 0 | auto status = STATUS( |
889 | 0 | IllegalState, "Transaction participant is null for tablet " + req->tablet_id()); |
890 | 0 | LOG(ERROR) << status; |
891 | 0 | SetupErrorAndRespond( |
892 | 0 | resp->mutable_error(), |
893 | 0 | status, |
894 | 0 | &context); |
895 | 0 | return; |
896 | 0 | } |
897 | 317 | HybridTime max_cutoff = HybridTime::kMax; |
898 | 317 | CoarseTimePoint deadline = |
899 | 317 | CoarseMonoClock::Now() + |
900 | 317 | MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms); |
901 | 317 | TransactionId txn_id = CHECK_RESULT(TransactionId::FromString(req->transaction_id())); |
902 | 317 | LOG(INFO) << "Aborting transactions that started prior to " << max_cutoff |
903 | 317 | << " for tablet id " << req->tablet_id() |
904 | 317 | << " excluding transaction with id " << txn_id; |
905 | | // There could be a chance where a transaction does not appear by transaction_participant |
906 | | // but has already begun replicating through Raft. Such transactions might succeed rather |
907 | | // than get aborted. This race codnition is dismissable for this intermediate solution. |
908 | 317 | Status status = tablet.peer->tablet()->transaction_participant()->StopActiveTxnsPriorTo( |
909 | 317 | max_cutoff, deadline, &txn_id); |
910 | 317 | if (!status.ok() || PREDICT_FALSE(FLAGS_TEST_fail_alter_schema_after_abort_transactions)) { |
911 | 0 | auto status = STATUS(TryAgain, "Transaction abort failed for tablet " + req->tablet_id()); |
912 | 0 | LOG(WARNING) << status; |
913 | 0 | SetupErrorAndRespond( |
914 | 0 | resp->mutable_error(), |
915 | 0 | status, |
916 | 0 | &context); |
917 | 0 | return; |
918 | 0 | } |
919 | 18.1k | } |
920 | 17.7k | } |
921 | 18.1k | auto operation = std::make_unique<ChangeMetadataOperation>( |
922 | 18.1k | tablet.peer->tablet(), tablet.peer->log(), req); |
923 | | |
924 | 18.1k | operation->set_completion_callback( |
925 | 18.1k | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
926 | 18.1k | operation->UsePermitToken(std::move(pause_writes)); |
927 | | |
928 | | // Submit the alter schema op. The RPC will be responded to asynchronously. |
929 | 18.1k | tablet.peer->Submit(std::move(operation), tablet.leader_term); |
930 | 18.1k | } |
931 | | |
932 | 11.0M | #define VERIFY_RESULT_OR_RETURN(expr) RESULT_CHECKER_HELPER( \ |
933 | 11.0M | expr, \ |
934 | 11.0M | if (!__result.ok()) { return; }); |
935 | | |
936 | | void TabletServiceImpl::VerifyTableRowRange( |
937 | | const VerifyTableRowRangeRequestPB* req, |
938 | | VerifyTableRowRangeResponsePB* resp, |
939 | 0 | rpc::RpcContext context) { |
940 | 0 | DVLOG(3) << "Received VerifyTableRowRange RPC: " << req->DebugString(); |
941 | |
|
942 | 0 | server::UpdateClock(*req, server_->Clock()); |
943 | |
|
944 | 0 | auto peer_tablet = |
945 | 0 | LookupTabletPeerOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
946 | 0 | if (!peer_tablet) { |
947 | 0 | return; |
948 | 0 | } |
949 | | |
950 | 0 | auto tablet = peer_tablet->tablet; |
951 | 0 | bool is_pg_table = tablet->table_type() == TableType::PGSQL_TABLE_TYPE; |
952 | 0 | if (is_pg_table) { |
953 | 0 | SetupErrorAndRespond( |
954 | 0 | resp->mutable_error(), STATUS(NotFound, "Verify operation not supported for PGSQL tables."), |
955 | 0 | &context); |
956 | 0 | return; |
957 | 0 | } |
958 | | |
959 | 0 | const CoarseTimePoint& deadline = context.GetClientDeadline(); |
960 | | |
961 | | // Wait for SafeTime to get past read_at; |
962 | 0 | const HybridTime read_at(req->read_time()); |
963 | 0 | DVLOG(1) << "Waiting for safe time to be past " << read_at; |
964 | 0 | const auto safe_time = tablet->SafeTime(tablet::RequireLease::kFalse, read_at, deadline); |
965 | 0 | DVLOG(1) << "Got safe time " << safe_time.ToString(); |
966 | 0 | if (!safe_time.ok()) { |
967 | 0 | LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString(); |
968 | 0 | SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context); |
969 | 0 | return; |
970 | 0 | } |
971 | | |
972 | 0 | auto valid_read_at = req->has_read_time() ? read_at : *safe_time; |
973 | 0 | std::string verified_until = ""; |
974 | 0 | std::unordered_map<TableId, uint64> consistency_stats; |
975 | |
|
976 | 0 | if (peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info) { |
977 | 0 | auto index_info = |
978 | 0 | *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info; |
979 | 0 | const auto& table_id = index_info.indexed_table_id(); |
980 | 0 | Status verify_status = tablet->VerifyMainTableConsistencyForCQL( |
981 | 0 | table_id, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats, |
982 | 0 | &verified_until); |
983 | 0 | if (!verify_status.ok()) { |
984 | 0 | SetupErrorAndRespond(resp->mutable_error(), verify_status, &context); |
985 | 0 | return; |
986 | 0 | } |
987 | | |
988 | 0 | (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id]; |
989 | 0 | } else { |
990 | 0 | const IndexMap index_map = |
991 | 0 | *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_map; |
992 | 0 | vector<IndexInfo> indexes; |
993 | 0 | vector<TableId> index_ids; |
994 | 0 | if (req->index_ids().empty()) { |
995 | 0 | for (auto it = index_map.begin(); it != index_map.end(); it++) { |
996 | 0 | indexes.push_back(it->second); |
997 | 0 | } |
998 | 0 | } else { |
999 | 0 | for (const auto& idx : req->index_ids()) { |
1000 | 0 | auto result = index_map.FindIndex(idx); |
1001 | 0 | if (result) { |
1002 | 0 | const IndexInfo* index_info = *result; |
1003 | 0 | indexes.push_back(*index_info); |
1004 | 0 | index_ids.push_back(index_info->table_id()); |
1005 | 0 | } else { |
1006 | 0 | LOG(WARNING) << "Index " << idx << " not found in tablet metadata"; |
1007 | 0 | } |
1008 | 0 | } |
1009 | 0 | } |
1010 | |
|
1011 | 0 | Status verify_status = tablet->VerifyIndexTableConsistencyForCQL( |
1012 | 0 | indexes, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats, |
1013 | 0 | &verified_until); |
1014 | 0 | if (!verify_status.ok()) { |
1015 | 0 | SetupErrorAndRespond(resp->mutable_error(), verify_status, &context); |
1016 | 0 | return; |
1017 | 0 | } |
1018 | | |
1019 | 0 | for (const IndexInfo& index : indexes) { |
1020 | 0 | const auto& table_id = index.table_id(); |
1021 | 0 | (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id]; |
1022 | 0 | } |
1023 | 0 | } |
1024 | 0 | resp->set_verified_until(verified_until); |
1025 | 0 | context.RespondSuccess(); |
1026 | 0 | } |
1027 | | |
1028 | | void TabletServiceImpl::UpdateTransaction(const UpdateTransactionRequestPB* req, |
1029 | | UpdateTransactionResponsePB* resp, |
1030 | 1.48M | rpc::RpcContext context) { |
1031 | 1.48M | TRACE("UpdateTransaction"); |
1032 | | |
1033 | 1.48M | if (req->state().status() == TransactionStatus::CREATED && |
1034 | 243k | RandomActWithProbability(TEST_delay_create_transaction_probability)) { |
1035 | 0 | std::this_thread::sleep_for( |
1036 | 0 | (FLAGS_transaction_rpc_timeout_ms + RandomUniformInt(-200, 200)) * 1ms); |
1037 | 0 | } |
1038 | | |
1039 | 783 | VLOG(1) << "UpdateTransaction: " << req->ShortDebugString() |
1040 | 783 | << ", context: " << context.ToString(); |
1041 | 1.77k | LOG_IF(DFATAL, !req->has_propagated_hybrid_time()) |
1042 | 1.77k | << __func__ << " missing propagated hybrid time for " |
1043 | 1.77k | << TransactionStatus_Name(req->state().status()); |
1044 | 1.48M | UpdateClock(*req, server_->Clock()); |
1045 | | |
1046 | 1.48M | LeaderTabletPeer tablet; |
1047 | 1.48M | auto txn_status = req->state().status(); |
1048 | 1.48M | auto cleanup = txn_status == TransactionStatus::IMMEDIATE_CLEANUP || |
1049 | 1.12M | txn_status == TransactionStatus::GRACEFUL_CLEANUP; |
1050 | 1.48M | if (cleanup) { |
1051 | 548k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1052 | 548k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context)); |
1053 | 548k | tablet.FillTabletPeer(std::move(peer_tablet)); |
1054 | 548k | tablet.leader_term = OpId::kUnknownTerm; |
1055 | 940k | } else { |
1056 | 940k | tablet = LookupLeaderTabletOrRespond( |
1057 | 940k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1058 | 940k | } |
1059 | 1.48M | if (!tablet) { |
1060 | 6.71k | return; |
1061 | 6.71k | } |
1062 | | |
1063 | 1.48M | auto state = std::make_unique<tablet::UpdateTxnOperation>(tablet.tablet.get(), &req->state()); |
1064 | 1.48M | state->set_completion_callback(MakeRpcOperationCompletionCallback( |
1065 | 1.48M | std::move(context), resp, server_->Clock())); |
1066 | | |
1067 | 1.48M | if (req->state().status() == TransactionStatus::APPLYING || cleanup) { |
1068 | 794k | auto* participant = tablet.tablet->transaction_participant(); |
1069 | 794k | if (participant) { |
1070 | 793k | participant->Handle(std::move(state), tablet.leader_term); |
1071 | 1.21k | } else { |
1072 | 1.21k | state->CompleteWithStatus(STATUS_FORMAT( |
1073 | 1.21k | InvalidArgument, "Does not have transaction participant to process $0", |
1074 | 1.21k | req->state().status())); |
1075 | 1.21k | } |
1076 | 686k | } else { |
1077 | 686k | auto* coordinator = tablet.tablet->transaction_coordinator(); |
1078 | 686k | if (coordinator) { |
1079 | 685k | coordinator->Handle(std::move(state), tablet.leader_term); |
1080 | 1.43k | } else { |
1081 | 1.43k | state->CompleteWithStatus(STATUS_FORMAT( |
1082 | 1.43k | InvalidArgument, "Does not have transaction coordinator to process $0", |
1083 | 1.43k | req->state().status())); |
1084 | 1.43k | } |
1085 | 686k | } |
1086 | 1.48M | } |
1087 | | |
1088 | | template <class Req, class Resp, class Action> |
1089 | | void TabletServiceImpl::PerformAtLeader( |
1090 | 208k | const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) { |
1091 | 208k | UpdateClock(*req, server_->Clock()); |
1092 | | |
1093 | 208k | auto tablet_peer = LookupLeaderTabletOrRespond( |
1094 | 208k | server_->tablet_peer_lookup(), req->tablet_id(), resp, context); |
1095 | | |
1096 | 208k | if (!tablet_peer) { |
1097 | 47 | return; |
1098 | 47 | } |
1099 | | |
1100 | 208k | auto status = action(tablet_peer); |
1101 | | |
1102 | 208k | if (*context) { |
1103 | 208k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); |
1104 | 208k | if (status.ok()) { |
1105 | 208k | context->RespondSuccess(); |
1106 | 18.4E | } else { |
1107 | 18.4E | SetupErrorAndRespond(resp->mutable_error(), status, context); |
1108 | 18.4E | } |
1109 | 208k | } |
1110 | 208k | } tablet_service.cc:_ZN2yb7tserver17TabletServiceImpl15PerformAtLeaderIPKNS0_29GetTransactionStatusRequestPBENS0_30GetTransactionStatusResponsePBEZNS1_20GetTransactionStatusES5_PS6_NS_3rpc10RpcContextEE3$_1EEvRKT_PT0_PS9_RKT1_ Line | Count | Source | 1090 | 208k | const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) { | 1091 | 208k | UpdateClock(*req, server_->Clock()); | 1092 | | | 1093 | 208k | auto tablet_peer = LookupLeaderTabletOrRespond( | 1094 | 208k | server_->tablet_peer_lookup(), req->tablet_id(), resp, context); | 1095 | | | 1096 | 208k | if (!tablet_peer) { | 1097 | 45 | return; | 1098 | 45 | } | 1099 | | | 1100 | 208k | auto status = action(tablet_peer); | 1101 | | | 1102 | 208k | if (*context) { | 1103 | 208k | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); | 1104 | 208k | if (status.ok()) { | 1105 | 208k | context->RespondSuccess(); | 1106 | 18.4E | } else { | 1107 | 18.4E | SetupErrorAndRespond(resp->mutable_error(), status, context); | 1108 | 18.4E | } | 1109 | 208k | } | 1110 | 208k | } |
Unexecuted instantiation: tablet_service.cc:_ZN2yb7tserver17TabletServiceImpl15PerformAtLeaderIPKNS0_42GetTransactionStatusAtParticipantRequestPBENS0_43GetTransactionStatusAtParticipantResponsePBEZNS1_33GetTransactionStatusAtParticipantES5_PS6_NS_3rpc10RpcContextEE3$_2EEvRKT_PT0_PS9_RKT1_ tablet_service.cc:_ZN2yb7tserver17TabletServiceImpl15PerformAtLeaderIPKNS0_20GetSplitKeyRequestPBENS0_21GetSplitKeyResponsePBEZNS1_11GetSplitKeyES5_PS6_NS_3rpc10RpcContextEE3$_4EEvRKT_PT0_PS9_RKT1_ Line | Count | Source | 1090 | 49 | const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) { | 1091 | 49 | UpdateClock(*req, server_->Clock()); | 1092 | | | 1093 | 49 | auto tablet_peer = LookupLeaderTabletOrRespond( | 1094 | 49 | server_->tablet_peer_lookup(), req->tablet_id(), resp, context); | 1095 | | | 1096 | 49 | if (!tablet_peer) { | 1097 | 2 | return; | 1098 | 2 | } | 1099 | | | 1100 | 47 | auto status = action(tablet_peer); | 1101 | | | 1102 | 47 | if (*context) { | 1103 | 47 | resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64()); | 1104 | 47 | if (status.ok()) { | 1105 | 46 | context->RespondSuccess(); | 1106 | 1 | } else { | 1107 | 1 | SetupErrorAndRespond(resp->mutable_error(), status, context); | 1108 | 1 | } | 1109 | 47 | } | 1110 | 47 | } |
|
1111 | | |
1112 | | void TabletServiceImpl::GetTransactionStatus(const GetTransactionStatusRequestPB* req, |
1113 | | GetTransactionStatusResponsePB* resp, |
1114 | 208k | rpc::RpcContext context) { |
1115 | 208k | TRACE("GetTransactionStatus"); |
1116 | | |
1117 | 208k | PerformAtLeader(req, resp, &context, |
1118 | 208k | [req, resp, &context](const LeaderTabletPeer& tablet_peer) { |
1119 | 208k | auto* transaction_coordinator = tablet_peer.tablet->transaction_coordinator(); |
1120 | 208k | if (!transaction_coordinator) { |
1121 | 0 | return STATUS_FORMAT( |
1122 | 0 | InvalidArgument, "No transaction coordinator at tablet $0", |
1123 | 0 | tablet_peer.peer->tablet_id()); |
1124 | 0 | } |
1125 | 208k | return transaction_coordinator->GetStatus( |
1126 | 208k | req->transaction_id(), context.GetClientDeadline(), resp); |
1127 | 208k | }); |
1128 | 208k | } |
1129 | | |
1130 | | void TabletServiceImpl::GetTransactionStatusAtParticipant( |
1131 | | const GetTransactionStatusAtParticipantRequestPB* req, |
1132 | | GetTransactionStatusAtParticipantResponsePB* resp, |
1133 | 0 | rpc::RpcContext context) { |
1134 | 0 | TRACE("GetTransactionStatusAtParticipant"); |
1135 | |
|
1136 | 0 | PerformAtLeader(req, resp, &context, |
1137 | 0 | [req, resp, &context](const LeaderTabletPeer& tablet_peer) -> Status { |
1138 | 0 | auto* transaction_participant = tablet_peer.peer->tablet()->transaction_participant(); |
1139 | 0 | if (!transaction_participant) { |
1140 | 0 | return STATUS_FORMAT( |
1141 | 0 | InvalidArgument, "No transaction participant at tablet $0", |
1142 | 0 | tablet_peer.peer->tablet_id()); |
1143 | 0 | } |
1144 | | |
1145 | 0 | transaction_participant->GetStatus( |
1146 | 0 | VERIFY_RESULT(FullyDecodeTransactionId(req->transaction_id())), |
1147 | 0 | req->required_num_replicated_batches(), tablet_peer.leader_term, resp, &context); |
1148 | 0 | return Status::OK(); |
1149 | 0 | }); |
1150 | 0 | } |
1151 | | |
1152 | | void TabletServiceImpl::AbortTransaction(const AbortTransactionRequestPB* req, |
1153 | | AbortTransactionResponsePB* resp, |
1154 | 126k | rpc::RpcContext context) { |
1155 | 126k | TRACE("AbortTransaction"); |
1156 | | |
1157 | 126k | UpdateClock(*req, server_->Clock()); |
1158 | | |
1159 | 126k | auto tablet = LookupLeaderTabletOrRespond( |
1160 | 126k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1161 | 126k | if (!tablet) { |
1162 | 9.80k | return; |
1163 | 9.80k | } |
1164 | | |
1165 | 116k | server::ClockPtr clock(server_->Clock()); |
1166 | 116k | auto context_ptr = std::make_shared<rpc::RpcContext>(std::move(context)); |
1167 | 116k | tablet.peer->tablet()->transaction_coordinator()->Abort( |
1168 | 116k | req->transaction_id(), |
1169 | 116k | tablet.leader_term, |
1170 | 115k | [resp, context_ptr, clock, peer = tablet.peer](Result<TransactionStatusResult> result) { |
1171 | 115k | resp->set_propagated_hybrid_time(clock->Now().ToUint64()); |
1172 | 115k | Status status; |
1173 | 115k | if (result.ok()) { |
1174 | 115k | auto leader_safe_time = peer->LeaderSafeTime(); |
1175 | 115k | if (leader_safe_time.ok()) { |
1176 | 115k | resp->set_status(result->status); |
1177 | 115k | if (result->status_time.is_valid()) { |
1178 | 25.7k | resp->set_status_hybrid_time(result->status_time.ToUint64()); |
1179 | 25.7k | } |
1180 | | // See comment above WaitForSafeTime in TransactionStatusCache::DoGetCommitData |
1181 | | // for details. |
1182 | 115k | resp->set_coordinator_safe_time(leader_safe_time->ToUint64()); |
1183 | 115k | context_ptr->RespondSuccess(); |
1184 | 115k | return; |
1185 | 115k | } |
1186 | | |
1187 | 13 | status = leader_safe_time.status(); |
1188 | 18.4E | } else { |
1189 | 18.4E | status = result.status(); |
1190 | 18.4E | } |
1191 | 18.4E | SetupErrorAndRespond(resp->mutable_error(), status, context_ptr.get()); |
1192 | 18.4E | }); |
1193 | 116k | } |
1194 | | |
1195 | | void TabletServiceImpl::Truncate(const TruncateRequestPB* req, |
1196 | | TruncateResponsePB* resp, |
1197 | 53.6k | rpc::RpcContext context) { |
1198 | 53.6k | TRACE("Truncate"); |
1199 | | |
1200 | 53.6k | UpdateClock(*req, server_->Clock()); |
1201 | | |
1202 | 53.6k | auto tablet = LookupLeaderTabletOrRespond( |
1203 | 53.6k | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1204 | 53.6k | if (!tablet) { |
1205 | 9 | return; |
1206 | 9 | } |
1207 | | |
1208 | 53.5k | auto operation = std::make_unique<TruncateOperation>(tablet.peer->tablet(), &req->truncate()); |
1209 | | |
1210 | 53.5k | operation->set_completion_callback( |
1211 | 53.5k | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
1212 | | |
1213 | | // Submit the truncate tablet op. The RPC will be responded to asynchronously. |
1214 | 53.5k | tablet.peer->Submit(std::move(operation), tablet.leader_term); |
1215 | 53.5k | } |
1216 | | |
1217 | | void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req, |
1218 | | CreateTabletResponsePB* resp, |
1219 | 81.9k | rpc::RpcContext context) { |
1220 | 81.9k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, &context)) { |
1221 | 0 | return; |
1222 | 0 | } |
1223 | 81.9k | auto status = DoCreateTablet(req, resp); |
1224 | 81.9k | if (!status.ok()) { |
1225 | 6 | SetupErrorAndRespond(resp->mutable_error(), status, &context); |
1226 | 81.9k | } else { |
1227 | 81.9k | context.RespondSuccess(); |
1228 | 81.9k | } |
1229 | 81.9k | } |
1230 | | |
1231 | | Status TabletServiceAdminImpl::DoCreateTablet(const CreateTabletRequestPB* req, |
1232 | 81.5k | CreateTabletResponsePB* resp) { |
1233 | 81.5k | if (PREDICT_FALSE(FLAGS_TEST_txn_status_table_tablet_creation_delay_ms > 0 && |
1234 | 71 | req->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE)) { |
1235 | 71 | std::this_thread::sleep_for(FLAGS_TEST_txn_status_table_tablet_creation_delay_ms * 1ms); |
1236 | 71 | } |
1237 | | |
1238 | 18.4E | DVLOG(3) << "Received CreateTablet RPC: " << yb::ToString(*req); |
1239 | 81.5k | TRACE_EVENT1("tserver", "CreateTablet", |
1240 | 81.5k | "tablet_id", req->tablet_id()); |
1241 | | |
1242 | 81.5k | Schema schema; |
1243 | 81.5k | PartitionSchema partition_schema; |
1244 | 81.5k | auto status = SchemaFromPB(req->schema(), &schema); |
1245 | 81.5k | if (status.ok()) { |
1246 | 81.4k | DCHECK(schema.has_column_ids()); |
1247 | 81.4k | status = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema); |
1248 | 81.4k | } |
1249 | 81.5k | if (!status.ok()) { |
1250 | 0 | return status.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::INVALID_SCHEMA)); |
1251 | 0 | } |
1252 | | |
1253 | 81.5k | Partition partition; |
1254 | 81.5k | Partition::FromPB(req->partition(), &partition); |
1255 | | |
1256 | 81.5k | LOG(INFO) << "Processing CreateTablet for T " << req->tablet_id() << " P " << req->dest_uuid() |
1257 | 81.5k | << " (table=" << req->table_name() |
1258 | 81.5k | << " [id=" << req->table_id() << "]), partition=" |
1259 | 81.5k | << partition_schema.PartitionDebugString(partition, schema); |
1260 | 18.4E | VLOG(1) << "Full request: " << req->DebugString(); |
1261 | | |
1262 | 81.5k | auto table_info = std::make_shared<tablet::TableInfo>( |
1263 | 81.5k | req->table_id(), req->namespace_name(), req->table_name(), req->table_type(), schema, |
1264 | 81.5k | IndexMap(), |
1265 | 72.1k | req->has_index_info() ? boost::optional<IndexInfo>(req->index_info()) : boost::none, |
1266 | 81.5k | 0 /* schema_version */, partition_schema); |
1267 | 81.5k | std::vector<SnapshotScheduleId> snapshot_schedules; |
1268 | 81.5k | snapshot_schedules.reserve(req->snapshot_schedules().size()); |
1269 | 0 | for (const auto& id : req->snapshot_schedules()) { |
1270 | 0 | snapshot_schedules.push_back(VERIFY_RESULT(FullyDecodeSnapshotScheduleId(id))); |
1271 | 0 | } |
1272 | 81.5k | status = ResultToStatus(server_->tablet_manager()->CreateNewTablet( |
1273 | 81.5k | table_info, req->tablet_id(), partition, req->config(), req->colocated(), |
1274 | 81.5k | snapshot_schedules)); |
1275 | 81.5k | if (PREDICT_FALSE(!status.ok())) { |
1276 | 6 | return status.IsAlreadyPresent() |
1277 | 5 | ? status.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::TABLET_ALREADY_EXISTS)) |
1278 | 1 | : status; |
1279 | 6 | } |
1280 | 81.5k | return Status::OK(); |
1281 | 81.5k | } |
1282 | | |
1283 | | void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req, |
1284 | | DeleteTabletResponsePB* resp, |
1285 | 49.3k | rpc::RpcContext context) { |
1286 | 49.3k | if (PREDICT_FALSE(FLAGS_TEST_rpc_delete_tablet_fail)) { |
1287 | 0 | context.RespondFailure(STATUS(NetworkError, "Simulating network partition for test")); |
1288 | 0 | return; |
1289 | 0 | } |
1290 | | |
1291 | 49.3k | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, &context)) { |
1292 | 0 | return; |
1293 | 0 | } |
1294 | 49.3k | TRACE_EVENT2("tserver", "DeleteTablet", |
1295 | 49.3k | "tablet_id", req->tablet_id(), |
1296 | 49.3k | "reason", req->reason()); |
1297 | | |
1298 | 49.3k | tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN; |
1299 | 49.3k | if (req->has_delete_type()) { |
1300 | 49.0k | delete_type = req->delete_type(); |
1301 | 49.0k | } |
1302 | 49.3k | LOG(INFO) << "T " << req->tablet_id() << " P " << server_->permanent_uuid() |
1303 | 49.3k | << ": Processing DeleteTablet with delete_type " << TabletDataState_Name(delete_type) |
1304 | 49.2k | << (req->has_reason() ? (" (" + req->reason() + ")") : "") |
1305 | 49.3k | << (req->hide_only() ? " (Hide only)" : "") |
1306 | 49.3k | << " from " << context.requestor_string(); |
1307 | 18.4E | VLOG(1) << "Full request: " << req->DebugString(); |
1308 | | |
1309 | 49.3k | boost::optional<int64_t> cas_config_opid_index_less_or_equal; |
1310 | 49.3k | if (req->has_cas_config_opid_index_less_or_equal()) { |
1311 | 2.42k | cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal(); |
1312 | 2.42k | } |
1313 | 49.3k | boost::optional<TabletServerErrorPB::Code> error_code; |
1314 | 49.3k | Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(), |
1315 | 49.3k | delete_type, |
1316 | 49.3k | cas_config_opid_index_less_or_equal, |
1317 | 49.3k | req->hide_only(), |
1318 | 49.3k | &error_code); |
1319 | 49.3k | if (PREDICT_FALSE(!s.ok())) { |
1320 | 1.76k | HandleErrorResponse(resp, &context, s, error_code); |
1321 | 1.76k | return; |
1322 | 1.76k | } |
1323 | 47.5k | context.RespondSuccess(); |
1324 | 47.5k | } |
1325 | | |
1326 | | // TODO(sagnik): Modify this to actually create a copartitioned table |
1327 | | void TabletServiceAdminImpl::CopartitionTable(const CopartitionTableRequestPB* req, |
1328 | | CopartitionTableResponsePB* resp, |
1329 | 0 | rpc::RpcContext context) { |
1330 | 0 | context.RespondSuccess(); |
1331 | 0 | LOG(INFO) << "tserver doesn't support co-partitioning yet"; |
1332 | 0 | } |
1333 | | |
1334 | | void TabletServiceAdminImpl::FlushTablets(const FlushTabletsRequestPB* req, |
1335 | | FlushTabletsResponsePB* resp, |
1336 | 30 | rpc::RpcContext context) { |
1337 | 30 | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "FlushTablets", req, resp, &context)) { |
1338 | 0 | return; |
1339 | 0 | } |
1340 | | |
1341 | 30 | if (!req->all_tablets() && req->tablet_ids_size() == 0) { |
1342 | 0 | const Status s = STATUS(InvalidArgument, "No tablet ids"); |
1343 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1344 | 0 | return; |
1345 | 0 | } |
1346 | | |
1347 | 30 | server::UpdateClock(*req, server_->Clock()); |
1348 | | |
1349 | 30 | TRACE_EVENT1("tserver", "FlushTablets", |
1350 | 30 | "TS: ", req->dest_uuid()); |
1351 | | |
1352 | 30 | LOG(INFO) << "Processing FlushTablets from " << context.requestor_string(); |
1353 | 0 | VLOG(1) << "Full FlushTablets request: " << req->DebugString(); |
1354 | 30 | TabletPeers tablet_peers; |
1355 | 30 | TSTabletManager::TabletPtrs tablet_ptrs; |
1356 | | |
1357 | 30 | if (req->all_tablets()) { |
1358 | 4 | tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs); |
1359 | 26 | } else { |
1360 | 26 | for (const TabletId& id : req->tablet_ids()) { |
1361 | 26 | auto tablet_peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1362 | 26 | server_->tablet_peer_lookup(), id, resp, &context)); |
1363 | 26 | tablet_peers.push_back(std::move(tablet_peer.tablet_peer)); |
1364 | 26 | auto tablet = tablet_peer.tablet; |
1365 | 26 | if (tablet != nullptr) { |
1366 | 26 | tablet_ptrs.push_back(std::move(tablet)); |
1367 | 26 | } |
1368 | 26 | } |
1369 | 26 | } |
1370 | 30 | switch (req->operation()) { |
1371 | 26 | case FlushTabletsRequestPB::FLUSH: |
1372 | 26 | for (const tablet::TabletPtr& tablet : tablet_ptrs) { |
1373 | 26 | resp->set_failed_tablet_id(tablet->tablet_id()); |
1374 | 26 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->Flush(tablet::FlushMode::kAsync), resp, &context); |
1375 | 26 | resp->clear_failed_tablet_id(); |
1376 | 26 | } |
1377 | | |
1378 | | // Wait for end of all flush operations. |
1379 | 26 | for (const tablet::TabletPtr& tablet : tablet_ptrs) { |
1380 | 26 | resp->set_failed_tablet_id(tablet->tablet_id()); |
1381 | 26 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->WaitForFlush(), resp, &context); |
1382 | 26 | resp->clear_failed_tablet_id(); |
1383 | 26 | } |
1384 | 26 | break; |
1385 | 0 | case FlushTabletsRequestPB::COMPACT: |
1386 | 0 | RETURN_UNKNOWN_ERROR_IF_NOT_OK( |
1387 | 0 | server_->tablet_manager()->TriggerCompactionAndWait(tablet_ptrs), resp, &context); |
1388 | 0 | break; |
1389 | 4 | case FlushTabletsRequestPB::LOG_GC: |
1390 | 4 | for (const auto& tablet : tablet_peers) { |
1391 | 4 | resp->set_failed_tablet_id(tablet->tablet_id()); |
1392 | 4 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->RunLogGC(), resp, &context); |
1393 | 4 | resp->clear_failed_tablet_id(); |
1394 | 4 | } |
1395 | 4 | break; |
1396 | 30 | } |
1397 | | |
1398 | 30 | context.RespondSuccess(); |
1399 | 30 | } |
1400 | | |
1401 | | void TabletServiceAdminImpl::CountIntents( |
1402 | | const CountIntentsRequestPB* req, |
1403 | | CountIntentsResponsePB* resp, |
1404 | 0 | rpc::RpcContext context) { |
1405 | 0 | TSTabletManager::TabletPtrs tablet_ptrs; |
1406 | 0 | TabletPeers tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs); |
1407 | 0 | int64_t total_intents = 0; |
1408 | | // TODO: do this in parallel. |
1409 | | // TODO: per-tablet intent counts. |
1410 | 0 | for (const auto& tablet : tablet_ptrs) { |
1411 | 0 | auto num_intents = tablet->CountIntents(); |
1412 | 0 | if (!num_intents.ok()) { |
1413 | 0 | SetupErrorAndRespond(resp->mutable_error(), num_intents.status(), &context); |
1414 | 0 | return; |
1415 | 0 | } |
1416 | 0 | total_intents += *num_intents; |
1417 | 0 | } |
1418 | 0 | resp->set_num_intents(total_intents); |
1419 | 0 | context.RespondSuccess(); |
1420 | 0 | } |
1421 | | |
1422 | | void TabletServiceAdminImpl::AddTableToTablet( |
1423 | | const AddTableToTabletRequestPB* req, AddTableToTabletResponsePB* resp, |
1424 | 27 | rpc::RpcContext context) { |
1425 | 27 | auto tablet_id = req->tablet_id(); |
1426 | | |
1427 | 27 | const auto tablet = |
1428 | 27 | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), tablet_id, resp, &context); |
1429 | 27 | if (!tablet) { |
1430 | 0 | return; |
1431 | 0 | } |
1432 | 0 | DVLOG(3) << "Received AddTableToTablet RPC: " << yb::ToString(*req); |
1433 | | |
1434 | 27 | tablet::ChangeMetadataRequestPB change_req; |
1435 | 27 | *change_req.mutable_add_table() = req->add_table(); |
1436 | 27 | change_req.set_tablet_id(tablet_id); |
1437 | 27 | Status s = tablet::SyncReplicateChangeMetadataOperation( |
1438 | 27 | &change_req, tablet.peer.get(), tablet.leader_term); |
1439 | 27 | if (PREDICT_FALSE(!s.ok())) { |
1440 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1441 | 0 | return; |
1442 | 0 | } |
1443 | 27 | context.RespondSuccess(); |
1444 | 27 | } |
1445 | | |
1446 | | void TabletServiceAdminImpl::RemoveTableFromTablet( |
1447 | | const RemoveTableFromTabletRequestPB* req, |
1448 | | RemoveTableFromTabletResponsePB* resp, |
1449 | 15 | rpc::RpcContext context) { |
1450 | 15 | auto tablet = |
1451 | 15 | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1452 | 15 | if (!tablet) { |
1453 | 0 | return; |
1454 | 0 | } |
1455 | | |
1456 | 15 | tablet::ChangeMetadataRequestPB change_req; |
1457 | 15 | change_req.set_remove_table_id(req->remove_table_id()); |
1458 | 15 | change_req.set_tablet_id(req->tablet_id()); |
1459 | 15 | Status s = tablet::SyncReplicateChangeMetadataOperation( |
1460 | 15 | &change_req, tablet.peer.get(), tablet.leader_term); |
1461 | 15 | if (PREDICT_FALSE(!s.ok())) { |
1462 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1463 | 0 | return; |
1464 | 0 | } |
1465 | 15 | context.RespondSuccess(); |
1466 | 15 | } |
1467 | | |
1468 | | void TabletServiceAdminImpl::SplitTablet( |
1469 | 44 | const tablet::SplitTabletRequestPB* req, SplitTabletResponsePB* resp, rpc::RpcContext context) { |
1470 | 44 | if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "SplitTablet", req, resp, &context)) { |
1471 | 0 | return; |
1472 | 0 | } |
1473 | 44 | if (PREDICT_FALSE(FLAGS_TEST_fail_tablet_split_probability > 0) && |
1474 | 0 | RandomActWithProbability(FLAGS_TEST_fail_tablet_split_probability)) { |
1475 | 0 | return SetupErrorAndRespond( |
1476 | 0 | resp->mutable_error(), |
1477 | 0 | STATUS(InvalidArgument, // Use InvalidArgument to hit IsDefinitelyPermanentError(). |
1478 | 0 | "Failing tablet split due to FLAGS_TEST_fail_tablet_split_probability"), |
1479 | 0 | TabletServerErrorPB::UNKNOWN_ERROR, |
1480 | 0 | &context); |
1481 | 0 | } |
1482 | 44 | TRACE_EVENT1("tserver", "SplitTablet", "tablet_id", req->tablet_id()); |
1483 | | |
1484 | 44 | server::UpdateClock(*req, server_->Clock()); |
1485 | 44 | auto leader_tablet_peer = |
1486 | 44 | LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1487 | 44 | if (!leader_tablet_peer) { |
1488 | 0 | return; |
1489 | 0 | } |
1490 | | |
1491 | 44 | { |
1492 | 44 | auto tablet_data_state = leader_tablet_peer.peer->data_state(); |
1493 | 44 | if (tablet_data_state != tablet::TABLET_DATA_READY) { |
1494 | 25 | auto s = tablet_data_state == tablet::TABLET_DATA_SPLIT_COMPLETED |
1495 | 25 | ? STATUS_FORMAT(AlreadyPresent, "Tablet $0 is already split.", req->tablet_id()) |
1496 | 0 | : STATUS_FORMAT( |
1497 | 25 | InvalidArgument, "Invalid tablet $0 data state: $1", req->tablet_id(), |
1498 | 25 | tablet_data_state); |
1499 | 25 | SetupErrorAndRespond( |
1500 | 25 | resp->mutable_error(), s, TabletServerErrorPB::TABLET_NOT_RUNNING, &context); |
1501 | 25 | return; |
1502 | 25 | } |
1503 | 19 | } |
1504 | | |
1505 | 19 | auto state = std::make_unique<tablet::SplitOperation>( |
1506 | 19 | leader_tablet_peer.peer->tablet(), server_->tablet_manager(), req); |
1507 | | |
1508 | 19 | state->set_completion_callback( |
1509 | 19 | MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock())); |
1510 | | |
1511 | 19 | leader_tablet_peer.peer->Submit(std::move(state), leader_tablet_peer.leader_term); |
1512 | 19 | } |
1513 | | |
1514 | | void TabletServiceAdminImpl::UpgradeYsql( |
1515 | | const UpgradeYsqlRequestPB* req, |
1516 | | UpgradeYsqlResponsePB* resp, |
1517 | 0 | rpc::RpcContext context) { |
1518 | 0 | LOG(INFO) << "Starting YSQL upgrade"; |
1519 | |
|
1520 | 0 | pgwrapper::YsqlUpgradeHelper upgrade_helper(server_->pgsql_proxy_bind_address(), |
1521 | 0 | server_->GetSharedMemoryPostgresAuthKey(), |
1522 | 0 | FLAGS_heartbeat_interval_ms); |
1523 | 0 | const auto status = upgrade_helper.Upgrade(); |
1524 | 0 | if (!status.ok()) { |
1525 | 0 | LOG(INFO) << "YSQL upgrade failed: " << status; |
1526 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, &context); |
1527 | 0 | return; |
1528 | 0 | } |
1529 | | |
1530 | 0 | LOG(INFO) << "YSQL upgrade done successfully"; |
1531 | 0 | context.RespondSuccess(); |
1532 | 0 | } |
1533 | | |
1534 | | |
1535 | 0 | bool EmptyWriteBatch(const docdb::KeyValueWriteBatchPB& write_batch) { |
1536 | 0 | return write_batch.write_pairs().empty() && write_batch.apply_external_transactions().empty(); |
1537 | 0 | } |
1538 | | |
1539 | | void TabletServiceImpl::Write(const WriteRequestPB* req, |
1540 | | WriteResponsePB* resp, |
1541 | 1.39M | rpc::RpcContext context) { |
1542 | 1.39M | if (FLAGS_TEST_tserver_noop_read_write) { |
1543 | 0 | for (int i = 0; i < req->ql_write_batch_size(); ++i) { |
1544 | 0 | resp->add_ql_response_batch(); |
1545 | 0 | } |
1546 | 0 | context.RespondSuccess(); |
1547 | 0 | return; |
1548 | 0 | } |
1549 | 1.39M | TRACE("Start Write"); |
1550 | 1.39M | TRACE_EVENT1("tserver", "TabletServiceImpl::Write", |
1551 | 1.39M | "tablet_id", req->tablet_id()); |
1552 | 147 | VLOG(2) << "Received Write RPC: " << req->DebugString(); |
1553 | 1.39M | UpdateClock(*req, server_->Clock()); |
1554 | | |
1555 | 1.39M | auto tablet = LookupLeaderTabletOrRespond( |
1556 | 1.39M | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context); |
1557 | 1.39M | if (!tablet || |
1558 | 1.34M | !CheckWriteThrottlingOrRespond( |
1559 | 45.6k | req->rejection_score(), tablet.peer.get(), resp, &context)) { |
1560 | 45.6k | return; |
1561 | 45.6k | } |
1562 | | |
1563 | 1.34M | if (tablet.peer->tablet()->metadata()->hidden()) { |
1564 | 0 | auto status = STATUS(NotFound, "Tablet not found", req->tablet_id()); |
1565 | 0 | SetupErrorAndRespond( |
1566 | 0 | resp->mutable_error(), status, TabletServerErrorPB::TABLET_NOT_FOUND, &context); |
1567 | 0 | return; |
1568 | 0 | } |
1569 | | |
1570 | | #if defined(DUMP_WRITE) |
1571 | | if (req->has_write_batch() && req->write_batch().has_transaction()) { |
1572 | | VLOG(1) << "Write with transaction: " << req->write_batch().transaction().ShortDebugString(); |
1573 | | if (req->pgsql_write_batch_size() != 0) { |
1574 | | auto txn_id = CHECK_RESULT(FullyDecodeTransactionId( |
1575 | | req->write_batch().transaction().transaction_id())); |
1576 | | for (const auto& entry : req->pgsql_write_batch()) { |
1577 | | if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) { |
1578 | | auto key = entry.column_new_values(0).expr().value().int32_value(); |
1579 | | LOG(INFO) << txn_id << " UPDATE: " << key << " = " |
1580 | | << entry.column_new_values(1).expr().value().string_value(); |
1581 | | } else if ( |
1582 | | entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT || |
1583 | | entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) { |
1584 | | docdb::DocKey doc_key; |
1585 | | CHECK_OK(doc_key.FullyDecodeFrom(entry.ybctid_column_value().value().binary_value())); |
1586 | | LOG(INFO) << txn_id << " INSERT: " << doc_key.hashed_group()[0].GetInt32() << " = " |
1587 | | << entry.column_values(0).expr().value().string_value(); |
1588 | | } else if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_DELETE) { |
1589 | | LOG(INFO) << txn_id << " DELETE: " << entry.ShortDebugString(); |
1590 | | } |
1591 | | } |
1592 | | } |
1593 | | } |
1594 | | #endif |
1595 | | |
1596 | 1.34M | if (PREDICT_FALSE(req->has_write_batch() && !req->has_external_hybrid_time() && |
1597 | 0 | (!req->write_batch().write_pairs().empty() || !req->write_batch().read_pairs().empty()))) { |
1598 | 0 | Status s = STATUS(NotSupported, "Write Request contains write batch. This field should be " |
1599 | 0 | "used only for post-processed write requests during " |
1600 | 0 | "Raft replication."); |
1601 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, |
1602 | 0 | TabletServerErrorPB::INVALID_MUTATION, |
1603 | 0 | &context); |
1604 | 0 | return; |
1605 | 0 | } |
1606 | | |
1607 | 1.34M | bool has_operations = req->ql_write_batch_size() != 0 || |
1608 | 319k | req->redis_write_batch_size() != 0 || |
1609 | 258k | req->pgsql_write_batch_size() != 0 || |
1610 | 1 | (req->has_external_hybrid_time() && !EmptyWriteBatch(req->write_batch())); |
1611 | 1.34M | if (!has_operations && tablet.peer->tablet()->table_type() != TableType::REDIS_TABLE_TYPE) { |
1612 | | // An empty request. This is fine, can just exit early with ok status instead of working hard. |
1613 | | // This doesn't need to go to Raft log. |
1614 | 1 | MakeRpcOperationCompletionCallback<WriteResponsePB>( |
1615 | 1 | std::move(context), resp, server_->Clock())(Status::OK()); |
1616 | 1 | return; |
1617 | 1 | } |
1618 | | |
1619 | | // For postgres requests check that the syscatalog version matches. |
1620 | 1.34M | if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) { |
1621 | 214k | uint64_t last_breaking_catalog_version = 0; // unset. |
1622 | 1.90M | for (const auto& pg_req : req->pgsql_write_batch()) { |
1623 | 1.90M | if (pg_req.has_ysql_catalog_version()) { |
1624 | 1.90M | if (last_breaking_catalog_version == 0) { |
1625 | | // Initialize last breaking version if not yet set. |
1626 | 214k | server_->get_ysql_catalog_version(nullptr /* current_version */, |
1627 | 214k | &last_breaking_catalog_version); |
1628 | 214k | } |
1629 | 1.90M | if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) { |
1630 | 1 | SetupErrorAndRespond(resp->mutable_error(), |
1631 | 1 | STATUS_SUBSTITUTE(QLError, "The catalog snapshot used for this " |
1632 | 1 | "transaction has been invalidated."), |
1633 | 1 | TabletServerErrorPB::MISMATCHED_SCHEMA, &context); |
1634 | 1 | return; |
1635 | 1 | } |
1636 | 1.90M | } |
1637 | 1.90M | } |
1638 | 214k | } |
1639 | | |
1640 | 1.34M | auto query = std::make_unique<tablet::WriteQuery>( |
1641 | 1.34M | tablet.leader_term, context.GetClientDeadline(), tablet.peer.get(), |
1642 | 1.34M | tablet.peer->tablet(), resp); |
1643 | 1.34M | query->set_client_request(*req); |
1644 | | |
1645 | 1.34M | auto context_ptr = std::make_shared<RpcContext>(std::move(context)); |
1646 | 1.34M | if (RandomActWithProbability(GetAtomicFlag(&FLAGS_TEST_respond_write_failed_probability))) { |
1647 | 0 | LOG(INFO) << "Responding with a failure to " << req->DebugString(); |
1648 | 0 | SetupErrorAndRespond(resp->mutable_error(), STATUS(LeaderHasNoLease, "TEST: Random failure"), |
1649 | 0 | context_ptr.get()); |
1650 | 1.34M | } else { |
1651 | 1.34M | query->set_callback(WriteQueryCompletionCallback( |
1652 | 1.34M | tablet.peer, context_ptr, resp, query.get(), server_->Clock(), req->include_trace())); |
1653 | 1.34M | } |
1654 | | |
1655 | 1.34M | query->AdjustYsqlQueryTransactionality(req->pgsql_write_batch_size()); |
1656 | | |
1657 | 1.34M | tablet.peer->WriteAsync(std::move(query)); |
1658 | 1.34M | } |
1659 | | |
1660 | | void TabletServiceImpl::Read(const ReadRequestPB* req, |
1661 | | ReadResponsePB* resp, |
1662 | 4.70M | rpc::RpcContext context) { |
1663 | 4.70M | if (FLAGS_TEST_tserver_noop_read_write) { |
1664 | 0 | context.RespondSuccess(); |
1665 | 0 | return; |
1666 | 0 | } |
1667 | | |
1668 | 4.70M | PerformRead(server_, this, req, resp, std::move(context)); |
1669 | 4.70M | } |
1670 | | |
1671 | | ConsensusServiceImpl::ConsensusServiceImpl(const scoped_refptr<MetricEntity>& metric_entity, |
1672 | | TabletPeerLookupIf* tablet_manager) |
1673 | | : ConsensusServiceIf(metric_entity), |
1674 | 11.2k | tablet_manager_(tablet_manager) { |
1675 | 11.2k | } |
1676 | | |
1677 | 160 | ConsensusServiceImpl::~ConsensusServiceImpl() { |
1678 | 160 | } |
1679 | | |
1680 | | void ConsensusServiceImpl::CompleteUpdateConsensusResponse( |
1681 | | std::shared_ptr<tablet::TabletPeer> tablet_peer, |
1682 | 10.2M | consensus::ConsensusResponsePB* resp) { |
1683 | 10.2M | auto tablet = tablet_peer->shared_tablet(); |
1684 | 10.2M | if (tablet) { |
1685 | 10.2M | resp->set_num_sst_files(tablet->GetCurrentVersionNumSSTFiles()); |
1686 | 10.2M | } |
1687 | 10.2M | resp->set_propagated_hybrid_time(tablet_peer->clock().Now().ToUint64()); |
1688 | 10.2M | } |
1689 | | |
1690 | | void ConsensusServiceImpl::MultiRaftUpdateConsensus( |
1691 | | const consensus::MultiRaftConsensusRequestPB *req, |
1692 | | consensus::MultiRaftConsensusResponsePB *resp, |
1693 | 0 | rpc::RpcContext context) { |
1694 | 0 | DVLOG(3) << "Received Batch Consensus Update RPC: " << req->ShortDebugString(); |
1695 | | // Effectively performs ConsensusServiceImpl::UpdateConsensus for |
1696 | | // each ConsensusRequestPB in the batch but does not fail the entire |
1697 | | // batch if a single request fails. |
1698 | 0 | for (int i = 0; i < req->consensus_request_size(); i++) { |
1699 | | // Unfortunately, we have to use const_cast here, |
1700 | | // because the protobuf-generated interface only gives us a const request |
1701 | | // but we need to be able to move messages out of the request for efficiency. |
1702 | 0 | auto consensus_req = const_cast<ConsensusRequestPB*>(&req->consensus_request(i)); |
1703 | 0 | auto consensus_resp = resp->add_consensus_response();; |
1704 | |
|
1705 | 0 | auto uuid_match_res = CheckUuidMatch(tablet_manager_, "UpdateConsensus", consensus_req, |
1706 | 0 | context.requestor_string()); |
1707 | 0 | if (!uuid_match_res.ok()) { |
1708 | 0 | SetupError(consensus_resp->mutable_error(), uuid_match_res.status()); |
1709 | 0 | continue; |
1710 | 0 | } |
1711 | | |
1712 | 0 | auto peer_tablet_res = LookupTabletPeer(tablet_manager_, consensus_req->tablet_id()); |
1713 | 0 | if (!peer_tablet_res.ok()) { |
1714 | 0 | SetupError(consensus_resp->mutable_error(), peer_tablet_res.status()); |
1715 | 0 | continue; |
1716 | 0 | } |
1717 | 0 | auto tablet_peer = peer_tablet_res.get().tablet_peer; |
1718 | | |
1719 | | // Submit the update directly to the TabletPeer's Consensus instance. |
1720 | 0 | auto consensus_res = GetConsensus(tablet_peer); |
1721 | 0 | if (!consensus_res.ok()) { |
1722 | 0 | SetupError(consensus_resp->mutable_error(), consensus_res.status()); |
1723 | 0 | continue; |
1724 | 0 | } |
1725 | 0 | auto consensus = *consensus_res; |
1726 | |
|
1727 | 0 | Status s = consensus->Update( |
1728 | 0 | consensus_req, consensus_resp, context.GetClientDeadline()); |
1729 | 0 | if (PREDICT_FALSE(!s.ok())) { |
1730 | | // Clear the response first, since a partially-filled response could |
1731 | | // result in confusing a caller, or in having missing required fields |
1732 | | // in embedded optional messages. |
1733 | 0 | consensus_resp->Clear(); |
1734 | 0 | SetupError(consensus_resp->mutable_error(), s); |
1735 | 0 | continue; |
1736 | 0 | } |
1737 | | |
1738 | 0 | CompleteUpdateConsensusResponse(tablet_peer, consensus_resp); |
1739 | 0 | } |
1740 | 0 | context.RespondSuccess(); |
1741 | 0 | } |
1742 | | |
1743 | | void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, |
1744 | | ConsensusResponsePB* resp, |
1745 | 10.3M | rpc::RpcContext context) { |
1746 | 982 | DVLOG(3) << "Received Consensus Update RPC: " << req->ShortDebugString(); |
1747 | 10.3M | if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, &context)) { |
1748 | 12 | return; |
1749 | 12 | } |
1750 | 10.3M | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1751 | 10.2M | tablet_manager_, req->tablet_id(), resp, &context)); |
1752 | 10.2M | auto tablet_peer = peer_tablet.tablet_peer; |
1753 | | |
1754 | | // Submit the update directly to the TabletPeer's Consensus instance. |
1755 | 10.2M | shared_ptr<Consensus> consensus; |
1756 | 10.2M | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return; |
1757 | | |
1758 | | // Unfortunately, we have to use const_cast here, because the protobuf-generated interface only |
1759 | | // gives us a const request, but we need to be able to move messages out of the request for |
1760 | | // efficiency. |
1761 | 10.2M | Status s = consensus->Update( |
1762 | 10.2M | const_cast<ConsensusRequestPB*>(req), resp, context.GetClientDeadline()); |
1763 | 10.2M | if (PREDICT_FALSE(!s.ok())) { |
1764 | | // Clear the response first, since a partially-filled response could |
1765 | | // result in confusing a caller, or in having missing required fields |
1766 | | // in embedded optional messages. |
1767 | 520 | resp->Clear(); |
1768 | | |
1769 | 520 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1770 | 520 | return; |
1771 | 520 | } |
1772 | | |
1773 | 10.2M | CompleteUpdateConsensusResponse(tablet_peer, resp); |
1774 | | |
1775 | 10.2M | context.RespondSuccess(); |
1776 | 10.2M | } |
1777 | | |
1778 | | void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, |
1779 | | VoteResponsePB* resp, |
1780 | 144k | rpc::RpcContext context) { |
1781 | 121 | DVLOG(3) << "Received Consensus Request Vote RPC: " << req->DebugString(); |
1782 | 144k | if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, &context)) { |
1783 | 8 | return; |
1784 | 8 | } |
1785 | 144k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1786 | 125k | tablet_manager_, req->tablet_id(), resp, &context)); |
1787 | 125k | auto tablet_peer = peer_tablet.tablet_peer; |
1788 | | |
1789 | | // Submit the vote request directly to the consensus instance. |
1790 | 125k | shared_ptr<Consensus> consensus; |
1791 | 125k | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return; |
1792 | 125k | Status s = consensus->RequestVote(req, resp); |
1793 | 125k | RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context); |
1794 | 125k | context.RespondSuccess(); |
1795 | 125k | } |
1796 | | |
1797 | | void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, |
1798 | | ChangeConfigResponsePB* resp, |
1799 | 2.50k | RpcContext context) { |
1800 | 0 | VLOG(1) << "Received ChangeConfig RPC: " << req->ShortDebugString(); |
1801 | | // If the destination uuid is empty string, it means the client was retrying after a leader |
1802 | | // stepdown and did not have a chance to update the uuid inside the request. |
1803 | | // TODO: Note that this can be removed once Java YBClient will reset change config's uuid |
1804 | | // correctly after leader step down. |
1805 | 2.50k | if (req->dest_uuid() != "" && |
1806 | 2.41k | !CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, &context)) { |
1807 | 0 | return; |
1808 | 0 | } |
1809 | 2.50k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1810 | 2.50k | tablet_manager_, req->tablet_id(), resp, &context)); |
1811 | 2.50k | auto tablet_peer = peer_tablet.tablet_peer; |
1812 | | |
1813 | 2.50k | shared_ptr<Consensus> consensus; |
1814 | 2.50k | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return; |
1815 | 2.50k | boost::optional<TabletServerErrorPB::Code> error_code; |
1816 | 2.50k | std::shared_ptr<RpcContext> context_ptr = std::make_shared<RpcContext>(std::move(context)); |
1817 | 2.50k | Status s = consensus->ChangeConfig(*req, BindHandleResponse(resp, context_ptr), &error_code); |
1818 | 1 | VLOG(1) << "Sent ChangeConfig req " << req->ShortDebugString() << " to consensus layer."; |
1819 | 2.50k | if (PREDICT_FALSE(!s.ok())) { |
1820 | 717 | HandleErrorResponse(resp, context_ptr.get(), s, error_code); |
1821 | 717 | return; |
1822 | 717 | } |
1823 | | // The success case is handled when the callback fires. |
1824 | 2.50k | } |
1825 | | |
1826 | | void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req, |
1827 | | UnsafeChangeConfigResponsePB* resp, |
1828 | 0 | RpcContext context) { |
1829 | 0 | VLOG(1) << "Received UnsafeChangeConfig RPC: " << req->ShortDebugString(); |
1830 | 0 | if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, &context)) { |
1831 | 0 | return; |
1832 | 0 | } |
1833 | 0 | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1834 | 0 | tablet_manager_, req->tablet_id(), resp, &context)); |
1835 | 0 | auto tablet_peer = peer_tablet.tablet_peer; |
1836 | |
|
1837 | 0 | shared_ptr<Consensus> consensus; |
1838 | 0 | if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) { |
1839 | 0 | return; |
1840 | 0 | } |
1841 | 0 | boost::optional<TabletServerErrorPB::Code> error_code; |
1842 | 0 | const Status s = consensus->UnsafeChangeConfig(*req, &error_code); |
1843 | 0 | if (PREDICT_FALSE(!s.ok())) { |
1844 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
1845 | 0 | HandleErrorResponse(resp, &context, s, error_code); |
1846 | 0 | return; |
1847 | 0 | } |
1848 | 0 | context.RespondSuccess(); |
1849 | 0 | } |
1850 | | |
1851 | | void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req, |
1852 | | GetNodeInstanceResponsePB* resp, |
1853 | 14.5k | rpc::RpcContext context) { |
1854 | 0 | DVLOG(3) << "Received Get Node Instance RPC: " << req->DebugString(); |
1855 | 14.5k | resp->mutable_node_instance()->CopyFrom(tablet_manager_->NodeInstance()); |
1856 | 14.5k | auto status = tablet_manager_->GetRegistration(resp->mutable_registration()); |
1857 | 14.5k | if (!status.ok()) { |
1858 | 0 | context.RespondFailure(status); |
1859 | 14.5k | } else { |
1860 | 14.5k | context.RespondSuccess(); |
1861 | 14.5k | } |
1862 | 14.5k | } |
1863 | | |
1864 | | namespace { |
1865 | | |
1866 | | class RpcScope { |
1867 | | public: |
1868 | | template<class Req, class Resp> |
1869 | | RpcScope(TabletPeerLookupIf* tablet_manager, |
1870 | | const char* method_name, |
1871 | | const Req* req, |
1872 | | Resp* resp, |
1873 | | rpc::RpcContext* context) |
1874 | 66.1k | : context_(context) { |
1875 | 66.1k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { |
1876 | 0 | return; |
1877 | 0 | } |
1878 | 66.1k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1879 | 42.5k | tablet_manager, req->tablet_id(), resp, context)); |
1880 | 42.5k | auto tablet_peer = peer_tablet.tablet_peer; |
1881 | | |
1882 | 42.5k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { |
1883 | 0 | return; |
1884 | 0 | } |
1885 | 42.5k | responded_ = false; |
1886 | 42.5k | } tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus26RunLeaderElectionRequestPBENS4_27RunLeaderElectionResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE Line | Count | Source | 1874 | 56.5k | : context_(context) { | 1875 | 56.5k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1876 | 0 | return; | 1877 | 0 | } | 1878 | 56.5k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1879 | 32.9k | tablet_manager, req->tablet_id(), resp, context)); | 1880 | 32.9k | auto tablet_peer = peer_tablet.tablet_peer; | 1881 | | | 1882 | 32.9k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1883 | 0 | return; | 1884 | 0 | } | 1885 | 32.9k | responded_ = false; | 1886 | 32.9k | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus27LeaderElectionLostRequestPBENS4_28LeaderElectionLostResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE Line | Count | Source | 1874 | 44 | : context_(context) { | 1875 | 44 | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1876 | 0 | return; | 1877 | 0 | } | 1878 | 44 | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1879 | 44 | tablet_manager, req->tablet_id(), resp, context)); | 1880 | 44 | auto tablet_peer = peer_tablet.tablet_peer; | 1881 | | | 1882 | 44 | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1883 | 0 | return; | 1884 | 0 | } | 1885 | 44 | responded_ = false; | 1886 | 44 | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus23LeaderStepDownRequestPBENS4_24LeaderStepDownResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE Line | Count | Source | 1874 | 5.88k | : context_(context) { | 1875 | 5.88k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1876 | 0 | return; | 1877 | 0 | } | 1878 | 5.88k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1879 | 5.88k | tablet_manager, req->tablet_id(), resp, context)); | 1880 | 5.88k | auto tablet_peer = peer_tablet.tablet_peer; | 1881 | | | 1882 | 5.88k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1883 | 0 | return; | 1884 | 0 | } | 1885 | 5.88k | responded_ = false; | 1886 | 5.88k | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus26GetConsensusStateRequestPBENS4_27GetConsensusStateResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE Line | Count | Source | 1874 | 3.68k | : context_(context) { | 1875 | 3.68k | if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { | 1876 | 0 | return; | 1877 | 0 | } | 1878 | 3.68k | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( | 1879 | 3.68k | tablet_manager, req->tablet_id(), resp, context)); | 1880 | 3.68k | auto tablet_peer = peer_tablet.tablet_peer; | 1881 | | | 1882 | 3.68k | if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) { | 1883 | 0 | return; | 1884 | 0 | } | 1885 | 3.68k | responded_ = false; | 1886 | 3.68k | } |
|
1887 | | |
1888 | 66.1k | ~RpcScope() { |
1889 | 66.1k | if (!responded_) { |
1890 | 42.5k | context_->RespondSuccess(); |
1891 | 42.5k | } |
1892 | 66.1k | } |
1893 | | |
1894 | | template<class Resp> |
1895 | 38.8k | void CheckStatus(const Status& status, Resp* resp) { |
1896 | 38.8k | if (!status.ok()) { |
1897 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); |
1898 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); |
1899 | 0 | responded_ = true; |
1900 | 0 | } |
1901 | 38.8k | } tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScope11CheckStatusINS_9consensus27RunLeaderElectionResponsePBEEEvRKNS_6StatusEPT_ Line | Count | Source | 1895 | 32.9k | void CheckStatus(const Status& status, Resp* resp) { | 1896 | 32.9k | if (!status.ok()) { | 1897 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); | 1898 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); | 1899 | 0 | responded_ = true; | 1900 | 0 | } | 1901 | 32.9k | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScope11CheckStatusINS_9consensus28LeaderElectionLostResponsePBEEEvRKNS_6StatusEPT_ Line | Count | Source | 1895 | 44 | void CheckStatus(const Status& status, Resp* resp) { | 1896 | 44 | if (!status.ok()) { | 1897 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); | 1898 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); | 1899 | 0 | responded_ = true; | 1900 | 0 | } | 1901 | 44 | } |
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScope11CheckStatusINS_9consensus24LeaderStepDownResponsePBEEEvRKNS_6StatusEPT_ Line | Count | Source | 1895 | 5.88k | void CheckStatus(const Status& status, Resp* resp) { | 1896 | 5.88k | if (!status.ok()) { | 1897 | 0 | LOG(INFO) << "Status failed: " << status.ToString(); | 1898 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, context_); | 1899 | 0 | responded_ = true; | 1900 | 0 | } | 1901 | 5.88k | } |
|
1902 | | |
1903 | 42.5k | Consensus* operator->() { |
1904 | 42.5k | return consensus_.get(); |
1905 | 42.5k | } |
1906 | | |
1907 | 66.1k | explicit operator bool() const { |
1908 | 66.1k | return !responded_; |
1909 | 66.1k | } |
1910 | | |
1911 | | private: |
1912 | | rpc::RpcContext* context_; |
1913 | | bool responded_ = true; |
1914 | | shared_ptr<Consensus> consensus_; |
1915 | | }; |
1916 | | |
1917 | | } // namespace |
1918 | | |
1919 | | void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req, |
1920 | | RunLeaderElectionResponsePB* resp, |
1921 | 56.5k | rpc::RpcContext context) { |
1922 | 60 | VLOG(1) << "Received Run Leader Election RPC: " << req->DebugString(); |
1923 | 56.5k | RpcScope scope(tablet_manager_, "RunLeaderElection", req, resp, &context); |
1924 | 56.5k | if (!scope) { |
1925 | 23.5k | return; |
1926 | 23.5k | } |
1927 | | |
1928 | 32.9k | Status s = scope->StartElection(consensus::LeaderElectionData { |
1929 | 32.9k | .mode = consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE, |
1930 | 32.9k | .pending_commit = req->has_committed_index(), |
1931 | 32.9k | .must_be_committed_opid = OpId::FromPB(req->committed_index()), |
1932 | 28.0k | .originator_uuid = req->has_originator_uuid() ? req->originator_uuid() : std::string(), |
1933 | 32.9k | .suppress_vote_request = consensus::TEST_SuppressVoteRequest(req->suppress_vote_request()), |
1934 | 32.9k | .initial_election = req->initial_election() }); |
1935 | 32.9k | scope.CheckStatus(s, resp); |
1936 | 32.9k | } |
1937 | | |
1938 | | void ConsensusServiceImpl::LeaderElectionLost(const consensus::LeaderElectionLostRequestPB *req, |
1939 | | consensus::LeaderElectionLostResponsePB *resp, |
1940 | 44 | ::yb::rpc::RpcContext context) { |
1941 | 44 | LOG(INFO) << "LeaderElectionLost, req: " << req->ShortDebugString(); |
1942 | 44 | RpcScope scope(tablet_manager_, "LeaderElectionLost", req, resp, &context); |
1943 | 44 | if (!scope) { |
1944 | 0 | return; |
1945 | 0 | } |
1946 | 44 | auto status = scope->ElectionLostByProtege(req->election_lost_by_uuid()); |
1947 | 44 | scope.CheckStatus(status, resp); |
1948 | 44 | LOG(INFO) << "LeaderElectionLost, outcome: " << (scope ? "success" : "failure") << "req: " |
1949 | 44 | << req->ShortDebugString(); |
1950 | 44 | } |
1951 | | |
1952 | | void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req, |
1953 | | LeaderStepDownResponsePB* resp, |
1954 | 5.88k | RpcContext context) { |
1955 | 5.88k | LOG(INFO) << "Received Leader stepdown RPC: " << req->ShortDebugString(); |
1956 | | |
1957 | 5.88k | if (PREDICT_FALSE(FLAGS_TEST_leader_stepdown_delay_ms > 0)) { |
1958 | 3 | LOG(INFO) << "Delaying leader stepdown for " |
1959 | 3 | << FLAGS_TEST_leader_stepdown_delay_ms << " ms."; |
1960 | 3 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_leader_stepdown_delay_ms)); |
1961 | 3 | } |
1962 | | |
1963 | 5.88k | RpcScope scope(tablet_manager_, "LeaderStepDown", req, resp, &context); |
1964 | 5.88k | if (!scope) { |
1965 | 0 | return; |
1966 | 0 | } |
1967 | 5.88k | Status s = scope->StepDown(req, resp); |
1968 | 5.88k | LOG(INFO) << "Leader stepdown request " << req->ShortDebugString() << " success. Resp code=" |
1969 | 5.88k | << TabletServerErrorPB::Code_Name(resp->error().code()); |
1970 | 5.88k | scope.CheckStatus(s, resp); |
1971 | 5.88k | } |
1972 | | |
1973 | | void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req, |
1974 | | consensus::GetLastOpIdResponsePB *resp, |
1975 | 576 | rpc::RpcContext context) { |
1976 | 0 | DVLOG(3) << "Received GetLastOpId RPC: " << req->DebugString(); |
1977 | | |
1978 | 576 | if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) { |
1979 | 0 | HandleErrorResponse(resp, &context, |
1980 | 0 | STATUS(InvalidArgument, "Invalid opid_type specified to GetLastOpId()")); |
1981 | 0 | return; |
1982 | 0 | } |
1983 | | |
1984 | 576 | if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, &context)) { |
1985 | 0 | return; |
1986 | 0 | } |
1987 | 576 | auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
1988 | 557 | tablet_manager_, req->tablet_id(), resp, &context)); |
1989 | 557 | auto tablet_peer = peer_tablet.tablet_peer; |
1990 | | |
1991 | 557 | if (tablet_peer->state() != tablet::RUNNING) { |
1992 | 0 | SetupErrorAndRespond(resp->mutable_error(), |
1993 | 0 | STATUS(ServiceUnavailable, "Tablet Peer not in RUNNING state"), |
1994 | 0 | TabletServerErrorPB::TABLET_NOT_RUNNING, &context); |
1995 | 0 | return; |
1996 | 0 | } |
1997 | | |
1998 | 557 | auto consensus = GetConsensusOrRespond(tablet_peer, resp, &context); |
1999 | 557 | if (!consensus) return; |
2000 | 557 | auto op_id = req->has_op_type() |
2001 | 3 | ? consensus->TEST_GetLastOpIdWithType(req->opid_type(), req->op_type()) |
2002 | 554 | : consensus->GetLastOpId(req->opid_type()); |
2003 | | |
2004 | | // RETURN_UNKNOWN_ERROR_IF_NOT_OK does not support Result, so have to add extra check here. |
2005 | 557 | if (!op_id.ok()) { |
2006 | 3 | RETURN_UNKNOWN_ERROR_IF_NOT_OK(op_id.status(), resp, &context); |
2007 | 3 | } |
2008 | 554 | op_id->ToPB(resp->mutable_opid()); |
2009 | 554 | context.RespondSuccess(); |
2010 | 554 | } |
2011 | | |
2012 | | void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB *req, |
2013 | | consensus::GetConsensusStateResponsePB *resp, |
2014 | 3.68k | rpc::RpcContext context) { |
2015 | 1 | DVLOG(3) << "Received GetConsensusState RPC: " << req->DebugString(); |
2016 | | |
2017 | 3.68k | RpcScope scope(tablet_manager_, "GetConsensusState", req, resp, &context); |
2018 | 3.68k | if (!scope) { |
2019 | 2 | return; |
2020 | 2 | } |
2021 | 3.68k | ConsensusConfigType type = req->type(); |
2022 | 3.68k | if (PREDICT_FALSE(type != CONSENSUS_CONFIG_ACTIVE && type != CONSENSUS_CONFIG_COMMITTED)) { |
2023 | 0 | HandleErrorResponse(resp, &context, |
2024 | 0 | STATUS(InvalidArgument, Substitute("Unsupported ConsensusConfigType $0 ($1)", |
2025 | 0 | ConsensusConfigType_Name(type), type))); |
2026 | 0 | return; |
2027 | 0 | } |
2028 | 3.68k | LeaderLeaseStatus leader_lease_status; |
2029 | 3.68k | *resp->mutable_cstate() = scope->ConsensusState(req->type(), &leader_lease_status); |
2030 | 3.68k | resp->set_leader_lease_status(leader_lease_status); |
2031 | 3.68k | } |
2032 | | |
2033 | | void ConsensusServiceImpl::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* req, |
2034 | | StartRemoteBootstrapResponsePB* resp, |
2035 | 4.91k | rpc::RpcContext context) { |
2036 | 4.91k | if (!CheckUuidMatchOrRespond(tablet_manager_, "StartRemoteBootstrap", req, resp, &context)) { |
2037 | 0 | return; |
2038 | 0 | } |
2039 | 4.91k | if (req->has_split_parent_tablet_id() |
2040 | 84 | && !PREDICT_FALSE(FLAGS_TEST_disable_post_split_tablet_rbs_check)) { |
2041 | | // For any tablet that was the result of a split, the raft group leader will always send the |
2042 | | // split_parent_tablet_id. However, our local tablet manager should only know about the parent |
2043 | | // if it was part of the raft group which committed the split to the parent, and if the parent |
2044 | | // tablet has yet to be deleted across the cluster. |
2045 | 82 | TabletPeerTablet result; |
2046 | 82 | if (tablet_manager_->GetTabletPeer(req->split_parent_tablet_id(), &result.tablet_peer).ok()) { |
2047 | 78 | YB_LOG_EVERY_N_SECS(WARNING, 30) |
2048 | 1 | << "Start remote bootstrap rejected: parent tablet not yet split."; |
2049 | 78 | SetupErrorAndRespond( |
2050 | 78 | resp->mutable_error(), |
2051 | 78 | STATUS(Incomplete, "Rejecting bootstrap request while parent tablet is present."), |
2052 | 78 | TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE, |
2053 | 78 | &context); |
2054 | 78 | return; |
2055 | 78 | } |
2056 | 4.83k | } |
2057 | | |
2058 | 4.83k | Status s = tablet_manager_->StartRemoteBootstrap(*req); |
2059 | 4.83k | if (!s.ok()) { |
2060 | | // Using Status::AlreadyPresent for a remote bootstrap operation that is already in progress. |
2061 | 3.83k | if (s.IsAlreadyPresent()) { |
2062 | 3.82k | YB_LOG_EVERY_N_SECS(WARNING, 30) << "Start remote bootstrap failed: " << s; |
2063 | 3.82k | SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::ALREADY_IN_PROGRESS, |
2064 | 3.82k | &context); |
2065 | 3.82k | return; |
2066 | 16 | } else { |
2067 | 16 | LOG(WARNING) << "Start remote bootstrap failed: " << s; |
2068 | 16 | } |
2069 | 3.83k | } |
2070 | | |
2071 | 1.01k | RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context); |
2072 | 996 | context.RespondSuccess(); |
2073 | 996 | } |
2074 | | |
2075 | | void TabletServiceImpl::NoOp(const NoOpRequestPB *req, |
2076 | | NoOpResponsePB *resp, |
2077 | 0 | rpc::RpcContext context) { |
2078 | 0 | context.RespondSuccess(); |
2079 | 0 | } |
2080 | | |
2081 | | void TabletServiceImpl::Publish( |
2082 | 0 | const PublishRequestPB* req, PublishResponsePB* resp, rpc::RpcContext context) { |
2083 | 0 | rpc::Publisher* publisher = server_->GetPublisher(); |
2084 | 0 | resp->set_num_clients_forwarded_to(publisher ? (*publisher)(req->channel(), req->message()) : 0); |
2085 | 0 | context.RespondSuccess(); |
2086 | 0 | } |
2087 | | |
2088 | | void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, |
2089 | | ListTabletsResponsePB* resp, |
2090 | 213 | rpc::RpcContext context) { |
2091 | 213 | TabletPeers peers = server_->tablet_manager()->GetTabletPeers(); |
2092 | 213 | RepeatedPtrField<StatusAndSchemaPB>* peer_status = resp->mutable_status_and_schema(); |
2093 | 3.14k | for (const TabletPeerPtr& peer : peers) { |
2094 | 3.14k | StatusAndSchemaPB* status = peer_status->Add(); |
2095 | 3.14k | peer->GetTabletStatusPB(status->mutable_tablet_status()); |
2096 | 3.14k | SchemaToPB(*peer->status_listener()->schema(), status->mutable_schema()); |
2097 | 3.14k | peer->tablet_metadata()->partition_schema()->ToPB(status->mutable_partition_schema()); |
2098 | 3.14k | } |
2099 | 213 | context.RespondSuccess(); |
2100 | 213 | } |
2101 | | |
2102 | | void TabletServiceImpl::GetMasterAddresses(const GetMasterAddressesRequestPB* req, |
2103 | | GetMasterAddressesResponsePB* resp, |
2104 | 6 | rpc::RpcContext context) { |
2105 | 6 | resp->set_master_addresses(server::MasterAddressesToString( |
2106 | 6 | *server_->tablet_manager()->server()->options().GetMasterAddresses())); |
2107 | 6 | context.RespondSuccess(); |
2108 | 6 | } |
2109 | | |
2110 | | void TabletServiceImpl::GetLogLocation( |
2111 | | const GetLogLocationRequestPB* req, |
2112 | | GetLogLocationResponsePB* resp, |
2113 | 0 | rpc::RpcContext context) { |
2114 | 0 | resp->set_log_location(FLAGS_log_dir); |
2115 | 0 | context.RespondSuccess(); |
2116 | 0 | } |
2117 | | |
2118 | | void TabletServiceImpl::ListTabletsForTabletServer(const ListTabletsForTabletServerRequestPB* req, |
2119 | | ListTabletsForTabletServerResponsePB* resp, |
2120 | 892 | rpc::RpcContext context) { |
2121 | | // Replicating logic from path-handlers. |
2122 | 892 | TabletPeers peers = server_->tablet_manager()->GetTabletPeers(); |
2123 | 3.77k | for (const TabletPeerPtr& peer : peers) { |
2124 | 3.77k | TabletStatusPB status; |
2125 | 3.77k | peer->GetTabletStatusPB(&status); |
2126 | | |
2127 | 3.77k | ListTabletsForTabletServerResponsePB::Entry* data_entry = resp->add_entries(); |
2128 | 3.77k | data_entry->set_table_name(status.table_name()); |
2129 | 3.77k | data_entry->set_tablet_id(status.tablet_id()); |
2130 | | |
2131 | 3.77k | std::shared_ptr<consensus::Consensus> consensus = peer->shared_consensus(); |
2132 | 3.77k | data_entry->set_is_leader(consensus && consensus->role() == PeerRole::LEADER); |
2133 | 3.77k | data_entry->set_state(status.state()); |
2134 | | |
2135 | 3.77k | auto tablet = peer->shared_tablet(); |
2136 | 3.74k | uint64_t num_sst_files = tablet ? tablet->GetCurrentVersionNumSSTFiles() : 0; |
2137 | 3.77k | data_entry->set_num_sst_files(num_sst_files); |
2138 | | |
2139 | 3.77k | uint64_t num_log_segments = peer->GetNumLogSegments(); |
2140 | 3.77k | data_entry->set_num_log_segments(num_log_segments); |
2141 | | |
2142 | 3.74k | auto num_memtables = tablet ? tablet->GetNumMemtables() : std::make_pair(0, 0); |
2143 | 3.77k | data_entry->set_num_memtables_intents(num_memtables.first); |
2144 | 3.77k | data_entry->set_num_memtables_regular(num_memtables.second); |
2145 | 3.77k | } |
2146 | | |
2147 | 892 | context.RespondSuccess(); |
2148 | 892 | } |
2149 | | |
2150 | | namespace { |
2151 | | |
2152 | 2.89k | Result<uint64_t> CalcChecksum(tablet::Tablet* tablet, CoarseTimePoint deadline) { |
2153 | 2.89k | const shared_ptr<Schema> schema = tablet->metadata()->schema(); |
2154 | 2.89k | auto client_schema = schema->CopyWithoutColumnIds(); |
2155 | 2.89k | auto iter = tablet->NewRowIterator(client_schema, {}, "", deadline); |
2156 | 2.89k | RETURN_NOT_OK(iter); |
2157 | | |
2158 | 2.89k | QLTableRow value_map; |
2159 | 2.89k | ScanResultChecksummer collector; |
2160 | | |
2161 | 1.73M | while (VERIFY_RESULT((**iter).HasNext())) { |
2162 | 1.73M | RETURN_NOT_OK((**iter).NextRow(&value_map)); |
2163 | 1.73M | collector.HandleRow(*schema, value_map); |
2164 | 1.73M | } |
2165 | | |
2166 | 2.89k | return collector.agg_checksum(); |
2167 | 2.89k | } |
2168 | | |
2169 | | } // namespace |
2170 | | |
2171 | | Result<uint64_t> TabletServiceImpl::DoChecksum( |
2172 | 2.90k | const ChecksumRequestPB* req, CoarseTimePoint deadline) { |
2173 | 2.90k | auto abstract_tablet = VERIFY_RESULT(GetTablet( |
2174 | 2.90k | server_->tablet_peer_lookup(), req->tablet_id(), /* tablet_peer = */ nullptr, |
2175 | 2.90k | req->consistency_level(), AllowSplitTablet::kTrue)); |
2176 | 2.90k | return CalcChecksum(down_cast<tablet::Tablet*>(abstract_tablet.get()), deadline); |
2177 | 2.90k | } |
2178 | | |
2179 | | void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, |
2180 | | ChecksumResponsePB* resp, |
2181 | 2.90k | rpc::RpcContext context) { |
2182 | 3 | VLOG(1) << "Full request: " << req->DebugString(); |
2183 | | |
2184 | 2.90k | auto checksum = DoChecksum(req, context.GetClientDeadline()); |
2185 | 2.90k | if (!checksum.ok()) { |
2186 | 4 | SetupErrorAndRespond(resp->mutable_error(), checksum.status(), &context); |
2187 | 4 | return; |
2188 | 4 | } |
2189 | | |
2190 | 2.90k | resp->set_checksum(*checksum); |
2191 | 2.90k | context.RespondSuccess(); |
2192 | 2.90k | } |
2193 | | |
2194 | | void TabletServiceImpl::ImportData(const ImportDataRequestPB* req, |
2195 | | ImportDataResponsePB* resp, |
2196 | 0 | rpc::RpcContext context) { |
2197 | 0 | auto peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond( |
2198 | 0 | server_->tablet_peer_lookup(), req->tablet_id(), resp, &context)); |
2199 | |
|
2200 | 0 | auto status = peer.tablet_peer->tablet()->ImportData(req->source_dir()); |
2201 | 0 | if (!status.ok()) { |
2202 | 0 | SetupErrorAndRespond(resp->mutable_error(), status, &context); |
2203 | 0 | return; |
2204 | 0 | } |
2205 | 0 | context.RespondSuccess(); |
2206 | 0 | } |
2207 | | |
2208 | | void TabletServiceImpl::GetTabletStatus(const GetTabletStatusRequestPB* req, |
2209 | | GetTabletStatusResponsePB* resp, |
2210 | 1 | rpc::RpcContext context) { |
2211 | 1 | const Status s = server_->GetTabletStatus(req, resp); |
2212 | 1 | if (!s.ok()) { |
2213 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, |
2214 | 0 | s.IsNotFound() ? TabletServerErrorPB::TABLET_NOT_FOUND |
2215 | 0 | : TabletServerErrorPB::UNKNOWN_ERROR, |
2216 | 0 | &context); |
2217 | 0 | return; |
2218 | 0 | } |
2219 | 1 | context.RespondSuccess(); |
2220 | 1 | } |
2221 | | |
2222 | | void TabletServiceImpl::IsTabletServerReady(const IsTabletServerReadyRequestPB* req, |
2223 | | IsTabletServerReadyResponsePB* resp, |
2224 | 9 | rpc::RpcContext context) { |
2225 | 9 | Status s = server_->tablet_manager()->GetNumTabletsPendingBootstrap(resp); |
2226 | 9 | if (!s.ok()) { |
2227 | 0 | SetupErrorAndRespond(resp->mutable_error(), s, &context); |
2228 | 0 | return; |
2229 | 0 | } |
2230 | 9 | context.RespondSuccess(); |
2231 | 9 | } |
2232 | | |
2233 | | void TabletServiceImpl::TakeTransaction(const TakeTransactionRequestPB* req, |
2234 | | TakeTransactionResponsePB* resp, |
2235 | 0 | rpc::RpcContext context) { |
2236 | 0 | auto transaction = server_->TransactionPool()->Take( |
2237 | 0 | client::ForceGlobalTransaction(req->has_is_global() && req->is_global())); |
2238 | 0 | auto metadata = transaction->Release(); |
2239 | 0 | if (!metadata.ok()) { |
2240 | 0 | LOG(INFO) << "Take failed: " << metadata.status(); |
2241 | 0 | context.RespondFailure(metadata.status()); |
2242 | 0 | return; |
2243 | 0 | } |
2244 | 0 | metadata->ForceToPB(resp->mutable_metadata()); |
2245 | 0 | VLOG(2) << "Taken metadata: " << metadata->ToString(); |
2246 | 0 | context.RespondSuccess(); |
2247 | 0 | } |
2248 | | |
2249 | | void TabletServiceImpl::GetSplitKey( |
2250 | 49 | const GetSplitKeyRequestPB* req, GetSplitKeyResponsePB* resp, RpcContext context) { |
2251 | 49 | TEST_PAUSE_IF_FLAG(TEST_pause_tserver_get_split_key); |
2252 | 49 | PerformAtLeader(req, resp, &context, |
2253 | 47 | [resp](const LeaderTabletPeer& leader_tablet_peer) -> Status { |
2254 | 47 | const auto& tablet = leader_tablet_peer.tablet; |
2255 | | |
2256 | 47 | if (tablet->MayHaveOrphanedPostSplitData()) { |
2257 | 0 | return STATUS(IllegalState, "Tablet has orphaned post-split data"); |
2258 | 0 | } |
2259 | 47 | const auto split_encoded_key = VERIFY_RESULT(tablet->GetEncodedMiddleSplitKey()); |
2260 | 46 | resp->set_split_encoded_key(split_encoded_key); |
2261 | 46 | const auto doc_key_hash = VERIFY_RESULT(docdb::DecodeDocKeyHash(split_encoded_key)); |
2262 | 46 | if (doc_key_hash.has_value()) { |
2263 | 46 | resp->set_split_partition_key(PartitionSchema::EncodeMultiColumnHashValue( |
2264 | 46 | doc_key_hash.value())); |
2265 | 0 | } else { |
2266 | 0 | resp->set_split_partition_key(split_encoded_key); |
2267 | 0 | } |
2268 | 46 | return Status::OK(); |
2269 | 46 | }); |
2270 | 49 | } |
2271 | | |
2272 | | void TabletServiceImpl::GetSharedData(const GetSharedDataRequestPB* req, |
2273 | | GetSharedDataResponsePB* resp, |
2274 | 0 | rpc::RpcContext context) { |
2275 | 0 | auto& data = server_->SharedObject(); |
2276 | 0 | resp->mutable_data()->assign(pointer_cast<const char*>(&data), sizeof(data)); |
2277 | 0 | context.RespondSuccess(); |
2278 | 0 | } |
2279 | | |
2280 | 160 | void TabletServiceImpl::Shutdown() { |
2281 | 160 | } |
2282 | | |
2283 | | scoped_refptr<Histogram> TabletServer::GetMetricsHistogram( |
2284 | 75.5k | TabletServerServiceRpcMethodIndexes metric) { |
2285 | | // Returns the metric Histogram by holding a lock to make sure tablet_server_service_ remains |
2286 | | // unchanged during the operation. |
2287 | 75.5k | std::lock_guard<simple_spinlock> l(lock_); |
2288 | 75.5k | if (tablet_server_service_) { |
2289 | 75.5k | return tablet_server_service_->GetMetric(metric).handler_latency; |
2290 | 75.5k | } |
2291 | 0 | return nullptr; |
2292 | 0 | } |
2293 | | |
2294 | | TabletServerForwardServiceImpl::TabletServerForwardServiceImpl(TabletServiceImpl *impl, |
2295 | | TabletServerIf *server) |
2296 | | : TabletServerForwardServiceIf(server->MetricEnt()), |
2297 | 5.81k | server_(server) { |
2298 | 5.81k | } |
2299 | | |
2300 | | void TabletServerForwardServiceImpl::Write(const WriteRequestPB* req, |
2301 | | WriteResponsePB* resp, |
2302 | 0 | rpc::RpcContext context) { |
2303 | | // Forward the rpc to the required Tserver. |
2304 | 0 | std::shared_ptr<ForwardWriteRpc> forward_rpc = |
2305 | 0 | std::make_shared<ForwardWriteRpc>(req, resp, std::move(context), server_->client()); |
2306 | 0 | forward_rpc->SendRpc(); |
2307 | 0 | } |
2308 | | |
2309 | | void TabletServerForwardServiceImpl::Read(const ReadRequestPB* req, |
2310 | | ReadResponsePB* resp, |
2311 | 0 | rpc::RpcContext context) { |
2312 | 0 | std::shared_ptr<ForwardReadRpc> forward_rpc = |
2313 | 0 | std::make_shared<ForwardReadRpc>(req, resp, std::move(context), server_->client()); |
2314 | 0 | forward_rpc->SendRpc(); |
2315 | 0 | } |
2316 | | |
2317 | | } // namespace tserver |
2318 | | } // namespace yb |