YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/cluster_balance.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/cluster_balance.h"
15
16
#include <algorithm>
17
#include <memory>
18
#include <utility>
19
20
#include <boost/algorithm/string/join.hpp>
21
#include <boost/optional/optional.hpp>
22
23
#include "yb/common/common.pb.h"
24
25
#include "yb/consensus/quorum_util.h"
26
27
#include "yb/gutil/casts.h"
28
29
#include "yb/master/catalog_manager_util.h"
30
31
#include "yb/master/master_fwd.h"
32
#include "yb/master/master.h"
33
#include "yb/master/master_error.h"
34
35
#include "yb/util/flag_tags.h"
36
#include "yb/util/status.h"
37
#include "yb/util/status_format.h"
38
#include "yb/util/status_log.h"
39
40
DEFINE_bool(enable_load_balancing,
41
            true,
42
            "Choose whether to enable the load balancing algorithm, to move tablets around.");
43
44
DEFINE_bool(transaction_tables_use_preferred_zones,
45
            false,
46
            "Choose whether transaction tablet leaders respect preferred zones.");
47
48
DEFINE_bool(enable_global_load_balancing,
49
            true,
50
            "Choose whether to allow the load balancer to make moves that strictly only balance "
51
            "global load. Note that global balancing only occurs after all tables are balanced.");
52
53
DEFINE_int32(leader_balance_threshold,
54
             0,
55
             "Number of leaders per each tablet server to balance below. If this is configured to "
56
                 "0 (the default), the leaders will be balanced optimally at extra cost.");
57
58
DEFINE_int32(leader_balance_unresponsive_timeout_ms,
59
             3 * 1000,
60
             "The period of time that a master can go without receiving a heartbeat from a "
61
                 "tablet server before considering it unresponsive. Unresponsive servers are "
62
                 "excluded from leader balancing.");
63
64
DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps,
65
             10,
66
             "Maximum number of tablets being remote bootstrapped across the cluster.");
67
68
DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps_per_table,
69
             2,
70
             "Maximum number of tablets being remote bootstrapped for any table. The maximum "
71
             "number of remote bootstraps across the cluster is still limited by the flag "
72
             "load_balancer_max_concurrent_tablet_remote_bootstraps. This flag is meant to prevent "
73
             "a single table use all the available remote bootstrap sessions and starving other "
74
             "tables.");
75
76
DEFINE_int32(load_balancer_max_over_replicated_tablets,
77
             1,
78
             "Maximum number of running tablet replicas that are allowed to be over the configured "
79
             "replication factor.");
80
81
DEFINE_int32(load_balancer_max_concurrent_adds,
82
             1,
83
             "Maximum number of tablet peer replicas to add in any one run of the load balancer.");
84
85
DEFINE_int32(load_balancer_max_concurrent_removals,
86
             1,
87
             "Maximum number of over-replicated tablet peer removals to do in any one run of the "
88
             "load balancer.");
89
90
DEFINE_int32(load_balancer_max_concurrent_moves,
91
             2,
92
             "Maximum number of tablet leaders on tablet servers (across the cluster) to move in "
93
             "any one run of the load balancer.");
94
95
DEFINE_int32(load_balancer_max_concurrent_moves_per_table,
96
             1,
97
             "Maximum number of tablet leaders per table to move in any one run of the load "
98
             "balancer. The maximum number of tablet leader moves across the cluster is still "
99
             "limited by the flag load_balancer_max_concurrent_moves. This flag is meant to "
100
             "prevent a single table from using all of the leader moves quota and starving "
101
             "other tables.");
102
103
DEFINE_int32(load_balancer_num_idle_runs,
104
             5,
105
             "Number of idle runs of load balancer to deem it idle.");
106
107
DEFINE_test_flag(bool, load_balancer_handle_under_replicated_tablets_only, false,
108
                 "Limit the functionality of the load balancer during tests so tests can make "
109
                 "progress");
110
111
// No longer used because leader stepdown is not as slow as it used to be.)
112
DEFINE_bool(load_balancer_skip_leader_as_remove_victim, false,
113
            "DEPRECATED. Should the LB skip a leader as a possible remove candidate.");
114
115
DEFINE_bool(allow_leader_balancing_dead_node, true,
116
            "When a tserver is marked as dead, do we continue leader balancing for tables that "
117
            "have a replica on this tserver");
118
119
DEFINE_test_flag(int32, load_balancer_wait_after_count_pending_tasks_ms, 0,
120
                 "For testing purposes, number of milliseconds to wait after counting and "
121
                 "finding pending tasks.");
122
123
DECLARE_int32(min_leader_stepdown_retry_interval_ms);
124
DECLARE_bool(enable_ysql_tablespaces_for_placement);
125
126
DEFINE_bool(load_balancer_count_move_as_add, true,
127
            "Should we enable state change to count add server triggered by load move as just an "
128
            "add instead of both an add and remove.");
129
130
DEFINE_bool(load_balancer_drive_aware, true,
131
            "When LB decides to move a tablet from server A to B, on the target LB "
132
            "should select the tablet to move from most loaded drive.");
133
134
DEFINE_bool(load_balancer_ignore_cloud_info_similarity, false,
135
            "If true, ignore the similarity between cloud infos when deciding which tablet "
136
            "to move.");
137
138
// TODO(tsplit): make false by default or even remove flag after
139
// https://github.com/yugabyte/yugabyte-db/issues/10301 is fixed.
140
DEFINE_test_flag(
141
    bool, load_balancer_skip_inactive_tablets, true, "Don't move inactive (hidden) tablets");
