/Users/deen/code/yugabyte-db/src/yb/master/ts_manager.cc
| Line | Count | Source (jump to first uncovered line) | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  | // | 
| 18 |  | // The following only applies to changes made to this file as part of YugaByte development. | 
| 19 |  | // | 
| 20 |  | // Portions Copyright (c) YugaByte, Inc. | 
| 21 |  | // | 
| 22 |  | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except | 
| 23 |  | // in compliance with the License.  You may obtain a copy of the License at | 
| 24 |  | // | 
| 25 |  | // http://www.apache.org/licenses/LICENSE-2.0 | 
| 26 |  | // | 
| 27 |  | // Unless required by applicable law or agreed to in writing, software distributed under the License | 
| 28 |  | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express | 
| 29 |  | // or implied.  See the License for the specific language governing permissions and limitations | 
| 30 |  | // under the License. | 
| 31 |  | // | 
| 32 |  |  | 
| 33 |  | #include "yb/master/ts_manager.h" | 
| 34 |  |  | 
| 35 |  | #include <mutex> | 
| 36 |  | #include <vector> | 
| 37 |  |  | 
| 38 |  | #include "yb/gutil/map-util.h" | 
| 39 |  |  | 
| 40 |  | #include "yb/master/master_heartbeat.pb.h" | 
| 41 |  | #include "yb/master/ts_descriptor.h" | 
| 42 |  |  | 
| 43 |  | using std::shared_ptr; | 
| 44 |  | using std::string; | 
| 45 |  | using std::vector; | 
| 46 |  |  | 
| 47 |  | namespace yb { | 
| 48 |  | namespace master { | 
| 49 |  | /* Constructs a TSManager; the constructor body is empty. */ | 
| 50 | 8.07k | TSManager::TSManager() { | 
| 51 | 8.07k | } | 
| 52 |  | /* Destructor with an empty body. */ | 
| 53 | 91 | TSManager::~TSManager() { | 
| 54 | 91 | } | 
| 55 |  | /* Stores in *ts_desc the descriptor registered under instance.permanent_uuid(), holding a SharedLock on lock_. Returns NotFound if there is no entry, the entry is marked removed, or its latest_seqno() differs from instance.instance_seqno(). */ | 
| 56 |  | Status TSManager::LookupTS(const NodeInstancePB& instance, | 
| 57 | 4.81M |                            TSDescriptorPtr* ts_desc) { | 
| 58 | 4.81M |   SharedLock<decltype(lock_)> l(lock_); | 
| 59 |  |   /* A null lookup result and a descriptor flagged as removed are both reported as an unknown server. */ | 
| 60 | 4.81M |   const TSDescriptorPtr* found_ptr = | 
| 61 | 4.81M |     FindOrNull(servers_by_id_, instance.permanent_uuid()); | 
| 62 | 4.81M |   if (!found_ptr || (*found_ptr)->IsRemoved()4.80M) { | 
| 63 | 143 |     return STATUS(NotFound, "unknown tablet server ID", instance.ShortDebugString()); | 
| 64 | 143 |   } | 
| 65 | 4.81M |   const TSDescriptorPtr& found = *found_ptr; | 
| 66 |  |   /* The caller's sequence number must equal the one recorded in the descriptor. */ | 
| 67 | 4.81M |   if (instance.instance_seqno() != found->latest_seqno()) { | 
| 68 | 58 |     return STATUS(NotFound, "mismatched instance sequence number", instance.ShortDebugString()); | 
| 69 | 58 |   } | 
| 70 |  |   /* *ts_desc is written only on the success path. */ | 
| 71 | 4.81M |   *ts_desc = found; | 
| 72 | 4.81M |   return Status::OK(); | 
| 73 | 4.81M | } | 
| 74 |  | /* Looks up uuid in servers_by_id_ under a SharedLock on lock_. Returns true and copies the descriptor into *ts_desc when an entry exists and is not marked removed; otherwise returns false and leaves *ts_desc untouched. */ | 
| 75 |  | bool TSManager::LookupTSByUUID(const string& uuid, | 
| 76 | 719k |                                TSDescriptorPtr* ts_desc) { | 
| 77 | 719k |   SharedLock<decltype(lock_)> l(lock_); | 
| 78 | 719k |   const TSDescriptorPtr* found_ptr = FindOrNull(servers_by_id_, uuid); | 
| 79 | 719k |   if (!found_ptr || (*found_ptr)->IsRemoved()719k) { | 
| 80 | 107 |     return false; | 
| 81 | 107 |   } | 
| 82 | 719k |   *ts_desc = *found_ptr; | 
| 83 | 719k |   return true; | 
| 84 | 719k | } | 
| 85 |  | /* Returns true if some address in old_addresses has the same host and the same port as some address in new_addresses. Every pair is compared, so the cost is the product of the two list sizes. */ | 
| 86 |  | bool HasSameHostPort(const google::protobuf::RepeatedPtrField<HostPortPB>& old_addresses, | 
| 87 | 17.7k |                      const google::protobuf::RepeatedPtrField<HostPortPB>& new_addresses) { | 
| 88 | 17.7k |   for (const auto& old_address : old_addresses) { | 
| 89 | 8.85k |     for (const auto& new_address : new_addresses) { | 
| 90 | 8.85k |       if (old_address.host() == new_address.host() && old_address.port() == new_address.port()1.35k) | 
| 91 | 1 |         return true; | 
| 92 | 8.85k |     } | 
| 93 | 8.85k |   } | 
| 94 |  |   /* No pair matched; this is also the result when either list is empty. */ | 
| 95 | 17.7k |   return false; | 
| 96 | 17.7k | } | 
| 97 |  | /* Registers the tablet server identified by instance.permanent_uuid(): an unknown UUID gets a new descriptor, a known UUID is re-registered on its existing descriptor. Returns OK on success and also when the registration is skipped because a server on the same host/port already has an equal or higher sequence number. */ | 
| 98 |  | Status TSManager::RegisterTS(const NodeInstancePB& instance, | 
| 99 |  |                              const TSRegistrationPB& registration, | 
| 100 |  |                              CloudInfoPB local_cloud_info, | 
| 101 |  |                              rpc::ProxyCache* proxy_cache, | 
| 102 | 8.29k |                              RegisteredThroughHeartbeat registered_through_heartbeat) { | 
| 103 | 8.29k |   TSCountCallback callback_to_call; | 
| 104 |  |   /* callback_to_call is filled inside the locked scope below and invoked after that scope ends. */ | 
| 105 | 8.29k |   { | 
| 106 | 8.29k |     std::lock_guard<decltype(lock_)> l(lock_); | 
| 107 | 8.29k |     const string& uuid = instance.permanent_uuid(); | 
| 108 |  |  | 
| 109 | 8.29k |     auto it = servers_by_id_.find(uuid); | 
| 110 | 8.29k |     if (it == servers_by_id_.end()) { | 
| 111 |  |       // Unknown UUID: check every known server for a shared private RPC or broadcast host/port. | 
| 112 | 8.85k |       for (const auto& map_entry : servers_by_id_) { | 
| 113 | 8.85k |         const auto ts_info = map_entry.second->GetTSInformationPB(); | 
| 114 |  |  | 
| 115 | 8.85k |         if (HasSameHostPort(ts_info->registration().common().private_rpc_addresses(), | 
| 116 | 8.85k |                             registration.common().private_rpc_addresses()) || | 
| 117 | 8.85k |             HasSameHostPort(ts_info->registration().common().broadcast_addresses(), | 
| 118 | 8.85k |                             registration.common().broadcast_addresses())) { | 
| 119 | 1 |           if (ts_info->tserver_instance().instance_seqno() >= instance.instance_seqno()) { | 
| 120 |  |             // Skip adding the node since we already have a node with the same rpc address and | 
| 121 |  |             // an equal or higher sequence number (the check above is >=). | 
| 122 | 0 |             LOG(WARNING) << "Skipping registration for TS " << instance.ShortDebugString() | 
| 123 | 0 |                 << " since an entry with same host/port but a higher sequence number exists " | 
| 124 | 0 |                 << ts_info->ShortDebugString(); | 
| 125 | 0 |             return Status::OK(); | 
| 126 | 1 |           } else { | 
| 127 | 1 |             LOG(WARNING) << "Removing entry: " << ts_info->ShortDebugString() | 
| 128 | 1 |                 << " since we received registration for a tserver with a higher sequence number: " | 
| 129 | 1 |                 << instance.ShortDebugString(); | 
| 130 |  |             // Flag the old node as removed (it is not erased from the map here); the new one has a higher seqno. | 
| 131 | 1 |             map_entry.second->SetRemoved(); | 
| 132 | 1 |           } | 
| 133 | 1 |         } | 
| 134 | 8.85k |       } | 
| 135 |  |       /* NOTE(review): VERIFY_RESULT presumably returns the failure status from this function early; entries flagged removed above would then stay flagged. Confirm. */ | 
| 136 | 8.15k |       auto new_desc = VERIFY_RESULT(TSDescriptor::RegisterNew( | 
| 137 | 8.15k |           instance, registration, std::move(local_cloud_info), proxy_cache, | 
| 138 | 8.15k |           registered_through_heartbeat)); | 
| 139 | 0 |       InsertOrDie(&servers_by_id_, uuid, std::move(new_desc)); | 
| 140 | 8.15k |       LOG(INFO) << "Registered new tablet server { " << instance.ShortDebugString() | 
| 141 | 8.15k |                 << " } with Master, full list: " << yb::ToString(servers_by_id_); | 
| 142 |  |  | 
| 143 | 8.15k |     } else { | 
| 144 | 145 |       RETURN_NOT_OK(it->second->Register( | 
| 145 | 145 |           instance, registration, std::move(local_cloud_info), proxy_cache)); | 
| 146 | 136 |       LOG(INFO) << "Re-registered known tablet server { " << instance.ShortDebugString() | 
| 147 | 136 |                 << " }: " << registration.ShortDebugString(); | 
| 148 | 136 |     } | 
| 149 |  |     /* If a count callback is pending and the number of non-removed servers has reached its threshold, take the callback and reset the threshold to 0. */ | 
| 150 | 8.28k |     if (!ts_count_callback_.empty()) { | 
| 151 | 2.14k |       auto new_count = GetCountUnlocked(); | 
| 152 | 2.14k |       if (new_count >= ts_count_callback_min_count_) { | 
| 153 | 901 |         callback_to_call = std::move(ts_count_callback_); | 
| 154 | 901 |         ts_count_callback_min_count_ = 0; | 
| 155 | 901 |       } | 
| 156 | 2.14k |     } | 
| 157 | 8.28k |   } | 
| 158 |  |   /* The lock_guard scope has ended, so the callback runs without lock_ held by this function. */ | 
| 159 | 8.28k |   if (!callback_to_call.empty()) { | 
| 160 | 901 |     callback_to_call(); | 
| 161 | 901 |   } | 
| 162 |  |  | 
| 163 | 8.28k |   return Status::OK(); | 
| 164 | 8.29k | } | 
| 165 |  | /* Takes a SharedLock on lock_ and delegates to GetDescriptorsUnlocked, which fills *descs with the descriptors for which condition returns true. */ | 
| 166 |  | void TSManager::GetDescriptors(std::function<bool(const TSDescriptorPtr&)> condition, | 
| 167 | 6.76M |                                TSDescriptorVector* descs) const { | 
| 168 | 6.76M |   SharedLock<decltype(lock_)> l(lock_); | 
| 169 | 6.76M |   GetDescriptorsUnlocked(condition, descs); | 
| 170 | 6.76M | } | 
| 171 |  | /* Clears *descs, then appends every descriptor in servers_by_id_ for which condition(ts) is true. Acquires no lock itself; the callers visible in this file hold lock_ while calling it. */ | 
| 172 |  | void TSManager::GetDescriptorsUnlocked( | 
| 173 | 11.5M |     std::function<bool(const TSDescriptorPtr&)> condition, TSDescriptorVector* descs) const { | 
| 174 | 11.5M |   descs->clear(); | 
| 175 |  |   /* Reserve room for the case where every server satisfies the condition. */ | 
| 176 | 11.5M |   descs->reserve(servers_by_id_.size()); | 
| 177 | 39.2M |   for (const TSDescriptorMap::value_type& entry : servers_by_id_) { | 
| 178 | 39.2M |     const TSDescriptorPtr& ts = entry.second; | 
| 179 | 39.2M |     if (condition(ts)) { | 
| 180 | 18.4E |       VLOG(1) << " Adding " << yb::ToString(*ts); | 
| 181 | 38.5M |       descs->push_back(ts); | 
| 182 | 38.5M |     } else { | 
| 183 | 18.4E |       VLOG(1) << " NOT Adding " << yb::ToString(*ts); | 
| 184 | 676k |     } | 
| 185 | 39.2M |   } | 
| 186 | 11.5M | } | 
| 187 |  | /* Fills *descs with every descriptor that is not marked removed, holding a SharedLock on lock_ while doing so. */ | 
| 188 | 4.76M | void TSManager::GetAllDescriptors(TSDescriptorVector* descs) const { | 
| 189 | 4.76M |   SharedLock<decltype(lock_)> l(lock_); | 
| 190 | 4.76M |   GetAllDescriptorsUnlocked(descs); | 
| 191 | 4.76M | } | 
| 192 |  | /* Overload that returns the non-removed descriptors by value, built via the out-parameter overload. */ | 
| 193 | 1 | TSDescriptorVector TSManager::GetAllDescriptors() const { | 
| 194 | 1 |   TSDescriptorVector descs; | 
| 195 | 1 |   GetAllDescriptors(&descs); | 
| 196 | 1 |   return descs; | 
| 197 | 1 | } | 
| 198 |  | /* Fills *descs with the descriptors whose IsRemoved() is false. Acquires no lock itself. */ | 
| 199 | 4.76M | void TSManager::GetAllDescriptorsUnlocked(TSDescriptorVector* descs) const { | 
| 200 | 15.0M |   GetDescriptorsUnlocked([](const TSDescriptorPtr& ts) -> bool { return !ts->IsRemoved(); }, descs); | 
| 201 | 4.76M | } | 
| 202 |  | /* Fills *descs with the descriptors that are IsLive() and not blacklisted; an uninitialized blacklist excludes nothing. NOTE(review): the lambda captures blacklist by value, so the optional is copied on each call; confirm whether a reference capture would be enough, given the predicate is only used during this call. */ | 
| 203 |  | void TSManager::GetAllLiveDescriptors(TSDescriptorVector* descs, | 
| 204 | 6.75M |                                       const boost::optional<BlacklistSet>& blacklist) const { | 
| 205 | 24.1M |   GetDescriptors([blacklist](const TSDescriptorPtr& ts) -> bool { | 
| 206 | 24.1M |     return ts->IsLive() && !IsTsBlacklisted(ts, blacklist)23.4M; }, descs); | 
| 207 | 6.75M | } | 
| 208 |  | /* Fills *descs with the descriptors for which both IsLive() and has_tablet_report() are true. */ | 
| 209 | 0 | void TSManager::GetAllReportedDescriptors(TSDescriptorVector* descs) const { | 
| 210 | 0 |   GetDescriptors([](const TSDescriptorPtr& ts) | 
| 211 | 0 |                    -> bool { return ts->IsLive() && ts->has_tablet_report(); }, descs); | 
| 212 | 0 | } | 
| 213 |  | /* Returns true when ts->placement_uuid() equals cluster_uuid. NOTE(review): cluster_uuid is taken by value, so each call copies the string. */ | 
| 214 | 22.7k | bool TSManager::IsTsInCluster(const TSDescriptorPtr& ts, string cluster_uuid) { | 
| 215 | 22.7k |   return ts->placement_uuid() == cluster_uuid; | 
| 216 | 22.7k | } | 
| 217 |  | /* Returns false when blacklist is not initialized; otherwise returns ts->IsBlacklisted(*blacklist). */ | 
| 218 |  | bool TSManager::IsTsBlacklisted(const TSDescriptorPtr& ts, | 
| 219 | 23.4M |                                 const boost::optional<BlacklistSet>& blacklist) { | 
| 220 | 23.4M |   if (!blacklist.is_initialized()) { | 
| 221 | 22.7M |     return false; | 
| 222 | 22.7M |   } | 
| 223 | 762k |   return ts->IsBlacklisted(*blacklist); | 
| 224 | 23.4M | } | 
| 225 |  | /* Clears *descs and fills it with the live, non-blacklisted descriptors whose placement_uuid() equals placement_uuid. When primary_cluster is true, descriptors with an empty placement_uuid() are accepted as well. */ | 
| 226 |  | void TSManager::GetAllLiveDescriptorsInCluster(TSDescriptorVector* descs, | 
| 227 |  |     string placement_uuid, | 
| 228 |  |     const boost::optional<BlacklistSet>& blacklist, | 
| 229 | 8.16k |     bool primary_cluster) const { | 
| 230 | 8.16k |   descs->clear(); | 
| 231 | 8.16k |   SharedLock<decltype(lock_)> l(lock_); | 
| 232 |  |   /* Reserve room for the case where every server qualifies. */ | 
| 233 | 8.16k |   descs->reserve(servers_by_id_.size()); | 
| 234 | 22.7k |   for (const TSDescriptorMap::value_type& entry : servers_by_id_) { | 
| 235 | 22.7k |     const TSDescriptorPtr& ts = entry.second; | 
| 236 |  |     // ts_in_cluster is true if the tserver placement uuid matches the requested one, or | 
| 237 |  |     // if we're getting primary nodes and the tserver placement uuid is empty. | 
| 238 | 22.7k |     bool ts_in_cluster = (IsTsInCluster(ts, placement_uuid) || | 
| 239 | 22.7k |                          (44 primary_cluster44&& ts->placement_uuid().empty()44)); | 
| 240 | 22.7k |     if (ts->IsLive() && !IsTsBlacklisted(ts, blacklist)22.7k&& ts_in_cluster22.7k) { | 
| 241 | 22.6k |       descs->push_back(ts); | 
| 242 | 22.6k |     } | 
| 243 | 22.7k |   } | 
| 244 | 8.16k | } | 
| 245 |  | /* Under a SharedLock on lock_, returns the first descriptor in servers_by_id_ iteration order that is IsLive() and IsRunningOn(host_port), or nullptr if there is none. */ | 
| 246 | 0 | const TSDescriptorPtr TSManager::GetTSDescriptor(const HostPortPB& host_port) const { | 
| 247 | 0 |   SharedLock<decltype(lock_)> l(lock_); | 
| 248 |  | 
 | 
| 249 | 0 |   for (const TSDescriptorMap::value_type& entry : servers_by_id_) { | 
| 250 | 0 |     const TSDescriptorPtr& ts = entry.second; | 
| 251 | 0 |     if (ts->IsLive() && ts->IsRunningOn(host_port)) { | 
| 252 | 0 |       return ts; | 
| 253 | 0 |     } | 
| 254 | 0 |   } | 
| 255 |  |   /* No live server reported itself as running on host_port. */ | 
| 256 | 0 |   return nullptr; | 
| 257 | 0 | } | 
| 258 |  | /* Returns the number of descriptors not marked removed. Acquires no lock itself. NOTE(review): a temporary TSDescriptorVector is built only to take its size. */ | 
| 259 | 2.14k | size_t TSManager::GetCountUnlocked() const { | 
| 260 | 2.14k |   TSDescriptorVector descs; | 
| 261 | 2.14k |   GetAllDescriptorsUnlocked(&descs); | 
| 262 | 2.14k |   return descs.size(); | 
| 263 | 2.14k | } | 
| 264 |  |  | 
| 265 |  | // Stores a callback, replacing any previous one, that RegisterTS invokes once the number of non-removed tablet servers reaches min_count. | 
| 266 | 977 | void TSManager::SetTSCountCallback(int min_count, TSCountCallback callback) { | 
| 267 | 977 |   std::lock_guard<rw_spinlock> l(lock_); | 
| 268 | 977 |   ts_count_callback_ = std::move(callback); | 
| 269 | 977 |   ts_count_callback_min_count_ = min_count; | 
| 270 | 977 | } | 
| 271 |  |  | 
| 272 |  | } // namespace master | 
| 273 |  | } // namespace yb |