YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tserver/tablet_server_ent.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/cdc/cdc_service.h"
14
15
#include "yb/encryption/encrypted_file_factory.h"
16
#include "yb/encryption/header_manager_impl.h"
17
#include "yb/encryption/universe_key_manager.h"
18
19
#include "yb/rpc/secure_stream.h"
20
21
#include "yb/server/hybrid_clock.h"
22
#include "yb/server/secure.h"
23
24
#include "yb/rpc/rpc.h"
25
26
#include "yb/tablet/tablet_peer.h"
27
28
#include "yb/tserver/backup_service.h"
29
#include "yb/tserver/cdc_consumer.h"
30
#include "yb/tserver/tablet_server.h"
31
#include "yb/tserver/ts_tablet_manager.h"
32
33
#include "yb/util/flags.h"
34
#include "yb/util/flag_tags.h"
35
#include "yb/util/ntp_clock.h"
36
37
#include "yb/rocksutil/rocksdb_encrypted_file_factory.h"
38
39
DEFINE_int32(ts_backup_svc_num_threads, 4,
40
             "Number of RPC worker threads for the TS backup service");
41
TAG_FLAG(ts_backup_svc_num_threads, advanced);
42
43
DEFINE_int32(ts_backup_svc_queue_length, 50,
44
             "RPC queue length for the TS backup service");
45
TAG_FLAG(ts_backup_svc_queue_length, advanced);
46
47
DECLARE_int32(svc_queue_length_default);
48
49
DECLARE_string(cert_node_filename);
50
51
namespace yb {
52
namespace tserver {
53
namespace enterprise {
54
55
using cdc::CDCServiceImpl;
56
using yb::rpc::ServiceIf;
57
58
TabletServer::TabletServer(const TabletServerOptions& opts)
59
9.27k
  : super(opts) {}
60
61
260
TabletServer::~TabletServer() {
62
260
  Shutdown();
63
260
}
64
65
359
void TabletServer::Shutdown() {
66
359
  auto cdc_consumer = GetCDCConsumer();
67
359
  if (cdc_consumer) {
68
0
    cdc_consumer->Shutdown();
69
0
  }
70
359
  super::Shutdown();
71
359
}
72
73
8.74k
Status TabletServer::RegisterServices() {
74
#if !defined(__APPLE__)
75
  server::HybridClock::RegisterProvider(NtpClock::Name(), [](const std::string&) {
76
    return std::make_shared<NtpClock>();
77
  });
78
#endif
79
80
8.74k
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
81
8.74k
      FLAGS_ts_backup_svc_queue_length,
82
8.74k
      std::make_unique<TabletServiceBackupImpl>(tablet_manager_.get(), metric_entity())));
83
84
8.74k
  RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(
85
8.74k
      FLAGS_svc_queue_length_default,
86
8.74k
      std::make_unique<CDCServiceImpl>(tablet_manager_.get(), metric_entity(), metric_registry())));
87
88
8.74k
  return super::RegisterServices();
89
8.74k
}
90
91
9.27k
Status TabletServer::SetupMessengerBuilder(rpc::MessengerBuilder* builder) {
92
9.27k
  RETURN_NOT_OK(super::SetupMessengerBuilder(builder));
93
9.27k
  if (!FLAGS_cert_node_filename.empty()) {
94
0
    secure_context_ = VERIFY_RESULT(server::SetupSecureContext(
95
0
        server::DefaultRootDir(*fs_manager_),
96
0
        FLAGS_cert_node_filename,
97
0
        server::SecureContextType::kInternal,
98
0
        builder));
99
9.27k
  } else {
100
9.27k
    const string &hosts = !options_.server_broadcast_addresses.empty()
101
9.27k
                        ? 
options_.server_broadcast_addresses0
102
9.27k
                        : options_.rpc_opts.rpc_bind_addresses;
103
9.27k
    secure_context_ = VERIFY_RESULT(server::SetupSecureContext(
104
9.27k
        hosts, *fs_manager_, server::SecureContextType::kInternal, builder));
105
9.27k
  }
106
9.27k
  return Status::OK();
107
9.27k
}
108
109
359
CDCConsumer* TabletServer::GetCDCConsumer() {
110
359
  std::lock_guard<decltype(cdc_consumer_mutex_)> l(cdc_consumer_mutex_);
111
359
  return cdc_consumer_.get();
112
359
}
113
114
0
encryption::UniverseKeyManager* TabletServer::GetUniverseKeyManager() {
115
0
  return opts_.universe_key_manager;
116
0
}
117
118
Status TabletServer::SetUniverseKeyRegistry(
119
46
    const encryption::UniverseKeyRegistryPB& universe_key_registry) {
120
46
  opts_.universe_key_manager->SetUniverseKeyRegistry(universe_key_registry);
121
46
  return Status::OK();
122
46
}
123
124
0
Status TabletServer::CreateCDCConsumer() {
125
0
  auto is_leader_clbk = [this](const string& tablet_id){
126
0
    std::shared_ptr<tablet::TabletPeer> tablet_peer;
127
0
    if (!tablet_manager_->LookupTablet(tablet_id, &tablet_peer)) {
128
0
      return false;
129
0
    }
130
0
    return tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
131
0
  };
132
0
  cdc_consumer_ = VERIFY_RESULT(CDCConsumer::Create(std::move(is_leader_clbk), proxy_cache_.get(),
133
0
                                                    this));
134
0
  return Status::OK();
135
0
}
136
137
Status TabletServer::SetConfigVersionAndConsumerRegistry(int32_t cluster_config_version,
138
4.83M
    const cdc::ConsumerRegistryPB* consumer_registry) {
139
4.83M
  std::lock_guard<decltype(cdc_consumer_mutex_)> l(cdc_consumer_mutex_);
140
141
  // Only create a cdc consumer if consumer_registry is not null.
142
4.83M
  if (!cdc_consumer_ && consumer_registry) {
143
0
    RETURN_NOT_OK(CreateCDCConsumer());
144
0
  }
145
4.83M
  if (cdc_consumer_) {
146
0
    cdc_consumer_->RefreshWithNewRegistryFromMaster(consumer_registry, cluster_config_version);
147
0
  }
148
4.83M
  return Status::OK();
149
4.83M
}
150
151
5.25M
int32_t TabletServer::cluster_config_version() const {
152
5.25M
  std::lock_guard<decltype(cdc_consumer_mutex_)> l(cdc_consumer_mutex_);
153
  // If no CDC consumer, we will return -1, which will force the master to send the consumer
154
  // registry if one exists. If we receive one, we will create a new CDC consumer in
155
  // SetConsumerRegistry.
156
5.25M
  if (!cdc_consumer_) {
157
5.25M
    return -1;
158
5.25M
  }
159
0
  return cdc_consumer_->cluster_config_version();
160
5.25M
}
161
162
} // namespace enterprise
163
} // namespace tserver
164
} // namespace yb