/Users/deen/code/yugabyte-db/src/yb/master/cluster_balance.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/cluster_balance.h" |
15 | | |
16 | | #include <algorithm> |
17 | | #include <memory> |
18 | | #include <utility> |
19 | | |
20 | | #include <boost/algorithm/string/join.hpp> |
21 | | #include <boost/optional/optional.hpp> |
22 | | |
23 | | #include "yb/common/common.pb.h" |
24 | | |
25 | | #include "yb/consensus/quorum_util.h" |
26 | | |
27 | | #include "yb/gutil/casts.h" |
28 | | |
29 | | #include "yb/master/catalog_manager_util.h" |
30 | | |
31 | | #include "yb/master/master_fwd.h" |
32 | | #include "yb/master/master.h" |
33 | | #include "yb/master/master_error.h" |
34 | | |
35 | | #include "yb/util/flag_tags.h" |
36 | | #include "yb/util/status.h" |
37 | | #include "yb/util/status_format.h" |
38 | | #include "yb/util/status_log.h" |
39 | | |
// Runtime-tunable gflags controlling the cluster load balancer.

DEFINE_bool(enable_load_balancing,
            true,
            "Choose whether to enable the load balancing algorithm, to move tablets around.");

DEFINE_bool(transaction_tables_use_preferred_zones,
            false,
            "Choose whether transaction tablet leaders respect preferred zones.");

DEFINE_bool(enable_global_load_balancing,
            true,
            "Choose whether to allow the load balancer to make moves that strictly only balance "
            "global load. Note that global balancing only occurs after all tables are balanced.");

DEFINE_int32(leader_balance_threshold,
             0,
             "Number of leaders per each tablet server to balance below. If this is configured to "
             "0 (the default), the leaders will be balanced optimally at extra cost.");

DEFINE_int32(leader_balance_unresponsive_timeout_ms,
             3 * 1000,
             "The period of time that a master can go without receiving a heartbeat from a "
             "tablet server before considering it unresponsive. Unresponsive servers are "
             "excluded from leader balancing.");

DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps,
             10,
             "Maximum number of tablets being remote bootstrapped across the cluster.");

DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps_per_table,
             2,
             "Maximum number of tablets being remote bootstrapped for any table. The maximum "
             "number of remote bootstraps across the cluster is still limited by the flag "
             "load_balancer_max_concurrent_tablet_remote_bootstraps. This flag is meant to prevent "
             "a single table use all the available remote bootstrap sessions and starving other "
             "tables.");

DEFINE_int32(load_balancer_max_over_replicated_tablets,
             1,
             "Maximum number of running tablet replicas that are allowed to be over the configured "
             "replication factor.");

DEFINE_int32(load_balancer_max_concurrent_adds,
             1,
             "Maximum number of tablet peer replicas to add in any one run of the load balancer.");

DEFINE_int32(load_balancer_max_concurrent_removals,
             1,
             "Maximum number of over-replicated tablet peer removals to do in any one run of the "
             "load balancer.");

DEFINE_int32(load_balancer_max_concurrent_moves,
             2,
             "Maximum number of tablet leaders on tablet servers (across the cluster) to move in "
             "any one run of the load balancer.");

DEFINE_int32(load_balancer_max_concurrent_moves_per_table,
             1,
             "Maximum number of tablet leaders per table to move in any one run of the load "
             "balancer. The maximum number of tablet leader moves across the cluster is still "
             "limited by the flag load_balancer_max_concurrent_moves. This flag is meant to "
             "prevent a single table from using all of the leader moves quota and starving "
             "other tables.");

DEFINE_int32(load_balancer_num_idle_runs,
             5,
             "Number of idle runs of load balancer to deem it idle.");

DEFINE_test_flag(bool, load_balancer_handle_under_replicated_tablets_only, false,
                 "Limit the functionality of the load balancer during tests so tests can make "
                 "progress");

// No longer used because leader stepdown is not as slow as it used to be.
DEFINE_bool(load_balancer_skip_leader_as_remove_victim, false,
            "DEPRECATED. Should the LB skip a leader as a possible remove candidate.");

DEFINE_bool(allow_leader_balancing_dead_node, true,
            "When a tserver is marked as dead, do we continue leader balancing for tables that "
            "have a replica on this tserver");

DEFINE_test_flag(int32, load_balancer_wait_after_count_pending_tasks_ms, 0,
                 "For testing purposes, number of milliseconds to wait after counting and "
                 "finding pending tasks.");

DECLARE_int32(min_leader_stepdown_retry_interval_ms);
DECLARE_bool(enable_ysql_tablespaces_for_placement);

DEFINE_bool(load_balancer_count_move_as_add, true,
            "Should we enable state change to count add server triggered by load move as just an "
            "add instead of both an add and remove.");

DEFINE_bool(load_balancer_drive_aware, true,
            "When LB decides to move a tablet from server A to B, on the target LB "
            "should select the tablet to move from most loaded drive.");

DEFINE_bool(load_balancer_ignore_cloud_info_similarity, false,
            "If true, ignore the similarity between cloud infos when deciding which tablet "
            "to move.");

// TODO(tsplit): make false by default or even remove flag after
// https://github.com/yugabyte/yugabyte-db/issues/10301 is fixed.
DEFINE_test_flag(
    bool, load_balancer_skip_inactive_tablets, true, "Don't move inactive (hidden) tablets");
