YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/ts_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/master/ts_manager.h"
34
35
#include <mutex>
36
#include <vector>
37
38
#include "yb/gutil/map-util.h"
39
40
#include "yb/master/master_heartbeat.pb.h"
41
#include "yb/master/ts_descriptor.h"
42
43
using std::shared_ptr;
44
using std::string;
45
using std::vector;
46
47
namespace yb {
48
namespace master {
49
50
5.45k
// Default constructor: performs no work beyond default-initializing the members.
TSManager::TSManager() = default;
52
53
92
// Destructor: nothing to release explicitly; members clean up after themselves.
TSManager::~TSManager() = default;
55
56
// Looks up the descriptor registered for `instance`.
//
// On success stores the descriptor in `*ts_desc` and returns OK. Returns NotFound when
//   - no descriptor is registered under the instance's permanent UUID,
//   - the registered descriptor has been marked as removed, or
//   - the instance's sequence number differs from the descriptor's latest sequence number.
// `*ts_desc` is not written on failure. Takes `lock_` in shared mode.
Status TSManager::LookupTS(const NodeInstancePB& instance,
                           TSDescriptorPtr* ts_desc) {
  SharedLock<decltype(lock_)> read_guard(lock_);

  const auto entry = servers_by_id_.find(instance.permanent_uuid());
  if (entry == servers_by_id_.end() || entry->second->IsRemoved()) {
    return STATUS(NotFound, "unknown tablet server ID", instance.ShortDebugString());
  }

  const TSDescriptorPtr& descriptor = entry->second;
  if (descriptor->latest_seqno() != instance.instance_seqno()) {
    return STATUS(NotFound, "mismatched instance sequence number", instance.ShortDebugString());
  }

  *ts_desc = descriptor;
  return Status::OK();
}
74
75
bool TSManager::LookupTSByUUID(const string& uuid,
76
423k
                               TSDescriptorPtr* ts_desc) {
77
423k
  SharedLock<decltype(lock_)> l(lock_);
78
423k
  const TSDescriptorPtr* found_ptr = FindOrNull(servers_by_id_, uuid);
79
423k
  if (!found_ptr || (*found_ptr)->IsRemoved()) {
80
65
    return false;
81
65
  }
82
423k
  *ts_desc = *found_ptr;
83
423k
  return true;
84
423k
}
85
86
// Returns true if at least one address in `old_addresses` has the same host and the same port
// as some address in `new_addresses`. Returns false otherwise, including when either list is
// empty.
bool HasSameHostPort(const google::protobuf::RepeatedPtrField<HostPortPB>& old_addresses,
                     const google::protobuf::RepeatedPtrField<HostPortPB>& new_addresses) {
  for (const auto& lhs : old_addresses) {
    for (const auto& rhs : new_addresses) {
      if (lhs.host() != rhs.host()) {
        continue;
      }
      if (lhs.port() == rhs.port()) {
        return true;
      }
    }
  }
  return false;
}
97
98
// Registers (or re-registers) the tablet server identified by `instance`.
//
// Known UUID: the existing descriptor's Register() is called; its error, if any, is returned.
//
// Unknown UUID: every already-known server is checked for a shared host/port, in either the
// private RPC addresses or the broadcast addresses. For each such clash:
//   - if the known server's sequence number is >= the incoming one, the registration is
//     skipped with a warning and OK is returned;
//   - otherwise the known server is marked as removed and the scan continues.
// A new descriptor is then created and inserted; a creation error is returned to the caller.
//
// After a successful (re-)registration, if a TS-count callback is set and the number of
// non-removed servers has reached the configured minimum, the callback is taken out of the
// manager and invoked once, after `lock_` has been released.
Status TSManager::RegisterTS(const NodeInstancePB& instance,
                             const TSRegistrationPB& registration,
                             CloudInfoPB local_cloud_info,
                             rpc::ProxyCache* proxy_cache,
                             RegisteredThroughHeartbeat registered_through_heartbeat) {
  // Filled in while holding the lock, invoked only once the lock is released.
  TSCountCallback pending_callback;

  {
    std::lock_guard<decltype(lock_)> write_guard(lock_);
    const string& uuid = instance.permanent_uuid();

    const auto existing = servers_by_id_.find(uuid);
    if (existing != servers_by_id_.end()) {
      RETURN_NOT_OK(existing->second->Register(
          instance, registration, std::move(local_cloud_info), proxy_cache));
      LOG(INFO) << "Re-registered known tablet server { " << instance.ShortDebugString()
                << " }: " << registration.ShortDebugString();
    } else {
      // Check if a server with the same host and port already exists.
      for (const auto& id_and_desc : servers_by_id_) {
        const auto known_info = id_and_desc.second->GetTSInformationPB();

        const bool address_clash =
            HasSameHostPort(known_info->registration().common().private_rpc_addresses(),
                            registration.common().private_rpc_addresses()) ||
            HasSameHostPort(known_info->registration().common().broadcast_addresses(),
                            registration.common().broadcast_addresses());
        if (!address_clash) {
          continue;
        }

        if (known_info->tserver_instance().instance_seqno() >= instance.instance_seqno()) {
          // Skip adding the node since we already have a node with the same rpc address and
          // a sequence number that is not lower.
          LOG(WARNING) << "Skipping registration for TS " << instance.ShortDebugString()
              << " since an entry with same host/port but a higher sequence number exists "
              << known_info->ShortDebugString();
          return Status::OK();
        }

        LOG(WARNING) << "Removing entry: " << known_info->ShortDebugString()
            << " since we received registration for a tserver with a higher sequence number: "
            << instance.ShortDebugString();
        // Mark the old node to be removed, since we have a newer sequence number.
        id_and_desc.second->SetRemoved();
      }

      auto created = VERIFY_RESULT(TSDescriptor::RegisterNew(
          instance, registration, std::move(local_cloud_info), proxy_cache,
          registered_through_heartbeat));
      InsertOrDie(&servers_by_id_, uuid, std::move(created));
      LOG(INFO) << "Registered new tablet server { " << instance.ShortDebugString()
                << " } with Master, full list: " << yb::ToString(servers_by_id_);
    }

    if (!ts_count_callback_.empty()) {
      const auto current_count = GetCountUnlocked();
      if (current_count >= ts_count_callback_min_count_) {
        // Hand the callback over to the local so it runs at most once and outside the lock.
        pending_callback = std::move(ts_count_callback_);
        ts_count_callback_min_count_ = 0;
      }
    }
  }

  if (!pending_callback.empty()) {
    pending_callback();
  }

  return Status::OK();
}
165
166
void TSManager::GetDescriptors(std::function<bool(const TSDescriptorPtr&)> condition,
167
743k
                               TSDescriptorVector* descs) const {
168
743k
  SharedLock<decltype(lock_)> l(lock_);
169
743k
  GetDescriptorsUnlocked(condition, descs);
170
743k
}
171
172
void TSManager::GetDescriptorsUnlocked(
173
1.02M
    std::function<bool(const TSDescriptorPtr&)> condition, TSDescriptorVector* descs) const {
174
1.02M
  descs->clear();
175
176
1.02M
  descs->reserve(servers_by_id_.size());
177
3.13M
  for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
178
3.13M
    const TSDescriptorPtr& ts = entry.second;
179
3.13M
    if (condition(ts)) {
180
18.4E
      VLOG(1) << " Adding " << yb::ToString(*ts);
181
3.10M
      descs->push_back(ts);
182
24.5k
    } else {
183
18.4E
      VLOG(1) << " NOT Adding " << yb::ToString(*ts);
184
24.5k
    }
185
3.13M
  }
186
1.02M
}
187
188
275k
// Fills `descs` with every descriptor that has not been marked as removed.
// Takes `lock_` in shared mode and delegates to GetAllDescriptorsUnlocked().
void TSManager::GetAllDescriptors(TSDescriptorVector* descs) const {
  SharedLock<decltype(lock_)> read_guard(lock_);
  GetAllDescriptorsUnlocked(descs);
}
192
193
1
// Convenience overload: returns the non-removed descriptors by value instead of through an
// out-parameter.
TSDescriptorVector TSManager::GetAllDescriptors() const {
  TSDescriptorVector result;
  GetAllDescriptors(&result);
  return result;
}
198
199
276k
// Fills `descs` with every descriptor that has not been marked as removed.
// Does not take `lock_` itself.
void TSManager::GetAllDescriptorsUnlocked(TSDescriptorVector* descs) const {
  const auto not_removed = [](const TSDescriptorPtr& ts) -> bool { return !ts->IsRemoved(); };
  GetDescriptorsUnlocked(not_removed, descs);
}
202
203
void TSManager::GetAllLiveDescriptors(TSDescriptorVector* descs,
204
744k
                                      const boost::optional<BlacklistSet>& blacklist) const {
205
2.35M
  GetDescriptors([blacklist](const TSDescriptorPtr& ts) -> bool {
206
2.35M
    return ts->IsLive() && !IsTsBlacklisted(ts, blacklist); }, descs);
207
744k
}
208
209
0
// Fills `descs` with every descriptor that is live and has a tablet report.
void TSManager::GetAllReportedDescriptors(TSDescriptorVector* descs) const {
  const auto live_and_reported = [](const TSDescriptorPtr& ts) -> bool {
    return ts->IsLive() && ts->has_tablet_report();
  };
  GetDescriptors(live_and_reported, descs);
}
213
214
10.2k
bool TSManager::IsTsInCluster(const TSDescriptorPtr& ts, string cluster_uuid) {
215
10.2k
  return ts->placement_uuid() == cluster_uuid;
216
10.2k
}
217
218
bool TSManager::IsTsBlacklisted(const TSDescriptorPtr& ts,
219
2.33M
                                const boost::optional<BlacklistSet>& blacklist) {
220
2.33M
  if (!blacklist.is_initialized()) {
221
1.89M
    return false;
222
1.89M
  }
223
444k
  return ts->IsBlacklisted(*blacklist);
224
444k
}
225
226
void TSManager::GetAllLiveDescriptorsInCluster(TSDescriptorVector* descs,
227
    string placement_uuid,
228
    const boost::optional<BlacklistSet>& blacklist,
229
3.66k
    bool primary_cluster) const {
230
3.66k
  descs->clear();
231
3.66k
  SharedLock<decltype(lock_)> l(lock_);
232
233
3.66k
  descs->reserve(servers_by_id_.size());
234
10.2k
  for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
235
10.2k
    const TSDescriptorPtr& ts = entry.second;
236
    // ts_in_cluster true if there's a matching config and tserver placement uuid or
237
    // if we're getting primary nodes and the tserver placement uuid is empty.
238
10.2k
    bool ts_in_cluster = (IsTsInCluster(ts, placement_uuid) ||
239
32
                         (primary_cluster && ts->placement_uuid().empty()));
240
10.2k
    if (ts->IsLive() && !IsTsBlacklisted(ts, blacklist) && ts_in_cluster) {
241
10.2k
      descs->push_back(ts);
242
10.2k
    }
243
10.2k
  }
244
3.66k
}
245
246
0
// Returns the first descriptor (in map iteration order) that is live and running on
// `host_port`, or nullptr when there is none. Takes `lock_` in shared mode.
const TSDescriptorPtr TSManager::GetTSDescriptor(const HostPortPB& host_port) const {
  SharedLock<decltype(lock_)> read_guard(lock_);

  for (const auto& id_and_desc : servers_by_id_) {
    const TSDescriptorPtr& candidate = id_and_desc.second;
    if (!candidate->IsLive()) {
      continue;
    }
    if (candidate->IsRunningOn(host_port)) {
      return candidate;
    }
  }

  return nullptr;
}
258
259
903
size_t TSManager::GetCountUnlocked() const {
260
903
  TSDescriptorVector descs;
261
903
  GetAllDescriptorsUnlocked(&descs);
262
903
  return descs.size();
263
903
}
264
265
// Register a callback to be called when the number of tablet servers reaches a certain number.
266
450
void TSManager::SetTSCountCallback(int min_count, TSCountCallback callback) {
267
450
  std::lock_guard<rw_spinlock> l(lock_);
268
450
  ts_count_callback_ = std::move(callback);
269
450
  ts_count_callback_min_count_ = min_count;
270
450
}
271
272
} // namespace master
273
} // namespace yb