YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/cql_server.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/yql/cql/cqlserver/cql_server.h"
15
16
#include <boost/bind.hpp>
17
18
#include "yb/client/client.h"
19
20
#include "yb/gutil/strings/substitute.h"
21
22
#include "yb/master/master_heartbeat.pb.h"
23
24
#include "yb/rpc/connection_context.h"
25
#include "yb/rpc/messenger.h"
26
#include "yb/rpc/rpc_introspection.pb.h"
27
28
#include "yb/tserver/tablet_server_interface.h"
29
30
#include "yb/util/flag_tags.h"
31
#include "yb/util/net/dns_resolver.h"
32
#include "yb/util/result.h"
33
#include "yb/util/size_literals.h"
34
#include "yb/util/source_location.h"
35
36
#include "yb/yql/cql/cqlserver/cql_rpc.h"
37
#include "yb/yql/cql/cqlserver/cql_service.h"
38
39
DEFINE_int32(cql_service_queue_length, 10000,
40
             "RPC queue length for CQL service");
41
TAG_FLAG(cql_service_queue_length, advanced);
42
43
DEFINE_int32(cql_nodelist_refresh_interval_secs, 300,
44
             "Interval after which a node list refresh event should be sent to all CQL clients.");
45
TAG_FLAG(cql_nodelist_refresh_interval_secs, runtime);
46
TAG_FLAG(cql_nodelist_refresh_interval_secs, advanced);
47
48
DEFINE_int64(cql_rpc_memory_limit, 0, "CQL RPC memory limit");
49
50
namespace yb {
51
namespace cqlserver {
52
53
using namespace std::placeholders;
54
using namespace yb::size_literals;
55
using namespace yb::ql; // NOLINT
56
57
using yb::rpc::ServiceIf;
58
59
namespace {
60
61
15.3k
boost::posix_time::time_duration refresh_interval() {
62
15.3k
  return boost::posix_time::seconds(FLAGS_cql_nodelist_refresh_interval_secs);
63
15.3k
}
64
65
}
66
67
CQLServer::CQLServer(const CQLServerOptions& opts,
68
                     boost::asio::io_service* io,
69
                     tserver::TabletServerIf* tserver)
70
    : RpcAndWebServerBase(
71
          "CQLServer", opts, "yb.cqlserver",
72
          MemTracker::CreateTracker(
73
              "CQL", tserver ? tserver->mem_tracker() : MemTracker::GetRootTracker(),
74
              AddToParent::kTrue, CreateMetrics::kFalse),
75
          tserver->Clock()),
76
      opts_(opts),
77
      timer_(*io, refresh_interval()),
78
4.54k
      tserver_(tserver) {
79
4.54k
  SetConnectionContextFactory(rpc::CreateConnectionContextFactory<CQLConnectionContext>(
80
4.54k
      FLAGS_cql_rpc_memory_limit, mem_tracker()->parent()));
81
4.54k
}
82
83
4.54k
Status CQLServer::Start() {
84
4.54k
  RETURN_NOT_OK(server::RpcAndWebServerBase::Init());
85
86
4.54k
  auto cql_service = std::make_shared<CQLServiceImpl>(this, opts_);
87
4.54k
  cql_service->CompleteInit();
88
89
4.54k
  RETURN_NOT_OK(RegisterService(FLAGS_cql_service_queue_length, std::move(cql_service)));
90
91
4.54k
  RETURN_NOT_OK(server::RpcAndWebServerBase::Start());
92
93
  // Start the CQL node list refresh timer.
94
4.54k
  timer_.async_wait(boost::bind(&CQLServer::CQLNodeListRefresh, this,
95
4.54k
                                boost::asio::placeholders::error));
96
4.54k
  return Status::OK();
97
4.54k
}
98
99
1
void CQLServer::Shutdown() {
100
1
  boost::system::error_code ec;
101
1
  timer_.cancel(ec);
102
1
  if (ec) {
103
0
    LOG(WARNING) << "Failed to cancel timer: " << ec;
104
0
  }
105
1
  server::RpcAndWebServerBase::Shutdown();
106
1
}
107
108
10.8k
void CQLServer::RescheduleTimer() {
109
  // Reschedule the timer.
110
10.8k
  boost::system::error_code ec;
111
10.8k
  auto new_expires = timer_.expires_at() + refresh_interval();
112
10.8k
  timer_.expires_at(new_expires, ec);
113
10.8k
  if (ec) {
114
    // Happens during shutdown.
115
0
    LOG(WARNING) << "Failed to reschedule timer: " << ec;
116
0
    return;
117
0
  }
118
10.8k
  timer_.async_wait(boost::bind(&CQLServer::CQLNodeListRefresh, this,
119
10.8k
                                boost::asio::placeholders::error));
120
10.8k
}
121
122
std::unique_ptr<CQLServerEvent> CQLServer::BuildTopologyChangeEvent(
123
52.8k
    const std::string& event_type, const Endpoint& addr) {
124
52.8k
  std::unique_ptr<EventResponse> event_response(new TopologyChangeEventResponse(event_type, addr));
125
52.8k
  std::unique_ptr<CQLServerEvent> cql_server_event(new CQLServerEvent(std::move(event_response)));
126
52.8k
  return cql_server_event;
127
52.8k
}
128
129
10.8k
void CQLServer::CQLNodeListRefresh(const boost::system::error_code &ec) {
130
10.8k
  if (ec) {
131
0
    return;
132
0
  }
133
134
10.8k
  auto cqlserver_event_list = std::make_shared<CQLServerEventList>();
135
10.8k
  auto& resolver = tserver_->client()->messenger()->resolver();
136
10.8k
  if (tserver_ != nullptr) {
137
    // Get all live tservers.
138
10.8k
    std::vector<master::TSInformationPB> live_tservers;
139
10.8k
    Status s = tserver_->GetLiveTServers(&live_tservers);
140
10.8k
    if (!s.ok()) {
141
0
      LOG(WARNING) << s.ToString();
142
0
      RescheduleTimer();
143
0
      return;
144
0
    }
145
146
    // Queue NEW_NODE event for all the live tservers.
147
42.0k
    for (const master::TSInformationPB& ts_info : live_tservers) {
148
42.0k
      const auto& hostport_pb = DesiredHostPort(ts_info.registration().common(), CloudInfoPB());
149
42.0k
      if (hostport_pb.host().empty()) {
150
0
        LOG (WARNING) << "Skipping TS since it doesn't have any rpc address: "
151
0
                      << ts_info.DebugString();
152
0
        continue;
153
0
      }
154
155
      // Use only the first rpc address.
156
42.0k
      auto addr = resolver.Resolve(hostport_pb.host());
157
42.0k
      if (PREDICT_FALSE(!addr.ok())) {
158
0
        LOG(WARNING) << Format("Couldn't result host $0: $1", hostport_pb.host(), addr.status());
159
0
        continue;
160
0
      }
161
162
      // We need the CQL port not the tserver port so use the rpc port from the local CQL server.
163
      // Note: this relies on the fact that all tservers must use the same CQL port which is not
164
      // currently enforced on YB side, but is practically required by the drivers.
165
42.0k
      const auto cql_port = first_rpc_address().port();
166
167
      // Queue event for all clients to add a node.
168
      //
169
      // TODO: the event should be sent only if there is appropriate subscription.
170
      //       https://github.com/yugabyte/yugabyte-db/issues/3090
171
42.0k
      cqlserver_event_list->AddEvent(
172
42.0k
          BuildTopologyChangeEvent(TopologyChangeEventResponse::kNewNode,
173
42.0k
                                   Endpoint(*addr, cql_port)));
174
42.0k
    }
175
10.8k
  }
176
177
  // Queue node refresh event, to remove any nodes that are down. Note that the 'MOVED_NODE'
178
  // event forces the client to refresh its entire cluster topology. The RPC address associated
179
  // with the event doesn't have much significance.
180
  //
181
  // TODO: the event should be sent only if there is appropriate subscription.
182
  //       https://github.com/yugabyte/yugabyte-db/issues/3090
183
10.8k
  cqlserver_event_list->AddEvent(
184
10.8k
      BuildTopologyChangeEvent(TopologyChangeEventResponse::kMovedNode, first_rpc_address()));
185
186
10.8k
  Status s = messenger_->QueueEventOnAllReactors(cqlserver_event_list, SOURCE_LOCATION());
187
10.8k
  if (!s.ok()) {
188
0
    LOG (WARNING) << strings::Substitute("Failed to push events: [$0], due to: $1",
189
0
                                         cqlserver_event_list->ToString(), s.ToString());
190
0
  }
191
192
10.8k
  RescheduleTimer();
193
10.8k
}
194
195
}  // namespace cqlserver
196
}  // namespace yb