YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tserver/cdc_consumer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <shared_mutex>
15
#include <chrono>
16
17
#include "yb/common/wire_protocol.h"
18
19
#include "yb/rpc/messenger.h"
20
#include "yb/rpc/proxy.h"
21
#include "yb/rpc/rpc.h"
22
#include "yb/rpc/secure_stream.h"
23
#include "yb/tserver/cdc_consumer.h"
24
#include "yb/tserver/twodc_output_client.h"
25
#include "yb/tserver/tablet_server.h"
26
#include "yb/tserver/cdc_poller.h"
27
28
#include "yb/cdc/cdc_consumer.pb.h"
29
30
#include "yb/client/client.h"
31
32
#include "yb/gutil/map-util.h"
33
#include "yb/server/secure.h"
34
#include "yb/util/flag_tags.h"
35
#include "yb/util/logging.h"
36
#include "yb/util/shared_lock.h"
37
#include "yb/util/status_log.h"
38
#include "yb/util/string_util.h"
39
#include "yb/util/thread.h"
40
41
// Caps the "CDCConsumerHandler" thread pool whose pointer is handed to every CDCPoller created
// by this consumer. 0 keeps the ThreadPoolBuilder default.
DEFINE_int32(cdc_consumer_handler_thread_pool_size, 0,
             "Override the max thread pool size for CDCConsumerHandler, which is used by "
             "CDCPollers. If set to 0, then the thread pool will use the default size (number of "
             "cpus on the system).");
TAG_FLAG(cdc_consumer_handler_thread_pool_size, advanced);

// Flags owned elsewhere:
//  - cdc_read_rpc_timeout_ms: default RPC timeout of the remote (producer) YBClients.
//  - cdc_write_rpc_timeout_ms: default RPC timeout of the local (consumer) YBClient.
//  - use_node_to_node_encryption: whether CDC clients get their own secure messenger.
//  - certs_for_cdc_dir: optional root directory of per-producer-universe certificates.
DECLARE_int32(cdc_read_rpc_timeout_ms);
DECLARE_int32(cdc_write_rpc_timeout_ms);
DECLARE_bool(use_node_to_node_encryption);
DECLARE_string(certs_for_cdc_dir);

using namespace std::chrono_literals;
53
54
namespace yb {
55
56
namespace tserver {
57
namespace enterprise {
58
59
0
// Shuts down the messenger if one was created. In this file a messenger is only built when
// node-to-node encryption is enabled, so it may legitimately be null here.
CDCClient::~CDCClient() {
  if (messenger == nullptr) {
    return;
  }
  messenger->Shutdown();
}
64
65
0
void CDCClient::Shutdown() {
66
0
  client->Shutdown();
67
0
}
68
69
// Builds a CDCConsumer for `tserver`.
//
// Creates a local YBClient pointed at this cluster's masters (with its own secure messenger when
// node-to-node encryption is enabled) and registers `tserver` as its local tablet server. Then it
// builds the "CDCConsumerHandler" thread pool and finally starts the "CDCConsumer/Poll" thread
// that runs RunThread().
//
// Returns an error Status if any of the secure context, messenger, YBClient, thread pool or
// thread creation steps fail.
Result<std::unique_ptr<CDCConsumer>> CDCConsumer::Create(
    std::function<bool(const std::string&)> is_leader_for_tablet,
    rpc::ProxyCache* proxy_cache,
    TabletServer* tserver) {
  LOG(INFO) << "Creating CDC Consumer";
  auto master_addrs = tserver->options().GetMasterAddresses();
  std::vector<std::string> hostport_strs;
  hostport_strs.reserve(master_addrs->size());
  for (const auto& hp : *master_addrs) {
    hostport_strs.push_back(HostPort::ToCommaSeparatedString(hp));
  }

  auto local_client = std::make_unique<CDCClient>();
  if (FLAGS_use_node_to_node_encryption) {
    rpc::MessengerBuilder messenger_builder("cdc-consumer");

    local_client->secure_context = VERIFY_RESULT(server::SetupSecureContext(
        "", "", server::SecureContextType::kInternal, &messenger_builder));

    local_client->messenger = VERIFY_RESULT(messenger_builder.Build());
  }

  local_client->client = VERIFY_RESULT(client::YBClientBuilder()
      .master_server_addrs(hostport_strs)
      .set_client_name("CDCConsumerLocal")
      .default_rpc_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms))
      .Build(local_client->messenger.get()));

