YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/async_rpc_tasks.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/master/async_rpc_tasks.h"

#include "yb/common/wire_protocol.h"

#include "yb/consensus/consensus.proxy.h"
#include "yb/consensus/consensus_meta.h"

#include "yb/gutil/map-util.h"

#include "yb/master/catalog_entity_info.h"
#include "yb/master/catalog_manager_if.h"
#include "yb/master/master.h"
#include "yb/master/tablet_split_complete_handler.h"
#include "yb/master/ts_descriptor.h"
#include "yb/master/ts_manager.h"

#include "yb/rpc/messenger.h"

#include "yb/tserver/backup.proxy.h"
#include "yb/tserver/tserver_admin.proxy.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/atomic.h"
#include "yb/util/flag_tags.h"
#include "yb/util/metrics.h"
#include "yb/util/source_location.h"
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"
#include "yb/util/thread_restrictions.h"
#include "yb/util/threadpool.h"

using namespace std::literals;

DEFINE_int32(unresponsive_ts_rpc_timeout_ms, 15 * 60 * 1000,  // 15 minutes
             "After this amount of time (or after we have retried unresponsive_ts_rpc_retry_limit "
             "times, whichever happens first), the master will stop attempting to contact a tablet "
             "server in order to perform operations such as deleting a tablet.");
TAG_FLAG(unresponsive_ts_rpc_timeout_ms, advanced);

DEFINE_int32(unresponsive_ts_rpc_retry_limit, 20,
             "After this number of retries (or unresponsive_ts_rpc_timeout_ms expires, whichever "
             "happens first), the master will stop attempting to contact a tablet server in order "
             "to perform operations such as deleting a tablet.");
TAG_FLAG(unresponsive_ts_rpc_retry_limit, advanced);

DEFINE_int32(retrying_ts_rpc_max_delay_ms, 60 * 1000,
             "Maximum delay between successive attempts to contact an unresponsive tablet server.");
TAG_FLAG(retrying_ts_rpc_max_delay_ms, advanced);

DEFINE_test_flag(int32, slowdown_master_async_rpc_tasks_by_ms, 0,
                 "For testing purposes, slow down the run method to take longer.");

// These flags are defined in catalog_manager.cc.
DECLARE_int32(master_ts_rpc_timeout_ms);
DECLARE_int32(tablet_creation_timeout_ms);
DECLARE_int32(TEST_slowdown_alter_table_rpcs_ms);

namespace yb {
namespace master {

using namespace std::placeholders;

using std::string;
using std::shared_ptr;

using strings::Substitute;
using consensus::RaftPeerPB;
using server::MonitoredTaskState;
using tserver::TabletServerErrorPB;

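// [Editor's note] A hedged sketch, not part of the original file: the two
// unresponsive_ts_rpc_* flags above race each other, and whichever trips first
// ends the retry loop. Expressed as a standalone predicate (hypothetical
// helper and names):
//
//   bool ShouldGiveUp(int attempt, MonoTime now, MonoTime deadline) {
//     return attempt > FLAGS_unresponsive_ts_rpc_retry_limit || now >= deadline;
//   }
//
// RescheduleWithBackoffDelay() below implements this pair of cutoffs (retry
// count and deadline).
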
void RetryingTSRpcTask::UpdateMetrics(scoped_refptr<Histogram> metric, MonoTime start_time,
                                      const std::string& metric_name,
                                      const std::string& metric_type) {
  metric->Increment(MonoTime::Now().GetDeltaSince(start_time).ToMicroseconds());
}

// ============================================================================
//  Class PickSpecificUUID.
// ============================================================================
Status PickSpecificUUID::PickReplica(TSDescriptor** ts_desc) {
  shared_ptr<TSDescriptor> ts;
  if (!master_->ts_manager()->LookupTSByUUID(ts_uuid_, &ts)) {
    return STATUS(NotFound, "unknown tablet server id", ts_uuid_);
  }
  *ts_desc = ts.get();
  return Status::OK();
}

string ReplicaMapToString(const TabletReplicaMap& replicas) {
  string ret = "";
  for (const auto& r : replicas) {
    if (!ret.empty()) {
      ret += ", ";
    } else {
      ret += "(";
    }
    ret += r.second.ts_desc->permanent_uuid();
  }
  ret += ")";
  return ret;
}

// ============================================================================
//  Class PickLeaderReplica.
// ============================================================================
PickLeaderReplica::PickLeaderReplica(const scoped_refptr<TabletInfo>& tablet)
    : tablet_(tablet) {
}

Status PickLeaderReplica::PickReplica(TSDescriptor** ts_desc) {
  *ts_desc = VERIFY_RESULT(tablet_->GetLeader());
  return Status::OK();
}

// ============================================================================
//  Class RetryingTSRpcTask.
// ============================================================================

RetryingTSRpcTask::RetryingTSRpcTask(Master *master,
                                     ThreadPool* callback_pool,
                                     std::unique_ptr<TSPicker> replica_picker,
                                     const scoped_refptr<TableInfo>& table)
  : master_(master),
    callback_pool_(callback_pool),
    replica_picker_(std::move(replica_picker)),
    table_(table),
    start_ts_(MonoTime::Now()),
    deadline_(start_ts_ + FLAGS_unresponsive_ts_rpc_timeout_ms * 1ms) {
}

RetryingTSRpcTask::~RetryingTSRpcTask() {
  auto state = state_.load(std::memory_order_acquire);
  LOG_IF(DFATAL, !IsStateTerminal(state))
      << "Destroying " << this << " task in a wrong state: " << AsString(state);
  VLOG_WITH_FUNC(1) << "Destroying " << this << " in " << AsString(state);
}

std::string RetryingTSRpcTask::LogPrefix() const {
  return Format("$0 (task=$1, state=$2): ", description(), static_cast<const void*>(this), state());
}

std::string RetryingTSRpcTask::table_name() const {
  return !table_ ? "" : table_->ToString();
}

// Send the subclass RPC request.
Status RetryingTSRpcTask::Run() {
  VLOG_WITH_PREFIX(1) << "Start Running";
  attempt_start_ts_ = MonoTime::Now();
  ++attempt_;
  VLOG_WITH_PREFIX(1) << "Start Running, attempt: " << attempt_;
  for (;;) {
    auto task_state = state();
    if (task_state == MonitoredTaskState::kAborted) {
      return STATUS(IllegalState, "Unable to run task because it has been aborted");
    }
    if (task_state == MonitoredTaskState::kWaiting) {
      break;
    }

    LOG_IF_WITH_PREFIX(DFATAL, task_state != MonitoredTaskState::kScheduling)
        << "Expected task to be in kScheduling state but found: " << AsString(task_state);

    // We expect this case to be very rare, since we switch to the waiting state right after
    // scheduling the task on the messenger. So just busy wait.
    std::this_thread::yield();
  }

  Status s = ResetTSProxy();
  if (!s.ok()) {
    s = s.CloneAndPrepend("Failed to reset TS proxy");
    LOG_WITH_PREFIX(INFO) << s;
    if (s.IsExpired()) {
      TransitionToTerminalState(MonitoredTaskState::kWaiting, MonitoredTaskState::kFailed, s);
      UnregisterAsyncTask();
      return s;
    }
    if (RescheduleWithBackoffDelay()) {
      return Status::OK();
    }

    auto state = this->state();
    UnregisterAsyncTask();  // May delete this.

    if (state == MonitoredTaskState::kFailed) {
      return s;
    }
    if (state == MonitoredTaskState::kAborted) {
      return STATUS(IllegalState, "Unable to run task because it has been aborted");
    }

    LOG_WITH_PREFIX(FATAL) << "Failed to change task to MonitoredTaskState::kFailed state from "
                           << state;
  } else {
    rpc_.Reset();
  }

  // Calculate and set the timeout deadline.
  const MonoTime deadline = ComputeDeadline();
  rpc_.set_deadline(deadline);

  if (!PerformStateTransition(MonitoredTaskState::kWaiting, MonitoredTaskState::kRunning)) {
    if (state() == MonitoredTaskState::kAborted) {
      return STATUS(Aborted, "Unable to run task because it has been aborted");
    }

    LOG_WITH_PREFIX(DFATAL) <<
        "Task transition MonitoredTaskState::kWaiting -> MonitoredTaskState::kRunning failed";
    return Failed(STATUS_FORMAT(IllegalState, "Task in invalid state $0", state()));
  }

  auto slowdown_flag_val = GetAtomicFlag(&FLAGS_TEST_slowdown_master_async_rpc_tasks_by_ms);
  if (PREDICT_FALSE(slowdown_flag_val > 0)) {
    VLOG_WITH_PREFIX(1) << "Slowing down by " << slowdown_flag_val << " ms.";
    bool old_thread_restriction = ThreadRestrictions::SetWaitAllowed(true);
    SleepFor(MonoDelta::FromMilliseconds(slowdown_flag_val));
    ThreadRestrictions::SetWaitAllowed(old_thread_restriction);
    VLOG_WITH_PREFIX(2) << "Slowing down done. Resuming.";
  }
  if (!SendRequest(attempt_) && !RescheduleWithBackoffDelay()) {
    UnregisterAsyncTask();  // May call 'delete this'.
  }
  return Status::OK();
}

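// [Editor's note] A rough lifecycle sketch of the state machine driven by
// Run() and the Transition* helpers further down (simplified; kComplete,
// kFailed and kAborted are the terminal states checked by IsStateTerminal()):
//
//   kScheduling --ScheduleOnReactor()-------> kWaiting
//   kWaiting    --Run(), proxy reset OK-----> kRunning
//   kRunning    --HandleResponse()----------> kComplete | kFailed
//   any non-terminal --AbortTask()----------> kAborted
//
// A failed attempt loops back kRunning -> kScheduling -> kWaiting via
// RescheduleWithBackoffDelay().
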
MonoTime RetryingTSRpcTask::ComputeDeadline() {
  MonoTime timeout = MonoTime::Now();
  timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms));
  return MonoTime::Earliest(timeout, deadline_);
}

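// [Editor's note] In other words, each attempt runs with a per-RPC timeout of
// FLAGS_master_ts_rpc_timeout_ms, clipped to the task-wide deadline_ the
// constructor derived from FLAGS_unresponsive_ts_rpc_timeout_ms:
//
//   rpc deadline = min(now + master_ts_rpc_timeout_ms, deadline_)
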
// Abort this task and return its state before it was successfully aborted. If the task entered
// a different terminal state before we were able to abort it, return that state.
MonitoredTaskState RetryingTSRpcTask::AbortAndReturnPrevState(const Status& status) {
  auto prev_state = state();
  while (!IsStateTerminal(prev_state)) {
    auto expected = prev_state;
    if (state_.compare_exchange_weak(expected, MonitoredTaskState::kAborted)) {
      VLOG_WITH_PREFIX_AND_FUNC(1)
          << "Aborted with: " << status << ", prev state: " << AsString(prev_state);
      AbortIfScheduled();
      Finished(status);
      UnregisterAsyncTask();
      return prev_state;
    }
    prev_state = state();
  }
  VLOG_WITH_PREFIX_AND_FUNC(1)
      << "Already terminated, prev state: " << AsString(prev_state);
  UnregisterAsyncTask();
  return prev_state;
}

void RetryingTSRpcTask::AbortTask(const Status& status) {
  AbortAndReturnPrevState(status);
}

void RetryingTSRpcTask::RpcCallback() {
  // Defer the actual work of the callback off of the reactor thread.
  // This is necessary because our callbacks often do synchronous writes to
  // the catalog table, and we can't do synchronous IO on the reactor.
  //
  // Note: This can fail on shutdown, so just print a warning for it.
  Status s = callback_pool_->SubmitFunc(
      std::bind(&RetryingTSRpcTask::DoRpcCallback, shared_from(this)));
  VLOG_WITH_PREFIX_AND_FUNC(3) << "Submit status: " << s;
  if (!s.ok()) {
    WARN_NOT_OK(s, "Could not submit to queue, probably shutting down");
    AbortTask(s);
  }
}

// Handle the actual work of the RPC callback. This is run on the master's worker
// pool, rather than a reactor thread, so it may do blocking IO operations.
void RetryingTSRpcTask::DoRpcCallback() {
  VLOG_WITH_PREFIX_AND_FUNC(3) << "Rpc status: " << rpc_.status();

  if (!rpc_.status().ok()) {
    LOG_WITH_PREFIX(WARNING) << "TS " << target_ts_desc_->permanent_uuid() << ": "
                             << type_name() << " RPC failed for tablet "
                             << tablet_id() << ": " << rpc_.status().ToString();
    if (!target_ts_desc_->IsLive() && type() == ASYNC_DELETE_REPLICA) {
      LOG_WITH_PREFIX(WARNING)
          << "TS " << target_ts_desc_->permanent_uuid() << ": delete failed for tablet "
          << tablet_id() << ". TS is DEAD. No further retry.";
      TransitionToCompleteState();
    }
  } else if (state() != MonitoredTaskState::kAborted) {
    HandleResponse(attempt_);  // Modifies state_.
  }
  UpdateMetrics(master_->GetMetric(type_name(), Master::AttemptMetric, description()),
                attempt_start_ts_, type_name(), "attempt metric");

  // Schedule a retry if the RPC call was not successful.
  if (RescheduleWithBackoffDelay()) {
    return;
  }

  UnregisterAsyncTask();  // May call 'delete this'.
}

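// [Editor's note] The RpcCallback()/DoRpcCallback() pair above is a
// reactor-to-worker hand-off: the RPC completion fires on a reactor thread,
// which must never block, so the callback only enqueues the real work on the
// callback thread pool. A minimal sketch of the same pattern (hypothetical
// names, assuming a shared_ptr-managed task):
//
//   void OnRpcDone() {                 // reactor thread: no blocking allowed
//     auto self = shared_from(this);   // keep the task alive across threads
//     auto s = pool_->SubmitFunc([self] {
//       self->ProcessResponse();       // worker thread: blocking IO is OK
//     });
//     if (!s.ok()) {
//       self->Abort(s);                // e.g. the pool is shutting down
//     }
//   }
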
int RetryingTSRpcTask::num_max_retries() { return FLAGS_unresponsive_ts_rpc_retry_limit; }

int RetryingTSRpcTask::max_delay_ms() {
  return FLAGS_retrying_ts_rpc_max_delay_ms;
}

bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
  auto task_state = state();
  if (task_state != MonitoredTaskState::kRunning &&
      // Allow kWaiting for task(s) that have never successfully ResetTSProxy().
      task_state != MonitoredTaskState::kWaiting) {
    if (task_state != MonitoredTaskState::kComplete) {
      LOG_WITH_PREFIX(INFO) << "No reschedule for this task: " << AsString(task_state);
    }
    return false;
  }

  int attempt_threshold = std::numeric_limits<int>::max();
  if (NoRetryTaskType()) {
    attempt_threshold = 0;
  } else if (RetryLimitTaskType()) {
    attempt_threshold = num_max_retries();
  }

  if (attempt_ > attempt_threshold) {
    auto status = STATUS_FORMAT(
        Aborted, "Reached maximum number of retries ($0)", attempt_threshold);
    LOG_WITH_PREFIX(WARNING)
        << status << " for request " << description()
        << ", task=" << this << " state=" << state();
    TransitionToFailedState(task_state, status);
    return false;
  }

  MonoTime now = MonoTime::Now();
  // We assume it might take 10ms to process the request in the best case,
  // fail if we have less than that amount of time remaining.
  int64_t millis_remaining = deadline_.GetDeltaSince(now).ToMilliseconds() - 10;
  // Exponential backoff with jitter.
  int64_t base_delay_ms;
  if (attempt_ <= 12) {
    base_delay_ms = 1 << (attempt_ + 3);  // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
  } else {
    base_delay_ms = max_delay_ms();
  }
  // Normal rand is seeded by default with 1. Using the same for rand_r seed.
  unsigned int seed = 1;
  int64_t jitter_ms = rand_r(&seed) % 50;  // Add up to 50ms of additional random delay.
  int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);

  if (delay_millis <= 0) {
    auto status = STATUS(TimedOut, "Request timed out");
    LOG_WITH_PREFIX(WARNING) << status;
    TransitionToFailedState(task_state, status);
    return false;
  }

  LOG_WITH_PREFIX(INFO) << "Scheduling retry with a delay of " << delay_millis
                        << "ms (attempt = " << attempt_ << " / " << attempt_threshold << ")...";

  if (!PerformStateTransition(task_state, MonitoredTaskState::kScheduling)) {
    LOG_WITH_PREFIX(WARNING) << "Unable to mark this task as MonitoredTaskState::kScheduling";
    return false;
  }
  auto task_id = master_->messenger()->ScheduleOnReactor(
      std::bind(&RetryingTSRpcTask::RunDelayedTask, shared_from(this), _1),
      MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION(), master_->messenger());
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id;
  reactor_task_id_.store(task_id, std::memory_order_release);

  if (task_id == rpc::kInvalidTaskId) {
    AbortTask(STATUS(Aborted, "Messenger closing"));
    UnregisterAsyncTask();
    return false;
  }

  return TransitionToWaitingState(MonitoredTaskState::kScheduling);
}

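// [Editor's note] The delay computation above, restated as a self-contained
// sketch (illustrative names; plain integer arithmetic only):
//
//   int64_t BackoffDelayMs(int attempt, int64_t max_delay_ms,
//                          int64_t millis_remaining, int64_t jitter_ms) {
//     // 1st retry waits 2^4 = 16 ms, doubling each attempt up to #12,
//     // after which the base delay is capped at max_delay_ms.
//     int64_t base = attempt <= 12 ? (int64_t{1} << (attempt + 3)) : max_delay_ms;
//     int64_t delay = base + jitter_ms;  // jitter drawn from [0, 50)
//     return delay < millis_remaining ? delay : millis_remaining;  // <= 0 => timed out
//   }
//
// Note that `seed` above is reset to 1 on every call, so rand_r() returns the
// same value each time and the "jitter" is effectively a constant.
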
void RetryingTSRpcTask::RunDelayedTask(const Status& status) {
  if (state() == MonitoredTaskState::kAborted) {
    UnregisterAsyncTask();  // May delete this.
    return;
  }

  if (!status.ok()) {
    LOG_WITH_PREFIX(WARNING) << "Async tablet task failed or was cancelled: " << status;
    if (status.IsAborted() || status.IsServiceUnavailable()) {
      AbortTask(status);
    }
    UnregisterAsyncTask();  // May delete this.
    return;
  }

  auto log_prefix = LogPrefix();  // Save in case we need to log after deletion.
  Status s = Run();  // May delete this.
  if (!s.ok()) {
    LOG(WARNING) << log_prefix << "Async tablet task failed: " << s;
  }
}

void RetryingTSRpcTask::UnregisterAsyncTaskCallback() {}

Status RetryingTSRpcTask::Failed(const Status& status) {
  LOG_WITH_PREFIX(WARNING) << "Async task failed: " << status;
  Finished(status);
  UnregisterAsyncTask();
  return status;
}

void RetryingTSRpcTask::UnregisterAsyncTask() {
  // Retain a reference to the object, in case RemoveTask would have removed the last one.
  auto self = shared_from_this();
  std::unique_lock<decltype(unregister_mutex_)> lock(unregister_mutex_);
  UpdateMetrics(master_->GetMetric(type_name(), Master::TaskMetric, description()), start_ts_,
                type_name(), "task metric");

  auto s = state();
  if (!IsStateTerminal(s)) {
    LOG_WITH_PREFIX(FATAL) << "Invalid task state " << s;
  }
  end_ts_ = MonoTime::Now();
  if (table_ != nullptr && table_->RemoveTask(self)) {
    // We don't delete a table while it has running tasks, so check whether this was the
    // last task, even if it is not a delete-table task.
    master_->catalog_manager()->CheckTableDeleted(table_);
  }
  // Make sure to run the callbacks last, in case they rely on the task no longer being tracked
  // by the table.
  UnregisterAsyncTaskCallback();
}

void RetryingTSRpcTask::AbortIfScheduled() {
  auto reactor_task_id = reactor_task_id_.load(std::memory_order_acquire);
  VLOG_WITH_PREFIX_AND_FUNC(1) << "Reactor task id: " << reactor_task_id;
  if (reactor_task_id != rpc::kInvalidTaskId) {
    master_->messenger()->AbortOnReactor(reactor_task_id);
  }
}

Status RetryingTSRpcTask::ResetTSProxy() {
  // TODO: if there is no replica available, should we still keep the task running?
  RETURN_NOT_OK(replica_picker_->PickReplica(&target_ts_desc_));

  shared_ptr<tserver::TabletServerServiceProxy> ts_proxy;
  shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy;
  shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
  shared_ptr<tserver::TabletServerBackupServiceProxy> ts_backup_proxy;

  RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_proxy));
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_admin_proxy));
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&consensus_proxy));
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_backup_proxy));

  ts_proxy_.swap(ts_proxy);
  ts_admin_proxy_.swap(ts_admin_proxy);
  consensus_proxy_.swap(consensus_proxy);
  ts_backup_proxy_.swap(ts_backup_proxy);

  return Status::OK();
}

void RetryingTSRpcTask::TransitionToTerminalState(MonitoredTaskState expected,
                                                  MonitoredTaskState terminal_state,
                                                  const Status& status) {
  if (!PerformStateTransition(expected, terminal_state)) {
    if (terminal_state != MonitoredTaskState::kAborted && state() == MonitoredTaskState::kAborted) {
      LOG_WITH_PREFIX(WARNING) << "Unable to perform transition " << expected << " -> "
                               << terminal_state << ". Task has been aborted";
    } else {
      LOG_WITH_PREFIX(DFATAL) << "State transition " << expected << " -> "
                              << terminal_state << " failed. Current task is in an invalid state: "
                              << state();
    }
    return;
  }

  Finished(status);
}

void RetryingTSRpcTask::TransitionToFailedState(server::MonitoredTaskState expected,
                                                const yb::Status& status) {
  TransitionToTerminalState(expected, MonitoredTaskState::kFailed, status);
}

void RetryingTSRpcTask::TransitionToCompleteState() {
  TransitionToTerminalState(
      MonitoredTaskState::kRunning, MonitoredTaskState::kComplete, Status::OK());
}

bool RetryingTSRpcTask::TransitionToWaitingState(MonitoredTaskState expected) {
  if (!PerformStateTransition(expected, MonitoredTaskState::kWaiting)) {
    // The only valid reason for the state not becoming kWaiting is that the task got
    // aborted.
    if (state() != MonitoredTaskState::kAborted) {
      LOG_WITH_PREFIX(FATAL) << "Unable to mark task as MonitoredTaskState::kWaiting";
    }
    AbortIfScheduled();
    return false;
  } else {
    return true;
  }
}

// ============================================================================
//  Class AsyncTabletLeaderTask.
// ============================================================================
AsyncTabletLeaderTask::AsyncTabletLeaderTask(
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet)
    : RetryingTSRpcTask(
          master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)),
          tablet->table().get()),
      tablet_(tablet) {
}

AsyncTabletLeaderTask::AsyncTabletLeaderTask(
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
    const scoped_refptr<TableInfo>& table)
    : RetryingTSRpcTask(
          master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)), table),
      tablet_(tablet) {
}

AsyncTabletLeaderTask::~AsyncTabletLeaderTask() = default;

std::string AsyncTabletLeaderTask::description() const {
  return Format("$0 RPC for tablet $1 ($2)", type_name(), tablet_, table_name());
}

TabletId AsyncTabletLeaderTask::tablet_id() const {
  return tablet_->tablet_id();
}

TabletServerId AsyncTabletLeaderTask::permanent_uuid() const {
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
}

// ============================================================================
//  Class AsyncCreateReplica.
// ============================================================================
AsyncCreateReplica::AsyncCreateReplica(Master *master,
                                       ThreadPool *callback_pool,
                                       const string& permanent_uuid,
                                       const scoped_refptr<TabletInfo>& tablet,
                                       const std::vector<SnapshotScheduleId>& snapshot_schedules)
  : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()),
    tablet_id_(tablet->tablet_id()) {
  deadline_ = start_ts_;
  deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));

  auto table_lock = tablet->table()->LockForRead();
  const SysTabletsEntryPB& tablet_pb = tablet->metadata().dirty().pb;

  req_.set_dest_uuid(permanent_uuid);
  req_.set_table_id(tablet->table()->id());
  req_.set_tablet_id(tablet->tablet_id());
  req_.set_table_type(tablet->table()->metadata().state().pb.table_type());
  req_.mutable_partition()->CopyFrom(tablet_pb.partition());
  req_.set_namespace_id(table_lock->pb.namespace_id());
  req_.set_namespace_name(table_lock->pb.namespace_name());
  req_.set_table_name(table_lock->pb.name());
  req_.mutable_schema()->CopyFrom(table_lock->pb.schema());
  req_.mutable_partition_schema()->CopyFrom(table_lock->pb.partition_schema());
  req_.mutable_config()->CopyFrom(tablet_pb.committed_consensus_state().config());
  req_.set_colocated(tablet_pb.colocated());
  if (table_lock->pb.has_index_info()) {
    req_.mutable_index_info()->CopyFrom(table_lock->pb.index_info());
  }
  auto& req_schedules = *req_.mutable_snapshot_schedules();
  req_schedules.Reserve(narrow_cast<int>(snapshot_schedules.size()));
  for (const auto& id : snapshot_schedules) {
    req_schedules.Add()->assign(id.AsSlice().cdata(), id.size());
  }
}

std::string AsyncCreateReplica::description() const {
  return Format("CreateTablet RPC for tablet $0 ($1) on TS=$2",
                tablet_id_, table_name(), permanent_uuid_);
}

void AsyncCreateReplica::HandleResponse(int attempt) {
  if (resp_.has_error()) {
    Status s = StatusFromPB(resp_.error().status());
    if (s.IsAlreadyPresent()) {
      LOG_WITH_PREFIX(INFO) << "CreateTablet RPC for tablet " << tablet_id_
                            << " on TS " << permanent_uuid_ << " returned already present: "
                            << s;
      TransitionToCompleteState();
    } else {
      LOG_WITH_PREFIX(WARNING) << "CreateTablet RPC for tablet " << tablet_id_
                               << " on TS " << permanent_uuid_ << " failed: " << s;
    }

    return;
  }

  TransitionToCompleteState();
  VLOG_WITH_PREFIX(1) << "TS " << permanent_uuid_ << ": complete on tablet " << tablet_id_;
}

bool AsyncCreateReplica::SendRequest(int attempt) {
  ts_admin_proxy_->CreateTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1) << "Send create tablet request to " << permanent_uuid_ << ":\n"
                      << " (attempt " << attempt << "):\n"
                      << req_.DebugString();
  return true;
}

// ============================================================================
//  Class AsyncStartElection.
// ============================================================================
AsyncStartElection::AsyncStartElection(Master *master,
                                       ThreadPool *callback_pool,
                                       const string& permanent_uuid,
                                       const scoped_refptr<TabletInfo>& tablet)
  : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()),
    tablet_id_(tablet->tablet_id()) {
  deadline_ = start_ts_;
  deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));

  req_.set_dest_uuid(permanent_uuid_);
  req_.set_tablet_id(tablet_id_);
  req_.set_initial_election(true);
}

void AsyncStartElection::HandleResponse(int attempt) {
  if (resp_.has_error()) {
    Status s = StatusFromPB(resp_.error().status());
    if (!s.ok()) {
      LOG_WITH_PREFIX(WARNING) << "RunLeaderElection RPC for tablet " << tablet_id_
                               << " on TS " << permanent_uuid_ << " failed: " << s;
    }

    return;
  }

  TransitionToCompleteState();
}

std::string AsyncStartElection::description() const {
  return Format("RunLeaderElection RPC for tablet $0 ($1) on TS=$2",
                tablet_id_, table_name(), permanent_uuid_);
}

bool AsyncStartElection::SendRequest(int attempt) {
  LOG_WITH_PREFIX(INFO) << Format(
      "Hinted Leader start election at $0 for tablet $1, attempt $2",
      permanent_uuid_, tablet_id_, attempt);
  consensus_proxy_->RunLeaderElectionAsync(req_, &resp_, &rpc_, BindRpcCallback());

  return true;
}

// ============================================================================
//  Class AsyncDeleteReplica.
// ============================================================================
void AsyncDeleteReplica::HandleResponse(int attempt) {
  if (resp_.has_error()) {
    Status status = StatusFromPB(resp_.error().status());

    // Do not retry on a fatal error.
    TabletServerErrorPB::Code code = resp_.error().code();
    switch (code) {
      case TabletServerErrorPB::TABLET_NOT_FOUND:
        LOG_WITH_PREFIX(WARNING)
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
            << " because the tablet was not found. No further retry: "
            << status.ToString();
        TransitionToCompleteState();
        break;
      case TabletServerErrorPB::CAS_FAILED:
        LOG_WITH_PREFIX(WARNING)
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
            << " due to a CAS failure. No further retry: " << status.ToString();
        TransitionToCompleteState();
        break;
      case TabletServerErrorPB::WRONG_SERVER_UUID:
        LOG_WITH_PREFIX(WARNING)
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
            << " due to an incorrect UUID. No further retry: " << status.ToString();
        TransitionToCompleteState();
        break;
      default:
        LOG_WITH_PREFIX(WARNING)
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
            << " with error code " << TabletServerErrorPB::Code_Name(code)
            << ": " << status.ToString();
        break;
    }
  } else {
    if (table_) {
      LOG_WITH_PREFIX(INFO)
          << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
          << " (table " << table_->ToString() << ") successfully done";
    } else {
      LOG_WITH_PREFIX(WARNING)
          << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
          << " did not belong to a known table, but was successfully deleted";
    }
    TransitionToCompleteState();
    VLOG_WITH_PREFIX(1) << "TS " << permanent_uuid_ << ": complete on tablet " << tablet_id_;
  }
}

std::string AsyncDeleteReplica::description() const {
  return Format("$0Tablet RPC for tablet $1 ($2) on TS=$3",
                hide_only_ ? "Hide" : "Delete", tablet_id_, table_name(), permanent_uuid_);
}

bool AsyncDeleteReplica::SendRequest(int attempt) {
  tserver::DeleteTabletRequestPB req;
  req.set_dest_uuid(permanent_uuid_);
  req.set_tablet_id(tablet_id_);
  req.set_reason(reason_);
  req.set_delete_type(delete_type_);
  if (hide_only_) {
    req.set_hide_only(hide_only_);
  }
  if (cas_config_opid_index_less_or_equal_) {
    req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal_);
  }

  ts_admin_proxy_->DeleteTabletAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1) << "Send delete tablet request to " << permanent_uuid_
                      << " (attempt " << attempt << "):\n"
                      << req.DebugString();
  return true;
}

void AsyncDeleteReplica::UnregisterAsyncTaskCallback() {
  // Only notify if we are in a success state.
  if (state() == MonitoredTaskState::kComplete) {
    master_->catalog_manager()->NotifyTabletDeleteFinished(permanent_uuid_, tablet_id_, table());
  }
}

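// [Editor's note] Summary of the error handling above: TABLET_NOT_FOUND,
// CAS_FAILED and WRONG_SERVER_UUID are treated as terminal (the task moves to
// kComplete and is not rescheduled), while any other error code falls through
// and is retried by RescheduleWithBackoffDelay(). A dead target tserver is
// likewise terminal for delete tasks (see DoRpcCallback() earlier).
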
// ============================================================================
//  Class AsyncAlterTable.
// ============================================================================
void AsyncAlterTable::HandleResponse(int attempt) {
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_alter_table_rpcs_ms > 0)) {
    VLOG_WITH_PREFIX(1) << "Sleeping for " << FLAGS_TEST_slowdown_alter_table_rpcs_ms
                        << "ms before returning response for tablet " << tablet_->tablet_id()
                        << " in async alter table request handler";
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_alter_table_rpcs_ms));
  }

  if (resp_.has_error()) {
    Status status = StatusFromPB(resp_.error().status());

    LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << " failed: "
                             << status << " for version " << schema_version_;

    // Do not retry on a fatal error.
    switch (resp_.error().code()) {
      case TabletServerErrorPB::TABLET_NOT_FOUND:
      case TabletServerErrorPB::MISMATCHED_SCHEMA:
      case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
        TransitionToCompleteState();
        break;
      default:
        break;
    }
  } else {
    TransitionToCompleteState();
    VLOG_WITH_PREFIX(1)
        << "TS " << permanent_uuid() << " completed: for version " << schema_version_;
  }

  server::UpdateClock(resp_, master_->clock());

  if (state() == MonitoredTaskState::kComplete) {
    // TODO: proper error handling here. Not critical, since TSHeartbeat will retry on failure.
    WARN_NOT_OK(
        master_->catalog_manager()->HandleTabletSchemaVersionReport(
            tablet_.get(), schema_version_, table()),
        Format(
            "$0 failed while running AsyncAlterTable::HandleResponse. Response $1",
            description(), resp_.ShortDebugString()));
  } else {
    VLOG_WITH_PREFIX(1) << "Task is not completed " << tablet_->ToString() << " for version "
                        << schema_version_;
  }
}

TableType AsyncAlterTable::table_type() const {
  return tablet_->table()->GetTableType();
}

bool AsyncAlterTable::SendRequest(int attempt) {
  VLOG_WITH_PREFIX(1) << "Send alter table request to " << permanent_uuid() << " for "
                      << tablet_->tablet_id() << " waiting for a read lock.";

  tablet::ChangeMetadataRequestPB req;
  {
    auto l = table_->LockForRead();
    VLOG_WITH_PREFIX(1) << "Send alter table request to " << permanent_uuid() << " for "
                        << tablet_->tablet_id() << " obtained the read lock.";

    req.set_schema_version(l->pb.version());
    req.set_dest_uuid(permanent_uuid());
    req.set_tablet_id(tablet_->tablet_id());
    req.set_alter_table_id(table_->id());

    if (l->pb.has_wal_retention_secs()) {
      req.set_wal_retention_secs(l->pb.wal_retention_secs());
    }

    req.mutable_schema()->CopyFrom(l->pb.schema());
    req.set_new_table_name(l->pb.name());
    req.mutable_indexes()->CopyFrom(l->pb.indexes());
    req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());

    if (table_type() == TableType::PGSQL_TABLE_TYPE && !transaction_id_.IsNil()) {
      VLOG_WITH_PREFIX(1) << "Transaction ID is provided for tablet " << tablet_->tablet_id()
          << " with ID " << transaction_id_.ToString() << " for ALTER TABLE operation";
      req.set_should_abort_active_txns(true);
      req.set_transaction_id(transaction_id_.ToString());
    }

    schema_version_ = l->pb.version();
  }

  ts_admin_proxy_->AlterSchemaAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1)
      << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id()
      << " (attempt " << attempt << "):\n" << req.DebugString();
  return true;
}

bool AsyncBackfillDone::SendRequest(int attempt) {
  VLOG_WITH_PREFIX(1)
      << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id()
      << " version " << schema_version_ << " waiting for a read lock.";

  tablet::ChangeMetadataRequestPB req;
  {
    auto l = table_->LockForRead();
    VLOG_WITH_PREFIX(1)
        << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id()
        << " version " << schema_version_ << " obtained the read lock.";

    req.set_backfill_done_table_id(table_id_);
    req.set_dest_uuid(permanent_uuid());
    req.set_tablet_id(tablet_->tablet_id());
    req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
    req.set_mark_backfill_done(true);
    schema_version_ = l->pb.version();
  }

  ts_admin_proxy_->BackfillDoneAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1)
      << "Send backfill done request to " << permanent_uuid() << " for " << tablet_->tablet_id()
      << " (attempt " << attempt << "):\n" << req.DebugString();
  return true;
}

// ============================================================================
//  Class AsyncCopartitionTable.
// ============================================================================
AsyncCopartitionTable::AsyncCopartitionTable(Master *master,
                                             ThreadPool* callback_pool,
                                             const scoped_refptr<TabletInfo>& tablet,
                                             const scoped_refptr<TableInfo>& table)
    : RetryingTSRpcTask(master,
                        callback_pool,
                        std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)),
                        table.get()),
      tablet_(tablet), table_(table) {
}

string AsyncCopartitionTable::description() const {
  return "Copartition Table RPC for tablet " + tablet_->ToString()
          + " for " + table_->ToString();
}

TabletId AsyncCopartitionTable::tablet_id() const {
  return tablet_->tablet_id();
}

TabletServerId AsyncCopartitionTable::permanent_uuid() const {
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
}

// TODO(sagnik): modify this to fill all relevant fields for the AsyncCopartition request.
bool AsyncCopartitionTable::SendRequest(int attempt) {
  tserver::CopartitionTableRequestPB req;
  req.set_dest_uuid(permanent_uuid());
  req.set_tablet_id(tablet_->tablet_id());
  req.set_table_id(table_->id());
  req.set_table_name(table_->name());

  ts_admin_proxy_->CopartitionTableAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1) << "Send copartition table request to " << permanent_uuid()
                      << " (attempt " << attempt << "):\n" << req.DebugString();
  return true;
}

// TODO(sagnik): modify this to handle the AsyncCopartition response and retry/fail as necessary.
void AsyncCopartitionTable::HandleResponse(int attempt) {
  LOG_WITH_PREFIX(INFO) << "master can't handle server responses yet";
}

// ============================================================================
//  Class AsyncTruncate.
// ============================================================================
void AsyncTruncate::HandleResponse(int attempt) {
  if (resp_.has_error()) {
    const Status s = StatusFromPB(resp_.error().status());
    const TabletServerErrorPB::Code code = resp_.error().code();
    LOG_WITH_PREFIX(WARNING)
        << "TS " << permanent_uuid() << ": truncate failed for tablet " << tablet_id()
        << " with error code " << TabletServerErrorPB::Code_Name(code) << ": " << s;
  } else {
    VLOG_WITH_PREFIX(1)
        << "TS " << permanent_uuid() << ": truncate complete on tablet " << tablet_id();
    TransitionToCompleteState();
  }

  server::UpdateClock(resp_, master_->clock());
}

bool AsyncTruncate::SendRequest(int attempt) {
  tserver::TruncateRequestPB req;
  req.set_tablet_id(tablet_id());
  req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
  ts_proxy_->TruncateAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1) << "Send truncate tablet request to " << permanent_uuid()
                      << " (attempt " << attempt << "):\n" << req.DebugString();
  return true;
}

// ============================================================================
//  Class CommonInfoForRaftTask.
// ============================================================================
CommonInfoForRaftTask::CommonInfoForRaftTask(
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
    const consensus::ConsensusStatePB& cstate, const string& change_config_ts_uuid)
    : RetryingTSRpcTask(
          master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)),
          tablet->table()),
      tablet_(tablet),
      cstate_(cstate),
      change_config_ts_uuid_(change_config_ts_uuid) {
  deadline_ = MonoTime::Max();  // Never time out.
}

CommonInfoForRaftTask::~CommonInfoForRaftTask() = default;

TabletId CommonInfoForRaftTask::tablet_id() const {
  return tablet_->tablet_id();
}

TabletServerId CommonInfoForRaftTask::permanent_uuid() const {
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
}

// ============================================================================
//  Class AsyncChangeConfigTask.
// ============================================================================
string AsyncChangeConfigTask::description() const {
  return Format(
      "$0 RPC for tablet $1 ($2) on peer $3 with cas_config_opid_index $4", type_name(),
      tablet_->tablet_id(), table_name(), permanent_uuid(), cstate_.config().opid_index());
}

bool AsyncChangeConfigTask::SendRequest(int attempt) {
  // Bail if we're retrying in vain.
  int64_t latest_index;
  {
    auto tablet_lock = tablet_->LockForRead();
    latest_index = tablet_lock->pb.committed_consensus_state().config().opid_index();
    // This logic covers a race condition that occurs in this scenario:
    // 1. CatalogManager receives a DeleteTable request and sends DeleteTablet requests to the
    // tservers, but doesn't yet update the tablet's in-memory state to not running.
    // 2. The CB runs, sees that this tablet is still running and that it is over-replicated
    // (since the placement now dictates it should have 0 replicas), and prepares a
    // ChangeConfig RPC for a tserver.
    // 3. Before that RPC is sent, the tserver processes the DeleteTablet request.
    // 4. The ChangeConfig RPC now returns tablet not found,
    // which prompts an infinite retry of the RPC.
    bool tablet_running = tablet_lock->is_running();
    if (!tablet_running) {
      AbortTask(STATUS(Aborted, "Tablet is not running"));
      return false;
    }
  }
  if (latest_index > cstate_.config().opid_index()) {
    auto status = STATUS_FORMAT(
        Aborted,
        "Latest config has opid_index of $0 while this task has opid_index of $1",
        latest_index, cstate_.config().opid_index());
    LOG_WITH_PREFIX(INFO) << status;
    AbortTask(status);
    return false;
  }

  // Logging should be covered inside based on failure reasons.
  auto prepare_status = PrepareRequest(attempt);
  if (!prepare_status.ok()) {
    AbortTask(prepare_status);
    return false;
  }

  consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_, BindRpcCallback());
  VLOG_WITH_PREFIX(1) << "Task " << description() << " sent request:\n" << req_.DebugString();
  return true;
}

void AsyncChangeConfigTask::HandleResponse(int attempt) {
  if (!resp_.has_error()) {
    TransitionToCompleteState();
    LOG_WITH_PREFIX(INFO) << Substitute(
        "Change config succeeded on leader TS $0 for tablet $1 with type $2 for replica $3",
        permanent_uuid(), tablet_->tablet_id(), type_name(), change_config_ts_uuid_);
    return;
  }

  Status status = StatusFromPB(resp_.error().status());

  // Do not retry on some known errors; otherwise retry forever or until cancelled.
  switch (resp_.error().code()) {
    case TabletServerErrorPB::CAS_FAILED:
    case TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT:
    case TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT:
    case TabletServerErrorPB::NOT_THE_LEADER:
      LOG_WITH_PREFIX(WARNING) << "ChangeConfig() failed on leader " << permanent_uuid()
                               << ". No further retry: " << status.ToString();
      TransitionToCompleteState();
      break;
    default:
      LOG_WITH_PREFIX(INFO) << "ChangeConfig() failed on leader " << permanent_uuid()
                            << " due to error "
                            << TabletServerErrorPB::Code_Name(resp_.error().code())
                            << ". This operation will be retried. Error detail: "
                            << status.ToString();
      break;
  }
}

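// [Editor's note] The checks in SendRequest() above amount to a
// compare-and-swap guard: every change-config request carries
// cas_config_opid_index (set from cstate_.config().opid_index() in the
// PrepareRequest() implementations below), so a request built against a stale
// Raft config either gets aborted locally (when the committed opid_index has
// already advanced) or is rejected by the tserver with CAS_FAILED, which
// HandleResponse() treats as terminal.
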
// ============================================================================
//  Class AsyncAddServerTask.
// ============================================================================
Status AsyncAddServerTask::PrepareRequest(int attempt) {
  // Select the replica we wish to add to the config.
  // Do not include current members of the config.
  std::unordered_set<string> replica_uuids;
  for (const RaftPeerPB& peer : cstate_.config().peers()) {
    InsertOrDie(&replica_uuids, peer.permanent_uuid());
  }
  TSDescriptorVector ts_descs;
  master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
  shared_ptr<TSDescriptor> replacement_replica;
  for (auto ts_desc : ts_descs) {
    if (ts_desc->permanent_uuid() == change_config_ts_uuid_) {
      // This is given by the client, so we assume it is a well-chosen uuid.
      replacement_replica = ts_desc;
      break;
    }
  }
  if (PREDICT_FALSE(!replacement_replica)) {
    auto status = STATUS_FORMAT(
        TimedOut, "Could not find desired replica $0 in live set", change_config_ts_uuid_);
    LOG_WITH_PREFIX(WARNING) << status;
    return status;
  }

  req_.set_dest_uuid(permanent_uuid());
  req_.set_tablet_id(tablet_->tablet_id());
  req_.set_type(consensus::ADD_SERVER);
  req_.set_cas_config_opid_index(cstate_.config().opid_index());
  RaftPeerPB* peer = req_.mutable_server();
  peer->set_permanent_uuid(replacement_replica->permanent_uuid());
  peer->set_member_type(member_type_);
  TSRegistrationPB peer_reg = replacement_replica->GetRegistration();

  if (peer_reg.common().private_rpc_addresses().empty()) {
    auto status = STATUS_FORMAT(
        IllegalState, "Candidate replacement $0 has no registered rpc address: $1",
        replacement_replica->permanent_uuid(), peer_reg);
    YB_LOG_EVERY_N(WARNING, 100) << LogPrefix() << status;
    return status;
  }

  TakeRegistration(peer_reg.mutable_common(), peer);

  return Status::OK();
}

// ============================================================================
//  Class AsyncRemoveServerTask.
// ============================================================================
Status AsyncRemoveServerTask::PrepareRequest(int attempt) {
  bool found = false;
  for (const RaftPeerPB& peer : cstate_.config().peers()) {
    if (change_config_ts_uuid_ == peer.permanent_uuid()) {
      found = true;
    }
  }

  if (!found) {
    auto status = STATUS_FORMAT(
        NotFound, "Asked to remove TS with uuid $0 but could not find it in config peers!",
        change_config_ts_uuid_);
    LOG_WITH_PREFIX(WARNING) << status;
    return status;
  }

  req_.set_dest_uuid(permanent_uuid());
  req_.set_tablet_id(tablet_->tablet_id());
  req_.set_type(consensus::REMOVE_SERVER);
  req_.set_cas_config_opid_index(cstate_.config().opid_index());
  RaftPeerPB* peer = req_.mutable_server();
  peer->set_permanent_uuid(change_config_ts_uuid_);

  return Status::OK();
}

// ============================================================================
//  Class AsyncTryStepDown.
// ============================================================================
Status AsyncTryStepDown::PrepareRequest(int attempt) {
  LOG_WITH_PREFIX(INFO) << Substitute("Prep Leader step down $0, leader_uuid=$1, change_ts_uuid=$2",
                                      attempt, permanent_uuid(), change_config_ts_uuid_);
  if (attempt > 1) {
    return STATUS(RuntimeError, "Retry is not allowed");
  }

  // If we were asked to remove the server even if it is the leader, we have to call StepDown, but
  // only if our current leader is the server we are asked to remove.
  if (permanent_uuid() != change_config_ts_uuid_) {
    auto status = STATUS_FORMAT(
        IllegalState,
        "Incorrect state: config leader $0 does not match target uuid $1 for a leader step down op",
        permanent_uuid(), change_config_ts_uuid_);
    LOG_WITH_PREFIX(WARNING) << status;
    return status;
  }

  stepdown_req_.set_dest_uuid(change_config_ts_uuid_);
  stepdown_req_.set_tablet_id(tablet_->tablet_id());
  if (!new_leader_uuid_.empty()) {
    stepdown_req_.set_new_leader_uuid(new_leader_uuid_);
  }

  return Status::OK();
}

bool AsyncTryStepDown::SendRequest(int attempt) {
  auto prepare_status = PrepareRequest(attempt);
  if (!prepare_status.ok()) {
    AbortTask(prepare_status);
    return false;
  }

  LOG_WITH_PREFIX(INFO) << Substitute("Stepping down leader $0 for tablet $1",
                                      change_config_ts_uuid_, tablet_->tablet_id());
  consensus_proxy_->LeaderStepDownAsync(
      stepdown_req_, &stepdown_resp_, &rpc_, BindRpcCallback());

  return true;
}

void AsyncTryStepDown::HandleResponse(int attempt) {
  if (!rpc_.status().ok()) {
    AbortTask(rpc_.status());
    LOG_WITH_PREFIX(WARNING) << Substitute(
        "Got error on stepdown for tablet $0 with leader $1, attempt $2 and error $3",
        tablet_->tablet_id(), permanent_uuid(), attempt, rpc_.status().ToString());

    return;
  }

  TransitionToCompleteState();
  const bool stepdown_failed = stepdown_resp_.error().status().code() != AppStatusPB::OK;
  LOG_WITH_PREFIX(INFO) << Format(
      "Leader step down done attempt=$0, leader_uuid=$1, change_uuid=$2, "
      "error=$3, failed=$4, should_remove=$5 for tablet $6.",
      attempt, permanent_uuid(), change_config_ts_uuid_, stepdown_resp_.error(),
      stepdown_failed, should_remove_, tablet_->tablet_id());

  if (stepdown_failed) {
    tablet_->RegisterLeaderStepDownFailure(change_config_ts_uuid_,
        MonoDelta::FromMilliseconds(stepdown_resp_.has_time_since_election_failure_ms() ?
                                    stepdown_resp_.time_since_election_failure_ms() : 0));
  }

  if (should_remove_) {
    auto task = std::make_shared<AsyncRemoveServerTask>(
        master_, callback_pool_, tablet_, cstate_, change_config_ts_uuid_);

    tablet_->table()->AddTask(task);
    Status status = task->Run();
    WARN_NOT_OK(status, "Failed to send new RemoveServer request");
  }
}

// ============================================================================
1214
//  Class AsyncAddTableToTablet.
1215
// ============================================================================
1216
AsyncAddTableToTablet::AsyncAddTableToTablet(
1217
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1218
    const scoped_refptr<TableInfo>& table)
1219
    : RetryingTSRpcTask(
1220
          master, callback_pool, std::make_unique<PickLeaderReplica>(tablet), table.get()),
1221
      tablet_(tablet),
1222
      table_(table),
1223
28
      tablet_id_(tablet->tablet_id()) {
1224
28
  req_.set_tablet_id(tablet->id());
1225
28
  auto& add_table = *req_.mutable_add_table();
1226
28
  add_table.set_table_id(table_->id());
1227
28
  add_table.set_table_name(table_->name());
1228
28
  add_table.set_table_type(table_->GetTableType());
1229
28
  {
1230
28
    auto l = table->LockForRead();
1231
28
    add_table.set_schema_version(l->pb.version());
1232
28
    *add_table.mutable_schema() = l->pb.schema();
1233
28
    *add_table.mutable_partition_schema() = l->pb.partition_schema();
1234
28
  }
1235
28
}
1236
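The constructor above copies the schema, schema version, and partition schema under a single LockForRead() so the request can never mix fields from two different schema versions. Below is a simplified, self-contained illustration of that snapshot-under-read-lock pattern using std::shared_mutex; TableInfo and AddTableRequest here are hypothetical stand-ins, not the real classes.

    #include <shared_mutex>
    #include <string>

    // Hypothetical request type standing in for the AddTableToTablet protobuf.
    struct AddTableRequest {
      std::string table_id;
      int schema_version = 0;
      std::string schema;
    };

    class TableInfo {
     public:
      // Copies all schema-derived fields under one shared lock so the request
      // cannot combine fields from two schema versions, mirroring the
      // LockForRead() block in the constructor above.
      AddTableRequest BuildAddTableRequest() const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        AddTableRequest req;
        req.table_id = id_;
        req.schema_version = schema_version_;
        req.schema = schema_;
        return req;
      }

     private:
      mutable std::shared_mutex mutex_;
      std::string id_ = "table-1";
      int schema_version_ = 3;
      std::string schema_ = "(k INT, v TEXT)";
    };

    int main() {
      TableInfo info;
      AddTableRequest req = info.BuildAddTableRequest();  // consistent snapshot
      return req.schema_version == 3 ? 0 : 1;
    }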
1237      59  string AsyncAddTableToTablet::description() const {
1238      59    return Substitute("AddTableToTablet RPC ($0) ($1)", table_->ToString(), tablet_->ToString());
1239      59  }
1240
1241      27  void AsyncAddTableToTablet::HandleResponse(int attempt) {
1242      27    if (!rpc_.status().ok()) {
1243       0      AbortTask(rpc_.status());
1244       0      LOG_WITH_PREFIX(WARNING) << Substitute(
1245       0          "Got error when adding table $0 to tablet $1, attempt $2 and error $3",
1246       0          table_->ToString(), tablet_->ToString(), attempt, rpc_.status().ToString());
1247       0      return;
1248       0    }
1249      27    if (resp_.has_error()) {
1250       0      LOG_WITH_PREFIX(WARNING) << "AddTableToTablet() responded with error code "
1251       0                               << TabletServerErrorPB_Code_Name(resp_.error().code());
1252       0      switch (resp_.error().code()) {
1253       0        case TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE: FALLTHROUGH_INTENDED;
1254       0        case TabletServerErrorPB::NOT_THE_LEADER:
1255       0          TransitionToWaitingState(MonitoredTaskState::kRunning);
1256       0          break;
1257       0        default:
1258       0          TransitionToCompleteState();
1259       0          break;
1260       0      }
1261
1262       0      return;
1263       0    }
1264
1265      27    TransitionToCompleteState();
1266      27  }
1267
1268      27  bool AsyncAddTableToTablet::SendRequest(int attempt) {
1269      27    ts_admin_proxy_->AddTableToTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
1270       0    VLOG_WITH_PREFIX(1)
1271       0        << "Send AddTableToTablet request (attempt " << attempt << "):\n" << req_.DebugString();
1272      27    return true;
1273      27  }
1274
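Both AsyncAddTableToTablet above and AsyncRemoveTableFromTablet below apply the same disposition to tablet-server errors: leadership errors put the task back into a waiting state so a later attempt can re-resolve the leader replica, while any other error completes the task. A compilable distillation of that switch follows, with illustrative enums standing in for TabletServerErrorPB::Code and MonitoredTaskState.

    #include <cassert>

    // Illustrative enums; the real code uses TabletServerErrorPB::Code and
    // MonitoredTaskState.
    enum class ErrorCode { kOk, kNotTheLeader, kLeaderNotReadyToServe, kOther };
    enum class NextState { kRetry, kComplete };

    // Leadership errors are transient (the replica we picked is no longer the
    // leader), so the task is retried and the leader re-resolved; anything
    // else ends the task.
    NextState Classify(ErrorCode code) {
      switch (code) {
        case ErrorCode::kNotTheLeader:
        case ErrorCode::kLeaderNotReadyToServe:
          return NextState::kRetry;
        default:
          return NextState::kComplete;
      }
    }

    int main() {
      assert(Classify(ErrorCode::kNotTheLeader) == NextState::kRetry);
      assert(Classify(ErrorCode::kOther) == NextState::kComplete);
    }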
1275          // ============================================================================
1276          //  Class AsyncRemoveTableFromTablet.
1277          // ============================================================================
1278          AsyncRemoveTableFromTablet::AsyncRemoveTableFromTablet(
1279              Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1280              const scoped_refptr<TableInfo>& table)
1281              : RetryingTSRpcTask(
1282                    master, callback_pool, std::make_unique<PickLeaderReplica>(tablet), table.get()),
1283                table_(table),
1284                tablet_(tablet),
1285      15        tablet_id_(tablet->tablet_id()) {
1286      15    req_.set_tablet_id(tablet->id());
1287      15    req_.set_remove_table_id(table->id());
1288      15  }
1289
1290      30  string AsyncRemoveTableFromTablet::description() const {
1291      30    return Substitute("RemoveTableFromTablet RPC ($0) ($1)", table_->ToString(), tablet_->ToString());
1292      30  }
1293
1294      15  void AsyncRemoveTableFromTablet::HandleResponse(int attempt) {
1295      15    if (!rpc_.status().ok()) {
1296       0      AbortTask(rpc_.status());
1297       0      LOG_WITH_PREFIX(WARNING) << Substitute(
1298       0          "Got error when removing table $0 from tablet $1, attempt $2 and error $3",
1299       0          table_->ToString(), tablet_->ToString(), attempt, rpc_.status().ToString());
1300       0      return;
1301       0    }
1302      15    if (resp_.has_error()) {
1303       0      LOG_WITH_PREFIX(WARNING) << "RemoveTableFromTablet() responded with error code "
1304       0                               << TabletServerErrorPB_Code_Name(resp_.error().code());
1305       0      switch (resp_.error().code()) {
1306       0        case TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE: FALLTHROUGH_INTENDED;
1307       0        case TabletServerErrorPB::NOT_THE_LEADER:
1308       0          TransitionToWaitingState(MonitoredTaskState::kRunning);
1309       0          break;
1310       0        default:
1311       0          TransitionToCompleteState();
1312       0          break;
1313      15      }
1314      15    } else {
1315      15      TransitionToCompleteState();
1316      15    }
1317      15  }
1318
1319      15  bool AsyncRemoveTableFromTablet::SendRequest(int attempt) {
1320      15    ts_admin_proxy_->RemoveTableFromTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
1321       0    VLOG_WITH_PREFIX(1) << "Send RemoveTableFromTablet request (attempt " << attempt << "):\n"
1322       0                        << req_.DebugString();
1323      15    return true;
1324      15  }
1325
1326          namespace {
1327
1328       1  bool IsDefinitelyPermanentError(const Status& s) {
1329       1    return s.IsInvalidArgument() || s.IsNotFound();
1330       1  }
1331
1332          } // namespace
1333
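IsDefinitelyPermanentError draws the line between errors where a retry can never help (the request or its target is simply wrong) and transient ones such as timeouts or leadership churn. A self-contained sketch of the same predicate over a simplified Status type follows; the real yb::Status carries many more codes.

    #include <cassert>

    // Simplified stand-in for yb::Status, reduced to the codes this helper
    // consults plus one transient code for contrast.
    enum class Code { kOk, kInvalidArgument, kNotFound, kIllegalState, kTimedOut };
    struct Status {
      Code code = Code::kOk;
      bool IsInvalidArgument() const { return code == Code::kInvalidArgument; }
      bool IsNotFound() const { return code == Code::kNotFound; }
    };

    // Mirrors the helper above: InvalidArgument and NotFound mean retrying
    // the identical request cannot succeed, unlike a timeout, which is worth
    // another attempt.
    bool IsDefinitelyPermanentError(const Status& s) {
      return s.IsInvalidArgument() || s.IsNotFound();
    }

    int main() {
      assert(IsDefinitelyPermanentError(Status{Code::kNotFound}));
      assert(!IsDefinitelyPermanentError(Status{Code::kTimedOut}));
    }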
1334          // ============================================================================
1335          //  Class AsyncGetTabletSplitKey.
1336          // ============================================================================
1337          AsyncGetTabletSplitKey::AsyncGetTabletSplitKey(
1338              Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1339              DataCallbackType result_cb)
1340      45      : AsyncTabletLeaderTask(master, callback_pool, tablet), result_cb_(result_cb) {
1341      45    req_.set_tablet_id(tablet_id());
1342      45  }
1343
1344      46  void AsyncGetTabletSplitKey::HandleResponse(int attempt) {
1345      46    if (resp_.has_error()) {
1346       1      const Status s = StatusFromPB(resp_.error().status());
1347       1      const TabletServerErrorPB::Code code = resp_.error().code();
1348       1      LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << ": GetSplitKey (attempt " << attempt
1349       1                               << ") failed for tablet " << tablet_id() << " with error code "
1350       1                               << TabletServerErrorPB::Code_Name(code) << ": " << s;
1351       1      if (IsDefinitelyPermanentError(s) || s.IsIllegalState()) {
1352                // It can happen that the tablet leader has completed post-split compaction after a
1353                // previous split, but the followers have not yet completed theirs.
1354                // The catalog manager decides to split again and sends a GetTabletSplitKey RPC, but the
1355                // tablet leader then changes for some reason and the new leader is not yet compacted.
1356                // In this case we get an IllegalState error, and we don't want to retry until post-split
1357                // compaction has happened on the leader. Once post-split compaction is done, the
1358                // CatalogManager will resend the RPC.
1359                //
1360                // Another case for IsIllegalState is trying to split a tablet in which all the data has
1361                // the same hash_code or the same doc_key; in this case we also don't want to retry the
1362                // RPC automatically.
1363                // See https://github.com/yugabyte/yugabyte-db/issues/9159.
1364       0        TransitionToFailedState(state(), s);
1365       0      }
1366      45    } else {
1367       0      VLOG_WITH_PREFIX(1)
1368       0          << "TS " << permanent_uuid() << ": got split key for tablet " << tablet_id();
1369      45      TransitionToCompleteState();
1370      45    }
1371
1372      46    server::UpdateClock(resp_, master_->clock());
1373      46  }
1374
1375      46  bool AsyncGetTabletSplitKey::SendRequest(int attempt) {
1376      46    req_.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
1377      46    ts_proxy_->GetSplitKeyAsync(req_, &resp_, &rpc_, BindRpcCallback());
1378       0    VLOG_WITH_PREFIX(1)
1379       0        << "Sent get split key request to " << permanent_uuid() << " (attempt " << attempt << "):\n"
1380       0        << req_.DebugString();
1381      46    return true;
1382      46  }
1383
1384      45  void AsyncGetTabletSplitKey::Finished(const Status& status) {
1385      45    if (result_cb_) {
1386      45      if (status.ok()) {
1387      45        result_cb_(Data{resp_.split_encoded_key(), resp_.split_partition_key()});
1388       0      } else {
1389       0        result_cb_(status);
1390       0      }
1391      45    }
1392      45  }
1393
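Finished() above delivers either the split keys or the failure status to the caller-supplied callback, never both. A standalone model of that either/or delivery follows, using std::variant where the real DataCallbackType presumably distinguishes the two cases by overload; all types here are illustrative stand-ins.

    #include <functional>
    #include <iostream>
    #include <string>
    #include <variant>

    struct Status { bool ok = true; std::string message; };
    struct Data { std::string split_encoded_key; std::string split_partition_key; };

    // The callback receives either the split keys or the failure status,
    // modeled here as a variant.
    using DataCallback = std::function<void(std::variant<Data, Status>)>;

    void Finished(const Status& status, const Data& data, const DataCallback& cb) {
      if (!cb) return;  // the callback is optional, as in the real task
      if (status.ok) {
        cb(data);
      } else {
        cb(status);
      }
    }

    int main() {
      Finished(Status{}, Data{"key", "partition"}, [](std::variant<Data, Status> r) {
        if (auto* d = std::get_if<Data>(&r)) {
          std::cout << "split at " << d->split_encoded_key << "\n";
        } else {
          std::cout << "failed: " << std::get<Status>(r).message << "\n";
        }
      });
    }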
1394          // ============================================================================
1395          //  Class AsyncSplitTablet.
1396          // ============================================================================
1397          AsyncSplitTablet::AsyncSplitTablet(
1398              Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1399              const std::array<TabletId, kNumSplitParts>& new_tablet_ids,
1400              const std::string& split_encoded_key, const std::string& split_partition_key,
1401              TabletSplitCompleteHandlerIf* tablet_split_complete_handler)
1402              : AsyncTabletLeaderTask(master, callback_pool, tablet),
1403      43        tablet_split_complete_handler_(tablet_split_complete_handler) {
1404      43    req_.set_tablet_id(tablet_id());
1405      43    req_.set_new_tablet1_id(new_tablet_ids[0]);
1406      43    req_.set_new_tablet2_id(new_tablet_ids[1]);
1407      43    req_.set_split_encoded_key(split_encoded_key);
1408      43    req_.set_split_partition_key(split_partition_key);
1409      43  }
1410
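Note that the master fixes the entire outcome of the split up front: both child tablet IDs and the split point (in both encoded-key and partition-key form) are placed in the request before it is ever sent, which is consistent with HandleResponse below treating an AlreadyPresent error on a retried RPC as success. A hypothetical stand-in for that request shape:

    #include <array>
    #include <iostream>
    #include <string>

    // Illustrative stand-in for the SplitTablet request built above; the
    // field names follow the setters in the constructor.
    struct SplitTabletRequest {
      std::string tablet_id;
      std::array<std::string, 2> new_tablet_ids;  // kNumSplitParts == 2
      std::string split_encoded_key;    // split point, encoded-key form
      std::string split_partition_key;  // split point, partition-key form
    };

    SplitTabletRequest MakeSplitRequest(const std::string& source,
                                        const std::array<std::string, 2>& children,
                                        const std::string& encoded_key,
                                        const std::string& partition_key) {
      // Everything about the split is decided here, before any RPC is sent.
      return SplitTabletRequest{source, children, encoded_key, partition_key};
    }

    int main() {
      auto req = MakeSplitRequest("tablet-1", {"tablet-1a", "tablet-1b"}, "enc", "part");
      std::cout << req.tablet_id << " -> " << req.new_tablet_ids[0] << ", "
                << req.new_tablet_ids[1] << "\n";
    }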
1411      42  void AsyncSplitTablet::HandleResponse(int attempt) {
1412      42    if (resp_.has_error()) {
1413      25      const Status s = StatusFromPB(resp_.error().status());
1414      25      const TabletServerErrorPB::Code code = resp_.error().code();
1415      25      LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << ": split (attempt " << attempt
1416      25                               << ") failed for tablet " << tablet_id() << " with error code "
1417      25                               << TabletServerErrorPB::Code_Name(code) << ": " << s;
1418      25      if (s.IsAlreadyPresent()) {
1419      25        TransitionToCompleteState();
1420       0      } else if (IsDefinitelyPermanentError(s)) {
1421       0        TransitionToFailedState(state(), s);
1422       0      }
1423      17    } else {
1424       0      VLOG_WITH_PREFIX(1)
1425       0          << "TS " << permanent_uuid() << ": split complete on tablet " << tablet_id();
1426      17      TransitionToCompleteState();
1427      17    }
1428
1429      42    server::UpdateClock(resp_, master_->clock());
1430      42  }
1431
1432      43  bool AsyncSplitTablet::SendRequest(int attempt) {
1433      43    req_.set_dest_uuid(permanent_uuid());
1434      43    req_.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
1435      43    ts_admin_proxy_->SplitTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
1436       0    VLOG_WITH_PREFIX(1)
1437       0        << "Sent split tablet request to " << permanent_uuid() << " (attempt " << attempt << "):\n"
1438       0        << req_.DebugString();
1439      43    return true;
1440      43  }
1441
1442      43  void AsyncSplitTablet::Finished(const Status& status) {
1443      43    if (tablet_split_complete_handler_) {
1444      43      SplitTabletIds split_tablet_ids {
1445      43        .source = req_.tablet_id(),
1446      43        .children = {req_.new_tablet1_id(), req_.new_tablet2_id()}
1447      43      };
1448      43      tablet_split_complete_handler_->ProcessSplitTabletResult(
1449      43          status, table_->id(), split_tablet_ids);
1450      43    }
1451      43  }
1452
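Finished() hands the outcome to the TabletSplitCompleteHandlerIf whether the task succeeded or failed, so a handler must branch on the status itself. A minimal hypothetical handler in that spirit is sketched below; the real interface lives in tablet_split_complete_handler.h, and the types here are simplified stand-ins.

    #include <iostream>
    #include <string>
    #include <utility>

    struct Status { bool ok = true; std::string message; };

    // Mirrors the SplitTabletIds aggregate built in Finished() above.
    struct SplitTabletIds {
      std::string source;
      std::pair<std::string, std::string> children;
    };

    // Hypothetical handler: it receives every outcome, success or failure,
    // so it must inspect the status before acting on the split result.
    struct SplitCompleteHandler {
      void ProcessSplitTabletResult(const Status& status, const std::string& table_id,
                                    const SplitTabletIds& ids) {
        if (!status.ok) {
          std::cerr << "split of " << ids.source << " failed: " << status.message << "\n";
          return;
        }
        std::cout << "table " << table_id << ": " << ids.source << " -> "
                  << ids.children.first << " + " << ids.children.second << "\n";
      }
    };

    int main() {
      SplitCompleteHandler handler;
      handler.ProcessSplitTabletResult(Status{}, "table-1",
                                       {"tablet-1", {"tablet-1a", "tablet-1b"}});
    }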
1453          }  // namespace master
1454          }  // namespace yb