YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/net/dns_resolver.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/util/net/dns_resolver.h"
34
35
#include <mutex>
36
#include <shared_mutex>
37
#include <unordered_map>
38
#include <vector>
39
40
#include <glog/logging.h>
41
42
#include "yb/util/metrics.h"
43
#include "yb/util/net/net_fwd.h"
44
#include "yb/util/net/inetaddress.h"
45
#include "yb/util/net/net_util.h"
46
#include "yb/util/net/sockaddr.h"
47
#include "yb/util/result.h"
48
#include "yb/util/status_format.h"
49
50
using namespace std::literals;
51
52
DEFINE_int64(dns_cache_expiration_ms, 60000, "Time to store DNS resolution results in cache.");
53
54
namespace yb {
55
56
namespace {
57
58
59
Result<IpAddress> PickResolvedAddress(
60
    const std::string& host, const boost::system::error_code& error,
61
1
    const ResolverResults& entries) {
62
1
  if (error) {
63
0
    return STATUS_FORMAT(NetworkError, "Resolve failed $0: $1", host, error.message());
64
0
  }
65
1
  std::vector<IpAddress> addresses;
66
2
  for (const auto& entry : entries) {
67
2
    addresses.push_back(entry.endpoint().address());
68
2
    VLOG(3) << "Resolved address " << entry.endpoint().address().to_string()
69
0
            << " for host " << host;
70
2
  }
71
1
  FilterAddresses(FLAGS_net_address_filter, &addresses);
72
1
  if (addresses.empty()) {
73
0
    return STATUS_FORMAT(NetworkError, "No endpoints resolved for: $0", host);
74
0
  }
75
1
  std::sort(addresses.begin(), addresses.end());
76
1
  addresses.erase(std::unique(addresses.begin(), addresses.end()), addresses.end());
77
1
  if (addresses.size() > 1) {
78
1
    LOG(WARNING) << "Peer address '" << host << "' "
79
1
                 << "resolves to " << yb::ToString(addresses) << " different addresses. Using "
80
1
                 << yb::ToString(addresses.front());
81
1
  }
82
83
1
  VLOG(3) << "Returned address " << addresses[0].to_string() << " for host "
84
0
          << host;
85
1
  return addresses.front();
86
1
}
87
88
} // namespace
89
90
class DnsResolver::Impl {
91
 public:
92
35.2k
  explicit Impl(IoService* io_service) : io_service_(*io_service), resolver_(*io_service) {}
93
94
532k
  std::shared_future<Result<IpAddress>> ResolveFuture(const std::string& host) {
95
532k
    return ObtainEntry(host)->DoResolve(host, /* callback= */ nullptr, &io_service_, &resolver_);
96
532k
  }
97
98
0
  void AsyncResolve(const std::string& host, const AsyncResolveCallback& callback) {
99
0
    ObtainEntry(host)->DoResolve(host, &callback, &io_service_, &resolver_);
100
0
  }
101
102
 private:
103
  using Resolver = boost::asio::ip::basic_resolver<boost::asio::ip::tcp>;
104
105
  struct CacheEntry {
106
    std::mutex mutex;
107
    CoarseTimePoint expiration GUARDED_BY(mutex) = CoarseTimePoint::min();
108
    std::shared_future<Result<IpAddress>> future GUARDED_BY(mutex);
109
    std::vector<AsyncResolveCallback> waiters GUARDED_BY(mutex);
110
111
    void SetResult(
112
        const Result<IpAddress>& result,
113
1
        std::promise<Result<IpAddress>>* promise) EXCLUDES(mutex) {
114
1
      try {
115
1
        promise->set_value(result);
116
1
      } catch (std::future_error& error) {
117
0
        return;
118
0
      }
119
120
1
      decltype(waiters) to_notify;
121
1
      {
122
1
        std::lock_guard<std::mutex> lock(mutex);
123
1
        expiration = CoarseMonoClock::now() + FLAGS_dns_cache_expiration_ms * 1ms;
124
1
        waiters.swap(to_notify);
125
1
      }
126
1
      for (const auto& waiter : to_notify) {
127
0
        waiter(result);
128
0
      }
129
1
    }
130
131
    std::shared_future<Result<IpAddress>> DoResolve(
132
        const std::string& host, const AsyncResolveCallback* callback, IoService* io_service,
133
532k
        Resolver* resolver) {
134
532k
      std::shared_ptr<std::promise<Result<IpAddress>>> promise;
135
532k
      std::shared_future<Result<IpAddress>> result;
136
532k
      {
137
532k
        std::lock_guard<std::mutex> lock(mutex);
138
532k
        promise = StartResolve(host);
139
532k
        result = future;
140
532k
        if (callback && 
expiration == CoarseTimePoint::max()0
) {
141
          // Resolve is in progress by a different caller.
142
0
          waiters.push_back(*callback);
143
0
          callback = nullptr;
144
0
        }
145
532k
      }
146
147
532k
      if (callback) {
148
0
        (*callback)(result.get());
149
0
      }
150
151
532k
      if (promise) {
152
1
        static const std::string kService = "";
153
1
        resolver->async_resolve(
154
1
            Resolver::query(host, kService),
155
1
            [this, host, promise](
156
1
                const boost::system::error_code& error,
157
1
                const Resolver::results_type& entries) mutable {
158
          // Unfortunately there is no safe way to set promise value from 2 different threads, w/o
159
          // catching exception in case of concurrency.
160
1
          SetResult(PickResolvedAddress(host, error, entries), promise.get());
161
1
        });
162
163
1
        if (io_service->stopped()) {
164
0
          SetResult(STATUS(Aborted, "Messenger already stopped"), promise.get());
165
0
        }
166
1
      }
167
168
532k
      return result;
169
532k
    }
170
171
    std::shared_ptr<std::promise<Result<IpAddress>>> StartResolve(
172
532k
        const std::string& host) REQUIRES(mutex) {
173
532k
      if (expiration >= CoarseMonoClock::now()) {
174
508k
        return nullptr;
175
508k
      }
176
177
24.0k
      auto promise = std::make_shared<std::promise<Result<IpAddress>>>();
178
24.0k
      future = promise->get_future().share();
179
180
24.0k
      auto address = TryFastResolve(host);
181
24.0k
      if (address) {
182
23.8k
        expiration = CoarseTimePoint::max() - 1ms;
183
23.8k
        promise->set_value(*address);
184
23.8k
        return nullptr;
185
23.8k
      } else {
186
261
        expiration = CoarseTimePoint::max();
187
261
      }
188
189
261
      return promise;
190
24.0k
    }
191
  };
192
193
532k
  CacheEntry* ObtainEntry(const std::string& host) {
194
532k
    {
195
532k
      std::shared_lock<decltype(mutex_)> lock(mutex_);
196
532k
      auto it = cache_.find(host);
197
532k
      if (it != cache_.end()) {
198
508k
        return &it->second;
199
508k
      }
200
532k
    }
201
202
23.9k
    std::lock_guard<decltype(mutex_)> lock(mutex_);
203
23.9k
    return &cache_[host];
204
532k
  }
205
206
  IoService& io_service_;
207
  Resolver resolver_;
208
  std::shared_timed_mutex mutex_;
209
  std::unordered_map<std::string, CacheEntry> cache_;
210
};
211
212
35.2k
DnsResolver::DnsResolver(IoService* io_service) : impl_(new Impl(io_service)) {
213
35.2k
}
214
215
9.02k
DnsResolver::~DnsResolver() {
216
9.02k
}
217
218
namespace {
219
220
thread_local Histogram* active_metric_ = nullptr;
221
222
} // anonymous namespace
223
224
ScopedDnsTracker::ScopedDnsTracker(const scoped_refptr<Histogram>& metric)
225
86.2k
    : old_metric_(active_metric()), metric_(metric) {
226
86.2k
  active_metric_ = metric.get();
227
86.2k
}
228
229
86.2k
ScopedDnsTracker::~ScopedDnsTracker() {
230
86.2k
  DCHECK_EQ(metric_.get(), active_metric());
231
86.2k
  active_metric_ = old_metric_;
232
86.2k
}
233
234
385k
Histogram* ScopedDnsTracker::active_metric() {
235
385k
  return active_metric_;
236
385k
}
237
238
532k
std::shared_future<Result<IpAddress>> DnsResolver::ResolveFuture(const std::string& host) {
239
532k
  return impl_->ResolveFuture(host);
240
532k
}
241
242
0
void DnsResolver::AsyncResolve(const std::string& host, const AsyncResolveCallback& callback) {
243
0
  impl_->AsyncResolve(host, callback);
244
0
}
245
246
187k
Result<IpAddress> DnsResolver::Resolve(const std::string& host) {
247
187k
  return ResolveFuture(host).get();
248
187k
}
249
250
} // namespace yb