/Users/deen/code/yugabyte-db/src/yb/master/catalog_manager_bg_tasks.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | |
32 | | #include "yb/master/catalog_manager_bg_tasks.h" |
33 | | |
34 | | #include <memory> |
35 | | |
36 | | #include "yb/gutil/casts.h" |
37 | | |
38 | | #include "yb/master/cluster_balance.h" |
39 | | #include "yb/master/master.h" |
40 | | #include "yb/master/ts_descriptor.h" |
41 | | #include "yb/master/tablet_split_manager.h" |
42 | | |
43 | | #include "yb/util/flag_tags.h" |
44 | | #include "yb/util/mutex.h" |
45 | | #include "yb/util/status_log.h" |
46 | | #include "yb/util/thread.h" |
47 | | |
48 | | using std::shared_ptr; |
49 | | |
// Timeout, in milliseconds, that Run() passes to Wait() at the end of every
// iteration of the background loop. Tagged 'runtime'.
DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000,
             "Amount of time the catalog manager background task thread waits "
             "between runs");
TAG_FLAG(catalog_manager_bg_task_wait_ms, runtime);

// Run() only invokes the load balancer once TimeSinceElectedLeader() exceeds
// this many seconds. The default comes from kDelayAfterFailoverSecs.
DEFINE_int32(load_balancer_initial_delay_secs, yb::master::kDelayAfterFailoverSecs,
             "Amount of time to wait between becoming master leader and enabling the load "
             "balancer.");

// Gates the SysCatalogRespectLeaderAffinity() call made by Run().
DEFINE_bool(sys_catalog_respect_affinity_task, true,
            "Whether the master sys catalog tablet respects cluster config preferred zones "
            "and sends step down requests to a preferred leader.");

