YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/catalog_manager_bg_tasks.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
32
#include "yb/master/catalog_manager_bg_tasks.h"
33
34
#include <memory>
35
36
#include "yb/gutil/casts.h"
37
38
#include "yb/master/cluster_balance.h"
39
#include "yb/master/master.h"
40
#include "yb/master/ts_descriptor.h"
41
#include "yb/master/tablet_split_manager.h"
42
43
#include "yb/util/flag_tags.h"
44
#include "yb/util/mutex.h"
45
#include "yb/util/status_log.h"
46
#include "yb/util/thread.h"
47
48
using std::shared_ptr;
49
50
// Sleep interval (milliseconds) used by the background task loop when it is not woken early.
// Tagged `runtime`; the loop re-reads the flag on every iteration.
DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000,
             "Amount of time the catalog manager background task thread waits between runs");
TAG_FLAG(catalog_manager_bg_task_wait_ms, runtime);

// Grace period after this master wins leadership before the load balancer is allowed to run.
// Defaults to yb::master::kDelayAfterFailoverSecs.
DEFINE_int32(load_balancer_initial_delay_secs, yb::master::kDelayAfterFailoverSecs,
             "Amount of time to wait between becoming master leader and enabling the "
             "load balancer.");

// When true, each leader iteration asks the catalog manager to move the sys catalog tablet
// leadership according to the cluster's preferred zones.
DEFINE_bool(sys_catalog_respect_affinity_task, true,
            "Whether the master sys catalog tablet respects cluster config preferred zones and "
            "sends step down requests to a preferred leader.");

