YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/cluster_balance.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/cluster_balance.h"
15
16
#include <algorithm>
17
#include <memory>
18
#include <utility>
19
20
#include <boost/algorithm/string/join.hpp>
21
#include <boost/optional/optional.hpp>
22
23
#include "yb/common/common.pb.h"
24
25
#include "yb/consensus/quorum_util.h"
26
27
#include "yb/gutil/casts.h"
28
29
#include "yb/master/catalog_manager_util.h"
30
31
#include "yb/master/master_fwd.h"
32
#include "yb/master/master.h"
33
#include "yb/master/master_error.h"
34
35
#include "yb/util/flag_tags.h"
36
#include "yb/util/status.h"
37
#include "yb/util/status_format.h"
38
#include "yb/util/status_log.h"
39
40
DEFINE_bool(enable_load_balancing,
41
            true,
42
            "Choose whether to enable the load balancing algorithm, to move tablets around.");
43
44
DEFINE_bool(transaction_tables_use_preferred_zones,
45
            false,
46
            "Choose whether transaction tablet leaders respect preferred zones.");
47
48
DEFINE_bool(enable_global_load_balancing,
49
            true,
50
            "Choose whether to allow the load balancer to make moves that strictly only balance "
51
            "global load. Note that global balancing only occurs after all tables are balanced.");
52
53
DEFINE_int32(leader_balance_threshold,
54
             0,
55
             "Number of leaders per each tablet server to balance below. If this is configured to "
56
                 "0 (the default), the leaders will be balanced optimally at extra cost.");
57
58
DEFINE_int32(leader_balance_unresponsive_timeout_ms,
59
             3 * 1000,
60
             "The period of time that a master can go without receiving a heartbeat from a "
61
                 "tablet server before considering it unresponsive. Unresponsive servers are "
62
                 "excluded from leader balancing.");
63
64
DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps,
65
             10,
66
             "Maximum number of tablets being remote bootstrapped across the cluster.");
67
68
DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps_per_table,
69
             2,
70
             "Maximum number of tablets being remote bootstrapped for any table. The maximum "
71
             "number of remote bootstraps across the cluster is still limited by the flag "
72
             "load_balancer_max_concurrent_tablet_remote_bootstraps. This flag is meant to prevent "
73
             "a single table use all the available remote bootstrap sessions and starving other "
74
             "tables.");
75
76
DEFINE_int32(load_balancer_max_over_replicated_tablets,
77
             1,
78
             "Maximum number of running tablet replicas that are allowed to be over the configured "
79
             "replication factor.");
80
81
DEFINE_int32(load_balancer_max_concurrent_adds,
82
             1,
83
             "Maximum number of tablet peer replicas to add in any one run of the load balancer.");
84
85
DEFINE_int32(load_balancer_max_concurrent_removals,
86
             1,
87
             "Maximum number of over-replicated tablet peer removals to do in any one run of the "
88
             "load balancer.");
89
90
DEFINE_int32(load_balancer_max_concurrent_moves,
91
             2,
92
             "Maximum number of tablet leaders on tablet servers (across the cluster) to move in "
93
             "any one run of the load balancer.");
94
95
DEFINE_int32(load_balancer_max_concurrent_moves_per_table,
96
             1,
97
             "Maximum number of tablet leaders per table to move in any one run of the load "
98
             "balancer. The maximum number of tablet leader moves across the cluster is still "
99
             "limited by the flag load_balancer_max_concurrent_moves. This flag is meant to "
100
             "prevent a single table from using all of the leader moves quota and starving "
101
             "other tables.");
102
103
DEFINE_int32(load_balancer_num_idle_runs,
104
             5,
105
             "Number of idle runs of load balancer to deem it idle.");
106
107
DEFINE_test_flag(bool, load_balancer_handle_under_replicated_tablets_only, false,
108
                 "Limit the functionality of the load balancer during tests so tests can make "
109
                 "progress");
110
111
// No longer used because leader stepdown is not as slow as it used to be.)
112
DEFINE_bool(load_balancer_skip_leader_as_remove_victim, false,
113
            "DEPRECATED. Should the LB skip a leader as a possible remove candidate.");
114
115
DEFINE_bool(allow_leader_balancing_dead_node, true,
116
            "When a tserver is marked as dead, do we continue leader balancing for tables that "
117
            "have a replica on this tserver");
118
119
DEFINE_test_flag(int32, load_balancer_wait_after_count_pending_tasks_ms, 0,
120
                 "For testing purposes, number of milliseconds to wait after counting and "
121
                 "finding pending tasks.");
122
123
DECLARE_int32(min_leader_stepdown_retry_interval_ms);
124
DECLARE_bool(enable_ysql_tablespaces_for_placement);
125
126
DEFINE_bool(load_balancer_count_move_as_add, true,
127
            "Should we enable state change to count add server triggered by load move as just an "
128
            "add instead of both an add and remove.");
129
130
DEFINE_bool(load_balancer_drive_aware, true,
131
            "When LB decides to move a tablet from server A to B, on the target LB "
132
            "should select the tablet to move from most loaded drive.");
133
134
DEFINE_bool(load_balancer_ignore_cloud_info_similarity, false,
135
            "If true, ignore the similarity between cloud infos when deciding which tablet "
136
            "to move.");
137
138
// TODO(tsplit): make false by default or even remove flag after
139
// https://github.com/yugabyte/yugabyte-db/issues/10301 is fixed.
140
DEFINE_test_flag(
141
    bool, load_balancer_skip_inactive_tablets, true, "Don't move inactive (hidden) tablets");