// Declared here, defined in another translation unit. Run() reads it to decide
// whether to call StartTablespaceBgTaskIfStopped().
DECLARE_bool(enable_ysql);
64 | | |
65 | | namespace yb { |
66 | | namespace master { |
67 | | |
68 | | CatalogManagerBgTasks::CatalogManagerBgTasks(CatalogManager *catalog_manager) |
69 | | : closing_(false), |
70 | | pending_updates_(false), |
71 | | cond_(&lock_), |
72 | | thread_(nullptr), |
73 | 5.35k | catalog_manager_(down_cast<enterprise::CatalogManager*>(catalog_manager)) { |
74 | 5.35k | } |
75 | | |
76 | 13.0k | void CatalogManagerBgTasks::Wake() { |
77 | 13.0k | MutexLock lock(lock_); |
78 | 13.0k | pending_updates_ = true; |
79 | 13.0k | cond_.Broadcast(); |
80 | 13.0k | } |
81 | | |
82 | 244k | void CatalogManagerBgTasks::Wait(int msec) { |
83 | 244k | MutexLock lock(lock_); |
84 | 244k | if (closing_.load()) return; |
85 | 244k | if (!pending_updates_) { |
86 | 241k | cond_.TimedWait(MonoDelta::FromMilliseconds(msec)); |
87 | 241k | } |
88 | 244k | pending_updates_ = false; |
89 | 244k | } |
90 | | |
91 | 168k | void CatalogManagerBgTasks::WakeIfHasPendingUpdates() { |
92 | 168k | MutexLock lock(lock_); |
93 | 168k | if (pending_updates_) { |
94 | 3 | cond_.Broadcast(); |
95 | 3 | } |
96 | 168k | } |
97 | | |
98 | 5.35k | Status CatalogManagerBgTasks::Init() { |
99 | 5.35k | RETURN_NOT_OK(yb::Thread::Create("catalog manager", "bgtasks", |
100 | 5.35k | &CatalogManagerBgTasks::Run, this, &thread_)); |
101 | 5.35k | return Status::OK(); |
102 | 5.35k | } |
103 | | |
104 | 101 | void CatalogManagerBgTasks::Shutdown() { |
105 | 101 | { |
106 | 101 | bool closing_expected = false; |
107 | 101 | if (!closing_.compare_exchange_strong(closing_expected, true)) { |
108 | 0 | VLOG(2) << "CatalogManagerBgTasks already shut down"; |
109 | 0 | return; |
110 | 0 | } |
111 | 101 | } |
112 | | |
113 | 101 | Wake(); |
114 | 101 | if (thread_ != nullptr) { |
115 | 101 | CHECK_OK(ThreadJoiner(thread_.get()).Join()); |
116 | 101 | } |
117 | 101 | } |
118 | | |
119 | 5.35k | void CatalogManagerBgTasks::Run() { |
120 | 249k | while (!closing_.load()) { |
121 | | // Perform assignment processing. |
122 | 244k | SCOPED_LEADER_SHARED_LOCK(l, catalog_manager_); |
123 | 244k | if (!l.catalog_status().ok()) { |
124 | 3.26k | LOG(WARNING) << "Catalog manager background task thread going to sleep: " |
125 | 3.26k | << l.catalog_status().ToString(); |
126 | 240k | } else if (l.leader_status().ok()) { |
127 | | // Clear metrics for dead tservers. |
128 | 90.0k | vector<shared_ptr<TSDescriptor>> descs; |
129 | 90.0k | const auto& ts_manager = catalog_manager_->master_->ts_manager(); |
130 | 90.0k | ts_manager->GetAllDescriptors(&descs); |
131 | 262k | for (auto& ts_desc : descs) { |
132 | 262k | if (!ts_desc->IsLive()) { |
133 | 6.86k | ts_desc->ClearMetrics(); |
134 | 6.86k | } |
135 | 262k | } |
136 | | |
137 | | // Report metrics. |
138 | 90.0k | catalog_manager_->ReportMetrics(); |
139 | | |
140 | | // Cleanup old tasks from tracker. |
141 | 90.0k | catalog_manager_->tasks_tracker_->CleanupOldTasks(); |
142 | | |
143 | 90.0k | TabletInfos to_delete; |
144 | 90.0k | TableToTabletInfos to_process; |
145 | | |
146 | | // Get list of tablets not yet running or already replaced. |
147 | 90.0k | catalog_manager_->ExtractTabletsToProcess(&to_delete, &to_process); |
148 | | |
149 | 90.0k | bool processed_tablets = false; |
150 | 90.0k | if (!to_process.empty()) { |
151 | | // For those tablets which need to be created in this round, assign replicas. |
152 | 11.8k | TSDescriptorVector ts_descs = catalog_manager_->GetAllLiveNotBlacklistedTServers(); |
153 | 11.8k | CMGlobalLoadState global_load_state; |
154 | 11.8k | catalog_manager_->InitializeGlobalLoadState(ts_descs, &global_load_state); |
155 | | // Transition tablet assignment state from preparing to creating, send |
156 | | // and schedule creation / deletion RPC messages, etc. |
157 | | // This is done table by table. |
158 | 12.6k | for (const auto& entries : to_process) { |
159 | 12.6k | LOG(INFO) << "Processing pending assignments for table: " << entries.first; |
160 | 12.6k | Status s = catalog_manager_->ProcessPendingAssignmentsPerTable( |
161 | 12.6k | entries.first, entries.second, &global_load_state); |
162 | 12.6k | WARN_NOT_OK(s, "Assignment failed"); |
163 | | // Set processed_tablets as true if the call succeeds for at least one table. |
164 | 12.6k | processed_tablets = processed_tablets || s.ok(); |
165 | | // TODO Add tests for this in the revision that makes |
166 | | // create/alter fault tolerant. |
167 | 12.6k | } |
168 | 11.8k | } |
169 | | |
170 | | // Do the LB enabling check |
171 | 90.0k | if (!processed_tablets) { |
172 | 78.1k | if (catalog_manager_->TimeSinceElectedLeader() > |
173 | 77.4k | MonoDelta::FromSeconds(FLAGS_load_balancer_initial_delay_secs)) { |
174 | 77.4k | catalog_manager_->load_balance_policy_->RunLoadBalancer(); |
175 | 77.4k | } |
176 | 78.1k | } |
177 | | |
178 | 90.0k | TableInfoMap table_info_map; |
179 | 90.0k | { |
180 | 90.0k | CatalogManager::SharedLock lock(catalog_manager_->mutex_); |
181 | 90.0k | table_info_map = *catalog_manager_->table_ids_map_; |
182 | 90.0k | } |
183 | 90.0k | catalog_manager_->tablet_split_manager()->MaybeDoSplitting(table_info_map); |
184 | | |
185 | 90.0k | if (!to_delete.empty() || catalog_manager_->AreTablesDeleting()) { |
186 | 17.2k | catalog_manager_->CleanUpDeletedTables(); |
187 | 17.2k | } |
188 | 90.0k | std::vector<scoped_refptr<CDCStreamInfo>> streams; |
189 | 90.0k | auto s = catalog_manager_->FindCDCStreamsMarkedAsDeleting(&streams); |
190 | 90.0k | if (s.ok() && !streams.empty()) { |
191 | 0 | s = catalog_manager_->CleanUpDeletedCDCStreams(streams); |
192 | 0 | } |
193 | | |
194 | | // Ensure the master sys catalog tablet follows the cluster's affinity specification. |
195 | 90.0k | if (FLAGS_sys_catalog_respect_affinity_task) { |
196 | 90.0k | s = catalog_manager_->SysCatalogRespectLeaderAffinity(); |
197 | 90.0k | if (!s.ok()) { |
198 | 80 | YB_LOG_EVERY_N(INFO, 10) << s.message().ToBuffer(); |
199 | 80 | } |
200 | 90.0k | } |
201 | | |
202 | 90.0k | if (FLAGS_enable_ysql) { |
203 | | // Start the tablespace background task. |
204 | 17.2k | catalog_manager_->StartTablespaceBgTaskIfStopped(); |
205 | 17.2k | } |
206 | 150k | } else { |
207 | | // Reset Metrics when leader_status is not ok. |
208 | 150k | catalog_manager_->ResetMetrics(); |
209 | 150k | } |
210 | | // Wait for a notification or a timeout expiration. |
211 | | // - CreateTable will call Wake() to notify about the tablets to add |
212 | | // - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates() |
213 | | // to notify about tablets creation. |
214 | | // - DeleteTable will call Wake() to finish destructing any table internals |
215 | 244k | l.Unlock(); |
216 | 244k | Wait(FLAGS_catalog_manager_bg_task_wait_ms); |
217 | 244k | } |
218 | 5.25k | VLOG(1) << "Catalog manager background task thread shutting down"; |
219 | 5.35k | } |
220 | | |
221 | | } // namespace master |
222 | | } // namespace yb |