/Users/deen/code/yugabyte-db/src/yb/client/tablet_rpc.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/client/tablet_rpc.h" |
17 | | |
18 | | #include "yb/client/client-internal.h" |
19 | | #include "yb/client/client.h" |
20 | | #include "yb/client/client_error.h" |
21 | | #include "yb/client/meta_cache.h" |
22 | | |
23 | | #include "yb/common/wire_protocol.h" |
24 | | |
25 | | #include "yb/rpc/rpc_controller.h" |
26 | | #include "yb/rpc/rpc_header.pb.h" |
27 | | |
28 | | #include "yb/tserver/tserver_error.h" |
29 | | #include "yb/tserver/tserver_forward_service.proxy.h" |
30 | | #include "yb/tserver/tserver_service.proxy.h" |
31 | | |
32 | | #include "yb/util/flag_tags.h" |
33 | | #include "yb/util/logging.h" |
34 | | #include "yb/util/result.h" |
35 | | #include "yb/util/trace.h" |
36 | | |
// Runtime flags used by TabletInvoker. Flags created with DEFINE_test_flag are read further down
// with a FLAGS_TEST_ prefix (see FLAGS_TEST_assert_local_op in Done()).
37 | | DEFINE_test_flag(bool, assert_local_op, false, |
38 | | "When set, we crash if we received an operation that cannot be served locally."); |
// Checked first in Execute(): when positive and the tablet's refresh_time() is older than this
// many seconds, the replica cache is refreshed without any further condition.
39 | | DEFINE_int32(force_lookup_cache_refresh_secs, 0, "When non-zero, specifies how often we send a " |
40 | | "GetTabletLocations request to the master leader to update the tablet replicas cache. " |
41 | | "This request is only sent if we are processing a ConsistentPrefix read."); |
42 | | |
// Checked second in Execute(): a refresh additionally requires
// tablet_->IsReplicasCountConsistent() to be false.
43 | | DEFINE_int32(lookup_cache_refresh_secs, 60, "When non-zero, specifies how often we send a " |
44 | | "GetTabletLocations request to the master leader to update the tablet replicas cache. " |
45 | | "This request is only sent if we are processing a ConsistentPrefix read and the RPC " |
46 | | "layer has determined that its view of the replicas is inconsistent with what the " |
47 | | "master has reported"); |
// Compared against tablet_->GetNumFailedReplicas() in SelectTabletServerWithConsistentPrefix().
48 | | DEFINE_test_flag(int32, assert_failed_replicas_less_than, 0, |
49 | | "If greater than 0, this process will crash if the number of failed replicas for " |
50 | | "a RemoteTabletServer is greater than the specified number."); |
51 | | |
// Only declared here (no definition in this file); read by ShouldUseNodeLocalForwardProxy().
52 | | DECLARE_bool(ysql_forward_rpcs_to_local_tserver); |
53 | | |
// Makes _1 available to the std::bind calls in Execute().
54 | | using namespace std::placeholders; |
55 | | |
56 | | namespace yb { |
57 | | namespace client { |
58 | | namespace internal { |
59 | | |
// Builds an invoker for one tablet RPC. Every argument is copied into the matching member.
// `tablet` may be null: tablet_id_ then starts out empty and is filled in later by Execute().
60 | | TabletInvoker::TabletInvoker(const bool local_tserver_only, |
61 | | const bool consistent_prefix, |
62 | | YBClient* client, |
63 | | rpc::RpcCommand* command, |
64 | | TabletRpc* rpc, |
65 | | RemoteTablet* tablet, |
66 | | const std::shared_ptr<const YBTable>& table, |
67 | | rpc::RpcRetrier* retrier, |
68 | | Trace* trace, |
69 | | master::IncludeInactive include_inactive) |
70 | | : client_(client), |
71 | | command_(command), |
72 | | rpc_(rpc), |
73 | | tablet_(tablet), |
// Cache the id up front when the tablet is already known.
74 | | tablet_id_(tablet != nullptr ? tablet->tablet_id() : std::string()), |
75 | | table_(table), |
76 | | retrier_(retrier), |
77 | | trace_(trace), |
78 | | include_inactive_(include_inactive), |
79 | | local_tserver_only_(local_tserver_only), |
80 | 14.1M | consistent_prefix_(consistent_prefix) {} |
81 | | |
// Empty body: no explicit cleanup is performed here.
82 | 14.1M | TabletInvoker::~TabletInvoker() {} |
83 | | |
// Picks the replica for a consistent-prefix read: asks the client to select a tserver with
// ReplicaSelection::CLOSEST_REPLICA and stores the result in current_ts_.
// When FLAGS_TEST_assert_failed_replicas_less_than is non-zero, the process is aborted via
// LOG(FATAL) once tablet_->GetNumFailedReplicas() reaches that value.
// NOTE(review): the comparison below is >=, while the flag's help text says "greater than" --
// confirm which of the two is intended.
84 | 21.7k | void TabletInvoker::SelectTabletServerWithConsistentPrefix() { |
85 | 21.7k | TRACE_TO(trace_, "SelectTabletServerWithConsistentPrefix()"); |
86 | 21.7k | if (FLAGS_TEST_assert_failed_replicas_less_than) { |
87 | 3.36k | if (tablet_->GetNumFailedReplicas() >= FLAGS_TEST_assert_failed_replicas_less_than) { |
88 | 0 | LOG(FATAL) << "Invalid number of failed replicas: " << tablet_->ReplicasAsString(); |
89 | 0 | } |
90 | 3.36k | } |
91 | | |
// `candidates` is handed to SelectTServer() as an out-argument; it is not read again here.
92 | 21.7k | std::vector<RemoteTabletServer*> candidates; |
93 | 21.7k | current_ts_ = client_->data_->SelectTServer(tablet_.get(), |
94 | 21.7k | YBClient::ReplicaSelection::CLOSEST_REPLICA, {}, |
95 | 21.7k | &candidates); |
96 | 21.7k | VLOG(1) << "Using tserver: " << yb::ToString(current_ts_)0 ; |
97 | 21.7k | } |
98 | | |
// Sets current_ts_ to the meta cache's local tserver.
// NOTE(review): the VLOG dereferences current_ts_ without a null check; presumably
// local_tserver() is never null when local_tserver_only_ is requested -- confirm.
99 | 80.8k | void TabletInvoker::SelectLocalTabletServer() { |
100 | 80.8k | TRACE_TO(trace_, "SelectLocalTabletServer()"); |
101 | | |
102 | 80.8k | current_ts_ = client_->data_->meta_cache_->local_tserver(); |
103 | 80.8k | VLOG(1) << "Using local tserver: " << current_ts_->ToString()0 ; |
104 | 80.8k | } |
105 | | |
// Chooses the tserver for a request that must reach the leader and stores it in current_ts_.
// current_ts_ is left null when no replica outside followers_ can be found.
// assign_new_leader_ is set when the choice is a guess rather than the cached leader; Done()
// reads it to call MarkTServerAsLeader() after a successful attempt.
106 | 14.1M | void TabletInvoker::SelectTabletServer() { |
107 | 14.1M | TRACE_TO(trace_, "SelectTabletServer()"); |
108 | | |
109 | 14.1M | assign_new_leader_ = false; |
110 | | // Choose a destination TS according to the following algorithm: |
111 | | // 1. Select the leader, provided: |
112 | | // a. One exists, and |
113 | | // b. It hasn't failed, and |
114 | | // c. It isn't currently marked as a follower. |
115 | | // 2. If there's no good leader select another replica, provided: |
116 | | // a. It hasn't failed, and |
117 | | // b. It hasn't rejected our write due to being a follower. |
118 | | // 3. If we're out of appropriate replicas, force a lookup to the master |
119 | | // to fetch new consensus configuration information. |
120 | | // 4. When the lookup finishes, forget which replicas were followers and |
121 | | // retry the write (i.e. goto 1). |
122 | | // 5. If we issue the write and it fails because the destination was a |
123 | | // follower, remember that fact and retry the write (i.e. goto 1). |
124 | | // 6. Repeat steps 1-5 until the write succeeds, fails for other reasons, |
125 | | // or the write's deadline expires. |
126 | 14.1M | current_ts_ = tablet_->LeaderTServer(); |
// The cached leader already answered this RPC as a follower: discard it.
127 | 14.1M | if (current_ts_ && followers_.count(current_ts_)14.0M ) { |
128 | 18.4E | VLOG(2) << "Tablet " << tablet_id_ << ": We have a follower for a leader: " |
129 | 18.4E | << current_ts_->ToString(); |
130 | | |
131 | | // Mark the node as a follower in the cache so that on the next go-round, |
132 | | // LeaderTServer() will not return it as a leader unless a full metadata |
133 | | // refresh has occurred. This also avoids LookupTabletByKey() going into |
134 | | // "fast path" mode and not actually performing a metadata refresh from the |
135 | | // Master when it needs to. |
136 | 3.35k | tablet_->MarkTServerAsFollower(current_ts_); |
137 | 3.35k | current_ts_ = nullptr; |
138 | 3.35k | } |
139 | 14.1M | if (!current_ts_) { |
140 | | // Try to "guess" the next leader. |
141 | 37.5k | vector<RemoteTabletServer*> replicas; |
142 | 37.5k | tablet_->GetRemoteTabletServers(&replicas); |
// Take the first replica (in the order returned) that is not recorded in followers_.
143 | 67.9k | for (RemoteTabletServer* ts : replicas) { |
144 | 67.9k | if (!followers_.count(ts)) { |
145 | 26.6k | current_ts_ = ts; |
146 | 26.6k | break; |
147 | 26.6k | } |
148 | 67.9k | } |
149 | 37.5k | if (current_ts_) { |
150 | 26.6k | assign_new_leader_ = true; |
151 | 26.6k | } else { |
// Nothing to try: the caller (Execute) sees a null current_ts_ and issues a lookup.
152 | 10.9k | YB_LOG_EVERY_N_SECS(INFO, 1) |
153 | 836 | << "Unable to pick leader for " << tablet_id_ << ", replicas: " << AsString(replicas) |
154 | 836 | << ", followers: " << AsString(followers_) << THROTTLE_MSG; |
155 | 10.9k | } |
156 | 14.0M | } else { |
157 | 14.0M | VLOG(4) << "Selected TServer " << current_ts_->ToString() << " as leader for " << tablet_id_7.33k ; |
158 | 14.0M | } |
159 | 14.1M | VTRACE_TO(1, trace_, "Selected $0", (current_ts_ ? current_ts_->ToString() : "none")); |
160 | 14.1M | } |
161 | | |
// Entry point for sending (or re-sending) the RPC.
//   tablet_id   - used only while tablet_id_ is still empty; if it is empty as well, tablet_ must
//                 be non-null (CHECK_NOTNULL) and supplies the id.
//   leader_only - when true, the consistent-prefix cache refresh and closest-replica selection
//                 are skipped, so selection falls through to SelectLocalTabletServer() or
//                 SelectTabletServer().
// Flow: resolve the tablet when it is unknown (async lookup, then return); optionally refresh the
// replica cache for consistent-prefix reads (async lookup, then return); choose current_ts_;
// look the tablet up again when nothing could be chosen; otherwise initialise the proxy and hand
// the request to rpc_->SendRpcToTserver().
162 | 15.4M | void TabletInvoker::Execute(const std::string& tablet_id, bool leader_only) { |
163 | 15.4M | if (tablet_id_.empty()) { |
164 | 1.21M | if (!tablet_id.empty()1.20M ) { |
165 | 1.21M | tablet_id_ = tablet_id; |
166 | 18.4E | } else { |
167 | 18.4E | tablet_id_ = CHECK_NOTNULL(tablet_.get())->tablet_id(); |
168 | 18.4E | } |
169 | 1.20M | } |
170 | | |
// Tablet not resolved yet: look it up (cache allowed) and continue in InitialLookupTabletDone().
171 | 15.4M | if (!tablet_) { |
172 | 1.20M | client_->LookupTabletById(tablet_id_, table_, include_inactive_, retrier_->deadline(), |
173 | 1.20M | std::bind(&TabletInvoker::InitialLookupTabletDone, this, _1), |
174 | 1.20M | UseCache::kTrue); |
175 | 1.20M | return; |
176 | 1.20M | } |
177 | | |
178 | 14.2M | if (consistent_prefix_ && !leader_only26.4k ) { |
179 | 22.0k | bool refresh_cache = false; |
// Forced refresh: depends only on the age of the cached replica list.
180 | 22.0k | if (PREDICT_FALSE(FLAGS_force_lookup_cache_refresh_secs > 0) && |
181 | 22.0k | MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() > |
182 | 0 | FLAGS_force_lookup_cache_refresh_secs) { |
183 | 

184 | 0 | refresh_cache = true; |
185 | 

186 | 0 | VLOG(1) << "Updating tablet " << tablet_->tablet_id() << " replicas cache " |
187 | 0 | << "force_lookup_cache_refresh_secs: " << FLAGS_force_lookup_cache_refresh_secs |
188 | 0 | << ". " << MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() |
189 | 0 | << " seconds since the last update. Replicas in current cache: " |
190 | 0 | << tablet_->ReplicasAsString(); |
// Regular refresh: the cached list must be old enough AND have an inconsistent replica count.
191 | 22.0k | } else if (FLAGS_lookup_cache_refresh_secs > 0 && |
192 | 22.0k | MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() > |
193 | 22.0k | FLAGS_lookup_cache_refresh_secs && |
194 | 22.0k | !tablet_->IsReplicasCountConsistent()3.59k ) { |
195 | 356 | refresh_cache = true; |
// Log the flag that actually triggered this branch (previously the force_ flag was printed).
196 | 356 | VLOG(1) << "Updating tablet " << tablet_->tablet_id() << " replicas cache " |
197 | 0 | << "lookup_cache_refresh_secs: " << FLAGS_lookup_cache_refresh_secs |
198 | 0 | << ". " << MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() |
199 | 0 | << " seconds since the last update. Replicas in current cache: " |
200 | 0 | << tablet_->ReplicasAsString(); |
201 | 356 | } |
202 | | |
203 | | |
// UseCache::kFalse is passed for the refresh; the RPC resumes from LookupTabletCb().
204 | 22.0k | if (refresh_cache) { |
205 | 356 | client_->LookupTabletById(tablet_id_, |
206 | 356 | table_, |
207 | 356 | include_inactive_, |
208 | 356 | retrier_->deadline(), |
209 | 356 | std::bind(&TabletInvoker::LookupTabletCb, this, _1), |
210 | 356 | UseCache::kFalse); |
211 | 356 | return; |
212 | 356 | } |
213 | 22.0k | } |
214 | | |
215 | | // Sets current_ts_. |
216 | 14.2M | if (local_tserver_only_) { |
217 | 80.8k | SelectLocalTabletServer(); |
218 | 14.1M | } else if (consistent_prefix_ && !leader_only26.1k ) { |
219 | 21.7k | SelectTabletServerWithConsistentPrefix(); |
220 | 14.1M | } else { |
221 | 14.1M | SelectTabletServer(); |
222 | 14.1M | } |
223 | | |
224 | | // If we've tried all replicas, force a lookup to the master to find the |
225 | | // new leader. This relies on some properties of LookupTabletByKey(): |
226 | | // 1. The fast path only works when there's a non-failed leader (which we |
227 | | // know is untrue here). |
228 | | // 2. The slow path always fetches consensus configuration information and |
229 | | // updates the looked-up tablet. |
230 | | // Put another way, we don't care about the lookup results at all; we're |
231 | | // just using it to fetch the latest consensus configuration information. |
232 | | // |
233 | 14.2M | if (!current_ts_) { |
234 | 10.9k | client_->LookupTabletById(tablet_id_, |
235 | 10.9k | table_, |
236 | 10.9k | include_inactive_, |
237 | 10.9k | retrier_->deadline(), |
238 | 10.9k | std::bind(&TabletInvoker::LookupTabletCb, this, _1), |
239 | 10.9k | UseCache::kTrue); |
240 | 10.9k | return; |
241 | 10.9k | } |
242 | | |
243 | | // Make sure we have a working proxy before sending out the RPC. |
244 | 14.1M | auto status = current_ts_->InitProxy(client_); |
245 | | |
246 | | // Fail to a replica in the event of a DNS resolution failure. |
247 | 14.1M | if (!status.ok()) { |
248 | 0 | status = FailToNewReplica(status); |
// The retry could not be scheduled either: finish the command with that status.
249 | 0 | if (!status.ok()) { |
250 | 0 | command_->Finished(status); |
251 | 0 | } |
252 | 0 | return; |
253 | 0 | } |
254 | | |
255 | | // Now that the current_ts_ is set, check if we need to send the request to the node local forward |
256 | | // proxy. |
257 | 14.1M | should_use_local_node_proxy_ = ShouldUseNodeLocalForwardProxy(); |
258 | | |
259 | 18.4E | VLOG(2) << "Tablet " << tablet_id_ << ": Sending " << command_->ToString() << " to replica " |
260 | 18.4E | << current_ts_->ToString() << " using local node forward proxy " |
261 | 18.4E | << should_use_local_node_proxy_; |
262 | | |
263 | 14.1M | rpc_->SendRpcToTserver(retrier_->attempt_num()); |
264 | 14.1M | } |
265 | | |
// Returns true only when all of the following hold: ysql_forward_rpcs_to_local_tserver is set,
// the client has a node-local forward proxy, and current_ts_'s endpoint differs from both the
// master leader address and the node-local tserver's host/port.
// current_ts_ must already be set (DCHECK).
266 | 14.2M | bool TabletInvoker::ShouldUseNodeLocalForwardProxy() { |
267 | 14.2M | DCHECK(current_ts_); |
268 | 14.2M | return FLAGS_ysql_forward_rpcs_to_local_tserver && |
269 | 14.2M | client().GetNodeLocalForwardProxy()0 && |
270 | 14.2M | !(current_ts_->ProxyEndpoint() == client().GetMasterLeaderAddress())0 && |
271 | 14.2M | !(current_ts_->ProxyEndpoint() == client().GetNodeLocalTServerHostPort())0 ; |
272 | 14.2M | } |
273 | | |
// Records why current_ts_ should not be reused for this RPC and schedules a delayed retry.
//   reason     - status forwarded to MarkReplicaFailed() and to the retrier.
//   error_code - tserver error of the failed attempt. Callers in this file may omit it (the
//                default lives in the header, not shown here); a null pointer maps to
//                UNKNOWN_ERROR through ErrorCode() and therefore takes the last branch.
//                STALE_FOLLOWER only logs, NOT_THE_LEADER adds current_ts_ to followers_, anything
//                else marks the replica as failed on the tablet.
// Returns the status of scheduling the retry; callers treat a non-OK result as "finished".
274 | | Status TabletInvoker::FailToNewReplica(const Status& reason, |
275 | 29.3k | const tserver::TabletServerErrorPB* error_code) { |
276 | 29.3k | if (ErrorCode(error_code) == tserver::TabletServerErrorPB::STALE_FOLLOWER) { |
277 | 4.00k | VLOG(1) << "Stale follower for " << command_->ToString() << " just retry"0 ; |
278 | 25.3k | } else if (ErrorCode(error_code) == tserver::TabletServerErrorPB::NOT_THE_LEADER) { |
279 | 22.5k | VLOG(1) << "Not the leader for " << command_->ToString() |
280 | 17 | << " retrying with a different replica"; |
281 | | // In the past we were marking a replica as failed whenever an error was returned. The problem |
282 | | // with this approach is that not all type of errors mean that the replica has failed. Some |
283 | | // errors like NOT_THE_LEADER are only specific to certain type of requests (Write and |
284 | | // UpdateTransaction RPCs), but other type of requests don't need to be sent to the leader |
285 | | // (consistent prefix reads). So instead of marking a replica as failed for all the RPCs (since |
286 | | // the RemoteTablet object is shared across all the rpcs in the same batcher), this remote |
287 | | // tablet server is marked as a follower so that it's not used during a retry for requests that |
288 | | // need to contact the leader only. This has the same effect as marking the replica as failed |
289 | | // for this specific RPC, but without affecting other RPCs. |
290 | 22.5k | followers_.emplace(current_ts_, FollowerData { |
291 | 22.5k | .status = STATUS(IllegalState, "Not the leader"), |
292 | 22.5k | .time = CoarseMonoClock::now() |
293 | 22.5k | }); |
294 | 22.5k | } else { |
295 | 2.82k | VLOG(1) << "Failing " << command_->ToString() << " to a new replica: " << reason |
296 | 34 | << ", old replica: " << yb::ToString(current_ts_); |
297 | | |
// A missing tablet_ counts as "found", so the warning below only runs with a non-null tablet_.
298 | 2.82k | bool found = !tablet_ || tablet_->MarkReplicaFailed(current_ts_, reason)2.79k ; |
299 | 2.82k | if (!found) { |
300 | | // Its possible that current_ts_ is not part of replicas if RemoteTablet.Refresh() is invoked |
301 | | // which updates the set of replicas. |
302 | 3 | LOG(WARNING) << "Tablet " << tablet_id_ << ": Unable to mark replica " |
303 | 3 | << current_ts_->ToString() |
304 | 3 | << " as failed. Replicas: " << tablet_->ReplicasAsString(); |
305 | 3 | } |
306 | 2.82k | } |
307 | | |
308 | 29.3k | auto status = retrier_->DelayedRetry(command_, reason); |
309 | 29.3k | if (!status.ok()) { |
310 | 0 | LOG(WARNING) << "Failed to schedule retry on new replica: " << status; |
311 | 0 | } |
312 | 29.3k | return status; |
313 | 29.3k | } |
314 | | |
// Processes the outcome of one RPC attempt.
//   status - in/out. Holds the attempt's status on entry and may be replaced before returning:
//            by the response error, or remapped to Aborted / ServiceUnavailable / TryAgain.
// Returns true when the operation is complete (success or final failure). Returns false when
// retrier_->HandleResponse() reports it handled the response, when a retry on another replica was
// scheduled, or on the plain delayed-retry path (where a scheduling failure has already been
// reported through command_->Finished()).
315 | 14.1M | bool TabletInvoker::Done(Status* status) { |
316 | 14.1M | TRACE_TO(trace_, "Done($0)", status->ToString(false)); |
317 | 14.1M | ADOPT_TRACE(trace_); |
318 | | |
// Consume the flag set by SelectTabletServer() so it applies to this attempt only.
319 | 14.1M | bool assign_new_leader = assign_new_leader_; |
320 | 14.1M | assign_new_leader_ = false; |
321 | | |
// Aborted call or finished retrier: make sure a non-OK status is reported, then stop.
322 | 14.1M | if (status->IsAborted() || retrier_->finished()14.1M ) { |
323 | 3 | if (status->ok()) { |
324 | 3 | *status = retrier_->controller().status(); |
325 | 3 | if (status->ok()) { |
326 | 3 | *status = STATUS(Aborted, "Retrier finished"); |
327 | 3 | } |
328 | 3 | } |
329 | 3 | return true; |
330 | 3 | } |
331 | | |
332 | | // Prefer early failures over controller failures. |
333 | 14.1M | if (status->ok() && retrier_->HandleResponse(command_, status)14.1M ) { |
334 | 33 | return false; |
335 | 33 | } |
336 | | |
337 | | // Failover to a replica in the event of any network failure. |
338 | | // |
339 | | // TODO: This is probably too harsh; some network failures should be |
340 | | // retried on the current replica. |
341 | 14.1M | if (status->IsNetworkError()) { |
342 | | // The whole operation is completed if we can't schedule a retry. |
343 | 951 | return !FailToNewReplica(*status).ok(); |
344 | 951 | } |
345 | | |
346 | | // Prefer controller failures over response failures. |
347 | 14.1M | auto rsp_err = rpc_->response_error(); |
348 | 14.1M | { |
349 | 14.1M | Status resp_error_status = ErrorStatus(rsp_err); |
350 | 14.1M | if (status->ok() && !resp_error_status.ok()14.1M ) { |
351 | 288k | *status = resp_error_status; |
352 | 13.8M | } else if (status->IsRemoteError()) { |
353 | 24 | if (!resp_error_status.ok()) { |
354 | 0 | *status = resp_error_status; |
355 | 24 | } else { |
// Remote error without a response error: map shutdown / missing-service codes to
// ServiceUnavailable, which the retryable check below accepts.
356 | 24 | const auto* error = retrier_->controller().error_response(); |
357 | 24 | if (error && |
358 | 24 | (error->code() == rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN || |
359 | 24 | error->code() == rpc::ErrorStatusPB::ERROR_NO_SUCH_SERVICE)) { |
360 | 0 | *status = STATUS(ServiceUnavailable, error->message()); |
361 | 0 | } |
362 | 24 | } |
363 | 24 | } |
364 | 14.1M | } |
365 | | |
366 | 14.1M | const bool is_tablet_split = ErrorCode(rsp_err) == tserver::TabletServerErrorPB::TABLET_SPLIT; |
367 | 14.1M | if (is_tablet_split || ClientError(*status) == ClientErrorCode::kTablePartitionListIsStale14.1M ) { |
368 | | // Replace status error with TryAgain, so upper layer retry request after refreshing |
369 | | // table partitioning metadata. |
370 | 37 | *status = status->CloneAndReplaceCode(Status::kTryAgain); |
371 | 37 | if (is_tablet_split) { |
372 | 35 | tablet_->MarkAsSplit(); |
373 | 35 | } |
374 | 37 | rpc_->Failed(*status); |
375 | 37 | return true; |
376 | 37 | } |
377 | | |
378 | | // Oops, we failed over to a replica that wasn't a LEADER. Unlikely as |
379 | | // we're using consensus configuration information from the master, but still possible |
380 | | // (e.g. leader restarted and became a FOLLOWER). Try again. |
381 | | // |
382 | | // TODO: IllegalState is obviously way too broad an error category for |
383 | | // this case. |
// A timeout is retryable here only while the retrier's deadline has not passed.
384 | 14.1M | if (status->IsIllegalState() || status->IsServiceUnavailable()14.1M || status->IsAborted()14.1M || |
385 | 14.1M | status->IsLeaderNotReadyToServe()14.1M || status->IsLeaderHasNoLease()14.0M || |
386 | 14.1M | TabletNotFoundOnTServer(rsp_err, *status)14.0M || |
387 | 14.1M | (14.1M status->IsTimedOut()14.1M && CoarseMonoClock::Now() < retrier_->deadline()1.61k )) { |
388 | 18.4E | VLOG(4) << "Retryable failure: " << *status |
389 | 18.4E | << ", response: " << yb::ToString(rsp_err); |
390 | | |
391 | 40.5k | const bool leader_is_not_ready = |
392 | 40.5k | ErrorCode(rsp_err) == |
393 | 40.5k | tserver::TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE || |
394 | 40.5k | status->IsLeaderNotReadyToServe()33.2k ; |
395 | | |
396 | | // If the leader just is not ready - let's retry the same tserver. |
397 | | // Else the leader became a follower and must be reset on retry. |
398 | 40.5k | if (!leader_is_not_ready) { |
399 | 33.1k | followers_.emplace(current_ts_, FollowerData { |
400 | 33.1k | .status = *status, |
401 | 33.1k | .time = CoarseMonoClock::now() |
402 | 33.1k | }); |
403 | 33.1k | } |
404 | | |
405 | 40.5k | if (PREDICT_FALSE(FLAGS_TEST_assert_local_op) && current_ts_->IsLocal()0 && |
406 | 40.5k | status->IsIllegalState()0 ) { |
407 | 0 | CHECK(false) << "Operation is not local"; |
408 | 0 | } |
409 | | |
410 | | // If only local tserver is requested and it is not the leader, respond error and done. |
411 | | // Otherwise, continue below to retry. |
412 | 40.5k | if (local_tserver_only_ && current_ts_->IsLocal()11 && status->IsIllegalState()11 ) { |
413 | 9 | rpc_->Failed(*status); |
414 | 9 | return true; |
415 | 9 | } |
416 | | |
417 | 40.5k | if (status->IsIllegalState() || TabletNotFoundOnTServer(rsp_err, *status)13.1k ) { |
418 | | // The whole operation is completed if we can't schedule a retry. |
419 | 28.3k | return !FailToNewReplica(*status, rsp_err).ok(); |
420 | 28.3k | } else { |
// Use the delay carried by the status when it is initialized, else the retrier's default.
421 | 12.2k | tserver::TabletServerDelay delay(*status); |
422 | 12.2k | auto retry_status = delay.value().Initialized() |
423 | 12.2k | ? retrier_->DelayedRetry(command_, *status, delay.value())0 |
424 | 12.2k | : retrier_->DelayedRetry(command_, *status); |
425 | 12.2k | if (!retry_status.ok()) { |
426 | 0 | command_->Finished(retry_status); |
427 | 0 | } |
428 | 12.2k | } |
429 | 12.2k | return false; |
430 | 40.5k | } |
431 | | |
432 | 14.1M | if (!status->ok()) { |
// Only timeouts that were not retried above (deadline reached) get here.
433 | 249k | if (status->IsTimedOut()) { |
434 | 1.61k | VLOG(1) << "Call to " << yb::ToString(tablet_) << " timed out. Marking replica " |
435 | 0 | << yb::ToString(current_ts_) << " as failed."; |
436 | 1.61k | if (tablet_ != nullptr && current_ts_ != nullptr1.60k ) { |
437 | 1.42k | tablet_->MarkReplicaFailed(current_ts_, *status); |
438 | 1.42k | } |
439 | 1.61k | } |
// NOTE(review): tablet_ is dereferenced here without the null check used in the timeout
// branch above -- confirm it cannot be null on this path.
440 | 249k | if (status->IsExpired() && rpc_->ShouldRetryExpiredRequest()64.7k ) { |
441 | 0 | client_->MaybeUpdateMinRunningRequestId( |
442 | 0 | tablet_->tablet_id(), MinRunningRequestIdStatusData(*status).value()); |
443 | 0 | *status = STATUS( |
444 | 0 | TryAgain, status->message(), ClientError(ClientErrorCode::kExpiredRequestToBeRetried)); |
445 | 0 | } |
446 | 249k | std::string current_ts_string; |
447 | 249k | if (current_ts_) { |
448 | 249k | current_ts_string = Format("on tablet server $0", *current_ts_); |
449 | 249k | } else { |
450 | 462 | current_ts_string = "(no tablet server available)"; |
451 | 462 | } |
452 | 249k | Status log_status = status->CloneAndPrepend( |
453 | 249k | Format("Failed $0 to tablet $1 $2 after $3 attempt(s)", |
454 | 249k | command_->ToString(), |
455 | 249k | tablet_id_, |
456 | 249k | current_ts_string, |
457 | 249k | retrier_->attempt_num())); |
// TryAgain / Expired / AlreadyPresent are logged at INFO, everything else at WARNING.
458 | 249k | if (status->IsTryAgain() || status->IsExpired()66.7k || status->IsAlreadyPresent()2.01k ) { |
459 | 247k | YB_LOG_EVERY_N_SECS(INFO, 1) << log_status2.29k ; |
460 | 247k | } else { |
461 | 2.10k | YB_LOG_EVERY_N_SECS(WARNING, 1) << log_status562 ; |
462 | 2.10k | } |
463 | 249k | rpc_->Failed(*status); |
// Success on a guessed leader: record it as the leader on the tablet.
464 | 13.8M | } else if (assign_new_leader && current_ts_1.64k ) { |
465 | 1.64k | bool assigned = tablet_->MarkTServerAsLeader(current_ts_); |
466 | 1.64k | LOG_IF(INFO, !assigned) |
467 | 0 | << "Unable to mark as leader: " << current_ts_->ToString() << " for " |
468 | 0 | << tablet_->ToString(); |
469 | 1.64k | } |
470 | | |
471 | 14.1M | return true; |
472 | 14.1M | } |
473 | | |
// Callback for the lookup started by Execute() while tablet_ was still unknown.
// On success the tablet is stored and Execute() is re-entered; tablet_id_ is already set at that
// point, so the empty string argument is not used. On failure the command is finished with the
// lookup status.
474 | 1.21M | void TabletInvoker::InitialLookupTabletDone(const Result<RemoteTabletPtr>& result) { |
475 | 1.21M | VLOG(1) << "InitialLookupTabletDone(" << result << ")"664 ; |
476 | | |
477 | 1.21M | if (result.ok()) { |
478 | 1.20M | tablet_ = *result; |
479 | 1.20M | Execute(std::string()); |
480 | 1.20M | } else { |
481 | 1.89k | command_->Finished(result.status()); |
482 | 1.89k | } |
483 | 1.21M | } |
484 | | |
// True when a tserver has been selected and that tserver reports IsLocal().
485 | 11.7M | bool TabletInvoker::IsLocalCall() const { |
486 | 11.7M | return current_ts_ != nullptr && current_ts_->IsLocal()11.7M ; |
487 | 11.7M | } |
488 | | |
// Returns the selected tserver's proxy. current_ts_ is dereferenced without a check, so a tserver
// must already have been selected.
489 | 2.09M | std::shared_ptr<tserver::TabletServerServiceProxy> TabletInvoker::proxy() const { |
490 | 2.09M | return current_ts_->proxy(); |
491 | 2.09M | } |
492 | | |
// Returns the selected tserver's proxy endpoint. current_ts_ is dereferenced without a check, so
// a tserver must already have been selected.
493 | 0 | ::yb::HostPort TabletInvoker::ProxyEndpoint() const { |
494 | 0 | return current_ts_->ProxyEndpoint(); |
495 | 0 | } |
496 | | |
// Callback for the lookups Execute() issues to refresh replica information.
// Finishes the command when the lookup failed with NotFound or kTablePartitionListIsStale.
// In every other case (including other lookup failures) followers_ is cleared and a delayed retry
// of the command is scheduled.
497 | 11.0k | void TabletInvoker::LookupTabletCb(const Result<RemoteTabletPtr>& result) { |
498 | 11.0k | VLOG_WITH_FUNC0 (1) << AsString(result) << ", command: " << command_->ToString() |
499 | 0 | << ", retrier: " << retrier_->ToString(); |
500 | | |
501 | 11.0k | if (result.ok()) { |
// NOTE(review): the detailed trace is compiled when DEBUG is NOT defined -- confirm the
// polarity of this guard is intended.
502 | 10.6k | #ifndef DEBUG |
503 | 10.6k | TRACE_TO(trace_, Format("LookupTabletCb($0)", *result)); |
504 | | #else |
505 | | TRACE_TO(trace_, "LookupTabletCb(OK)"); |
506 | | #endif |
507 | 10.6k | } else { |
508 | 370 | TRACE_TO(trace_, "LookupTabletCb($0)", result.status().ToString(false)); |
509 | 370 | } |
510 | | |
511 | | // We should retry the RPC regardless of the outcome of the lookup, as |
512 | | // leader election doesn't depend on the existence of a master at all. |
513 | | // Unless we know that this status is persistent. |
514 | | // For instance if tablet was deleted, we would always receive "Not found". |
515 | 11.0k | if (!result.ok() && |
516 | 11.0k | (370 result.status().IsNotFound()370 || |
517 | 370 | ClientError(result.status()) == ClientErrorCode::kTablePartitionListIsStale60 )) { |
518 | 312 | command_->Finished(result.status()); |
519 | 312 | return; |
520 | 312 | } |
521 | | |
522 | | // Retry() imposes a slight delay, which is desirable in a lookup loop, |
523 | | // but unnecessary the first time through. Seeing as leader failures are |
524 | | // rare, perhaps this doesn't matter. |
525 | 10.6k | followers_.clear(); |
526 | 10.6k | auto retry_status = retrier_->DelayedRetry( |
527 | 10.6k | command_, result.ok() ? Status::OK()10.6k : result.status()59 ); |
// If the retry cannot be scheduled, report the lookup failure in preference to the
// scheduling failure.
528 | 10.6k | if (!retry_status.ok()) { |
529 | 0 | command_->Finished(!result.ok() ? result.status() : retry_status); |
530 | 0 | } |
531 | 10.6k | } |
532 | | |
// Issues the write asynchronously. The request goes through the client's node-local forward
// proxy when should_use_local_node_proxy_ is set (computed in Execute()); otherwise it goes
// through current_ts_'s own proxy.
//   req, resp, controller - forwarded unchanged to the proxy's WriteAsync().
//   cb                    - completion callback; moved into the proxy call.
533 | | void TabletInvoker::WriteAsync(const tserver::WriteRequestPB& req, |
534 | | tserver::WriteResponsePB *resp, |
535 | | rpc::RpcController *controller, |
536 | 2.55M | std::function<void()>&& cb) { |
// std::move is spelled out instead of relying on ADL to find it.
537 | 2.55M | if (should_use_local_node_proxy_) { |
538 | 0 | client().GetNodeLocalForwardProxy()->WriteAsync(req, resp, controller, std::move(cb)); |
539 | 2.55M | } else { |
540 | 2.55M | current_ts_->proxy()->WriteAsync(req, resp, controller, std::move(cb)); |
541 | 2.55M | } |
542 | 2.55M | } |
543 | | |
// Issues the read asynchronously. The request goes through the client's node-local forward
// proxy when should_use_local_node_proxy_ is set (computed in Execute()); otherwise it goes
// through current_ts_'s own proxy.
//   req, resp, controller - forwarded unchanged to the proxy's ReadAsync().
//   cb                    - completion callback; moved into the proxy call.
544 | | void TabletInvoker::ReadAsync(const tserver::ReadRequestPB& req, |
545 | | tserver::ReadResponsePB *resp, |
546 | | rpc::RpcController *controller, |
547 | 9.56M | std::function<void()>&& cb) { |
// std::move is spelled out instead of relying on ADL to find it.
548 | 9.56M | if (should_use_local_node_proxy_) { |
549 | 0 | client().GetNodeLocalForwardProxy()->ReadAsync(req, resp, controller, std::move(cb)); |
550 | 9.56M | } else { |
551 | 9.56M | current_ts_->proxy()->ReadAsync(req, resp, controller, std::move(cb)); |
552 | 9.56M | } |
553 | 9.56M | } |
554 | | |
// Formats the stored status together with the time elapsed since `time` was recorded
// (CoarseMonoClock::now() - time).
555 | 2.20k | std::string TabletInvoker::FollowerData::ToString() const { |
556 | 2.20k | return Format("{ status: $0 time: $1 }", status, CoarseMonoClock::now() - time); |
557 | 2.20k | } |
558 | | |
// Converts a possibly-null tserver error into a Status: OK for null, otherwise the result of
// StatusFromPB(error->status()).
559 | 14.1M | Status ErrorStatus(const tserver::TabletServerErrorPB* error) { |
560 | 14.1M | return error == nullptr ? Status::OK()13.8M |
561 | 14.1M | : StatusFromPB(error->status())288k ; |
562 | 14.1M | } |
563 | | |
// Returns error->code(), or UNKNOWN_ERROR when error is null.
564 | 14.2M | tserver::TabletServerErrorPB_Code ErrorCode(const tserver::TabletServerErrorPB* error) { |
565 | 14.2M | return error == nullptr ? tserver::TabletServerErrorPB::UNKNOWN_ERROR13.8M |
566 | 14.2M | : error->code()382k ; |
567 | 14.2M | } |
568 | | |
569 | | } // namespace internal |
570 | | } // namespace client |
571 | | } // namespace yb |