142
143
namespace yb {
144
namespace master {
145
146
using std::unique_ptr;
147
using std::make_unique;
148
using std::string;
149
using std::set;
150
using std::vector;
151
using strings::Substitute;
152
153
namespace {
154
155
vector<set<TabletId>> GetTabletsOnTSToMove(bool drive_aware,
156
270k
                                         const CBTabletServerMetadata& from_ts_meta) {
157
270k
  vector<set<TabletId>> all_tablets;
158
270k
  if (drive_aware) {
159
544k
    for (const auto& path : from_ts_meta.sorted_path_load_by_tablets_count) {
160
544k
      auto path_list = from_ts_meta.path_to_tablets.find(path);
161
544k
      if (path_list == from_ts_meta.path_to_tablets.end()) {
162
0
        LOG(INFO) << "Found uninitialized path: " << path;
163
0
        continue;
164
0
      }
165
544k
      all_tablets.push_back(path_list->second);
166
544k
    }
167
270k
  } else {
168
0
    all_tablets.push_back(from_ts_meta.running_tablets);
169
0
  }
170
270k
  return all_tablets;
171
270k
}
172
173
// Returns sorted list of pair tablet id and path on to_ts.
174
std::vector<std::pair<TabletId, std::string>> GetLeadersOnTSToMove(
175
139k
    bool drive_aware, const set<TabletId>& leaders, const CBTabletServerMetadata& to_ts_meta) {
176
139k
  std::vector<std::pair<TabletId, std::string>> peers;
177
139k
  if (drive_aware) {
178
193k
    for (const auto& path : to_ts_meta.sorted_path_load_by_leader_count) {
179
193k
      auto path_list = to_ts_meta.path_to_tablets.find(path);
180
193k
      if (path_list == to_ts_meta.path_to_tablets.end()) {
181
        // No tablets on this path, so skip it.
182
130k
        continue;
183
130k
      }
184
63.4k
      transform(path_list->second.begin(), path_list->second.end(), std::back_inserter(peers),
185
243k
                [&path_list](const TabletId& tablet_id) -> std::pair<TabletId, std::string> {
186
243k
                  return make_pair(tablet_id, path_list->first);
187
243k
                 });
188
63.4k
    }
189
139k
  } else {
190
0
    transform(to_ts_meta.running_tablets.begin(), to_ts_meta.running_tablets.end(),
191
0
              std::back_inserter(peers),
192
0
              [](const TabletId& tablet_id) -> std::pair<TabletId, std::string> {
193
0
                return make_pair(tablet_id, "");
194
0
               });
195
0
  }
196
139k
  std::vector<std::pair<TabletId, std::string>> intersection;
197
139k
  copy_if(peers.begin(), peers.end(), std::back_inserter(intersection),
198
243k
          [&leaders](const std::pair<TabletId, std::string>& tablet) {
199
243k
            return leaders.count(tablet.first) > 0;
200
243k
          });
201
139k
  return intersection;
202
139k
}
203
204
} // namespace
205
206
Result<ReplicationInfoPB> ClusterLoadBalancer::GetTableReplicationInfo(
207
896k
    const scoped_refptr<TableInfo>& table) const {
208
209
  // Return custom placement policy if it exists.
210
896k
  {
211
896k
    auto l = table->LockForRead();
212
896k
    if (l->pb.has_replication_info()) {
213
17.8k
      return l->pb.replication_info();
214
17.8k
    }
215
896k
  }
216
217
  // Custom placement policy does not exist. Check whether this table
218
  // has a tablespace associated with it, if so, return the placement info
219
  // for that tablespace.
220
878k
  auto replication_info = VERIFY_RESULT(tablespace_manager_->GetTableReplicationInfo(table));
221
878k
  if (replication_info) {
222
5.60k
    return replication_info.value();
223
5.60k
  }
224
225
  // No custom policy or tablespace specified for table.
226
872k
  return GetClusterReplicationInfo();
227
878k
}
228
229
947k
void ClusterLoadBalancer::InitTablespaceManager() {
230
947k
  tablespace_manager_ = catalog_manager_->GetTablespaceManager();
231
947k
}
232
233
891k
Status ClusterLoadBalancer::PopulatePlacementInfo(TabletInfo* tablet, PlacementInfoPB* pb) {
234
891k
  const auto& replication_info = VERIFY_RESULT(GetTableReplicationInfo(tablet->table()));
235
891k
  if (state_->options_->type == LIVE) {
236
888k
    pb->CopyFrom(replication_info.live_replicas());
237
888k
  }
238
891k
  if (state_->options_->type == READ_ONLY) {
239
3.25k
    if (replication_info.read_replicas_size() == 0) {
240
      // Should not reach here as tables that should not have read replicas should
241
      // have already been skipped before reaching here.
242
0
      return STATUS(IllegalState,
243
0
                    Format("Encountered a table $0 with no read replicas. Placement info $1",
244
0
                      tablet->table()->name(),
245
0
                      replication_info.DebugString()));
246
0
    }
247
3.25k
    pb->CopyFrom(GetReadOnlyPlacementFromUuid(replication_info));
248
3.25k
  }
249
891k
  return Status::OK();
250
891k
}
251
252
3.94M
Status ClusterLoadBalancer::UpdateTabletInfo(TabletInfo* tablet) {
253
3.94M
  const auto& table_id = tablet->table()->id();
254
  // Set the placement information on a per-table basis, only once.
255
3.94M
  if (!state_->placement_by_table_.count(table_id)) {
256
891k
    PlacementInfoPB pb;
257
891k
    {
258
891k
      RETURN_NOT_OK(PopulatePlacementInfo(tablet, &pb));
259
891k
    }
260
891k
    state_->placement_by_table_[table_id] = std::move(pb);
261
891k
  }
262
263
3.94M
  return state_->UpdateTablet(tablet);
264
3.94M
}
265
266
2.90M
const PlacementInfoPB& ClusterLoadBalancer::GetPlacementByTablet(const TabletId& tablet_id) const {
267
2.90M
  const auto& table_id = GetTabletMap().at(tablet_id)->table()->id();
268
2.90M
  return state_->placement_by_table_.at(table_id);
269
2.90M
}
270
271
7.99k
size_t ClusterLoadBalancer::get_total_wrong_placement() const {
272
7.99k
  return state_->tablets_wrong_placement_.size();
273
7.99k
}
274
275
7.99k
size_t ClusterLoadBalancer::get_total_blacklisted_servers() const {
276
7.99k
  return state_->blacklisted_servers_.size();
277
7.99k
}
278
279
7.99k
size_t ClusterLoadBalancer::get_total_leader_blacklisted_servers() const {
280
7.99k
  return state_->leader_blacklisted_servers_.size();
281
7.99k
}
282
283
815k
size_t ClusterLoadBalancer::get_total_over_replication() const {
284
815k
  return state_->tablets_over_replicated_.size();
285
815k
}
286
287
7.99k
size_t ClusterLoadBalancer::get_total_under_replication() const {
288
7.99k
  return state_->tablets_missing_replicas_.size();
289
7.99k
}
290
291
8.00k
size_t ClusterLoadBalancer::get_total_starting_tablets() const {
292
8.00k
  return global_state_->total_starting_tablets_;
293
8.00k
}
294
295
8.00k
int ClusterLoadBalancer::get_total_running_tablets() const {
296
8.00k
  return state_->total_running_;
297
8.00k
}
298
299
1.56M
bool ClusterLoadBalancer::IsLoadBalancerEnabled() const {
300
1.56M
  return FLAGS_enable_load_balancing && 
is_enabled_950k
;
301
1.56M
}
302
303
// Load balancer class.
304
ClusterLoadBalancer::ClusterLoadBalancer(CatalogManager* cm)
305
    : random_(GetRandomSeed32()),
306
      is_enabled_(FLAGS_enable_load_balancing),
307
8.07k
      cbuf_activities_(FLAGS_load_balancer_num_idle_runs) {
308
8.07k
  ResetGlobalState(false /* initialize_ts_descs */);
309
310
8.07k
  catalog_manager_ = cm;
311
8.07k
}
312
313
// Reduce remaining_tasks by pending_tasks value, after sanitizing inputs.
314
template <class T>
315
3.73M
void set_remaining(T pending_tasks, T* remaining_tasks) {
316
3.73M
  if (pending_tasks > *remaining_tasks) {
317
0
    LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > "
318
0
                 << *remaining_tasks;
319
0
    *remaining_tasks = 0;
320
3.73M
  } else {
321
3.73M
    *remaining_tasks -= pending_tasks;
322
3.73M
  }
323
3.73M
}
void yb::master::set_remaining<int>(int, int*)
Line
Count
Source
315
2.84M
void set_remaining(T pending_tasks, T* remaining_tasks) {
316
2.84M
  if (pending_tasks > *remaining_tasks) {
317
0
    LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > "
318
0
                 << *remaining_tasks;
319
0
    *remaining_tasks = 0;
320
2.84M
  } else {
321
2.84M
    *remaining_tasks -= pending_tasks;
322
2.84M
  }
323
2.84M
}
void yb::master::set_remaining<unsigned long>(unsigned long, unsigned long*)
Line
Count
Source
315
891k
void set_remaining(T pending_tasks, T* remaining_tasks) {
316
891k
  if (pending_tasks > *remaining_tasks) {
317
0
    LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > "
318
0
                 << *remaining_tasks;
319
0
    *remaining_tasks = 0;
320
891k
  } else {
321
891k
    *remaining_tasks -= pending_tasks;
322
891k
  }
323
891k
}
324
325
// Needed as we have a unique_ptr to the forward declared PerTableLoadState class.
326
93
ClusterLoadBalancer::~ClusterLoadBalancer() = default;
327
328
1.55M
void ClusterLoadBalancer::RunLoadBalancerWithOptions(Options* options) {
329
1.55M
  ResetGlobalState();
330
331
1.55M
  uint32_t master_errors = 0;
332
333
1.55M
  if (!IsLoadBalancerEnabled()) {
334
610k
    LOG(INFO) << "Load balancing is not enabled.";
335
610k
    return;
336
610k
  }
337
338
947k
  if (!FLAGS_transaction_tables_use_preferred_zones) {
339
947k
    VLOG
(1) << "Transaction tables will not respect leadership affinity."1.02k
;
340
947k
  }
341
342
947k
  std::unique_ptr<Options> options_unique_ptr;
343
947k
  if (options == nullptr) {
344
0
    options_unique_ptr = std::make_unique<Options>();
345
0
    options = options_unique_ptr.get();
346
0
  }
347
348
947k
  InitTablespaceManager();
349
350
  // Lock the CatalogManager maps for the duration of the load balancer run.
351
947k
  CatalogManager::SharedLock lock(catalog_manager_->mutex_);
352
353
947k
  int remaining_adds = options->kMaxConcurrentAdds;
354
947k
  int remaining_removals = options->kMaxConcurrentRemovals;
355
947k
  int remaining_leader_moves = options->kMaxConcurrentLeaderMoves;
356
357
  // Loop over all tables to get the count of pending tasks.
358
947k
  int pending_add_replica_tasks = 0;
359
947k
  int pending_remove_replica_tasks = 0;
360
947k
  int pending_stepdown_leader_tasks = 0;
361
362
77.1M
  for (const auto& table : GetTableMap()) {
363
77.1M
    if (SkipLoadBalancing(*table.second)) {
364
      // Populate the list of tables for which LB has been skipped
365
      // in LB's internal vector.
366
76.2M
      skipped_tables_per_run_.push_back(table.second);
367
76.2M
      continue;
368
76.2M
    }
369
895k
    const TableId& table_id = table.first;
370
895k
    if (tablespace_manager_->NeedsRefreshToFindTablePlacement(table.second)) {
371
      // Placement information was not present in catalog manager cache. This is probably a
372
      // recently created table, skip load balancing for now, hopefully by the next run,
373
      // the background task in the catalog manager will pick up the placement information
374
      // for this table from the PG catalog tables.
375
      // TODO(deepthi) Keep track of the number of times this happens, take appropriate action
376
      // if placement stays missing over period of time.
377
2.88k
      YB_LOG_EVERY_N(INFO, 10) << "Skipping load balancing for table " << table.second->name()
378
305
                               << " as its placement information is not available yet";
379
2.88k
      master_errors++;
380
2.88k
      continue;
381
2.88k
    }
382
383
892k
    if (options->type == READ_ONLY) {
384
4.60k
      const auto& result = GetTableReplicationInfo(table.second);
385
4.60k
      if (!result) {
386
0
        YB_LOG_EVERY_N(WARNING, 10) << "Skipping load balancing for " << table.first << ": "
387
0
                                    << "as fetching replication info failed with error "
388
0
                                    << StatusToString(result.status());
389
0
        master_errors++;
390
0
        continue;
391
0
      }
392
4.60k
      if (result.get().read_replicas_size() == 0) {
393
        // The table has a replication policy without any read replicas present.
394
        // The LoadBalancer is handling read replicas in this run, so this
395
        // table can be skipped.
396
1.35k
        continue;
397
1.35k
      }
398
4.60k
    }
399
891k
    ResetTableStatePtr(table_id, options);
400
401
891k
    bool is_txn_table = table.second->GetTableType() == TRANSACTION_STATUS_TABLE_TYPE;
402
891k
    state_->use_preferred_zones_ = !is_txn_table || 
FLAGS_transaction_tables_use_preferred_zones115k
;
403
891k
    InitializeTSDescriptors();
404
405
891k
    Status s = CountPendingTasksUnlocked(table_id,
406
891k
                                         &pending_add_replica_tasks,
407
891k
                                         &pending_remove_replica_tasks,
408
891k
                                         &pending_stepdown_leader_tasks);
409
891k
    if (!s.ok()) {
410
      // Found uninitialized ts_meta, so don't load balance this table yet.
411
0
      LOG(WARNING) << "Skipping load balancing " << table.first << ": " << StatusToString(s);
412
0
      per_table_states_.erase(table_id);
413
0
      master_errors++;
414
0
      continue;
415
0
    }
416
891k
  }
417
418
947k
  if (pending_add_replica_tasks + pending_remove_replica_tasks + pending_stepdown_leader_tasks> 0) {
419
1.50k
    LOG(INFO) << "Total pending adds=" << pending_add_replica_tasks << ", total pending removals="
420
1.50k
              << pending_remove_replica_tasks << ", total pending leader stepdowns="
421
1.50k
              << pending_stepdown_leader_tasks;
422
1.50k
    if (PREDICT_FALSE(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms > 0)) {
423
0
      LOG(INFO) << "Sleeping after finding pending tasks for "
424
0
                << FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms << " ms";
425
0
      SleepFor(
426
0
          MonoDelta::FromMilliseconds(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms));
427
0
    }
428
1.50k
  }
429
430
947k
  set_remaining(pending_add_replica_tasks, &remaining_adds);
431
947k
  set_remaining(pending_remove_replica_tasks, &remaining_removals);
432
947k
  set_remaining(pending_stepdown_leader_tasks, &remaining_leader_moves);
433
434
  // At the start of the run, report LB state that might prevent it from running smoothly.
435
947k
  ReportUnusualLoadBalancerState();
436
437
  // Loop over all tables to analyze the global and per-table load.
438
77.1M
  for (const auto& table : GetTableMap()) {
439
77.1M
    if (SkipLoadBalancing(*table.second)) {
440
76.2M
      continue;
441
76.2M
    }
442
443
895k
    auto it = per_table_states_.find(table.first);
444
895k
    if (it == per_table_states_.end()) {
445
      // If the table state doesn't exist, it was not fully initialized in the previous iteration.
446
4.24k
      VLOG
(1) << "Unable to find the state for table " << table.first1.42k
;
447
4.24k
      continue;
448
4.24k
    }
449
891k
    state_ = it->second.get();
450
451
    // Prepare the in-memory structures.
452
891k
    auto handle_analyze_tablets = AnalyzeTabletsUnlocked(table.first);
453
891k
    if (!handle_analyze_tablets.ok()) {
454
208
      LOG(WARNING) << "Skipping load balancing " << table.first << ": "
455
208
                   << StatusToString(handle_analyze_tablets);
456
208
      per_table_states_.erase(table.first);
457
208
      master_errors++;
458
208
    }
459
891k
  }
460
461
947k
  VLOG(1) << "Number of remote bootstraps before running load balancer: "
462
1.02k
          << global_state_->total_starting_tablets_;
463
464
  // Iterate over all the tables to take actions based on the data collected on the previous loop.
465
77.1M
  for (const auto& table : GetTableMap()) {
466
77.1M
    state_ = nullptr;
467
77.1M
    if (remaining_adds == 0 && 
remaining_removals == 037.7k
&&
remaining_leader_moves == 05.55k
) {
468
4
      break;
469
4
    }
470
77.1M
    if (SkipLoadBalancing(*table.second)) {
471
76.2M
      continue;
472
76.2M
    }
473
474
895k
    auto it = per_table_states_.find(table.first);
475
895k
    if (it == per_table_states_.end()) {
476
      // If the table state doesn't exist, it didn't get analyzed by the previous iteration.
477
4.45k
      VLOG(1) << "Unable to find table state for table " << table.first
478
1.46k
              << ". Skipping load balancing execution";
479
4.45k
      continue;
480
891k
    } else {
481
891k
      VLOG
(5) << "Load balancing table " << table.first0
;
482
891k
    }
483
891k
    state_ = it->second.get();
484
485
    // We may have modified global loads, so we need to reset this state's load.
486
891k
    state_->SortLoad();
487
488
    // Output parameters are unused in the load balancer, but useful in testing.
489
891k
    TabletId out_tablet_id;
490
891k
    TabletServerId out_from_ts;
491
891k
    TabletServerId out_to_ts;
492
493
    // Handle adding and moving replicas.
494
893k
    for ( ; remaining_adds > 0; 
--remaining_adds2.17k
) {
495
890k
      if (state_->allow_only_leader_balancing_) {
496
83.9k
        YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping Add replicas. Only leader balancing table "
497
2.03k
                                      << table.first;
498
83.9k
        break;
499
83.9k
      }
500
806k
      auto handle_add = HandleAddReplicas(&out_tablet_id, &out_from_ts, &out_to_ts);
501
806k
      if (!handle_add.ok()) {
502
2.62k
        LOG(WARNING) << "Skipping add replicas for " << table.first << ": "
503
2.62k
                     << StatusToString(handle_add);
504
2.62k
        master_errors++;
505
2.62k
        break;
506
2.62k
      }
507
803k
      if (!*handle_add) {
508
801k
        break;
509
801k
      }
510
803k
    }
511
891k
    if (PREDICT_FALSE(FLAGS_TEST_load_balancer_handle_under_replicated_tablets_only)) {
512
210
      LOG(INFO) << "Skipping remove replicas and leader moves for " << table.first;
513
210
      continue;
514
210
    }
515
516
    // Handle cleanup after over-replication.
517
893k
    
for ( ; 891k
remaining_removals > 0;
--remaining_removals2.51k
) {
518
891k
      if (state_->allow_only_leader_balancing_) {
519
83.4k
        YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping remove replicas. Only leader balancing table "
520
2.01k
                                      << table.first;
521
83.4k
        break;
522
83.4k
      }
523
807k
      auto handle_remove = HandleRemoveReplicas(&out_tablet_id, &out_from_ts);
524
807k
      if (!handle_remove.ok()) {
525
0
        LOG(WARNING) << "Skipping remove replicas for " << table.first << ": "
526
0
                     << StatusToString(handle_remove);
527
0
        master_errors++;
528
0
        break;
529
0
      }
530
807k
      if (!*handle_remove) {
531
805k
        break;
532
805k
      }
533
807k
    }
534
535
    // Handle tablet servers with too many leaders.
536
    // Check the current pending tasks per table to ensure we don't trigger the same task.
537
891k
    size_t table_remaining_leader_moves = state_->options_->kMaxConcurrentLeaderMovesPerTable;
538
891k
    set_remaining(state_->pending_stepdown_leader_tasks_[table.first].size(),
539
891k
                  &table_remaining_leader_moves);
540
    // Keep track of both the global and per table limit on number of moves.
541
891k
    for ( ;
542
944k
         remaining_leader_moves > 0 && 
table_remaining_leader_moves > 0937k
;
543
891k
         
--remaining_leader_moves, --table_remaining_leader_moves53.2k
) {
544
885k
      auto handle_leader = HandleLeaderMoves(&out_tablet_id, &out_from_ts, &out_to_ts);
545
885k
      if (!handle_leader.ok()) {
546
1
        LOG(WARNING) << "Skipping leader moves for " << table.first << ": "
547
1
                     << StatusToString(handle_leader);
548
1
        master_errors++;
549
1
        break;
550
1
      }
551
885k
      if (!*handle_leader) {
552
832k
        break;
553
832k
      }
554
885k
    }
555
891k
  }
556
557
947k
  RecordActivity(master_errors);
558
947k
}
559
560
1.54M
void ClusterLoadBalancer::RunLoadBalancer(Options* options) {
561
1.54M
  SysClusterConfigEntryPB config;
562
1.54M
  CHECK_OK(catalog_manager_->GetClusterConfig(&config));
563
564
1.54M
  std::unique_ptr<Options> options_unique_ptr =
565
1.54M
      std::make_unique<Options>();
566
1.54M
  Options* options_ent = options_unique_ptr.get();
567
  // First, we load balance the live cluster.
568
1.54M
  options_ent->type = LIVE;
569
1.54M
  if (config.replication_info().live_replicas().has_placement_uuid()) {
570
12.7k
    options_ent->placement_uuid = config.replication_info().live_replicas().placement_uuid();
571
12.7k
    options_ent->live_placement_uuid = options_ent->placement_uuid;
572
1.53M
  } else {
573
1.53M
    options_ent->placement_uuid = "";
574
1.53M
    options_ent->live_placement_uuid = "";
575
1.53M
  }
576
1.54M
  RunLoadBalancerWithOptions(options_ent);
577
578
  // Then, we balance all read-only clusters.
579
1.54M
  options_ent->type = READ_ONLY;
580
1.55M
  for (int i = 0; i < config.replication_info().read_replicas_size(); 
i++12.5k
) {
581
12.5k
    const PlacementInfoPB& read_only_cluster = config.replication_info().read_replicas(i);
582
12.5k
    options_ent->placement_uuid = read_only_cluster.placement_uuid();
583
12.5k
    RunLoadBalancerWithOptions(options_ent);
584
12.5k
  }
585
1.54M
}
586
587
947k
void ClusterLoadBalancer::RecordActivity(uint32_t master_errors) {
588
  // Update the list of tables for whom load-balancing has been
589
  // skipped in this run.
590
947k
  {
591
947k
    std::lock_guard<decltype(mutex_)> l(mutex_);
592
947k
    skipped_tables_ = skipped_tables_per_run_;
593
947k
  }
594
595
947k
  uint32_t table_tasks = 0;
596
77.1M
  for (const auto& table : GetTableMap()) {
597
77.1M
    table_tasks += table.second->NumLBTasks();
598
77.1M
  }
599
600
947k
  struct ActivityInfo ai {table_tasks, master_errors};
601
602
  // Update circular buffer summary.
603
604
947k
  if (ai.IsIdle()) {
605
892k
    num_idle_runs_++;
606
892k
  } else {
607
54.9k
    VLOG(1) <<
608
162
      Substitute("Load balancer has $0 table tasks and $1 master errors",
609
162
          table_tasks, master_errors);
610
54.9k
  }
611
612
947k
  if (cbuf_activities_.full()) {
613
935k
    if (cbuf_activities_.front().IsIdle()) {
614
880k
      num_idle_runs_--;
615
880k
    }
616
935k
  }
617
618
  // Mutate circular buffer.
619
947k
  cbuf_activities_.push_back(std::move(ai));
620
621
  // Update state.
622
947k
  is_idle_.store(num_idle_runs_ == cbuf_activities_.size(), std::memory_order_release);
623
624
  // Two interesting cases when updating can_perform_global_operations_ state:
625
  // If we previously couldn't balance global load, but now the LB is idle, enable global balancing.
626
  // If we previously could balance global load, but now the LB is busy, then it is busy balancing
627
  // global load or doing other operations (remove, etc.). In this case, we keep global balancing
628
  // enabled up until we perform a non-global balancing move (see GetLoadToMove()).
629
  // TODO(julien) some small improvements can be made here, such as ignoring leader stepdown tasks.
630
947k
  can_perform_global_operations_ = can_perform_global_operations_ || 
ai.IsIdle()59.8k
;
631
947k
}
632
633
2.71k
Status ClusterLoadBalancer::IsIdle() const {
634
2.71k
  if (IsLoadBalancerEnabled() && !is_idle_.load(std::memory_order_acquire)) {
635
2.18k
    return STATUS(
636
2.18k
        IllegalState,
637
2.18k
        "Task or error encountered recently.",
638
2.18k
        MasterError(MasterErrorPB::LOAD_BALANCER_RECENTLY_ACTIVE));
639
2.18k
  }
640
641
538
  return Status::OK();
642
2.71k
}
643
644
291k
bool ClusterLoadBalancer::CanBalanceGlobalLoad() const {
645
291k
  return FLAGS_enable_global_load_balancing && 
can_perform_global_operations_290k
;
646
291k
}
647
648
947k
void ClusterLoadBalancer::ReportUnusualLoadBalancerState() const {
649
3.02M
  for (const auto& ts_desc : global_state_->ts_descs_) {
650
    // Report if any ts has a pending delete.
651
3.02M
    if (ts_desc->HasTabletDeletePending()) {
652
16.4k
      LOG(INFO) << Format("tablet server $0 has a pending delete for tablets $1",
653
16.4k
                          ts_desc->permanent_uuid(), ts_desc->PendingTabletDeleteToString());
654
16.4k
    }
655
3.02M
  }
656
947k
}
657
658
1.56M
void ClusterLoadBalancer::ResetGlobalState(bool initialize_ts_descs) {
659
1.56M
  per_table_states_.clear();
660
1.56M
  global_state_ = std::make_unique<GlobalLoadState>();
661
1.56M
  global_state_->drive_aware_ = FLAGS_load_balancer_drive_aware;
662
1.56M
  if (initialize_ts_descs) {
663
    // Only call GetAllDescriptors once for a LB run, and then cache it in global_state_.
664
1.55M
    GetAllDescriptors(&global_state_->ts_descs_);
665
1.55M
  }
666
1.56M
  skipped_tables_per_run_.clear();
667
1.56M
}
668
669
891k
void ClusterLoadBalancer::ResetTableStatePtr(const TableId& table_id, Options* options) {
670
891k
  auto table_state = std::make_unique<PerTableLoadState>(global_state_.get());
671
891k
  table_state->options_ = options;
672
891k
  state_ = table_state.get();
673
891k
  per_table_states_[table_id] = std::move(table_state);
674
675
891k
  state_->table_id_ = table_id;
676
891k
}
677
678
891k
// Registers the live load of one table into state_: walks every tablet of the table,
// records running replicas, adjusts the leader-balance threshold, sorts the per-TS load
// and leader load, and finally replays any pending remove/stepdown/add tasks so the
// in-memory view matches operations already in flight.
//
// Returns OK on success; propagates errors from fetching tablets or from updating state.
Status ClusterLoadBalancer::AnalyzeTabletsUnlocked(const TableId& table_uuid) {
  // BUGFIX: the prepended message was missing a space before "due", producing
  // "Skipping table <uuid>due to error: ".
  auto tablets = VERIFY_RESULT_PREPEND(
      GetTabletsForTable(table_uuid), "Skipping table " + table_uuid + " due to error: ");

  // Loop over tablet map to register the load that is already live in the cluster.
  for (const auto& tablet : tablets) {
    bool tablet_running = false;
    {
      auto tablet_lock = tablet->LockForRead();

      if (!tablet->table()) {
        // Tablet is orphaned or in preparing state, continue.
        continue;
      }
      tablet_running = tablet_lock->is_running();
    }

    // This is from the perspective of the CatalogManager and the on-disk, persisted
    // SysCatalogStatePB. What this means is that this tablet was properly created as part of a
    // CreateTable and the information was sent to the initial set of TS and the tablet got to an
    // initial running state.
    //
    // This is different from the individual, per-TS state of the tablet, which can vary based on
    // the TS itself. The tablet can be registered as RUNNING, as far as the CatalogManager is
    // concerned, but just be underreplicated, and have some TS currently bootstrapping instances
    // of the tablet.
    if (tablet_running) {
      RETURN_NOT_OK(UpdateTabletInfo(tablet.get()));
    }
  }

  // After updating the tablets and tablet servers, adjust the configured threshold if it is too
  // low for the given configuration.
  state_->AdjustLeaderBalanceThreshold();

  // Once we've analyzed both the tablet server information as well as the tablets, we can sort the
  // load and are ready to apply the load balancing rules.
  state_->SortLoad();

  // Since leader load is only needed to rebalance leaders, we keep the sorting separate.
  state_->SortLeaderLoad();

  VLOG(1) << Substitute(
      "Table: $0. Total running tablets: $1. Total overreplication: $2. Total starting tablets: $3."
      " Wrong placement: $4. BlackListed: $5. Total underreplication: $6, Leader BlackListed: $7",
      table_uuid, get_total_running_tablets(), get_total_over_replication(),
      get_total_starting_tablets(), get_total_wrong_placement(), get_total_blacklisted_servers(),
      get_total_under_replication(), get_total_leader_blacklisted_servers());

  // Replay operations that were already issued in previous runs but have not completed yet, so
  // the balancer does not double-schedule them.
  for (const auto& tablet : tablets) {
    const auto& tablet_id = tablet->id();
    if (state_->pending_remove_replica_tasks_[table_uuid].count(tablet_id) > 0) {
      RETURN_NOT_OK(state_->RemoveReplica(
          tablet_id, state_->pending_remove_replica_tasks_[table_uuid][tablet_id]));
    }
    if (state_->pending_stepdown_leader_tasks_[table_uuid].count(tablet_id) > 0) {
      const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
      const auto& from_ts = tablet_meta.leader_uuid;
      const auto& to_ts = state_->pending_stepdown_leader_tasks_[table_uuid][tablet_id];
      RETURN_NOT_OK(state_->MoveLeader(tablet->id(), from_ts, to_ts));
    }
    if (state_->pending_add_replica_tasks_[table_uuid].count(tablet_id) > 0) {
      RETURN_NOT_OK(state_->AddReplica(tablet->id(),
                                       state_->pending_add_replica_tasks_[table_uuid][tablet_id]));
    }
  }

  return Status::OK();
}
747
748
// Tries to add one replica for a tablet that is missing replicas (under-replicated or
// missing a required placement block). Iterates tablets in tablets_missing_replicas_ and,
// for each, scans tablet servers in ascending load order for a valid host.
//
// On success fills *out_tablet_id / *out_to_ts, issues the AddReplica, and returns true.
// Returns false when no (tablet, ts) pair is currently eligible.
Result<bool> ClusterLoadBalancer::HandleAddIfMissingPlacement(
    TabletId* out_tablet_id, TabletServerId* out_to_ts) {
  for (const auto& tablet_id : state_->tablets_missing_replicas_) {
    const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
    const auto& placement_info = GetPlacementByTablet(tablet_id);
    const auto& missing_placements = tablet_meta.under_replicated_placements;
    // Loop through TSs by load to find a TS that matches the placement needed and does not already
    // host this tablet.
    for (const auto& ts_uuid : state_->sorted_load_) {
      bool can_choose_ts = false;
      // If we had no placement information, it means we are just under-replicated, so just check
      // that we can use this tablet server.
      if (placement_info.placement_blocks().empty()) {
        // No need to check placement info, as there is none.
        can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid));
      } else {
        // We added a tablet to the set with missing replicas both if it is under-replicated, and we
        // added a placement to the tablet_meta under_replicated_placements if the num replicas in
        // that placement is fewer than min_num_replicas. If the under-replicated tablet has a
        // placement that is under-replicated and the ts is not in that placement, then that ts
        // isn't valid.
        const auto& ts_meta = state_->per_ts_meta_[ts_uuid];
        // Either we have specific placement blocks that are under-replicated, so confirm
        // that this TS matches or all the placement blocks have min_num_replicas
        // but overall num_replicas is fewer than expected.
        // In the latter case, we still need to conform to the placement rules.
        if (missing_placements.empty() ||
            tablet_meta.CanAddTSToMissingPlacements(ts_meta.descriptor)) {
          // If we don't have any missing placements but are under-replicated then we need to
          // validate placement information in order to avoid adding to a wrong placement block.
          //
          // Do the placement check for both the cases.
          // If we have missing placements then this check is a tautology otherwise it matters.
          can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid,
                                                                                &placement_info));
        }
      }
      // If we've passed the checks, then we can choose this TS to add the replica to.
      if (can_choose_ts) {
        *out_tablet_id = tablet_id;
        *out_to_ts = ts_uuid;
        RETURN_NOT_OK(AddReplica(tablet_id, ts_uuid));
        // Erasing from the set being iterated is safe only because we return immediately below.
        state_->tablets_missing_replicas_.erase(tablet_id);
        return true;
      }
    }
  }
  return false;
}
797
798
// Tries to add one replacement replica for a tablet that has a replica violating the
// placement policy, so the misplaced replica can later be removed. Over-replicated
// tablets are skipped — they are handled by the removal step instead.
//
// On success fills all three out-params, issues the MoveReplica (add + eventual remove),
// and returns true; returns false when there is nothing to do.
Result<bool> ClusterLoadBalancer::HandleAddIfWrongPlacement(
    TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
  for (const auto& tablet_id : state_->tablets_wrong_placement_) {
    // Skip this tablet, if it is already over-replicated, as it does not need another replica, it
    // should just have one removed in the removal step.
    if (state_->tablets_over_replicated_.count(tablet_id)) {
      continue;
    }
    // CanSelectWrongReplicaToMove fills out_from_ts/out_to_ts when it finds a valid pair.
    if (VERIFY_RESULT(state_->CanSelectWrongReplicaToMove(
            tablet_id, GetPlacementByTablet(tablet_id), out_from_ts, out_to_ts))) {
      *out_tablet_id = tablet_id;
      RETURN_NOT_OK(MoveReplica(tablet_id, *out_from_ts, *out_to_ts));
      return true;
    }
  }
  return false;
}
815
816
// Top-level "add one replica" step for the current table. First enforces the remote
// bootstrap throttles (global and per-table) and the over-replication cap, returning a
// TryAgain status when a limit is hit. Then attempts adds in priority order:
//   1. missing placements (under-replication) — HandleAddIfMissingPlacement;
//   2. wrong placements (decommission support) — HandleAddIfWrongPlacement;
//   3. ordinary load balancing — GetLoadToMove.
// Returns true and fills the out-params when a move was issued, false when none is possible.
Result<bool> ClusterLoadBalancer::HandleAddReplicas(
    TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
  if (state_->options_->kAllowLimitStartingTablets) {
    if (global_state_->total_starting_tablets_ >= state_->options_->kMaxTabletRemoteBootstraps) {
      return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 "
          "tablets, when our max allowed is $1",
          global_state_->total_starting_tablets_, state_->options_->kMaxTabletRemoteBootstraps);
    } else if (state_->total_starting_ >= state_->options_->kMaxTabletRemoteBootstrapsPerTable) {
      return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 "
          "tablets for table $1, when our max allowed is $2 per table",
          state_->total_starting_, state_->table_id_,
          state_->options_->kMaxTabletRemoteBootstrapsPerTable);
    }
  }

  if (state_->options_->kAllowLimitOverReplicatedTablets &&
      get_total_over_replication() >=
          implicit_cast<size_t>(state_->options_->kMaxOverReplicatedTablets)) {
    return STATUS_SUBSTITUTE(TryAgain,
        "Cannot add replicas. Currently have a total overreplication of $0, when max allowed is $1"
        ", overreplicated tablets: $2",
        get_total_over_replication(), state_->options_->kMaxOverReplicatedTablets,
        boost::algorithm::join(state_->tablets_over_replicated_, ", "));
  }

  VLOG(1) << "Number of global concurrent remote bootstrap sessions: "
          <<  global_state_->total_starting_tablets_
          << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstraps
          << ". Number of concurrent remote bootstrap sessions for table " << state_->table_id_
          << ": " << state_->total_starting_
          << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstrapsPerTable;

  // Handle missing placements with highest priority, as it means we're potentially
  // under-replicated.
  if (VERIFY_RESULT(HandleAddIfMissingPlacement(out_tablet_id, out_to_ts))) {
    return true;
  }

  // Handle wrong placements as next priority, as these could be servers we're moving off of, so
  // we can decommission ASAP.
  if (VERIFY_RESULT(HandleAddIfWrongPlacement(out_tablet_id, out_from_ts, out_to_ts))) {
    return true;
  }

  // Finally, handle normal load balancing.
  if (!VERIFY_RESULT(GetLoadToMove(out_tablet_id, out_from_ts, out_to_ts))) {
    VLOG(1) << "Cannot find any more tablets to move, under current constraints.";
    if (VLOG_IS_ON(1)) {
      DumpSortedLoad();
    }
    return false;
  }

  return true;
}
871
872
7.81k
void ClusterLoadBalancer::DumpSortedLoad() const {
873
7.81k
  ssize_t last_pos = state_->sorted_load_.size() - 1;
874
7.81k
  std::ostringstream out;
875
7.81k
  out << "Table load (global load): ";
876
28.9k
  for (ssize_t left = 0; left <= last_pos; 
++left21.0k
) {
877
21.0k
    const TabletServerId& uuid = state_->sorted_load_[left];
878
21.0k
    auto load = state_->GetLoad(uuid);
879
21.0k
    out << uuid << ":" << load << " (" << global_state_->GetGlobalLoad(uuid) << ") ";
880
21.0k
  }
881
7.81k
  VLOG(1) << out.str();
882
7.81k
}
883
884
// Finds a single tablet move that reduces load imbalance for the current table (or, once
// the table is balanced, the global cluster load). Uses a two-pointer sweep over
// sorted_load_ (ascending load). On success fills all three out-params, issues the
// MoveReplica, and returns true; returns false when the table (and global) load is
// already balanced under the configured variance thresholds.
Result<bool> ClusterLoadBalancer::GetLoadToMove(
    TabletId* moving_tablet_id, TabletServerId* from_ts, TabletServerId* to_ts) {
  if (state_->sorted_load_.empty()) {
    return false;
  }

  // Start with two indices pointing at left and right most ends of the sorted_load_ structure.
  //
  // We will try to find two TSs that have at least one tablet that can be moved amongst them, from
  // the higher load to the lower load TS. To do this, we will go through comparing the TSs
  // corresponding to our left and right indices, exclude tablets from the right, high loaded TS
  // according to our load balancing rules, such as load variance, starting tablets and not moving
  // already over-replicated tablets. We then compare the remaining set of tablets with the ones
  // hosted by the lower loaded TS and use ReservoirSample to pick a tablet from the set
  // difference. If there were no tablets to pick, we advance our state.
  //
  // The state is defined as the positions of the start and end indices. We always try to move the
  // right index back, until we cannot any more, due to either reaching the left index (cannot
  // rebalance from one TS to itself), or the difference of load between the two TSs is too low to
  // try to rebalance (if load variance is 1, it does not make sense to move tablets between the
  // TSs). When we cannot lower the right index any further, we reset it back to last_pos and
  // increment the left index.
  //
  // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index
  // and are already breaking the invariance rule, as that means that any further differences in
  // the interval between left and right cannot have load > kMinLoadVarianceToBalance.
  ssize_t last_pos = state_->sorted_load_.size() - 1;
  for (ssize_t left = 0; left <= last_pos; ++left) {
    for (auto right = last_pos; right >= 0; --right) {
      const TabletServerId& low_load_uuid = state_->sorted_load_[left];
      const TabletServerId& high_load_uuid = state_->sorted_load_[right];
      ssize_t load_variance = state_->GetLoad(high_load_uuid) - state_->GetLoad(low_load_uuid);
      bool is_global_balancing_move = false;

      // Check for state change or end conditions.
      if (left == right || load_variance < state_->options_->kMinLoadVarianceToBalance) {
        // Either both left and right are at the end, or there is no load_variance, which means
        // there will be no load_variance for any TSs between left and right, so we can return.
        if (right == last_pos && load_variance == 0) {
          return false;
        }
        // If there is load variance, then there is a chance we can benefit from globally balancing.
        if (load_variance > 0 && CanBalanceGlobalLoad()) {
          int global_load_variance = global_state_->GetGlobalLoad(high_load_uuid) -
                                     global_state_->GetGlobalLoad(low_load_uuid);
          if (global_load_variance < state_->options_->kMinGlobalLoadVarianceToBalance) {
            // Already globally balanced. Since we are sorted by global load, we can return here as
            // there are no other moves for us to make.
            return false;
          }
          // Mark this move as a global balancing move and try to find a tablet to move.
          is_global_balancing_move = true;
        } else {
          // The load_variance is too low, which means we weren't able to find a load to move to
          // the left tserver. Continue and try with the next left tserver.
          break;
        }
      }

      // If we don't find a tablet_id to move between these two TSs, advance the state.
      if (VERIFY_RESULT(GetTabletToMove(high_load_uuid, low_load_uuid, moving_tablet_id))) {
        // If we got this far, we have the candidate we want, so fill in the output params and
        // return. The tablet_id is filled in from GetTabletToMove.
        *from_ts = high_load_uuid;
        *to_ts = low_load_uuid;
        RETURN_NOT_OK(MoveReplica(*moving_tablet_id, high_load_uuid, low_load_uuid));
        // Update global state if necessary.
        if (!is_global_balancing_move) {
          can_perform_global_operations_ = false;
        }
        return true;
      }
    }
  }

  // Should never get here.
  return STATUS(IllegalState, "Load balancing algorithm reached illegal state.");
}
962
963
// Picks the best tablet to move from from_ts to to_ts. First filters out tablets that are
// over-replicated, disabled post-split, or not addable to to_ts under placement rules,
// grouped per drive (drives ordered by decreasing load when drive-aware). Then, within
// the first drive that has candidates, prefers the tablet whose current leader is
// cloud-info-closest to to_ts (faster remote bootstrap). Fills *moving_tablet_id and
// returns true on success; returns false when no tablet qualifies.
Result<bool> ClusterLoadBalancer::GetTabletToMove(
    const TabletServerId& from_ts, const TabletServerId& to_ts, TabletId* moving_tablet_id) {
  const auto& from_ts_meta = state_->per_ts_meta_[from_ts];
  // If drive aware, all_tablets is sorted by decreasing drive load.
  vector<set<TabletId>> all_tablets_by_drive = GetTabletsOnTSToMove(global_state_->drive_aware_,
                                                                    from_ts_meta);
  vector<set<TabletId>> all_filtered_tablets_by_drive;
  for (const set<TabletId>& drive_tablets : all_tablets_by_drive) {
    set<TabletId> filtered_drive_tablets;
    for (const TabletId& tablet_id : drive_tablets) {
      // We don't want to add a new replica to an already over-replicated tablet.
      //
      // TODO(bogdan): should make sure we pick tablets that this TS is not a leader of, so we
      // can ensure HandleRemoveReplicas removes them from this TS.
      if (state_->tablets_over_replicated_.count(tablet_id)) {
        continue;
      }
      // Don't move a replica right after split
      if (ContainsKey(from_ts_meta.disabled_by_ts_tablets, tablet_id)) {
        continue;
      }

      if (VERIFY_RESULT(
          state_->CanAddTabletToTabletServer(tablet_id, to_ts, &GetPlacementByTablet(tablet_id)))) {
        filtered_drive_tablets.insert(tablet_id);
      }
    }
    all_filtered_tablets_by_drive.push_back(filtered_drive_tablets);
  }

  // Below, we choose a tablet to move. We first filter out any tablets which cannot be moved
  // because of placement limitations. Then, we prioritize moving a tablet whose leader is in the
  // same zone/region it is moving to (for faster remote bootstrapping).
  for (const set<TabletId>& drive_tablets : all_filtered_tablets_by_drive) {
    bool found_tablet_to_move = false;
    CatalogManagerUtil::CloudInfoSimilarity chosen_tablet_ci_similarity =
        CatalogManagerUtil::NO_MATCH;
    for (const TabletId& tablet_id : drive_tablets) {
      const auto& placement_info = GetPlacementByTablet(tablet_id);
      // TODO(bogdan): this should be augmented as well to allow dropping by one replica, if still
      // leaving us with more than the minimum.
      //
      // If we have placement information, we want to only pick the tablet if it's moving to the
      // same placement, so we guarantee we're keeping the same type of distribution.
      // Since we allow prefixes as well, we can still respect the placement of this tablet
      // even if their placement ids aren't the same. An e.g.
      // placement info of tablet: C.R1.*
      // placement info of from_ts: C.R1.Z1
      // placement info of to_ts: C.R2.Z2
      // Note that we've assumed that for every TS there is a unique placement block to which it
      // can be mapped (see the validation rules in yb_admin-client). If there is no unique
      // placement block then it is simply the C.R.Z of the TS itself.
      auto from_ts_block = state_->GetValidPlacement(from_ts, &placement_info);
      auto to_ts_block = state_->GetValidPlacement(to_ts, &placement_info);
      bool same_placement = false;
      if (to_ts_block.has_value() && from_ts_block.has_value()) {
          same_placement = TSDescriptor::generate_placement_id(*from_ts_block) ==
                                  TSDescriptor::generate_placement_id(*to_ts_block);
      }

      if (!placement_info.placement_blocks().empty() && !same_placement) {
        continue;
      }

      TabletServerId leader_ts = state_->per_tablet_meta_[tablet_id].leader_uuid;
      auto ci_similarity = CatalogManagerUtil::CloudInfoSimilarity::NO_MATCH;
      if (!leader_ts.empty() && !FLAGS_load_balancer_ignore_cloud_info_similarity) {
        const auto leader_ci = state_->per_ts_meta_[leader_ts].descriptor->GetCloudInfo();
        const auto to_ts_ci = state_->per_ts_meta_[to_ts].descriptor->GetCloudInfo();
        ci_similarity = CatalogManagerUtil::ComputeCloudInfoSimilarity(leader_ci, to_ts_ci);
      }

      // Keep the current best candidate unless this one is strictly more similar.
      if (found_tablet_to_move && ci_similarity <= chosen_tablet_ci_similarity) {
        continue;
      }
      // This is the best tablet to move, so far.
      found_tablet_to_move = true;
      *moving_tablet_id = tablet_id;
      chosen_tablet_ci_similarity = ci_similarity;
    }

    // If there is any tablet we can move from this drive, choose it and return.
    if (found_tablet_to_move) {
      return true;
    }
  }

  return false;
}
1052
1053
// Finds a single leader-stepdown move that improves leader balance for the current table
// (or, once the table is balanced, the global leader load). Leader-blacklisted tservers
// are treated as infinitely loaded. On success fills all four out-params and returns
// true; returns false when leader load is already balanced or only blacklisted servers
// remain as destinations.
Result<bool> ClusterLoadBalancer::GetLeaderToMove(TabletId* moving_tablet_id,
                                                  TabletServerId* from_ts,
                                                  TabletServerId* to_ts,
                                                  std::string* to_ts_path) {
  if (state_->sorted_leader_load_.empty()) {
    return false;
  }

  // Find out if there are leaders to be moved.
  // Scan from the most-loaded end: bail out early (return false) if the most loaded
  // non-blacklisted tserver is already below the leader-load threshold.
  for (auto right = state_->sorted_leader_load_.size(); right > 0;) {
    --right;
    const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right];
    auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) !=
      state_->leader_blacklisted_servers_.end());
    if (high_leader_blacklisted) {
      auto high_load = state_->GetLeaderLoad(high_load_uuid);
      if (high_load > 0) {
        // Leader blacklisted tserver with a leader replica.
        break;
      } else {
        // Leader blacklisted tserver without leader replica.
        continue;
      }
    } else {
      if (state_->IsLeaderLoadBelowThreshold(state_->sorted_leader_load_[right])) {
        // Non-leader blacklisted tserver with not too many leader replicas.
        // TODO(Sanket): Even though per table load is below the configured threshold,
        // we might want to do global leader balancing above a certain threshold that is lower
        // than the per table threshold. Can add another gflag/knob here later.
        return false;
      } else {
        // Non-leader blacklisted tserver with too many leader replicas.
        break;
      }
    }
  }

  // The algorithm to balance the leaders is very similar to the one for tablets:
  //
  // Start with two indices pointing at left and right most ends of the sorted_leader_load_
  // structure. Note that leader blacklisted tserver is considered as having infinite leader load.
  //
  // We will try to find two TSs that have at least one leader that can be moved amongst them, from
  // the higher load to the lower load TS. To do this, we will go through comparing the TSs
  // corresponding to our left and right indices. We go through leaders on the higher loaded TS
  // and find a running replica on the lower loaded TS to move the leader. If no leader can be
  // be picked, we advance our state.
  //
  // The state is defined as the positions of the start and end indices. We always try to move the
  // right index back, until we cannot any more, due to either reaching the left index (cannot
  // rebalance from one TS to itself), or the difference of load between the two TSs is too low to
  // try to rebalance (if load variance is 1, it does not make sense to move leaders between the
  // TSs). When we cannot lower the right index any further, we reset it back to last_pos and
  // increment the left index.
  //
  // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index
  // and are already breaking the invariance rule, as that means that any further differences in
  // the interval between left and right cannot have load > kMinLeaderLoadVarianceToBalance.
  const auto current_time = MonoTime::Now();
  ssize_t last_pos = state_->sorted_leader_load_.size() - 1;
  for (ssize_t left = 0; left <= last_pos; ++left) {
    const TabletServerId& low_load_uuid = state_->sorted_leader_load_[left];
    auto low_leader_blacklisted = (state_->leader_blacklisted_servers_.find(low_load_uuid) !=
        state_->leader_blacklisted_servers_.end());
    if (low_leader_blacklisted) {
      // Left marker has gone beyond non-leader blacklisted tservers.
      return false;
    }

    for (auto right = last_pos; right >= 0; --right) {
      const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right];
      auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) !=
          state_->leader_blacklisted_servers_.end());
      ssize_t load_variance =
          state_->GetLeaderLoad(high_load_uuid) - state_->GetLeaderLoad(low_load_uuid);

      bool is_global_balancing_move = false;

      // Check for state change or end conditions.
      if (left == right || (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance &&
            !high_leader_blacklisted)) {
        // Global leader balancing only if per table variance is > 0.
        // If both left and right are same (i.e. load_variance is 0) and right is last_pos
        // or right is last_pos and load_variance is 0 then we can return as we don't
        // have any other moves to make.
        if (load_variance == 0 && right == last_pos) {
          return false;
        }
        // Check if we can benefit from global leader balancing.
        // If we have > 0 load_variance and there are no per table moves left.
        if (load_variance > 0 && CanBalanceGlobalLoad()) {
          int global_load_variance = state_->global_state_->GetGlobalLeaderLoad(high_load_uuid) -
                                        state_->global_state_->GetGlobalLeaderLoad(low_load_uuid);
          // Already globally balanced. Since we are sorted by global load, we can return here as
          // there are no other moves for us to make.
          if (global_load_variance < state_->options_->kMinGlobalLeaderLoadVarianceToBalance) {
            return false;
          }
          is_global_balancing_move = true;
        } else {
          break;
        }
      }

      // Find the leaders on the higher loaded TS that have running peers on the lower loaded TS.
      // If there are, we have a candidate we want, so fill in the output params and return.
      const set<TabletId>& leaders = state_->per_ts_meta_[high_load_uuid].leaders;
      for (const auto& tablet : GetLeadersOnTSToMove(global_state_->drive_aware_,
                                                     leaders,
                                                     state_->per_ts_meta_[low_load_uuid])) {
        *moving_tablet_id = tablet.first;
        *to_ts_path = tablet.second;
        *from_ts = high_load_uuid;
        *to_ts = low_load_uuid;

        const auto& per_tablet_meta = state_->per_tablet_meta_;
        const auto tablet_meta_iter = per_tablet_meta.find(tablet.first);
        if (PREDICT_TRUE(tablet_meta_iter != per_tablet_meta.end())) {
          const auto& tablet_meta = tablet_meta_iter->second;
          const auto& stepdown_failures = tablet_meta.leader_stepdown_failures;
          const auto stepdown_failure_iter = stepdown_failures.find(low_load_uuid);
          if (stepdown_failure_iter != stepdown_failures.end()) {
            const auto time_since_failure = current_time - stepdown_failure_iter->second;
            if (time_since_failure.ToMilliseconds() < FLAGS_min_leader_stepdown_retry_interval_ms) {
              LOG(INFO) << "Cannot move tablet " << tablet.first << " leader from TS "
                        << *from_ts << " to TS " << *to_ts << " yet: previous attempt with the same"
                        << " intended leader failed only " << ToString(time_since_failure)
                        << " ago (less " << "than " << FLAGS_min_leader_stepdown_retry_interval_ms
                        << "ms).";
            }
            // NOTE(review): any recorded stepdown failure for this destination skips the
            // candidate, even when outside the retry interval (only the log is gated on the
            // interval). Presumably stale failures are pruned upstream — TODO confirm.
            continue;
          }
        } else {
          LOG(WARNING) << "Did not find load balancer metadata for tablet " << *moving_tablet_id;
        }

        // Leader movement solely due to leader blacklist.
        if (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance &&
            high_leader_blacklisted) {
          state_->LogSortedLeaderLoad();
          LOG(INFO) << "Move tablet " << tablet.first << " leader from leader blacklisted TS "
            << *from_ts << " to TS " << *to_ts;
        }
        if (!is_global_balancing_move) {
          can_perform_global_operations_ = false;
        }
        return true;
      }
    }
  }

  // Should never get here.
  FATAL_ERROR("Load balancing algorithm reached invalid state!");
}
1207
1208
// Top-level "remove one replica" step for the current table. First removes a replica that
// violates placement policy (highest priority); otherwise removes a replica from an
// over-replicated tablet, choosing the most loaded tablet server among eligible hosts.
// Tablets with an in-flight config change or still-starting replicas are skipped.
// Returns true and fills the out-params when a removal was issued, false otherwise.
Result<bool> ClusterLoadBalancer::HandleRemoveReplicas(
    TabletId* out_tablet_id, TabletServerId* out_from_ts) {
  // Give high priority to removing tablets that are not respecting the placement policy.
  if (VERIFY_RESULT(HandleRemoveIfWrongPlacement(out_tablet_id, out_from_ts))) {
    return true;
  }

  for (const auto& tablet_id : state_->tablets_over_replicated_) {
    // Skip if there is a pending ADD_SERVER.
    if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id)) ||
        state_->per_tablet_meta_[tablet_id].starting > 0) {
      continue;
    }

    const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
    const auto& tablet_servers = tablet_meta.over_replicated_tablet_servers;
    auto comparator = PerTableLoadState::Comparator(state_);
    vector<TabletServerId> sorted_ts;
    // Don't include any tservers where this tablet is still starting.
    std::copy_if(
        tablet_servers.begin(), tablet_servers.end(), std::back_inserter(sorted_ts),
        [&](const TabletServerId& ts_uuid) {
          return !state_->per_ts_meta_[ts_uuid].starting_tablets.count(tablet_id);
        });
    if (sorted_ts.empty()) {
      return STATUS_SUBSTITUTE(IllegalState, "No tservers to remove from over-replicated "
                                             "tablet $0", tablet_id);
    }
    // Sort in reverse to first try to remove a replica from the highest loaded TS.
    sort(sorted_ts.rbegin(), sorted_ts.rend(), comparator);
    string remove_candidate = sorted_ts[0];
    *out_tablet_id = tablet_id;
    *out_from_ts = remove_candidate;
    // Do force leader stepdown, as we are either not the leader or we are allowed to step down.
    RETURN_NOT_OK(RemoveReplica(tablet_id, remove_candidate));
    return true;
  }
  return false;
}
1247
1248
// Attempts to remove one replica that is on a blacklisted or wrongly placed
// tserver. Only acts on tablets that are also over-replicated (so removing the
// replica does not drop us below the desired replication factor).
// Returns true (with out params set) if a removal was issued.
Result<bool> ClusterLoadBalancer::HandleRemoveIfWrongPlacement(
    TabletId* out_tablet_id, TabletServerId* out_from_ts) {
  for (const auto& tablet_id : state_->tablets_wrong_placement_) {
    // This trace runs for every wrongly placed tablet on every balancer pass,
    // so keep it at verbose level rather than spamming the INFO log.
    VLOG(1) << "Processing tablet " << tablet_id;
    // Skip this tablet if it is not over-replicated.
    if (!state_->tablets_over_replicated_.count(tablet_id)) {
      continue;
    }
    // Skip if there is a pending ADD_SERVER.
    if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id))) {
      continue;
    }
    const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
    TabletServerId target_uuid;
    // Prioritize blacklisted servers, if any; otherwise fall back to a server
    // in the wrong placement.
    if (!tablet_meta.blacklisted_tablet_servers.empty()) {
      target_uuid = *tablet_meta.blacklisted_tablet_servers.begin();
    } else if (!tablet_meta.wrong_placement_tablet_servers.empty()) {
      target_uuid = *tablet_meta.wrong_placement_tablet_servers.begin();
    }
    // If we found a tablet server, choose it.
    if (!target_uuid.empty()) {
      *out_tablet_id = tablet_id;
      *out_from_ts = std::move(target_uuid);
      // Force leader stepdown if we have wrong placements or blacklisted servers.
      RETURN_NOT_OK(RemoveReplica(tablet_id, *out_from_ts));
      return true;
    }
  }
  return false;
}
1283
1284
// Tries to move one leader off a non-affinitized (non-preferred-zone) tserver
// onto an affinitized one. Returns true and fills the out params on a match.
Result<bool> ClusterLoadBalancer::HandleLeaderLoadIfNonAffinitized(TabletId* moving_tablet_id,
                                                                   TabletServerId* from_ts,
                                                                   TabletServerId* to_ts,
                                                                   std::string* to_ts_path) {
  // Similar to normal leader balancing, we double iterate from most loaded to least loaded
  // non-affinitized nodes and least to most affinitized nodes. For each pair, we check whether
  // there is any tablet intersection and if so, there is a match and we return true.
  //
  // If we go through all the node pairs or we see that the current non-affinitized
  // leader load is 0, we know that there is no match from non-affinitized to affinitized nodes
  // and we return false.
  for (size_t non_affinitized_idx = state_->sorted_non_affinitized_leader_load_.size();
       non_affinitized_idx > 0;) {
    --non_affinitized_idx;
    const TabletServerId& non_affinitized_uuid =
        state_->sorted_non_affinitized_leader_load_[non_affinitized_idx];
    if (state_->GetLeaderLoad(non_affinitized_uuid) == 0) {
      // All subsequent non-affinitized nodes have no leaders, no match found.
      return false;
    }
    const set<TabletId>& leaders = state_->per_ts_meta_[non_affinitized_uuid].leaders;
    for (const auto& affinitized_uuid : state_->sorted_leader_load_) {
      auto peers = GetLeadersOnTSToMove(global_state_->drive_aware_,
                                        leaders,
                                        state_->per_ts_meta_[affinitized_uuid]);

      if (!peers.empty()) {
        auto peer = peers.begin();
        *moving_tablet_id = peer->first;
        // Bug fix: the destination path is the mapped value of the entry.
        // Previously peer->first (the tablet id, already stored in
        // *moving_tablet_id above) was assigned to *to_ts_path as well.
        *to_ts_path = peer->second;
        *from_ts = non_affinitized_uuid;
        *to_ts = affinitized_uuid;
        return true;
      }
    }
  }
  return false;
}
1322
1323
// Finds and performs one leader move, first honoring preferred (affinitized)
// zones, then falling back to general leader load balancing.
Result<bool> ClusterLoadBalancer::HandleLeaderMoves(
    TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
  // If the user sets 'transaction_tables_use_preferred_zones' gflag to 0 and the tablet
  // being balanced is a transaction tablet, then logical flow will be changed to ignore
  // preferred zones and instead proceed to normal leader balancing.
  string out_ts_ts_path;
  const bool moved_off_non_affinitized =
      state_->use_preferred_zones_ &&
      VERIFY_RESULT(HandleLeaderLoadIfNonAffinitized(
          out_tablet_id, out_from_ts, out_to_ts, &out_ts_ts_path));
  // Note: GetLeaderToMove is only consulted when no non-affinitized move was found.
  if (moved_off_non_affinitized ||
      VERIFY_RESULT(GetLeaderToMove(out_tablet_id, out_from_ts, out_to_ts, &out_ts_ts_path))) {
    RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, out_ts_ts_path));
    return true;
  }
  return false;
}
1342
1343
// Moves a tablet replica by adding it on 'to_ts' (with the leader asked to step
// down if needed) and then, unless moves are counted as pure adds, removing the
// source replica from the in-memory state.
Status ClusterLoadBalancer::MoveReplica(
    const TabletId& tablet_id, const TabletServerId& from_ts, const TabletServerId& to_ts) {
  LOG(INFO) << Substitute("Moving tablet $0 from $1 to $2", tablet_id, from_ts, to_ts);
  RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), to_ts, true /* is_add */,
                                   true /* should_remove_leader */));
  RETURN_NOT_OK(state_->AddReplica(tablet_id, to_ts));
  // When the flag counts a move as an add, skip the local-state removal here.
  if (GetAtomicFlag(&FLAGS_load_balancer_count_move_as_add)) {
    return Status::OK();
  }
  return state_->RemoveReplica(tablet_id, from_ts);
}
1352
1353
713
// Issues an ADD_SERVER for 'tablet_id' on 'to_ts' and records it in local state.
Status ClusterLoadBalancer::AddReplica(const TabletId& tablet_id, const TabletServerId& to_ts) {
  LOG(INFO) << Substitute("Adding tablet $0 to $1", tablet_id, to_ts);
  // This is an add operation, so the "should_remove_leader" flag is irrelevant.
  const auto& tablet_info = GetTabletMap().at(tablet_id);
  RETURN_NOT_OK(SendReplicaChanges(tablet_info, to_ts, true /* is_add */,
                                   true /* should_remove_leader */));
  return state_->AddReplica(tablet_id, to_ts);
}
1360
1361
// Issues a replica removal for 'tablet_id' from 'ts_uuid' (stepping the leader
// down first if that replica is the leader) and updates local state.
Status ClusterLoadBalancer::RemoveReplica(
    const TabletId& tablet_id, const TabletServerId& ts_uuid) {
  LOG(INFO) << Substitute("Removing replica $0 from tablet $1", ts_uuid, tablet_id);
  const auto& tablet_info = GetTabletMap().at(tablet_id);
  RETURN_NOT_OK(SendReplicaChanges(tablet_info, ts_uuid, false /* is_add */,
                                   true /* should_remove_leader */));
  return state_->RemoveReplica(tablet_id, ts_uuid);
}
1368
1369
// Moves leadership of 'tablet_id' from 'from_ts' to 'to_ts': sends a leader
// stepdown request (passing 'to_ts' as the new-leader hint, without removing
// the replica), then updates the in-memory state with the destination path.
Status ClusterLoadBalancer::MoveLeader(const TabletId& tablet_id,
                                       const TabletServerId& from_ts,
                                       const TabletServerId& to_ts,
                                       const string& to_ts_path) {
  LOG(INFO) << Substitute("Moving leader of $0 from TS $1 to $2", tablet_id, from_ts, to_ts);
  // is_add=false + should_remove_leader=false => pure stepdown, replica stays.
  RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), from_ts, false /* is_add */,
                                   false /* should_remove_leader */, to_ts));

  return state_->MoveLeader(tablet_id, from_ts, to_ts, to_ts_path);
}
1379
1380
775k
// Collects the affinitized (preferred leader) zones from the cluster config
// into 'affinitized_zones'.
void ClusterLoadBalancer::GetAllAffinitizedZones(AffinitizedZonesSet* affinitized_zones) const {
  SysClusterConfigEntryPB config;
  CHECK_OK(catalog_manager_->GetClusterConfig(&config));
  // Iterate the repeated field by const reference: the original indexed loop
  // copied each CloudInfoPB into a local before inserting it.
  for (const auto& ci : config.replication_info().affinitized_leaders()) {
    affinitized_zones->insert(ci);
  }
}
1389
1390
891k
// Initializes per-tserver state for this balancing pass: affinitized zones
// (when preferred zones are in use), both blacklists, and an entry for every
// known tablet server.
void ClusterLoadBalancer::InitializeTSDescriptors() {
  if (state_->use_preferred_zones_) {
    GetAllAffinitizedZones(&state_->affinitized_zones_);
  }
  // Set the blacklist so we can also mark the tablet servers as we add them up.
  state_->SetBlacklist(GetServerBlacklist());

  // Set the leader blacklist so we can also mark the tablet servers as we add them up.
  state_->SetLeaderBlacklist(GetLeaderBlacklist());

  // Loop over tablet servers to set empty defaults, so we can also have info on those
  // servers that have yet to receive load (have heartbeated to the master, but have not been
  // assigned any tablets yet).
  for (const auto& ts_desc : global_state_->ts_descs_) {
    state_->UpdateTabletServer(ts_desc);
  }
}
1407
1408
// CatalogManager indirection methods that are set as virtual to be bypassed in testing.
1409
//
1410
0
// CatalogManager indirection: fetches descriptors of tservers that have
// reported in. Virtual so tests can bypass the real TS manager.
void ClusterLoadBalancer::GetAllReportedDescriptors(TSDescriptorVector* ts_descs) const {
  catalog_manager_->master_->ts_manager()->GetAllReportedDescriptors(ts_descs);
}
1413
1414
1.55M
// CatalogManager indirection: fetches all known tserver descriptors.
// Virtual so tests can bypass the real TS manager.
void ClusterLoadBalancer::GetAllDescriptors(TSDescriptorVector* ts_descs) const {
  catalog_manager_->master_->ts_manager()->GetAllDescriptors(ts_descs);
}
1417
1418
2.96M
// Returns the catalog manager's tablet-id -> TabletInfo map (no copy).
const TabletInfoMap& ClusterLoadBalancer::GetTabletMap() const {
  return *catalog_manager_->tablet_map_;
}
1421
1422
891k
// Looks up the TableInfo for 'table_uuid' via the catalog manager.
// NOTE(review): calls the *Unlocked* variant, which presumably requires the
// caller to already hold the catalog lock — verify at call sites.
const scoped_refptr<TableInfo> ClusterLoadBalancer::GetTableInfo(const TableId& table_uuid) const {
  return catalog_manager_->GetTableInfoUnlocked(table_uuid);
}
1425
1426
891k
// Returns the tablets of 'table_uuid', or InvalidArgument if the table is not
// present in the catalog manager's table map.
Result<TabletInfos> ClusterLoadBalancer::GetTabletsForTable(const TableId& table_uuid) const {
  const auto table_info = GetTableInfo(table_uuid);
  if (table_info == nullptr) {
    return STATUS_FORMAT(
        InvalidArgument, "Invalid UUID '$0' - no entry found in catalog manager table map",
        table_uuid);
  }
  // Inclusion of inactive tablets is governed by a test-only flag.
  return table_info->GetTablets(IncludeInactive(!FLAGS_TEST_load_balancer_skip_inactive_tablets));
}
1437
1438
3.79M
// Returns the catalog manager's table-id -> TableInfo map (no copy).
const TableInfoMap& ClusterLoadBalancer::GetTableMap() const {
  return *catalog_manager_->table_ids_map_;
}
1441
1442
872k
// Returns the cluster-wide replication info, read under the cluster config lock.
const ReplicationInfoPB& ClusterLoadBalancer::GetClusterReplicationInfo() const {
  return catalog_manager_->ClusterConfig()->LockForRead()->pb.replication_info();
}
1445
1446
0
// Returns the placement info relevant to this balancer run: live replicas for
// the LIVE cluster, otherwise the read-only placement matching our uuid.
const PlacementInfoPB& ClusterLoadBalancer::GetClusterPlacementInfo() const {
  auto l = catalog_manager_->ClusterConfig()->LockForRead();
  const auto& replication_info = l->pb.replication_info();
  if (state_->options_->type != LIVE) {
    return GetReadOnlyPlacementFromUuid(replication_info);
  }
  return replication_info.live_replicas();
}
1454
1455
891k
// Returns the server blacklist from the cluster config (read under its lock).
const BlacklistPB& ClusterLoadBalancer::GetServerBlacklist() const {
  return catalog_manager_->ClusterConfig()->LockForRead()->pb.server_blacklist();
}
1458
1459
891k
// Returns the leader blacklist from the cluster config (read under its lock).
const BlacklistPB& ClusterLoadBalancer::GetLeaderBlacklist() const {
  return catalog_manager_->ClusterConfig()->LockForRead()->pb.leader_blacklist();
}
1462
1463
231M
bool ClusterLoadBalancer::SkipLoadBalancing(const TableInfo& table) const {
1464
  // Skip load-balancing of some tables:
1465
  // * system tables: they are virtual tables not hosted by tservers.
1466
  // * colocated user tables: they occupy the same tablet as their colocated parent table, so load
1467
  //   balancing just the colocated parent table is sufficient.
1468
  // * deleted/deleting tables: as they are no longer in effect. For tables that are being deleted
1469
  // currently as well, load distribution wouldn't matter as eventually they would get deleted.
1470
231M
  auto l = table.LockForRead();
1471
231M
  return (catalog_manager_->IsSystemTable(table) ||
1472
231M
          
table.IsColocatedUserTable()4.09M
||
1473
231M
          
l->started_deleting()4.06M
);
1474
231M
}
1475
1476
// Accumulates the counts of pending add/remove/stepdown tasks for
// 'table_uuid' into the out params, caching the task maps in state_, and
// registers each pending add as a starting tablet on its target tserver.
Status ClusterLoadBalancer::CountPendingTasksUnlocked(const TableId& table_uuid,
                                            int* pending_add_replica_tasks,
                                            int* pending_remove_replica_tasks,
                                            int* pending_stepdown_leader_tasks) {
  GetPendingTasks(table_uuid,
                  &state_->pending_add_replica_tasks_[table_uuid],
                  &state_->pending_remove_replica_tasks_[table_uuid],
                  &state_->pending_stepdown_leader_tasks_[table_uuid]);

  *pending_add_replica_tasks += state_->pending_add_replica_tasks_[table_uuid].size();
  *pending_remove_replica_tasks += state_->pending_remove_replica_tasks_[table_uuid].size();
  *pending_stepdown_leader_tasks += state_->pending_stepdown_leader_tasks_[table_uuid].size();
  // Iterate by const reference: the original `for (auto e : ...)` copied each
  // (tablet_id, ts_uuid) pair on every iteration.
  for (const auto& e : state_->pending_add_replica_tasks_[table_uuid]) {
    const auto& ts_uuid = e.second;
    const auto& tablet_id = e.first;
    RETURN_NOT_OK(state_->AddStartingTablet(tablet_id, ts_uuid));
  }
  return Status::OK();
}
1495
1496
// CatalogManager indirection: fills the three maps with the pending
// add/remove/stepdown tasks for 'table_uuid'.
void ClusterLoadBalancer::GetPendingTasks(const TableId& table_uuid,
                                          TabletToTabletServerMap* add_replica_tasks,
                                          TabletToTabletServerMap* remove_replica_tasks,
                                          TabletToTabletServerMap* stepdown_leader_tasks) {
  catalog_manager_->GetPendingServerTasksUnlocked(
      table_uuid, add_replica_tasks, remove_replica_tasks, stepdown_leader_tasks);
}
1503
1504
// Dispatches a replica-change request for 'tablet':
//  - is_add=true: send an ADD_SERVER for 'ts_uuid';
//  - is_add=false and 'ts_uuid' is the current leader: send a leader stepdown
//    (optionally removing the leader, optionally hinting 'new_leader_ts_uuid');
//  - otherwise: send a REMOVE_SERVER for 'ts_uuid'.
// Each path first sanity-checks that no duplicate task is already pending.
Status ClusterLoadBalancer::SendReplicaChanges(
    scoped_refptr<TabletInfo> tablet, const TabletServerId& ts_uuid, const bool is_add,
    const bool should_remove_leader, const TabletServerId& new_leader_ts_uuid) {
  // Read lock held across the dispatch so the committed consensus state we
  // pass along is a consistent snapshot.
  auto l = tablet->LockForRead();
  if (is_add) {
    // These checks are temporary. They will be removed once we are confident that the algorithm is
    // always doing the right thing.
    SCHECK_EQ(state_->pending_add_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
             0U,
             IllegalState,
             "Sending duplicate add replica task.");
    catalog_manager_->SendAddServerRequest(
        tablet, GetDefaultMemberType(), l->pb.committed_consensus_state(), ts_uuid);
  } else {
    // If the replica is also the leader, first step it down and then remove.
    if (state_->per_tablet_meta_[tablet->id()].leader_uuid == ts_uuid) {
      SCHECK_EQ(
          state_->pending_stepdown_leader_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
          0U,
          IllegalState,
          "Sending duplicate leader stepdown task.");
      catalog_manager_->SendLeaderStepDownRequest(
          tablet, l->pb.committed_consensus_state(), ts_uuid, should_remove_leader,
          new_leader_ts_uuid);
    } else {
      SCHECK_EQ(
          state_->pending_remove_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
          0U,
          IllegalState,
          "Sending duplicate remove replica task.");
      catalog_manager_->SendRemoveServerRequest(
          tablet, l->pb.committed_consensus_state(), ts_uuid);
    }
  }
  return Status::OK();
}
1540
1541
2.17k
// Member type for newly added replicas: live clusters add PRE_VOTERs,
// read-only clusters add PRE_OBSERVERs.
consensus::PeerMemberType ClusterLoadBalancer::GetDefaultMemberType() {
  return state_->options_->type == LIVE ? consensus::PeerMemberType::PRE_VOTER
                                        : consensus::PeerMemberType::PRE_OBSERVER;
}
1548
1549
5.79k
// Returns true if the tablet's committed Raft config has any voters still in
// transition (e.g. a pending ADD_SERVER not yet promoted).
Result<bool> ClusterLoadBalancer::IsConfigMemberInTransitionMode(const TabletId &tablet_id) const {
  auto tablet = GetTabletMap().at(tablet_id);
  auto l = tablet->LockForRead();
  // Bind by const reference; the original `auto config = ...` copied the whole
  // config proto just to count voters. The lock is held for the full scope, so
  // the reference stays valid.
  const auto& config = l->pb.committed_consensus_state().config();
  return CountVotersInTransition(config) != 0;
}
1555
1556
// Finds the read-replica placement whose placement_uuid matches our options;
// falls back (with an error log) to the first read replica entry.
const PlacementInfoPB& ClusterLoadBalancer::GetReadOnlyPlacementFromUuid(
    const ReplicationInfoPB& replication_info) const {
  // We assume we have a read replicas field in our replication info.
  for (const auto& read_only_placement : replication_info.read_replicas()) {
    if (read_only_placement.placement_uuid() == state_->options_->placement_uuid) {
      return read_only_placement;
    }
  }
  // Should never get here.
  LOG(ERROR) << "Could not find read only cluster with placement uuid: "
             << state_->options_->placement_uuid;
  return replication_info.read_replicas(0);
}
1570
1571
0
// Returns the live-replica placement from the cluster config, regardless of
// whether this balancer run targets the live or a read-only cluster.
const PlacementInfoPB& ClusterLoadBalancer::GetLiveClusterPlacementInfo() const {
  auto l = catalog_manager_->ClusterConfig()->LockForRead();
  return l->pb.replication_info().live_replicas();
}
1575
1576
0
// Returns a copy of the tables the balancer skipped, taken under the shared
// lock so concurrent balancer passes can keep updating the list.
vector<scoped_refptr<TableInfo>> ClusterLoadBalancer::GetAllTablesLoadBalancerSkipped() {
  SharedLock<decltype(mutex_)> l(mutex_);
  return skipped_tables_;
}
1580
1581
}  // namespace master
1582
}  // namespace yb