YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/async_rpc_tasks.cc
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/async_rpc_tasks.h"
15
16
#include "yb/common/wire_protocol.h"
17
18
#include "yb/consensus/consensus.proxy.h"
19
#include "yb/consensus/consensus_meta.h"
20
21
#include "yb/gutil/map-util.h"
22
23
#include "yb/master/catalog_entity_info.h"
24
#include "yb/master/catalog_manager_if.h"
25
#include "yb/master/master.h"
26
#include "yb/master/tablet_split_complete_handler.h"
27
#include "yb/master/ts_descriptor.h"
28
#include "yb/master/ts_manager.h"
29
30
#include "yb/rpc/messenger.h"
31
32
#include "yb/tserver/backup.proxy.h"
33
#include "yb/tserver/tserver_admin.proxy.h"
34
#include "yb/tserver/tserver_service.proxy.h"
35
36
#include "yb/util/atomic.h"
37
#include "yb/util/flag_tags.h"
38
#include "yb/util/metrics.h"
39
#include "yb/util/source_location.h"
40
#include "yb/util/status_format.h"
41
#include "yb/util/status_log.h"
42
#include "yb/util/thread_restrictions.h"
43
#include "yb/util/threadpool.h"
44
45
using namespace std::literals;
46
47
DEFINE_int32(unresponsive_ts_rpc_timeout_ms, 15 * 60 * 1000,  // 15 minutes
48
             "After this amount of time (or after we have retried unresponsive_ts_rpc_retry_limit "
49
             "times, whichever happens first), the master will stop attempting to contact a tablet "
50
             "server in order to perform operations such as deleting a tablet.");
51
TAG_FLAG(unresponsive_ts_rpc_timeout_ms, advanced);
52
53
DEFINE_int32(unresponsive_ts_rpc_retry_limit, 20,
54
             "After this number of retries (or unresponsive_ts_rpc_timeout_ms expires, whichever "
55
             "happens first), the master will stop attempting to contact a tablet server in order "
56
             "to perform operations such as deleting a tablet.");
57
TAG_FLAG(unresponsive_ts_rpc_retry_limit, advanced);
58
59
DEFINE_int32(retrying_ts_rpc_max_delay_ms, 60 * 1000,
60
             "Maximum delay between successive attempts to contact an unresponsive tablet server");
61
TAG_FLAG(retrying_ts_rpc_max_delay_ms, advanced);
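// Illustrative note (added commentary; the invocation below is an assumption, not from this
// file): like other gflags, the retry policy above can be tuned on the yb-master command line:
//   yb-master --unresponsive_ts_rpc_timeout_ms=600000 --unresponsive_ts_rpc_retry_limit=10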
62
63
DEFINE_test_flag(int32, slowdown_master_async_rpc_tasks_by_ms, 0,
64
                 "For testing purposes, slow down the run method to take longer.");
65
66
// The flags are defined in catalog_manager.cc.
67
DECLARE_int32(master_ts_rpc_timeout_ms);
68
DECLARE_int32(tablet_creation_timeout_ms);
69
DECLARE_int32(TEST_slowdown_alter_table_rpcs_ms);
70
71
namespace yb {
72
namespace master {
73
74
using namespace std::placeholders;
75
76
using std::string;
77
using std::shared_ptr;
78
79
using strings::Substitute;
80
using consensus::RaftPeerPB;
81
using server::MonitoredTaskState;
82
using tserver::TabletServerErrorPB;
83
84
void RetryingTSRpcTask::UpdateMetrics(scoped_refptr<Histogram> metric, MonoTime start_time,
85
                                      const std::string& metric_name,
86
884k
                                      const std::string& metric_type) {
87
884k
  metric->Increment(MonoTime::Now().GetDeltaSince(start_time).ToMicroseconds());
88
884k
}
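// Note (added commentary): the metric_name and metric_type parameters are unused in the body
// above; the function only records the microseconds elapsed since start_time in the histogram.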
89
90
// ============================================================================
91
//  Class PickSpecificUUID.
92
// ============================================================================
93
304k
Status PickSpecificUUID::PickReplica(TSDescriptor** ts_desc) {
94
304k
  shared_ptr<TSDescriptor> ts;
95
304k
  if (!master_->ts_manager()->LookupTSByUUID(ts_uuid_, &ts)) {
96
0
    return STATUS(NotFound, "unknown tablet server id", ts_uuid_);
97
0
  }
98
304k
  *ts_desc = ts.get();
99
304k
  return Status::OK();
100
304k
}
101
102
0
string ReplicaMapToString(const TabletReplicaMap& replicas) {
103
0
  string ret = "";
104
0
  for (const auto& r : replicas) {
105
0
    if (!ret.empty()) {
106
0
      ret += ", ";
107
0
    } else {
108
0
      ret += "(";
109
0
    }
110
0
    ret += r.second.ts_desc->permanent_uuid();
111
0
  }
112
0
  ret += ")";
113
0
  return ret;
114
0
}
115
116
// ============================================================================
117
//  Class PickLeaderReplica.
118
// ============================================================================
119
PickLeaderReplica::PickLeaderReplica(const scoped_refptr<TabletInfo>& tablet)
120
157k
    : tablet_(tablet) {
121
157k
}
122
123
161k
Status PickLeaderReplica::PickReplica(TSDescriptor** ts_desc) {
124
161k
  *ts_desc = VERIFY_RESULT(tablet_->GetLeader());
125
0
  return Status::OK();
126
161k
}
127
128
// ============================================================================
129
//  Class RetryingTSRpcTask.
130
// ============================================================================
131
132
RetryingTSRpcTask::RetryingTSRpcTask(Master *master,
133
                                     ThreadPool* callback_pool,
134
                                     std::unique_ptr<TSPicker> replica_picker,
135
                                     const scoped_refptr<TableInfo>& table)
136
  : master_(master),
137
    callback_pool_(callback_pool),
138
    replica_picker_(std::move(replica_picker)),
139
    table_(table),
140
    start_ts_(MonoTime::Now()),
141
421k
    deadline_(start_ts_ + FLAGS_unresponsive_ts_rpc_timeout_ms * 1ms) {
142
421k
}
143
144
281k
RetryingTSRpcTask::~RetryingTSRpcTask() {
145
281k
  auto state = state_.load(std::memory_order_acquire);
146
281k
  LOG_IF(DFATAL, !IsStateTerminal(state))
147
1
      << "Destroying " << this << " task in a wrong state: " << AsString(state);
148
281k
  VLOG_WITH_FUNC(1) << "Destroying " << this << " in " << AsString(state);
149
281k
}
150
151
465k
std::string RetryingTSRpcTask::LogPrefix() const {
152
465k
  return Format("$0 (task=$1, state=$2): ", description(), static_cast<const void*>(this), state());
153
465k
}
154
155
1.06M
std::string RetryingTSRpcTask::table_name() const {
156
1.06M
  return !table_ ? "" : table_->ToString();
157
1.06M
}
158
159
// Send the subclass RPC request.
160
466k
Status RetryingTSRpcTask::Run() {
161
466k
  VLOG_WITH_PREFIX(1) << "Start Running";
162
466k
  attempt_start_ts_ = MonoTime::Now();
163
466k
  ++attempt_;
164
18.4E
  VLOG_WITH_PREFIX(1) << "Start Running, attempt: " << attempt_;
165
466k
  for (;;) {
166
466k
    auto task_state = state();
167
466k
    if (task_state == MonitoredTaskState::kAborted) {
168
0
      return STATUS(IllegalState, "Unable to run task because it has been aborted");
169
0
    }
170
466k
    if (task_state == MonitoredTaskState::kWaiting) {
171
466k
      break;
172
466k
    }
173
174
102
    LOG_IF_WITH_PREFIX(DFATAL, task_state != MonitoredTaskState::kScheduling)
175
102
        << "Expected task to be in kScheduling state but found: " << AsString(task_state);
176
177
    // We expect this case to be very rare, since we switch to the waiting state right after
178
    // scheduling the task on the messenger. So just busy wait.
179
102
    std::this_thread::yield();
180
102
  }
181
182
466k
  Status s = ResetTSProxy();
183
466k
  if (!s.ok()) {
184
2.54k
    s = s.CloneAndPrepend("Failed to reset TS proxy");
185
2.54k
    LOG_WITH_PREFIX(INFO) << s;
186
2.54k
    if (s.IsExpired()) {
187
0
      TransitionToTerminalState(MonitoredTaskState::kWaiting, MonitoredTaskState::kFailed, s);
188
0
      UnregisterAsyncTask();
189
0
      return s;
190
0
    }
191
2.55k
    if (RescheduleWithBackoffDelay()) {
192
2.55k
      return Status::OK();
193
2.55k
    }
194
195
18.4E
    auto state = this->state();
196
18.4E
    UnregisterAsyncTask(); // May delete this.
197
198
18.4E
    if (state == MonitoredTaskState::kFailed) {
199
0
      return s;
200
0
    }
201
18.4E
    if (state == MonitoredTaskState::kAborted) {
202
0
      return STATUS(IllegalState, "Unable to run task because it has been aborted");
203
0
    }
204
205
18.4E
    LOG_WITH_PREFIX(FATAL) << "Failed to change task to MonitoredTaskState::kFailed state from "
206
18.4E
                           << state;
207
18.4E
  } else {
208
463k
    rpc_.Reset();
209
463k
  }
210
211
  // Calculate and set the timeout deadline.
212
463k
  const MonoTime deadline = ComputeDeadline();
213
463k
  rpc_.set_deadline(deadline);
214
215
463k
  if (!PerformStateTransition(MonitoredTaskState::kWaiting, MonitoredTaskState::kRunning)) {
216
0
    if (state() == MonitoredTaskState::kAborted) {
217
0
      return STATUS(Aborted, "Unable to run task because it has been aborted");
218
0
    }
219
220
0
    LOG_WITH_PREFIX(DFATAL) <<
221
0
        "Task transition MonitoredTaskState::kWaiting -> MonitoredTaskState::kRunning failed";
222
0
    return Failed(STATUS_FORMAT(IllegalState, "Task in invalid state $0", state()));
223
0
  }
224
225
463k
  auto slowdown_flag_val = GetAtomicFlag(&FLAGS_TEST_slowdown_master_async_rpc_tasks_by_ms);
226
463k
  if (PREDICT_FALSE(slowdown_flag_val > 0)) {
227
0
    VLOG_WITH_PREFIX(1) << "Slowing down by " << slowdown_flag_val << " ms.";
228
0
    bool old_thread_restriction = ThreadRestrictions::SetWaitAllowed(true);
229
0
    SleepFor(MonoDelta::FromMilliseconds(slowdown_flag_val));
230
0
    ThreadRestrictions::SetWaitAllowed(old_thread_restriction);
231
0
    VLOG_WITH_PREFIX(2) << "Slowing down done. Resuming.";
232
0
  }
233
463k
  if (!SendRequest(attempt_) && !RescheduleWithBackoffDelay()) {
234
416
    UnregisterAsyncTask();  // May call 'delete this'.
235
416
  }
236
463k
  return Status::OK();
237
463k
}
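// Lifecycle sketch (added commentary summarizing the code above): Run() expects the task to be
// in kWaiting (spinning briefly while it is still kScheduling), moves it to kRunning, and sends
// the RPC. On retry, RescheduleWithBackoffDelay() cycles it through kScheduling back to
// kWaiting, and RunDelayedTask() invokes Run() again. kComplete, kFailed, and kAborted are the
// terminal states.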
238
239
458k
MonoTime RetryingTSRpcTask::ComputeDeadline() {
240
458k
  MonoTime timeout = MonoTime::Now();
241
458k
  timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms));
242
458k
  return MonoTime::Earliest(timeout, deadline_);
243
458k
}
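// Note (added commentary): each attempt is thus given at most FLAGS_master_ts_rpc_timeout_ms,
// clamped so that no per-attempt deadline extends past the task-wide deadline_ set at creation.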
244
245
// Abort this task and return its state before it was successfully aborted. If the task entered
246
// a different terminal state before we were able to abort it, return that state.
247
805
MonitoredTaskState RetryingTSRpcTask::AbortAndReturnPrevState(const Status& status) {
248
805
  auto prev_state = state();
249
805
  while (!IsStateTerminal(prev_state)) {
250
722
    auto expected = prev_state;
251
722
    if (state_.compare_exchange_weak(expected, MonitoredTaskState::kAborted)) {
252
722
      VLOG_WITH_PREFIX_AND_FUNC(1)
253
0
          << "Aborted with: " << status << ", prev state: " << AsString(prev_state);
254
722
      AbortIfScheduled();
255
722
      Finished(status);
256
722
      UnregisterAsyncTask();
257
722
      return prev_state;
258
722
    }
259
0
    prev_state = state();
260
0
  }
261
83
  VLOG_WITH_PREFIX_AND_FUNC(1)
262
0
      << "Already terminated, prev state: " << AsString(prev_state);
263
83
  UnregisterAsyncTask();
264
83
  return prev_state;
265
805
}
266
267
475
void RetryingTSRpcTask::AbortTask(const Status& status) {
268
475
  AbortAndReturnPrevState(status);
269
475
}
270
271
463k
void RetryingTSRpcTask::RpcCallback() {
272
  // Defer the actual work of the callback off of the reactor thread.
273
  // This is necessary because our callbacks often do synchronous writes to
274
  // the catalog table, and we can't do synchronous IO on the reactor.
275
  //
276
  // Note: This can fail on shutdown, so just print a warning for it.
277
463k
  Status s = callback_pool_->SubmitFunc(
278
463k
      std::bind(&RetryingTSRpcTask::DoRpcCallback, shared_from(this)));
279
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(3) << "Submit status: " << s;
280
463k
  if (!s.ok()) {
281
0
    WARN_NOT_OK(s, "Could not submit to queue, probably shutting down");
282
0
    AbortTask(s);
283
0
  }
284
463k
}
285
286
// Handle the actual work of the RPC callback. This is run on the master's worker
287
// pool, rather than a reactor thread, so it may do blocking IO operations.
288
463k
void RetryingTSRpcTask::DoRpcCallback() {
289
463k
  VLOG_WITH_PREFIX_AND_FUNC(3) << "Rpc status: " << rpc_.status();
290
291
463k
  if (!rpc_.status().ok()) {
292
1.38k
    LOG_WITH_PREFIX(WARNING) << "TS " << target_ts_desc_->permanent_uuid() << ": "
293
1.38k
                             << type_name() << " RPC failed for tablet "
294
1.38k
                             << tablet_id() << ": " << rpc_.status().ToString();
295
1.38k
    if (!target_ts_desc_->IsLive() && type() == ASYNC_DELETE_REPLICA) {
296
222
      LOG_WITH_PREFIX(WARNING)
297
222
          << "TS " << target_ts_desc_->permanent_uuid() << ": delete failed for tablet "
298
222
          << tablet_id() << ". TS is DEAD. No further retry.";
299
222
      TransitionToCompleteState();
300
222
    }
301
461k
  } else if (state() != MonitoredTaskState::kAborted) {
302
461k
    HandleResponse(attempt_);  // Modifies state_.
303
461k
  }
304
463k
  UpdateMetrics(master_->GetMetric(type_name(), Master::AttemptMetric, description()),
305
463k
                attempt_start_ts_, type_name(), "attempt metric");
306
307
  // Schedule a retry if the RPC call was not successful.
308
463k
  if (RescheduleWithBackoffDelay()) {
309
42.8k
    return;
310
42.8k
  }
311
312
420k
  UnregisterAsyncTask();  // May call 'delete this'.
313
420k
}
314
315
42.5k
int RetryingTSRpcTask::num_max_retries() { return FLAGS_unresponsive_ts_rpc_retry_limit; }
316
317
8
int RetryingTSRpcTask::max_delay_ms() {
318
8
  return FLAGS_retrying_ts_rpc_max_delay_ms;
319
8
}
320
321
466k
bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
322
466k
  auto task_state = state();
323
466k
  if (task_state != MonitoredTaskState::kRunning &&
324
      // Allow kWaiting for task(s) that have never successfully ResetTSProxy().
325
466k
      task_state != MonitoredTaskState::kWaiting) {
326
420k
    if (task_state != MonitoredTaskState::kComplete) {
327
455
      LOG_WITH_PREFIX(INFO) << "No reschedule for this task: " << AsString(task_state);
328
455
    }
329
420k
    return false;
330
420k
  }
331
332
45.4k
  int attempt_threshold = std::numeric_limits<int>::max();
333
45.4k
  if (NoRetryTaskType()) {
334
0
    attempt_threshold = 0;
335
45.4k
  } else if (RetryLimitTaskType()) {
336
42.5k
    attempt_threshold = num_max_retries();
337
42.5k
  }
338
339
45.4k
  if (attempt_ > attempt_threshold) {
340
13
    auto status = STATUS_FORMAT(
341
13
        Aborted, "Reached maximum number of retries ($0)", attempt_threshold);
342
13
    LOG_WITH_PREFIX(WARNING)
343
13
        << status << " for request " << description()
344
13
        << ", task=" << this << " state=" << state();
345
13
    TransitionToFailedState(task_state, status);
346
13
    return false;
347
13
  }
348
349
45.4k
  MonoTime now = MonoTime::Now();
350
  // We assume it might take 10ms to process the request in the best case;
351
  // fail if we have less than that amount of time remaining.
352
45.4k
  int64_t millis_remaining = deadline_.GetDeltaSince(now).ToMilliseconds() - 10;
353
  // Exponential backoff with jitter.
354
45.4k
  int64_t base_delay_ms;
355
45.4k
  if (attempt_ <= 12) {
356
45.4k
    base_delay_ms = 1 << (attempt_ + 3);  // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
357
45.4k
  } else {
358
3
    base_delay_ms = max_delay_ms();
359
3
  }
360
  // Normal rand is seeded by default with 1. Using the same for rand_r seed.
361
45.4k
  unsigned int seed = 1;
362
45.4k
  int64_t jitter_ms = rand_r(&seed) % 50;  // Add up to 50ms of additional random delay.
363
45.4k
  int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
364
365
45.4k
  if (delay_millis <= 0) {
366
17
    auto status = STATUS(TimedOut, "Request timed out");
367
17
    LOG_WITH_PREFIX(WARNING) << status;
368
17
    TransitionToFailedState(task_state, status);
369
17
    return false;
370
17
  }
371
372
45.4k
  LOG_WITH_PREFIX(INFO) << "Scheduling retry with a delay of " << delay_millis
373
45.4k
                        << "ms (attempt = " << attempt_ << " / " << attempt_threshold << ")...";
374
375
45.4k
  if (!PerformStateTransition(task_state, MonitoredTaskState::kScheduling)) {
376
0
    LOG_WITH_PREFIX(WARNING) << "Unable to mark this task as MonitoredTaskState::kScheduling";
377
0
    return false;
378
0
  }
379
45.4k
  auto task_id = master_->messenger()->ScheduleOnReactor(
380
45.4k
      std::bind(&RetryingTSRpcTask::RunDelayedTask, shared_from(this), _1),
381
45.4k
      MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION(), master_->messenger());
382
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << "Task id: " << task_id;
383
45.4k
  reactor_task_id_.store(task_id, std::memory_order_release);
384
385
45.4k
  if (task_id == rpc::kInvalidTaskId) {
386
1
    AbortTask(STATUS(Aborted, "Messenger closing"));
387
1
    UnregisterAsyncTask();
388
1
    return false;
389
1
  }
390
391
45.4k
  return TransitionToWaitingState(MonitoredTaskState::kScheduling);
392
45.4k
}
393
394
45.2k
void RetryingTSRpcTask::RunDelayedTask(const Status& status) {
395
45.2k
  if (state() == MonitoredTaskState::kAborted) {
396
51
    UnregisterAsyncTask();  // May delete this.
397
51
    return;
398
51
  }
399
400
45.1k
  if (!status.ok()) {
401
58
    LOG_WITH_PREFIX(WARNING) << "Async tablet task failed or was cancelled: " << status;
402
58
    if (status.IsAborted() || status.IsServiceUnavailable()) {
403
58
      AbortTask(status);
404
58
    }
405
58
    UnregisterAsyncTask();  // May delete this.
406
58
    return;
407
58
  }
408
409
45.1k
  auto log_prefix = LogPrefix(); // Save in case we need to log after deletion.
410
45.1k
  Status s = Run();  // May delete this.
411
45.1k
  if (!s.ok()) {
412
0
    LOG(WARNING) << log_prefix << "Async tablet task failed: " << s;
413
0
  }
414
45.1k
}
415
416
336k
void RetryingTSRpcTask::UnregisterAsyncTaskCallback() {}
417
418
0
Status RetryingTSRpcTask::Failed(const Status& status) {
419
0
  LOG_WITH_PREFIX(WARNING) << "Async task failed: " << status;
420
0
  Finished(status);
421
0
  UnregisterAsyncTask();
422
0
  return status;
423
0
}
424
425
421k
void RetryingTSRpcTask::UnregisterAsyncTask() {
426
  // Retain a reference to the object, in case RemoveTask would have removed the last one.
427
421k
  auto self = shared_from_this();
428
421k
  std::unique_lock<decltype(unregister_mutex_)> lock(unregister_mutex_);
429
421k
  UpdateMetrics(master_->GetMetric(type_name(), Master::TaskMetric, description()), start_ts_,
430
421k
                type_name(), "task metric");
431
432
421k
  auto s = state();
433
421k
  if (!IsStateTerminal(s)) {
434
0
    LOG_WITH_PREFIX(FATAL) << "Invalid task state " << s;
435
0
  }
436
421k
  end_ts_ = MonoTime::Now();
437
421k
  if (table_ != nullptr && table_->RemoveTask(self)) {
438
    // We don't delete a table while it has running tasks, so we should check whether this was
439
    // the last task, even if it is not a delete-table task.
440
97.8k
    master_->catalog_manager()->CheckTableDeleted(table_);
441
97.8k
  }
442
  // Make sure to run the callbacks last, in case they rely on the task no longer being tracked
443
  // by the table.
444
421k
  UnregisterAsyncTaskCallback();
445
421k
}
446
447
722
void RetryingTSRpcTask::AbortIfScheduled() {
448
722
  auto reactor_task_id = reactor_task_id_.load(std::memory_order_acquire);
449
722
  VLOG_WITH_PREFIX_AND_FUNC(1) << "Reactor task id: " << reactor_task_id;
450
722
  if (reactor_task_id != rpc::kInvalidTaskId) {
451
522
    master_->messenger()->AbortOnReactor(reactor_task_id);
452
522
  }
453
722
}
454
455
466k
Status RetryingTSRpcTask::ResetTSProxy() {
456
  // TODO: if there is no replica available, should we still keep the task running?
457
466k
  RETURN_NOT_OK(replica_picker_->PickReplica(&target_ts_desc_));
458
459
463k
  shared_ptr<tserver::TabletServerServiceProxy> ts_proxy;
460
463k
  shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy;
461
463k
  shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
462
463k
  shared_ptr<tserver::TabletServerBackupServiceProxy> ts_backup_proxy;
463
464
463k
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_proxy));
465
463k
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_admin_proxy));
466
463k
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&consensus_proxy));
467
463k
  RETURN_NOT_OK(target_ts_desc_->GetProxy(&ts_backup_proxy));
468
469
463k
  ts_proxy_.swap(ts_proxy);
470
463k
  ts_admin_proxy_.swap(ts_admin_proxy);
471
463k
  consensus_proxy_.swap(consensus_proxy);
472
463k
  ts_backup_proxy_.swap(ts_backup_proxy);
473
474
463k
  return Status::OK();
475
463k
}
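// Note (added commentary): the proxies are re-resolved on every attempt, so if the replica
// picker selects a different tablet server between retries (e.g. a new leader), the next RPC
// is automatically sent to the new target.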
476
477
void RetryingTSRpcTask::TransitionToTerminalState(MonitoredTaskState expected,
478
                                                  MonitoredTaskState terminal_state,
479
420k
                                                  const Status& status) {
480
420k
  if (!PerformStateTransition(expected, terminal_state)) {
481
0
    if (terminal_state != MonitoredTaskState::kAborted && state() == MonitoredTaskState::kAborted) {
482
0
      LOG_WITH_PREFIX(WARNING) << "Unable to perform transition " << expected << " -> "
483
0
                               << terminal_state << ". Task has been aborted";
484
0
    } else {
485
0
      LOG_WITH_PREFIX(DFATAL) << "State transition " << expected << " -> "
486
0
                              << terminal_state << " failed. Current task is in an invalid state: "
487
0
                              << state();
488
0
    }
489
0
    return;
490
0
  }
491
492
420k
  Finished(status);
493
420k
}
494
495
void RetryingTSRpcTask::TransitionToFailedState(server::MonitoredTaskState expected,
496
68
                                                const yb::Status& status) {
497
68
  TransitionToTerminalState(expected, MonitoredTaskState::kFailed, status);
498
68
}
499
500
420k
void RetryingTSRpcTask::TransitionToCompleteState() {
501
420k
  TransitionToTerminalState(
502
420k
      MonitoredTaskState::kRunning, MonitoredTaskState::kComplete, Status::OK());
503
420k
}
504
505
45.4k
bool RetryingTSRpcTask::TransitionToWaitingState(MonitoredTaskState expected) {
506
45.4k
  if (!PerformStateTransition(expected, MonitoredTaskState::kWaiting)) {
507
    // The only valid reason for the state not being kWaiting here is that the task got
508
    // aborted.
509
0
    if (state() != MonitoredTaskState::kAborted) {
510
0
      LOG_WITH_PREFIX(FATAL) << "Unable to mark task as MonitoredTaskState::kWaiting";
511
0
    }
512
0
    AbortIfScheduled();
513
0
    return false;
514
45.4k
  } else {
515
45.4k
    return true;
516
45.4k
  }
517
45.4k
}
518
519
// ============================================================================
520
//  Class AsyncTabletLeaderTask.
521
// ============================================================================
522
AsyncTabletLeaderTask::AsyncTabletLeaderTask(
523
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet)
524
    : RetryingTSRpcTask(
525
          master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)),
526
          tablet->table().get()),
527
61.4k
      tablet_(tablet) {
528
61.4k
}
529
530
AsyncTabletLeaderTask::AsyncTabletLeaderTask(
531
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
532
    const scoped_refptr<TableInfo>& table)
533
    : RetryingTSRpcTask(
534
          master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)), table),
535
28.0k
      tablet_(tablet) {
536
28.0k
}
537
538
80.5k
AsyncTabletLeaderTask::~AsyncTabletLeaderTask() = default;
539
540
189k
std::string AsyncTabletLeaderTask::description() const {
541
189k
  return Format("$0 RPC for tablet $1 ($2)", type_name(), tablet_, table_name());
542
189k
}
543
544
57.5k
TabletId AsyncTabletLeaderTask::tablet_id() const {
545
57.5k
  return tablet_->tablet_id();
546
57.5k
}
547
548
33.3k
TabletServerId AsyncTabletLeaderTask::permanent_uuid() const {
549
18.4E
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
550
33.3k
}
551
552
// ============================================================================
553
//  Class AsyncCreateReplica.
554
// ============================================================================
555
AsyncCreateReplica::AsyncCreateReplica(Master *master,
556
                                       ThreadPool *callback_pool,
557
                                       const string& permanent_uuid,
558
                                       const scoped_refptr<TabletInfo>& tablet,
559
                                       const std::vector<SnapshotScheduleId>& snapshot_schedules)
560
  : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()),
561
140k
    tablet_id_(tablet->tablet_id()) {
562
140k
  deadline_ = start_ts_;
563
140k
  deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));
564
565
140k
  auto table_lock = tablet->table()->LockForRead();
566
140k
  const SysTabletsEntryPB& tablet_pb = tablet->metadata().dirty().pb;
567
568
140k
  req_.set_dest_uuid(permanent_uuid);
569
140k
  req_.set_table_id(tablet->table()->id());
570
140k
  req_.set_tablet_id(tablet->tablet_id());
571
140k
  req_.set_table_type(tablet->table()->metadata().state().pb.table_type());
572
140k
  req_.mutable_partition()->CopyFrom(tablet_pb.partition());
573
140k
  req_.set_namespace_id(table_lock->pb.namespace_id());
574
140k
  req_.set_namespace_name(table_lock->pb.namespace_name());
575
140k
  req_.set_table_name(table_lock->pb.name());
576
140k
  req_.mutable_schema()->CopyFrom(table_lock->pb.schema());
577
140k
  req_.mutable_partition_schema()->CopyFrom(table_lock->pb.partition_schema());
578
140k
  req_.mutable_config()->CopyFrom(tablet_pb.committed_consensus_state().config());
579
140k
  req_.set_colocated(tablet_pb.colocated());
580
140k
  if (table_lock->pb.has_index_info()) {
581
14.7k
    req_.mutable_index_info()->CopyFrom(table_lock->pb.index_info());
582
14.7k
  }
583
140k
  auto& req_schedules = *req_.mutable_snapshot_schedules();
584
140k
  req_schedules.Reserve(narrow_cast<int>(snapshot_schedules.size()));
585
140k
  for (const auto& id : snapshot_schedules) {
586
72
    req_schedules.Add()->assign(id.AsSlice().cdata(), id.size());
587
72
  }
588
140k
}
589
590
281k
std::string AsyncCreateReplica::description() const {
591
281k
  return Format("CreateTablet RPC for tablet $0 ($1) on TS=$2",
592
281k
                tablet_id_, table_name(), permanent_uuid_);
593
281k
}
594
595
139k
void AsyncCreateReplica::HandleResponse(int attempt) {
596
139k
  if (resp_.has_error()) {
597
11
    Status s = StatusFromPB(resp_.error().status());
598
11
    if (s.IsAlreadyPresent()) {
599
0
      LOG_WITH_PREFIX(INFO) << "CreateTablet RPC for tablet " << tablet_id_
600
0
                            << " on TS " << permanent_uuid_ << " returned already present: "
601
0
                            << s;
602
0
      TransitionToCompleteState();
603
11
    } else {
604
11
      LOG_WITH_PREFIX(WARNING) << "CreateTablet RPC for tablet " << tablet_id_
605
11
                               << " on TS " << permanent_uuid_ << " failed: " << s;
606
11
    }
607
608
11
    return;
609
11
  }
610
611
139k
  TransitionToCompleteState();
612
139k
  VLOG_WITH_PREFIX(1) << "TS " << permanent_uuid_ << ": complete on tablet " << tablet_id_;
613
139k
}
614
615
139k
bool AsyncCreateReplica::SendRequest(int attempt) {
616
139k
  ts_admin_proxy_->CreateTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
617
139k
  VLOG_WITH_PREFIX(1) << "Send create tablet request to " << permanent_uuid_ << ":\n"
618
1.05k
                      << " (attempt " << attempt << "):\n"
619
1.05k
                      << req_.DebugString();
620
139k
  return true;
621
139k
}
622
623
// ============================================================================
624
//  Class AsyncStartElection.
625
// ============================================================================
626
AsyncStartElection::AsyncStartElection(Master *master,
627
                                       ThreadPool *callback_pool,
628
                                       const string& permanent_uuid,
629
                                       const scoped_refptr<TabletInfo>& tablet)
630
  : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()),
631
47.8k
    tablet_id_(tablet->tablet_id()) {
632
47.8k
  deadline_ = start_ts_;
633
47.8k
  deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));
634
635
47.8k
  req_.set_dest_uuid(permanent_uuid_);
636
47.8k
  req_.set_tablet_id(tablet_id_);
637
47.8k
  req_.set_initial_election(true);
638
47.8k
}
639
640
86.2k
void AsyncStartElection::HandleResponse(int attempt) {
641
86.2k
  if (resp_.has_error()) {
642
38.3k
    Status s = StatusFromPB(resp_.error().status());
643
38.3k
    if (!s.ok()) {
644
38.3k
      LOG_WITH_PREFIX(WARNING) << "RunLeaderElection RPC for tablet " << tablet_id_
645
38.3k
                               << " on TS " << permanent_uuid_ << " failed: " << s;
646
38.3k
    }
647
648
38.3k
    return;
649
38.3k
  }
650
651
47.8k
  TransitionToCompleteState();
652
47.8k
}
653
654
335k
std::string AsyncStartElection::description() const {
655
335k
  return Format("RunLeaderElection RPC for tablet $0 ($1) on TS=$2",
656
335k
                tablet_id_, table_name(), permanent_uuid_);
657
335k
}
658
659
86.2k
bool AsyncStartElection::SendRequest(int attempt) {
660
86.2k
  LOG_WITH_PREFIX(INFO) << Format(
661
86.2k
      "Hinted Leader start election at $0 for tablet $1, attempt $2",
662
86.2k
      permanent_uuid_, tablet_id_, attempt);
663
86.2k
  consensus_proxy_->RunLeaderElectionAsync(req_, &resp_, &rpc_, BindRpcCallback());
664
665
86.2k
  return true;
666
86.2k
}
667
668
// ============================================================================
669
//  Class AsyncDeleteReplica.
670
// ============================================================================
671
77.4k
void AsyncDeleteReplica::HandleResponse(int attempt) {
672
77.4k
  if (resp_.has_error()) {
673
1.99k
    Status status = StatusFromPB(resp_.error().status());
674
675
    // Do not retry on a fatal error
676
1.99k
    TabletServerErrorPB::Code code = resp_.error().code();
677
1.99k
    switch (code) {
678
43
      case TabletServerErrorPB::TABLET_NOT_FOUND:
679
43
        LOG_WITH_PREFIX(WARNING)
680
43
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
681
43
            << " because the tablet was not found. No further retry: "
682
43
            << status.ToString();
683
43
        TransitionToCompleteState();
684
43
        break;
685
79
      case TabletServerErrorPB::CAS_FAILED:
686
79
        LOG_WITH_PREFIX(WARNING)
687
79
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
688
79
            << " due to a CAS failure. No further retry: " << status.ToString();
689
79
        TransitionToCompleteState();
690
79
        break;
691
1
      case TabletServerErrorPB::WRONG_SERVER_UUID:
692
1
        LOG_WITH_PREFIX(WARNING)
693
1
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
694
1
            << " due to an incorrect UUID. No further retry: " << status.ToString();
695
1
        TransitionToCompleteState();
696
1
        break;
697
1.87k
      default:
698
1.87k
        LOG_WITH_PREFIX(WARNING)
699
1.87k
            << "TS " << permanent_uuid_ << ": delete failed for tablet " << tablet_id_
700
1.87k
            << " with error code " << TabletServerErrorPB::Code_Name(code)
701
1.87k
            << ": " << status.ToString();
702
1.87k
        break;
703
1.99k
    }
704
75.4k
  } else {
705
75.4k
    if (table_) {
706
75.4k
      LOG_WITH_PREFIX(INFO)
707
75.4k
          << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
708
75.4k
          << " (table " << table_->ToString() << ") successfully done";
709
18.4E
    } else {
710
18.4E
      LOG_WITH_PREFIX(WARNING)
711
18.4E
          << "TS " << permanent_uuid_ << ": tablet " << tablet_id_
712
18.4E
          << " did not belong to a known table, but was successfully deleted";
713
18.4E
    }
714
75.4k
    TransitionToCompleteState();
715
18.4E
    VLOG_WITH_PREFIX(1) << "TS " << permanent_uuid_ << ": complete on tablet " << tablet_id_;
716
75.4k
  }
717
77.4k
}
718
719
237k
std::string AsyncDeleteReplica::description() const {
720
237k
  return Format("$0Tablet RPC for tablet $1 ($2) on TS=$3",
721
237k
                hide_only_ ? "Hide" : "Delete", tablet_id_, table_name(), permanent_uuid_);
722
237k
}
723
724
78.1k
bool AsyncDeleteReplica::SendRequest(int attempt) {
725
78.1k
  tserver::DeleteTabletRequestPB req;
726
78.1k
  req.set_dest_uuid(permanent_uuid_);
727
78.1k
  req.set_tablet_id(tablet_id_);
728
78.1k
  req.set_reason(reason_);
729
78.1k
  req.set_delete_type(delete_type_);
730
78.1k
  if (hide_only_) {
731
36
    req.set_hide_only(hide_only_);
732
36
  }
733
78.1k
  if (cas_config_opid_index_less_or_equal_) {
734
4.16k
    req.set_cas_config_opid_index_less_or_equal(*cas_config_opid_index_less_or_equal_);
735
4.16k
  }
736
737
78.1k
  ts_admin_proxy_->DeleteTabletAsync(req, &resp_, &rpc_, BindRpcCallback());
738
18.4E
  VLOG_WITH_PREFIX(1) << "Send delete tablet request to " << permanent_uuid_
739
18.4E
                      << " (attempt " << attempt << "):\n"
740
18.4E
                      << req.DebugString();
741
78.1k
  return true;
742
78.1k
}
743
744
75.8k
void AsyncDeleteReplica::UnregisterAsyncTaskCallback() {
745
  // Only notify if we are in a success state.
746
75.8k
  if (state() == MonitoredTaskState::kComplete) {
747
75.8k
    master_->catalog_manager()->NotifyTabletDeleteFinished(permanent_uuid_, tablet_id_, table());
748
75.8k
  }
749
75.8k
}
750
751
// ============================================================================
752
//  Class AsyncAlterTable.
753
// ============================================================================
754
32.4k
void AsyncAlterTable::HandleResponse(int attempt) {
755
32.4k
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_alter_table_rpcs_ms > 0)) {
756
0
    VLOG_WITH_PREFIX(1) << "Sleeping for " << tablet_->tablet_id()
757
0
                        << FLAGS_TEST_slowdown_alter_table_rpcs_ms
758
0
                        << "ms before returning response in async alter table request handler";
759
0
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_alter_table_rpcs_ms));
760
0
  }
761
762
32.4k
  if (resp_.has_error()) {
763
615
    Status status = StatusFromPB(resp_.error().status());
764
765
615
    LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << " failed: "
766
615
                             << status << " for version " << schema_version_;
767
768
    // Do not retry on a fatal error
769
615
    switch (resp_.error().code()) {
770
0
      case TabletServerErrorPB::TABLET_NOT_FOUND:
771
0
      case TabletServerErrorPB::MISMATCHED_SCHEMA:
772
0
      case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
773
0
        TransitionToCompleteState();
774
0
        break;
775
615
      default:
776
615
        break;
777
615
    }
778
31.7k
  } else {
779
31.7k
    TransitionToCompleteState();
780
31.7k
    VLOG_WITH_PREFIX(1)
781
49
        << "TS " << permanent_uuid() << " completed: for version " << schema_version_;
782
31.7k
  }
783
784
32.4k
  server::UpdateClock(resp_, master_->clock());
785
786
32.4k
  if (state() == MonitoredTaskState::kComplete) {
787
    // TODO: proper error handling here. Not critical, since TSHeartbeat will retry on failure.
788
31.7k
    WARN_NOT_OK(
789
31.7k
        master_->catalog_manager()->HandleTabletSchemaVersionReport(
790
31.7k
            tablet_.get(), schema_version_, table()),
791
31.7k
        Format(
792
31.7k
            "$0 failed while running AsyncAlterTable::HandleResponse. Response $1",
793
31.7k
            description(), resp_.ShortDebugString()));
794
31.7k
  } else {
795
624
    VLOG_WITH_PREFIX(1) << "Task is not completed " << tablet_->ToString() << " for version"
796
9
                        << schema_version_;
797
624
  }
798
32.4k
}
799
800
28.6k
TableType AsyncAlterTable::table_type() const {
801
28.6k
  return tablet_->table()->GetTableType();
802
28.6k
}
803
804
28.6k
bool AsyncAlterTable::SendRequest(int attempt) {
805
28.6k
  VLOG_WITH_PREFIX(1) << "Send alter table request to " << permanent_uuid() << " for "
806
58
                      << tablet_->tablet_id() << " waiting for a read lock.";
807
808
28.6k
  tablet::ChangeMetadataRequestPB req;
809
28.6k
  {
810
28.6k
    auto l = table_->LockForRead();
811
18.4E
    VLOG_WITH_PREFIX(1) << "Send alter table request to " << permanent_uuid() << " for "
812
18.4E
                        << tablet_->tablet_id() << " obtained the read lock.";
813
814
28.6k
    req.set_schema_version(l->pb.version());
815
28.6k
    req.set_dest_uuid(permanent_uuid());
816
28.6k
    req.set_tablet_id(tablet_->tablet_id());
817
28.6k
    req.set_alter_table_id(table_->id());
818
819
28.6k
    if (l->pb.has_wal_retention_secs()) {
820
5.22k
      req.set_wal_retention_secs(l->pb.wal_retention_secs());
821
5.22k
    }
822
823
28.6k
    req.mutable_schema()->CopyFrom(l->pb.schema());
824
28.6k
    req.set_new_table_name(l->pb.name());
825
28.6k
    req.mutable_indexes()->CopyFrom(l->pb.indexes());
826
28.6k
    req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
827
828
28.6k
    if (table_type() == TableType::PGSQL_TABLE_TYPE && !transaction_id_.IsNil()) {
829
1.18k
      VLOG_WITH_PREFIX(1) << "Transaction ID is provided for tablet " << tablet_->tablet_id()
830
0
          << " with ID " << transaction_id_.ToString() << " for ALTER TABLE operation";
831
1.18k
      req.set_should_abort_active_txns(true);
832
1.18k
      req.set_transaction_id(transaction_id_.ToString());
833
1.18k
    }
834
835
28.6k
    schema_version_ = l->pb.version();
836
28.6k
  }
837
838
28.6k
  ts_admin_proxy_->AlterSchemaAsync(req, &resp_, &rpc_, BindRpcCallback());
839
28.6k
  VLOG_WITH_PREFIX(1)
840
32
      << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id()
841
32
      << " (attempt " << attempt << "):\n" << req.DebugString();
842
28.6k
  return true;
843
28.6k
}
844
845
3.79k
bool AsyncBackfillDone::SendRequest(int attempt) {
846
3.79k
  VLOG_WITH_PREFIX(1)
847
11
      << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id()
848
11
      << " version " << schema_version_ << " waiting for a read lock.";
849
850
3.79k
  tablet::ChangeMetadataRequestPB req;
851
3.79k
  {
852
3.79k
    auto l = table_->LockForRead();
853
18.4E
    VLOG_WITH_PREFIX(1)
854
18.4E
        << "Send alter table request to " << permanent_uuid() << " for " << tablet_->tablet_id()
855
18.4E
        << " version " << schema_version_ << " obtained the read lock.";
856
857
3.79k
    req.set_backfill_done_table_id(table_id_);
858
3.79k
    req.set_dest_uuid(permanent_uuid());
859
3.79k
    req.set_tablet_id(tablet_->tablet_id());
860
3.79k
    req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
861
3.79k
    req.set_mark_backfill_done(true);
862
3.79k
    schema_version_ = l->pb.version();
863
3.79k
  }
864
865
3.79k
  ts_admin_proxy_->BackfillDoneAsync(req, &resp_, &rpc_, BindRpcCallback());
866
3.79k
  VLOG_WITH_PREFIX(1)
867
5
      << "Send backfill done request to " << permanent_uuid() << " for " << tablet_->tablet_id()
868
5
      << " (attempt " << attempt << "):\n" << req.DebugString();
869
3.79k
  return true;
870
3.79k
}
871
872
// ============================================================================
873
//  Class AsyncCopartitionTable.
874
// ============================================================================
875
AsyncCopartitionTable::AsyncCopartitionTable(Master *master,
876
                                             ThreadPool* callback_pool,
877
                                             const scoped_refptr<TabletInfo>& tablet,
878
                                             const scoped_refptr<TableInfo>& table)
879
    : RetryingTSRpcTask(master,
880
                        callback_pool,
881
                        std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)),
882
                        table.get()),
883
0
      tablet_(tablet), table_(table) {
884
0
}
885
886
0
string AsyncCopartitionTable::description() const {
887
0
  return "Copartition Table RPC for tablet " + tablet_->ToString()
888
0
          + " for " + table_->ToString();
889
0
}
890
891
0
TabletId AsyncCopartitionTable::tablet_id() const {
892
0
  return tablet_->tablet_id();
893
0
}
894
895
0
TabletServerId AsyncCopartitionTable::permanent_uuid() const {
896
0
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
897
0
}
898
899
// TODO(sagnik): modify this to fill all relevant fields for the AsyncCopartition request.
900
0
bool AsyncCopartitionTable::SendRequest(int attempt) {
901
902
0
  tserver::CopartitionTableRequestPB req;
903
0
  req.set_dest_uuid(permanent_uuid());
904
0
  req.set_tablet_id(tablet_->tablet_id());
905
0
  req.set_table_id(table_->id());
906
0
  req.set_table_name(table_->name());
907
908
0
  ts_admin_proxy_->CopartitionTableAsync(req, &resp_, &rpc_, BindRpcCallback());
909
0
  VLOG_WITH_PREFIX(1) << "Send copartition table request to " << permanent_uuid()
910
0
                      << " (attempt " << attempt << "):\n" << req.DebugString();
911
0
  return true;
912
0
}
913
914
// TODO(sagnik): modify this to handle the AsyncCopartition Response and retry fail as necessary.
915
0
void AsyncCopartitionTable::HandleResponse(int attempt) {
916
0
  LOG_WITH_PREFIX(INFO) << "master can't handle server responses yet";
917
0
}
918
919
// ============================================================================
920
//  Class AsyncTruncate.
921
// ============================================================================
922
57.2k
void AsyncTruncate::HandleResponse(int attempt) {
923
57.2k
  if (resp_.has_error()) {
924
10
    const Status s = StatusFromPB(resp_.error().status());
925
10
    const TabletServerErrorPB::Code code = resp_.error().code();
926
10
    LOG_WITH_PREFIX(WARNING)
927
10
        << "TS " << permanent_uuid() << ": truncate failed for tablet " << tablet_id()
928
10
        << " with error code " << TabletServerErrorPB::Code_Name(code) << ": " << s;
929
57.2k
  } else {
930
57.2k
    VLOG_WITH_PREFIX(1)
931
57
        << "TS " << permanent_uuid() << ": truncate complete on tablet " << tablet_id();
932
57.2k
    TransitionToCompleteState();
933
57.2k
  }
934
935
57.2k
  server::UpdateClock(resp_, master_->clock());
936
57.2k
}
937
938
56.9k
bool AsyncTruncate::SendRequest(int attempt) {
939
56.9k
  tserver::TruncateRequestPB req;
940
56.9k
  req.set_tablet_id(tablet_id());
941
56.9k
  req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
942
56.9k
  ts_proxy_->TruncateAsync(req, &resp_, &rpc_, BindRpcCallback());
943
56.9k
  VLOG_WITH_PREFIX(1) << "Send truncate tablet request to " << permanent_uuid()
944
185
                      << " (attempt " << attempt << "):\n" << req.DebugString();
945
56.9k
  return true;
946
56.9k
}
947
948
// ============================================================================
949
//  Class CommonInfoForRaftTask.
950
// ============================================================================
951
CommonInfoForRaftTask::CommonInfoForRaftTask(
952
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
953
    const consensus::ConsensusStatePB& cstate, const string& change_config_ts_uuid)
954
    : RetryingTSRpcTask(
955
          master, callback_pool, std::unique_ptr<TSPicker>(new PickLeaderReplica(tablet)),
956
          tablet->table()),
957
      tablet_(tablet),
958
      cstate_(cstate),
959
58.7k
      change_config_ts_uuid_(change_config_ts_uuid) {
960
58.7k
  deadline_ = MonoTime::Max();  // Never time out.
961
58.7k
}
962
963
51.8k
CommonInfoForRaftTask::~CommonInfoForRaftTask() = default;
964
965
1.69k
TabletId CommonInfoForRaftTask::tablet_id() const {
966
1.69k
  return tablet_->tablet_id();
967
1.69k
}
968
969
190k
TabletServerId CommonInfoForRaftTask::permanent_uuid() const {
970
190k
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
971
190k
}
972
973
// ============================================================================
974
//  Class AsyncChangeConfigTask.
975
// ============================================================================
976
17.9k
string AsyncChangeConfigTask::description() const {
977
17.9k
  return Format(
978
17.9k
      "$0 RPC for tablet $1 ($2) on peer $3 with cas_config_opid_index $4", type_name(),
979
17.9k
      tablet_->tablet_id(), table_name(), permanent_uuid(), cstate_.config().opid_index());
980
17.9k
}
981
982
5.52k
bool AsyncChangeConfigTask::SendRequest(int attempt) {
983
  // Bail if we're retrying in vain.
984
5.52k
  int64_t latest_index;
985
5.52k
  {
986
5.52k
    auto tablet_lock = tablet_->LockForRead();
987
5.52k
    latest_index = tablet_lock->pb.committed_consensus_state().config().opid_index();
988
    // Adding this logic for a race condition that occurs in this scenario:
989
    // 1. CatalogManager receives a DeleteTable request and sends DeleteTablet requests to the
990
    // tservers, but doesn't yet update the tablet's in-memory state to not running.
991
    // 2. The CB runs, sees that this tablet is still running and that it is over-replicated
992
    // (since the placement now dictates it should have 0 replicas),
993
    // but has not yet sent the ChangeConfig RPC to a tserver.
994
    // 3. That tserver processes the DeleteTablet request.
995
    // 4. The ChangeConfig RPC now returns tablet not found,
996
    // which prompts an infinite retry of the RPC.
997
5.52k
    bool tablet_running = tablet_lock->is_running();
998
5.52k
    if (!tablet_running) {
999
0
      AbortTask(STATUS(Aborted, "Tablet is not running"));
1000
0
      return false;
1001
0
    }
1002
5.52k
  }
1003
5.52k
  if (latest_index > cstate_.config().opid_index()) {
1004
395
    auto status = STATUS_FORMAT(
1005
395
        Aborted,
1006
395
        "Latest config for has opid_index of $0 while this task has opid_index of $1",
1007
395
        latest_index, cstate_.config().opid_index());
1008
395
    LOG_WITH_PREFIX(INFO) << status;
1009
395
    AbortTask(status);
1010
395
    return false;
1011
395
  }
1012
1013
  // Logging should be covered inside based on failure reasons.
1014
5.12k
  auto prepare_status = PrepareRequest(attempt);
1015
5.12k
  if (!prepare_status.ok()) {
1016
0
    AbortTask(prepare_status);
1017
0
    return false;
1018
0
  }
1019
1020
5.12k
  consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_, BindRpcCallback());
1021
5.12k
  VLOG_WITH_PREFIX(1) << "Task " << description() << " sent request:\n" << req_.DebugString();
1022
5.12k
  return true;
1023
5.12k
}
1024
1025
5.10k
void AsyncChangeConfigTask::HandleResponse(int attempt) {
1026
5.10k
  if (!resp_.has_error()) {
1027
3.46k
    TransitionToCompleteState();
1028
3.46k
    LOG_WITH_PREFIX(INFO) << Substitute(
1029
3.46k
        "Change config succeeded on leader TS $0 for tablet $1 with type $2 for replica $3",
1030
3.46k
        permanent_uuid(), tablet_->tablet_id(), type_name(), change_config_ts_uuid_);
1031
3.46k
    return;
1032
3.46k
  }
1033
1034
1.63k
  Status status = StatusFromPB(resp_.error().status());
1035
1036
  // Do not retry on some known errors, otherwise retry forever or until cancelled.
1037
1.63k
  switch (resp_.error().code()) {
1038
13
    case TabletServerErrorPB::CAS_FAILED:
1039
13
    case TabletServerErrorPB::ADD_CHANGE_CONFIG_ALREADY_PRESENT:
1040
13
    case TabletServerErrorPB::REMOVE_CHANGE_CONFIG_NOT_PRESENT:
1041
807
    case TabletServerErrorPB::NOT_THE_LEADER:
1042
807
      LOG_WITH_PREFIX(WARNING) << "ChangeConfig() failed on leader " << permanent_uuid()
1043
807
                               << ". No further retry: " << status.ToString();
1044
807
      TransitionToCompleteState();
1045
807
      break;
1046
830
    default:
1047
830
      LOG_WITH_PREFIX(INFO) << "ChangeConfig() failed on leader " << permanent_uuid()
1048
830
                            << " due to error "
1049
830
                            << TabletServerErrorPB::Code_Name(resp_.error().code())
1050
830
                            << ". This operation will be retried. Error detail: "
1051
830
                            << status.ToString();
1052
830
      break;
1053
1.63k
  }
1054
1.63k
}
1055
1056
// ============================================================================
1057
//  Class AsyncAddServerTask.
1058
// ============================================================================
1059
2.51k
Status AsyncAddServerTask::PrepareRequest(int attempt) {
1060
  // Select the replica we wish to add to the config.
1061
  // Do not include current members of the config.
1062
2.51k
  std::unordered_set<string> replica_uuids;
1063
7.39k
  for (const RaftPeerPB& peer : cstate_.config().peers()) {
1064
7.39k
    InsertOrDie(&replica_uuids, peer.permanent_uuid());
1065
7.39k
  }
1066
2.51k
  TSDescriptorVector ts_descs;
1067
2.51k
  master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
1068
2.51k
  shared_ptr<TSDescriptor> replacement_replica;
1069
5.98k
  for (auto ts_desc : ts_descs) {
1070
5.98k
    if (ts_desc->permanent_uuid() == change_config_ts_uuid_) {
1071
      // This is given by the client, so we assume it is a well chosen uuid.
1072
2.51k
      replacement_replica = ts_desc;
1073
2.51k
      break;
1074
2.51k
    }
1075
5.98k
  }
1076
2.51k
  if (PREDICT_FALSE(!replacement_replica)) {
1077
0
    auto status = STATUS_FORMAT(
1078
0
        TimedOut, "Could not find desired replica $0 in live set", change_config_ts_uuid_);
1079
0
    LOG_WITH_PREFIX(WARNING) << status;
1080
0
    return status;
1081
0
  }
1082
1083
2.51k
  req_.set_dest_uuid(permanent_uuid());
1084
2.51k
  req_.set_tablet_id(tablet_->tablet_id());
1085
2.51k
  req_.set_type(consensus::ADD_SERVER);
1086
2.51k
  req_.set_cas_config_opid_index(cstate_.config().opid_index());
1087
2.51k
  RaftPeerPB* peer = req_.mutable_server();
1088
2.51k
  peer->set_permanent_uuid(replacement_replica->permanent_uuid());
1089
2.51k
  peer->set_member_type(member_type_);
1090
2.51k
  TSRegistrationPB peer_reg = replacement_replica->GetRegistration();
1091
1092
2.51k
  if (peer_reg.common().private_rpc_addresses().empty()) {
1093
0
    auto status = STATUS_FORMAT(
1094
0
        IllegalState, "Candidate replacement $0 has no registered rpc address: $1",
1095
0
        replacement_replica->permanent_uuid(), peer_reg);
1096
0
    YB_LOG_EVERY_N(WARNING, 100) << LogPrefix() << status;
1097
0
    return status;
1098
0
  }
1099
1100
2.51k
  TakeRegistration(peer_reg.mutable_common(), peer);
1101
1102
2.51k
  return Status::OK();
1103
2.51k
}
1104
1105
// ============================================================================
1106
//  Class AsyncRemoveServerTask.
1107
// ============================================================================
1108
2.61k
Status AsyncRemoveServerTask::PrepareRequest(int attempt) {
1109
2.61k
  bool found = false;
1110
10.0k
  for (const RaftPeerPB& peer : cstate_.config().peers()) {
1111
10.0k
    if (change_config_ts_uuid_ == peer.permanent_uuid()) {
1112
2.61k
      found = true;
1113
2.61k
    }
1114
10.0k
  }
1115
1116
2.61k
  if (!found) {
1117
0
    auto status = STATUS_FORMAT(
1118
0
        NotFound, "Asked to remove TS with uuid $0 but could not find it in config peers!",
1119
0
        change_config_ts_uuid_);
1120
0
    LOG_WITH_PREFIX(WARNING) << status;
1121
0
    return status;
1122
0
  }
1123
1124
2.61k
  req_.set_dest_uuid(permanent_uuid());
1125
2.61k
  req_.set_tablet_id(tablet_->tablet_id());
1126
2.61k
  req_.set_type(consensus::REMOVE_SERVER);
1127
2.61k
  req_.set_cas_config_opid_index(cstate_.config().opid_index());
1128
2.61k
  RaftPeerPB* peer = req_.mutable_server();
1129
2.61k
  peer->set_permanent_uuid(change_config_ts_uuid_);
1130
1131
2.61k
  return Status::OK();
1132
2.61k
}
1133
1134
// ============================================================================
1135
//  Class AsyncTryStepDown.
1136
// ============================================================================
1137
54.1k
Status AsyncTryStepDown::PrepareRequest(int attempt) {
1138
54.1k
  LOG_WITH_PREFIX(INFO) << Substitute("Prep Leader step down $0, leader_uuid=$1, change_ts_uuid=$2",
1139
54.1k
                                      attempt, permanent_uuid(), change_config_ts_uuid_);
1140
54.1k
  if (attempt > 1) {
1141
18
    return STATUS(RuntimeError, "Retry is not allowed");
1142
18
  }
1143
1144
  // If we were asked to remove the server even if it is the leader, we have to call StepDown, but
1145
  // only if our current leader is the server we are asked to remove.
1146
54.0k
  if (permanent_uuid() != change_config_ts_uuid_) {
1147
3
    auto status = STATUS_FORMAT(
1148
3
        IllegalState,
1149
3
        "Incorrect state config leader $0 does not match target uuid $1 for a leader step down op",
1150
3
        permanent_uuid(), change_config_ts_uuid_);
1151
3
    LOG_WITH_PREFIX(WARNING) << status;
1152
3
    return status;
1153
3
  }
1154
1155
54.0k
  stepdown_req_.set_dest_uuid(change_config_ts_uuid_);
1156
54.0k
  stepdown_req_.set_tablet_id(tablet_->tablet_id());
1157
54.0k
  if (!new_leader_uuid_.empty()) {
1158
53.2k
    stepdown_req_.set_new_leader_uuid(new_leader_uuid_);
1159
53.2k
  }
1160
1161
54.0k
  return Status::OK();
1162
54.0k
}
1163
1164
54.1k
bool AsyncTryStepDown::SendRequest(int attempt) {
1165
54.1k
  auto prepare_status = PrepareRequest(attempt);
1166
54.1k
  if (!prepare_status.ok()) {
1167
21
    AbortTask(prepare_status);
1168
21
    return false;
1169
21
  }
1170
1171
54.0k
  LOG_WITH_PREFIX(INFO) << Substitute("Stepping down leader $0 for tablet $1",
1172
54.0k
                                      change_config_ts_uuid_, tablet_->tablet_id());
1173
54.0k
  consensus_proxy_->LeaderStepDownAsync(
1174
54.0k
      stepdown_req_, &stepdown_resp_, &rpc_, BindRpcCallback());
1175
1176
54.0k
  return true;
1177
54.1k
}
1178
1179
54.0k
void AsyncTryStepDown::HandleResponse(int attempt) {
1180
54.0k
  if (!rpc_.status().ok()) {
1181
0
    AbortTask(rpc_.status());
1182
0
    LOG_WITH_PREFIX(WARNING) << Substitute(
1183
0
        "Got error on stepdown for tablet $0 with leader $1, attempt $2 and error $3",
1184
0
        tablet_->tablet_id(), permanent_uuid(), attempt, rpc_.status().ToString());
1185
1186
0
    return;
1187
0
  }
1188
1189
54.0k
  TransitionToCompleteState();
1190
54.0k
  const bool stepdown_failed = stepdown_resp_.error().status().code() != AppStatusPB::OK;
1191
54.0k
  LOG_WITH_PREFIX(INFO) << Format(
1192
54.0k
      "Leader step down done attempt=$0, leader_uuid=$1, change_uuid=$2, "
1193
54.0k
      "error=$3, failed=$4, should_remove=$5 for tablet $6.",
1194
54.0k
      attempt, permanent_uuid(), change_config_ts_uuid_, stepdown_resp_.error(),
1195
54.0k
      stepdown_failed, should_remove_, tablet_->tablet_id());
1196
1197
54.0k
  if (stepdown_failed) {
1198
54.0k
    tablet_->RegisterLeaderStepDownFailure(change_config_ts_uuid_,
1199
54.0k
        MonoDelta::FromMilliseconds(stepdown_resp_.has_time_since_election_failure_ms() ?
1200
53.1k
                                    stepdown_resp_.time_since_election_failure_ms() : 0));
1201
54.0k
  }
1202
1203
54.0k
  if (should_remove_) {
1204
779
    auto task = std::make_shared<AsyncRemoveServerTask>(
1205
779
        master_, callback_pool_, tablet_, cstate_, change_config_ts_uuid_);
1206
1207
779
    tablet_->table()->AddTask(task);
1208
779
    Status status = task->Run();
1209
779
    WARN_NOT_OK(status, "Failed to send new RemoveServer request");
1210
779
  }
1211
54.0k
}
1212
1213
// ============================================================================
1214
//  Class AsyncAddTableToTablet.
1215
// ============================================================================
1216
AsyncAddTableToTablet::AsyncAddTableToTablet(
1217
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1218
    const scoped_refptr<TableInfo>& table)
1219
    : RetryingTSRpcTask(
1220
          master, callback_pool, std::make_unique<PickLeaderReplica>(tablet), table.get()),
1221
      tablet_(tablet),
1222
      table_(table),
1223
127
      tablet_id_(tablet->tablet_id()) {
1224
127
  req_.set_tablet_id(tablet->id());
1225
127
  auto& add_table = *req_.mutable_add_table();
1226
127
  add_table.set_table_id(table_->id());
1227
127
  add_table.set_table_name(table_->name());
1228
127
  add_table.set_table_type(table_->GetTableType());
1229
127
  {
1230
127
    auto l = table->LockForRead();
1231
127
    add_table.set_schema_version(l->pb.version());
1232
127
    *add_table.mutable_schema() = l->pb.schema();
1233
127
    *add_table.mutable_partition_schema() = l->pb.partition_schema();
1234
127
  }
1235
127
}
1236
1237
256
string AsyncAddTableToTablet::description() const {
1238
256
  return Substitute("AddTableToTablet RPC ($0) ($1)", table_->ToString(), tablet_->ToString());
1239
256
}
1240
1241
126
void AsyncAddTableToTablet::HandleResponse(int attempt) {
1242
126
  if (!rpc_.status().ok()) {
1243
0
    AbortTask(rpc_.status());
1244
0
    LOG_WITH_PREFIX(WARNING) << Substitute(
1245
0
        "Got error when adding table $0 to tablet $1, attempt $2 and error $3",
1246
0
        table_->ToString(), tablet_->ToString(), attempt, rpc_.status().ToString());
1247
0
    return;
1248
0
  }
1249
126
  if (resp_.has_error()) {
1250
0
    LOG_WITH_PREFIX(WARNING) << "AddTableToTablet() responded with error code "
1251
0
                             << TabletServerErrorPB_Code_Name(resp_.error().code());
1252
0
    switch (resp_.error().code()) {
1253
0
      case TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE: FALLTHROUGH_INTENDED;
1254
0
      case TabletServerErrorPB::NOT_THE_LEADER:
1255
0
        TransitionToWaitingState(MonitoredTaskState::kRunning);
1256
0
        break;
1257
0
      default:
1258
0
        TransitionToCompleteState();
1259
0
        break;
1260
0
    }
1261
1262
0
    return;
1263
0
  }
1264
1265
126
  TransitionToCompleteState();
1266
126
}
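
The switch above encodes the task's retry policy for application-level errors: leadership-related codes put the task back into the waiting state so the retry loop can re-resolve the leader, while any other code ends the task. A compilable sketch of the same dispatch, assuming simplified stand-in enums rather than the real TabletServerErrorPB and MonitoredTaskState types:

    #include <iostream>

    // Stand-ins for TabletServerErrorPB::Code and MonitoredTaskState
    // (assumption: simplified enums, not the real protobuf / task types).
    enum class ErrorCode { LEADER_NOT_READY_TO_SERVE, NOT_THE_LEADER, TABLET_NOT_FOUND };
    enum class TaskState { kWaiting, kComplete };

    TaskState NextState(ErrorCode code) {
      switch (code) {
        case ErrorCode::LEADER_NOT_READY_TO_SERVE:  // fall through
        case ErrorCode::NOT_THE_LEADER:
          return TaskState::kWaiting;   // retry loop re-resolves the leader
        default:
          return TaskState::kComplete;  // any other error is terminal here
      }
    }

    int main() {
      std::cout << (NextState(ErrorCode::NOT_THE_LEADER) == TaskState::kWaiting) << "\n";
      std::cout << (NextState(ErrorCode::TABLET_NOT_FOUND) == TaskState::kComplete) << "\n";
    }
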
1267
1268
126
bool AsyncAddTableToTablet::SendRequest(int attempt) {
1269
126
  ts_admin_proxy_->AddTableToTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
1270
126
  VLOG_WITH_PREFIX(1)
1271
0
      << "Send AddTableToTablet request (attempt " << attempt << "):\n" << req_.DebugString();
1272
126
  return true;
1273
126
}
1274
1275
// ============================================================================
1276
//  Class AsyncRemoveTableFromTablet.
1277
// ============================================================================
1278
AsyncRemoveTableFromTablet::AsyncRemoveTableFromTablet(
1279
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1280
    const scoped_refptr<TableInfo>& table)
1281
    : RetryingTSRpcTask(
1282
          master, callback_pool, std::make_unique<PickLeaderReplica>(tablet), table.get()),
1283
      table_(table),
1284
      tablet_(tablet),
1285
81
      tablet_id_(tablet->tablet_id()) {
1286
81
  req_.set_tablet_id(tablet->id());
1287
81
  req_.set_remove_table_id(table->id());
1288
81
}
1289
1290
169
string AsyncRemoveTableFromTablet::description() const {
1291
169
  return Substitute("RemoveTableFromTablet RPC ($0) ($1)", table_->ToString(), tablet_->ToString());
1292
169
}
1293
1294
81
void AsyncRemoveTableFromTablet::HandleResponse(int attempt) {
1295
81
  if (!rpc_.status().ok()) {
1296
0
    AbortTask(rpc_.status());
1297
0
    LOG_WITH_PREFIX(WARNING) << Substitute(
1298
0
        "Got error when removing table $0 from tablet $1, attempt $2 and error $3",
1299
0
        table_->ToString(), tablet_->ToString(), attempt, rpc_.status().ToString());
1300
0
    return;
1301
0
  }
1302
81
  if (resp_.has_error()) {
1303
1
    LOG_WITH_PREFIX(WARNING) << "RemoveTableFromTablet() responded with error code "
1304
1
                             << TabletServerErrorPB_Code_Name(resp_.error().code());
1305
1
    switch (resp_.error().code()) {
1306
0
      case TabletServerErrorPB::LEADER_NOT_READY_TO_SERVE: FALLTHROUGH_INTENDED;
1307
1
      case TabletServerErrorPB::NOT_THE_LEADER:
1308
1
        TransitionToWaitingState(MonitoredTaskState::kRunning);
1309
1
        break;
1310
0
      default:
1311
0
        TransitionToCompleteState();
1312
0
        break;
1313
1
    }
1314
80
  } else {
1315
80
    TransitionToCompleteState();
1316
80
  }
1317
81
}
1318
1319
81
bool AsyncRemoveTableFromTablet::SendRequest(int attempt) {
1320
81
  ts_admin_proxy_->RemoveTableFromTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
1321
81
  VLOG_WITH_PREFIX(1) << "Send RemoveTableFromTablet request (attempt " << attempt << "):\n"
1322
0
                      << req_.DebugString();
1323
81
  return true;
1324
81
}
1325
1326
namespace {
1327
1328
1
bool IsDefinitelyPermanentError(const Status& s) {
1329
1
  return s.IsInvalidArgument() || s.IsNotFound();
1330
1
}
1331
1332
} // namespace
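
IsDefinitelyPermanentError draws the line between errors worth retrying and errors that can never succeed: a malformed request (InvalidArgument) or a missing object (NotFound) will fail identically on every attempt, whereas timeouts and leadership churn are left to the retry machinery. A self-contained sketch of the predicate with a toy status type (ToyStatus is an assumption, not yb::Status):

    #include <initializer_list>
    #include <iostream>

    enum class Code { kOk, kInvalidArgument, kNotFound, kTimedOut, kIllegalState };

    struct ToyStatus {
      Code code = Code::kOk;
      bool IsInvalidArgument() const { return code == Code::kInvalidArgument; }
      bool IsNotFound() const { return code == Code::kNotFound; }
    };

    // Same predicate as above: only malformed requests and missing objects are
    // hopeless; everything else is left to the retry loop.
    bool IsDefinitelyPermanentError(const ToyStatus& s) {
      return s.IsInvalidArgument() || s.IsNotFound();
    }

    int main() {
      for (Code c : {Code::kInvalidArgument, Code::kTimedOut}) {
        std::cout << (IsDefinitelyPermanentError(ToyStatus{c})
                          ? "fail task" : "leave to retry loop")
                  << "\n";
      }
    }
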
1333
1334
// ============================================================================
1335
//  Class AsyncGetTabletSplitKey.
1336
// ============================================================================
1337
AsyncGetTabletSplitKey::AsyncGetTabletSplitKey(
1338
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1339
    DataCallbackType result_cb)
1340
143
    : AsyncTabletLeaderTask(master, callback_pool, tablet), result_cb_(result_cb) {
1341
143
  req_.set_tablet_id(tablet_id());
1342
143
}
1343
1344
143
void AsyncGetTabletSplitKey::HandleResponse(int attempt) {
1345
143
  if (resp_.has_error()) {
1346
1
    const Status s = StatusFromPB(resp_.error().status());
1347
1
    const TabletServerErrorPB::Code code = resp_.error().code();
1348
1
    LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << ": GetSplitKey (attempt " << attempt
1349
1
                             << ") failed for tablet " << tablet_id() << " with error code "
1350
1
                             << TabletServerErrorPB::Code_Name(code) << ": " << s;
1351
1
    if (IsDefinitelyPermanentError(s) || s.IsIllegalState()) {
1352
      // It can happen that the tablet leader has completed post-split compaction after a previous
1353
      // split, but the followers have not yet completed theirs.
1354
      // The catalog manager then decides to split again and sends a GetTabletSplitKey RPC, but the
1355
      // tablet leader changes for some reason and the new leader is not yet compacted.
1356
      // In this case we get an IllegalState error, and we don't want to retry until post-split
1357
      // compaction has happened on the leader. Once post-split compaction is done, the
1358
      // CatalogManager will resend the RPC.
1359
      //
1360
      // Another case for IsIllegalState is trying to split a tablet in which all the data has
1361
      // the same hash_code or the same doc_key; in this case, too, we don't want to retry the RPC
1362
      // automatically.
1363
      // See https://github.com/yugabyte/yugabyte-db/issues/9159.
1364
0
      TransitionToFailedState(state(), s);
1365
0
    }
1366
142
  } else {
1367
142
    VLOG_WITH_PREFIX(1)
1368
0
        << "TS " << permanent_uuid() << ": got split key for tablet " << tablet_id();
1369
142
    TransitionToCompleteState();
1370
142
  }
1371
1372
143
  server::UpdateClock(resp_, master_->clock());
1373
143
}
1374
1375
143
bool AsyncGetTabletSplitKey::SendRequest(int attempt) {
1376
143
  req_.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
1377
143
  ts_proxy_->GetSplitKeyAsync(req_, &resp_, &rpc_, BindRpcCallback());
1378
143
  VLOG_WITH_PREFIX(1)
1379
0
      << "Sent get split key request to " << permanent_uuid() << " (attempt " << attempt << "):\n"
1380
0
      << req_.DebugString();
1381
143
  return true;
1382
143
}
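
SendRequest and HandleResponse above bracket the RPC with hybrid-time propagation: the master stamps its current hybrid time on the request, and server::UpdateClock folds the tserver's response time back into the master's clock. A toy Lamport-style sketch of that pattern (LogicalClock is a simplified stand-in for yb's HybridClock):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    // Toy monotonic clock standing in for the real hybrid clock.
    struct LogicalClock {
      uint64_t now = 0;
      uint64_t Now() { return ++now; }  // read and advance
      void Update(uint64_t observed) { now = std::max(now, observed); }
    };

    int main() {
      LogicalClock master, tserver;
      tserver.now = 100;                       // tserver's clock happens to be ahead

      uint64_t propagated = master.Now();      // set_propagated_hybrid_time(...)
      tserver.Update(propagated);              // receiver merges the request time
      uint64_t response_time = tserver.Now();  // tserver stamps its response
      master.Update(response_time);            // server::UpdateClock(resp_, clock)

      std::cout << "master clock after RPC: " << master.now << "\n";  // 101
    }
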
1383
1384
143
void AsyncGetTabletSplitKey::Finished(const Status& status) {
1385
143
  if (result_cb_) {
1386
143
    if (status.ok()) {
1387
142
      result_cb_(Data{resp_.split_encoded_key(), resp_.split_partition_key()});
1388
142
    } else {
1389
1
      result_cb_(status);
1390
1
    }
1391
143
  }
1392
143
}
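
Finished() above delivers exactly one of two things to the caller: the split-key payload on success, or the failing status. A sketch of that either/or callback contract, using std::variant as a stand-in for yb's Result-style types:

    #include <functional>
    #include <iostream>
    #include <string>
    #include <variant>

    struct Data { std::string split_encoded_key, split_partition_key; };
    using ToyStatus = std::string;  // stand-in: the error message of a failed task
    using DataCallback = std::function<void(const std::variant<Data, ToyStatus>&)>;

    // Mirrors Finished(): exactly one of payload or status reaches the callback.
    void Finished(bool ok, const DataCallback& cb) {
      if (ok) {
        cb(Data{"enc_key", "part_key"});
      } else {
        cb(ToyStatus{"leader not yet compacted"});
      }
    }

    int main() {
      DataCallback cb = [](const std::variant<Data, ToyStatus>& result) {
        if (const auto* d = std::get_if<Data>(&result)) {
          std::cout << "split at partition key " << d->split_partition_key << "\n";
        } else {
          std::cout << "failed: " << std::get<ToyStatus>(result) << "\n";
        }
      };
      Finished(true, cb);
      Finished(false, cb);
    }
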
1393
1394
// ============================================================================
1395
//  Class AsyncSplitTablet.
1396
// ============================================================================
1397
AsyncSplitTablet::AsyncSplitTablet(
1398
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TabletInfo>& tablet,
1399
    const std::array<TabletId, kNumSplitParts>& new_tablet_ids,
1400
    const std::string& split_encoded_key, const std::string& split_partition_key,
1401
    TabletSplitCompleteHandlerIf* tablet_split_complete_handler)
1402
    : AsyncTabletLeaderTask(master, callback_pool, tablet),
1403
140
      tablet_split_complete_handler_(tablet_split_complete_handler) {
1404
140
  req_.set_tablet_id(tablet_id());
1405
140
  req_.set_new_tablet1_id(new_tablet_ids[0]);
1406
140
  req_.set_new_tablet2_id(new_tablet_ids[1]);
1407
140
  req_.set_split_encoded_key(split_encoded_key);
1408
140
  req_.set_split_partition_key(split_partition_key);
1409
140
}
1410
1411
140
void AsyncSplitTablet::HandleResponse(int attempt) {
1412
140
  if (resp_.has_error()) {
1413
96
    const Status s = StatusFromPB(resp_.error().status());
1414
96
    const TabletServerErrorPB::Code code = resp_.error().code();
1415
96
    LOG_WITH_PREFIX(WARNING) << "TS " << permanent_uuid() << ": split (attempt " << attempt
1416
96
                             << ") failed for tablet " << tablet_id() << " with error code "
1417
96
                             << TabletServerErrorPB::Code_Name(code) << ": " << s;
1418
96
    if (s.IsAlreadyPresent()) {
1419
96
      TransitionToCompleteState();
1420
96
    } else if (IsDefinitelyPermanentError(s)) {
1421
0
      TransitionToFailedState(state(), s);
1422
0
    }
1423
96
  } else {
1424
44
    VLOG_WITH_PREFIX(1)
1425
0
        << "TS " << permanent_uuid() << ": split complete on tablet " << tablet_id();
1426
44
    TransitionToCompleteState();
1427
44
  }
1428
1429
140
  server::UpdateClock(resp_, master_->clock());
1430
140
}
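
Note the AlreadyPresent branch above: it is treated as success, presumably because a resent SplitTablet request can arrive after an earlier attempt has already registered the child tablets, and failing the task then would be wrong. A tiny sketch of that idempotency convention (ToyStatus is a stand-in for yb::Status):

    #include <iostream>

    enum class ToyStatus { kOk, kAlreadyPresent, kInvalidArgument };

    // True when the task should transition to the complete state.
    bool SplitDone(ToyStatus s) {
      // A repeated split finds the child tablets already registered and reports
      // AlreadyPresent, which means the original request took effect.
      return s == ToyStatus::kOk || s == ToyStatus::kAlreadyPresent;
    }

    int main() {
      std::cout << SplitDone(ToyStatus::kAlreadyPresent) << "\n";   // 1: success
      std::cout << SplitDone(ToyStatus::kInvalidArgument) << "\n";  // 0: permanent failure
    }
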
1431
1432
140
bool AsyncSplitTablet::SendRequest(int attempt) {
1433
140
  req_.set_dest_uuid(permanent_uuid());
1434
140
  req_.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());
1435
140
  ts_admin_proxy_->SplitTabletAsync(req_, &resp_, &rpc_, BindRpcCallback());
1436
140
  VLOG_WITH_PREFIX(1)
1437
0
      << "Sent split tablet request to " << permanent_uuid() << " (attempt " << attempt << "):\n"
1438
0
      << req_.DebugString();
1439
140
  return true;
1440
140
}
1441
1442
140
void AsyncSplitTablet::Finished(const Status& status) {
1443
140
  if (tablet_split_complete_handler_) {
1444
140
    SplitTabletIds split_tablet_ids {
1445
140
      .source = req_.tablet_id(),
1446
140
      .children = {req_.new_tablet1_id(), req_.new_tablet2_id()}
1447
140
    };
1448
140
    tablet_split_complete_handler_->ProcessSplitTabletResult(
1449
140
        status, table_->id(), split_tablet_ids);
1450
140
  }
1451
140
}
1452
1453
}  // namespace master
1454
}  // namespace yb