142
143
namespace yb {
144
namespace master {
145
146
using std::unique_ptr;
147
using std::make_unique;
148
using std::string;
149
using std::set;
150
using std::vector;
151
using strings::Substitute;
152
153
namespace {
154
155
vector<set<TabletId>> GetTabletsOnTSToMove(bool drive_aware,
156
19.0k
                                         const CBTabletServerMetadata& from_ts_meta) {
157
19.0k
  vector<set<TabletId>> all_tablets;
158
19.0k
  if (drive_aware) {
159
41.6k
    for (const auto& path : from_ts_meta.sorted_path_load_by_tablets_count) {
160
41.6k
      auto path_list = from_ts_meta.path_to_tablets.find(path);
161
41.6k
      if (path_list == from_ts_meta.path_to_tablets.end()) {
162
0
        LOG(INFO) << "Found uninitialized path: " << path;
163
0
        continue;
164
0
      }
165
41.6k
      all_tablets.push_back(path_list->second);
166
41.6k
    }
167
0
  } else {
168
0
    all_tablets.push_back(from_ts_meta.running_tablets);
169
0
  }
170
19.0k
  return all_tablets;
171
19.0k
}
172
173
// Returns sorted list of pair tablet id and path on to_ts.
174
std::vector<std::pair<TabletId, std::string>> GetLeadersOnTSToMove(
175
22.4k
    bool drive_aware, const set<TabletId>& leaders, const CBTabletServerMetadata& to_ts_meta) {
176
22.4k
  std::vector<std::pair<TabletId, std::string>> peers;
177
22.4k
  if (drive_aware) {
178
26.5k
    for (const auto& path : to_ts_meta.sorted_path_load_by_leader_count) {
179
26.5k
      auto path_list = to_ts_meta.path_to_tablets.find(path);
180
26.5k
      if (path_list == to_ts_meta.path_to_tablets.end()) {
181
        // No tablets on this path, so skip it.
182
17.6k
        continue;
183
17.6k
      }
184
8.87k
      transform(path_list->second.begin(), path_list->second.end(), std::back_inserter(peers),
185
70.9k
                [&path_list](const TabletId& tablet_id) -> std::pair<TabletId, std::string> {
186
70.9k
                  return make_pair(tablet_id, path_list->first);
187
70.9k
                 });
188
8.87k
    }
189
0
  } else {
190
0
    transform(to_ts_meta.running_tablets.begin(), to_ts_meta.running_tablets.end(),
191
0
              std::back_inserter(peers),
192
0
              [](const TabletId& tablet_id) -> std::pair<TabletId, std::string> {
193
0
                return make_pair(tablet_id, "");
194
0
               });
195
0
  }
196
22.4k
  std::vector<std::pair<TabletId, std::string>> intersection;
197
22.4k
  copy_if(peers.begin(), peers.end(), std::back_inserter(intersection),
198
70.9k
          [&leaders](const std::pair<TabletId, std::string>& tablet) {
199
70.9k
            return leaders.count(tablet.first) > 0;
200
70.9k
          });
201
22.4k
  return intersection;
202
22.4k
}
203
204
} // namespace
205
206
Result<ReplicationInfoPB> ClusterLoadBalancer::GetTableReplicationInfo(
207
120k
    const scoped_refptr<TableInfo>& table) const {
208
209
  // Return custom placement policy if it exists.
210
120k
  {
211
120k
    auto l = table->LockForRead();
212
120k
    if (l->pb.has_replication_info()) {
213
349
      return l->pb.replication_info();
214
349
    }
215
120k
  }
216
217
  // Custom placement policy does not exist. Check whether this table
218
  // has a tablespace associated with it, if so, return the placement info
219
  // for that tablespace.
220
120k
  auto replication_info = VERIFY_RESULT(tablespace_manager_->GetTableReplicationInfo(table));
221
120k
  if (replication_info) {
222
0
    return replication_info.value();
223
0
  }
224
225
  // No custom policy or tablespace specified for table.
226
120k
  return GetClusterReplicationInfo();
227
120k
}
228
229
66.3k
void ClusterLoadBalancer::InitTablespaceManager() {
230
66.3k
  tablespace_manager_ = catalog_manager_->GetTablespaceManager();
231
66.3k
}
232
233
121k
Status ClusterLoadBalancer::PopulatePlacementInfo(TabletInfo* tablet, PlacementInfoPB* pb) {
234
121k
  if (state_->options_->type == LIVE) {
235
120k
    const auto& replication_info = VERIFY_RESULT(GetTableReplicationInfo(tablet->table()));
236
120k
    pb->CopyFrom(replication_info.live_replicas());
237
120k
    return Status::OK();
238
843
  }
239
843
  auto l = tablet->table()->LockForRead();
240
843
  if (state_->options_->type == READ_ONLY &&
241
843
      l->pb.has_replication_info() &&
242
0
      !l->pb.replication_info().read_replicas().empty()) {
243
0
    pb->CopyFrom(GetReadOnlyPlacementFromUuid(l->pb.replication_info()));
244
843
  } else {
245
843
    pb->CopyFrom(GetClusterPlacementInfo());
246
843
  }
247
843
  return Status::OK();
248
843
}
249
250
967k
Status ClusterLoadBalancer::UpdateTabletInfo(TabletInfo* tablet) {
251
967k
  const auto& table_id = tablet->table()->id();
252
  // Set the placement information on a per-table basis, only once.
253
967k
  if (!state_->placement_by_table_.count(table_id)) {
254
121k
    PlacementInfoPB pb;
255
121k
    {
256
121k
      RETURN_NOT_OK(PopulatePlacementInfo(tablet, &pb));
257
121k
    }
258
121k
    state_->placement_by_table_[table_id] = std::move(pb);
259
121k
  }
260
261
967k
  return state_->UpdateTablet(tablet);
262
967k
}
263
264
310k
const PlacementInfoPB& ClusterLoadBalancer::GetPlacementByTablet(const TabletId& tablet_id) const {
265
310k
  const auto& table_id = GetTabletMap().at(tablet_id)->table()->id();
266
310k
  return state_->placement_by_table_.at(table_id);
267
310k
}
268
269
0
size_t ClusterLoadBalancer::get_total_wrong_placement() const {
270
0
  return state_->tablets_wrong_placement_.size();
271
0
}
272
273
0
size_t ClusterLoadBalancer::get_total_blacklisted_servers() const {
274
0
  return state_->blacklisted_servers_.size();
275
0
}
276
277
0
size_t ClusterLoadBalancer::get_total_leader_blacklisted_servers() const {
278
0
  return state_->leader_blacklisted_servers_.size();
279
0
}
280
281
121k
size_t ClusterLoadBalancer::get_total_over_replication() const {
282
121k
  return state_->tablets_over_replicated_.size();
283
121k
}
284
285
0
size_t ClusterLoadBalancer::get_total_under_replication() const {
286
0
  return state_->tablets_missing_replicas_.size();
287
0
}
288
289
6
size_t ClusterLoadBalancer::get_total_starting_tablets() const {
290
6
  return global_state_->total_starting_tablets_;
291
6
}
292
293
6
int ClusterLoadBalancer::get_total_running_tablets() const {
294
6
  return state_->total_running_;
295
6
}
296
297
80.8k
bool ClusterLoadBalancer::IsLoadBalancerEnabled() const {
298
80.8k
  return FLAGS_enable_load_balancing && is_enabled_;
299
80.8k
}
300
301
// Load balancer class.
302
ClusterLoadBalancer::ClusterLoadBalancer(CatalogManager* cm)
303
    : random_(GetRandomSeed32()),
304
      is_enabled_(FLAGS_enable_load_balancing),
305
5.46k
      cbuf_activities_(FLAGS_load_balancer_num_idle_runs) {
306
5.46k
  ResetGlobalState(false /* initialize_ts_descs */);
307
308
5.46k
  catalog_manager_ = cm;
309
5.46k
}
310
311
// Reduce remaining_tasks by pending_tasks value, after sanitizing inputs.
312
template <class T>
313
320k
void set_remaining(T pending_tasks, T* remaining_tasks) {
314
320k
  if (pending_tasks > *remaining_tasks) {
315
0
    LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > "
316
0
                 << *remaining_tasks;
317
0
    *remaining_tasks = 0;
318
320k
  } else {
319
320k
    *remaining_tasks -= pending_tasks;
320
320k
  }
321
320k
}
_ZN2yb6master13set_remainingIiEEvT_PS2_
Line
Count
Source
313
199k
void set_remaining(T pending_tasks, T* remaining_tasks) {
314
199k
  if (pending_tasks > *remaining_tasks) {
315
0
    LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > "
316
0
                 << *remaining_tasks;
317
0
    *remaining_tasks = 0;
318
199k
  } else {
319
199k
    *remaining_tasks -= pending_tasks;
320
199k
  }
321
199k
}
_ZN2yb6master13set_remainingImEEvT_PS2_
Line
Count
Source
313
121k
void set_remaining(T pending_tasks, T* remaining_tasks) {
314
121k
  if (pending_tasks > *remaining_tasks) {
315
0
    LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > "
316
0
                 << *remaining_tasks;
317
0
    *remaining_tasks = 0;
318
121k
  } else {
319
121k
    *remaining_tasks -= pending_tasks;
320
121k
  }
321
121k
}
322
323
// Needed as we have a unique_ptr to the forward declared PerTableLoadState class.
324
94
ClusterLoadBalancer::~ClusterLoadBalancer() = default;
325
326
78.4k
void ClusterLoadBalancer::RunLoadBalancerWithOptions(Options* options) {
327
78.4k
  ResetGlobalState();
328
329
78.4k
  uint32_t master_errors = 0;
330
331
78.4k
  if (!IsLoadBalancerEnabled()) {
332
12.0k
    LOG(INFO) << "Load balancing is not enabled.";
333
12.0k
    return;
334
12.0k
  }
335
336
66.3k
  if (!FLAGS_transaction_tables_use_preferred_zones) {
337
0
    VLOG(1) << "Transaction tables will not respect leadership affinity.";
338
66.3k
  }
339
340
66.3k
  std::unique_ptr<Options> options_unique_ptr;
341
66.3k
  if (options == nullptr) {
342
0
    options_unique_ptr = std::make_unique<Options>();
343
0
    options = options_unique_ptr.get();
344
0
  }
345
346
66.3k
  InitTablespaceManager();
347
348
  // Lock the CatalogManager maps for the duration of the load balancer run.
349
66.3k
  CatalogManager::SharedLock lock(catalog_manager_->mutex_);
350
351
66.3k
  int remaining_adds = options->kMaxConcurrentAdds;
352
66.3k
  int remaining_removals = options->kMaxConcurrentRemovals;
353
66.3k
  int remaining_leader_moves = options->kMaxConcurrentLeaderMoves;
354
355
  // Loop over all tables to get the count of pending tasks.
356
66.3k
  int pending_add_replica_tasks = 0;
357
66.3k
  int pending_remove_replica_tasks = 0;
358
66.3k
  int pending_stepdown_leader_tasks = 0;
359
360
8.64M
  for (const auto& table : GetTableMap()) {
361
8.64M
    if (SkipLoadBalancing(*table.second)) {
362
      // Populate the list of tables for which LB has been skipped
363
      // in LB's internal vector.
364
8.52M
      skipped_tables_per_run_.push_back(table.second);
365
8.52M
      continue;
366
8.52M
    }
367
121k
    const TableId& table_id = table.first;
368
121k
    if (tablespace_manager_->NeedsRefreshToFindTablePlacement(table.second)) {
369
      // Placement information was not present in catalog manager cache. This is probably a
370
      // recently created table, skip load balancing for now, hopefully by the next run,
371
      // the background task in the catalog manager will pick up the placement information
372
      // for this table from the PG catalog tables.
373
      // TODO(deepthi) Keep track of the number of times this happens, take appropriate action
374
      // if placement stays missing over period of time.
375
1
      YB_LOG_EVERY_N(INFO, 10) << "Skipping load balancing for table " << table.second->name()
376
1
                               << " as its placement information is not available yet";
377
1
      master_errors++;
378
1
      continue;
379
1
    }
380
121k
    ResetTableStatePtr(table_id, options);
381
382
121k
    bool is_txn_table = table.second->GetTableType() == TRANSACTION_STATUS_TABLE_TYPE;
383
121k
    state_->use_preferred_zones_ = !is_txn_table || FLAGS_transaction_tables_use_preferred_zones;
384
121k
    InitializeTSDescriptors();
385
386
121k
    Status s = CountPendingTasksUnlocked(table_id,
387
121k
                                         &pending_add_replica_tasks,
388
121k
                                         &pending_remove_replica_tasks,
389
121k
                                         &pending_stepdown_leader_tasks);
390
121k
    if (!s.ok()) {
391
      // Found uninitialized ts_meta, so don't load balance this table yet.
392
0
      LOG(WARNING) << "Skipping load balancing " << table.first << ": " << StatusToString(s);
393
0
      per_table_states_.erase(table_id);
394
0
      master_errors++;
395
0
      continue;
396
0
    }
397
121k
  }
398
399
66.3k
  if (pending_add_replica_tasks + pending_remove_replica_tasks + pending_stepdown_leader_tasks> 0) {
400
573
    LOG(INFO) << "Total pending adds=" << pending_add_replica_tasks << ", total pending removals="
401
573
              << pending_remove_replica_tasks << ", total pending leader stepdowns="
402
573
              << pending_stepdown_leader_tasks;
403
573
    if (PREDICT_FALSE(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms > 0)) {
404
0
      LOG(INFO) << "Sleeping after finding pending tasks for "
405
0
                << FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms << " ms";
406
0
      SleepFor(
407
0
          MonoDelta::FromMilliseconds(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms));
408
0
    }
409
573
  }
410
411
66.3k
  set_remaining(pending_add_replica_tasks, &remaining_adds);
412
66.3k
  set_remaining(pending_remove_replica_tasks, &remaining_removals);
413
66.3k
  set_remaining(pending_stepdown_leader_tasks, &remaining_leader_moves);
414
415
  // At the start of the run, report LB state that might prevent it from running smoothly.
416
66.3k
  ReportUnusualLoadBalancerState();
417
418
  // Loop over all tables to analyze the global and per-table load.
419
8.64M
  for (const auto& table : GetTableMap()) {
420
8.64M
    if (SkipLoadBalancing(*table.second)) {
421
8.52M
      continue;
422
8.52M
    }
423
424
121k
    auto it = per_table_states_.find(table.first);
425
121k
    if (it == per_table_states_.end()) {
426
      // If the table state doesn't exist, it was not fully initialized in the previous iteration.
427
0
      VLOG(1) << "Unable to find the state for table " << table.first;
428
1
      continue;
429
1
    }
430
121k
    state_ = it->second.get();
431
432
    // Prepare the in-memory structures.
433
121k
    auto handle_analyze_tablets = AnalyzeTabletsUnlocked(table.first);
434
121k
    if (!handle_analyze_tablets.ok()) {
435
59
      LOG(WARNING) << "Skipping load balancing " << table.first << ": "
436
59
                   << StatusToString(handle_analyze_tablets);
437
59
      per_table_states_.erase(table.first);
438
59
      master_errors++;
439
59
    }
440
121k
  }
441
442
1
  VLOG(1) << "Number of remote bootstraps before running load balancer: "
443
1
          << global_state_->total_starting_tablets_;
444
445
  // Iterate over all the tables to take actions based on the data collected on the previous loop.
446
8.64M
  for (const auto& table : GetTableMap()) {
447
8.64M
    state_ = nullptr;
448
8.64M
    if (remaining_adds == 0 && remaining_removals == 0 && remaining_leader_moves == 0) {
449
0
      break;
450
0
    }
451
8.64M
    if (SkipLoadBalancing(*table.second)) {
452
8.52M
      continue;
453
8.52M
    }
454
455
121k
    auto it = per_table_states_.find(table.first);
456
121k
    if (it == per_table_states_.end()) {
457
      // If the table state doesn't exist, it didn't get analyzed by the previous iteration.
458
0
      VLOG(1) << "Unable to find table state for table " << table.first
459
0
              << ". Skipping load balancing execution";
460
60
      continue;
461
121k
    } else {
462
0
      VLOG(5) << "Load balancing table " << table.first;
463
121k
    }
464
121k
    state_ = it->second.get();
465
466
    // We may have modified global loads, so we need to reset this state's load.
467
121k
    state_->SortLoad();
468
469
    // Output parameters are unused in the load balancer, but useful in testing.
470
121k
    TabletId out_tablet_id;
471
121k
    TabletServerId out_from_ts;
472
121k
    TabletServerId out_to_ts;
473
474
    // Handle adding and moving replicas.
475
122k
    for ( ; remaining_adds > 0; --remaining_adds) {
476
121k
      if (state_->allow_only_leader_balancing_) {
477
975
        YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping Add replicas. Only leader balancing table "
478
38
                                      << table.first;
479
975
        break;
480
975
      }
481
120k
      auto handle_add = HandleAddReplicas(&out_tablet_id, &out_from_ts, &out_to_ts);
482
120k
      if (!handle_add.ok()) {
483
1.28k
        LOG(WARNING) << "Skipping add replicas for " << table.first << ": "
484
1.28k
                     << StatusToString(handle_add);
485
1.28k
        master_errors++;
486
1.28k
        break;
487
1.28k
      }
488
119k
      if (!*handle_add) {
489
118k
        break;
490
118k
      }
491
119k
    }
492
121k
    if (PREDICT_FALSE(FLAGS_TEST_load_balancer_handle_under_replicated_tablets_only)) {
493
210
      LOG(INFO) << "Skipping remove replicas and leader moves for " << table.first;
494
210
      continue;
495
210
    }
496
497
    // Handle cleanup after over-replication.
498
122k
    for ( ; remaining_removals > 0; --remaining_removals) {
499
121k
      if (state_->allow_only_leader_balancing_) {
500
686
        YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping remove replicas. Only leader balancing table "
501
27
                                      << table.first;
502
686
        break;
503
686
      }
504
120k
      auto handle_remove = HandleRemoveReplicas(&out_tablet_id, &out_from_ts);
505
120k
      if (!handle_remove.ok()) {
506
0
        LOG(WARNING) << "Skipping remove replicas for " << table.first << ": "
507
0
                     << StatusToString(handle_remove);
508
0
        master_errors++;
509
0
        break;
510
0
      }
511
120k
      if (!*handle_remove) {
512
119k
        break;
513
119k
      }
514
120k
    }
515
516
    // Handle tablet servers with too many leaders.
517
    // Check the current pending tasks per table to ensure we don't trigger the same task.
518
121k
    size_t table_remaining_leader_moves = state_->options_->kMaxConcurrentLeaderMovesPerTable;
519
121k
    set_remaining(state_->pending_stepdown_leader_tasks_[table.first].size(),
520
121k
                  &table_remaining_leader_moves);
521
    // Keep track of both the global and per table limit on number of moves.
522
121k
    for ( ;
523
126k
         remaining_leader_moves > 0 && table_remaining_leader_moves > 0;
524
120k
         --remaining_leader_moves, --table_remaining_leader_moves) {
525
120k
      auto handle_leader = HandleLeaderMoves(&out_tablet_id, &out_from_ts, &out_to_ts);
526
120k
      if (!handle_leader.ok()) {
527
1
        LOG(WARNING) << "Skipping leader moves for " << table.first << ": "
528
1
                     << StatusToString(handle_leader);
529
1
        master_errors++;
530
1
        break;
531
1
      }
532
120k
      if (!*handle_leader) {
533
114k
        break;
534
114k
      }
535
120k
    }
536
121k
  }
537
538
66.3k
  RecordActivity(master_errors);
539
66.3k
}
540
541
77.4k
void ClusterLoadBalancer::RunLoadBalancer(Options* options) {
542
77.4k
  SysClusterConfigEntryPB config;
543
77.4k
  CHECK_OK(catalog_manager_->GetClusterConfig(&config));
544
545
77.4k
  std::unique_ptr<Options> options_unique_ptr =
546
77.4k
      std::make_unique<Options>();
547
77.4k
  Options* options_ent = options_unique_ptr.get();
548
  // First, we load balance the live cluster.
549
77.4k
  options_ent->type = LIVE;
550
77.4k
  if (config.replication_info().live_replicas().has_placement_uuid()) {
551
1.03k
    options_ent->placement_uuid = config.replication_info().live_replicas().placement_uuid();
552
1.03k
    options_ent->live_placement_uuid = options_ent->placement_uuid;
553
76.4k
  } else {
554
76.4k
    options_ent->placement_uuid = "";
555
76.4k
    options_ent->live_placement_uuid = "";
556
76.4k
  }
557
77.4k
  RunLoadBalancerWithOptions(options_ent);
558
559
  // Then, we balance all read-only clusters.
560
77.4k
  options_ent->type = READ_ONLY;
561
78.4k
  for (int i = 0; i < config.replication_info().read_replicas_size(); i++) {
562
962
    const PlacementInfoPB& read_only_cluster = config.replication_info().read_replicas(i);
563
962
    options_ent->placement_uuid = read_only_cluster.placement_uuid();
564
962
    RunLoadBalancerWithOptions(options_ent);
565
962
  }
566
77.4k
}
567
568
66.3k
void ClusterLoadBalancer::RecordActivity(uint32_t master_errors) {
569
  // Update the list of tables for whom load-balancing has been
570
  // skipped in this run.
571
66.3k
  {
572
66.3k
    std::lock_guard<decltype(mutex_)> l(mutex_);
573
66.3k
    skipped_tables_ = skipped_tables_per_run_;
574
66.3k
  }
575
576
66.3k
  uint32_t table_tasks = 0;
577
8.64M
  for (const auto& table : GetTableMap()) {
578
8.64M
    table_tasks += table.second->NumLBTasks();
579
8.64M
  }
580
581
66.3k
  struct ActivityInfo ai {table_tasks, master_errors};
582
583
  // Update circular buffer summary.
584
585
66.3k
  if (ai.IsIdle()) {
586
60.1k
    num_idle_runs_++;
587
6.24k
  } else {
588
0
    VLOG(1) <<
589
0
      Substitute("Load balancer has $0 table tasks and $1 master errors",
590
0
          table_tasks, master_errors);
591
6.24k
  }
592
593
66.3k
  if (cbuf_activities_.full()) {
594
57.6k
    if (cbuf_activities_.front().IsIdle()) {
595
51.8k
      num_idle_runs_--;
596
51.8k
    }
597
57.6k
  }
598
599
  // Mutate circular buffer.
600
66.3k
  cbuf_activities_.push_back(std::move(ai));
601
602
  // Update state.
603
66.3k
  is_idle_.store(num_idle_runs_ == cbuf_activities_.size(), std::memory_order_release);
604
605
  // Two interesting cases when updating can_perform_global_operations_ state:
606
  // If we previously couldn't balance global load, but now the LB is idle, enable global balancing.
607
  // If we previously could balance global load, but now the LB is busy, then it is busy balancing
608
  // global load or doing other operations (remove, etc.). In this case, we keep global balancing
609
  // enabled up until we perform a non-global balancing move (see GetLoadToMove()).
610
  // TODO(julien) some small improvements can be made here, such as ignoring leader stepdown tasks.
611
66.3k
  can_perform_global_operations_ = can_perform_global_operations_ || ai.IsIdle();
612
66.3k
}
613
614
2.45k
Status ClusterLoadBalancer::IsIdle() const {
615
2.45k
  if (IsLoadBalancerEnabled() && !is_idle_.load(std::memory_order_acquire)) {
616
1.90k
    return STATUS(
617
1.90k
        IllegalState,
618
1.90k
        "Task or error encountered recently.",
619
1.90k
        MasterError(MasterErrorPB::LOAD_BALANCER_RECENTLY_ACTIVE));
620
1.90k
  }
621
622
551
  return Status::OK();
623
551
}
624
625
22.4k
bool ClusterLoadBalancer::CanBalanceGlobalLoad() const {
626
22.4k
  return FLAGS_enable_global_load_balancing && can_perform_global_operations_;
627
22.4k
}
628
629
66.3k
void ClusterLoadBalancer::ReportUnusualLoadBalancerState() const {
630
195k
  for (const auto& ts_desc : global_state_->ts_descs_) {
631
    // Report if any ts has a pending delete.
632
195k
    if (ts_desc->HasTabletDeletePending()) {
633
7.51k
      LOG(INFO) << Format("tablet server $0 has a pending delete for tablets $1",
634
7.51k
                          ts_desc->permanent_uuid(), ts_desc->PendingTabletDeleteToString());
635
7.51k
    }
636
195k
  }
637
66.3k
}
638
639
83.8k
void ClusterLoadBalancer::ResetGlobalState(bool initialize_ts_descs) {
640
83.8k
  per_table_states_.clear();
641
83.8k
  global_state_ = std::make_unique<GlobalLoadState>();
642
83.8k
  global_state_->drive_aware_ = FLAGS_load_balancer_drive_aware;
643
83.8k
  if (initialize_ts_descs) {
644
    // Only call GetAllDescriptors once for a LB run, and then cache it in global_state_.
645
78.4k
    GetAllDescriptors(&global_state_->ts_descs_);
646
78.4k
  }
647
83.8k
  skipped_tables_per_run_.clear();
648
83.8k
}
649
650
121k
void ClusterLoadBalancer::ResetTableStatePtr(const TableId& table_id, Options* options) {
651
121k
  auto table_state = std::make_unique<PerTableLoadState>(global_state_.get());
652
121k
  table_state->options_ = options;
653
121k
  state_ = table_state.get();
654
121k
  per_table_states_[table_id] = std::move(table_state);
655
656
121k
  state_->table_id_ = table_id;
657
121k
}
658
659
121k
// Registers the live load of every RUNNING tablet of the given table into the
// current per-table state, adjusts the leader-balance threshold, sorts the
// (tablet and leader) load, and finally replays any pending remove / leader
// stepdown / add tasks into the state so the balancer does not double-issue
// work already in flight.
//
// Returns a non-OK Status if tablet enumeration or any state update fails.
Status ClusterLoadBalancer::AnalyzeTabletsUnlocked(const TableId& table_uuid) {
  // Fix: the original prefix lacked a space, producing "<uuid>due to error:".
  auto tablets = VERIFY_RESULT_PREPEND(
      GetTabletsForTable(table_uuid), "Skipping table " + table_uuid + " due to error: ");

  // Loop over tablet map to register the load that is already live in the cluster.
  for (const auto& tablet : tablets) {
    bool tablet_running = false;
    {
      auto tablet_lock = tablet->LockForRead();

      if (!tablet->table()) {
        // Tablet is orphaned or in preparing state, continue.
        continue;
      }
      tablet_running = tablet_lock->is_running();
    }

    // This is from the perspective of the CatalogManager and the on-disk, persisted
    // SysCatalogStatePB. What this means is that this tablet was properly created as part of a
    // CreateTable and the information was sent to the initial set of TS and the tablet got to an
    // initial running state.
    //
    // This is different from the individual, per-TS state of the tablet, which can vary based on
    // the TS itself. The tablet can be registered as RUNNING, as far as the CatalogManager is
    // concerned, but just be underreplicated, and have some TS currently bootstrapping instances
    // of the tablet.
    if (tablet_running) {
      RETURN_NOT_OK(UpdateTabletInfo(tablet.get()));
    }
  }

  // After updating the tablets and tablet servers, adjust the configured threshold if it is too
  // low for the given configuration.
  state_->AdjustLeaderBalanceThreshold();

  // Once we've analyzed both the tablet server information as well as the tablets, we can sort the
  // load and are ready to apply the load balancing rules.
  state_->SortLoad();

  // Since leader load is only needed to rebalance leaders, we keep the sorting separate.
  state_->SortLeaderLoad();

  VLOG(1) << Substitute(
      "Table: $0. Total running tablets: $1. Total overreplication: $2. Total starting tablets: $3."
      " Wrong placement: $4. BlackListed: $5. Total underreplication: $6, Leader BlackListed: $7",
      table_uuid, get_total_running_tablets(), get_total_over_replication(),
      get_total_starting_tablets(), get_total_wrong_placement(), get_total_blacklisted_servers(),
      get_total_under_replication(), get_total_leader_blacklisted_servers());

  // Replay tasks already issued (but not yet reflected in reported state) into
  // the per-table state, so this run accounts for in-flight moves.
  for (const auto& tablet : tablets) {
    const auto& tablet_id = tablet->id();
    if (state_->pending_remove_replica_tasks_[table_uuid].count(tablet_id) > 0) {
      RETURN_NOT_OK(state_->RemoveReplica(
          tablet_id, state_->pending_remove_replica_tasks_[table_uuid][tablet_id]));
    }
    if (state_->pending_stepdown_leader_tasks_[table_uuid].count(tablet_id) > 0) {
      const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
      const auto& from_ts = tablet_meta.leader_uuid;
      const auto& to_ts = state_->pending_stepdown_leader_tasks_[table_uuid][tablet_id];
      RETURN_NOT_OK(state_->MoveLeader(tablet->id(), from_ts, to_ts));
    }
    if (state_->pending_add_replica_tasks_[table_uuid].count(tablet_id) > 0) {
      RETURN_NOT_OK(state_->AddReplica(tablet->id(),
                                       state_->pending_add_replica_tasks_[table_uuid][tablet_id]));
    }
  }

  return Status::OK();
}
728
729
// Tries to add one replica to an under-replicated tablet. Walks tablets in
// tablets_missing_replicas_ and, for each, walks tablet servers in ascending
// load order (sorted_load_), choosing the first TS that passes the placement
// and add-ability checks. On success fills *out_tablet_id / *out_to_ts,
// issues the AddReplica, removes the tablet from the missing set, and returns
// true. Returns false if no (tablet, TS) pair qualified.
Result<bool> ClusterLoadBalancer::HandleAddIfMissingPlacement(
    TabletId* out_tablet_id, TabletServerId* out_to_ts) {
  for (const auto& tablet_id : state_->tablets_missing_replicas_) {
    const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
    const auto& placement_info = GetPlacementByTablet(tablet_id);
    const auto& missing_placements = tablet_meta.under_replicated_placements;
    // Loop through TSs by load to find a TS that matches the placement needed and does not already
    // host this tablet.
    for (const auto& ts_uuid : state_->sorted_load_) {
      bool can_choose_ts = false;
      // If we had no placement information, it means we are just under-replicated, so just check
      // that we can use this tablet server.
      if (placement_info.placement_blocks().empty()) {
        // No need to check placement info, as there is none.
        can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid));
      } else {
        // We added a tablet to the set with missing replicas both if it is under-replicated, and we
        // added a placement to the tablet_meta under_replicated_placements if the num replicas in
        // that placement is fewer than min_num_replicas. If the under-replicated tablet has a
        // placement that is under-replicated and the ts is not in that placement, then that ts
        // isn't valid.
        const auto& ts_meta = state_->per_ts_meta_[ts_uuid];
        // Either we have specific placement blocks that are under-replicated, so confirm
        // that this TS matches or all the placement blocks have min_num_replicas
        // but overall num_replicas is fewer than expected.
        // In the latter case, we still need to conform to the placement rules.
        if (missing_placements.empty() ||
            tablet_meta.CanAddTSToMissingPlacements(ts_meta.descriptor)) {
          // If we don't have any missing placements but are under-replicated then we need to
          // validate placement information in order to avoid adding to a wrong placement block.
          //
          // Do the placement check for both the cases.
          // If we have missing placements then this check is a tautology otherwise it matters.
          can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid,
                                                                                &placement_info));
        }
      }
      // If we've passed the checks, then we can choose this TS to add the replica to.
      if (can_choose_ts) {
        *out_tablet_id = tablet_id;
        *out_to_ts = ts_uuid;
        RETURN_NOT_OK(AddReplica(tablet_id, ts_uuid));
        // Safe to erase from the set being iterated: we return immediately,
        // so no further iteration uses the invalidated iterator.
        state_->tablets_missing_replicas_.erase(tablet_id);
        return true;
      }
    }
  }
  return false;
}
778
779
// Tries to start moving one replica off a wrongly-placed tablet server: for
// each tablet in tablets_wrong_placement_ (skipping over-replicated ones,
// which the removal step handles), asks the state for a (from, to) pair and,
// if found, fills the out params, issues the MoveReplica, and returns true.
// Returns false when no movable wrongly-placed replica exists.
Result<bool> ClusterLoadBalancer::HandleAddIfWrongPlacement(
    TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
  for (const auto& tablet_id : state_->tablets_wrong_placement_) {
    // Skip this tablet, if it is already over-replicated, as it does not need another replica, it
    // should just have one removed in the removal step.
    if (state_->tablets_over_replicated_.count(tablet_id) > 0) {
      continue;
    }
    const bool found_move = VERIFY_RESULT(state_->CanSelectWrongReplicaToMove(
        tablet_id, GetPlacementByTablet(tablet_id), out_from_ts, out_to_ts));
    if (!found_move) {
      continue;
    }
    *out_tablet_id = tablet_id;
    RETURN_NOT_OK(MoveReplica(tablet_id, *out_from_ts, *out_to_ts));
    return true;
  }
  return false;
}
796
797
// Top-level "add a replica" step for one table. First enforces the remote
// bootstrap limits (global and per-table) and the over-replication cap,
// returning TryAgain when a limit is hit. Then tries, in priority order:
//   1. HandleAddIfMissingPlacement  - fix under-replication,
//   2. HandleAddIfWrongPlacement    - move off wrongly-placed servers,
//   3. GetLoadToMove                - plain load balancing.
// Returns true if a move/add was initiated, false if nothing to do.
Result<bool> ClusterLoadBalancer::HandleAddReplicas(
    TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
  if (state_->options_->kAllowLimitStartingTablets) {
    if (global_state_->total_starting_tablets_ >= state_->options_->kMaxTabletRemoteBootstraps) {
      // Global concurrent remote bootstrap limit reached.
      return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 "
          "tablets, when our max allowed is $1",
          global_state_->total_starting_tablets_, state_->options_->kMaxTabletRemoteBootstraps);
    } else if (state_->total_starting_ >= state_->options_->kMaxTabletRemoteBootstrapsPerTable) {
      // Per-table concurrent remote bootstrap limit reached.
      return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 "
          "tablets for table $1, when our max allowed is $2 per table",
          state_->total_starting_, state_->table_id_,
          state_->options_->kMaxTabletRemoteBootstrapsPerTable);
    }
  }

  // Too many tablets already over-replicated: adding more replicas would only
  // widen the over-replication window.
  if (state_->options_->kAllowLimitOverReplicatedTablets &&
      get_total_over_replication() >=
          implicit_cast<size_t>(state_->options_->kMaxOverReplicatedTablets)) {
    return STATUS_SUBSTITUTE(TryAgain,
        "Cannot add replicas. Currently have a total overreplication of $0, when max allowed is $1"
        ", overreplicated tablets: $2",
        get_total_over_replication(), state_->options_->kMaxOverReplicatedTablets,
        boost::algorithm::join(state_->tablets_over_replicated_, ", "));
  }

  VLOG(1) << "Number of global concurrent remote bootstrap sessions: "
          <<  global_state_->total_starting_tablets_
          << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstraps
          << ". Number of concurrent remote bootstrap sessions for table " << state_->table_id_
          << ": " << state_->total_starting_
          << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstrapsPerTable;

  // Handle missing placements with highest priority, as it means we're potentially
  // under-replicated.
  if (VERIFY_RESULT(HandleAddIfMissingPlacement(out_tablet_id, out_to_ts))) {
    return true;
  }

  // Handle wrong placements as next priority, as these could be servers we're moving off of, so
  // we can decommission ASAP.
  if (VERIFY_RESULT(HandleAddIfWrongPlacement(out_tablet_id, out_from_ts, out_to_ts))) {
    return true;
  }

  // Finally, handle normal load balancing.
  if (!VERIFY_RESULT(GetLoadToMove(out_tablet_id, out_from_ts, out_to_ts))) {
    VLOG(1) << "Cannot find any more tablets to move, under current constraints.";
    if (VLOG_IS_ON(1)) {
      DumpSortedLoad();
    }
    return false;
  }

  return true;
}
852
853
0
void ClusterLoadBalancer::DumpSortedLoad() const {
854
0
  ssize_t last_pos = state_->sorted_load_.size() - 1;
855
0
  std::ostringstream out;
856
0
  out << "Table load (global load): ";
857
0
  for (ssize_t left = 0; left <= last_pos; ++left) {
858
0
    const TabletServerId& uuid = state_->sorted_load_[left];
859
0
    auto load = state_->GetLoad(uuid);
860
0
    out << uuid << ":" << load << " (" << global_state_->GetGlobalLoad(uuid) << ") ";
861
0
  }
862
0
  VLOG(1) << out.str();
863
0
}
864
865
// Finds one tablet to move for plain load balancing, using a two-pointer scan
// over sorted_load_ (left = least loaded TS, right = most loaded). Fills the
// out params and issues the MoveReplica on success (returning true); returns
// false when the table (and, if eligible, global) load is already balanced.
// Side effect: a successful non-global move clears
// can_perform_global_operations_ for the rest of the run.
Result<bool> ClusterLoadBalancer::GetLoadToMove(
    TabletId* moving_tablet_id, TabletServerId* from_ts, TabletServerId* to_ts) {
  if (state_->sorted_load_.empty()) {
    return false;
  }

  // Start with two indices pointing at left and right most ends of the sorted_load_ structure.
  //
  // We will try to find two TSs that have at least one tablet that can be moved amongst them, from
  // the higher load to the lower load TS. To do this, we will go through comparing the TSs
  // corresponding to our left and right indices, exclude tablets from the right, high loaded TS
  // according to our load balancing rules, such as load variance, starting tablets and not moving
  // already over-replicated tablets. We then compare the remaining set of tablets with the ones
  // hosted by the lower loaded TS and use ReservoirSample to pick a tablet from the set
  // difference. If there were no tablets to pick, we advance our state.
  //
  // The state is defined as the positions of the start and end indices. We always try to move the
  // right index back, until we cannot any more, due to either reaching the left index (cannot
  // rebalance from one TS to itself), or the difference of load between the two TSs is too low to
  // try to rebalance (if load variance is 1, it does not make sense to move tablets between the
  // TSs). When we cannot lower the right index any further, we reset it back to last_pos and
  // increment the left index.
  //
  // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index
  // and are already breaking the invariance rule, as that means that any further differences in
  // the interval between left and right cannot have load > kMinLoadVarianceToBalance.
  ssize_t last_pos = state_->sorted_load_.size() - 1;
  for (ssize_t left = 0; left <= last_pos; ++left) {
    for (auto right = last_pos; right >= 0; --right) {
      const TabletServerId& low_load_uuid = state_->sorted_load_[left];
      const TabletServerId& high_load_uuid = state_->sorted_load_[right];
      ssize_t load_variance = state_->GetLoad(high_load_uuid) - state_->GetLoad(low_load_uuid);
      bool is_global_balancing_move = false;

      // Check for state change or end conditions.
      if (left == right || load_variance < state_->options_->kMinLoadVarianceToBalance) {
        // Either both left and right are at the end, or there is no load_variance, which means
        // there will be no load_variance for any TSs between left and right, so we can return.
        if (right == last_pos && load_variance == 0) {
          return false;
        }
        // If there is load variance, then there is a chance we can benefit from globally balancing.
        if (load_variance > 0 && CanBalanceGlobalLoad()) {
          int global_load_variance = global_state_->GetGlobalLoad(high_load_uuid) -
                                     global_state_->GetGlobalLoad(low_load_uuid);
          if (global_load_variance < state_->options_->kMinGlobalLoadVarianceToBalance) {
            // Already globally balanced. Since we are sorted by global load, we can return here as
            // there are no other moves for us to make.
            return false;
          }
          // Mark this move as a global balancing move and try to find a tablet to move.
          is_global_balancing_move = true;
        } else {
          // The load_variance is too low, which means we weren't able to find a load to move to
          // the left tserver. Continue and try with the next left tserver.
          break;
        }
      }

      // If we don't find a tablet_id to move between these two TSs, advance the state.
      if (VERIFY_RESULT(GetTabletToMove(high_load_uuid, low_load_uuid, moving_tablet_id))) {
        // If we got this far, we have the candidate we want, so fill in the output params and
        // return. The tablet_id is filled in from GetTabletToMove.
        *from_ts = high_load_uuid;
        *to_ts = low_load_uuid;
        RETURN_NOT_OK(MoveReplica(*moving_tablet_id, high_load_uuid, low_load_uuid));
        // Update global state if necessary.
        if (!is_global_balancing_move) {
          can_perform_global_operations_ = false;
        }
        return true;
      }
    }
  }

  // Should never get here.
  return STATUS(IllegalState, "Load balancing algorithm reached illegal state.");
}
943
944
// Picks the best tablet to move from from_ts to to_ts. Two passes:
//   1. Filter: per drive on from_ts, drop tablets that are over-replicated,
//      disabled post-split, or cannot be added to to_ts.
//   2. Select: per drive (highest-load drive first when drive-aware), pick
//      the candidate whose leader's cloud info is most similar to to_ts
//      (faster remote bootstrap), respecting placement-block equivalence.
// Writes the chosen tablet into *moving_tablet_id and returns true; returns
// false if no candidate survives filtering.
Result<bool> ClusterLoadBalancer::GetTabletToMove(
    const TabletServerId& from_ts, const TabletServerId& to_ts, TabletId* moving_tablet_id) {
  const auto& from_ts_meta = state_->per_ts_meta_[from_ts];
  // If drive aware, all_tablets is sorted by decreasing drive load.
  vector<set<TabletId>> all_tablets_by_drive = GetTabletsOnTSToMove(global_state_->drive_aware_,
                                                                    from_ts_meta);
  vector<set<TabletId>> all_filtered_tablets_by_drive;
  for (const set<TabletId>& drive_tablets : all_tablets_by_drive) {
    set<TabletId> filtered_drive_tablets;
    for (const TabletId& tablet_id : drive_tablets) {
      // We don't want to add a new replica to an already over-replicated tablet.
      //
      // TODO(bogdan): should make sure we pick tablets that this TS is not a leader of, so we
      // can ensure HandleRemoveReplicas removes them from this TS.
      if (state_->tablets_over_replicated_.count(tablet_id)) {
        continue;
      }
      // Don't move a replica right after split
      if (ContainsKey(from_ts_meta.disabled_by_ts_tablets, tablet_id)) {
        continue;
      }

      if (VERIFY_RESULT(
          state_->CanAddTabletToTabletServer(tablet_id, to_ts, &GetPlacementByTablet(tablet_id)))) {
        filtered_drive_tablets.insert(tablet_id);
      }
    }
    all_filtered_tablets_by_drive.push_back(filtered_drive_tablets);
  }

  // Below, we choose a tablet to move. We first filter out any tablets which cannot be moved
  // because of placement limitations. Then, we prioritize moving a tablet whose leader is in the
  // same zone/region it is moving to (for faster remote bootstrapping).
  for (const set<TabletId>& drive_tablets : all_filtered_tablets_by_drive) {
    bool found_tablet_to_move = false;
    CatalogManagerUtil::CloudInfoSimilarity chosen_tablet_ci_similarity =
        CatalogManagerUtil::NO_MATCH;
    for (const TabletId& tablet_id : drive_tablets) {
      const auto& placement_info = GetPlacementByTablet(tablet_id);
      // TODO(bogdan): this should be augmented as well to allow dropping by one replica, if still
      // leaving us with more than the minimum.
      //
      // If we have placement information, we want to only pick the tablet if it's moving to the
      // same placement, so we guarantee we're keeping the same type of distribution.
      // Since we allow prefixes as well, we can still respect the placement of this tablet
      // even if their placement ids aren't the same. An e.g.
      // placement info of tablet: C.R1.*
      // placement info of from_ts: C.R1.Z1
      // placement info of to_ts: C.R2.Z2
      // Note that we've assumed that for every TS there is a unique placement block to which it
      // can be mapped (see the validation rules in yb_admin-client). If there is no unique
      // placement block then it is simply the C.R.Z of the TS itself.
      auto from_ts_block = state_->GetValidPlacement(from_ts, &placement_info);
      auto to_ts_block = state_->GetValidPlacement(to_ts, &placement_info);
      bool same_placement = false;
      if (to_ts_block.has_value() && from_ts_block.has_value()) {
          same_placement = TSDescriptor::generate_placement_id(*from_ts_block) ==
                                  TSDescriptor::generate_placement_id(*to_ts_block);
      }

      // With explicit placement blocks, only moves within an equivalent
      // placement block preserve the distribution; skip cross-block moves.
      if (!placement_info.placement_blocks().empty() && !same_placement) {
        continue;
      }

      TabletServerId leader_ts = state_->per_tablet_meta_[tablet_id].leader_uuid;
      auto ci_similarity = CatalogManagerUtil::CloudInfoSimilarity::NO_MATCH;
      if (!leader_ts.empty() && !FLAGS_load_balancer_ignore_cloud_info_similarity) {
        const auto leader_ci = state_->per_ts_meta_[leader_ts].descriptor->GetCloudInfo();
        const auto to_ts_ci = state_->per_ts_meta_[to_ts].descriptor->GetCloudInfo();
        ci_similarity = CatalogManagerUtil::ComputeCloudInfoSimilarity(leader_ci, to_ts_ci);
      }

      // Keep the current best unless this candidate's leader is strictly more
      // similar (in cloud info) to the destination TS.
      if (found_tablet_to_move && ci_similarity <= chosen_tablet_ci_similarity) {
        continue;
      }
      // This is the best tablet to move, so far.
      found_tablet_to_move = true;
      *moving_tablet_id = tablet_id;
      chosen_tablet_ci_similarity = ci_similarity;
    }

    // If there is any tablet we can move from this drive, choose it and return.
    if (found_tablet_to_move) {
      return true;
    }
  }

  return false;
}
1033
1034
// Finds one leader to step down from a high-leader-load (or leader-
// blacklisted) TS to a low-leader-load TS hosting a running peer. On success
// fills all four out params (including the destination drive path) and
// returns true; returns false when leader load is already balanced (per-table
// and, when eligible, globally). Side effect: a successful non-global move
// clears can_perform_global_operations_ for the rest of the run.
Result<bool> ClusterLoadBalancer::GetLeaderToMove(TabletId* moving_tablet_id,
                                                  TabletServerId* from_ts,
                                                  TabletServerId* to_ts,
                                                  std::string* to_ts_path) {
  if (state_->sorted_leader_load_.empty()) {
    return false;
  }

  // Find out if there are leaders to be moved.
  for (auto right = state_->sorted_leader_load_.size(); right > 0;) {
    --right;
    const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right];
    auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) !=
      state_->leader_blacklisted_servers_.end());
    if (high_leader_blacklisted) {
      auto high_load = state_->GetLeaderLoad(high_load_uuid);
      if (high_load > 0) {
        // Leader blacklisted tserver with a leader replica.
        break;
      } else {
        // Leader blacklisted tserver without leader replica.
        continue;
      }
    } else {
      if (state_->IsLeaderLoadBelowThreshold(state_->sorted_leader_load_[right])) {
        // Non-leader blacklisted tserver with not too many leader replicas.
        // TODO(Sanket): Even though per table load is below the configured threshold,
        // we might want to do global leader balancing above a certain threshold that is lower
        // than the per table threshold. Can add another gflag/knob here later.
        return false;
      } else {
        // Non-leader blacklisted tserver with too many leader replicas.
        break;
      }
    }
  }

  // The algorithm to balance the leaders is very similar to the one for tablets:
  //
  // Start with two indices pointing at left and right most ends of the sorted_leader_load_
  // structure. Note that leader blacklisted tserver is considered as having infinite leader load.
  //
  // We will try to find two TSs that have at least one leader that can be moved amongst them, from
  // the higher load to the lower load TS. To do this, we will go through comparing the TSs
  // corresponding to our left and right indices. We go through leaders on the higher loaded TS
  // and find a running replica on the lower loaded TS to move the leader. If no leader can be
  // be picked, we advance our state.
  //
  // The state is defined as the positions of the start and end indices. We always try to move the
  // right index back, until we cannot any more, due to either reaching the left index (cannot
  // rebalance from one TS to itself), or the difference of load between the two TSs is too low to
  // try to rebalance (if load variance is 1, it does not make sense to move leaders between the
  // TSs). When we cannot lower the right index any further, we reset it back to last_pos and
  // increment the left index.
  //
  // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index
  // and are already breaking the invariance rule, as that means that any further differences in
  // the interval between left and right cannot have load > kMinLeaderLoadVarianceToBalance.
  const auto current_time = MonoTime::Now();
  ssize_t last_pos = state_->sorted_leader_load_.size() - 1;
  for (ssize_t left = 0; left <= last_pos; ++left) {
    const TabletServerId& low_load_uuid = state_->sorted_leader_load_[left];
    auto low_leader_blacklisted = (state_->leader_blacklisted_servers_.find(low_load_uuid) !=
        state_->leader_blacklisted_servers_.end());
    if (low_leader_blacklisted) {
      // Left marker has gone beyond non-leader blacklisted tservers.
      return false;
    }

    for (auto right = last_pos; right >= 0; --right) {
      const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right];
      auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) !=
          state_->leader_blacklisted_servers_.end());
      ssize_t load_variance =
          state_->GetLeaderLoad(high_load_uuid) - state_->GetLeaderLoad(low_load_uuid);

      bool is_global_balancing_move = false;

      // Check for state change or end conditions.
      if (left == right || (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance &&
            !high_leader_blacklisted)) {
        // Global leader balancing only if per table variance is > 0.
        // If both left and right are same (i.e. load_variance is 0) and right is last_pos
        // or right is last_pos and load_variance is 0 then we can return as we don't
        // have any other moves to make.
        if (load_variance == 0 && right == last_pos) {
          return false;
        }
        // Check if we can benefit from global leader balancing.
        // If we have > 0 load_variance and there are no per table moves left.
        if (load_variance > 0 && CanBalanceGlobalLoad()) {
          int global_load_variance = state_->global_state_->GetGlobalLeaderLoad(high_load_uuid) -
                                        state_->global_state_->GetGlobalLeaderLoad(low_load_uuid);
          // Already globally balanced. Since we are sorted by global load, we can return here as
          // there are no other moves for us to make.
          if (global_load_variance < state_->options_->kMinGlobalLeaderLoadVarianceToBalance) {
            return false;
          }
          is_global_balancing_move = true;
        } else {
          break;
        }
      }

      // Find the leaders on the higher loaded TS that have running peers on the lower loaded TS.
      // If there are, we have a candidate we want, so fill in the output params and return.
      const set<TabletId>& leaders = state_->per_ts_meta_[high_load_uuid].leaders;
      for (const auto& tablet : GetLeadersOnTSToMove(global_state_->drive_aware_,
                                                     leaders,
                                                     state_->per_ts_meta_[low_load_uuid])) {
        *moving_tablet_id = tablet.first;
        *to_ts_path = tablet.second;
        *from_ts = high_load_uuid;
        *to_ts = low_load_uuid;

        const auto& per_tablet_meta = state_->per_tablet_meta_;
        const auto tablet_meta_iter = per_tablet_meta.find(tablet.first);
        if (PREDICT_TRUE(tablet_meta_iter != per_tablet_meta.end())) {
          const auto& tablet_meta = tablet_meta_iter->second;
          const auto& stepdown_failures = tablet_meta.leader_stepdown_failures;
          const auto stepdown_failure_iter = stepdown_failures.find(low_load_uuid);
          if (stepdown_failure_iter != stepdown_failures.end()) {
            const auto time_since_failure = current_time - stepdown_failure_iter->second;
            if (time_since_failure.ToMilliseconds() < FLAGS_min_leader_stepdown_retry_interval_ms) {
              LOG(INFO) << "Cannot move tablet " << tablet.first << " leader from TS "
                        << *from_ts << " to TS " << *to_ts << " yet: previous attempt with the same"
                        << " intended leader failed only " << ToString(time_since_failure)
                        << " ago (less " << "than " << FLAGS_min_leader_stepdown_retry_interval_ms
                        << "ms).";
            }
            // NOTE(review): this `continue` runs whenever a stepdown failure is
            // recorded for this (tablet, target) pair, even after the retry
            // interval has elapsed; the interval only gates the log message.
            // Presumably stale failures are purged elsewhere -- confirm before
            // relying on retry behavior here.
            continue;
          }
        } else {
          LOG(WARNING) << "Did not find load balancer metadata for tablet " << *moving_tablet_id;
        }

        // Leader movement solely due to leader blacklist.
        if (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance &&
            high_leader_blacklisted) {
          state_->LogSortedLeaderLoad();
          LOG(INFO) << "Move tablet " << tablet.first << " leader from leader blacklisted TS "
            << *from_ts << " to TS " << *to_ts;
        }
        if (!is_global_balancing_move) {
          can_perform_global_operations_ = false;
        }
        return true;
      }
    }
  }

  // Should never get here.
  FATAL_ERROR("Load balancing algorithm reached invalid state!");
}
1188
1189
// Tries to remove one replica. First priority: wrongly-placed replicas (via
// HandleRemoveIfWrongPlacement). Otherwise, for each over-replicated tablet
// with no pending ADD_SERVER or starting replica, removes the replica from
// the highest-loaded eligible tablet server. Fills the out params and returns
// true on success; false when nothing needs removal.
Result<bool> ClusterLoadBalancer::HandleRemoveReplicas(
    TabletId* out_tablet_id, TabletServerId* out_from_ts) {
  // Give high priority to removing tablets that are not respecting the placement policy.
  if (VERIFY_RESULT(HandleRemoveIfWrongPlacement(out_tablet_id, out_from_ts))) {
    return true;
  }

  for (const auto& tablet_id : state_->tablets_over_replicated_) {
    // Skip if there is a pending ADD_SERVER.
    if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id)) ||
        state_->per_tablet_meta_[tablet_id].starting > 0) {
      continue;
    }

    const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
    const auto& tablet_servers = tablet_meta.over_replicated_tablet_servers;
    auto comparator = PerTableLoadState::Comparator(state_);
    vector<TabletServerId> sorted_ts;
    // Don't include any tservers where this tablet is still starting.
    std::copy_if(
        tablet_servers.begin(), tablet_servers.end(), std::back_inserter(sorted_ts),
        [&](const TabletServerId& ts_uuid) {
          return !state_->per_ts_meta_[ts_uuid].starting_tablets.count(tablet_id);
        });
    if (sorted_ts.empty()) {
      return STATUS_SUBSTITUTE(IllegalState, "No tservers to remove from over-replicated "
                                             "tablet $0", tablet_id);
    }
    // Sort in reverse to first try to remove a replica from the highest loaded TS.
    sort(sorted_ts.rbegin(), sorted_ts.rend(), comparator);
    string remove_candidate = sorted_ts[0];
    *out_tablet_id = tablet_id;
    *out_from_ts = remove_candidate;
    // Do force leader stepdown, as we are either not the leader or we are allowed to step down.
    RETURN_NOT_OK(RemoveReplica(tablet_id, remove_candidate));
    return true;
  }
  return false;
}
1228
1229
// Attempts to remove one replica from an over-replicated tablet that is violating placement:
// blacklisted tservers are removed first, then tservers in a wrong placement.
// Returns true and fills the out-parameters if a removal was issued; false otherwise.
Result<bool> ClusterLoadBalancer::HandleRemoveIfWrongPlacement(
    TabletId* out_tablet_id, TabletServerId* out_from_ts) {
  for (const auto& tablet_id : state_->tablets_wrong_placement_) {
    // Per-tablet trace logging. Use VLOG instead of LOG(INFO): this loop runs on every load
    // balancer pass and an INFO line per wrongly-placed tablet floods the master log.
    VLOG(1) << "Processing tablet " << tablet_id;
    // Skip this tablet if it is not over-replicated: we can only safely drop a replica when
    // there are more replicas than required.
    if (!state_->tablets_over_replicated_.count(tablet_id)) {
      continue;
    }
    // Skip if there is a pending ADD_SERVER.
    if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id))) {
      continue;
    }
    const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
    TabletServerId target_uuid;
    // Prioritize blacklisted servers, if any.
    if (!tablet_meta.blacklisted_tablet_servers.empty()) {
      target_uuid = *tablet_meta.blacklisted_tablet_servers.begin();
    }
    // If no blacklisted server could be chosen, try the wrong placement ones.
    if (target_uuid.empty()) {
      if (!tablet_meta.wrong_placement_tablet_servers.empty()) {
        target_uuid = *tablet_meta.wrong_placement_tablet_servers.begin();
      }
    }
    // If we found a tablet server, choose it.
    if (!target_uuid.empty()) {
      *out_tablet_id = tablet_id;
      *out_from_ts = std::move(target_uuid);
      // Force leader stepdown if we have wrong placements or blacklisted servers.
      RETURN_NOT_OK(RemoveReplica(tablet_id, *out_from_ts));
      return true;
    }
  }
  return false;
}
1264
1265
// Tries to find a leader residing on a non-affinitized (non-preferred-zone) tserver that can be
// moved to an affinitized one. Fills the out-parameters and returns true on a match.
Result<bool> ClusterLoadBalancer::HandleLeaderLoadIfNonAffinitized(TabletId* moving_tablet_id,
                                                                   TabletServerId* from_ts,
                                                                   TabletServerId* to_ts,
                                                                   std::string* to_ts_path) {
  // Similar to normal leader balancing, we double iterate from most loaded to least loaded
  // non-affinitized nodes and least to most affinitized nodes. For each pair, we check whether
  // there is any tablet intersection and if so, there is a match and we return true.
  //
  // If we go through all the node pairs or we see that the current non-affinitized
  // leader load is 0, we know that there is no match from non-affinitized to affinitized nodes
  // and we return false.
  for (size_t non_affinitized_idx = state_->sorted_non_affinitized_leader_load_.size();
       non_affinitized_idx > 0;) {
    --non_affinitized_idx;
    const TabletServerId& non_affinitized_uuid =
        state_->sorted_non_affinitized_leader_load_[non_affinitized_idx];
    if (state_->GetLeaderLoad(non_affinitized_uuid) == 0) {
      // All subsequent non-affinitized nodes have no leaders, no match found.
      return false;
    }
    const set<TabletId>& leaders = state_->per_ts_meta_[non_affinitized_uuid].leaders;
    for (const auto& affinitized_uuid : state_->sorted_leader_load_) {
      auto peers = GetLeadersOnTSToMove(global_state_->drive_aware_,
                                        leaders,
                                        state_->per_ts_meta_[affinitized_uuid]);

      if (!peers.empty()) {
        auto peer = peers.begin();
        *moving_tablet_id = peer->first;
        // Fix: the destination drive path is the mapped value of the entry. The previous code
        // assigned peer->first (the tablet id, already stored in moving_tablet_id) to
        // to_ts_path, so the leader move targeted a bogus path.
        *to_ts_path = peer->second;
        *from_ts = non_affinitized_uuid;
        *to_ts = affinitized_uuid;
        return true;
      }
    }
  }
  return false;
}
1303
1304
// Performs at most one leader move: first tries the preferred-zone (affinitized) pass, then
// falls back to ordinary leader load balancing. Returns true if a move was issued.
Result<bool> ClusterLoadBalancer::HandleLeaderMoves(
    TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
  // When 'transaction_tables_use_preferred_zones' is off and this is a transaction table,
  // use_preferred_zones_ is false, so the affinitized pass below is skipped and we go
  // straight to normal leader balancing.
  string leader_move_path;
  const bool moved_off_non_affinitized =
      state_->use_preferred_zones_ &&
      VERIFY_RESULT(HandleLeaderLoadIfNonAffinitized(
          out_tablet_id, out_from_ts, out_to_ts, &leader_move_path));
  if (moved_off_non_affinitized) {
    RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, leader_move_path));
    return true;
  }

  if (!VERIFY_RESULT(GetLeaderToMove(out_tablet_id, out_from_ts, out_to_ts, &leader_move_path))) {
    return false;
  }
  RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, leader_move_path));
  return true;
}
1323
1324
// Moves a tablet replica from one tserver to another: issues the add on the destination,
// records it in the in-memory state, and (unless moves are accounted as pure adds via the
// load_balancer_count_move_as_add flag) also drops the source replica from the bookkeeping.
Status ClusterLoadBalancer::MoveReplica(
    const TabletId& tablet_id, const TabletServerId& from_ts, const TabletServerId& to_ts) {
  LOG(INFO) << Substitute("Moving tablet $0 from $1 to $2", tablet_id, from_ts, to_ts);
  RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), to_ts, true /* is_add */,
                                   true /* should_remove_leader */));
  RETURN_NOT_OK(state_->AddReplica(tablet_id, to_ts));
  if (GetAtomicFlag(&FLAGS_load_balancer_count_move_as_add)) {
    return Status::OK();
  }
  return state_->RemoveReplica(tablet_id, from_ts);
}
1333
1334
364
// Adds a replica of the tablet on the given tserver and mirrors the change in state_.
Status ClusterLoadBalancer::AddReplica(const TabletId& tablet_id, const TabletServerId& to_ts) {
  LOG(INFO) << Substitute("Adding tablet $0 to $1", tablet_id, to_ts);
  // This is an add operation, so the "should_remove_leader" flag is irrelevant.
  const auto& tablet_info = GetTabletMap().at(tablet_id);
  RETURN_NOT_OK(SendReplicaChanges(tablet_info, to_ts, true /* is_add */,
                                   true /* should_remove_leader */));
  return state_->AddReplica(tablet_id, to_ts);
}
1341
1342
// Removes the tablet's replica on the given tserver (stepping the leader down first if that
// replica is the leader — see SendReplicaChanges) and mirrors the change in state_.
Status ClusterLoadBalancer::RemoveReplica(
    const TabletId& tablet_id, const TabletServerId& ts_uuid) {
  LOG(INFO) << Substitute("Removing replica $0 from tablet $1", ts_uuid, tablet_id);
  const auto& tablet_info = GetTabletMap().at(tablet_id);
  RETURN_NOT_OK(SendReplicaChanges(tablet_info, ts_uuid, false /* is_add */,
                                   true /* should_remove_leader */));
  return state_->RemoveReplica(tablet_id, ts_uuid);
}
1349
1350
// Moves the tablet's leadership from from_ts to to_ts by asking from_ts to step down (without
// removing the replica), then updates the in-memory leader bookkeeping for this run.
Status ClusterLoadBalancer::MoveLeader(const TabletId& tablet_id,
                                       const TabletServerId& from_ts,
                                       const TabletServerId& to_ts,
                                       const string& to_ts_path) {
  LOG(INFO) << Substitute("Moving leader of $0 from TS $1 to $2", tablet_id, from_ts, to_ts);
  const auto& tablet_info = GetTabletMap().at(tablet_id);
  RETURN_NOT_OK(SendReplicaChanges(tablet_info, from_ts, false /* is_add */,
                                   false /* should_remove_leader */, to_ts));
  return state_->MoveLeader(tablet_id, from_ts, to_ts, to_ts_path);
}
1360
1361
99.6k
// Collects the affinitized (preferred leader) zones from the cluster config into
// *affinitized_zones.
void ClusterLoadBalancer::GetAllAffinitizedZones(AffinitizedZonesSet* affinitized_zones) const {
  SysClusterConfigEntryPB config;
  CHECK_OK(catalog_manager_->GetClusterConfig(&config));
  // Range-for by const reference over the repeated field: the previous index loop copied each
  // CloudInfoPB into a local before inserting it.
  for (const CloudInfoPB& ci : config.replication_info().affinitized_leaders()) {
    affinitized_zones->insert(ci);
  }
}
1370
1371
121k
// Initializes per-run tserver state: preferred zones (when enabled), server/leader blacklists,
// and a default state entry for every known tserver.
void ClusterLoadBalancer::InitializeTSDescriptors() {
  if (state_->use_preferred_zones_) {
    GetAllAffinitizedZones(&state_->affinitized_zones_);
  }
  // Set the blacklist so we can also mark the tablet servers as we add them up.
  state_->SetBlacklist(GetServerBlacklist());

  // Set the leader blacklist so we can also mark the tablet servers as we add them up.
  state_->SetLeaderBlacklist(GetLeaderBlacklist());

  // Loop over tablet servers to set empty defaults, so we can also have info on those
  // servers that have yet to receive load (have heartbeated to the master, but have not been
  // assigned any tablets yet).
  for (const auto& ts_desc : global_state_->ts_descs_) {
    state_->UpdateTabletServer(ts_desc);
  }
}
1388
1389
// CatalogManager indirection methods that are set as virtual to be bypassed in testing.
1390
//
1391
0
// Fills *ts_descs with descriptors of tservers that have reported in; virtual so tests can
// bypass the real TS manager (see the "CatalogManager indirection methods" note above).
void ClusterLoadBalancer::GetAllReportedDescriptors(TSDescriptorVector* ts_descs) const {
  catalog_manager_->master_->ts_manager()->GetAllReportedDescriptors(ts_descs);
}
1394
1395
78.4k
// Fills *ts_descs with all known tserver descriptors via the master's TS manager.
void ClusterLoadBalancer::GetAllDescriptors(TSDescriptorVector* ts_descs) const {
  catalog_manager_->master_->ts_manager()->GetAllDescriptors(ts_descs);
}
1398
1399
318k
// Returns the catalog manager's tablet-id-to-TabletInfo map (indirection for testing).
const TabletInfoMap& ClusterLoadBalancer::GetTabletMap() const {
  return *catalog_manager_->tablet_map_;
}
1402
1403
121k
// Looks up the TableInfo for the given table id; may return null if the table is unknown
// (see GetTabletsForTable, which checks for that).
const scoped_refptr<TableInfo> ClusterLoadBalancer::GetTableInfo(const TableId& table_uuid) const {
  return catalog_manager_->GetTableInfoUnlocked(table_uuid);
}
1406
1407
121k
// Returns the tablets of the given table, or InvalidArgument if the table id is unknown.
// Inactive tablets are included unless FLAGS_TEST_load_balancer_skip_inactive_tablets is set.
Result<TabletInfos> ClusterLoadBalancer::GetTabletsForTable(const TableId& table_uuid) const {
  const auto table_info = GetTableInfo(table_uuid);
  if (table_info == nullptr) {
    return STATUS_FORMAT(
        InvalidArgument, "Invalid UUID '$0' - no entry found in catalog manager table map",
        table_uuid);
  }
  const IncludeInactive include_inactive(!FLAGS_TEST_load_balancer_skip_inactive_tablets);
  return table_info->GetTablets(include_inactive);
}
1418
1419
265k
// Returns the catalog manager's table-id-to-TableInfo map (indirection for testing).
const TableInfoMap& ClusterLoadBalancer::GetTableMap() const {
  return *catalog_manager_->table_ids_map_;
}
1422
1423
120k
// Returns the cluster-wide replication info from the cluster config.
// NOTE(review): the read lock is released when the temporary goes out of scope, so the
// returned reference relies on the config protobuf staying alive — confirm lifetime contract.
const ReplicationInfoPB& ClusterLoadBalancer::GetClusterReplicationInfo() const {
  return catalog_manager_->cluster_config_->LockForRead()->pb.replication_info();
}
1426
1427
840
// Returns the placement info this balancer run operates on: the live replica placement for
// LIVE runs, otherwise the read-only placement matching this run's placement UUID.
const PlacementInfoPB& ClusterLoadBalancer::GetClusterPlacementInfo() const {
  auto l = down_cast<enterprise::CatalogManager*>
                      (catalog_manager_)->GetClusterConfigInfo()->LockForRead();
  if (state_->options_->type == LIVE) {
    return l->pb.replication_info().live_replicas();
  }
  return GetReadOnlyPlacementFromUuid(l->pb.replication_info());
}
1436
1437
121k
// Returns the server blacklist from the cluster config.
const BlacklistPB& ClusterLoadBalancer::GetServerBlacklist() const {
  return catalog_manager_->cluster_config_->LockForRead()->pb.server_blacklist();
}
1440
1441
121k
// Returns the leader blacklist from the cluster config.
const BlacklistPB& ClusterLoadBalancer::GetLeaderBlacklist() const {
  return catalog_manager_->cluster_config_->LockForRead()->pb.leader_blacklist();
}
1444
1445
25.9M
// Returns true if the load balancer should ignore this table entirely.
bool ClusterLoadBalancer::SkipLoadBalancing(const TableInfo& table) const {
  // Skip load-balancing of some tables:
  // * system tables: they are virtual tables not hosted by tservers.
  // * colocated user tables: they occupy the same tablet as their colocated parent table, so load
  //   balancing just the colocated parent table is sufficient.
  // * deleted/deleting tables: as they are no longer in effect. For tables that are being deleted
  // currently as well, load distribution wouldn't matter as eventually they would get deleted.
  // Hold the table's read lock so the started_deleting() check sees a consistent state.
  auto l = table.LockForRead();
  return (catalog_manager_->IsSystemTable(table) ||
          table.IsColocatedUserTable() ||
          l->started_deleting());
}
1457
1458
// Fetches this table's in-flight add/remove/stepdown tasks into the per-table state and adds
// their counts to the out-parameters. Pending adds are also registered as starting tablets so
// the balancer does not schedule duplicate work for them.
Status ClusterLoadBalancer::CountPendingTasksUnlocked(const TableId& table_uuid,
                                            int* pending_add_replica_tasks,
                                            int* pending_remove_replica_tasks,
                                            int* pending_stepdown_leader_tasks) {
  // Hoist the per-table map entries: the original code repeated the operator[] lookup for each
  // of the three maps several times.
  auto& add_tasks = state_->pending_add_replica_tasks_[table_uuid];
  auto& remove_tasks = state_->pending_remove_replica_tasks_[table_uuid];
  auto& stepdown_tasks = state_->pending_stepdown_leader_tasks_[table_uuid];
  GetPendingTasks(table_uuid, &add_tasks, &remove_tasks, &stepdown_tasks);

  *pending_add_replica_tasks += add_tasks.size();
  *pending_remove_replica_tasks += remove_tasks.size();
  *pending_stepdown_leader_tasks += stepdown_tasks.size();
  // Iterate by const reference: `for (auto e : ...)` copied each (tablet id, ts uuid) pair.
  for (const auto& e : add_tasks) {
    const auto& ts_uuid = e.second;
    const auto& tablet_id = e.first;
    RETURN_NOT_OK(state_->AddStartingTablet(tablet_id, ts_uuid));
  }
  return Status::OK();
}
1477
1478
// Fetches the catalog manager's pending add/remove/stepdown tasks for a table into the given
// maps. NOTE(review): the "Unlocked" callee suggests the catalog lock must already be held by
// the caller — confirm against CatalogManager::GetPendingServerTasksUnlocked.
void ClusterLoadBalancer::GetPendingTasks(const TableId& table_uuid,
                                          TabletToTabletServerMap* add_replica_tasks,
                                          TabletToTabletServerMap* remove_replica_tasks,
                                          TabletToTabletServerMap* stepdown_leader_tasks) {
  catalog_manager_->GetPendingServerTasksUnlocked(
      table_uuid, add_replica_tasks, remove_replica_tasks, stepdown_leader_tasks);
}
1485
1486
// Dispatches one config-change request for the tablet:
// * is_add: ADD_SERVER for ts_uuid with the default member type;
// * otherwise, if ts_uuid is the current leader: leader stepdown (removing the leader afterward
//   iff should_remove_leader), suggesting new_leader_ts_uuid as successor;
// * otherwise: REMOVE_SERVER for ts_uuid.
// The SCHECKs reject duplicate tasks for the same tablet.
Status ClusterLoadBalancer::SendReplicaChanges(
    scoped_refptr<TabletInfo> tablet, const TabletServerId& ts_uuid, const bool is_add,
    const bool should_remove_leader, const TabletServerId& new_leader_ts_uuid) {
  auto l = tablet->LockForRead();
  if (is_add) {
    // These checks are temporary. They will be removed once we are confident that the algorithm is
    // always doing the right thing.
    SCHECK_EQ(state_->pending_add_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
             0U,
             IllegalState,
             "Sending duplicate add replica task.");
    catalog_manager_->SendAddServerRequest(
        tablet, GetDefaultMemberType(), l->pb.committed_consensus_state(), ts_uuid);
  } else {
    // If the replica is also the leader, first step it down and then remove.
    if (state_->per_tablet_meta_[tablet->id()].leader_uuid == ts_uuid) {
      SCHECK_EQ(
          state_->pending_stepdown_leader_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
          0U,
          IllegalState,
          "Sending duplicate leader stepdown task.");
      catalog_manager_->SendLeaderStepDownRequest(
          tablet, l->pb.committed_consensus_state(), ts_uuid, should_remove_leader,
          new_leader_ts_uuid);
    } else {
      SCHECK_EQ(
          state_->pending_remove_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
          0U,
          IllegalState,
          "Sending duplicate remove replica task.");
      catalog_manager_->SendRemoveServerRequest(
          tablet, l->pb.committed_consensus_state(), ts_uuid);
    }
  }
  return Status::OK();
}
1522
1523
1.04k
// Member type for newly added replicas: voters for live clusters, observers for read-only ones.
consensus::PeerMemberType ClusterLoadBalancer::GetDefaultMemberType() {
  return state_->options_->type == LIVE ? consensus::PeerMemberType::PRE_VOTER
                                        : consensus::PeerMemberType::PRE_OBSERVER;
}
1530
1531
2.68k
// Returns true if the tablet's committed Raft config has any voter still in transition
// (e.g. a pending ADD_SERVER), in which case further config changes should wait.
Result<bool> ClusterLoadBalancer::IsConfigMemberInTransitionMode(const TabletId &tablet_id) const {
  auto tablet = GetTabletMap().at(tablet_id);
  auto l = tablet->LockForRead();
  // Bind by const reference: `auto config = ...` deep-copied the whole RaftConfigPB just to
  // count the voters in transition. The reference stays valid while the read lock `l` is held.
  const auto& config = l->pb.committed_consensus_state().config();
  return CountVotersInTransition(config) != 0;
}
1537
1538
// Returns the read replica placement whose placement UUID matches this balancer run's UUID.
// Falls back to the first read replica placement (with an error log) if no match exists.
const PlacementInfoPB& ClusterLoadBalancer::GetReadOnlyPlacementFromUuid(
    const ReplicationInfoPB& replication_info) const {
  // We assume we have an read replicas field in our replication info.
  // Range-for over the repeated field by const reference; the index was unused.
  for (const PlacementInfoPB& read_only_placement : replication_info.read_replicas()) {
    if (read_only_placement.placement_uuid() == state_->options_->placement_uuid) {
      return read_only_placement;
    }
  }
  // Should never get here.
  LOG(ERROR) << "Could not find read only cluster with placement uuid: "
             << state_->options_->placement_uuid;
  return replication_info.read_replicas(0);
}
1552
1553
0
// Returns the live replica placement info from the cluster config, regardless of this run's
// type (cf. GetClusterPlacementInfo, which is run-type dependent).
const PlacementInfoPB& ClusterLoadBalancer::GetLiveClusterPlacementInfo() const {
  auto l = down_cast<enterprise::CatalogManager*>
                    (catalog_manager_)->GetClusterConfigInfo()->LockForRead();
  return l->pb.replication_info().live_replicas();
}
1558
1559
0
// Returns a copy of the tables the balancer skipped in its last run; guarded by mutex_ since
// the list is written by the balancer thread and read by others.
vector<scoped_refptr<TableInfo>> ClusterLoadBalancer::GetAllTablesLoadBalancerSkipped() {
  SharedLock<decltype(mutex_)> l(mutex_);
  return skipped_tables_;
}
1563
1564
}  // namespace master
1565
}  // namespace yb