/Users/deen/code/yugabyte-db/src/yb/master/cluster_balance.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/cluster_balance.h" |
15 | | |
16 | | #include <algorithm> |
17 | | #include <memory> |
18 | | #include <utility> |
19 | | |
20 | | #include <boost/algorithm/string/join.hpp> |
21 | | #include <boost/optional/optional.hpp> |
22 | | |
23 | | #include "yb/common/common.pb.h" |
24 | | |
25 | | #include "yb/consensus/quorum_util.h" |
26 | | |
27 | | #include "yb/gutil/casts.h" |
28 | | |
29 | | #include "yb/master/catalog_manager_util.h" |
30 | | |
31 | | #include "yb/master/master_fwd.h" |
32 | | #include "yb/master/master.h" |
33 | | #include "yb/master/master_error.h" |
34 | | |
35 | | #include "yb/util/flag_tags.h" |
36 | | #include "yb/util/status.h" |
37 | | #include "yb/util/status_format.h" |
38 | | #include "yb/util/status_log.h" |
39 | | |
40 | | DEFINE_bool(enable_load_balancing, |
41 | | true, |
42 | | "Choose whether to enable the load balancing algorithm, to move tablets around."); |
43 | | |
44 | | DEFINE_bool(transaction_tables_use_preferred_zones, |
45 | | false, |
46 | | "Choose whether transaction tablet leaders respect preferred zones."); |
47 | | |
48 | | DEFINE_bool(enable_global_load_balancing, |
49 | | true, |
50 | | "Choose whether to allow the load balancer to make moves that strictly only balance " |
51 | | "global load. Note that global balancing only occurs after all tables are balanced."); |
52 | | |
53 | | DEFINE_int32(leader_balance_threshold, |
54 | | 0, |
55 | | "Number of leaders per each tablet server to balance below. If this is configured to " |
56 | | "0 (the default), the leaders will be balanced optimally at extra cost."); |
57 | | |
58 | | DEFINE_int32(leader_balance_unresponsive_timeout_ms, |
59 | | 3 * 1000, |
60 | | "The period of time that a master can go without receiving a heartbeat from a " |
61 | | "tablet server before considering it unresponsive. Unresponsive servers are " |
62 | | "excluded from leader balancing."); |
63 | | |
64 | | DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps, |
65 | | 10, |
66 | | "Maximum number of tablets being remote bootstrapped across the cluster."); |
67 | | |
68 | | DEFINE_int32(load_balancer_max_concurrent_tablet_remote_bootstraps_per_table, |
69 | | 2, |
70 | | "Maximum number of tablets being remote bootstrapped for any table. The maximum " |
71 | | "number of remote bootstraps across the cluster is still limited by the flag " |
72 | | "load_balancer_max_concurrent_tablet_remote_bootstraps. This flag is meant to prevent " |
73 | | "a single table use all the available remote bootstrap sessions and starving other " |
74 | | "tables."); |
75 | | |
76 | | DEFINE_int32(load_balancer_max_over_replicated_tablets, |
77 | | 1, |
78 | | "Maximum number of running tablet replicas that are allowed to be over the configured " |
79 | | "replication factor."); |
80 | | |
81 | | DEFINE_int32(load_balancer_max_concurrent_adds, |
82 | | 1, |
83 | | "Maximum number of tablet peer replicas to add in any one run of the load balancer."); |
84 | | |
85 | | DEFINE_int32(load_balancer_max_concurrent_removals, |
86 | | 1, |
87 | | "Maximum number of over-replicated tablet peer removals to do in any one run of the " |
88 | | "load balancer."); |
89 | | |
90 | | DEFINE_int32(load_balancer_max_concurrent_moves, |
91 | | 2, |
92 | | "Maximum number of tablet leaders on tablet servers (across the cluster) to move in " |
93 | | "any one run of the load balancer."); |
94 | | |
95 | | DEFINE_int32(load_balancer_max_concurrent_moves_per_table, |
96 | | 1, |
97 | | "Maximum number of tablet leaders per table to move in any one run of the load " |
98 | | "balancer. The maximum number of tablet leader moves across the cluster is still " |
99 | | "limited by the flag load_balancer_max_concurrent_moves. This flag is meant to " |
100 | | "prevent a single table from using all of the leader moves quota and starving " |
101 | | "other tables."); |
102 | | |
103 | | DEFINE_int32(load_balancer_num_idle_runs, |
104 | | 5, |
105 | | "Number of idle runs of load balancer to deem it idle."); |
106 | | |
107 | | DEFINE_test_flag(bool, load_balancer_handle_under_replicated_tablets_only, false, |
108 | | "Limit the functionality of the load balancer during tests so tests can make " |
109 | | "progress"); |
110 | | |
111 | | // No longer used because leader stepdown is not as slow as it used to be. |
112 | | DEFINE_bool(load_balancer_skip_leader_as_remove_victim, false, |
113 | | "DEPRECATED. Should the LB skip a leader as a possible remove candidate."); |
114 | | |
115 | | DEFINE_bool(allow_leader_balancing_dead_node, true, |
116 | | "When a tserver is marked as dead, do we continue leader balancing for tables that " |
117 | | "have a replica on this tserver"); |
118 | | |
119 | | DEFINE_test_flag(int32, load_balancer_wait_after_count_pending_tasks_ms, 0, |
120 | | "For testing purposes, number of milliseconds to wait after counting and " |
121 | | "finding pending tasks."); |
122 | | |
123 | | DECLARE_int32(min_leader_stepdown_retry_interval_ms); |
124 | | DECLARE_bool(enable_ysql_tablespaces_for_placement); |
125 | | |
126 | | DEFINE_bool(load_balancer_count_move_as_add, true, |
127 | | "Should we enable state change to count add server triggered by load move as just an " |
128 | | "add instead of both an add and remove."); |
129 | | |
130 | | DEFINE_bool(load_balancer_drive_aware, true, |
131 | | "When LB decides to move a tablet from server A to B, on the target LB " |
132 | | "should select the tablet to move from most loaded drive."); |
133 | | |
134 | | DEFINE_bool(load_balancer_ignore_cloud_info_similarity, false, |
135 | | "If true, ignore the similarity between cloud infos when deciding which tablet " |
136 | | "to move."); |
137 | | |
138 | | // TODO(tsplit): make false by default or even remove flag after |
139 | | // https://github.com/yugabyte/yugabyte-db/issues/10301 is fixed. |
140 | | DEFINE_test_flag( |
141 | | bool, load_balancer_skip_inactive_tablets, true, "Don't move inactive (hidden) tablets"); |
142 | | |
143 | | namespace yb { |
144 | | namespace master { |
145 | | |
146 | | using std::unique_ptr; |
147 | | using std::make_unique; |
148 | | using std::string; |
149 | | using std::set; |
150 | | using std::vector; |
151 | | using strings::Substitute; |
152 | | |
153 | | namespace { |
154 | | |
155 | | vector<set<TabletId>> GetTabletsOnTSToMove(bool drive_aware, |
156 | 19.0k | const CBTabletServerMetadata& from_ts_meta) { |
157 | 19.0k | vector<set<TabletId>> all_tablets; |
158 | 19.0k | if (drive_aware) { |
159 | 41.6k | for (const auto& path : from_ts_meta.sorted_path_load_by_tablets_count) { |
160 | 41.6k | auto path_list = from_ts_meta.path_to_tablets.find(path); |
161 | 41.6k | if (path_list == from_ts_meta.path_to_tablets.end()) { |
162 | 0 | LOG(INFO) << "Found uninitialized path: " << path; |
163 | 0 | continue; |
164 | 0 | } |
165 | 41.6k | all_tablets.push_back(path_list->second); |
166 | 41.6k | } |
167 | 0 | } else { |
168 | 0 | all_tablets.push_back(from_ts_meta.running_tablets); |
169 | 0 | } |
170 | 19.0k | return all_tablets; |
171 | 19.0k | } |
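A minimal sketch of the drive-aware grouping above (simplified stand-in types, not YB code; the real CBTabletServerMetadata supplies both the path ordering and the path-to-tablets map):

    #include <map>
    #include <set>
    #include <string>
    #include <vector>

    using TabletId = std::string;

    // Group a tserver's tablets by data path, most loaded path first, so
    // callers consider draining the hottest drive before the others.
    std::vector<std::set<TabletId>> TabletsByDriveLoad(
        const std::vector<std::string>& paths_sorted_by_load,  // descending
        const std::map<std::string, std::set<TabletId>>& path_to_tablets) {
      std::vector<std::set<TabletId>> result;
      for (const auto& path : paths_sorted_by_load) {
        auto it = path_to_tablets.find(path);
        if (it != path_to_tablets.end()) {
          result.push_back(it->second);  // skip paths with no tablets registered
        }
      }
      return result;
    }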
172 | | |
173 | | // Returns a sorted list of (tablet id, path) pairs on to_ts. |
174 | | std::vector<std::pair<TabletId, std::string>> GetLeadersOnTSToMove( |
175 | 22.4k | bool drive_aware, const set<TabletId>& leaders, const CBTabletServerMetadata& to_ts_meta) { |
176 | 22.4k | std::vector<std::pair<TabletId, std::string>> peers; |
177 | 22.4k | if (drive_aware) { |
178 | 26.5k | for (const auto& path : to_ts_meta.sorted_path_load_by_leader_count) { |
179 | 26.5k | auto path_list = to_ts_meta.path_to_tablets.find(path); |
180 | 26.5k | if (path_list == to_ts_meta.path_to_tablets.end()) { |
181 | | // No tablets on this path, so skip it. |
182 | 17.6k | continue; |
183 | 17.6k | } |
184 | 8.87k | transform(path_list->second.begin(), path_list->second.end(), std::back_inserter(peers), |
185 | 70.9k | [&path_list](const TabletId& tablet_id) -> std::pair<TabletId, std::string> { |
186 | 70.9k | return make_pair(tablet_id, path_list->first); |
187 | 70.9k | }); |
188 | 8.87k | } |
189 | 0 | } else { |
190 | 0 | transform(to_ts_meta.running_tablets.begin(), to_ts_meta.running_tablets.end(), |
191 | 0 | std::back_inserter(peers), |
192 | 0 | [](const TabletId& tablet_id) -> std::pair<TabletId, std::string> { |
193 | 0 | return make_pair(tablet_id, ""); |
194 | 0 | }); |
195 | 0 | } |
196 | 22.4k | std::vector<std::pair<TabletId, std::string>> intersection; |
197 | 22.4k | copy_if(peers.begin(), peers.end(), std::back_inserter(intersection), |
198 | 70.9k | [&leaders](const std::pair<TabletId, std::string>& tablet) { |
199 | 70.9k | return leaders.count(tablet.first) > 0; |
200 | 70.9k | }); |
201 | 22.4k | return intersection; |
202 | 22.4k | } |
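The final copy_if above is an order-preserving intersection; a standalone sketch with simplified types (not YB code):

    #include <algorithm>
    #include <iterator>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    using TabletId = std::string;

    // Keep only (tablet, path) candidates whose tablet is currently a leader
    // on the source, preserving the drive-sorted order built upstream.
    std::vector<std::pair<TabletId, std::string>> FilterToLeaders(
        const std::vector<std::pair<TabletId, std::string>>& candidates,
        const std::set<TabletId>& leaders) {
      std::vector<std::pair<TabletId, std::string>> out;
      std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(out),
                   [&leaders](const auto& c) { return leaders.count(c.first) > 0; });
      return out;
    }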
203 | | |
204 | | } // namespace |
205 | | |
206 | | Result<ReplicationInfoPB> ClusterLoadBalancer::GetTableReplicationInfo( |
207 | 120k | const scoped_refptr<TableInfo>& table) const { |
208 | | |
209 | | // Return custom placement policy if it exists. |
210 | 120k | { |
211 | 120k | auto l = table->LockForRead(); |
212 | 120k | if (l->pb.has_replication_info()) { |
213 | 349 | return l->pb.replication_info(); |
214 | 349 | } |
215 | 120k | } |
216 | | |
217 | | // Custom placement policy does not exist. Check whether this table |
218 | | // has a tablespace associated with it; if so, return the placement info |
219 | | // for that tablespace. |
220 | 120k | auto replication_info = VERIFY_RESULT(tablespace_manager_->GetTableReplicationInfo(table)); |
221 | 120k | if (replication_info) { |
222 | 0 | return replication_info.value(); |
223 | 0 | } |
224 | | |
225 | | // No custom policy or tablespace specified for table. |
226 | 120k | return GetClusterReplicationInfo(); |
227 | 120k | } |
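The lookup above is a three-level fallback: a table-level policy wins over a tablespace policy, which wins over the cluster default. A self-contained sketch with hypothetical stand-ins (TableLevelPolicy, TablespacePolicy, and ClusterDefaultPolicy are not YB APIs):

    #include <optional>

    struct ReplicationInfo { int num_replicas; };  // hypothetical stand-in

    std::optional<ReplicationInfo> TableLevelPolicy() { return std::nullopt; }
    std::optional<ReplicationInfo> TablespacePolicy() { return std::nullopt; }
    ReplicationInfo ClusterDefaultPolicy() { return {3}; }

    // The most specific policy that exists wins.
    ReplicationInfo ResolveReplicationInfo() {
      if (auto table = TableLevelPolicy()) return *table;  // 1. table-level
      if (auto space = TablespacePolicy()) return *space;  // 2. tablespace
      return ClusterDefaultPolicy();                       // 3. cluster default
    }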
228 | | |
229 | 66.3k | void ClusterLoadBalancer::InitTablespaceManager() { |
230 | 66.3k | tablespace_manager_ = catalog_manager_->GetTablespaceManager(); |
231 | 66.3k | } |
232 | | |
233 | 121k | Status ClusterLoadBalancer::PopulatePlacementInfo(TabletInfo* tablet, PlacementInfoPB* pb) { |
234 | 121k | if (state_->options_->type == LIVE) { |
235 | 120k | const auto& replication_info = VERIFY_RESULT(GetTableReplicationInfo(tablet->table())); |
236 | 120k | pb->CopyFrom(replication_info.live_replicas()); |
237 | 120k | return Status::OK(); |
238 | 843 | } |
239 | 843 | auto l = tablet->table()->LockForRead(); |
240 | 843 | if (state_->options_->type == READ_ONLY && |
241 | 843 | l->pb.has_replication_info() && |
242 | 0 | !l->pb.replication_info().read_replicas().empty()) { |
243 | 0 | pb->CopyFrom(GetReadOnlyPlacementFromUuid(l->pb.replication_info())); |
244 | 843 | } else { |
245 | 843 | pb->CopyFrom(GetClusterPlacementInfo()); |
246 | 843 | } |
247 | 843 | return Status::OK(); |
248 | 843 | } |
249 | | |
250 | 967k | Status ClusterLoadBalancer::UpdateTabletInfo(TabletInfo* tablet) { |
251 | 967k | const auto& table_id = tablet->table()->id(); |
252 | | // Set the placement information on a per-table basis, only once. |
253 | 967k | if (!state_->placement_by_table_.count(table_id)) { |
254 | 121k | PlacementInfoPB pb; |
255 | 121k | { |
256 | 121k | RETURN_NOT_OK(PopulatePlacementInfo(tablet, &pb)); |
257 | 121k | } |
258 | 121k | state_->placement_by_table_[table_id] = std::move(pb); |
259 | 121k | } |
260 | | |
261 | 967k | return state_->UpdateTablet(tablet); |
262 | 967k | } |
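A sketch of the once-per-table caching pattern used here (ComputePlacement is a hypothetical stand-in for the PopulatePlacementInfo call above, not a YB API):

    #include <string>
    #include <unordered_map>

    struct PlacementInfo {};  // hypothetical stand-in
    PlacementInfo ComputePlacement(const std::string& /*table_id*/) { return {}; }

    // Compute placement once per table, then reuse it for every tablet of
    // that table during the run.
    const PlacementInfo& PlacementFor(
        const std::string& table_id,
        std::unordered_map<std::string, PlacementInfo>* cache) {
      auto it = cache->find(table_id);
      if (it == cache->end()) {
        it = cache->emplace(table_id, ComputePlacement(table_id)).first;
      }
      return it->second;
    }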
263 | | |
264 | 310k | const PlacementInfoPB& ClusterLoadBalancer::GetPlacementByTablet(const TabletId& tablet_id) const { |
265 | 310k | const auto& table_id = GetTabletMap().at(tablet_id)->table()->id(); |
266 | 310k | return state_->placement_by_table_.at(table_id); |
267 | 310k | } |
268 | | |
269 | 0 | size_t ClusterLoadBalancer::get_total_wrong_placement() const { |
270 | 0 | return state_->tablets_wrong_placement_.size(); |
271 | 0 | } |
272 | | |
273 | 0 | size_t ClusterLoadBalancer::get_total_blacklisted_servers() const { |
274 | 0 | return state_->blacklisted_servers_.size(); |
275 | 0 | } |
276 | | |
277 | 0 | size_t ClusterLoadBalancer::get_total_leader_blacklisted_servers() const { |
278 | 0 | return state_->leader_blacklisted_servers_.size(); |
279 | 0 | } |
280 | | |
281 | 121k | size_t ClusterLoadBalancer::get_total_over_replication() const { |
282 | 121k | return state_->tablets_over_replicated_.size(); |
283 | 121k | } |
284 | | |
285 | 0 | size_t ClusterLoadBalancer::get_total_under_replication() const { |
286 | 0 | return state_->tablets_missing_replicas_.size(); |
287 | 0 | } |
288 | | |
289 | 6 | size_t ClusterLoadBalancer::get_total_starting_tablets() const { |
290 | 6 | return global_state_->total_starting_tablets_; |
291 | 6 | } |
292 | | |
293 | 6 | int ClusterLoadBalancer::get_total_running_tablets() const { |
294 | 6 | return state_->total_running_; |
295 | 6 | } |
296 | | |
297 | 80.8k | bool ClusterLoadBalancer::IsLoadBalancerEnabled() const { |
298 | 80.8k | return FLAGS_enable_load_balancing && is_enabled_; |
299 | 80.8k | } |
300 | | |
301 | | // Load balancer class. |
302 | | ClusterLoadBalancer::ClusterLoadBalancer(CatalogManager* cm) |
303 | | : random_(GetRandomSeed32()), |
304 | | is_enabled_(FLAGS_enable_load_balancing), |
305 | 5.46k | cbuf_activities_(FLAGS_load_balancer_num_idle_runs) { |
306 | 5.46k | ResetGlobalState(false /* initialize_ts_descs */); |
307 | | |
308 | 5.46k | catalog_manager_ = cm; |
309 | 5.46k | } |
310 | | |
311 | | // Reduce remaining_tasks by pending_tasks value, after sanitizing inputs. |
312 | | template <class T> |
313 | 320k | void set_remaining(T pending_tasks, T* remaining_tasks) { |
314 | 320k | if (pending_tasks > *remaining_tasks) { |
315 | 0 | LOG(WARNING) << "Pending tasks > max allowed tasks: " << pending_tasks << " > " |
316 | 0 | << *remaining_tasks; |
317 | 0 | *remaining_tasks = 0; |
318 | 320k | } else { |
319 | 320k | *remaining_tasks -= pending_tasks; |
320 | 320k | } |
321 | 320k | } |
(Per-template-instantiation coverage omitted: set_remaining&lt;int&gt; 199k, set_remaining&lt;unsigned long&gt; 121k.)
322 | | |
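A small usage sketch (not YB code) of how set_remaining charges in-flight work against a run's budget before any new operations are considered:

    #include <iostream>

    // Saturating subtraction, as in set_remaining above.
    template <class T>
    void SetRemaining(T pending, T* remaining) {
      *remaining = (pending > *remaining) ? T(0) : *remaining - pending;
    }

    int main() {
      int remaining_adds = 1;  // e.g. FLAGS_load_balancer_max_concurrent_adds
      SetRemaining(1, &remaining_adds);
      std::cout << remaining_adds << "\n";  // 0: one in-flight add spends it all
      return 0;
    }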
323 | | // Needed as we have a unique_ptr to the forward declared PerTableLoadState class. |
324 | 94 | ClusterLoadBalancer::~ClusterLoadBalancer() = default; |
325 | | |
326 | 78.4k | void ClusterLoadBalancer::RunLoadBalancerWithOptions(Options* options) { |
327 | 78.4k | ResetGlobalState(); |
328 | | |
329 | 78.4k | uint32_t master_errors = 0; |
330 | | |
331 | 78.4k | if (!IsLoadBalancerEnabled()) { |
332 | 12.0k | LOG(INFO) << "Load balancing is not enabled."; |
333 | 12.0k | return; |
334 | 12.0k | } |
335 | | |
336 | 66.3k | if (!FLAGS_transaction_tables_use_preferred_zones) { |
337 | 0 | VLOG(1) << "Transaction tables will not respect leadership affinity."; |
338 | 66.3k | } |
339 | | |
340 | 66.3k | std::unique_ptr<Options> options_unique_ptr; |
341 | 66.3k | if (options == nullptr) { |
342 | 0 | options_unique_ptr = std::make_unique<Options>(); |
343 | 0 | options = options_unique_ptr.get(); |
344 | 0 | } |
345 | | |
346 | 66.3k | InitTablespaceManager(); |
347 | | |
348 | | // Lock the CatalogManager maps for the duration of the load balancer run. |
349 | 66.3k | CatalogManager::SharedLock lock(catalog_manager_->mutex_); |
350 | | |
351 | 66.3k | int remaining_adds = options->kMaxConcurrentAdds; |
352 | 66.3k | int remaining_removals = options->kMaxConcurrentRemovals; |
353 | 66.3k | int remaining_leader_moves = options->kMaxConcurrentLeaderMoves; |
354 | | |
355 | | // Loop over all tables to get the count of pending tasks. |
356 | 66.3k | int pending_add_replica_tasks = 0; |
357 | 66.3k | int pending_remove_replica_tasks = 0; |
358 | 66.3k | int pending_stepdown_leader_tasks = 0; |
359 | | |
360 | 8.64M | for (const auto& table : GetTableMap()) { |
361 | 8.64M | if (SkipLoadBalancing(*table.second)) { |
362 | | // Populate the list of tables for which LB has been skipped |
363 | | // in LB's internal vector. |
364 | 8.52M | skipped_tables_per_run_.push_back(table.second); |
365 | 8.52M | continue; |
366 | 8.52M | } |
367 | 121k | const TableId& table_id = table.first; |
368 | 121k | if (tablespace_manager_->NeedsRefreshToFindTablePlacement(table.second)) { |
369 | | // Placement information was not present in catalog manager cache. This is probably a |
370 | | // recently created table; skip load balancing for now. Hopefully by the next run, |
371 | | // the background task in the catalog manager will pick up the placement information |
372 | | // for this table from the PG catalog tables. |
373 | | // TODO(deepthi) Keep track of the number of times this happens, take appropriate action |
374 | | // if placement stays missing over period of time. |
375 | 1 | YB_LOG_EVERY_N(INFO, 10) << "Skipping load balancing for table " << table.second->name() |
376 | 1 | << " as its placement information is not available yet"; |
377 | 1 | master_errors++; |
378 | 1 | continue; |
379 | 1 | } |
380 | 121k | ResetTableStatePtr(table_id, options); |
381 | | |
382 | 121k | bool is_txn_table = table.second->GetTableType() == TRANSACTION_STATUS_TABLE_TYPE; |
383 | 121k | state_->use_preferred_zones_ = !is_txn_table || FLAGS_transaction_tables_use_preferred_zones; |
384 | 121k | InitializeTSDescriptors(); |
385 | | |
386 | 121k | Status s = CountPendingTasksUnlocked(table_id, |
387 | 121k | &pending_add_replica_tasks, |
388 | 121k | &pending_remove_replica_tasks, |
389 | 121k | &pending_stepdown_leader_tasks); |
390 | 121k | if (!s.ok()) { |
391 | | // Found uninitialized ts_meta, so don't load balance this table yet. |
392 | 0 | LOG(WARNING) << "Skipping load balancing " << table.first << ": " << StatusToString(s); |
393 | 0 | per_table_states_.erase(table_id); |
394 | 0 | master_errors++; |
395 | 0 | continue; |
396 | 0 | } |
397 | 121k | } |
398 | | |
399 | 66.3k | if (pending_add_replica_tasks + pending_remove_replica_tasks + pending_stepdown_leader_tasks > 0) { |
400 | 573 | LOG(INFO) << "Total pending adds=" << pending_add_replica_tasks << ", total pending removals=" |
401 | 573 | << pending_remove_replica_tasks << ", total pending leader stepdowns=" |
402 | 573 | << pending_stepdown_leader_tasks; |
403 | 573 | if (PREDICT_FALSE(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms > 0)) { |
404 | 0 | LOG(INFO) << "Sleeping after finding pending tasks for " |
405 | 0 | << FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms << " ms"; |
406 | 0 | SleepFor( |
407 | 0 | MonoDelta::FromMilliseconds(FLAGS_TEST_load_balancer_wait_after_count_pending_tasks_ms)); |
408 | 0 | } |
409 | 573 | } |
410 | | |
411 | 66.3k | set_remaining(pending_add_replica_tasks, &remaining_adds); |
412 | 66.3k | set_remaining(pending_remove_replica_tasks, &remaining_removals); |
413 | 66.3k | set_remaining(pending_stepdown_leader_tasks, &remaining_leader_moves); |
414 | | |
415 | | // At the start of the run, report LB state that might prevent it from running smoothly. |
416 | 66.3k | ReportUnusualLoadBalancerState(); |
417 | | |
418 | | // Loop over all tables to analyze the global and per-table load. |
419 | 8.64M | for (const auto& table : GetTableMap()) { |
420 | 8.64M | if (SkipLoadBalancing(*table.second)) { |
421 | 8.52M | continue; |
422 | 8.52M | } |
423 | | |
424 | 121k | auto it = per_table_states_.find(table.first); |
425 | 121k | if (it == per_table_states_.end()) { |
426 | | // If the table state doesn't exist, it was not fully initialized in the previous iteration. |
427 | 0 | VLOG(1) << "Unable to find the state for table " << table.first; |
428 | 1 | continue; |
429 | 1 | } |
430 | 121k | state_ = it->second.get(); |
431 | | |
432 | | // Prepare the in-memory structures. |
433 | 121k | auto handle_analyze_tablets = AnalyzeTabletsUnlocked(table.first); |
434 | 121k | if (!handle_analyze_tablets.ok()) { |
435 | 59 | LOG(WARNING) << "Skipping load balancing " << table.first << ": " |
436 | 59 | << StatusToString(handle_analyze_tablets); |
437 | 59 | per_table_states_.erase(table.first); |
438 | 59 | master_errors++; |
439 | 59 | } |
440 | 121k | } |
441 | | |
442 | 1 | VLOG(1) << "Number of remote bootstraps before running load balancer: " |
443 | 1 | << global_state_->total_starting_tablets_; |
444 | | |
445 | | // Iterate over all the tables to take actions based on the data collected on the previous loop. |
446 | 8.64M | for (const auto& table : GetTableMap()) { |
447 | 8.64M | state_ = nullptr; |
448 | 8.64M | if (remaining_adds == 0 && remaining_removals == 0 && remaining_leader_moves == 0) { |
449 | 0 | break; |
450 | 0 | } |
451 | 8.64M | if (SkipLoadBalancing(*table.second)) { |
452 | 8.52M | continue; |
453 | 8.52M | } |
454 | | |
455 | 121k | auto it = per_table_states_.find(table.first); |
456 | 121k | if (it == per_table_states_.end()) { |
457 | | // If the table state doesn't exist, it didn't get analyzed by the previous iteration. |
458 | 0 | VLOG(1) << "Unable to find table state for table " << table.first |
459 | 0 | << ". Skipping load balancing execution"; |
460 | 60 | continue; |
461 | 121k | } else { |
462 | 0 | VLOG(5) << "Load balancing table " << table.first; |
463 | 121k | } |
464 | 121k | state_ = it->second.get(); |
465 | | |
466 | | // We may have modified global loads, so we need to reset this state's load. |
467 | 121k | state_->SortLoad(); |
468 | | |
469 | | // Output parameters are unused in the load balancer, but useful in testing. |
470 | 121k | TabletId out_tablet_id; |
471 | 121k | TabletServerId out_from_ts; |
472 | 121k | TabletServerId out_to_ts; |
473 | | |
474 | | // Handle adding and moving replicas. |
475 | 122k | for ( ; remaining_adds > 0; --remaining_adds) { |
476 | 121k | if (state_->allow_only_leader_balancing_) { |
477 | 975 | YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping Add replicas. Only leader balancing table " |
478 | 38 | << table.first; |
479 | 975 | break; |
480 | 975 | } |
481 | 120k | auto handle_add = HandleAddReplicas(&out_tablet_id, &out_from_ts, &out_to_ts); |
482 | 120k | if (!handle_add.ok()) { |
483 | 1.28k | LOG(WARNING) << "Skipping add replicas for " << table.first << ": " |
484 | 1.28k | << StatusToString(handle_add); |
485 | 1.28k | master_errors++; |
486 | 1.28k | break; |
487 | 1.28k | } |
488 | 119k | if (!*handle_add) { |
489 | 118k | break; |
490 | 118k | } |
491 | 119k | } |
492 | 121k | if (PREDICT_FALSE(FLAGS_TEST_load_balancer_handle_under_replicated_tablets_only)) { |
493 | 210 | LOG(INFO) << "Skipping remove replicas and leader moves for " << table.first; |
494 | 210 | continue; |
495 | 210 | } |
496 | | |
497 | | // Handle cleanup after over-replication. |
498 | 122k | for ( ; remaining_removals > 0; --remaining_removals) { |
499 | 121k | if (state_->allow_only_leader_balancing_) { |
500 | 686 | YB_LOG_EVERY_N_SECS(INFO, 30) << "Skipping remove replicas. Only leader balancing table " |
501 | 27 | << table.first; |
502 | 686 | break; |
503 | 686 | } |
504 | 120k | auto handle_remove = HandleRemoveReplicas(&out_tablet_id, &out_from_ts); |
505 | 120k | if (!handle_remove.ok()) { |
506 | 0 | LOG(WARNING) << "Skipping remove replicas for " << table.first << ": " |
507 | 0 | << StatusToString(handle_remove); |
508 | 0 | master_errors++; |
509 | 0 | break; |
510 | 0 | } |
511 | 120k | if (!*handle_remove) { |
512 | 119k | break; |
513 | 119k | } |
514 | 120k | } |
515 | | |
516 | | // Handle tablet servers with too many leaders. |
517 | | // Check the current pending tasks per table to ensure we don't trigger the same task. |
518 | 121k | size_t table_remaining_leader_moves = state_->options_->kMaxConcurrentLeaderMovesPerTable; |
519 | 121k | set_remaining(state_->pending_stepdown_leader_tasks_[table.first].size(), |
520 | 121k | &table_remaining_leader_moves); |
521 | | // Keep track of both the global and per table limit on number of moves. |
522 | 121k | for ( ; |
523 | 126k | remaining_leader_moves > 0 && table_remaining_leader_moves > 0; |
524 | 120k | --remaining_leader_moves, --table_remaining_leader_moves) { |
525 | 120k | auto handle_leader = HandleLeaderMoves(&out_tablet_id, &out_from_ts, &out_to_ts); |
526 | 120k | if (!handle_leader.ok()) { |
527 | 1 | LOG(WARNING) << "Skipping leader moves for " << table.first << ": " |
528 | 1 | << StatusToString(handle_leader); |
529 | 1 | master_errors++; |
530 | 1 | break; |
531 | 1 | } |
532 | 120k | if (!*handle_leader) { |
533 | 114k | break; |
534 | 114k | } |
535 | 120k | } |
536 | 121k | } |
537 | | |
538 | 66.3k | RecordActivity(master_errors); |
539 | 66.3k | } |
540 | | |
541 | 77.4k | void ClusterLoadBalancer::RunLoadBalancer(Options* options) { |
542 | 77.4k | SysClusterConfigEntryPB config; |
543 | 77.4k | CHECK_OK(catalog_manager_->GetClusterConfig(&config)); |
544 | | |
545 | 77.4k | std::unique_ptr<Options> options_unique_ptr = |
546 | 77.4k | std::make_unique<Options>(); |
547 | 77.4k | Options* options_ent = options_unique_ptr.get(); |
548 | | // First, we load balance the live cluster. |
549 | 77.4k | options_ent->type = LIVE; |
550 | 77.4k | if (config.replication_info().live_replicas().has_placement_uuid()) { |
551 | 1.03k | options_ent->placement_uuid = config.replication_info().live_replicas().placement_uuid(); |
552 | 1.03k | options_ent->live_placement_uuid = options_ent->placement_uuid; |
553 | 76.4k | } else { |
554 | 76.4k | options_ent->placement_uuid = ""; |
555 | 76.4k | options_ent->live_placement_uuid = ""; |
556 | 76.4k | } |
557 | 77.4k | RunLoadBalancerWithOptions(options_ent); |
558 | | |
559 | | // Then, we balance all read-only clusters. |
560 | 77.4k | options_ent->type = READ_ONLY; |
561 | 78.4k | for (int i = 0; i < config.replication_info().read_replicas_size(); i++) { |
562 | 962 | const PlacementInfoPB& read_only_cluster = config.replication_info().read_replicas(i); |
563 | 962 | options_ent->placement_uuid = read_only_cluster.placement_uuid(); |
564 | 962 | RunLoadBalancerWithOptions(options_ent); |
565 | 962 | } |
566 | 77.4k | } |
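A control-flow sketch of the sequencing above, with simplified stand-in types (not YB code): the live placement is balanced first, then each read-replica placement in turn.

    #include <string>
    #include <vector>

    enum ClusterType { LIVE, READ_ONLY };
    struct RunOptions { ClusterType type; std::string placement_uuid; };

    void RunOnePlacement(const RunOptions& /*options*/) {}  // hypothetical pass

    void RunAllPlacements(const std::vector<std::string>& read_replica_uuids) {
      RunOnePlacement({LIVE, ""});  // live replicas first
      for (const auto& uuid : read_replica_uuids) {
        RunOnePlacement({READ_ONLY, uuid});  // then each read-only placement
      }
    }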
567 | | |
568 | 66.3k | void ClusterLoadBalancer::RecordActivity(uint32_t master_errors) { |
569 | | // Update the list of tables for whom load-balancing has been |
570 | | // skipped in this run. |
571 | 66.3k | { |
572 | 66.3k | std::lock_guard<decltype(mutex_)> l(mutex_); |
573 | 66.3k | skipped_tables_ = skipped_tables_per_run_; |
574 | 66.3k | } |
575 | | |
576 | 66.3k | uint32_t table_tasks = 0; |
577 | 8.64M | for (const auto& table : GetTableMap()) { |
578 | 8.64M | table_tasks += table.second->NumLBTasks(); |
579 | 8.64M | } |
580 | | |
581 | 66.3k | struct ActivityInfo ai {table_tasks, master_errors}; |
582 | | |
583 | | // Update circular buffer summary. |
584 | | |
585 | 66.3k | if (ai.IsIdle()) { |
586 | 60.1k | num_idle_runs_++; |
587 | 6.24k | } else { |
588 | 0 | VLOG(1) << |
589 | 0 | Substitute("Load balancer has $0 table tasks and $1 master errors", |
590 | 0 | table_tasks, master_errors); |
591 | 6.24k | } |
592 | | |
593 | 66.3k | if (cbuf_activities_.full()) { |
594 | 57.6k | if (cbuf_activities_.front().IsIdle()) { |
595 | 51.8k | num_idle_runs_--; |
596 | 51.8k | } |
597 | 57.6k | } |
598 | | |
599 | | // Mutate circular buffer. |
600 | 66.3k | cbuf_activities_.push_back(std::move(ai)); |
601 | | |
602 | | // Update state. |
603 | 66.3k | is_idle_.store(num_idle_runs_ == cbuf_activities_.size(), std::memory_order_release); |
604 | | |
605 | | // Two interesting cases when updating can_perform_global_operations_ state: |
606 | | // If we previously couldn't balance global load, but now the LB is idle, enable global balancing. |
607 | | // If we previously could balance global load, but now the LB is busy, then it is busy balancing |
608 | | // global load or doing other operations (remove, etc.). In this case, we keep global balancing |
609 | | // enabled up until we perform a non-global balancing move (see GetLoadToMove()). |
610 | | // TODO(julien) some small improvements can be made here, such as ignoring leader stepdown tasks. |
611 | 66.3k | can_perform_global_operations_ = can_perform_global_operations_ || ai.IsIdle(); |
612 | 66.3k | } |
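A sketch of the ring-buffer bookkeeping above (not YB code): remember the last N runs, adjust the idle count as old runs are evicted, and report idle only when every remembered run was idle.

    #include <boost/circular_buffer.hpp>
    #include <cstddef>

    class IdleTracker {
     public:
      explicit IdleTracker(size_t n) : runs_(n) {}

      void Record(bool run_was_idle) {
        if (runs_.full() && runs_.front()) {
          --num_idle_;  // the oldest run is about to be evicted
        }
        if (run_was_idle) {
          ++num_idle_;
        }
        runs_.push_back(run_was_idle);
      }

      bool IsIdle() const { return num_idle_ == runs_.size(); }

     private:
      boost::circular_buffer<bool> runs_;
      size_t num_idle_ = 0;
    };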
613 | | |
614 | 2.45k | Status ClusterLoadBalancer::IsIdle() const { |
615 | 2.45k | if (IsLoadBalancerEnabled() && !is_idle_.load(std::memory_order_acquire)) { |
616 | 1.90k | return STATUS( |
617 | 1.90k | IllegalState, |
618 | 1.90k | "Task or error encountered recently.", |
619 | 1.90k | MasterError(MasterErrorPB::LOAD_BALANCER_RECENTLY_ACTIVE)); |
620 | 1.90k | } |
621 | | |
622 | 551 | return Status::OK(); |
623 | 551 | } |
624 | | |
625 | 22.4k | bool ClusterLoadBalancer::CanBalanceGlobalLoad() const { |
626 | 22.4k | return FLAGS_enable_global_load_balancing && can_perform_global_operations_; |
627 | 22.4k | } |
628 | | |
629 | 66.3k | void ClusterLoadBalancer::ReportUnusualLoadBalancerState() const { |
630 | 195k | for (const auto& ts_desc : global_state_->ts_descs_) { |
631 | | // Report if any ts has a pending delete. |
632 | 195k | if (ts_desc->HasTabletDeletePending()) { |
633 | 7.51k | LOG(INFO) << Format("tablet server $0 has a pending delete for tablets $1", |
634 | 7.51k | ts_desc->permanent_uuid(), ts_desc->PendingTabletDeleteToString()); |
635 | 7.51k | } |
636 | 195k | } |
637 | 66.3k | } |
638 | | |
639 | 83.8k | void ClusterLoadBalancer::ResetGlobalState(bool initialize_ts_descs) { |
640 | 83.8k | per_table_states_.clear(); |
641 | 83.8k | global_state_ = std::make_unique<GlobalLoadState>(); |
642 | 83.8k | global_state_->drive_aware_ = FLAGS_load_balancer_drive_aware; |
643 | 83.8k | if (initialize_ts_descs) { |
644 | | // Only call GetAllDescriptors once for a LB run, and then cache it in global_state_. |
645 | 78.4k | GetAllDescriptors(&global_state_->ts_descs_); |
646 | 78.4k | } |
647 | 83.8k | skipped_tables_per_run_.clear(); |
648 | 83.8k | } |
649 | | |
650 | 121k | void ClusterLoadBalancer::ResetTableStatePtr(const TableId& table_id, Options* options) { |
651 | 121k | auto table_state = std::make_unique<PerTableLoadState>(global_state_.get()); |
652 | 121k | table_state->options_ = options; |
653 | 121k | state_ = table_state.get(); |
654 | 121k | per_table_states_[table_id] = std::move(table_state); |
655 | | |
656 | 121k | state_->table_id_ = table_id; |
657 | 121k | } |
658 | | |
659 | 121k | Status ClusterLoadBalancer::AnalyzeTabletsUnlocked(const TableId& table_uuid) { |
660 | 121k | auto tablets = VERIFY_RESULT_PREPEND( |
661 | 121k | GetTabletsForTable(table_uuid), "Skipping table " + table_uuid + " due to error: "); |
662 | | |
663 | | // Loop over tablet map to register the load that is already live in the cluster. |
664 | 967k | for (const auto& tablet : tablets) { |
665 | 967k | bool tablet_running = false; |
666 | 967k | { |
667 | 967k | auto tablet_lock = tablet->LockForRead(); |
668 | | |
669 | 967k | if (!tablet->table()) { |
670 | | // Tablet is orphaned or in a preparing state; continue. |
671 | 0 | continue; |
672 | 0 | } |
673 | 967k | tablet_running = tablet_lock->is_running(); |
674 | 967k | } |
675 | | |
676 | | // This is from the perspective of the CatalogManager and the on-disk, persisted |
677 | | // SysCatalogStatePB. What this means is that this tablet was properly created as part of a |
678 | | // CreateTable and the information was sent to the initial set of TS and the tablet got to an |
679 | | // initial running state. |
680 | | // |
681 | | // This is different from the individual, per-TS state of the tablet, which can vary based on |
682 | | // the TS itself. The tablet can be registered as RUNNING, as far as the CatalogManager is |
683 | | // concerned, but just be underreplicated, and have some TS currently bootstrapping instances |
684 | | // of the tablet. |
685 | 967k | if (tablet_running) { |
686 | 967k | RETURN_NOT_OK(UpdateTabletInfo(tablet.get())); |
687 | 967k | } |
688 | 967k | } |
689 | | |
690 | | // After updating the tablets and tablet servers, adjust the configured threshold if it is too |
691 | | // low for the given configuration. |
692 | 121k | state_->AdjustLeaderBalanceThreshold(); |
693 | | |
694 | | // Once we've analyzed both the tablet server information as well as the tablets, we can sort the |
695 | | // load and are ready to apply the load balancing rules. |
696 | 121k | state_->SortLoad(); |
697 | | |
698 | | // Since leader load is only needed to rebalance leaders, we keep the sorting separate. |
699 | 121k | state_->SortLeaderLoad(); |
700 | | |
701 | 0 | VLOG(1) << Substitute( |
702 | 0 | "Table: $0. Total running tablets: $1. Total overreplication: $2. Total starting tablets: $3." |
703 | 0 | " Wrong placement: $4. BlackListed: $5. Total underreplication: $6, Leader BlackListed: $7", |
704 | 0 | table_uuid, get_total_running_tablets(), get_total_over_replication(), |
705 | 0 | get_total_starting_tablets(), get_total_wrong_placement(), get_total_blacklisted_servers(), |
706 | 0 | get_total_under_replication(), get_total_leader_blacklisted_servers()); |
707 | | |
708 | 966k | for (const auto& tablet : tablets) { |
709 | 966k | const auto& tablet_id = tablet->id(); |
710 | 966k | if (state_->pending_remove_replica_tasks_[table_uuid].count(tablet_id) > 0) { |
711 | 351 | RETURN_NOT_OK(state_->RemoveReplica( |
712 | 351 | tablet_id, state_->pending_remove_replica_tasks_[table_uuid][tablet_id])); |
713 | 351 | } |
714 | 966k | if (state_->pending_stepdown_leader_tasks_[table_uuid].count(tablet_id) > 0) { |
715 | 90 | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id]; |
716 | 90 | const auto& from_ts = tablet_meta.leader_uuid; |
717 | 90 | const auto& to_ts = state_->pending_stepdown_leader_tasks_[table_uuid][tablet_id]; |
718 | 90 | RETURN_NOT_OK(state_->MoveLeader(tablet->id(), from_ts, to_ts)); |
719 | 90 | } |
720 | 966k | if (state_->pending_add_replica_tasks_[table_uuid].count(tablet_id) > 0) { |
721 | 180 | RETURN_NOT_OK(state_->AddReplica(tablet->id(), |
722 | 180 | state_->pending_add_replica_tasks_[table_uuid][tablet_id])); |
723 | 180 | } |
724 | 966k | } |
725 | | |
726 | 121k | return Status::OK(); |
727 | 121k | } |
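A sketch of the replay step at the end of this function, using a toy load model (assumed types, not YB code): adds and removes already in flight are applied to the in-memory state so this run does not schedule them a second time; the real code also replays pending leader stepdowns via MoveLeader.

    #include <map>
    #include <set>
    #include <string>

    using TabletId = std::string;
    using TsId = std::string;

    struct LoadState {
      std::map<TsId, std::set<TabletId>> per_ts;  // simplified load model
      void AddReplica(const TabletId& t, const TsId& ts) { per_ts[ts].insert(t); }
      void RemoveReplica(const TabletId& t, const TsId& ts) { per_ts[ts].erase(t); }
    };

    // Apply in-flight adds/removes so the balancer sees post-completion load.
    void ReplayPending(LoadState* state,
                       const std::map<TabletId, TsId>& pending_adds,
                       const std::map<TabletId, TsId>& pending_removes) {
      for (const auto& [tablet, ts] : pending_adds) state->AddReplica(tablet, ts);
      for (const auto& [tablet, ts] : pending_removes) state->RemoveReplica(tablet, ts);
    }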
728 | | |
729 | | Result<bool> ClusterLoadBalancer::HandleAddIfMissingPlacement( |
730 | 119k | TabletId* out_tablet_id, TabletServerId* out_to_ts) { |
731 | 2.96k | for (const auto& tablet_id : state_->tablets_missing_replicas_) { |
732 | 2.96k | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id]; |
733 | 2.96k | const auto& placement_info = GetPlacementByTablet(tablet_id); |
734 | 2.96k | const auto& missing_placements = tablet_meta.under_replicated_placements; |
735 | | // Loop through TSs by load to find a TS that matches the placement needed and does not already |
736 | | // host this tablet. |
737 | 5.33k | for (const auto& ts_uuid : state_->sorted_load_) { |
738 | 5.33k | bool can_choose_ts = false; |
739 | | // If we had no placement information, it means we are just under-replicated, so just check |
740 | | // that we can use this tablet server. |
741 | 5.33k | if (placement_info.placement_blocks().empty()) { |
742 | | // No need to check placement info, as there is none. |
743 | 4.94k | can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid)); |
744 | 389 | } else { |
745 | | // We add a tablet to the set of tablets with missing replicas if it is under-replicated, |
746 | | // and we add a placement to the tablet_meta under_replicated_placements if the number of |
747 | | // replicas in that placement is fewer than min_num_replicas. If the under-replicated |
748 | | // tablet has a placement that is under-replicated and the ts is not in that placement, |
749 | | // then that ts isn't valid. |
750 | 389 | const auto& ts_meta = state_->per_ts_meta_[ts_uuid]; |
751 | | // Either we have specific placement blocks that are under-replicated, so confirm |
752 | | // that this TS matches or all the placement blocks have min_num_replicas |
753 | | // but overall num_replicas is fewer than expected. |
754 | | // In the latter case, we still need to conform to the placement rules. |
755 | 389 | if (missing_placements.empty() || |
756 | 315 | tablet_meta.CanAddTSToMissingPlacements(ts_meta.descriptor)) { |
757 | | // If we don't have any missing placements but are under-replicated then we need to |
758 | | // validate placement information in order to avoid adding to a wrong placement block. |
759 | | // |
760 | | // Do the placement check for both the cases. |
761 | | // If we have missing placements then this check is a tautology otherwise it matters. |
762 | 189 | can_choose_ts = VERIFY_RESULT(state_->CanAddTabletToTabletServer(tablet_id, ts_uuid, |
763 | 189 | &placement_info)); |
764 | 189 | } |
765 | 389 | } |
766 | | // If we've passed the checks, then we can choose this TS to add the replica to. |
767 | 5.33k | if (can_choose_ts) { |
768 | 364 | *out_tablet_id = tablet_id; |
769 | 364 | *out_to_ts = ts_uuid; |
770 | 364 | RETURN_NOT_OK(AddReplica(tablet_id, ts_uuid)); |
771 | 364 | state_->tablets_missing_replicas_.erase(tablet_id); |
772 | 364 | return true; |
773 | 364 | } |
774 | 5.33k | } |
775 | 2.96k | } |
776 | 118k | return false; |
777 | 119k | } |
778 | | |
779 | | Result<bool> ClusterLoadBalancer::HandleAddIfWrongPlacement( |
780 | 118k | TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) { |
781 | 1.07k | for (const auto& tablet_id : state_->tablets_wrong_placement_) { |
782 | | // Skip this tablet, if it is already over-replicated, as it does not need another replica, it |
783 | | // should just have one removed in the removal step. |
784 | 1.07k | if (state_->tablets_over_replicated_.count(tablet_id)) { |
785 | 159 | continue; |
786 | 159 | } |
787 | 916 | if (VERIFY_RESULT(state_->CanSelectWrongReplicaToMove( |
788 | 252 | tablet_id, GetPlacementByTablet(tablet_id), out_from_ts, out_to_ts))) { |
789 | 252 | *out_tablet_id = tablet_id; |
790 | 252 | RETURN_NOT_OK(MoveReplica(tablet_id, *out_from_ts, *out_to_ts)); |
791 | 252 | return true; |
792 | 252 | } |
793 | 916 | } |
794 | 118k | return false; |
795 | 118k | } |
796 | | |
797 | | Result<bool> ClusterLoadBalancer::HandleAddReplicas( |
798 | 120k | TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) { |
799 | 120k | if (state_->options_->kAllowLimitStartingTablets) { |
800 | 120k | if (global_state_->total_starting_tablets_ >= state_->options_->kMaxTabletRemoteBootstraps) { |
801 | 0 | return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 " |
802 | 0 | "tablets, when our max allowed is $1", |
803 | 0 | global_state_->total_starting_tablets_, state_->options_->kMaxTabletRemoteBootstraps); |
804 | 120k | } else if (state_->total_starting_ >= state_->options_->kMaxTabletRemoteBootstrapsPerTable) { |
805 | 401 | return STATUS_SUBSTITUTE(TryAgain, "Cannot add replicas. Currently remote bootstrapping $0 " |
806 | 401 | "tablets for table $1, when our max allowed is $2 per table", |
807 | 401 | state_->total_starting_, state_->table_id_, |
808 | 401 | state_->options_->kMaxTabletRemoteBootstrapsPerTable); |
809 | 401 | } |
810 | 120k | } |
811 | | |
812 | 120k | if (state_->options_->kAllowLimitOverReplicatedTablets && |
813 | 120k | get_total_over_replication() >= |
814 | 883 | implicit_cast<size_t>(state_->options_->kMaxOverReplicatedTablets)) { |
815 | 883 | return STATUS_SUBSTITUTE(TryAgain, |
816 | 883 | "Cannot add replicas. Currently have a total overreplication of $0, when max allowed is $1" |
817 | 883 | ", overreplicated tablets: $2", |
818 | 883 | get_total_over_replication(), state_->options_->kMaxOverReplicatedTablets, |
819 | 883 | boost::algorithm::join(state_->tablets_over_replicated_, ", ")); |
820 | 883 | } |
821 | | |
822 | 0 | VLOG(1) << "Number of global concurrent remote bootstrap sessions: " |
823 | 0 | << global_state_->total_starting_tablets_ |
824 | 0 | << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstraps |
825 | 0 | << ". Number of concurrent remote bootstrap sessions for table " << state_->table_id_ |
826 | 0 | << ": " << state_->total_starting_ |
827 | 0 | << ", max allowed: " << state_->options_->kMaxTabletRemoteBootstrapsPerTable; |
828 | | |
829 | | // Handle missing placements with highest priority, as it means we're potentially |
830 | | // under-replicated. |
831 | 119k | if (VERIFY_RESULT(HandleAddIfMissingPlacement(out_tablet_id, out_to_ts))) { |
832 | 364 | return true; |
833 | 364 | } |
834 | | |
835 | | // Handle wrong placements as next priority, as these could be servers we're moving off of, so |
836 | | // we can decommission ASAP. |
837 | 118k | if (VERIFY_RESULT(HandleAddIfWrongPlacement(out_tablet_id, out_from_ts, out_to_ts))) { |
838 | 252 | return true; |
839 | 252 | } |
840 | | |
841 | | // Finally, handle normal load balancing. |
842 | 118k | if (!VERIFY_RESULT(GetLoadToMove(out_tablet_id, out_from_ts, out_to_ts))) { |
843 | 0 | VLOG(1) << "Cannot find any more tablets to move, under current constraints."; |
844 | 0 | if (VLOG_IS_ON(1)) { |
845 | 0 | DumpSortedLoad(); |
846 | 0 | } |
847 | 118k | return false; |
848 | 118k | } |
849 | | |
850 | 489 | return true; |
851 | 489 | } |
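The add path above is a fixed-priority chain; a compact sketch of the dispatch shape (not YB code), with handlers ordered as in the code: missing placement, then wrong placement, then plain load balancing.

    #include <functional>
    #include <vector>

    // Try the most urgent fix first; at most one move per call.
    bool TryHandlersInPriorityOrder(
        const std::vector<std::function<bool()>>& handlers) {
      for (const auto& handler : handlers) {
        if (handler()) {
          return true;  // a move was scheduled; stop here
        }
      }
      return false;  // nothing to do under current constraints
    }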
852 | | |
853 | 0 | void ClusterLoadBalancer::DumpSortedLoad() const { |
854 | 0 | ssize_t last_pos = state_->sorted_load_.size() - 1; |
855 | 0 | std::ostringstream out; |
856 | 0 | out << "Table load (global load): "; |
857 | 0 | for (ssize_t left = 0; left <= last_pos; ++left) { |
858 | 0 | const TabletServerId& uuid = state_->sorted_load_[left]; |
859 | 0 | auto load = state_->GetLoad(uuid); |
860 | 0 | out << uuid << ":" << load << " (" << global_state_->GetGlobalLoad(uuid) << ") "; |
861 | 0 | } |
862 | 0 | VLOG(1) << out.str(); |
863 | 0 | } |
864 | | |
865 | | Result<bool> ClusterLoadBalancer::GetLoadToMove( |
866 | 118k | TabletId* moving_tablet_id, TabletServerId* from_ts, TabletServerId* to_ts) { |
867 | 118k | if (state_->sorted_load_.empty()) { |
868 | 33 | return false; |
869 | 33 | } |
870 | | |
871 | | // Start with two indices pointing at left and right most ends of the sorted_load_ structure. |
872 | | // |
873 | | // We will try to find two TSs that have at least one tablet that can be moved amongst them, from |
874 | | // the higher load to the lower load TS. To do this, we will go through comparing the TSs |
875 | | // corresponding to our left and right indices, exclude tablets from the right, high loaded TS |
876 | | // according to our load balancing rules, such as load variance, starting tablets and not moving |
877 | | // already over-replicated tablets. We then compare the remaining set of tablets with the ones |
878 | | // hosted by the lower loaded TS and use ReservoirSample to pick a tablet from the set |
879 | | // difference. If there were no tablets to pick, we advance our state. |
880 | | // |
881 | | // The state is defined as the positions of the start and end indices. We always try to move the |
882 | | // right index back, until we cannot any more, due to either reaching the left index (cannot |
883 | | // rebalance from one TS to itself), or the difference of load between the two TSs is too low to |
884 | | // try to rebalance (if load variance is 1, it does not make sense to move tablets between the |
885 | | // TSs). When we cannot lower the right index any further, we reset it back to last_pos and |
886 | | // increment the left index. |
887 | | // |
888 | | // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index |
889 | | // and are already breaking the invariance rule, as that means that any further differences in |
890 | | // the interval between left and right cannot have load > kMinLoadVarianceToBalance. |
891 | 118k | ssize_t last_pos = state_->sorted_load_.size() - 1; |
892 | 125k | for (ssize_t left = 0; left <= last_pos; ++left) { |
893 | 144k | for (auto right = last_pos; right >= 0; --right) { |
894 | 144k | const TabletServerId& low_load_uuid = state_->sorted_load_[left]; |
895 | 144k | const TabletServerId& high_load_uuid = state_->sorted_load_[right]; |
896 | 144k | ssize_t load_variance = state_->GetLoad(high_load_uuid) - state_->GetLoad(low_load_uuid); |
897 | 144k | bool is_global_balancing_move = false; |
898 | | |
899 | | // Check for state change or end conditions. |
900 | 144k | if (left == right || load_variance < state_->options_->kMinLoadVarianceToBalance) { |
901 | | // Either both left and right are at the end, or there is no load_variance, which means |
902 | | // there will be no load_variance for any TSs between left and right, so we can return. |
903 | 125k | if (right == last_pos && load_variance == 0) { |
904 | 115k | return false; |
905 | 115k | } |
906 | | // If there is load variance, then there is a chance we can benefit from globally balancing. |
907 | 9.59k | if (load_variance > 0 && CanBalanceGlobalLoad()) { |
908 | 2.67k | int global_load_variance = global_state_->GetGlobalLoad(high_load_uuid) - |
909 | 2.67k | global_state_->GetGlobalLoad(low_load_uuid); |
910 | 2.67k | if (global_load_variance < state_->options_->kMinGlobalLoadVarianceToBalance) { |
911 | | // Already globally balanced. Since we are sorted by global load, we can return here as |
912 | | // there are no other moves for us to make. |
913 | 2.50k | return false; |
914 | 2.50k | } |
915 | | // Mark this move as a global balancing move and try to find a tablet to move. |
916 | 175 | is_global_balancing_move = true; |
917 | 6.92k | } else { |
918 | | // The load_variance is too low, which means we weren't able to find a load to move to |
919 | | // the left tserver. Continue and try with the next left tserver. |
920 | 6.92k | break; |
921 | 6.92k | } |
922 | 19.0k | } |
923 | | |
924 | | // If we don't find a tablet_id to move between these two TSs, advance the state. |
925 | 19.0k | if (VERIFY_RESULT(GetTabletToMove(high_load_uuid, low_load_uuid, moving_tablet_id))) { |
926 | | // If we got this far, we have the candidate we want, so fill in the output params and |
927 | | // return. The tablet_id is filled in from GetTabletToMove. |
928 | 489 | *from_ts = high_load_uuid; |
929 | 489 | *to_ts = low_load_uuid; |
930 | 489 | RETURN_NOT_OK(MoveReplica(*moving_tablet_id, high_load_uuid, low_load_uuid)); |
931 | | // Update global state if necessary. |
932 | 489 | if (!is_global_balancing_move) { |
933 | 473 | can_perform_global_operations_ = false; |
934 | 473 | } |
935 | 489 | return true; |
936 | 489 | } |
937 | 19.0k | } |
938 | 125k | } |
939 | | |
940 | | // Should never get here. |
941 | 0 | return STATUS(IllegalState, "Load balancing algorithm reached illegal state."); |
942 | 118k | } |
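A distilled sketch of the two-index walk described in the comment above (not YB code; the real loop additionally consults global load and per-tablet eligibility before committing to a pair):

    #include <cstddef>
    #include <optional>
    #include <utility>
    #include <vector>

    // Pair the least loaded server (left) with the most loaded one (right)
    // over an ascending load vector; shrink right until the variance is too
    // small to justify a move, then advance left.
    std::optional<std::pair<size_t, size_t>> FindMovePair(
        const std::vector<int>& load /* ascending */, int min_variance) {
      if (load.empty()) return std::nullopt;
      const size_t last = load.size() - 1;
      for (size_t left = 0; left <= last; ++left) {
        for (size_t right = last; right > left; --right) {
          if (load[right] - load[left] < min_variance) {
            break;  // everything between left and right is at least as balanced
          }
          // The real balancer also verifies that some tablet on `right` may
          // legally move to `left`, and keeps scanning when none can.
          return std::make_pair(right, left);  // move from right to left
        }
      }
      return std::nullopt;  // already balanced under min_variance
    }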
943 | | |
944 | | Result<bool> ClusterLoadBalancer::GetTabletToMove( |
945 | 19.0k | const TabletServerId& from_ts, const TabletServerId& to_ts, TabletId* moving_tablet_id) { |
946 | 19.0k | const auto& from_ts_meta = state_->per_ts_meta_[from_ts]; |
947 | | // If drive aware, all_tablets is sorted by decreasing drive load. |
948 | 19.0k | vector<set<TabletId>> all_tablets_by_drive = GetTabletsOnTSToMove(global_state_->drive_aware_, |
949 | 19.0k | from_ts_meta); |
950 | 19.0k | vector<set<TabletId>> all_filtered_tablets_by_drive; |
951 | 41.6k | for (const set<TabletId>& drive_tablets : all_tablets_by_drive) { |
952 | 41.6k | set<TabletId> filtered_drive_tablets; |
953 | 294k | for (const TabletId& tablet_id : drive_tablets) { |
954 | | // We don't want to add a new replica to an already over-replicated tablet. |
955 | | // |
956 | | // TODO(bogdan): should make sure we pick tablets that this TS is not a leader of, so we |
957 | | // can ensure HandleRemoveReplicas removes them from this TS. |
958 | 294k | if (state_->tablets_over_replicated_.count(tablet_id)) { |
959 | 6.02k | continue; |
960 | 6.02k | } |
961 | | // Don't move a replica right after split |
962 | 288k | if (ContainsKey(from_ts_meta.disabled_by_ts_tablets, tablet_id)) { |
963 | 0 | continue; |
964 | 0 | } |
965 | | |
966 | 288k | if (VERIFY_RESULT( |
967 | 22.7k | state_->CanAddTabletToTabletServer(tablet_id, to_ts, &GetPlacementByTablet(tablet_id)))) { |
968 | 22.7k | filtered_drive_tablets.insert(tablet_id); |
969 | 22.7k | } |
970 | 288k | } |
971 | 41.6k | all_filtered_tablets_by_drive.push_back(filtered_drive_tablets); |
972 | 41.6k | } |
973 | | |
974 | | // Below, we choose a tablet to move. We first filter out any tablets which cannot be moved |
975 | | // because of placement limitations. Then, we prioritize moving a tablet whose leader is in the |
976 | | // same zone/region it is moving to (for faster remote bootstrapping). |
977 | 41.0k | for (const set<TabletId>& drive_tablets : all_filtered_tablets_by_drive) { |
978 | 41.0k | bool found_tablet_to_move = false; |
979 | 41.0k | CatalogManagerUtil::CloudInfoSimilarity chosen_tablet_ci_similarity = |
980 | 41.0k | CatalogManagerUtil::NO_MATCH; |
981 | 17.6k | for (const TabletId& tablet_id : drive_tablets) { |
982 | 17.6k | const auto& placement_info = GetPlacementByTablet(tablet_id); |
983 | | // TODO(bogdan): this should be augmented as well to allow dropping by one replica, if still |
984 | | // leaving us with more than the minimum. |
985 | | // |
986 | | // If we have placement information, we want to only pick the tablet if it's moving to the |
987 | | // same placement, so we guarantee we're keeping the same type of distribution. |
988 | | // Since we allow prefixes as well, we can still respect the placement of this tablet |
989 | | // even if their placement ids aren't the same. For example: |
990 | | // placement info of tablet: C.R1.* |
991 | | // placement info of from_ts: C.R1.Z1 |
992 | | // placement info of to_ts: C.R2.Z2 |
993 | | // Note that we've assumed that for every TS there is a unique placement block to which it |
994 | | // can be mapped (see the validation rules in yb_admin-client). If there is no unique |
995 | | // placement block then it is simply the C.R.Z of the TS itself. |
996 | 17.6k | auto from_ts_block = state_->GetValidPlacement(from_ts, &placement_info); |
997 | 17.6k | auto to_ts_block = state_->GetValidPlacement(to_ts, &placement_info); |
998 | 17.6k | bool same_placement = false; |
999 | 17.6k | if (to_ts_block.has_value() && from_ts_block.has_value()) { |
1000 | 17.6k | same_placement = TSDescriptor::generate_placement_id(*from_ts_block) == |
1001 | 17.6k | TSDescriptor::generate_placement_id(*to_ts_block); |
1002 | 17.6k | } |
1003 | | |
1004 | 17.6k | if (!placement_info.placement_blocks().empty() && !same_placement) { |
1005 | 13.0k | continue; |
1006 | 13.0k | } |
1007 | | |
1008 | 4.59k | TabletServerId leader_ts = state_->per_tablet_meta_[tablet_id].leader_uuid; |
1009 | 4.59k | auto ci_similarity = CatalogManagerUtil::CloudInfoSimilarity::NO_MATCH; |
1010 | 4.59k | if (!leader_ts.empty() && !FLAGS_load_balancer_ignore_cloud_info_similarity) { |
1011 | 4.36k | const auto leader_ci = state_->per_ts_meta_[leader_ts].descriptor->GetCloudInfo(); |
1012 | 4.36k | const auto to_ts_ci = state_->per_ts_meta_[to_ts].descriptor->GetCloudInfo(); |
1013 | 4.36k | ci_similarity = CatalogManagerUtil::ComputeCloudInfoSimilarity(leader_ci, to_ts_ci); |
1014 | 4.36k | } |
1015 | | |
1016 | 4.59k | if (found_tablet_to_move && ci_similarity <= chosen_tablet_ci_similarity) { |
1017 | 4.08k | continue; |
1018 | 4.08k | } |
1019 | | // This is the best tablet to move, so far. |
1020 | 509 | found_tablet_to_move = true; |
1021 | 509 | *moving_tablet_id = tablet_id; |
1022 | 509 | chosen_tablet_ci_similarity = ci_similarity; |
1023 | 509 | } |
1024 | | |
1025 | | // If there is any tablet we can move from this drive, choose it and return. |
1026 | 41.0k | if (found_tablet_to_move) { |
1027 | 489 | return true; |
1028 | 489 | } |
1029 | 41.0k | } |
1030 | | |
1031 | 18.6k | return false; |
1032 | 19.0k | } |
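A sketch of the similarity preference in the selection above (enum names are assumptions modeled on CatalogManagerUtil::CloudInfoSimilarity, not YB code): among movable candidates, prefer the tablet whose current leader is closest to the destination, since bootstrapping from a nearby peer is cheaper.

    #include <optional>
    #include <string>
    #include <vector>

    // Assumed ordering, most similar last (names are illustrative).
    enum Similarity { NO_MATCH = 0, CLOUD_MATCH = 1, REGION_MATCH = 2, ZONE_MATCH = 3 };

    struct Candidate {
      std::string tablet_id;
      Similarity leader_vs_destination;
    };

    // Keep the first candidate seen at the highest similarity level.
    std::optional<std::string> PickTablet(const std::vector<Candidate>& candidates) {
      std::optional<std::string> best;
      Similarity best_sim = NO_MATCH;
      for (const auto& c : candidates) {
        if (!best || c.leader_vs_destination > best_sim) {
          best = c.tablet_id;
          best_sim = c.leader_vs_destination;
        }
      }
      return best;
    }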
1033 | | |
1034 | | Result<bool> ClusterLoadBalancer::GetLeaderToMove(TabletId* moving_tablet_id, |
1035 | | TabletServerId* from_ts, |
1036 | | TabletServerId* to_ts, |
1037 | 120k | std::string* to_ts_path) { |
1038 | 120k | if (state_->sorted_leader_load_.empty()) { |
1039 | 2.61k | return false; |
1040 | 2.61k | } |
1041 | | |
1042 | | // Find out if there are leaders to be moved. |
1043 | 118k | for (auto right = state_->sorted_leader_load_.size(); right > 0;) { |
1044 | 118k | --right; |
1045 | 118k | const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right]; |
1046 | 118k | auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) != |
1047 | 118k | state_->leader_blacklisted_servers_.end()); |
1048 | 118k | if (high_leader_blacklisted) { |
1049 | 605 | auto high_load = state_->GetLeaderLoad(high_load_uuid); |
1050 | 605 | if (high_load > 0) { |
1051 | | // Leader blacklisted tserver with a leader replica. |
1052 | 35 | break; |
1053 | 570 | } else { |
1054 | | // Leader blacklisted tserver without leader replica. |
1055 | 570 | continue; |
1056 | 570 | } |
1057 | 117k | } else { |
1058 | 117k | if (state_->IsLeaderLoadBelowThreshold(state_->sorted_leader_load_[right])) { |
1059 | | // Non-leader blacklisted tserver with not too many leader replicas. |
1060 | | // TODO(Sanket): Even though per table load is below the configured threshold, |
1061 | | // we might want to do global leader balancing above a certain threshold that is lower |
1062 | | // than the per table threshold. Can add another gflag/knob here later. |
1063 | 10 | return false; |
1064 | 117k | } else { |
1065 | | // Non-leader blacklisted tserver with too many leader replicas. |
1066 | 117k | break; |
1067 | 117k | } |
1068 | 117k | } |
1069 | 118k | } |
1070 | | |
1071 | | // The algorithm to balance the leaders is very similar to the one for tablets: |
1072 | | // |
1073 | | // Start with two indices pointing at left and right most ends of the sorted_leader_load_ |
1074 | | // structure. Note that leader blacklisted tserver is considered as having infinite leader load. |
1075 | | // |
1076 | | // We will try to find two TSs that have at least one leader that can be moved amongst them, from |
1077 | | // the higher load to the lower load TS. To do this, we will go through comparing the TSs |
1078 | | // corresponding to our left and right indices. We go through leaders on the higher loaded TS |
1079 | | // and find a running replica on the lower loaded TS to move the leader. If no leader can be |
1080 | | // picked, we advance our state. |
1081 | | // |
1082 | | // The state is defined as the positions of the start and end indices. We always try to move the |
1083 | | // right index back, until we cannot any more, due to either reaching the left index (cannot |
1084 | | // rebalance from one TS to itself), or the difference of load between the two TSs is too low to |
1085 | | // try to rebalance (if load variance is 1, it does not make sense to move leaders between the |
1086 | | // TSs). When we cannot lower the right index any further, we reset it back to last_pos and |
1087 | | // increment the left index. |
1088 | | // |
1089 | | // We stop the whole algorithm if the left index reaches last_pos, or if we reset the right index |
1090 | | // and are already breaking the invariance rule, as that means that any further differences in |
1091 | | // the interval between left and right cannot have load > kMinLeaderLoadVarianceToBalance. |
1092 | 117k | const auto current_time = MonoTime::Now(); |
1093 | 117k | ssize_t last_pos = state_->sorted_leader_load_.size() - 1; |
1094 | 125k | for (ssize_t left = 0; left <= last_pos; ++left) { |
1095 | 125k | const TabletServerId& low_load_uuid = state_->sorted_leader_load_[left]; |
1096 | 125k | auto low_leader_blacklisted = (state_->leader_blacklisted_servers_.find(low_load_uuid) != |
1097 | 125k | state_->leader_blacklisted_servers_.end()); |
1098 | 125k | if (low_leader_blacklisted) { |
1099 | | // The left marker has moved past all non-leader-blacklisted tservers, so no valid target remains.
1100 | 547 | return false; |
1101 | 547 | } |
1102 | | |
1103 | 141k | for (auto right = last_pos; right >= 0; --right) { |
1104 | 141k | const TabletServerId& high_load_uuid = state_->sorted_leader_load_[right]; |
1105 | 141k | auto high_leader_blacklisted = (state_->leader_blacklisted_servers_.find(high_load_uuid) != |
1106 | 141k | state_->leader_blacklisted_servers_.end()); |
1107 | 141k | ssize_t load_variance = |
1108 | 141k | state_->GetLeaderLoad(high_load_uuid) - state_->GetLeaderLoad(low_load_uuid); |
1109 | | |
1110 | 141k | bool is_global_balancing_move = false; |
1111 | | |
1112 | | // Check for state change or end conditions. |
1113 | 141k | if (left == right || (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance && |
1114 | 119k | !high_leader_blacklisted)) { |
1115 | | // Global leader balancing applies only if the per-table variance is > 0.
1116 | | // If load_variance is 0 and right is already at last_pos, no remaining pair
1117 | | // can have a larger variance, so there are no more moves to make and we
1118 | | // can return.
1119 | 119k | if (load_variance == 0 && right == last_pos) { |
1120 | 96.4k | return false; |
1121 | 96.4k | } |
1122 | | // Check if we can benefit from global leader balancing. |
1123 | | // If we have > 0 load_variance and there are no per table moves left. |
1124 | 23.0k | if (load_variance > 0 && CanBalanceGlobalLoad()) { |
1125 | 15.4k | int global_load_variance = state_->global_state_->GetGlobalLeaderLoad(high_load_uuid) - |
1126 | 15.4k | state_->global_state_->GetGlobalLeaderLoad(low_load_uuid); |
1127 | | // Already globally balanced. Since we are sorted by global load, we can return here as |
1128 | | // there are no other moves for us to make. |
1129 | 15.4k | if (global_load_variance < state_->options_->kMinGlobalLeaderLoadVarianceToBalance) { |
1130 | 15.1k | return false; |
1131 | 15.1k | } |
1132 | 260 | is_global_balancing_move = true; |
1133 | 7.67k | } else { |
1134 | 7.67k | break; |
1135 | 7.67k | } |
1136 | 22.3k | } |
1137 | | |
1138 | | // Find the leaders on the higher loaded TS that have running peers on the lower loaded TS. |
1139 | | // If there are, we have a candidate we want, so fill in the output params and return. |
1140 | 22.3k | const set<TabletId>& leaders = state_->per_ts_meta_[high_load_uuid].leaders; |
1141 | 22.3k | for (const auto& tablet : GetLeadersOnTSToMove(global_state_->drive_aware_, |
1142 | 22.3k | leaders, |
1143 | 5.59k | state_->per_ts_meta_[low_load_uuid])) { |
1144 | 5.59k | *moving_tablet_id = tablet.first; |
1145 | 5.59k | *to_ts_path = tablet.second; |
1146 | 5.59k | *from_ts = high_load_uuid; |
1147 | 5.59k | *to_ts = low_load_uuid; |
1148 | | |
1149 | 5.59k | const auto& per_tablet_meta = state_->per_tablet_meta_; |
1150 | 5.59k | const auto tablet_meta_iter = per_tablet_meta.find(tablet.first); |
1151 | 5.59k | if (PREDICT_TRUE(tablet_meta_iter != per_tablet_meta.end())) { |
1152 | 5.59k | const auto& tablet_meta = tablet_meta_iter->second; |
1153 | 5.59k | const auto& stepdown_failures = tablet_meta.leader_stepdown_failures; |
1154 | 5.59k | const auto stepdown_failure_iter = stepdown_failures.find(low_load_uuid); |
1155 | 5.59k | if (stepdown_failure_iter != stepdown_failures.end()) { |
1156 | 137 | const auto time_since_failure = current_time - stepdown_failure_iter->second; |
1157 | 137 | if (time_since_failure.ToMilliseconds() < FLAGS_min_leader_stepdown_retry_interval_ms) { |
1158 | 137 | LOG(INFO) << "Cannot move tablet " << tablet.first << " leader from TS " |
1159 | 137 | << *from_ts << " to TS " << *to_ts << " yet: previous attempt with the same" |
1160 | 137 | << " intended leader failed only " << ToString(time_since_failure) |
1161 | 137 | << " ago (less " << "than " << FLAGS_min_leader_stepdown_retry_interval_ms |
1162 | 137 | << "ms)."; |
1163 | 137 | } |
1164 | 137 | continue; |
1165 | 137 | } |
1166 | 0 | } else { |
1167 | 0 | LOG(WARNING) << "Did not find load balancer metadata for tablet " << *moving_tablet_id; |
1168 | 0 | } |
1169 | | |
1170 | | // Leader movement solely due to leader blacklist. |
1171 | 5.46k | if (load_variance < state_->options_->kMinLeaderLoadVarianceToBalance && |
1172 | 160 | high_leader_blacklisted) { |
1173 | 32 | state_->LogSortedLeaderLoad(); |
1174 | 32 | LOG(INFO) << "Move tablet " << tablet.first << " leader from leader blacklisted TS " |
1175 | 32 | << *from_ts << " to TS " << *to_ts; |
1176 | 32 | } |
1177 | 5.46k | if (!is_global_balancing_move) { |
1178 | 5.33k | can_perform_global_operations_ = false; |
1179 | 5.33k | } |
1180 | 5.46k | return true; |
1181 | 5.59k | } |
1182 | 22.3k | } |
1183 | 124k | } |
1184 | | |
1185 | | // Should never get here. |
1186 | 0 | FATAL_ERROR("Load balancing algorithm reached invalid state!"); |
1187 | 0 | } |
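// A minimal, self-contained sketch of the two-index scan described in the
// comment at the top of GetLeaderToMove, reduced to plain leader counts. The
// function name, the std::vector input, and the min_variance parameter are
// illustrative assumptions; the real scan also accounts for leader blacklists,
// global load, and recent stepdown failures.

#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Given leader counts sorted in ascending order, return (from_index, to_index)
// for the first pair whose count difference is at least min_variance, pairing
// the lightest servers (left) against the heaviest (right).
std::optional<std::pair<size_t, size_t>> PickLeaderMove(
    const std::vector<int>& sorted_leader_counts, int min_variance) {
  if (sorted_leader_counts.empty()) {
    return std::nullopt;
  }
  const size_t last_pos = sorted_leader_counts.size() - 1;
  for (size_t left = 0; left <= last_pos; ++left) {
    for (size_t right = last_pos; right > left; --right) {
      const int variance =
          sorted_leader_counts[right] - sorted_leader_counts[left];
      if (variance < min_variance) {
        break;  // Counts only shrink as right moves left; reset and advance left.
      }
      return std::make_pair(right, left);  // Move one leader from right to left.
    }
  }
  return std::nullopt;  // Already balanced to within min_variance.
}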
1188 | | |
1189 | | Result<bool> ClusterLoadBalancer::HandleRemoveReplicas( |
1190 | 120k | TabletId* out_tablet_id, TabletServerId* out_from_ts) { |
1191 | | // Give high priority to removing tablets that are not respecting the placement policy. |
1192 | 120k | if (VERIFY_RESULT(HandleRemoveIfWrongPlacement(out_tablet_id, out_from_ts))) { |
1193 | 481 | return true; |
1194 | 481 | } |
1195 | | |
1196 | 120k | for (const auto& tablet_id : state_->tablets_over_replicated_) { |
1197 | | // Skip if there is a pending ADD_SERVER. |
1198 | 2.02k | if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id)) || |
1199 | 1.39k | state_->per_tablet_meta_[tablet_id].starting > 0) { |
1200 | 1.38k | continue; |
1201 | 1.38k | } |
1202 | | |
1203 | 640 | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id]; |
1204 | 640 | const auto& tablet_servers = tablet_meta.over_replicated_tablet_servers; |
1205 | 640 | auto comparator = PerTableLoadState::Comparator(state_); |
1206 | 640 | vector<TabletServerId> sorted_ts; |
1207 | | // Don't include any tservers where this tablet is still starting. |
1208 | 640 | std::copy_if( |
1209 | 640 | tablet_servers.begin(), tablet_servers.end(), std::back_inserter(sorted_ts), |
1210 | 2.36k | [&](const TabletServerId& ts_uuid) { |
1211 | 2.36k | return !state_->per_ts_meta_[ts_uuid].starting_tablets.count(tablet_id); |
1212 | 2.36k | }); |
1213 | 640 | if (sorted_ts.empty()) { |
1214 | 0 | return STATUS_SUBSTITUTE(IllegalState, "No tservers to remove from over-replicated " |
1215 | 0 | "tablet $0", tablet_id); |
1216 | 0 | } |
1217 | | // Sort in reverse to first try to remove a replica from the highest loaded TS. |
1218 | 640 | sort(sorted_ts.rbegin(), sorted_ts.rend(), comparator); |
1219 | 640 | string remove_candidate = sorted_ts[0]; |
1220 | 640 | *out_tablet_id = tablet_id; |
1221 | 640 | *out_from_ts = remove_candidate; |
1222 | | // Do force leader stepdown, as we are either not the leader or we are allowed to step down. |
1223 | 640 | RETURN_NOT_OK(RemoveReplica(tablet_id, remove_candidate)); |
1224 | 640 | return true; |
1225 | 640 | } |
1226 | 119k | return false; |
1227 | 120k | } |
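// A short sketch of the remove-candidate selection used above, under assumed
// names: is_starting and load_of stand in for the per-tserver metadata and
// PerTableLoadState::Comparator, and PickRemoveCandidate is not part of this
// file. It filters out tservers where the replica is still starting, then
// takes the most loaded of the remainder.

#include <algorithm>
#include <functional>
#include <iterator>
#include <optional>
#include <string>
#include <vector>

std::optional<std::string> PickRemoveCandidate(
    const std::vector<std::string>& replica_ts_uuids,
    const std::function<bool(const std::string&)>& is_starting,
    const std::function<int(const std::string&)>& load_of) {
  // Don't include any tservers where this tablet is still starting.
  std::vector<std::string> eligible;
  std::copy_if(replica_ts_uuids.begin(), replica_ts_uuids.end(),
               std::back_inserter(eligible),
               [&](const std::string& ts) { return !is_starting(ts); });
  if (eligible.empty()) {
    return std::nullopt;  // Mirrors the IllegalState status above.
  }
  // Sort with the highest load first, as with sort(rbegin(), rend(), cmp).
  std::sort(eligible.begin(), eligible.end(),
            [&](const std::string& a, const std::string& b) {
              return load_of(a) > load_of(b);
            });
  return eligible.front();
}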
1228 | | |
1229 | | Result<bool> ClusterLoadBalancer::HandleRemoveIfWrongPlacement( |
1230 | 120k | TabletId* out_tablet_id, TabletServerId* out_from_ts) { |
1231 | 2.69k | for (const auto& tablet_id : state_->tablets_wrong_placement_) { |
1232 | 2.69k | LOG(INFO) << "Processing tablet " << tablet_id; |
1233 | | // Skip this tablet if it is not over-replicated. |
1234 | 2.69k | if (!state_->tablets_over_replicated_.count(tablet_id)) { |
1235 | 2.02k | continue; |
1236 | 2.02k | } |
1237 | | // Skip if there is a pending ADD_SERVER.
1238 | 667 | if (VERIFY_RESULT(IsConfigMemberInTransitionMode(tablet_id))) { |
1239 | 186 | continue; |
1240 | 186 | } |
1241 | 481 | const auto& tablet_meta = state_->per_tablet_meta_[tablet_id]; |
1242 | 481 | TabletServerId target_uuid; |
1243 | | // Prioritize blacklisted servers, if any. |
1244 | 481 | if (!tablet_meta.blacklisted_tablet_servers.empty()) { |
1245 | 468 | target_uuid = *tablet_meta.blacklisted_tablet_servers.begin(); |
1246 | 468 | } |
1247 | | // If no blacklisted server could be chosen, try the wrong placement ones. |
1248 | 481 | if (target_uuid.empty()) { |
1249 | 13 | if (!tablet_meta.wrong_placement_tablet_servers.empty()) { |
1250 | 13 | target_uuid = *tablet_meta.wrong_placement_tablet_servers.begin(); |
1251 | 13 | } |
1252 | 13 | } |
1253 | | // If we found a tablet server, choose it. |
1254 | 481 | if (!target_uuid.empty()) { |
1255 | 481 | *out_tablet_id = tablet_id; |
1256 | 481 | *out_from_ts = std::move(target_uuid); |
1257 | | // Force leader stepdown if we have wrong placements or blacklisted servers. |
1258 | 481 | RETURN_NOT_OK(RemoveReplica(tablet_id, *out_from_ts)); |
1259 | 481 | return true; |
1260 | 481 | } |
1261 | 481 | } |
1262 | 120k | return false; |
1263 | 120k | } |
1264 | | |
1265 | | Result<bool> ClusterLoadBalancer::HandleLeaderLoadIfNonAffinitized(TabletId* moving_tablet_id, |
1266 | | TabletServerId* from_ts, |
1267 | | TabletServerId* to_ts, |
1268 | 98.5k | std::string* to_ts_path) { |
1269 | | // Similar to normal leader balancing, we double iterate from most loaded to least loaded |
1270 | | // non-affinitized nodes and from least to most loaded affinitized nodes. For each pair, we check whether
1271 | | // there is any tablet intersection and if so, there is a match and we return true. |
1272 | | // |
1273 | | // If we go through all the node pairs or we see that the current non-affinitized |
1274 | | // leader load is 0, we know that there is no match from non-affinitized to affinitized nodes |
1275 | | // and we return false. |
1276 | 98.5k | for (size_t non_affinitized_idx = state_->sorted_non_affinitized_leader_load_.size(); |
1277 | 98.8k | non_affinitized_idx > 0;) { |
1278 | 458 | --non_affinitized_idx; |
1279 | 458 | const TabletServerId& non_affinitized_uuid = |
1280 | 458 | state_->sorted_non_affinitized_leader_load_[non_affinitized_idx]; |
1281 | 458 | if (state_->GetLeaderLoad(non_affinitized_uuid) == 0) { |
1282 | | // All subsequent non-affinitized nodes have no leaders, no match found. |
1283 | 79 | return false; |
1284 | 79 | } |
1285 | 379 | const set<TabletId>& leaders = state_->per_ts_meta_[non_affinitized_uuid].leaders; |
1286 | 60 | for (const auto& affinitized_uuid : state_->sorted_leader_load_) { |
1287 | 60 | auto peers = GetLeadersOnTSToMove(global_state_->drive_aware_, |
1288 | 60 | leaders, |
1289 | 60 | state_->per_ts_meta_[affinitized_uuid]); |
1290 | | |
1291 | 60 | if (!peers.empty()) { |
1292 | 60 | auto peer = peers.begin(); |
1293 | 60 | *moving_tablet_id = peer->first; |
1294 | 60 | *to_ts_path = peer->second;
1295 | 60 | *from_ts = non_affinitized_uuid; |
1296 | 60 | *to_ts = affinitized_uuid; |
1297 | 60 | return true; |
1298 | 60 | } |
1299 | 60 | } |
1300 | 379 | } |
1301 | 98.4k | return false; |
1302 | 98.5k | } |
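// A sketch of the candidate test both leader-balancing paths rely on: a leader
// on the source tserver can be moved only if the destination already hosts a
// replica of the same tablet, so the move is a cheap leadership transfer
// rather than a data copy. The simplified set types and the MovableLeaders
// name are assumptions; the real GetLeadersOnTSToMove is also drive-aware and
// returns a destination path per tablet.

#include <set>
#include <string>
#include <vector>

std::vector<std::string> MovableLeaders(
    const std::set<std::string>& leaders_on_source,
    const std::set<std::string>& replicas_on_destination) {
  std::vector<std::string> movable;
  for (const auto& tablet_id : leaders_on_source) {
    if (replicas_on_destination.count(tablet_id) > 0) {
      movable.push_back(tablet_id);
    }
  }
  return movable;
}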
1303 | | |
1304 | | Result<bool> ClusterLoadBalancer::HandleLeaderMoves( |
1305 | 120k | TabletId* out_tablet_id, TabletServerId* out_from_ts, TabletServerId* out_to_ts) { |
1306 | | // If the user sets the 'transaction_tables_use_preferred_zones' gflag to false and the
1307 | | // tablet being balanced is a transaction tablet, the logic ignores preferred zones and
1308 | | // proceeds directly to normal leader balancing.
1309 | 120k | string out_to_ts_path;
1310 | 120k | if (state_->use_preferred_zones_ && |
1311 | 98.5k | VERIFY_RESULT(HandleLeaderLoadIfNonAffinitized( |
1312 | 60 | out_tablet_id, out_from_ts, out_to_ts, &out_to_ts_path))) {
1313 | 60 | RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, out_to_ts_path));
1314 | 60 | return true; |
1315 | 120k | } |
1316 | | |
1317 | 120k | if (VERIFY_RESULT(GetLeaderToMove(out_tablet_id, out_from_ts, out_to_ts, &out_to_ts_path))) {
1318 | 5.46k | RETURN_NOT_OK(MoveLeader(*out_tablet_id, *out_from_ts, *out_to_ts, out_to_ts_path));
1319 | 5.46k | return true; |
1320 | 114k | } |
1321 | 114k | return false; |
1322 | 114k | } |
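// A compact sketch of the two-phase decision in HandleLeaderMoves above, with
// the two handlers abstracted as callbacks (an illustrative assumption, not
// the master's API): preferred-zone violations are drained first, and plain
// load-variance balancing only runs if no such move was found.

#include <functional>

bool PickAnyLeaderMove(
    bool use_preferred_zones,
    const std::function<bool()>& move_off_non_affinitized_node,
    const std::function<bool()>& move_by_load_variance) {
  if (use_preferred_zones && move_off_non_affinitized_node()) {
    return true;  // Zone-affinity violations outrank ordinary load skew.
  }
  return move_by_load_variance();
}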
1323 | | |
1324 | | Status ClusterLoadBalancer::MoveReplica( |
1325 | 741 | const TabletId& tablet_id, const TabletServerId& from_ts, const TabletServerId& to_ts) { |
1326 | 741 | LOG(INFO) << Substitute("Moving tablet $0 from $1 to $2", tablet_id, from_ts, to_ts); |
1327 | 741 | RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), to_ts, true /* is_add */, |
1328 | 741 | true /* should_remove_leader */)); |
1329 | 741 | RETURN_NOT_OK(state_->AddReplica(tablet_id, to_ts)); |
1330 | 741 | return GetAtomicFlag(&FLAGS_load_balancer_count_move_as_add) ? |
1331 | 741 | Status::OK() : state_->RemoveReplica(tablet_id, from_ts); |
1332 | 741 | } |
1333 | | |
1334 | 364 | Status ClusterLoadBalancer::AddReplica(const TabletId& tablet_id, const TabletServerId& to_ts) { |
1335 | 364 | LOG(INFO) << Substitute("Adding tablet $0 to $1", tablet_id, to_ts); |
1336 | | // This is an add operation, so the "should_remove_leader" flag is irrelevant. |
1337 | 364 | RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), to_ts, true /* is_add */, |
1338 | 364 | true /* should_remove_leader */)); |
1339 | 364 | return state_->AddReplica(tablet_id, to_ts); |
1340 | 364 | } |
1341 | | |
1342 | | Status ClusterLoadBalancer::RemoveReplica( |
1343 | 1.15k | const TabletId& tablet_id, const TabletServerId& ts_uuid) { |
1344 | 1.15k | LOG(INFO) << Substitute("Removing replica $0 from tablet $1", ts_uuid, tablet_id); |
1345 | 1.15k | RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), ts_uuid, false /* is_add */, |
1346 | 1.15k | true /* should_remove_leader */)); |
1347 | 1.15k | return state_->RemoveReplica(tablet_id, ts_uuid); |
1348 | 1.15k | } |
1349 | | |
1350 | | Status ClusterLoadBalancer::MoveLeader(const TabletId& tablet_id, |
1351 | | const TabletServerId& from_ts, |
1352 | | const TabletServerId& to_ts, |
1353 | 5.52k | const string& to_ts_path) { |
1354 | 5.52k | LOG(INFO) << Substitute("Moving leader of $0 from TS $1 to $2", tablet_id, from_ts, to_ts); |
1355 | 5.52k | RETURN_NOT_OK(SendReplicaChanges(GetTabletMap().at(tablet_id), from_ts, false /* is_add */, |
1356 | 5.52k | false /* should_remove_leader */, to_ts)); |
1357 | | |
1358 | 5.52k | return state_->MoveLeader(tablet_id, from_ts, to_ts, to_ts_path); |
1359 | 5.52k | } |
1360 | | |
1361 | 99.6k | void ClusterLoadBalancer::GetAllAffinitizedZones(AffinitizedZonesSet* affinitized_zones) const { |
1362 | 99.6k | SysClusterConfigEntryPB config; |
1363 | 99.6k | CHECK_OK(catalog_manager_->GetClusterConfig(&config)); |
1364 | 99.6k | const int num_zones = config.replication_info().affinitized_leaders_size(); |
1365 | 99.8k | for (int i = 0; i < num_zones; i++) { |
1366 | 225 | CloudInfoPB ci = config.replication_info().affinitized_leaders(i); |
1367 | 225 | affinitized_zones->insert(ci); |
1368 | 225 | } |
1369 | 99.6k | } |
1370 | | |
1371 | 121k | void ClusterLoadBalancer::InitializeTSDescriptors() { |
1372 | 121k | if (state_->use_preferred_zones_) { |
1373 | 99.6k | GetAllAffinitizedZones(&state_->affinitized_zones_); |
1374 | 99.6k | } |
1375 | | // Set the blacklist so we can also mark the tablet servers as we add them up. |
1376 | 121k | state_->SetBlacklist(GetServerBlacklist()); |
1377 | | |
1378 | | // Set the leader blacklist so we can also mark the tablet servers as we add them up. |
1379 | 121k | state_->SetLeaderBlacklist(GetLeaderBlacklist()); |
1380 | | |
1381 | | // Loop over tablet servers to set empty defaults, so we can also have info on those |
1382 | | // servers that have yet to receive load (have heartbeated to the master, but have not been |
1383 | | // assigned any tablets yet). |
1384 | 382k | for (const auto& ts_desc : global_state_->ts_descs_) { |
1385 | 382k | state_->UpdateTabletServer(ts_desc); |
1386 | 382k | } |
1387 | 121k | } |
1388 | | |
1389 | | // CatalogManager indirection methods that are declared virtual so they can be overridden in testing.
1390 | | // |
1391 | 0 | void ClusterLoadBalancer::GetAllReportedDescriptors(TSDescriptorVector* ts_descs) const { |
1392 | 0 | catalog_manager_->master_->ts_manager()->GetAllReportedDescriptors(ts_descs); |
1393 | 0 | } |
1394 | | |
1395 | 78.4k | void ClusterLoadBalancer::GetAllDescriptors(TSDescriptorVector* ts_descs) const { |
1396 | 78.4k | catalog_manager_->master_->ts_manager()->GetAllDescriptors(ts_descs); |
1397 | 78.4k | } |
1398 | | |
1399 | 318k | const TabletInfoMap& ClusterLoadBalancer::GetTabletMap() const { |
1400 | 318k | return *catalog_manager_->tablet_map_; |
1401 | 318k | } |
1402 | | |
1403 | 121k | const scoped_refptr<TableInfo> ClusterLoadBalancer::GetTableInfo(const TableId& table_uuid) const { |
1404 | 121k | return catalog_manager_->GetTableInfoUnlocked(table_uuid); |
1405 | 121k | } |
1406 | | |
1407 | 121k | Result<TabletInfos> ClusterLoadBalancer::GetTabletsForTable(const TableId& table_uuid) const { |
1408 | 121k | auto table_info = GetTableInfo(table_uuid); |
1409 | | |
1410 | 121k | if (table_info == nullptr) { |
1411 | 0 | return STATUS_FORMAT( |
1412 | 0 | InvalidArgument, "Invalid UUID '$0' - no entry found in catalog manager table map", |
1413 | 0 | table_uuid); |
1414 | 0 | } |
1415 | | |
1416 | 121k | return table_info->GetTablets(IncludeInactive(!FLAGS_TEST_load_balancer_skip_inactive_tablets)); |
1417 | 121k | } |
1418 | | |
1419 | 265k | const TableInfoMap& ClusterLoadBalancer::GetTableMap() const { |
1420 | 265k | return *catalog_manager_->table_ids_map_; |
1421 | 265k | } |
1422 | | |
1423 | 120k | const ReplicationInfoPB& ClusterLoadBalancer::GetClusterReplicationInfo() const { |
1424 | 120k | return catalog_manager_->cluster_config_->LockForRead()->pb.replication_info(); |
1425 | 120k | } |
1426 | | |
1427 | 840 | const PlacementInfoPB& ClusterLoadBalancer::GetClusterPlacementInfo() const { |
1428 | 840 | auto l = down_cast<enterprise::CatalogManager*> |
1429 | 840 | (catalog_manager_)->GetClusterConfigInfo()->LockForRead(); |
1430 | 840 | if (state_->options_->type == LIVE) { |
1431 | 0 | return l->pb.replication_info().live_replicas(); |
1432 | 840 | } else { |
1433 | 840 | return GetReadOnlyPlacementFromUuid(l->pb.replication_info()); |
1434 | 840 | } |
1435 | 840 | } |
1436 | | |
1437 | 121k | const BlacklistPB& ClusterLoadBalancer::GetServerBlacklist() const { |
1438 | 121k | return catalog_manager_->cluster_config_->LockForRead()->pb.server_blacklist(); |
1439 | 121k | } |
1440 | | |
1441 | 121k | const BlacklistPB& ClusterLoadBalancer::GetLeaderBlacklist() const { |
1442 | 121k | return catalog_manager_->cluster_config_->LockForRead()->pb.leader_blacklist(); |
1443 | 121k | } |
1444 | | |
1445 | 25.9M | bool ClusterLoadBalancer::SkipLoadBalancing(const TableInfo& table) const { |
1446 | | // Skip load-balancing of some tables: |
1447 | | // * system tables: they are virtual tables not hosted by tservers. |
1448 | | // * colocated user tables: they occupy the same tablet as their colocated parent table, so load |
1449 | | // balancing just the colocated parent table is sufficient. |
1450 | | // * deleted/deleting tables: they are no longer in effect. For tables currently being
1451 | | // deleted, load distribution does not matter since they will be gone soon anyway.
1452 | 25.9M | auto l = table.LockForRead(); |
1453 | 25.9M | return (catalog_manager_->IsSystemTable(table) || |
1454 | 728k | table.IsColocatedUserTable() || |
1455 | 725k | l->started_deleting()); |
1456 | 25.9M | } |
1457 | | |
1458 | | Status ClusterLoadBalancer::CountPendingTasksUnlocked(const TableId& table_uuid, |
1459 | | int* pending_add_replica_tasks, |
1460 | | int* pending_remove_replica_tasks, |
1461 | 121k | int* pending_stepdown_leader_tasks) { |
1462 | 121k | GetPendingTasks(table_uuid, |
1463 | 121k | &state_->pending_add_replica_tasks_[table_uuid], |
1464 | 121k | &state_->pending_remove_replica_tasks_[table_uuid], |
1465 | 121k | &state_->pending_stepdown_leader_tasks_[table_uuid]); |
1466 | | |
1467 | 121k | *pending_add_replica_tasks += state_->pending_add_replica_tasks_[table_uuid].size(); |
1468 | 121k | *pending_remove_replica_tasks += state_->pending_remove_replica_tasks_[table_uuid].size(); |
1469 | 121k | *pending_stepdown_leader_tasks += state_->pending_stepdown_leader_tasks_[table_uuid].size(); |
1470 | 181 | for (auto e : state_->pending_add_replica_tasks_[table_uuid]) { |
1471 | 181 | const auto& ts_uuid = e.second; |
1472 | 181 | const auto& tablet_id = e.first; |
1473 | 181 | RETURN_NOT_OK(state_->AddStartingTablet(tablet_id, ts_uuid)); |
1474 | 181 | } |
1475 | 121k | return Status::OK(); |
1476 | 121k | } |
1477 | | |
1478 | | void ClusterLoadBalancer::GetPendingTasks(const TableId& table_uuid, |
1479 | | TabletToTabletServerMap* add_replica_tasks, |
1480 | | TabletToTabletServerMap* remove_replica_tasks, |
1481 | 121k | TabletToTabletServerMap* stepdown_leader_tasks) { |
1482 | 121k | catalog_manager_->GetPendingServerTasksUnlocked( |
1483 | 121k | table_uuid, add_replica_tasks, remove_replica_tasks, stepdown_leader_tasks); |
1484 | 121k | } |
1485 | | |
1486 | | Status ClusterLoadBalancer::SendReplicaChanges( |
1487 | | scoped_refptr<TabletInfo> tablet, const TabletServerId& ts_uuid, const bool is_add, |
1488 | 7.65k | const bool should_remove_leader, const TabletServerId& new_leader_ts_uuid) { |
1489 | 7.65k | auto l = tablet->LockForRead(); |
1490 | 7.65k | if (is_add) { |
1491 | | // These checks are temporary. They will be removed once we are confident that the algorithm is |
1492 | | // always doing the right thing. |
1493 | 1.04k | SCHECK_EQ(state_->pending_add_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()), |
1494 | 1.04k | 0U, |
1495 | 1.04k | IllegalState, |
1496 | 1.04k | "Sending duplicate add replica task."); |
1497 | 1.04k | catalog_manager_->SendAddServerRequest( |
1498 | 1.04k | tablet, GetDefaultMemberType(), l->pb.committed_consensus_state(), ts_uuid); |
1499 | 6.60k | } else { |
1500 | | // If the replica is also the leader, first step it down and then remove. |
1501 | 6.60k | if (state_->per_tablet_meta_[tablet->id()].leader_uuid == ts_uuid) { |
1502 | 5.81k | SCHECK_EQ( |
1503 | 5.81k | state_->pending_stepdown_leader_tasks_[tablet->table()->id()].count(tablet->tablet_id()), |
1504 | 5.81k | 0U, |
1505 | 5.81k | IllegalState, |
1506 | 5.81k | "Sending duplicate leader stepdown task."); |
1507 | 5.81k | catalog_manager_->SendLeaderStepDownRequest( |
1508 | 5.81k | tablet, l->pb.committed_consensus_state(), ts_uuid, should_remove_leader, |
1509 | 5.81k | new_leader_ts_uuid); |
1510 | 794 | } else { |
1511 | 794 | SCHECK_EQ( |
1512 | 794 | state_->pending_remove_replica_tasks_[tablet->table()->id()].count(tablet->tablet_id()), |
1513 | 794 | 0U, |
1514 | 794 | IllegalState, |
1515 | 794 | "Sending duplicate remove replica task."); |
1516 | 794 | catalog_manager_->SendRemoveServerRequest( |
1517 | 794 | tablet, l->pb.committed_consensus_state(), ts_uuid); |
1518 | 794 | } |
1519 | 6.60k | } |
1520 | 7.65k | return Status::OK(); |
1521 | 7.65k | } |
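// A sketch of the three-way dispatch inside SendReplicaChanges: adds are sent
// directly, while removing the current leader requires a step-down first so
// the tablet is never left leaderless. The enum and function names are
// illustrative assumptions for this sketch only.

#include <string>

enum class ReplicaChange { kAddServer, kStepDownThenRemove, kRemoveServer };

ReplicaChange ClassifyReplicaChange(
    bool is_add, const std::string& target_ts_uuid,
    const std::string& leader_ts_uuid) {
  if (is_add) {
    return ReplicaChange::kAddServer;
  }
  // Removing the leader outright would disrupt the tablet; demote it first.
  return target_ts_uuid == leader_ts_uuid ? ReplicaChange::kStepDownThenRemove
                                          : ReplicaChange::kRemoveServer;
}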
1522 | | |
1523 | 1.04k | consensus::PeerMemberType ClusterLoadBalancer::GetDefaultMemberType() { |
1524 | 1.04k | if (state_->options_->type == LIVE) { |
1525 | 989 | return consensus::PeerMemberType::PRE_VOTER; |
1526 | 56 | } else { |
1527 | 56 | return consensus::PeerMemberType::PRE_OBSERVER; |
1528 | 56 | } |
1529 | 1.04k | } |
1530 | | |
1531 | 2.68k | Result<bool> ClusterLoadBalancer::IsConfigMemberInTransitionMode(const TabletId &tablet_id) const { |
1532 | 2.68k | auto tablet = GetTabletMap().at(tablet_id); |
1533 | 2.68k | auto l = tablet->LockForRead(); |
1534 | 2.68k | auto config = l->pb.committed_consensus_state().config(); |
1535 | 2.68k | return CountVotersInTransition(config) != 0; |
1536 | 2.68k | } |
1537 | | |
1538 | | const PlacementInfoPB& ClusterLoadBalancer::GetReadOnlyPlacementFromUuid( |
1539 | 840 | const ReplicationInfoPB& replication_info) const { |
1540 | | // We assume there is a read_replicas field in our replication info.
1541 | 876 | for (int i = 0; i < replication_info.read_replicas_size(); i++) { |
1542 | 876 | const PlacementInfoPB& read_only_placement = replication_info.read_replicas(i); |
1543 | 876 | if (read_only_placement.placement_uuid() == state_->options_->placement_uuid) { |
1544 | 840 | return read_only_placement; |
1545 | 840 | } |
1546 | 876 | } |
1547 | | // Should never get here. |
1548 | 0 | LOG(ERROR) << "Could not find read only cluster with placement uuid: " |
1549 | 0 | << state_->options_->placement_uuid; |
1550 | 0 | return replication_info.read_replicas(0); |
1551 | 840 | } |
1552 | | |
1553 | 0 | const PlacementInfoPB& ClusterLoadBalancer::GetLiveClusterPlacementInfo() const { |
1554 | 0 | auto l = down_cast<enterprise::CatalogManager*> |
1555 | 0 | (catalog_manager_)->GetClusterConfigInfo()->LockForRead(); |
1556 | 0 | return l->pb.replication_info().live_replicas(); |
1557 | 0 | } |
1558 | | |
1559 | 0 | vector<scoped_refptr<TableInfo>> ClusterLoadBalancer::GetAllTablesLoadBalancerSkipped() { |
1560 | 0 | SharedLock<decltype(mutex_)> l(mutex_); |
1561 | 0 | return skipped_tables_; |
1562 | 0 | } |
1563 | | |
1564 | | } // namespace master |
1565 | | } // namespace yb |