YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/server/server_base_options.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/server/server_base_options.h"
34
35
#include <functional>
36
#include <memory>
37
#include <mutex>
38
#include <string>
39
#include <thread>
40
#include <unordered_map>
41
#include <vector>
42
43
#include <glog/logging.h>
44
45
#include "yb/common/common_net.pb.h"
46
47
#include "yb/gutil/macros.h"
48
#include "yb/gutil/ref_counted.h"
49
#include "yb/gutil/strings/join.h"
50
51
#include "yb/master/master_defaults.h"
52
53
#include "yb/rpc/rpc_fwd.h"
54
55
#include "yb/util/faststring.h"
56
#include "yb/util/flag_tags.h"
57
#include "yb/util/monotime.h"
58
#include "yb/util/net/net_util.h"
59
#include "yb/util/net/sockaddr.h"
60
#include "yb/util/result.h"
61
#include "yb/util/slice.h"
62
#include "yb/util/status.h"
63
#include "yb/util/status_format.h"
64
65
// The following flags related to the cloud, region and availability zone that an instance is
66
// started in. These are passed in from whatever provisioning mechanics start the servers. They
67
// are used for generic placement policies on table creation and tablet load balancing, to
68
// either constrain data to a certain location (table A should only live in aws.us-west2.a), or to
69
// define the required level of fault tolerance expected (table B should have N replicas, across
70
// two regions of AWS and one of GCE).
71
//
72
// These are currently for use in a cloud-based deployment, but could be retrofitted to work for
73
// an on-premise deployment as well, with datacenter, cluster and rack levels, for example.
74
DEFINE_string(placement_cloud, "cloud1",
75
              "The cloud in which this instance is started.");
76
DEFINE_string(placement_region, "datacenter1",
77
              "The cloud region in which this instance is started.");
78
DEFINE_string(placement_zone, "rack1",
79
              "The cloud availability zone in which this instance is started.");
80
DEFINE_string(placement_uuid, "",
81
              "The uuid of the tservers cluster/placement.");
82
83
DEFINE_int32(master_discovery_timeout_ms, 3600000,
84
             "Timeout for masters to discover each other during cluster creation/startup");
