YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/acceptor.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/acceptor.h"
34
35
#include <inttypes.h>
36
#include <pthread.h>
37
#include <stdint.h>
38
39
#include <atomic>
40
#include <list>
41
#include <memory>
42
#include <string>
43
#include <unordered_map>
44
#include <unordered_set>
45
#include <vector>
46
47
#include <gflags/gflags.h>
48
#include <glog/logging.h>
49
#include <gtest/gtest_prod.h>
50
51
#include "yb/gutil/ref_counted.h"
52
#include "yb/gutil/strings/substitute.h"
53
54
#include "yb/rpc/reactor.h"
55
56
#include "yb/util/flag_tags.h"
57
#include "yb/util/metrics.h"
58
#include "yb/util/monotime.h"
59
#include "yb/util/net/sockaddr.h"
60
#include "yb/util/status.h"
61
#include "yb/util/status_format.h"
62
#include "yb/util/status_log.h"
63
#include "yb/util/thread.h"
64
65
using google::protobuf::Message;
66
67
METRIC_DEFINE_counter(server, rpc_connections_accepted,
68
                      "RPC Connections Accepted",
69
                      yb::MetricUnit::kConnections,
70
                      "Number of incoming TCP connections made to the RPC server");
71
72
DEFINE_int32(rpc_acceptor_listen_backlog, 128,
73
             "Socket backlog parameter used when listening for RPC connections. "
74
             "This defines the maximum length to which the queue of pending "
75
             "TCP connections inbound to the RPC server may grow. If a connection "
76
             "request arrives when the queue is full, the client may receive "
77
             "an error. Higher values may help the server ride over bursts of "
78
             "new inbound connection requests.");
