YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/acceptor.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/rpc/acceptor.h"
34
35
#include <inttypes.h>
36
#include <pthread.h>
37
#include <stdint.h>
38
39
#include <atomic>
40
#include <list>
41
#include <memory>
42
#include <string>
43
#include <unordered_map>
44
#include <unordered_set>
45
#include <vector>
46
47
#include <gflags/gflags.h>
48
#include <glog/logging.h>
49
#include <gtest/gtest_prod.h>
50
51
#include "yb/gutil/ref_counted.h"
52
#include "yb/gutil/strings/substitute.h"
53
54
#include "yb/rpc/reactor.h"
55
56
#include "yb/util/flag_tags.h"
57
#include "yb/util/metrics.h"
58
#include "yb/util/monotime.h"
59
#include "yb/util/net/sockaddr.h"
60
#include "yb/util/status.h"
61
#include "yb/util/status_format.h"
62
#include "yb/util/status_log.h"
63
#include "yb/util/thread.h"
64
65
using google::protobuf::Message;
66
67
METRIC_DEFINE_counter(server, rpc_connections_accepted,
68
                      "RPC Connections Accepted",
69
                      yb::MetricUnit::kConnections,
70
                      "Number of incoming TCP connections made to the RPC server");
71
72
DEFINE_int32(rpc_acceptor_listen_backlog, 128,
73
             "Socket backlog parameter used when listening for RPC connections. "
74
             "This defines the maximum length to which the queue of pending "
75
             "TCP connections inbound to the RPC server may grow. If a connection "
76
             "request arrives when the queue is full, the client may receive "
77
             "an error. Higher values may help the server ride over bursts of "
78
             "new inbound connection requests.");
