YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/ts_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/master/ts_manager.h"
34
35
#include <algorithm>
#include <mutex>
#include <utility>
#include <vector>
37
38
#include "yb/gutil/map-util.h"
39
40
#include "yb/master/master_heartbeat.pb.h"
41
#include "yb/master/ts_descriptor.h"
42
43
using std::shared_ptr;
44
using std::string;
45
using std::vector;
46
47
namespace yb {
48
namespace master {
49
50
8.07k
// Constructs a manager with default-initialized members; tablet servers are added later
// through RegisterTS().
TSManager::TSManager() = default;
52
53
91
// Nothing to release explicitly; member destructors perform all cleanup.
TSManager::~TSManager() = default;
55
56
// Looks up the descriptor of the tablet server identified by `instance`.
//
// Returns NotFound when:
//   * no descriptor is registered under `instance.permanent_uuid()`, or it has been marked
//     removed ("unknown tablet server ID");
//   * the descriptor's latest sequence number differs from `instance.instance_seqno()`
//     ("mismatched instance sequence number") -- presumably a restarted tserver that has to
//     re-register; confirm against callers.
// On success `*ts_desc` is set to the registered descriptor. lock_ is held in shared mode.
Status TSManager::LookupTS(const NodeInstancePB& instance,
                           TSDescriptorPtr* ts_desc) {
  SharedLock<decltype(lock_)> l(lock_);

  const auto entry = servers_by_id_.find(instance.permanent_uuid());
  const bool is_known = entry != servers_by_id_.end() && !entry->second->IsRemoved();
  if (!is_known) {
    return STATUS(NotFound, "unknown tablet server ID", instance.ShortDebugString());
  }

  const TSDescriptorPtr& candidate = entry->second;
  if (candidate->latest_seqno() != instance.instance_seqno()) {
    return STATUS(NotFound, "mismatched instance sequence number", instance.ShortDebugString());
  }

  *ts_desc = candidate;
  return Status::OK();
}
74
75
bool TSManager::LookupTSByUUID(const string& uuid,
76
719k
                               TSDescriptorPtr* ts_desc) {
77
719k
  SharedLock<decltype(lock_)> l(lock_);
78
719k
  const TSDescriptorPtr* found_ptr = FindOrNull(servers_by_id_, uuid);
79
719k
  if (!found_ptr || 
(*found_ptr)->IsRemoved()719k
) {
80
107
    return false;
81
107
  }
82
719k
  *ts_desc = *found_ptr;
83
719k
  return true;
84
719k
}
85
86
// Returns true if any address in `old_addresses` has exactly the same host string and port as
// some address in `new_addresses`. Either list being empty yields false.
bool HasSameHostPort(const google::protobuf::RepeatedPtrField<HostPortPB>& old_addresses,
                     const google::protobuf::RepeatedPtrField<HostPortPB>& new_addresses) {
  const auto same_endpoint = [](const HostPortPB& lhs, const HostPortPB& rhs) {
    return lhs.host() == rhs.host() && lhs.port() == rhs.port();
  };

  for (const HostPortPB& existing : old_addresses) {
    for (const HostPortPB& candidate : new_addresses) {
      if (same_endpoint(existing, candidate)) {
        return true;
      }
    }
  }
  return false;
}
97
98
// Registers the tablet server identified by `instance` with this master.
//
// Unknown permanent UUID: existing entries that share a private RPC or broadcast host/port with
// `registration` are treated as older incarnations of the same server.
//   * If any such entry has an instance sequence number >= `instance.instance_seqno()`, the
//     registration is skipped and Status::OK() is returned without adding anything.
//   * Otherwise a new descriptor is created, inserted, and the matching entries are marked
//     removed.
// Known permanent UUID: the existing descriptor is re-registered in place via Register().
//
// Errors from TSDescriptor::RegisterNew() / Register() are returned to the caller. Matching
// entries are only marked removed after the new descriptor has been created. So a failed
// RegisterNew(), or a skip discovered later in the scan, leaves the existing entries untouched.
//
// When a TS-count callback is pending and the number of non-removed servers has reached its
// minimum, the callback is detached while lock_ is held and invoked after the lock is released.
Status TSManager::RegisterTS(const NodeInstancePB& instance,
                             const TSRegistrationPB& registration,
                             CloudInfoPB local_cloud_info,
                             rpc::ProxyCache* proxy_cache,
                             RegisteredThroughHeartbeat registered_through_heartbeat) {
  TSCountCallback callback_to_call;

  {
    std::lock_guard<decltype(lock_)> l(lock_);
    const string& uuid = instance.permanent_uuid();

    auto it = servers_by_id_.find(uuid);
    if (it == servers_by_id_.end()) {
      // Entries with the same host/port and a lower sequence number. Removing them is deferred
      // until we know the new server will actually be registered.
      vector<TSDescriptorPtr> superseded;

      // Check if a server with the same host and port already exists.
      for (const auto& map_entry : servers_by_id_) {
        const auto ts_info = map_entry.second->GetTSInformationPB();

        if (!HasSameHostPort(ts_info->registration().common().private_rpc_addresses(),
                             registration.common().private_rpc_addresses()) &&
            !HasSameHostPort(ts_info->registration().common().broadcast_addresses(),
                             registration.common().broadcast_addresses())) {
          continue;
        }

        if (ts_info->tserver_instance().instance_seqno() >= instance.instance_seqno()) {
          // Skip adding the node since we already have a node with the same rpc address and
          // a higher sequence number. Nothing has been modified at this point.
          LOG(WARNING) << "Skipping registration for TS " << instance.ShortDebugString()
              << " since an entry with same host/port but a higher sequence number exists "
              << ts_info->ShortDebugString();
          return Status::OK();
        }

        superseded.push_back(map_entry.second);
      }

      auto new_desc = VERIFY_RESULT(TSDescriptor::RegisterNew(
          instance, registration, std::move(local_cloud_info), proxy_cache,
          registered_through_heartbeat));

      // Mark the old nodes to be removed, since we have a newer sequence number and the
      // replacement descriptor now exists.
      for (const auto& old_desc : superseded) {
        LOG(WARNING) << "Removing entry: " << old_desc->GetTSInformationPB()->ShortDebugString()
            << " since we received registration for a tserver with a higher sequence number: "
            << instance.ShortDebugString();
        old_desc->SetRemoved();
      }

      InsertOrDie(&servers_by_id_, uuid, std::move(new_desc));
      LOG(INFO) << "Registered new tablet server { " << instance.ShortDebugString()
                << " } with Master, full list: " << yb::ToString(servers_by_id_);
    } else {
      RETURN_NOT_OK(it->second->Register(
          instance, registration, std::move(local_cloud_info), proxy_cache));
      LOG(INFO) << "Re-registered known tablet server { " << instance.ShortDebugString()
                << " }: " << registration.ShortDebugString();
    }

    if (!ts_count_callback_.empty()) {
      auto new_count = GetCountUnlocked();
      if (new_count >= ts_count_callback_min_count_) {
        // Detach the callback so it fires at most once; it runs after lock_ is released.
        callback_to_call = std::move(ts_count_callback_);
        ts_count_callback_min_count_ = 0;
      }
    }
  }

  if (!callback_to_call.empty()) {
    callback_to_call();
  }

  return Status::OK();
}
165
166
void TSManager::GetDescriptors(std::function<bool(const TSDescriptorPtr&)> condition,
167
6.76M
                               TSDescriptorVector* descs) const {
168
6.76M
  SharedLock<decltype(lock_)> l(lock_);
169
6.76M
  GetDescriptorsUnlocked(condition, descs);
170
6.76M
}
171
172
void TSManager::GetDescriptorsUnlocked(
173
11.5M
    std::function<bool(const TSDescriptorPtr&)> condition, TSDescriptorVector* descs) const {
174
11.5M
  descs->clear();
175
176
11.5M
  descs->reserve(servers_by_id_.size());
177
39.2M
  for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
178
39.2M
    const TSDescriptorPtr& ts = entry.second;
179
39.2M
    if (condition(ts)) {
180
18.4E
      VLOG(1) << " Adding " << yb::ToString(*ts);
181
38.5M
      descs->push_back(ts);
182
38.5M
    } else {
183
18.4E
      VLOG(1) << " NOT Adding " << yb::ToString(*ts);
184
676k
    }
185
39.2M
  }
186
11.5M
}
187
188
4.76M
// Replaces the contents of `descs` with all descriptors not marked removed, holding lock_ in
// shared mode while the *Unlocked variant scans the map.
void TSManager::GetAllDescriptors(TSDescriptorVector* descs) const {
  SharedLock<decltype(lock_)> shared_guard(lock_);
  GetAllDescriptorsUnlocked(descs);
}
192
193
1
// Convenience overload that returns all non-removed descriptors by value.
TSDescriptorVector TSManager::GetAllDescriptors() const {
  TSDescriptorVector result;
  GetAllDescriptors(&result);
  return result;
}
198
199
4.76M
// Replaces the contents of `descs` with every descriptor that has not been marked removed.
// Caller must already hold lock_.
void TSManager::GetAllDescriptorsUnlocked(TSDescriptorVector* descs) const {
  const auto not_removed = [](const TSDescriptorPtr& ts) -> bool {
    return !ts->IsRemoved();
  };
  GetDescriptorsUnlocked(not_removed, descs);
}
202
203
void TSManager::GetAllLiveDescriptors(TSDescriptorVector* descs,
204
6.75M
                                      const boost::optional<BlacklistSet>& blacklist) const {
205
24.1M
  GetDescriptors([blacklist](const TSDescriptorPtr& ts) -> bool {
206
24.1M
    return ts->IsLive() && 
!IsTsBlacklisted(ts, blacklist)23.4M
; }, descs);
207
6.75M
}
208
209
0
// Replaces the contents of `descs` with every live tablet server that has a tablet report.
void TSManager::GetAllReportedDescriptors(TSDescriptorVector* descs) const {
  const auto live_and_reported = [](const TSDescriptorPtr& ts) -> bool {
    return ts->IsLive() && ts->has_tablet_report();
  };
  GetDescriptors(live_and_reported, descs);
}
213
214
22.7k
bool TSManager::IsTsInCluster(const TSDescriptorPtr& ts, string cluster_uuid) {
215
22.7k
  return ts->placement_uuid() == cluster_uuid;
216
22.7k
}
217
218
bool TSManager::IsTsBlacklisted(const TSDescriptorPtr& ts,
219
23.4M
                                const boost::optional<BlacklistSet>& blacklist) {
220
23.4M
  if (!blacklist.is_initialized()) {
221
22.7M
    return false;
222
22.7M
  }
223
762k
  return ts->IsBlacklisted(*blacklist);
224
23.4M
}
225
226
void TSManager::GetAllLiveDescriptorsInCluster(TSDescriptorVector* descs,
227
    string placement_uuid,
228
    const boost::optional<BlacklistSet>& blacklist,
229
8.16k
    bool primary_cluster) const {
230
8.16k
  descs->clear();
231
8.16k
  SharedLock<decltype(lock_)> l(lock_);
232
233
8.16k
  descs->reserve(servers_by_id_.size());
234
22.7k
  for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
235
22.7k
    const TSDescriptorPtr& ts = entry.second;
236
    // ts_in_cluster true if there's a matching config and tserver placement uuid or
237
    // if we're getting primary nodes and the tserver placement uuid is empty.
238
22.7k
    bool ts_in_cluster = (IsTsInCluster(ts, placement_uuid) ||
239
22.7k
                         
(44
primary_cluster44
&&
ts->placement_uuid().empty()44
));
240
22.7k
    if (ts->IsLive() && 
!IsTsBlacklisted(ts, blacklist)22.7k
&&
ts_in_cluster22.7k
) {
241
22.6k
      descs->push_back(ts);
242
22.6k
    }
243
22.7k
  }
244
8.16k
}
245
246
0
// Returns the first live tablet server whose IsRunningOn(host_port) is true, or nullptr if
// there is none. lock_ is held in shared mode during the scan.
const TSDescriptorPtr TSManager::GetTSDescriptor(const HostPortPB& host_port) const {
  SharedLock<decltype(lock_)> l(lock_);

  for (const auto& entry : servers_by_id_) {
    const TSDescriptorPtr& candidate = entry.second;
    if (!candidate->IsLive()) {
      continue;
    }
    if (candidate->IsRunningOn(host_port)) {
      return candidate;
    }
  }
  return nullptr;
}
258
259
2.14k
size_t TSManager::GetCountUnlocked() const {
260
2.14k
  TSDescriptorVector descs;
261
2.14k
  GetAllDescriptorsUnlocked(&descs);
262
2.14k
  return descs.size();
263
2.14k
}
264
265
// Register a callback to be called when the number of tablet servers reaches a certain number.
266
977
void TSManager::SetTSCountCallback(int min_count, TSCountCallback callback) {
267
977
  std::lock_guard<rw_spinlock> l(lock_);
268
977
  ts_count_callback_ = std::move(callback);
269
977
  ts_count_callback_min_count_ = min_count;
270
977
}
271
272
} // namespace master
273
} // namespace yb