YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tserver/cdc_consumer.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef ENT_SRC_YB_TSERVER_CDC_CONSUMER_H
15
#define ENT_SRC_YB_TSERVER_CDC_CONSUMER_H
16
17
#include <unordered_map>
18
#include <unordered_set>
19
20
#include "yb/cdc/cdc_util.h"
21
#include "yb/util/locks.h"
22
23
namespace yb {
24
25
class Thread;
26
class ThreadPool;
27
28
namespace rpc {
29
30
class Messenger;
31
class ProxyCache;
32
class Rpcs;
33
class SecureContext;
34
35
} // namespace rpc
36
37
namespace cdc {
38
39
class ConsumerRegistryPB;
40
41
} // namespace cdc
42
43
namespace client {
44
45
class YBClient;
46
47
} // namespace client
48
49
namespace tserver {
50
namespace enterprise {
51
52
class CDCPoller;
53
class TabletServer;
54
55
struct CDCClient {
56
  std::unique_ptr<rpc::Messenger> messenger;
57
  std::unique_ptr<rpc::SecureContext> secure_context;
58
  std::shared_ptr<client::YBClient> client;
59
60
  ~CDCClient();
61
  void Shutdown();
62
};
63
64
class CDCConsumer {
65
 public:
66
  static Result<std::unique_ptr<CDCConsumer>> Create(
67
      std::function<bool(const std::string&)> is_leader_for_tablet,
68
      rpc::ProxyCache* proxy_cache,
69
      TabletServer* tserver);
70
71
  CDCConsumer(std::function<bool(const std::string&)> is_leader_for_tablet,
72
      rpc::ProxyCache* proxy_cache,
73
      const std::string& ts_uuid,
74
      std::unique_ptr<CDCClient> local_client);
75
76
  ~CDCConsumer();
77
  void Shutdown();
78
79
  // Refreshes the in memory state when we receive a new registry from master.
80
  void RefreshWithNewRegistryFromMaster(const cdc::ConsumerRegistryPB* consumer_registry,
81
                                        int32_t cluster_config_version);
82
83
  std::vector<std::string> TEST_producer_tablets_running();
84
85
  std::string LogPrefix();
86
87
  // Return the value stored in cluster_config_version_. Since we are reading an atomic variable,
88
  // we don't need to hold the mutex.
89
  int32_t cluster_config_version() const NO_THREAD_SAFETY_ANALYSIS;
90
91
0
  void IncrementNumSuccessfulWriteRpcs() {
92
0
    TEST_num_successful_write_rpcs++;
93
0
  }
94
95
0
  uint32_t GetNumSuccessfulWriteRpcs() {
96
0
    return TEST_num_successful_write_rpcs.load(std::memory_order_acquire);
97
0
  }
98
99
 private:
100
  // Runs a thread that periodically polls for any new threads.
101
  void RunThread();
102
103
  // Loops through all the entries in the registry and creates a producer -> consumer tablet
104
  // mapping.
105
  void UpdateInMemoryState(const cdc::ConsumerRegistryPB* consumer_producer_map,
106
                           int32_t cluster_config_version);
107
108
  // Loops through all entries in registry from master to check if all producer tablets are being
109
  // polled for.
110
  void TriggerPollForNewTablets();
111
112
  bool ShouldContinuePolling(const cdc::ProducerTabletInfo producer_tablet_info);
113
114
  void RemoveFromPollersMap(const cdc::ProducerTabletInfo producer_tablet_info);
115
116
  // Mutex and cond for should_run_ state.
117
  std::mutex should_run_mutex_;
118
  std::condition_variable cond_;
119
120
  // Mutex for producer_consumer_tablet_map_from_master_.
121
  rw_spinlock master_data_mutex_ ACQUIRED_AFTER(should_run_mutex_);
122
123
  // Mutex for producer_pollers_map_.
124
  rw_spinlock producer_pollers_map_mutex_ ACQUIRED_AFTER(should_run_mutex_, master_data_mutex_);
125
126
  std::function<bool(const std::string&)> is_leader_for_tablet_;
127
128
  std::unordered_map<cdc::ProducerTabletInfo, cdc::ConsumerTabletInfo,
129
                     cdc::ProducerTabletInfo::Hash> producer_consumer_tablet_map_from_master_
130
                     GUARDED_BY(master_data_mutex_);
131
132
  std::unordered_set<std::string> streams_with_same_num_producer_consumer_tablets_
133
    GUARDED_BY(master_data_mutex_);
134
135
  scoped_refptr<Thread> run_trigger_poll_thread_;
136
137
  std::unordered_map<cdc::ProducerTabletInfo, std::shared_ptr<CDCPoller>,
138
                     cdc::ProducerTabletInfo::Hash> producer_pollers_map_
139
                     GUARDED_BY(producer_pollers_map_mutex_);
140
141
  std::unique_ptr<ThreadPool> thread_pool_;
142
  std::unique_ptr<rpc::Rpcs> rpcs_;
143
144
  std::string log_prefix_;
145
  std::shared_ptr<CDCClient> local_client_;
146
147
  // map: {universe_uuid : ...}.
148
  std::unordered_map<std::string, std::shared_ptr<CDCClient>> remote_clients_
149
    GUARDED_BY(producer_pollers_map_mutex_);
150
  std::unordered_map<std::string, std::string> uuid_master_addrs_
151
    GUARDED_BY(master_data_mutex_);
152
  std::unordered_set<std::string> changed_master_addrs_ GUARDED_BY(master_data_mutex_);
153
154
  bool should_run_ = true;
155
156
  std::atomic<int32_t> cluster_config_version_ GUARDED_BY(master_data_mutex_) = {-1};
157
158
  std::atomic<uint32_t> TEST_num_successful_write_rpcs {0};
159
};
160
161
} // namespace enterprise
162
} // namespace tserver
163
} // namespace yb
164
165
#endif // ENT_SRC_YB_TSERVER_CDC_CONSUMER_H