79
TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
80
81
namespace yb {
82
namespace rpc {
83
84
Acceptor::Acceptor(const scoped_refptr<MetricEntity>& metric_entity, NewSocketHandler handler)
85
    : handler_(std::move(handler)),
86
      rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(metric_entity)),
87
25.9k
      loop_(kDefaultLibEvFlags) {
88
25.9k
}
89
90
363
Acceptor::~Acceptor() {
91
363
  Shutdown();
92
363
}
93
94
25.9k
Status Acceptor::Listen(const Endpoint& endpoint, Endpoint* bound_endpoint) {
95
25.9k
  Socket socket;
96
25.9k
  RETURN_NOT_OK(socket.Init(endpoint.address().is_v6() ? Socket::FLAG_IPV6 : 0));
97
25.9k
  RETURN_NOT_OK(socket.SetReuseAddr(true));
98
25.9k
  RETURN_NOT_OK(socket.Bind(endpoint));
99
25.9k
  if (bound_endpoint) {
100
25.9k
    RETURN_NOT_OK(socket.GetSocketAddress(bound_endpoint));
101
25.9k
  }
102
25.9k
  RETURN_NOT_OK(socket.SetNonBlocking(true));
103
25.9k
  RETURN_NOT_OK(socket.Listen(FLAGS_rpc_acceptor_listen_backlog));
104
105
25.9k
  bool was_empty;
106
25.9k
  {
107
25.9k
    std::lock_guard<std::mutex> lock(mutex_);
108
25.9k
    if (closing_) {
109
0
      return STATUS_SUBSTITUTE(ServiceUnavailable, "Acceptor closing");
110
0
    }
111
25.9k
    was_empty = sockets_to_add_.empty();
112
25.9k
    sockets_to_add_.push_back(std::move(socket));
113
25.9k
  }
114
115
25.9k
  if (was_empty) {
116
25.9k
    async_.send();
117
25.9k
  }
118
119
25.9k
  return Status::OK();
120
25.9k
}
121
122
25.8k
Status Acceptor::Start() {
123
25.8k
  async_.set(loop_);
124
25.8k
  async_.set<Acceptor, &Acceptor::AsyncHandler>(this);
125
25.8k
  async_.start();
126
25.8k
  async_.send();
127
25.8k
  return yb::Thread::Create("acceptor", "acceptor", &Acceptor::RunThread, this, &thread_);
128
25.8k
}
129
130
726
void Acceptor::Shutdown() {
131
726
  {
132
726
    std::lock_guard<std::mutex> lock(mutex_);
133
726
    if (closing_) {
134
363
      CHECK(sockets_to_add_.empty());
135
363
      VLOG
(2) << "Acceptor already shut down"80
;
136
363
      return;
137
363
    }
138
363
    closing_ = true;
139
363
  }
140
141
363
  if (thread_) {
142
278
    async_.send();
143
144
278
    CHECK_OK(ThreadJoiner(thread_.get()).Join());
145
278
    thread_.reset();
146
278
  }
147
363
}
148
149
546k
void Acceptor::IoHandler(ev::io& io, int events) {
150
546k
  auto it = sockets_.find(&io);
151
546k
  if (it == sockets_.end()) {
152
0
    LOG(ERROR) << "IoHandler for unknown socket: " << &io;
153
0
    return;
154
0
  }
155
546k
  Socket& socket = it->second.socket;
156
546k
  if (events & EV_ERROR) {
157
0
    LOG(INFO) << "Acceptor socket failure: " << socket.GetFd()
158
0
              << ", endpoint: " << it->second.endpoint;
159
0
    sockets_.erase(it);
160
0
    return;
161
0
  }
162
163
546k
  if (events & EV_READ) {
164
1.17M
    for (;;) {
165
1.17M
      Socket new_sock;
166
1.17M
      Endpoint remote;
167
1.17M
      VLOG
(2) << "calling accept() on socket " << socket.GetFd()0
;
168
1.17M
      Status s = socket.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
169
1.17M
      if (!s.ok()) {
170
546k
        if (!s.IsTryAgain()) {
171
601
          LOG(WARNING) << "Acceptor: accept failed: " << s;
172
601
        }
173
546k
        return;
174
546k
      }
175
628k
      s = new_sock.SetNoDelay(true);
176
628k
      if (!s.ok()) {
177
1
        LOG(WARNING) << "Acceptor with remote = " << remote
178
1
                     << " failed to set TCP_NODELAY on a newly accepted socket: "
179
1
                     << s.ToString();
180
1
        continue;
181
1
      }
182
628k
      rpc_connections_accepted_->Increment();
183
628k
      handler_(&new_sock, remote);
184
628k
    }
185
546k
  }
186
546k
}
187
188
26.0k
void Acceptor::AsyncHandler(ev::async& async, int events) {
189
26.0k
  bool closing;
190
26.0k
  {
191
26.0k
    std::lock_guard<std::mutex> lock(mutex_);
192
26.0k
    closing = closing_;
193
26.0k
    sockets_to_add_.swap(processing_sockets_to_add_);
194
26.0k
  }
195
196
26.0k
  if (closing) {
197
278
    processing_sockets_to_add_.clear();
198
278
    sockets_.clear();
199
278
    loop_.break_loop();
200
278
    return;
201
278
  }
202
203
51.5k
  
while (25.7k
!processing_sockets_to_add_.empty()) {
204
25.7k
    auto& socket = processing_sockets_to_add_.back();
205
25.7k
    Endpoint endpoint;
206
25.7k
    auto status = socket.GetSocketAddress(&endpoint);
207
25.7k
    if (!status.ok()) {
208
0
      LOG(WARNING) << "Failed to get address for socket: "
209
0
                   << socket.GetFd() << ": " << status.ToString();
210
0
    }
211
25.7k
    VLOG
(1) << "Adding socket fd " << socket.GetFd() << " at " << endpoint0
;
212
25.7k
    AcceptingSocket ac{ std::unique_ptr<ev::io>(new ev::io),
213
25.7k
                        Socket(std::move(socket)),
214
25.7k
                        endpoint };
215
25.7k
    processing_sockets_to_add_.pop_back();
216
25.7k
    ac.io->set(loop_);
217
25.7k
    ac.io->set<Acceptor, &Acceptor::IoHandler>(this);
218
25.7k
    ac.io->start(ac.socket.GetFd(), EV_READ);
219
25.7k
    sockets_.emplace(ac.io.get(), std::move(ac));
220
25.7k
  }
221
25.7k
}
222
223
25.7k
void Acceptor::RunThread() {
224
25.7k
  loop_.run();
225
25.7k
  VLOG
(1) << "Acceptor shutting down."25.4k
;
226
25.7k
}
227
228
} // namespace rpc
229
} // namespace yb