79
TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
80
81
namespace yb {
82
namespace rpc {
83
84
Acceptor::Acceptor(const scoped_refptr<MetricEntity>& metric_entity, NewSocketHandler handler)
85
    : handler_(std::move(handler)),
86
      rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(metric_entity)),
87
17.2k
      loop_(kDefaultLibEvFlags) {
88
17.2k
}
89
90
258
Acceptor::~Acceptor() {
91
258
  Shutdown();
92
258
}
93
94
17.2k
Status Acceptor::Listen(const Endpoint& endpoint, Endpoint* bound_endpoint) {
95
17.2k
  Socket socket;
96
17.2k
  RETURN_NOT_OK(socket.Init(endpoint.address().is_v6() ? Socket::FLAG_IPV6 : 0));
97
17.2k
  RETURN_NOT_OK(socket.SetReuseAddr(true));
98
17.2k
  RETURN_NOT_OK(socket.Bind(endpoint));
99
17.2k
  if (bound_endpoint) {
100
17.2k
    RETURN_NOT_OK(socket.GetSocketAddress(bound_endpoint));
101
17.2k
  }
102
17.2k
  RETURN_NOT_OK(socket.SetNonBlocking(true));
103
17.2k
  RETURN_NOT_OK(socket.Listen(FLAGS_rpc_acceptor_listen_backlog));
104
105
17.2k
  bool was_empty;
106
17.2k
  {
107
17.2k
    std::lock_guard<std::mutex> lock(mutex_);
108
17.2k
    if (closing_) {
109
0
      return STATUS_SUBSTITUTE(ServiceUnavailable, "Acceptor closing");
110
0
    }
111
17.2k
    was_empty = sockets_to_add_.empty();
112
17.2k
    sockets_to_add_.push_back(std::move(socket));
113
17.2k
  }
114
115
17.2k
  if (was_empty) {
116
17.2k
    async_.send();
117
17.2k
  }
118
119
17.2k
  return Status::OK();
120
17.2k
}
121
122
17.2k
Status Acceptor::Start() {
123
17.2k
  async_.set(loop_);
124
17.2k
  async_.set<Acceptor, &Acceptor::AsyncHandler>(this);
125
17.2k
  async_.start();
126
17.2k
  async_.send();
127
17.2k
  return yb::Thread::Create("acceptor", "acceptor", &Acceptor::RunThread, this, &thread_);
128
17.2k
}
129
130
516
void Acceptor::Shutdown() {
131
516
  {
132
516
    std::lock_guard<std::mutex> lock(mutex_);
133
516
    if (closing_) {
134
258
      CHECK(sockets_to_add_.empty());
135
2
      VLOG(2) << "Acceptor already shut down";
136
258
      return;
137
258
    }
138
258
    closing_ = true;
139
258
  }
140
141
258
  if (thread_) {
142
249
    async_.send();
143
144
249
    CHECK_OK(ThreadJoiner(thread_.get()).Join());
145
249
    thread_.reset();
146
249
  }
147
258
}
148
149
264k
void Acceptor::IoHandler(ev::io& io, int events) {
150
264k
  auto it = sockets_.find(&io);
151
264k
  if (it == sockets_.end()) {
152
0
    LOG(ERROR) << "IoHandler for unknown socket: " << &io;
153
0
    return;
154
0
  }
155
264k
  Socket& socket = it->second.socket;
156
264k
  if (events & EV_ERROR) {
157
0
    LOG(INFO) << "Acceptor socket failure: " << socket.GetFd()
158
0
              << ", endpoint: " << it->second.endpoint;
159
0
    sockets_.erase(it);
160
0
    return;
161
0
  }
162
163
264k
  if (events & EV_READ) {
164
579k
    for (;;) {
165
579k
      Socket new_sock;
166
579k
      Endpoint remote;
167
0
      VLOG(2) << "calling accept() on socket " << socket.GetFd();
168
579k
      Status s = socket.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
169
579k
      if (!s.ok()) {
170
264k
        if (!s.IsTryAgain()) {
171
158
          LOG(WARNING) << "Acceptor: accept failed: " << s;
172
158
        }
173
264k
        return;
174
264k
      }
175
314k
      s = new_sock.SetNoDelay(true);
176
314k
      if (!s.ok()) {
177
0
        LOG(WARNING) << "Acceptor with remote = " << remote
178
0
                     << " failed to set TCP_NODELAY on a newly accepted socket: "
179
0
                     << s.ToString();
180
0
        continue;
181
0
      }
182
314k
      rpc_connections_accepted_->Increment();
183
314k
      handler_(&new_sock, remote);
184
314k
    }
185
264k
  }
186
264k
}
187
188
17.5k
void Acceptor::AsyncHandler(ev::async& async, int events) {
189
17.5k
  bool closing;
190
17.5k
  {
191
17.5k
    std::lock_guard<std::mutex> lock(mutex_);
192
17.5k
    closing = closing_;
193
17.5k
    sockets_to_add_.swap(processing_sockets_to_add_);
194
17.5k
  }
195
196
17.5k
  if (closing) {
197
249
    processing_sockets_to_add_.clear();
198
249
    sockets_.clear();
199
249
    loop_.break_loop();
200
249
    return;
201
249
  }
202
203
34.5k
  while (!processing_sockets_to_add_.empty()) {
204
17.2k
    auto& socket = processing_sockets_to_add_.back();
205
17.2k
    Endpoint endpoint;
206
17.2k
    auto status = socket.GetSocketAddress(&endpoint);
207
17.2k
    if (!status.ok()) {
208
0
      LOG(WARNING) << "Failed to get address for socket: "
209
0
                   << socket.GetFd() << ": " << status.ToString();
210
0
    }
211
0
    VLOG(1) << "Adding socket fd " << socket.GetFd() << " at " << endpoint;
212
17.2k
    AcceptingSocket ac{ std::unique_ptr<ev::io>(new ev::io),
213
17.2k
                        Socket(std::move(socket)),
214
17.2k
                        endpoint };
215
17.2k
    processing_sockets_to_add_.pop_back();
216
17.2k
    ac.io->set(loop_);
217
17.2k
    ac.io->set<Acceptor, &Acceptor::IoHandler>(this);
218
17.2k
    ac.io->start(ac.socket.GetFd(), EV_READ);
219
17.2k
    sockets_.emplace(ac.io.get(), std::move(ac));
220
17.2k
  }
221
17.2k
}
222
223
17.2k
void Acceptor::RunThread() {
224
17.2k
  loop_.run();
225
17.0k
  VLOG(1) << "Acceptor shutting down.";
226
17.2k
}
227
228
} // namespace rpc
229
} // namespace yb