143 | | namespace yb { |
144 | | namespace master { |
145 | | |
146 | | using std::unique_ptr; |
147 | | using std::make_unique; |
148 | | using std::string; |
149 | | using std::set; |
150 | | using std::vector; |
151 | | using strings::Substitute; |
152 | | |
153 | | namespace { |
154 | | |
155 | | vector<set<TabletId>> GetTabletsOnTSToMove(bool drive_aware, |
156 | 270k | const CBTabletServerMetadata& from_ts_meta) { |
157 | 270k | vector<set<TabletId>> all_tablets; |
158 | 270k | if (drive_aware) { |
159 | 544k | for (const auto& path : from_ts_meta.sorted_path_load_by_tablets_count) { |
160 | 544k | auto path_list = from_ts_meta.path_to_tablets.find(path); |
161 | 544k | if (path_list == from_ts_meta.path_to_tablets.end()) { |
162 | 0 | LOG(INFO) << "Found uninitialized path: " << path; |
163 | 0 | continue; |
164 | 0 | } |
165 | 544k | all_tablets.push_back(path_list->second); |
166 | 544k | } |
167 | 270k | } else { |
168 | 0 | all_tablets.push_back(from_ts_meta.running_tablets); |
169 | 0 | } |
170 | 270k | return all_tablets; |
171 | 270k | } |
172 | | |
173 | | // Returns sorted list of pair tablet id and path on to_ts. |
174 | | std::vector<std::pair<TabletId, std::string>> GetLeadersOnTSToMove( |
175 | 139k | bool drive_aware, const set<TabletId>& leaders, const CBTabletServerMetadata& to_ts_meta) { |
176 | 139k | std::vector<std::pair<TabletId, std::string>> peers; |
177 | 139k | if (drive_aware) { |
178 | 193k | for (const auto& path : to_ts_meta.sorted_path_load_by_leader_count) { |
179 | 193k | auto path_list = to_ts_meta.path_to_tablets.find(path); |
180 | 193k | if (path_list == to_ts_meta.path_to_tablets.end()) { |
181 | | // No tablets on this path, so skip it. |
182 | 130k | continue; |
183 | 130k | } |
184 | 63.4k | transform(path_list->second.begin(), path_list->second.end(), std::back_inserter(peers), |
185 | 243k | [&path_list](const TabletId& tablet_id) -> std::pair<TabletId, std::string> { |
186 | 243k | return make_pair(tablet_id, path_list->first); |
187 | 243k | }); |
188 | 63.4k | } |
189 | 139k | } else { |
190 | 0 | transform(to_ts_meta.running_tablets.begin(), to_ts_meta.running_tablets.end(), |
191 | 0 | std::back_inserter(peers), |
192 | 0 | [](const TabletId& tablet_id) -> std::pair<TabletId, std::string> { |
193 | 0 | return make_pair(tablet_id, ""); |
194 | 0 | }); |
195 | 0 | } |
196 | 139k | std::vector<std::pair<TabletId, std::string>> intersection; |
197 | 139k | copy_if(peers.begin(), peers.end(), std::back_inserter(intersection), |
198 | 243k | [&leaders](const std::pair<TabletId, std::string>& tablet) { |
199 | 243k | return leaders.count(tablet.first) > 0; |
200 | 243k | }); |
201 | 139k | return intersection; |
202 | 139k | } |
203 | | |
204 | | } // namespace |
205 | | |
// Resolves the replication info that applies to `table`, in precedence order:
// 1. a custom placement policy set on the table itself,
// 2. the placement of the tablespace the table belongs to (if any),
// 3. the cluster-wide replication info.
Result<ReplicationInfoPB> ClusterLoadBalancer::GetTableReplicationInfo(
    const scoped_refptr<TableInfo>& table) const {

  // Return custom placement policy if it exists.
  {
    auto l = table->LockForRead();
    if (l->pb.has_replication_info()) {
      return l->pb.replication_info();
    }
  }

  // Custom placement policy does not exist. Check whether this table
  // has a tablespace associated with it, if so, return the placement info
  // for that tablespace.
  auto replication_info = VERIFY_RESULT(tablespace_manager_->GetTableReplicationInfo(table));
  if (replication_info) {
    return replication_info.value();
  }

  // No custom policy or tablespace specified for table.
  return GetClusterReplicationInfo();
}
228 | | |
// Refreshes the cached tablespace manager from the catalog manager; called
// once at the start of each load balancer run.
void ClusterLoadBalancer::InitTablespaceManager() {
  tablespace_manager_ = catalog_manager_->GetTablespaceManager();
}
232 | | |
233 | 891k | Status ClusterLoadBalancer::PopulatePlacementInfo(TabletInfo* tablet, PlacementInfoPB* pb) { |
234 | 891k | const auto& replication_info = VERIFY_RESULT(GetTableReplicationInfo(tablet->table())); |
235 | 891k | if (state_->options_->type == LIVE) { |
236 | 888k | pb->CopyFrom(replication_info.live_replicas()); |
237 | 888k | } |
238 | 891k | if (state_->options_->type == READ_ONLY) { |
239 | 3.25k | if (replication_info.read_replicas_size() == 0) { |
240 | | // Should not reach here as tables that should not have read replicas should |
241 | | // have already been skipped before reaching here. |
242 | 0 | return STATUS(IllegalState, |
243 | 0 | Format("Encountered a table $0 with no read replicas. Placement info $1", |
244 | 0 | tablet->table()->name(), |
245 | 0 | replication_info.DebugString())); |
246 | 0 | } |
247 | 3.25k | pb->CopyFrom(GetReadOnlyPlacementFromUuid(replication_info)); |
248 | 3.25k | } |
249 | 891k | return Status::OK(); |
250 | 891k | } |
251 | | |
252 | 3.94M | Status ClusterLoadBalancer::UpdateTabletInfo(TabletInfo* tablet) { |
253 | 3.94M | const auto& table_id = tablet->table()->id(); |
254 | | // Set the placement information on a per-table basis, only once. |
255 | 3.94M | if (!state_->placement_by_table_.count(table_id)) { |
256 | 891k | PlacementInfoPB pb; |
257 | 891k | { |
258 | 891k | RETURN_NOT_OK(PopulatePlacementInfo(tablet, &pb)); |
259 | 891k | } |
260 | 891k | state_->placement_by_table_[table_id] = std::move(pb); |
261 | 891k | } |
262 | | |
263 | 3.94M | return state_->UpdateTablet(tablet); |
264 | 3.94M | } |
265 | | |
// Returns the cached placement info for the table owning `tablet_id`.
// Assumes the tablet exists in the tablet map and its table's placement has
// already been populated this run (both lookups use at(), which throws on a
// missing key).
const PlacementInfoPB& ClusterLoadBalancer::GetPlacementByTablet(const TabletId& tablet_id) const {
  const auto& table_id = GetTabletMap().at(tablet_id)->table()->id();
  return state_->placement_by_table_.at(table_id);
}
270 | | |
// Number of tablets whose replicas violate their placement policy.
size_t ClusterLoadBalancer::get_total_wrong_placement() const {
  return state_->tablets_wrong_placement_.size();
}

// Number of tablet servers currently on the blacklist.
size_t ClusterLoadBalancer::get_total_blacklisted_servers() const {
  return state_->blacklisted_servers_.size();
}

// Number of tablet servers currently on the leader blacklist.
size_t ClusterLoadBalancer::get_total_leader_blacklisted_servers() const {
  return state_->leader_blacklisted_servers_.size();
}

// Number of tablets with more running replicas than the replication factor.
size_t ClusterLoadBalancer::get_total_over_replication() const {
  return state_->tablets_over_replicated_.size();
}

// Number of tablets with fewer replicas than the replication factor.
size_t ClusterLoadBalancer::get_total_under_replication() const {
  return state_->tablets_missing_replicas_.size();
}

// Number of tablets in the starting (e.g. remote-bootstrapping) state,
// tracked globally across all tables in this run.
size_t ClusterLoadBalancer::get_total_starting_tablets() const {
  return global_state_->total_starting_tablets_;
}

// Number of running tablet replicas for the table currently being balanced.
int ClusterLoadBalancer::get_total_running_tablets() const {
  return state_->total_running_;
}
298 | | |
// The balancer runs only when both the runtime flag and the instance-level
// enable switch (captured at construction) are on.
bool ClusterLoadBalancer::IsLoadBalancerEnabled() const {
  return FLAGS_enable_load_balancing && is_enabled_;
}
302 | | |
// Load balancer class.
// Seeds the RNG, snapshots the enable flag, and sizes the activity ring
// buffer used for idleness detection. TS descriptors are not fetched here;
// they are (re)loaded at the start of each run.
ClusterLoadBalancer::ClusterLoadBalancer(CatalogManager* cm)
    : random_(GetRandomSeed32()),
      is_enabled_(FLAGS_enable_load_balancing),
      cbuf_activities_(FLAGS_load_balancer_num_idle_runs) {
  ResetGlobalState(false /* initialize_ts_descs */);

  catalog_manager_ = cm;
}
312 | | |
313 | | // Reduce remaining_tasks by pending_tasks value, after sanitizing inputs. |
314 | | template <class T> |
315 | 3.73M | void set_remaining(T pending_tasks, T* remaining_tasks) { |
316 | 3.73M | if (pending_tasks > *remaining_tasks) { |
317 | 0 | LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > " |
318 | 0 | << *remaining_tasks; |
319 | 0 | *remaining_tasks = 0; |
320 | 3.73M | } else { |
321 | 3.73M | *remaining_tasks -= pending_tasks; |
322 | 3.73M | } |
323 | 3.73M | } void yb::master::set_remaining<int>(int, int*) Line | Count | Source | 315 | 2.84M | void set_remaining(T pending_tasks, T* remaining_tasks) { | 316 | 2.84M | if (pending_tasks > *remaining_tasks) { | 317 | 0 | LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > " | 318 | 0 | << *remaining_tasks; | 319 | 0 | *remaining_tasks = 0; | 320 | 2.84M | } else { | 321 | 2.84M | *remaining_tasks -= pending_tasks; | 322 | 2.84M | } | 323 | 2.84M | } |
void yb::master::set_remaining<unsigned long>(unsigned long, unsigned long*) Line | Count | Source | 315 | 891k | void set_remaining(T pending_tasks, T* remaining_tasks) { | 316 | 891k | if (pending_tasks > *remaining_tasks) { | 317 | 0 | LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > " | 318 | 0 | << *remaining_tasks; | 319 | 0 | *remaining_tasks = 0; | 320 | 891k | } else { | 321 | 891k | *remaining_tasks -= pending_tasks; | 322 | 891k | } | 323 | 891k | } |
|
324 | | |
// Needed as we have a unique_ptr to the forward declared PerTableLoadState
// class: the destructor must be emitted where the full type is visible.
ClusterLoadBalancer::~ClusterLoadBalancer() = default;
327 | | |
// Executes one balancing pass for the placement described by `options`
// (allocating defaults if null). The pass has three phases over the table
// map: (1) count in-flight tasks and initialize per-table state, (2) analyze
// per-table load, (3) issue add/remove/leader-move operations, bounded by the
// global and per-table concurrency budgets. Records activity at the end so
// idleness can be detected.
void ClusterLoadBalancer::RunLoadBalancerWithOptions(Options* options) {
  ResetGlobalState();

  uint32_t master_errors = 0;

  if (!IsLoadBalancerEnabled()) {
    LOG(INFO) << "Load balancing is not enabled.";
    return;
  }

  if (!FLAGS_transaction_tables_use_preferred_zones) {
    VLOG(1) << "Transaction tables will not respect leadership affinity.";
  }

  std::unique_ptr<Options> options_unique_ptr;
  if (options == nullptr) {
    options_unique_ptr = std::make_unique<Options>();
    options = options_unique_ptr.get();
  }

  InitTablespaceManager();

  // Lock the CatalogManager maps for the duration of the load balancer run.
  CatalogManager::SharedLock lock(catalog_manager_->mutex_);

  // Global concurrency budgets for this run; reduced below by tasks that are
  // already pending from previous runs.
  int remaining_adds = options->kMaxConcurrentAdds;
  int remaining_removals = options->kMaxConcurrentRemovals;
  int remaining_leader_moves = options->kMaxConcurrentLeaderMoves;

  // Loop over all tables to get the count of pending tasks.
  int pending_add_replica_tasks = 0;
  int pending_remove_replica_tasks = 0;
  int pending_stepdown_leader_tasks = 0;

  for (const auto& table : GetTableMap()) {
    if (SkipLoadBalancing(*table.second)) {
      // Populate the list of tables for which LB has been skipped
      // in LB's internal vector.
      skipped_tables_per_run_.push_back(table.second);
      continue;
    }
    const TableId& table_id = table.first;
    if (tablespace_manager_->NeedsRefreshToFindTablePlacement(table.second)) {
      // Placement information was not present in catalog manager cache. This is probably a
      // recently created table, skip load balancing for now, hopefully by the next run,
      // the background task in the catalog manager will pick up the placement information
      // for this table from the PG catalog tables.
      // TODO(deepthi) Keep track of the number of times this happens, take appropriate action
      // if placement stays missing over period of time.
      YB_LOG_EVERY_N(INFO, 10) << "Skipping load balancing for table " << table.second->name()
                               << " as its placement information is not available yet";
      master_errors++;
      continue;
    }

    if (options->type == READ_ONLY) {
      const auto& result = GetTableReplicationInfo(table.second);
      if (!result) {
        YB_LOG_EVERY_N(WARNING, 10) << "Skipping load balancing for " << table.first << ": "
                                    << "as fetching replication info failed with error "
                                    << StatusToString(result.status());
        master_errors++;
        continue;
      }
      if (result.get().read_replicas_size() == 0) {
        // The table has a replication policy without any read replicas present.
        // The LoadBalancer is handling read replicas in this run, so this
        // table can be skipped.
        continue;
      }
    }
    ResetTableStatePtr(table_id, options);

    bool is_txn_table = table.second->GetTableType() == TRANSACTION_STATUS_TABLE_TYPE;
    state_->use_preferred_zones_ = !is_txn_table || FLAGS_transaction_tables_use_preferred_zones;
    InitializeTSDescriptors();

    Status s = CountPendingTasksUnlocked(table_id,
                                         &pending_add_replica_tasks,
                                         &pending_remove_replica_tasks,
                                         &pending_stepdown_leader_tasks);
    if (!s.ok()) {
      // Found uninitialized ts_meta, so don't load balance this table yet.
      LOG(WARNING) << "Skipping load balancing " << table.first << ": " << StatusToString(s);
      per_table_states_.erase(table_id);
      master_errors++;
      continue;
    }
  }

  if (pending_add_replica_tasks + pending_remove_replica_tasks +
          pending_stepdown_leader_tasks > 0) {
    LOG(INFO) << "Total pending adds=" << pending_add_replica_tasks << ", total pending removals="
              << pending_remove_replica_tasks << ", total pending leader stepdowns="
              << pending_stepdown_leader_tasks;
    if (PREDICT_FALSE(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms > 0)) {
      LOG(INFO) << "Sleeping after finding pending tasks for "
                << FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms << " ms";
      SleepFor(
          MonoDelta::FromMilliseconds(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms));
    }
  }

  // Charge already-pending work against this run's budgets.
  set_remaining(pending_add_replica_tasks, &remaining_adds);
  set_remaining(pending_remove_replica_tasks, &remaining_removals);
  set_remaining(pending_stepdown_leader_tasks, &remaining_leader_moves);

  // At the start of the run, report LB state that might prevent it from running smoothly.
  ReportUnusualLoadBalancerState();

  // Loop over all tables to analyze the global and per-table load.
  for (const auto& table : GetTableMap()) {
    if (SkipLoadBalancing(*table.second)) {
      continue;
    }

    auto it = per_table_states_.find(table.first);
    if (it == per_table_states_.end()) {
      // If the table state doesn't exist, it was not fully initialized in the previous iteration.
      VLOG(1) << "Unable to find the state for table " << table.first;
      continue;
    }
    state_ = it->second.get();

    // Prepare the in-memory structures.
    auto handle_analyze_tablets = AnalyzeTabletsUnlocked(table.first);
    if (!handle_analyze_tablets.ok()) {
      LOG(WARNING) << "Skipping load balancing " << table.first << ": "
                   << StatusToString(handle_analyze_tablets);
      per_table_states_.erase(table.first);
      master_errors++;
    }
  }

  VLOG(1) << "Number of remote bootstraps before running load balancer: "
          << global_state_->total_starting_tablets_;

  // Iterate over all the tables to take actions based on the data collected on the previous loop.
  for (const auto& table : GetTableMap()) {
    state_ = nullptr;
    if (remaining_adds == 0 && remaining_removals == 0 && remaining_leader_moves == 0) {
      // All budgets exhausted; nothing more can be done this run.
      break;
    }
    if (SkipLoadBalancing(*table.second)) {
      continue;
    }

    auto it = per_table_states_.find(table.first);
    if (it == per_table_states_.end()) {
      // If the table state doesn't exist, it didn't get analyzed by the previous iteration.
      VLOG(1) << "Unable to find table state for table " << table.first
              << ". Skipping load balancing execution";
      continue;
    } else {
      VLOG(5) << "Load balancing table " << table.first;
    }
    state_ = it->second.get();

    // We may have modified global loads, so we need to reset this state's load.
    state_->SortLoad();

    // Output parameters are unused in the load balancer, but useful in testing.
    TabletId out_tablet_id;
    TabletServerId out_from_ts;
    TabletServerId out_to_ts;

    // Handle adding and moving replicas.
    for ( ; remaining_adds > 0; --remaining_adds) {
      if (state_->allow_only_leader_balancing_) {
        YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping Add replicas. Only leader balancing table "
                                      << table.first;
        break;
      }
      auto handle_add = HandleAddReplicas(&out_tablet_id, &out_from_ts, &out_to_ts);
      if (!handle_add.ok()) {
        LOG(WARNING) << "Skipping add replicas for " << table.first << ": "
                     << StatusToString(handle_add);
        master_errors++;
        break;
      }
      if (!*handle_add) {
        // No further adds are possible for this table.
        break;
      }
    }
    if (PREDICT_FALSE(FLAGS_TEST_load_balancer_handle_under_replicated_tablets_only)) {
      LOG(INFO) << "Skipping remove replicas and leader moves for " << table.first;
      continue;
    }

    // Handle cleanup after over-replication.
    for ( ; remaining_removals > 0; --remaining_removals) {
      if (state_->allow_only_leader_balancing_) {
        YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping remove replicas. Only leader balancing table "
                                      << table.first;
        break;
      }
      auto handle_remove = HandleRemoveReplicas(&out_tablet_id, &out_from_ts);
      if (!handle_remove.ok()) {
        LOG(WARNING) << "Skipping remove replicas for " << table.first << ": "
                     << StatusToString(handle_remove);
        master_errors++;
        break;
      }
      if (!*handle_remove) {
        // No further removals are possible for this table.
        break;
      }
    }

    // Handle tablet servers with too many leaders.
    // Check the current pending tasks per table to ensure we don't trigger the same task.
    size_t table_remaining_leader_moves = state_->options_->kMaxConcurrentLeaderMovesPerTable;
    set_remaining(state_->pending_stepdown_leader_tasks_[table.first].size(),
                  &table_remaining_leader_moves);
    // Keep track of both the global and per table limit on number of moves.
    for ( ;
         remaining_leader_moves > 0 && table_remaining_leader_moves > 0;
         --remaining_leader_moves, --table_remaining_leader_moves) {
      auto handle_leader = HandleLeaderMoves(&out_tablet_id, &out_from_ts, &out_to_ts);
      if (!handle_leader.ok()) {
        LOG(WARNING) << "Skipping leader moves for " << table.first << ": "
                     << StatusToString(handle_leader);
        master_errors++;
        break;
      }
      if (!*handle_leader) {
        // No further leader moves are possible for this table.
        break;
      }
    }
  }

  RecordActivity(master_errors);
}
559 | | |
// Top-level entry point for one balancer cycle: balances the live cluster
// first, then each configured read-replica placement in turn.
// NOTE(review): the `options` parameter is never read here — this method
// always builds its own Options from the cluster config; confirm whether any
// caller expects its options to take effect.
void ClusterLoadBalancer::RunLoadBalancer(Options* options) {
  SysClusterConfigEntryPB config;
  CHECK_OK(catalog_manager_->GetClusterConfig(&config));

  std::unique_ptr<Options> options_unique_ptr =
      std::make_unique<Options>();
  Options* options_ent = options_unique_ptr.get();
  // First, we load balance the live cluster.
  options_ent->type = LIVE;
  if (config.replication_info().live_replicas().has_placement_uuid()) {
    options_ent->placement_uuid = config.replication_info().live_replicas().placement_uuid();
    options_ent->live_placement_uuid = options_ent->placement_uuid;
  } else {
    options_ent->placement_uuid = "";
    options_ent->live_placement_uuid = "";
  }
  RunLoadBalancerWithOptions(options_ent);

  // Then, we balance all read-only clusters.
  options_ent->type = READ_ONLY;
  for (int i = 0; i < config.replication_info().read_replicas_size(); i++) {
    const PlacementInfoPB& read_only_cluster = config.replication_info().read_replicas(i);
    options_ent->placement_uuid = read_only_cluster.placement_uuid();
    RunLoadBalancerWithOptions(options_ent);
  }
}
586 | | |
// Records the outcome of a balancer run: snapshots the skipped-table list,
// counts outstanding LB tasks across all tables, and pushes an ActivityInfo
// into the fixed-size circular buffer. num_idle_runs_ tracks how many entries
// in the buffer are idle, so is_idle_ is true exactly when the buffer is full
// of idle runs.
void ClusterLoadBalancer::RecordActivity(uint32_t master_errors) {
  // Update the list of tables for whom load-balancing has been
  // skipped in this run.
  {
    std::lock_guard<decltype(mutex_)> l(mutex_);
    skipped_tables_ = skipped_tables_per_run_;
  }

  uint32_t table_tasks = 0;
  for (const auto& table : GetTableMap()) {
    table_tasks += table.second->NumLBTasks();
  }

  struct ActivityInfo ai {table_tasks, master_errors};

  // Update circular buffer summary.

  if (ai.IsIdle()) {
    num_idle_runs_++;
  } else {
    VLOG(1) <<
      Substitute("Load balancer has $0 table tasks and $1 master errors",
          table_tasks, master_errors);
  }

  // If the buffer is full, push_back below will evict the oldest entry;
  // account for it leaving the window before it does.
  if (cbuf_activities_.full()) {
    if (cbuf_activities_.front().IsIdle()) {
      num_idle_runs_--;
    }
  }

  // Mutate circular buffer.
  cbuf_activities_.push_back(std::move(ai));

  // Update state.
  is_idle_.store(num_idle_runs_ == cbuf_activities_.size(), std::memory_order_release);

  // Two interesting cases when updating can_perform_global_operations_ state:
  // If we previously couldn't balance global load, but now the LB is idle, enable global balancing.
  // If we previously could balance global load, but now the LB is busy, then it is busy balancing
  // global load or doing other operations (remove, etc.). In this case, we keep global balancing
  // enabled up until we perform a non-global balancing move (see GetLoadToMove()).
  // TODO(julien) some small improvements can be made here, such as ignoring leader stepdown tasks.
  can_perform_global_operations_ = can_perform_global_operations_ || ai.IsIdle();
}
632 | | |
633 | 2.71k | Status ClusterLoadBalancer::IsIdle() const { |
634 | 2.71k | if (IsLoadBalancerEnabled() && !is_idle_.load(std::memory_order_acquire)) { |
635 | 2.18k | return STATUS( |
636 | 2.18k | IllegalState, |
637 | 2.18k | "Task or error encountered recently.", |
638 | 2.18k | MasterError(MasterErrorPB::LOAD_BALANCER_RECENTLY_ACTIVE)); |
639 | 2.18k | } |
640 | | |
641 | 538 | return Status::OK(); |
642 | 2.71k | } |
643 | | |
// Global-only balancing moves are allowed when the feature flag is on and the
// balancer has been idle recently enough (see RecordActivity).
bool ClusterLoadBalancer::CanBalanceGlobalLoad() const {
  return FLAGS_enable_global_load_balancing && can_perform_global_operations_;
}
647 | | |
648 | 947k | void ClusterLoadBalancer::ReportUnusualLoadBalancerState() const { |
649 | 3.02M | for (const auto& ts_desc : global_state_->ts_descs_) { |
650 | | // Report if any ts has a pending delete. |
651 | 3.02M | if (ts_desc->HasTabletDeletePending()) { |
652 | 16.4k | LOG(INFO) << Format("tablet server $0 has a pending delete for tablets $1", |
653 | 16.4k | ts_desc->permanent_uuid(), ts_desc->PendingTabletDeleteToString()); |
654 | 16.4k | } |
655 | 3.02M | } |
656 | 947k | } |
657 | | |
// Discards all per-table and global balancer state from the previous run and
// rebuilds a fresh GlobalLoadState. TS descriptors are fetched only when
// initialize_ts_descs is true (the constructor passes false since no
// descriptors exist yet).
void ClusterLoadBalancer::ResetGlobalState(bool initialize_ts_descs) {
  per_table_states_.clear();
  global_state_ = std::make_unique<GlobalLoadState>();
  global_state_->drive_aware_ = FLAGS_load_balancer_drive_aware;
  if (initialize_ts_descs) {
    // Only call GetAllDescriptors once for a LB run, and then cache it in global_state_.
    GetAllDescriptors(&global_state_->ts_descs_);
  }
  skipped_tables_per_run_.clear();
}
668 | | |
669 | 891k | void ClusterLoadBalancer::ResetTableStatePtr(const TableId& table_id, Options* options) { |
670 | 891k | auto table_state = std::make_unique<PerTableLoadState>(global_state_.get()); |
671 | 891k | table_state->options_ = options; |
672 | 891k | state_ = table_state.get(); |
673 | 891k | per_table_states_[table_id] = std::move(table_state); |
674 | | |
675 | 891k | state_->table_id_ = table_id; |
676 | 891k | } |
677 | | |
678 | 891k | Status ClusterLoadBalancer::AnalyzeTabletsUnlocked(const TableId& table_uuid) { |
679 | 891k | auto tablets = VERIFY_RESULT_PREPEND( |
680 | 891k | GetTabletsForTable(table_uuid), "Skipping table " + table_uuid + "due to error: "); |
681 | | |
682 | | // Loop over tablet map to register the load that is already live in the cluster. |
683 | 3.94M | for (const auto& tablet : tablets) { |
684 | 3.94M | bool tablet_running = false; |
685 | 3.94M | { |
686 | 3.94M | auto tablet_lock = tablet->LockForRead(); |
687 | | |
688 | 3.94M | if (!tablet->table()) { |
689 | | // Tablet is orphaned or in preparing state, continue. |
690 | 0 | continue; |
691 | 0 | } |
692 | 3.94M | tablet_running = tablet_lock->is_running(); |
693 | 3.94M | } |
694 | | |
695 | | // This is from the perspective of the CatalogManager and the on-disk, persisted |
696 | | // SysCatalogStatePB. What this means is that this tablet was properly created as part of a |
697 | | // CreateTable and the information was sent to the initial set of TS and the tablet got to an |
698 | | // initial running state. |
699 | | // |
700 | | // This is different from the individual, per-TS state of the tablet, which can vary based on |
701 | | // the TS itself. The tablet can be registered as RUNNING, as far as the CatalogManager is |
702 | | // concerned, but just be underreplicated, and have some TS currently bootstrapping instances |
703 | | // of the tablet. |
704 | 3.94M | if (tablet_running) { |
705 | 3.94M | RETURN_NOT_OK(UpdateTabletInfo(tablet.get())); |
706 | 3.94M | } |
707 | 3.94M | } |
708 | | |
709 | | // After updating the tablets and tablet servers, adjust the configured threshold if it is too |
710 | | // low for the given configuration. |
711 | 891k | state_->AdjustLeaderBalanceThreshold(); |
712 | | |
713 | | // Once we've analyzed both the tablet server information as well as the tablets, we can sort the |
714 | | // load and are ready to apply the load balancing rules. |
715 | 891k | state_->SortLoad(); |
716 | | |
717 | | // Since leader load is only needed to rebalance leaders, we keep the sorting separate. |
718 | 891k | state_->SortLeaderLoad(); |
719 | | |
720 | 891k | VLOG(1) << Substitute( |
721 | 7.99k | "Table: $0. Total running tablets: $1. Total overreplication: $2. Total starting tablets: $3." |
722 | 7.99k | " Wrong placement: $4. BlackListed: $5. Total underreplication: $6, Leader BlackListed: $7", |
723 | 7.99k | table_uuid, get_total_running_tablets(), get_total_over_replication(), |
724 | 7.99k | get_total_starting_tablets(), get_total_wrong_placement(), get_total_blacklisted_servers(), |
725 | 7.99k | get_total_under_replication(), get_total_leader_blacklisted_servers()); |
726 | | |
727 | 3.94M | for (const auto& tablet : tablets) { |
728 | 3.94M | const auto& tablet_id = tablet->id(); |
729 | 3.94M | if (state_->pending_remove_replica_tasks_[table_uuid].count(tablet_id) > 0) { |
730 | 651 | RETURN_NOT_OK(state_->RemoveReplica( |
731 | 651 | tablet_id, state_->pending_remove_replica_tasks_[table_uuid][tablet_id])); |
732 | 651 | } |
733 | 3.94M | if (state_->pending_stepdown_leader_tasks_[table_uuid].count(tablet_id) > 0) { |
734 | 530 | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id]; |
735 | 530 | const auto& from_ts = tablet_meta.leader_uuid; |
736 | 530 | const auto& to_ts = state_->pending_stepdown_leader_tasks_[table_uuid][tablet_id]; |
737 | 530 | RETURN_NOT_OK(state_->MoveLeader(tablet->id(), from_ts, to_ts)); |
738 | 530 | } |
739 | 3.94M | if (state_->pending_add_replica_tasks_[table_uuid].count(tablet_id) > 0) { |
740 | 423 | RETURN_NOT_OK(state_->AddReplica(tablet->id(), |
741 | 423 | state_->pending_add_replica_tasks_[table_uuid][tablet_id])); |
742 | 423 | } |
743 | 3.94M | } |
744 | | |
745 | 891k | return Status::OK(); |
746 | 891k | } |
747 | | |
// Scans under-replicated tablets (state_->tablets_missing_replicas_) and, for
// the first tablet/tserver pair that passes the placement checks, issues
// AddReplica, fills *out_tablet_id / *out_to_ts, and returns true. Returns
// false when no add could be started this pass. At most one add per call.
748 | | Result<bool> ClusterLoadBalancer::HandleAddIfMissingPlacement(
749 | 803k | TabletId* out_tablet_id, TabletServerId* out_to_ts) {
750 | 803k | for (const auto& tablet_id : state_->tablets_missing_replicas_) {
751 | 116k | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
752 | 116k | const auto& placement_info = GetPlacementByTablet(tablet_id);
753 | 116k | const auto& missing_placements = tablet_meta.under_replicated_placements;
754 | | // Loop through TSs by load to find a TS that matches the placement needed and does not already
755 | | // host this tablet.
756 | 229k | for (const auto& ts_uuid : state_->sorted_load_) {
757 | 229k | bool can_choose_ts = false;
758 | | // If we had no placement information, it means we are just under-replicated, so just check
759 | | // that we can use this tablet server.
760 | 229k | if (placement_info.placement_blocks().empty()) {
761 | | // No need to check placement info, as there is none.
762 | 224k | can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid));
763 | 224k | } else {
764 | | // We added a tablet to the set with missing replicas both if it is under-replicated, and we
765 | | // added a placement to the tablet_meta under_replicated_placements if the num replicas in
766 | | // that placement is fewer than min_num_replicas. If the under-replicated tablet has a
767 | | // placement that is under-replicated and the ts is not in that placement, then that ts
768 | | // isn't valid.
769 | 5.03k | const auto& ts_meta = state_->per_ts_meta_[ts_uuid];
770 | | // Either we have specific placement blocks that are under-replicated, so confirm
771 | | // that this TS matches or all the placement blocks have min_num_replicas
772 | | // but overall num_replicas is fewer than expected.
773 | | // In the latter case, we still need to conform to the placement rules.
774 | 5.03k | if (missing_placements.empty() ||
775 | 5.03k | tablet_meta.CanAddTSToMissingPlacements(ts_meta.descriptor)294 ) {
776 | | // If we don't have any missing placements but are under-replicated then we need to
777 | | // validate placement information in order to avoid adding to a wrong placement block.
778 | | //
779 | | // Do the placement check for both the cases.
780 | | // If we have missing placements then this check is a tautology otherwise it matters.
781 | 4.86k | can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid,
782 | 4.86k | &placement_info));
783 | 4.86k | }
784 | 5.03k | }
785 | | // If we've passed the checks, then we can choose this TS to add the replica to.
786 | 229k | if (can_choose_ts) {
787 | 713 | *out_tablet_id = tablet_id;
788 | 713 | *out_to_ts = ts_uuid;
789 | 713 | RETURN_NOT_OK(AddReplica(tablet_id, ts_uuid));
// Erase while iterating is safe here only because we return immediately; a
// continued loop over tablets_missing_replicas_ after erase would be UB.
790 | 713 | state_->tablets_missing_replicas_.erase(tablet_id);
791 | 713 | return true;
792 | 713 | }
793 | 229k | }
794 | 116k | }
795 | 803k | return false;
796 | 803k | }
797 | | |
798 | | Result<bool> ClusterLoadBalancer::HandleAddIfWrongPlacement( |
799 | 803k | TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) { |
800 | 803k | for (const auto& tablet_id : state_->tablets_wrong_placement_) { |
801 | | // Skip this tablet, if it is already over-replicated, as it does not need another replica, it |
802 | | // should just have one removed in the removal step. |
803 | 1.15k | if (state_->tablets_over_replicated_.count(tablet_id)) { |
804 | 296 | continue; |
805 | 296 | } |
806 | 863 | if (VERIFY_RESULT(state_->CanSelectWrongReplicaToMove( |
807 | 863 | tablet_id, GetPlacementByTablet(tablet_id), out_from_ts, out_to_ts))) { |
808 | 447 | *out_tablet_id = tablet_id; |
809 | 447 | RETURN_NOT_OK(MoveReplica(tablet_id, *out_from_ts, *out_to_ts)); |
810 | 447 | return true; |
811 | 447 | } |
812 | 863 | } |
813 | 802k | return false; |
814 | 803k | } |
815 | | |
// Entry point for the "add a replica" phase of one LB tick. First enforces the
// remote-bootstrap throttles (global, then per-table) and the over-replication
// cap, returning TryAgain when throttled. Then tries, in priority order:
// missing-placement adds, wrong-placement adds, and finally normal load
// balancing via GetLoadToMove. Returns true iff one add/move was issued.
816 | | Result<bool> ClusterLoadBalancer::HandleAddReplicas(
817 | 806k | TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) {
818 | 806k | if (state_->options_->kAllowLimitStartingTablets) {
819 | 806k | if (global_state_->total_starting_tablets_ >= state_->options_->kMaxTabletRemoteBootstraps) {
820 | 2 | return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 "
821 | 2 | "tablets, when our max allowed is $1",
822 | 2 | global_state_->total_starting_tablets_, state_->options_->kMaxTabletRemoteBootstraps);
823 | 806k | } else if (state_->total_starting_ >= state_->options_->kMaxTabletRemoteBootstrapsPerTable) {
824 | 813 | return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 "
825 | 813 | "tablets for table $1, when our max allowed is $2 per table",
826 | 813 | state_->total_starting_, state_->table_id_,
827 | 813 | state_->options_->kMaxTabletRemoteBootstrapsPerTable);
828 | 813 | }
829 | 806k | }
830 | |
831 | 805k | if (state_->options_->kAllowLimitOverReplicatedTablets &&
832 | 805k | get_total_over_replication() >=
833 | 805k | implicit_cast<size_t>(state_->options_->kMaxOverReplicatedTablets)) {
834 | 1.81k | return STATUS_SUBSTITUTE(TryAgain,
835 | 1.81k | "Cannot add replicas. Currently have a total overreplication of $0, when max allowed is $1"
836 | 1.81k | ", overreplicated tablets: $2",
837 | 1.81k | get_total_over_replication(), state_->options_->kMaxOverReplicatedTablets,
838 | 1.81k | boost::algorithm::join(state_->tablets_over_replicated_, ", "));
839 | 1.81k | }
840 | |
841 | 803k | VLOG(1) << "Number of global concurrent remote bootstrap sessions: "
842 | 7.86k | << global_state_->total_starting_tablets_
843 | 7.86k | << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstraps
844 | 7.86k | << ". Number of concurrent remote bootstrap sessions for table " << state_->table_id_
845 | 7.86k | << ": " << state_->total_starting_
846 | 7.86k | << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstrapsPerTable;
847 | |
848 | | // Handle missing placements with highest priority, as it means we're potentially
849 | | // under-replicated.
850 | 803k | if (VERIFY_RESULT(HandleAddIfMissingPlacement(out_tablet_id, out_to_ts))) {
851 | 713 | return true;
852 | 713 | }
853 | |
854 | | // Handle wrong placements as next priority, as these could be servers we're moving off of, so
855 | | // we can decommission ASAP.
856 | 803k | if (VERIFY_RESULT(HandleAddIfWrongPlacement(out_tablet_id, out_from_ts, out_to_ts))) {
857 | 447 | return true;
858 | 447 | }
859 | |
860 | | // Finally, handle normal load balancing.
861 | 802k | if (!VERIFY_RESULT(GetLoadToMove(out_tablet_id, out_from_ts, out_to_ts))) {
862 | 801k | VLOG(1) << "Cannot find any more tablets to move, under current constraints."7.81k ;
863 | 801k | if (VLOG_IS_ON(1)) {
864 | 7.81k | DumpSortedLoad();
865 | 7.81k | }
866 | 801k | return false;
867 | 801k | }
868 | |
869 | 1.07k | return true;
870 | 802k | }
871 | | |
872 | 7.81k | void ClusterLoadBalancer::DumpSortedLoad() const { |
873 | 7.81k | ssize_t last_pos = state_->sorted_load_.size() - 1; |
874 | 7.81k | std::ostringstream out; |
875 | 7.81k | out << "Table load (global load): "; |
876 | 28.9k | for (ssize_t left = 0; left <= last_pos; ++left21.0k ) { |
877 | 21.0k | const TabletServerId& uuid = state_->sorted_load_[left]; |
878 | 21.0k | auto load = state_->GetLoad(uuid); |
879 | 21.0k | out << uuid << ":" << load << " (" << global_state_->GetGlobalLoad(uuid) << ") "; |
880 | 21.0k | } |
881 | 7.81k | VLOG(1) << out.str(); |
882 | 7.81k | } |
883 | | |
// Two-pointer sweep over sorted_load_ (ascending per-table load) looking for a
// (high-load, low-load) tserver pair with a movable tablet. Issues at most one
// MoveReplica per call; returns true iff a move was started. A move made while
// per-table variance is below threshold counts as "global balancing" and does
// not clear can_perform_global_operations_.
884 | | Result<bool> ClusterLoadBalancer::GetLoadToMove(
885 | 802k | TabletId* moving_tablet_id, TabletServerId* from_ts, TabletServerId* to_ts) {
886 | 802k | if (state_->sorted_load_.empty()) {
887 | 118 | return false;
888 | 118 | }
889 | |
890 | | // Start with two indices pointing at left and right most ends of the sorted_load_ structure.
891 | | //
892 | | // We will try to find two TSs that have at least one tablet that can be moved amongst them, from
893 | | // the higher load to the lower load TS. To do this, we will go through comparing the TSs
894 | | // corresponding to our left and right indices, exclude tablets from the right, high loaded TS
895 | | // according to our load balancing rules, such as load variance, starting tablets and not moving
896 | | // already over-replicated tablets. We then compare the remaining set of tablets with the ones
897 | | // hosted by the lower loaded TS and use ReservoirSample to pick a tablet from the set
898 | | // difference. If there were no tablets to pick, we advance our state.
899 | | //
900 | | // The state is defined as the positions of the start and end indices. We always try to move the
901 | | // right index back, until we cannot any more, due to either reaching the left index (cannot
902 | | // rebalance from one TS to itself), or the difference of load between the two TSs is too low to
903 | | // try to rebalance (if load variance is 1, it does not make sense to move tablets between the
904 | | // TSs). When we cannot lower the right index any further, we reset it back to last_pos and
905 | | // increment the left index.
906 | | //
907 | | // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index
908 | | // and are already breaking the invariance rule, as that means that any further differences in
909 | | // the interval between left and right cannot have load > kMinLoadVarianceToBalance.
910 | 802k | ssize_t last_pos = state_->sorted_load_.size() - 1;
911 | 911k | for (ssize_t left = 0; left <= last_pos; ++left109k ) {
912 | 1.18M | for (auto right = last_pos; right >= 0; --right269k ) {
913 | 1.18M | const TabletServerId& low_load_uuid = state_->sorted_load_[left];
914 | 1.18M | const TabletServerId& high_load_uuid = state_->sorted_load_[right];
915 | 1.18M | ssize_t load_variance = state_->GetLoad(high_load_uuid) - state_->GetLoad(low_load_uuid);
916 | 1.18M | bool is_global_balancing_move = false;
917 | |
918 | | // Check for state change or end conditions.
919 | 1.18M | if (left == right || load_variance < state_->options_->kMinLoadVarianceToBalance1.10M ) {
920 | | // Either both left and right are at the end, or there is no load_variance, which means
921 | | // there will be no load_variance for any TSs between left and right, so we can return.
922 | 912k | if (right == last_pos && load_variance == 0805k ) {
923 | 781k | return false;
924 | 781k | }
925 | | // If there is load variance, then there is a chance we can benefit from globally balancing.
926 | 131k | if (load_variance > 0 && CanBalanceGlobalLoad()26.0k ) {
927 | 22.2k | int global_load_variance = global_state_->GetGlobalLoad(high_load_uuid) -
928 | 22.2k | global_state_->GetGlobalLoad(low_load_uuid);
929 | 22.2k | if (global_load_variance < state_->options_->kMinGlobalLoadVarianceToBalance) {
930 | | // Already globally balanced. Since we are sorted by global load, we can return here as
931 | | // there are no other moves for us to make.
932 | 20.5k | return false;
933 | 20.5k | }
934 | | // Mark this move as a global balancing move and try to find a tablet to move.
935 | 1.70k | is_global_balancing_move = true;
936 | 109k | } else {
937 | | // The load_variance is too low, which means we weren't able to find a load to move to
938 | | // the left tserver. Continue and try with the next left tserver.
939 | 109k | break;
940 | 109k | }
941 | 131k | }
942 | |
943 | | // If we don't find a tablet_id to move between these two TSs, advance the state.
944 | 270k | if (VERIFY_RESULT(GetTabletToMove(high_load_uuid, low_load_uuid, moving_tablet_id))) {
945 | | // If we got this far, we have the candidate we want, so fill in the output params and
946 | | // return. The tablet_id is filled in from GetTabletToMove.
947 | 1.07k | *from_ts = high_load_uuid;
948 | 1.07k | *to_ts = low_load_uuid;
949 | 1.07k | RETURN_NOT_OK(MoveReplica(*moving_tablet_id, high_load_uuid, low_load_uuid));
950 | | // Update global state if necessary.
951 | 1.07k | if (!is_global_balancing_move) {
952 | 1.04k | can_perform_global_operations_ = false;
953 | 1.04k | }
954 | 1.07k | return true;
955 | 1.07k | }
956 | 270k | }
957 | 911k | }
958 | |
// Unreachable by construction: the left == right iteration always returns or
// breaks before falling through. Kept as a defensive invariant check.
959 | | // Should never get here.
960 | 0 | return STATUS(IllegalState, "Load balancing algorithm reached illegal state.");
961 | 802k | }
962 | | |
// Picks the best tablet to move from from_ts to to_ts. Candidates are grouped
// per drive (drive-aware mode sorts drives by decreasing load), filtered to
// exclude over-replicated, post-split-disabled, and placement-invalid tablets,
// then within the first drive that has any candidate, the tablet whose leader
// is cloud-info-closest to to_ts wins (faster remote bootstrap). Returns true
// and sets *moving_tablet_id iff a candidate was found.
963 | | Result<bool> ClusterLoadBalancer::GetTabletToMove(
964 | 270k | const TabletServerId& from_ts, const TabletServerId& to_ts, TabletId* moving_tablet_id) {
965 | 270k | const auto& from_ts_meta = state_->per_ts_meta_[from_ts];
966 | | // If drive aware, all_tablets is sorted by decreasing drive load.
967 | 270k | vector<set<TabletId>> all_tablets_by_drive = GetTabletsOnTSToMove(global_state_->drive_aware_,
968 | 270k | from_ts_meta);
969 | 270k | vector<set<TabletId>> all_filtered_tablets_by_drive;
970 | 544k | for (const set<TabletId>& drive_tablets : all_tablets_by_drive) {
971 | 544k | set<TabletId> filtered_drive_tablets;
972 | 2.49M | for (const TabletId& tablet_id : drive_tablets) {
973 | | // We don't want to add a new replica to an already over-replicated tablet.
974 | | //
975 | | // TODO(bogdan): should make sure we pick tablets that this TS is not a leader of, so we
976 | | // can ensure HandleRemoveReplicas removes them from this TS.
977 | 2.49M | if (state_->tablets_over_replicated_.count(tablet_id)) {
978 | 8.10k | continue;
979 | 8.10k | }
980 | | // Don't move a replica right after split
981 | 2.48M | if (ContainsKey(from_ts_meta.disabled_by_ts_tablets, tablet_id)) {
982 | 0 | continue;
983 | 0 | }
984 | |
985 | 2.48M | if (VERIFY_RESULT(
986 | 2.48M | state_->CanAddTabletToTabletServer(tablet_id, to_ts, &GetPlacementByTablet(tablet_id)))) {
987 | 311k | filtered_drive_tablets.insert(tablet_id);
988 | 311k | }
989 | 2.48M | }
990 | 544k | all_filtered_tablets_by_drive.push_back(filtered_drive_tablets);
991 | 544k | }
992 | |
993 | | // Below, we choose a tablet to move. We first filter out any tablets which cannot be moved
994 | | // because of placement limitations. Then, we prioritize moving a tablet whose leader is in the
995 | | // same zone/region it is moving to (for faster remote bootstrapping).
996 | 543k | for (const set<TabletId>& drive_tablets : all_filtered_tablets_by_drive) {
997 | 543k | bool found_tablet_to_move = false;
998 | 543k | CatalogManagerUtil::CloudInfoSimilarity chosen_tablet_ci_similarity =
999 | 543k | CatalogManagerUtil::NO_MATCH;
1000 | 543k | for (const TabletId& tablet_id : drive_tablets) {
1001 | 300k | const auto& placement_info = GetPlacementByTablet(tablet_id);
1002 | | // TODO(bogdan): this should be augmented as well to allow dropping by one replica, if still
1003 | | // leaving us with more than the minimum.
1004 | | //
1005 | | // If we have placement information, we want to only pick the tablet if it's moving to the
1006 | | // same placement, so we guarantee we're keeping the same type of distribution.
1007 | | // Since we allow prefixes as well, we can still respect the placement of this tablet
1008 | | // even if their placement ids aren't the same. An e.g.
1009 | | // placement info of tablet: C.R1.*
1010 | | // placement info of from_ts: C.R1.Z1
1011 | | // placement info of to_ts: C.R2.Z2
1012 | | // Note that we've assumed that for every TS there is a unique placement block to which it
1013 | | // can be mapped (see the validation rules in yb_admin-client). If there is no unique
1014 | | // placement block then it is simply the C.R.Z of the TS itself.
1015 | 300k | auto from_ts_block = state_->GetValidPlacement(from_ts, &placement_info);
1016 | 300k | auto to_ts_block = state_->GetValidPlacement(to_ts, &placement_info);
1017 | 300k | bool same_placement = false;
1018 | 300k | if (to_ts_block.has_value() && from_ts_block.has_value()) {
1019 | 300k | same_placement = TSDescriptor::generate_placement_id(*from_ts_block) ==
1020 | 300k | TSDescriptor::generate_placement_id(*to_ts_block);
1021 | 300k | }
1022 | |
1023 | 300k | if (!placement_info.placement_blocks().empty() && !same_placement291k ) {
1024 | 290k | continue;
1025 | 290k | }
1026 | |
1027 | 9.75k | TabletServerId leader_ts = state_->per_tablet_meta_[tablet_id].leader_uuid;
1028 | 9.75k | auto ci_similarity = CatalogManagerUtil::CloudInfoSimilarity::NO_MATCH;
1029 | 9.75k | if (!leader_ts.empty() && !FLAGS_load_balancer_ignore_cloud_info_similarity9.42k ) {
1030 | 9.41k | const auto leader_ci = state_->per_ts_meta_[leader_ts].descriptor->GetCloudInfo();
1031 | 9.41k | const auto to_ts_ci = state_->per_ts_meta_[to_ts].descriptor->GetCloudInfo();
1032 | 9.41k | ci_similarity = CatalogManagerUtil::ComputeCloudInfoSimilarity(leader_ci, to_ts_ci);
1033 | 9.41k | }
1034 | |
1035 | 9.75k | if (found_tablet_to_move && ci_similarity <= chosen_tablet_ci_similarity8.68k ) {
1036 | 8.63k | continue;
1037 | 8.63k | }
1038 | | // This is the best tablet to move, so far.
1039 | 1.12k | found_tablet_to_move = true;
1040 | 1.12k | *moving_tablet_id = tablet_id;
1041 | 1.12k | chosen_tablet_ci_similarity = ci_similarity;
1042 | 1.12k | }
1043 | |
1044 | | // If there is any tablet we can move from this drive, choose it and return.
1045 | 543k | if (found_tablet_to_move) {
1046 | 1.07k | return true;
1047 | 1.07k | }
1048 | 543k | }
1049 | |
1050 | 269k | return false;
1051 | 270k | }
1052 | | |
// Leader-rebalancing analogue of GetLoadToMove: a two-pointer sweep over
// sorted_leader_load_ looking for a (high, low) tserver pair where a leader on
// the high side has a running peer on the low side. Leader-blacklisted
// tservers are treated as having effectively infinite leader load. Fills
// *moving_tablet_id / *from_ts / *to_ts / *to_ts_path and returns true when a
// candidate was selected (the caller performs the actual stepdown); recent
// stepdown failures toward the same target are skipped for a retry interval.
1053 | | Result<bool> ClusterLoadBalancer::GetLeaderToMove(TabletId* moving_tablet_id,
1054 | | TabletServerId* from_ts,
1055 | | TabletServerId* to_ts,
1056 | 885k | std::string* to_ts_path) {
1057 | 885k | if (state_->sorted_leader_load_.empty()) {
1058 | 7.31k | return false;
1059 | 7.31k | }
1060 | |
1061 | | // Find out if there are leaders to be moved.
1062 | 879k | for (auto right = state_->sorted_leader_load_.size(); 878k right > 0;) {
1063 | 879k | --right;
1064 | 879k | const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right];
1065 | 879k | auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) !=
1066 | 879k | state_->leader_blacklisted_servers_.end());
1067 | 879k | if (high_leader_blacklisted) {
1068 | 1.15k | auto high_load = state_->GetLeaderLoad(high_load_uuid);
1069 | 1.15k | if (high_load > 0) {
1070 | | // Leader blacklisted tserver with a leader replica.
1071 | 55 | break;
1072 | 1.09k | } else {
1073 | | // Leader blacklisted tserver without leader replica.
1074 | 1.09k | continue;
1075 | 1.09k | }
1076 | 878k | } else {
1077 | 878k | if (state_->IsLeaderLoadBelowThreshold(state_->sorted_leader_load_[right])) {
1078 | | // Non-leader blacklisted tserver with not too many leader replicas.
1079 | | // TODO(Sanket): Even though per table load is below the configured threshold,
1080 | | // we might want to do global leader balancing above a certain threshold that is lower
1081 | | // than the per table threshold. Can add another gflag/knob here later.
1082 | 10 | return false;
1083 | 878k | } else {
1084 | | // Non-leader blacklisted tserver with too many leader replicas.
1085 | 878k | break;
1086 | 878k | }
1087 | 878k | }
1088 | 879k | }
1089 | |
1090 | | // The algorithm to balance the leaders is very similar to the one for tablets:
1091 | | //
1092 | | // Start with two indices pointing at left and right most ends of the sorted_leader_load_
1093 | | // structure. Note that leader blacklisted tserver is considered as having infinite leader load.
1094 | | //
1095 | | // We will try to find two TSs that have at least one leader that can be moved amongst them, from
1096 | | // the higher load to the lower load TS. To do this, we will go through comparing the TSs
1097 | | // corresponding to our left and right indices. We go through leaders on the higher loaded TS
1098 | | // and find a running replica on the lower loaded TS to move the leader. If no leader can be
1099 | | // be picked, we advance our state.
1100 | | //
1101 | | // The state is defined as the positions of the start and end indices. We always try to move the
1102 | | // right index back, until we cannot any more, due to either reaching the left index (cannot
1103 | | // rebalance from one TS to itself), or the difference of load between the two TSs is too low to
1104 | | // try to rebalance (if load variance is 1, it does not make sense to move leaders between the
1105 | | // TSs). When we cannot lower the right index any further, we reset it back to last_pos and
1106 | | // increment the left index.
1107 | | //
1108 | | // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index
1109 | | // and are already breaking the invariance rule, as that means that any further differences in
1110 | | // the interval between left and right cannot have load > kMinLeaderLoadVarianceToBalance.
1111 | 878k | const auto current_time = MonoTime::Now();
1112 | 878k | ssize_t last_pos = state_->sorted_leader_load_.size() - 1;
1113 | 931k | for (ssize_t left = 0; left <= last_pos; ++left53.0k ) {
1114 | 931k | const TabletServerId& low_load_uuid = state_->sorted_leader_load_[left];
1115 | 931k | auto low_leader_blacklisted = (state_->leader_blacklisted_servers_.find(low_load_uuid) !=
1116 | 931k | state_->leader_blacklisted_servers_.end());
1117 | 931k | if (low_leader_blacklisted) {
1118 | | // Left marker has gone beyond non-leader blacklisted tservers.
1119 | 1.06k | return false;
1120 | 1.06k | }
1121 | |
1122 | 1.01M | for (auto right = last_pos; 930k right >= 0; --right85.7k ) {
1123 | 1.01M | const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right];
1124 | 1.01M | auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) !=
1125 | 1.01M | state_->leader_blacklisted_servers_.end());
1126 | 1.01M | ssize_t load_variance =
1127 | 1.01M | state_->GetLeaderLoad(high_load_uuid) - state_->GetLeaderLoad(low_load_uuid);
1128 | |
1129 | 1.01M | bool is_global_balancing_move = false;
1130 | |
1131 | | // Check for state change or end conditions.
1132 | 1.01M | if (left == right || (923k load_variance < state_->options_->kMinLeaderLoadVarianceToBalance923k &&
1133 | 923k | !high_leader_blacklisted792k )) {
1134 | | // Global leader balancing only if per table variance is > 0.
1135 | | // If both left and right are same (i.e. load_variance is 0) and right is last_pos
1136 | | // or right is last_pos and load_variance is 0 then we can return as we don't
1137 | | // have any other moves to make.
1138 | 880k | if (load_variance == 0 && right == last_pos615k ) {
1139 | 596k | return false;
1140 | 596k | }
1141 | | // Check if we can benefit from global leader balancing.
1142 | | // If we have > 0 load_variance and there are no per table moves left.
1143 | 283k | if (load_variance > 0 && CanBalanceGlobalLoad()265k ) {
1144 | 230k | int global_load_variance = state_->global_state_->GetGlobalLeaderLoad(high_load_uuid) -
1145 | 230k | state_->global_state_->GetGlobalLeaderLoad(low_load_uuid);
1146 | | // Already globally balanced. Since we are sorted by global load, we can return here as
1147 | | // there are no other moves for us to make.
1148 | 230k | if (global_load_variance < state_->options_->kMinGlobalLeaderLoadVarianceToBalance) {
1149 | 226k | return false;
1150 | 226k | }
1151 | 4.10k | is_global_balancing_move = true;
1152 | 53.0k | } else {
1153 | 53.0k | break;
1154 | 53.0k | }
1155 | 283k | }
1156 | |
1157 | | // Find the leaders on the higher loaded TS that have running peers on the lower loaded TS.
1158 | | // If there are, we have a candidate we want, so fill in the output params and return.
1159 | 139k | const set<TabletId>& leaders = state_->per_ts_meta_[high_load_uuid].leaders;
1160 | 139k | for (const auto& tablet : GetLeadersOnTSToMove(global_state_->drive_aware_,
1161 | 139k | leaders,
1162 | 139k | state_->per_ts_meta_[low_load_uuid])) {
1163 | 53.8k | *moving_tablet_id = tablet.first;
1164 | 53.8k | *to_ts_path = tablet.second;
1165 | 53.8k | *from_ts = high_load_uuid;
1166 | 53.8k | *to_ts = low_load_uuid;
1167 | |
// Skip a candidate whose previous stepdown toward this same target failed
// recently — avoids hammering a tserver that keeps refusing leadership.
1168 | 53.8k | const auto& per_tablet_meta = state_->per_tablet_meta_;
1169 | 53.8k | const auto tablet_meta_iter = per_tablet_meta.find(tablet.first);
1170 | 53.8k | if (PREDICT_TRUE(tablet_meta_iter != per_tablet_meta.end())) {
1171 | 53.8k | const auto& tablet_meta = tablet_meta_iter->second;
1172 | 53.8k | const auto& stepdown_failures = tablet_meta.leader_stepdown_failures;
1173 | 53.8k | const auto stepdown_failure_iter = stepdown_failures.find(low_load_uuid);
1174 | 53.8k | if (stepdown_failure_iter != stepdown_failures.end()) {
1175 | 554 | const auto time_since_failure = current_time - stepdown_failure_iter->second;
1176 | 554 | if (time_since_failure.ToMilliseconds() < FLAGS_min_leader_stepdown_retry_interval_ms) {
1177 | 554 | LOG(INFO) << "Cannot move tablet " << tablet.first << " leader from TS "
1178 | 554 | << *from_ts << " to TS " << *to_ts << " yet: previous attempt with the same"
1179 | 554 | << " intended leader failed only " << ToString(time_since_failure)
1180 | 554 | << " ago (less " << "than " << FLAGS_min_leader_stepdown_retry_interval_ms
1181 | 554 | << "ms).";
1182 | 554 | }
// NOTE(review): this `continue` skips the candidate even when the failure is
// OLDER than the retry interval (it sits outside the inner if) — looks like it
// may belong inside the interval check; confirm intended behavior upstream.
1183 | 554 | continue;
1184 | 554 | }
1185 | 53.8k | } else {
1186 | 0 | LOG(WARNING) << "Did not find load balancer metadata for tablet " << *moving_tablet_id;
1187 | 0 | }
1188 | |
1189 | | // Leader movement solely due to leader blacklist.
1190 | 53.2k | if (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance &&
1191 | 53.2k | high_leader_blacklisted582 ) {
1192 | 51 | state_->LogSortedLeaderLoad();
1193 | 51 | LOG(INFO) << "Move tablet " << tablet.first << " leader from leader blacklisted TS "
1194 | 51 | << *from_ts << " to TS " << *to_ts;
1195 | 51 | }
1196 | 53.2k | if (!is_global_balancing_move) {
1197 | 52.7k | can_perform_global_operations_ = false;
1198 | 52.7k | }
1199 | 53.2k | return true;
1200 | 53.8k | }
1201 | 139k | }
1202 | 930k | }
1203 | |
1204 | | // Should never get here.
1205 | 0 | FATAL_ERROR("Load balancing algorithm reached invalid state!");
1206 | 0 | }
1207 | | |
// Removal phase of one LB tick. First removes wrongly-placed replicas (highest
// priority), then drops one replica from an over-replicated tablet, choosing
// the highest-loaded eligible tserver. Tablets with a config change or a
// replica still starting are skipped. Issues at most one removal per call;
// returns true iff a removal was started.
1208 | | Result<bool> ClusterLoadBalancer::HandleRemoveReplicas(
1209 | 807k | TabletId* out_tablet_id, TabletServerId* out_from_ts) {
1210 | | // Give high priority to removing tablets that are not respecting the placement policy.
1211 | 807k | if (VERIFY_RESULT(HandleRemoveIfWrongPlacement(out_tablet_id, out_from_ts))) {
1212 | 968 | return true;
1213 | 968 | }
1214 | |
1215 | 806k | for (const auto& tablet_id : state_->tablets_over_replicated_) {
1216 | | // Skip if there is a pending ADD_SERVER.
1217 | 4.45k | if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id)) ||
1218 | 4.45k | state_->per_tablet_meta_[tablet_id].starting > 03.14k ) {
1219 | 2.90k | continue;
1220 | 2.90k | }
1221 | |
1222 | 1.54k | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id];
1223 | 1.54k | const auto& tablet_servers = tablet_meta.over_replicated_tablet_servers;
1224 | 1.54k | auto comparator = PerTableLoadState::Comparator(state_);
1225 | 1.54k | vector<TabletServerId> sorted_ts;
1226 | | // Don't include any tservers where this tablet is still starting.
1227 | 1.54k | std::copy_if(
1228 | 1.54k | tablet_servers.begin(), tablet_servers.end(), std::back_inserter(sorted_ts),
1229 | 5.81k | [&](const TabletServerId& ts_uuid) {
1230 | 5.81k | return !state_->per_ts_meta_[ts_uuid].starting_tablets.count(tablet_id);
1231 | 5.81k | });
1232 | 1.54k | if (sorted_ts.empty()) {
1233 | 0 | return STATUS_SUBSTITUTE(IllegalState, "No tservers to remove from over-replicated "
1234 | 0 | "tablet $0", tablet_id);
1235 | 0 | }
1236 | | // Sort in reverse to first try to remove a replica from the highest loaded TS.
1237 | 1.54k | sort(sorted_ts.rbegin(), sorted_ts.rend(), comparator);
1238 | 1.54k | string remove_candidate = sorted_ts[0];
1239 | 1.54k | *out_tablet_id = tablet_id;
1240 | 1.54k | *out_from_ts = remove_candidate;
1241 | | // Do force leader stepdown, as we are either not the leader or we are allowed to step down.
1242 | 1.54k | RETURN_NOT_OK(RemoveReplica(tablet_id, remove_candidate));
1243 | 1.54k | return true;
1244 | 1.54k | }
1245 | 805k | return false;
1246 | 806k | }
1247 | | |
1248 | | Result<bool> ClusterLoadBalancer::HandleRemoveIfWrongPlacement( |
1249 | 807k | TabletId* out_tablet_id, TabletServerId* out_from_ts) { |
1250 | 807k | for (const auto& tablet_id : state_->tablets_wrong_placement_) { |
1251 | 5.02k | LOG(INFO) << "Processing tablet " << tablet_id; |
1252 | | // Skip this tablet if it is not over-replicated. |
1253 | 5.02k | if (!state_->tablets_over_replicated_.count(tablet_id)) { |
1254 | 3.68k | continue; |
1255 | 3.68k | } |
1256 | | // Skip if there is a pending ADD_SERVER |
1257 | 1.34k | if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id))) { |
1258 | 372 | continue; |
1259 | 372 | } |
1260 | 968 | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id]; |
1261 | 968 | TabletServerId target_uuid; |
1262 | | // Prioritize blacklisted servers, if any. |
1263 | 968 | if (!tablet_meta.blacklisted_tablet_servers.empty()) { |
1264 | 852 | target_uuid = *tablet_meta.blacklisted_tablet_servers.begin(); |
1265 | 852 | } |
1266 | | // If no blacklisted server could be chosen, try the wrong placement ones. |
1267 | 968 | if (target_uuid.empty()) { |
1268 | 116 | if (!tablet_meta.wrong_placement_tablet_servers.empty()) { |
1269 | 116 | target_uuid = *tablet_meta.wrong_placement_tablet_servers.begin(); |
1270 | 116 | } |
1271 | 116 | } |
1272 | | // If we found a tablet server, choose it. |
1273 | 968 | if (!target_uuid.empty()) { |
1274 | 968 | *out_tablet_id = tablet_id; |
1275 | 968 | *out_from_ts = std::move(target_uuid); |
1276 | | // Force leader stepdown if we have wrong placements or blacklisted servers. |
1277 | 968 | RETURN_NOT_OK(RemoveReplica(tablet_id, *out_from_ts)); |
1278 | 968 | return true; |
1279 | 968 | } |
1280 | 968 | } |
1281 | 806k | return false; |
1282 | 807k | } |
1283 | | |
1284 | | Result<bool> ClusterLoadBalancer::HandleLeaderLoadIfNonAffinitized(TabletId* moving_tablet_id, |
1285 | | TabletServerId* from_ts, |
1286 | | TabletServerId* to_ts, |
1287 | 770k | std::string* to_ts_path) { |
1288 | | // Similar to normal leader balancing, we double iterate from most loaded to least loaded |
1289 | | // non-affinitized nodes and least to most affinitized nodes. For each pair, we check whether |
1290 | | // there is any tablet intersection and if so, there is a match and we return true. |
1291 | | // |
1292 | | // If we go through all the node pairs or we see that the current non-affinitized |
1293 | | // leader load is 0, we know that there is no match from non-affinitized to affinitized nodes |
1294 | | // and we return false. |
1295 | 770k | for (size_t non_affinitized_idx = state_->sorted_non_affinitized_leader_load_.size(); |
1296 | 771k | non_affinitized_idx > 0;) { |
1297 | 712 | --non_affinitized_idx; |
1298 | 712 | const TabletServerId& non_affinitized_uuid = |
1299 | 712 | state_->sorted_non_affinitized_leader_load_[non_affinitized_idx]; |
1300 | 712 | if (state_->GetLeaderLoad(non_affinitized_uuid) == 0) { |
1301 | | // All subsequent non-affinitized nodes have no leaders, no match found. |
1302 | 73 | return false; |
1303 | 73 | } |
1304 | 639 | const set<TabletId>& leaders = state_->per_ts_meta_[non_affinitized_uuid].leaders; |
1305 | 639 | for (const auto& affinitized_uuid : state_->sorted_leader_load_) { |
1306 | 37 | auto peers = GetLeadersOnTSToMove(global_state_->drive_aware_, |
1307 | 37 | leaders, |
1308 | 37 | state_->per_ts_meta_[affinitized_uuid]); |
1309 | | |
1310 | 37 | if (!peers.empty()) { |
1311 | 37 | auto peer = peers.begin(); |
1312 | 37 | *moving_tablet_id = peer->first; |
1313 | 37 | *to_ts_path = peer->first; |
1314 | 37 | *from_ts = non_affinitized_uuid; |
1315 | 37 | *to_ts = affinitized_uuid; |
1316 | 37 | return true; |
1317 | 37 | } |
1318 | 37 | } |
1319 | 639 | } |
1320 | 770k | return false; |
1321 | 770k | } |
1322 | | |
1323 | | Result<bool> ClusterLoadBalancer::HandleLeaderMoves( |
1324 | 885k | TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) { |
1325 | | // If the user sets 'transaction_tables_use_preferred_zones' gflag to 0 and the tablet |
1326 | | // being balanced is a transaction tablet, then logical flow will be changed to ignore |
1327 | | // preferred zones and instead proceed to normal leader balancing. |
1328 | 885k | string out_ts_ts_path; |
1329 | 885k | if (state_->use_preferred_zones_ && |
1330 | 885k | VERIFY_RESULT770k (HandleLeaderLoadIfNonAffinitized( |
1331 | 885k | out_tablet_id, out_from_ts, out_to_ts, &out_ts_ts_path))) { |
1332 | 37 | RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, out_ts_ts_path)); |
1333 | 37 | return true; |
1334 | 37 | } |
1335 | | |
1336 | 885k | if (VERIFY_RESULT(GetLeaderToMove(out_tablet_id, out_from_ts, out_to_ts, &out_ts_ts_path))) { |
1337 | 53.2k | RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, out_ts_ts_path)); |
1338 | 53.2k | return true; |
1339 | 53.2k | } |
1340 | 832k | return false; |
1341 | 885k | } |
1342 | | |
1343 | | Status ClusterLoadBalancer::MoveReplica( |
1344 | 1.51k | const TabletId& tablet_id, const TabletServerId& from_ts, const TabletServerId& to_ts) { |
1345 | 1.51k | LOG(INFO) << Substitute("Moving tablet $0 from $1 to $2", tablet_id, from_ts, to_ts); |
1346 | 1.51k | RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), to_ts, true /* is_add */, |
1347 | 1.51k | true /* should_remove_leader */)); |
1348 | 1.51k | RETURN_NOT_OK(state_->AddReplica(tablet_id, to_ts)); |
1349 | 1.51k | return GetAtomicFlag(&FLAGS_load_balancer_count_move_as_add) ? |
1350 | 1.51k | Status::OK() : state_->RemoveReplica(tablet_id, from_ts)0 ; |
1351 | 1.51k | } |
1352 | | |
// Issues an async ADD_SERVER for 'tablet_id' on tserver 'to_ts' (via
// SendReplicaChanges) and records the new replica in the balancer state.
Status ClusterLoadBalancer::AddReplica(const TabletId& tablet_id, const TabletServerId& to_ts) {
  LOG(INFO) << Substitute("Adding tablet $0 to $1", tablet_id, to_ts);
  // This is an add operation, so the "should_remove_leader" flag is irrelevant.
  RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), to_ts, true /* is_add */,
                                   true /* should_remove_leader */));
  return state_->AddReplica(tablet_id, to_ts);
}
1360 | | |
// Removes the replica of 'tablet_id' hosted on 'ts_uuid' and updates the
// balancer state accordingly.
Status ClusterLoadBalancer::RemoveReplica(
    const TabletId& tablet_id, const TabletServerId& ts_uuid) {
  LOG(INFO) << Substitute("Removing replica $0 from tablet $1", ts_uuid, tablet_id);
  // should_remove_leader=true: if this replica is currently the leader, it is
  // stepped down as part of the removal (see SendReplicaChanges).
  RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), ts_uuid, false /* is_add */,
                                   true /* should_remove_leader */));
  return state_->RemoveReplica(tablet_id, ts_uuid);
}
1368 | | |
// Requests a leader stepdown on 'from_ts' with 'to_ts' as the intended new
// leader, then records the move (including the destination path) in the
// balancer state.
Status ClusterLoadBalancer::MoveLeader(const TabletId& tablet_id,
                                       const TabletServerId& from_ts,
                                       const TabletServerId& to_ts,
                                       const string& to_ts_path) {
  LOG(INFO) << Substitute("Moving leader of $0 from TS $1 to $2", tablet_id, from_ts, to_ts);
  // should_remove_leader=false: the leader only steps down; its replica stays.
  RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), from_ts, false /* is_add */,
                                   false /* should_remove_leader */, to_ts));

  return state_->MoveLeader(tablet_id, from_ts, to_ts, to_ts_path);
}
1379 | | |
1380 | 775k | void ClusterLoadBalancer::GetAllAffinitizedZones(AffinitizedZonesSet* affinitized_zones) const { |
1381 | 775k | SysClusterConfigEntryPB config; |
1382 | 775k | CHECK_OK(catalog_manager_->GetClusterConfig(&config)); |
1383 | 775k | const int num_zones = config.replication_info().affinitized_leaders_size(); |
1384 | 776k | for (int i = 0; i < num_zones; i++306 ) { |
1385 | 306 | CloudInfoPB ci = config.replication_info().affinitized_leaders(i); |
1386 | 306 | affinitized_zones->insert(ci); |
1387 | 306 | } |
1388 | 775k | } |
1389 | | |
1390 | 891k | void ClusterLoadBalancer::InitializeTSDescriptors() { |
1391 | 891k | if (state_->use_preferred_zones_) { |
1392 | 775k | GetAllAffinitizedZones(&state_->affinitized_zones_); |
1393 | 775k | } |
1394 | | // Set the blacklist so we can also mark the tablet servers as we add them up. |
1395 | 891k | state_->SetBlacklist(GetServerBlacklist()); |
1396 | | |
1397 | | // Set the leader blacklist so we can also mark the tablet servers as we add them up. |
1398 | 891k | state_->SetLeaderBlacklist(GetLeaderBlacklist()); |
1399 | | |
1400 | | // Loop over tablet servers to set empty defaults, so we can also have info on those |
1401 | | // servers that have yet to receive load (have heartbeated to the master, but have not been |
1402 | | // assigned any tablets yet). |
1403 | 2.86M | for (const auto& ts_desc : global_state_->ts_descs_) { |
1404 | 2.86M | state_->UpdateTabletServer(ts_desc); |
1405 | 2.86M | } |
1406 | 891k | } |
1407 | | |
1408 | | // CatalogManager indirection methods that are set as virtual to be bypassed in testing. |
1409 | | // |
// Thin pass-through to TSManager::GetAllReportedDescriptors.
void ClusterLoadBalancer::GetAllReportedDescriptors(TSDescriptorVector* ts_descs) const {
  catalog_manager_->master_->ts_manager()->GetAllReportedDescriptors(ts_descs);
}
1413 | | |
// Thin pass-through to TSManager::GetAllDescriptors.
void ClusterLoadBalancer::GetAllDescriptors(TSDescriptorVector* ts_descs) const {
  catalog_manager_->master_->ts_manager()->GetAllDescriptors(ts_descs);
}
1417 | | |
// Returns the catalog manager's tablet map.
// NOTE(review): dereferences catalog_manager_->tablet_map_ without taking a
// lock here — presumably the caller holds the catalog lock; confirm.
const TabletInfoMap& ClusterLoadBalancer::GetTabletMap() const {
  return *catalog_manager_->tablet_map_;
}
1421 | | |
// Looks up the TableInfo for 'table_uuid'; may return nullptr when the table
// is unknown (see the null check in GetTabletsForTable). The "Unlocked" callee
// suggests the caller must hold the catalog lock — confirm at call sites.
const scoped_refptr<TableInfo> ClusterLoadBalancer::GetTableInfo(const TableId& table_uuid) const {
  return catalog_manager_->GetTableInfoUnlocked(table_uuid);
}
1425 | | |
1426 | 891k | Result<TabletInfos> ClusterLoadBalancer::GetTabletsForTable(const TableId& table_uuid) const { |
1427 | 891k | auto table_info = GetTableInfo(table_uuid); |
1428 | | |
1429 | 891k | if (table_info == nullptr) { |
1430 | 0 | return STATUS_FORMAT( |
1431 | 0 | InvalidArgument, "Invalid UUID '$0' - no entry found in catalog manager table map", |
1432 | 0 | table_uuid); |
1433 | 0 | } |
1434 | | |
1435 | 891k | return table_info->GetTablets(IncludeInactive(!FLAGS_TEST_load_balancer_skip_inactive_tablets)); |
1436 | 891k | } |
1437 | | |
// Returns the catalog manager's table-id map.
// NOTE(review): dereferences catalog_manager_->table_ids_map_ without taking a
// lock here — presumably the caller holds the catalog lock; confirm.
const TableInfoMap& ClusterLoadBalancer::GetTableMap() const {
  return *catalog_manager_->table_ids_map_;
}
1441 | | |
// Reads replication_info from the cluster config under a read lock.
// NOTE(review): the read lock is released when the temporary lock object dies
// at the end of this statement, while the returned reference points into the
// locked protobuf — confirm callers do not hold it across config mutations.
const ReplicationInfoPB& ClusterLoadBalancer::GetClusterReplicationInfo() const {
  return catalog_manager_->ClusterConfig()->LockForRead()->pb.replication_info();
}
1445 | | |
1446 | 0 | const PlacementInfoPB& ClusterLoadBalancer::GetClusterPlacementInfo() const { |
1447 | 0 | auto l = catalog_manager_->ClusterConfig()->LockForRead(); |
1448 | 0 | if (state_->options_->type == LIVE) { |
1449 | 0 | return l->pb.replication_info().live_replicas(); |
1450 | 0 | } else { |
1451 | 0 | return GetReadOnlyPlacementFromUuid(l->pb.replication_info()); |
1452 | 0 | } |
1453 | 0 | } |
1454 | | |
// Reads the server blacklist from the cluster config under a read lock.
const BlacklistPB& ClusterLoadBalancer::GetServerBlacklist() const {
  return catalog_manager_->ClusterConfig()->LockForRead()->pb.server_blacklist();
}
1458 | | |
// Reads the leader blacklist from the cluster config under a read lock.
const BlacklistPB& ClusterLoadBalancer::GetLeaderBlacklist() const {
  return catalog_manager_->ClusterConfig()->LockForRead()->pb.leader_blacklist();
}
1462 | | |
1463 | 231M | bool ClusterLoadBalancer::SkipLoadBalancing(const TableInfo& table) const { |
1464 | | // Skip load-balancing of some tables: |
1465 | | // * system tables: they are virtual tables not hosted by tservers. |
1466 | | // * colocated user tables: they occupy the same tablet as their colocated parent table, so load |
1467 | | // balancing just the colocated parent table is sufficient. |
1468 | | // * deleted/deleting tables: as they are no longer in effect. For tables that are being deleted |
1469 | | // currently as well, load distribution wouldn't matter as eventually they would get deleted. |
1470 | 231M | auto l = table.LockForRead(); |
1471 | 231M | return (catalog_manager_->IsSystemTable(table) || |
1472 | 231M | table.IsColocatedUserTable()4.09M || |
1473 | 231M | l->started_deleting()4.06M ); |
1474 | 231M | } |
1475 | | |
1476 | | Status ClusterLoadBalancer::CountPendingTasksUnlocked(const TableId& table_uuid, |
1477 | | int* pending_add_replica_tasks, |
1478 | | int* pending_remove_replica_tasks, |
1479 | 891k | int* pending_stepdown_leader_tasks) { |
1480 | 891k | GetPendingTasks(table_uuid, |
1481 | 891k | &state_->pending_add_replica_tasks_[table_uuid], |
1482 | 891k | &state_->pending_remove_replica_tasks_[table_uuid], |
1483 | 891k | &state_->pending_stepdown_leader_tasks_[table_uuid]); |
1484 | | |
1485 | 891k | *pending_add_replica_tasks += state_->pending_add_replica_tasks_[table_uuid].size(); |
1486 | 891k | *pending_remove_replica_tasks += state_->pending_remove_replica_tasks_[table_uuid].size(); |
1487 | 891k | *pending_stepdown_leader_tasks += state_->pending_stepdown_leader_tasks_[table_uuid].size(); |
1488 | 891k | for (auto e : state_->pending_add_replica_tasks_[table_uuid]) { |
1489 | 423 | const auto& ts_uuid = e.second; |
1490 | 423 | const auto& tablet_id = e.first; |
1491 | 423 | RETURN_NOT_OK(state_->AddStartingTablet(tablet_id, ts_uuid)); |
1492 | 423 | } |
1493 | 891k | return Status::OK(); |
1494 | 891k | } |
1495 | | |
// Fills the three maps with the in-flight add-replica, remove-replica and
// leader-stepdown tasks the catalog manager has for 'table_uuid'.
void ClusterLoadBalancer::GetPendingTasks(const TableId& table_uuid,
                                          TabletToTabletServerMap* add_replica_tasks,
                                          TabletToTabletServerMap* remove_replica_tasks,
                                          TabletToTabletServerMap* stepdown_leader_tasks) {
  catalog_manager_->GetPendingServerTasksUnlocked(
      table_uuid, add_replica_tasks, remove_replica_tasks, stepdown_leader_tasks);
}
1503 | | |
// Dispatches the config-change request for one balancer decision:
//   is_add=true                         -> add-server request for ts_uuid;
//   is_add=false and ts_uuid is leader  -> leader stepdown (with optional
//                                          removal and intended new leader);
//   is_add=false otherwise              -> remove-server request for ts_uuid.
// The SCHECK_EQs guard against sending the same task twice for one tablet.
Status ClusterLoadBalancer::SendReplicaChanges(
    scoped_refptr<TabletInfo> tablet, const TabletServerId& ts_uuid, const bool is_add,
    const bool should_remove_leader, const TabletServerId& new_leader_ts_uuid) {
  // Hold the tablet read lock so the committed consensus state sent below is a
  // consistent snapshot.
  auto l = tablet->LockForRead();
  if (is_add) {
    // These checks are temporary. They will be removed once we are confident that the algorithm is
    // always doing the right thing.
    SCHECK_EQ(state_->pending_add_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
              0U,
              IllegalState,
              "Sending duplicate add replica task.");
    catalog_manager_->SendAddServerRequest(
        tablet, GetDefaultMemberType(), l->pb.committed_consensus_state(), ts_uuid);
  } else {
    // If the replica is also the leader, first step it down and then remove.
    if (state_->per_tablet_meta_[tablet->id()].leader_uuid == ts_uuid) {
      SCHECK_EQ(
          state_->pending_stepdown_leader_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
          0U,
          IllegalState,
          "Sending duplicate leader stepdown task.");
      catalog_manager_->SendLeaderStepDownRequest(
          tablet, l->pb.committed_consensus_state(), ts_uuid, should_remove_leader,
          new_leader_ts_uuid);
    } else {
      SCHECK_EQ(
          state_->pending_remove_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()),
          0U,
          IllegalState,
          "Sending duplicate remove replica task.");
      catalog_manager_->SendRemoveServerRequest(
          tablet, l->pb.committed_consensus_state(), ts_uuid);
    }
  }
  return Status::OK();
}
1540 | | |
1541 | 2.17k | consensus::PeerMemberType ClusterLoadBalancer::GetDefaultMemberType() { |
1542 | 2.17k | if (state_->options_->type == LIVE) { |
1543 | 2.09k | return consensus::PeerMemberType::PRE_VOTER; |
1544 | 2.09k | } else { |
1545 | 73 | return consensus::PeerMemberType::PRE_OBSERVER; |
1546 | 73 | } |
1547 | 2.17k | } |
1548 | | |
1549 | 5.79k | Result<bool> ClusterLoadBalancer::IsConfigMemberInTransitionMode(const TabletId &tablet_id) const { |
1550 | 5.79k | auto tablet = GetTabletMap().at(tablet_id); |
1551 | 5.79k | auto l = tablet->LockForRead(); |
1552 | 5.79k | auto config = l->pb.committed_consensus_state().config(); |
1553 | 5.79k | return CountVotersInTransition(config) != 0; |
1554 | 5.79k | } |
1555 | | |
1556 | | const PlacementInfoPB& ClusterLoadBalancer::GetReadOnlyPlacementFromUuid( |
1557 | 3.25k | const ReplicationInfoPB& replication_info) const { |
1558 | | // We assume we have an read replicas field in our replication info. |
1559 | 3.29k | for (int i = 0; i < replication_info.read_replicas_size(); i++42 ) { |
1560 | 3.29k | const PlacementInfoPB& read_only_placement = replication_info.read_replicas(i); |
1561 | 3.29k | if (read_only_placement.placement_uuid() == state_->options_->placement_uuid) { |
1562 | 3.25k | return read_only_placement; |
1563 | 3.25k | } |
1564 | 3.29k | } |
1565 | | // Should never get here. |
1566 | 0 | LOG(ERROR) << "Could not find read only cluster with placement uuid: " |
1567 | 0 | << state_->options_->placement_uuid; |
1568 | 0 | return replication_info.read_replicas(0); |
1569 | 3.25k | } |
1570 | | |
// Returns the live-replica placement from the cluster config, regardless of
// whether this balancing run targets live or read-only replicas.
// NOTE(review): as with GetClusterReplicationInfo, the returned reference
// outlives the read lock taken here — confirm this is safe at call sites.
const PlacementInfoPB& ClusterLoadBalancer::GetLiveClusterPlacementInfo() const {
  auto l = catalog_manager_->ClusterConfig()->LockForRead();
  return l->pb.replication_info().live_replicas();
}
1575 | | |
// Returns a snapshot (copy) of the tables the load balancer skipped; the copy
// is taken under mutex_ so callers need not hold the lock afterwards.
vector<scoped_refptr<TableInfo>> ClusterLoadBalancer::GetAllTablesLoadBalancerSkipped() {
  SharedLock<decltype(mutex_)> l(mutex_);
  return skipped_tables_;
}
1580 | | |
1581 | | } // namespace master |
1582 | | } // namespace yb |