/Users/deen/code/yugabyte-db/src/yb/master/async_rpc_tasks.cc
Line | Count | Source
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/async_rpc_tasks.h" |
15 | | |
16 | | #include "yb/common/wire_protocol.h" |
17 | | |
18 | | #include "yb/consensus/consensus.proxy.h" |
19 | | #include "yb/consensus/consensus_meta.h" |
20 | | |
21 | | #include "yb/gutil/map-util.h" |
22 | | |
23 | | #include "yb/master/catalog_entity_info.h" |
24 | | #include "yb/master/catalog_manager_if.h" |
25 | | #include "yb/master/master.h" |
26 | | #include "yb/master/tablet_split_complete_handler.h" |
27 | | #include "yb/master/ts_descriptor.h" |
28 | | #include "yb/master/ts_manager.h" |
29 | | |
30 | | #include "yb/rpc/messenger.h" |
31 | | |
32 | | #include "yb/tserver/backup.proxy.h" |
33 | | #include "yb/tserver/tserver_admin.proxy.h" |
34 | | #include "yb/tserver/tserver_service.proxy.h" |
35 | | |
36 | | #include "yb/util/atomic.h" |
37 | | #include "yb/util/flag_tags.h" |
38 | | #include "yb/util/metrics.h" |
39 | | #include "yb/util/source_location.h" |
40 | | #include "yb/util/status_format.h" |
41 | | #include "yb/util/status_log.h" |
42 | | #include "yb/util/thread_restrictions.h" |
43 | | #include "yb/util/threadpool.h" |
44 | | |
45 | | using namespace std::literals; |
46 | | |
47 | | DEFINE_int32(unresponsive_ts_rpc_timeout_ms, 15 * 60 * 1000, // 15 minutes |
48 | | "After this amount of time (or after we have retried unresponsive_ts_rpc_retry_limit " |
49 | | "times, whichever happens first), the master will stop attempting to contact a tablet " |
50 | | "server in order to perform operations such as deleting a tablet."); |
51 | | TAG_FLAG(unresponsive_ts_rpc_timeout_ms, advanced); |
52 | | |
53 | | DEFINE_int32(unresponsive_ts_rpc_retry_limit, 20, |
54 | | "After this number of retries (or unresponsive_ts_rpc_timeout_ms expires, whichever " |
55 | | "happens first), the master will stop attempting to contact a tablet server in order " |
56 | | "to perform operations such as deleting a tablet."); |
57 | | TAG_FLAG(unresponsive_ts_rpc_retry_limit, advanced); |
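
Taken together, the two flags above bound retries by count and by elapsed time: a task stops retrying at whichever limit is reached first. A minimal standalone sketch of that stop condition (the function name and parameters are illustrative; the real check lives in RetryingTSRpcTask::RescheduleWithBackoffDelay below):

#include <cstdint>

// Sketch only: parameter names mirror the flags above.
bool ShouldStopRetrying(int attempt, int64_t elapsed_ms,
                        int retry_limit,       // unresponsive_ts_rpc_retry_limit
                        int64_t timeout_ms) {  // unresponsive_ts_rpc_timeout_ms
  // Give up at whichever bound is hit first: attempt count or elapsed time.
  return attempt > retry_limit || elapsed_ms >= timeout_ms;
}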
58 | | |
59 | | DEFINE_int32(retrying_ts_rpc_max_delay_ms, 60 * 1000, |
60 | | "Maximum delay between successive attempts to contact an unresponsive tablet server"); |
61 | | TAG_FLAG(retrying_ts_rpc_max_delay_ms, advanced); |
62 | | |
63 | | DEFINE_test_flag(int32, slowdown_master_async_rpc_tasks_by_ms, 0, |
64 | | "For testing purposes, slow down the run method to take longer."); |
65 | | |
66 | | // The flags are defined in catalog_manager.cc. |
67 | | DECLARE_int32(master_ts_rpc_timeout_ms); |
68 | | DECLARE_int32(tablet_creation_timeout_ms); |
69 | | DECLARE_int32(TEST_slowdown_alter_table_rpcs_ms); |
70 | | |
71 | | namespace yb { |
72 | | namespace master { |
73 | | |
74 | | using namespace std::placeholders; |
75 | | |
76 | | using std::string; |
77 | | using std::shared_ptr; |
78 | | |
79 | | using strings::Substitute; |
80 | | using consensus::RaftPeerPB; |
81 | | using server::MonitoredTaskState; |
82 | | using tserver::TabletServerErrorPB; |
83 | | |
84 | | void RetryingTSRpcTask::UpdateMetrics(scoped_refptr<Histogram> metric, MonoTime start_time, |
85 | | const std::string& metric_name, |
86 | 884k | const std::string& metric_type) { |
87 | 884k | metric->Increment(MonoTime::Now().GetDeltaSince(start_time).ToMicroseconds()); |
88 | 884k | } |
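
UpdateMetrics() records latency by adding the microseconds elapsed since start_time to a histogram. A minimal sketch of the same elapsed-time measurement using std::chrono (an assumption; the real code uses yb's MonoTime and Histogram types):

#include <chrono>
#include <cstdint>

// Sketch: microseconds elapsed since start_time, as fed to the histogram above.
int64_t ElapsedMicros(std::chrono::steady_clock::time_point start_time) {
  const auto delta = std::chrono::steady_clock::now() - start_time;
  return std::chrono::duration_cast<std::chrono::microseconds>(delta).count();
}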
89 | | |
90 | | // ============================================================================ |
91 | | // Class PickSpecificUUID. |
92 | | // ============================================================================ |
93 | 304k | Status PickSpecificUUID::PickReplica(TSDescriptor** ts_desc) { |
94 | 304k | shared_ptr<TSDescriptor> ts; |
95 | 304k | if (!master_->ts_manager()->LookupTSByUUID(ts_uuid_, &ts)) { |
96 | 0 | return STATUS(NotFound, "unknown tablet server id", ts_uuid_); |
97 | 0 | } |
98 | 304k | *ts_desc = ts.get(); |
99 | 304k | return Status::OK(); |
100 | 304k | } |
101 | | |
102 | 0 | string ReplicaMapToString(const TabletReplicaMap& replicas) { |
103 | 0 | string ret = ""; |
104 | 0 | for (const auto& r : replicas) { |
105 | 0 | if (!ret.empty()) { |
106 | 0 | ret += ", "; |
107 | 0 | } else { |
108 | 0 | ret += "("; |
109 | 0 | } |
110 | 0 | ret += r.second.ts_desc->permanent_uuid(); |
111 | 0 | } |
112 | 0 | ret += ")"; |
113 | 0 | return ret; |
114 | 0 | } |
115 | | |
116 | | // ============================================================================ |
117 | | // Class PickLeaderReplica. |
118 | | // ============================================================================ |
119 | | PickLeaderReplica::PickLeaderReplica(const scoped_refptr<TabletInfo>& tablet) |
120 | 157k | : tablet_(tablet) { |
121 | 157k | } |
122 | | |
123 | 161k | Status PickLeaderReplica::PickReplica(TSDescriptor** ts_desc) { |
124 | 161k | *ts_desc = VERIFY_RESULT(tablet_->GetLeader());
125 | 0 | return Status::OK(); |
126 | 161k | } |
127 | | |
128 | | // ============================================================================ |
129 | | // Class RetryingTSRpcTask. |
130 | | // ============================================================================ |
131 | | |
132 | | RetryingTSRpcTask::RetryingTSRpcTask(Master *master, |
133 | | ThreadPool* callback_pool, |
134 | | std::unique_ptr<TSPicker> replica_picker, |
135 | | const scoped_refptr<TableInfo>& table) |
136 | | : master_(master), |
137 | | callback_pool_(callback_pool), |
138 | | replica_picker_(std::move(replica_picker)), |
139 | | table_(table), |
140 | | start_ts_(MonoTime::Now()), |
141 | 421k | deadline_(start_ts_ + FLAGS_unresponsive_ts_rpc_timeout_ms * 1ms) { |
142 | 421k | } |
143 | | |
144 | 281k | RetryingTSRpcTask::~RetryingTSRpcTask() { |
145 | 281k | auto state = state_.load(std::memory_order_acquire); |
146 | 281k | LOG_IF(DFATAL, !IsStateTerminal(state)) |
147 | 1 | << "Destroying " << this << " task in a wrong state: " << AsString(state); |
148 | 281k | VLOG_WITH_FUNC(1) << "Destroying " << this << " in " << AsString(state);
149 | 281k | } |
150 | | |
151 | 465k | std::string RetryingTSRpcTask::LogPrefix() const { |
152 | 465k | return Format("$0 (task=$1, state=$2): ", description(), static_cast<const void*>(this), state()); |
153 | 465k | } |
154 | | |
155 | 1.06M | std::string RetryingTSRpcTask::table_name() const { |
156 | 1.06M | return !table_ ? "" : table_->ToString();
157 | 1.06M | } |
158 | | |
159 | | // Send the subclass RPC request. |
160 | 466k | Status RetryingTSRpcTask::Run() { |
161 | 466k | VLOG_WITH_PREFIX(1) << "Start Running";
162 | 466k | attempt_start_ts_ = MonoTime::Now(); |
163 | 466k | ++attempt_; |
164 | 18.4E | VLOG_WITH_PREFIX(1) << "Start Running, attempt: " << attempt_; |
165 | 466k | for (;;) { |
166 | 466k | auto task_state = state(); |
167 | 466k | if (task_state == MonitoredTaskState::kAborted) { |
168 | 0 | return STATUS(IllegalState, "Unable to run task because it has been aborted"); |
169 | 0 | } |
170 | 466k | if (task_state == MonitoredTaskState::kWaiting) { |
171 | 466k | break; |
172 | 466k | } |
173 | | |
174 | 102 | LOG_IF_WITH_PREFIX(DFATAL, task_state != MonitoredTaskState::kScheduling) |
175 | 102 | << "Expected task to be in kScheduling state but found: " << AsString(task_state); |
176 | | |
177 | | // We expect this case to be very rare, since we switch to the waiting state right after
178 | | // scheduling the task on the messenger. So just busy-wait.
179 | 102 | std::this_thread::yield(); |
180 | 102 | } |
181 | | |
182 | 466k | Status s = ResetTSProxy(); |
183 | 466k | if (!s.ok()) { |
184 | 2.54k | s = s.CloneAndPrepend("Failed to reset TS proxy"); |
185 | 2.54k | LOG_WITH_PREFIX(INFO) << s; |
186 | 2.54k | if (s.IsExpired()) { |
187 | 0 | TransitionToTerminalState(MonitoredTaskState::kWaiting, MonitoredTaskState::kFailed, s); |
188 | 0 | UnregisterAsyncTask(); |
189 | 0 | return s; |
190 | 0 | } |
191 | 2.55k | if (RescheduleWithBackoffDelay()) {
192 | 2.55k | return Status::OK(); |
193 | 2.55k | } |
194 | | |
195 | 18.4E | auto state = this->state(); |
196 | 18.4E | UnregisterAsyncTask(); // May delete this. |
197 | | |
198 | 18.4E | if (state == MonitoredTaskState::kFailed) { |
199 | 0 | return s; |
200 | 0 | } |
201 | 18.4E | if (state == MonitoredTaskState::kAborted) { |
202 | 0 | return STATUS(IllegalState, "Unable to run task because it has been aborted"); |
203 | 0 | } |
204 | | |
205 | 18.4E | LOG_WITH_PREFIX(FATAL) << "Failed to change task to MonitoredTaskState::kFailed state from " |
206 | 18.4E | << state; |
207 | 18.4E | } else { |
208 | 463k | rpc_.Reset(); |
209 | 463k | } |
210 | | |
211 | | // Calculate and set the timeout deadline. |
212 | 463k | const MonoTime deadline = ComputeDeadline(); |
213 | 463k | rpc_.set_deadline(deadline); |
214 | | |
215 | 463k | if (!PerformStateTransition(MonitoredTaskState::kWaiting, MonitoredTaskState::kRunning)) { |
216 | 0 | if (state() == MonitoredTaskState::kAborted) { |
217 | 0 | return STATUS(Aborted, "Unable to run task because it has been aborted"); |
218 | 0 | } |
219 | | |
220 | 0 | LOG_WITH_PREFIX(DFATAL) << |
221 | 0 | "Task transition MonitoredTaskState::kWaiting -> MonitoredTaskState::kRunning failed"; |
222 | 0 | return Failed(STATUS_FORMAT(IllegalState, "Task in invalid state $0", state())); |
223 | 0 | } |
224 | | |
225 | 463k | auto slowdown_flag_val = GetAtomicFlag(&FLAGS_TEST_slowdown_master_async_rpc_tasks_by_ms); |
226 | 463k | if (PREDICT_FALSE(slowdown_flag_val > 0)) {
227 | 0 | VLOG_WITH_PREFIX(1) << "Slowing down by " << slowdown_flag_val << " ms."; |
228 | 0 | bool old_thread_restriction = ThreadRestrictions::SetWaitAllowed(true); |
229 | 0 | SleepFor(MonoDelta::FromMilliseconds(slowdown_flag_val)); |
230 | 0 | ThreadRestrictions::SetWaitAllowed(old_thread_restriction); |
231 | 0 | VLOG_WITH_PREFIX(2) << "Slowing down done. Resuming."; |
232 | 0 | } |
233 | 463k | if (!SendRequest(attempt_) && !RescheduleWithBackoffDelay()) {
234 | 416 | UnregisterAsyncTask(); // May call 'delete this'. |
235 | 416 | } |
236 | 463k | return Status::OK(); |
237 | 463k | } |
238 | | |
239 | 458k | MonoTime RetryingTSRpcTask::ComputeDeadline() { |
240 | 458k | MonoTime timeout = MonoTime::Now(); |
241 | 458k | timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms)); |
242 | 458k | return MonoTime::Earliest(timeout, deadline_); |
243 | 458k | } |
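
ComputeDeadline() gives every attempt its own RPC deadline: the per-attempt timeout from master_ts_rpc_timeout_ms, clamped so it never extends past the task-wide deadline_. A minimal sketch of the same clamping using std::chrono in place of MonoTime (an assumption):

#include <algorithm>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Sketch: an attempt may run for at most rpc_timeout, but never past the
// overall task deadline derived from unresponsive_ts_rpc_timeout_ms.
Clock::time_point ComputeAttemptDeadline(Clock::time_point overall_deadline,
                                         std::chrono::milliseconds rpc_timeout) {
  return std::min(Clock::now() + rpc_timeout, overall_deadline);
}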
244 | | |
245 | | // Abort this task and return its state before it was successfully aborted. If the task entered
246 | | // a different terminal state before we were able to abort it, return that state. |
247 | 805 | MonitoredTaskState RetryingTSRpcTask::AbortAndReturnPrevState(const Status& status) { |
248 | 805 | auto prev_state = state(); |
249 | 805 | while (!IsStateTerminal(prev_state)) { |
250 | 722 | auto expected = prev_state; |
251 | 722 | if (state_.compare_exchange_weak(expected, MonitoredTaskState::kAborted)) { |
252 | 722 | VLOG_WITH_PREFIX_AND_FUNC(1)
253 | 0 | << "Aborted with: " << status << ", prev state: " << AsString(prev_state); |
254 | 722 | AbortIfScheduled(); |
255 | 722 | Finished(status); |
256 | 722 | UnregisterAsyncTask(); |
257 | 722 | return prev_state; |
258 | 722 | } |
259 | 0 | prev_state = state(); |
260 | 0 | } |
261 | 83 | VLOG_WITH_PREFIX_AND_FUNC(1)
262 | 0 | << "Already terminated, prev state: " << AsString(prev_state); |
263 | 83 | UnregisterAsyncTask(); |
264 | 83 | return prev_state; |
265 | 805 | } |
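
The abort path above is a compare-and-swap loop: a non-terminal state is atomically swapped to kAborted, and if a concurrent transition wins the race to a terminal state, that state is returned instead. A minimal sketch of the pattern, assuming a simplified state enum in place of server::MonitoredTaskState:

#include <atomic>

enum class State { kRunning, kComplete, kFailed, kAborted };

bool IsTerminal(State s) { return s != State::kRunning; }

// Sketch: returns the state observed before the abort attempt; if another
// thread reached a terminal state first, that state is returned unchanged.
State AbortAndReturnPrev(std::atomic<State>& state) {
  State prev = state.load(std::memory_order_acquire);
  while (!IsTerminal(prev)) {
    // On failure, compare_exchange_weak reloads prev with the current value.
    if (state.compare_exchange_weak(prev, State::kAborted)) {
      return prev;  // We won the race; prev is the pre-abort state.
    }
  }
  return prev;
}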
266 | | |
267 | 475 | void RetryingTSRpcTask::AbortTask(const Status& status) { |
268 | 475 | AbortAndReturnPrevState(status); |
269 | 475 | } |
270 | | |
271 | 463k | void RetryingTSRpcTask::RpcCallback() { |
272 | | // Defer the actual work of the callback off of the reactor thread. |
273 | | // This is necessary because our callbacks often do synchronous writes to |
274 | | // the catalog table, and we can't do synchronous IO on the reactor. |
275 | | // |
276 | | // Note: This can fail on shutdown, so just print a warning for it. |
277 | 463k | Status s = callback_pool_->SubmitFunc( |
278 | 463k | std::bind(&RetryingTSRpcTask::DoRpcCallback, shared_from(this))); |
279 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(3) << "Submit status: " << s; |
280 | 463k | if (!s.ok()) { |
281 | 0 | WARN_NOT_OK(s, "Could not submit to queue, probably shutting down"); |
282 | 0 | AbortTask(s); |
283 | 0 | } |
284 | 463k | } |
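
RpcCallback() only enqueues work; the blocking part runs on a worker pool because reactor threads must never block. A minimal sketch of that defer-off-the-reactor pattern, assuming a generic Submit() interface (these names are illustrative, not the yb::ThreadPool API):

#include <functional>
#include <iostream>
#include <memory>

// Illustrative stand-ins, not the real yb types.
struct WorkerPool {
  bool shutting_down = false;
  // Returns false if the work is rejected, e.g. during shutdown.
  bool Submit(const std::function<void()>& work) {
    if (shutting_down) return false;
    work();  // A real pool would enqueue to worker threads; inlined for brevity.
    return true;
  }
};

struct Task : std::enable_shared_from_this<Task> {
  void DoBlockingCallback() { std::cout << "handled off the reactor\n"; }
  void Abort() { std::cout << "aborted\n"; }

  void OnRpcDone(WorkerPool* pool) {
    auto self = shared_from_this();  // Keep the task alive until the work runs.
    if (!pool->Submit([self] { self->DoBlockingCallback(); })) {
      Abort();
    }
  }
};

As in RpcCallback() above, capturing a shared_ptr to the task in the deferred closure is what keeps the task alive until the callback actually runs.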
285 | | |
286 | | // Handle the actual work of the RPC callback. This is run on the master's worker |
287 | | // pool, rather than a reactor thread, so it may do blocking IO operations. |
288 | 463k | void RetryingTSRpcTask::DoRpcCallback() { |
289 | 463k | VLOG_WITH_PREFIX_AND_FUNC(3) << "Rpc status: " << rpc_.status();
290 | | |
291 | 463k | if (!rpc_.status().ok()) { |
292 | 1.38k | LOG_WITH_PREFIX(WARNING) << "TS " << target_ts_desc_->permanent_uuid() << ": " |
293 | 1.38k | << type_name() << " RPC failed for tablet " |
294 | 1.38k | << tablet_id() << ": " << rpc_.status().ToString(); |
295 | 1.38k | if (!target_ts_desc_->IsLive() && type() == ASYNC_DELETE_REPLICA) {
296 | 222 | LOG_WITH_PREFIX(WARNING) |
297 | 222 | << "TS " << target_ts_desc_->permanent_uuid() << ": delete failed for tablet " |
298 | 222 | << tablet_id() << ". TS is DEAD. No further retry."; |
299 | 222 | TransitionToCompleteState(); |
300 | 222 | } |
301 | 461k | } else if (state() != MonitoredTaskState::kAborted) { |
302 | 461k | HandleResponse(attempt_); // Modifies state_. |
303 | 461k | } |
304 | 463k | UpdateMetrics(master_->GetMetric(type_name(), Master::AttemptMetric, description()), |
305 | 463k | attempt_start_ts_, type_name(), "attempt metric"); |
306 | | |
307 | | // Schedule a retry if the RPC call was not successful. |
308 | 463k | if (RescheduleWithBackoffDelay()) { |
309 | 42.8k | return; |
310 | 42.8k | } |
311 | | |
312 | 420k | UnregisterAsyncTask(); // May call 'delete this'. |
313 | 420k | } |
314 | | |
315 | 42.5k | int RetryingTSRpcTask::num_max_retries() { return FLAGS_unresponsive_ts_rpc_retry_limit; } |
316 | | |
317 | 8 | int RetryingTSRpcTask::max_delay_ms() { |
318 | 8 | return FLAGS_retrying_ts_rpc_max_delay_ms; |
319 | 8 | } |
320 | | |
321 | 466k | bool RetryingTSRpcTask::RescheduleWithBackoffDelay() { |
322 | 466k | auto task_state = state(); |
323 | 466k | if (task_state != MonitoredTaskState::kRunning && |
324 | | // Allow kWaiting for task(s) that have never successfully ResetTSProxy(). |
325 | 466k | task_state != MonitoredTaskState::kWaiting) {
326 | 420k | if (task_state != MonitoredTaskState::kComplete) { |
327 | 455 | LOG_WITH_PREFIX(INFO) << "No reschedule for this task: " << AsString(task_state); |
328 | 455 | } |
329 | 420k | return false; |
330 | 420k | } |
331 | | |
332 | 45.4k | int attempt_threshold = std::numeric_limits<int>::max(); |
333 | 45.4k | if (NoRetryTaskType()) { |
334 | 0 | attempt_threshold = 0; |
335 | 45.4k | } else if (RetryLimitTaskType()) { |
336 | 42.5k | attempt_threshold = num_max_retries(); |
337 | 42.5k | } |
338 | | |
339 | 45.4k | if (attempt_ > attempt_threshold) { |
340 | 13 | auto status = STATUS_FORMAT( |
341 | 13 | Aborted, "Reached maximum number of retries ($0)", attempt_threshold); |
342 | 13 | LOG_WITH_PREFIX(WARNING) |
343 | 13 | << status << " for request " << description() |
344 | 13 | << ", task=" << this << " state=" << state(); |
345 | 13 | TransitionToFailedState(task_state, status); |
346 | 13 | return false; |
347 | 13 | } |
348 | | |
349 | 45.4k | MonoTime now = MonoTime::Now(); |
350 | | // We assume it might take 10ms to process the request in the best case, |
351 | | // so fail if we have less than that amount of time remaining.
352 | 45.4k | int64_t millis_remaining = deadline_.GetDeltaSince(now).ToMilliseconds() - 10; |
353 | | // Exponential backoff with jitter. |
354 | 45.4k | int64_t base_delay_ms; |
355 | 45.4k | if (attempt_ <= 12) { |
356 | 45.4k | base_delay_ms = 1 << (attempt_ + 3); // 1st retry delayed 2^4 ms, 2nd 2^5, etc. |
357 | 45.4k | } else { |
358 | 3 | base_delay_ms = max_delay_ms(); |
359 | 3 | } |
360 | | // Normal rand is seeded by default with 1. Using the same for rand_r seed. |
361 | 45.4k | unsigned int seed = 1; |
362 | 45.4k | int64_t jitter_ms = rand_r(&seed) % 50; // Add up to 50ms of additional random delay. |
363 | 45.4k | int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining); |
364 | | |
365 | 45.4k | if (delay_millis <= 0) { |
366 | 17 | auto status = STATUS(TimedOut, "Request timed out"); |
367 | 17 | LOG_WITH_PREFIX(WARNING) << status; |
368 | 17 | TransitionToFailedState(task_state, status); |
369 | 17 | return false; |
370 | 17 | } |
371 | | |
372 | 45.4k | LOG_WITH_PREFIX(INFO) << "Scheduling retry with a delay of " << delay_millis |
373 | 45.4k | << "ms (attempt = " << attempt_ << " / " << attempt_threshold << ")..."; |
374 | | |
375 | 45.4k | if (!PerformStateTransition(task_state, MonitoredTaskState::kScheduling)) { |
376 | 0 | LOG_WITH_PREFIX(WARNING) << "Unable to mark this task as MonitoredTaskState::kScheduling"; |
377 | 0 | return false; |
378 | 0 | } |
379 | 45.4k | auto task_id = master_->messenger()->ScheduleOnReactor( |
380 | 45.4k | std::bind(&RetryingTSRpcTask::RunDelayedTask, shared_from(this), _1), |
381 | 45.4k | MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION(), master_->messenger()); |
382 | 18.4E | VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id; |
383 | 45.4k | reactor_task_id_.store(task_id, std::memory_order_release); |
384 | | |
385 | 45.4k | if (task_id == rpc::kInvalidTaskId) { |
386 | 1 | AbortTask(STATUS(Aborted, "Messenger closing")); |
387 | 1 | UnregisterAsyncTask(); |
388 | 1 | return false; |
389 | 1 | } |
390 | | |
391 | 45.4k | return TransitionToWaitingState(MonitoredTaskState::kScheduling); |
392 | 45.4k | } |
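
The delay computed above starts at 2^4 ms on the first retry and doubles each attempt until it is capped by retrying_ts_rpc_max_delay_ms after attempt 12; up to 50 ms of jitter is added, and the result is clamped to the time remaining before the deadline. A standalone sketch of just that computation (the fixed rand_r seed mirrors the code above, which makes the jitter deterministic in practice):

#include <algorithm>
#include <cstdint>
#include <cstdlib>  // rand_r (POSIX)

int64_t ComputeBackoffDelayMs(int attempt, int64_t millis_remaining,
                              int64_t max_delay_ms) {
  // 1st retry delayed 2^4 ms, 2nd 2^5 ms, ..., capped after attempt 12.
  const int64_t base_delay_ms =
      attempt <= 12 ? (INT64_C(1) << (attempt + 3)) : max_delay_ms;
  unsigned int seed = 1;  // Same fixed seed as above.
  const int64_t jitter_ms = rand_r(&seed) % 50;
  // Never schedule past the remaining deadline; <= 0 means "time out instead".
  return std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
}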
393 | | |
394 | 45.2k | void RetryingTSRpcTask::RunDelayedTask(const Status& status) { |
395 | 45.2k | if (state() == MonitoredTaskState::kAborted) { |
396 | 51 | UnregisterAsyncTask(); // May delete this. |
397 | 51 | return; |
398 | 51 | } |
399 | | |
400 | 45.1k | if (!status.ok()) { |
401 | 58 | LOG_WITH_PREFIX(WARNING) << "Async tablet task failed or was cancelled: " << status; |
402 | 58 | if (status.IsAborted() || status.IsServiceUnavailable()0 ) { |
403 | 58 | AbortTask(status); |
404 | 58 | } |
405 | 58 | UnregisterAsyncTask(); // May delete this. |
406 | 58 | return; |
407 | 58 | } |
408 | | |
409 | 45.1k | auto log_prefix = LogPrefix(); // Save in case we need to log after deletion. |
410 | 45.1k | Status s = Run(); // May delete this. |
411 | 45.1k | if (!s.ok()) { |
412 | 0 | LOG(WARNING) << log_prefix << "Async tablet task failed: " << s; |
413 | 0 | } |
414 | 45.1k | } |
415 | | |
416 | 336k | void RetryingTSRpcTask::UnregisterAsyncTaskCallback() {} |
417 | | |
418 | 0 | Status RetryingTSRpcTask::Failed(const Status& status) { |
419 | 0 | LOG_WITH_PREFIX(WARNING) << "Async task failed: " << status; |
420 | 0 | Finished(status); |
421 | 0 | UnregisterAsyncTask(); |
422 | 0 | return status; |
423 | 0 | } |
424 | | |
425 | 421k | void RetryingTSRpcTask::UnregisterAsyncTask() { |
426 | | // Retain a reference to the object, in case RemoveTask would have removed the last one. |
427 | 421k | auto self = shared_from_this(); |
428 | 421k | std::unique_lock<decltype(unregister_mutex_)> lock(unregister_mutex_); |
429 | 421k | UpdateMetrics(master_->GetMetric(type_name(), Master::TaskMetric, description()), start_ts_, |
430 | 421k | type_name(), "task metric"); |
431 | | |
432 | 421k | auto s = state(); |
433 | 421k | if (!IsStateTerminal(s)) { |
434 | 0 | LOG_WITH_PREFIX(FATAL) << "Invalid task state " << s; |
435 | 0 | } |
436 | 421k | end_ts_ = MonoTime::Now(); |
437 | 421k | if (table_ != nullptr && table_->RemoveTask(self)) {
438 | | // We don't delete a table while it still has running tasks, so check whether this was the
439 | | // last task, even if it is not a delete-table task.
440 | 97.8k | master_->catalog_manager()->CheckTableDeleted(table_); |
441 | 97.8k | } |
442 | | // Make sure to run the callbacks last, in case they rely on the task no longer being tracked |
443 | | // by the table. |
444 | 421k | UnregisterAsyncTaskCallback(); |
445 | 421k | } |
446 | | |
447 | 722 | void RetryingTSRpcTask::AbortIfScheduled() { |
448 | 722 | auto reactor_task_id = reactor_task_id_.load(std::memory_order_acquire); |
449 | 722 | VLOG_WITH_PREFIX_AND_FUNC(1) << "Reactor task id: " << reactor_task_id;
450 | 722 | if (reactor_task_id != rpc::kInvalidTaskId) { |
451 | 522 | master_->messenger()->AbortOnReactor(reactor_task_id); |
452 | 522 | } |
453 | 722 | } |
454 | | |
455 | 466k | Status RetryingTSRpcTask::ResetTSProxy() { |
456 | | // TODO: if there is no replica available, should we still keep the task running? |
457 | 466k | RETURN_NOT_OK(replica_picker_->PickReplica(&target_ts_desc_)); |
458 | | |
459 | 463k | shared_ptr<tserver::TabletServerServiceProxy> ts_proxy; |
460 | 463k | shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy; |
461 | 463k | shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy; |
462 | 463k | shared_ptr<tserver::TabletServerBackupServiceProxy> ts_backup_proxy; |
463 | | |
464 | 463k | RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_proxy)); |
465 | 463k | RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_admin_proxy)); |
466 | 463k | RETURN_NOT_OK(target_ts_desc_->GetProxy(&consensus_proxy)); |
467 | 463k | RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_backup_proxy)); |
468 | | |
469 | 463k | ts_proxy_.swap(ts_proxy); |
470 | 463k | ts_admin_proxy_.swap(ts_admin_proxy); |
471 | 463k | consensus_proxy_.swap(consensus_proxy); |
472 | 463k | ts_backup_proxy_.swap(ts_backup_proxy); |
473 | | |
474 | 463k | return Status::OK(); |
475 | 463k | } |
476 | | |
477 | | void RetryingTSRpcTask::TransitionToTerminalState(MonitoredTaskState expected, |
478 | | MonitoredTaskState terminal_state, |
479 | 420k | const Status& status) { |
480 | 420k | if (!PerformStateTransition(expected, terminal_state)) { |
481 | 0 | if (terminal_state != MonitoredTaskState::kAborted && state() == MonitoredTaskState::kAborted) { |
482 | 0 | LOG_WITH_PREFIX(WARNING) << "Unable to perform transition " << expected << " -> " |
483 | 0 | << terminal_state << ". Task has been aborted"; |
484 | 0 | } else { |
485 | 0 | LOG_WITH_PREFIX(DFATAL) << "State transition " << expected << " -> " |
486 | 0 | << terminal_state << " failed. Current task is in an invalid state: " |
487 | 0 | << state(); |
488 | 0 | } |
489 | 0 | return; |
490 | 0 | } |
491 | | |
492 | 420k | Finished(status); |
493 | 420k | } |
494 | | |
495 | | void RetryingTSRpcTask::TransitionToFailedState(server::MonitoredTaskState expected, |
496 | 68 | const yb::Status& status) { |
497 | 68 | TransitionToTerminalState(expected, MonitoredTaskState::kFailed, status); |
498 | 68 | } |
499 | | |
500 | 420k | void RetryingTSRpcTask::TransitionToCompleteState() { |
501 | 420k | TransitionToTerminalState( |
502 | 420k | MonitoredTaskState::kRunning, MonitoredTaskState::kComplete, Status::OK()); |
503 | 420k | } |
504 | | |
505 | 45.4k | bool RetryingTSRpcTask::TransitionToWaitingState(MonitoredTaskState expected) { |
506 | 45.4k | if (!PerformStateTransition(expected, MonitoredTaskState::kWaiting)) { |
507 | | // The only valid reason for the state not being MonitoredTaskState::kWaiting is that the
508 | | // task got aborted.
509 | 0 | if (state() != MonitoredTaskState::kAborted) { |
510 | 0 | LOG_WITH_PREFIX(FATAL) << "Unable to mark task as MonitoredTaskState::kWaiting"; |
511 | 0 | } |
512 | 0 | AbortIfScheduled(); |
513 | 0 | return false; |
514 | 45.4k | } else { |
515 | 45.4k | return true; |
516 | 45.4k | } |
517 | 45.4k | } |
518 | | |
519 | | // ============================================================================ |
520 | | // Class AsyncTabletLeaderTask. |
521 | | // ============================================================================ |
522 | | AsyncTabletLeaderTask::AsyncTabletLeaderTask( |
523 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet) |
524 | | : RetryingTSRpcTask( |
525 | | master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)), |
526 | | tablet->table().get()), |
527 | 61.4k | tablet_(tablet) { |
528 | 61.4k | } |
529 | | |
530 | | AsyncTabletLeaderTask::AsyncTabletLeaderTask( |
531 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet, |
532 | | const scoped_refptr<TableInfo>& table) |
533 | | : RetryingTSRpcTask( |
534 | | master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)), table), |
535 | 28.0k | tablet_(tablet) { |
536 | 28.0k | } |
537 | | |
538 | 80.5k | AsyncTabletLeaderTask::~AsyncTabletLeaderTask() = default; |
539 | | |
540 | 189k | std::string AsyncTabletLeaderTask::description() const { |
541 | 189k | return Format("$0 RPC for tablet $1 ($2)", type_name(), tablet_, table_name()); |
542 | 189k | } |
543 | | |
544 | 57.5k | TabletId AsyncTabletLeaderTask::tablet_id() const { |
545 | 57.5k | return tablet_->tablet_id(); |
546 | 57.5k | } |
547 | | |
548 | 33.3k | TabletServerId AsyncTabletLeaderTask::permanent_uuid() const { |
549 | 18.4E | return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
550 | 33.3k | } |
551 | | |
552 | | // ============================================================================ |
553 | | // Class AsyncCreateReplica. |
554 | | // ============================================================================ |
555 | | AsyncCreateReplica::AsyncCreateReplica(Master *master, |
556 | | ThreadPool *callback_pool, |
557 | | const string& permanent_uuid, |
558 | | const scoped_refptr<TabletInfo>& tablet, |
559 | | const std::vector<SnapshotScheduleId>& snapshot_schedules) |
560 | | : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()), |
561 | 140k | tablet_id_(tablet->tablet_id()) { |
562 | 140k | deadline_ = start_ts_; |
563 | 140k | deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms)); |
564 | | |
565 | 140k | auto table_lock = tablet->table()->LockForRead(); |
566 | 140k | const SysTabletsEntryPB& tablet_pb = tablet->metadata().dirty().pb; |
567 | | |
568 | 140k | req_.set_dest_uuid(permanent_uuid); |
569 | 140k | req_.set_table_id(tablet->table()->id()); |
570 | 140k | req_.set_tablet_id(tablet->tablet_id()); |
571 | 140k | req_.set_table_type(tablet->table()->metadata().state().pb.table_type()); |
572 | 140k | req_.mutable_partition()->CopyFrom(tablet_pb.partition()); |
573 | 140k | req_.set_namespace_id(table_lock->pb.namespace_id()); |
574 | 140k | req_.set_namespace_name(table_lock->pb.namespace_name()); |
575 | 140k | req_.set_table_name(table_lock->pb.name()); |
576 | 140k | req_.mutable_schema()->CopyFrom(table_lock->pb.schema()); |
577 | 140k | req_.mutable_partition_schema()->CopyFrom(table_lock->pb.partition_schema()); |
578 | 140k | req_.mutable_config()->CopyFrom(tablet_pb.committed_consensus_state().config()); |
579 | 140k | req_.set_colocated(tablet_pb.colocated()); |
580 | 140k | if (table_lock->pb.has_index_info()) { |
581 | 14.7k | req_.mutable_index_info()->CopyFrom(table_lock->pb.index_info()); |
582 | 14.7k | } |
583 | 140k | auto& req_schedules = *req_.mutable_snapshot_schedules(); |
584 | 140k | req_schedules.Reserve(narrow_cast<int>(snapshot_schedules.size())); |
585 | 140k | for (const auto& id : snapshot_schedules) { |
586 | 72 | req_schedules.Add()->assign(id.AsSlice().cdata(), id.size()); |
587 | 72 | } |
588 | 140k | } |
589 | | |
590 | 281k | std::string AsyncCreateReplica::description() const { |
591 | 281k | return Format("CreateTablet RPC for tablet $0 ($1) on TS=$2", |
592 | 281k | tablet_id_, table_name(), permanent_uuid_); |
593 | 281k | } |
594 | | |
595 | 139k | void AsyncCreateReplica::HandleResponse(int attempt) { |
596 | 139k | if (resp_.has_error()) { |
597 | 11 | Status s = StatusFromPB(resp_.error().status()); |
598 | 11 | if (s.IsAlreadyPresent()) { |
599 | 0 | LOG_WITH_PREFIX(INFO) << "CreateTablet RPC for tablet " << tablet_id_ |
600 | 0 | << " on TS " << permanent_uuid_ << " returned already present: " |
601 | 0 | << s; |
602 | 0 | TransitionToCompleteState(); |
603 | 11 | } else { |
604 | 11 | LOG_WITH_PREFIX(WARNING) << "CreateTablet RPC for tablet " << tablet_id_ |
605 | 11 | << " on TS " << permanent_uuid_ << " failed: " << s; |
606 | 11 | } |
607 | | |
608 | 11 | return; |
609 | 11 | } |
610 | | |
611 | 139k | TransitionToCompleteState(); |
612 | 139k | VLOG_WITH_PREFIX(1) << "TS " << permanent_uuid_ << ": complete on tablet " << tablet_id_;
613 | 139k | } |
614 | | |
615 | 139k | bool AsyncCreateReplica::SendRequest(int attempt) { |
616 | 139k | ts_admin_proxy_->CreateTabletAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
617 | 139k | VLOG_WITH_PREFIX(1) << "Send create tablet request to " << permanent_uuid_ << ":\n"
618 | 1.05k | << " (attempt " << attempt << "):\n" |
619 | 1.05k | << req_.DebugString(); |
620 | 139k | return true; |
621 | 139k | } |
622 | | |
623 | | // ============================================================================ |
624 | | // Class AsyncStartElection. |
625 | | // ============================================================================ |
626 | | AsyncStartElection::AsyncStartElection(Master *master, |
627 | | ThreadPool *callback_pool, |
628 | | const string& permanent_uuid, |
629 | | const scoped_refptr<TabletInfo>& tablet) |
630 | | : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()), |
631 | 47.8k | tablet_id_(tablet->tablet_id()) { |
632 | 47.8k | deadline_ = start_ts_; |
633 | 47.8k | deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms)); |
634 | | |
635 | 47.8k | req_.set_dest_uuid(permanent_uuid_); |
636 | 47.8k | req_.set_tablet_id(tablet_id_); |
637 | 47.8k | req_.set_initial_election(true); |
638 | 47.8k | } |
639 | | |
640 | 86.2k | void AsyncStartElection::HandleResponse(int attempt) { |
641 | 86.2k | if (resp_.has_error()) { |
642 | 38.3k | Status s = StatusFromPB(resp_.error().status()); |
643 | 38.3k | if (!s.ok()) { |
644 | 38.3k | LOG_WITH_PREFIX(WARNING) << "RunLeaderElection RPC for tablet " << tablet_id_ |
645 | 38.3k | << " on TS " << permanent_uuid_ << " failed: " << s; |
646 | 38.3k | } |
647 | | |
648 | 38.3k | return; |
649 | 38.3k | } |
650 | | |
651 | 47.8k | TransitionToCompleteState(); |
652 | 47.8k | } |
653 | | |
654 | 335k | std::string AsyncStartElection::description() const { |
655 | 335k | return Format("RunLeaderElection RPC for tablet $0 ($1) on TS=$2", |
656 | 335k | tablet_id_, table_name(), permanent_uuid_); |
657 | 335k | } |
658 | | |
659 | 86.2k | bool AsyncStartElection::SendRequest(int attempt) { |
660 | 86.2k | LOG_WITH_PREFIX(INFO) << Format( |
661 | 86.2k | "Hinted Leader start election at $0 for tablet $1, attempt $2", |
662 | 86.2k | permanent_uuid_, tablet_id_, attempt); |
663 | 86.2k | consensus_proxy_->RunLeaderElectionAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
664 | | |
665 | 86.2k | return true; |
666 | 86.2k | } |
667 | | |
668 | | // ============================================================================ |
669 | | // Class AsyncDeleteReplica. |
670 | | // ============================================================================ |
671 | 77.4k | void AsyncDeleteReplica::HandleResponse(int attempt) { |
672 | 77.4k | if (resp_.has_error()) { |
673 | 1.99k | Status status = StatusFromPB(resp_.error().status()); |
674 | | |
675 | | // Do not retry on a fatal error |
676 | 1.99k | TabletServerErrorPB::Code code = resp_.error().code(); |
677 | 1.99k | switch (code) { |
678 | 43 | case TabletServerErrorPB::TABLET_NOT_FOUND: |
679 | 43 | LOG_WITH_PREFIX(WARNING) |
680 | 43 | << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_ |
681 | 43 | << " because the tablet was not found. No further retry: " |
682 | 43 | << status.ToString(); |
683 | 43 | TransitionToCompleteState(); |
684 | 43 | break; |
685 | 79 | case TabletServerErrorPB::CAS_FAILED: |
686 | 79 | LOG_WITH_PREFIX(WARNING) |
687 | 79 | << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_ |
688 | 79 | << " due to a CAS failure. No further retry: " << status.ToString(); |
689 | 79 | TransitionToCompleteState(); |
690 | 79 | break; |
691 | 1 | case TabletServerErrorPB::WRONG_SERVER_UUID: |
692 | 1 | LOG_WITH_PREFIX(WARNING) |
693 | 1 | << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_ |
694 | 1 | << " due to an incorrect UUID. No further retry: " << status.ToString(); |
695 | 1 | TransitionToCompleteState(); |
696 | 1 | break; |
697 | 1.87k | default: |
698 | 1.87k | LOG_WITH_PREFIX(WARNING) |
699 | 1.87k | << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_ |
700 | 1.87k | << " with error code " << TabletServerErrorPB::Code_Name(code) |
701 | 1.87k | << ": " << status.ToString(); |
702 | 1.87k | break; |
703 | 1.99k | } |
704 | 75.4k | } else { |
705 | 75.4k | if (table_75.4k ) { |
706 | 75.4k | LOG_WITH_PREFIX(INFO) |
707 | 75.4k | << "TS " << permanent_uuid_ << ": tablet " << tablet_id_ |
708 | 75.4k | << " (table " << table_->ToString() << ") successfully done"; |
709 | 18.4E | } else { |
710 | 18.4E | LOG_WITH_PREFIX(WARNING) |
711 | 18.4E | << "TS " << permanent_uuid_ << ": tablet " << tablet_id_ |
712 | 18.4E | << " did not belong to a known table, but was successfully deleted"; |
713 | 18.4E | } |
714 | 75.4k | TransitionToCompleteState(); |
715 | 18.4E | VLOG_WITH_PREFIX(1) << "TS " << permanent_uuid_ << ": complete on tablet " << tablet_id_; |
716 | 75.4k | } |
717 | 77.4k | } |
718 | | |
719 | 237k | std::string AsyncDeleteReplica::description() const { |
720 | 237k | return Format("$0Tablet RPC for tablet $1 ($2) on TS=$3", |
721 | 237k | hide_only_ ? "Hide" : "Delete", tablet_id_, table_name(), permanent_uuid_);
722 | 237k | } |
723 | | |
724 | 78.1k | bool AsyncDeleteReplica::SendRequest(int attempt) { |
725 | 78.1k | tserver::DeleteTabletRequestPB req; |
726 | 78.1k | req.set_dest_uuid(permanent_uuid_); |
727 | 78.1k | req.set_tablet_id(tablet_id_); |
728 | 78.1k | req.set_reason(reason_); |
729 | 78.1k | req.set_delete_type(delete_type_); |
730 | 78.1k | if (hide_only_) { |
731 | 36 | req.set_hide_only(hide_only_); |
732 | 36 | } |
733 | 78.1k | if (cas_config_opid_index_less_or_equal_) { |
734 | 4.16k | req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal_); |
735 | 4.16k | } |
736 | | |
737 | 78.1k | ts_admin_proxy_->DeleteTabletAsync(req, &resp_, &rpc_, BindRpcCallback()); |
738 | 18.4E | VLOG_WITH_PREFIX(1) << "Send delete tablet request to " << permanent_uuid_ |
739 | 18.4E | << " (attempt " << attempt << "):\n" |
740 | 18.4E | << req.DebugString(); |
741 | 78.1k | return true; |
742 | 78.1k | } |
743 | | |
744 | 75.8k | void AsyncDeleteReplica::UnregisterAsyncTaskCallback() { |
745 | | // Only notify if we are in a success state. |
746 | 75.8k | if (state() == MonitoredTaskState::kComplete) { |
747 | 75.8k | master_->catalog_manager()->NotifyTabletDeleteFinished(permanent_uuid_, tablet_id_, table()); |
748 | 75.8k | } |
749 | 75.8k | } |
750 | | |
751 | | // ============================================================================ |
752 | | // Class AsyncAlterTable. |
753 | | // ============================================================================ |
754 | 32.4k | void AsyncAlterTable::HandleResponse(int attempt) { |
755 | 32.4k | if (PREDICT_FALSE(FLAGS_TEST_slowdown_alter_table_rpcs_ms > 0)) { |
756 | 0 | VLOG_WITH_PREFIX(1) << "Sleeping for " << tablet_->tablet_id() |
757 | 0 | << FLAGS_TEST_slowdown_alter_table_rpcs_ms |
758 | 0 | << "ms before returning response in async alter table request handler"; |
759 | 0 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_alter_table_rpcs_ms)); |
760 | 0 | } |
761 | | |
762 | 32.4k | if (resp_.has_error()) { |
763 | 615 | Status status = StatusFromPB(resp_.error().status()); |
764 | | |
765 | 615 | LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << " failed: " |
766 | 615 | << status << " for version " << schema_version_; |
767 | | |
768 | | // Do not retry on a fatal error |
769 | 615 | switch (resp_.error().code()) { |
770 | 0 | case TabletServerErrorPB::TABLET_NOT_FOUND: |
771 | 0 | case TabletServerErrorPB::MISMATCHED_SCHEMA: |
772 | 0 | case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA: |
773 | 0 | TransitionToCompleteState(); |
774 | 0 | break; |
775 | 615 | default: |
776 | 615 | break; |
777 | 615 | } |
778 | 31.7k | } else { |
779 | 31.7k | TransitionToCompleteState(); |
780 | 31.7k | VLOG_WITH_PREFIX(1)
781 | 49 | << "TS " << permanent_uuid() << " completed: for version " << schema_version_; |
782 | 31.7k | } |
783 | | |
784 | 32.4k | server::UpdateClock(resp_, master_->clock()); |
785 | | |
786 | 32.4k | if (state() == MonitoredTaskState::kComplete) { |
787 | | // TODO: proper error handling here. Not critical, since TSHeartbeat will retry on failure. |
788 | 31.7k | WARN_NOT_OK( |
789 | 31.7k | master_->catalog_manager()->HandleTabletSchemaVersionReport( |
790 | 31.7k | tablet_.get(), schema_version_, table()), |
791 | 31.7k | Format( |
792 | 31.7k | "$0 failed while running AsyncAlterTable::HandleResponse. Response $1", |
793 | 31.7k | description(), resp_.ShortDebugString())); |
794 | 31.7k | } else { |
795 | 624 | VLOG_WITH_PREFIX(1) << "Task is not completed " << tablet_->ToString() << " for version "
796 | 9 | << schema_version_; |
797 | 624 | } |
798 | 32.4k | } |
799 | | |
800 | 28.6k | TableType AsyncAlterTable::table_type() const { |
801 | 28.6k | return tablet_->table()->GetTableType(); |
802 | 28.6k | } |
803 | | |
804 | 28.6k | bool AsyncAlterTable::SendRequest(int attempt) { |
805 | 28.6k | VLOG_WITH_PREFIX(1) << "Send alter table request to " << permanent_uuid() << " for "
806 | 58 | << tablet_->tablet_id() << " waiting for a read lock."; |
807 | | |
808 | 28.6k | tablet::ChangeMetadataRequestPB req; |
809 | 28.6k | { |
810 | 28.6k | auto l = table_->LockForRead(); |
811 | 18.4E | VLOG_WITH_PREFIX(1) << "Send alter table request to " << permanent_uuid() << " for " |
812 | 18.4E | << tablet_->tablet_id() << " obtained the read lock."; |
813 | | |
814 | 28.6k | req.set_schema_version(l->pb.version()); |
815 | 28.6k | req.set_dest_uuid(permanent_uuid()); |
816 | 28.6k | req.set_tablet_id(tablet_->tablet_id()); |
817 | 28.6k | req.set_alter_table_id(table_->id()); |
818 | | |
819 | 28.6k | if (l->pb.has_wal_retention_secs()) { |
820 | 5.22k | req.set_wal_retention_secs(l->pb.wal_retention_secs()); |
821 | 5.22k | } |
822 | | |
823 | 28.6k | req.mutable_schema()->CopyFrom(l->pb.schema()); |
824 | 28.6k | req.set_new_table_name(l->pb.name()); |
825 | 28.6k | req.mutable_indexes()->CopyFrom(l->pb.indexes()); |
826 | 28.6k | req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); |
827 | | |
828 | 28.6k | if (table_type() == TableType::PGSQL_TABLE_TYPE && !transaction_id_.IsNil()) {
829 | 1.18k | VLOG_WITH_PREFIX(1) << "Transaction ID is provided for tablet " << tablet_->tablet_id()
830 | 0 | << " with ID " << transaction_id_.ToString() << " for ALTER TABLE operation"; |
831 | 1.18k | req.set_should_abort_active_txns(true); |
832 | 1.18k | req.set_transaction_id(transaction_id_.ToString()); |
833 | 1.18k | } |
834 | | |
835 | 28.6k | schema_version_ = l->pb.version(); |
836 | 28.6k | } |
837 | | |
838 | 28.6k | ts_admin_proxy_->AlterSchemaAsync(req, &resp_, &rpc_, BindRpcCallback()); |
839 | 28.6k | VLOG_WITH_PREFIX(1)
840 | 32 | << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() |
841 | 32 | << " (attempt " << attempt << "):\n" << req.DebugString(); |
842 | 28.6k | return true; |
843 | 28.6k | } |
844 | | |
845 | 3.79k | bool AsyncBackfillDone::SendRequest(int attempt) { |
846 | 3.79k | VLOG_WITH_PREFIX(1)
847 | 11 | << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() |
848 | 11 | << " version " << schema_version_ << " waiting for a read lock."; |
849 | | |
850 | 3.79k | tablet::ChangeMetadataRequestPB req; |
851 | 3.79k | { |
852 | 3.79k | auto l = table_->LockForRead(); |
853 | 18.4E | VLOG_WITH_PREFIX(1) |
854 | 18.4E | << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id() |
855 | 18.4E | << " version " << schema_version_ << " obtained the read lock."; |
856 | | |
857 | 3.79k | req.set_backfill_done_table_id(table_id_); |
858 | 3.79k | req.set_dest_uuid(permanent_uuid()); |
859 | 3.79k | req.set_tablet_id(tablet_->tablet_id()); |
860 | 3.79k | req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); |
861 | 3.79k | req.set_mark_backfill_done(true); |
862 | 3.79k | schema_version_ = l->pb.version(); |
863 | 3.79k | } |
864 | | |
865 | 3.79k | ts_admin_proxy_->BackfillDoneAsync(req, &resp_, &rpc_, BindRpcCallback()); |
866 | 3.79k | VLOG_WITH_PREFIX(1)
867 | 5 | << "Send backfill done request to " << permanent_uuid() << " for " << tablet_->tablet_id() |
868 | 5 | << " (attempt " << attempt << "):\n" << req.DebugString(); |
869 | 3.79k | return true; |
870 | 3.79k | } |
871 | | |
872 | | // ============================================================================ |
873 | | // Class AsyncCopartitionTable. |
874 | | // ============================================================================ |
875 | | AsyncCopartitionTable::AsyncCopartitionTable(Master *master, |
876 | | ThreadPool* callback_pool, |
877 | | const scoped_refptr<TabletInfo>& tablet, |
878 | | const scoped_refptr<TableInfo>& table) |
879 | | : RetryingTSRpcTask(master, |
880 | | callback_pool, |
881 | | std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)), |
882 | | table.get()), |
883 | 0 | tablet_(tablet), table_(table) { |
884 | 0 | } |
885 | | |
886 | 0 | string AsyncCopartitionTable::description() const { |
887 | 0 | return "Copartition Table RPC for tablet " + tablet_->ToString() |
888 | 0 | + " for " + table_->ToString(); |
889 | 0 | } |
890 | | |
891 | 0 | TabletId AsyncCopartitionTable::tablet_id() const { |
892 | 0 | return tablet_->tablet_id(); |
893 | 0 | } |
894 | | |
895 | 0 | TabletServerId AsyncCopartitionTable::permanent_uuid() const { |
896 | 0 | return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : ""; |
897 | 0 | } |
898 | | |
899 | | // TODO(sagnik): modify this to fill all relevant fields for the AsyncCopartition request. |
900 | 0 | bool AsyncCopartitionTable::SendRequest(int attempt) { |
901 | |
902 | 0 | tserver::CopartitionTableRequestPB req; |
903 | 0 | req.set_dest_uuid(permanent_uuid()); |
904 | 0 | req.set_tablet_id(tablet_->tablet_id()); |
905 | 0 | req.set_table_id(table_->id()); |
906 | 0 | req.set_table_name(table_->name()); |
907 | |
908 | 0 | ts_admin_proxy_->CopartitionTableAsync(req, &resp_, &rpc_, BindRpcCallback()); |
909 | 0 | VLOG_WITH_PREFIX(1) << "Send copartition table request to " << permanent_uuid() |
910 | 0 | << " (attempt " << attempt << "):\n" << req.DebugString(); |
911 | 0 | return true; |
912 | 0 | } |
913 | | |
914 | | // TODO(sagnik): modify this to handle the AsyncCopartition Response and retry fail as necessary. |
915 | 0 | void AsyncCopartitionTable::HandleResponse(int attempt) { |
916 | 0 | LOG_WITH_PREFIX(INFO) << "master can't handle server responses yet"; |
917 | 0 | } |
918 | | |
919 | | // ============================================================================ |
920 | | // Class AsyncTruncate. |
921 | | // ============================================================================ |
922 | 57.2k | void AsyncTruncate::HandleResponse(int attempt) { |
923 | 57.2k | if (resp_.has_error()) { |
924 | 10 | const Status s = StatusFromPB(resp_.error().status()); |
925 | 10 | const TabletServerErrorPB::Code code = resp_.error().code(); |
926 | 10 | LOG_WITH_PREFIX(WARNING) |
927 | 10 | << "TS " << permanent_uuid() << ": truncate failed for tablet " << tablet_id() |
928 | 10 | << " with error code " << TabletServerErrorPB::Code_Name(code) << ": " << s; |
929 | 57.2k | } else { |
930 | 57.2k | VLOG_WITH_PREFIX(1)
931 | 57 | << "TS " << permanent_uuid() << ": truncate complete on tablet " << tablet_id(); |
932 | 57.2k | TransitionToCompleteState(); |
933 | 57.2k | } |
934 | | |
935 | 57.2k | server::UpdateClock(resp_, master_->clock()); |
936 | 57.2k | } |
937 | | |
938 | 56.9k | bool AsyncTruncate::SendRequest(int attempt) { |
939 | 56.9k | tserver::TruncateRequestPB req; |
940 | 56.9k | req.set_tablet_id(tablet_id()); |
941 | 56.9k | req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); |
942 | 56.9k | ts_proxy_->TruncateAsync(req, &resp_, &rpc_, BindRpcCallback()); |
943 | 56.9k | VLOG_WITH_PREFIX(1) << "Send truncate tablet request to " << permanent_uuid()
944 | 185 | << " (attempt " << attempt << "):\n" << req.DebugString(); |
945 | 56.9k | return true; |
946 | 56.9k | } |
947 | | |
948 | | // ============================================================================ |
949 | | // Class CommonInfoForRaftTask. |
950 | | // ============================================================================ |
951 | | CommonInfoForRaftTask::CommonInfoForRaftTask( |
952 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet, |
953 | | const consensus::ConsensusStatePB& cstate, const string& change_config_ts_uuid) |
954 | | : RetryingTSRpcTask( |
955 | | master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)), |
956 | | tablet->table()), |
957 | | tablet_(tablet), |
958 | | cstate_(cstate), |
959 | 58.7k | change_config_ts_uuid_(change_config_ts_uuid) { |
960 | 58.7k | deadline_ = MonoTime::Max(); // Never time out. |
961 | 58.7k | } |
962 | | |
963 | 51.8k | CommonInfoForRaftTask::~CommonInfoForRaftTask() = default; |
964 | | |
965 | 1.69k | TabletId CommonInfoForRaftTask::tablet_id() const { |
966 | 1.69k | return tablet_->tablet_id(); |
967 | 1.69k | } |
968 | | |
969 | 190k | TabletServerId CommonInfoForRaftTask::permanent_uuid() const { |
970 | 190k | return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
971 | 190k | } |
972 | | |
973 | | // ============================================================================ |
974 | | // Class AsyncChangeConfigTask. |
975 | | // ============================================================================ |
976 | 17.9k | string AsyncChangeConfigTask::description() const { |
977 | 17.9k | return Format( |
978 | 17.9k | "$0 RPC for tablet $1 ($2) on peer $3 with cas_config_opid_index $4", type_name(), |
979 | 17.9k | tablet_->tablet_id(), table_name(), permanent_uuid(), cstate_.config().opid_index()); |
980 | 17.9k | } |
981 | | |
982 | 5.52k | bool AsyncChangeConfigTask::SendRequest(int attempt) { |
983 | | // Bail if we're retrying in vain. |
984 | 5.52k | int64_t latest_index; |
985 | 5.52k | { |
986 | 5.52k | auto tablet_lock = tablet_->LockForRead(); |
987 | 5.52k | latest_index = tablet_lock->pb.committed_consensus_state().config().opid_index(); |
988 | | // Adding this logic for a race condition that occurs in this scenario: |
989 | | // 1. CatalogManager receives a DeleteTable request and sends DeleteTablet requests to the |
990 | | // tservers, but doesn't yet update the tablet in memory state to not running. |
991 | | // 2. The CB runs, sees that this tablet is still running and is over-replicated
992 | | // (since the placement now dictates it should have 0 replicas),
993 | | // but has not yet sent the ChangeConfig RPC to a tserver.
994 | | // 3. That tserver processes the DeleteTablet request. |
995 | | // 4. The ChangeConfig RPC now returns tablet not found, |
996 | | // which prompts an infinite retry of the RPC. |
997 | 5.52k | bool tablet_running = tablet_lock->is_running(); |
998 | 5.52k | if (!tablet_running) { |
999 | 0 | AbortTask(STATUS(Aborted, "Tablet is not running")); |
1000 | 0 | return false; |
1001 | 0 | } |
1002 | 5.52k | } |
1003 | 5.52k | if (latest_index > cstate_.config().opid_index()) { |
1004 | 395 | auto status = STATUS_FORMAT( |
1005 | 395 | Aborted, |
1006 | 395 | "Latest config for has opid_index of $0 while this task has opid_index of $1", |
1007 | 395 | latest_index, cstate_.config().opid_index()); |
1008 | 395 | LOG_WITH_PREFIX(INFO) << status; |
1009 | 395 | AbortTask(status); |
1010 | 395 | return false; |
1011 | 395 | } |
1012 | | |
1013 | | // Logging should be covered inside based on failure reasons. |
1014 | 5.12k | auto prepare_status = PrepareRequest(attempt); |
1015 | 5.12k | if (!prepare_status.ok()) { |
1016 | 0 | AbortTask(prepare_status); |
1017 | 0 | return false; |
1018 | 0 | } |
1019 | | |
1020 | 5.12k | consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
1021 | 5.12k | VLOG_WITH_PREFIX(1) << "Task " << description() << " sent request:\n" << req_.DebugString();
1022 | 5.12k | return true; |
1023 | 5.12k | } |
1024 | | |
1025 | 5.10k | void AsyncChangeConfigTask::HandleResponse(int attempt) { |
1026 | 5.10k | if (!resp_.has_error()) { |
1027 | 3.46k | TransitionToCompleteState(); |
1028 | 3.46k | LOG_WITH_PREFIX(INFO) << Substitute( |
1029 | 3.46k | "Change config succeeded on leader TS $0 for tablet $1 with type $2 for replica $3", |
1030 | 3.46k | permanent_uuid(), tablet_->tablet_id(), type_name(), change_config_ts_uuid_); |
1031 | 3.46k | return; |
1032 | 3.46k | } |
1033 | | |
1034 | 1.63k | Status status = StatusFromPB(resp_.error().status()); |
1035 | | |
1036 | | // Do not retry on some known errors, otherwise retry forever or until cancelled. |
1037 | 1.63k | switch (resp_.error().code()) { |
1038 | 13 | case TabletServerErrorPB::CAS_FAILED: |
1039 | 13 | case TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT: |
1040 | 13 | case TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT: |
1041 | 807 | case TabletServerErrorPB::NOT_THE_LEADER: |
1042 | 807 | LOG_WITH_PREFIX(WARNING) << "ChangeConfig() failed on leader " << permanent_uuid() |
1043 | 807 | << ". No further retry: " << status.ToString(); |
1044 | 807 | TransitionToCompleteState(); |
1045 | 807 | break; |
1046 | 830 | default: |
1047 | 830 | LOG_WITH_PREFIX(INFO) << "ChangeConfig() failed on leader " << permanent_uuid() |
1048 | 830 | << " due to error " |
1049 | 830 | << TabletServerErrorPB::Code_Name(resp_.error().code()) |
1050 | 830 | << ". This operation will be retried. Error detail: " |
1051 | 830 | << status.ToString(); |
1052 | 830 | break; |
1053 | 1.63k | } |
1054 | 1.63k | } |
1055 | | |
1056 | | // ============================================================================ |
1057 | | // Class AsyncAddServerTask. |
1058 | | // ============================================================================ |
1059 | 2.51k | Status AsyncAddServerTask::PrepareRequest(int attempt) { |
1060 | | // Select the replica we wish to add to the config. |
1061 | | // Do not include current members of the config. |
1062 | 2.51k | std::unordered_set<string> replica_uuids; |
1063 | 7.39k | for (const RaftPeerPB& peer : cstate_.config().peers()) { |
1064 | 7.39k | InsertOrDie(&replica_uuids, peer.permanent_uuid()); |
1065 | 7.39k | } |
1066 | 2.51k | TSDescriptorVector ts_descs; |
1067 | 2.51k | master_->ts_manager()->GetAllLiveDescriptors(&ts_descs); |
1068 | 2.51k | shared_ptr<TSDescriptor> replacement_replica; |
1069 | 5.98k | for (auto ts_desc : ts_descs) { |
1070 | 5.98k | if (ts_desc->permanent_uuid() == change_config_ts_uuid_) { |
1071 | | // This is given by the client, so we assume it is a well chosen uuid. |
1072 | 2.51k | replacement_replica = ts_desc; |
1073 | 2.51k | break; |
1074 | 2.51k | } |
1075 | 5.98k | } |
1076 | 2.51k | if (PREDICT_FALSE(!replacement_replica)) { |
1077 | 0 | auto status = STATUS_FORMAT( |
1078 | 0 | TimedOut, "Could not find desired replica $0 in live set", change_config_ts_uuid_); |
1079 | 0 | LOG_WITH_PREFIX(WARNING) << status; |
1080 | 0 | return status; |
1081 | 0 | } |
1082 | | |
1083 | 2.51k | req_.set_dest_uuid(permanent_uuid()); |
1084 | 2.51k | req_.set_tablet_id(tablet_->tablet_id()); |
1085 | 2.51k | req_.set_type(consensus::ADD_SERVER); |
1086 | 2.51k | req_.set_cas_config_opid_index(cstate_.config().opid_index()); |
1087 | 2.51k | RaftPeerPB* peer = req_.mutable_server(); |
1088 | 2.51k | peer->set_permanent_uuid(replacement_replica->permanent_uuid()); |
1089 | 2.51k | peer->set_member_type(member_type_); |
1090 | 2.51k | TSRegistrationPB peer_reg = replacement_replica->GetRegistration(); |
1091 | | |
1092 | 2.51k | if (peer_reg.common().private_rpc_addresses().empty()) { |
1093 | 0 | auto status = STATUS_FORMAT( |
1094 | 0 | IllegalState, "Candidate replacement $0 has no registered rpc address: $1", |
1095 | 0 | replacement_replica->permanent_uuid(), peer_reg); |
1096 | 0 | YB_LOG_EVERY_N(WARNING, 100) << LogPrefix() << status; |
1097 | 0 | return status; |
1098 | 0 | } |
1099 | | |
1100 | 2.51k | TakeRegistration(peer_reg.mutable_common(), peer); |
1101 | | |
1102 | 2.51k | return Status::OK(); |
1103 | 2.51k | } |
1104 | | |
1105 | | // ============================================================================ |
1106 | | // Class AsyncRemoveServerTask. |
1107 | | // ============================================================================ |
1108 | 2.61k | Status AsyncRemoveServerTask::PrepareRequest(int attempt) { |
1109 | 2.61k | bool found = false; |
1110 | 10.0k | for (const RaftPeerPB& peer : cstate_.config().peers()) { |
1111 | 10.0k | if (change_config_ts_uuid_ == peer.permanent_uuid()) { |
1112 | 2.61k | found = true; |
1113 | 2.61k | } |
1114 | 10.0k | } |
1115 | | |
1116 | 2.61k | if (!found) { |
1117 | 0 | auto status = STATUS_FORMAT( |
1118 | 0 | NotFound, "Asked to remove TS with uuid $0 but could not find it in config peers!", |
1119 | 0 | change_config_ts_uuid_); |
1120 | 0 | LOG_WITH_PREFIX(WARNING) << status; |
1121 | 0 | return status; |
1122 | 0 | } |
1123 | | |
1124 | 2.61k | req_.set_dest_uuid(permanent_uuid()); |
1125 | 2.61k | req_.set_tablet_id(tablet_->tablet_id()); |
1126 | 2.61k | req_.set_type(consensus::REMOVE_SERVER); |
1127 | 2.61k | req_.set_cas_config_opid_index(cstate_.config().opid_index()); |
1128 | 2.61k | RaftPeerPB* peer = req_.mutable_server(); |
1129 | 2.61k | peer->set_permanent_uuid(change_config_ts_uuid_); |
1130 | | |
1131 | 2.61k | return Status::OK(); |
1132 | 2.61k | } |
1133 | | |
1134 | | // ============================================================================ |
1135 | | // Class AsyncTryStepDown. |
1136 | | // ============================================================================ |
1137 | 54.1k | Status AsyncTryStepDown::PrepareRequest(int attempt) { |
1138 | 54.1k | LOG_WITH_PREFIX(INFO) << Substitute("Prep Leader step down $0, leader_uuid=$1, change_ts_uuid=$2", |
1139 | 54.1k | attempt, permanent_uuid(), change_config_ts_uuid_); |
1140 | 54.1k | if (attempt > 1) { |
1141 | 18 | return STATUS(RuntimeError, "Retry is not allowed"); |
1142 | 18 | } |
1143 | | |
1144 | | // If we were asked to remove the server even if it is the leader, we have to call StepDown, but |
1145 | | // only if our current leader is the server we are asked to remove. |
1146 | 54.0k | if (permanent_uuid() != change_config_ts_uuid_) { |
1147 | 3 | auto status = STATUS_FORMAT( |
1148 | 3 | IllegalState, |
1149 | 3 | "Incorrect state config leader $0 does not match target uuid $1 for a leader step down op", |
1150 | 3 | permanent_uuid(), change_config_ts_uuid_); |
1151 | 3 | LOG_WITH_PREFIX(WARNING) << status; |
1152 | 3 | return status; |
1153 | 3 | } |
1154 | | |
1155 | 54.0k | stepdown_req_.set_dest_uuid(change_config_ts_uuid_); |
1156 | 54.0k | stepdown_req_.set_tablet_id(tablet_->tablet_id()); |
1157 | 54.0k | if (!new_leader_uuid_.empty()) { |
1158 | 53.2k | stepdown_req_.set_new_leader_uuid(new_leader_uuid_); |
1159 | 53.2k | } |
1160 | | |
1161 | 54.0k | return Status::OK(); |
1162 | 54.0k | } |
1163 | | |
1164 | 54.1k | bool AsyncTryStepDown::SendRequest(int attempt) { |
1165 | 54.1k | auto prepare_status = PrepareRequest(attempt); |
1166 | 54.1k | if (!prepare_status.ok()) { |
1167 | 21 | AbortTask(prepare_status); |
1168 | 21 | return false; |
1169 | 21 | } |
1170 | | |
1171 | 54.0k | LOG_WITH_PREFIX(INFO) << Substitute("Stepping down leader $0 for tablet $1", |
1172 | 54.0k | change_config_ts_uuid_, tablet_->tablet_id()); |
1173 | 54.0k | consensus_proxy_->LeaderStepDownAsync( |
1174 | 54.0k | stepdown_req_, &stepdown_resp_, &rpc_, BindRpcCallback()); |
1175 | | |
1176 | 54.0k | return true; |
1177 | 54.1k | } |
1178 | | |
1179 | 54.0k | void AsyncTryStepDown::HandleResponse(int attempt) { |
1180 | 54.0k | if (!rpc_.status().ok()) { |
1181 | 0 | AbortTask(rpc_.status()); |
1182 | 0 | LOG_WITH_PREFIX(WARNING) << Substitute( |
1183 | 0 | "Got error on stepdown for tablet $0 with leader $1, attempt $2 and error $3", |
1184 | 0 | tablet_->tablet_id(), permanent_uuid(), attempt, rpc_.status().ToString()); |
1185 | |
1186 | 0 | return; |
1187 | 0 | } |
1188 | | |
1189 | 54.0k | TransitionToCompleteState(); |
1190 | 54.0k | const bool stepdown_failed = stepdown_resp_.error().status().code() != AppStatusPB::OK; |
1191 | 54.0k | LOG_WITH_PREFIX(INFO) << Format( |
1192 | 54.0k | "Leader step down done attempt=$0, leader_uuid=$1, change_uuid=$2, " |
1193 | 54.0k | "error=$3, failed=$4, should_remove=$5 for tablet $6.", |
1194 | 54.0k | attempt, permanent_uuid(), change_config_ts_uuid_, stepdown_resp_.error(), |
1195 | 54.0k | stepdown_failed, should_remove_, tablet_->tablet_id()); |
1196 | | |
1197 | 54.0k | if (stepdown_failed) { |
1198 | 54.0k | tablet_->RegisterLeaderStepDownFailure(change_config_ts_uuid_, |
1199 | 54.0k | MonoDelta::FromMilliseconds(stepdown_resp_.has_time_since_election_failure_ms() ? |
1200 | 53.1k | stepdown_resp_.time_since_election_failure_ms() : 0)); |
1201 | 54.0k | } |
1202 | | |
1203 | 54.0k | if (should_remove_) { |
1204 | 779 | auto task = std::make_shared<AsyncRemoveServerTask>( |
1205 | 779 | master_, callback_pool_, tablet_, cstate_, change_config_ts_uuid_); |
1206 | | |
1207 | 779 | tablet_->table()->AddTask(task); |
1208 | 779 | Status status = task->Run(); |
1209 | 779 | WARN_NOT_OK(status, "Failed to send new RemoveServer request"); |
1210 | 779 | } |
1211 | 54.0k | } |
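 | | // When should_remove_ is set, the follow-up REMOVE_SERVER config change is chained as a |
 | | // separate task; AddTask registers it with the table before Run() so it is tracked (and |
 | | // abortable) alongside the table's other pending tasks. |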
1212 | | |
1213 | | // ============================================================================ |
1214 | | // Class AsyncAddTableToTablet. |
1215 | | // ============================================================================ |
1216 | | AsyncAddTableToTablet::AsyncAddTableToTablet( |
1217 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet, |
1218 | | const scoped_refptr<TableInfo>& table) |
1219 | | : RetryingTSRpcTask( |
1220 | | master, callback_pool, std::make_unique<PickLeaderReplica>(tablet), table.get()), |
1221 | | tablet_(tablet), |
1222 | | table_(table), |
1223 | 127 | tablet_id_(tablet->tablet_id()) { |
1224 | 127 | req_.set_tablet_id(tablet->id()); |
1225 | 127 | auto& add_table = *req_.mutable_add_table(); |
1226 | 127 | add_table.set_table_id(table_->id()); |
1227 | 127 | add_table.set_table_name(table_->name()); |
1228 | 127 | add_table.set_table_type(table_->GetTableType()); |
1229 | 127 | { |
1230 | 127 | auto l = table->LockForRead(); |
1231 | 127 | add_table.set_schema_version(l->pb.version()); |
1232 | 127 | *add_table.mutable_schema() = l->pb.schema(); |
1233 | 127 | *add_table.mutable_partition_schema() = l->pb.partition_schema(); |
1234 | 127 | } |
1235 | 127 | } |
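 | | // The schema, schema version, and partition schema are snapshotted under LockForRead at |
 | | // construction time, so the request carries one consistent view of the table even if the |
 | | // table metadata changes before the RPC is sent. |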
1236 | | |
1237 | 256 | string AsyncAddTableToTablet::description() const { |
1238 | 256 | return Substitute("AddTableToTablet RPC ($0) ($1)", table_->ToString(), tablet_->ToString()); |
1239 | 256 | } |
1240 | | |
1241 | 126 | void AsyncAddTableToTablet::HandleResponse(int attempt) { |
1242 | 126 | if (!rpc_.status().ok()) { |
1243 | 0 | AbortTask(rpc_.status()); |
1244 | 0 | LOG_WITH_PREFIX(WARNING) << Substitute( |
1245 | 0 | "Got error when adding table $0 to tablet $1, attempt $2 and error $3", |
1246 | 0 | table_->ToString(), tablet_->ToString(), attempt, rpc_.status().ToString()); |
1247 | 0 | return; |
1248 | 0 | } |
1249 | 126 | if (resp_.has_error()) { |
1250 | 0 | LOG_WITH_PREFIX(WARNING) << "AddTableToTablet() responded with error code " |
1251 | 0 | << TabletServerErrorPB_Code_Name(resp_.error().code()); |
1252 | 0 | switch (resp_.error().code()) { |
1253 | 0 | case TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE: FALLTHROUGH_INTENDED; |
1254 | 0 | case TabletServerErrorPB::NOT_THE_LEADER: |
1255 | 0 | TransitionToWaitingState(MonitoredTaskState::kRunning); |
1256 | 0 | break; |
1257 | 0 | default: |
1258 | 0 | TransitionToCompleteState(); |
1259 | 0 | break; |
1260 | 0 | } |
1261 | | |
1262 | 0 | return; |
1263 | 0 | } |
1264 | | |
1265 | 126 | TransitionToCompleteState(); |
1266 | 126 | } |
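 | | // Leadership errors (LEADER_NOT_READY_TO_SERVE, NOT_THE_LEADER) return the task to the |
 | | // waiting state so a later attempt can re-pick the leader replica; any other error code |
 | | // is treated as final and completes the task. |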
1267 | | |
1268 | 126 | bool AsyncAddTableToTablet::SendRequest(int attempt) { |
1269 | 126 | ts_admin_proxy_->AddTableToTabletAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
1270 | 126 | VLOG_WITH_PREFIX(1) |
1271 | 0 | << "Send AddTableToTablet request (attempt " << attempt << "):\n" << req_.DebugString(); |
1272 | 126 | return true; |
1273 | 126 | } |
1274 | | |
1275 | | // ============================================================================ |
1276 | | // Class AsyncRemoveTableFromTablet. |
1277 | | // ============================================================================ |
1278 | | AsyncRemoveTableFromTablet::AsyncRemoveTableFromTablet( |
1279 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet, |
1280 | | const scoped_refptr<TableInfo>& table) |
1281 | | : RetryingTSRpcTask( |
1282 | | master, callback_pool, std::make_unique<PickLeaderReplica>(tablet), table.get()), |
1283 | | table_(table), |
1284 | | tablet_(tablet), |
1285 | 81 | tablet_id_(tablet->tablet_id()) { |
1286 | 81 | req_.set_tablet_id(tablet->id()); |
1287 | 81 | req_.set_remove_table_id(table->id()); |
1288 | 81 | } |
1289 | | |
1290 | 169 | string AsyncRemoveTableFromTablet::description() const { |
1291 | 169 | return Substitute("RemoveTableFromTablet RPC ($0) ($1)", table_->ToString(), tablet_->ToString()); |
1292 | 169 | } |
1293 | | |
1294 | 81 | void AsyncRemoveTableFromTablet::HandleResponse(int attempt) { |
1295 | 81 | if (!rpc_.status().ok()) { |
1296 | 0 | AbortTask(rpc_.status()); |
1297 | 0 | LOG_WITH_PREFIX(WARNING) << Substitute( |
1298 | 0 | "Got error when removing table $0 from tablet $1, attempt $2 and error $3", |
1299 | 0 | table_->ToString(), tablet_->ToString(), attempt, rpc_.status().ToString()); |
1300 | 0 | return; |
1301 | 0 | } |
1302 | 81 | if (resp_.has_error()) { |
1303 | 1 | LOG_WITH_PREFIX(WARNING) << "RemoveTableFromTablet() responded with error code " |
1304 | 1 | << TabletServerErrorPB_Code_Name(resp_.error().code()); |
1305 | 1 | switch (resp_.error().code()) { |
1306 | 0 | case TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE: FALLTHROUGH_INTENDED; |
1307 | 1 | case TabletServerErrorPB::NOT_THE_LEADER: |
1308 | 1 | TransitionToWaitingState(MonitoredTaskState::kRunning); |
1309 | 1 | break; |
1310 | 0 | default: |
1311 | 0 | TransitionToCompleteState(); |
1312 | 0 | break; |
1313 | 1 | } |
1314 | 80 | } else { |
1315 | 80 | TransitionToCompleteState(); |
1316 | 80 | } |
1317 | 81 | } |
1318 | | |
1319 | 81 | bool AsyncRemoveTableFromTablet::SendRequest(int attempt) { |
1320 | 81 | ts_admin_proxy_->RemoveTableFromTabletAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
1321 | 81 | VLOG_WITH_PREFIX(1) << "Send RemoveTableFromTablet request (attempt " << attempt << "):\n" |
1322 | 0 | << req_.DebugString(); |
1323 | 81 | return true; |
1324 | 81 | } |
1325 | | |
1326 | | namespace { |
1327 | | |
1328 | 1 | bool IsDefinitelyPermanentError(const Status& s) { |
1329 | 1 | return s.IsInvalidArgument() || s.IsNotFound(); |
1330 | 1 | } |
1331 | | |
1332 | | } // namespace |
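 | | // Only InvalidArgument and NotFound are treated as certainly permanent; the split tasks |
 | | // below use this to fail fast rather than retry an RPC that cannot succeed. |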
1333 | | |
1334 | | // ============================================================================ |
1335 | | // Class AsyncGetTabletSplitKey. |
1336 | | // ============================================================================ |
1337 | | AsyncGetTabletSplitKey::AsyncGetTabletSplitKey( |
1338 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet, |
1339 | | DataCallbackType result_cb) |
1340 | 143 | : AsyncTabletLeaderTask(master, callback_pool, tablet), result_cb_(result_cb) { |
1341 | 143 | req_.set_tablet_id(tablet_id()); |
1342 | 143 | } |
1343 | | |
1344 | 143 | void AsyncGetTabletSplitKey::HandleResponse(int attempt) { |
1345 | 143 | if (resp_.has_error()) { |
1346 | 1 | const Status s = StatusFromPB(resp_.error().status()); |
1347 | 1 | const TabletServerErrorPB::Code code = resp_.error().code(); |
1348 | 1 | LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << ": GetSplitKey (attempt " << attempt |
1349 | 1 | << ") failed for tablet " << tablet_id() << " with error code " |
1350 | 1 | << TabletServerErrorPB::Code_Name(code) << ": " << s; |
1351 | 1 | if (IsDefinitelyPermanentError(s) || s.IsIllegalState()) { |
1352 | | // It can happen that the tablet leader has completed post-split compaction after a |
1353 | | // previous split while its followers have not. |
1354 | | // The catalog manager then decides to split again and sends a GetTabletSplitKey RPC, |
1355 | | // but leadership changes for some reason and the new leader has not yet been |
1356 | | // compacted. In this case we get an IllegalState error and do not want to retry |
1357 | | // until post-split compaction has completed on the leader. Once it is done, the |
1358 | | // CatalogManager will resend the RPC. |
1359 | | // |
1360 | | // Another source of IllegalState is trying to split a tablet in which all the data |
1361 | | // has the same hash_code or the same doc_key; in this case we also do not want to |
1362 | | // retry the RPC automatically. |
1363 | | // See https://github.com/yugabyte/yugabyte-db/issues/9159. |
1364 | 0 | TransitionToFailedState(state(), s); |
1365 | 0 | } |
1366 | 142 | } else { |
1367 | 142 | VLOG_WITH_PREFIX(1) |
1368 | 0 | << "TS " << permanent_uuid() << ": got split key for tablet " << tablet_id(); |
1369 | 142 | TransitionToCompleteState(); |
1370 | 142 | } |
1371 | | |
1372 | 143 | server::UpdateClock(resp_, master_->clock()); |
1373 | 143 | } |
1374 | | |
1375 | 143 | bool AsyncGetTabletSplitKey::SendRequest(int attempt) { |
1376 | 143 | req_.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); |
1377 | 143 | ts_proxy_->GetSplitKeyAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
1378 | 143 | VLOG_WITH_PREFIX(1) |
1379 | 0 | << "Sent get split key request to " << permanent_uuid() << " (attempt " << attempt << "):\n" |
1380 | 0 | << req_.DebugString(); |
1381 | 143 | return true; |
1382 | 143 | } |
1383 | | |
1384 | 143 | void AsyncGetTabletSplitKey::Finished(const Status& status) { |
1385 | 143 | if (result_cb_) { |
1386 | 143 | if (status.ok()) { |
1387 | 142 | result_cb_(Data{resp_.split_encoded_key(), resp_.split_partition_key()}); |
1388 | 142 | } else { |
1389 | 1 | result_cb_(status); |
1390 | 1 | } |
1391 | 143 | } |
1392 | 143 | } |
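 | | // Finished runs once the task reaches a terminal state and hands the callback either |
 | | // the fetched split keys or the terminal failure status, never both. |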
1393 | | |
1394 | | // ============================================================================ |
1395 | | // Class AsyncSplitTablet. |
1396 | | // ============================================================================ |
1397 | | AsyncSplitTablet::AsyncSplitTablet( |
1398 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet, |
1399 | | const std::array<TabletId, kNumSplitParts>& new_tablet_ids, |
1400 | | const std::string& split_encoded_key, const std::string& split_partition_key, |
1401 | | TabletSplitCompleteHandlerIf* tablet_split_complete_handler) |
1402 | | : AsyncTabletLeaderTask(master, callback_pool, tablet), |
1403 | 140 | tablet_split_complete_handler_(tablet_split_complete_handler) { |
1404 | 140 | req_.set_tablet_id(tablet_id()); |
1405 | 140 | req_.set_new_tablet1_id(new_tablet_ids[0]); |
1406 | 140 | req_.set_new_tablet2_id(new_tablet_ids[1]); |
1407 | 140 | req_.set_split_encoded_key(split_encoded_key); |
1408 | 140 | req_.set_split_partition_key(split_partition_key); |
1409 | 140 | } |
1410 | | |
1411 | 140 | void AsyncSplitTablet::HandleResponse(int attempt) { |
1412 | 140 | if (resp_.has_error()) { |
1413 | 96 | const Status s = StatusFromPB(resp_.error().status()); |
1414 | 96 | const TabletServerErrorPB::Code code = resp_.error().code(); |
1415 | 96 | LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << ": split (attempt " << attempt |
1416 | 96 | << ") failed for tablet " << tablet_id() << " with error code " |
1417 | 96 | << TabletServerErrorPB::Code_Name(code) << ": " << s; |
1418 | 96 | if (s.IsAlreadyPresent()) { |
1419 | 96 | TransitionToCompleteState(); |
1420 | 96 | } else if (IsDefinitelyPermanentError(s)) { |
1421 | 0 | TransitionToFailedState(state(), s); |
1422 | 0 | } |
1423 | 96 | } else { |
1424 | 44 | VLOG_WITH_PREFIX(1) |
1425 | 0 | << "TS " << permanent_uuid() << ": split complete on tablet " << tablet_id(); |
1426 | 44 | TransitionToCompleteState(); |
1427 | 44 | } |
1428 | | |
1429 | 140 | server::UpdateClock(resp_, master_->clock()); |
1430 | 140 | } |
1431 | | |
1432 | 140 | bool AsyncSplitTablet::SendRequest(int attempt) { |
1433 | 140 | req_.set_dest_uuid(permanent_uuid()); |
1434 | 140 | req_.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); |
1435 | 140 | ts_admin_proxy_->SplitTabletAsync(req_, &resp_, &rpc_, BindRpcCallback()); |
1436 | 140 | VLOG_WITH_PREFIX(1) |
1437 | 0 | << "Sent split tablet request to " << permanent_uuid() << " (attempt " << attempt << "):\n" |
1438 | 0 | << req_.DebugString(); |
1439 | 140 | return true; |
1440 | 140 | } |
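 | | // dest_uuid is (re)set per attempt rather than in the constructor, presumably because |
 | | // the target leader is re-picked on every retry and may differ between attempts. |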
1441 | | |
1442 | 140 | void AsyncSplitTablet::Finished(const Status& status) { |
1443 | 140 | if (tablet_split_complete_handler_) { |
1444 | 140 | SplitTabletIds split_tablet_ids { |
1445 | 140 | .source = req_.tablet_id(), |
1446 | 140 | .children = {req_.new_tablet1_id(), req_.new_tablet2_id()} |
1447 | 140 | }; |
1448 | 140 | tablet_split_complete_handler_->ProcessSplitTabletResult( |
1449 | 140 | status, table_->id(), split_tablet_ids); |
1450 | 140 | } |
1451 | 140 | } |
1452 | | |
1453 | | } // namespace master |
1454 | | } // namespace yb |