  local_client->client->SetLocalTabletServer(tserver->permanent_uuid(), tserver->proxy(), tserver);
  auto cdc_consumer = std::make_unique<CDCConsumer>(std::move(is_leader_for_tablet), proxy_cache,
      tserver->permanent_uuid(), std::move(local_client));

  // Build the handler thread pool BEFORE starting the poll thread.
  // RunThread() -> TriggerPollForNewTablets() hands thread_pool_.get() to every new CDCPoller.
  // The pool must therefore be fully assigned before that thread can run; otherwise the thread
  // would read thread_pool_ while this thread is still writing it.
  // If building the pool fails, no thread has been started yet, and the destructor's Shutdown()
  // simply skips the join.
  // TODO(NIC): Unify cdc_consumer thread_pool & remote_client_ threadpools
  ThreadPoolBuilder cdc_consumer_thread_pool_builder("CDCConsumerHandler");
  if (FLAGS_cdc_consumer_handler_thread_pool_size > 0) {
    cdc_consumer_thread_pool_builder.set_max_threads(FLAGS_cdc_consumer_handler_thread_pool_size);
  }
  RETURN_NOT_OK(cdc_consumer_thread_pool_builder.Build(&cdc_consumer->thread_pool_));

  RETURN_NOT_OK(yb::Thread::Create(
      "CDCConsumer", "Poll", &CDCConsumer::RunThread, cdc_consumer.get(),
      &cdc_consumer->run_trigger_poll_thread_));
  return cdc_consumer;
}
112
113
// Stores the leadership predicate and the local client, builds the "[TS <uuid>]: " log prefix,
// and allocates the rpc::Rpcs tracker that is later passed to every CDCPoller.
// NOTE: `proxy_cache` is accepted for interface compatibility but is not used here.
CDCConsumer::CDCConsumer(std::function<bool(const std::string&)> is_leader_for_tablet,
                         rpc::ProxyCache* proxy_cache,
                         const string& ts_uuid,
                         std::unique_ptr<CDCClient> local_client)
    : is_leader_for_tablet_(std::move(is_leader_for_tablet)),
      rpcs_(new rpc::Rpcs),
      log_prefix_(Format("[TS $0]: ", ts_uuid)),
      local_client_(std::move(local_client)) {
}
121
122
0
// Stops pollers, clients and the poll thread before any member is destroyed.
CDCConsumer::~CDCConsumer() {
  this->Shutdown();
}
125
126
0
void CDCConsumer::Shutdown() {
127
0
  LOG_WITH_PREFIX(INFO) << "Shutting down CDC Consumer";
128
0
  {
129
0
    std::lock_guard<std::mutex> l(should_run_mutex_);
130
0
    should_run_ = false;
131
0
  }
132
0
  cond_.notify_all();
133
134
0
  if (thread_pool_) {
135
0
    thread_pool_->Shutdown();
136
0
  }
137
138
0
  {
139
0
    std::lock_guard<rw_spinlock> write_lock(master_data_mutex_);
140
0
    producer_consumer_tablet_map_from_master_.clear();
141
0
    uuid_master_addrs_.clear();
142
0
    {
143
0
      SharedLock<rw_spinlock> read_lock(producer_pollers_map_mutex_);
144
0
      for (auto &uuid_and_client : remote_clients_) {
145
0
        uuid_and_client.second->Shutdown();
146
0
      }
147
0
      producer_pollers_map_.clear();
148
0
    }
149
0
    local_client_->client->Shutdown();
150
0
  }
151
152
0
  if (run_trigger_poll_thread_) {
153
0
    WARN_NOT_OK(ThreadJoiner(run_trigger_poll_thread_.get()).Join(), "Could not join thread");
154
0
  }
155
0
}
156
157
0
void CDCConsumer::RunThread() {
158
0
  while (true) {
159
0
    std::unique_lock<std::mutex> l(should_run_mutex_);
160
0
    if (!should_run_) {
161
0
      return;
162
0
    }
163
0
    cond_.wait_for(l, 1000ms);
164
0
    if (!should_run_) {
165
0
      return;
166
0
    }
167
0
    TriggerPollForNewTablets();
168
0
  }
169
0
}
170
171
// Applies a consumer registry received from the master. Then it wakes the poll thread so newly
// listed tablets are picked up without waiting for the next one-second tick.
void CDCConsumer::RefreshWithNewRegistryFromMaster(const cdc::ConsumerRegistryPB* consumer_registry,
                                                   int32_t cluster_config_version) {
  this->UpdateInMemoryState(consumer_registry, cluster_config_version);
  // Wake RunThread() immediately.
  cond_.notify_all();
}
176
177
0
std::vector<std::string> CDCConsumer::TEST_producer_tablets_running() {
178
0
  SharedLock<rw_spinlock> read_lock(producer_pollers_map_mutex_);
179
180
0
  std::vector<string> tablets;
181
0
  for (const auto& producer : producer_pollers_map_) {
182
0
    tablets.push_back(producer.first.tablet_id);
183
0
  }
184
0
  return tablets;
185
0
}
186
187
// NOTE: This happens on TS.heartbeat, so it needs to finish quickly
188
void CDCConsumer::UpdateInMemoryState(const cdc::ConsumerRegistryPB* consumer_registry,
189
0
    int32_t cluster_config_version) {
190
0
  std::lock_guard<rw_spinlock> write_lock_master(master_data_mutex_);
191
192
  // Only update it if the version is newer.
193
0
  if (cluster_config_version <= cluster_config_version_.load(std::memory_order_acquire)) {
194
0
    return;
195
0
  }
196
197
0
  cluster_config_version_.store(cluster_config_version, std::memory_order_release);
198
0
  producer_consumer_tablet_map_from_master_.clear();
199
0
  decltype(uuid_master_addrs_) old_uuid_master_addrs;
200
0
  uuid_master_addrs_.swap(old_uuid_master_addrs);
201
202
0
  if (!consumer_registry) {
203
0
    LOG_WITH_PREFIX(INFO) << "Given empty CDC consumer registry: removing Pollers";
204
0
    cond_.notify_all();
205
0
    return;
206
0
  }
207
208
0
  LOG_WITH_PREFIX(INFO) << "Updating CDC consumer registry: " << consumer_registry->DebugString();
209
210
0
  streams_with_same_num_producer_consumer_tablets_.clear();
211
0
  for (const auto& producer_map : DCHECK_NOTNULL(consumer_registry)->producer_map()) {
212
0
    const auto& producer_entry_pb = producer_map.second;
213
0
    if (producer_entry_pb.disable_stream()) {
214
0
      continue;
215
0
    }
216
    // recreate the UUID connection information
217
0
    if (!ContainsKey(uuid_master_addrs_, producer_map.first)) {
218
0
      std::vector<HostPort> hp;
219
0
      HostPortsFromPBs(producer_map.second.master_addrs(), &hp);
220
0
      uuid_master_addrs_[producer_map.first] = HostPort::ToCommaSeparatedString(hp);
221
222
      // If master addresses changed, mark for YBClient update.
223
0
      if (ContainsKey(old_uuid_master_addrs, producer_map.first) &&
224
0
          uuid_master_addrs_[producer_map.first] != old_uuid_master_addrs[producer_map.first]) {
225
0
        changed_master_addrs_.insert(producer_map.first);
226
0
      }
227
0
    }
228
    // recreate the set of CDCPollers
229
0
    for (const auto& stream_entry : producer_entry_pb.stream_map()) {
230
0
      const auto& stream_entry_pb = stream_entry.second;
231
0
      if (stream_entry_pb.same_num_producer_consumer_tablets()) {
232
0
        LOG_WITH_PREFIX(INFO) << Format("Stream $0 will use local tserver optimization",
233
0
                                        stream_entry.first);
234
0
        streams_with_same_num_producer_consumer_tablets_.insert(stream_entry.first);
235
0
      }
236
0
      for (const auto& tablet_entry : stream_entry_pb.consumer_producer_tablet_map()) {
237
0
        const auto& consumer_tablet_id = tablet_entry.first;
238
0
        for (const auto& producer_tablet_id : tablet_entry.second.tablets()) {
239
0
          cdc::ProducerTabletInfo producer_tablet_info(
240
0
              {producer_map.first, stream_entry.first, producer_tablet_id});
241
0
          cdc::ConsumerTabletInfo consumer_tablet_info(
242
0
              {consumer_tablet_id, stream_entry_pb.consumer_table_id()});
243
0
          producer_consumer_tablet_map_from_master_[producer_tablet_info] = consumer_tablet_info;
244
0
        }
245
0
      }
246
0
    }
247
0
  }
248
0
  cond_.notify_all();
249
0
}
250
251
0
void CDCConsumer::TriggerPollForNewTablets() {
252
0
  std::lock_guard<rw_spinlock> write_lock_master(master_data_mutex_);
253
254
0
  for (const auto& entry : producer_consumer_tablet_map_from_master_) {
255
0
    auto uuid = entry.first.universe_uuid;
256
0
    bool start_polling;
257
0
    {
258
0
      SharedLock<rw_spinlock> read_lock_pollers(producer_pollers_map_mutex_);
259
0
      start_polling = producer_pollers_map_.find(entry.first) == producer_pollers_map_.end() &&
260
0
                      is_leader_for_tablet_(entry.second.tablet_id);
261
262
      // Update the Master Addresses, if altered after setup.
263
0
      if (ContainsKey(remote_clients_, uuid) && changed_master_addrs_.count(uuid) > 0) {
264
0
        auto status = remote_clients_[uuid]->client->SetMasterAddresses(uuid_master_addrs_[uuid]);
265
0
        if (status.ok()) {
266
0
          changed_master_addrs_.erase(uuid);
267
0
        } else {
268
0
          LOG_WITH_PREFIX(WARNING) << "Problem Setting Master Addresses for " << uuid
269
0
                                   << ": " << status.ToString();
270
0
        }
271
0
      }
272
0
    }
273
0
    if (start_polling) {
274
0
      std::lock_guard <rw_spinlock> write_lock_pollers(producer_pollers_map_mutex_);
275
276
      // Check again, since we unlocked.
277
0
      start_polling = producer_pollers_map_.find(entry.first) == producer_pollers_map_.end() &&
278
0
          is_leader_for_tablet_(entry.second.tablet_id);
279
0
      if (start_polling) {
280
        // This is a new tablet, trigger a poll.
281
        // See if we need to create a new client connection
282
0
        if (!ContainsKey(remote_clients_, uuid)) {
283
0
          CHECK(ContainsKey(uuid_master_addrs_, uuid));
284
285
0
          auto remote_client = std::make_unique<CDCClient>();
286
0
          std::string dir;
287
0
          if (FLAGS_use_node_to_node_encryption) {
288
0
            rpc::MessengerBuilder messenger_builder("cdc-consumer");
289
0
            if (!FLAGS_certs_for_cdc_dir.empty()) {
290
0
              dir = JoinPathSegments(FLAGS_certs_for_cdc_dir, uuid);
291
0
            }
292
293
0
            auto secure_context_result = server::SetupSecureContext(
294
0
                dir, "", "", server::SecureContextType::kInternal, &messenger_builder);
295
0
            if (!secure_context_result.ok()) {
296
0
              LOG(WARNING) << "Could not create secure context for " << uuid
297
0
                         << ": " << secure_context_result.status().ToString();
298
0
              return; // Don't finish creation.  Try again on the next heartbeat.
299
0
            }
300
0
            remote_client->secure_context = std::move(*secure_context_result);
301
302
0
            auto messenger_result = messenger_builder.Build();
303
0
            if (!messenger_result.ok()) {
304
0
              LOG(WARNING) << "Could not build messenger for " << uuid
305
0
                         << ": " << secure_context_result.status().ToString();
306
0
              return; // Don't finish creation.  Try again on the next heartbeat.
307
0
            }
308
0
            remote_client->messenger = std::move(*messenger_result);
309
0
          }
310
311
0
          auto client_result = yb::client::YBClientBuilder()
312
0
              .set_client_name("CDCConsumerRemote")
313
0
              .add_master_server_addr(uuid_master_addrs_[uuid])
314
0
              .skip_master_flagfile()
315
0
              .default_rpc_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_read_rpc_timeout_ms))
316
0
              .Build(remote_client->messenger.get());
317
0
          if (!client_result.ok()) {
318
0
            LOG(WARNING) << "Could not create a new YBClient for " << uuid
319
0
                         << ": " << client_result.status().ToString();
320
0
            return; // Don't finish creation.  Try again on the next heartbeat.
321
0
          }
322
323
0
          remote_client->client = std::move(*client_result);
324
0
          remote_clients_[uuid] = std::move(remote_client);
325
0
        }
326
327
        // now create the poller
328
0
        bool use_local_tserver =
329
0
            streams_with_same_num_producer_consumer_tablets_.find(entry.first.stream_id) !=
330
0
            streams_with_same_num_producer_consumer_tablets_.end();
331
0
        auto cdc_poller = std::make_shared<CDCPoller>(
332
0
            entry.first, entry.second,
333
0
            std::bind(&CDCConsumer::ShouldContinuePolling, this, entry.first),
334
0
            std::bind(&CDCConsumer::RemoveFromPollersMap, this, entry.first),
335
0
            thread_pool_.get(),
336
0
            rpcs_.get(),
337
0
            local_client_,
338
0
            remote_clients_[uuid],
339
0
            this,
340
0
            use_local_tserver);
341
0
        LOG_WITH_PREFIX(INFO) << Format("Start polling for producer tablet $0",
342
0
            entry.first.tablet_id);
343
0
        producer_pollers_map_[entry.first] = cdc_poller;
344
0
        cdc_poller->Poll();
345
0
      }
346
0
    }
347
0
  }
