/Users/deen/code/yugabyte-db/src/yb/client/tablet_rpc.cc
//
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
//

#include "yb/client/tablet_rpc.h"

#include "yb/client/client-internal.h"
#include "yb/client/client.h"
#include "yb/client/client_error.h"
#include "yb/client/meta_cache.h"

#include "yb/common/wire_protocol.h"

#include "yb/rpc/rpc_controller.h"
#include "yb/rpc/rpc_header.pb.h"

#include "yb/tserver/tserver_error.h"
#include "yb/tserver/tserver_forward_service.proxy.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"
#include "yb/util/result.h"
#include "yb/util/trace.h"

DEFINE_test_flag(bool, assert_local_op, false,
                 "When set, we crash if we receive an operation that cannot be served locally.");
DEFINE_int32(force_lookup_cache_refresh_secs, 0, "When non-zero, specifies how often we send a "
             "GetTabletLocations request to the master leader to update the tablet replicas cache. "
             "This request is only sent if we are processing a ConsistentPrefix read.");

DEFINE_int32(lookup_cache_refresh_secs, 60, "When non-zero, specifies how often we send a "
             "GetTabletLocations request to the master leader to update the tablet replicas cache. "
             "This request is only sent if we are processing a ConsistentPrefix read and the RPC "
             "layer has determined that its view of the replicas is inconsistent with what the "
             "master has reported.");
DEFINE_test_flag(int32, assert_failed_replicas_less_than, 0,
                 "If greater than 0, this process will crash if the number of failed replicas for "
                 "a RemoteTablet reaches or exceeds the specified number.");
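
// How the two refresh flags above are applied (see TabletInvoker::Execute below):
//   --force_lookup_cache_refresh_secs=N  refreshes the replica cache unconditionally once the
//     cached entry is older than N seconds;
//   --lookup_cache_refresh_secs=N        refreshes only when, in addition to the age check, the
//     cached replica count is inconsistent with what the master reported.
// Both apply only to ConsistentPrefix reads that are not leader-only.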

DECLARE_bool(ysql_forward_rpcs_to_local_tserver);

using namespace std::placeholders;

namespace yb {
namespace client {
namespace internal {

TabletInvoker::TabletInvoker(const bool local_tserver_only,
                             const bool consistent_prefix,
                             YBClient* client,
                             rpc::RpcCommand* command,
                             TabletRpc* rpc,
                             RemoteTablet* tablet,
                             const std::shared_ptr<const YBTable>& table,
                             rpc::RpcRetrier* retrier,
                             Trace* trace,
                             master::IncludeInactive include_inactive)
    : client_(client),
      command_(command),
      rpc_(rpc),
      tablet_(tablet),
      tablet_id_(tablet != nullptr ? tablet->tablet_id() : std::string()),
      table_(table),
      retrier_(retrier),
      trace_(trace),
      include_inactive_(include_inactive),
      local_tserver_only_(local_tserver_only),
      consistent_prefix_(consistent_prefix) {}

TabletInvoker::~TabletInvoker() {}

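// For ConsistentPrefix reads any replica may serve the request, so pick the closest replica
// rather than the leader. The TEST flag above optionally asserts that the number of failed
// replicas stays below the configured threshold.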
void TabletInvoker::SelectTabletServerWithConsistentPrefix() {
  TRACE_TO(trace_, "SelectTabletServerWithConsistentPrefix()");
  if (FLAGS_TEST_assert_failed_replicas_less_than) {
    if (tablet_->GetNumFailedReplicas() >= FLAGS_TEST_assert_failed_replicas_less_than) {
      LOG(FATAL) << "Invalid number of failed replicas: " << tablet_->ReplicasAsString();
    }
  }

  std::vector<RemoteTabletServer*> candidates;
  current_ts_ = client_->data_->SelectTServer(tablet_.get(),
                                              YBClient::ReplicaSelection::CLOSEST_REPLICA, {},
                                              &candidates);
  VLOG(1) << "Using tserver: " << yb::ToString(current_ts_);
}

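// Used when the operation is restricted to the tserver co-located with this client
// (local_tserver_only_); no replica selection is performed.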
void TabletInvoker::SelectLocalTabletServer() {
  TRACE_TO(trace_, "SelectLocalTabletServer()");

  current_ts_ = client_->data_->meta_cache_->local_tserver();
  VLOG(1) << "Using local tserver: " << current_ts_->ToString();
}

void TabletInvoker::SelectTabletServer() {
  TRACE_TO(trace_, "SelectTabletServer()");

  assign_new_leader_ = false;
  // Choose a destination TS according to the following algorithm:
  // 1. Select the leader, provided:
  //    a. One exists, and
  //    b. It hasn't failed, and
  //    c. It isn't currently marked as a follower.
  // 2. If there's no good leader select another replica, provided:
  //    a. It hasn't failed, and
  //    b. It hasn't rejected our write due to being a follower.
  // 3. If we're out of appropriate replicas, force a lookup to the master
  //    to fetch new consensus configuration information.
  // 4. When the lookup finishes, forget which replicas were followers and
  //    retry the write (i.e. goto 1).
  // 5. If we issue the write and it fails because the destination was a
  //    follower, remember that fact and retry the write (i.e. goto 1).
  // 6. Repeat steps 1-5 until the write succeeds, fails for other reasons,
  //    or the write's deadline expires.
  current_ts_ = tablet_->LeaderTServer();
  if (current_ts_ && followers_.count(current_ts_)) {
    VLOG(2) << "Tablet " << tablet_id_ << ": We have a follower for a leader: "
            << current_ts_->ToString();

    // Mark the node as a follower in the cache so that on the next go-round,
    // LeaderTServer() will not return it as a leader unless a full metadata
    // refresh has occurred. This also avoids LookupTabletByKey() going into
    // "fast path" mode and not actually performing a metadata refresh from the
    // Master when it needs to.
    tablet_->MarkTServerAsFollower(current_ts_);
    current_ts_ = nullptr;
  }
  if (!current_ts_) {
    // Try to "guess" the next leader.
    std::vector<RemoteTabletServer*> replicas;
    tablet_->GetRemoteTabletServers(&replicas);
    for (RemoteTabletServer* ts : replicas) {
      if (!followers_.count(ts)) {
        current_ts_ = ts;
        break;
      }
    }
    if (current_ts_) {
      assign_new_leader_ = true;
    } else {
      YB_LOG_EVERY_N_SECS(INFO, 1)
          << "Unable to pick leader for " << tablet_id_ << ", replicas: " << AsString(replicas)
          << ", followers: " << AsString(followers_) << THROTTLE_MSG;
    }
  } else {
    VLOG(4) << "Selected TServer " << current_ts_->ToString() << " as leader for " << tablet_id_;
  }
  VTRACE_TO(1, trace_, "Selected $0", (current_ts_ ? current_ts_->ToString() : "none"));
}

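// Entry point for sending (and re-sending) the RPC. If the tablet has not been resolved yet, an
// asynchronous lookup is issued and Execute() is re-entered from InitialLookupTabletDone().
// Otherwise a destination tserver is selected (local, closest, or leader), its proxy is
// initialized, and the RPC is sent. ConsistentPrefix reads may first force a refresh of the
// cached replica set, governed by the *_lookup_cache_refresh_secs flags defined above.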
void TabletInvoker::Execute(const std::string& tablet_id, bool leader_only) {
  if (tablet_id_.empty()) {
    if (!tablet_id.empty()) {
      tablet_id_ = tablet_id;
    } else {
      tablet_id_ = CHECK_NOTNULL(tablet_.get())->tablet_id();
    }
  }

  if (!tablet_) {
    client_->LookupTabletById(tablet_id_, table_, include_inactive_, retrier_->deadline(),
                              std::bind(&TabletInvoker::InitialLookupTabletDone, this, _1),
                              UseCache::kTrue);
    return;
  }

  if (consistent_prefix_ && !leader_only) {
    bool refresh_cache = false;
    if (PREDICT_FALSE(FLAGS_force_lookup_cache_refresh_secs > 0) &&
        MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() >
            FLAGS_force_lookup_cache_refresh_secs) {
      refresh_cache = true;

      VLOG(1) << "Updating tablet " << tablet_->tablet_id() << " replicas cache "
              << "force_lookup_cache_refresh_secs: " << FLAGS_force_lookup_cache_refresh_secs
              << ". " << MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds()
              << " seconds since the last update. Replicas in current cache: "
              << tablet_->ReplicasAsString();
    } else if (FLAGS_lookup_cache_refresh_secs > 0 &&
               MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() >
                   FLAGS_lookup_cache_refresh_secs &&
               !tablet_->IsReplicasCountConsistent()) {
      refresh_cache = true;
      VLOG(1) << "Updating tablet " << tablet_->tablet_id() << " replicas cache "
              << "lookup_cache_refresh_secs: " << FLAGS_lookup_cache_refresh_secs
              << ". " << MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds()
              << " seconds since the last update. Replicas in current cache: "
              << tablet_->ReplicasAsString();
    }

    if (refresh_cache) {
      client_->LookupTabletById(tablet_id_,
                                table_,
                                include_inactive_,
                                retrier_->deadline(),
                                std::bind(&TabletInvoker::LookupTabletCb, this, _1),
                                UseCache::kFalse);
      return;
    }
  }

  // Sets current_ts_.
  if (local_tserver_only_) {
    SelectLocalTabletServer();
  } else if (consistent_prefix_ && !leader_only) {
    SelectTabletServerWithConsistentPrefix();
  } else {
    SelectTabletServer();
  }

  // If we've tried all replicas, force a lookup to the master to find the
  // new leader. This relies on some properties of LookupTabletByKey():
  // 1. The fast path only works when there's a non-failed leader (which we
  //    know is untrue here).
  // 2. The slow path always fetches consensus configuration information and
  //    updates the looked-up tablet.
  // Put another way, we don't care about the lookup results at all; we're
  // just using it to fetch the latest consensus configuration information.
  //
  if (!current_ts_) {
    client_->LookupTabletById(tablet_id_,
                              table_,
                              include_inactive_,
                              retrier_->deadline(),
                              std::bind(&TabletInvoker::LookupTabletCb, this, _1),
                              UseCache::kTrue);
    return;
  }

  // Make sure we have a working proxy before sending out the RPC.
  auto status = current_ts_->InitProxy(client_);

  // Fail over to a replica in the event of a DNS resolution failure.
  if (!status.ok()) {
    status = FailToNewReplica(status);
    if (!status.ok()) {
      command_->Finished(status);
    }
    return;
  }

  // Now that current_ts_ is set, check if we need to send the request to the node-local forward
  // proxy.
  should_use_local_node_proxy_ = ShouldUseNodeLocalForwardProxy();

  VLOG(2) << "Tablet " << tablet_id_ << ": Sending " << command_->ToString() << " to replica "
          << current_ts_->ToString() << " using local node forward proxy "
          << should_use_local_node_proxy_;

  rpc_->SendRpcToTserver(retrier_->attempt_num());
}

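// The RPC goes through the node-local forward proxy only when
// --ysql_forward_rpcs_to_local_tserver is set, a forward proxy has been configured on the
// client, and the destination is neither the master leader nor the node-local tserver itself
// (in those cases, presumably, a direct call is cheaper or required).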
bool TabletInvoker::ShouldUseNodeLocalForwardProxy() {
  DCHECK(current_ts_);
  return FLAGS_ysql_forward_rpcs_to_local_tserver &&
         client().GetNodeLocalForwardProxy() &&
         !(current_ts_->ProxyEndpoint() == client().GetMasterLeaderAddress()) &&
         !(current_ts_->ProxyEndpoint() == client().GetNodeLocalTServerHostPort());
}

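// Handles a per-replica failure before retrying elsewhere:
//   - STALE_FOLLOWER: just retry; the cached replica set is left unchanged.
//   - NOT_THE_LEADER: remember the replica as a follower so that leader-only retries skip it
//     (see the comment inside for why it is not marked as failed).
//   - anything else: mark the replica as failed in the tablet cache.
// In all cases a delayed retry of the command is scheduled via the retrier.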
Status TabletInvoker::FailToNewReplica(const Status& reason,
                                       const tserver::TabletServerErrorPB* error_code) {
  if (ErrorCode(error_code) == tserver::TabletServerErrorPB::STALE_FOLLOWER) {
    VLOG(1) << "Stale follower for " << command_->ToString() << " just retry";
  } else if (ErrorCode(error_code) == tserver::TabletServerErrorPB::NOT_THE_LEADER) {
    VLOG(1) << "Not the leader for " << command_->ToString()
            << " retrying with a different replica";
    // In the past we were marking a replica as failed whenever an error was returned. The problem
    // with this approach is that not all types of errors mean that the replica has failed. Some
    // errors like NOT_THE_LEADER are specific to certain types of requests (Write and
    // UpdateTransaction RPCs), while other types of requests don't need to be sent to the leader
    // at all (consistent prefix reads). So instead of marking a replica as failed for all the RPCs
    // (since the RemoteTablet object is shared across all the RPCs in the same batcher), this
    // remote tablet server is marked as a follower so that it's not used during a retry for
    // requests that need to contact the leader only. This has the same effect as marking the
    // replica as failed for this specific RPC, but without affecting other RPCs.
    followers_.emplace(current_ts_, FollowerData {
      .status = STATUS(IllegalState, "Not the leader"),
      .time = CoarseMonoClock::now()
    });
  } else {
    VLOG(1) << "Failing " << command_->ToString() << " to a new replica: " << reason
            << ", old replica: " << yb::ToString(current_ts_);

    bool found = !tablet_ || tablet_->MarkReplicaFailed(current_ts_, reason);
    if (!found) {
      // It's possible that current_ts_ is not part of replicas if RemoteTablet::Refresh() is
      // invoked, which updates the set of replicas.
      LOG(WARNING) << "Tablet " << tablet_id_ << ": Unable to mark replica "
                   << current_ts_->ToString()
                   << " as failed. Replicas: " << tablet_->ReplicasAsString();
    }
  }

  auto status = retrier_->DelayedRetry(command_, reason);
  if (!status.ok()) {
    LOG(WARNING) << "Failed to schedule retry on new replica: " << status;
  }
  return status;
}

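// Processes the outcome of an RPC attempt. Returns true when the operation is complete (success,
// or a terminal failure reported through rpc_->Failed() / command_->Finished()), and false when
// another attempt has been scheduled through the retrier. Rough precedence of the checks below:
//   1. Aborted status or exhausted retrier: done.
//   2. Network error: fail over to another replica.
//   3. Tablet split or stale partition list: rewritten to TryAgain and done.
//   4. Retryable errors (IllegalState, ServiceUnavailable, timeouts within the deadline, ...):
//      optionally mark the replica as a follower, then retry.
//   5. Otherwise: report the failure, or, on success, promote the replica that answered to
//      leader in the cache if we had guessed a new leader.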
bool TabletInvoker::Done(Status* status) {
  TRACE_TO(trace_, "Done($0)", status->ToString(false));
  ADOPT_TRACE(trace_);

  bool assign_new_leader = assign_new_leader_;
  assign_new_leader_ = false;

  if (status->IsAborted() || retrier_->finished()) {
    if (status->ok()) {
      *status = retrier_->controller().status();
      if (status->ok()) {
        *status = STATUS(Aborted, "Retrier finished");
      }
    }
    return true;
  }

  // Prefer early failures over controller failures.
  if (status->ok() && retrier_->HandleResponse(command_, status)) {
    return false;
  }

  // Fail over to a replica in the event of any network failure.
  //
  // TODO: This is probably too harsh; some network failures should be
  // retried on the current replica.
  if (status->IsNetworkError()) {
    // The whole operation is completed if we can't schedule a retry.
    return !FailToNewReplica(*status).ok();
  }

  // Prefer controller failures over response failures.
  auto rsp_err = rpc_->response_error();
  {
    Status resp_error_status = ErrorStatus(rsp_err);
    if (status->ok() && !resp_error_status.ok()) {
      *status = resp_error_status;
    } else if (status->IsRemoteError()) {
      if (!resp_error_status.ok()) {
        *status = resp_error_status;
      } else {
        const auto* error = retrier_->controller().error_response();
        if (error &&
            (error->code() == rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN ||
             error->code() == rpc::ErrorStatusPB::ERROR_NO_SUCH_SERVICE)) {
          *status = STATUS(ServiceUnavailable, error->message());
        }
      }
    }
  }

  const bool is_tablet_split = ErrorCode(rsp_err) == tserver::TabletServerErrorPB::TABLET_SPLIT;
  if (is_tablet_split || ClientError(*status) == ClientErrorCode::kTablePartitionListIsStale) {
    // Replace the status error with TryAgain, so that the upper layer retries the request after
    // refreshing the table partitioning metadata.
    *status = status->CloneAndReplaceCode(Status::kTryAgain);
    if (is_tablet_split) {
      tablet_->MarkAsSplit();
    }
    rpc_->Failed(*status);
    return true;
  }

  // Oops, we failed over to a replica that wasn't a LEADER. Unlikely as
  // we're using consensus configuration information from the master, but still possible
  // (e.g. leader restarted and became a FOLLOWER). Try again.
  //
  // TODO: IllegalState is obviously way too broad an error category for
  // this case.
  if (status->IsIllegalState() || status->IsServiceUnavailable() || status->IsAborted() ||
      status->IsLeaderNotReadyToServe() || status->IsLeaderHasNoLease() ||
      TabletNotFoundOnTServer(rsp_err, *status) ||
      (status->IsTimedOut() && CoarseMonoClock::Now() < retrier_->deadline())) {
    VLOG(4) << "Retryable failure: " << *status
            << ", response: " << yb::ToString(rsp_err);

    const bool leader_is_not_ready =
        ErrorCode(rsp_err) ==
            tserver::TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE ||
        status->IsLeaderNotReadyToServe();

    // If the leader is just not ready, retry the same tserver.
    // Otherwise the leader became a follower and must be reset on retry.
    if (!leader_is_not_ready) {
      followers_.emplace(current_ts_, FollowerData {
        .status = *status,
        .time = CoarseMonoClock::now()
      });
    }

    if (PREDICT_FALSE(FLAGS_TEST_assert_local_op) && current_ts_->IsLocal() &&
        status->IsIllegalState()) {
      CHECK(false) << "Operation is not local";
    }

    // If only the local tserver is requested and it is not the leader, respond with an error and
    // finish. Otherwise, continue below to retry.
    if (local_tserver_only_ && current_ts_->IsLocal() && status->IsIllegalState()) {
      rpc_->Failed(*status);
      return true;
    }

    if (status->IsIllegalState() || TabletNotFoundOnTServer(rsp_err, *status)) {
      // The whole operation is completed if we can't schedule a retry.
      return !FailToNewReplica(*status, rsp_err).ok();
    } else {
      tserver::TabletServerDelay delay(*status);
      auto retry_status = delay.value().Initialized()
          ? retrier_->DelayedRetry(command_, *status, delay.value())
          : retrier_->DelayedRetry(command_, *status);
      if (!retry_status.ok()) {
        command_->Finished(retry_status);
      }
    }
    return false;
  }

  if (!status->ok()) {
    if (status->IsTimedOut()) {
      VLOG(1) << "Call to " << yb::ToString(tablet_) << " timed out. Marking replica "
              << yb::ToString(current_ts_) << " as failed.";
      if (tablet_ != nullptr && current_ts_ != nullptr) {
        tablet_->MarkReplicaFailed(current_ts_, *status);
      }
    }
    if (status->IsExpired() && rpc_->ShouldRetryExpiredRequest()) {
      client_->MaybeUpdateMinRunningRequestId(
          tablet_->tablet_id(), MinRunningRequestIdStatusData(*status).value());
      *status = STATUS(
          TryAgain, status->message(), ClientError(ClientErrorCode::kExpiredRequestToBeRetried));
    }
    std::string current_ts_string;
    if (current_ts_) {
      current_ts_string = Format("on tablet server $0", *current_ts_);
    } else {
      current_ts_string = "(no tablet server available)";
    }
    Status log_status = status->CloneAndPrepend(
        Format("Failed $0 to tablet $1 $2 after $3 attempt(s)",
               command_->ToString(),
               tablet_id_,
               current_ts_string,
               retrier_->attempt_num()));
    if (status->IsTryAgain() || status->IsExpired() || status->IsAlreadyPresent()) {
      YB_LOG_EVERY_N_SECS(INFO, 1) << log_status;
    } else {
      YB_LOG_EVERY_N_SECS(WARNING, 1) << log_status;
    }
    rpc_->Failed(*status);
  } else if (assign_new_leader && current_ts_) {
    bool assigned = tablet_->MarkTServerAsLeader(current_ts_);
    LOG_IF(INFO, !assigned)
        << "Unable to mark as leader: " << current_ts_->ToString() << " for "
        << tablet_->ToString();
  }

  return true;
}

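// Completion callback for the tablet lookup issued from Execute() when tablet_ was not yet
// known; on success, re-enters Execute() with the resolved tablet.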
void TabletInvoker::InitialLookupTabletDone(const Result<RemoteTabletPtr>& result) {
  VLOG(1) << "InitialLookupTabletDone(" << result << ")";

  if (result.ok()) {
    tablet_ = *result;
    Execute(std::string());
  } else {
    command_->Finished(result.status());
  }
}

bool TabletInvoker::IsLocalCall() const {
  return current_ts_ != nullptr && current_ts_->IsLocal();
}

std::shared_ptr<tserver::TabletServerServiceProxy> TabletInvoker::proxy() const {
  return current_ts_->proxy();
}

::yb::HostPort TabletInvoker::ProxyEndpoint() const {
  return current_ts_->ProxyEndpoint();
}

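// Completion callback for the metadata-refresh lookups issued from Execute(). The lookup result
// itself matters only when the failure is known to be permanent (tablet deleted, stale partition
// list); otherwise the follower set is cleared and the command is retried against the refreshed
// cache.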
void TabletInvoker::LookupTabletCb(const Result<RemoteTabletPtr>& result) {
  VLOG_WITH_FUNC(1) << AsString(result) << ", command: " << command_->ToString()
                    << ", retrier: " << retrier_->ToString();

  if (result.ok()) {
#ifndef DEBUG
    TRACE_TO(trace_, Format("LookupTabletCb($0)", *result));
#else
    TRACE_TO(trace_, "LookupTabletCb(OK)");
#endif
  } else {
    TRACE_TO(trace_, "LookupTabletCb($0)", result.status().ToString(false));
  }

  // We should retry the RPC regardless of the outcome of the lookup, as
  // leader election doesn't depend on the existence of a master at all.
  // Unless we know that this status is persistent.
  // For instance, if the tablet was deleted, we would always receive "Not found".
  if (!result.ok() &&
      (result.status().IsNotFound() ||
       ClientError(result.status()) == ClientErrorCode::kTablePartitionListIsStale)) {
    command_->Finished(result.status());
    return;
  }

  // Retry() imposes a slight delay, which is desirable in a lookup loop,
  // but unnecessary the first time through. Seeing as leader failures are
  // rare, perhaps this doesn't matter.
  followers_.clear();
  auto retry_status = retrier_->DelayedRetry(
      command_, result.ok() ? Status::OK() : result.status());
  if (!retry_status.ok()) {
    command_->Finished(!result.ok() ? result.status() : retry_status);
  }
}

530 | 0 | } |
531 | 10.5k | } |
532 | | |
533 | | void TabletInvoker::WriteAsync(const tserver::WriteRequestPB& req, |
534 | | tserver::WriteResponsePB *resp, |
535 | | rpc::RpcController *controller, |
536 | 1.32M | std::function<void()>&& cb) { |
537 | 1.32M | if (should_use_local_node_proxy_) { |
538 | 0 | client().GetNodeLocalForwardProxy()->WriteAsync(req, resp, controller, move(cb)); |
539 | 1.32M | } else { |
540 | 1.32M | current_ts_->proxy()->WriteAsync(req, resp, controller, move(cb)); |
541 | 1.32M | } |
542 | 1.32M | } |
543 | | |
544 | | void TabletInvoker::ReadAsync(const tserver::ReadRequestPB& req, |
545 | | tserver::ReadResponsePB *resp, |
546 | | rpc::RpcController *controller, |
547 | 4.70M | std::function<void()>&& cb) { |
548 | 4.70M | if (should_use_local_node_proxy_) { |
549 | 0 | client().GetNodeLocalForwardProxy()->ReadAsync(req, resp, controller, move(cb)); |
550 | 4.70M | } else { |
551 | 4.70M | current_ts_->proxy()->ReadAsync(req, resp, controller, move(cb)); |
552 | 4.70M | } |
553 | 4.70M | } |
554 | | |
555 | 1.49k | std::string TabletInvoker::FollowerData::ToString() const { |
556 | 1.49k | return Format("{ status: $0 time: $1 }", status, CoarseMonoClock::now() - time); |
557 | 1.49k | } |
558 | | |
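// Free helpers translating an optional TabletServerErrorPB into a Status / error code; a missing
// error maps to Status::OK() / UNKNOWN_ERROR respectively.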
Status ErrorStatus(const tserver::TabletServerErrorPB* error) {
  return error == nullptr ? Status::OK()
                          : StatusFromPB(error->status());
}

tserver::TabletServerErrorPB_Code ErrorCode(const tserver::TabletServerErrorPB* error) {
  return error == nullptr ? tserver::TabletServerErrorPB::UNKNOWN_ERROR
                          : error->code();
}

}  // namespace internal
}  // namespace client
}  // namespace yb