// Defined elsewhere; gates starting the tablespace background task.
DECLARE_bool(enable_ysql);
64
65
namespace yb {
66
namespace master {
67
68
CatalogManagerBgTasks::CatalogManagerBgTasks(CatalogManager *catalog_manager)
69
    : closing_(false),
70
      pending_updates_(false),
71
      cond_(&lock_),
72
      thread_(nullptr),
73
5.35k
      catalog_manager_(down_cast<enterprise::CatalogManager*>(catalog_manager)) {
74
5.35k
}
75
76
13.0k
void CatalogManagerBgTasks::Wake() {
77
13.0k
  MutexLock lock(lock_);
78
13.0k
  pending_updates_ = true;
79
13.0k
  cond_.Broadcast();
80
13.0k
}
81
82
244k
void CatalogManagerBgTasks::Wait(int msec) {
83
244k
  MutexLock lock(lock_);
84
244k
  if (closing_.load()) return;
85
244k
  if (!pending_updates_) {
86
241k
    cond_.TimedWait(MonoDelta::FromMilliseconds(msec));
87
241k
  }
88
244k
  pending_updates_ = false;
89
244k
}
90
91
168k
void CatalogManagerBgTasks::WakeIfHasPendingUpdates() {
92
168k
  MutexLock lock(lock_);
93
168k
  if (pending_updates_) {
94
3
    cond_.Broadcast();
95
3
  }
96
168k
}
97
98
5.35k
// Spawns the "catalog manager"/"bgtasks" thread that executes Run(), storing its handle
// in thread_. Propagates any error from thread creation; returns OK on success.
Status CatalogManagerBgTasks::Init() {
  return yb::Thread::Create("catalog manager", "bgtasks",
                            &CatalogManagerBgTasks::Run, this, &thread_);
}
103
104
101
void CatalogManagerBgTasks::Shutdown() {
105
101
  {
106
101
    bool closing_expected = false;
107
101
    if (!closing_.compare_exchange_strong(closing_expected, true)) {
108
0
      VLOG(2) << "CatalogManagerBgTasks already shut down";
109
0
      return;
110
0
    }
111
101
  }
112
113
101
  Wake();
114
101
  if (thread_ != nullptr) {
115
101
    CHECK_OK(ThreadJoiner(thread_.get()).Join());
116
101
  }
117
101
}
118
119
5.35k
void CatalogManagerBgTasks::Run() {
120
249k
  while (!closing_.load()) {
121
    // Perform assignment processing.
122
244k
    SCOPED_LEADER_SHARED_LOCK(l, catalog_manager_);
123
244k
    if (!l.catalog_status().ok()) {
124
3.26k
      LOG(WARNING) << "Catalog manager background task thread going to sleep: "
125
3.26k
                   << l.catalog_status().ToString();
126
240k
    } else if (l.leader_status().ok()) {
127
      // Clear metrics for dead tservers.
128
90.0k
      vector<shared_ptr<TSDescriptor>> descs;
129
90.0k
      const auto& ts_manager = catalog_manager_->master_->ts_manager();
130
90.0k
      ts_manager->GetAllDescriptors(&descs);
131
262k
      for (auto& ts_desc : descs) {
132
262k
        if (!ts_desc->IsLive()) {
133
6.86k
          ts_desc->ClearMetrics();
134
6.86k
        }
135
262k
      }
136
137
      // Report metrics.
138
90.0k
      catalog_manager_->ReportMetrics();
139
140
      // Cleanup old tasks from tracker.
141
90.0k
      catalog_manager_->tasks_tracker_->CleanupOldTasks();
142
143
90.0k
      TabletInfos to_delete;
144
90.0k
      TableToTabletInfos to_process;
145
146
      // Get list of tablets not yet running or already replaced.
147
90.0k
      catalog_manager_->ExtractTabletsToProcess(&to_delete, &to_process);
148
149
90.0k
      bool processed_tablets = false;
150
90.0k
      if (!to_process.empty()) {
151
        // For those tablets which need to be created in this round, assign replicas.
152
11.8k
        TSDescriptorVector ts_descs = catalog_manager_->GetAllLiveNotBlacklistedTServers();
153
11.8k
        CMGlobalLoadState global_load_state;
154
11.8k
        catalog_manager_->InitializeGlobalLoadState(ts_descs, &global_load_state);
155
        // Transition tablet assignment state from preparing to creating, send
156
        // and schedule creation / deletion RPC messages, etc.
157
        // This is done table by table.
158
12.6k
        for (const auto& entries : to_process) {
159
12.6k
          LOG(INFO) << "Processing pending assignments for table: " << entries.first;
160
12.6k
          Status s = catalog_manager_->ProcessPendingAssignmentsPerTable(
161
12.6k
              entries.first, entries.second, &global_load_state);
162
12.6k
          WARN_NOT_OK(s, "Assignment failed");
163
          // Set processed_tablets as true if the call succeeds for at least one table.
164
12.6k
          processed_tablets = processed_tablets || s.ok();
165
          // TODO Add tests for this in the revision that makes
166
          // create/alter fault tolerant.
167
12.6k
        }
168
11.8k
      }
169
170
      // Do the LB enabling check
171
90.0k
      if (!processed_tablets) {
172
78.1k
        if (catalog_manager_->TimeSinceElectedLeader() >
173
77.4k
            MonoDelta::FromSeconds(FLAGS_load_balancer_initial_delay_secs)) {
174
77.4k
          catalog_manager_->load_balance_policy_->RunLoadBalancer();
175
77.4k
        }
176
78.1k
      }
177
178
90.0k
      TableInfoMap table_info_map;
179
90.0k
      {
180
90.0k
        CatalogManager::SharedLock lock(catalog_manager_->mutex_);
181
90.0k
        table_info_map = *catalog_manager_->table_ids_map_;
182
90.0k
      }
183
90.0k
      catalog_manager_->tablet_split_manager()->MaybeDoSplitting(table_info_map);
184
185
90.0k
      if (!to_delete.empty() || catalog_manager_->AreTablesDeleting()) {
186
17.2k
        catalog_manager_->CleanUpDeletedTables();
187
17.2k
      }
188
90.0k
      std::vector<scoped_refptr<CDCStreamInfo>> streams;
189
90.0k
      auto s = catalog_manager_->FindCDCStreamsMarkedAsDeleting(&streams);
190
90.0k
      if (s.ok() && !streams.empty()) {
191
0
        s = catalog_manager_->CleanUpDeletedCDCStreams(streams);
192
0
      }
193
194
      // Ensure the master sys catalog tablet follows the cluster's affinity specification.
195
90.0k
      if (FLAGS_sys_catalog_respect_affinity_task) {
196
90.0k
        s = catalog_manager_->SysCatalogRespectLeaderAffinity();
197
90.0k
        if (!s.ok()) {
198
80
          YB_LOG_EVERY_N(INFO, 10) << s.message().ToBuffer();
199
80
        }
200
90.0k
      }
201
202
90.0k
      if (FLAGS_enable_ysql) {
203
        // Start the tablespace background task.
204
17.2k
        catalog_manager_->StartTablespaceBgTaskIfStopped();
205
17.2k
      }
206
150k
    } else {
207
      // Reset Metrics when leader_status is not ok.
208
150k
      catalog_manager_->ResetMetrics();
209
150k
    }
210
    // Wait for a notification or a timeout expiration.
211
    //  - CreateTable will call Wake() to notify about the tablets to add
212
    //  - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates()
213
    //    to notify about tablets creation.
214
    //  - DeleteTable will call Wake() to finish destructing any table internals
215
244k
    l.Unlock();
216
244k
    Wait(FLAGS_catalog_manager_bg_task_wait_ms);
217
244k
  }
218
5.25k
  VLOG(1) << "Catalog manager background task thread shutting down";
219
5.35k
}
220
221
}  // namespace master
222
}  // namespace yb