/Users/deen/code/yugabyte-db/src/yb/rpc/acceptor.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/acceptor.h" |
34 | | |
35 | | #include <inttypes.h> |
36 | | #include <pthread.h> |
37 | | #include <stdint.h> |
38 | | |
39 | | #include <atomic> |
40 | | #include <list> |
41 | | #include <memory> |
42 | | #include <string> |
43 | | #include <unordered_map> |
44 | | #include <unordered_set> |
45 | | #include <vector> |
46 | | |
47 | | #include <gflags/gflags.h> |
48 | | #include <glog/logging.h> |
49 | | #include <gtest/gtest_prod.h> |
50 | | |
51 | | #include "yb/gutil/ref_counted.h" |
52 | | #include "yb/gutil/strings/substitute.h" |
53 | | |
54 | | #include "yb/rpc/reactor.h" |
55 | | |
56 | | #include "yb/util/flag_tags.h" |
57 | | #include "yb/util/metrics.h" |
58 | | #include "yb/util/monotime.h" |
59 | | #include "yb/util/net/sockaddr.h" |
60 | | #include "yb/util/status.h" |
61 | | #include "yb/util/status_format.h" |
62 | | #include "yb/util/status_log.h" |
63 | | #include "yb/util/thread.h" |
64 | | |
65 | | using google::protobuf::Message; |
66 | | |
67 | | METRIC_DEFINE_counter(server, rpc_connections_accepted, |
68 | | "RPC Connections Accepted", |
69 | | yb::MetricUnit::kConnections, |
70 | | "Number of incoming TCP connections made to the RPC server"); |
71 | | |
72 | | DEFINE_int32(rpc_acceptor_listen_backlog, 128, |
73 | | "Socket backlog parameter used when listening for RPC connections. " |
74 | | "This defines the maximum length to which the queue of pending " |
75 | | "TCP connections inbound to the RPC server may grow. If a connection " |
76 | | "request arrives when the queue is full, the client may receive " |
77 | | "an error. Higher values may help the server ride over bursts of " |
78 | | "new inbound connection requests."); |
79 | | TAG_FLAG(rpc_acceptor_listen_backlog, advanced); |
80 | | |
81 | | namespace yb { |
82 | | namespace rpc { |
83 | | |
84 | | Acceptor::Acceptor(const scoped_refptr<MetricEntity>& metric_entity, NewSocketHandler handler) |
85 | | : handler_(std::move(handler)), |
86 | | rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(metric_entity)), |
87 | 17.2k | loop_(kDefaultLibEvFlags) { |
88 | 17.2k | } |
89 | | |
90 | 258 | Acceptor::~Acceptor() { |
91 | 258 | Shutdown(); |
92 | 258 | } |
93 | | |
94 | 17.2k | Status Acceptor::Listen(const Endpoint& endpoint, Endpoint* bound_endpoint) { |
95 | 17.2k | Socket socket; |
96 | 17.2k | RETURN_NOT_OK(socket.Init(endpoint.address().is_v6() ? Socket::FLAG_IPV6 : 0)); |
97 | 17.2k | RETURN_NOT_OK(socket.SetReuseAddr(true)); |
98 | 17.2k | RETURN_NOT_OK(socket.Bind(endpoint)); |
99 | 17.2k | if (bound_endpoint) { |
100 | 17.2k | RETURN_NOT_OK(socket.GetSocketAddress(bound_endpoint)); |
101 | 17.2k | } |
102 | 17.2k | RETURN_NOT_OK(socket.SetNonBlocking(true)); |
103 | 17.2k | RETURN_NOT_OK(socket.Listen(FLAGS_rpc_acceptor_listen_backlog)); |
104 | | |
105 | 17.2k | bool was_empty; |
106 | 17.2k | { |
107 | 17.2k | std::lock_guard<std::mutex> lock(mutex_); |
108 | 17.2k | if (closing_) { |
109 | 0 | return STATUS_SUBSTITUTE(ServiceUnavailable, "Acceptor closing"); |
110 | 0 | } |
111 | 17.2k | was_empty = sockets_to_add_.empty(); |
112 | 17.2k | sockets_to_add_.push_back(std::move(socket)); |
113 | 17.2k | } |
114 | | |
115 | 17.2k | if (was_empty) { |
116 | 17.2k | async_.send(); |
117 | 17.2k | } |
118 | | |
119 | 17.2k | return Status::OK(); |
120 | 17.2k | } |
121 | | |
122 | 17.2k | Status Acceptor::Start() { |
123 | 17.2k | async_.set(loop_); |
124 | 17.2k | async_.set<Acceptor, &Acceptor::AsyncHandler>(this); |
125 | 17.2k | async_.start(); |
126 | 17.2k | async_.send(); |
127 | 17.2k | return yb::Thread::Create("acceptor", "acceptor", &Acceptor::RunThread, this, &thread_); |
128 | 17.2k | } |
129 | | |
130 | 516 | void Acceptor::Shutdown() { |
131 | 516 | { |
132 | 516 | std::lock_guard<std::mutex> lock(mutex_); |
133 | 516 | if (closing_) { |
134 | 258 | CHECK(sockets_to_add_.empty()); |
135 | 2 | VLOG(2) << "Acceptor already shut down"; |
136 | 258 | return; |
137 | 258 | } |
138 | 258 | closing_ = true; |
139 | 258 | } |
140 | | |
141 | 258 | if (thread_) { |
142 | 249 | async_.send(); |
143 | | |
144 | 249 | CHECK_OK(ThreadJoiner(thread_.get()).Join()); |
145 | 249 | thread_.reset(); |
146 | 249 | } |
147 | 258 | } |
148 | | |
149 | 264k | void Acceptor::IoHandler(ev::io& io, int events) { |
150 | 264k | auto it = sockets_.find(&io); |
151 | 264k | if (it == sockets_.end()) { |
152 | 0 | LOG(ERROR) << "IoHandler for unknown socket: " << &io; |
153 | 0 | return; |
154 | 0 | } |
155 | 264k | Socket& socket = it->second.socket; |
156 | 264k | if (events & EV_ERROR) { |
157 | 0 | LOG(INFO) << "Acceptor socket failure: " << socket.GetFd() |
158 | 0 | << ", endpoint: " << it->second.endpoint; |
159 | 0 | sockets_.erase(it); |
160 | 0 | return; |
161 | 0 | } |
162 | | |
163 | 264k | if (events & EV_READ) { |
164 | 579k | for (;;) { |
165 | 579k | Socket new_sock; |
166 | 579k | Endpoint remote; |
167 | 0 | VLOG(2) << "calling accept() on socket " << socket.GetFd(); |
168 | 579k | Status s = socket.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING); |
169 | 579k | if (!s.ok()) { |
170 | 264k | if (!s.IsTryAgain()) { |
171 | 158 | LOG(WARNING) << "Acceptor: accept failed: " << s; |
172 | 158 | } |
173 | 264k | return; |
174 | 264k | } |
175 | 314k | s = new_sock.SetNoDelay(true); |
176 | 314k | if (!s.ok()) { |
177 | 0 | LOG(WARNING) << "Acceptor with remote = " << remote |
178 | 0 | << " failed to set TCP_NODELAY on a newly accepted socket: " |
179 | 0 | << s.ToString(); |
180 | 0 | continue; |
181 | 0 | } |
182 | 314k | rpc_connections_accepted_->Increment(); |
183 | 314k | handler_(&new_sock, remote); |
184 | 314k | } |
185 | 264k | } |
186 | 264k | } |
187 | | |
188 | 17.5k | void Acceptor::AsyncHandler(ev::async& async, int events) { |
189 | 17.5k | bool closing; |
190 | 17.5k | { |
191 | 17.5k | std::lock_guard<std::mutex> lock(mutex_); |
192 | 17.5k | closing = closing_; |
193 | 17.5k | sockets_to_add_.swap(processing_sockets_to_add_); |
194 | 17.5k | } |
195 | | |
196 | 17.5k | if (closing) { |
197 | 249 | processing_sockets_to_add_.clear(); |
198 | 249 | sockets_.clear(); |
199 | 249 | loop_.break_loop(); |
200 | 249 | return; |
201 | 249 | } |
202 | | |
203 | 34.5k | while (!processing_sockets_to_add_.empty()) { |
204 | 17.2k | auto& socket = processing_sockets_to_add_.back(); |
205 | 17.2k | Endpoint endpoint; |
206 | 17.2k | auto status = socket.GetSocketAddress(&endpoint); |
207 | 17.2k | if (!status.ok()) { |
208 | 0 | LOG(WARNING) << "Failed to get address for socket: " |
209 | 0 | << socket.GetFd() << ": " << status.ToString(); |
210 | 0 | } |
211 | 0 | VLOG(1) << "Adding socket fd " << socket.GetFd() << " at " << endpoint; |
212 | 17.2k | AcceptingSocket ac{ std::unique_ptr<ev::io>(new ev::io), |
213 | 17.2k | Socket(std::move(socket)), |
214 | 17.2k | endpoint }; |
215 | 17.2k | processing_sockets_to_add_.pop_back(); |
216 | 17.2k | ac.io->set(loop_); |
217 | 17.2k | ac.io->set<Acceptor, &Acceptor::IoHandler>(this); |
218 | 17.2k | ac.io->start(ac.socket.GetFd(), EV_READ); |
219 | 17.2k | sockets_.emplace(ac.io.get(), std::move(ac)); |
220 | 17.2k | } |
221 | 17.2k | } |
222 | | |
223 | 17.2k | void Acceptor::RunThread() { |
224 | 17.2k | loop_.run(); |
225 | 17.0k | VLOG(1) << "Acceptor shutting down."; |
226 | 17.2k | } |
227 | | |
228 | | } // namespace rpc |
229 | | } // namespace yb |