/Users/deen/code/yugabyte-db/src/yb/server/rpc_server.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/server/rpc_server.h" |
34 | | |
35 | | #include <list> |
36 | | #include <string> |
37 | | #include <vector> |
38 | | |
39 | | #include <boost/preprocessor/cat.hpp> |
40 | | #include <boost/preprocessor/stringize.hpp> |
41 | | |
42 | | #include "yb/gutil/casts.h" |
43 | | |
44 | | #include "yb/rpc/messenger.h" |
45 | | #include "yb/rpc/service_if.h" |
46 | | #include "yb/rpc/service_pool.h" |
47 | | |
48 | | #include "yb/util/atomic.h" |
49 | | #include "yb/util/flag_tags.h" |
50 | | #include "yb/util/metric_entity.h" |
51 | | #include "yb/util/monotime.h" |
52 | | #include "yb/util/net/net_util.h" |
53 | | #include "yb/util/status.h" |
54 | | |
55 | | using yb::rpc::Messenger; |
56 | | using yb::rpc::ServiceIf; |
57 | | using std::shared_ptr; |
58 | | using std::string; |
59 | | using std::vector; |
60 | | using strings::Substitute; |
61 | | using std::unique_ptr; |
62 | | using std::make_unique; |
63 | | |
// Command-line flags used by the RPC server.
//
// rpc_bind_addresses: comma-separated list of bind addresses, defaulting to "0.0.0.0".
// RpcServerOptions' constructor copies this value into its rpc_bind_addresses field.
64 | | DEFINE_string(rpc_bind_addresses, "0.0.0.0", |
65 | | "Comma-separated list of addresses to bind to for RPC connections. " |
66 | | "Currently, ephemeral ports (i.e. port 0) are not allowed."); |
67 | | TAG_FLAG(rpc_bind_addresses, stable); |
68 | | |
// rpc_server_allow_ephemeral_ports: off by default and tagged unsafe. RpcServer::Init()
// aborts with LOG(FATAL) when a bind address uses port 0 and this flag is not set.
69 | | DEFINE_bool(rpc_server_allow_ephemeral_ports, false, |
70 | | "Allow binding to ephemeral ports. This can cause problems, so currently " |
71 | | "only allowed in tests."); |
72 | | TAG_FLAG(rpc_server_allow_ephemeral_ports, unsafe); |
73 | | |
// Only declared here (the definition is not in this file). RpcServerOptions' constructor
// uses it as the initial value of connection_keepalive_time_ms.
74 | | DECLARE_int32(rpc_default_keepalive_time_ms); |
75 | | |
76 | | namespace yb { |
77 | | namespace server { |
78 | | |
79 | | RpcServerOptions::RpcServerOptions() |
80 | | : rpc_bind_addresses(FLAGS_rpc_bind_addresses), |
81 | 32.5k | connection_keepalive_time_ms(FLAGS_rpc_default_keepalive_time_ms) { |
82 | 32.5k | } |
83 | | |
84 | | RpcServer::RpcServer(const std::string& name, RpcServerOptions opts, |
85 | | rpc::ConnectionContextFactoryPtr connection_context_factory) |
86 | | : name_(name), |
87 | | server_state_(UNINITIALIZED), |
88 | | options_(std::move(opts)), |
89 | 26.4k | connection_context_factory_(std::move(connection_context_factory)) { |
90 | 26.4k | LOG(INFO) << "yb::server::RpcServer created at " << this; |
91 | 26.4k | } |
92 | | |
93 | 250 | RpcServer::~RpcServer() { |
94 | 250 | Shutdown(); |
95 | 250 | } |
96 | | |
97 | 0 | string RpcServer::ToString() const { |
98 | | // TODO: include port numbers, etc. |
99 | 0 | return "RpcServer"; |
100 | 0 | } |
101 | | |
102 | 25.8k | Status RpcServer::Init(Messenger* messenger) { |
103 | 25.8k | CHECK_EQ(server_state_, UNINITIALIZED); |
104 | 25.8k | messenger_ = messenger; |
105 | | |
106 | 25.8k | RETURN_NOT_OK(HostPort::ParseStrings(options_.rpc_bind_addresses, |
107 | 25.8k | options_.default_port, |
108 | 25.8k | &rpc_host_port_)); |
109 | | |
110 | 25.8k | RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses, |
111 | 25.8k | options_.default_port, |
112 | 25.8k | &rpc_bind_addresses_)); |
113 | 25.8k | for (const auto& addr : rpc_bind_addresses_)25.8k { |
114 | 25.8k | if (IsPrivilegedPort(addr.port())) { |
115 | 0 | LOG(WARNING) << "May be unable to bind to privileged port for address " << addr; |
116 | 0 | } |
117 | | |
118 | | // Currently, we can't support binding to ephemeral ports outside of |
119 | | // unit tests, because consensus caches RPC ports of other servers |
120 | | // across restarts. See KUDU-334. |
121 | 25.8k | if (addr.port() == 0 && !FLAGS_rpc_server_allow_ephemeral_ports54 ) { |
122 | 0 | LOG(FATAL) << "Binding to ephemeral ports not supported (RPC address " |
123 | 0 | << "configured to " << addr << ")"; |
124 | 0 | } |
125 | 25.8k | } |
126 | | |
127 | 25.8k | server_state_ = INITIALIZED; |
128 | 25.8k | return Status::OK(); |
129 | 25.8k | } |
130 | | |
131 | | Status RpcServer::RegisterService(size_t queue_limit, |
132 | | rpc::ServiceIfPtr service, |
133 | 209k | rpc::ServicePriority priority) { |
134 | 209k | CHECK(server_state_ == INITIALIZED || |
135 | 0 | server_state_ == BOUND) << "bad state: " << server_state_; |
136 | 209k | const scoped_refptr<MetricEntity>& metric_entity = messenger_->metric_entity(); |
137 | 209k | string service_name = service->service_name(); |
138 | | |
139 | 209k | rpc::ThreadPool& thread_pool = messenger_->ThreadPool(priority); |
140 | | |
141 | 209k | scoped_refptr<rpc::ServicePool> service_pool(new rpc::ServicePool( |
142 | 209k | queue_limit, &thread_pool, &messenger_->scheduler(), std::move(service), metric_entity)); |
143 | 209k | RETURN_NOT_OK(messenger_->RegisterService(service_name, service_pool)); |
144 | 209k | return Status::OK(); |
145 | 209k | } |
146 | | |
147 | 25.8k | Status RpcServer::Bind() { |
148 | 25.8k | CHECK_EQ(server_state_, INITIALIZED); |
149 | | |
150 | 25.8k | rpc_bound_addresses_.resize(rpc_bind_addresses_.size()); |
151 | 51.6k | for (size_t i = 0; i != rpc_bind_addresses_.size(); ++i25.8k ) { |
152 | 25.8k | RETURN_NOT_OK(messenger_->ListenAddress( |
153 | 25.8k | connection_context_factory_, rpc_bind_addresses_[i], &rpc_bound_addresses_[i])); |
154 | 25.8k | } |
155 | | |
156 | 25.8k | server_state_ = BOUND; |
157 | 25.8k | return Status::OK(); |
158 | 25.8k | } |
159 | | |
160 | 25.7k | Status RpcServer::Start() { |
161 | 25.7k | if (server_state_ == INITIALIZED) { |
162 | 0 | RETURN_NOT_OK(Bind()); |
163 | 0 | } |
164 | 25.7k | CHECK_EQ(server_state_, BOUND); |
165 | 25.7k | server_state_ = STARTED; |
166 | | |
167 | 25.7k | RETURN_NOT_OK(messenger_->StartAcceptor()); |
168 | 25.7k | string bound_addrs_str; |
169 | 25.7k | for (const auto& bind_addr : rpc_bound_addresses_) { |
170 | 25.6k | if (!bound_addrs_str.empty()) bound_addrs_str += ", "0 ; |
171 | 25.6k | bound_addrs_str += yb::ToString(bind_addr); |
172 | 25.6k | } |
173 | 25.7k | LOG(INFO) << "RPC server started. Bound to: " << bound_addrs_str; |
174 | | |
175 | 25.7k | return Status::OK(); |
176 | 25.7k | } |
177 | | |
178 | 1.12k | void RpcServer::Shutdown() { |
179 | 1.12k | if (messenger_) { |
180 | 913 | messenger_->ShutdownThreadPools(); |
181 | 913 | messenger_->ShutdownAcceptor(); |
182 | 913 | messenger_->UnregisterAllServices(); |
183 | 913 | } |
184 | 1.12k | } |
185 | | |
186 | 0 | const rpc::ServicePool* RpcServer::TEST_service_pool(const string& service_name) const { |
187 | 0 | return down_cast<rpc::ServicePool*>(messenger_->TEST_rpc_service(service_name).get()); |
188 | 0 | } |
189 | | |
190 | | } // namespace server |
191 | | } // namespace yb |