/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_service.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #ifndef ENT_SRC_YB_CDC_CDC_SERVICE_H |
14 | | #define ENT_SRC_YB_CDC_CDC_SERVICE_H |
15 | | |
16 | | #include <memory> |
17 | | |
18 | | #include "yb/cdc/cdc_error.h" |
19 | | #include "yb/cdc/cdc_metrics.h" |
20 | | #include "yb/cdc/cdc_producer.h" |
21 | | #include "yb/cdc/cdc_service.proxy.h" |
22 | | #include "yb/cdc/cdc_service.service.h" |
23 | | #include "yb/cdc/cdc_util.h" |
24 | | #include "yb/client/async_initializer.h" |
25 | | |
26 | | #include "yb/common/schema.h" |
27 | | |
28 | | #include "yb/master/master_client.fwd.h" |
29 | | |
30 | | #include "yb/rpc/rpc.h" |
31 | | #include "yb/rpc/rpc_context.h" |
32 | | #include "yb/rpc/rpc_controller.h" |
33 | | |
34 | | #include "yb/util/net/net_util.h" |
35 | | #include "yb/util/service_util.h" |
36 | | |
37 | | namespace yb { |
38 | | |
39 | | namespace client { |
40 | | |
41 | | class TableHandle; |
42 | | |
43 | | } |
44 | | |
45 | | namespace tserver { |
46 | | |
47 | | class TSTabletManager; |
48 | | |
49 | | } |
50 | | |
51 | | namespace cdc { |
52 | | |
53 | | using CDCServiceProxyMap = |
54 | | std::unordered_map<HostPort, std::shared_ptr<CDCServiceProxy>, HostPortHash>; |
55 | | // Named string constants; `static` gives every including file its own copy. |
56 | | static constexpr const char* kRecordType = "record_type"; |
57 | | static constexpr const char* kRecordFormat = "record_format"; |
58 | | static constexpr const char* kRetentionSec = "retention_sec"; |
59 | | static constexpr const char* kSourceType = "source_type"; |
60 | | static constexpr const char* kCheckpointType = "checkpoint_type"; |
61 | | static constexpr const char* kIdType = "id_type"; |
62 | | static constexpr const char* kNamespaceId = "NAMESPACEID"; |
63 | | static constexpr const char* kTableId = "TABLEID"; |
64 | | |
65 | | struct TabletCheckpoint {  // An op ID paired with the time at which it was last updated. |
66 | | OpId op_id; |
67 | | // Time point at which op_id was last updated. |
68 | | CoarseTimePoint last_update_time; |
69 | | // Returns true when more than `duration` has elapsed between last_update_time and `now`. |
70 | 8.44k | bool ExpiredAt(std::chrono::milliseconds duration, std::chrono::time_point<CoarseMonoClock> now) const { |
71 | 8.44k | return (now - last_update_time) > duration; |
72 | 8.44k | } |
73 | | }; |
74 | | |
75 | | class CDCServiceImpl : public CDCServiceIf {  // Implementation of the CDCServiceIf interface. |
76 | | public: |
77 | | CDCServiceImpl(tserver::TSTabletManager* tablet_manager, |
78 | | const scoped_refptr<MetricEntity>& metric_entity_server, |
79 | | MetricRegistry* metric_registry); |
80 | | |
81 | | CDCServiceImpl(const CDCServiceImpl&) = delete; |
82 | | void operator=(const CDCServiceImpl&) = delete; |
83 | | |
84 | | ~CDCServiceImpl(); |
85 | | |
86 | | void CreateCDCStream(const CreateCDCStreamRequestPB* req, |
87 | | CreateCDCStreamResponsePB* resp, |
88 | | rpc::RpcContext rpc) override; |
89 | | void DeleteCDCStream(const DeleteCDCStreamRequestPB *req, |
90 | | DeleteCDCStreamResponsePB* resp, |
91 | | rpc::RpcContext rpc) override; |
92 | | void ListTablets(const ListTabletsRequestPB *req, |
93 | | ListTabletsResponsePB* resp, |
94 | | rpc::RpcContext rpc) override; |
95 | | void GetChanges(const GetChangesRequestPB* req, |
96 | | GetChangesResponsePB* resp, |
97 | | rpc::RpcContext rpc) override; |
98 | | void GetCheckpoint(const GetCheckpointRequestPB* req, |
99 | | GetCheckpointResponsePB* resp, |
100 | | rpc::RpcContext rpc) override; |
101 | | |
102 | | // Update peers in other tablet servers about the latest minimum applied cdc index for a specific |
103 | | // tablet. |
104 | | void UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req, |
105 | | UpdateCdcReplicatedIndexResponsePB* resp, |
106 | | rpc::RpcContext rpc) override; |
107 | | |
108 | | void GetLatestEntryOpId(const GetLatestEntryOpIdRequestPB* req, |
109 | | GetLatestEntryOpIdResponsePB* resp, |
110 | | rpc::RpcContext context) override; |
111 | | |
112 | | void BootstrapProducer(const BootstrapProducerRequestPB* req, |
113 | | BootstrapProducerResponsePB* resp, |
114 | | rpc::RpcContext rpc) override; |
115 | | |
116 | | void GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req, |
117 | | GetCDCDBStreamInfoResponsePB* resp, |
118 | | rpc::RpcContext context) override; |
119 | | |
120 | | Result<SetCDCCheckpointResponsePB> SetCDCCheckpoint( |
121 | | const SetCDCCheckpointRequestPB& req, CoarseTimePoint deadline) override; |
122 | | |
123 | | void Shutdown() override; |
124 | | |
125 | | // Used in cdc_service-int-test.cc. |
126 | | std::shared_ptr<CDCTabletMetrics> GetCDCTabletMetrics( |
127 | | const ProducerTabletInfo& producer, |
128 | | std::shared_ptr<tablet::TabletPeer> tablet_peer = nullptr); |
129 | | |
130 | | void UpdateCDCTabletMetrics(const GetChangesResponsePB* resp, |
131 | | const ProducerTabletInfo& producer_tablet, |
132 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
133 | | const OpId& op_id, |
134 | | int64_t last_readable_index); |
135 | | |
136 | 0 | std::shared_ptr<CDCServerMetrics> GetCDCServerMetrics() { |
137 | 0 | return server_metrics_; |
138 | 0 | } |
139 | | |
140 | | // Returns true if this server has received a GetChanges call. |
141 | | bool CDCEnabled(); |
142 | | |
143 | | private: |
144 | | FRIEND_TEST(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure); |
145 | | |
146 | | class Impl; |
147 | | |
148 | | template <class ReqType, class RespType> |
149 | | bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc); |
150 | | |
151 | | Result<OpId> GetLastCheckpoint(const ProducerTabletInfo& producer_tablet, |
152 | | const client::YBSessionPtr& session); |
153 | | |
154 | | Result<std::vector<std::pair<std::string, std::string>>> GetDBStreamInfo( |
155 | | const std::string& db_stream_id, |
156 | | const client::YBSessionPtr& session); |
157 | | |
158 | | Result<std::string> GetCdcStreamId(const ProducerTabletInfo& producer_tablet, |
159 | | const std::shared_ptr<client::YBSession>& session); |
160 | | |
161 | | CHECKED_STATUS UpdateCheckpoint(const ProducerTabletInfo& producer_tablet, |
162 | | const OpId& sent_op_id, |
163 | | const OpId& commit_op_id, |
164 | | const client::YBSessionPtr& session, |
165 | | uint64_t last_record_hybrid_time); |
166 | | |
167 | | Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> GetTablets( |
168 | | const CDCStreamId& stream_id); |
169 | | |
170 | | Status CreateCDCStreamForTable( |
171 | | const TableId& table_id, |
172 | | const std::unordered_map<std::string, std::string>& options, |
173 | | const CDCStreamId& stream_id); |
174 | | |
175 | | void RollbackPartialCreate(const CDCCreationState& creation_state); |
176 | | |
177 | | Result<NamespaceId> GetNamespaceId(const std::string& ns_name); |
178 | | |
179 | | Result<std::shared_ptr<StreamMetadata>> GetStream(const std::string& stream_id); |
180 | | |
181 | | std::shared_ptr<StreamMetadata> GetStreamMetadataFromCache(const std::string& stream_id); |
182 | | void AddStreamMetadataToCache(const std::string& stream_id, |
183 | | const std::shared_ptr<StreamMetadata>& stream_metadata); |
184 | | |
185 | | CHECKED_STATUS CheckTabletValidForStream(const ProducerTabletInfo& producer_info); |
186 | | |
187 | | void TabletLeaderGetChanges(const GetChangesRequestPB* req, |
188 | | GetChangesResponsePB* resp, |
189 | | std::shared_ptr<rpc::RpcContext> context, |
190 | | std::shared_ptr<tablet::TabletPeer> peer); |
191 | | |
192 | | void TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req, |
193 | | GetCheckpointResponsePB* resp, |
194 | | rpc::RpcContext* context, |
195 | | const std::shared_ptr<tablet::TabletPeer>& peer); |
196 | | |
197 | | Result<OpId> TabletLeaderLatestEntryOpId(const TabletId& tablet_id); |
198 | | |
199 | | Result<client::internal::RemoteTabletPtr> GetRemoteTablet(const TabletId& tablet_id); |
200 | | Result<client::internal::RemoteTabletServer *> GetLeaderTServer(const TabletId& tablet_id); |
201 | | CHECKED_STATUS GetTServers(const TabletId& tablet_id, |
202 | | std::vector<client::internal::RemoteTabletServer*>* servers); |
203 | | |
204 | | std::shared_ptr<CDCServiceProxy> GetCDCServiceProxy(client::internal::RemoteTabletServer* ts); |
205 | | |
206 | | OpId GetMinSentCheckpointForTablet(const std::string& tablet_id); |
207 | | |
208 | | std::shared_ptr<MemTracker> GetMemTracker( |
209 | | const std::shared_ptr<tablet::TabletPeer>& tablet_peer, |
210 | | const ProducerTabletInfo& producer_info); |
211 | | |
212 | | OpId GetMinAppliedCheckpointForTablet(const std::string& tablet_id, |
213 | | const client::YBSessionPtr& session); |
214 | | |
215 | | CHECKED_STATUS UpdatePeersCdcMinReplicatedIndex(const TabletId& tablet_id, int64_t min_index, |
216 | | int64_t min_term = -1); |
217 | | |
218 | | void ComputeLagMetric(int64_t last_replicated_micros, int64_t metric_last_timestamp_micros, |
219 | | int64_t cdc_state_last_replication_time_micros, |
220 | | scoped_refptr<AtomicGauge<int64_t>> metric); |
221 | | |
222 | | // Update metrics async_replication_sent_lag_micros and async_replication_committed_lag_micros. |
223 | | // Called periodically (default: every 1s). |
224 | | void UpdateLagMetrics(); |
225 | | |
226 | | // This method is used to read the cdc_state table to find the minimum replicated index for each |
227 | | // tablet and then update the peers' log objects. Also used to update lag metrics. |
228 | | void UpdatePeersAndMetrics(); |
229 | | |
230 | | MicrosTime GetLastReplicatedTime(const std::shared_ptr<tablet::TabletPeer>& tablet_peer); |
231 | | |
232 | | bool ShouldUpdateLagMetrics(MonoTime time_since_update_metrics); |
233 | | |
234 | | Result<std::shared_ptr<client::TableHandle>> GetCdcStateTable() EXCLUDES(mutex_); |
235 | | |
236 | | void RefreshCdcStateTable() EXCLUDES(mutex_); |
237 | | |
238 | | Status RefreshCacheOnFail(const Status& s) EXCLUDES(mutex_); |
239 | | |
240 | | client::YBClient* client(); |
241 | | |
242 | | void CreateEntryInCdcStateTable( |
243 | | const std::shared_ptr<client::TableHandle>& cdc_state_table, |
244 | | std::vector<ProducerTabletInfo>* producer_entries_modified, |
245 | | std::vector<client::YBOperationPtr>* ops, |
246 | | const CDCStreamId& stream_id, |
247 | | const TableId& table_id, |
248 | | const TabletId& tablet_id); |
249 | | |
250 | | Status CreateCDCStreamForNamespace( |
251 | | const CreateCDCStreamRequestPB* req, |
252 | | CreateCDCStreamResponsePB* resp, |
253 | | CoarseTimePoint deadline); |
254 | | |
255 | | rpc::Rpcs rpcs_; |
256 | | |
257 | | tserver::TSTabletManager* tablet_manager_; |
258 | | |
259 | | MetricRegistry* metric_registry_; |
260 | | |
261 | | std::shared_ptr<CDCServerMetrics> server_metrics_; |
262 | | |
263 | | // Protects every member annotated GUARDED_BY(mutex_) (e.g. cdc_state_table_, stream_metadata_). |
264 | | mutable rw_spinlock mutex_; |
265 | | |
266 | | std::unique_ptr<Impl> impl_; |
267 | | |
268 | | std::shared_ptr<client::TableHandle> cdc_state_table_ GUARDED_BY(mutex_); |
269 | | |
270 | | std::unordered_map<std::string, std::shared_ptr<StreamMetadata>> stream_metadata_ |
271 | | GUARDED_BY(mutex_); |
272 | | |
273 | | // Map of HostPort -> CDCServiceProxy. This is used to redirect requests to tablet leader's |
274 | | // CDC service proxy. |
275 | | CDCServiceProxyMap cdc_service_map_ GUARDED_BY(mutex_); |
276 | | |
277 | | // Thread with a few functions: |
278 | | // |
279 | | // Read the cdc_state table and get the minimum checkpoint for each tablet |
280 | | // and then, for each tablet this tserver is a leader, update the log minimum cdc replicated |
281 | | // index so we can use this information to correctly keep log files that are needed so we |
282 | | // can continue replicating cdc records. This runs periodically to handle |
283 | | // leadership changes (FLAGS_update_min_cdc_indices_interval_secs). |
284 | | // TODO(hector): It would be better to do this update only when a local peer becomes a leader. |
285 | | // |
286 | | // Periodically update lag metrics (FLAGS_update_metrics_interval_ms). |
287 | | std::unique_ptr<std::thread> update_peers_and_metrics_thread_; |
288 | | |
289 | | // True when this service is stopped. Used to inform |
290 | | // update_peers_and_metrics_thread_ that it should exit. |
291 | | bool cdc_service_stopped_ GUARDED_BY(mutex_){false}; |
292 | | |
293 | | // True when this service has received a GetChanges request on a valid replication stream. |
294 | | std::atomic<bool> cdc_enabled_{false}; |
295 | | }; |
296 | | |
297 | | } // namespace cdc |
298 | | } // namespace yb |
299 | | |
300 | | #endif // ENT_SRC_YB_CDC_CDC_SERVICE_H |