/Users/deen/code/yugabyte-db/src/yb/util/net/dns_resolver.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/util/net/dns_resolver.h" |
34 | | |
35 | | #include <mutex> |
36 | | #include <shared_mutex> |
37 | | #include <unordered_map> |
38 | | #include <vector> |
39 | | |
40 | | #include <glog/logging.h> |
41 | | |
42 | | #include "yb/util/metrics.h" |
43 | | #include "yb/util/net/net_fwd.h" |
44 | | #include "yb/util/net/inetaddress.h" |
45 | | #include "yb/util/net/net_util.h" |
46 | | #include "yb/util/net/sockaddr.h" |
47 | | #include "yb/util/result.h" |
48 | | #include "yb/util/status_format.h" |
49 | | |
50 | | using namespace std::literals; |
51 | | |
52 | | DEFINE_int64(dns_cache_expiration_ms, 60000, "Time to store DNS resolution results in cache."); |
53 | | |
54 | | namespace yb { |
55 | | |
56 | | namespace { |
57 | | |
58 | | |
59 | | Result<IpAddress> PickResolvedAddress( |
60 | | const std::string& host, const boost::system::error_code& error, |
61 | 1 | const ResolverResults& entries) { |
62 | 1 | if (error) { |
63 | 0 | return STATUS_FORMAT(NetworkError, "Resolve failed $0: $1", host, error.message()); |
64 | 0 | } |
65 | 1 | std::vector<IpAddress> addresses; |
66 | 2 | for (const auto& entry : entries) { |
67 | 2 | addresses.push_back(entry.endpoint().address()); |
68 | 2 | VLOG(3) << "Resolved address " << entry.endpoint().address().to_string() |
69 | 0 | << " for host " << host; |
70 | 2 | } |
71 | 1 | FilterAddresses(FLAGS_net_address_filter, &addresses); |
72 | 1 | if (addresses.empty()) { |
73 | 0 | return STATUS_FORMAT(NetworkError, "No endpoints resolved for: $0", host); |
74 | 0 | } |
75 | 1 | std::sort(addresses.begin(), addresses.end()); |
76 | 1 | addresses.erase(std::unique(addresses.begin(), addresses.end()), addresses.end()); |
77 | 1 | if (addresses.size() > 1) { |
78 | 1 | LOG(WARNING) << "Peer address '" << host << "' " |
79 | 1 | << "resolves to " << yb::ToString(addresses) << " different addresses. Using " |
80 | 1 | << yb::ToString(addresses.front()); |
81 | 1 | } |
82 | | |
83 | 1 | VLOG(3) << "Returned address " << addresses[0].to_string() << " for host " |
84 | 0 | << host; |
85 | 1 | return addresses.front(); |
86 | 1 | } |
87 | | |
88 | | } // namespace |
89 | | |
90 | | class DnsResolver::Impl { |
91 | | public: |
92 | 35.2k | explicit Impl(IoService* io_service) : io_service_(*io_service), resolver_(*io_service) {} |
93 | | |
94 | 532k | std::shared_future<Result<IpAddress>> ResolveFuture(const std::string& host) { |
95 | 532k | return ObtainEntry(host)->DoResolve(host, /* callback= */ nullptr, &io_service_, &resolver_); |
96 | 532k | } |
97 | | |
98 | 0 | void AsyncResolve(const std::string& host, const AsyncResolveCallback& callback) { |
99 | 0 | ObtainEntry(host)->DoResolve(host, &callback, &io_service_, &resolver_); |
100 | 0 | } |
101 | | |
102 | | private: |
103 | | using Resolver = boost::asio::ip::basic_resolver<boost::asio::ip::tcp>; |
104 | | |
105 | | struct CacheEntry { |
106 | | std::mutex mutex; |
107 | | CoarseTimePoint expiration GUARDED_BY(mutex) = CoarseTimePoint::min(); |
108 | | std::shared_future<Result<IpAddress>> future GUARDED_BY(mutex); |
109 | | std::vector<AsyncResolveCallback> waiters GUARDED_BY(mutex); |
110 | | |
111 | | void SetResult( |
112 | | const Result<IpAddress>& result, |
113 | 1 | std::promise<Result<IpAddress>>* promise) EXCLUDES(mutex) { |
114 | 1 | try { |
115 | 1 | promise->set_value(result); |
116 | 1 | } catch (std::future_error& error) { |
117 | 0 | return; |
118 | 0 | } |
119 | | |
120 | 1 | decltype(waiters) to_notify; |
121 | 1 | { |
122 | 1 | std::lock_guard<std::mutex> lock(mutex); |
123 | 1 | expiration = CoarseMonoClock::now() + FLAGS_dns_cache_expiration_ms * 1ms; |
124 | 1 | waiters.swap(to_notify); |
125 | 1 | } |
126 | 1 | for (const auto& waiter : to_notify) { |
127 | 0 | waiter(result); |
128 | 0 | } |
129 | 1 | } |
130 | | |
131 | | std::shared_future<Result<IpAddress>> DoResolve( |
132 | | const std::string& host, const AsyncResolveCallback* callback, IoService* io_service, |
133 | 532k | Resolver* resolver) { |
134 | 532k | std::shared_ptr<std::promise<Result<IpAddress>>> promise; |
135 | 532k | std::shared_future<Result<IpAddress>> result; |
136 | 532k | { |
137 | 532k | std::lock_guard<std::mutex> lock(mutex); |
138 | 532k | promise = StartResolve(host); |
139 | 532k | result = future; |
140 | 532k | if (callback && expiration == CoarseTimePoint::max()0 ) { |
141 | | // Resolve is in progress by a different caller. |
142 | 0 | waiters.push_back(*callback); |
143 | 0 | callback = nullptr; |
144 | 0 | } |
145 | 532k | } |
146 | | |
147 | 532k | if (callback) { |
148 | 0 | (*callback)(result.get()); |
149 | 0 | } |
150 | | |
151 | 532k | if (promise) { |
152 | 1 | static const std::string kService = ""; |
153 | 1 | resolver->async_resolve( |
154 | 1 | Resolver::query(host, kService), |
155 | 1 | [this, host, promise]( |
156 | 1 | const boost::system::error_code& error, |
157 | 1 | const Resolver::results_type& entries) mutable { |
158 | | // Unfortunately there is no safe way to set promise value from 2 different threads, w/o |
159 | | // catching exception in case of concurrency. |
160 | 1 | SetResult(PickResolvedAddress(host, error, entries), promise.get()); |
161 | 1 | }); |
162 | | |
163 | 1 | if (io_service->stopped()) { |
164 | 0 | SetResult(STATUS(Aborted, "Messenger already stopped"), promise.get()); |
165 | 0 | } |
166 | 1 | } |
167 | | |
168 | 532k | return result; |
169 | 532k | } |
170 | | |
171 | | std::shared_ptr<std::promise<Result<IpAddress>>> StartResolve( |
172 | 532k | const std::string& host) REQUIRES(mutex) { |
173 | 532k | if (expiration >= CoarseMonoClock::now()) { |
174 | 508k | return nullptr; |
175 | 508k | } |
176 | | |
177 | 24.0k | auto promise = std::make_shared<std::promise<Result<IpAddress>>>(); |
178 | 24.0k | future = promise->get_future().share(); |
179 | | |
180 | 24.0k | auto address = TryFastResolve(host); |
181 | 24.0k | if (address) { |
182 | 23.8k | expiration = CoarseTimePoint::max() - 1ms; |
183 | 23.8k | promise->set_value(*address); |
184 | 23.8k | return nullptr; |
185 | 23.8k | } else { |
186 | 261 | expiration = CoarseTimePoint::max(); |
187 | 261 | } |
188 | | |
189 | 261 | return promise; |
190 | 24.0k | } |
191 | | }; |
192 | | |
193 | 532k | CacheEntry* ObtainEntry(const std::string& host) { |
194 | 532k | { |
195 | 532k | std::shared_lock<decltype(mutex_)> lock(mutex_); |
196 | 532k | auto it = cache_.find(host); |
197 | 532k | if (it != cache_.end()) { |
198 | 508k | return &it->second; |
199 | 508k | } |
200 | 532k | } |
201 | | |
202 | 23.9k | std::lock_guard<decltype(mutex_)> lock(mutex_); |
203 | 23.9k | return &cache_[host]; |
204 | 532k | } |
205 | | |
206 | | IoService& io_service_; |
207 | | Resolver resolver_; |
208 | | std::shared_timed_mutex mutex_; |
209 | | std::unordered_map<std::string, CacheEntry> cache_; |
210 | | }; |
211 | | |
212 | 35.2k | DnsResolver::DnsResolver(IoService* io_service) : impl_(new Impl(io_service)) { |
213 | 35.2k | } |
214 | | |
215 | 9.02k | DnsResolver::~DnsResolver() { |
216 | 9.02k | } |
217 | | |
218 | | namespace { |
219 | | |
220 | | thread_local Histogram* active_metric_ = nullptr; |
221 | | |
222 | | } // anonymous namespace |
223 | | |
224 | | ScopedDnsTracker::ScopedDnsTracker(const scoped_refptr<Histogram>& metric) |
225 | 86.2k | : old_metric_(active_metric()), metric_(metric) { |
226 | 86.2k | active_metric_ = metric.get(); |
227 | 86.2k | } |
228 | | |
229 | 86.2k | ScopedDnsTracker::~ScopedDnsTracker() { |
230 | 86.2k | DCHECK_EQ(metric_.get(), active_metric()); |
231 | 86.2k | active_metric_ = old_metric_; |
232 | 86.2k | } |
233 | | |
234 | 385k | Histogram* ScopedDnsTracker::active_metric() { |
235 | 385k | return active_metric_; |
236 | 385k | } |
237 | | |
238 | 532k | std::shared_future<Result<IpAddress>> DnsResolver::ResolveFuture(const std::string& host) { |
239 | 532k | return impl_->ResolveFuture(host); |
240 | 532k | } |
241 | | |
242 | 0 | void DnsResolver::AsyncResolve(const std::string& host, const AsyncResolveCallback& callback) { |
243 | 0 | impl_->AsyncResolve(host, callback); |
244 | 0 | } |
245 | | |
246 | 187k | Result<IpAddress> DnsResolver::Resolve(const std::string& host) { |
247 | 187k | return ResolveFuture(host).get(); |
248 | 187k | } |
249 | | |
250 | | } // namespace yb |