348
0
}
349
350
0
// Callback bound into each CDCPoller; unregisters the poller for `producer_tablet_info`.
//
// If the poller's producer universe is no longer in uuid_master_addrs_ (it was dropped by a
// registry refresh), that universe's remote client is removed as well. It is shut down only
// after both locks are released.
void CDCConsumer::RemoveFromPollersMap(const cdc::ProducerTabletInfo producer_tablet_info) {
  LOG_WITH_PREFIX(INFO) << Format("Stop polling for producer tablet $0",
                                  producer_tablet_info.tablet_id);
  // Keeps the removed client alive until we are outside the critical section, so the final
  // reference is released (and Shutdown() runs) without holding any lock.
  std::shared_ptr<CDCClient> orphaned_client;
  {
    SharedLock<rw_spinlock> master_read_lock(master_data_mutex_);
    std::lock_guard<rw_spinlock> pollers_write_lock(producer_pollers_map_mutex_);
    producer_pollers_map_.erase(producer_tablet_info);

    const auto& universe_uuid = producer_tablet_info.universe_uuid;
    const bool universe_still_configured = ContainsKey(uuid_master_addrs_, universe_uuid);
    if (!universe_still_configured) {
      auto client_it = remote_clients_.find(universe_uuid);
      if (client_it != remote_clients_.end()) {
        orphaned_client = client_it->second;
        remote_clients_.erase(client_it);
      }
    }
  }
  if (orphaned_client) {
    orphaned_client->Shutdown();
  }
}
371
372
0
// Callback bound into each CDCPoller; decides whether it should keep polling.
//
// Returns false once Shutdown() has cleared should_run_, or when the producer tablet is no
// longer in the master-provided map. Otherwise returns whether this tserver still leads the
// mapped consumer tablet.
bool CDCConsumer::ShouldContinuePolling(const cdc::ProducerTabletInfo producer_tablet_info) {
  std::lock_guard<std::mutex> run_lock(should_run_mutex_);
  if (!should_run_) {
    return false;
  }

  SharedLock<rw_spinlock> master_read_lock(master_data_mutex_);
  const auto mapping = producer_consumer_tablet_map_from_master_.find(producer_tablet_info);
  // A tablet missing from the registry is no longer of interest: abort the cycle.
  return mapping != producer_consumer_tablet_map_from_master_.end() &&
         is_leader_for_tablet_(mapping->second.tablet_id);
}
387
388
0
std::string CDCConsumer::LogPrefix() {
389
0
  return log_prefix_;
390
0
}
391
392
0
int32_t CDCConsumer::cluster_config_version() const {
393
0
  return cluster_config_version_.load(std::memory_order_acquire);
394
0
}
395
396
} // namespace enterprise
397
} // namespace tserver
398
} // namespace yb