/Users/deen/code/yugabyte-db/ent/src/yb/tserver/tablet_server_ent.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/cdc/cdc_service.h" |
14 | | |
15 | | #include "yb/encryption/encrypted_file_factory.h" |
16 | | #include "yb/encryption/header_manager_impl.h" |
17 | | #include "yb/encryption/universe_key_manager.h" |
18 | | |
19 | | #include "yb/rpc/secure_stream.h" |
20 | | |
21 | | #include "yb/server/hybrid_clock.h" |
22 | | #include "yb/server/secure.h" |
23 | | |
24 | | #include "yb/rpc/rpc.h" |
25 | | |
26 | | #include "yb/tablet/tablet_peer.h" |
27 | | |
28 | | #include "yb/tserver/backup_service.h" |
29 | | #include "yb/tserver/cdc_consumer.h" |
30 | | #include "yb/tserver/tablet_server.h" |
31 | | #include "yb/tserver/ts_tablet_manager.h" |
32 | | |
33 | | #include "yb/util/flags.h" |
34 | | #include "yb/util/flag_tags.h" |
35 | | #include "yb/util/ntp_clock.h" |
36 | | |
37 | | #include "yb/rocksutil/rocksdb_encrypted_file_factory.h" |
38 | | |
39 | | DEFINE_int32(ts_backup_svc_num_threads, 4, |
40 | | "Number of RPC worker threads for the TS backup service"); |
41 | | TAG_FLAG(ts_backup_svc_num_threads, advanced); |
42 | | |
43 | | DEFINE_int32(ts_backup_svc_queue_length, 50, |
44 | | "RPC queue length for the TS backup service"); |
45 | | TAG_FLAG(ts_backup_svc_queue_length, advanced); |
46 | | |
47 | | DECLARE_int32(svc_queue_length_default); |
48 | | |
49 | | DECLARE_string(cert_node_filename); |
50 | | |
51 | | namespace yb { |
52 | | namespace tserver { |
53 | | namespace enterprise { |
54 | | |
55 | | using cdc::CDCServiceImpl; |
56 | | using yb::rpc::ServiceIf; |
57 | | |
58 | | TabletServer::TabletServer(const TabletServerOptions& opts) |
59 | 9.27k | : super(opts) {} |
60 | | |
61 | 260 | TabletServer::~TabletServer() { |
62 | 260 | Shutdown(); |
63 | 260 | } |
64 | | |
65 | 359 | void TabletServer::Shutdown() { |
66 | 359 | auto cdc_consumer = GetCDCConsumer(); |
67 | 359 | if (cdc_consumer) { |
68 | 0 | cdc_consumer->Shutdown(); |
69 | 0 | } |
70 | 359 | super::Shutdown(); |
71 | 359 | } |
72 | | |
73 | 8.74k | Status TabletServer::RegisterServices() { |
74 | | #if !defined(__APPLE__) |
75 | | server::HybridClock::RegisterProvider(NtpClock::Name(), [](const std::string&) { |
76 | | return std::make_shared<NtpClock>(); |
77 | | }); |
78 | | #endif |
79 | | |
80 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService( |
81 | 8.74k | FLAGS_ts_backup_svc_queue_length, |
82 | 8.74k | std::make_unique<TabletServiceBackupImpl>(tablet_manager_.get(), metric_entity()))); |
83 | | |
84 | 8.74k | RETURN_NOT_OK(RpcAndWebServerBase::RegisterService( |
85 | 8.74k | FLAGS_svc_queue_length_default, |
86 | 8.74k | std::make_unique<CDCServiceImpl>(tablet_manager_.get(), metric_entity(), metric_registry()))); |
87 | | |
88 | 8.74k | return super::RegisterServices(); |
89 | 8.74k | } |
90 | | |
91 | 9.27k | Status TabletServer::SetupMessengerBuilder(rpc::MessengerBuilder* builder) { |
92 | 9.27k | RETURN_NOT_OK(super::SetupMessengerBuilder(builder)); |
93 | 9.27k | if (!FLAGS_cert_node_filename.empty()) { |
94 | 0 | secure_context_ = VERIFY_RESULT(server::SetupSecureContext( |
95 | 0 | server::DefaultRootDir(*fs_manager_), |
96 | 0 | FLAGS_cert_node_filename, |
97 | 0 | server::SecureContextType::kInternal, |
98 | 0 | builder)); |
99 | 9.27k | } else { |
100 | 9.27k | const string &hosts = !options_.server_broadcast_addresses.empty() |
101 | 9.27k | ? options_.server_broadcast_addresses |
102 | 9.27k | : options_.rpc_opts.rpc_bind_addresses; |
103 | 9.27k | secure_context_ = VERIFY_RESULT(server::SetupSecureContext( |
104 | 9.27k | hosts, *fs_manager_, server::SecureContextType::kInternal, builder)); |
105 | 9.27k | } |
106 | 9.27k | return Status::OK(); |
107 | 9.27k | } |
108 | | |
109 | 359 | CDCConsumer* TabletServer::GetCDCConsumer() { |
110 | 359 | std::lock_guard<decltype(cdc_consumer_mutex_)> l(cdc_consumer_mutex_); |
111 | 359 | return cdc_consumer_.get(); |
112 | 359 | } |
113 | | |
114 | 0 | encryption::UniverseKeyManager* TabletServer::GetUniverseKeyManager() { |
115 | 0 | return opts_.universe_key_manager; |
116 | 0 | } |
117 | | |
118 | | Status TabletServer::SetUniverseKeyRegistry( |
119 | 46 | const encryption::UniverseKeyRegistryPB& universe_key_registry) { |
120 | 46 | opts_.universe_key_manager->SetUniverseKeyRegistry(universe_key_registry); |
121 | 46 | return Status::OK(); |
122 | 46 | } |
123 | | |
124 | 0 | Status TabletServer::CreateCDCConsumer() { |
125 | 0 | auto is_leader_clbk = [this](const string& tablet_id){ |
126 | 0 | std::shared_ptr<tablet::TabletPeer> tablet_peer; |
127 | 0 | if (!tablet_manager_->LookupTablet(tablet_id, &tablet_peer)) { |
128 | 0 | return false; |
129 | 0 | } |
130 | 0 | return tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; |
131 | 0 | }; |
132 | 0 | cdc_consumer_ = VERIFY_RESULT(CDCConsumer::Create(std::move(is_leader_clbk), proxy_cache_.get(), |
133 | 0 | this)); |
134 | 0 | return Status::OK(); |
135 | 0 | } |
136 | | |
137 | | Status TabletServer::SetConfigVersionAndConsumerRegistry(int32_t cluster_config_version, |
138 | 4.83M | const cdc::ConsumerRegistryPB* consumer_registry) { |
139 | 4.83M | std::lock_guard<decltype(cdc_consumer_mutex_)> l(cdc_consumer_mutex_); |
140 | | |
141 | | // Only create a cdc consumer if consumer_registry is not null. |
142 | 4.83M | if (!cdc_consumer_ && consumer_registry) { |
143 | 0 | RETURN_NOT_OK(CreateCDCConsumer()); |
144 | 0 | } |
145 | 4.83M | if (cdc_consumer_) { |
146 | 0 | cdc_consumer_->RefreshWithNewRegistryFromMaster(consumer_registry, cluster_config_version); |
147 | 0 | } |
148 | 4.83M | return Status::OK(); |
149 | 4.83M | } |
150 | | |
151 | 5.25M | int32_t TabletServer::cluster_config_version() const { |
152 | 5.25M | std::lock_guard<decltype(cdc_consumer_mutex_)> l(cdc_consumer_mutex_); |
153 | | // If no CDC consumer, we will return -1, which will force the master to send the consumer |
154 | | // registry if one exists. If we receive one, we will create a new CDC consumer in |
155 | | // SetConsumerRegistry. |
156 | 5.25M | if (!cdc_consumer_) { |
157 | 5.25M | return -1; |
158 | 5.25M | } |
159 | 0 | return cdc_consumer_->cluster_config_version(); |
160 | 5.25M | } |
161 | | |
162 | | } // namespace enterprise |
163 | | } // namespace tserver |
164 | | } // namespace yb |