85
TAG_FLAG(master_discovery_timeout_ms, hidden);
86
87
namespace yb {
88
namespace server {
89
90
using std::vector;
91
using namespace std::literals;
92
93
DEFINE_string(server_dump_info_path, "",
94
              "Path into which the server information will be "
95
              "dumped after startup. The dumped data is described by "
96
              "ServerStatusPB in server_base.proto. The dump format is "
97
              "determined by --server_dump_info_format");
98
DEFINE_string(server_dump_info_format, "json",
99
              "Format for --server_dump_info_path. This may be either "
100
              "'pb' or 'json'.");
101
TAG_FLAG(server_dump_info_path, hidden);
102
TAG_FLAG(server_dump_info_format, hidden);
103
104
DEFINE_int32(metrics_log_interval_ms, 0,
105
             "Interval (in milliseconds) at which the server will dump its "
106
             "metrics to a local log file. The log files are located in the same "
107
             "directory as specified by the -log_dir flag. If this is not a positive "
108
             "value, then metrics logging will be disabled.");
109
TAG_FLAG(metrics_log_interval_ms, advanced);
110
111
DEFINE_string(server_broadcast_addresses, "", "Broadcast addresses for this server.");
112
113
ServerBaseOptions::ServerBaseOptions(int default_port)
114
    : env(Env::Default()),
115
      dump_info_path(FLAGS_server_dump_info_path),
116
      dump_info_format(FLAGS_server_dump_info_format),
117
      metrics_log_interval_ms(FLAGS_metrics_log_interval_ms),
118
      placement_uuid(FLAGS_placement_uuid),
119
19.1k
      server_broadcast_addresses(FLAGS_server_broadcast_addresses) {
120
19.1k
  rpc_opts.default_port = default_port;
121
19.1k
  if (!FLAGS_server_broadcast_addresses.empty()) {
122
0
    auto status = HostPort::ParseStrings(FLAGS_server_broadcast_addresses, default_port,
123
0
                                         &broadcast_addresses);
124
0
    LOG_IF(DFATAL, !status.ok()) << "Bad public IPs " << FLAGS_server_broadcast_addresses
125
0
                                 << ": " << status;
126
0
  }
127
19.1k
}
128
129
ServerBaseOptions::ServerBaseOptions(const ServerBaseOptions& options)
130
    : env(options.env),
131
      server_type(options.server_type),
132
      fs_opts(options.fs_opts),
133
      rpc_opts(options.rpc_opts),
134
      webserver_opts(options.webserver_opts),
135
      dump_info_path(options.dump_info_path),
136
      dump_info_format(options.dump_info_format),
137
      metrics_log_interval_ms(options.metrics_log_interval_ms),
138
      placement_uuid(options.placement_uuid),
139
      master_addresses_flag(options.master_addresses_flag),
140
      server_broadcast_addresses(options.server_broadcast_addresses),
141
      broadcast_addresses(options.broadcast_addresses),
142
      placement_cloud_(options.placement_cloud_),
143
      placement_region_(options.placement_region_),
144
46.7k
      placement_zone_(options.placement_zone_) {
145
46.7k
  CompleteWebserverOptions();
146
46.7k
  SetMasterAddressesNoValidation(options.GetMasterAddresses());
147
46.7k
}
148
149
64.2k
WebserverOptions& ServerBaseOptions::CompleteWebserverOptions() {
150
64.2k
  if (webserver_opts.bind_interface.empty()) {
151
1.76k
    std::vector<HostPort> bind_addresses;
152
1.76k
    auto status = HostPort::ParseStrings(rpc_opts.rpc_bind_addresses, 0, &bind_addresses);
153
0
    LOG_IF(DFATAL, !status.ok()) << "Invalid rpc_bind_address "
154
0
                                 << rpc_opts.rpc_bind_addresses << ": " << status;
155
1.76k
    if (!bind_addresses.empty()) {
156
1.76k
      webserver_opts.bind_interface = bind_addresses.at(0).host();
157
1.76k
    }
158
1.76k
  }
159
160
64.2k
  return webserver_opts;
161
64.2k
}
162
163
67.1k
void ServerBaseOptions::SetMasterAddressesNoValidation(MasterAddressesPtr master_addresses) {
164
67.1k
  if (master_addresses) {
165
67.1k
    LOG(INFO) << "Updating master addrs to " << MasterAddressesToString(*master_addresses);
166
67.1k
  }
167
168
67.1k
  std::lock_guard<std::mutex> l(master_addresses_mtx_);
169
67.1k
  master_addresses_ = master_addresses;
170
67.1k
}
171
172
122k
MasterAddressesPtr ServerBaseOptions::GetMasterAddresses() const {
173
122k
  std::lock_guard<std::mutex> l(master_addresses_mtx_);
174
122k
  return master_addresses_;
175
122k
}
176
177
template <class It>
178
33.9k
Result<std::vector<HostPort>> MasterHostPortsFromIterators(It begin, const It& end) {
179
33.9k
  std::vector<HostPort> result;
180
33.9k
  for (;;) {
181
33.9k
    auto split = std::find(begin, end, ',');
182
33.9k
    result.push_back(VERIFY_RESULT(HostPort::FromString(
183
33.9k
        std::string(begin, split), master::kMasterDefaultPort)));
184
33.9k
    if (split == end) {
185
33.9k
      break;
186
33.9k
    }
187
18.4E
    begin = ++split;
188
18.4E
  }
189
33.9k
  return result;
190
33.9k
}
191
192
12.6k
Result<MasterAddresses> ParseMasterAddresses(const std::string& source) {
193
12.6k
  MasterAddresses result;
194
12.6k
  auto token_begin = source.begin();
195
12.6k
  auto end = source.end();
196
593k
  for (auto i = source.begin(); i != end; ++i) {
197
580k
    if (*i == '{') {
198
0
      if (token_begin != i) {
199
0
        return STATUS_FORMAT(InvalidArgument, "'{' in the middle of token in $0", source);
200
0
      }
201
0
      ++i;
202
0
      auto token_end = std::find(i, end, '}');
203
0
      if (token_end == end) {
204
0
        return STATUS_FORMAT(InvalidArgument, "'{' is not terminated in $0", source);
205
0
      }
206
0
      result.push_back(VERIFY_RESULT(MasterHostPortsFromIterators(i, token_end)));
207
0
      i = token_end;
208
0
      ++i;
209
0
      token_begin = i;
210
0
      if (i == end) {
211
0
        break;
212
0
      }
213
0
      if (*i != ',') {
214
0
        return STATUS_FORMAT(InvalidArgument, "',' expected after '}' in $0", source);
215
0
      }
216
0
      ++token_begin;
217
580k
    } else if (*i == ',') {
218
21.3k
      result.push_back(VERIFY_RESULT(MasterHostPortsFromIterators(token_begin, i)));
219
21.3k
      token_begin = i;
220
21.3k
      ++token_begin;
221
21.3k
    }
222
580k
  }
223
12.6k
  if (token_begin != end) {
224
12.6k
    result.push_back(VERIFY_RESULT(MasterHostPortsFromIterators(token_begin, end)));
225
12.6k
  }
226
12.6k
  return std::move(result);
227
12.6k
}
228
229
Status DetermineMasterAddresses(
230
    const std::string& master_addresses_flag_name, const std::string& master_addresses_flag,
231
    uint64_t master_replication_factor, MasterAddresses* master_addresses,
232
12.6k
    std::string* master_addresses_resolved_str) {
233
12.6k
  const auto kResolvePeriod = 1s;
234
235
12.6k
  *master_addresses = VERIFY_RESULT_PREPEND(
236
12.6k
      ParseMasterAddresses(master_addresses_flag),
237
12.6k
      Format("Couldn't parse the $0 flag ('$1')",
238
12.6k
             master_addresses_flag_name, master_addresses_flag));
239
240
12.6k
  if (master_replication_factor <= 0) {
241
12.6k
    *master_addresses_resolved_str = master_addresses_flag;
242
12.6k
    return Status::OK();
243
12.6k
  }
244
245
18.4E
  std::vector<Endpoint> addrs;
246
0
  for (;;) {
247
0
    addrs.clear();
248
0
    for (const auto& list : *master_addresses) {
249
0
      for (const auto& hp : list) {
250
0
        auto s = hp.ResolveAddresses(&addrs);
251
0
        LOG_IF(WARNING, !s.ok()) << s;
252
0
      }
253
0
    }
254
0
    if (addrs.size() >= master_replication_factor) {
255
0
      break;
256
0
    }
257
0
    std::this_thread::sleep_for(kResolvePeriod);
258
0
  }
259
18.4E
  if (addrs.size() > master_replication_factor) {
260
0
    return STATUS_FORMAT(
261
0
        ConfigurationError, "Expected $0 master endpoints, but got: $1",
262
0
        master_replication_factor, yb::ToString(addrs));
263
0
  }
264
18.4E
  LOG(INFO) << Format("Resolved master addresses: $0", yb::ToString(addrs));
265
18.4E
  master_addresses->clear();
266
18.4E
  std::vector<std::string> master_addr_strings(addrs.size());
267
0
  for (const auto& addr : addrs) {
268
0
    auto hp = HostPort(addr);
269
0
    master_addresses->emplace_back(std::vector<HostPort>(1, hp));
270
0
    master_addr_strings.emplace_back(hp.ToString());
271
0
  }
272
18.4E
  *master_addresses_resolved_str = JoinStrings(master_addr_strings, ",");
273
18.4E
  return Status::OK();
274
18.4E
}
275
276
11.4k
Result<std::vector<Endpoint>> ResolveMasterAddresses(const MasterAddresses& master_addresses) {
277
11.4k
  const auto resolve_sleep_interval_sec = 1;
278
11.4k
  auto resolve_max_iterations =
279
11.4k
      (FLAGS_master_discovery_timeout_ms / 1000) / resolve_sleep_interval_sec;
280
11.4k
  if (resolve_max_iterations < 120) {
281
0
    resolve_max_iterations = 120;
282
0
  }
283
284
11.4k
  std::vector<Endpoint> result;
285
30.0k
  for (const auto& list : master_addresses) {
286
31.2k
    for (const auto& master_addr : list) {
287
      // Retry resolving master address for 'master_discovery_timeout' period of time
288
31.2k
      int num_iters = 0;
289
31.2k
      Status s = master_addr.ResolveAddresses(&result);
290
31.2k
      while (!s.ok()) {
291
0
        num_iters++;
292
0
        if (num_iters > resolve_max_iterations) {
293
0
          return STATUS_FORMAT(ConfigurationError, "Couldn't resolve master service address '$0'",
294
0
              master_addr);
295
0
        }
296
0
        std::this_thread::sleep_for(std::chrono::seconds(resolve_sleep_interval_sec));
297
0
        s = master_addr.ResolveAddresses(&result);
298
0
      }
299
31.2k
    }
300
30.0k
  }
301
11.4k
  return result;
302
11.4k
}
303
304
67.8k
std::string MasterAddressesToString(const MasterAddresses& addresses) {
305
67.8k
  std::string result;
306
67.8k
  bool first_master = true;
307
174k
  for (const auto& list : addresses) {
308
174k
    if (first_master) {
309
67.6k
      first_master = false;
310
106k
    } else {
311
106k
      result += ',';
312
106k
    }
313
174k
    result += '{';
314
174k
    bool first_address = true;
315
178k
    for (const auto& hp : list) {
316
178k
      if (first_address) {
317
174k
        first_address = false;
318
4.61k
      } else {
319
4.61k
        result += ',';
320
4.61k
      }
321
178k
      result += hp.ToString();
322
178k
    }
323
174k
    result += '}';
324
174k
  }
325
67.8k
  return result;
326
67.8k
}
327
328
2.01k
CloudInfoPB GetPlacementFromGFlags() {
329
2.01k
  CloudInfoPB result;
330
2.01k
  result.set_placement_cloud(FLAGS_placement_cloud);
331
2.01k
  result.set_placement_region(FLAGS_placement_region);
332
2.01k
  result.set_placement_zone(FLAGS_placement_zone);
333
2.01k
  return result;
334
2.01k
}
335
336
45.0k
CloudInfoPB ServerBaseOptions::MakeCloudInfoPB() const {
337
45.0k
  CloudInfoPB result;
338
45.0k
  result.set_placement_cloud(placement_cloud());
339
45.0k
  result.set_placement_region(placement_region());
340
45.0k
  result.set_placement_zone(placement_zone());
341
45.0k
  return result;
342
45.0k
}
343
344
93.3k
const std::string& ServerBaseOptions::placement_cloud() const {
345
88.3k
  return placement_cloud_.empty() ? FLAGS_placement_cloud : placement_cloud_;
346
93.3k
}
347
348
93.3k
const std::string& ServerBaseOptions::placement_region() const {
349
88.3k
  return placement_region_.empty() ? FLAGS_placement_region : placement_region_;
350
93.3k
}
351
352
93.3k
const std::string& ServerBaseOptions::placement_zone() const {
353
88.3k
  return placement_zone_.empty() ? FLAGS_placement_zone : placement_zone_;
354
93.3k
}
355
356
1.23k
bool ServerBaseOptions::has_placement_cloud() const {
357
1.23k
  return !placement_cloud_.empty();
358
1.23k
}
359
360
1.24k
void ServerBaseOptions::SetPlacement(std::string cloud, std::string region, std::string zone) {
361
1.24k
  placement_cloud_ = std::move(cloud);
362
1.24k
  placement_region_ = std::move(region);
363
1.24k
  placement_zone_ = std::move(zone);
364
1.24k
}
365
366
} // namespace server
367
} // namespace yb