YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/server/rpc_server.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/server/rpc_server.h"
34
35
#include <list>
36
#include <string>
37
#include <vector>
38
39
#include <boost/preprocessor/cat.hpp>
40
#include <boost/preprocessor/stringize.hpp>
41
42
#include "yb/gutil/casts.h"
43
44
#include "yb/rpc/messenger.h"
45
#include "yb/rpc/service_if.h"
46
#include "yb/rpc/service_pool.h"
47
48
#include "yb/util/atomic.h"
49
#include "yb/util/flag_tags.h"
50
#include "yb/util/metric_entity.h"
51
#include "yb/util/monotime.h"
52
#include "yb/util/net/net_util.h"
53
#include "yb/util/status.h"
54
55
using yb::rpc::Messenger;
56
using yb::rpc::ServiceIf;
57
using std::shared_ptr;
58
using std::string;
59
using std::vector;
60
using strings::Substitute;
61
using std::unique_ptr;
62
using std::make_unique;
63
64
DEFINE_string(rpc_bind_addresses, "0.0.0.0",
65
              "Comma-separated list of addresses to bind to for RPC connections. "
66
              "Currently, ephemeral ports (i.e. port 0) are not allowed.");
67
TAG_FLAG(rpc_bind_addresses, stable);
68
69
DEFINE_bool(rpc_server_allow_ephemeral_ports, false,
70
            "Allow binding to ephemeral ports. This can cause problems, so currently "
71
            "only allowed in tests.");
72
TAG_FLAG(rpc_server_allow_ephemeral_ports, unsafe);
73
74
DECLARE_int32(rpc_default_keepalive_time_ms);
75
76
namespace yb {
77
namespace server {
78
79
RpcServerOptions::RpcServerOptions()
80
  : rpc_bind_addresses(FLAGS_rpc_bind_addresses),
81
19.1k
    connection_keepalive_time_ms(FLAGS_rpc_default_keepalive_time_ms) {
82
19.1k
}
83
84
RpcServer::RpcServer(const std::string& name, RpcServerOptions opts,
85
                     rpc::ConnectionContextFactoryPtr connection_context_factory)
86
    : name_(name),
87
      server_state_(UNINITIALIZED),
88
      options_(std::move(opts)),
89
17.5k
      connection_context_factory_(std::move(connection_context_factory)) {
90
17.5k
        LOG(INFO) << "yb::server::RpcServer created at " << this;
91
17.5k
      }
92
93
208
RpcServer::~RpcServer() {
94
208
  Shutdown();
95
208
}
96
97
0
string RpcServer::ToString() const {
98
  // TODO: include port numbers, etc.
99
0
  return "RpcServer";
100
0
}
101
102
17.1k
Status RpcServer::Init(Messenger* messenger) {
103
17.1k
  CHECK_EQ(server_state_, UNINITIALIZED);
104
17.1k
  messenger_ = messenger;
105
106
17.1k
  RETURN_NOT_OK(HostPort::ParseStrings(options_.rpc_bind_addresses,
107
17.1k
                                       options_.default_port,
108
17.1k
                                       &rpc_host_port_));
109
110
17.1k
  RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses,
111
17.1k
                                 options_.default_port,
112
17.1k
                                 &rpc_bind_addresses_));
113
17.2k
  for (const auto& addr : rpc_bind_addresses_) {
114
17.2k
    if (IsPrivilegedPort(addr.port())) {
115
0
      LOG(WARNING) << "May be unable to bind to privileged port for address " << addr;
116
0
    }
117
118
    // Currently, we can't support binding to ephemeral ports outside of
119
    // unit tests, because consensus caches RPC ports of other servers
120
    // across restarts. See KUDU-334.
121
17.2k
    if (addr.port() == 0 && !FLAGS_rpc_server_allow_ephemeral_ports) {
122
0
      LOG(FATAL) << "Binding to ephemeral ports not supported (RPC address "
123
0
                 << "configured to " << addr << ")";
124
0
    }
125
17.2k
  }
126
127
17.1k
  server_state_ = INITIALIZED;
128
17.1k
  return Status::OK();
129
17.1k
}
130
131
Status RpcServer::RegisterService(size_t queue_limit,
132
                                  rpc::ServiceIfPtr service,
133
140k
                                  rpc::ServicePriority priority) {
134
0
  CHECK(server_state_ == INITIALIZED ||
135
0
        server_state_ == BOUND) << "bad state: " << server_state_;
136
140k
  const scoped_refptr<MetricEntity>& metric_entity = messenger_->metric_entity();
137
140k
  string service_name = service->service_name();
138
139
140k
  rpc::ThreadPool& thread_pool = messenger_->ThreadPool(priority);
140
141
140k
  scoped_refptr<rpc::ServicePool> service_pool(new rpc::ServicePool(
142
140k
      queue_limit, &thread_pool, &messenger_->scheduler(), std::move(service), metric_entity));
143
140k
  RETURN_NOT_OK(messenger_->RegisterService(service_name, service_pool));
144
140k
  return Status::OK();
145
140k
}
146
147
17.1k
Status RpcServer::Bind() {
148
17.1k
  CHECK_EQ(server_state_, INITIALIZED);
149
150
17.1k
  rpc_bound_addresses_.resize(rpc_bind_addresses_.size());
151
34.3k
  for (size_t i = 0; i != rpc_bind_addresses_.size(); ++i) {
152
17.1k
    RETURN_NOT_OK(messenger_->ListenAddress(
153
17.1k
        connection_context_factory_, rpc_bind_addresses_[i], &rpc_bound_addresses_[i]));
154
17.1k
  }
155
156
17.1k
  server_state_ = BOUND;
157
17.1k
  return Status::OK();
158
17.1k
}
159
160
17.1k
Status RpcServer::Start() {
161
17.1k
  if (server_state_ == INITIALIZED) {
162
0
    RETURN_NOT_OK(Bind());
163
0
  }
164
17.1k
  CHECK_EQ(server_state_, BOUND);
165
17.1k
  server_state_ = STARTED;
166
167
17.1k
  RETURN_NOT_OK(messenger_->StartAcceptor());
168
17.1k
  string bound_addrs_str;
169
17.1k
  for (const auto& bind_addr : rpc_bound_addresses_) {
170
17.1k
    if (!bound_addrs_str.empty()) bound_addrs_str += ", ";
171
17.1k
    bound_addrs_str += yb::ToString(bind_addr);
172
17.1k
  }
173
17.1k
  LOG(INFO) << "RPC server started. Bound to: " << bound_addrs_str;
174
175
17.1k
  return Status::OK();
176
17.1k
}
177
178
868
void RpcServer::Shutdown() {
179
868
  if (messenger_) {
180
751
    messenger_->ShutdownThreadPools();
181
751
    messenger_->ShutdownAcceptor();
182
751
    messenger_->UnregisterAllServices();
183
751
  }
184
868
}
185
186
0
const rpc::ServicePool* RpcServer::TEST_service_pool(const string& service_name) const {
187
0
  return down_cast<rpc::ServicePool*>(messenger_->TEST_rpc_service(service_name).get());
188
0
}
189
190
} // namespace server
191
} // namespace yb